added test tasks updating logic
This commit is contained in:
parent 45a6850056
commit 679328093a
@@ -10,6 +10,7 @@ from .models.build_node_stat_db import BuildNodeStatDB
 from .models.db_config import DbConfig
 from .models.web_node_stat_db import WebNodeStatDB
 from .models.test_task_db import TestTaskDB
+from .models.test_step_stat_db import TestStepStatDB


 class DB():
@@ -100,7 +101,7 @@ class DB():
         self.__conn.commit()
         return cur.rowcount

-    def get_unfinished_builds(self) -> Dict[int, Dict[int, int]]:
+    def get_unfinished_builds(self, not_before: datetime) -> Dict[int, Dict[int, int]]:
         """
         Getting list of unfinished builds and build_tasks
         Dict[build_id, Dict[build_task_id, task_status_id]]
@@ -108,8 +109,8 @@ class DB():
         res: Dict[int, Dict[int, int]] = {}

         # getting unfinished builds
-        sql = 'SELECT id FROM builds where finished_at is NULL;'
+        sql = 'SELECT id FROM builds where finished_at is NULL AND created_at > %s;'
         cur = self.__conn.cursor()
-        cur.execute(sql)
+        cur.execute(sql, (not_before.timestamp(),))
         for row in cur.fetchall():
             res[row[0]] = {}
@@ -160,7 +161,7 @@ class DB():
         for stat in web_node_stats:
             logging.debug(
                 'updating web_node_stats %s build_task %s', stat.stat_name_id, build_task.id)
-            if self.stat_exists(build_task_id=stat.build_task_id,
+            if self.stat_exists(task_id=stat.build_task_id,
                                 stat_name_id=stat.stat_name_id,
                                 table_name='web_node_stats'):
                 sql = '''
@@ -184,7 +185,7 @@ class DB():
         for stat in build_node_stats:
             logging.debug(
                 'updating build_node_stats %s build_task %s', stat.stat_name_id, build_task.id)
-            if self.stat_exists(build_task_id=stat.build_task_id,
+            if self.stat_exists(task_id=stat.build_task_id,
                                 stat_name_id=stat.stat_name_id,
                                 table_name='build_node_stats'):
                 sql = '''
@@ -220,14 +221,14 @@ class DB():
             return None
         return int(val[0])

-    def stat_exists(self, build_task_id: int, stat_name_id: int, table_name: str) -> bool:
+    def stat_exists(self, task_id: int, stat_name_id: int, table_name: str) -> bool:
         sql = f'''
                 SELECT COUNT(build_task_id)
                 FROM {table_name}
                 WHERE build_task_id = %s AND stat_name_id = %s;
              '''
         cur = self.__conn.cursor()
-        cur.execute(sql, (build_task_id, stat_name_id))
+        cur.execute(sql, (task_id, stat_name_id))
         val = int(cur.fetchone()[0])
         return val == 1

@@ -255,3 +256,56 @@ class DB():

         # commiting changes
         self.__conn.commit()
+
+    def get_build_tasks_for_unfinished_tests(self, not_before: datetime) -> List[int]:
+        '''
+        getting build task ids of unfinished test tasks
+        '''
+        cur = self.__conn.cursor()
+        sql = '''
+              SELECT bt.id
+              FROM build_tasks as bt
+              INNER JOIN test_tasks AS tt
+                ON bt.id = tt.build_task_id
+              WHERE tt.status_id < 3 AND tt.started_at > %s;
+              '''
+        cur.execute(sql, (not_before,))
+        result = [int(row[0]) for row in cur.fetchall()]
+        return result
+
+    def update_test_tasks(self, test_tasks: List[TestTaskDB]):
+        cur = self.__conn.cursor()
+        # test tasks
+        for task in test_tasks:
+            sql = '''
+                UPDATE test_tasks
+                SET revision = %s,
+                    status_id = %s,
+                    started_at = %s
+                WHERE id = %s;
+              '''
+            cur.execute(sql, (task.revision, task.status_id,
+                        task.started_at, task.id))
+
+            # test steps
+            if not task.steps_stats:
+                continue
+            for s in task.steps_stats:
+                if self.stat_exists(s.test_task_id, s.stat_name_id, 'test_steps_stats'):
+                    sql = '''
+                            UPDATE test_steps_stats
+                            SET start_ts = %s,
+                                finish_ts = %s
+                            WHERE test_task_id = %s AND stat_name_id = %s;
+                          '''
+                    cur.execute(sql, (s.start_ts, s.finish_ts,
+                                      s.test_task_id, s.stat_name_id))
+                else:
+                    sql = '''
+                          INSERT INTO test_steps_stats (test_task_id, stat_name_id, start_ts, finish_ts)
+                          VALUES (%s, %s, %s, %s);
+                          '''
+                    cur.execute(sql, (s.test_task_id, s.stat_name_id,
+                                      s.start_ts, s.finish_ts))
+        # committing changes
+        self.__conn.commit()
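The update_test_tasks changes above reuse the check-then-write pattern the DB class already applies to build and web node stats. As a standalone illustration (not code from this commit), here is a minimal sketch of that pattern for test_steps_stats; it assumes psycopg2-style placeholders, an already-open connection passed in as conn, and the column names shown in the schema diff further below. The helper name is hypothetical.

# Illustrative sketch only, not part of the commit.
# Assumes a psycopg2-style connection and the test_steps_stats columns from the schema.
def upsert_test_step_stat(conn, test_task_id: int, stat_name_id: int,
                          start_ts: float, finish_ts: float) -> None:
    cur = conn.cursor()
    # check whether a row for this (test task, step) pair already exists
    cur.execute(
        'SELECT COUNT(test_task_id) FROM test_steps_stats '
        'WHERE test_task_id = %s AND stat_name_id = %s;',
        (test_task_id, stat_name_id))
    if cur.fetchone()[0] == 1:
        # row exists: refresh the step timestamps
        cur.execute(
            'UPDATE test_steps_stats SET start_ts = %s, finish_ts = %s '
            'WHERE test_task_id = %s AND stat_name_id = %s;',
            (start_ts, finish_ts, test_task_id, stat_name_id))
    else:
        # first time this step is seen: insert it
        cur.execute(
            'INSERT INTO test_steps_stats '
            '(test_task_id, stat_name_id, start_ts, finish_ts) '
            'VALUES (%s, %s, %s, %s);',
            (test_task_id, stat_name_id, start_ts, finish_ts))
    conn.commit()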
@@ -12,8 +12,7 @@ from ..models.extractor_config import ExtractorConfig


 class Extractor:
     def __init__(self, config: ExtractorConfig, api: APIclient, db: DB):
-        self.start_from = config.start_from
-        self.oldest_build_age = config.oldest_build_age
+        self.config = config
         self.api = api
         self.db = db

@@ -22,7 +21,7 @@ class Extractor:
         page_num = 1
         last_build_id = self.db.get_latest_build_id()
         if not last_build_id:
-            last_build_id = self.start_from - 1
+            last_build_id = self.config.start_from - 1
         logging.info("last_build_id: %s", last_build_id)
         stop = False

@@ -31,7 +30,7 @@ class Extractor:
             for build in self.api.get_builds(page_num):
                 # check if we shoud stop processing build
                 if build.id <= last_build_id or \
-                   build.created_at <= self.oldest_build_age:
+                   build.created_at <= self.config.oldest_build_age:
                     stop = True
                     break

@@ -71,8 +70,8 @@ class Extractor:

     def build_cleanup(self):
         logging.info('Removing all buidls older then %s',
-                     self.oldest_build_age.strftime("%m/%d/%Y, %H:%M:%S"))
-        removed_count = self.db.cleanup_builds(self.oldest_build_age)
+                     self.config.oldest_build_age.strftime("%m/%d/%Y, %H:%M:%S"))
+        removed_count = self.db.cleanup_builds(self.config.oldest_build_age)
         logging.info('removed %d entries', removed_count)

     def __update_build_tasks(self, build_tasks: List[BuildTask],
@@ -125,3 +124,19 @@ class Extractor:
             except Exception as err:  # pylint: disable=broad-except
                 logging.error("Cant process build %d:  %s",
                               build_id, err, exc_info=True)
+
+    def updating_test_tasks(self):
+        logging.info('getting build task ids of unfinished tests')
+        build_task_ids = self.db.get_build_tasks_for_unfinished_tests(
+            self.config.oldest_to_update)
+        for build_task_id in build_task_ids:
+            try:
+                logging.info('getting tests for build task %s', build_task_id)
+                tasks_api = self.api.get_test_tasks(build_task_id)
+                logging.info('updating test tasks')
+                tasks_db = [t.as_db_model() for t in tasks_api]
+                self.db.update_test_tasks(tasks_db)
+            except Exception as err:
+                logging.error(
+                    'failed to update tests for %d build task: %s',
+                    build_task_id, err, exc_info=True)
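updating_test_tasks relies on api.get_test_tasks() and the tasks' as_db_model() conversion, neither of which appears in this diff. Below is a hypothetical sketch of what that mapping could look like; every class and field name is inferred from the SQL in DB.update_test_tasks, not taken from the real model definitions.

# Hypothetical sketch: the real TestTask / TestTaskDB / TestStepStatDB models
# live elsewhere in the repo; the shapes below are assumptions for illustration.
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class TestStepStatDBSketch:
    test_task_id: int
    stat_name_id: int
    start_ts: Optional[float] = None
    finish_ts: Optional[float] = None


@dataclass
class TestTaskDBSketch:
    id: int
    build_task_id: int
    revision: str
    status_id: int
    started_at: Optional[float] = None
    steps_stats: List[TestStepStatDBSketch] = field(default_factory=list)


@dataclass
class TestTaskSketch:
    """API-side task as returned by api.get_test_tasks() (shape assumed)."""
    id: int
    build_task_id: int
    revision: str
    status_id: int
    started_at: Optional[float] = None
    steps_stats: List[TestStepStatDBSketch] = field(default_factory=list)

    def as_db_model(self) -> TestTaskDBSketch:
        # flatten the API model into the DB-layer model update_test_tasks expects
        return TestTaskDBSketch(id=self.id,
                                build_task_id=self.build_task_id,
                                revision=self.revision,
                                status_id=self.status_id,
                                started_at=self.started_at,
                                steps_stats=self.steps_stats)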
@@ -15,20 +15,6 @@ from ..models.extractor_config import ExtractorConfig
 from ..models.db_config import DbConfig


-def __get_oldest_build_age(config: dict) -> datetime:
-    oldest_build_age = datetime.now().astimezone() \
-        - timedelta(days=config['data_store_days'])
-    return oldest_build_age
-
-
-def __get_db_config(config: dict) -> DbConfig:
-    return DbConfig(name=config['db_name'],
-                    port=int(config['db_port']),
-                    host=config['db_host'],
-                    username=config['db_username'],
-                    password=config['db_password'])
-
-
 def __get_config(yml_path: str) -> ExtractorConfig:
     """
     get_config loads yml file and generates  instance
@@ -37,8 +23,18 @@ def __get_config(yml_path: str) -> ExtractorConfig:
         raw = yaml.safe_load(flr)

     # adding new attrs
-    raw['oldest_build_age'] = __get_oldest_build_age(raw)
-    raw['db_config'] = __get_db_config(raw)
+    raw['oldest_build_age'] = datetime.now().astimezone() \
+        - timedelta(days=raw['data_store_days'])
+
+    raw['db_config'] = DbConfig(name=raw['db_name'],
+                                port=int(raw['db_port']),
+                                host=raw['db_host'],
+                                username=raw['db_username'],
+                                password=raw['db_password'])
+
+    if 'oldest_to_update_days' in raw:
+        raw['oldest_to_update'] = datetime.now().astimezone() \
+            - timedelta(days=raw['oldest_to_update_days'])

     return ExtractorConfig(**raw)

@@ -83,21 +79,29 @@ def start(yml_path: str):
             logging.info(
                 'Build extaction was finished. %d builds were inserted', inserted_count)

-        logging.info('Starting old builds removal')
+        logging.info('starting old builds removal')
         try:
             extractor.build_cleanup()
         except Exception as err:  # pylint: disable=broad-except
-            logging.critical("Unhandled exception %s", err, exc_info=True)
+            logging.critical("unhandled exception %s", err, exc_info=True)
         else:
-            logging.info('Cleanup finished')
+            logging.info('cleanup finished')

-        logging.info('Updating statuses of unfinished build tasks')
+        logging.info('updating statuses of unfinished build tasks')
         try:
             extractor.update_builds()
         except Exception as err:  # pylint: disable=broad-except
-            logging.critical("Unhandled exception %s", err, exc_info=True)
+            logging.critical("unhandled exception %s", err, exc_info=True)
         else:
-            logging.info('Update finished')
+            logging.info('update finished')
+
+        logging.info('updating statuses of unfinished test tasks')
+        try:
+            extractor.updating_test_tasks()
+        except Exception as err:  # pylint: disable=broad-except
+            logging.critical("unhandled exception %s", err, exc_info=True)
+        else:
+            logging.info('test tasks were updated')

         extractor.db.close_conn()
         logging.info("Extraction was finished")
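To make the new config plumbing concrete, here is a minimal, self-contained sketch of how an oldest_to_update_days value from the YAML becomes the oldest_to_update cutoff the extractor uses. It is a reduced illustration, not the real __get_config(); PyYAML is assumed and the key names follow the diff above.

# Minimal standalone sketch of the new config handling; not the real __get_config().
from datetime import datetime, timedelta

import yaml

raw = yaml.safe_load('oldest_to_update_days: 3')
if 'oldest_to_update_days' in raw:
    # turn "N days" from the config into an absolute, timezone-aware cutoff
    oldest_to_update = datetime.now().astimezone() \
        - timedelta(days=raw['oldest_to_update_days'])
    print(oldest_to_update)  # roughly three days before "now"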
@@ -1,4 +1,4 @@
-from datetime import datetime
+from datetime import datetime, timedelta
 from pathlib import Path

 from pydantic import HttpUrl, Field, BaseModel  # pylint: disable=no-name-in-module
@@ -11,6 +11,7 @@ LOG_FILE_DEFAULT = '/tmp/extractor.log'
 API_DEFAULT = 30
 SCRAPE_INTERVAL_DEFAULT = 3600
 START_FROM_DEFAULT = 5808
+OLDEST_TO_UPDATE_DEFAULT = datetime.now().astimezone() - timedelta(days=3)


 class ExtractorConfig(BaseModel):
@@ -32,3 +33,6 @@ class ExtractorConfig(BaseModel):
                                  default=SCRAPE_INTERVAL_DEFAULT)
     start_from: int = Field(description='build id to start populating empty db with',
                             default=START_FROM_DEFAULT)
+    oldest_to_update: datetime = \
+        Field(description='oldest unfinished object (build/task/step...) that we will try to update',
+              default=OLDEST_TO_UPDATE_DEFAULT)
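The new field only takes effect when the config loader supplies it; otherwise pydantic falls back to OLDEST_TO_UPDATE_DEFAULT. A reduced model with just this one field (not the full ExtractorConfig) shows the fallback behaviour:

# Reduced illustration of the default behaviour; not the full ExtractorConfig model.
from datetime import datetime, timedelta

from pydantic import BaseModel, Field  # pylint: disable=no-name-in-module

OLDEST_TO_UPDATE_DEFAULT = datetime.now().astimezone() - timedelta(days=3)


class OldestToUpdateSketch(BaseModel):
    oldest_to_update: datetime = Field(default=OLDEST_TO_UPDATE_DEFAULT)


print(OldestToUpdateSketch().oldest_to_update)  # omitted: defaults to ~3 days ago
print(OldestToUpdateSketch(oldest_to_update='2023-01-01T00:00:00+00:00').oldest_to_update)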
@@ -61,3 +61,11 @@ scrape_interval: 3600
 # required: false
 # default: 5808 (first build with correct metrics)
 start_from:
+
+
+# oldest_to_update_days
+# oldest (in days) unfinished object (build/task/step...) that we will try to update
+# required: false
+# default: 3
+oldest_to_update_days: 3
+
@@ -53,7 +53,7 @@ VALUES



--- test_steps
+-- test_steps_stats
 CREATE TABLE test_steps_stats(
     test_task_id INTEGER REFERENCES test_tasks(id) ON DELETE CASCADE,
     stat_name_id  INTEGER REFERENCES test_steps_enum(id) ON DELETE SET NULL,
@@ -10,4 +10,5 @@ First version
  - Added canceled Build task status

 0.3.0 (IN PROGRESS)
  - Added test tasks stats
+ - New config parameter: oldest_to_update_days