Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
7a1d84a076 | |||
4149925e8e |
@ -27,8 +27,6 @@ class APIclient():
|
||||
self.api_root = api_root
|
||||
self.jwt = jwt
|
||||
self.timeout = timeout
|
||||
# will be set at first call of __send_request
|
||||
self.session: Optional[requests.Session] = None
|
||||
|
||||
def get_builds(self, page_num: int = 1) -> List[Build]:
|
||||
ep = '/api/v1/builds'
|
||||
@ -36,7 +34,8 @@ class APIclient():
|
||||
params = {'pageNumber': page_num}
|
||||
headers = {'accept': 'appilication/json'}
|
||||
|
||||
response = self.__send_request(url, 'get', params, headers)
|
||||
response = requests.get(
|
||||
url, params=params, headers=headers, timeout=self.timeout)
|
||||
response.raise_for_status()
|
||||
|
||||
result = []
|
||||
@ -48,18 +47,11 @@ class APIclient():
|
||||
b, err, exc_info=True)
|
||||
return result
|
||||
|
||||
def get_build(self, build_id: int) -> Optional[Build]:
|
||||
'''
|
||||
method returns None if build was deleted from ALBS
|
||||
'''
|
||||
def get_build(self, build_id: int) -> Build:
|
||||
ep = f'/api/v1/builds/{build_id}'
|
||||
url = urljoin(self.api_root, ep)
|
||||
headers = {'accept': 'application/json'}
|
||||
response = self.__send_request(url, 'get', headers=headers)
|
||||
|
||||
if response.status_code == 404:
|
||||
return None
|
||||
|
||||
response = requests.get(url, headers=headers, timeout=self.timeout)
|
||||
response.raise_for_status()
|
||||
return self._parse_build(response.json())
|
||||
|
||||
@ -229,31 +221,3 @@ class APIclient():
|
||||
start_ts = stat.start_ts
|
||||
|
||||
return start_ts
|
||||
|
||||
def __send_request(self,
|
||||
url: str,
|
||||
method: str,
|
||||
params: Optional[Dict[str, Any]] = None,
|
||||
headers: Optional[Dict[str, Any]] = None,
|
||||
) -> requests.Response:
|
||||
"""
|
||||
Simple wrapper around requests.get/posts.. methods
|
||||
so we can use same session between API calls
|
||||
"""
|
||||
if not self.session:
|
||||
self.session = requests.Session()
|
||||
|
||||
m = getattr(self.session, method, None)
|
||||
if not m:
|
||||
raise ValueError(f"method {method} is not supported")
|
||||
|
||||
# pylint: disable=not-callable
|
||||
return m(url, params=params, headers=headers, timeout=self.timeout)
|
||||
|
||||
def close_session(self):
|
||||
if self.session:
|
||||
self.session.close()
|
||||
self.session = None
|
||||
|
||||
def __del__(self):
|
||||
self.close_session()
|
||||
|
@ -3,7 +3,7 @@
|
||||
from enum import IntEnum
|
||||
|
||||
# supported schema version
|
||||
DB_SCHEMA_VER = 4
|
||||
DB_SCHEMA_VER = 3
|
||||
|
||||
|
||||
# ENUMS
|
||||
@ -13,8 +13,6 @@ class ArchEnum(IntEnum):
|
||||
aarch64 = 2
|
||||
ppc64le = 3
|
||||
s390x = 4
|
||||
src = 5
|
||||
x86_64_v2 = 6
|
||||
|
||||
|
||||
class BuildTaskEnum(IntEnum):
|
||||
|
@ -62,34 +62,34 @@ class DB():
|
||||
build_task.started_at, build_task.finished_at, build_task.status_id))
|
||||
|
||||
# inserting web node stats
|
||||
for wn_stat in web_node_stats:
|
||||
for stat in web_node_stats:
|
||||
|
||||
# do not insert empty stats
|
||||
if wn_stat.start_ts is None:
|
||||
if stat.start_ts is None:
|
||||
continue
|
||||
|
||||
sql = '''
|
||||
INSERT INTO web_node_stats (build_task_id, stat_name_id, start_ts, end_ts)
|
||||
VALUES (%s, %s, %s, %s);
|
||||
'''
|
||||
cur.execute(sql, (wn_stat.build_task_id, wn_stat.stat_name_id,
|
||||
wn_stat.start_ts, wn_stat.end_ts))
|
||||
cur.execute(sql, (stat.build_task_id, stat.stat_name_id,
|
||||
stat.start_ts, stat.end_ts))
|
||||
logging.debug('raw SQL query: %s', cur.query)
|
||||
self.__conn.commit()
|
||||
|
||||
# inserting build node stats
|
||||
for bn_stat in build_node_stats:
|
||||
for stat in build_node_stats:
|
||||
|
||||
# do not insert empty stats
|
||||
if bn_stat.start_ts is None:
|
||||
if stat.start_ts is None:
|
||||
continue
|
||||
|
||||
sql = '''
|
||||
INSERT INTO build_node_stats(build_task_id, stat_name_id, start_ts, end_ts)
|
||||
VALUES (%s, %s, %s, %s);
|
||||
'''
|
||||
cur.execute(sql, (bn_stat.build_task_id, bn_stat.stat_name_id,
|
||||
bn_stat.start_ts, bn_stat.end_ts))
|
||||
cur.execute(sql, (stat.build_task_id, stat.stat_name_id,
|
||||
stat.start_ts, stat.end_ts))
|
||||
logging.debug('raw SQL query: %s', cur.query)
|
||||
|
||||
# commiting changes
|
||||
@ -121,12 +121,11 @@ class DB():
|
||||
|
||||
# getting unfinished builds
|
||||
sql = 'SELECT id FROM builds where finished_at is NULL AND created_at > %s;'
|
||||
builds_to_check: Dict[int, bool] = {}
|
||||
cur = self.__conn.cursor()
|
||||
cur.execute(sql, (not_before.timestamp(),))
|
||||
logging.debug('raw SQL query: %s', cur.query)
|
||||
for row in cur.fetchall():
|
||||
builds_to_check[row[0]] = True
|
||||
res[row[0]] = {}
|
||||
|
||||
# getting list of unfinished tasks
|
||||
sql = 'SELECT id, build_id, status_id FROM build_tasks WHERE status_id < 2;'
|
||||
@ -136,8 +135,6 @@ class DB():
|
||||
build_task_id: int = row[0]
|
||||
build_id: int = row[1]
|
||||
status_id: int = row[2]
|
||||
if build_id not in builds_to_check:
|
||||
continue
|
||||
try:
|
||||
res[build_id][build_task_id] = status_id
|
||||
except KeyError:
|
||||
@ -198,11 +195,11 @@ class DB():
|
||||
logging.debug('raw SQL query: %s', cur.query)
|
||||
|
||||
# updating build_node_stats
|
||||
for bn_stat in build_node_stats:
|
||||
for stat in build_node_stats:
|
||||
logging.debug(
|
||||
'updating build_node_stats %s build_task %s', bn_stat.stat_name_id, build_task.id)
|
||||
if self.stat_exists(task_id=bn_stat.build_task_id,
|
||||
stat_name_id=bn_stat.stat_name_id,
|
||||
'updating build_node_stats %s build_task %s', stat.stat_name_id, build_task.id)
|
||||
if self.stat_exists(task_id=stat.build_task_id,
|
||||
stat_name_id=stat.stat_name_id,
|
||||
table_name='build_node_stats',
|
||||
column_name='build_task_id'):
|
||||
sql = '''
|
||||
@ -216,9 +213,9 @@ class DB():
|
||||
VALUES (%(build_task_id)s, %(stat_name_id)s, %(start_ts)s, %(end_ts)s);
|
||||
'''
|
||||
params = {'build_task_id': build_task.id,
|
||||
'stat_name_id': bn_stat.stat_name_id,
|
||||
'start_ts': bn_stat.start_ts,
|
||||
'end_ts': bn_stat.end_ts}
|
||||
'stat_name_id': stat.stat_name_id,
|
||||
'start_ts': stat.start_ts,
|
||||
'end_ts': stat.end_ts}
|
||||
logging.debug('raw SQL query: %s', cur.query)
|
||||
cur.execute(sql, params)
|
||||
|
||||
@ -321,11 +318,3 @@ class DB():
|
||||
s.start_ts, s.finish_ts))
|
||||
# commiting changes
|
||||
self.__conn.commit()
|
||||
|
||||
def delete_build(self, build_id: int):
|
||||
params = (build_id,)
|
||||
sql = "DELETE FROM builds WHERE id = %s;"
|
||||
cur = self.__conn.cursor()
|
||||
|
||||
cur.execute(sql, params)
|
||||
self.__conn.commit()
|
||||
|
@ -1,10 +1,8 @@
|
||||
# pylint: disable=relative-beyond-top-level
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
import logging
|
||||
from typing import Dict, List
|
||||
|
||||
|
||||
from ..api_client import APIclient
|
||||
from ..const import BuildTaskEnum
|
||||
from ..db import DB
|
||||
@ -28,13 +26,11 @@ class Extractor:
|
||||
stop = False
|
||||
|
||||
while not stop:
|
||||
oldest_build_age = datetime.now().astimezone() - \
|
||||
timedelta(days=self.config.data_store_days)
|
||||
logging.info("page: %s", page_num)
|
||||
for build in self.api.get_builds(page_num):
|
||||
# check if we shoud stop processing build
|
||||
if build.id <= last_build_id or \
|
||||
build.created_at <= oldest_build_age:
|
||||
build.created_at <= self.config.oldest_build_age:
|
||||
stop = True
|
||||
break
|
||||
|
||||
@ -77,10 +73,9 @@ class Extractor:
|
||||
return build_count
|
||||
|
||||
def build_cleanup(self):
|
||||
oldest_to_keep = datetime.now().astimezone() - \
|
||||
timedelta(days=self.config.data_store_days)
|
||||
logging.info('Removing all buidls older then %s', oldest_to_keep)
|
||||
removed_count = self.db.cleanup_builds(oldest_to_keep)
|
||||
logging.info('Removing all buidls older then %s',
|
||||
self.config.oldest_build_age.strftime("%m/%d/%Y, %H:%M:%S"))
|
||||
removed_count = self.db.cleanup_builds(self.config.oldest_build_age)
|
||||
logging.info('removed %d entries', removed_count)
|
||||
|
||||
def __update_build_tasks(self, build_tasks: List[BuildTask],
|
||||
@ -110,20 +105,13 @@ class Extractor:
|
||||
b.build_id, b.id, BuildTaskEnum(b.status_id).name)
|
||||
|
||||
def update_builds(self):
|
||||
not_before = datetime.now().astimezone() - \
|
||||
timedelta(days=self.config.oldest_to_update_days)
|
||||
logging.info('Getting unfinished builds that were created after %s ',
|
||||
not_before)
|
||||
unfinished_tasks = self.db.get_unfinished_builds(not_before)
|
||||
logging.info('Getting list of tasks from DB')
|
||||
unfinished_tasks = self.db.get_unfinished_builds(
|
||||
self.config.oldest_to_update)
|
||||
for build_id, build_tasks_db in unfinished_tasks.items():
|
||||
try:
|
||||
logging.info('Getting status of build %d', build_id)
|
||||
build = self.api.get_build(build_id)
|
||||
if not build:
|
||||
logging.warning(
|
||||
"build %s was deleted from albs, removing it", build_id)
|
||||
self.db.delete_build(build_id)
|
||||
continue
|
||||
|
||||
logging.info('Updating build tasks')
|
||||
build_tasks_to_check = [
|
||||
@ -143,12 +131,10 @@ class Extractor:
|
||||
build_id, err, exc_info=True)
|
||||
|
||||
def updating_test_tasks(self):
|
||||
not_before = datetime.now().astimezone() - \
|
||||
timedelta(days=self.config.oldest_to_update_days)
|
||||
logging.info('getting build tasks for builds created after %s',
|
||||
not_before)
|
||||
self.config.oldest_to_update)
|
||||
build_task_ids = self.db.get_build_tasks_for_tests_update(
|
||||
not_before)
|
||||
self.config.oldest_to_update)
|
||||
for build_task_id in build_task_ids:
|
||||
try:
|
||||
logging.info('getting tests for build task %s', build_task_id)
|
||||
|
@ -1,8 +1,8 @@
|
||||
from datetime import datetime, timedelta
|
||||
import logging
|
||||
from logging.handlers import RotatingFileHandler
|
||||
import sys
|
||||
import time
|
||||
from typing import Dict, Any
|
||||
|
||||
import yaml
|
||||
|
||||
@ -22,15 +22,19 @@ def __get_config(yml_path: str) -> ExtractorConfig:
|
||||
with open(yml_path, 'r', encoding='utf-8') as flr:
|
||||
raw = yaml.safe_load(flr)
|
||||
|
||||
# Dbconfig
|
||||
db_params: Dict[str, Any] = {'name': raw['db_name'],
|
||||
'username': raw['db_username'],
|
||||
'password': raw['db_password'], }
|
||||
if 'db_port' in raw:
|
||||
db_params['port'] = raw['db_port']
|
||||
if 'db_host' in raw:
|
||||
db_params['host'] = raw['db_host']
|
||||
raw['db_config'] = DbConfig(**db_params)
|
||||
# adding new attrs
|
||||
raw['oldest_build_age'] = datetime.now().astimezone() \
|
||||
- timedelta(days=raw['data_store_days'])
|
||||
|
||||
raw['db_config'] = DbConfig(name=raw['db_name'],
|
||||
port=int(raw['db_port']),
|
||||
host=raw['db_host'],
|
||||
username=raw['db_username'],
|
||||
password=raw['db_password'])
|
||||
|
||||
if 'oldest_to_update_days' in raw:
|
||||
raw['oldest_to_update_days'] = datetime.now().astimezone() \
|
||||
- timedelta(days=raw['oldest_to_update_days'])
|
||||
|
||||
return ExtractorConfig(**raw)
|
||||
|
||||
@ -99,10 +103,7 @@ def start(yml_path: str):
|
||||
else:
|
||||
logging.info('test tasks were updated')
|
||||
|
||||
# freeing up resources
|
||||
extractor.db.close_conn()
|
||||
extractor.api.close_session()
|
||||
|
||||
logging.info("Extraction was finished")
|
||||
logging.info("Sleeping for %d seconds", config.scrape_interval)
|
||||
time.sleep(config.scrape_interval)
|
||||
|
@ -1,13 +1,9 @@
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
DB_PORT = 5432
|
||||
DB_HOST = "localhost"
|
||||
|
||||
|
||||
class DbConfig(BaseModel):
|
||||
name: str = Field(description="db name")
|
||||
port: int = Field(description="db server port", default=DB_PORT)
|
||||
host: str = Field(description="db server ip/hostname", default=DB_HOST)
|
||||
port: int = Field(description="db server port")
|
||||
host: str = Field(description="db server ip/hostname")
|
||||
username: str = Field(description="username to connect with")
|
||||
password: str = Field(description="password to connect with1")
|
||||
|
@ -1,3 +1,4 @@
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
from pydantic import HttpUrl, Field, BaseModel # pylint: disable=no-name-in-module
|
||||
@ -7,10 +8,10 @@ from .db_config import DbConfig
|
||||
# DEFAULTS
|
||||
ALBS_URL_DEFAULT = 'https://build.almalinux.org'
|
||||
LOG_FILE_DEFAULT = '/tmp/extractor.log'
|
||||
API_TIMEOUT_DEFAULT = 30
|
||||
API_DEFAULT = 30
|
||||
SCRAPE_INTERVAL_DEFAULT = 3600
|
||||
START_FROM_DEFAULT = 5808
|
||||
OLDEST_TO_UPDATE_DAYS_DEFAULT = 7
|
||||
OLDEST_TO_UPDATE_DEFAULT = datetime.now().astimezone() - timedelta(days=7)
|
||||
|
||||
|
||||
class ExtractorConfig(BaseModel):
|
||||
@ -21,17 +22,17 @@ class ExtractorConfig(BaseModel):
|
||||
default=LOG_FILE_DEFAULT)
|
||||
albs_url: HttpUrl = Field(description='ALBS root URL',
|
||||
default=ALBS_URL_DEFAULT)
|
||||
data_store_days: int = \
|
||||
Field(description='oldest build (in days) to keep in DB')
|
||||
oldest_build_age: datetime = \
|
||||
Field(description='oldest build age to store')
|
||||
jwt: str = Field(description='ALBS JWT token')
|
||||
db_config: DbConfig = Field(description="database configuration")
|
||||
api_timeout: int = Field(
|
||||
description="max time in seconds to wait for API response",
|
||||
default=API_TIMEOUT_DEFAULT)
|
||||
default=API_DEFAULT)
|
||||
scrape_interval: int = Field(description='how often (in seconds) we will extract data from ALBS',
|
||||
default=SCRAPE_INTERVAL_DEFAULT)
|
||||
start_from: int = Field(description='build id to start populating empty db with',
|
||||
default=START_FROM_DEFAULT)
|
||||
oldest_to_update_days: int = \
|
||||
Field(description='oldest (in days) unfinished object (build/task/step...) that we will try to update',
|
||||
default=OLDEST_TO_UPDATE_DAYS_DEFAULT)
|
||||
oldest_to_update: datetime = \
|
||||
Field(description='oldest unfinished object (build/task/step...) that we will try to update',
|
||||
default=OLDEST_TO_UPDATE_DEFAULT)
|
||||
|
@ -10,6 +10,7 @@ albs_url: https://build.almalinux.org
|
||||
# required: yes
|
||||
jwt: ""
|
||||
|
||||
|
||||
# db_host
|
||||
# IP/hostname of database server
|
||||
# required: no
|
||||
@ -27,6 +28,7 @@ db_port: 5432
|
||||
# required: yes
|
||||
db_username: albs_analytics
|
||||
|
||||
|
||||
# db_password
|
||||
# password to connect with
|
||||
# required: yes
|
||||
@ -37,6 +39,7 @@ db_password: super_secret_password
|
||||
# required: yes
|
||||
db_name: albs_analytics
|
||||
|
||||
|
||||
# log_file
|
||||
# file to write logs to
|
||||
# required: no
|
||||
@ -59,7 +62,7 @@ scrape_interval: 3600
|
||||
# default: 5808 (first build with correct metrics)
|
||||
start_from: 5808
|
||||
|
||||
# oldest_to_update_days
|
||||
# oldest_to_update
|
||||
# oldest (in days) unfinished object (build/task/step...) that we will try to update
|
||||
# required: false
|
||||
# default: 7
|
||||
|
@ -1,11 +0,0 @@
|
||||
BEGIN;
|
||||
|
||||
INSERT INTO arch_enum (id, value)
|
||||
VALUES
|
||||
(5, 'src'),
|
||||
(6, 'x86_64_v2');
|
||||
|
||||
UPDATE schema_version
|
||||
SET version = 4;
|
||||
|
||||
COMMIT;
|
23
releases.txt
23
releases.txt
@ -21,24 +21,7 @@ First version
|
||||
0.3.2 (2023-03-23)
|
||||
- Bugfix ALBS-1060
|
||||
|
||||
0.3.3 (2023-04-24)
|
||||
build-analytics
|
||||
Improvements
|
||||
0.3.3 (IN PROGRESS)
|
||||
build-analytics:
|
||||
- [ALBS-1099] change source of Test task started_at timestamp
|
||||
- [ALBS-1077] start deleting builds that were removed from ALBS
|
||||
Bugfixes
|
||||
- 'Key error' when db_port/db_host is not set
|
||||
- update_builds() ignoring odldest_to_update attribute
|
||||
- [ALBS-1099] Test task started_at attribute is NULL
|
||||
- Max recursion error in 'Test task details.json'
|
||||
|
||||
0.3.4 (2023-05-12)
|
||||
build_analytics
|
||||
- Bigfix ALBS-1111
|
||||
|
||||
0.3.5 (2023-06-01)
|
||||
build_analytics:
|
||||
ALBS-1103 start using persistent HTTP connections
|
||||
|
||||
0.3.6 (2024-10-08)
|
||||
build_analytics:
|
||||
buildsystem#360 Added src and x86_64_v2 arches
|
Loading…
Reference in New Issue
Block a user