new file: file_manager.py modified: requirements.txt new file: sessions_db.json modified: templates/base.html new file: templates/file_manager.html new file: templates/file_manager_new.html new file: templates/login.html new file: templates/profile.html new file: templates/users_management.html new file: user_management.py new file: users_db.json
508 lines
17 KiB
Python
508 lines
17 KiB
Python
"""
|
|
User Management System für App Installer & Manager
|
|
Pterodactyl-ähnliches Benutzer- und Rollensystem
|
|
"""
|
|
|
|
import hashlib
import hmac
import json
import os
import secrets
import time
from datetime import datetime, timedelta
from functools import wraps

from flask import session, request, redirect, url_for, flash, jsonify
|
|
|
|
# User database file (relative path, opened with UTF-8 encoding by UserManager)
USERS_DB_FILE = 'users_db.json'
# Session database file (relative path, opened with UTF-8 encoding by UserManager)
SESSIONS_DB_FILE = 'sessions_db.json'
|
|
|
|
class UserRole:
    """User role names and their privilege ordering."""

    ADMIN = 'admin'
    MODERATOR = 'moderator'
    USER = 'user'
    VIEWER = 'viewer'

    # Role hierarchy (higher number = more privileges)
    HIERARCHY = {
        VIEWER: 1,
        USER: 2,
        MODERATOR: 3,
        ADMIN: 4,
    }

    @classmethod
    def get_all_roles(cls):
        """Return all known role names, from least to most privileged."""
        return list(cls.HIERARCHY)

    @classmethod
    def has_permission(cls, user_role, required_role):
        """Check whether ``user_role`` ranks at least as high as ``required_role``.

        Args:
            user_role: Role name of the user being checked.
            required_role: Minimum role name needed for the action.

        Returns:
            True if both roles are known and the user's level is greater than
            or equal to the required level, otherwise False.
        """
        user_level = cls.HIERARCHY.get(user_role)
        required_level = cls.HIERARCHY.get(required_role)
        # Fail closed: an unknown role on either side (typo, stale data) must
        # never grant access. Defaulting both to 0 would let everyone through
        # whenever the required role is not recognised.
        if user_level is None or required_level is None:
            return False
        return user_level >= required_level
|
|
|
|
class Permission:
    """Permission identifiers and the mapping from roles to permissions."""

    # Project permissions
    PROJECT_VIEW = 'project.view'
    PROJECT_CREATE = 'project.create'
    PROJECT_UPDATE = 'project.update'
    PROJECT_DELETE = 'project.delete'
    PROJECT_START = 'project.start'
    PROJECT_STOP = 'project.stop'
    PROJECT_RESTART = 'project.restart'
    PROJECT_LOGS = 'project.logs'
    PROJECT_CONSOLE = 'project.console'
    PROJECT_FILES = 'project.files'

    # File permissions
    FILE_ACCESS = 'file.access'
    FILE_WRITE = 'file.write'

    # System permissions
    SYSTEM_CONFIG = 'system.config'
    SYSTEM_USERS = 'system.users'
    SYSTEM_MONITORING = 'system.monitoring'
    SYSTEM_BACKUPS = 'system.backups'
    SYSTEM_DOCKER = 'system.docker'

    # Role -> list of granted permissions
    ROLE_PERMISSIONS = {
        UserRole.VIEWER: [
            PROJECT_VIEW, PROJECT_LOGS, FILE_ACCESS,
        ],
        UserRole.USER: [
            PROJECT_VIEW, PROJECT_CREATE, PROJECT_UPDATE,
            PROJECT_START, PROJECT_STOP, PROJECT_RESTART,
            PROJECT_LOGS, PROJECT_FILES,
            FILE_ACCESS, FILE_WRITE,
        ],
        UserRole.MODERATOR: [
            PROJECT_VIEW, PROJECT_CREATE, PROJECT_UPDATE, PROJECT_DELETE,
            PROJECT_START, PROJECT_STOP, PROJECT_RESTART,
            PROJECT_LOGS, PROJECT_CONSOLE, PROJECT_FILES,
            FILE_ACCESS, FILE_WRITE,
            SYSTEM_MONITORING, SYSTEM_DOCKER,
        ],
        UserRole.ADMIN: [
            PROJECT_VIEW, PROJECT_CREATE, PROJECT_UPDATE, PROJECT_DELETE,
            PROJECT_START, PROJECT_STOP, PROJECT_RESTART,
            PROJECT_LOGS, PROJECT_CONSOLE, PROJECT_FILES,
            FILE_ACCESS, FILE_WRITE,
            SYSTEM_CONFIG, SYSTEM_USERS, SYSTEM_MONITORING,
            SYSTEM_BACKUPS, SYSTEM_DOCKER,
        ],
    }

    @classmethod
    def user_has_permission(cls, user_role, permission):
        """Return True if ``permission`` is granted to ``user_role``.

        A role that has no entry in ``ROLE_PERMISSIONS`` has no permissions.
        """
        granted = cls.ROLE_PERMISSIONS.get(user_role)
        if granted is None:
            return False
        return permission in granted
|
|
|
|
class UserManager:
    """JSON-file backed store for users and login sessions.

    Users are kept in ``USERS_DB_FILE`` and sessions in ``SESSIONS_DB_FILE``.
    Both files are loaded into memory on construction and written back after
    each change. Most mutating methods return a ``(success, message)`` tuple.
    """

    # New password hashes are stored as
    # "pbkdf2_sha256$<iterations>$<hex digest>". Stored hashes without that
    # prefix are legacy single-round salted SHA-256 digests; they are still
    # verifiable and get upgraded on the next successful login.
    _PBKDF2_PREFIX = 'pbkdf2_sha256'
    _PBKDF2_ITERATIONS = 600_000

    # Sessions are rejected once older than _SESSION_LIFETIME; their records
    # are purged from the store once older than _SESSION_RETENTION.
    _SESSION_LIFETIME = timedelta(hours=24)
    _SESSION_RETENTION = timedelta(days=30)

    def __init__(self):
        self.users_db = self._load_users_db()
        self.sessions_db = self._load_sessions_db()
        self._ensure_admin_user()

    # ------------------------------------------------------------------
    # Persistence helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _read_json(path, label):
        """Return the JSON object stored at ``path``, or None.

        None is returned when the file is missing, unreadable, or does not
        contain a JSON object. An unreadable file is moved aside (best
        effort) so that the next save cannot overwrite data that might still
        be recoverable.
        """
        try:
            with open(path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if not isinstance(data, dict):
                raise ValueError('Top-Level-Element ist kein Objekt')
            return data
        except FileNotFoundError:
            return None
        except (OSError, ValueError) as e:
            print(f"Fehler beim Laden der {label}: {e}")

        backup_path = f"{path}.corrupt-{int(time.time())}"
        try:
            os.replace(path, backup_path)
        except OSError:
            # Best effort only: the caller falls back to an empty database.
            pass
        return None

    @staticmethod
    def _write_json(path, data, label):
        """Write ``data`` to ``path`` as JSON without leaving a partial file.

        The data is written to a sibling temp file which then replaces the
        target, so a crash mid-write cannot truncate the existing database.
        The temp file is created with mode 0o600 because both databases hold
        credentials (password hashes / session tokens). Errors are reported
        on stdout and otherwise swallowed.
        """
        tmp_path = f"{path}.tmp"
        try:
            fd = os.open(tmp_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            with os.fdopen(fd, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            os.replace(tmp_path, path)
        except (OSError, TypeError, ValueError) as e:
            print(f"Fehler beim Speichern der {label}: {e}")
            try:
                os.remove(tmp_path)
            except OSError:
                pass

    def _load_users_db(self):
        """Load the user database, falling back to an empty structure."""
        data = self._read_json(USERS_DB_FILE, 'Benutzer-DB')
        if data is None:
            # Default structure
            data = {
                'users': {},
                'created_at': datetime.now().isoformat(),
                'version': '1.0',
            }
        data.setdefault('users', {})
        return data

    def _load_sessions_db(self):
        """Load the session database, falling back to an empty structure."""
        data = self._read_json(SESSIONS_DB_FILE, 'Session-DB')
        if data is None:
            data = {
                'sessions': {},
                'created_at': datetime.now().isoformat(),
            }
        data.setdefault('sessions', {})
        return data

    def _save_users_db(self):
        """Persist the in-memory user database."""
        self._write_json(USERS_DB_FILE, self.users_db, 'Benutzer-DB')

    def _save_sessions_db(self):
        """Persist the in-memory session database."""
        self._write_json(SESSIONS_DB_FILE, self.sessions_db, 'Session-DB')

    # ------------------------------------------------------------------
    # Bootstrap and password handling
    # ------------------------------------------------------------------

    def _ensure_admin_user(self):
        """Create a default admin with a random password if no users exist.

        The generated password is printed once to stdout.
        """
        if self.users_db['users']:
            return

        admin_password = self._generate_secure_password()
        self.create_user(
            username='admin',
            email='admin@localhost',
            password=admin_password,
            role=UserRole.ADMIN,
            first_name='System',
            last_name='Administrator',
        )
        print("🔐 Standard-Admin erstellt:")
        print(" Benutzername: admin")
        print(f" Passwort: {admin_password}")
        print(" ⚠️ BITTE PASSWORT SOFORT ÄNDERN!")

    def _generate_secure_password(self, length=12):
        """Return a random password of ``length`` characters."""
        import string

        alphabet = string.ascii_letters + string.digits + "!@#$%^&*"
        return ''.join(secrets.choice(alphabet) for _ in range(length))

    def _is_legacy_hash(self, stored_hash):
        """Return True if ``stored_hash`` lacks the PBKDF2 prefix."""
        return not stored_hash.startswith(self._PBKDF2_PREFIX + '$')

    def _hash_password(self, password, salt=None, iterations=None):
        """Hash ``password`` with PBKDF2-HMAC-SHA256.

        Args:
            password: Plain-text password.
            salt: Salt string; a random hex salt is generated when None.
            iterations: PBKDF2 iteration count; defaults to
                ``_PBKDF2_ITERATIONS``.

        Returns:
            Tuple ``(encoded_hash, salt)`` where ``encoded_hash`` has the
            form ``pbkdf2_sha256$<iterations>$<hex digest>``.
        """
        if salt is None:
            salt = secrets.token_hex(32)
        if iterations is None:
            iterations = self._PBKDF2_ITERATIONS

        digest = hashlib.pbkdf2_hmac(
            'sha256', password.encode(), salt.encode(), iterations
        ).hex()
        return f"{self._PBKDF2_PREFIX}${iterations}${digest}", salt

    def _verify_password(self, password, stored_hash, salt):
        """Return True if ``password`` matches ``stored_hash``.

        Supports both the PBKDF2 format and legacy salted SHA-256 digests.
        The comparison is constant-time.
        """
        if not isinstance(stored_hash, str) or not isinstance(salt, str):
            return False

        if self._is_legacy_hash(stored_hash):
            candidate = hashlib.sha256((password + salt).encode()).hexdigest()
        else:
            try:
                _, iterations, _ = stored_hash.split('$')
                candidate, _ = self._hash_password(password, salt, int(iterations))
            except ValueError:
                # Malformed stored value (wrong field count / bad iterations).
                return False

        return hmac.compare_digest(candidate.encode(), stored_hash.encode())

    def _count_other_enabled_admins(self, username):
        """Count enabled admin accounts other than ``username``."""
        return sum(
            1
            for name, user in self.users_db['users'].items()
            if name != username
            and user.get('role') == UserRole.ADMIN
            and user.get('enabled', True)
        )

    # ------------------------------------------------------------------
    # Users
    # ------------------------------------------------------------------

    def create_user(self, username, email, password, role=UserRole.USER,
                    first_name='', last_name='', enabled=True):
        """Create a new user.

        Returns:
            ``(True, message)`` on success, ``(False, message)`` if the
            username or password is empty, the username is taken, or the
            role is unknown.
        """
        if not username:
            return False, "Benutzername darf nicht leer sein"
        if not password:
            return False, "Passwort darf nicht leer sein"

        if username in self.users_db['users']:
            return False, f"Benutzer '{username}' existiert bereits"

        if role not in UserRole.get_all_roles():
            return False, f"Ungültige Rolle: {role}"

        password_hash, salt = self._hash_password(password)

        self.users_db['users'][username] = {
            'username': username,
            'email': email,
            'password_hash': password_hash,
            'password_salt': salt,
            'role': role,
            'first_name': first_name,
            'last_name': last_name,
            'enabled': enabled,
            'created_at': datetime.now().isoformat(),
            'last_login': None,
            'login_count': 0,
            'projects_access': [],  # Project-specific access grants
            'preferences': {
                'theme': 'light',
                'language': 'de',
                'notifications': True,
            },
        }
        self._save_users_db()

        return True, f"Benutzer '{username}' erfolgreich erstellt"

    def authenticate_user(self, username, password):
        """Authenticate a user by username and password.

        Returns:
            ``(True, user_dict)`` on success, otherwise ``(False, message)``.
        """
        user = self.users_db['users'].get(username)
        if user is None:
            # Spend comparable time to a real verification so response
            # timing does not reveal whether the username exists.
            if isinstance(password, str):
                self._hash_password(password, 'timing-equalizer')
            return False, "Benutzername oder Passwort falsch"

        # Verify the password before revealing anything about the account
        # state, so "disabled" is only disclosed to someone who knows it.
        if not self._verify_password(password, user.get('password_hash'),
                                     user.get('password_salt')):
            return False, "Benutzername oder Passwort falsch"

        if not user.get('enabled', True):
            return False, "Benutzer ist deaktiviert"

        # Transparently upgrade legacy SHA-256 hashes to PBKDF2.
        if self._is_legacy_hash(user['password_hash']):
            user['password_hash'], user['password_salt'] = self._hash_password(password)

        # Update login statistics
        user['last_login'] = datetime.now().isoformat()
        user['login_count'] = user.get('login_count', 0) + 1
        self._save_users_db()

        return True, user

    # ------------------------------------------------------------------
    # Sessions
    # ------------------------------------------------------------------

    @staticmethod
    def _session_created_at(session_data):
        """Return the session's creation time, or None if it is malformed."""
        try:
            return datetime.fromisoformat(session_data['created_at'])
        except (KeyError, TypeError, ValueError):
            return None

    def create_session(self, username):
        """Create a session for ``username`` and return its token.

        Must be called inside a Flask request context: the client address
        and user agent are read from ``request.environ``.
        """
        session_token = secrets.token_urlsafe(32)
        now = datetime.now().isoformat()

        self.sessions_db['sessions'][session_token] = {
            'username': username,
            'created_at': now,
            'last_activity': now,
            'ip_address': request.environ.get('REMOTE_ADDR', 'unknown'),
            'user_agent': request.environ.get('HTTP_USER_AGENT', 'unknown'),
        }
        self._save_sessions_db()

        # Purge stale session records
        self._cleanup_old_sessions()

        return session_token

    def validate_session(self, session_token):
        """Validate a session token.

        Returns:
            ``(True, user_dict)`` if the session exists, is younger than the
            session lifetime and belongs to an existing, enabled user;
            otherwise ``(False, None)``. Invalid sessions are destroyed.
        """
        session_data = self.sessions_db['sessions'].get(session_token)
        if session_data is None:
            return False, None

        created_at = self._session_created_at(session_data)
        if created_at is None or datetime.now() - created_at > self._SESSION_LIFETIME:
            self.destroy_session(session_token)
            return False, None

        user = self.users_db['users'].get(session_data.get('username'))
        if not user or not user.get('enabled', True):
            self.destroy_session(session_token)
            return False, None

        # Record last activity
        session_data['last_activity'] = datetime.now().isoformat()
        self._save_sessions_db()

        return True, user

    def destroy_session(self, session_token):
        """Remove a session if it exists."""
        if session_token in self.sessions_db['sessions']:
            del self.sessions_db['sessions'][session_token]
            self._save_sessions_db()

    def _cleanup_old_sessions(self):
        """Remove session records that are malformed or past retention."""
        now = datetime.now()
        stale_tokens = []
        for token, session_data in self.sessions_db['sessions'].items():
            created_at = self._session_created_at(session_data)
            if created_at is None or now - created_at > self._SESSION_RETENTION:
                stale_tokens.append(token)

        for token in stale_tokens:
            del self.sessions_db['sessions'][token]

        if stale_tokens:
            self._save_sessions_db()

    # ------------------------------------------------------------------
    # User queries and updates
    # ------------------------------------------------------------------

    def get_user(self, username):
        """Return the user record for ``username``, or None."""
        return self.users_db['users'].get(username)

    def get_all_users(self):
        """Return the mapping of all usernames to user records.

        NOTE: this is the live internal dict and includes password hashes.
        """
        return self.users_db['users']

    def update_user(self, username, **kwargs):
        """Update whitelisted fields of a user.

        Only ``email``, ``role``, ``first_name``, ``last_name``, ``enabled``,
        ``projects_access`` and ``preferences`` are applied; other keyword
        arguments are ignored. Nothing is changed when validation fails.

        Returns:
            ``(True, message)`` on success, ``(False, message)`` if the user
            does not exist, the new role is unknown, or the change would
            leave the system without an enabled administrator.
        """
        if username not in self.users_db['users']:
            return False, f"Benutzer '{username}' nicht gefunden"

        user = self.users_db['users'][username]

        # Allowed update fields
        allowed_fields = ('email', 'role', 'first_name', 'last_name',
                          'enabled', 'projects_access', 'preferences')
        updates = {field: value for field, value in kwargs.items()
                   if field in allowed_fields}

        if 'role' in updates and updates['role'] not in UserRole.get_all_roles():
            return False, f"Ungültige Rolle: {updates['role']}"

        # Same protection as delete_user: never demote or disable the last
        # enabled administrator.
        is_active_admin = (user.get('role') == UserRole.ADMIN
                           and user.get('enabled', True))
        stays_active_admin = (
            updates.get('role', user.get('role')) == UserRole.ADMIN
            and bool(updates.get('enabled', user.get('enabled', True)))
        )
        if (is_active_admin and not stays_active_admin
                and self._count_other_enabled_admins(username) == 0):
            return False, ("Der letzte Administrator kann nicht herabgestuft "
                           "oder deaktiviert werden")

        user.update(updates)
        self._save_users_db()
        return True, f"Benutzer '{username}' aktualisiert"

    def change_password(self, username, old_password, new_password):
        """Change a user's password after verifying the old one.

        Returns:
            ``(True, message)`` on success, otherwise ``(False, message)``.
        """
        if username not in self.users_db['users']:
            return False, "Benutzer nicht gefunden"

        user = self.users_db['users'][username]

        # Verify the old password
        if not self._verify_password(old_password, user.get('password_hash'),
                                     user.get('password_salt')):
            return False, "Altes Passwort ist falsch"

        if not new_password:
            return False, "Neues Passwort darf nicht leer sein"

        # Set the new password
        user['password_hash'], user['password_salt'] = self._hash_password(new_password)

        self._save_users_db()
        return True, "Passwort erfolgreich geändert"

    def delete_user(self, username):
        """Delete a user and all of that user's sessions.

        Returns:
            ``(True, message)`` on success, ``(False, message)`` if the user
            does not exist or is an administrator and no other enabled
            administrator would remain.
        """
        if username not in self.users_db['users']:
            return False, f"Benutzer '{username}' nicht gefunden"

        # Prevent deletion of the last admin. Only *other* enabled admins
        # count, so a disabled admin can be removed while another enabled
        # admin exists.
        if (self.users_db['users'][username].get('role') == UserRole.ADMIN
                and self._count_other_enabled_admins(username) == 0):
            return False, "Der letzte Administrator kann nicht gelöscht werden"

        del self.users_db['users'][username]
        self._save_users_db()

        # Remove all sessions of the deleted user
        sessions_to_remove = [
            token
            for token, session_data in self.sessions_db['sessions'].items()
            if session_data.get('username') == username
        ]
        for token in sessions_to_remove:
            del self.sessions_db['sessions'][token]

        if sessions_to_remove:
            self._save_sessions_db()

        return True, f"Benutzer '{username}' gelöscht"
|
|
|
|
# Global user manager instance, used by the decorators defined below.
# Constructing it loads both JSON databases and, when no users exist,
# creates a default admin account (see UserManager.__init__).
user_manager = UserManager()
|
|
|
|
# Decorator enforcing a logged-in user
def login_required(f):
    """Decorator: require a valid login session.

    Without a valid session the visitor gets a flash message and is
    redirected to the ``login`` endpoint. On success the user record is
    attached to the request as ``request.current_user``.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        token = session.get('session_token')
        if token:
            is_valid, current = user_manager.validate_session(token)
            if is_valid:
                # Expose the user record to the view
                request.current_user = current
                return f(*args, **kwargs)
            # Stale token: drop it from the cookie session
            session.pop('session_token', None)
            message = 'Ihre Session ist abgelaufen.'
        else:
            message = 'Sie müssen sich anmelden.'

        flash(message, 'warning')
        return redirect(url_for('login'))

    return wrapper
|
|
|
|
# Decorator enforcing a minimum role
def role_required(required_role):
    """Decorator factory: require a login and at least ``required_role``.

    Users whose role does not satisfy ``UserRole.has_permission`` get a flash
    message and are redirected to the ``index`` endpoint.
    """
    def decorator(f):
        def role_guard(*args, **kwargs):
            current_role = request.current_user['role']
            if UserRole.has_permission(current_role, required_role):
                return f(*args, **kwargs)
            flash(f'Keine Berechtigung. Erforderliche Rolle: {required_role}', 'danger')
            return redirect(url_for('index'))

        # The login check runs first; wraps() then restores f's metadata.
        return wraps(f)(login_required(role_guard))

    return decorator
|
|
|
|
# Decorator enforcing a specific permission
def permission_required(permission):
    """Decorator factory: require a login and the given ``permission``.

    Users whose role lacks the permission get a flash message and are
    redirected to the ``index`` endpoint.
    """
    def decorator(f):
        def permission_guard(*args, **kwargs):
            current_role = request.current_user['role']
            if Permission.user_has_permission(current_role, permission):
                return f(*args, **kwargs)
            flash(f'Keine Berechtigung für: {permission}', 'danger')
            return redirect(url_for('index'))

        # The login check runs first; wraps() then restores f's metadata.
        return wraps(f)(login_required(permission_guard))

    return decorator
|
|
|
|
# API helpers returning JSON responses
def api_login_required(f):
    """API decorator: require a valid login session.

    Responds with a JSON error body and HTTP 401 when there is no session
    token or the token is no longer valid. On success the user record is
    attached to the request as ``request.current_user``.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        token = session.get('session_token')
        if not token:
            error = 'Login erforderlich'
        else:
            is_valid, current = user_manager.validate_session(token)
            if is_valid:
                request.current_user = current
                return f(*args, **kwargs)
            # Stale token: drop it from the cookie session
            session.pop('session_token', None)
            error = 'Session abgelaufen'

        return jsonify({'success': False, 'error': error}), 401

    return wrapper
|
|
|
|
def api_permission_required(permission):
    """API decorator factory: require a login and the given ``permission``.

    Responds with a JSON error body and HTTP 403 when the logged-in user's
    role lacks the permission.
    """
    def decorator(f):
        def permission_guard(*args, **kwargs):
            current_role = request.current_user['role']
            if Permission.user_has_permission(current_role, permission):
                return f(*args, **kwargs)
            payload = {'success': False, 'error': f'Keine Berechtigung für: {permission}'}
            return jsonify(payload), 403

        # The login check runs first; wraps() then restores f's metadata.
        return wraps(f)(api_login_required(permission_guard))

    return decorator
|