From 7543878abf6641fb49a576b34323a3f33f1094f6 Mon Sep 17 00:00:00 2001
From: Kirill Zhukov
Date: Tue, 28 Feb 2023 18:28:48 +0100
Subject: [PATCH] Added update builds functionality, added first grafana dashboard

---
 build_analitycs/build_analytics/api_client.py |  14 +-
 build_analitycs/build_analytics/db.py         | 127 +++++---
 .../build_analytics/extractor/extractor.py    |  62 +++-
 .../build_analytics/extractor/start.py        |  57 ++--
 .../build_analytics/models/build.py           |   3 +-
 .../build_analytics/models/build_db.py        |   4 +-
 .../build_analytics/models/build_task.py      |   2 +-
 .../models/extractor_config.py                |   9 +-
 build_analitycs/db_schema/postgres.sql        |   8 +-
 grafana-dashbords/albs_analytics.json         | 280 ++++++++++++++++++
 10 files changed, 490 insertions(+), 76 deletions(-)
 create mode 100644 grafana-dashbords/albs_analytics.json

diff --git a/build_analitycs/build_analytics/api_client.py b/build_analitycs/build_analytics/api_client.py
index 1f59c18..a8cd460 100644
--- a/build_analitycs/build_analytics/api_client.py
+++ b/build_analitycs/build_analytics/api_client.py
@@ -19,9 +19,10 @@ class APIclient():
     client for working with ALBS API
     """
 
-    def __init__(self, api_root: str, jwt: str):
+    def __init__(self, api_root: str, jwt: str, timeout: int):
         self.api_root = api_root
         self.jwt = jwt
+        self.timeout = timeout
 
     def get_builds(self, page_num: int = 1) -> List[Build]:
         ep = '/api/v1/builds'
@@ -29,7 +30,8 @@ class APIclient():
         params = {'pageNumber': page_num}
         headers = {'accept': 'appilication/json'}
 
-        response = requests.get(url, params=params, headers=headers)
+        response = requests.get(
+            url, params=params, headers=headers, timeout=self.timeout)
         response.raise_for_status()
 
         result = []
@@ -41,6 +43,14 @@ class APIclient():
                               b, err, exc_info=True)
         return result
 
+    def get_build(self, build_id: int) -> Build:
+        ep = f'/api/v1/builds/{build_id}'
+        url = urljoin(self.api_root, ep)
+        headers = {'accept': 'application/json'}
+        response = requests.get(url, headers=headers, timeout=self.timeout)
+        response.raise_for_status()
+        return self._parse_build(response.json())
+
     def _parse_build_tasks(self, tasks_json: Dict, build_id: int) -> List[BuildTask]:
         result = []
         for task in tasks_json:
diff --git a/build_analitycs/build_analytics/db.py b/build_analitycs/build_analytics/db.py
index d5f7055..cb5632c 100644
--- a/build_analitycs/build_analytics/db.py
+++ b/build_analitycs/build_analytics/db.py
@@ -1,5 +1,5 @@
 from datetime import datetime
-from typing import Union
+from typing import Union, Dict
 
 import psycopg2
 
@@ -10,48 +10,45 @@ from .models.db_config import DbConfig
 
 class DB():
     def __init__(self, config: DbConfig):
-        self.conf = config
+        self.__conn = psycopg2.connect(database=config.name,
+                                       host=config.host,
+                                       user=config.username,
+                                       password=config.password,
+                                       port=config.port)
 
-    def _get_conn(self):
-        conn = psycopg2.connect(database=self.conf.name,
-                                host=self.conf.host,
-                                user=self.conf.username,
-                                password=self.conf.password,
-                                port=self.conf.port)
-        return conn
+    def close_conn(self):
+        self.__conn.close()
 
-    def insert_update_build(self, build: BuildDB):
-        sql = f'''
+    def __del__(self):
+        self.close_conn()
+
+    def insert_build(self, build: BuildDB):
+        sql = '''
             INSERT INTO builds(id, url, created_at, finished_at)
-            VALUES (%s, %s, %s, %s)
-            ON CONFLICT (id) DO UPDATE SET
-            (url, created_at, finished_at) = (EXCLUDED.url, EXCLUDED.created_at, EXCLUDED.finished_at);
-            '''
-        with self._get_conn() as conn:
-            cur = conn.cursor()
-            cur.execute(sql, (build.id, build.url,
-                              build.created_at, build.finished_at))
-            conn.commit()
+            VALUES (%s, %s, %s, %s);
+            '''
 
-    def insert_update_buildtask(self, build_task: BuildTaskDB):
-        sql = f"""
+        cur = self.__conn.cursor()
+        cur.execute(sql, (build.id, build.url,
+                          build.created_at, build.finished_at))
+        self.__conn.commit()
+
+    def insert_buildtask(self, build_task: BuildTaskDB):
+        sql = '''
             INSERT INTO build_tasks(id, build_id, arch_id, started_at, finished_at, status_id)
-            VALUES (%s, %s, %s, %s, %s, %s)
-            ON CONFLICT (id) DO UPDATE SET
-            (id, build_id, arch_id, started_at, finished_at, status_id) = (EXCLUDED.ID, EXCLUDED.build_id, EXCLUDED.arch_id, EXCLUDED.started_at, EXCLUDED.finished_at, EXCLUDED.status_id)
-        """
-        with self._get_conn() as conn:
-            cur = conn.cursor()
-            cur.execute(sql, (build_task.id, build_task.build_id, build_task.arch_id,
-                              build_task.started_at, build_task.finished_at, build_task.status_id))
-            conn.commit()
+            VALUES (%s, %s, %s, %s, %s, %s);
+            '''
+
+        cur = self.__conn.cursor()
+        cur.execute(sql, (build_task.id, build_task.build_id, build_task.arch_id,
+                          build_task.started_at, build_task.finished_at, build_task.status_id))
+        self.__conn.commit()
 
     def get_latest_build_id(self) -> Union[int, None]:
         sql = "SELECT id from builds ORDER BY id DESC LIMIT 1;"
-        with self._get_conn() as conn:
-            cur = conn.cursor()
-            cur.execute(sql)
-            val = cur.fetchone()
+        cur = self.__conn.cursor()
+        cur.execute(sql)
+        val = cur.fetchone()
         if not val:
             return None
         return int(val[0])
@@ -59,8 +56,60 @@ class DB():
     def cleanup_builds(self, oldest_to_keep: datetime) -> int:
         params = (int(oldest_to_keep.timestamp()),)
         sql = "DELETE FROM builds WHERE created_at < %s;"
-        with self._get_conn() as conn:
-            cur = conn.cursor()
-            cur.execute(sql, params)
-            conn.commit()
+        cur = self.__conn.cursor()
+        cur.execute(sql, params)
+        self.__conn.commit()
         return cur.rowcount
+
+    def get_unfinished_builds(self) -> Dict[int, Dict[int, int]]:
+        """
+        Get the list of unfinished builds and their unfinished build_tasks:
+        Dict[build_id, Dict[build_task_id, task_status_id]]
+        """
+        res: Dict[int, Dict[int, int]] = {}
+
+        # getting unfinished builds
+        sql = 'SELECT id FROM builds WHERE finished_at IS NULL;'
+        cur = self.__conn.cursor()
+        cur.execute(sql)
+        for row in cur.fetchall():
+            res[row[0]] = {}
+
+        # getting list of unfinished tasks
+        sql = 'SELECT id, build_id, status_id FROM build_tasks WHERE status_id < 2;'
+        cur = self.__conn.cursor()
+        cur.execute(sql)
+        for row in cur.fetchall():
+            build_task_id: int = row[0]
+            build_id: int = row[1]
+            status_id: int = row[2]
+            try:
+                res[build_id][build_task_id] = status_id
+            except KeyError:
+                res[build_id] = {build_task_id: status_id}
+
+        return res
+
+    def update_build(self, build: BuildDB):
+        sql = '''
+            UPDATE builds
+            SET finished_at = %s
+            WHERE id = %s;
+        '''
+
+        cur = self.__conn.cursor()
+        cur.execute(sql, (build.finished_at, build.id))
+        self.__conn.commit()
+
+    def update_build_task(self, build_task: BuildTaskDB):
+        sql = '''
+            UPDATE build_tasks
+            SET status_id = %s,
+                started_at = %s,
+                finished_at = %s
+            WHERE id = %s;
+        '''
+        cur = self.__conn.cursor()
+        cur.execute(sql, (build_task.status_id, build_task.started_at,
+                          build_task.finished_at, build_task.id))
+        self.__conn.commit()
diff --git a/build_analitycs/build_analytics/extractor/extractor.py b/build_analitycs/build_analytics/extractor/extractor.py
index efa7d57..82b4b9b 100644
--- a/build_analitycs/build_analytics/extractor/extractor.py
+++ b/build_analitycs/build_analytics/extractor/extractor.py
@@ -1,8 +1,13 @@
+# pylint: disable=relative-beyond-top-level
+
 import logging
+from typing import List, Dict
 
 from ..models.extractor_config import ExtractorConfig
-from ..api_client import APIclient
+from ..models.enums import BuildTaskEnum
+from ..models.build import BuildTask
 from ..db import DB
+from ..api_client import APIclient
 
 
 class Extractor:
@@ -17,11 +22,11 @@ class Extractor:
         last_build_id = self.db.get_latest_build_id()
         if not last_build_id:
             last_build_id = 0
-        logging.info(f"last_build_id: {last_build_id}")
+        logging.info("last_build_id: %s", last_build_id)
 
         stop = False
         while not stop:
-            logging.info(f"page: {page_num}")
+            logging.info("page: %s", page_num)
             for build in self.api.get_builds(page_num):
                 # check if we shoud stop processing build
                 if build.id <= last_build_id or \
@@ -30,10 +35,10 @@ class Extractor:
                     break
 
                 # inserting build and build tasks
-                logging.info(f"inserting {build.id}")
-                self.db.insert_update_build(build.as_db_model())
+                logging.info("inserting %s", build.id)
+                self.db.insert_build(build.as_db_model())
                 for build_task in build.build_tasks:
-                    self.db.insert_update_buildtask(build_task.as_db_model())
+                    self.db.insert_buildtask(build_task.as_db_model())
                 build_count += 1
             page_num += 1
         return build_count
@@ -43,3 +48,48 @@ class Extractor:
                     self.oldest_build_age.strftime("%m/%d/%Y, %H:%M:%S"))
         removed_count = self.db.cleanup_builds(self.oldest_build_age)
         logging.info('removed %d entries', removed_count)
+
+    def __update_build_tasks_statuses(self, build_tasks: List[BuildTask],
+                                      build_tasks_status_db: Dict[int, int]):
+        for b in build_tasks:
+            if b.status_id != build_tasks_status_db[b.id]:
+                logging.info('build task %d status has changed %s -> %s. Updating DB',
+                             b.id, BuildTaskEnum(
+                                 build_tasks_status_db[b.id]).name,
+                             BuildTaskEnum(b.status_id).name)
+                try:
+                    self.db.update_build_task(b.as_db_model())
+                except Exception as err:  # pylint: disable=broad-except
+                    logging.error(
+                        'failed to update build task %d: %s',
+                        b.id, err, exc_info=True)
+                else:
+                    logging.info('build task %d was updated', b.id)
+            else:
+                logging.info(
+                    "build_task %d is still %s. Skipping", b.id, BuildTaskEnum(b.status_id).name)
+
+    def update_builds(self):
+        logging.info('Getting list of unfinished builds from DB')
+        unfinished_tasks = self.db.get_unfinished_builds()
+        for build_id, build_tasks_db in unfinished_tasks.items():
+            try:
+                logging.info('Getting status of build %d', build_id)
+                build = self.api.get_build(build_id)
+
+                logging.info('Updating build tasks')
+                build_tasks_to_check = [
+                    b for b in build.build_tasks if b.id in build_tasks_db]
+                self.__update_build_tasks_statuses(
+                    build_tasks_to_check, build_tasks_db)
+
+                if build.finished_at:
+                    logging.info(
+                        "build is finished, updating finished_at attribute")
+                    self.db.update_build(build.as_db_model())
+
+                logging.info('finished processing build %d', build_id)
+
+            except Exception as err:  # pylint: disable=broad-except
+                logging.error("Can't process build %d: %s",
+                              build_id, err, exc_info=True)
diff --git a/build_analitycs/build_analytics/extractor/start.py b/build_analitycs/build_analytics/extractor/start.py
index 6319470..f566c30 100644
--- a/build_analitycs/build_analytics/extractor/start.py
+++ b/build_analitycs/build_analytics/extractor/start.py
@@ -1,9 +1,11 @@
 from datetime import datetime, timedelta
 import logging
 from logging.handlers import RotatingFileHandler
+import time
 
 import yaml
 
+# pylint: disable=relative-beyond-top-level
 from ..api_client import APIclient
 from ..db import DB
 from .extractor import Extractor
@@ -49,23 +51,40 @@ def start(yml_path: str):
                                 maxBytes=10000000,
                                 backupCount=3)])
 
-    api = APIclient(api_root=config.albs_url, jwt=config.jwt)
-    db = DB(config.db_config)
-    extractor = Extractor(config, api, db)
-    logging.info('Starting builds insertion')
-    inserted_count = -1
-    try:
-        inserted_count = extractor.extract_and_store()
-    except Exception as err:  # pylint: disable=broad-except
-        logging.critical("Unhandled exeption %s", err, exc_info=True)
-    else:
-        logging.info(
-            'Build extaction was finished. %d builds were inserted', inserted_count)
+    while True:
+        logging.info('Starting extraction process')
+        api = APIclient(api_root=config.albs_url,
+                        jwt=config.jwt, timeout=config.api_timeout)
+        db = DB(config.db_config)
+        extractor = Extractor(config, api, db)
 
-    logging.info('Starting old builds removal')
-    try:
-        extractor.build_cleanup()
-    except Exception as err:
-        logging.critical("Unhandled exeption %s", err, exc_info=True)
-    else:
-        logging.info('Cleanup finished')
+        logging.info('Starting builds insertion')
+        inserted_count = -1
+        try:
+            inserted_count = extractor.extract_and_store()
+        except Exception as err:  # pylint: disable=broad-except
+            logging.critical("Unhandled exception %s", err, exc_info=True)
+        else:
+            logging.info(
+                'Build extraction was finished. %d builds were inserted', inserted_count)
+
+        logging.info('Starting old builds removal')
+        try:
+            extractor.build_cleanup()
+        except Exception as err:  # pylint: disable=broad-except
+            logging.critical("Unhandled exception %s", err, exc_info=True)
+        else:
+            logging.info('Cleanup finished')
+
+        logging.info('Updating statuses of unfinished build tasks')
+        try:
+            extractor.update_builds()
+        except Exception as err:  # pylint: disable=broad-except
+            logging.critical("Unhandled exception %s", err, exc_info=True)
+        else:
+            logging.info('Update finished')
+
+        extractor.db.close_conn()
+        logging.info("Extraction finished")
+        logging.info("Sleeping for %d seconds", config.scrape_interval)
+        time.sleep(config.scrape_interval)
diff --git a/build_analitycs/build_analytics/models/build.py b/build_analitycs/build_analytics/models/build.py
index 878d113..3641ae4 100644
--- a/build_analitycs/build_analytics/models/build.py
+++ b/build_analitycs/build_analytics/models/build.py
@@ -1,7 +1,6 @@
 from datetime import datetime
 from typing import List, Optional
-from pydantic import BaseModel, HttpUrl
-from typing import Any
+from pydantic import BaseModel, HttpUrl  # pylint: disable=no-name-in-module
 
 from .build_task import BuildTask
 from .build_db import BuildDB
diff --git a/build_analitycs/build_analytics/models/build_db.py b/build_analitycs/build_analytics/models/build_db.py
index 3bd7489..03c18a2 100644
--- a/build_analitycs/build_analytics/models/build_db.py
+++ b/build_analitycs/build_analytics/models/build_db.py
@@ -1,5 +1,5 @@
-from typing import Optional, Dict, Any
-from pydantic import BaseModel, HttpUrl
+from typing import Optional
+from pydantic import BaseModel, HttpUrl  # pylint: disable=no-name-in-module
 
 
 class BuildDB(BaseModel):
diff --git a/build_analitycs/build_analytics/models/build_task.py b/build_analitycs/build_analytics/models/build_task.py
index 303e629..410e3b5 100644
--- a/build_analitycs/build_analytics/models/build_task.py
+++ b/build_analitycs/build_analytics/models/build_task.py
@@ -1,7 +1,7 @@
 from datetime import datetime
 from typing import Optional
 
-from pydantic import BaseModel
+from pydantic import BaseModel  # pylint: disable=no-name-in-module
 
 from .build_task_db import BuildTaskDB
 from .enums import ArchEnum
diff --git a/build_analitycs/build_analytics/models/extractor_config.py b/build_analitycs/build_analytics/models/extractor_config.py
index 865f00e..af314b3 100644
--- a/build_analitycs/build_analytics/models/extractor_config.py
+++ b/build_analitycs/build_analytics/models/extractor_config.py
@@ -1,13 +1,15 @@
 from datetime import datetime
 from pathlib import Path
 
-from pydantic import HttpUrl, Field, BaseModel
+from pydantic import HttpUrl, Field, BaseModel  # pylint: disable=no-name-in-module
 
 from .db_config import DbConfig
 
 # DEFAULTS
 ALBS_URL_DEFAULT = 'https://build.almalinux.org'
 LOG_FILE_DEFAULT = '/tmp/extractor.log'
+API_TIMEOUT_DEFAULT = 30
+SCRAPE_INTERVAL_DEFAULT = 3600
 
 
 class ExtractorConfig(BaseModel):
@@ -22,3 +24,8 @@ class ExtractorConfig(BaseModel):
         Field(description='oldest build age to extract and store')
     jwt: str = Field(description='ALBS JWT token')
     db_config: DbConfig = Field(description="database configuration")
+    api_timeout: int = Field(
+        description="max time in seconds to wait for API response",
+        default=API_TIMEOUT_DEFAULT)
+    scrape_interval: int = Field(description='how often (in seconds) we will extract data from ALBS',
+                                 default=SCRAPE_INTERVAL_DEFAULT)
diff --git a/build_analitycs/db_schema/postgres.sql b/build_analitycs/db_schema/postgres.sql
index 
cb44990..b332a40 100644 --- a/build_analitycs/db_schema/postgres.sql +++ b/build_analitycs/db_schema/postgres.sql @@ -16,13 +16,13 @@ ON builds(finished_at); -- build_taks_enum -DROP TABLE IF EXISTS build_task_enum CASCADE; -CREATE TABLE IF NOT EXISTS build_task_enum( +DROP TABLE IF EXISTS build_task_status_enum CASCADE; +CREATE TABLE IF NOT EXISTS build_task_status_enum( id INTEGER PRIMARY KEY, value VARCHAR(15) ); -INSERT INTO build_task_enum (id, value) +INSERT INTO build_task_status_enum (id, value) VALUES (0, 'idle'), (1, 'started'), @@ -53,7 +53,7 @@ CREATE TABLE build_tasks ( id INTEGER PRIMARY KEY, build_id INTEGER REFERENCES builds(id) ON DELETE CASCADE, arch_id INTEGER REFERENCES arch_enum(id) ON DELETE SET NULL, - status_id INTEGER REFERENCES build_task_enum(id) ON DELETE SET NULL, + status_id INTEGER REFERENCES build_task_status_enum(id) ON DELETE SET NULL, started_at REAL, finished_at REAL ); diff --git a/grafana-dashbords/albs_analytics.json b/grafana-dashbords/albs_analytics.json new file mode 100644 index 0000000..6aa83f8 --- /dev/null +++ b/grafana-dashbords/albs_analytics.json @@ -0,0 +1,280 @@ +{ + "__inputs": [ + { + "name": "DS_POSTGRESQL", + "label": "PostgreSQL", + "description": "", + "type": "datasource", + "pluginId": "postgres", + "pluginName": "PostgreSQL" + } + ], + "__elements": {}, + "__requires": [ + { + "type": "grafana", + "id": "grafana", + "name": "Grafana", + "version": "9.3.2" + }, + { + "type": "datasource", + "id": "postgres", + "name": "PostgreSQL", + "version": "1.0.0" + }, + { + "type": "panel", + "id": "table", + "name": "Table", + "version": "" + } + ], + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "target": { + "limit": 100, + "matchAny": false, + "tags": [], + "type": "dashboard" + }, + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": null, + "links": [], + "liveNow": false, + "panels": [ + { + "datasource": { + "type": "postgres", + "uid": "${DS_POSTGRESQL}" + }, + "description": "", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": { + "align": "auto", + "displayMode": "auto", + "inspect": false + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "id" + }, + "properties": [ + { + "id": "custom.width", + "value": 54 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "created_at" + }, + "properties": [ + { + "id": "custom.width", + "value": 226 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "finished_at" + }, + "properties": [ + { + "id": "custom.width", + "value": 209 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "finished" + }, + "properties": [ + { + "id": "custom.width", + "value": 187 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "created" + }, + "properties": [ + { + "id": "custom.width", + "value": 213 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "url" + }, + "properties": [ + { + "id": "custom.width", + "value": 279 + } + ] + } + ] + }, + "gridPos": { + "h": 12, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 2, + "options": { + "footer": { + "fields": "", + "reducer": [ + "sum" + ], + "show": false 
+ }, + "showHeader": true, + "sortBy": [ + { + "desc": true, + "displayName": "duration (h)" + } + ] + }, + "pluginVersion": "9.3.2", + "targets": [ + { + "cacheDurationSeconds": 300, + "datasource": { + "type": "postgres", + "uid": "${DS_POSTGRESQL}" + }, + "editorMode": "code", + "fields": [ + { + "jsonPath": "" + } + ], + "format": "table", + "hide": false, + "method": "GET", + "queryParams": "", + "rawQuery": true, + "rawSql": "SELECT id, url, created_at * 1000 as created, finished_at * 1000 as finished, (finished_at - created_at) / (60*60) as duration\nFROM builds\nWHERE $__unixEpochFilter(created_at) AND finished_at IS NOT NULL", + "refId": "A", + "sql": { + "columns": [ + { + "parameters": [], + "type": "function" + } + ], + "groupBy": [ + { + "property": { + "type": "string" + }, + "type": "groupBy" + } + ], + "limit": 50 + }, + "urlPath": "" + } + ], + "title": "Finished builds", + "transformations": [ + { + "id": "convertFieldType", + "options": { + "conversions": [ + { + "destinationType": "time", + "targetField": "created" + }, + { + "destinationType": "time", + "targetField": "finished" + } + ], + "fields": {} + } + }, + { + "id": "organize", + "options": { + "excludeByName": {}, + "indexByName": {}, + "renameByName": { + "duration": "duration (h)" + } + } + } + ], + "type": "table" + } + ], + "schemaVersion": 37, + "style": "dark", + "tags": [], + "templating": { + "list": [] + }, + "time": { + "from": "now-3h", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "albs_analytics", + "uid": "02mg4oxVk", + "version": 1, + "weekStart": "" +} \ No newline at end of file
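
---
A quick usage sketch of the new update flow (not part of the patch itself, added for review context). It assumes the package is importable as `build_analytics`, that `DbConfig` accepts the field names referenced in `db.py` (`name`, `host`, `username`, `password`, `port`), and uses placeholder credentials and JWT:

```python
from build_analytics.api_client import APIclient
from build_analytics.db import DB
from build_analytics.models.db_config import DbConfig

# Placeholder connection settings; DbConfig field names follow their usage in db.py.
db = DB(DbConfig(name='albs_analytics', host='localhost',
                 username='albs', password='secret', port=5432))
# timeout (seconds) is the new APIclient constructor argument added in this patch.
api = APIclient(api_root='https://build.almalinux.org',
                jwt='<jwt token>', timeout=30)

# Dict[build_id, Dict[build_task_id, status_id]]: builds with finished_at
# IS NULL, tasks with status_id < 2 (0=idle, 1=started per the schema enum).
unfinished = db.get_unfinished_builds()

for build_id, tasks_db in unfinished.items():
    build = api.get_build(build_id)  # new single-build endpoint wrapper
    for task in build.build_tasks:
        # update only tasks that were unfinished and whose status moved on
        if task.id in tasks_db and task.status_id != tasks_db[task.id]:
            db.update_build_task(task.as_db_model())
    if build.finished_at:  # build completed since the last scrape
        db.update_build(build.as_db_model())

db.close_conn()
```

This mirrors what `Extractor.update_builds` now does once per `scrape_interval` inside the new `while True` loop in `start.py`.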