from datetime import datetime
from typing import Union, Dict

import psycopg2

from .models.build_db import BuildDB
from .models.build_task_db import BuildTaskDB
from .models.db_config import DbConfig


class DB:
    """Thin data-access layer over the ``builds`` and ``build_tasks`` tables.

    A single psycopg2 connection is opened on construction and reused by
    every method. Each public method runs in its own transaction: it is
    committed when the statement succeeds and rolled back when it raises,
    so one failed statement does not leave the connection unusable.
    """

    def __init__(self, config: DbConfig):
        """Open the database connection described by *config*.

        Raises:
            psycopg2.Error: if the connection cannot be established.
        """
        # Bind the attribute before connecting so that close_conn() and
        # __del__ stay safe even when psycopg2.connect() raises.
        self.__conn = None
        self.__conn = psycopg2.connect(database=config.name,
                                       host=config.host,
                                       user=config.username,
                                       password=config.password,
                                       port=config.port)

    def close_conn(self):
        """Close the underlying connection if one was opened."""
        # getattr() with the mangled name also covers instances whose
        # __init__ never ran, which __del__ can still be called on.
        conn = getattr(self, '_DB__conn', None)
        if conn is not None:
            conn.close()

    def __del__(self):
        self.close_conn()

    def _execute(self, sql: str, params: Union[tuple, None] = None) -> int:
        """Run one statement in its own transaction.

        Returns:
            The cursor's ``rowcount`` after the statement was executed.
        """
        # ``with connection`` commits on success and rolls back on error;
        # ``with cursor`` closes the cursor. The cursor is closed first.
        with self.__conn, self.__conn.cursor() as cur:
            cur.execute(sql, params)
            return cur.rowcount

    def _fetch_all(self, sql: str, params: Union[tuple, None] = None) -> list:
        """Run one query in its own transaction and return all its rows."""
        # Ending the transaction here keeps a read-only call from leaving
        # the session idle inside an open transaction.
        with self.__conn, self.__conn.cursor() as cur:
            cur.execute(sql, params)
            return cur.fetchall()

    def insert_build(self, build: BuildDB):
        """Insert *build* as a new row of the ``builds`` table."""
        sql = '''
                INSERT INTO builds(id, url, created_at, finished_at)
                VALUES (%s, %s, %s, %s);
              '''
        self._execute(sql, (build.id, build.url,
                            build.created_at, build.finished_at))

    def insert_buildtask(self, build_task: BuildTaskDB):
        """Insert *build_task* as a new row of the ``build_tasks`` table."""
        sql = '''
                INSERT INTO build_tasks(id, name, build_id, arch_id, started_at, finished_at, status_id)
                VALUES (%s, %s, %s, %s, %s, %s, %s);
              '''
        self._execute(sql, (build_task.id, build_task.name,
                            build_task.build_id, build_task.arch_id,
                            build_task.started_at, build_task.finished_at,
                            build_task.status_id))

    def get_latest_build_id(self) -> Union[int, None]:
        """Return the highest ``builds.id``, or ``None`` if the table is empty."""
        sql = "SELECT id from builds ORDER BY id DESC LIMIT 1;"
        rows = self._fetch_all(sql)
        if not rows:
            return None
        return int(rows[0][0])

    def cleanup_builds(self, oldest_to_keep: datetime) -> int:
        """Delete builds whose ``created_at`` is older than *oldest_to_keep*.

        The cutoff is passed to the query as an integer Unix timestamp.

        Returns:
            The number of deleted rows as reported by the cursor.
        """
        params = (int(oldest_to_keep.timestamp()),)
        sql = "DELETE FROM builds WHERE created_at < %s;"
        return self._execute(sql, params)

    def get_unfinished_builds(self) -> Dict[int, Dict[int, int]]:
        """
        Getting list of unfinished builds and build_tasks
        Dict[build_id, Dict[build_task_id, task_status_id]]

        A build appears in the result when its ``finished_at`` is NULL or
        when it owns at least one task with ``status_id < 2``.
        """
        res: Dict[int, Dict[int, int]] = {}

        # getting unfinished builds
        sql = 'SELECT id FROM builds where finished_at is NULL;'
        for row in self._fetch_all(sql):
            res[row[0]] = {}

        # getting list of unfinished tasks
        sql = 'SELECT id, build_id, status_id FROM build_tasks WHERE status_id < 2;'
        for build_task_id, build_id, status_id in self._fetch_all(sql):
            # A task may belong to a build that was not selected above;
            # setdefault() creates that build's entry on first sight.
            res.setdefault(build_id, {})[build_task_id] = status_id

        return res

    def update_build(self, build: BuildDB):
        """Store ``build.finished_at`` on the ``builds`` row with ``build.id``."""
        sql = '''
                UPDATE builds
                SET finished_at = %s
                WHERE id = %s;
              '''
        self._execute(sql, (build.finished_at, build.id))

    def update_build_task(self, build: BuildTaskDB):
        """Store status and timestamps on the ``build_tasks`` row with ``build.id``.

        Despite its name, *build* is a ``BuildTaskDB``; the parameter name
        is kept for callers that pass it by keyword.
        """
        sql = '''
                UPDATE build_tasks
                SET status_id = %s,
                    started_at = %s,
                    finished_at = %s
                WHERE id = %s;
              '''
        self._execute(sql, (build.status_id, build.started_at,
                            build.finished_at, build.id))