""" User Management System für App Installer & Manager Pterodactyl-ähnliches Benutzer- und Rollensystem """ import os import json import hashlib import secrets import time from datetime import datetime, timedelta from functools import wraps from flask import session, request, redirect, url_for, flash, jsonify # Benutzer-Datenbank Datei USERS_DB_FILE = 'users_db.json' SESSIONS_DB_FILE = 'sessions_db.json' class UserRole: """Benutzerrollen-Definitionen""" ADMIN = 'admin' MODERATOR = 'moderator' USER = 'user' VIEWER = 'viewer' # Rollen-Hierarchie (höhere Zahl = mehr Rechte) HIERARCHY = { VIEWER: 1, USER: 2, MODERATOR: 3, ADMIN: 4 } @classmethod def get_all_roles(cls): return list(cls.HIERARCHY.keys()) @classmethod def has_permission(cls, user_role, required_role): """Prüfe ob Benutzerrolle ausreichende Berechtigung hat""" user_level = cls.HIERARCHY.get(user_role, 0) required_level = cls.HIERARCHY.get(required_role, 0) return user_level >= required_level class Permission: """Berechtigungs-Definitionen""" # Projekt-Berechtigungen PROJECT_VIEW = 'project.view' PROJECT_CREATE = 'project.create' PROJECT_UPDATE = 'project.update' PROJECT_DELETE = 'project.delete' PROJECT_START = 'project.start' PROJECT_STOP = 'project.stop' PROJECT_RESTART = 'project.restart' PROJECT_LOGS = 'project.logs' PROJECT_CONSOLE = 'project.console' PROJECT_FILES = 'project.files' # File-Berechtigungen FILE_ACCESS = 'file.access' FILE_WRITE = 'file.write' # System-Berechtigungen SYSTEM_CONFIG = 'system.config' SYSTEM_USERS = 'system.users' SYSTEM_MONITORING = 'system.monitoring' SYSTEM_BACKUPS = 'system.backups' SYSTEM_DOCKER = 'system.docker' # Rollen-zu-Berechtigungen Mapping ROLE_PERMISSIONS = { UserRole.VIEWER: [ PROJECT_VIEW, PROJECT_LOGS, FILE_ACCESS ], UserRole.USER: [ PROJECT_VIEW, PROJECT_CREATE, PROJECT_UPDATE, PROJECT_START, PROJECT_STOP, PROJECT_RESTART, PROJECT_LOGS, PROJECT_FILES, FILE_ACCESS, FILE_WRITE ], UserRole.MODERATOR: [ PROJECT_VIEW, PROJECT_CREATE, PROJECT_UPDATE, PROJECT_DELETE, PROJECT_START, PROJECT_STOP, PROJECT_RESTART, PROJECT_LOGS, PROJECT_CONSOLE, PROJECT_FILES, FILE_ACCESS, FILE_WRITE, SYSTEM_MONITORING, SYSTEM_DOCKER ], UserRole.ADMIN: [ PROJECT_VIEW, PROJECT_CREATE, PROJECT_UPDATE, PROJECT_DELETE, PROJECT_START, PROJECT_STOP, PROJECT_RESTART, PROJECT_LOGS, PROJECT_CONSOLE, PROJECT_FILES, FILE_ACCESS, FILE_WRITE, SYSTEM_CONFIG, SYSTEM_USERS, SYSTEM_MONITORING, SYSTEM_BACKUPS, SYSTEM_DOCKER ] } @classmethod def user_has_permission(cls, user_role, permission): """Prüfe ob Benutzer eine spezifische Berechtigung hat""" user_permissions = cls.ROLE_PERMISSIONS.get(user_role, []) return permission in user_permissions class UserManager: """Benutzer-Management System""" def __init__(self): self.users_db = self._load_users_db() self.sessions_db = self._load_sessions_db() self._ensure_admin_user() def _load_users_db(self): """Lade Benutzer-Datenbank""" if os.path.exists(USERS_DB_FILE): try: with open(USERS_DB_FILE, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: print(f"Fehler beim Laden der Benutzer-DB: {e}") # Standard-Struktur return { 'users': {}, 'created_at': datetime.now().isoformat(), 'version': '1.0' } def _load_sessions_db(self): """Lade Session-Datenbank""" if os.path.exists(SESSIONS_DB_FILE): try: with open(SESSIONS_DB_FILE, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: print(f"Fehler beim Laden der Session-DB: {e}") return { 'sessions': {}, 'created_at': datetime.now().isoformat() } def _save_users_db(self): """Speichere Benutzer-Datenbank""" try: with open(USERS_DB_FILE, 'w', encoding='utf-8') as f: json.dump(self.users_db, f, indent=2, ensure_ascii=False) except Exception as e: print(f"Fehler beim Speichern der Benutzer-DB: {e}") def _save_sessions_db(self): """Speichere Session-Datenbank""" try: with open(SESSIONS_DB_FILE, 'w', encoding='utf-8') as f: json.dump(self.sessions_db, f, indent=2, ensure_ascii=False) except Exception as e: print(f"Fehler beim Speichern der Session-DB: {e}") def _ensure_admin_user(self): """Stelle sicher, dass ein Admin-Benutzer existiert""" if not self.users_db['users']: # Erstelle Standard-Admin admin_password = self._generate_secure_password() self.create_user( username='admin', email='admin@localhost', password=admin_password, role=UserRole.ADMIN, first_name='System', last_name='Administrator' ) print(f"🔐 Standard-Admin erstellt:") print(f" Benutzername: admin") print(f" Passwort: {admin_password}") print(f" ⚠️ BITTE PASSWORT SOFORT ÄNDERN!") def _generate_secure_password(self, length=12): """Generiere sicheres Passwort""" import string alphabet = string.ascii_letters + string.digits + "!@#$%^&*" return ''.join(secrets.choice(alphabet) for _ in range(length)) def _hash_password(self, password, salt=None): """Hash Passwort mit Salt""" if salt is None: salt = secrets.token_hex(32) # Verwende SHA-256 mit Salt password_hash = hashlib.sha256((password + salt).encode()).hexdigest() return password_hash, salt def _verify_password(self, password, stored_hash, salt): """Verifiziere Passwort""" password_hash, _ = self._hash_password(password, salt) return password_hash == stored_hash def create_user(self, username, email, password, role=UserRole.USER, first_name='', last_name='', enabled=True): """Erstelle neuen Benutzer""" if username in self.users_db['users']: return False, f"Benutzer '{username}' existiert bereits" if role not in UserRole.get_all_roles(): return False, f"Ungültige Rolle: {role}" # Hash Passwort password_hash, salt = self._hash_password(password) # Erstelle Benutzer user_data = { 'username': username, 'email': email, 'password_hash': password_hash, 'password_salt': salt, 'role': role, 'first_name': first_name, 'last_name': last_name, 'enabled': enabled, 'created_at': datetime.now().isoformat(), 'last_login': None, 'login_count': 0, 'projects_access': [], # Spezifische Projekt-Zugriffe 'preferences': { 'theme': 'light', 'language': 'de', 'notifications': True } } self.users_db['users'][username] = user_data self._save_users_db() return True, f"Benutzer '{username}' erfolgreich erstellt" def authenticate_user(self, username, password): """Authentifiziere Benutzer""" if username not in self.users_db['users']: return False, "Benutzername oder Passwort falsch" user = self.users_db['users'][username] # Prüfe ob Benutzer aktiviert ist if not user.get('enabled', True): return False, "Benutzer ist deaktiviert" # Verifiziere Passwort if not self._verify_password(password, user['password_hash'], user['password_salt']): return False, "Benutzername oder Passwort falsch" # Update Login-Statistiken user['last_login'] = datetime.now().isoformat() user['login_count'] = user.get('login_count', 0) + 1 self._save_users_db() return True, user def create_session(self, username): """Erstelle Session für Benutzer""" session_token = secrets.token_urlsafe(32) session_data = { 'username': username, 'created_at': datetime.now().isoformat(), 'last_activity': datetime.now().isoformat(), 'ip_address': request.environ.get('REMOTE_ADDR', 'unknown'), 'user_agent': request.environ.get('HTTP_USER_AGENT', 'unknown') } self.sessions_db['sessions'][session_token] = session_data self._save_sessions_db() # Cleanup alte Sessions (älter als 30 Tage) self._cleanup_old_sessions() return session_token def validate_session(self, session_token): """Validiere Session""" if session_token not in self.sessions_db['sessions']: return False, None session_data = self.sessions_db['sessions'][session_token] # Prüfe Session-Alter (24 Stunden) created_at = datetime.fromisoformat(session_data['created_at']) if datetime.now() - created_at > timedelta(hours=24): self.destroy_session(session_token) return False, None # Update letzte Aktivität session_data['last_activity'] = datetime.now().isoformat() self._save_sessions_db() username = session_data['username'] user = self.users_db['users'].get(username) if not user or not user.get('enabled', True): self.destroy_session(session_token) return False, None return True, user def destroy_session(self, session_token): """Zerstöre Session""" if session_token in self.sessions_db['sessions']: del self.sessions_db['sessions'][session_token] self._save_sessions_db() def _cleanup_old_sessions(self): """Cleanup alte Sessions""" current_time = datetime.now() sessions_to_remove = [] for token, session_data in self.sessions_db['sessions'].items(): created_at = datetime.fromisoformat(session_data['created_at']) if current_time - created_at > timedelta(days=30): sessions_to_remove.append(token) for token in sessions_to_remove: del self.sessions_db['sessions'][token] if sessions_to_remove: self._save_sessions_db() def get_user(self, username): """Hole Benutzer-Informationen""" return self.users_db['users'].get(username) def get_all_users(self): """Hole alle Benutzer""" return self.users_db['users'] def update_user(self, username, **kwargs): """Update Benutzer-Informationen""" if username not in self.users_db['users']: return False, f"Benutzer '{username}' nicht gefunden" user = self.users_db['users'][username] # Erlaubte Update-Felder allowed_fields = ['email', 'role', 'first_name', 'last_name', 'enabled', 'projects_access', 'preferences'] for field, value in kwargs.items(): if field in allowed_fields: user[field] = value self._save_users_db() return True, f"Benutzer '{username}' aktualisiert" def change_password(self, username, old_password, new_password): """Ändere Benutzer-Passwort""" if username not in self.users_db['users']: return False, "Benutzer nicht gefunden" user = self.users_db['users'][username] # Verifiziere altes Passwort if not self._verify_password(old_password, user['password_hash'], user['password_salt']): return False, "Altes Passwort ist falsch" # Setze neues Passwort password_hash, salt = self._hash_password(new_password) user['password_hash'] = password_hash user['password_salt'] = salt self._save_users_db() return True, "Passwort erfolgreich geändert" def delete_user(self, username): """Lösche Benutzer""" if username not in self.users_db['users']: return False, f"Benutzer '{username}' nicht gefunden" # Verhindere Löschung des letzten Admins if self.users_db['users'][username]['role'] == UserRole.ADMIN: admin_count = sum(1 for user in self.users_db['users'].values() if user['role'] == UserRole.ADMIN and user.get('enabled', True)) if admin_count <= 1: return False, "Der letzte Administrator kann nicht gelöscht werden" del self.users_db['users'][username] self._save_users_db() # Entferne alle Sessions des Benutzers sessions_to_remove = [token for token, session_data in self.sessions_db['sessions'].items() if session_data['username'] == username] for token in sessions_to_remove: del self.sessions_db['sessions'][token] if sessions_to_remove: self._save_sessions_db() return True, f"Benutzer '{username}' gelöscht" # Globale User Manager Instanz user_manager = UserManager() # Decorator für Login-Erfordernis def login_required(f): """Decorator: Erfordert Login""" @wraps(f) def decorated_function(*args, **kwargs): session_token = session.get('session_token') if not session_token: flash('Sie müssen sich anmelden.', 'warning') return redirect(url_for('login')) valid, user = user_manager.validate_session(session_token) if not valid: session.pop('session_token', None) flash('Ihre Session ist abgelaufen.', 'warning') return redirect(url_for('login')) # Füge Benutzer-Info zu Request hinzu request.current_user = user return f(*args, **kwargs) return decorated_function # Decorator für Rollen-Berechtigung def role_required(required_role): """Decorator: Erfordert spezifische Rolle""" def decorator(f): @wraps(f) @login_required def decorated_function(*args, **kwargs): user = request.current_user if not UserRole.has_permission(user['role'], required_role): flash(f'Keine Berechtigung. Erforderliche Rolle: {required_role}', 'danger') return redirect(url_for('index')) return f(*args, **kwargs) return decorated_function return decorator # Decorator für spezifische Berechtigung def permission_required(permission): """Decorator: Erfordert spezifische Berechtigung""" def decorator(f): @wraps(f) @login_required def decorated_function(*args, **kwargs): user = request.current_user if not Permission.user_has_permission(user['role'], permission): flash(f'Keine Berechtigung für: {permission}', 'danger') return redirect(url_for('index')) return f(*args, **kwargs) return decorated_function return decorator # API Helper für JSON-Antworten def api_login_required(f): """API Decorator: Erfordert Login für API-Calls""" @wraps(f) def decorated_function(*args, **kwargs): session_token = session.get('session_token') if not session_token: return jsonify({'success': False, 'error': 'Login erforderlich'}), 401 valid, user = user_manager.validate_session(session_token) if not valid: session.pop('session_token', None) return jsonify({'success': False, 'error': 'Session abgelaufen'}), 401 request.current_user = user return f(*args, **kwargs) return decorated_function def api_permission_required(permission): """API Decorator: Erfordert spezifische Berechtigung für API-Calls""" def decorator(f): @wraps(f) @api_login_required def decorated_function(*args, **kwargs): user = request.current_user if not Permission.user_has_permission(user['role'], permission): return jsonify({'success': False, 'error': f'Keine Berechtigung für: {permission}'}), 403 return f(*args, **kwargs) return decorated_function return decorator