from __future__ import annotations import io import json import shutil import sqlite3 import tempfile import zipfile from datetime import datetime, timezone from pathlib import Path BACKUP_FILENAME_PREFIX = "nouri-backup" RESTORE_CONFIRMATION_TEXT = "WIEDERHERSTELLEN" def list_backup_tables(database: sqlite3.Connection) -> list[str]: rows = database.execute( """ SELECT name FROM sqlite_master WHERE type = 'table' AND name NOT LIKE 'sqlite_%' ORDER BY name """ ).fetchall() return [row["name"] for row in rows] def export_backup_archive( database: sqlite3.Connection, upload_folder: str | Path, app_version: str, ) -> tuple[str, str]: timestamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S") backup_name = f"{BACKUP_FILENAME_PREFIX}-{timestamp}.zip" temp_handle = tempfile.NamedTemporaryFile(prefix="nouri-backup-", suffix=".zip", delete=False) temp_handle.close() archive_path = temp_handle.name tables = list_backup_tables(database) payload = { "meta": { "created_at": datetime.now(timezone.utc).isoformat(), "app_version": app_version, "format_version": 1, }, "tables": {}, } for table_name in tables: rows = database.execute(f"SELECT * FROM {table_name}").fetchall() payload["tables"][table_name] = [dict(row) for row in rows] uploads_root = Path(upload_folder) with zipfile.ZipFile(archive_path, "w", compression=zipfile.ZIP_DEFLATED) as archive: archive.writestr("backup.json", json.dumps(payload, ensure_ascii=False, indent=2)) if uploads_root.exists(): for file_path in uploads_root.rglob("*"): if file_path.is_file(): relative_path = file_path.relative_to(uploads_root) archive.write(file_path, f"uploads/{relative_path.as_posix()}") return archive_path, backup_name def _extract_uploads_to_temp(archive: zipfile.ZipFile) -> Path: temp_dir = Path(tempfile.mkdtemp(prefix="nouri-restore-uploads-")) for member in archive.infolist(): if not member.filename.startswith("uploads/") or member.is_dir(): continue relative_target = member.filename.removeprefix("uploads/").lstrip("/") if not relative_target: continue target_path = temp_dir / relative_target target_path.parent.mkdir(parents=True, exist_ok=True) with archive.open(member, "r") as source, target_path.open("wb") as destination: shutil.copyfileobj(source, destination) return temp_dir def _replace_uploads(temp_dir: Path, upload_folder: str | Path) -> None: upload_root = Path(upload_folder) previous_root = upload_root.with_name(f"{upload_root.name}-previous") if previous_root.exists(): shutil.rmtree(previous_root) if upload_root.exists(): upload_root.rename(previous_root) upload_root.mkdir(parents=True, exist_ok=True) for file_path in temp_dir.rglob("*"): if not file_path.is_file(): continue relative_path = file_path.relative_to(temp_dir) target_path = upload_root / relative_path target_path.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(file_path, target_path) shutil.rmtree(temp_dir, ignore_errors=True) shutil.rmtree(previous_root, ignore_errors=True) def restore_backup_archive( database: sqlite3.Connection, upload_folder: str | Path, backup_file, ) -> dict: backup_bytes = backup_file.read() if not backup_bytes: raise ValueError("Bitte ein gültiges Backup auswählen.") with zipfile.ZipFile(io.BytesIO(backup_bytes)) as archive: try: backup_payload = json.loads(archive.read("backup.json").decode("utf-8")) except KeyError as exc: raise ValueError("Im Backup fehlt die Datei backup.json.") from exc except json.JSONDecodeError as exc: raise ValueError("Das Backup konnte nicht gelesen werden.") from exc tables = backup_payload.get("tables") if not isinstance(tables, dict): raise ValueError("Das Backup enthält keine gültigen Tabellen-Daten.") current_tables = list_backup_tables(database) restore_tables = [table for table in current_tables if table in tables] upload_temp_dir = _extract_uploads_to_temp(archive) try: database.execute("PRAGMA foreign_keys = OFF") try: for table_name in reversed(restore_tables): database.execute(f"DELETE FROM {table_name}") database.execute("DELETE FROM sqlite_sequence") for table_name in restore_tables: rows = tables.get(table_name, []) if not rows: continue columns = list(rows[0].keys()) placeholders = ", ".join(["?"] * len(columns)) column_list = ", ".join(columns) for row in rows: values = [row.get(column) for column in columns] database.execute( f"INSERT INTO {table_name} ({column_list}) VALUES ({placeholders})", values, ) finally: database.execute("PRAGMA foreign_keys = ON") _replace_uploads(upload_temp_dir, upload_folder) except Exception: shutil.rmtree(upload_temp_dir, ignore_errors=True) raise return backup_payload.get("meta", {})