pungi/pungi/phases/weaver.py
Haibo Lin 41a629969c Format code base with black
https://black.readthedocs.io/en/stable/

JIRA: COMPOSE-4086
Signed-off-by: Haibo Lin <hlin@redhat.com>
2020-02-05 17:35:47 +08:00

82 lines
2.4 KiB
Python

# -*- coding: utf-8 -*-
from kobo import shortcuts
from kobo.threads import ThreadPool, WorkerThread
class WeaverPhase(object):
    """
    Special "phase" whose job is to drive the execution of other phases.

    It is given a running schema describing which phases are executed one
    after another and which are executed side by side. A sequential chain
    of phases is called a "pipeline"; each pipeline is queued into the
    thread pool as a single work item.

    If one of the phases fails, the remaining ones have to be stopped
    correctly, otherwise the whole process would hang.

    :param compose: used for logging (its ``_logger`` is handed to the pool)
    :param phases_schema: two-dimensional array of phases. The outer
        dimension lists the pipelines, the inner dimension lists the phases
        of one pipeline.
    :raises ValueError: when ``phases_schema`` is empty
    """

    name = "weaver"

    def __init__(self, compose, phases_schema):
        self.msg = "---------- PHASE: %s ----------" % self.name.upper()
        self.compose = compose
        self.finished = False
        # The pool is created before the schema is validated so that the
        # validation error can be reported through the pool's logger.
        self.pool = ThreadPool(logger=self.compose._logger)

        if phases_schema:
            self._phases_schema = phases_schema
            return

        error = "No running schema was set for WeaverPhase"
        self.pool.log_error(error)
        raise ValueError(error)

    def start(self):
        """Log the beginning of the phase and run it.

        :raises RuntimeError: when the phase has already been stopped
        """
        if not self.finished:
            self.compose.log_info("[BEGIN] %s" % self.msg)
            self.run()
            return

        error = (
            "Phase '%s' has already finished and can not be started twice"
            % self.name
        )
        self.pool.log_error(error)
        raise RuntimeError(error)

    def run(self):
        """Register one worker and one queue item per pipeline, then start
        the pool."""
        pool = self.pool
        for phases in shortcuts.force_list(self._phases_schema):
            pool.add(PipelineThread(pool))
            pool.queue_put(shortcuts.force_list(phases))

        pool.start()

    def stop(self):
        """Stop the pool and mark the phase as finished.

        Calling this again after the phase has finished does nothing.
        """
        if not self.finished:
            if hasattr(self, "pool"):
                self.pool.stop()
            self.finished = True
            self.compose.log_info("[DONE ] %s" % self.msg)
class PipelineThread(WorkerThread):
    """
    Worker that runs the phases of a single pipeline one after another.
    """

    def process(self, item, num):
        """Start and stop every phase of the pipeline, in order.

        :param item: a phase or a list of phases forming one pipeline
        :param num: number of this pipeline, used in the log messages
        """
        phases = shortcuts.force_list(item)
        names = ", ".join([phase.name for phase in phases])
        total = self.pool.queue_total
        description = "Running pipeline (%d/%d). Phases: %s" % (num, total, names)

        self.pool.log_info("[BEGIN] %s" % description)

        # Each phase is started and stopped before the next one begins,
        # which is what makes the pipeline sequential.
        for current in phases:
            current.start()
            current.stop()

        self.pool.log_info("[DONE ] %s" % description)