forked from rpms/leapp-repository
610 lines
23 KiB
Diff
610 lines
23 KiB
Diff
From 428c46051619a570b08189677bb27eedf69c2a9e Mon Sep 17 00:00:00 2001
|
|
From: karolinku <kkula@redhat.com>
|
|
Date: Fri, 17 Oct 2025 16:06:15 +0200
|
|
Subject: [PATCH 50/55] Add detection for third-party target Python modules
|
|
|
|
Introduce actors to detect presence of third-party
|
|
Python modules installed for target Python. Those modules could
|
|
interfere with the upgrade process or cause issues after rebooting
|
|
into the target system.
|
|
|
|
Scanner (scanthirdpartytargetpythonmodules):
|
|
- Identifies the target Python interpreter
|
|
- Queries the target Python's sys.path to determine where it searches
|
|
for modules
|
|
- Recursively scans these directories for Python files (.py, .so, .pyc)
|
|
- Cross-references found files against the RPM database to determine
|
|
ownership and categorize them
|
|
|
|
Checker (checkthirdpartytargetpythonmodules) creates a high severity
|
|
report to inform users about findings and presents full list of them
|
|
in logs and short version in report.
|
|
|
|
Jira: RHEL-71882
|
|
---
|
|
.../actor.py | 21 ++
|
|
.../checkthirdpartytargetpythonmodules.py | 74 +++++++
|
|
...check_third_party_target_python_modules.py | 46 +++++
|
|
.../actor.py | 19 ++
|
|
.../scanthirdpartytargetpythonmodules.py | 193 ++++++++++++++++++
|
|
..._scan_third_party_target_python_modules.py | 136 ++++++++++++
|
|
.../models/thirdpartytagetpythonmodules.py | 25 +++
|
|
requirements.txt | 1 +
|
|
8 files changed, 515 insertions(+)
|
|
create mode 100644 repos/system_upgrade/common/actors/checkthirdpartytargetpythonmodules/actor.py
|
|
create mode 100644 repos/system_upgrade/common/actors/checkthirdpartytargetpythonmodules/libraries/checkthirdpartytargetpythonmodules.py
|
|
create mode 100644 repos/system_upgrade/common/actors/checkthirdpartytargetpythonmodules/tests/test_check_third_party_target_python_modules.py
|
|
create mode 100644 repos/system_upgrade/common/actors/scanthirdpartytargetpythonmodules/actor.py
|
|
create mode 100644 repos/system_upgrade/common/actors/scanthirdpartytargetpythonmodules/libraries/scanthirdpartytargetpythonmodules.py
|
|
create mode 100644 repos/system_upgrade/common/actors/scanthirdpartytargetpythonmodules/tests/test_scan_third_party_target_python_modules.py
|
|
create mode 100644 repos/system_upgrade/common/models/thirdpartytagetpythonmodules.py
|
|
|
|
diff --git a/repos/system_upgrade/common/actors/checkthirdpartytargetpythonmodules/actor.py b/repos/system_upgrade/common/actors/checkthirdpartytargetpythonmodules/actor.py
|
|
new file mode 100644
|
|
index 00000000..e1868819
|
|
--- /dev/null
|
|
+++ b/repos/system_upgrade/common/actors/checkthirdpartytargetpythonmodules/actor.py
|
|
@@ -0,0 +1,21 @@
|
|
+from leapp.actors import Actor
|
|
+from leapp.libraries.actor.checkthirdpartytargetpythonmodules import perform_check
|
|
+from leapp.models import ThirdPartyTargetPythonModules
|
|
+from leapp.reporting import Report
|
|
+from leapp.tags import ChecksPhaseTag, IPUWorkflowTag
|
|
+
|
|
+
|
|
+class CheckThirdPartyTargetPythonModules(Actor):
|
|
+ """
|
|
+ Produces a report if any third-party target Python modules are detected on the source system.
|
|
+
|
|
+ If such modules are detected, a high risk report is produced.
|
|
+ """
|
|
+
|
|
+ name = 'check_third_party_target_python_modules'
|
|
+ consumes = (ThirdPartyTargetPythonModules,)
|
|
+ produces = (Report,)
|
|
+ tags = (ChecksPhaseTag, IPUWorkflowTag)
|
|
+
|
|
+ def process(self):
|
|
+ perform_check()
|
|
diff --git a/repos/system_upgrade/common/actors/checkthirdpartytargetpythonmodules/libraries/checkthirdpartytargetpythonmodules.py b/repos/system_upgrade/common/actors/checkthirdpartytargetpythonmodules/libraries/checkthirdpartytargetpythonmodules.py
|
|
new file mode 100644
|
|
index 00000000..7ed34738
|
|
--- /dev/null
|
|
+++ b/repos/system_upgrade/common/actors/checkthirdpartytargetpythonmodules/libraries/checkthirdpartytargetpythonmodules.py
|
|
@@ -0,0 +1,74 @@
|
|
+from leapp import reporting
|
|
+from leapp.libraries.stdlib import api
|
|
+from leapp.models import ThirdPartyTargetPythonModules
|
|
+
|
|
+FMT_LIST_SEPARATOR = '\n - '
|
|
+MAX_REPORTED_ITEMS = 30
|
|
+
|
|
+
|
|
+def _formatted_list_output_with_max_items(input_list, sep=FMT_LIST_SEPARATOR, max_items=MAX_REPORTED_ITEMS):
|
|
+ if not input_list:
|
|
+ return ''
|
|
+
|
|
+ total_count = len(input_list)
|
|
+ items_to_show = input_list[:max_items]
|
|
+ formatted = ['{}{}'.format(sep, item) for item in items_to_show]
|
|
+
|
|
+ if total_count > max_items:
|
|
+ formatted.append('{}... and {} more'.format(sep, total_count - max_items))
|
|
+
|
|
+ return ''.join(formatted)
|
|
+
|
|
+
|
|
+def check_third_party_target_python_modules(third_party_target_python_modules):
|
|
+ """Create an inhibitor when third-party Python modules are detected."""
|
|
+ target_python_version = third_party_target_python_modules.target_python.split('python')[1]
|
|
+ third_party_rpms = third_party_target_python_modules.third_party_rpm_names
|
|
+ third_party_modules = third_party_target_python_modules.third_party_modules
|
|
+
|
|
+ summary = (
|
|
+ 'Third-party target Python modules may interfere with '
|
|
+ 'the upgrade process or cause unexpected behavior after the upgrade.'
|
|
+ )
|
|
+
|
|
+ if third_party_rpms:
|
|
+ summary = (
|
|
+ '{pre}\n\nNon-distribution RPM packages detected:{rpmlist}'
|
|
+ .format(
|
|
+ pre=summary,
|
|
+ rpmlist=_formatted_list_output_with_max_items(third_party_rpms))
|
|
+ )
|
|
+
|
|
+ if third_party_modules:
|
|
+ summary = (
|
|
+ '{pre}\n\nNon-distribution modules detected (list can be incomplete):{modulelist}'
|
|
+ .format(
|
|
+ pre=summary,
|
|
+ modulelist=_formatted_list_output_with_max_items(third_party_modules))
|
|
+ )
|
|
+
|
|
+ reporting.create_report([
|
|
+ reporting.Title('Detected third-party Python modules for the target Python version'),
|
|
+ reporting.Summary(summary),
|
|
+ reporting.Remediation(
|
|
+ hint='Remove third-party target Python {} packages before attempting the upgrade or ensure '
|
|
+ 'that those modules are not interfering with distribution-provided modules.'
|
|
+ .format(target_python_version),
|
|
+ ),
|
|
+ reporting.Severity(reporting.Severity.HIGH)
|
|
+ ])
|
|
+
|
|
+
|
|
+def perform_check():
|
|
+ """Perform the check for third-party Python modules."""
|
|
+ third_party_target_python_modules_msg = next(api.consume(
|
|
+ ThirdPartyTargetPythonModules),
|
|
+ None,
|
|
+ )
|
|
+
|
|
+ if not third_party_target_python_modules_msg:
|
|
+ return
|
|
+
|
|
+ if (third_party_target_python_modules_msg.third_party_rpm_names or
|
|
+ third_party_target_python_modules_msg.third_party_modules):
|
|
+ check_third_party_target_python_modules(third_party_target_python_modules_msg)
|
|
diff --git a/repos/system_upgrade/common/actors/checkthirdpartytargetpythonmodules/tests/test_check_third_party_target_python_modules.py b/repos/system_upgrade/common/actors/checkthirdpartytargetpythonmodules/tests/test_check_third_party_target_python_modules.py
|
|
new file mode 100644
|
|
index 00000000..2a87d195
|
|
--- /dev/null
|
|
+++ b/repos/system_upgrade/common/actors/checkthirdpartytargetpythonmodules/tests/test_check_third_party_target_python_modules.py
|
|
@@ -0,0 +1,46 @@
|
|
+import pytest
|
|
+
|
|
+from leapp import reporting
|
|
+from leapp.libraries.actor import checkthirdpartytargetpythonmodules
|
|
+from leapp.libraries.common.testutils import create_report_mocked, CurrentActorMocked
|
|
+from leapp.libraries.stdlib import api
|
|
+from leapp.models import ThirdPartyTargetPythonModules
|
|
+
|
|
+
|
|
+def test_perform_check_no_message_available(monkeypatch):
|
|
+ monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(msgs=[]))
|
|
+ monkeypatch.setattr(reporting, 'create_report', create_report_mocked())
|
|
+
|
|
+ checkthirdpartytargetpythonmodules.perform_check()
|
|
+
|
|
+ assert not reporting.create_report.called
|
|
+
|
|
+
|
|
+def test_perform_check_empty_lists(monkeypatch):
|
|
+ msg = ThirdPartyTargetPythonModules(
|
|
+ target_python='python3.9',
|
|
+ third_party_modules=[],
|
|
+ third_party_rpm_names=[]
|
|
+ )
|
|
+
|
|
+ monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(msgs=[msg]))
|
|
+ monkeypatch.setattr(reporting, 'create_report', create_report_mocked())
|
|
+
|
|
+ checkthirdpartytargetpythonmodules.perform_check()
|
|
+
|
|
+ assert not reporting.create_report.called
|
|
+
|
|
+
|
|
+def test_perform_check_with_third_party_modules(monkeypatch):
|
|
+ msg = ThirdPartyTargetPythonModules(
|
|
+ target_python='python3.9',
|
|
+ third_party_modules=['third_party_module'],
|
|
+ third_party_rpm_names=['third_party_rpm']
|
|
+ )
|
|
+
|
|
+ monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(msgs=[msg]))
|
|
+ monkeypatch.setattr(reporting, 'create_report', create_report_mocked())
|
|
+
|
|
+ checkthirdpartytargetpythonmodules.perform_check()
|
|
+
|
|
+ assert reporting.create_report.called
|
|
diff --git a/repos/system_upgrade/common/actors/scanthirdpartytargetpythonmodules/actor.py b/repos/system_upgrade/common/actors/scanthirdpartytargetpythonmodules/actor.py
|
|
new file mode 100644
|
|
index 00000000..2c0d1973
|
|
--- /dev/null
|
|
+++ b/repos/system_upgrade/common/actors/scanthirdpartytargetpythonmodules/actor.py
|
|
@@ -0,0 +1,19 @@
|
|
+from leapp.actors import Actor
|
|
+from leapp.libraries.actor import scanthirdpartytargetpythonmodules
|
|
+from leapp.models import DistributionSignedRPM, ThirdPartyTargetPythonModules
|
|
+from leapp.tags import FactsPhaseTag, IPUWorkflowTag
|
|
+
|
|
+
|
|
+class ScanThirdPartyTargetPythonModules(Actor):
|
|
+ """
|
|
+ Detect third-party target Python modules and RPMs on the source system.
|
|
+
|
|
+ """
|
|
+
|
|
+ name = 'scan_third_party_target_python_modules'
|
|
+ consumes = (DistributionSignedRPM,)
|
|
+ produces = (ThirdPartyTargetPythonModules,)
|
|
+ tags = (FactsPhaseTag, IPUWorkflowTag)
|
|
+
|
|
+ def process(self):
|
|
+ scanthirdpartytargetpythonmodules.process()
|
|
diff --git a/repos/system_upgrade/common/actors/scanthirdpartytargetpythonmodules/libraries/scanthirdpartytargetpythonmodules.py b/repos/system_upgrade/common/actors/scanthirdpartytargetpythonmodules/libraries/scanthirdpartytargetpythonmodules.py
|
|
new file mode 100644
|
|
index 00000000..1329c50f
|
|
--- /dev/null
|
|
+++ b/repos/system_upgrade/common/actors/scanthirdpartytargetpythonmodules/libraries/scanthirdpartytargetpythonmodules.py
|
|
@@ -0,0 +1,193 @@
|
|
+import json
|
|
+import os
|
|
+from collections import defaultdict
|
|
+from pathlib import Path
|
|
+
|
|
+import rpm
|
|
+
|
|
+from leapp.libraries.common.config.version import get_target_major_version
|
|
+from leapp.libraries.common.rpms import has_package
|
|
+from leapp.libraries.stdlib import api, run
|
|
+from leapp.models import DistributionSignedRPM, ThirdPartyTargetPythonModules
|
|
+
|
|
+PYTHON_EXTENSIONS = (".py", ".so", ".pyc")
|
|
+FMT_LIST_SEPARATOR = '\n - '
|
|
+
|
|
+
|
|
+def _formatted_list_output(input_list, sep=FMT_LIST_SEPARATOR):
|
|
+ return ['{}{}'.format(sep, item) for item in input_list]
|
|
+
|
|
+
|
|
+def get_python_sys_paths(python_interpreter):
|
|
+ """Get sys.path from the specified Python interpreter."""
|
|
+
|
|
+ result = run([python_interpreter, '-c', 'import sys, json; print(json.dumps(sys.path))'])['stdout']
|
|
+ raw_paths = json.loads(result)
|
|
+ paths = [Path(raw_path).resolve() for raw_path in raw_paths]
|
|
+ return paths
|
|
+
|
|
+
|
|
+def get_python_binary_for_rhel(rhel_version):
|
|
+ """
|
|
+ Maps RHEL major version to the appropriate Python binary.
|
|
+ """
|
|
+
|
|
+ version_map = {
|
|
+ '9': 'python3.9',
|
|
+ '10': 'python3.12',
|
|
+ }
|
|
+ return version_map.get(rhel_version)
|
|
+
|
|
+
|
|
+def is_target_python_present(target_python):
|
|
+ """
|
|
+ Checks if the target Python interpreter is available on the system.
|
|
+ """
|
|
+
|
|
+ result = run(['command', '-v', target_python], checked=False)
|
|
+ return not result['exit_code']
|
|
+
|
|
+
|
|
+def identify_files_of_pypackages(syspaths):
|
|
+ ts = rpm.TransactionSet()
|
|
+ # add a trailing slash by calling os.path.join(..., '')
|
|
+ roots = tuple(os.path.join(str(path), "") for path in syspaths)
|
|
+ file_to_pkg = {}
|
|
+
|
|
+ # Iterate over all installed packages
|
|
+ for header in ts.dbMatch():
|
|
+ pkg = header['name']
|
|
+ files = header['filenames']
|
|
+ for filename in files:
|
|
+ if filename and filename.endswith(PYTHON_EXTENSIONS) and filename.startswith(roots):
|
|
+ file_to_pkg[filename] = pkg
|
|
+ return file_to_pkg
|
|
+
|
|
+
|
|
+def find_python_related(root):
|
|
+ # recursively search for all files matching the given extension
|
|
+ for pattern in PYTHON_EXTENSIONS:
|
|
+ yield from root.rglob("*" + pattern)
|
|
+
|
|
+
|
|
+def _should_skip_file(file):
|
|
+ # pyc files are importable, but not if they are in __pycache__
|
|
+ return file.name.endswith(".pyc") and file.parent.name == "__pycache__"
|
|
+
|
|
+
|
|
+def scan_python_files(system_paths, rpm_files):
|
|
+ """
|
|
+ Scan system paths for Python files and categorize them by ownership.
|
|
+
|
|
+ :param system_paths: List of paths to scan for Python files
|
|
+ :param rpm_files: Dictionary mapping file paths to RPM package names
|
|
+ :return: Tuple of (rpms_to_check, third_party_unowned_files) where:
|
|
+ - rpms_to_check is a dict mapping RPM names to list of their files
|
|
+ - third_party_unowned_files is a list of files not owned by any RPM
|
|
+ """
|
|
+ rpms_to_check = defaultdict(list)
|
|
+ third_party_unowned_files = []
|
|
+
|
|
+ for path in system_paths:
|
|
+ if not path.is_dir():
|
|
+ continue
|
|
+ for file in find_python_related(path):
|
|
+ if _should_skip_file(file):
|
|
+ continue
|
|
+
|
|
+ file_path = str(file)
|
|
+ owner = rpm_files.get(file_path)
|
|
+ if owner:
|
|
+ rpms_to_check[owner].append(file_path)
|
|
+ else:
|
|
+ third_party_unowned_files.append(file_path)
|
|
+
|
|
+ return rpms_to_check, third_party_unowned_files
|
|
+
|
|
+
|
|
+def identify_unsigned_rpms(rpms_to_check):
|
|
+ """
|
|
+ Identify which RPMs are third-party (not signed by the distribution).
|
|
+
|
|
+ :param rpms_to_check: Dictionary mapping RPM names to list of their files
|
|
+ :return: Tuple of (third_party_rpms, third_party_files) where:
|
|
+ - third_party_rpms is a list of third-party RPM package names
|
|
+ - third_party_files is a list of files from third-party RPMs
|
|
+ """
|
|
+ third_party_rpms = []
|
|
+ third_party_files = []
|
|
+
|
|
+ for rpm_name, files in rpms_to_check.items():
|
|
+ if not has_package(DistributionSignedRPM, rpm_name):
|
|
+ third_party_rpms.append(rpm_name)
|
|
+ api.current_logger().warning(
|
|
+ 'Found Python files from non-distribution RPM package: {}'.format(rpm_name)
|
|
+ )
|
|
+ third_party_files.extend(files)
|
|
+
|
|
+ return third_party_rpms, third_party_files
|
|
+
|
|
+
|
|
+def process():
|
|
+ """
|
|
+ Main function to scan for third-party Python modules/RPMs on the target system.
|
|
+
|
|
+ This function:
|
|
+ 1. Validates the target RHEL version and Python interpreter
|
|
+ 2. Scans system paths for Python files
|
|
+ 3. Identifies third-party RPMs and modules
|
|
+ 4. Produces a message if any third-party modules/RPMs are detected
|
|
+ """
|
|
+ target_version = get_target_major_version()
|
|
+ target_python = get_python_binary_for_rhel(target_version)
|
|
+
|
|
+ if not target_python:
|
|
+ api.current_logger().info(
|
|
+ "RHEL version {} is not supported for third-party Python modules scanning, "
|
|
+ "skipping check.".format(target_version)
|
|
+ )
|
|
+ return
|
|
+
|
|
+ if not is_target_python_present(target_python):
|
|
+ api.current_logger().info(
|
|
+ "Target Python interpreter {} is not installed on the source system, "
|
|
+ "skipping check of 3rd party python modules.".format(target_python)
|
|
+ )
|
|
+ return
|
|
+ system_paths = get_python_sys_paths(target_python)
|
|
+ rpm_files = identify_files_of_pypackages(system_paths[1:])
|
|
+
|
|
+ rpms_to_check, third_party_unowned_files = scan_python_files(system_paths[1:], rpm_files)
|
|
+
|
|
+ third_party_rpms, third_party_rpm_files = identify_unsigned_rpms(rpms_to_check)
|
|
+
|
|
+ # Combine all third-party files (unowned + from third-party RPMs)
|
|
+ all_third_party_files = third_party_unowned_files + third_party_rpm_files
|
|
+
|
|
+ if third_party_rpms or all_third_party_files:
|
|
+ api.current_logger().warning(
|
|
+ 'Found {} third-party RPM package(s) and {} third-party Python file(s) '
|
|
+ 'for target Python {}'.format(
|
|
+ len(third_party_rpms), len(all_third_party_files), target_python
|
|
+ )
|
|
+ )
|
|
+
|
|
+ if third_party_rpms:
|
|
+ api.current_logger().info(
|
|
+ 'Complete list of third-party RPM packages:{}'.format(
|
|
+ ''.join(_formatted_list_output(third_party_rpms))
|
|
+ )
|
|
+ )
|
|
+
|
|
+ if all_third_party_files:
|
|
+ api.current_logger().info(
|
|
+ 'Complete list of third-party Python modules:{}'.format(
|
|
+ ''.join(_formatted_list_output(all_third_party_files))
|
|
+ )
|
|
+ )
|
|
+
|
|
+ api.produce(ThirdPartyTargetPythonModules(
|
|
+ target_python=target_python,
|
|
+ third_party_modules=all_third_party_files,
|
|
+ third_party_rpm_names=third_party_rpms
|
|
+ ))
|
|
diff --git a/repos/system_upgrade/common/actors/scanthirdpartytargetpythonmodules/tests/test_scan_third_party_target_python_modules.py b/repos/system_upgrade/common/actors/scanthirdpartytargetpythonmodules/tests/test_scan_third_party_target_python_modules.py
|
|
new file mode 100644
|
|
index 00000000..796185ae
|
|
--- /dev/null
|
|
+++ b/repos/system_upgrade/common/actors/scanthirdpartytargetpythonmodules/tests/test_scan_third_party_target_python_modules.py
|
|
@@ -0,0 +1,136 @@
|
|
+from collections import defaultdict, namedtuple
|
|
+from pathlib import Path
|
|
+
|
|
+import pytest
|
|
+
|
|
+from leapp.libraries.actor import scanthirdpartytargetpythonmodules
|
|
+from leapp.libraries.common.testutils import logger_mocked
|
|
+from leapp.libraries.stdlib import api
|
|
+from leapp.models import DistributionSignedRPM
|
|
+
|
|
+Parent = namedtuple('Parent', ['name'])
|
|
+MockFile = namedtuple('MockFile', ['name', 'parent', 'path'])
|
|
+
|
|
+
|
|
+def _mock_file_str(self):
|
|
+ return self.path
|
|
+
|
|
+
|
|
+MockFile.__str__ = _mock_file_str
|
|
+
|
|
+
|
|
+@pytest.mark.parametrize('rhel_version,expected_python', [
|
|
+ ('9', 'python3.9'),
|
|
+ ('10', 'python3.12'),
|
|
+ ('8', None),
|
|
+ ('7', None),
|
|
+ ('', None),
|
|
+ ('invalid', None),
|
|
+ (None, None),
|
|
+])
|
|
+def test_get_python_binary_for_rhel(rhel_version, expected_python):
|
|
+ assert scanthirdpartytargetpythonmodules.get_python_binary_for_rhel(rhel_version) == expected_python
|
|
+
|
|
+
|
|
+@pytest.mark.parametrize('file_name,parent_name,should_skip', [
|
|
+ ('module.pyc', '__pycache__', True),
|
|
+ ('module.pyc', 'site-packages', False),
|
|
+ ('module.py', '__pycache__', False),
|
|
+ ('module.so', '__pycache__', False),
|
|
+ ('module.py', 'site-packages', False),
|
|
+ ('module.so', 'site-packages', False),
|
|
+])
|
|
+def test_should_skip_file(file_name, parent_name, should_skip):
|
|
+ mock_file = MockFile(name=file_name, parent=Parent(name=parent_name), path='/dummy/path')
|
|
+ assert scanthirdpartytargetpythonmodules._should_skip_file(mock_file) is should_skip
|
|
+
|
|
+
|
|
+def test_scan_python_files(monkeypatch):
|
|
+ system_paths = [Path('/usr/lib/python3.9/site-packages')]
|
|
+ rpm_files = {
|
|
+ '/usr/lib/python3.9/site-packages/rpm_module.py': 'rpm-package',
|
|
+ '/usr/lib/python3.9/site-packages/another.py': 'another-rpm',
|
|
+ }
|
|
+
|
|
+ def mock_is_dir(self):
|
|
+ return True
|
|
+
|
|
+ def mock_find_python_related(root):
|
|
+ files = [
|
|
+ MockFile('rpm_module.py', Parent('site-packages'), '/usr/lib/python3.9/site-packages/rpm_module.py'),
|
|
+ MockFile('unowned.py', Parent('site-packages'), '/usr/lib/python3.9/site-packages/unowned.py'),
|
|
+ MockFile('another.py', Parent('site-packages'), '/usr/lib/python3.9/site-packages/another.py'),
|
|
+ ]
|
|
+ return iter(files)
|
|
+
|
|
+ monkeypatch.setattr(Path, 'is_dir', mock_is_dir)
|
|
+ monkeypatch.setattr(scanthirdpartytargetpythonmodules, 'find_python_related', mock_find_python_related)
|
|
+
|
|
+ rpms_to_check, unowned = scanthirdpartytargetpythonmodules.scan_python_files(system_paths, rpm_files)
|
|
+
|
|
+ assert 'rpm-package' in rpms_to_check
|
|
+ assert 'another-rpm' in rpms_to_check
|
|
+ assert '/usr/lib/python3.9/site-packages/unowned.py' in unowned
|
|
+ assert len(unowned) == 1
|
|
+
|
|
+
|
|
+@pytest.mark.parametrize('path_exists,mock_files', [
|
|
+ (False, None),
|
|
+ (True, [MockFile('module.pyc', Parent('__pycache__'), '/usr/lib/python3.9/site-packages/__pycache__/module.pyc')]),
|
|
+])
|
|
+def test_scan_python_files_filtering(monkeypatch, path_exists, mock_files):
|
|
+ system_paths = [Path('/usr/lib/python3.9/site-packages')]
|
|
+ rpm_files = {}
|
|
+
|
|
+ def mock_is_dir(self):
|
|
+ return path_exists
|
|
+
|
|
+ monkeypatch.setattr(Path, 'is_dir', mock_is_dir)
|
|
+
|
|
+ if mock_files is not None:
|
|
+ def mock_find_python_related(root):
|
|
+ return iter(mock_files)
|
|
+ monkeypatch.setattr(scanthirdpartytargetpythonmodules, 'find_python_related', mock_find_python_related)
|
|
+
|
|
+ rpms_to_check, unowned = scanthirdpartytargetpythonmodules.scan_python_files(system_paths, rpm_files)
|
|
+
|
|
+ assert len(rpms_to_check) == 0
|
|
+ assert len(unowned) == 0
|
|
+
|
|
+
|
|
+@pytest.mark.parametrize('is_signed,expected_rpm_count,expected_file_count', [
|
|
+ (False, 1, 2),
|
|
+ (True, 0, 0),
|
|
+])
|
|
+def test_identify_unsigned_rpms(monkeypatch, is_signed, expected_rpm_count, expected_file_count):
|
|
+ rpms_to_check = defaultdict(list)
|
|
+ package_name = 'test-package'
|
|
+ rpms_to_check[package_name] = [
|
|
+ '/path/to/file1.py',
|
|
+ '/path/to/file2.py',
|
|
+ ]
|
|
+
|
|
+ def mock_has_package(model, pkg_name):
|
|
+ return is_signed
|
|
+
|
|
+ monkeypatch.setattr(scanthirdpartytargetpythonmodules, 'has_package', mock_has_package)
|
|
+ monkeypatch.setattr(api, 'current_logger', logger_mocked())
|
|
+
|
|
+ third_party_rpms, third_party_files = scanthirdpartytargetpythonmodules.identify_unsigned_rpms(rpms_to_check)
|
|
+
|
|
+ assert len(third_party_rpms) == expected_rpm_count
|
|
+ assert len(third_party_files) == expected_file_count
|
|
+
|
|
+ if not is_signed:
|
|
+ assert package_name in third_party_rpms
|
|
+ assert '/path/to/file1.py' in third_party_files
|
|
+ assert '/path/to/file2.py' in third_party_files
|
|
+
|
|
+
|
|
+def test_identify_unsigned_rpms_empty_input():
|
|
+ rpms_to_check = defaultdict(list)
|
|
+
|
|
+ third_party_rpms, third_party_files = scanthirdpartytargetpythonmodules.identify_unsigned_rpms(rpms_to_check)
|
|
+
|
|
+ assert len(third_party_rpms) == 0
|
|
+ assert len(third_party_files) == 0
|
|
diff --git a/repos/system_upgrade/common/models/thirdpartytagetpythonmodules.py b/repos/system_upgrade/common/models/thirdpartytagetpythonmodules.py
|
|
new file mode 100644
|
|
index 00000000..105e9f2c
|
|
--- /dev/null
|
|
+++ b/repos/system_upgrade/common/models/thirdpartytagetpythonmodules.py
|
|
@@ -0,0 +1,25 @@
|
|
+from leapp.models import fields, Model
|
|
+from leapp.topics import SystemInfoTopic
|
|
+
|
|
+
|
|
+class ThirdPartyTargetPythonModules(Model):
|
|
+ """
|
|
+ Information about third-party target Python modules found on system.
|
|
+
|
|
+ """
|
|
+ topic = SystemInfoTopic
|
|
+
|
|
+ target_python = fields.String()
|
|
+ """
|
|
+ Target system Python version.
|
|
+ """
|
|
+
|
|
+ third_party_modules = fields.List(fields.String(), default=[])
|
|
+ """
|
|
+ List of third-party target Python modules found on the source system. Empty list if no modules found.
|
|
+ """
|
|
+
|
|
+ third_party_rpm_names = fields.List(fields.String(), default=[])
|
|
+ """
|
|
+ List of third-party RPMs found on the source system. Empty list if no modules found.
|
|
+ """
|
|
diff --git a/requirements.txt b/requirements.txt
|
|
index a1bb4725..3c79b23d 100644
|
|
--- a/requirements.txt
|
|
+++ b/requirements.txt
|
|
@@ -14,3 +14,4 @@ git+https://github.com/oamg/leapp
|
|
requests
|
|
# pinning a py27 troublemaking transitive dependency
|
|
lazy-object-proxy==1.5.2; python_version < '3'
|
|
+rpm
|
|
--
|
|
2.51.1
|
|
|