Create MaterializedPackageSets in threads to make pkgset faster.

When modules are used, there are a lot of small package sets, usually
with fewer than 500 packages each. The createrepo part of
`MaterializedPackageSet.create` executed for such a small set of
packages takes around 1 second, and most of that time createrepo_c
runs in a single thread: it does the initialization, writes the XML
files, and so on.

For very small package sets, the parts of createrepo that can run in
parallel, and therefore use all the CPUs, are quite small.

This commit therefore runs `MaterializedPackageSet.create` for these
very small package sets in multiple threads.

This saves around 40 seconds in the pkgset phase for a RHEL compose.

Signed-off-by: Jan Kaluza <jkaluza@redhat.com>
Jan Kaluza 2020-03-12 07:34:54 +01:00
parent 9391ce3065
commit a209bda73c
3 changed files with 85 additions and 7 deletions
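
For context before the diffs below: the change defers each `MaterializedPackageSet.create`
call as a `functools.partial` and dispatches it to one of two thread pools based on
package-set size. A minimal standalone sketch of that pattern, using plain
`concurrent.futures` instead of the kobo-based pool this commit adds; all names and
sizes here are purely illustrative:

    import functools
    from concurrent.futures import ThreadPoolExecutor

    def create(compose, pkgset, path_prefix, mmd=None):
        # Placeholder standing in for MaterializedPackageSet.create().
        return "repodata for %s" % pkgset["name"]

    # Hypothetical package sets; only their size matters for pool selection.
    pkgsets = [
        {"name": "small-module", "size": 120},
        {"name": "everything", "size": 50000},
    ]

    # Defer the calls; nothing runs until a worker invokes the partial.
    partials = [
        functools.partial(create, "compose", pkgset, "/path/prefix", mmd=None)
        for pkgset in pkgsets
    ]

    # Many workers for the cheap, mostly single-threaded small-set runs; a single
    # worker for big sets so only one CPU/memory-heavy createrepo runs at a time.
    small_pool = ThreadPoolExecutor(max_workers=10)
    big_pool = ThreadPoolExecutor(max_workers=1)

    futures = [
        (small_pool if p.args[1]["size"] < 500 else big_pool).submit(p)
        for p in partials
    ]
    print([f.result() for f in futures])

    small_pool.shutdown()
    big_pool.shutdown()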


@@ -22,7 +22,7 @@ from kobo.threads import run_in_threads
 from pungi.arch import get_valid_arches
 from pungi.wrappers.createrepo import CreaterepoWrapper
-from pungi.util import is_arch_multilib
+from pungi.util import is_arch_multilib, PartialFuncWorkerThread, PartialFuncThreadPool
 from pungi.module_util import Modulemd, collect_module_defaults
 from pungi.phases.createrepo import add_modular_metadata
@@ -221,6 +221,40 @@ class MaterializedPackageSet(object):
         return klass(package_sets, paths)
 
+    @classmethod
+    def create_many(klass, create_partials):
+        """
+        Create multiple MaterializedPackageSets in threads.
+
+        :param list create_partials: List of functools.partial objects created
+            using functools.partial(MaterializedPackageSet.create, compose,
+            pkgset_global, path_prefix, mmd=mmd).
+        :return: List of MaterializedPackageSet objects.
+        """
+        # Create two pools - a small pool for small package sets and a big
+        # pool for big package sets. This ensures there will not be too many
+        # createrepo tasks needing a lot of CPU or memory at the same time.
+        big_pool = PartialFuncThreadPool()
+        big_pool.add(PartialFuncWorkerThread(big_pool))
+        small_pool = PartialFuncThreadPool()
+        for i in range(10):
+            small_pool.add(PartialFuncWorkerThread(small_pool))
+
+        # Divide the package sets between big_pool and small_pool based on
+        # their size.
+        for partial in create_partials:
+            pkgset = partial.args[1]
+            if len(pkgset) < 500:
+                small_pool.queue_put(partial)
+            else:
+                big_pool.queue_put(partial)
+
+        small_pool.start()
+        big_pool.start()
+
+        small_pool.stop()
+        big_pool.stop()
+
+        return small_pool.results + big_pool.results
+
 
 def get_all_arches(compose):
     all_arches = set(["src"])
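
A note on the `partial.args[1]` lookup above: `create_many` assumes each partial binds
the positional arguments `(compose, pkgset, path_prefix)`, so the package set is always
the second bound positional argument and its `len()` drives the under-500 split. A small
illustration; `FakePkgset` is a made-up stand-in for this example only:

    import functools

    class FakePkgset(object):
        """Stand-in whose len() mimics the number of packages in the set."""
        def __init__(self, n):
            self.n = n

        def __len__(self):
            return self.n

    def create(compose, pkgset, path_prefix, mmd=None):
        return pkgset

    p = functools.partial(create, "compose", FakePkgset(120), "/path/prefix", mmd=None)

    # partial.args holds only the positional arguments bound at partial() time,
    # so args[1] is the package set checked against the 500-package threshold.
    print(len(p.args[1]))        # 120
    print(len(p.args[1]) < 500)  # True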


@@ -17,6 +17,7 @@
 import os
 import json
 import re
+import functools
 from fnmatch import fnmatch
 from itertools import groupby
@@ -723,15 +724,23 @@ def populate_global_pkgset(compose, koji_wrapper, path_prefix, event):
                 # Optimization for case where we have just single compose
                 # tag - we do not have to merge in this case...
                 variant.pkgsets.add(compose_tag)
-        pkgsets.append(
-            MaterializedPackageSet.create(
-                compose, pkgset, path_prefix, mmd=tag_to_mmd.get(pkgset.name)
-            ),
-        )
 
         pkgset.write_reuse_file(compose, include_packages=modular_packages)
-    return pkgsets
+        pkgsets.append(pkgset)
+
+    # Create MaterializedPackageSets.
+    partials = []
+    for pkgset in pkgsets:
+        partials.append(
+            functools.partial(
+                MaterializedPackageSet.create,
+                compose,
+                pkgset,
+                path_prefix,
+                mmd=tag_to_mmd.get(pkgset.name),
+            )
+        )
+
+    return MaterializedPackageSet.create_many(partials)
def get_koji_event_info(compose, koji_wrapper):
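
The partials built in `populate_global_pkgset` are just deferred versions of the call
the old code made inline; invoking one later from a worker thread reproduces the
original call exactly. A tiny demonstration with placeholder arguments:

    import functools

    def create(compose, pkgset, path_prefix, mmd=None):
        # Stand-in for MaterializedPackageSet.create in this illustration.
        return (compose, pkgset, path_prefix, mmd)

    # Old code: eager call. New code: store the partial, call it later.
    eager = create("compose", "pkgset", "/path/prefix", mmd="mmd")
    deferred = functools.partial(create, "compose", "pkgset", "/path/prefix", mmd="mmd")
    assert deferred() == eager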


@@ -32,6 +32,7 @@ from six.moves import urllib, range, shlex_quote
 import kobo.conf
 from kobo.shortcuts import run, force_list
+from kobo.threads import WorkerThread, ThreadPool
 from productmd.common import get_major_version
# Patterns that match all names of debuginfo packages
@@ -1037,3 +1038,37 @@ def as_local_file(url):
     else:
         # Not a remote url, return unchanged.
         yield url
+
+
+class PartialFuncWorkerThread(WorkerThread):
+    """
+    Worker thread executing partial_func and storing results
+    in the PartialFuncThreadPool.
+    """
+
+    def process(self, partial_func, num):
+        self.pool._results.append(partial_func())
+
+
+class PartialFuncThreadPool(ThreadPool):
+    """
+    Thread pool for PartialFuncWorkerThread threads.
+
+    Example:
+
+        # Execute `pow` in one thread and print result.
+        pool = PartialFuncThreadPool()
+        pool.add(PartialFuncWorkerThread(pool))
+        pool.queue_put(functools.partial(pow, 323, 1235))
+        pool.start()
+        pool.stop()
+        print(pool.results)
+    """
+
+    def __init__(self, logger=None):
+        ThreadPool.__init__(self, logger)
+        self._results = []
+
+    @property
+    def results(self):
+        return self._results
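
Assuming a Pungi checkout containing this commit is on the Python path, the new helpers
can be exercised much like `create_many` does, with one pool per size class; the
`make_repo` task and the sizes below are invented for the example:

    import functools
    from pungi.util import PartialFuncWorkerThread, PartialFuncThreadPool

    def make_repo(name, size):
        # Placeholder for a createrepo-style task.
        return "%s (%d packages)" % (name, size)

    tasks = [
        functools.partial(make_repo, "appstream-module", 120),
        functools.partial(make_repo, "everything", 60000),
    ]

    # One worker for heavy tasks, several for light ones, mirroring create_many().
    big_pool = PartialFuncThreadPool()
    big_pool.add(PartialFuncWorkerThread(big_pool))
    small_pool = PartialFuncThreadPool()
    for _ in range(10):
        small_pool.add(PartialFuncWorkerThread(small_pool))

    for task in tasks:
        pool = small_pool if task.args[1] < 500 else big_pool
        pool.queue_put(task)

    small_pool.start()
    big_pool.start()
    # stop() waits for the queued work to finish, which is what create_many
    # relies on before reading .results.
    small_pool.stop()
    big_pool.stop()

    print(small_pool.results + big_pool.results)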