Files
app-installer/user_management.py
SimolZimol 1eac702d7d modified: app.py
new file:   file_manager.py
	modified:   requirements.txt
	new file:   sessions_db.json
	modified:   templates/base.html
	new file:   templates/file_manager.html
	new file:   templates/file_manager_new.html
	new file:   templates/login.html
	new file:   templates/profile.html
	new file:   templates/users_management.html
	new file:   user_management.py
	new file:   users_db.json
2025-07-10 00:00:59 +02:00

508 lines
17 KiB
Python

"""
User Management System für App Installer & Manager
Pterodactyl-ähnliches Benutzer- und Rollensystem
"""
import os
import json
import hashlib
import secrets
import time
from datetime import datetime, timedelta
from functools import wraps
from flask import session, request, redirect, url_for, flash, jsonify
# Benutzer-Datenbank Datei
USERS_DB_FILE = 'users_db.json'
SESSIONS_DB_FILE = 'sessions_db.json'
class UserRole:
"""Benutzerrollen-Definitionen"""
ADMIN = 'admin'
MODERATOR = 'moderator'
USER = 'user'
VIEWER = 'viewer'
# Rollen-Hierarchie (höhere Zahl = mehr Rechte)
HIERARCHY = {
VIEWER: 1,
USER: 2,
MODERATOR: 3,
ADMIN: 4
}
@classmethod
def get_all_roles(cls):
return list(cls.HIERARCHY.keys())
@classmethod
def has_permission(cls, user_role, required_role):
"""Prüfe ob Benutzerrolle ausreichende Berechtigung hat"""
user_level = cls.HIERARCHY.get(user_role, 0)
required_level = cls.HIERARCHY.get(required_role, 0)
return user_level >= required_level
class Permission:
"""Berechtigungs-Definitionen"""
# Projekt-Berechtigungen
PROJECT_VIEW = 'project.view'
PROJECT_CREATE = 'project.create'
PROJECT_UPDATE = 'project.update'
PROJECT_DELETE = 'project.delete'
PROJECT_START = 'project.start'
PROJECT_STOP = 'project.stop'
PROJECT_RESTART = 'project.restart'
PROJECT_LOGS = 'project.logs'
PROJECT_CONSOLE = 'project.console'
PROJECT_FILES = 'project.files'
# File-Berechtigungen
FILE_ACCESS = 'file.access'
FILE_WRITE = 'file.write'
# System-Berechtigungen
SYSTEM_CONFIG = 'system.config'
SYSTEM_USERS = 'system.users'
SYSTEM_MONITORING = 'system.monitoring'
SYSTEM_BACKUPS = 'system.backups'
SYSTEM_DOCKER = 'system.docker'
# Rollen-zu-Berechtigungen Mapping
ROLE_PERMISSIONS = {
UserRole.VIEWER: [
PROJECT_VIEW,
PROJECT_LOGS,
FILE_ACCESS
],
UserRole.USER: [
PROJECT_VIEW,
PROJECT_CREATE,
PROJECT_UPDATE,
PROJECT_START,
PROJECT_STOP,
PROJECT_RESTART,
PROJECT_LOGS,
PROJECT_FILES,
FILE_ACCESS,
FILE_WRITE
],
UserRole.MODERATOR: [
PROJECT_VIEW,
PROJECT_CREATE,
PROJECT_UPDATE,
PROJECT_DELETE,
PROJECT_START,
PROJECT_STOP,
PROJECT_RESTART,
PROJECT_LOGS,
PROJECT_CONSOLE,
PROJECT_FILES,
FILE_ACCESS,
FILE_WRITE,
SYSTEM_MONITORING,
SYSTEM_DOCKER
],
UserRole.ADMIN: [
PROJECT_VIEW,
PROJECT_CREATE,
PROJECT_UPDATE,
PROJECT_DELETE,
PROJECT_START,
PROJECT_STOP,
PROJECT_RESTART,
PROJECT_LOGS,
PROJECT_CONSOLE,
PROJECT_FILES,
FILE_ACCESS,
FILE_WRITE,
SYSTEM_CONFIG,
SYSTEM_USERS,
SYSTEM_MONITORING,
SYSTEM_BACKUPS,
SYSTEM_DOCKER
]
}
@classmethod
def user_has_permission(cls, user_role, permission):
"""Prüfe ob Benutzer eine spezifische Berechtigung hat"""
user_permissions = cls.ROLE_PERMISSIONS.get(user_role, [])
return permission in user_permissions
class UserManager:
"""Benutzer-Management System"""
def __init__(self):
self.users_db = self._load_users_db()
self.sessions_db = self._load_sessions_db()
self._ensure_admin_user()
def _load_users_db(self):
"""Lade Benutzer-Datenbank"""
if os.path.exists(USERS_DB_FILE):
try:
with open(USERS_DB_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"Fehler beim Laden der Benutzer-DB: {e}")
# Standard-Struktur
return {
'users': {},
'created_at': datetime.now().isoformat(),
'version': '1.0'
}
def _load_sessions_db(self):
"""Lade Session-Datenbank"""
if os.path.exists(SESSIONS_DB_FILE):
try:
with open(SESSIONS_DB_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"Fehler beim Laden der Session-DB: {e}")
return {
'sessions': {},
'created_at': datetime.now().isoformat()
}
def _save_users_db(self):
"""Speichere Benutzer-Datenbank"""
try:
with open(USERS_DB_FILE, 'w', encoding='utf-8') as f:
json.dump(self.users_db, f, indent=2, ensure_ascii=False)
except Exception as e:
print(f"Fehler beim Speichern der Benutzer-DB: {e}")
def _save_sessions_db(self):
"""Speichere Session-Datenbank"""
try:
with open(SESSIONS_DB_FILE, 'w', encoding='utf-8') as f:
json.dump(self.sessions_db, f, indent=2, ensure_ascii=False)
except Exception as e:
print(f"Fehler beim Speichern der Session-DB: {e}")
def _ensure_admin_user(self):
"""Stelle sicher, dass ein Admin-Benutzer existiert"""
if not self.users_db['users']:
# Erstelle Standard-Admin
admin_password = self._generate_secure_password()
self.create_user(
username='admin',
email='admin@localhost',
password=admin_password,
role=UserRole.ADMIN,
first_name='System',
last_name='Administrator'
)
print(f"🔐 Standard-Admin erstellt:")
print(f" Benutzername: admin")
print(f" Passwort: {admin_password}")
print(f" ⚠️ BITTE PASSWORT SOFORT ÄNDERN!")
def _generate_secure_password(self, length=12):
"""Generiere sicheres Passwort"""
import string
alphabet = string.ascii_letters + string.digits + "!@#$%^&*"
return ''.join(secrets.choice(alphabet) for _ in range(length))
def _hash_password(self, password, salt=None):
"""Hash Passwort mit Salt"""
if salt is None:
salt = secrets.token_hex(32)
# Verwende SHA-256 mit Salt
password_hash = hashlib.sha256((password + salt).encode()).hexdigest()
return password_hash, salt
def _verify_password(self, password, stored_hash, salt):
"""Verifiziere Passwort"""
password_hash, _ = self._hash_password(password, salt)
return password_hash == stored_hash
def create_user(self, username, email, password, role=UserRole.USER,
first_name='', last_name='', enabled=True):
"""Erstelle neuen Benutzer"""
if username in self.users_db['users']:
return False, f"Benutzer '{username}' existiert bereits"
if role not in UserRole.get_all_roles():
return False, f"Ungültige Rolle: {role}"
# Hash Passwort
password_hash, salt = self._hash_password(password)
# Erstelle Benutzer
user_data = {
'username': username,
'email': email,
'password_hash': password_hash,
'password_salt': salt,
'role': role,
'first_name': first_name,
'last_name': last_name,
'enabled': enabled,
'created_at': datetime.now().isoformat(),
'last_login': None,
'login_count': 0,
'projects_access': [], # Spezifische Projekt-Zugriffe
'preferences': {
'theme': 'light',
'language': 'de',
'notifications': True
}
}
self.users_db['users'][username] = user_data
self._save_users_db()
return True, f"Benutzer '{username}' erfolgreich erstellt"
def authenticate_user(self, username, password):
"""Authentifiziere Benutzer"""
if username not in self.users_db['users']:
return False, "Benutzername oder Passwort falsch"
user = self.users_db['users'][username]
# Prüfe ob Benutzer aktiviert ist
if not user.get('enabled', True):
return False, "Benutzer ist deaktiviert"
# Verifiziere Passwort
if not self._verify_password(password, user['password_hash'], user['password_salt']):
return False, "Benutzername oder Passwort falsch"
# Update Login-Statistiken
user['last_login'] = datetime.now().isoformat()
user['login_count'] = user.get('login_count', 0) + 1
self._save_users_db()
return True, user
def create_session(self, username):
"""Erstelle Session für Benutzer"""
session_token = secrets.token_urlsafe(32)
session_data = {
'username': username,
'created_at': datetime.now().isoformat(),
'last_activity': datetime.now().isoformat(),
'ip_address': request.environ.get('REMOTE_ADDR', 'unknown'),
'user_agent': request.environ.get('HTTP_USER_AGENT', 'unknown')
}
self.sessions_db['sessions'][session_token] = session_data
self._save_sessions_db()
# Cleanup alte Sessions (älter als 30 Tage)
self._cleanup_old_sessions()
return session_token
def validate_session(self, session_token):
"""Validiere Session"""
if session_token not in self.sessions_db['sessions']:
return False, None
session_data = self.sessions_db['sessions'][session_token]
# Prüfe Session-Alter (24 Stunden)
created_at = datetime.fromisoformat(session_data['created_at'])
if datetime.now() - created_at > timedelta(hours=24):
self.destroy_session(session_token)
return False, None
# Update letzte Aktivität
session_data['last_activity'] = datetime.now().isoformat()
self._save_sessions_db()
username = session_data['username']
user = self.users_db['users'].get(username)
if not user or not user.get('enabled', True):
self.destroy_session(session_token)
return False, None
return True, user
def destroy_session(self, session_token):
"""Zerstöre Session"""
if session_token in self.sessions_db['sessions']:
del self.sessions_db['sessions'][session_token]
self._save_sessions_db()
def _cleanup_old_sessions(self):
"""Cleanup alte Sessions"""
current_time = datetime.now()
sessions_to_remove = []
for token, session_data in self.sessions_db['sessions'].items():
created_at = datetime.fromisoformat(session_data['created_at'])
if current_time - created_at > timedelta(days=30):
sessions_to_remove.append(token)
for token in sessions_to_remove:
del self.sessions_db['sessions'][token]
if sessions_to_remove:
self._save_sessions_db()
def get_user(self, username):
"""Hole Benutzer-Informationen"""
return self.users_db['users'].get(username)
def get_all_users(self):
"""Hole alle Benutzer"""
return self.users_db['users']
def update_user(self, username, **kwargs):
"""Update Benutzer-Informationen"""
if username not in self.users_db['users']:
return False, f"Benutzer '{username}' nicht gefunden"
user = self.users_db['users'][username]
# Erlaubte Update-Felder
allowed_fields = ['email', 'role', 'first_name', 'last_name', 'enabled', 'projects_access', 'preferences']
for field, value in kwargs.items():
if field in allowed_fields:
user[field] = value
self._save_users_db()
return True, f"Benutzer '{username}' aktualisiert"
def change_password(self, username, old_password, new_password):
"""Ändere Benutzer-Passwort"""
if username not in self.users_db['users']:
return False, "Benutzer nicht gefunden"
user = self.users_db['users'][username]
# Verifiziere altes Passwort
if not self._verify_password(old_password, user['password_hash'], user['password_salt']):
return False, "Altes Passwort ist falsch"
# Setze neues Passwort
password_hash, salt = self._hash_password(new_password)
user['password_hash'] = password_hash
user['password_salt'] = salt
self._save_users_db()
return True, "Passwort erfolgreich geändert"
def delete_user(self, username):
"""Lösche Benutzer"""
if username not in self.users_db['users']:
return False, f"Benutzer '{username}' nicht gefunden"
# Verhindere Löschung des letzten Admins
if self.users_db['users'][username]['role'] == UserRole.ADMIN:
admin_count = sum(1 for user in self.users_db['users'].values()
if user['role'] == UserRole.ADMIN and user.get('enabled', True))
if admin_count <= 1:
return False, "Der letzte Administrator kann nicht gelöscht werden"
del self.users_db['users'][username]
self._save_users_db()
# Entferne alle Sessions des Benutzers
sessions_to_remove = [token for token, session_data in self.sessions_db['sessions'].items()
if session_data['username'] == username]
for token in sessions_to_remove:
del self.sessions_db['sessions'][token]
if sessions_to_remove:
self._save_sessions_db()
return True, f"Benutzer '{username}' gelöscht"
# Globale User Manager Instanz
user_manager = UserManager()
# Decorator für Login-Erfordernis
def login_required(f):
"""Decorator: Erfordert Login"""
@wraps(f)
def decorated_function(*args, **kwargs):
session_token = session.get('session_token')
if not session_token:
flash('Sie müssen sich anmelden.', 'warning')
return redirect(url_for('login'))
valid, user = user_manager.validate_session(session_token)
if not valid:
session.pop('session_token', None)
flash('Ihre Session ist abgelaufen.', 'warning')
return redirect(url_for('login'))
# Füge Benutzer-Info zu Request hinzu
request.current_user = user
return f(*args, **kwargs)
return decorated_function
# Decorator für Rollen-Berechtigung
def role_required(required_role):
"""Decorator: Erfordert spezifische Rolle"""
def decorator(f):
@wraps(f)
@login_required
def decorated_function(*args, **kwargs):
user = request.current_user
if not UserRole.has_permission(user['role'], required_role):
flash(f'Keine Berechtigung. Erforderliche Rolle: {required_role}', 'danger')
return redirect(url_for('index'))
return f(*args, **kwargs)
return decorated_function
return decorator
# Decorator für spezifische Berechtigung
def permission_required(permission):
"""Decorator: Erfordert spezifische Berechtigung"""
def decorator(f):
@wraps(f)
@login_required
def decorated_function(*args, **kwargs):
user = request.current_user
if not Permission.user_has_permission(user['role'], permission):
flash(f'Keine Berechtigung für: {permission}', 'danger')
return redirect(url_for('index'))
return f(*args, **kwargs)
return decorated_function
return decorator
# API Helper für JSON-Antworten
def api_login_required(f):
"""API Decorator: Erfordert Login für API-Calls"""
@wraps(f)
def decorated_function(*args, **kwargs):
session_token = session.get('session_token')
if not session_token:
return jsonify({'success': False, 'error': 'Login erforderlich'}), 401
valid, user = user_manager.validate_session(session_token)
if not valid:
session.pop('session_token', None)
return jsonify({'success': False, 'error': 'Session abgelaufen'}), 401
request.current_user = user
return f(*args, **kwargs)
return decorated_function
def api_permission_required(permission):
"""API Decorator: Erfordert spezifische Berechtigung für API-Calls"""
def decorator(f):
@wraps(f)
@api_login_required
def decorated_function(*args, **kwargs):
user = request.current_user
if not Permission.user_has_permission(user['role'], permission):
return jsonify({'success': False, 'error': f'Keine Berechtigung für: {permission}'}), 403
return f(*args, **kwargs)
return decorated_function
return decorator