From 2952a4ea50fd1d720410394456c98732ba6b7cc8 Mon Sep 17 00:00:00 2001 From: Inessa Vasilevskaya Date: Wed, 5 Apr 2023 16:53:10 +0200 Subject: [PATCH 05/18] Improve handling of a remediation with non-ascii data When a remediation command that manipulates non-ascii but totally valid utf-8 strings is printed, leapp should not produce any tracebacks upon report generation. The idea of the fix is to make the repr implementation of Remediation aware of utf-8 encoding. The root of the problem is that a mixture of native text / encoded strings can appear in a report message, and that caused the report-generating code to go crazy while creating the actual log files. A unit test has been added as well. Related-to: OAMG-8629 --- leapp/reporting/__init__.py | 43 ++++++++++++++++++++++++++++----- leapp/utils/report.py | 35 +++++++++++++++++++-------- tests/scripts/test_reporting.py | 18 +++++++++++++- 3 files changed, 79 insertions(+), 17 deletions(-) diff --git a/leapp/reporting/__init__.py b/leapp/reporting/__init__.py index 821f19f..7b87a38 100644 --- a/leapp/reporting/__init__.py +++ b/leapp/reporting/__init__.py @@ -238,30 +238,61 @@ class BaseRemediation(BaseListPrimitive): return ('detail', 'remediations') +def _guarantee_encoded_str(text, encoding='utf-8'): + """Guarantees that result is a utf-8 encoded string""" + # Apply conversion only in py2 case + if six.PY3: + return text + # Unless data is already encoded -> encode it + try: + return text.encode(encoding) + except (UnicodeDecodeError, AttributeError): + return text + + +def _guarantee_decoded_str(text, encoding='utf-8'): + """Guarantees that result is 'native' text""" + # Apply conversion only in py2 case + if six.PY3: + return text + # Unless data is already decoded -> decode it + try: + return text.decode(encoding) + except (UnicodeEncodeError, AttributeError): + return text + + class RemediationCommand(BaseRemediation): def __init__(self, value=None): if not isinstance(value, list): raise TypeError('Value of "RemediationCommand" 
must be a list') - self._value = {'type': 'command', 'context': value} + self._value = {'type': 'command', 'context': [_guarantee_decoded_str(c) for c in value]} def __repr__(self): - return "[{}] {}".format(self._value['type'], ' '.join(self._value['context'])) + # NOTE(ivasilev) As the message can contain non-ascii characters let's deal with it properly. + # As per python practices repr has to return an encoded string + return "[{}] {}".format(self._value['type'], + ' '.join([_guarantee_encoded_str(c) for c in self._value['context']])) class RemediationHint(BaseRemediation): def __init__(self, value=None): - self._value = {'type': 'hint', 'context': value} + self._value = {'type': 'hint', 'context': _guarantee_decoded_str(value)} def __repr__(self): - return "[{}] {}".format(self._value['type'], self._value['context']) + # NOTE(ivasilev) As the message can contain non-ascii characters let's deal with it properly. + # As per python practices repr has to return an encoded string + return "[{}] {}".format(self._value['type'], _guarantee_encoded_str(self._value['context'])) class RemediationPlaybook(BaseRemediation): def __init__(self, value=None): - self._value = {'type': 'playbook', 'context': value} + self._value = {'type': 'playbook', 'context': _guarantee_decoded_str(value)} def __repr__(self): - return "[{}] {}".format(self._value['type'], self._value['context']) + # NOTE(ivasilev) As the message can contain non-ascii characters let's deal with it properly. 
+ # As per python practices repr has to return an encoded string + return "[{}] {}".format(self._value['type'], _guarantee_encoded_str(self._value['context'])) class Remediation(object): diff --git a/leapp/utils/report.py b/leapp/utils/report.py index 3efb14c..71d47e3 100644 --- a/leapp/utils/report.py +++ b/leapp/utils/report.py @@ -1,7 +1,10 @@ import hashlib +import io import json import os +import six + from leapp.reporting import ( _DEPRECATION_FLAGS, Groups, @@ -110,21 +113,29 @@ def generate_report_file(messages_to_report, context, path, report_schema='1.1.0 # NOTE(ivasilev) Int conversion should not break as only specific formats of report_schema versions are allowed report_schema_tuple = tuple(int(x) for x in report_schema.split('.')) if path.endswith(".txt"): - with open(path, 'w') as f: + with io.open(path, 'w', encoding='utf-8') as f: for message in sorted(messages_to_report, key=importance): - f.write('Risk Factor: {}{}\n'.format(message['severity'], - ' (inhibitor)' if is_inhibitor(message) else '')) - f.write('Title: {}\n'.format(message['title'])) - f.write('Summary: {}\n'.format(message['summary'])) + f.write(u'Risk Factor: {}{}\n'.format(message['severity'], + ' (inhibitor)' if is_inhibitor(message) else '')) + f.write(u'Title: {}\n'.format(message['title'])) + f.write(u'Summary: {}\n'.format(message['summary'])) remediation = Remediation.from_dict(message.get('detail', {})) if remediation: - f.write('Remediation: {}\n'.format(remediation)) + # NOTE(ivasilev) Decoding is necessary in case of python2 as remediation is an encoded string, + # while io.open expects "true text" input. For python3 repr will return proper py3 str, no + # decoding will be needed. + # This hassle and clumsiness makes me sad, so suggestions are welcome. 
+ remediation_text = 'Remediation: {}\n'.format(remediation) + if isinstance(remediation_text, six.binary_type): + # This will be true for py2 where repr returns an encoded string + remediation_text = remediation_text.decode('utf-8') + f.write(remediation_text) if report_schema_tuple > (1, 0, 0): # report-schema 1.0.0 doesn't have a stable report key - f.write('Key: {}\n'.format(message['key'])) - f.write('-' * 40 + '\n') + f.write(u'Key: {}\n'.format(message['key'])) + f.write(u'-' * 40 + '\n') elif path.endswith(".json"): - with open(path, 'w') as f: + with io.open(path, 'w', encoding='utf-8') as f: # Here all possible convertions will take place if report_schema_tuple < (1, 1, 0): # report-schema 1.0.0 doesn't have a stable report key @@ -150,4 +161,8 @@ def generate_report_file(messages_to_report, context, path, report_schema='1.1.0 # NOTE(ivasilev) As flags field is created only if there is an inhibitor # a default value to pop has to be specified not to end up with a KeyError msg.pop('flags', None) - json.dump({'entries': messages_to_report, 'leapp_run_id': context}, f, indent=2) + # NOTE(ivasilev) Has to be done in this way (dumps + py2 conversion + write to file instead of json.dump) + # because of a py2 bug in the json module that can produce a mix of unicode and str objects that will be + # incompatible with write. 
https://bugs.python.org/issue13769 + data = json.dumps({'entries': messages_to_report, 'leapp_run_id': context}, indent=2, ensure_ascii=False) + f.write(data) diff --git a/tests/scripts/test_reporting.py b/tests/scripts/test_reporting.py index 71576fe..4a9a59b 100644 --- a/tests/scripts/test_reporting.py +++ b/tests/scripts/test_reporting.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- from collections import namedtuple import datetime import json @@ -13,7 +14,7 @@ from leapp.reporting import ( create_report_from_deprecation, create_report_from_error, _create_report_object, - Audience, Flags, Groups, Key, RelatedResource, Summary, Severity, Tags, Title + Audience, Flags, Groups, Key, RelatedResource, Remediation, Summary, Severity, Tags, Title ) from leapp.utils.report import generate_report_file @@ -102,6 +103,21 @@ def test_report_tags_and_flags(): assert Flags(["This is a new flag", Groups.INHIBITOR]).value == ["This is a new flag", "inhibitor"] +@pytest.mark.parametrize("report_suffix", ('.json', '.txt')) +def test_remediation_with_non_ascii_value(report_suffix): + report_entries = [Title('Some report title'), Summary('Some summary not used for dynamical key generation'), + Audience('sysadmin')] + # Partly related to leapp-repository PR1052 + rem_command = ["ln", "-snf", "root/břeťa", "/bobošík"] + rem_hint = "Don't forget to check /bobošík directory!" + rem_playbook = "bobošík.yml" + rem = Remediation(commands=[rem_command], hint=rem_hint, playbook=rem_playbook) + report_entries.append(rem) + report_message = _create_report_object(report_entries).report + with tempfile.NamedTemporaryFile(suffix=report_suffix) as reportfile: + generate_report_file([report_message], 'leapp-run-id', reportfile.name, '1.1.0') + + def test_convert_from_error_to_report(): error_dict_no_details = { 'message': 'The system is not registered or subscribed.', -- 2.41.0