Cancel koji tasks when pungi terminated

JIRA: RHELCMP-4148
Signed-off-by: Haibo Lin <hlin@redhat.com>
This commit is contained in:
Haibo Lin 2021-03-22 15:52:47 +08:00
parent edb4517e80
commit 035b37c566
22 changed files with 92 additions and 39 deletions

View File

@ -103,6 +103,16 @@ class LogPaths(object):
makedirs(path)
return path
def koji_tasks_dir(self, create_dir=True):
"""
Examples:
logs/global/koji-tasks
"""
path = os.path.join(self.topdir(create_dir=create_dir), "koji-tasks")
if create_dir:
makedirs(path)
return path
def log_file(self, arch, log_name, create_dir=True):
arch = arch or "global"
if log_name.endswith(".log"):
@ -502,6 +512,9 @@ class WorkPaths(object):
"""
Returns the path to file in which the cached version of
PackageSetBase.file_cache should be stored.
Example:
work/global/pkgset_f33-compose_file_cache.pickle
"""
filename = "pkgset_%s_file_cache.pickle" % pkgset_name
return os.path.join(self.topdir(arch="global"), filename)

View File

@ -722,7 +722,7 @@ class BuildinstallThread(WorkerThread):
# Ask Koji for all the RPMs in the `runroot_tag` and check that
# those installed in the old buildinstall buildroot are still in the
# very same versions/releases.
koji_wrapper = kojiwrapper.KojiWrapper(compose.conf["koji_profile"])
koji_wrapper = kojiwrapper.KojiWrapper(compose)
rpms = koji_wrapper.koji_proxy.listTaggedRPMS(
compose.conf.get("runroot_tag"), inherit=True, latest=True
)[0]

View File

@ -346,7 +346,7 @@ def run_createiso_command(
build_arch = arch
if runroot.runroot_method == "koji" and not bootable:
runroot_tag = compose.conf["runroot_tag"]
koji_wrapper = kojiwrapper.KojiWrapper(compose.conf["koji_profile"])
koji_wrapper = kojiwrapper.KojiWrapper(compose)
koji_proxy = koji_wrapper.koji_proxy
tag_info = koji_proxy.getTag(runroot_tag)
if not tag_info:

View File

@ -607,7 +607,7 @@ def _make_lookaside_repo(compose, variant, arch, pkg_map, package_sets=None):
)
+ "/",
"koji": lambda: pungi.wrappers.kojiwrapper.KojiWrapper(
compose.conf["koji_profile"]
compose
).koji_module.config.topdir.rstrip("/")
+ "/",
}

View File

@ -223,7 +223,7 @@ class CreateImageBuildThread(WorkerThread):
)
self.pool.log_info("[BEGIN] %s" % msg)
koji_wrapper = KojiWrapper(compose.conf["koji_profile"])
koji_wrapper = KojiWrapper(compose)
# writes conf file for koji image-build
self.pool.log_info(

View File

@ -59,12 +59,14 @@ class ImageContainerThread(WorkerThread):
]
# Start task
koji = kojiwrapper.KojiWrapper(compose.conf["koji_profile"])
koji = kojiwrapper.KojiWrapper(compose)
koji.login()
task_id = koji.koji_proxy.buildContainer(
source, target, config, priority=priority
)
koji.save_task_id(task_id)
# Wait for it to finish and capture the output into log file (even
# though there is not much there).
log_dir = os.path.join(compose.paths.log.topdir(), "image_container")

View File

@ -186,7 +186,7 @@ class CreateLiveImageThread(WorkerThread):
)
self.pool.log_info("[BEGIN] %s" % msg)
koji_wrapper = KojiWrapper(compose.conf["koji_profile"])
koji_wrapper = KojiWrapper(compose)
_, version = compose.compose_id.rsplit("-", 1)
name = cmd["name"] or imgname
version = cmd["version"] or version

View File

@ -140,7 +140,7 @@ class LiveMediaThread(WorkerThread):
)
self.pool.log_info("[BEGIN] %s" % msg)
koji_wrapper = KojiWrapper(compose.conf["koji_profile"])
koji_wrapper = KojiWrapper(compose)
cmd = self._get_cmd(koji_wrapper, config)
log_file = self._get_log_file(compose, variant, subvariant, config)

View File

@ -77,7 +77,7 @@ class OSBSThread(WorkerThread):
def worker(self, compose, variant, config):
msg = "OSBS task for variant %s" % variant.uid
self.pool.log_info("[BEGIN] %s" % msg)
koji = kojiwrapper.KojiWrapper(compose.conf["koji_profile"])
koji = kojiwrapper.KojiWrapper(compose)
koji.login()
# Start task
@ -98,6 +98,8 @@ class OSBSThread(WorkerThread):
source, target, config, priority=priority
)
koji.save_task_id(task_id)
# Wait for it to finish and capture the output into log file (even
# though there is not much there).
log_dir = os.path.join(compose.paths.log.topdir(), "osbs")
@ -173,7 +175,7 @@ def add_metadata(variant, task_id, compose, is_scratch):
# Create new Koji session. The task could take so long to finish that
# our session will expire. This second session does not need to be
# authenticated since it will only do reading operations.
koji = kojiwrapper.KojiWrapper(compose.conf["koji_profile"])
koji = kojiwrapper.KojiWrapper(compose)
# Create metadata
metadata = {

View File

@ -110,7 +110,7 @@ class RunOSBuildThread(WorkerThread):
def worker(self, compose, variant, config, arches, version, release, target, repo):
msg = "OSBuild task for variant %s" % variant.uid
self.pool.log_info("[BEGIN] %s" % msg)
koji = kojiwrapper.KojiWrapper(compose.conf["koji_profile"])
koji = kojiwrapper.KojiWrapper(compose)
koji.login()
# Start task
@ -127,6 +127,8 @@ class RunOSBuildThread(WorkerThread):
opts=opts,
)
koji.save_task_id(task_id)
# Wait for it to finish and capture the output into log file.
log_dir = os.path.join(compose.paths.log.topdir(), "osbuild")
util.makedirs(log_dir)
@ -141,7 +143,7 @@ class RunOSBuildThread(WorkerThread):
# Refresh koji session which may have timed out while the task was
# running. Watching is done via a subprocess, so the session is
# inactive.
koji = kojiwrapper.KojiWrapper(compose.conf["koji_profile"])
koji = kojiwrapper.KojiWrapper(compose)
# Get build id via the task's result json data
result = koji.koji_proxy.getTaskResult(task_id)

View File

@ -31,7 +31,6 @@ import kobo.rpmlib
from kobo.threads import WorkerThread, ThreadPool
import pungi.wrappers.kojiwrapper
from pungi.util import pkg_is_srpm, copy_all
from pungi.arch import get_valid_arches, is_excluded
from pungi.errors import UnsignedPackagesError
@ -391,7 +390,6 @@ class KojiPackageSet(PackageSetBase):
def __getstate__(self):
result = self.__dict__.copy()
result["koji_profile"] = self.koji_wrapper.profile
del result["koji_wrapper"]
del result["_logger"]
if "cache_region" in result:
@ -399,8 +397,6 @@ class KojiPackageSet(PackageSetBase):
return result
def __setstate__(self, data):
koji_profile = data.pop("koji_profile")
self.koji_wrapper = pungi.wrappers.kojiwrapper.KojiWrapper(koji_profile)
self._logger = None
self.__dict__.update(data)

View File

@ -186,8 +186,7 @@ def get_koji_modules(compose, koji_wrapper, event, module_info_str):
class PkgsetSourceKoji(pungi.phases.pkgset.source.PkgsetSourceBase):
def __call__(self):
compose = self.compose
koji_profile = compose.conf["koji_profile"]
self.koji_wrapper = pungi.wrappers.kojiwrapper.KojiWrapper(koji_profile)
self.koji_wrapper = pungi.wrappers.kojiwrapper.KojiWrapper(compose)
# path prefix must contain trailing '/'
path_prefix = self.koji_wrapper.koji_module.config.topdir.rstrip("/") + "/"
package_sets = get_pkgset_from_koji(

View File

@ -110,7 +110,7 @@ class Runroot(kobo.log.LoggingBase):
runroot_tag = self.compose.conf["runroot_tag"]
log_dir = kwargs.pop("log_dir", None)
koji_wrapper = kojiwrapper.KojiWrapper(self.compose.conf["koji_profile"])
koji_wrapper = kojiwrapper.KojiWrapper(self.compose)
koji_cmd = koji_wrapper.get_runroot_cmd(
runroot_tag,
arch,
@ -303,7 +303,7 @@ class Runroot(kobo.log.LoggingBase):
runroot_channel = self.compose.conf.get("runroot_channel")
runroot_tag = self.compose.conf["runroot_tag"]
koji_wrapper = kojiwrapper.KojiWrapper(self.compose.conf["koji_profile"])
koji_wrapper = kojiwrapper.KojiWrapper(self.compose)
koji_cmd = koji_wrapper.get_pungi_buildinstall_cmd(
runroot_tag,
arch,
@ -337,7 +337,7 @@ class Runroot(kobo.log.LoggingBase):
runroot_channel = self.compose.conf.get("runroot_channel")
runroot_tag = self.compose.conf["runroot_tag"]
koji_wrapper = kojiwrapper.KojiWrapper(self.compose.conf["koji_profile"])
koji_wrapper = kojiwrapper.KojiWrapper(self.compose)
koji_cmd = koji_wrapper.get_pungi_ostree_cmd(
runroot_tag, arch, args, channel=runroot_channel, **kwargs
)

View File

@ -22,6 +22,7 @@ from six.moves import shlex_quote
from pungi.phases import PHASES_NAMES
from pungi import get_full_version, util
from pungi.errors import UnsignedPackagesError
from pungi.wrappers import kojiwrapper
# force C locales
@ -615,9 +616,25 @@ def try_kill_children(signal):
COMPOSE.log_warning("Failed to kill all subprocesses")
def try_kill_koji_tasks():
try:
if COMPOSE:
koji_tasks_dir = COMPOSE.paths.log.koji_tasks_dir(create_dir=False)
if os.path.exists(koji_tasks_dir):
COMPOSE.log_warning("Trying to kill koji tasks")
koji = kojiwrapper.KojiWrapper(COMPOSE)
koji.login()
for task_id in os.listdir(koji_tasks_dir):
koji.koji_proxy.cancelTask(int(task_id))
except Exception:
if COMPOSE:
COMPOSE.log_warning("Failed to kill koji tasks")
def sigterm_handler(signum, frame):
if COMPOSE:
try_kill_children(signum)
try_kill_koji_tasks()
COMPOSE.log_error("Compose run failed: signal %s" % signum)
COMPOSE.log_error("Traceback:\n%s" % "\n".join(traceback.format_stack(frame)))
COMPOSE.log_critical("Compose failed: %s" % COMPOSE.topdir)

View File

@ -36,10 +36,14 @@ KOJI_BUILD_DELETED = koji.BUILD_STATES["DELETED"]
class KojiWrapper(object):
lock = threading.Lock()
def __init__(self, profile):
self.profile = profile
def __init__(self, compose):
self.compose = compose
try:
self.profile = self.compose.conf["koji_profile"]
except KeyError:
raise RuntimeError("Koji profile must be configured")
with self.lock:
self.koji_module = koji.get_profile_module(profile)
self.koji_module = koji.get_profile_module(self.profile)
session_opts = {}
for key in (
"timeout",
@ -300,6 +304,8 @@ class KojiWrapper(object):
task_id = int(match.groups()[0])
self.save_task_id(task_id)
retcode, output = self._wait_for_task(task_id, logfile=log_file)
return {
@ -542,6 +548,8 @@ class KojiWrapper(object):
)
task_id = int(match.groups()[0])
self.save_task_id(task_id)
if retcode != 0 and (
self._has_connection_error(output) or self._has_offline_error(output)
):
@ -827,13 +835,22 @@ class KojiWrapper(object):
"""
return self.multicall_map(*args, **kwargs)
def save_task_id(self, task_id):
"""Save task id by creating a file using task_id as file name
:param int task_id: ID of koji task
"""
log_dir = self.compose.paths.log.koji_tasks_dir()
with open(os.path.join(log_dir, str(task_id)), "w"):
pass
def get_buildroot_rpms(compose, task_id):
"""Get build root RPMs - either from runroot or local"""
result = []
if task_id:
# runroot
koji = KojiWrapper(compose.conf["koji_profile"])
koji = KojiWrapper(compose)
buildroot_infos = koji.koji_proxy.listBuildroots(taskID=task_id)
if not buildroot_infos:
children_tasks = koji.koji_proxy.getTaskChildren(task_id)

View File

@ -265,11 +265,7 @@ class RpmScmWrapper(ScmBase):
class KojiScmWrapper(ScmBase):
def __init__(self, *args, **kwargs):
super(KojiScmWrapper, self).__init__(*args, **kwargs)
try:
profile = kwargs["compose"].conf["koji_profile"]
except KeyError:
raise RuntimeError("Koji profile must be configured")
wrapper = KojiWrapper(profile)
wrapper = KojiWrapper(kwargs["compose"])
self.koji = wrapper.koji_module
self.proxy = wrapper.koji_proxy

View File

@ -171,6 +171,7 @@ class ImageContainerThreadTest(helpers.PungiTestCase):
opts,
priority=None,
),
mock.call.save_task_id(12345),
mock.call.watch_task(
12345,
os.path.join(

View File

@ -10,6 +10,7 @@ except ImportError:
import tempfile
import os
import shutil
import six
@ -33,7 +34,9 @@ def mock_imagebuild_path(id):
class KojiWrapperBaseTestCase(unittest.TestCase):
def setUp(self):
_, self.tmpfile = tempfile.mkstemp()
self.koji_profile = mock.Mock()
compose = mock.Mock(conf={"koji_profile": "custom-koji"})
self.tmpdir = tempfile.mkdtemp()
compose.paths.log.koji_tasks_dir.return_value = self.tmpdir
with mock.patch("pungi.wrappers.kojiwrapper.koji") as koji:
koji.gssapi_login = mock.Mock()
koji.get_profile_module = mock.Mock(
@ -51,10 +54,11 @@ class KojiWrapperBaseTestCase(unittest.TestCase):
)
)
self.koji_profile = koji.get_profile_module.return_value
self.koji = KojiWrapper("custom-koji")
self.koji = KojiWrapper(compose)
def tearDown(self):
os.remove(self.tmpfile)
shutil.rmtree(self.tmpdir)
class KojiWrapperTest(KojiWrapperBaseTestCase):
@ -1163,7 +1167,7 @@ class TestGetBuildrootRPMs(unittest.TestCase):
rpms = get_buildroot_rpms(compose, 1234)
self.assertEqual(KojiWrapper.call_args_list, [mock.call("koji")])
self.assertEqual(KojiWrapper.call_args_list, [mock.call(compose)])
self.assertEqual(
KojiWrapper.return_value.mock_calls,
[

View File

@ -220,6 +220,7 @@ class OSBSThreadTest(helpers.PungiTestCase):
options,
priority=None,
),
mock.call.save_task_id(12345),
mock.call.watch_task(
12345, self.topdir + "/logs/global/osbs/Server-1-watch-task.log"
),

View File

@ -196,6 +196,7 @@ class RunOSBuildThreadTest(helpers.PungiTestCase):
"repo": [self.topdir + "/compose/Everything/$arch/os"],
},
),
mock.call.save_task_id(1234),
mock.call.watch_task(1234, mock.ANY),
mock.call.koji_proxy.getTaskResult(1234),
mock.call.koji_proxy.getBuild(build_id),
@ -312,6 +313,7 @@ class RunOSBuildThreadTest(helpers.PungiTestCase):
["aarch64", "x86_64"],
opts={"repo": [self.topdir + "/compose/Everything/$arch/os"]},
),
mock.call.save_task_id(1234),
mock.call.watch_task(1234, mock.ANY),
mock.call.koji_proxy.getTaskResult(1234),
mock.call.koji_proxy.getBuild(build_id),

View File

@ -446,7 +446,7 @@ class TestSourceKoji(helpers.PungiTestCase):
self.assertEqual(pkgsets, gpfk.return_value)
self.assertEqual(path_prefix, "/prefix/")
self.assertEqual(KojiWrapper.mock_calls, [mock.call("koji")])
self.assertEqual(KojiWrapper.mock_calls, [mock.call(compose)])
class TestCorrectNVR(helpers.PungiTestCase):

View File

@ -588,9 +588,8 @@ class CvsSCMTestCase(SCMBaseTest):
@mock.patch("pungi.wrappers.scm.urlretrieve")
@mock.patch("pungi.wrappers.scm.KojiWrapper")
class KojiSCMTestCase(SCMBaseTest):
def test_without_koji_profile(self, KW, dl):
def test_without_koji_profile(self, dl):
compose = mock.Mock(conf={})
with self.assertRaises(RuntimeError) as ctx:
@ -600,9 +599,9 @@ class KojiSCMTestCase(SCMBaseTest):
compose=compose,
)
self.assertIn("Koji profile must be configured", str(ctx.exception))
self.assertEqual(KW.mock_calls, [])
self.assertEqual(dl.mock_calls, [])
@mock.patch("pungi.wrappers.scm.KojiWrapper")
def test_doesnt_get_dirs(self, KW, dl):
compose = mock.Mock(conf={"koji_profile": "koji"})
@ -613,7 +612,7 @@ class KojiSCMTestCase(SCMBaseTest):
compose=compose,
)
self.assertIn("Only files can be exported", str(ctx.exception))
self.assertEqual(KW.mock_calls, [mock.call("koji")])
self.assertEqual(KW.mock_calls, [mock.call(compose)])
self.assertEqual(dl.mock_calls, [])
def _setup_koji_wrapper(self, KW, build_id, files):
@ -627,6 +626,7 @@ class KojiSCMTestCase(SCMBaseTest):
]
KW.return_value.koji_proxy.listTagged.return_value = [buildinfo]
@mock.patch("pungi.wrappers.scm.KojiWrapper")
def test_get_from_build(self, KW, dl):
compose = mock.Mock(conf={"koji_profile": "koji"})
@ -646,7 +646,7 @@ class KojiSCMTestCase(SCMBaseTest):
self.assertEqual(
KW.mock_calls,
[
mock.call("koji"),
mock.call(compose),
mock.call().koji_proxy.getBuild("my-build-1.0-2"),
mock.call().koji_proxy.listArchives(123),
mock.call().koji_module.pathinfo.typedir({"build_id": 123}, "image"),
@ -657,6 +657,7 @@ class KojiSCMTestCase(SCMBaseTest):
[mock.call("http://koji.local/koji/images/abc.tar", mock.ANY)],
)
@mock.patch("pungi.wrappers.scm.KojiWrapper")
def test_get_from_latest_build(self, KW, dl):
compose = mock.Mock(conf={"koji_profile": "koji"})
@ -676,7 +677,7 @@ class KojiSCMTestCase(SCMBaseTest):
self.assertEqual(
KW.mock_calls,
[
mock.call("koji"),
mock.call(compose),
mock.call().koji_proxy.listTagged(
"images", package="my-build", inherit=True, latest=True
),