From 747a9e442fce1886274038341936dfaa3939d352 Mon Sep 17 00:00:00 2001 From: Jakub Jelen Date: Tue, 4 Jul 2023 16:14:04 +0200 Subject: [PATCH 40/41] Move code handling GPG keys to separate library This decouples gpg keys handling and some code duplication from the MissingGpgKeysInhibitor actor to separate library that will be usable from more actors. The new actor TrustedGpgKeysScanner actor is crated, which handles reading the source RPM DB and trusted keys directory and produces a new model describing what keys are supposed to be trusted on the target system. This also removes the code duplication for detecting the --no-gpgcheck and for defining the directory where to look for the gpg keys. Petr Stodulka updates: * updated docstrings for public functions in the shared library We want them documented better in comparison to functions in private (actor's) libraries as they could be used by everyone. * some functions are renamed: * read_gpg_fp_from_file -> get_gpg_fp_from_file * the_nogpgcheck_option_used -> is_nogpgcheck_set The related code has been updated. * use the gpg library in the shared dnfplugin library * make some unit-tests conditional so we know the results are always valid (skip if distro ID is not rhel or centos) * update tests and improve the test coverage Signed-off-by: Jakub Jelen --- .../actors/missinggpgkeysinhibitor/actor.py | 4 +- .../libraries/missinggpgkey.py | 153 ++-------------- .../tests/component_test_missinggpgkey.py | 109 ++++-------- .../tests/unit_test_missinggpgkey.py | 168 +----------------- .../libraries/userspacegen.py | 21 +-- .../actors/trustedgpgkeysscanner/actor.py | 21 +++ .../libraries/trustedgpgkeys.py | 38 ++++ .../tests/test_trustedgpgkeys.py | 87 +++++++++ .../common/libraries/dnfplugin.py | 9 +- repos/system_upgrade/common/libraries/gpg.py | 137 ++++++++++++++ .../common/libraries/tests/test_gpg.py | 147 +++++++++++++++ .../common/models/trustedgpgkeys.py | 19 ++ 12 files changed, 506 insertions(+), 407 deletions(-) create mode 100644 repos/system_upgrade/common/actors/trustedgpgkeysscanner/actor.py create mode 100644 repos/system_upgrade/common/actors/trustedgpgkeysscanner/libraries/trustedgpgkeys.py create mode 100644 repos/system_upgrade/common/actors/trustedgpgkeysscanner/tests/test_trustedgpgkeys.py create mode 100644 repos/system_upgrade/common/libraries/gpg.py create mode 100644 repos/system_upgrade/common/libraries/tests/test_gpg.py create mode 100644 repos/system_upgrade/common/models/trustedgpgkeys.py diff --git a/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/actor.py b/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/actor.py index 6f836a5b..faa96452 100644 --- a/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/actor.py +++ b/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/actor.py @@ -2,9 +2,9 @@ from leapp.actors import Actor from leapp.libraries.actor import missinggpgkey from leapp.models import ( DNFWorkaround, - InstalledRPM, TargetUserSpaceInfo, TMPTargetRepositoriesFacts, + TrustedGpgKeys, UsedTargetRepositories ) from leapp.reporting import Report @@ -28,7 +28,7 @@ class MissingGpgKeysInhibitor(Actor): name = 'missing_gpg_keys_inhibitor' consumes = ( - InstalledRPM, + TrustedGpgKeys, TMPTargetRepositoriesFacts, TargetUserSpaceInfo, UsedTargetRepositories, diff --git a/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/libraries/missinggpgkey.py b/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/libraries/missinggpgkey.py index 1880986d..9a806ca2 100644 --- a/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/libraries/missinggpgkey.py +++ b/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/libraries/missinggpgkey.py @@ -8,113 +8,21 @@ from six.moves import urllib from leapp import reporting from leapp.exceptions import StopActorExecutionError -from leapp.libraries.common import config -from leapp.libraries.common.config.version import get_source_major_version, get_target_major_version -from leapp.libraries.stdlib import api, run +from leapp.libraries.common.config.version import get_target_major_version +from leapp.libraries.common.gpg import get_gpg_fp_from_file, get_path_to_gpg_certs, is_nogpgcheck_set +from leapp.libraries.stdlib import api from leapp.models import ( DNFWorkaround, - InstalledRPM, TargetUserSpaceInfo, TMPTargetRepositoriesFacts, + TrustedGpgKeys, UsedTargetRepositories ) from leapp.utils.deprecation import suppress_deprecation -GPG_CERTS_FOLDER = 'rpm-gpg' FMT_LIST_SEPARATOR = '\n - ' -def _gpg_show_keys(key_path): - """ - Show keys in given file in version-agnostic manner - - This runs gpg --show-keys (EL8) or gpg --with-fingerprints (EL7) - to verify the given file exists, is readable and contains valid - OpenPGP key data, which is printed in parsable format (--with-colons). - """ - try: - cmd = ['gpg2'] - # RHEL7 gnupg requires different switches to get the same output - if get_source_major_version() == '7': - cmd.append('--with-fingerprint') - else: - cmd.append('--show-keys') - cmd += ['--with-colons', key_path] - # TODO: discussed, most likely the checked=False will be dropped - # and error will be handled in other functions - return run(cmd, split=True, checked=False) - except OSError as err: - # NOTE: this is hypothetic; gnupg2 has to be installed on RHEL 7+ - error = 'Failed to read fingerprint from GPG key {}: {}'.format(key_path, str(err)) - api.current_logger().error(error) - return {} - - -def _parse_fp_from_gpg(output): - """ - Parse the output of gpg --show-keys --with-colons. - - Return list of 8 characters fingerprints per each gpgkey for the given - output from stdlib.run() or None if some error occurred. Either the - command return non-zero exit code, the file does not exists, its not - readable or does not contain any openpgp data. - """ - if not output or output['exit_code']: - return [] - - # we are interested in the lines of the output starting with "pub:" - # the colons are used for separating the fields in output like this - # pub:-:4096:1:999F7CBF38AB71F4:1612983048:::-:::escESC::::::23::0: - # ^--------------^ this is the fingerprint we need - # ^------^ but RPM version is just the last 8 chars lowercase - # Also multiple gpg keys can be stored in the file, so go through all "pub" - # lines - gpg_fps = [] - for line in output['stdout']: - if not line or not line.startswith('pub:'): - continue - parts = line.split(':') - if len(parts) >= 4 and len(parts[4]) == 16: - gpg_fps.append(parts[4][8:].lower()) - else: - api.current_logger().warning( - 'Cannot parse the gpg2 output. Line: "{}"' - .format(line) - ) - - return gpg_fps - - -def _read_gpg_fp_from_file(key_path): - """ - Returns the list of public key fingerprints from the given file - - Logs warning in case no OpenPGP data found in the given file or it is not - readable for some reason. - """ - res = _gpg_show_keys(key_path) - fp = _parse_fp_from_gpg(res) - if not fp: - error = 'Unable to read OpenPGP keys from {}: {}'.format(key_path, res['stderr']) - api.current_logger().error(error) - return fp - - -def _get_path_to_gpg_certs(): - """ - Get path to the directory with trusted target gpg keys in leapp tree - """ - # XXX This is copy&paste from TargetUserspaceCreator actor. - # Potential changes need to happen in both places to keep them in sync. - target_major_version = get_target_major_version() - target_product_type = config.get_product_type('target') - certs_dir = target_major_version - # only beta is special in regards to the GPG signing keys - if target_product_type == 'beta': - certs_dir = '{}beta'.format(target_major_version) - return os.path.join(api.get_common_folder_path(GPG_CERTS_FOLDER), certs_dir) - - def _expand_vars(path): """ Expand variables like $releasever and $basearch to the target system version @@ -152,38 +60,6 @@ def _get_abs_file_path(target_userspace, file_url): return os.path.join('/', file_path) -def _pubkeys_from_rpms(installed_rpms): - """ - Return the list of fingerprints of GPG keys in RPM DB - - This function returns short 8 characters fingerprints of trusted GPG keys - "installed" in the source OS RPM database. These look like normal packages - named "gpg-pubkey" and the fingerprint is present in the version field. - """ - return [pkg.version for pkg in installed_rpms.items if pkg.name == 'gpg-pubkey'] - - -def _get_pubkeys(installed_rpms): - """ - Get pubkeys from installed rpms and the trusted directory - """ - pubkeys = _pubkeys_from_rpms(installed_rpms) - certs_path = _get_path_to_gpg_certs() - for certname in os.listdir(certs_path): - key_file = os.path.join(certs_path, certname) - fps = _read_gpg_fp_from_file(key_file) - if fps: - pubkeys += fps - # TODO: what about else: ? - # The warning is now logged in _read_gpg_fp_from_file. We can raise - # the priority of the message or convert it to report though. - return pubkeys - - -def _the_nogpgcheck_option_used(): - return config.get_env('LEAPP_NOGPGCHECK', False) == '1' - - def _consume_data(): try: used_target_repos = next(api.consume(UsedTargetRepositories)).repos @@ -199,10 +75,10 @@ def _consume_data(): 'Could not check for valid GPG keys', details={'details': 'No TMPTargetRepositoriesFacts facts'} ) try: - installed_rpms = next(api.consume(InstalledRPM)) + trusted_gpg_keys = next(api.consume(TrustedGpgKeys)) except StopIteration: raise StopActorExecutionError( - 'Could not check for valid GPG keys', details={'details': 'No InstalledRPM facts'} + 'Could not check for valid GPG keys', details={'details': 'No TrustedGpgKeys facts'} ) try: target_userspace = next(api.consume(TargetUserSpaceInfo)) @@ -211,7 +87,7 @@ def _consume_data(): 'Could not check for valid GPG keys', details={'details': 'No TargetUserSpaceInfo facts'} ) - return used_target_repos, target_repos, installed_rpms, target_userspace + return used_target_repos, target_repos, trusted_gpg_keys, target_userspace def _get_repo_gpgkey_urls(repo): @@ -274,7 +150,7 @@ def _report(title, summary, keys, inhibitor=False): ' prior the upgrade.' ' If you want to proceed the in-place upgrade without checking any RPM' ' signatures, execute leapp with the `--nogpgcheck` option.' - .format(_get_path_to_gpg_certs()) + .format(get_path_to_gpg_certs()) ) groups = [reporting.Groups.REPOSITORY] if inhibitor: @@ -306,7 +182,7 @@ def _report_missing_keys(keys): summary = ( 'Some of the target repositories require GPG keys that are not installed' ' in the current RPM DB or are not stored in the {trust_dir} directory.' - .format(trust_dir=_get_path_to_gpg_certs()) + .format(trust_dir=get_path_to_gpg_certs()) ) _report('Detected unknown GPG keys for target system repositories', summary, keys, True) @@ -383,7 +259,7 @@ def register_dnfworkaround(): api.produce(DNFWorkaround( display_name='import trusted gpg keys to RPM DB', script_path=api.current_actor().get_common_tool_path('importrpmgpgkeys'), - script_args=[_get_path_to_gpg_certs()], + script_args=[get_path_to_gpg_certs()], )) @@ -396,11 +272,11 @@ def process(): them from model TMPTargetRepositoriesFacts. """ # when the user decided to ignore gpg signatures on the packages, we can ignore these checks altogether - if _the_nogpgcheck_option_used(): + if is_nogpgcheck_set(): api.current_logger().warning('The --nogpgcheck option is used: skipping all related checks.') return - used_target_repos, target_repos, installed_rpms, target_userspace = _consume_data() + used_target_repos, target_repos, trusted_gpg_keys, target_userspace = _consume_data() target_repo_id_to_repositories_facts_map = { repo.repoid: repo @@ -415,8 +291,7 @@ def process(): invalid_keys = list() repos_missing_keys = list() - # These are used only for getting the installed gpg-pubkey "packages" - pubkeys = _get_pubkeys(installed_rpms) + pubkeys = [key.fingerprint for key in trusted_gpg_keys.items] processed_gpgkey_urls = set() tmpdir = None for repoid in used_target_repos: @@ -454,7 +329,7 @@ def process(): api.current_logger().error( 'Skipping unknown protocol for gpgkey {}'.format(gpgkey_url)) continue - fps = _read_gpg_fp_from_file(key_file) + fps = get_gpg_fp_from_file(key_file) if not fps: invalid_keys.append(gpgkey_url) api.current_logger().warning( diff --git a/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/tests/component_test_missinggpgkey.py b/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/tests/component_test_missinggpgkey.py index 7da13cec..6d3fa0b2 100644 --- a/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/tests/component_test_missinggpgkey.py +++ b/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/tests/component_test_missinggpgkey.py @@ -3,12 +3,13 @@ from six.moves.urllib.error import URLError from leapp import reporting from leapp.exceptions import StopActorExecutionError -from leapp.libraries.actor.missinggpgkey import _pubkeys_from_rpms, process +from leapp.libraries.actor.missinggpgkey import process +from leapp.libraries.common.gpg import get_pubkeys_from_rpms from leapp.libraries.common.testutils import create_report_mocked, CurrentActorMocked, logger_mocked, produce_mocked from leapp.libraries.stdlib import api from leapp.models import ( DNFWorkaround, - InstalledRPM, + GpgKey, Report, RepositoriesFacts, RepositoryData, @@ -16,6 +17,7 @@ from leapp.models import ( RPM, TargetUserSpaceInfo, TMPTargetRepositoriesFacts, + TrustedGpgKeys, UsedTargetRepositories, UsedTargetRepository ) @@ -26,59 +28,21 @@ from leapp.utils.deprecation import suppress_deprecation # whole process as I was initially advised not to use these component tests. -def _get_test_installedrpm_no_my_key(): +def _get_test_gpgkeys_missing(): """ - Valid RPM packages missing the key we are looking for (epel9) + Return list of Trusted GPG keys without the epel9 key we look for """ return [ - RPM( - name='rpm', - version='4.16.1.3', - release='17.el9', - epoch='0', - packager='Red Hat, Inc. ', - arch='x86_64', - pgpsig='RSA/SHA256, Mon 08 Aug 2022 09:10:15 AM UTC, Key ID 199e2f91fd431d51', - repository='BaseOS', - ), - RPM( - name='gpg-pubkey', - version='fd431d51', - release='4ae0493b', - epoch='0', - packager='Red Hat, Inc. (release key 2) ', - arch='noarch', - pgpsig='' - ), - RPM( - name='gpg-pubkey', - version='5a6340b3', - release='6229229e', - epoch='0', - packager='Red Hat, Inc. (auxiliary key 3) ', - arch='noarch', - pgpsig='' - ), + GpgKey(fingerprint='fd431d51', rpmdb=True), + GpgKey(fingerprint='5a6340b3', rpmdb=True), ] -def _get_test_installedrpm(): +def _get_test_gpgkeys(): """ - All test RPMS packages + Return all the Trusted GPG keys for a test """ - return InstalledRPM( - items=[ - RPM( - name='gpg-pubkey', - version='3228467c', - release='613798eb', - epoch='0', - packager='Fedora (epel9) ', - arch='noarch', - pgpsig='' - ), - ] + _get_test_installedrpm_no_my_key(), - ) + return TrustedGpgKeys(items=[GpgKey(fingerprint='3228467c', rpmdb=True)] + _get_test_gpgkeys_missing()) def _get_test_targuserspaceinfo(path='/'): @@ -189,7 +153,7 @@ def test_perform_nogpgcheck(monkeypatch): monkeypatch.setattr(api, 'current_actor', CurrentActorMocked( envars={'LEAPP_NOGPGCHECK': '1'}, msgs=[ - _get_test_installedrpm(), + _get_test_gpgkeys(), _get_test_usedtargetrepositories(), _get_test_tmptargetrepositoriesfacts(), ], @@ -206,13 +170,13 @@ def test_perform_nogpgcheck(monkeypatch): @pytest.mark.parametrize('msgs', [ [], - [_get_test_installedrpm], + [_get_test_gpgkeys], [_get_test_usedtargetrepositories], [_get_test_tmptargetrepositoriesfacts], # These are just incomplete lists of required facts - [_get_test_installedrpm(), _get_test_usedtargetrepositories()], + [_get_test_gpgkeys(), _get_test_usedtargetrepositories()], [_get_test_usedtargetrepositories(), _get_test_tmptargetrepositoriesfacts()], - [_get_test_installedrpm(), _get_test_tmptargetrepositoriesfacts()], + [_get_test_gpgkeys(), _get_test_tmptargetrepositoriesfacts()], ]) def test_perform_missing_facts(monkeypatch, msgs): """ @@ -238,7 +202,7 @@ def test_perform_missing_facts(monkeypatch, msgs): @suppress_deprecation(TMPTargetRepositoriesFacts) def _get_test_tmptargetrepositoriesfacts_partial(): return [ - _get_test_installedrpm(), + _get_test_gpgkeys(), _get_test_usedtargetrepositories(), TMPTargetRepositoriesFacts( repositories=[ @@ -298,7 +262,7 @@ def _get_pubkeys_mocked(installed_rpms): """ This skips getting fps from files in container for simplification """ - return _pubkeys_from_rpms(installed_rpms) + return get_pubkeys_from_rpms(installed_rpms) def test_perform_missing_some_repo_facts(monkeypatch): @@ -314,7 +278,7 @@ def test_perform_missing_some_repo_facts(monkeypatch): monkeypatch.setattr(api, 'produce', produce_mocked()) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._gpg_show_keys', _gpg_show_keys_mocked) + monkeypatch.setattr('leapp.libraries.common.gpg._gpg_show_keys', _gpg_show_keys_mocked) with pytest.raises(StopActorExecutionError): process() @@ -326,7 +290,7 @@ def test_perform_missing_some_repo_facts(monkeypatch): def _get_test_tmptargetrepositoriesfacts_https_unused(): return [ _get_test_targuserspaceinfo(), - _get_test_installedrpm(), + _get_test_gpgkeys(), _get_test_usedtargetrepositories(), TMPTargetRepositoriesFacts( repositories=[ @@ -362,8 +326,7 @@ def test_perform_https_gpgkey_unused(monkeypatch): monkeypatch.setattr(api, 'produce', produce_mocked()) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._gpg_show_keys', _gpg_show_keys_mocked) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._get_pubkeys', _get_pubkeys_mocked) + monkeypatch.setattr('leapp.libraries.common.gpg._gpg_show_keys', _gpg_show_keys_mocked) process() assert not api.current_logger.warnmsg @@ -376,7 +339,7 @@ def test_perform_https_gpgkey_unused(monkeypatch): def get_test_tmptargetrepositoriesfacts_https(): return ( _get_test_targuserspaceinfo(), - _get_test_installedrpm(), + _get_test_gpgkeys(), UsedTargetRepositories( repos=_get_test_usedtargetrepositories_list() + [ UsedTargetRepository( @@ -409,7 +372,7 @@ def get_test_tmptargetrepositoriesfacts_https(): def get_test_tmptargetrepositoriesfacts_ftp(): return ( _get_test_targuserspaceinfo(), - _get_test_installedrpm(), + _get_test_gpgkeys(), UsedTargetRepositories( repos=_get_test_usedtargetrepositories_list() + [ UsedTargetRepository( @@ -454,8 +417,7 @@ def test_perform_https_gpgkey(monkeypatch): monkeypatch.setattr(api, 'produce', produce_mocked()) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._gpg_show_keys', _gpg_show_keys_mocked) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._get_pubkeys', _get_pubkeys_mocked) + monkeypatch.setattr('leapp.libraries.common.gpg._gpg_show_keys', _gpg_show_keys_mocked) monkeypatch.setattr('six.moves.urllib.request.urlretrieve', _urlretrive_mocked) process() @@ -482,8 +444,7 @@ def test_perform_https_gpgkey_urlerror(monkeypatch): monkeypatch.setattr(api, 'produce', produce_mocked()) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._gpg_show_keys', _gpg_show_keys_mocked) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._get_pubkeys', _get_pubkeys_mocked) + monkeypatch.setattr('leapp.libraries.common.gpg._gpg_show_keys', _gpg_show_keys_mocked) monkeypatch.setattr('six.moves.urllib.request.urlretrieve', _urlretrive_mocked_urlerror) process() @@ -508,8 +469,7 @@ def test_perform_ftp_gpgkey(monkeypatch): monkeypatch.setattr(api, 'produce', produce_mocked()) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._gpg_show_keys', _gpg_show_keys_mocked) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._get_pubkeys', _get_pubkeys_mocked) + monkeypatch.setattr('leapp.libraries.common.gpg._gpg_show_keys', _gpg_show_keys_mocked) process() assert len(api.current_logger.errmsg) == 1 @@ -525,7 +485,7 @@ def test_perform_ftp_gpgkey(monkeypatch): def get_test_data_missing_key(): return [ _get_test_targuserspaceinfo(), - InstalledRPM(items=_get_test_installedrpm_no_my_key()), + TrustedGpgKeys(items=_get_test_gpgkeys_missing()), _get_test_usedtargetrepositories(), _get_test_tmptargetrepositoriesfacts(), ] @@ -543,8 +503,7 @@ def test_perform_report(monkeypatch): monkeypatch.setattr(api, 'produce', produce_mocked()) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._gpg_show_keys', _gpg_show_keys_mocked) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._get_pubkeys', _get_pubkeys_mocked) + monkeypatch.setattr('leapp.libraries.common.gpg._gpg_show_keys', _gpg_show_keys_mocked) process() assert not api.current_logger.warnmsg @@ -559,7 +518,7 @@ def test_perform_report(monkeypatch): def get_test_data_no_gpg_data(): return [ _get_test_targuserspaceinfo(), - _get_test_installedrpm(), + _get_test_gpgkeys(), _get_test_usedtargetrepositories(), _get_test_tmptargetrepositoriesfacts(), ] @@ -593,12 +552,11 @@ def test_perform_invalid_key(monkeypatch): monkeypatch.setattr(api, 'produce', produce_mocked()) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._gpg_show_keys', _gpg_show_keys_mocked_my_empty) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._get_pubkeys', _get_pubkeys_mocked) + monkeypatch.setattr('leapp.libraries.common.gpg._gpg_show_keys', _gpg_show_keys_mocked_my_empty) process() - assert len(api.current_logger.warnmsg) == 1 - assert 'Cannot get any gpg key from the file' in api.current_logger.warnmsg[0] + assert len(api.current_logger.warnmsg) == 2, api.current_logger.warnmsg + assert 'Cannot get any gpg key from the file' in api.current_logger.warnmsg[1] assert api.produce.called == 1 assert isinstance(api.produce.model_instances[0], DNFWorkaround) assert reporting.create_report.called == 1 @@ -610,7 +568,7 @@ def test_perform_invalid_key(monkeypatch): def get_test_data_gpgcheck_without_gpgkey(): return [ _get_test_targuserspaceinfo(), - _get_test_installedrpm(), + _get_test_gpgkeys(), UsedTargetRepositories( repos=_get_test_usedtargetrepositories_list() + [ UsedTargetRepository( @@ -651,8 +609,7 @@ def test_perform_gpgcheck_without_gpgkey(monkeypatch): monkeypatch.setattr(api, 'produce', produce_mocked()) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._gpg_show_keys', _gpg_show_keys_mocked) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._get_pubkeys', _get_pubkeys_mocked) + monkeypatch.setattr('leapp.libraries.common.gpg._gpg_show_keys', _gpg_show_keys_mocked) process() assert len(api.current_logger.warnmsg) == 1 diff --git a/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/tests/unit_test_missinggpgkey.py b/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/tests/unit_test_missinggpgkey.py index 68e4cdfe..8cd00531 100644 --- a/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/tests/unit_test_missinggpgkey.py +++ b/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/tests/unit_test_missinggpgkey.py @@ -6,134 +6,12 @@ import tempfile import distro import pytest -from leapp.libraries.actor.missinggpgkey import ( - _expand_vars, - _get_abs_file_path, - _get_path_to_gpg_certs, - _get_pubkeys, - _get_repo_gpgkey_urls, - _gpg_show_keys, - _parse_fp_from_gpg, - _pubkeys_from_rpms -) +from leapp.libraries.actor.missinggpgkey import _expand_vars, _get_abs_file_path, _get_repo_gpgkey_urls from leapp.libraries.common.testutils import CurrentActorMocked from leapp.libraries.stdlib import api from leapp.models import InstalledRPM, RepositoryData, RPM, TargetUserSpaceInfo -def is_rhel7(): - return int(distro.major_version()) < 8 - - -def test_gpg_show_keys(current_actor_context, monkeypatch): - src = '7.9' if is_rhel7() else '8.6' - current_actor = CurrentActorMocked(src_ver=src) - monkeypatch.setattr(api, 'current_actor', current_actor) - - # python2 compatibility :/ - dirpath = tempfile.mkdtemp() - - # using GNUPGHOME env should avoid gnupg modifying the system - os.environ['GNUPGHOME'] = dirpath - - try: - # non-existing file - non_existent_path = os.path.join(dirpath, 'nonexistent') - res = _gpg_show_keys(non_existent_path) - if is_rhel7(): - err_msg = "gpg: can't open `{}'".format(non_existent_path) - else: - err_msg = "gpg: can't open '{}': No such file or directory\n".format(non_existent_path) - assert not res['stdout'] - assert err_msg in res['stderr'] - assert res['exit_code'] == 2 - - fp = _parse_fp_from_gpg(res) - assert fp == [] - - # no gpg data found - no_key_path = os.path.join(dirpath, "no_key") - with open(no_key_path, "w") as f: - f.write('test') - - res = _gpg_show_keys(no_key_path) - if is_rhel7(): - err_msg = ('gpg: no valid OpenPGP data found.\n' - 'gpg: processing message failed: Unknown system error\n') - else: - err_msg = 'gpg: no valid OpenPGP data found.\n' - assert not res['stdout'] - assert res['stderr'] == err_msg - assert res['exit_code'] == 2 - - fp = _parse_fp_from_gpg(res) - assert fp == [] - - # with some test data now -- rhel9 release key - # rhel9_key_path = os.path.join(api.get_common_folder_path('rpm-gpg'), '9') - cur_dir = os.path.dirname(os.path.abspath(__file__)) - rhel9_key_path = os.path.join(cur_dir, '..', '..', '..', 'files', 'rpm-gpg', '9', - 'RPM-GPG-KEY-redhat-release') - res = _gpg_show_keys(rhel9_key_path) - finally: - shutil.rmtree(dirpath) - - if is_rhel7(): - assert len(res['stdout']) == 4 - assert res['stdout'][0] == ('pub:-:4096:1:199E2F91FD431D51:1256212795:::-:' - 'Red Hat, Inc. (release key 2) :') - assert res['stdout'][1] == 'fpr:::::::::567E347AD0044ADE55BA8A5F199E2F91FD431D51:' - assert res['stdout'][2] == ('pub:-:4096:1:5054E4A45A6340B3:1646863006:::-:' - 'Red Hat, Inc. (auxiliary key 3) :') - assert res['stdout'][3] == 'fpr:::::::::7E4624258C406535D56D6F135054E4A45A6340B3:' - else: - assert len(res['stdout']) == 6 - assert res['stdout'][0] == 'pub:-:4096:1:199E2F91FD431D51:1256212795:::-:::scSC::::::23::0:' - assert res['stdout'][1] == 'fpr:::::::::567E347AD0044ADE55BA8A5F199E2F91FD431D51:' - assert res['stdout'][2] == ('uid:-::::1256212795::DC1CAEC7997B3575101BB0FCAAC6191792660D8F::' - 'Red Hat, Inc. (release key 2) ::::::::::0:') - assert res['stdout'][3] == 'pub:-:4096:1:5054E4A45A6340B3:1646863006:::-:::scSC::::::23::0:' - assert res['stdout'][4] == 'fpr:::::::::7E4624258C406535D56D6F135054E4A45A6340B3:' - assert res['stdout'][5] == ('uid:-::::1646863006::DA7F68E3872D6E7BDCE05225E7EB5F3ACDD9699F::' - 'Red Hat, Inc. (auxiliary key 3) ::::::::::0:') - - err = '{}/trustdb.gpg: trustdb created'.format(dirpath) - assert err in res['stderr'] - assert res['exit_code'] == 0 - - # now, parse the output too - fp = _parse_fp_from_gpg(res) - assert fp == ['fd431d51', '5a6340b3'] - - -@pytest.mark.parametrize('res, exp', [ - ({'exit_code': 2, 'stdout': '', 'stderr': ''}, []), - ({'exit_code': 2, 'stdout': '', 'stderr': 'bash: gpg2: command not found...'}, []), - ({'exit_code': 0, 'stdout': 'Some other output', 'stderr': ''}, []), - ({'exit_code': 0, 'stdout': ['Some other output', 'other line'], 'stderr': ''}, []), - ({'exit_code': 0, 'stdout': ['pub:-:4096:1:199E2F91FD431D:'], 'stderr': ''}, []), - ({'exit_code': 0, 'stdout': ['pub:-:4096:1:5054E4A45A6340B3:1..'], 'stderr': ''}, ['5a6340b3']), -]) -def test_parse_fp_from_gpg(res, exp): - fp = _parse_fp_from_gpg(res) - assert fp == exp - - -@pytest.mark.parametrize('target, product_type, exp', [ - ('8.6', 'beta', '../../files/rpm-gpg/8beta'), - ('8.8', 'htb', '../../files/rpm-gpg/8'), - ('9.0', 'beta', '../../files/rpm-gpg/9beta'), - ('9.2', 'ga', '../../files/rpm-gpg/9'), -]) -def test_get_path_to_gpg_certs(current_actor_context, monkeypatch, target, product_type, exp): - current_actor = CurrentActorMocked(dst_ver=target, - envars={'LEAPP_DEVEL_TARGET_PRODUCT_TYPE': product_type}) - monkeypatch.setattr(api, 'current_actor', current_actor) - - p = _get_path_to_gpg_certs() - assert p == exp - - @pytest.mark.parametrize('data, exp', [ ('bare string', 'bare string'), ('with dollar$$$', 'with dollar$$$'), @@ -148,50 +26,6 @@ def test_expand_vars(monkeypatch, data, exp): assert res == exp -def _get_test_installed_rmps(): - return InstalledRPM( - items=[ - RPM(name='gpg-pubkey', - version='9570ff31', - release='5e3006fb', - epoch='0', - packager='Fedora (33) ', - arch='noarch', - pgpsig=''), - RPM(name='rpm', - version='4.17.1', - release='3.fc35', - epoch='0', - packager='Fedora Project', - arch='x86_64', - pgpsig='RSA/SHA256, Tue 02 Aug 2022 03:12:43 PM CEST, Key ID db4639719867c58f'), - ], - ) - - -def test_pubkeys_from_rpms(): - installed_rpm = _get_test_installed_rmps() - assert _pubkeys_from_rpms(installed_rpm) == ['9570ff31'] - - -# @pytest.mark.parametrize('target, product_type, exp', [ -# ('8.6', 'beta', ['F21541EB']), -# ('8.8', 'htb', ['FD431D51', 'D4082792']), # ga -# ('9.0', 'beta', ['F21541EB']), -# ('9.2', 'ga', ['FD431D51', '5A6340B3']), -# ]) -# Def test_get_pubkeys(current_actor_context, monkeypatch, target, product_type, exp): -# current_actor = CurrentActorMocked(dst_ver=target, -# envars={'LEAPP_DEVEL_TARGET_PRODUCT_TYPE': product_type}) -# monkeypatch.setattr(api, 'current_actor', current_actor) -# installed_rpm = _get_test_installed_rmps() -# -# p = _get_pubkeys(installed_rpm) -# assert '9570ff31' in p -# for x in exp: -# assert x in p - - @pytest.mark.parametrize('repo, exp', [ (RepositoryData(repoid='dummy', name='name'), None), (RepositoryData(repoid='dummy', name='name', additional_fields='{}'), None), diff --git a/repos/system_upgrade/common/actors/targetuserspacecreator/libraries/userspacegen.py b/repos/system_upgrade/common/actors/targetuserspacecreator/libraries/userspacegen.py index e015a741..d605ba0e 100644 --- a/repos/system_upgrade/common/actors/targetuserspacecreator/libraries/userspacegen.py +++ b/repos/system_upgrade/common/actors/targetuserspacecreator/libraries/userspacegen.py @@ -9,6 +9,7 @@ from leapp.libraries.actor import constants from leapp.libraries.common import dnfplugin, mounting, overlaygen, repofileutils, rhsm, utils from leapp.libraries.common.config import get_env, get_product_type from leapp.libraries.common.config.version import get_target_major_version +from leapp.libraries.common.gpg import get_path_to_gpg_certs, is_nogpgcheck_set from leapp.libraries.stdlib import api, CalledProcessError, config, run from leapp.models import RequiredTargetUserspacePackages # deprecated from leapp.models import TMPTargetRepositoriesFacts # deprecated all the time @@ -54,7 +55,6 @@ from leapp.utils.deprecation import suppress_deprecation # Issue: #486 PROD_CERTS_FOLDER = 'prod-certs' -GPG_CERTS_FOLDER = 'rpm-gpg' PERSISTENT_PACKAGE_CACHE_DIR = '/var/lib/leapp/persistent_package_cache' DEDICATED_LEAPP_PART_URL = 'https://access.redhat.com/solutions/7011704' @@ -143,21 +143,8 @@ def _backup_to_persistent_package_cache(userspace_dir): shutil.move(src_cache, PERSISTENT_PACKAGE_CACHE_DIR) -def _the_nogpgcheck_option_used(): - return get_env('LEAPP_NOGPGCHECK', False) == '1' - - -def _get_path_to_gpg_certs(target_major_version): - target_product_type = get_product_type('target') - certs_dir = target_major_version - # only beta is special in regards to the GPG signing keys - if target_product_type == 'beta': - certs_dir = '{}beta'.format(target_major_version) - return os.path.join(api.get_common_folder_path(GPG_CERTS_FOLDER), certs_dir) - - def _import_gpg_keys(context, install_root_dir, target_major_version): - certs_path = _get_path_to_gpg_certs(target_major_version) + certs_path = get_path_to_gpg_certs() # Import the RHEL X+1 GPG key to be able to verify the installation of initial packages try: # Import also any other keys provided by the customer in the same directory @@ -234,13 +221,13 @@ def prepare_target_userspace(context, userspace_dir, enabled_repos, packages): install_root_dir = '/el{}target'.format(target_major_version) with mounting.BindMount(source=userspace_dir, target=os.path.join(context.base_dir, install_root_dir.lstrip('/'))): _restore_persistent_package_cache(userspace_dir) - if not _the_nogpgcheck_option_used(): + if not is_nogpgcheck_set(): _import_gpg_keys(context, install_root_dir, target_major_version) repos_opt = [['--enablerepo', repo] for repo in enabled_repos] repos_opt = list(itertools.chain(*repos_opt)) cmd = ['dnf', 'install', '-y'] - if _the_nogpgcheck_option_used(): + if is_nogpgcheck_set(): cmd.append('--nogpgcheck') cmd += [ '--setopt=module_platform_id=platform:el{}'.format(target_major_version), diff --git a/repos/system_upgrade/common/actors/trustedgpgkeysscanner/actor.py b/repos/system_upgrade/common/actors/trustedgpgkeysscanner/actor.py new file mode 100644 index 00000000..46e8f9ec --- /dev/null +++ b/repos/system_upgrade/common/actors/trustedgpgkeysscanner/actor.py @@ -0,0 +1,21 @@ +from leapp.actors import Actor +from leapp.libraries.actor import trustedgpgkeys +from leapp.models import InstalledRPM, TrustedGpgKeys +from leapp.tags import FactsPhaseTag, IPUWorkflowTag + + +class TrustedGpgKeysScanner(Actor): + """ + Scan for trusted GPG keys. + + These include keys readily available in the source RPM DB, keys for N+1 + Red Hat release and custom keys stored in the trusted directory. + """ + + name = 'trusted_gpg_keys_scanner' + consumes = (InstalledRPM,) + produces = (TrustedGpgKeys,) + tags = (IPUWorkflowTag, FactsPhaseTag) + + def process(self): + trustedgpgkeys.process() diff --git a/repos/system_upgrade/common/actors/trustedgpgkeysscanner/libraries/trustedgpgkeys.py b/repos/system_upgrade/common/actors/trustedgpgkeysscanner/libraries/trustedgpgkeys.py new file mode 100644 index 00000000..6377f767 --- /dev/null +++ b/repos/system_upgrade/common/actors/trustedgpgkeysscanner/libraries/trustedgpgkeys.py @@ -0,0 +1,38 @@ +import os + +from leapp.exceptions import StopActorExecutionError +from leapp.libraries.common.gpg import get_gpg_fp_from_file, get_path_to_gpg_certs, get_pubkeys_from_rpms +from leapp.libraries.stdlib import api +from leapp.models import GpgKey, InstalledRPM, TrustedGpgKeys + + +def _get_pubkeys(installed_rpms): + """ + Get pubkeys from installed rpms and the trusted directory + """ + pubkeys = get_pubkeys_from_rpms(installed_rpms) + db_pubkeys = [key.fingerprint for key in pubkeys] + certs_path = get_path_to_gpg_certs() + for certname in os.listdir(certs_path): + key_file = os.path.join(certs_path, certname) + fps = get_gpg_fp_from_file(key_file) + for fp in fps: + if fp not in db_pubkeys: + pubkeys.append(GpgKey(fingerprint=fp, rpmdb=False, filename=key_file)) + db_pubkeys += fp + return pubkeys + + +def process(): + """ + Process keys in RPM DB and the ones in trusted directory to produce a list of trusted keys + """ + + try: + installed_rpms = next(api.consume(InstalledRPM)) + except StopIteration: + raise StopActorExecutionError( + 'Could not check for valid GPG keys', details={'details': 'No InstalledRPM facts'} + ) + pubkeys = _get_pubkeys(installed_rpms) + api.produce(TrustedGpgKeys(items=pubkeys)) diff --git a/repos/system_upgrade/common/actors/trustedgpgkeysscanner/tests/test_trustedgpgkeys.py b/repos/system_upgrade/common/actors/trustedgpgkeysscanner/tests/test_trustedgpgkeys.py new file mode 100644 index 00000000..0d98aad7 --- /dev/null +++ b/repos/system_upgrade/common/actors/trustedgpgkeysscanner/tests/test_trustedgpgkeys.py @@ -0,0 +1,87 @@ +import os + +from leapp import reporting +from leapp.libraries.actor import trustedgpgkeys +from leapp.libraries.common.gpg import get_pubkeys_from_rpms +from leapp.libraries.common.testutils import create_report_mocked, CurrentActorMocked, logger_mocked, produce_mocked +from leapp.libraries.stdlib import api +from leapp.models import GpgKey, InstalledRPM, RPM, TrustedGpgKeys + + +def _get_test_installed_rmps(fps): + # adding at least one rpm that is not gpg-pubkey + rpms = [RPM( + name='rpm', + version='4.17.1', + release='3.fc35', + epoch='0', + packager='Fedora Project', + arch='x86_64', + pgpsig='RSA/SHA256, Tue 02 Aug 2022 03:12:43 PM CEST, Key ID db4639719867c58f' + )] + for fp in fps: + rpms.append(RPM( + name='gpg-pubkey', + version=fp, + release='5e3006fb', + epoch='0', + packager='Fedora (33) ', + arch='noarch', + pgpsig='' + )) + return InstalledRPM(items=rpms) + + +class MockedGetGpgFromFile(object): + def __init__(self, file_fps_tuples): + # e.g. file_fps_tuple = [('/mydir/myfile', ['0000ff31', '0000ff32'])] + self._data = {} + for fname, fps in file_fps_tuples: + self._data[fname] = fps + + def get_files(self): + return self._data.keys() # noqa: W1655; pylint: disable=dict-keys-not-iterating + + def __call__(self, fname): + return self._data.get(fname, []) + + +def test_get_pubkeys(monkeypatch): + """ + Very basic test of _get_pubkeys function + """ + rpm_fps = ['9570ff31', '99900000'] + file_fps = ['0000ff31', '0000ff32'] + installed_rpms = _get_test_installed_rmps(rpm_fps) + mocked_gpg_files = MockedGetGpgFromFile([('/mydir/myfile', ['0000ff31', '0000ff32'])]) + + def _mocked_listdir(dummy): + return [os.path.basename(i) for i in mocked_gpg_files.get_files()] + + monkeypatch.setattr(trustedgpgkeys.os, 'listdir', _mocked_listdir) + monkeypatch.setattr(trustedgpgkeys, 'get_path_to_gpg_certs', lambda: '/mydir/') + monkeypatch.setattr(trustedgpgkeys, 'get_gpg_fp_from_file', mocked_gpg_files) + + pubkeys = trustedgpgkeys._get_pubkeys(installed_rpms) + assert len(pubkeys) == len(rpm_fps + file_fps) + assert set(rpm_fps) == {pkey.fingerprint for pkey in pubkeys if pkey.rpmdb} + assert set(file_fps) == {pkey.fingerprint for pkey in pubkeys if not pkey.rpmdb} + assert list({pkey.filename for pkey in pubkeys if not pkey.rpmdb})[0] == '/mydir/myfile' + + +def test_process(monkeypatch): + """ + Executes the "main" function + """ + monkeypatch.setattr(api, 'current_actor', CurrentActorMocked( + msgs=[_get_test_installed_rmps(['9570ff31'])]) + ) + monkeypatch.setattr(api, 'produce', produce_mocked()) + monkeypatch.setattr(api, 'current_logger', logger_mocked()) + monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) + monkeypatch.setattr(trustedgpgkeys, '_get_pubkeys', get_pubkeys_from_rpms) + + trustedgpgkeys.process() + assert api.produce.called == 1 + assert isinstance(api.produce.model_instances[0], TrustedGpgKeys) + assert reporting.create_report.called == 0 diff --git a/repos/system_upgrade/common/libraries/dnfplugin.py b/repos/system_upgrade/common/libraries/dnfplugin.py index d3ec5901..fbd58246 100644 --- a/repos/system_upgrade/common/libraries/dnfplugin.py +++ b/repos/system_upgrade/common/libraries/dnfplugin.py @@ -9,6 +9,7 @@ from leapp.exceptions import StopActorExecutionError from leapp.libraries.common import dnfconfig, guards, mounting, overlaygen, rhsm, utils from leapp.libraries.common.config import get_env from leapp.libraries.common.config.version import get_target_major_version, get_target_version +from leapp.libraries.common.gpg import is_nogpgcheck_set from leapp.libraries.stdlib import api, CalledProcessError, config from leapp.models import DNFWorkaround @@ -77,10 +78,6 @@ def _rebuild_rpm_db(context, root=None): context.call(cmd) -def _the_nogpgcheck_option_used(): - return get_env('LEAPP_NOGPGCHECK', '0') == '1' - - def build_plugin_data(target_repoids, debug, test, tasks, on_aws): """ Generates a dictionary with the DNF plugin data. @@ -100,7 +97,7 @@ def build_plugin_data(target_repoids, debug, test, tasks, on_aws): 'debugsolver': debug, 'disable_repos': True, 'enable_repos': target_repoids, - 'gpgcheck': not _the_nogpgcheck_option_used(), + 'gpgcheck': not is_nogpgcheck_set(), 'platform_id': 'platform:el{}'.format(get_target_major_version()), 'releasever': get_target_version(), 'installroot': '/installroot', @@ -367,7 +364,7 @@ def install_initramdisk_requirements(packages, target_userspace_info, used_repos 'dnf', 'install', '-y'] - if _the_nogpgcheck_option_used(): + if is_nogpgcheck_set(): cmd.append('--nogpgcheck') cmd += [ '--setopt=module_platform_id=platform:el{}'.format(get_target_major_version()), diff --git a/repos/system_upgrade/common/libraries/gpg.py b/repos/system_upgrade/common/libraries/gpg.py new file mode 100644 index 00000000..a8071329 --- /dev/null +++ b/repos/system_upgrade/common/libraries/gpg.py @@ -0,0 +1,137 @@ +import os + +from leapp.libraries.common import config +from leapp.libraries.common.config.version import get_source_major_version, get_target_major_version +from leapp.libraries.stdlib import api, run +from leapp.models import GpgKey + +GPG_CERTS_FOLDER = 'rpm-gpg' + + +def get_pubkeys_from_rpms(installed_rpms): + """ + Return the list of fingerprints of GPG keys in RPM DB + + This function returns short 8 characters fingerprints of trusted GPG keys + "installed" in the source OS RPM database. These look like normal packages + named "gpg-pubkey" and the fingerprint is present in the version field. + + :param installed_rpms: List of installed RPMs + :type installed_rpms: list(leapp.models.RPM) + :return: list of GPG keys from RPM DB + :rtype: list(leapp.models.GpgKey) + """ + return [GpgKey(fingerprint=pkg.version, rpmdb=True) for pkg in installed_rpms.items if pkg.name == 'gpg-pubkey'] + + +def _gpg_show_keys(key_path): + """ + Show keys in given file in version-agnostic manner + + This runs gpg --show-keys (EL8) or gpg --with-fingerprints (EL7) + to verify the given file exists, is readable and contains valid + OpenPGP key data, which is printed in parsable format (--with-colons). + """ + try: + cmd = ['gpg2'] + # RHEL7 gnupg requires different switches to get the same output + if get_source_major_version() == '7': + cmd.append('--with-fingerprint') + else: + cmd.append('--show-keys') + cmd += ['--with-colons', key_path] + # TODO: discussed, most likely the checked=False will be dropped + # and error will be handled in other functions + return run(cmd, split=True, checked=False) + except OSError as err: + # NOTE: this is hypothetic; gnupg2 has to be installed on RHEL 7+ + error = 'Failed to read fingerprint from GPG key {}: {}'.format(key_path, str(err)) + api.current_logger().error(error) + return {} + + +def _parse_fp_from_gpg(output): + """ + Parse the output of gpg --show-keys --with-colons. + + Return list of 8 characters fingerprints per each gpgkey for the given + output from stdlib.run() or None if some error occurred. Either the + command return non-zero exit code, the file does not exists, its not + readable or does not contain any openpgp data. + """ + if not output or output['exit_code']: + return [] + + # we are interested in the lines of the output starting with "pub:" + # the colons are used for separating the fields in output like this + # pub:-:4096:1:999F7CBF38AB71F4:1612983048:::-:::escESC::::::23::0: + # ^--------------^ this is the fingerprint we need + # ^------^ but RPM version is just the last 8 chars lowercase + # Also multiple gpg keys can be stored in the file, so go through all "pub" + # lines + gpg_fps = [] + for line in output['stdout']: + if not line or not line.startswith('pub:'): + continue + parts = line.split(':') + if len(parts) >= 4 and len(parts[4]) == 16: + gpg_fps.append(parts[4][8:].lower()) + else: + api.current_logger().warning( + 'Cannot parse the gpg2 output. Line: "{}"' + .format(line) + ) + + return gpg_fps + + +def get_gpg_fp_from_file(key_path): + """ + Return the list of public key fingerprints from the given file + + Log warning in case no OpenPGP data found in the given file or it is not + readable for some reason. + + :param key_path: Path to the file with GPG key(s) + :type key_path: str + :return: List of public key fingerprints from the given file + :rtype: list(str) + """ + res = _gpg_show_keys(key_path) + fp = _parse_fp_from_gpg(res) + if not fp: + error_msg = 'Unable to read OpenPGP keys from {}: {}'.format(key_path, res['stderr']) + api.current_logger().warning(error_msg) + return fp + + +def get_path_to_gpg_certs(): + """ + Get path to the directory with trusted target gpg keys in the common leapp repository. + + GPG keys stored under this directory are considered as trusted and are + installed during the upgrade process. + + :return: Path to the directory with GPG keys stored under the common leapp repository. + :rtype: str + """ + target_major_version = get_target_major_version() + target_product_type = config.get_product_type('target') + certs_dir = target_major_version + # only beta is special in regards to the GPG signing keys + if target_product_type == 'beta': + certs_dir = '{}beta'.format(target_major_version) + return os.path.join(api.get_common_folder_path(GPG_CERTS_FOLDER), certs_dir) + + +def is_nogpgcheck_set(): + """ + Return True if the GPG check should be skipped. + + The GPG check is skipped if leapp is executed with LEAPP_NOGPGCHECK=1 + or with the --nogpgcheck CLI option. In both cases, actors will see + LEAPP_NOGPGCHECK is '1'. + + :rtype: bool + """ + return config.get_env('LEAPP_NOGPGCHECK', False) == '1' diff --git a/repos/system_upgrade/common/libraries/tests/test_gpg.py b/repos/system_upgrade/common/libraries/tests/test_gpg.py new file mode 100644 index 00000000..7cf37fa2 --- /dev/null +++ b/repos/system_upgrade/common/libraries/tests/test_gpg.py @@ -0,0 +1,147 @@ +import os +import shutil +import tempfile + +import distro +import pytest + +from leapp.libraries.common import gpg +from leapp.libraries.common.testutils import CurrentActorMocked +from leapp.libraries.stdlib import api +from leapp.models import GpgKey, InstalledRPM, RPM + + +@pytest.mark.parametrize('target, product_type, exp', [ + ('8.6', 'beta', '../../files/rpm-gpg/8beta'), + ('8.8', 'htb', '../../files/rpm-gpg/8'), + ('9.0', 'beta', '../../files/rpm-gpg/9beta'), + ('9.2', 'ga', '../../files/rpm-gpg/9'), +]) +def test_get_path_to_gpg_certs(monkeypatch, target, product_type, exp): + current_actor = CurrentActorMocked(dst_ver=target, + envars={'LEAPP_DEVEL_TARGET_PRODUCT_TYPE': product_type}) + monkeypatch.setattr(api, 'current_actor', current_actor) + + p = gpg.get_path_to_gpg_certs() + assert p == exp + + +def is_rhel7(): + return int(distro.major_version()) < 8 + + +@pytest.mark.skipif(distro.id() not in ("rhel", "centos"), reason="Requires RHEL or CentOS for valid results.") +def test_gpg_show_keys(loaded_leapp_repository, monkeypatch): + src = '7.9' if is_rhel7() else '8.6' + current_actor = CurrentActorMocked(src_ver=src) + monkeypatch.setattr(api, 'current_actor', current_actor) + + # python2 compatibility :/ + dirpath = tempfile.mkdtemp() + + # using GNUPGHOME env should avoid gnupg modifying the system + os.environ['GNUPGHOME'] = dirpath + + try: + # non-existing file + non_existent_path = os.path.join(dirpath, 'nonexistent') + res = gpg._gpg_show_keys(non_existent_path) + if is_rhel7(): + err_msg = "gpg: can't open `{}'".format(non_existent_path) + else: + err_msg = "gpg: can't open '{}': No such file or directory\n".format(non_existent_path) + assert not res['stdout'] + assert err_msg in res['stderr'] + assert res['exit_code'] == 2 + + fp = gpg._parse_fp_from_gpg(res) + assert fp == [] + + # no gpg data found + no_key_path = os.path.join(dirpath, "no_key") + with open(no_key_path, "w") as f: + f.write('test') + + res = gpg._gpg_show_keys(no_key_path) + if is_rhel7(): + err_msg = ('gpg: no valid OpenPGP data found.\n' + 'gpg: processing message failed: Unknown system error\n') + else: + err_msg = 'gpg: no valid OpenPGP data found.\n' + assert not res['stdout'] + assert res['stderr'] == err_msg + assert res['exit_code'] == 2 + + fp = gpg._parse_fp_from_gpg(res) + assert fp == [] + + # with some test data now -- rhel9 release key + # rhel9_key_path = os.path.join(api.get_common_folder_path('rpm-gpg'), '9') + cur_dir = os.path.dirname(os.path.abspath(__file__)) + rhel9_key_path = os.path.join(cur_dir, '..', '..', 'files', 'rpm-gpg', '9', + 'RPM-GPG-KEY-redhat-release') + res = gpg._gpg_show_keys(rhel9_key_path) + finally: + shutil.rmtree(dirpath) + + if is_rhel7(): + assert len(res['stdout']) == 4 + assert res['stdout'][0] == ('pub:-:4096:1:199E2F91FD431D51:1256212795:::-:' + 'Red Hat, Inc. (release key 2) :') + assert res['stdout'][1] == 'fpr:::::::::567E347AD0044ADE55BA8A5F199E2F91FD431D51:' + assert res['stdout'][2] == ('pub:-:4096:1:5054E4A45A6340B3:1646863006:::-:' + 'Red Hat, Inc. (auxiliary key 3) :') + assert res['stdout'][3] == 'fpr:::::::::7E4624258C406535D56D6F135054E4A45A6340B3:' + else: + assert len(res['stdout']) == 6 + assert res['stdout'][0] == 'pub:-:4096:1:199E2F91FD431D51:1256212795:::-:::scSC::::::23::0:' + assert res['stdout'][1] == 'fpr:::::::::567E347AD0044ADE55BA8A5F199E2F91FD431D51:' + assert res['stdout'][2] == ('uid:-::::1256212795::DC1CAEC7997B3575101BB0FCAAC6191792660D8F::' + 'Red Hat, Inc. (release key 2) ::::::::::0:') + assert res['stdout'][3] == 'pub:-:4096:1:5054E4A45A6340B3:1646863006:::-:::scSC::::::23::0:' + assert res['stdout'][4] == 'fpr:::::::::7E4624258C406535D56D6F135054E4A45A6340B3:' + assert res['stdout'][5] == ('uid:-::::1646863006::DA7F68E3872D6E7BDCE05225E7EB5F3ACDD9699F::' + 'Red Hat, Inc. (auxiliary key 3) ::::::::::0:') + + err = '{}/trustdb.gpg: trustdb created'.format(dirpath) + assert err in res['stderr'] + assert res['exit_code'] == 0 + + # now, parse the output too + fp = gpg._parse_fp_from_gpg(res) + assert fp == ['fd431d51', '5a6340b3'] + + +@pytest.mark.parametrize('res, exp', [ + ({'exit_code': 2, 'stdout': '', 'stderr': ''}, []), + ({'exit_code': 2, 'stdout': '', 'stderr': 'bash: gpg2: command not found...'}, []), + ({'exit_code': 0, 'stdout': 'Some other output', 'stderr': ''}, []), + ({'exit_code': 0, 'stdout': ['Some other output', 'other line'], 'stderr': ''}, []), + ({'exit_code': 0, 'stdout': ['pub:-:4096:1:199E2F91FD431D:'], 'stderr': ''}, []), + ({'exit_code': 0, 'stdout': ['pub:-:4096:1:5054E4A45A6340B3:1..'], 'stderr': ''}, ['5a6340b3']), +]) +def test_parse_fp_from_gpg(res, exp): + fp = gpg._parse_fp_from_gpg(res) + assert fp == exp + + +def test_pubkeys_from_rpms(): + installed_rpms = InstalledRPM( + items=[ + RPM(name='gpg-pubkey', + version='9570ff31', + release='5e3006fb', + epoch='0', + packager='Fedora (33) ', + arch='noarch', + pgpsig=''), + RPM(name='rpm', + version='4.17.1', + release='3.fc35', + epoch='0', + packager='Fedora Project', + arch='x86_64', + pgpsig='RSA/SHA256, Tue 02 Aug 2022 03:12:43 PM CEST, Key ID db4639719867c58f'), + ], + ) + assert gpg.get_pubkeys_from_rpms(installed_rpms) == [GpgKey(fingerprint='9570ff31', rpmdb=True)] diff --git a/repos/system_upgrade/common/models/trustedgpgkeys.py b/repos/system_upgrade/common/models/trustedgpgkeys.py new file mode 100644 index 00000000..c397bea7 --- /dev/null +++ b/repos/system_upgrade/common/models/trustedgpgkeys.py @@ -0,0 +1,19 @@ +from leapp.models import fields, Model +from leapp.topics import SystemFactsTopic + + +class GpgKey(Model): + """ + GPG Public key + + It is represented by a record in the RPM DB or by a file in directory with trusted keys (or both). + """ + topic = SystemFactsTopic + fingerprint = fields.String() + rpmdb = fields.Boolean() + filename = fields.Nullable(fields.String()) + + +class TrustedGpgKeys(Model): + topic = SystemFactsTopic + items = fields.List(fields.Model(GpgKey), default=[]) -- 2.41.0