goalsbreakdown/database.py
2026-05-08 16:23:00 +08:00

219 lines
5.7 KiB
Python

from schema import get_connection
def row_to_dict(row):
    """Convert a fetched row into a plain ``dict``.

    Args:
        row: Anything ``dict()`` accepts (a mapping-like row or an iterable
            of key/value pairs), or ``None`` for "no row".

    Returns:
        ``None`` when ``row`` is ``None``, otherwise ``dict(row)``.
    """
    return None if row is None else dict(row)
def init_db():
    """Initialize the database by delegating to ``schema.init_db``."""
    # Imported here under an alias because this wrapper has the same name
    # as the function it delegates to.
    from schema import init_db as initialize_schema

    initialize_schema()
def get_user_by_username(username):
    """Look up a user by ``username``.

    Returns:
        The first matching ``users`` row as a dict, or ``None`` when no row
        matches. The connection is closed before returning, also on error.
    """
    db = get_connection()
    try:
        sql = "SELECT * FROM users WHERE username = ?"
        match = db.execute(sql, (username,)).fetchone()
        return row_to_dict(match)
    finally:
        db.close()
def get_user_by_id(user_id):
    """Look up a user by primary key.

    Returns:
        The ``users`` row whose ``id`` equals ``user_id`` as a dict, or
        ``None`` when no row matches. The connection is always closed.
    """
    db = get_connection()
    try:
        sql = "SELECT * FROM users WHERE id = ?"
        match = db.execute(sql, (user_id,)).fetchone()
        return row_to_dict(match)
    finally:
        db.close()
def create_user(username, password_hash, role="user", max_goals=None):
    """Insert a new user and return the stored row.

    Args:
        username: Value for the ``username`` column.
        password_hash: Value for the ``password_hash`` column.
        role: Value for the ``role`` column; defaults to ``"user"``.
        max_goals: Value for the ``max_goals`` column; ``None`` means 5.

    Returns:
        The freshly inserted row, re-read by its row id, as a dict.
    """
    goal_limit = 5 if max_goals is None else max_goals
    db = get_connection()
    try:
        inserted = db.execute(
            "INSERT INTO users (username, password_hash, role, max_goals) VALUES (?, ?, ?, ?)",
            (username, password_hash, role, goal_limit),
        )
        db.commit()
        # Read the row back so the caller sees what was actually stored.
        stored = db.execute(
            "SELECT * FROM users WHERE id = ?", (inserted.lastrowid,)
        ).fetchone()
        return row_to_dict(stored)
    finally:
        db.close()
def update_user(user_id, **kwargs):
    """Update columns of a single ``users`` row.

    Each keyword argument names a column and supplies its new value, e.g.
    ``update_user(3, role="admin", max_goals=10)``. Calling without keyword
    arguments does nothing.

    Column names are written as double-quoted SQL identifiers with embedded
    quotes doubled, so a crafted key cannot break out of the identifier and
    inject SQL. A key the caller already wrapped in double quotes is
    unwrapped first so it is not quoted twice.

    Args:
        user_id: Value matched against ``users.id``.
        **kwargs: Column name -> new value.

    Returns:
        None.
    """
    if not kwargs:
        return
    assignments = []
    for column in kwargs:
        # Normalize a caller-quoted identifier such as '"role"'.
        if len(column) >= 2 and column[0] == '"' and column[-1] == '"':
            column = column[1:-1].replace('""', '"')
        # Doubling embedded quotes is the SQL escape inside a quoted identifier.
        quoted = '"' + column.replace('"', '""') + '"'
        assignments.append(f"{quoted} = ?")
    sets = ", ".join(assignments)
    # Values follow the placeholder order; the id fills the WHERE clause.
    values = list(kwargs.values()) + [user_id]
    conn = get_connection()
    try:
        conn.execute(f"UPDATE users SET {sets} WHERE id = ?", values)
        conn.commit()
    finally:
        conn.close()
def get_all_users():
    """Return every row of the ``users`` table as a list of dicts."""
    db = get_connection()
    try:
        records = db.execute("SELECT * FROM users").fetchall()
        return list(map(row_to_dict, records))
    finally:
        db.close()
def get_goals_by_user(user_id):
    """Return all ``goals`` rows whose ``user_id`` matches, as a list of dicts.

    The list is empty when the user has no goals.
    """
    db = get_connection()
    try:
        sql = "SELECT * FROM goals WHERE user_id = ?"
        records = db.execute(sql, (user_id,)).fetchall()
        return list(map(row_to_dict, records))
    finally:
        db.close()
def get_goal_by_id(goal_id):
    """Look up a goal by primary key.

    Returns:
        The ``goals`` row whose ``id`` equals ``goal_id`` as a dict, or
        ``None`` when no row matches.
    """
    db = get_connection()
    try:
        sql = "SELECT * FROM goals WHERE id = ?"
        match = db.execute(sql, (goal_id,)).fetchone()
        return row_to_dict(match)
    finally:
        db.close()
def create_goal(user_id, title):
    """Insert a goal for ``user_id`` and return the stored row.

    The new row is written with ``activated`` set to 1.

    Returns:
        The freshly inserted row, re-read by its row id, as a dict.
    """
    db = get_connection()
    try:
        inserted = db.execute(
            "INSERT INTO goals (user_id, title, activated) VALUES (?, ?, 1)",
            (user_id, title),
        )
        db.commit()
        # Read the row back so the caller sees what was actually stored.
        stored = db.execute(
            "SELECT * FROM goals WHERE id = ?", (inserted.lastrowid,)
        ).fetchone()
        return row_to_dict(stored)
    finally:
        db.close()
def update_goal(goal_id, **kwargs):
    """Update columns of a single ``goals`` row.

    Each keyword argument names a column and supplies its new value, e.g.
    ``update_goal(4, title="Run 10k", activated=0)``. Calling without
    keyword arguments does nothing.

    Column names are written as double-quoted SQL identifiers with embedded
    quotes doubled, so a crafted key cannot break out of the identifier and
    inject SQL. A key the caller already wrapped in double quotes is
    unwrapped first so it is not quoted twice.

    Args:
        goal_id: Value matched against ``goals.id``.
        **kwargs: Column name -> new value.

    Returns:
        None.
    """
    if not kwargs:
        return
    assignments = []
    for column in kwargs:
        # Normalize a caller-quoted identifier such as '"title"'.
        if len(column) >= 2 and column[0] == '"' and column[-1] == '"':
            column = column[1:-1].replace('""', '"')
        # Doubling embedded quotes is the SQL escape inside a quoted identifier.
        quoted = '"' + column.replace('"', '""') + '"'
        assignments.append(f"{quoted} = ?")
    sets = ", ".join(assignments)
    # Values follow the placeholder order; the id fills the WHERE clause.
    values = list(kwargs.values()) + [goal_id]
    conn = get_connection()
    try:
        conn.execute(f"UPDATE goals SET {sets} WHERE id = ?", values)
        conn.commit()
    finally:
        conn.close()
def delete_goal(goal_id):
    """Delete a goal together with all tasks that belong to it.

    Both deletes are committed together; the connection is closed before
    returning, also on error.
    """
    db = get_connection()
    try:
        key = (goal_id,)
        # Tasks go first, then the goal they reference
        # (presumably to satisfy a foreign key - confirm against the schema).
        db.execute("DELETE FROM tasks WHERE goal_id = ?", key)
        db.execute("DELETE FROM goals WHERE id = ?", key)
        db.commit()
    finally:
        db.close()
def count_goals_by_user(user_id):
    """Return how many ``goals`` rows have the given ``user_id``."""
    db = get_connection()
    try:
        sql = "SELECT COUNT(*) FROM goals WHERE user_id = ?"
        count_row = db.execute(sql, (user_id,)).fetchone()
        # COUNT(*) always yields exactly one row with one column.
        return count_row[0]
    finally:
        db.close()
def get_tasks_by_goal(goal_id):
    """Return all ``tasks`` rows whose ``goal_id`` matches, as a list of dicts.

    No ORDER BY is applied here; the list is empty when the goal has no tasks.
    """
    db = get_connection()
    try:
        sql = "SELECT * FROM tasks WHERE goal_id = ?"
        records = db.execute(sql, (goal_id,)).fetchall()
        return list(map(row_to_dict, records))
    finally:
        db.close()
def get_task_by_id(task_id):
    """Look up a task by primary key.

    Returns:
        The ``tasks`` row whose ``id`` equals ``task_id`` as a dict, or
        ``None`` when no row matches.
    """
    db = get_connection()
    try:
        sql = "SELECT * FROM tasks WHERE id = ?"
        match = db.execute(sql, (task_id,)).fetchone()
        return row_to_dict(match)
    finally:
        db.close()
def create_task(goal_id, title, desc="", status="todo", order=None):
    """Insert a task under a goal and return the stored row.

    When ``order`` is ``None`` the task is placed after the goal's unfinished
    tasks: its order is the largest ``"order"`` among that goal's tasks whose
    status is not ``'done'``, plus 1.0 (1.0 when there are none). That value
    is computed by a sub-select inside the INSERT, so the lookup and the
    write are a single statement on a single connection.

    Args:
        goal_id: Value for the ``goal_id`` column.
        title: Value for the ``title`` column.
        desc: Value for the ``desc`` column; defaults to ``""``.
        status: Value for the ``status`` column; defaults to ``"todo"``.
        order: Explicit value for the ``"order"`` column, or ``None`` to
            append after the unfinished tasks as described above.

    Returns:
        The freshly inserted row, re-read by its row id, as a dict.
        ``start_time`` and ``finished_time`` are inserted as NULL.
    """
    conn = get_connection()
    try:
        cur = conn.execute(
            # COALESCE keeps an explicit order and falls back to the
            # sub-select only when the bound value is NULL (order=None).
            """INSERT INTO tasks (goal_id, title, desc, status, start_time, finished_time, "order")
               VALUES (?, ?, ?, ?, ?, ?, COALESCE(?, (
                   SELECT COALESCE(MAX("order"), 0) + 1.0 FROM tasks
                   WHERE goal_id = ? AND status != 'done')))""",
            (goal_id, title, desc, status, None, None, order, goal_id),
        )
        conn.commit()
        return row_to_dict(
            conn.execute("SELECT * FROM tasks WHERE id = ?", (cur.lastrowid,)).fetchone()
        )
    finally:
        conn.close()
def update_task(task_id, **kwargs):
    """Update columns of a single ``tasks`` row.

    Each keyword argument names a column and supplies its new value, e.g.
    ``update_task(7, status="done", order=2.0)``. Calling without keyword
    arguments does nothing.

    Column names are written as double-quoted SQL identifiers with embedded
    quotes doubled. This lets columns named after SQL keywords (``order``)
    be updated, and keeps a crafted key from breaking out of the identifier
    to inject SQL. A key the caller already wrapped in double quotes
    (``'"order"'``) is unwrapped first so it is not quoted twice.

    Args:
        task_id: Value matched against ``tasks.id``.
        **kwargs: Column name -> new value.

    Returns:
        None.
    """
    if not kwargs:
        return
    assignments = []
    for column in kwargs:
        # Normalize a caller-quoted identifier such as '"order"'.
        if len(column) >= 2 and column[0] == '"' and column[-1] == '"':
            column = column[1:-1].replace('""', '"')
        # Doubling embedded quotes is the SQL escape inside a quoted identifier.
        quoted = '"' + column.replace('"', '""') + '"'
        assignments.append(f"{quoted} = ?")
    sets = ", ".join(assignments)
    # Values follow the placeholder order; the id fills the WHERE clause.
    values = list(kwargs.values()) + [task_id]
    conn = get_connection()
    try:
        conn.execute(f"UPDATE tasks SET {sets} WHERE id = ?", values)
        conn.commit()
    finally:
        conn.close()
def delete_task(task_id):
    """Delete the ``tasks`` row whose ``id`` equals ``task_id`` and commit."""
    db = get_connection()
    try:
        sql = "DELETE FROM tasks WHERE id = ?"
        db.execute(sql, (task_id,))
        db.commit()
    finally:
        db.close()
def get_tasks_sorted(goal_id):
    """Return a goal's tasks as dicts, finished tasks first.

    The result is the concatenation of two queries run on one connection:

    1. tasks with status ``'done'``, newest ``finished_time`` first;
    2. tasks with any other status, ascending by ``"order"``.
    """
    finished_sql = """SELECT * FROM tasks WHERE goal_id = ? AND status = 'done'
        ORDER BY finished_time DESC"""
    unfinished_sql = """SELECT * FROM tasks WHERE goal_id = ? AND status != 'done'
        ORDER BY "order" ASC"""
    db = get_connection()
    try:
        ordered = []
        # Query order determines result order: finished, then unfinished.
        for sql in (finished_sql, unfinished_sql):
            records = db.execute(sql, (goal_id,)).fetchall()
            ordered.extend(row_to_dict(record) for record in records)
        return ordered
    finally:
        db.close()