createrepo: Allow customizing number of threads

The default is now to use one thread per CPU.

Signed-off-by: Lubomír Sedlář <lsedlar@redhat.com>
This commit is contained in:
Lubomír Sedlář 2017-10-03 15:09:53 +02:00
parent 44c523339c
commit dec00fe2f4
4 changed files with 25 additions and 5 deletions

View File

@ -399,6 +399,10 @@ Options
(*bool*) -- whether to pass ``--xz`` to the createrepo command. This will (*bool*) -- whether to pass ``--xz`` to the createrepo command. This will
cause the SQLite databases to be compressed with xz. cause the SQLite databases to be compressed with xz.
**createrepo_num_threads**
(*int*) -- how many concurrent ``createrepo`` process to run. The default
is to use one thread per CPU available on the machine.
**product_id** = None **product_id** = None
(:ref:`scm_dict <scm_support>`) -- If specified, it should point to a (:ref:`scm_dict <scm_support>`) -- If specified, it should point to a
directory with certificates ``<variant_uid>-<arch>-*.pem``. This directory with certificates ``<variant_uid>-<arch>-*.pem``. This

View File

@ -45,6 +45,7 @@ import jsonschema
import six import six
from kobo.shortcuts import force_list from kobo.shortcuts import force_list
from productmd.composeinfo import COMPOSE_TYPES from productmd.composeinfo import COMPOSE_TYPES
import multiprocessing
from . import util from . import util
@ -605,6 +606,10 @@ def make_schema():
"type": "boolean", "type": "boolean",
"default": False, "default": False,
}, },
"createrepo_num_threads": {
"type": "number",
"default": get_num_cpus(),
},
"repoclosure_strictness": _variant_arch_mapping({ "repoclosure_strictness": _variant_arch_mapping({
"type": "string", "type": "string",
"default": "lenient", "default": "lenient",
@ -1117,6 +1122,13 @@ def _one_or_list(value):
} }
def get_num_cpus():
try:
return multiprocessing.cpu_count()
except NotImplementedError:
return 3
# This is a mapping of configuration option dependencies and conflicts. # This is a mapping of configuration option dependencies and conflicts.
# #
# The key in this mapping is the trigger for the check. When the option is # The key in this mapping is the trigger for the check. When the option is

View File

@ -63,7 +63,7 @@ class CreaterepoPhase(PhaseBase):
def run(self): def run(self):
get_productids_from_scm(self.compose) get_productids_from_scm(self.compose)
for i in range(3): for i in range(self.compose.conf['createrepo_num_threads']):
self.pool.add(CreaterepoThread(self.pool)) self.pool.add(CreaterepoThread(self.pool))
for variant in self.compose.get_variants(): for variant in self.compose.get_variants():

View File

@ -59,8 +59,10 @@ class TestCreaterepoPhase(PungiTestCase):
self.assertIn('deltas', str(ctx.exception)) self.assertIn('deltas', str(ctx.exception))
@mock.patch('pungi.checks.get_num_cpus')
@mock.patch('pungi.phases.createrepo.ThreadPool') @mock.patch('pungi.phases.createrepo.ThreadPool')
def test_starts_jobs(self, ThreadPoolCls): def test_starts_jobs(self, ThreadPoolCls, get_num_cpus):
get_num_cpus.return_value = 5
compose = DummyCompose(self.topdir, {}) compose = DummyCompose(self.topdir, {})
pool = ThreadPoolCls.return_value pool = ThreadPoolCls.return_value
@ -69,7 +71,7 @@ class TestCreaterepoPhase(PungiTestCase):
phase.run() phase.run()
self.maxDiff = None self.maxDiff = None
self.assertEqual(len(pool.add.mock_calls), 3) self.assertEqual(len(pool.add.mock_calls), 5)
self.assertItemsEqual( self.assertItemsEqual(
pool.queue_put.mock_calls, pool.queue_put.mock_calls,
[mock.call((compose, 'x86_64', compose.variants['Server'], 'rpm')), [mock.call((compose, 'x86_64', compose.variants['Server'], 'rpm')),
@ -86,8 +88,10 @@ class TestCreaterepoPhase(PungiTestCase):
mock.call((compose, 'amd64', compose.variants['Client'], 'debuginfo')), mock.call((compose, 'amd64', compose.variants['Client'], 'debuginfo')),
mock.call((compose, None, compose.variants['Client'], 'srpm'))]) mock.call((compose, None, compose.variants['Client'], 'srpm'))])
@mock.patch('pungi.checks.get_num_cpus')
@mock.patch('pungi.phases.createrepo.ThreadPool') @mock.patch('pungi.phases.createrepo.ThreadPool')
def test_skips_empty_variants(self, ThreadPoolCls): def test_skips_empty_variants(self, ThreadPoolCls, get_num_cpus):
get_num_cpus.return_value = 5
compose = DummyCompose(self.topdir, {}) compose = DummyCompose(self.topdir, {})
compose.variants['Client'].is_empty = True compose.variants['Client'].is_empty = True
@ -97,7 +101,7 @@ class TestCreaterepoPhase(PungiTestCase):
phase.run() phase.run()
self.maxDiff = None self.maxDiff = None
self.assertEqual(len(pool.add.mock_calls), 3) self.assertEqual(len(pool.add.mock_calls), 5)
self.assertItemsEqual( self.assertItemsEqual(
pool.queue_put.mock_calls, pool.queue_put.mock_calls,
[mock.call((compose, 'x86_64', compose.variants['Server'], 'rpm')), [mock.call((compose, 'x86_64', compose.variants['Server'], 'rpm')),