155 lines
5.6 KiB
Python
155 lines
5.6 KiB
Python
from __future__ import annotations
|
|
|
|
import io
|
|
import json
|
|
import shutil
|
|
import sqlite3
|
|
import tempfile
|
|
import zipfile
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
|
|
# Prefix of the download file name that export_backup_archive() generates
# ("<prefix>-<UTC timestamp>.zip").
BACKUP_FILENAME_PREFIX = "nouri-backup"
|
|
# German for "RESTORE". Not referenced inside this module; presumably the text a
# user has to type to confirm a restore -- TODO confirm against the callers.
RESTORE_CONFIRMATION_TEXT = "WIEDERHERSTELLEN"
|
|
|
|
|
|
def list_backup_tables(database: sqlite3.Connection) -> list[str]:
    """Return the names of all user tables in ``database``, sorted by name.

    SQLite's internal tables (names starting with the reserved ``sqlite_``
    prefix, e.g. ``sqlite_sequence``) are excluded.

    The query runs on a dedicated cursor that yields plain tuples, so the
    result does not depend on the ``row_factory`` configured on the connection.

    Args:
        database: Open SQLite connection to inspect.

    Returns:
        Table names in ascending order.
    """
    cursor = database.cursor()
    # Override whatever row factory the cursor inherited from the connection.
    cursor.row_factory = None
    try:
        cursor.execute(
            # "_" is a single-character wildcard in LIKE; it is escaped so that
            # only the literal "sqlite_" prefix is filtered out.
            """
            SELECT name
            FROM sqlite_master
            WHERE type = 'table' AND name NOT LIKE 'sqlite!_%' ESCAPE '!'
            ORDER BY name
            """
        )
        return [name for (name,) in cursor.fetchall()]
    finally:
        cursor.close()
|
|
|
|
|
|
def export_backup_archive(
    database: sqlite3.Connection,
    upload_folder: str | Path,
    app_version: str,
) -> tuple[str, str]:
    """Write all user tables and uploaded files into a temporary ZIP archive.

    The archive contains ``backup.json`` (a ``meta`` section plus every table
    returned by ``list_backup_tables`` as a list of row dictionaries) and, if
    ``upload_folder`` exists, each file below it under ``uploads/``.

    Args:
        database: Open SQLite connection to export.
        upload_folder: Directory whose files are added to the archive.
        app_version: Version string stored in the ``meta`` section.

    Returns:
        ``(archive_path, backup_name)``: the path of the temporary archive on
        disk and the suggested download file name. The caller is responsible
        for deleting the temporary file.

    Raises:
        TypeError: If a table holds values ``json.dumps`` cannot serialise
            (for example BLOB columns, which SQLite returns as ``bytes``).
    """
    # One instant for both the file name and the metadata keeps them in sync.
    created_at = datetime.now(timezone.utc)
    backup_name = f"{BACKUP_FILENAME_PREFIX}-{created_at.strftime('%Y%m%d-%H%M%S')}.zip"

    payload = {
        "meta": {
            "created_at": created_at.isoformat(),
            "app_version": app_version,
            "format_version": 1,
        },
        "tables": {},
    }

    cursor = database.cursor()
    # Plain tuples plus cursor.description work with any connection row factory.
    cursor.row_factory = None
    try:
        for table_name in list_backup_tables(database):
            # Quote the identifier so unusual table names cannot break the query.
            quoted_name = '"' + table_name.replace('"', '""') + '"'
            cursor.execute(f"SELECT * FROM {quoted_name}")
            columns = [column[0] for column in cursor.description]
            payload["tables"][table_name] = [
                dict(zip(columns, row)) for row in cursor.fetchall()
            ]
    finally:
        cursor.close()

    # Serialise before creating the temp file so a failure leaves nothing behind.
    backup_json = json.dumps(payload, ensure_ascii=False, indent=2)
    uploads_root = Path(upload_folder)

    temp_handle = tempfile.NamedTemporaryFile(prefix="nouri-backup-", suffix=".zip", delete=False)
    temp_handle.close()
    archive_path = temp_handle.name

    try:
        with zipfile.ZipFile(archive_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
            archive.writestr("backup.json", backup_json)
            if uploads_root.exists():
                for file_path in uploads_root.rglob("*"):
                    if file_path.is_file():
                        relative_path = file_path.relative_to(uploads_root)
                        archive.write(file_path, f"uploads/{relative_path.as_posix()}")
    except BaseException:
        # Do not leak a partially written archive when the export fails.
        Path(archive_path).unlink(missing_ok=True)
        raise

    return archive_path, backup_name
|
|
|
|
|
|
def _extract_uploads_to_temp(archive: zipfile.ZipFile) -> Path:
    """Extract every ``uploads/`` member of ``archive`` into a new temp directory.

    Members outside ``uploads/`` and directory entries are ignored. Member
    names come from an uploaded (untrusted) archive, so each target path is
    resolved and must stay inside the temporary directory.

    Args:
        archive: Opened backup archive.

    Returns:
        Path of the temporary directory holding the extracted files. The
        caller owns the directory and has to remove it.

    Raises:
        ValueError: If a member name would resolve to a location outside the
            temporary directory (e.g. ``uploads/../x``). The temporary
            directory is removed before the error propagates.
    """
    temp_dir = Path(tempfile.mkdtemp(prefix="nouri-restore-uploads-"))
    # Resolve once so the containment check compares canonical paths.
    extraction_root = temp_dir.resolve()
    try:
        for member in archive.infolist():
            if not member.filename.startswith("uploads/") or member.is_dir():
                continue
            relative_target = member.filename.removeprefix("uploads/").lstrip("/")
            if not relative_target:
                continue
            target_path = (extraction_root / relative_target).resolve()
            if target_path == extraction_root or not target_path.is_relative_to(extraction_root):
                # Zip-slip guard: refuse names that climb out of the temp directory.
                raise ValueError("Das Backup enthält ungültige Dateipfade.")
            target_path.parent.mkdir(parents=True, exist_ok=True)
            with archive.open(member, "r") as source, target_path.open("wb") as destination:
                shutil.copyfileobj(source, destination)
    except BaseException:
        # Nobody else knows about the directory yet, so clean it up here.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise
    return temp_dir
|
|
|
|
|
|
def _replace_uploads(temp_dir: Path, upload_folder: str | Path) -> None:
    """Replace the contents of ``upload_folder`` with the files in ``temp_dir``.

    The current upload folder is first moved aside to ``<name>-previous``
    (a stale directory of that name is deleted beforehand). A fresh upload
    folder is then filled with every regular file found below ``temp_dir``.
    On success both ``temp_dir`` and the moved-aside copy are removed.

    If populating the new folder fails, the partially filled folder is
    discarded and the previous content is moved back before the error is
    re-raised. ``temp_dir`` is left in place in that case.

    Args:
        temp_dir: Directory holding the files to install.
        upload_folder: Target upload directory.
    """
    upload_root = Path(upload_folder)
    previous_root = upload_root.with_name(f"{upload_root.name}-previous")
    if previous_root.exists():
        shutil.rmtree(previous_root)

    had_previous = upload_root.exists()
    if had_previous:
        upload_root.rename(previous_root)

    try:
        upload_root.mkdir(parents=True, exist_ok=True)
        for file_path in temp_dir.rglob("*"):
            if not file_path.is_file():
                continue
            target_path = upload_root / file_path.relative_to(temp_dir)
            target_path.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(file_path, target_path)
    except BaseException:
        # Roll back: drop the half-written folder and reinstate the old one.
        shutil.rmtree(upload_root, ignore_errors=True)
        if had_previous and not upload_root.exists():
            previous_root.rename(upload_root)
        raise

    shutil.rmtree(temp_dir, ignore_errors=True)
    shutil.rmtree(previous_root, ignore_errors=True)
|
|
|
|
|
|
def _quote_identifier(identifier: str) -> str:
    """Return ``identifier`` as a double-quoted SQL identifier."""
    return '"' + identifier.replace('"', '""') + '"'


def _fetch_tuples(database: sqlite3.Connection, sql: str, parameters: tuple = ()) -> list[tuple]:
    """Run ``sql`` and return its rows as tuples, whatever the connection's row factory."""
    cursor = database.cursor()
    cursor.row_factory = None
    try:
        return cursor.execute(sql, parameters).fetchall()
    finally:
        cursor.close()


def _read_backup_payload(archive: zipfile.ZipFile) -> dict:
    """Load and minimally validate ``backup.json`` from ``archive``."""
    try:
        backup_payload = json.loads(archive.read("backup.json").decode("utf-8"))
    except KeyError as exc:
        raise ValueError("Im Backup fehlt die Datei backup.json.") from exc
    except (zipfile.BadZipFile, UnicodeDecodeError, json.JSONDecodeError) as exc:
        raise ValueError("Das Backup konnte nicht gelesen werden.") from exc
    if not isinstance(backup_payload, dict) or not isinstance(backup_payload.get("tables"), dict):
        raise ValueError("Das Backup enthält keine gültigen Tabellen-Daten.")
    return backup_payload


def _plan_table_inserts(
    database: sqlite3.Connection,
    restore_tables: list[str],
    tables: dict,
) -> list[tuple[str, list[list]]]:
    """Build one ``(INSERT statement, parameter rows)`` pair per non-empty table.

    Column names come from the uploaded backup, so they are checked against
    the columns of the existing table and quoted before being put into SQL.
    """
    plan = []
    for table_name in restore_tables:
        rows = tables.get(table_name)
        if not rows:
            continue
        if not isinstance(rows, list) or not all(isinstance(row, dict) for row in rows):
            raise ValueError("Das Backup enthält keine gültigen Tabellen-Daten.")

        # Union of all keys in first-seen order; keys missing in a row become NULL.
        columns = list(dict.fromkeys(column for row in rows for column in row))
        if not columns:
            continue

        table_info = _fetch_tuples(database, f"PRAGMA table_info({_quote_identifier(table_name)})")
        # Index 1 of a table_info row is the column name; SQLite compares
        # column names case-insensitively.
        known_columns = {info[1].lower() for info in table_info}
        if any(column.lower() not in known_columns for column in columns):
            raise ValueError("Das Backup passt nicht zur aktuellen Datenbankstruktur.")

        column_list = ", ".join(_quote_identifier(column) for column in columns)
        placeholders = ", ".join(["?"] * len(columns))
        statement = (
            f"INSERT INTO {_quote_identifier(table_name)} ({column_list}) VALUES ({placeholders})"
        )
        plan.append((statement, [[row.get(column) for column in columns] for row in rows]))
    return plan


def restore_backup_archive(
    database: sqlite3.Connection,
    upload_folder: str | Path,
    backup_file,
) -> dict:
    """Restore tables and uploaded files from a backup archive.

    Only tables that exist both in the current database and in the backup are
    touched: their rows are deleted and replaced by the rows from the backup.
    The upload folder is then replaced by the ``uploads/`` content of the
    archive.

    The database changes are committed by this function once the uploads have
    been replaced, and rolled back if any step fails. Ending the transaction
    here is required because SQLite ignores ``PRAGMA foreign_keys`` while a
    transaction is open, so enforcement could otherwise never be re-enabled.
    For the same reason the caller should not have an open transaction when
    calling this function.

    Args:
        database: Open SQLite connection to restore into.
        upload_folder: Directory that receives the restored uploads.
        backup_file: Binary file-like object (needs ``read()``) with the ZIP.

    Returns:
        The ``meta`` section of the backup (empty dict if absent).

    Raises:
        ValueError: If the upload is empty, is not a readable ZIP archive,
            lacks a valid ``backup.json``, or contains rows/columns that do
            not fit the current database schema.
    """
    backup_bytes = backup_file.read()
    if not backup_bytes:
        raise ValueError("Bitte ein gültiges Backup auswählen.")

    try:
        archive = zipfile.ZipFile(io.BytesIO(backup_bytes))
    except zipfile.BadZipFile as exc:
        raise ValueError("Bitte ein gültiges Backup auswählen.") from exc

    with archive:
        backup_payload = _read_backup_payload(archive)
        tables = backup_payload["tables"]

        current_tables = list_backup_tables(database)
        restore_tables = [table for table in current_tables if table in tables]

        # Validate everything before extracting files or touching the database.
        insert_plan = _plan_table_inserts(database, restore_tables, tables)

        try:
            upload_temp_dir = _extract_uploads_to_temp(archive)
        except zipfile.BadZipFile as exc:
            raise ValueError("Das Backup konnte nicht gelesen werden.") from exc

    try:
        foreign_key_state = _fetch_tuples(database, "PRAGMA foreign_keys")
        foreign_keys_enabled = bool(foreign_key_state and foreign_key_state[0][0])
        # sqlite_sequence only exists once an AUTOINCREMENT table has been created.
        has_sequence_table = bool(
            _fetch_tuples(
                database,
                "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'sqlite_sequence'",
            )
        )

        database.execute("PRAGMA foreign_keys = OFF")
        try:
            for table_name in reversed(restore_tables):
                database.execute(f"DELETE FROM {_quote_identifier(table_name)}")
                if has_sequence_table:
                    # Reset AUTOINCREMENT counters of restored tables only.
                    database.execute("DELETE FROM sqlite_sequence WHERE name = ?", (table_name,))

            for statement, parameter_rows in insert_plan:
                database.executemany(statement, parameter_rows)

            # Replace the files before committing so a failure here can still
            # undo the database changes.
            _replace_uploads(upload_temp_dir, upload_folder)
            database.commit()
        except BaseException:
            database.rollback()
            raise
        finally:
            # The transaction has ended by now, so the pragma takes effect.
            if foreign_keys_enabled:
                database.execute("PRAGMA foreign_keys = ON")
    except Exception:
        shutil.rmtree(upload_temp_dir, ignore_errors=True)
        raise

    return backup_payload.get("meta", {})
|