albs_analytics/build_analytics/build_analytics/db.py

184 lines
6.2 KiB
Python

from contextlib import contextmanager
from datetime import datetime
import logging
from typing import Union, Dict, List, Optional

import psycopg2

from .models.build_db import BuildDB
from .models.build_task_db import BuildTaskDB
from .models.build_node_stat_db import BuildNodeStatDB
from .models.db_config import DbConfig
from .models.web_node_stat_db import WebNodeStatDB
class DB:
    """Wrapper around a single psycopg2 connection to the analytics database.

    Every write method runs in one transaction: it commits when all of its
    statements succeed and rolls back (then re-raises) when any of them fails.
    Read methods do not commit, but they also roll back on failure so that the
    connection is not left in an aborted transaction.
    """

    # Class-level default: keeps close_conn()/__del__ safe when __init__
    # raises before the connection attribute has been assigned.
    __conn = None

    def __init__(self, config: 'DbConfig'):
        """Open the connection using the name/host/credentials/port in *config*."""
        self.__conn = psycopg2.connect(database=config.name,
                                       host=config.host,
                                       user=config.username,
                                       password=config.password,
                                       port=config.port)

    @contextmanager
    def _cursor(self, commit: bool = False):
        """Yield a new cursor and always close it afterwards.

        On normal exit the transaction is committed when *commit* is true.
        If the body (or the commit) raises, the transaction is rolled back
        and the exception is re-raised unchanged.
        """
        cur = self.__conn.cursor()
        try:
            yield cur
            if commit:
                self.__conn.commit()
        except Exception:
            # Discard partial work; otherwise it would be committed by the
            # next successful call, or the connection would stay aborted.
            self.__conn.rollback()
            raise
        finally:
            cur.close()

    def close_conn(self):
        """Close the underlying connection, if one was ever opened."""
        if self.__conn is not None:
            self.__conn.close()

    def __del__(self):
        self.close_conn()

    def insert_build(self, build: 'BuildDB'):
        """Insert one row into ``builds`` and commit."""
        sql = '''
                INSERT INTO builds(id, url, created_at, finished_at)
                VALUES (%s, %s, %s, %s);
              '''
        with self._cursor(commit=True) as cur:
            cur.execute(sql, (build.id, build.url,
                              build.created_at, build.finished_at))

    def insert_buildtask(self, build_task: 'BuildTaskDB',
                         web_node_stats: List['WebNodeStatDB'],
                         build_node_stats: List['BuildNodeStatDB']):
        """Insert a build task together with its web-node and build-node stats.

        All rows are written in a single transaction: either everything is
        committed or, on any failure, nothing is.
        """
        task_sql = '''
                INSERT INTO build_tasks(id, name, build_id, arch_id, started_at, finished_at, status_id)
                VALUES (%s, %s, %s, %s, %s, %s, %s);
              '''
        web_stat_sql = '''
                INSERT INTO web_node_stats (build_task_id, stat_name_id, start_ts, end_ts)
                VALUES (%s, %s, %s, %s);
              '''
        build_stat_sql = '''
                INSERT INTO build_node_stats(build_task_id, stat_name_id, start_ts, end_ts)
                VALUES (%s, %s, %s, %s);
              '''
        with self._cursor(commit=True) as cur:
            # inserting build_task
            cur.execute(task_sql, (build_task.id, build_task.name,
                                   build_task.build_id, build_task.arch_id,
                                   build_task.started_at, build_task.finished_at,
                                   build_task.status_id))

            # inserting web node stats
            for stat in web_node_stats:
                cur.execute(web_stat_sql, (stat.build_task_id, stat.stat_name_id,
                                           stat.start_ts, stat.end_ts))
                logging.debug('raw SQL query: %s', cur.query)

            # inserting build node stats
            for stat in build_node_stats:
                logging.debug('BuildNodeStats: %s', stat)
                cur.execute(build_stat_sql, (stat.build_task_id, stat.stat_name_id,
                                             stat.start_ts, stat.end_ts))
                logging.debug('raw SQL query: %s', cur.query)

    def get_latest_build_id(self) -> Optional[int]:
        """Return the highest ``builds.id``, or None when the table is empty."""
        sql = "SELECT id from builds ORDER BY id DESC LIMIT 1;"
        with self._cursor() as cur:
            cur.execute(sql)
            val = cur.fetchone()
        if not val:
            return None
        return int(val[0])

    def cleanup_builds(self, oldest_to_keep: datetime) -> int:
        """Delete builds created before *oldest_to_keep* and commit.

        ``created_at`` is compared against the integer Unix timestamp of
        *oldest_to_keep*. Returns the cursor's row count for the DELETE.
        """
        params = (int(oldest_to_keep.timestamp()),)
        sql = "DELETE FROM builds WHERE created_at < %s;"
        with self._cursor(commit=True) as cur:
            cur.execute(sql, params)
            # Read the count while the cursor is still open.
            deleted = cur.rowcount
        return deleted

    def get_unfinished_builds(self) -> Dict[int, Dict[int, int]]:
        """
        Getting list of unfinished builds and build_tasks
        Dict[build_id, Dict[build_task_id, task_status_id]]

        A build is listed when its ``finished_at`` is NULL; a task is listed
        when its ``status_id`` is below 2. A task whose build is not in the
        first set still gets an entry under its ``build_id``.
        """
        res: Dict[int, Dict[int, int]] = {}
        with self._cursor() as cur:
            # getting unfinished builds
            cur.execute('SELECT id FROM builds where finished_at is NULL;')
            for row in cur.fetchall():
                res[row[0]] = {}

            # getting list of unfinished tasks
            cur.execute('SELECT id, build_id, status_id FROM build_tasks WHERE status_id < 2;')
            for build_task_id, build_id, status_id in cur.fetchall():
                res.setdefault(build_id, {})[build_task_id] = status_id
        return res

    def update_build(self, build: 'BuildDB'):
        """Set ``finished_at`` of the build with ``build.id`` and commit."""
        sql = '''
                UPDATE builds
                SET finished_at = %s
                WHERE id = %s;
              '''
        with self._cursor(commit=True) as cur:
            cur.execute(sql, (build.finished_at, build.id))

    def update_build_task(self, build_task: 'BuildTaskDB',
                          web_node_stats: List['WebNodeStatDB'],
                          build_node_stats: List['BuildNodeStatDB']):
        """Update a build task and the timestamps of its existing stat rows.

        Each stat updates only the row matching both the task id and the
        stat's own ``stat_name_id``. Everything runs in a single transaction.
        """
        task_sql = '''
                UPDATE build_tasks
                SET status_id = %s,
                    started_at = %s,
                    finished_at = %s
                WHERE id = %s;
              '''
        # The stat_name_id filter is essential: without it every stat in the
        # list would overwrite all stat rows of the task.
        web_stat_sql = '''
                UPDATE web_node_stats
                SET start_ts = %s,
                    end_ts = %s
                WHERE build_task_id = %s AND stat_name_id = %s;
              '''
        build_stat_sql = '''
                UPDATE build_node_stats
                SET start_ts = %s,
                    end_ts = %s
                WHERE build_task_id = %s AND stat_name_id = %s;
              '''
        with self._cursor(commit=True) as cur:
            # updating build_task
            cur.execute(task_sql, (build_task.status_id, build_task.started_at,
                                   build_task.finished_at, build_task.id))

            # updating web_node_stats
            for stat in web_node_stats:
                cur.execute(web_stat_sql, (stat.start_ts, stat.end_ts,
                                           build_task.id, stat.stat_name_id))

            # updating build_node_stats
            for stat in build_node_stats:
                cur.execute(build_stat_sql, (stat.start_ts, stat.end_ts,
                                             build_task.id, stat.stat_name_id))

    def get_db_schema_version(self) -> Optional[int]:
        """Return the first column of the first ``schema_version`` row as int.

        Returns None when the table has no rows.
        """
        sql = '''
                SELECT *
                FROM schema_version
                LIMIT 1;
              '''
        with self._cursor() as cur:
            cur.execute(sql)
            val = cur.fetchone()
        logging.debug('schema_version row: %s', val)
        if not val:
            return None
        return int(val[0])