From a209bda73c9a8b15d33d9bcb2cbe7da09b394f0f Mon Sep 17 00:00:00 2001 From: Jan Kaluza Date: Thu, 12 Mar 2020 07:34:54 +0100 Subject: [PATCH] Create MaterializedPackageSets in threads to make pkgset faster. When modules are used, there are lot of small package sets. These package sets have usually less than 500 packages. The createrepo part of `MaterializedPackageSet.create` executed for such small set of packages takes around 1 second. Most of this time the createrepo_c runs in single thread. It does the initialization, it writes the XML files, ... The parts of createrepo which can be run in parallel and therefore would use all the CPUs are quite small for very small package sets. This commit therefore executes multiple threads with `MaterializedPackageSet.create` for these very small package sets. This saves around 40 seconds from pkgset phase for RHEL compose. Signed-off-by: Jan Kaluza --- pungi/phases/pkgset/common.py | 36 +++++++++++++++++++++- pungi/phases/pkgset/sources/source_koji.py | 21 +++++++++---- pungi/util.py | 35 +++++++++++++++++++++ 3 files changed, 85 insertions(+), 7 deletions(-) diff --git a/pungi/phases/pkgset/common.py b/pungi/phases/pkgset/common.py index ea712995..2c59b252 100644 --- a/pungi/phases/pkgset/common.py +++ b/pungi/phases/pkgset/common.py @@ -22,7 +22,7 @@ from kobo.threads import run_in_threads from pungi.arch import get_valid_arches from pungi.wrappers.createrepo import CreaterepoWrapper -from pungi.util import is_arch_multilib +from pungi.util import is_arch_multilib, PartialFuncWorkerThread, PartialFuncThreadPool from pungi.module_util import Modulemd, collect_module_defaults from pungi.phases.createrepo import add_modular_metadata @@ -221,6 +221,40 @@ class MaterializedPackageSet(object): return klass(package_sets, paths) + @classmethod + def create_many(klass, create_partials): + """ + Creates multiple MaterializedPackageSet in threads. + + :param list of functools.partial create_partials: List of Partial objects + created using functools.partial(MaterializedPackageSet.create, compose, + pkgset_global, path_prefix, mmd=mmd). + :return: List of MaterializedPackageSet objects. + """ + # Create two pools - small pool for small package sets and big pool for + # big package sets. This ensure there will not be too many createrepo + # tasks which would need lot of CPU or memory at the same time. + big_pool = PartialFuncThreadPool() + big_pool.add(PartialFuncWorkerThread(big_pool)) + small_pool = PartialFuncThreadPool() + for i in range(10): + small_pool.add(PartialFuncWorkerThread(small_pool)) + + # Divide the package sets into big_pool/small_pool based on their size. + for partial in create_partials: + pkgset = partial.args[1] + if len(pkgset) < 500: + small_pool.queue_put(partial) + else: + big_pool.queue_put(partial) + + small_pool.start() + big_pool.start() + small_pool.stop() + big_pool.stop() + + return small_pool.results + big_pool.results + def get_all_arches(compose): all_arches = set(["src"]) diff --git a/pungi/phases/pkgset/sources/source_koji.py b/pungi/phases/pkgset/sources/source_koji.py index c00c2ccf..3d912b54 100644 --- a/pungi/phases/pkgset/sources/source_koji.py +++ b/pungi/phases/pkgset/sources/source_koji.py @@ -17,6 +17,7 @@ import os import json import re +import functools from fnmatch import fnmatch from itertools import groupby @@ -723,15 +724,23 @@ def populate_global_pkgset(compose, koji_wrapper, path_prefix, event): # Optimization for case where we have just single compose # tag - we do not have to merge in this case... variant.pkgsets.add(compose_tag) - pkgsets.append( - MaterializedPackageSet.create( - compose, pkgset, path_prefix, mmd=tag_to_mmd.get(pkgset.name) - ), - ) pkgset.write_reuse_file(compose, include_packages=modular_packages) + pkgsets.append(pkgset) - return pkgsets + # Create MaterializedPackageSets. + partials = [] + for pkgset in pkgsets: + partials.append( + functools.partial( + MaterializedPackageSet.create, + compose, + pkgset, + path_prefix, + mmd=tag_to_mmd.get(pkgset.name), + ) + ) + return MaterializedPackageSet.create_many(partials) def get_koji_event_info(compose, koji_wrapper): diff --git a/pungi/util.py b/pungi/util.py index 007ba8e1..21bb0bb2 100644 --- a/pungi/util.py +++ b/pungi/util.py @@ -32,6 +32,7 @@ from six.moves import urllib, range, shlex_quote import kobo.conf from kobo.shortcuts import run, force_list +from kobo.threads import WorkerThread, ThreadPool from productmd.common import get_major_version # Patterns that match all names of debuginfo packages @@ -1037,3 +1038,37 @@ def as_local_file(url): else: # Not a remote url, return unchanged. yield url + + +class PartialFuncWorkerThread(WorkerThread): + """ + Worker thread executing partial_func and storing results + in the PartialFuncThreadPool. + """ + + def process(self, partial_func, num): + self.pool._results.append(partial_func()) + + +class PartialFuncThreadPool(ThreadPool): + """ + Thread pool for PartialFuncWorkerThread threads. + + Example: + + # Execute `pow` in one thread and print result. + pool = PartialFuncThreadPool() + pool.add(PartialFuncWorkerThread(pool)) + pool.queue_put(functools.partial(pow, 323, 1235)) + pool.start() + pool.stop() + print(pool.results) + """ + + def __init__(self, logger=None): + ThreadPool.__init__(self, logger) + self._results = [] + + @property + def results(self): + return self._results