ALBS-732: pungi: do not build anything if unsigned packages found #8
@ -22,6 +22,9 @@ It automatically finds a signed copies according to *sigkey_ordering*.
|
||||
import itertools
|
||||
import json
|
||||
import os
|
||||
|
||||
import pgpy
|
||||
import rpm
|
||||
from six.moves import cPickle as pickle
|
||||
|
||||
import kobo.log
|
||||
@ -493,8 +496,6 @@ class KojiPackageSet(PackageSetBase):
|
||||
|
||||
return response
|
||||
|
||||
|
||||
|
||||
def get_package_path(self, queue_item):
|
||||
rpm_info, build_info = queue_item
|
||||
|
||||
@ -834,7 +835,7 @@ class KojiMockPackageSet(PackageSetBase):
|
||||
and include in the package set. Useful when building testing compose
|
||||
with RPM scratch builds.
|
||||
"""
|
||||
super(KojiMockPackageSet , self).__init__(
|
||||
super(KojiMockPackageSet, self).__init__(
|
||||
name,
|
||||
sigkey_ordering=sigkey_ordering,
|
||||
arches=arches,
|
||||
@ -849,6 +850,8 @@ class KojiMockPackageSet(PackageSetBase):
|
||||
self.extra_builds = extra_builds or []
|
||||
self.extra_tasks = extra_tasks or []
|
||||
self.reuse = None
|
||||
self.sigkey_ordering = [sigkey.lower() for sigkey in sigkey_ordering] \
|
||||
or [None]
|
||||
|
||||
def __getstate__(self):
|
||||
result = self.__dict__.copy()
|
||||
@ -965,6 +968,20 @@ class KojiMockPackageSet(PackageSetBase):
|
||||
|
||||
return response
|
||||
|
||||
def _is_rpm_signed(self, rpm_path) -> bool:
|
||||
ts = rpm.TransactionSet()
|
||||
ts.setVSFlags(rpm._RPMVSF_NOSIGNATURES)
|
||||
with open(rpm_path, 'rb') as fd:
|
||||
header = ts.hdrFromFdno(fd)
|
||||
signature = header[rpm.RPMTAG_SIGGPG] or header[rpm.RPMTAG_SIGPGP]
|
||||
if not signature:
|
||||
return False
|
||||
pgp_msg = pgpy.PGPMessage.from_blob(signature)
|
||||
return any(
|
||||
signature.signer.lower() in self.sigkey_ordering
|
||||
for signature in pgp_msg.signatures
|
||||
)
|
||||
|
||||
def get_package_path(self, queue_item):
|
||||
rpm_info, build_info = queue_item
|
||||
|
||||
@ -982,6 +999,13 @@ class KojiMockPackageSet(PackageSetBase):
|
||||
|
||||
rpm_path = os.path.join(pathinfo.topdir, pathinfo.rpm(rpm_info))
|
||||
if os.path.isfile(rpm_path):
|
||||
if not self._is_rpm_signed(rpm_path):
|
||||
self._invalid_sigkey_rpms.append(rpm_info)
|
||||
self.log_error(
|
||||
'RPM "%s" not found for sigs: "%s". Path checked: "%s"',
|
||||
rpm_info, self.sigkey_ordering, rpm_path
|
||||
)
|
||||
return
|
||||
return rpm_path
|
||||
else:
|
||||
self.log_warning("RPM %s not found" % rpm_path)
|
||||
|
Loading…
Reference in New Issue
Block a user