Added update builds functionality

added first grafana dashboard
Kirill Zhukov 2023-02-28 18:28:48 +01:00
parent 913a998265
commit 7543878abf
10 changed files with 490 additions and 76 deletions

View File

@@ -19,9 +19,10 @@ class APIclient():
     client for working with ALBS API
     """

-    def __init__(self, api_root: str, jwt: str):
+    def __init__(self, api_root: str, jwt: str, timeout: int):
         self.api_root = api_root
         self.jwt = jwt
+        self.timeout = timeout

     def get_builds(self, page_num: int = 1) -> List[Build]:
         ep = '/api/v1/builds'
@@ -29,7 +30,8 @@ class APIclient():
         params = {'pageNumber': page_num}
         headers = {'accept': 'appilication/json'}

-        response = requests.get(url, params=params, headers=headers)
+        response = requests.get(
+            url, params=params, headers=headers, timeout=self.timeout)
         response.raise_for_status()

         result = []
@@ -41,6 +43,14 @@ class APIclient():
                               b, err, exc_info=True)
         return result

+    def get_build(self, build_id: int) -> Build:
+        ep = f'/api/v1/builds/{build_id}'
+        url = urljoin(self.api_root, ep)
+        headers = {'accept': 'application/json'}
+        response = requests.get(url, headers=headers, timeout=self.timeout)
+        response.raise_for_status()
+        return self._parse_build(response.json())
+
     def _parse_build_tasks(self, tasks_json: Dict, build_id: int) -> List[BuildTask]:
         result = []
         for task in tasks_json:
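For context, a minimal sketch of how the extended client might be called; the import path, JWT, and build id below are placeholders, not values from this commit:

    # Sketch only: module path, token, and build id are assumptions.
    from build_analytics.api_client import APIclient

    client = APIclient(api_root='https://build.almalinux.org',
                       jwt='<jwt-token>', timeout=30)
    build = client.get_build(12345)
    # With timeout set, requests raises requests.exceptions.Timeout
    # instead of hanging indefinitely when ALBS stalls.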

View File

@@ -1,5 +1,5 @@
 from datetime import datetime
-from typing import Union
+from typing import Union, Dict

 import psycopg2
@@ -10,48 +10,45 @@ from .models.db_config import DbConfig
 class DB():
     def __init__(self, config: DbConfig):
-        self.conf = config
-
-    def _get_conn(self):
-        conn = psycopg2.connect(database=self.conf.name,
-                                host=self.conf.host,
-                                user=self.conf.username,
-                                password=self.conf.password,
-                                port=self.conf.port)
-        return conn
+        self.__conn = psycopg2.connect(database=config.name,
+                                       host=config.host,
+                                       user=config.username,
+                                       password=config.password,
+                                       port=config.port)

-    def insert_update_build(self, build: BuildDB):
-        sql = f'''
+    def close_conn(self):
+        self.__conn.close()
+
+    def __del__(self):
+        self.close_conn()
+
+    def insert_build(self, build: BuildDB):
+        sql = '''
             INSERT INTO builds(id, url, created_at, finished_at)
-            VALUES (%s, %s, %s, %s)
-            ON CONFLICT (id) DO UPDATE SET
-            (url, created_at, finished_at) = (EXCLUDED.url, EXCLUDED.created_at, EXCLUDED.finished_at);
+            VALUES (%s, %s, %s, %s);
             '''
-        with self._get_conn() as conn:
-            cur = conn.cursor()
-            cur.execute(sql, (build.id, build.url,
-                              build.created_at, build.finished_at))
-            conn.commit()
+        cur = self.__conn.cursor()
+        cur.execute(sql, (build.id, build.url,
+                          build.created_at, build.finished_at))
+        self.__conn.commit()

-    def insert_update_buildtask(self, build_task: BuildTaskDB):
-        sql = f"""
+    def insert_buildtask(self, build_task: BuildTaskDB):
+        sql = '''
             INSERT INTO build_tasks(id, build_id, arch_id, started_at, finished_at, status_id)
-            VALUES (%s, %s, %s, %s, %s, %s)
-            ON CONFLICT (id) DO UPDATE SET
-            (id, build_id, arch_id, started_at, finished_at, status_id) = (EXCLUDED.ID, EXCLUDED.build_id, EXCLUDED.arch_id, EXCLUDED.started_at, EXCLUDED.finished_at, EXCLUDED.status_id)
-            """
-        with self._get_conn() as conn:
-            cur = conn.cursor()
-            cur.execute(sql, (build_task.id, build_task.build_id, build_task.arch_id,
-                              build_task.started_at, build_task.finished_at, build_task.status_id))
-            conn.commit()
+            VALUES (%s, %s, %s, %s, %s, %s);
+            '''
+        cur = self.__conn.cursor()
+        cur.execute(sql, (build_task.id, build_task.build_id, build_task.arch_id,
+                          build_task.started_at, build_task.finished_at, build_task.status_id))
+        self.__conn.commit()

     def get_latest_build_id(self) -> Union[int, None]:
         sql = "SELECT id from builds ORDER BY id DESC LIMIT 1;"
-        with self._get_conn() as conn:
-            cur = conn.cursor()
-            cur.execute(sql)
-            val = cur.fetchone()
+        cur = self.__conn.cursor()
+        cur.execute(sql)
+        val = cur.fetchone()
         if not val:
             return None
         return int(val[0])
@@ -59,8 +56,60 @@ class DB():
     def cleanup_builds(self, oldest_to_keep: datetime) -> int:
         params = (int(oldest_to_keep.timestamp()),)
         sql = "DELETE FROM builds WHERE created_at < %s;"
-        with self._get_conn() as conn:
-            cur = conn.cursor()
-            cur.execute(sql, params)
-            conn.commit()
+        cur = self.__conn.cursor()
+        cur.execute(sql, params)
+        self.__conn.commit()
         return cur.rowcount
+
+    def get_unfinished_builds(self) -> Dict[int, Dict[int, int]]:
+        """
+        Getting list of unfinished builds and build_tasks
+        Dict[build_id, Dict[build_task_id, task_status_id]]
+        """
+        res: Dict[int, Dict[int, int]] = {}
+
+        # getting unfinished builds
+        sql = 'SELECT id FROM builds where finished_at is NULL;'
+        cur = self.__conn.cursor()
+        cur.execute(sql)
+        for row in cur.fetchall():
+            res[row[0]] = {}
+
+        # getting list of unfinished tasks
+        sql = 'SELECT id, build_id, status_id FROM build_tasks WHERE status_id < 2;'
+        cur = self.__conn.cursor()
+        cur.execute(sql)
+        for row in cur.fetchall():
+            build_task_id: int = row[0]
+            build_id: int = row[1]
+            status_id: int = row[2]
+            try:
+                res[build_id][build_task_id] = status_id
+            except KeyError:
+                res[build_id] = {build_task_id: status_id}
+        return res
+
+    def update_build(self, build: BuildDB):
+        sql = '''
+            UPDATE builds
+            SET finished_at = %s
+            WHERE id = %s;
+            '''
+        cur = self.__conn.cursor()
+        cur.execute(sql, (build.finished_at, build.id))
+        self.__conn.commit()
+
+    def update_build_task(self, build: BuildTaskDB):
+        sql = '''
+            UPDATE build_tasks
+            SET status_id = %s,
+                started_at = %s,
+                finished_at = %s
+            WHERE id = %s;
+            '''
+        cur = self.__conn.cursor()
+        cur.execute(sql, (build.status_id, build.started_at,
+                          build.finished_at, build.id))
+        self.__conn.commit()
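Since the new update pass keys off get_unfinished_builds(), here is the shape it returns per its docstring, with made-up ids:

    # Illustrative values only. Outer keys are build ids, inner keys are
    # build_task ids, inner values are status ids from the enum table
    # (0 = idle, 1 = started); status_id < 2 means "not finished yet".
    unfinished = {
        642: {4801: 0, 4802: 1},   # build 642 has two unfinished tasks
        650: {},                   # build 650 is open, no unfinished tasks recorded
    }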

View File

@@ -1,8 +1,13 @@
+# pylint: disable=relative-beyond-top-level
 import logging
+from typing import List, Dict

 from ..models.extractor_config import ExtractorConfig
-from ..api_client import APIclient
+from ..models.enums import BuildTaskEnum
+from ..models.build import BuildTask
 from ..db import DB
+from ..api_client import APIclient


 class Extractor:
@@ -17,11 +22,11 @@ class Extractor:
         last_build_id = self.db.get_latest_build_id()
         if not last_build_id:
             last_build_id = 0
-        logging.info(f"last_build_id: {last_build_id}")
+        logging.info("last_build_id: %s", last_build_id)

         stop = False
         while not stop:
-            logging.info(f"page: {page_num}")
+            logging.info("page: %s", page_num)
             for build in self.api.get_builds(page_num):
                 # check if we shoud stop processing build
                 if build.id <= last_build_id or \
@@ -30,10 +35,10 @@ class Extractor:
                     break

                 # inserting build and build tasks
-                logging.info(f"inserting {build.id}")
-                self.db.insert_update_build(build.as_db_model())
+                logging.info("inserting %s", build.id)
+                self.db.insert_build(build.as_db_model())
                 for build_task in build.build_tasks:
-                    self.db.insert_update_buildtask(build_task.as_db_model())
+                    self.db.insert_buildtask(build_task.as_db_model())
                 build_count += 1
             page_num += 1
         return build_count
@@ -43,3 +48,48 @@ class Extractor:
                      self.oldest_build_age.strftime("%m/%d/%Y, %H:%M:%S"))
         removed_count = self.db.cleanup_builds(self.oldest_build_age)
         logging.info('removed %d entries', removed_count)
+
+    def __update_build_tasks_statuses(self, build_tasks: List[BuildTask],
+                                      build_tasks_status_db: Dict[int, int]):
+        for b in build_tasks:
+            if b.status_id != build_tasks_status_db[b.id]:
+                logging.info('build taks %d status have changed %s -> %s. Updating DB',
+                             b.id, BuildTaskEnum(
+                                 build_tasks_status_db[b.id]).name,
+                             BuildTaskEnum(b.status_id).name)
+                try:
+                    self.db.update_build_task(b.as_db_model())
+                except Exception as err:  # pylint: disable=broad-except
+                    logging.error(
+                        'failed to update build task %d: %s',
+                        b.id, err, exc_info=True)
+                else:
+                    logging.info('build task %d was updated', b.id)
+            else:
+                logging.info(
+                    "build_task %d is still %s. Skipping", b.id, BuildTaskEnum(b.status_id).name)
+
+    def update_builds(self):
+        logging.info('Getting list of tasks from DB')
+        unfinished_tasks = self.db.get_unfinished_builds()
+        for build_id, build_tasks_db in unfinished_tasks.items():
+            try:
+                logging.info('Getting status of build %d', build_id)
+                build = self.api.get_build(build_id)
+
+                logging.info('Updating build tasks')
+                build_tasks_to_check = [
+                    b for b in build.build_tasks if b.id in build_tasks_db]
+                self.__update_build_tasks_statuses(
+                    build_tasks_to_check, build_tasks_db)
+
+                if build.finished_at:
+                    logging.info(
+                        "build is finished, we need to update finished_at attribute")
+                    self.db.update_build(build.as_db_model())
+
+                logging.info('finished proccessing build %d', build_id)
+            except Exception as err:  # pylint: disable=broad-except
+                logging.error("Cant process build %d: %s",
+                              build_id, err, exc_info=True)
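A hypothetical way to drive the new update pass on its own, mirroring the wiring that start() does in the next file; the helper name and import paths are assumptions, not part of the commit:

    # Hypothetical helper; import paths are guesses at the package layout.
    from build_analytics.api_client import APIclient
    from build_analytics.db import DB
    from build_analytics.extractors.extractor import Extractor
    from build_analytics.models.extractor_config import ExtractorConfig

    def refresh_unfinished(config: ExtractorConfig) -> None:
        api = APIclient(api_root=config.albs_url, jwt=config.jwt,
                        timeout=config.api_timeout)
        db = DB(config.db_config)
        extractor = Extractor(config, api, db)
        extractor.update_builds()   # only touches builds with finished_at IS NULL
        db.close_conn()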

View File

@@ -1,9 +1,11 @@
 from datetime import datetime, timedelta
 import logging
 from logging.handlers import RotatingFileHandler
+import time

 import yaml

+# pylint: disable=relative-beyond-top-level
 from ..api_client import APIclient
 from ..db import DB
 from .extractor import Extractor
@@ -49,23 +51,40 @@ def start(yml_path: str):
                             maxBytes=10000000,
                             backupCount=3)])

-    api = APIclient(api_root=config.albs_url, jwt=config.jwt)
-    db = DB(config.db_config)
-    extractor = Extractor(config, api, db)
+    while True:
+        logging.info('Starting extraction proccess')
+        api = APIclient(api_root=config.albs_url,
+                        jwt=config.jwt, timeout=config.api_timeout)
+        db = DB(config.db_config)
+        extractor = Extractor(config, api, db)

-    logging.info('Starting builds insertion')
-    inserted_count = -1
-    try:
-        inserted_count = extractor.extract_and_store()
-    except Exception as err:  # pylint: disable=broad-except
-        logging.critical("Unhandled exeption %s", err, exc_info=True)
-    else:
-        logging.info(
-            'Build extaction was finished. %d builds were inserted', inserted_count)
+        logging.info('Starting builds insertion')
+        inserted_count = -1
+        try:
+            inserted_count = extractor.extract_and_store()
+        except Exception as err:  # pylint: disable=broad-except
+            logging.critical("Unhandled exception %s", err, exc_info=True)
+        else:
+            logging.info(
+                'Build extaction was finished. %d builds were inserted', inserted_count)

-    logging.info('Starting old builds removal')
-    try:
-        extractor.build_cleanup()
-    except Exception as err:
-        logging.critical("Unhandled exeption %s", err, exc_info=True)
-    else:
-        logging.info('Cleanup finished')
+        logging.info('Starting old builds removal')
+        try:
+            extractor.build_cleanup()
+        except Exception as err:  # pylint: disable=broad-except
+            logging.critical("Unhandled exception %s", err, exc_info=True)
+        else:
+            logging.info('Cleanup finished')
+
+        logging.info('Updating statuses of unfinished build tasks')
+        try:
+            extractor.update_builds()
+        except Exception as err:  # pylint: disable=broad-except
+            logging.critical("Unhandled exception %s", err, exc_info=True)
+        else:
+            logging.info('Update finished')
+
+        extractor.db.close_conn()
+        logging.info("Extraction was finished")
+        logging.info("Sleeping for %d seconds", config.scrape_inteval)
+        time.sleep(config.scrape_inteval)
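With the loop in place, start() runs as a long-lived service; a minimal, hypothetical entry point (the config path is a placeholder):

    # Hypothetical invocation; the YAML path is a placeholder.
    if __name__ == '__main__':
        start('/etc/albs-analytics/extractor.yml')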

View File

@@ -1,7 +1,6 @@
 from datetime import datetime
 from typing import List, Optional

-from pydantic import BaseModel, HttpUrl
-from typing import Any
+from pydantic import BaseModel, HttpUrl  # pylint: disable=no-name-in-module

 from .build_task import BuildTask
 from .build_db import BuildDB

View File

@@ -1,5 +1,5 @@
-from typing import Optional, Dict, Any
-from pydantic import BaseModel, HttpUrl
+from typing import Optional
+from pydantic import BaseModel, HttpUrl  # pylint: disable=no-name-in-module


 class BuildDB(BaseModel):

View File

@@ -1,7 +1,7 @@
 from datetime import datetime
 from typing import Optional

-from pydantic import BaseModel
+from pydantic import BaseModel  # pylint: disable=no-name-in-module

 from .build_task_db import BuildTaskDB
 from .enums import ArchEnum

View File

@@ -1,13 +1,15 @@
 from datetime import datetime
 from pathlib import Path

-from pydantic import HttpUrl, Field, BaseModel
+from pydantic import HttpUrl, Field, BaseModel  # pylint: disable=no-name-in-module

 from .db_config import DbConfig

 # DEFAULTS
 ALBS_URL_DEFAULT = 'https://build.almalinux.org'
 LOG_FILE_DEFAULT = '/tmp/extractor.log'
+API_DEFAULT = 30
+SCRAPE_INTERVAL_DEFAULT = 3600


 class ExtractorConfig(BaseModel):
@@ -22,3 +24,8 @@ class ExtractorConfig(BaseModel):
         Field(description='oldest build age to extract and store')
     jwt: str = Field(description='ALBS JWT token')
     db_config: DbConfig = Field(description="database configuration")
+    api_timeout: int = Field(
+        description="max time in seconds to wait for API response",
+        default=API_DEFAULT)
+    scrape_inteval: int = Field(description='how often (in seconds) we will extract data from ALBS',
+                                default=SCRAPE_INTERVAL_DEFAULT)
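For reference, a config sketch covering only fields visible in this diff, written as the dict that yaml.safe_load would hand to ExtractorConfig; key spellings follow the pydantic field names (including scrape_inteval as spelled in the code), and every value is a placeholder:

    # Illustrative only: db_config key names are taken from DB.__init__ above,
    # values are made up.
    config_raw = {
        'albs_url': 'https://build.almalinux.org',
        'jwt': '<jwt-token>',
        'api_timeout': 30,        # optional, falls back to API_DEFAULT
        'scrape_inteval': 3600,   # optional, falls back to SCRAPE_INTERVAL_DEFAULT
        'db_config': {
            'name': 'albs_analytics',
            'host': 'localhost',
            'port': 5432,
            'username': 'postgres',
            'password': '<password>',
        },
    }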

View File

@@ -16,13 +16,13 @@ ON builds(finished_at);

 -- build_taks_enum
-DROP TABLE IF EXISTS build_task_enum CASCADE;
+DROP TABLE IF EXISTS build_task_status_enum CASCADE;

-CREATE TABLE IF NOT EXISTS build_task_enum(
+CREATE TABLE IF NOT EXISTS build_task_status_enum(
     id INTEGER PRIMARY KEY,
     value VARCHAR(15)
 );

-INSERT INTO build_task_enum (id, value)
+INSERT INTO build_task_status_enum (id, value)
 VALUES
     (0, 'idle'),
     (1, 'started'),
@@ -53,7 +53,7 @@ CREATE TABLE build_tasks (
     id INTEGER PRIMARY KEY,
     build_id INTEGER REFERENCES builds(id) ON DELETE CASCADE,
     arch_id INTEGER REFERENCES arch_enum(id) ON DELETE SET NULL,
-    status_id INTEGER REFERENCES build_task_enum(id) ON DELETE SET NULL,
+    status_id INTEGER REFERENCES build_task_status_enum(id) ON DELETE SET NULL,
     started_at REAL,
     finished_at REAL
 );
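A hypothetical read query against the renamed table, written in the style of the DB helpers above, to show how status ids resolve to names:

    # Sketch only: `conn` is an open psycopg2 connection, the build id is made up.
    sql = '''
        SELECT bt.id, e.value
        FROM build_tasks AS bt
        JOIN build_task_status_enum AS e ON e.id = bt.status_id
        WHERE bt.build_id = %s;
        '''
    cur = conn.cursor()
    cur.execute(sql, (642,))
    rows = cur.fetchall()   # e.g. [(4801, 'idle'), (4802, 'started')]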

View File

@@ -0,0 +1,280 @@
{
"__inputs": [
{
"name": "DS_POSTGRESQL",
"label": "PostgreSQL",
"description": "",
"type": "datasource",
"pluginId": "postgres",
"pluginName": "PostgreSQL"
}
],
"__elements": {},
"__requires": [
{
"type": "grafana",
"id": "grafana",
"name": "Grafana",
"version": "9.3.2"
},
{
"type": "datasource",
"id": "postgres",
"name": "PostgreSQL",
"version": "1.0.0"
},
{
"type": "panel",
"id": "table",
"name": "Table",
"version": ""
}
],
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": {
"type": "grafana",
"uid": "-- Grafana --"
},
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"target": {
"limit": 100,
"matchAny": false,
"tags": [],
"type": "dashboard"
},
"type": "dashboard"
}
]
},
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": null,
"links": [],
"liveNow": false,
"panels": [
{
"datasource": {
"type": "postgres",
"uid": "${DS_POSTGRESQL}"
},
"description": "",
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
},
"custom": {
"align": "auto",
"displayMode": "auto",
"inspect": false
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": [
{
"matcher": {
"id": "byName",
"options": "id"
},
"properties": [
{
"id": "custom.width",
"value": 54
}
]
},
{
"matcher": {
"id": "byName",
"options": "created_at"
},
"properties": [
{
"id": "custom.width",
"value": 226
}
]
},
{
"matcher": {
"id": "byName",
"options": "finished_at"
},
"properties": [
{
"id": "custom.width",
"value": 209
}
]
},
{
"matcher": {
"id": "byName",
"options": "finished"
},
"properties": [
{
"id": "custom.width",
"value": 187
}
]
},
{
"matcher": {
"id": "byName",
"options": "created"
},
"properties": [
{
"id": "custom.width",
"value": 213
}
]
},
{
"matcher": {
"id": "byName",
"options": "url"
},
"properties": [
{
"id": "custom.width",
"value": 279
}
]
}
]
},
"gridPos": {
"h": 12,
"w": 24,
"x": 0,
"y": 0
},
"id": 2,
"options": {
"footer": {
"fields": "",
"reducer": [
"sum"
],
"show": false
},
"showHeader": true,
"sortBy": [
{
"desc": true,
"displayName": "duration (h)"
}
]
},
"pluginVersion": "9.3.2",
"targets": [
{
"cacheDurationSeconds": 300,
"datasource": {
"type": "postgres",
"uid": "${DS_POSTGRESQL}"
},
"editorMode": "code",
"fields": [
{
"jsonPath": ""
}
],
"format": "table",
"hide": false,
"method": "GET",
"queryParams": "",
"rawQuery": true,
"rawSql": "SELECT id, url, created_at * 1000 as created, finished_at * 1000 as finished, (finished_at - created_at) / (60*60) as duration\nFROM builds\nWHERE $__unixEpochFilter(created_at) AND finished_at IS NOT NULL",
"refId": "A",
"sql": {
"columns": [
{
"parameters": [],
"type": "function"
}
],
"groupBy": [
{
"property": {
"type": "string"
},
"type": "groupBy"
}
],
"limit": 50
},
"urlPath": ""
}
],
"title": "Finished builds",
"transformations": [
{
"id": "convertFieldType",
"options": {
"conversions": [
{
"destinationType": "time",
"targetField": "created"
},
{
"destinationType": "time",
"targetField": "finished"
}
],
"fields": {}
}
},
{
"id": "organize",
"options": {
"excludeByName": {},
"indexByName": {},
"renameByName": {
"duration": "duration (h)"
}
}
}
],
"type": "table"
}
],
"schemaVersion": 37,
"style": "dark",
"tags": [],
"templating": {
"list": []
},
"time": {
"from": "now-3h",
"to": "now"
},
"timepicker": {},
"timezone": "",
"title": "albs_analytics",
"uid": "02mg4oxVk",
"version": 1,
"weekStart": ""
}
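The rawSql in the panel above relies on the schema storing created_at/finished_at as epoch seconds (REAL columns in the migration above): it multiplies by 1000 because Grafana time fields expect milliseconds, and divides the difference by 60*60 to report duration in hours. A quick check of that arithmetic with made-up timestamps:

    # Made-up timestamps to sanity-check the dashboard query's arithmetic.
    created_at = 1677600000            # epoch seconds
    finished_at = created_at + 5400    # finished 90 minutes later

    created_ms = created_at * 1000                       # what Grafana's time column expects
    duration_h = (finished_at - created_at) / (60 * 60)  # matches the "duration (h)" column
    assert duration_h == 1.5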