albs_analytics/build_analitycs/build_analytics/api_client.py

83 lines
2.8 KiB
Python

from datetime import datetime
import logging
from urllib.parse import urljoin
from typing import Dict, List
from .models.build import Build
from .models.build_task import BuildTask
import requests
# UTC offset suffix appended to API timestamp strings before they are parsed
# with datetime.fromisoformat, so the resulting datetimes are timezone-aware.
# NOTE(review): presumably the API returns naive UTC timestamps -- confirm.
TZ_OFFSET = '+00:00'
class APIclient:
    """
    Client for working with the ALBS API.

    Fetches pages of builds over HTTP and converts the returned JSON into
    ``Build`` / ``BuildTask`` model objects.

    NOTE(review): ``jwt`` is stored on the instance but is not sent with the
    requests made by the methods in this class -- confirm whether the
    endpoint requires authorization.
    """

    def __init__(self, api_root: str, jwt: str, timeout: float = 30.0):
        """
        :param api_root: base URL of the ALBS API
        :param jwt: API token (kept on the instance)
        :param timeout: seconds to wait for the server before giving up;
                        passed to ``requests.get``
        """
        self.api_root = api_root
        self.jwt = jwt
        self.timeout = timeout

    def get_builds(self, page_num: int = 1) -> List[Build]:
        """
        Fetch one page of builds and convert it to ``Build`` models.

        Builds whose JSON can't be converted are logged and skipped, so the
        returned list may be shorter than the page received from the API.

        :param page_num: page number, sent as the ``pageNumber`` query param
        :return: successfully parsed builds, in the order the API returned them
        :raises requests.HTTPError: on a 4xx/5xx response
        :raises requests.Timeout: if the server doesn't answer within
                                  ``self.timeout`` seconds
        """
        ep = '/api/v1/builds'
        # ep is an absolute path, so urljoin replaces any path component
        # present in api_root.
        url = urljoin(self.api_root, ep)
        params = {'pageNumber': page_num}
        headers = {'accept': 'application/json'}
        # Without a timeout requests waits forever on a stalled connection.
        response = requests.get(
            url, params=params, headers=headers, timeout=self.timeout)
        response.raise_for_status()

        result = []
        for b in response.json()['builds']:
            try:
                result.append(self._parse_build(b))
            except Exception as err:  # pylint: disable=broad-except
                # Best effort: one malformed build must not drop the page.
                logging.error("Cant convert build JSON %s to Buildmodel: %s",
                              b, err, exc_info=True)
        return result

    @staticmethod
    def _parse_optional_timestamp(value):
        """
        Convert an API timestamp string to a timezone-aware ``datetime``.

        :param value: ISO-format timestamp string, or a falsy value
                      (``None`` / empty string) when the timestamp is not set
        :return: parsed ``datetime`` with ``TZ_OFFSET`` applied, or ``None``
                 for a falsy ``value``
        """
        if not value:
            return None
        return datetime.fromisoformat(value + TZ_OFFSET)

    def _parse_build_tasks(self, tasks_json: List[Dict],
                           build_id: int) -> List[BuildTask]:
        """
        Convert the tasks of a build to ``BuildTask`` models.

        Tasks that can't be converted are logged and skipped.

        :param tasks_json: task JSON objects of a single build
        :param build_id: id of the build the tasks belong to
        :return: parsed tasks sorted by ``id`` in descending order
        """
        result = []
        for task in tasks_json:
            try:
                params = {
                    'id': task['id'],
                    'build_id': build_id,
                    'started_at': self._parse_optional_timestamp(
                        task['started_at']),
                    'finished_at': self._parse_optional_timestamp(
                        task['finished_at']),
                    'arch': task['arch'],
                    'status_id': task['status']}
                result.append(BuildTask(**params))
            except Exception as err:  # pylint: disable=broad-except
                # Best effort: keep the remaining tasks of the build.
                logging.error("Cant convert build_task JSON %s (build_id %s) to BuildTask model: %s",
                              task, build_id, err, exc_info=True)

        result.sort(key=lambda x: x.id, reverse=True)
        return result

    def _parse_build(self, build_json: Dict) -> Build:
        """
        Convert the JSON of a single build to a ``Build`` model.

        :param build_json: build JSON object; ``created_at`` must be set,
                           ``finished_at`` may be empty
        :return: ``Build`` with its tasks parsed into ``build_tasks``
        :raises Exception: whatever the lookups, timestamp parsing or the
                           ``Build`` constructor raise on malformed input
                           (``get_builds`` logs it and skips the build)
        """
        # NOTE(review): the link host is hard-coded and does not follow
        # api_root -- confirm this is intended.
        url = f"https://build.almalinux.org/build/{build_json['id']}"
        # created_at is mandatory: a missing value raises here on purpose
        # instead of being mapped to None.
        created_at = datetime.fromisoformat(build_json['created_at']+TZ_OFFSET)
        finished_at = self._parse_optional_timestamp(build_json['finished_at'])
        build_tasks = self._parse_build_tasks(
            build_json['tasks'], build_json['id'])

        params = {
            'id': build_json['id'],
            'url': url,
            'created_at': created_at,
            'finished_at': finished_at,
            'build_tasks': build_tasks}
        return Build(**params)