gather: Collect and re-raise errors from gather method

When there is an exception in gathering (such as after seeing unsigned
packages in deps method), the exception was lost and the compose
continued to run until it tried to access the result and crashed on
KeyError.

Relates: https://pagure.io/releng/failed-composes/issue/587
JIRA: COMPOSE-3986
Signed-off-by: Lubomír Sedlář <lsedlar@redhat.com>
This commit is contained in:
Lubomír Sedlář 2019-11-25 14:28:34 +01:00
parent 6afbe6d20a
commit 7f35ac622a

View File

@ -443,6 +443,7 @@ def _gather_variants(result, compose, variant_type, package_sets, exclude_fulltr
continue continue
threads_list = [] threads_list = []
que = Queue() que = Queue()
errors = Queue()
for arch in variant.arches: for arch in variant.arches:
fulltree_excludes = set() fulltree_excludes = set()
if exclude_fulltree: if exclude_fulltree:
@ -454,11 +455,17 @@ def _gather_variants(result, compose, variant_type, package_sets, exclude_fulltr
# there. # there.
_update_lookaside_config(compose, variant, arch, result, package_sets) _update_lookaside_config(compose, variant, arch, result, package_sets)
def worker(que, errors, arch, *args, **kwargs):
try:
que.put((arch, gather_packages(*args, **kwargs)))
except Exception as exc:
errors.put(exc)
# Run gather_packages() in parallel with multi threads and store # Run gather_packages() in parallel with multi threads and store
# its return value in a Queue() for later use. # its return value in a Queue() for later use.
t = threading.Thread( t = threading.Thread(
target=lambda q, arch, *args, **kwargs: q.put((arch, gather_packages(*args, **kwargs))), target=worker,
args=(que, arch, compose, arch, variant, package_sets), args=(que, errors, arch, compose, arch, variant, package_sets),
kwargs={'fulltree_excludes': fulltree_excludes}, kwargs={'fulltree_excludes': fulltree_excludes},
) )
threads_list.append(t) threads_list.append(t)
@ -467,6 +474,10 @@ def _gather_variants(result, compose, variant_type, package_sets, exclude_fulltr
for t in threads_list: for t in threads_list:
t.join() t.join()
while not errors.empty():
exc = errors.get()
raise exc
while not que.empty(): while not que.empty():
arch, pkg_map = que.get() arch, pkg_map = que.get()
result.setdefault(arch, {})[variant.uid] = pkg_map result.setdefault(arch, {})[variant.uid] = pkg_map