"""PostgreSQL access layer (via psycopg2) for builds, tasks and their stats."""

from datetime import datetime
from typing import Union, Dict, List, Optional
import logging

import psycopg2

from .models.build_db import BuildDB
from .models.build_task_db import BuildTaskDB
from .models.build_node_stat_db import BuildNodeStatDB
from .models.db_config import DbConfig
from .models.web_node_stat_db import WebNodeStatDB
from .models.test_task_db import TestTaskDB


class DB():
    """
    Thin wrapper around a single psycopg2 connection.

    Write methods run inside ``with self.__conn`` so the transaction is
    committed on success and rolled back if any statement raises.
    Read-only helpers never commit, which lets them be called from the
    middle of a write method without ending its transaction.
    """

    # Class-level default: keeps close_conn()/__del__ safe when __init__
    # raised before the connection attribute was assigned.
    __conn = None

    def __init__(self, config: DbConfig):
        """Open the connection using the settings from *config*."""
        self.__conn = psycopg2.connect(database=config.name,
                                       host=config.host,
                                       user=config.username,
                                       password=config.password,
                                       port=config.port)

    def close_conn(self):
        """Close the underlying connection, if one was ever opened."""
        if self.__conn is not None:
            self.__conn.close()

    def __del__(self):
        self.close_conn()

    def row_exists(self, pk: int, table: str) -> bool:
        """
        Return True when exactly one row with ``id = pk`` exists in *table*.

        Raises ValueError for any table other than 'builds' or 'test_tasks':
        the name is interpolated into the SQL text, so it must be checked
        with a real exception (an ``assert`` is stripped under ``-O``).
        """
        if table not in ('builds', 'test_tasks'):
            raise ValueError(f'unsupported table: {table!r}')
        sql = f'''
                SELECT COUNT(id)
                FROM {table}
                WHERE id = %s;
        '''
        with self.__conn.cursor() as cur:
            cur.execute(sql, (pk,))
            val = int(cur.fetchone()[0])
        return val == 1

    def insert_build(self, build: BuildDB):
        """Insert one row into ``builds`` and commit."""
        sql = '''
                INSERT INTO builds(id, url, created_at, finished_at)
                VALUES (%s, %s, %s, %s);
              '''
        with self.__conn, self.__conn.cursor() as cur:
            cur.execute(sql, (build.id, build.url,
                              build.created_at, build.finished_at))

    def insert_buildtask(self, build_task: BuildTaskDB,
                         web_node_stats: List[WebNodeStatDB],
                         build_node_stats: List[BuildNodeStatDB]):
        """
        Insert a build task together with its web/build node stats.

        Everything is written in one transaction, so a failure on any
        statement leaves no partially inserted task behind.
        Stats whose ``start_ts`` is None are skipped.
        """
        sql = '''
                INSERT INTO build_tasks(id, name, build_id, arch_id, started_at, finished_at, status_id)
                VALUES (%s, %s, %s, %s, %s, %s, %s);
              '''
        with self.__conn, self.__conn.cursor() as cur:
            cur.execute(sql, (build_task.id, build_task.name,
                              build_task.build_id, build_task.arch_id,
                              build_task.started_at, build_task.finished_at,
                              build_task.status_id))
            self._insert_node_stats(cur, 'web_node_stats', web_node_stats)
            self._insert_node_stats(cur, 'build_node_stats', build_node_stats)

    def _insert_node_stats(self, cur, table_name: str,
                           stats: Union[List[WebNodeStatDB],
                                        List[BuildNodeStatDB]]):
        """Insert non-empty *stats* into *table_name* using cursor *cur*."""
        # table_name only ever comes from literals inside this class.
        sql = f'''
                INSERT INTO {table_name}(build_task_id, stat_name_id, start_ts, end_ts)
                VALUES (%s, %s, %s, %s);
              '''
        for stat in stats:
            # do not insert empty stats
            if stat.start_ts is None:
                continue
            cur.execute(sql, (stat.build_task_id, stat.stat_name_id,
                              stat.start_ts, stat.end_ts))
            logging.debug('raw SQL query: %s', cur.query)

    def get_latest_build_id(self) -> Union[int, None]:
        """Return the highest build id, or None when ``builds`` is empty."""
        sql = "SELECT id from builds ORDER BY id DESC LIMIT 1;"
        with self.__conn.cursor() as cur:
            cur.execute(sql)
            val = cur.fetchone()
        if not val:
            return None
        return int(val[0])

    def cleanup_builds(self, oldest_to_keep: datetime) -> int:
        """
        Delete builds whose ``created_at`` is lower than the integer POSIX
        timestamp of *oldest_to_keep*; return the number of deleted rows.
        """
        params = (int(oldest_to_keep.timestamp()),)
        sql = "DELETE FROM builds WHERE created_at < %s;"
        with self.__conn, self.__conn.cursor() as cur:
            cur.execute(sql, params)
            # read rowcount while the cursor is still open
            deleted = cur.rowcount
        return deleted

    def get_unfinished_builds(self, not_before: datetime) -> Dict[int, Dict[int, int]]:
        """
        Getting list of unfinished builds and build_tasks
        Dict[build_id, Dict[build_task_id, task_status_id]]

        Builds are those with ``finished_at IS NULL`` created after
        *not_before*; tasks are those with ``status_id < 2``. Tasks whose
        build was not selected by the first query still get an entry.
        """
        res: Dict[int, Dict[int, int]] = {}

        with self.__conn.cursor() as cur:
            # getting unfinished builds
            sql = 'SELECT id FROM builds where finished_at is NULL AND created_at > %s;'
            cur.execute(sql, (not_before.timestamp(),))
            logging.debug('raw SQL query: %s', cur.query)
            for row in cur.fetchall():
                res[row[0]] = {}

            # getting list of unfinished tasks
            sql = 'SELECT id, build_id, status_id FROM build_tasks WHERE status_id < 2;'
            cur.execute(sql)
            for build_task_id, build_id, status_id in cur.fetchall():
                res.setdefault(build_id, {})[build_task_id] = status_id

        return res

    def update_build(self, build: BuildDB):
        """Set ``finished_at`` for the build with ``id = build.id``; commit."""
        sql = '''
                UPDATE builds
                SET finished_at = %s
                WHERE id = %s;
              '''
        with self.__conn, self.__conn.cursor() as cur:
            cur.execute(sql, (build.finished_at, build.id))

    def update_build_task(self, build_task: BuildTaskDB,
                          web_node_stats: List[WebNodeStatDB],
                          build_node_stats: List[BuildNodeStatDB]):
        """
        Update a build task's status/timestamps and upsert its node stats.

        Each stat is updated when a matching row already exists and
        inserted otherwise. Unlike insert_buildtask, stats with a None
        ``start_ts`` are not skipped here. Runs as a single transaction.
        """
        sql = '''
                UPDATE build_tasks
                SET status_id = %s,
                    started_at = %s,
                    finished_at = %s
                WHERE id = %s;
              '''
        with self.__conn, self.__conn.cursor() as cur:
            cur.execute(sql, (build_task.status_id, build_task.started_at,
                              build_task.finished_at, build_task.id))
            logging.debug('raw SQL query: %s', cur.query)

            self._upsert_node_stats(cur, 'web_node_stats',
                                    build_task.id, web_node_stats)
            self._upsert_node_stats(cur, 'build_node_stats',
                                    build_task.id, build_node_stats)

    def _upsert_node_stats(self, cur, table_name: str, build_task_id: int,
                           stats: Union[List[WebNodeStatDB],
                                        List[BuildNodeStatDB]]):
        """Update-or-insert every stat of *stats* in *table_name* via *cur*."""
        # table_name only ever comes from literals inside this class.
        update_sql = f'''
                        UPDATE {table_name}
                        SET start_ts = %(start_ts)s, end_ts = %(end_ts)s
                        WHERE build_task_id = %(build_task_id)s AND stat_name_id = %(stat_name_id)s
                      '''
        insert_sql = f'''
                        INSERT INTO {table_name}(build_task_id, stat_name_id, start_ts, end_ts)
                        VALUES (%(build_task_id)s, %(stat_name_id)s, %(start_ts)s, %(end_ts)s);
                      '''
        for stat in stats:
            logging.debug('updating %s %s build_task %s',
                          table_name, stat.stat_name_id, build_task_id)
            # NOTE: existence is checked with the stat's own build_task_id,
            # while the write uses the task's id (as the original code did).
            exists = self.stat_exists(task_id=stat.build_task_id,
                                      stat_name_id=stat.stat_name_id,
                                      table_name=table_name,
                                      column_name='build_task_id')
            params = {'build_task_id': build_task_id,
                      'stat_name_id': stat.stat_name_id,
                      'start_ts': stat.start_ts,
                      'end_ts': stat.end_ts}
            cur.execute(update_sql if exists else insert_sql, params)
            # log only after execute: cur.query holds the last sent query
            logging.debug('raw SQL query: %s', cur.query)

    def get_db_schema_version(self) -> Optional[int]:
        """
        Return the first column of the first ``schema_version`` row as int,
        or None when the table has no rows.
        """
        sql = '''
                SELECT *
                FROM schema_version
                LIMIT 1;
        '''
        with self.__conn.cursor() as cur:
            cur.execute(sql)
            val = cur.fetchone()
        if not val:
            return None
        return int(val[0])

    def stat_exists(self, task_id: int, stat_name_id: int,
                    table_name: str, column_name: str) -> bool:
        """
        Return True when exactly one row of *table_name* has
        ``column_name = task_id`` and the given *stat_name_id*.

        *table_name* and *column_name* are interpolated into the SQL text,
        so they must only ever be trusted, hard-coded identifiers.
        """
        sql = f'''
                SELECT COUNT({column_name})
                FROM {table_name}
                WHERE {column_name} = %s AND stat_name_id = %s;
            '''
        with self.__conn.cursor() as cur:
            cur.execute(sql, (task_id, stat_name_id))
            val = int(cur.fetchone()[0])
        return val == 1

    def get_build_tasks_for_unfinished_tests(self, not_before: datetime) -> List[int]:
        '''
        getting build tasks id of unfinished test tasks

        Selects distinct build task ids that have a test task with
        ``status_id < 3`` and whose ``started_at`` is after *not_before*.
        '''
        sql = '''
            SELECT DISTINCT bt.id
            FROM build_tasks as bt
            INNER JOIN test_tasks AS tt
            ON bt.id = tt.build_task_id
            WHERE tt.status_id < 3 AND bt.started_at > %s;
        '''
        with self.__conn.cursor() as cur:
            cur.execute(sql, (not_before.timestamp(),))
            logging.debug('raw SQL query: %s', cur.query)
            result = [int(row[0]) for row in cur.fetchall()]
        return result

    def insert_update_test_tasks(self, test_tasks: List[TestTaskDB]):
        """
        Insert or update every test task and its step stats.

        A task (or step stat) is updated when it already exists and
        inserted otherwise. All tasks are written in one transaction.
        """
        update_task_sql = '''
                        UPDATE test_tasks
                        SET revision = %s,
                            status_id = %s,
                            started_at = %s
                        WHERE id = %s;
                      '''
        insert_task_sql = '''
                        INSERT INTO test_tasks(
                            id, build_task_id, revision, status_id, package_fullname, started_at)
                        VALUES
                            (%s, %s, %s, %s, %s, %s);
                      '''
        update_step_sql = '''
                            UPDATE test_steps_stats
                            SET start_ts = %s,
                                finish_ts = %s
                            WHERE test_task_id = %s AND stat_name_id = %s;
                          '''
        insert_step_sql = '''
                            INSERT INTO test_steps_stats (
                                test_task_id, stat_name_id, start_ts, finish_ts)
                            VALUES (%s, %s, %s, %s);
                          '''
        with self.__conn, self.__conn.cursor() as cur:
            for task in test_tasks:
                # test task
                if self.row_exists(pk=task.id, table='test_tasks'):
                    cur.execute(update_task_sql,
                                (task.revision, task.status_id,
                                 task.started_at, task.id))
                    assert cur.rowcount == 1
                else:
                    cur.execute(insert_task_sql,
                                (task.id, task.build_task_id, task.revision,
                                 task.status_id, task.package_fullname,
                                 task.started_at))

                # test steps (steps_stats may be None or empty)
                for s in task.steps_stats or ():
                    logging.debug('test_task_id %s, stat_name_id %s',
                                  s.test_task_id, s.stat_name_id)
                    if self.stat_exists(s.test_task_id, s.stat_name_id,
                                        'test_steps_stats', 'test_task_id'):
                        cur.execute(update_step_sql,
                                    (s.start_ts, s.finish_ts,
                                     s.test_task_id, s.stat_name_id))
                        assert cur.rowcount == 1
                    else:
                        cur.execute(insert_step_sql,
                                    (s.test_task_id, s.stat_name_id,
                                     s.start_ts, s.finish_ts))