albs_analytics/build_analytics/build_analytics/db.py

321 lines
12 KiB
Python

import logging
from contextlib import contextmanager
from datetime import datetime
from typing import Union, Dict, List, Optional

import psycopg2

from .models.build_db import BuildDB
from .models.build_task_db import BuildTaskDB
from .models.build_node_stat_db import BuildNodeStatDB
from .models.db_config import DbConfig
from .models.web_node_stat_db import WebNodeStatDB
from .models.test_task_db import TestTaskDB
class DB:
    """Data-access layer for the build-analytics PostgreSQL database.

    All methods share one psycopg2 connection. Read helpers open a cursor and
    close it when done. Write methods run inside ``_transaction()``, which
    commits on success and rolls back on any exception. A failed write
    therefore never leaves half-written rows pending for a later commit.
    """

    def __init__(self, config: DbConfig):
        # Set before connecting so close_conn()/__del__ still work if
        # psycopg2.connect() raises and __init__ never finishes.
        self.__conn = None
        self.__conn = psycopg2.connect(database=config.name,
                                       host=config.host,
                                       user=config.username,
                                       password=config.password,
                                       port=config.port)

    def close_conn(self):
        """Close the database connection; a no-op if it is missing or already closed."""
        if self.__conn is not None and not self.__conn.closed:
            self.__conn.close()

    def __del__(self):
        self.close_conn()

    @contextmanager
    def _transaction(self):
        """Yield a cursor; commit if the ``with`` body succeeds, roll back if it raises."""
        try:
            with self.__conn.cursor() as cur:
                yield cur
            self.__conn.commit()
        except Exception:
            # Roll back so partial writes are not committed by the next
            # method's commit, and so PostgreSQL accepts statements again
            # after a failed one.
            self.__conn.rollback()
            raise

    def row_exists(self, pk: int, table: str) -> bool:
        """Return True if ``table`` contains a row whose ``id`` equals ``pk``.

        Only 'builds' and 'test_tasks' are accepted, because the table name is
        interpolated into the SQL text and cannot be a query parameter.

        Raises:
            ValueError: if ``table`` is not one of the allowed tables.
        """
        if table not in ('builds', 'test_tasks'):
            raise ValueError(f'unsupported table: {table!r}')
        sql = f'SELECT EXISTS (SELECT 1 FROM {table} WHERE id = %s);'
        with self.__conn.cursor() as cur:
            cur.execute(sql, (pk,))
            return bool(cur.fetchone()[0])

    def insert_build(self, build: BuildDB):
        """Insert one row into ``builds`` and commit."""
        sql = '''
            INSERT INTO builds(id, url, created_at, finished_at)
            VALUES (%s, %s, %s, %s);
        '''
        with self._transaction() as cur:
            cur.execute(sql, (build.id, build.url,
                              build.created_at, build.finished_at))

    def insert_buildtask(self, build_task: BuildTaskDB, web_node_stats: List[WebNodeStatDB],
                         build_node_stats: List[BuildNodeStatDB]):
        """Insert a build task together with its web-node and build-node stats.

        Stats whose ``start_ts`` is None are skipped. Everything is written in
        one transaction, so either all rows are stored or none are.
        """
        sql = '''
            INSERT INTO build_tasks(id, name, build_id, arch_id, started_at, finished_at, status_id)
            VALUES (%s, %s, %s, %s, %s, %s, %s);
        '''
        with self._transaction() as cur:
            cur.execute(sql, (build_task.id, build_task.name, build_task.build_id,
                              build_task.arch_id, build_task.started_at,
                              build_task.finished_at, build_task.status_id))
            # Both stat tables share the same column layout; table names come
            # from this literal tuple only.
            for table, stats in (('web_node_stats', web_node_stats),
                                 ('build_node_stats', build_node_stats)):
                stat_sql = f'''
                    INSERT INTO {table}(build_task_id, stat_name_id, start_ts, end_ts)
                    VALUES (%s, %s, %s, %s);
                '''
                for stat in stats:
                    # do not insert empty stats
                    if stat.start_ts is None:
                        continue
                    cur.execute(stat_sql, (stat.build_task_id, stat.stat_name_id,
                                           stat.start_ts, stat.end_ts))
                    logging.debug('raw SQL query: %s', cur.query)

    def get_latest_build_id(self) -> Optional[int]:
        """Return the highest id in ``builds``, or None if the table is empty."""
        sql = "SELECT id from builds ORDER BY id DESC LIMIT 1;"
        with self.__conn.cursor() as cur:
            cur.execute(sql)
            row = cur.fetchone()
        return None if row is None else int(row[0])

    def cleanup_builds(self, oldest_to_keep: datetime) -> int:
        """Delete builds created before ``oldest_to_keep``.

        ``created_at`` is compared against ``oldest_to_keep`` converted to an
        integer Unix timestamp.

        Returns:
            The number of deleted ``builds`` rows.
        """
        sql = "DELETE FROM builds WHERE created_at < %s;"
        with self._transaction() as cur:
            cur.execute(sql, (int(oldest_to_keep.timestamp()),))
            deleted = cur.rowcount
        return deleted

    def get_unfinished_builds(self, not_before: datetime) -> Dict[int, Dict[int, int]]:
        """Collect unfinished builds and unfinished build tasks.

        Returns:
            Dict[build_id, Dict[build_task_id, task_status_id]]. It contains:
            - every build with ``finished_at`` NULL and ``created_at`` later than
              ``not_before``, possibly mapped to an empty dict;
            - every build that owns a task with ``status_id < 2``.

        NOTE(review): the task query is not filtered by ``not_before``, so
        builds older than ``not_before`` can appear through their tasks.
        Confirm that this is intended.
        """
        res: Dict[int, Dict[int, int]] = {}
        builds_sql = 'SELECT id FROM builds where finished_at is NULL AND created_at > %s;'
        tasks_sql = 'SELECT id, build_id, status_id FROM build_tasks WHERE status_id < 2;'
        with self.__conn.cursor() as cur:
            cur.execute(builds_sql, (not_before.timestamp(),))
            logging.debug('raw SQL query: %s', cur.query)
            for (build_id,) in cur.fetchall():
                res[build_id] = {}

            cur.execute(tasks_sql)
            for build_task_id, build_id, status_id in cur.fetchall():
                res.setdefault(build_id, {})[build_task_id] = status_id
        return res

    def update_build(self, build: BuildDB):
        """Set ``finished_at`` of the build with ``build.id`` and commit."""
        sql = '''
            UPDATE builds
            SET finished_at = %s
            WHERE id = %s;
        '''
        with self._transaction() as cur:
            cur.execute(sql, (build.finished_at, build.id))

    def update_build_task(self, build_task: BuildTaskDB,
                          web_node_stats: List[WebNodeStatDB],
                          build_node_stats: List[BuildNodeStatDB]):
        """Update a build task's status/timestamps and upsert its node stats.

        All stat rows are keyed by ``build_task.id``. The task update and all
        stat writes happen in one transaction.
        """
        sql = '''
            UPDATE build_tasks
            SET status_id = %s,
                started_at = %s,
                finished_at = %s
            WHERE id = %s;
        '''
        with self._transaction() as cur:
            cur.execute(sql, (build_task.status_id, build_task.started_at,
                              build_task.finished_at, build_task.id))
            logging.debug('raw SQL query: %s', cur.query)
            self._upsert_node_stats(cur, 'web_node_stats', build_task.id, web_node_stats)
            self._upsert_node_stats(cur, 'build_node_stats', build_task.id, build_node_stats)

    def _upsert_node_stats(self, cur, table_name: str, build_task_id: int, stats):
        """Update or insert each stat in ``table_name`` for ``build_task_id`` (no commit).

        ``table_name`` must be a trusted literal, because it is interpolated
        into the SQL.
        """
        update_sql = f'''
            UPDATE {table_name}
            SET start_ts = %(start_ts)s, end_ts = %(end_ts)s
            WHERE build_task_id = %(build_task_id)s AND stat_name_id = %(stat_name_id)s
        '''
        insert_sql = f'''
            INSERT INTO {table_name}(build_task_id, stat_name_id, start_ts, end_ts)
            VALUES (%(build_task_id)s, %(stat_name_id)s, %(start_ts)s, %(end_ts)s);
        '''
        for stat in stats:
            logging.debug('updating %s %s build_task %s',
                          table_name, stat.stat_name_id, build_task_id)
            # Check existence with the same task id that is written below.
            exists = self.stat_exists(task_id=build_task_id,
                                      stat_name_id=stat.stat_name_id,
                                      table_name=table_name,
                                      column_name='build_task_id')
            params = {'build_task_id': build_task_id,
                      'stat_name_id': stat.stat_name_id,
                      'start_ts': stat.start_ts,
                      'end_ts': stat.end_ts}
            cur.execute(update_sql if exists else insert_sql, params)
            logging.debug('raw SQL query: %s', cur.query)

    def get_db_schema_version(self) -> Optional[int]:
        """Return the first column of the first ``schema_version`` row as int.

        Returns None if the table is empty.
        """
        sql = '''
            SELECT *
            FROM schema_version
            LIMIT 1;
        '''
        with self.__conn.cursor() as cur:
            cur.execute(sql)
            row = cur.fetchone()
        return None if row is None else int(row[0])

    def stat_exists(self, task_id: int, stat_name_id: int, table_name: str, column_name: str) -> bool:
        """Return True if ``table_name`` has at least one matching stat row.

        A row matches when ``column_name == task_id`` and
        ``stat_name_id == stat_name_id``.

        ``table_name`` and ``column_name`` are interpolated as SQL identifiers,
        so they must be plain identifiers.

        Raises:
            ValueError: if either name is not a plain identifier.
        """
        if not (table_name.isidentifier() and column_name.isidentifier()):
            raise ValueError(f'invalid identifier: {table_name!r}.{column_name!r}')
        sql = f'''
            SELECT EXISTS (
                SELECT 1
                FROM {table_name}
                WHERE {column_name} = %s AND stat_name_id = %s
            );
        '''
        with self.__conn.cursor() as cur:
            cur.execute(sql, (task_id, stat_name_id))
            return bool(cur.fetchone()[0])

    def get_build_tasks_for_tests_update(self, not_before: datetime) -> List[int]:
        '''
        Getting build tasks id for test tasks that we need to update
        https://cloudlinux.atlassian.net/browse/ALBS-1060

        Returns the ids of all build tasks whose build was created after
        ``not_before``.
        '''
        sql = '''
            SELECT bt.id
            FROM build_tasks AS bt
            INNER JOIN builds AS b
            ON b.id = bt.build_id
            WHERE b.created_at > %s;
        '''
        with self.__conn.cursor() as cur:
            cur.execute(sql, (not_before.timestamp(),))
            logging.debug('raw SQL query: %s', cur.query)
            return [int(row[0]) for row in cur.fetchall()]

    def insert_update_test_tasks(self, test_tasks: List[TestTaskDB]):
        """Upsert test tasks and their per-step stats in a single transaction.

        Existing ``test_tasks`` rows get revision/status_id/started_at
        updated; missing rows are inserted. Each task's ``steps_stats`` rows
        in ``test_steps_stats`` are likewise updated or inserted. If any
        statement or sanity assertion fails, nothing is committed.
        """
        with self._transaction() as cur:
            for task in test_tasks:
                # test tasks
                if self.row_exists(pk=task.id, table='test_tasks'):
                    sql = '''
                        UPDATE test_tasks
                        SET revision = %s,
                            status_id = %s,
                            started_at = %s
                        WHERE id = %s;
                    '''
                    cur.execute(sql, (task.revision, task.status_id,
                                      task.started_at, task.id))
                    assert cur.rowcount == 1
                else:
                    sql = '''
                        INSERT INTO test_tasks(
                            id, build_task_id, revision, status_id, package_fullname, started_at)
                        VALUES
                        (%s, %s, %s, %s, %s, %s);
                    '''
                    cur.execute(sql, (task.id, task.build_task_id, task.revision,
                                      task.status_id, task.package_fullname,
                                      task.started_at))

                # test steps
                if not task.steps_stats:
                    continue
                for s in task.steps_stats:
                    logging.debug('test_task_id %s, stat_name_id %s',
                                  s.test_task_id, s.stat_name_id)
                    if self.stat_exists(s.test_task_id,
                                        s.stat_name_id,
                                        'test_steps_stats',
                                        'test_task_id'):
                        sql = '''
                            UPDATE test_steps_stats
                            SET start_ts = %s,
                                finish_ts = %s
                            WHERE test_task_id = %s AND stat_name_id = %s;
                        '''
                        cur.execute(sql, (s.start_ts, s.finish_ts,
                                          s.test_task_id, s.stat_name_id))
                        # Sanity check: exactly one stat row per (task, step).
                        assert cur.rowcount == 1
                    else:
                        sql = '''
                            INSERT INTO test_steps_stats (
                                test_task_id, stat_name_id, start_ts, finish_ts)
                            VALUES (%s, %s, %s, %s);
                        '''
                        cur.execute(sql, (s.test_task_id, s.stat_name_id,
                                          s.start_ts, s.finish_ts))