release nouri 0.6.0 polish backup and pwa
This commit is contained in:
+154
@@ -0,0 +1,154 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import json
|
||||
import shutil
|
||||
import sqlite3
|
||||
import tempfile
|
||||
import zipfile
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
BACKUP_FILENAME_PREFIX = "nouri-backup"
|
||||
RESTORE_CONFIRMATION_TEXT = "WIEDERHERSTELLEN"
|
||||
|
||||
|
||||
def list_backup_tables(database: sqlite3.Connection) -> list[str]:
|
||||
rows = database.execute(
|
||||
"""
|
||||
SELECT name
|
||||
FROM sqlite_master
|
||||
WHERE type = 'table' AND name NOT LIKE 'sqlite_%'
|
||||
ORDER BY name
|
||||
"""
|
||||
).fetchall()
|
||||
return [row["name"] for row in rows]
|
||||
|
||||
|
||||
def export_backup_archive(
|
||||
database: sqlite3.Connection,
|
||||
upload_folder: str | Path,
|
||||
app_version: str,
|
||||
) -> tuple[str, str]:
|
||||
timestamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
|
||||
backup_name = f"{BACKUP_FILENAME_PREFIX}-{timestamp}.zip"
|
||||
temp_handle = tempfile.NamedTemporaryFile(prefix="nouri-backup-", suffix=".zip", delete=False)
|
||||
temp_handle.close()
|
||||
archive_path = temp_handle.name
|
||||
|
||||
tables = list_backup_tables(database)
|
||||
payload = {
|
||||
"meta": {
|
||||
"created_at": datetime.now(timezone.utc).isoformat(),
|
||||
"app_version": app_version,
|
||||
"format_version": 1,
|
||||
},
|
||||
"tables": {},
|
||||
}
|
||||
|
||||
for table_name in tables:
|
||||
rows = database.execute(f"SELECT * FROM {table_name}").fetchall()
|
||||
payload["tables"][table_name] = [dict(row) for row in rows]
|
||||
|
||||
uploads_root = Path(upload_folder)
|
||||
with zipfile.ZipFile(archive_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
|
||||
archive.writestr("backup.json", json.dumps(payload, ensure_ascii=False, indent=2))
|
||||
if uploads_root.exists():
|
||||
for file_path in uploads_root.rglob("*"):
|
||||
if file_path.is_file():
|
||||
relative_path = file_path.relative_to(uploads_root)
|
||||
archive.write(file_path, f"uploads/{relative_path.as_posix()}")
|
||||
|
||||
return archive_path, backup_name
|
||||
|
||||
|
||||
def _extract_uploads_to_temp(archive: zipfile.ZipFile) -> Path:
|
||||
temp_dir = Path(tempfile.mkdtemp(prefix="nouri-restore-uploads-"))
|
||||
for member in archive.infolist():
|
||||
if not member.filename.startswith("uploads/") or member.is_dir():
|
||||
continue
|
||||
relative_target = member.filename.removeprefix("uploads/").lstrip("/")
|
||||
if not relative_target:
|
||||
continue
|
||||
target_path = temp_dir / relative_target
|
||||
target_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with archive.open(member, "r") as source, target_path.open("wb") as destination:
|
||||
shutil.copyfileobj(source, destination)
|
||||
return temp_dir
|
||||
|
||||
|
||||
def _replace_uploads(temp_dir: Path, upload_folder: str | Path) -> None:
|
||||
upload_root = Path(upload_folder)
|
||||
previous_root = upload_root.with_name(f"{upload_root.name}-previous")
|
||||
if previous_root.exists():
|
||||
shutil.rmtree(previous_root)
|
||||
if upload_root.exists():
|
||||
upload_root.rename(previous_root)
|
||||
upload_root.mkdir(parents=True, exist_ok=True)
|
||||
for file_path in temp_dir.rglob("*"):
|
||||
if not file_path.is_file():
|
||||
continue
|
||||
relative_path = file_path.relative_to(temp_dir)
|
||||
target_path = upload_root / relative_path
|
||||
target_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
shutil.copy2(file_path, target_path)
|
||||
shutil.rmtree(temp_dir, ignore_errors=True)
|
||||
shutil.rmtree(previous_root, ignore_errors=True)
|
||||
|
||||
|
||||
def restore_backup_archive(
|
||||
database: sqlite3.Connection,
|
||||
upload_folder: str | Path,
|
||||
backup_file,
|
||||
) -> dict:
|
||||
backup_bytes = backup_file.read()
|
||||
if not backup_bytes:
|
||||
raise ValueError("Bitte ein gültiges Backup auswählen.")
|
||||
|
||||
with zipfile.ZipFile(io.BytesIO(backup_bytes)) as archive:
|
||||
try:
|
||||
backup_payload = json.loads(archive.read("backup.json").decode("utf-8"))
|
||||
except KeyError as exc:
|
||||
raise ValueError("Im Backup fehlt die Datei backup.json.") from exc
|
||||
except json.JSONDecodeError as exc:
|
||||
raise ValueError("Das Backup konnte nicht gelesen werden.") from exc
|
||||
|
||||
tables = backup_payload.get("tables")
|
||||
if not isinstance(tables, dict):
|
||||
raise ValueError("Das Backup enthält keine gültigen Tabellen-Daten.")
|
||||
|
||||
current_tables = list_backup_tables(database)
|
||||
restore_tables = [table for table in current_tables if table in tables]
|
||||
|
||||
upload_temp_dir = _extract_uploads_to_temp(archive)
|
||||
|
||||
try:
|
||||
database.execute("PRAGMA foreign_keys = OFF")
|
||||
try:
|
||||
for table_name in reversed(restore_tables):
|
||||
database.execute(f"DELETE FROM {table_name}")
|
||||
database.execute("DELETE FROM sqlite_sequence")
|
||||
|
||||
for table_name in restore_tables:
|
||||
rows = tables.get(table_name, [])
|
||||
if not rows:
|
||||
continue
|
||||
columns = list(rows[0].keys())
|
||||
placeholders = ", ".join(["?"] * len(columns))
|
||||
column_list = ", ".join(columns)
|
||||
for row in rows:
|
||||
values = [row.get(column) for column in columns]
|
||||
database.execute(
|
||||
f"INSERT INTO {table_name} ({column_list}) VALUES ({placeholders})",
|
||||
values,
|
||||
)
|
||||
finally:
|
||||
database.execute("PRAGMA foreign_keys = ON")
|
||||
|
||||
_replace_uploads(upload_temp_dir, upload_folder)
|
||||
except Exception:
|
||||
shutil.rmtree(upload_temp_dir, ignore_errors=True)
|
||||
raise
|
||||
|
||||
return backup_payload.get("meta", {})
|
||||
Reference in New Issue
Block a user