diff --git a/pungi/phases/pkgset/common.py b/pungi/phases/pkgset/common.py index ea712995..2c59b252 100644 --- a/pungi/phases/pkgset/common.py +++ b/pungi/phases/pkgset/common.py @@ -22,7 +22,7 @@ from kobo.threads import run_in_threads from pungi.arch import get_valid_arches from pungi.wrappers.createrepo import CreaterepoWrapper -from pungi.util import is_arch_multilib +from pungi.util import is_arch_multilib, PartialFuncWorkerThread, PartialFuncThreadPool from pungi.module_util import Modulemd, collect_module_defaults from pungi.phases.createrepo import add_modular_metadata @@ -221,6 +221,40 @@ class MaterializedPackageSet(object): return klass(package_sets, paths) + @classmethod + def create_many(klass, create_partials): + """ + Creates multiple MaterializedPackageSet in threads. + + :param list of functools.partial create_partials: List of Partial objects + created using functools.partial(MaterializedPackageSet.create, compose, + pkgset_global, path_prefix, mmd=mmd). + :return: List of MaterializedPackageSet objects. + """ + # Create two pools - small pool for small package sets and big pool for + # big package sets. This ensure there will not be too many createrepo + # tasks which would need lot of CPU or memory at the same time. + big_pool = PartialFuncThreadPool() + big_pool.add(PartialFuncWorkerThread(big_pool)) + small_pool = PartialFuncThreadPool() + for i in range(10): + small_pool.add(PartialFuncWorkerThread(small_pool)) + + # Divide the package sets into big_pool/small_pool based on their size. + for partial in create_partials: + pkgset = partial.args[1] + if len(pkgset) < 500: + small_pool.queue_put(partial) + else: + big_pool.queue_put(partial) + + small_pool.start() + big_pool.start() + small_pool.stop() + big_pool.stop() + + return small_pool.results + big_pool.results + def get_all_arches(compose): all_arches = set(["src"]) diff --git a/pungi/phases/pkgset/sources/source_koji.py b/pungi/phases/pkgset/sources/source_koji.py index c00c2ccf..3d912b54 100644 --- a/pungi/phases/pkgset/sources/source_koji.py +++ b/pungi/phases/pkgset/sources/source_koji.py @@ -17,6 +17,7 @@ import os import json import re +import functools from fnmatch import fnmatch from itertools import groupby @@ -723,15 +724,23 @@ def populate_global_pkgset(compose, koji_wrapper, path_prefix, event): # Optimization for case where we have just single compose # tag - we do not have to merge in this case... variant.pkgsets.add(compose_tag) - pkgsets.append( - MaterializedPackageSet.create( - compose, pkgset, path_prefix, mmd=tag_to_mmd.get(pkgset.name) - ), - ) pkgset.write_reuse_file(compose, include_packages=modular_packages) + pkgsets.append(pkgset) - return pkgsets + # Create MaterializedPackageSets. + partials = [] + for pkgset in pkgsets: + partials.append( + functools.partial( + MaterializedPackageSet.create, + compose, + pkgset, + path_prefix, + mmd=tag_to_mmd.get(pkgset.name), + ) + ) + return MaterializedPackageSet.create_many(partials) def get_koji_event_info(compose, koji_wrapper): diff --git a/pungi/util.py b/pungi/util.py index 007ba8e1..21bb0bb2 100644 --- a/pungi/util.py +++ b/pungi/util.py @@ -32,6 +32,7 @@ from six.moves import urllib, range, shlex_quote import kobo.conf from kobo.shortcuts import run, force_list +from kobo.threads import WorkerThread, ThreadPool from productmd.common import get_major_version # Patterns that match all names of debuginfo packages @@ -1037,3 +1038,37 @@ def as_local_file(url): else: # Not a remote url, return unchanged. yield url + + +class PartialFuncWorkerThread(WorkerThread): + """ + Worker thread executing partial_func and storing results + in the PartialFuncThreadPool. + """ + + def process(self, partial_func, num): + self.pool._results.append(partial_func()) + + +class PartialFuncThreadPool(ThreadPool): + """ + Thread pool for PartialFuncWorkerThread threads. + + Example: + + # Execute `pow` in one thread and print result. + pool = PartialFuncThreadPool() + pool.add(PartialFuncWorkerThread(pool)) + pool.queue_put(functools.partial(pow, 323, 1235)) + pool.start() + pool.stop() + print(pool.results) + """ + + def __init__(self, logger=None): + ThreadPool.__init__(self, logger) + self._results = [] + + @property + def results(self): + return self._results