From dc7dc4d712c1e32a62701319130f8dd66da5ecc4 Mon Sep 17 00:00:00 2001 From: Lubomir Rintel Date: Mon, 26 Sep 2022 11:01:35 +0200 Subject: [PATCH 70/75] Make CheckNetworkDeprecations consume IfCfg and NetworkManagerConnection This actor used to scan the NetworkManager keyfiles and ifcfg files itself. No more! --- .../actors/networkdeprecations/actor.py | 7 +- .../libraries/networkdeprecations.py | 71 +++---- .../tests/unit_test_networkdeprecations.py | 192 ++++++++---------- 3 files changed, 111 insertions(+), 159 deletions(-) diff --git a/repos/system_upgrade/el8toel9/actors/networkdeprecations/actor.py b/repos/system_upgrade/el8toel9/actors/networkdeprecations/actor.py index 19113e4f..3074a3c7 100644 --- a/repos/system_upgrade/el8toel9/actors/networkdeprecations/actor.py +++ b/repos/system_upgrade/el8toel9/actors/networkdeprecations/actor.py @@ -1,7 +1,7 @@ from leapp.actors import Actor from leapp.libraries.actor import networkdeprecations -from leapp.models import Report -from leapp.tags import FactsPhaseTag, IPUWorkflowTag +from leapp.models import IfCfg, NetworkManagerConnection, Report +from leapp.tags import ChecksPhaseTag, IPUWorkflowTag class CheckNetworkDeprecations(Actor): @@ -16,8 +16,9 @@ class CheckNetworkDeprecations(Actor): """ name = "network_deprecations" + consumes = (IfCfg, NetworkManagerConnection,) produces = (Report,) - tags = (IPUWorkflowTag, FactsPhaseTag,) + tags = (ChecksPhaseTag, IPUWorkflowTag,) def process(self): networkdeprecations.process() diff --git a/repos/system_upgrade/el8toel9/actors/networkdeprecations/libraries/networkdeprecations.py b/repos/system_upgrade/el8toel9/actors/networkdeprecations/libraries/networkdeprecations.py index 2a6a2de9..92dfc51d 100644 --- a/repos/system_upgrade/el8toel9/actors/networkdeprecations/libraries/networkdeprecations.py +++ b/repos/system_upgrade/el8toel9/actors/networkdeprecations/libraries/networkdeprecations.py @@ -1,11 +1,6 @@ -import errno -import os - from leapp import reporting -from 
leapp.libraries.common import utils - -SYSCONFIG_DIR = '/etc/sysconfig/network-scripts' -NM_CONN_DIR = '/etc/NetworkManager/system-connections' +from leapp.libraries.stdlib import api +from leapp.models import IfCfg, NetworkManagerConnection FMT_LIST_SEPARATOR = '\n - ' @@ -13,56 +8,36 @@ FMT_LIST_SEPARATOR = '\n - ' def process(): wep_files = [] - # Scan NetworkManager native keyfiles - try: - keyfiles = os.listdir(NM_CONN_DIR) - except OSError as e: - if e.errno != errno.ENOENT: - raise - keyfiles = [] - - for f in keyfiles: - path = os.path.join(NM_CONN_DIR, f) - - cp = utils.parse_config(open(path, mode='r').read()) - - if not cp.has_section('wifi-security'): - continue + # Scan NetworkManager native keyfile connections + for nmconn in api.consume(NetworkManagerConnection): + for setting in nmconn.settings: + if not setting.name == 'wifi-security': + continue - key_mgmt = cp.get('wifi-security', 'key-mgmt') - if key_mgmt in ('none', 'ieee8021x'): - wep_files.append(path) + for prop in setting.properties: + if not prop.name == 'key-mgmt': + continue + if prop.value in ('none', 'ieee8021x'): + wep_files.append(nmconn.filename) # Scan legacy ifcfg files & secrets - try: - ifcfgs = os.listdir(SYSCONFIG_DIR) - except OSError as e: - if e.errno != errno.ENOENT: - raise - ifcfgs = [] - - for f in ifcfgs: - path = os.path.join(SYSCONFIG_DIR, f) + for ifcfg in api.consume(IfCfg): + props = ifcfg.properties + if ifcfg.secrets is not None: + props = props + ifcfg.secrets - if not f.startswith('ifcfg-') and not f.startswith('keys-'): - continue - - for line in open(path).readlines(): - try: - (key, value) = line.split('#')[0].strip().split('=') - except ValueError: - # We're not interested in lines that are not - # simple assignments. Play it safe. 
- continue + for prop in props: + name = prop.name + value = prop.value # Dynamic WEP - if key == 'KEY_MGMT' and value.upper() == 'IEEE8021X': - wep_files.append(path) + if name == 'KEY_MGMT' and value.upper() == 'IEEE8021X': + wep_files.append(ifcfg.filename) continue # Static WEP, possibly with agent-owned secrets - if key in ('KEY_PASSPHRASE1', 'KEY1', 'WEP_KEY_FLAGS'): - wep_files.append(path) + if name in ('KEY_PASSPHRASE1', 'KEY1', 'WEP_KEY_FLAGS'): + wep_files.append(ifcfg.filename) continue if wep_files: diff --git a/repos/system_upgrade/el8toel9/actors/networkdeprecations/tests/unit_test_networkdeprecations.py b/repos/system_upgrade/el8toel9/actors/networkdeprecations/tests/unit_test_networkdeprecations.py index bd140405..659ab993 100644 --- a/repos/system_upgrade/el8toel9/actors/networkdeprecations/tests/unit_test_networkdeprecations.py +++ b/repos/system_upgrade/el8toel9/actors/networkdeprecations/tests/unit_test_networkdeprecations.py @@ -1,148 +1,124 @@ -import errno -import textwrap - -import mock -import six - -from leapp import reporting -from leapp.libraries.actor import networkdeprecations -from leapp.libraries.common.testutils import create_report_mocked, make_OSError - - -def _listdir_nm_conn(path): - if path == networkdeprecations.NM_CONN_DIR: - return ['connection'] - raise make_OSError(errno.ENOENT) - - -def _listdir_ifcfg(path): - if path == networkdeprecations.SYSCONFIG_DIR: - return ['ifcfg-wireless'] - raise make_OSError(errno.ENOENT) - - -def _listdir_keys(path): - if path == networkdeprecations.SYSCONFIG_DIR: - return ['keys-wireless'] - raise make_OSError(errno.ENOENT) - - -def test_no_conf(monkeypatch): +from leapp.models import ( + IfCfg, + IfCfgProperty, + NetworkManagerConnection, + NetworkManagerConnectionProperty, + NetworkManagerConnectionSetting +) +from leapp.reporting import Report +from leapp.utils.report import is_inhibitor + + +def test_no_conf(current_actor_context): """ No report if there are no networks. 
""" - monkeypatch.setattr(networkdeprecations.os, 'listdir', lambda _: ()) - monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - networkdeprecations.process() - assert not reporting.create_report.called + current_actor_context.run() + assert not current_actor_context.consume(Report) -def test_no_wireless(monkeypatch): +def test_no_wireless(current_actor_context): """ No report if there's a keyfile, but it's not for a wireless connection. """ - mock_config = mock.mock_open(read_data='[connection]') - with mock.patch('builtins.open' if six.PY3 else '__builtin__.open', mock_config): - monkeypatch.setattr(networkdeprecations.os, 'listdir', _listdir_nm_conn) - monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - networkdeprecations.process() - assert not reporting.create_report.called + not_wifi_nm_conn = NetworkManagerConnection(filename='/NM/wlan0.nmconn', settings=( + NetworkManagerConnectionSetting(name='connection'), + )) + current_actor_context.feed(not_wifi_nm_conn) + current_actor_context.run() + assert not current_actor_context.consume(Report) -def test_keyfile_static_wep(monkeypatch): + +def test_keyfile_static_wep(current_actor_context): """ Report if there's a static WEP keyfile. 
""" - STATIC_WEP_CONN = textwrap.dedent(""" - [wifi-security] - auth-alg=open - key-mgmt=none - wep-key-type=1 - wep-key0=abcde - """) + static_wep_nm_conn = NetworkManagerConnection(filename='/NM/wlan0.nmconn', settings=( + NetworkManagerConnectionSetting(name='wifi-security', properties=( + NetworkManagerConnectionProperty(name='auth-alg', value='open'), + NetworkManagerConnectionProperty(name='key-mgmt', value='none'), + NetworkManagerConnectionProperty(name='wep-key-type', value='1'), + )), + )) - mock_config = mock.mock_open(read_data=STATIC_WEP_CONN) - with mock.patch('builtins.open' if six.PY3 else '__builtin__.open', mock_config): - monkeypatch.setattr(networkdeprecations.os, 'listdir', _listdir_nm_conn) - monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - networkdeprecations.process() - assert reporting.create_report.called + current_actor_context.feed(static_wep_nm_conn) + current_actor_context.run() + report_fields = current_actor_context.consume(Report)[0].report + assert is_inhibitor(report_fields) -def test_keyfile_dynamic_wep(monkeypatch): +def test_keyfile_dynamic_wep(current_actor_context): """ Report if there's a dynamic WEP keyfile. 
""" - DYNAMIC_WEP_CONN = textwrap.dedent(""" - [wifi-security] - key-mgmt=ieee8021x - """) + dynamic_wep_conn = NetworkManagerConnection(filename='/NM/wlan0.nmconn', settings=( + NetworkManagerConnectionSetting(name='wifi-security', properties=( + NetworkManagerConnectionProperty(name='key-mgmt', value='ieee8021x'), + )), + )) - mock_config = mock.mock_open(read_data=DYNAMIC_WEP_CONN) - with mock.patch('builtins.open' if six.PY3 else '__builtin__.open', mock_config): - monkeypatch.setattr(networkdeprecations.os, 'listdir', _listdir_nm_conn) - monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - networkdeprecations.process() - assert reporting.create_report.called + current_actor_context.feed(dynamic_wep_conn) + current_actor_context.run() + report_fields = current_actor_context.consume(Report)[0].report + assert is_inhibitor(report_fields) -def test_ifcfg_static_wep_ask(monkeypatch): +def test_ifcfg_static_wep_ask(current_actor_context): """ Report if there's a static WEP sysconfig without stored key. 
""" - STATIC_WEP_ASK_KEY_SYSCONFIG = textwrap.dedent(""" - TYPE=Wireless - ESSID=wep1 - NAME=wep1 - MODE=Managed - WEP_KEY_FLAGS=ask - SECURITYMODE=open - DEFAULTKEY=1 - KEY_TYPE=key - """) - - mock_config = mock.mock_open(read_data=STATIC_WEP_ASK_KEY_SYSCONFIG) - with mock.patch('builtins.open' if six.PY3 else '__builtin__.open', mock_config): - monkeypatch.setattr(networkdeprecations.os, 'listdir', _listdir_ifcfg) - monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - networkdeprecations.process() - assert reporting.create_report.called - - -def test_ifcfg_static_wep(monkeypatch): + static_wep_ask_key_ifcfg = IfCfg(filename='/NM/ifcfg-wlan0', properties=( + IfCfgProperty(name='TYPE', value='Wireless'), + IfCfgProperty(name='ESSID', value='wep1'), + IfCfgProperty(name='NAME', value='wep1'), + IfCfgProperty(name='MODE', value='Managed'), + IfCfgProperty(name='WEP_KEY_FLAGS', value='ask'), + IfCfgProperty(name='SECURITYMODE', value='open'), + IfCfgProperty(name='DEFAULTKEY', value='1'), + IfCfgProperty(name='KEY_TYPE', value='key'), + )) + + current_actor_context.feed(static_wep_ask_key_ifcfg) + current_actor_context.run() + report_fields = current_actor_context.consume(Report)[0].report + assert is_inhibitor(report_fields) + + +def test_ifcfg_static_wep(current_actor_context): """ Report if there's a static WEP sysconfig with a stored passphrase. 
""" - mock_config = mock.mock_open(read_data='KEY_PASSPHRASE1=Hell0') - with mock.patch('builtins.open' if six.PY3 else '__builtin__.open', mock_config): - monkeypatch.setattr(networkdeprecations.os, 'listdir', _listdir_keys) - monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - networkdeprecations.process() - assert reporting.create_report.called + static_wep_ifcfg = IfCfg(filename='/NM/ifcfg-wlan0', secrets=( + IfCfgProperty(name='KEY_PASSPHRASE1', value=None), + )) + + current_actor_context.feed(static_wep_ifcfg) + current_actor_context.run() + report_fields = current_actor_context.consume(Report)[0].report + assert is_inhibitor(report_fields) -def test_ifcfg_dynamic_wep(monkeypatch): +def test_ifcfg_dynamic_wep(current_actor_context): """ Report if there's a dynamic WEP sysconfig. """ - DYNAMIC_WEP_SYSCONFIG = textwrap.dedent(""" - ESSID=dynwep1 - MODE=Managed - KEY_MGMT=IEEE8021X # Dynamic WEP! - TYPE=Wireless - NAME=dynwep1 - """) - - mock_config = mock.mock_open(read_data=DYNAMIC_WEP_SYSCONFIG) - with mock.patch('builtins.open' if six.PY3 else '__builtin__.open', mock_config): - monkeypatch.setattr(networkdeprecations.os, 'listdir', _listdir_ifcfg) - monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - networkdeprecations.process() - assert reporting.create_report.called + dynamic_wep_ifcfg = IfCfg(filename='/NM/ifcfg-wlan0', properties=( + IfCfgProperty(name='ESSID', value='dynwep1'), + IfCfgProperty(name='MODE', value='Managed'), + IfCfgProperty(name='KEY_MGMT', value='IEEE8021X'), + IfCfgProperty(name='TYPE', value='Wireless'), + IfCfgProperty(name='NAME', value='dynwep1'), + )) + + current_actor_context.feed(dynamic_wep_ifcfg) + current_actor_context.run() + report_fields = current_actor_context.consume(Report)[0].report + assert is_inhibitor(report_fields) -- 2.39.0