2023-02-27 19:51:53 +00:00
|
|
|
from datetime import datetime
|
|
|
|
import logging
|
|
|
|
|
|
|
|
from urllib.parse import urljoin
|
|
|
|
from typing import Dict, List
|
|
|
|
|
|
|
|
|
|
|
|
from .models.build import Build
|
|
|
|
from .models.build_task import BuildTask
|
|
|
|
|
|
|
|
import requests
|
|
|
|
|
|
|
|
|
|
|
|
TZ_OFFSET = '+00:00'
|
|
|
|
|
|
|
|
|
|
|
|
class APIclient():
|
|
|
|
"""
|
|
|
|
client for working with ALBS API
|
|
|
|
"""
|
|
|
|
|
2023-02-28 17:28:48 +00:00
|
|
|
def __init__(self, api_root: str, jwt: str, timeout: int):
|
2023-02-27 19:51:53 +00:00
|
|
|
self.api_root = api_root
|
|
|
|
self.jwt = jwt
|
2023-02-28 17:28:48 +00:00
|
|
|
self.timeout = timeout
|
2023-02-27 19:51:53 +00:00
|
|
|
|
|
|
|
def get_builds(self, page_num: int = 1) -> List[Build]:
|
|
|
|
ep = '/api/v1/builds'
|
|
|
|
url = urljoin(self.api_root, ep)
|
|
|
|
params = {'pageNumber': page_num}
|
|
|
|
headers = {'accept': 'appilication/json'}
|
|
|
|
|
2023-02-28 17:28:48 +00:00
|
|
|
response = requests.get(
|
|
|
|
url, params=params, headers=headers, timeout=self.timeout)
|
2023-02-27 19:51:53 +00:00
|
|
|
response.raise_for_status()
|
|
|
|
|
|
|
|
result = []
|
|
|
|
for b in response.json()['builds']:
|
|
|
|
try:
|
|
|
|
result.append(self._parse_build(b))
|
|
|
|
except Exception as err: # pylint: disable=broad-except
|
|
|
|
logging.error("Cant convert build JSON %s to Buildmodel: %s",
|
|
|
|
b, err, exc_info=True)
|
|
|
|
return result
|
|
|
|
|
2023-02-28 17:28:48 +00:00
|
|
|
def get_build(self, build_id: int) -> Build:
|
|
|
|
ep = f'/api/v1/builds/{build_id}'
|
|
|
|
url = urljoin(self.api_root, ep)
|
|
|
|
headers = {'accept': 'application/json'}
|
|
|
|
response = requests.get(url, headers=headers, timeout=self.timeout)
|
|
|
|
response.raise_for_status()
|
|
|
|
return self._parse_build(response.json())
|
|
|
|
|
2023-02-27 19:51:53 +00:00
|
|
|
def _parse_build_tasks(self, tasks_json: Dict, build_id: int) -> List[BuildTask]:
|
|
|
|
result = []
|
|
|
|
for task in tasks_json:
|
|
|
|
try:
|
|
|
|
started_at = datetime.fromisoformat(
|
|
|
|
task['started_at']+TZ_OFFSET) \
|
|
|
|
if task['started_at'] else None
|
|
|
|
finished_at = datetime.fromisoformat(task['finished_at']+TZ_OFFSET) \
|
|
|
|
if task['finished_at'] else None
|
2023-03-01 13:13:14 +00:00
|
|
|
name = task['ref']['url'].split('/')[-1].replace('.git', '')
|
2023-02-27 19:51:53 +00:00
|
|
|
params = {'id': task['id'],
|
2023-03-01 13:13:14 +00:00
|
|
|
'name': name,
|
2023-02-27 19:51:53 +00:00
|
|
|
'build_id': build_id,
|
|
|
|
'started_at': started_at,
|
|
|
|
'finished_at': finished_at,
|
|
|
|
'arch': task['arch'],
|
|
|
|
'status_id': task['status']}
|
|
|
|
result.append(BuildTask(**params))
|
|
|
|
except Exception as err: # pylint: disable=broad-except
|
|
|
|
logging.error("Cant convert build_task JSON %s (build_id %s) to BuildTask model: %s",
|
|
|
|
task, build_id, err, exc_info=True)
|
|
|
|
|
|
|
|
result.sort(key=lambda x: x.id, reverse=True)
|
|
|
|
return result
|
|
|
|
|
|
|
|
def _parse_build(self, build_json: Dict) -> Build:
|
|
|
|
url = f"https://build.almalinux.org/build/{build_json['id']}"
|
|
|
|
created_at = datetime.fromisoformat(build_json['created_at']+TZ_OFFSET)
|
|
|
|
finished_at = datetime.fromisoformat(build_json['finished_at']+TZ_OFFSET) \
|
|
|
|
if build_json['finished_at'] else None
|
|
|
|
build_tasks = self._parse_build_tasks(
|
|
|
|
build_json['tasks'], build_json['id'])
|
|
|
|
|
|
|
|
params = {
|
|
|
|
'id': build_json['id'],
|
|
|
|
'url': url,
|
|
|
|
'created_at': created_at,
|
|
|
|
'finished_at': finished_at,
|
|
|
|
'build_tasks': build_tasks}
|
|
|
|
|
|
|
|
return Build(**params)
|