[koji-wrapper] Retry watching on connection errors
With this patch Pungi should be more tolerant of network failures when running a blocking command (creating live media or live images). If the connection drops and the output indicates network problems, Pungi will try to watch the task with `koji watch-task`. This will be retried until it finishes (successfully or with some other failure). There is an increasing timeout after each retry. Currently the maximum number of retries is not limited. Signed-off-by: Lubomír Sedlář <lsedlar@redhat.com>
This commit is contained in:
parent
44c4ef5c41
commit
a813e926dc
@ -18,6 +18,7 @@
|
|||||||
import os
|
import os
|
||||||
import pipes
|
import pipes
|
||||||
import re
|
import re
|
||||||
|
import time
|
||||||
|
|
||||||
import koji
|
import koji
|
||||||
import rpmUtils.arch
|
import rpmUtils.arch
|
||||||
@ -234,28 +235,53 @@ class KojiWrapper(object):
|
|||||||
|
|
||||||
return cmd
|
return cmd
|
||||||
|
|
||||||
def run_blocking_cmd(self, command, log_file=None):
|
def _has_connection_error(self, output):
|
||||||
|
"""Checks if output indicates connection error."""
|
||||||
|
return re.search('error: failed to connect\n$', output)
|
||||||
|
|
||||||
|
def _wait_for_task(self, task_id, logfile=None, max_retries=None):
|
||||||
|
"""Tries to wait for a task to finish. On connection error it will
|
||||||
|
retry with `watch-task` command.
|
||||||
|
"""
|
||||||
|
cmd = [self.executable, 'watch-task', str(task_id)]
|
||||||
|
attempt = 0
|
||||||
|
|
||||||
|
while True:
|
||||||
|
retcode, output = run(cmd, can_fail=True, logfile=logfile)
|
||||||
|
|
||||||
|
if retcode == 0 or not self._has_connection_error(output):
|
||||||
|
# Task finished for reason other than connection error.
|
||||||
|
return retcode, output
|
||||||
|
|
||||||
|
attempt += 1
|
||||||
|
if max_retries and attempt >= max_retries:
|
||||||
|
break
|
||||||
|
time.sleep(attempt * 10)
|
||||||
|
|
||||||
|
raise RuntimeError('Failed to wait for task %s. Too many connection errors.' % task_id)
|
||||||
|
|
||||||
|
def run_blocking_cmd(self, command, log_file=None, max_retries=None):
|
||||||
"""
|
"""
|
||||||
Run a blocking koji command. Returns a dict with output of the command,
|
Run a blocking koji command. Returns a dict with output of the command,
|
||||||
its exit code and parsed task id. This method will block until the
|
its exit code and parsed task id. This method will block until the
|
||||||
command finishes.
|
command finishes.
|
||||||
"""
|
"""
|
||||||
try:
|
|
||||||
retcode, output = run(command, can_fail=True, logfile=log_file)
|
retcode, output = run(command, can_fail=True, logfile=log_file)
|
||||||
except RuntimeError, e:
|
|
||||||
raise RuntimeError("%s. %s failed with '%s'" % (e, command, output))
|
|
||||||
|
|
||||||
match = re.search(r"Created task: (\d+)", output)
|
match = re.search(r"Created task: (\d+)", output)
|
||||||
if not match:
|
if not match:
|
||||||
raise RuntimeError("Could not find task ID in output. Command '%s' returned '%s'."
|
raise RuntimeError("Could not find task ID in output. Command '%s' returned '%s'."
|
||||||
% (" ".join(command), output))
|
% (" ".join(command), output))
|
||||||
|
task_id = int(match.groups()[0])
|
||||||
|
|
||||||
result = {
|
if retcode != 0 and self._has_connection_error(output):
|
||||||
|
retcode, output = self._wait_for_task(task_id, logfile=log_file, max_retries=max_retries)
|
||||||
|
|
||||||
|
return {
|
||||||
"retcode": retcode,
|
"retcode": retcode,
|
||||||
"output": output,
|
"output": output,
|
||||||
"task_id": int(match.groups()[0]),
|
"task_id": task_id,
|
||||||
}
|
}
|
||||||
return result
|
|
||||||
|
|
||||||
def get_image_paths(self, task_id):
|
def get_image_paths(self, task_id):
|
||||||
"""
|
"""
|
||||||
|
@ -398,5 +398,112 @@ class RunrootKojiWrapperTest(KojiWrapperBaseTestCase):
|
|||||||
self.assertDictEqual(result, {'retcode': 1, 'output': output, 'task_id': 12345})
|
self.assertDictEqual(result, {'retcode': 1, 'output': output, 'task_id': 12345})
|
||||||
|
|
||||||
|
|
||||||
|
class RunBlockingCmdTest(KojiWrapperBaseTestCase):
|
||||||
|
@mock.patch('pungi.wrappers.kojiwrapper.run')
|
||||||
|
def test_minimal(self, run):
|
||||||
|
output = 'Created task: 1234\nHello\n'
|
||||||
|
run.return_value = (0, output)
|
||||||
|
|
||||||
|
result = self.koji.run_blocking_cmd('cmd')
|
||||||
|
|
||||||
|
self.assertDictEqual(result, {'retcode': 0, 'output': output, 'task_id': 1234})
|
||||||
|
self.assertItemsEqual(run.mock_calls,
|
||||||
|
[mock.call('cmd', can_fail=True, logfile=None)])
|
||||||
|
|
||||||
|
@mock.patch('pungi.wrappers.kojiwrapper.run')
|
||||||
|
def test_with_log(self, run):
|
||||||
|
output = 'Created task: 1234\nHello\n'
|
||||||
|
run.return_value = (0, output)
|
||||||
|
|
||||||
|
result = self.koji.run_blocking_cmd('cmd', log_file='logfile')
|
||||||
|
|
||||||
|
self.assertDictEqual(result, {'retcode': 0, 'output': output, 'task_id': 1234})
|
||||||
|
self.assertItemsEqual(run.mock_calls,
|
||||||
|
[mock.call('cmd', can_fail=True, logfile='logfile')])
|
||||||
|
|
||||||
|
@mock.patch('pungi.wrappers.kojiwrapper.run')
|
||||||
|
def test_fail_with_task_id(self, run):
|
||||||
|
output = 'Created task: 1234\nBoom\n'
|
||||||
|
run.return_value = (1, output)
|
||||||
|
|
||||||
|
result = self.koji.run_blocking_cmd('cmd')
|
||||||
|
|
||||||
|
self.assertDictEqual(result, {'retcode': 1, 'output': output, 'task_id': 1234})
|
||||||
|
self.assertItemsEqual(run.mock_calls,
|
||||||
|
[mock.call('cmd', can_fail=True, logfile=None)])
|
||||||
|
|
||||||
|
@mock.patch('pungi.wrappers.kojiwrapper.run')
|
||||||
|
def test_fail_without_task_id(self, run):
|
||||||
|
output = 'Not found\n'
|
||||||
|
run.return_value = (1, output)
|
||||||
|
|
||||||
|
with self.assertRaises(RuntimeError) as ctx:
|
||||||
|
self.koji.run_blocking_cmd('cmd')
|
||||||
|
|
||||||
|
self.assertItemsEqual(run.mock_calls,
|
||||||
|
[mock.call('cmd', can_fail=True, logfile=None)])
|
||||||
|
self.assertIn('Could not find task ID', ctx.exception.message)
|
||||||
|
|
||||||
|
@mock.patch('pungi.wrappers.kojiwrapper.run')
|
||||||
|
def test_disconnect_and_retry(self, run):
|
||||||
|
output = 'Created task: 1234\nerror: failed to connect\n'
|
||||||
|
retry = 'Created task: 1234\nOook\n'
|
||||||
|
run.side_effect = [(1, output), (0, retry)]
|
||||||
|
|
||||||
|
result = self.koji.run_blocking_cmd('cmd')
|
||||||
|
|
||||||
|
self.assertDictEqual(result, {'retcode': 0, 'output': retry, 'task_id': 1234})
|
||||||
|
self.assertEqual(run.mock_calls,
|
||||||
|
[mock.call('cmd', can_fail=True, logfile=None),
|
||||||
|
mock.call(['koji', 'watch-task', '1234'], can_fail=True, logfile=None)])
|
||||||
|
|
||||||
|
@mock.patch('pungi.wrappers.kojiwrapper.run')
|
||||||
|
def test_disconnect_and_retry_but_fail(self, run):
|
||||||
|
output = 'Created task: 1234\nerror: failed to connect\n'
|
||||||
|
retry = 'Created task: 1234\nNot working still\n'
|
||||||
|
run.side_effect = [(1, output), (1, retry)]
|
||||||
|
|
||||||
|
result = self.koji.run_blocking_cmd('cmd')
|
||||||
|
|
||||||
|
self.assertDictEqual(result, {'retcode': 1, 'output': retry, 'task_id': 1234})
|
||||||
|
self.assertEqual(run.mock_calls,
|
||||||
|
[mock.call('cmd', can_fail=True, logfile=None),
|
||||||
|
mock.call(['koji', 'watch-task', '1234'], can_fail=True, logfile=None)])
|
||||||
|
|
||||||
|
@mock.patch('time.sleep')
|
||||||
|
@mock.patch('pungi.wrappers.kojiwrapper.run')
|
||||||
|
def test_disconnect_and_retry_multiple_times(self, run, sleep):
|
||||||
|
output = 'Created task: 1234\nerror: failed to connect\n'
|
||||||
|
retry = 'Created task: 1234\nOK\n'
|
||||||
|
run.side_effect = [(1, output), (1, output), (1, output), (0, retry)]
|
||||||
|
|
||||||
|
result = self.koji.run_blocking_cmd('cmd')
|
||||||
|
|
||||||
|
self.assertDictEqual(result, {'retcode': 0, 'output': retry, 'task_id': 1234})
|
||||||
|
self.assertEqual(run.mock_calls,
|
||||||
|
[mock.call('cmd', can_fail=True, logfile=None),
|
||||||
|
mock.call(['koji', 'watch-task', '1234'], can_fail=True, logfile=None),
|
||||||
|
mock.call(['koji', 'watch-task', '1234'], can_fail=True, logfile=None),
|
||||||
|
mock.call(['koji', 'watch-task', '1234'], can_fail=True, logfile=None)])
|
||||||
|
self.assertEqual(sleep.mock_calls,
|
||||||
|
[mock.call(i * 10) for i in range(1, 3)])
|
||||||
|
|
||||||
|
@mock.patch('time.sleep')
|
||||||
|
@mock.patch('pungi.wrappers.kojiwrapper.run')
|
||||||
|
def test_disconnect_and_never_reconnect(self, run, sleep):
|
||||||
|
output = 'Created task: 1234\nerror: failed to connect\n'
|
||||||
|
run.side_effect = [(1, output), (1, output), (1, output), (1, output)]
|
||||||
|
|
||||||
|
with self.assertRaises(RuntimeError) as ctx:
|
||||||
|
self.koji.run_blocking_cmd('cmd', max_retries=2)
|
||||||
|
|
||||||
|
self.assertIn('Failed to wait', ctx.exception.message)
|
||||||
|
self.assertEqual(run.mock_calls,
|
||||||
|
[mock.call('cmd', can_fail=True, logfile=None),
|
||||||
|
mock.call(['koji', 'watch-task', '1234'], can_fail=True, logfile=None),
|
||||||
|
mock.call(['koji', 'watch-task', '1234'], can_fail=True, logfile=None)])
|
||||||
|
self.assertEqual(sleep.mock_calls, [mock.call(i * 10) for i in range(1, 2)])
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
Loading…
Reference in New Issue
Block a user