""" File Management System für App Installer & Manager Ermöglicht Pterodactyl-ähnliche Dateiverwaltung """ import os import shutil import mimetypes import zipfile import tempfile from pathlib import Path from datetime import datetime from werkzeug.utils import secure_filename class FileManager: def __init__(self, base_path): self.base_path = Path(base_path) self.allowed_extensions = { 'text': ['.txt', '.log', '.conf', '.cfg', '.ini', '.env', '.md', '.json', '.xml', '.yaml', '.yml'], 'code': ['.py', '.js', '.html', '.css', '.php', '.java', '.cpp', '.h', '.sql'], 'config': ['.dockerfile', '.dockerignore', '.gitignore', '.htaccess'], 'archive': ['.zip', '.tar', '.gz', '.rar'], 'image': ['.png', '.jpg', '.jpeg', '.gif', '.svg', '.ico'] } # Maximale Dateigröße (in Bytes) self.max_file_size = 50 * 1024 * 1024 # 50MB def get_directory_contents(self, relative_path=""): """Hole Verzeichnisinhalt mit Metadaten""" try: current_path = self.base_path / relative_path # Sicherheitscheck - Pfad darf nicht außerhalb der Basis liegen if not self._is_safe_path(current_path): raise ValueError("Zugriff außerhalb des erlaubten Bereichs") if not current_path.exists(): return {'error': 'Verzeichnis nicht gefunden'} items = [] # Parent Directory Link (außer im Root) if relative_path and relative_path != "": parent_path = str(Path(relative_path).parent) if parent_path == ".": parent_path = "" items.append({ 'name': '..', 'type': 'directory', 'path': parent_path, 'size': 0, 'modified': '', 'permissions': 'read' }) # Verzeichnisse und Dateien auflisten for item in sorted(current_path.iterdir()): try: stat_info = item.stat() relative_item_path = str(item.relative_to(self.base_path)) item_data = { 'name': item.name, 'type': 'directory' if item.is_dir() else 'file', 'path': relative_item_path, 'size': stat_info.st_size if item.is_file() else 0, 'modified': datetime.fromtimestamp(stat_info.st_mtime).strftime('%Y-%m-%d %H:%M:%S'), 'permissions': self._get_permissions(item), 'extension': item.suffix.lower() if item.is_file() else '', 'mime_type': mimetypes.guess_type(str(item))[0] if item.is_file() else None } items.append(item_data) except (PermissionError, OSError): # Dateien ohne Berechtigung überspringen continue return { 'current_path': relative_path, 'items': items, 'total_items': len(items) } except Exception as e: return {'error': str(e)} def read_file(self, relative_path): """Lese Dateiinhalt (nur Text-Dateien)""" try: file_path = self.base_path / relative_path if not self._is_safe_path(file_path): raise ValueError("Zugriff außerhalb des erlaubten Bereichs") if not file_path.exists() or file_path.is_dir(): raise ValueError("Datei nicht gefunden") # Prüfe ob es eine editierbare Datei ist if not self._is_editable_file(file_path): raise ValueError("Dateityp kann nicht bearbeitet werden") # Größencheck if file_path.stat().st_size > self.max_file_size: raise ValueError("Datei zu groß zum Anzeigen") # Versuche UTF-8, dann Latin-1 try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() except UnicodeDecodeError: with open(file_path, 'r', encoding='latin-1') as f: content = f.read() return { 'content': content, 'size': file_path.stat().st_size, 'encoding': 'utf-8', 'editable': True } except Exception as e: return {'error': str(e)} def write_file(self, relative_path, content): """Schreibe Dateiinhalt""" try: file_path = self.base_path / relative_path if not self._is_safe_path(file_path): raise ValueError("Zugriff außerhalb des erlaubten Bereichs") if not self._is_editable_file(file_path): raise ValueError("Dateityp kann nicht bearbeitet werden") # Backup erstellen wenn Datei existiert if file_path.exists(): backup_path = file_path.with_suffix(file_path.suffix + '.backup') shutil.copy2(file_path, backup_path) # Datei schreiben with open(file_path, 'w', encoding='utf-8') as f: f.write(content) return {'success': True, 'message': 'Datei gespeichert'} except Exception as e: return {'error': str(e)} def create_file(self, relative_path, name): """Erstelle neue Datei""" try: dir_path = self.base_path / relative_path file_path = dir_path / secure_filename(name) if not self._is_safe_path(file_path): raise ValueError("Zugriff außerhalb des erlaubten Bereichs") if file_path.exists(): raise ValueError("Datei existiert bereits") # Leere Datei erstellen file_path.touch() return {'success': True, 'message': 'Datei erstellt'} except Exception as e: return {'error': str(e)} def create_directory(self, relative_path, name): """Erstelle neues Verzeichnis""" try: dir_path = self.base_path / relative_path new_dir_path = dir_path / secure_filename(name) if not self._is_safe_path(new_dir_path): raise ValueError("Zugriff außerhalb des erlaubten Bereichs") if new_dir_path.exists(): raise ValueError("Verzeichnis existiert bereits") new_dir_path.mkdir(parents=True) return {'success': True, 'message': 'Verzeichnis erstellt'} except Exception as e: return {'error': str(e)} def delete_item(self, relative_path): """Lösche Datei oder Verzeichnis""" try: item_path = self.base_path / relative_path if not self._is_safe_path(item_path): raise ValueError("Zugriff außerhalb des erlaubten Bereichs") if not item_path.exists(): raise ValueError("Element nicht gefunden") if item_path.is_dir(): shutil.rmtree(item_path) else: item_path.unlink() return {'success': True, 'message': 'Element gelöscht'} except Exception as e: return {'error': str(e)} def rename_item(self, relative_path, new_name): """Benenne Datei oder Verzeichnis um""" try: old_path = self.base_path / relative_path new_path = old_path.parent / secure_filename(new_name) if not self._is_safe_path(old_path) or not self._is_safe_path(new_path): raise ValueError("Zugriff außerhalb des erlaubten Bereichs") if not old_path.exists(): raise ValueError("Element nicht gefunden") if new_path.exists(): raise ValueError("Zielname existiert bereits") old_path.rename(new_path) return {'success': True, 'message': 'Element umbenannt'} except Exception as e: return {'error': str(e)} def upload_file(self, relative_path, file_obj): """Lade Datei hoch""" try: if not file_obj or not file_obj.filename: raise ValueError("Keine Datei ausgewählt") filename = secure_filename(file_obj.filename) upload_path = self.base_path / relative_path / filename if not self._is_safe_path(upload_path): raise ValueError("Zugriff außerhalb des erlaubten Bereichs") # Größencheck file_obj.seek(0, 2) # Springe zum Ende size = file_obj.tell() file_obj.seek(0) # Zurück zum Anfang if size > self.max_file_size: raise ValueError(f"Datei zu groß (max. {self.max_file_size // 1024 // 1024}MB)") # Datei speichern file_obj.save(str(upload_path)) return {'success': True, 'message': 'Datei hochgeladen'} except Exception as e: return {'error': str(e)} def download_file(self, relative_path): """Bereite Datei für Download vor""" try: file_path = self.base_path / relative_path if not self._is_safe_path(file_path): raise ValueError("Zugriff außerhalb des erlaubten Bereichs") if not file_path.exists() or file_path.is_dir(): raise ValueError("Datei nicht gefunden") return { 'path': str(file_path), 'filename': file_path.name, 'size': file_path.stat().st_size } except Exception as e: return {'error': str(e)} def compress_directory(self, relative_path): """Komprimiere Verzeichnis zu ZIP""" try: dir_path = self.base_path / relative_path if not self._is_safe_path(dir_path): raise ValueError("Zugriff außerhalb des erlaubten Bereichs") if not dir_path.exists() or not dir_path.is_dir(): raise ValueError("Verzeichnis nicht gefunden") # Temporäre ZIP-Datei erstellen temp_zip = tempfile.NamedTemporaryFile(delete=False, suffix='.zip') with zipfile.ZipFile(temp_zip.name, 'w', zipfile.ZIP_DEFLATED) as zipf: for file_path in dir_path.rglob('*'): if file_path.is_file(): arcname = file_path.relative_to(dir_path) zipf.write(file_path, arcname) return { 'path': temp_zip.name, 'filename': f"{dir_path.name}.zip" } except Exception as e: return {'error': str(e)} def _is_safe_path(self, path): """Prüfe ob Pfad sicher ist (innerhalb der Basis)""" try: path.resolve().relative_to(self.base_path.resolve()) return True except ValueError: return False def _is_editable_file(self, path): """Prüfe ob Datei editierbar ist""" suffix = path.suffix.lower() for category in ['text', 'code', 'config']: if suffix in self.allowed_extensions[category]: return True return False def _get_permissions(self, path): """Ermittle Dateiberechtigungen (vereinfacht)""" try: if os.access(path, os.W_OK): return 'write' elif os.access(path, os.R_OK): return 'read' else: return 'none' except: return 'unknown'