335 lines
12 KiB
Python
335 lines
12 KiB
Python
from __future__ import annotations
|
|
|
|
import sqlite3
|
|
from pathlib import Path
|
|
|
|
import click
|
|
from flask import Flask, current_app, g
|
|
from flask.cli import with_appcontext
|
|
from werkzeug.security import generate_password_hash
|
|
|
|
from .constants import DAYPARTS, DEFAULT_CATEGORIES
|
|
|
|
|
|
def get_db() -> sqlite3.Connection:
    """Return the SQLite connection cached on the Flask application context.

    The first call within an application context opens the database named by
    ``current_app.config["DATABASE_PATH"]``, stores the handle on ``g``,
    switches its row factory to ``sqlite3.Row`` and turns on foreign-key
    enforcement.  Later calls in the same context return that same handle.
    """
    if "db" in g:
        return g.db

    connection = sqlite3.connect(
        current_app.config["DATABASE_PATH"],
        detect_types=sqlite3.PARSE_DECLTYPES,
    )
    # Publish the handle on ``g`` before configuring it, so it is reachable
    # for clean-up even if one of the following statements fails.
    g.db = connection
    connection.row_factory = sqlite3.Row
    connection.execute("PRAGMA foreign_keys = ON")
    return connection
|
|
|
|
|
|
def close_db(_error=None) -> None:
    """Close the connection stored on ``g`` and remove it from ``g``.

    Does nothing when no connection was opened in this application context.
    ``_error`` is accepted so the function can be used as a teardown callback;
    its value is ignored.
    """
    connection = g.pop("db", None)
    if connection is None:
        return
    connection.close()
|
|
|
|
|
|
def table_columns(database: sqlite3.Connection, table_name: str) -> set[str]:
    """Return the names of all columns SQLite reports for ``table_name``.

    The result is empty when ``PRAGMA table_info`` yields no rows, which is
    what happens for a table that does not exist.  Rows are read by key, so
    ``database`` must use ``sqlite3.Row`` as its row factory.

    ``table_name`` is interpolated into the statement verbatim; only pass
    identifiers that come from the code itself.
    """
    cursor = database.execute(f"PRAGMA table_info({table_name})")
    return {column["name"] for column in cursor}
|
|
|
|
|
|
def table_exists(database: sqlite3.Connection, table_name: str) -> bool:
    """Tell whether ``sqlite_master`` lists a table called ``table_name``.

    Only entries of type ``'table'`` count; views and indexes with the same
    name do not.
    """
    lookup = "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?"
    match = database.execute(lookup, (table_name,)).fetchone()
    return match is not None
|
|
|
|
|
|
def add_column_if_missing(database: sqlite3.Connection, table_name: str, definition: str) -> None:
    """Add a column to ``table_name`` unless one with that name already exists.

    ``definition`` is the text that follows ``ADD COLUMN`` (for example
    ``"email TEXT"``); its first whitespace-separated token is taken as the
    column name.  Both arguments are interpolated into the ``ALTER TABLE``
    statement verbatim, so they must come from trusted code.
    """
    column_name = definition.split(None, 1)[0]
    existing = table_columns(database, table_name)
    if column_name in existing:
        return
    database.execute(f"ALTER TABLE {table_name} ADD COLUMN {definition}")
|
|
|
|
|
|
def bootstrap_legacy_schema(database: sqlite3.Connection) -> None:
    """Bring an older database up to the shape the household features expect.

    Creates the ``households`` and ``household_categories`` tables when they
    are absent.  Then, for each of ``users``, ``items``, ``shopping_entries``
    and ``plan_entries`` that already exists, adds whichever of the listed
    household/ownership columns the table lacks.  Tables that do not exist are
    left untouched.  Nothing is committed here.
    """
    create_statements = (
        """
        CREATE TABLE IF NOT EXISTS households (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS household_categories (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            household_id INTEGER NOT NULL,
            name TEXT NOT NULL,
            sort_order INTEGER NOT NULL DEFAULT 100,
            is_active INTEGER NOT NULL DEFAULT 1,
            created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
            UNIQUE (household_id, name)
        )
        """,
    )
    for statement in create_statements:
        database.execute(statement)

    # Columns each legacy table must gain, in the order they are added.
    legacy_columns = (
        (
            "users",
            (
                "household_id INTEGER",
                "email TEXT",
                "role TEXT NOT NULL DEFAULT 'member'",
                "is_active INTEGER NOT NULL DEFAULT 1",
                "updated_at TEXT",
            ),
        ),
        (
            "items",
            (
                "household_id INTEGER",
                "owner_user_id INTEGER",
                "target_user_id INTEGER",
                "visibility TEXT NOT NULL DEFAULT 'shared'",
            ),
        ),
        (
            "shopping_entries",
            (
                "household_id INTEGER",
                "owner_user_id INTEGER",
                "visibility TEXT NOT NULL DEFAULT 'shared'",
                "needed_for_date TEXT",
                "needed_for_daypart_id INTEGER",
            ),
        ),
        (
            "plan_entries",
            (
                "household_id INTEGER",
                "owner_user_id INTEGER",
                "visibility TEXT NOT NULL DEFAULT 'shared'",
            ),
        ),
    )
    for table_name, definitions in legacy_columns:
        if not table_exists(database, table_name):
            continue
        for definition in definitions:
            add_column_if_missing(database, table_name, definition)
|
|
|
|
|
|
def ensure_default_household(database: sqlite3.Connection) -> int:
    """Return the id of the lowest-id household, creating one if none exists.

    When the ``households`` table is empty, a row named ``"Unser Haushalt"``
    is inserted and its id is returned.  The insert is not committed here;
    the caller owns the transaction.

    Args:
        database: Open connection whose schema contains ``households``.

    Returns:
        The id of the existing first household, or of the newly inserted one.
    """
    existing = database.execute(
        "SELECT id FROM households ORDER BY id LIMIT 1"
    ).fetchone()
    if existing is not None:
        # Positional access works with both sqlite3.Row and plain tuple rows.
        return int(existing[0])

    cursor = database.execute(
        "INSERT INTO households (name) VALUES (?)",
        ("Unser Haushalt",),
    )
    # ``id`` is the INTEGER PRIMARY KEY, so the cursor's lastrowid is the new
    # household id; no second SELECT is needed.
    return int(cursor.lastrowid)
|
|
|
|
|
|
def household_ids(database: sqlite3.Connection) -> list[int]:
    """Return the ids of all rows in ``households`` in ascending order.

    Rows are read by key, so ``database`` must use ``sqlite3.Row`` as its row
    factory.
    """
    cursor = database.execute("SELECT id FROM households ORDER BY id")
    return [int(record["id"]) for record in cursor]
|
|
|
|
|
|
def first_user_id(database: sqlite3.Connection) -> int | None:
    """Return the smallest ``users.id``, or ``None`` when the table is empty.

    Rows are read by key, so ``database`` must use ``sqlite3.Row`` as its row
    factory.
    """
    first = database.execute("SELECT id FROM users ORDER BY id LIMIT 1").fetchone()
    if first is None:
        return None
    return int(first["id"])
|
|
|
|
|
|
def sync_default_categories(database: sqlite3.Connection) -> None:
    """Insert every entry of ``DEFAULT_CATEGORIES`` for every household.

    Entries are numbered from 10 upwards in list order and that number is
    stored as ``sort_order``.  ``INSERT OR IGNORE`` leaves rows that would
    violate a constraint untouched, so existing categories are not changed.
    Nothing is committed here.
    """
    insert_sql = """
        INSERT OR IGNORE INTO household_categories (household_id, name, sort_order, is_active)
        VALUES (?, ?, ?, 1)
    """
    numbered_categories = list(enumerate(DEFAULT_CATEGORIES, start=10))
    for household_id in household_ids(database):
        for sort_order, name in numbered_categories:
            database.execute(insert_sql, (household_id, name, sort_order))
|
|
|
|
|
|
def _upgrade_user_rows(database: sqlite3.Connection, default_household_id: int) -> None:
    """Fill in missing values on ``users`` rows and make sure an active admin exists."""
    database.execute(
        "UPDATE users SET household_id = ? WHERE household_id IS NULL",
        (default_household_id,),
    )
    database.execute("UPDATE users SET role = 'member' WHERE role IS NULL OR role = ''")
    database.execute("UPDATE users SET is_active = 1 WHERE is_active IS NULL")
    # Blank e-mail addresses become NULL so they fall outside the partial
    # unique index on users.email.
    database.execute("UPDATE users SET email = NULL WHERE TRIM(COALESCE(email, '')) = ''")
    database.execute("UPDATE users SET updated_at = COALESCE(updated_at, created_at, CURRENT_TIMESTAMP)")

    admin_row = database.execute(
        "SELECT id FROM users WHERE role = 'admin' AND is_active = 1 ORDER BY id LIMIT 1"
    ).fetchone()
    if admin_row is not None:
        return
    # No active admin: promote the user with the lowest id, if any user exists.
    first_id = first_user_id(database)
    if first_id is not None:
        database.execute("UPDATE users SET role = 'admin' WHERE id = ?", (first_id,))


def _backfill_ownership(
    database: sqlite3.Connection,
    default_household_id: int,
    default_owner_id: int | None,
) -> None:
    """Fill household, owner and visibility on items, shopping and plan entries."""
    # Each table paired with the legacy column that names the row's creator.
    creator_columns = (
        ("items", "created_by"),
        ("shopping_entries", "added_by"),
        ("plan_entries", "created_by"),
    )
    for table_name, creator_column in creator_columns:
        if default_owner_id is None:
            # Without any user only the visibility default is repaired.
            database.execute(
                f"UPDATE {table_name} SET visibility = 'shared' WHERE visibility IS NULL OR visibility = ''"
            )
            continue
        database.execute(
            f"""
            UPDATE {table_name}
            SET household_id = COALESCE(household_id, ?),
                owner_user_id = COALESCE(owner_user_id, {creator_column}, ?),
                visibility = CASE WHEN visibility IS NULL OR visibility = '' THEN 'shared' ELSE visibility END
            WHERE household_id IS NULL OR owner_user_id IS NULL OR visibility IS NULL OR visibility = ''
            """,
            (default_household_id, default_owner_id),
        )


def _create_upgrade_indexes(database: sqlite3.Connection) -> None:
    """Create the indexes that rely on the upgraded columns, if they are missing."""
    index_statements = (
        """
        CREATE UNIQUE INDEX IF NOT EXISTS idx_users_email_unique
        ON users (email)
        WHERE email IS NOT NULL AND email != ''
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_items_household_visibility
        ON items (household_id, visibility, availability_state)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_items_target_user
        ON items (target_user_id)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_plan_entries_household_visibility
        ON plan_entries (household_id, visibility, plan_date)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_shopping_entries_household_visibility
        ON shopping_entries (household_id, visibility, is_checked)
        """,
    )
    for statement in index_statements:
        database.execute(statement)


def ensure_schema_upgrades(database: sqlite3.Connection) -> None:
    """Upgrade existing tables and rows to the household-aware data model.

    Steps, in order:

    1. add the household/role columns to ``users`` where missing;
    2. make sure a household exists and normalise ``users`` rows (household,
       role, active flag, e-mail, ``updated_at``), promoting the lowest-id
       user to admin when there is no active admin;
    3. add the household/ownership columns to ``items``,
       ``shopping_entries`` and ``plan_entries`` where missing;
    4. back-fill household, owner and visibility on those tables;
    5. insert the default categories for every household;
    6. create the supporting indexes.

    The ``users``, ``items``, ``shopping_entries`` and ``plan_entries`` tables
    must already exist, because columns are added with ``ALTER TABLE`` and the
    statements reference existing columns of those tables.  Nothing is
    committed here.
    """
    for definition in (
        "household_id INTEGER",
        "email TEXT",
        "role TEXT NOT NULL DEFAULT 'member'",
        "is_active INTEGER NOT NULL DEFAULT 1",
        "updated_at TEXT",
    ):
        add_column_if_missing(database, "users", definition)

    default_household_id = ensure_default_household(database)
    _upgrade_user_rows(database, default_household_id)

    default_owner_id = first_user_id(database)
    for table_name in ("items", "shopping_entries", "plan_entries"):
        add_column_if_missing(database, table_name, "household_id INTEGER")
        add_column_if_missing(database, table_name, "owner_user_id INTEGER")
        add_column_if_missing(database, table_name, "visibility TEXT NOT NULL DEFAULT 'shared'")
    add_column_if_missing(database, "items", "target_user_id INTEGER")
    add_column_if_missing(database, "shopping_entries", "needed_for_date TEXT")
    add_column_if_missing(database, "shopping_entries", "needed_for_daypart_id INTEGER")

    _backfill_ownership(database, default_household_id, default_owner_id)
    sync_default_categories(database)
    _create_upgrade_indexes(database)
|
|
|
|
|
|
def apply_schema(database: sqlite3.Connection) -> None:
    """Run the full schema set-up sequence against ``database``.

    Order matters: the legacy bootstrap runs first, then the ``schema.sql``
    script that sits next to this module is executed, then the in-place
    upgrades are applied, and finally the dayparts are synchronised.
    """
    bootstrap_legacy_schema(database)

    schema_sql = (Path(__file__).parent / "schema.sql").read_text(encoding="utf-8")
    database.executescript(schema_sql)

    ensure_schema_upgrades(database)
    sync_dayparts(database)
|
|
|
|
|
|
def init_db() -> None:
    """Apply the schema on the current context's connection and commit."""
    connection = get_db()
    apply_schema(connection)
    connection.commit()
|
|
|
|
|
|
def sync_dayparts(database: sqlite3.Connection) -> None:
    """Make the ``dayparts`` table reflect the ``DAYPARTS`` constant.

    For every entry the row is inserted if possible (``INSERT OR IGNORE``)
    and then the name and sort order of the row with that slug are
    overwritten, so rows that already existed pick up changed values.
    Rows whose slug is not in ``DAYPARTS`` are left alone.  Nothing is
    committed here.
    """
    insert_sql = """
        INSERT OR IGNORE INTO dayparts (slug, name, sort_order)
        VALUES (?, ?, ?)
    """
    update_sql = """
        UPDATE dayparts
        SET name = ?, sort_order = ?
        WHERE slug = ?
    """
    for daypart in DAYPARTS:
        slug = daypart["slug"]
        name = daypart["name"]
        sort_order = daypart["sort_order"]
        database.execute(insert_sql, (slug, name, sort_order))
        database.execute(update_sql, (name, sort_order, slug))
|
|
|
|
|
|
def init_db_if_needed(app: Flask) -> None:
    """Run ``init_db()`` inside an application context of ``app``.

    The call is unconditional; the schema routines it triggers are written
    to be safe on an already initialised database.
    """
    context = app.app_context()
    with context:
        init_db()
|
|
|
|
|
|
def user_count() -> int:
    """Return the number of rows in ``users`` using the context's connection."""
    cursor = get_db().execute("SELECT COUNT(*) AS count FROM users")
    result = cursor.fetchone()
    return int(result["count"])
|
|
|
|
|
|
def active_admin_count(household_id: int) -> int:
    """Count the users of ``household_id`` that are active and have role ``admin``."""
    count_sql = """
        SELECT COUNT(*) AS count
        FROM users
        WHERE household_id = ? AND role = 'admin' AND is_active = 1
    """
    result = get_db().execute(count_sql, (household_id,)).fetchone()
    return int(result["count"])
|
|
|
|
|
|
@click.command("init-db")
@with_appcontext
def init_db_command() -> None:
    """Create or upgrade the database schema."""
    # The docstring above doubles as the command's help text in the CLI.
    init_db()
    click.echo("Database initialized.")
|
|
|
|
|
|
@click.command("create-user")
@click.argument("username")
@click.argument("password")
@click.option("--display-name", default="", help="Friendly display name.")
@click.option("--email", default="", help="Optional email address.")
@click.option("--role", default="member", type=click.Choice(["admin", "member"]), help="Role of the new user.")
@with_appcontext
def create_user_command(username: str, password: str, display_name: str, email: str, role: str) -> None:
    """Create a user in the default household.

    USERNAME and the e-mail address are stored trimmed and lower-cased.
    """
    # The docstring above doubles as the command's help text in the CLI.
    # NOTE(review): PASSWORD is a positional argument, so it can end up in
    # shell history and process listings.  Prompting for it instead would
    # change the command's interface, so it is only flagged here.
    normalized_username = username.strip().lower()
    if not normalized_username:
        raise click.BadParameter("Username must not be empty.", param_hint="USERNAME")
    if not password:
        raise click.BadParameter("Password must not be empty.", param_hint="PASSWORD")

    database = get_db()
    household_id = ensure_default_household(database)
    try:
        database.execute(
            """
            INSERT INTO users (household_id, username, email, display_name, role, password_hash)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (
                household_id,
                normalized_username,
                # An empty e-mail is stored as NULL rather than ''.
                email.strip().lower() or None,
                display_name.strip(),
                role,
                generate_password_hash(password),
            ),
        )
    except sqlite3.IntegrityError as error:
        # A constraint violation (e.g. a value that must be unique is already
        # taken) becomes a readable CLI error; nothing is left half-written.
        database.rollback()
        raise click.ClickException(
            f"Could not create user '{normalized_username}': {error}"
        ) from error

    database.commit()
    # Report the name exactly as it was stored.
    click.echo(f"User '{normalized_username}' created.")
|
|
|
|
|
|
def init_app(app: Flask) -> None:
    """Hook the database layer into ``app``.

    Registers ``close_db`` as an app-context teardown callback and adds the
    ``init-db`` and ``create-user`` commands to the application's CLI.
    """
    app.teardown_appcontext(close_db)
    for command in (init_db_command, create_user_command):
        app.cli.add_command(command)
|