# nouri-App/nouri/backup.py — export/restore helpers for the app database and uploads folder.
from __future__ import annotations
import io
import json
import shutil
import sqlite3
import tempfile
import zipfile
from datetime import datetime, timezone
from pathlib import Path
# Prefix used for generated backup download filenames (timestamp is appended).
BACKUP_FILENAME_PREFIX = "nouri-backup"
# Exact text the user must type to confirm a destructive restore (German UI string).
RESTORE_CONFIRMATION_TEXT = "WIEDERHERSTELLEN"
def list_backup_tables(database: sqlite3.Connection) -> list[str]:
    """Return the names of all user tables in *database*, sorted alphabetically.

    SQLite's internal bookkeeping tables (``sqlite_*``) are excluded.
    The connection's ``row_factory`` must yield mapping-style rows.
    """
    query = (
        "SELECT name FROM sqlite_master "
        "WHERE type = 'table' AND name NOT LIKE 'sqlite_%' "
        "ORDER BY name"
    )
    return [row["name"] for row in database.execute(query)]
def export_backup_archive(
    database: sqlite3.Connection,
    upload_folder: str | Path,
    app_version: str,
) -> tuple[str, str]:
    """Dump every user table plus the uploads folder into a ZIP archive.

    Parameters
    ----------
    database: connection whose ``row_factory`` yields mapping-style rows.
    upload_folder: directory of uploaded files; skipped if it doesn't exist.
    app_version: recorded in the archive metadata.

    Returns
    -------
    ``(archive_path, backup_name)`` — *archive_path* is a temporary file on
    disk (the caller is responsible for deleting it after serving it) and
    *backup_name* is the timestamped download filename.
    """
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    backup_name = f"{BACKUP_FILENAME_PREFIX}-{timestamp}.zip"
    temp_handle = tempfile.NamedTemporaryFile(
        prefix="nouri-backup-", suffix=".zip", delete=False
    )
    temp_handle.close()
    archive_path = temp_handle.name
    try:
        payload = {
            "meta": {
                "created_at": datetime.now(timezone.utc).isoformat(),
                "app_version": app_version,
                "format_version": 1,
            },
            "tables": {},
        }
        for table_name in list_backup_tables(database):
            # Quote the identifier: table names come from sqlite_master and a
            # bare f-string interpolation would break on spaces/quotes.
            quoted = table_name.replace('"', '""')
            rows = database.execute(f'SELECT * FROM "{quoted}"').fetchall()
            payload["tables"][table_name] = [dict(row) for row in rows]
        uploads_root = Path(upload_folder)
        with zipfile.ZipFile(archive_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
            archive.writestr("backup.json", json.dumps(payload, ensure_ascii=False, indent=2))
            if uploads_root.exists():
                for file_path in uploads_root.rglob("*"):
                    if file_path.is_file():
                        relative_path = file_path.relative_to(uploads_root)
                        archive.write(file_path, f"uploads/{relative_path.as_posix()}")
    except Exception:
        # delete=False means nothing cleans this up automatically; don't leak
        # the temp file when the dump fails midway.
        Path(archive_path).unlink(missing_ok=True)
        raise
    return archive_path, backup_name
def _extract_uploads_to_temp(archive: zipfile.ZipFile) -> Path:
temp_dir = Path(tempfile.mkdtemp(prefix="nouri-restore-uploads-"))
for member in archive.infolist():
if not member.filename.startswith("uploads/") or member.is_dir():
continue
relative_target = member.filename.removeprefix("uploads/").lstrip("/")
if not relative_target:
continue
target_path = temp_dir / relative_target
target_path.parent.mkdir(parents=True, exist_ok=True)
with archive.open(member, "r") as source, target_path.open("wb") as destination:
shutil.copyfileobj(source, destination)
return temp_dir
def _replace_uploads(temp_dir: Path, upload_folder: str | Path) -> None:
upload_root = Path(upload_folder)
previous_root = upload_root.with_name(f"{upload_root.name}-previous")
if previous_root.exists():
shutil.rmtree(previous_root)
if upload_root.exists():
upload_root.rename(previous_root)
upload_root.mkdir(parents=True, exist_ok=True)
for file_path in temp_dir.rglob("*"):
if not file_path.is_file():
continue
relative_path = file_path.relative_to(temp_dir)
target_path = upload_root / relative_path
target_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(file_path, target_path)
shutil.rmtree(temp_dir, ignore_errors=True)
shutil.rmtree(previous_root, ignore_errors=True)
def _quote_sql_identifier(name: str) -> str:
    """Return *name* as a double-quoted SQL identifier (embedded quotes escaped)."""
    return '"' + name.replace('"', '""') + '"'


def restore_backup_archive(
    database: sqlite3.Connection,
    upload_folder: str | Path,
    backup_file,
) -> dict:
    """Restore all database tables and uploaded files from a backup ZIP.

    Parameters
    ----------
    database: connection with mapping-style rows; this function does not
        commit — the caller decides when to commit the transaction.
    upload_folder: live uploads directory, replaced via ``_replace_uploads``.
    backup_file: file-like object (e.g. an uploaded file) containing the
        archive produced by ``export_backup_archive``.

    Returns the backup's ``meta`` dict (empty dict if absent).

    Raises ``ValueError`` with a user-facing German message when the upload
    is empty, missing ``backup.json``, or not valid JSON.
    """
    backup_bytes = backup_file.read()
    if not backup_bytes:
        raise ValueError("Bitte ein gültiges Backup auswählen.")
    with zipfile.ZipFile(io.BytesIO(backup_bytes)) as archive:
        try:
            backup_payload = json.loads(archive.read("backup.json").decode("utf-8"))
        except KeyError as exc:
            raise ValueError("Im Backup fehlt die Datei backup.json.") from exc
        except json.JSONDecodeError as exc:
            raise ValueError("Das Backup konnte nicht gelesen werden.") from exc
        tables = backup_payload.get("tables")
        if not isinstance(tables, dict):
            raise ValueError("Das Backup enthält keine gültigen Tabellen-Daten.")
        # Only tables that exist in the current schema are restored; extra
        # tables in an old backup are silently ignored.
        current_tables = list_backup_tables(database)
        restore_tables = [table for table in current_tables if table in tables]
        upload_temp_dir = _extract_uploads_to_temp(archive)
        try:
            database.execute("PRAGMA foreign_keys = OFF")
            try:
                # Clear in reverse name order so children tend to go first.
                for table_name in reversed(restore_tables):
                    database.execute(f"DELETE FROM {_quote_sql_identifier(table_name)}")
                # sqlite_sequence only exists once an AUTOINCREMENT table has
                # been created; an unconditional DELETE would raise
                # OperationalError on plain schemas.
                has_sequence = database.execute(
                    "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'sqlite_sequence'"
                ).fetchone()
                if has_sequence:
                    database.execute("DELETE FROM sqlite_sequence")
                for table_name in restore_tables:
                    rows = tables.get(table_name, [])
                    if not rows:
                        continue
                    columns = list(rows[0].keys())
                    placeholders = ", ".join(["?"] * len(columns))
                    # Column names come from untrusted JSON: quote them so
                    # they cannot break out of the INSERT statement.
                    column_list = ", ".join(_quote_sql_identifier(c) for c in columns)
                    # Built once per table, not once per row.
                    insert_sql = (
                        f"INSERT INTO {_quote_sql_identifier(table_name)} "
                        f"({column_list}) VALUES ({placeholders})"
                    )
                    for row in rows:
                        database.execute(insert_sql, [row.get(column) for column in columns])
            finally:
                database.execute("PRAGMA foreign_keys = ON")
            _replace_uploads(upload_temp_dir, upload_folder)
        except Exception:
            # Don't leak the staged uploads if the restore fails.
            shutil.rmtree(upload_temp_dir, ignore_errors=True)
            raise
    return backup_payload.get("meta", {})