Adding multithreading support for pungi/phases/image_checksum.py
Multithreading was added to parallelize the computation of image checksums. Resulting memory structures are protected via synchronization primitives. Max number of threads is uncapped- experiments were done to determine whether a maximum number of threads would yield greater efficiency and there were no gains from this. Likewise, experiments were done to determine whether pools of threads computed in separate processes could likewise decrease compute-time. Evidence did not suggest that this was the case. This indicate that the checksum operation is bounded by I/O read/write times. Merges: https://pagure.io/pungi/pull-request/1520 Jira: RHELCMP-5967 Signed-off-by: James Kunstle jkunstle@redhat.com
This commit is contained in:
parent
5a8df7b69c
commit
3349585d78
1
.gitignore
vendored
1
.gitignore
vendored
@ -13,3 +13,4 @@ htmlcov/
|
|||||||
.coverage
|
.coverage
|
||||||
.idea/
|
.idea/
|
||||||
.tox
|
.tox
|
||||||
|
.venv
|
||||||
|
@ -3,6 +3,7 @@
|
|||||||
import os
|
import os
|
||||||
from kobo import shortcuts
|
from kobo import shortcuts
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
import threading
|
||||||
|
|
||||||
from .base import PhaseBase
|
from .base import PhaseBase
|
||||||
from ..util import get_format_substs, get_file_size
|
from ..util import get_format_substs, get_file_size
|
||||||
@ -68,6 +69,7 @@ class ImageChecksumPhase(PhaseBase):
|
|||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
topdir = self.compose.paths.compose.topdir()
|
topdir = self.compose.paths.compose.topdir()
|
||||||
|
|
||||||
make_checksums(
|
make_checksums(
|
||||||
topdir,
|
topdir,
|
||||||
self.compose.im,
|
self.compose.im,
|
||||||
@ -87,6 +89,8 @@ def _compute_checksums(
|
|||||||
checksum_types,
|
checksum_types,
|
||||||
base_checksum_name_gen,
|
base_checksum_name_gen,
|
||||||
one_file,
|
one_file,
|
||||||
|
results_lock,
|
||||||
|
cache_lock,
|
||||||
):
|
):
|
||||||
for image in images:
|
for image in images:
|
||||||
filename = os.path.basename(image.path)
|
filename = os.path.basename(image.path)
|
||||||
@ -96,14 +100,21 @@ def _compute_checksums(
|
|||||||
|
|
||||||
filesize = image.size or get_file_size(full_path)
|
filesize = image.size or get_file_size(full_path)
|
||||||
|
|
||||||
|
cache_lock.acquire()
|
||||||
if full_path not in cache:
|
if full_path not in cache:
|
||||||
|
cache_lock.release()
|
||||||
# Source ISO is listed under each binary architecture. There's no
|
# Source ISO is listed under each binary architecture. There's no
|
||||||
# point in checksumming it twice, so we can just remember the
|
# point in checksumming it twice, so we can just remember the
|
||||||
# digest from first run..
|
# digest from first run..
|
||||||
cache[full_path] = shortcuts.compute_file_checksums(
|
checksum_value = shortcuts.compute_file_checksums(full_path, checksum_types)
|
||||||
full_path, checksum_types
|
with cache_lock:
|
||||||
)
|
cache[full_path] = checksum_value
|
||||||
digests = cache[full_path]
|
else:
|
||||||
|
cache_lock.release()
|
||||||
|
|
||||||
|
with cache_lock:
|
||||||
|
digests = cache[full_path]
|
||||||
|
|
||||||
for checksum, digest in digests.items():
|
for checksum, digest in digests.items():
|
||||||
# Update metadata with the checksum
|
# Update metadata with the checksum
|
||||||
image.add_checksum(None, checksum, digest)
|
image.add_checksum(None, checksum, digest)
|
||||||
@ -112,7 +123,10 @@ def _compute_checksums(
|
|||||||
checksum_filename = os.path.join(
|
checksum_filename = os.path.join(
|
||||||
path, "%s.%sSUM" % (filename, checksum.upper())
|
path, "%s.%sSUM" % (filename, checksum.upper())
|
||||||
)
|
)
|
||||||
results[checksum_filename].add((filename, filesize, checksum, digest))
|
with results_lock:
|
||||||
|
results[checksum_filename].add(
|
||||||
|
(filename, filesize, checksum, digest)
|
||||||
|
)
|
||||||
|
|
||||||
if one_file:
|
if one_file:
|
||||||
dirname = os.path.basename(path)
|
dirname = os.path.basename(path)
|
||||||
@ -125,24 +139,42 @@ def _compute_checksums(
|
|||||||
checksum_filename = "%s%sSUM" % (base_checksum_name, checksum.upper())
|
checksum_filename = "%s%sSUM" % (base_checksum_name, checksum.upper())
|
||||||
checksum_path = os.path.join(path, checksum_filename)
|
checksum_path = os.path.join(path, checksum_filename)
|
||||||
|
|
||||||
results[checksum_path].add((filename, filesize, checksum, digest))
|
with results_lock:
|
||||||
|
results[checksum_path].add((filename, filesize, checksum, digest))
|
||||||
|
|
||||||
|
|
||||||
def make_checksums(topdir, im, checksum_types, one_file, base_checksum_name_gen):
|
def make_checksums(topdir, im, checksum_types, one_file, base_checksum_name_gen):
|
||||||
results = defaultdict(set)
|
results = defaultdict(set)
|
||||||
cache = {}
|
cache = {}
|
||||||
|
threads = []
|
||||||
|
results_lock = threading.Lock() # lock to synchronize access to the results dict.
|
||||||
|
cache_lock = threading.Lock() # lock to synchronize access to the cache dict.
|
||||||
|
|
||||||
|
# create all worker threads
|
||||||
for (variant, arch, path), images in get_images(topdir, im).items():
|
for (variant, arch, path), images in get_images(topdir, im).items():
|
||||||
_compute_checksums(
|
threads.append(
|
||||||
results,
|
threading.Thread(
|
||||||
cache,
|
target=_compute_checksums,
|
||||||
variant,
|
args=[
|
||||||
arch,
|
results,
|
||||||
path,
|
cache,
|
||||||
images,
|
variant,
|
||||||
checksum_types,
|
arch,
|
||||||
base_checksum_name_gen,
|
path,
|
||||||
one_file,
|
images,
|
||||||
|
checksum_types,
|
||||||
|
base_checksum_name_gen,
|
||||||
|
one_file,
|
||||||
|
results_lock,
|
||||||
|
cache_lock,
|
||||||
|
],
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
threads[-1].start()
|
||||||
|
|
||||||
|
# wait for all worker threads to finish
|
||||||
|
for thread in threads:
|
||||||
|
thread.join()
|
||||||
|
|
||||||
for file in results:
|
for file in results:
|
||||||
dump_checksums(file, results[file])
|
dump_checksums(file, results[file])
|
||||||
|
Loading…
Reference in New Issue
Block a user