kiwi-el8/test/unit/command_process_test.py
Marcus Schäfer 24bef17f0e
Fix logging of stderr data in command calls
The stderr data was presented as one blob without line
breaks. Hard to read and smells like a bug. This commit
fixes the output to become readable
2025-06-24 15:30:49 +02:00

164 lines
6.1 KiB
Python

import logging
from unittest.mock import (
patch, Mock
)
from pytest import (
raises, fixture
)
from builtins import bytes
from kiwi.command_process import (
CommandProcess,
CommandIterator
)
from kiwi.exceptions import KiwiCommandError
class TestCommandProcess:
@fixture(autouse=True)
def inject_fixtures(self, caplog):
self._caplog = caplog
def fake_matcher(self, item, output):
return True
def poll(self):
return self.data_flow.pop()
def outavailable(self):
return True
def erravailable(self):
return True
def outdata(self):
return self.data_out.pop()
def errdata(self):
return self.data_err.pop()
def create_flow_method(self, method):
def create_method(arg=None):
return method()
return create_method
def setup(self):
self.data_flow = [True, None, None, None, None, None, None, None]
self.data_out = [
bytes(b''), bytes(b''), bytes(b'\n'), bytes(b'a'),
bytes(b't'), bytes(b'a'), bytes(b'd')
]
self.data_err = [
bytes(b''), bytes(b'\n'), bytes(b'r'), bytes(b'o'),
bytes(b'r'), bytes(b'r'), bytes(b'e')
]
self.flow = self.create_flow_method(self.poll)
self.flow_out_available = self.create_flow_method(self.outavailable)
self.flow_err_available = self.create_flow_method(self.erravailable)
self.flow_out = self.create_flow_method(self.outdata)
self.flow_err = self.create_flow_method(self.errdata)
def setup_method(self, cls):
self.setup()
@patch('kiwi.command.Command')
def test_returncode(self, mock_command):
command = Mock()
mock_command.return_value = command
process = CommandProcess(command)
assert process.returncode() == command.process.returncode
@patch('kiwi.command.Command')
def test_poll_show_progress(self, mock_command):
match_method = CommandProcess(mock_command).create_match_method(
self.fake_matcher
)
process = CommandProcess(mock_command)
process.command.command.process.poll = self.flow
process.command.command.output_available = self.flow_out_available
process.command.command.error_available = self.flow_err_available
process.command.command.output.read = self.flow_out
process.command.command.error.read = self.flow_err
process.command.command.process.returncode = 0
with self._caplog.at_level(logging.DEBUG):
process.poll_show_progress(['a', 'b'], match_method, True)
assert 'system: data' in self._caplog.text
@patch('kiwi.command.Command')
def test_poll_show_progress_raises(self, mock_command):
match_method = CommandProcess(mock_command).create_match_method(
self.fake_matcher
)
process = CommandProcess(mock_command)
process.command.command.process.poll = self.flow
process.command.command.output_available = self.flow_out_available
process.command.command.error_available = self.flow_err_available
process.command.command.output.read = self.flow_out
process.command.command.error.read = self.flow_err
process.command.command.process.returncode = 1
with raises(KiwiCommandError):
process.poll_show_progress(['a', 'b'], match_method)
@patch('kiwi.command.Command')
def test_poll(self, mock_command):
process = CommandProcess(mock_command)
process.command.command.process.poll = self.flow
process.command.command.output_available = self.flow_out_available
process.command.command.error_available = self.flow_err_available
process.command.command.output.read = self.flow_out
process.command.command.error.read = self.flow_err
process.command.command.process.returncode = 0
with self._caplog.at_level(logging.DEBUG):
process.poll()
assert 'system: data' in self._caplog.text
@patch('kiwi.command.Command')
def test_poll_raises(self, mock_command):
process = CommandProcess(mock_command)
process.command.command.process.poll = self.flow
process.command.command.output_available = self.flow_out_available
process.command.command.error_available = self.flow_err_available
process.command.command.output.read = self.flow_out
process.command.command.error.read = self.flow_err
process.command.command.output.read.return_value = 'data'
process.command.command.process.returncode = 1
with raises(KiwiCommandError):
process.poll()
@patch('kiwi.command.Command')
def test_poll_and_watch(self, mock_command):
process = CommandProcess(mock_command)
process.command.command.process.poll = self.flow
process.command.command.output_available = self.flow_out_available
process.command.command.error_available = self.flow_err_available
process.command.command.output.read = self.flow_out
process.command.command.error.read = self.flow_err
process.command.command.process.returncode = 1
with self._caplog.at_level(logging.DEBUG):
result = process.poll_and_watch()
assert '--------------out start-------------' in self._caplog.text
assert 'data' in self._caplog.text
assert '--------------out stop--------------' in self._caplog.text
assert result.stderr == 'error\n'
@patch('kiwi.command.Command')
def test_create_match_method(self, mock_command):
match_method = CommandProcess(mock_command).create_match_method(
self.fake_matcher
)
assert match_method('a', 'b') is True
def test_command_iterator(self):
iterator = CommandIterator(Mock())
assert iterator.__iter__() == iterator
def test_kill(self):
iterator = CommandIterator(Mock())
iterator.kill()
iterator.command.process.kill.assert_called_once_with()
def test_get_pid(self):
iterator = CommandIterator(Mock())
assert iterator.get_pid() == iterator.command.process.pid