"""Data-access helpers for the users, goals and tasks tables.

Every public function obtains its own connection from
``schema.get_connection`` and closes it before returning.
"""
from schema import get_connection


def row_to_dict(row):
    """Return *row* converted with ``dict()``, or ``None`` if *row* is None.

    NOTE(review): assumes rows returned by the connection are mapping-like
    (e.g. ``sqlite3.Row``) -- confirm against ``schema.get_connection``.
    """
    if row is None:
        return None
    return dict(row)


def init_db():
    """Initialise the database by delegating to ``schema.init_db``."""
    # Imported locally under an alias so it does not clash with this
    # function's own name.
    from schema import init_db as _init_db

    _init_db()


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

def _quote_identifier(name):
    """Return *name* as a double-quoted, escaped SQL identifier.

    Quoting lets reserved words such as ``order`` be used as column names and
    prevents a crafted key from injecting SQL.  A name that is already wrapped
    in double quotes is unwrapped first, so callers that pre-quoted a column
    name keep working.
    """
    if len(name) >= 2 and name[0] == '"' and name[-1] == '"':
        name = name[1:-1].replace('""', '"')
    return '"' + name.replace('"', '""') + '"'


def _fetch_one(sql, params=()):
    """Run *sql* and return the first row as a dict, or ``None`` if no row."""
    conn = get_connection()
    try:
        return row_to_dict(conn.execute(sql, params).fetchone())
    finally:
        conn.close()


def _fetch_all(sql, params=()):
    """Run *sql* and return every row as a list of dicts."""
    conn = get_connection()
    try:
        return [row_to_dict(r) for r in conn.execute(sql, params).fetchall()]
    finally:
        conn.close()


def _insert_and_fetch(table, insert_sql, params):
    """Run *insert_sql*, commit, and return the new row of *table* as a dict.

    *table* is always a constant supplied by this module, never caller input.
    """
    conn = get_connection()
    try:
        cur = conn.execute(insert_sql, params)
        conn.commit()
        created = conn.execute(
            f"SELECT * FROM {table} WHERE id = ?", (cur.lastrowid,)
        ).fetchone()
        return row_to_dict(created)
    finally:
        conn.close()


def _update_by_id(table, row_id, fields):
    """Set the columns in *fields* on the row of *table* whose id is *row_id*.

    Does nothing when *fields* is empty.  Column names are quoted with
    ``_quote_identifier``; values are bound as parameters.  *table* is always
    a constant supplied by this module.
    """
    if not fields:
        return
    assignments = ", ".join(
        f"{_quote_identifier(column)} = ?" for column in fields
    )
    params = [*fields.values(), row_id]
    conn = get_connection()
    try:
        conn.execute(f"UPDATE {table} SET {assignments} WHERE id = ?", params)
        conn.commit()
    finally:
        conn.close()


# ---------------------------------------------------------------------------
# Users
# ---------------------------------------------------------------------------

def get_user_by_username(username):
    """Return the user with the given username as a dict, or ``None``."""
    return _fetch_one("SELECT * FROM users WHERE username = ?", (username,))


def get_user_by_id(user_id):
    """Return the user with the given id as a dict, or ``None``."""
    return _fetch_one("SELECT * FROM users WHERE id = ?", (user_id,))


def create_user(username, password_hash, role="user", max_goals=None):
    """Insert a user and return the stored row as a dict.

    ``max_goals`` defaults to 5 when ``None`` is passed.
    """
    if max_goals is None:
        max_goals = 5
    return _insert_and_fetch(
        "users",
        "INSERT INTO users (username, password_hash, role, max_goals) "
        "VALUES (?, ?, ?, ?)",
        (username, password_hash, role, max_goals),
    )


def update_user(user_id, **kwargs):
    """Update the given columns (keyword name = column) of one user.

    Returns ``None``; a call without keyword arguments is a no-op.
    """
    _update_by_id("users", user_id, kwargs)


def get_all_users():
    """Return every row of the users table as a list of dicts."""
    return _fetch_all("SELECT * FROM users")


# ---------------------------------------------------------------------------
# Goals
# ---------------------------------------------------------------------------

def get_goals_by_user(user_id):
    """Return all goals whose ``user_id`` matches, as a list of dicts."""
    return _fetch_all("SELECT * FROM goals WHERE user_id = ?", (user_id,))


def get_goal_by_id(goal_id):
    """Return the goal with the given id as a dict, or ``None``."""
    return _fetch_one("SELECT * FROM goals WHERE id = ?", (goal_id,))


def create_goal(user_id, title):
    """Insert a goal with ``activated`` set to 1 and return it as a dict."""
    return _insert_and_fetch(
        "goals",
        "INSERT INTO goals (user_id, title, activated) VALUES (?, ?, 1)",
        (user_id, title),
    )


def update_goal(goal_id, **kwargs):
    """Update the given columns (keyword name = column) of one goal.

    Returns ``None``; a call without keyword arguments is a no-op.
    """
    _update_by_id("goals", goal_id, kwargs)


def delete_goal(goal_id):
    """Delete a goal together with the tasks that reference it."""
    conn = get_connection()
    try:
        # Remove the goal's tasks first, then the goal; both statements are
        # committed together.
        conn.execute("DELETE FROM tasks WHERE goal_id = ?", (goal_id,))
        conn.execute("DELETE FROM goals WHERE id = ?", (goal_id,))
        conn.commit()
    finally:
        conn.close()


def count_goals_by_user(user_id):
    """Return the number of goals whose ``user_id`` matches."""
    conn = get_connection()
    try:
        cur = conn.execute(
            "SELECT COUNT(*) FROM goals WHERE user_id = ?", (user_id,)
        )
        return cur.fetchone()[0]
    finally:
        conn.close()


# ---------------------------------------------------------------------------
# Tasks
# ---------------------------------------------------------------------------

def get_tasks_by_goal(goal_id):
    """Return all tasks whose ``goal_id`` matches, as a list of dicts."""
    return _fetch_all("SELECT * FROM tasks WHERE goal_id = ?", (goal_id,))


def get_task_by_id(task_id):
    """Return the task with the given id as a dict, or ``None``."""
    return _fetch_one("SELECT * FROM tasks WHERE id = ?", (task_id,))


def create_task(goal_id, title, desc="", status="todo", order=None):
    """Insert a task and return the stored row as a dict.

    ``start_time`` and ``finished_time`` are stored as NULL.  When ``order``
    is ``None`` it becomes one more than the highest ``"order"`` among the
    goal's tasks whose status is not ``'done'`` (1.0 if there are none).
    """
    conn = get_connection()
    try:
        if order is None:
            # Computed on the same connection as the insert rather than on a
            # separate, short-lived one.
            order = conn.execute(
                """SELECT COALESCE(MAX("order"), 0) + 1.0 FROM tasks
                   WHERE goal_id = ? AND status != 'done'""",
                (goal_id,),
            ).fetchone()[0]
        cur = conn.execute(
            """INSERT INTO tasks
               (goal_id, title, desc, status, start_time, finished_time, "order")
               VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (goal_id, title, desc, status, None, None, order),
        )
        conn.commit()
        created = conn.execute(
            "SELECT * FROM tasks WHERE id = ?", (cur.lastrowid,)
        ).fetchone()
        return row_to_dict(created)
    finally:
        conn.close()


def update_task(task_id, **kwargs):
    """Update the given columns (keyword name = column) of one task.

    Column names are quoted, so ``update_task(task_id, order=2.0)`` works even
    though ``order`` is an SQL keyword.  Returns ``None``; a call without
    keyword arguments is a no-op.
    """
    _update_by_id("tasks", task_id, kwargs)


def delete_task(task_id):
    """Delete the task with the given id."""
    conn = get_connection()
    try:
        conn.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
        conn.commit()
    finally:
        conn.close()


def get_tasks_sorted(goal_id):
    """Return a goal's tasks as dicts: done tasks first, then the rest.

    Tasks with status ``'done'`` come first, newest ``finished_time`` first;
    all other tasks follow in ascending ``"order"``.
    """
    conn = get_connection()
    try:
        done_rows = conn.execute(
            """SELECT * FROM tasks WHERE goal_id = ? AND status = 'done'
               ORDER BY finished_time DESC""",
            (goal_id,),
        ).fetchall()
        open_rows = conn.execute(
            """SELECT * FROM tasks WHERE goal_id = ? AND status != 'done'
               ORDER BY "order" ASC""",
            (goal_id,),
        ).fetchall()
        finished = [row_to_dict(r) for r in done_rows]
        unfinished = [row_to_dict(r) for r in open_rows]
        return finished + unfinished
    finally:
        conn.close()