Files
nouri-App/nouri/db.py
T

309 lines
11 KiB
Python

from __future__ import annotations
import sqlite3
from pathlib import Path
import click
from flask import Flask, current_app, g
from flask.cli import with_appcontext
from werkzeug.security import generate_password_hash
from .constants import DAYPARTS
def get_db() -> sqlite3.Connection:
if "db" not in g:
g.db = sqlite3.connect(
current_app.config["DATABASE_PATH"],
detect_types=sqlite3.PARSE_DECLTYPES,
)
g.db.row_factory = sqlite3.Row
g.db.execute("PRAGMA foreign_keys = ON")
return g.db
def close_db(_error=None) -> None:
database = g.pop("db", None)
if database is not None:
database.close()
def table_columns(database: sqlite3.Connection, table_name: str) -> set[str]:
rows = database.execute(f"PRAGMA table_info({table_name})").fetchall()
return {row["name"] for row in rows}
def add_column_if_missing(database: sqlite3.Connection, table_name: str, definition: str) -> None:
column_name = definition.split()[0]
if column_name not in table_columns(database, table_name):
database.execute(f"ALTER TABLE {table_name} ADD COLUMN {definition}")
def bootstrap_legacy_schema(database: sqlite3.Connection) -> None:
database.execute(
"""
CREATE TABLE IF NOT EXISTS households (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
)
"""
)
existing_tables = {
row["name"]
for row in database.execute(
"SELECT name FROM sqlite_master WHERE type = 'table'"
).fetchall()
}
if "users" in existing_tables:
add_column_if_missing(database, "users", "household_id INTEGER")
add_column_if_missing(database, "users", "email TEXT")
add_column_if_missing(database, "users", "role TEXT NOT NULL DEFAULT 'member'")
add_column_if_missing(database, "users", "is_active INTEGER NOT NULL DEFAULT 1")
add_column_if_missing(database, "users", "updated_at TEXT")
if "items" in existing_tables:
add_column_if_missing(database, "items", "household_id INTEGER")
add_column_if_missing(database, "items", "owner_user_id INTEGER")
add_column_if_missing(database, "items", "visibility TEXT NOT NULL DEFAULT 'shared'")
if "shopping_entries" in existing_tables:
add_column_if_missing(database, "shopping_entries", "household_id INTEGER")
add_column_if_missing(database, "shopping_entries", "owner_user_id INTEGER")
add_column_if_missing(database, "shopping_entries", "visibility TEXT NOT NULL DEFAULT 'shared'")
if "plan_entries" in existing_tables:
add_column_if_missing(database, "plan_entries", "household_id INTEGER")
add_column_if_missing(database, "plan_entries", "owner_user_id INTEGER")
add_column_if_missing(database, "plan_entries", "visibility TEXT NOT NULL DEFAULT 'shared'")
def ensure_default_household(database: sqlite3.Connection) -> int:
household = database.execute(
"SELECT id FROM households ORDER BY id LIMIT 1"
).fetchone()
if household:
return int(household["id"])
database.execute(
"INSERT INTO households (name) VALUES (?)",
("Unser Haushalt",),
)
return int(
database.execute("SELECT id FROM households ORDER BY id LIMIT 1").fetchone()["id"]
)
def first_user_id(database: sqlite3.Connection) -> int | None:
row = database.execute("SELECT id FROM users ORDER BY id LIMIT 1").fetchone()
return int(row["id"]) if row else None
def ensure_schema_upgrades(database: sqlite3.Connection) -> None:
add_column_if_missing(database, "users", "household_id INTEGER")
add_column_if_missing(database, "users", "email TEXT")
add_column_if_missing(database, "users", "role TEXT NOT NULL DEFAULT 'member'")
add_column_if_missing(database, "users", "is_active INTEGER NOT NULL DEFAULT 1")
add_column_if_missing(database, "users", "updated_at TEXT")
default_household_id = ensure_default_household(database)
database.execute(
"UPDATE users SET household_id = ? WHERE household_id IS NULL",
(default_household_id,),
)
database.execute(
"UPDATE users SET role = 'member' WHERE role IS NULL OR role = ''",
)
database.execute(
"UPDATE users SET is_active = 1 WHERE is_active IS NULL",
)
database.execute(
"UPDATE users SET email = NULL WHERE TRIM(COALESCE(email, '')) = ''",
)
database.execute(
"UPDATE users SET updated_at = COALESCE(updated_at, created_at, CURRENT_TIMESTAMP)"
)
admin_row = database.execute(
"SELECT id FROM users WHERE role = 'admin' AND is_active = 1 ORDER BY id LIMIT 1"
).fetchone()
if admin_row is None:
first_id = first_user_id(database)
if first_id is not None:
database.execute(
"UPDATE users SET role = 'admin' WHERE id = ?",
(first_id,),
)
default_owner_id = first_user_id(database)
for table_name in ("items", "shopping_entries", "plan_entries"):
add_column_if_missing(database, table_name, "household_id INTEGER")
add_column_if_missing(database, table_name, "owner_user_id INTEGER")
add_column_if_missing(database, table_name, "visibility TEXT NOT NULL DEFAULT 'shared'")
if default_owner_id is not None:
database.execute(
"""
UPDATE items
SET household_id = COALESCE(household_id, ?),
owner_user_id = COALESCE(owner_user_id, created_by, ?),
visibility = CASE WHEN visibility IS NULL OR visibility = '' THEN 'shared' ELSE visibility END
WHERE household_id IS NULL OR owner_user_id IS NULL OR visibility IS NULL OR visibility = ''
""",
(default_household_id, default_owner_id),
)
database.execute(
"""
UPDATE shopping_entries
SET household_id = COALESCE(household_id, ?),
owner_user_id = COALESCE(owner_user_id, added_by, ?),
visibility = CASE WHEN visibility IS NULL OR visibility = '' THEN 'shared' ELSE visibility END
WHERE household_id IS NULL OR owner_user_id IS NULL OR visibility IS NULL OR visibility = ''
""",
(default_household_id, default_owner_id),
)
database.execute(
"""
UPDATE plan_entries
SET household_id = COALESCE(household_id, ?),
owner_user_id = COALESCE(owner_user_id, created_by, ?),
visibility = CASE WHEN visibility IS NULL OR visibility = '' THEN 'shared' ELSE visibility END
WHERE household_id IS NULL OR owner_user_id IS NULL OR visibility IS NULL OR visibility = ''
""",
(default_household_id, default_owner_id),
)
else:
database.execute(
"UPDATE items SET visibility = 'shared' WHERE visibility IS NULL OR visibility = ''"
)
database.execute(
"UPDATE shopping_entries SET visibility = 'shared' WHERE visibility IS NULL OR visibility = ''"
)
database.execute(
"UPDATE plan_entries SET visibility = 'shared' WHERE visibility IS NULL OR visibility = ''"
)
database.execute(
"""
CREATE UNIQUE INDEX IF NOT EXISTS idx_users_email_unique
ON users (email)
WHERE email IS NOT NULL AND email != ''
"""
)
database.execute(
"""
CREATE INDEX IF NOT EXISTS idx_items_household_visibility
ON items (household_id, visibility, availability_state)
"""
)
database.execute(
"""
CREATE INDEX IF NOT EXISTS idx_plan_entries_household_visibility
ON plan_entries (household_id, visibility, plan_date)
"""
)
database.execute(
"""
CREATE INDEX IF NOT EXISTS idx_shopping_entries_household_visibility
ON shopping_entries (household_id, visibility, is_checked)
"""
)
def apply_schema(database: sqlite3.Connection) -> None:
bootstrap_legacy_schema(database)
schema_path = Path(__file__).with_name("schema.sql")
database.executescript(schema_path.read_text(encoding="utf-8"))
ensure_schema_upgrades(database)
sync_dayparts(database)
def init_db() -> None:
database = get_db()
apply_schema(database)
database.commit()
def sync_dayparts(database: sqlite3.Connection) -> None:
for entry in DAYPARTS:
database.execute(
"""
INSERT OR IGNORE INTO dayparts (slug, name, sort_order)
VALUES (?, ?, ?)
""",
(entry["slug"], entry["name"], entry["sort_order"]),
)
database.execute(
"""
UPDATE dayparts
SET name = ?, sort_order = ?
WHERE slug = ?
""",
(entry["name"], entry["sort_order"], entry["slug"]),
)
def init_db_if_needed(app: Flask) -> None:
with app.app_context():
init_db()
def user_count() -> int:
row = get_db().execute("SELECT COUNT(*) AS count FROM users").fetchone()
return int(row["count"])
def active_admin_count(household_id: int) -> int:
row = get_db().execute(
"""
SELECT COUNT(*) AS count
FROM users
WHERE household_id = ? AND role = 'admin' AND is_active = 1
""",
(household_id,),
).fetchone()
return int(row["count"])
@click.command("init-db")
@with_appcontext
def init_db_command() -> None:
init_db()
click.echo("Database initialized.")
@click.command("create-user")
@click.argument("username")
@click.argument("password")
@click.option("--display-name", default="", help="Friendly display name.")
@click.option("--email", default="", help="Optional email address.")
@click.option("--role", default="member", type=click.Choice(["admin", "member"]))
@with_appcontext
def create_user_command(username: str, password: str, display_name: str, email: str, role: str) -> None:
database = get_db()
household_id = ensure_default_household(database)
database.execute(
"""
INSERT INTO users (household_id, username, email, display_name, role, password_hash)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
household_id,
username.strip().lower(),
email.strip().lower() or None,
display_name.strip(),
role,
generate_password_hash(password),
),
)
database.commit()
click.echo(f"User '{username}' created.")
def init_app(app: Flask) -> None:
app.teardown_appcontext(close_db)
app.cli.add_command(init_db_command)
app.cli.add_command(create_user_command)