pungi/tests/test_phase_base.py

134 lines
4.3 KiB
Python

# -*- coding: utf-8 -*-
from unittest import mock
try:
import unittest2 as unittest
except ImportError:
import unittest
import random
import time
from pungi.phases import weaver
from tests.helpers import DummyCompose, boom
class TestWeaver(unittest.TestCase):
    """Tests for ``weaver.WeaverPhase`` driven by mocked phases.

    Two schema shapes are exercised:

    * phases listed directly in the top-level tuple (the "parallel" tests);
    * phases grouped in a nested tuple (the "pipeline" tests).

    Each test checks which of ``start()`` / ``stop()`` were called on every
    mocked phase after the weaver phase has been started and stopped.
    """

    def setUp(self):
        """Create six mocked phases (``self.p1`` .. ``self.p6``) and a compose."""
        for test_phase_number in range(1, 7):
            # For every X in 1..6 this is equivalent to:
            #   self.pX = mock.Mock()
            #   self.pX.name = "phase X"
            #   self.pX.start.side_effect = self.method_regular
            phase = mock.Mock()
            phase.name = "phase %d" % test_phase_number
            phase.start.side_effect = self.method_regular
            setattr(self, "p%d" % test_phase_number, phase)

        self.compose = DummyCompose(None, {})

    def method_regular(self):
        """Sleep for a random 10-90 milliseconds.

        It only has to cause a short delay, so that the other threads have
        enough time to start.
        """
        time.sleep(random.randint(1, 9) * 0.01)

    def method_with_exception(self):
        """Delay like :meth:`method_regular`, then call ``boom()``.

        The failure tests expect the resulting exception to have the message
        ``"BOOM"``.
        """
        self.method_regular()  # make some delay
        boom()  # throw exception

    def assertFinalized(self, p):
        """Assert that *p* was started and then stopped, and nothing else."""
        self.assertEqual(p.mock_calls, [mock.call.start(), mock.call.stop()])

    def assertInterrupted(self, p):
        """Assert that *p* was started but never stopped."""
        self.assertEqual(p.mock_calls, [mock.call.start()])

    def assertMissed(self, p):
        """Assert that no method was ever called on *p*."""
        self.assertEqual(p.mock_calls, [])

    def _assert_weaver_fails(self, phases_schema):
        """Start and stop a weaver phase, expecting it to raise "BOOM".

        The ``WeaverPhase`` is built outside the ``assertRaises`` context on
        purpose: an error raised by the constructor must not be mistaken for
        the expected failure.
        """
        weaver_phase = weaver.WeaverPhase(self.compose, phases_schema)

        with self.assertRaises(Exception) as ctx:
            weaver_phase.start()
            weaver_phase.stop()

        self.assertEqual("BOOM", str(ctx.exception))

    def test_parallel(self):
        """Two phases listed side by side are both started and stopped."""
        phases_schema = (self.p1, self.p2)
        weaver_phase = weaver.WeaverPhase(self.compose, phases_schema)
        weaver_phase.start()
        weaver_phase.stop()

        self.assertFinalized(self.p1)
        self.assertFinalized(self.p2)

    def test_pipeline(self):
        """Two phases in one pipeline are both started and stopped."""
        phases_schema = ((self.p1, self.p2),)
        weaver_phase = weaver.WeaverPhase(self.compose, phases_schema)
        weaver_phase.start()
        weaver_phase.stop()

        self.assertFinalized(self.p1)
        self.assertFinalized(self.p2)

    def test_stop_on_failure(self):
        """A failing phase keeps the rest of its pipeline from running."""
        self.p2.start.side_effect = self.method_with_exception

        phases_schema = ((self.p1, self.p2, self.p3),)  # one pipeline
        self._assert_weaver_fails(phases_schema)

        self.assertFinalized(self.p1)
        self.assertInterrupted(self.p2)
        self.assertMissed(self.p3)

    def test_parallel_stop_on_failure(self):
        """A failing phase does not affect the phases listed beside it."""
        self.p2.start.side_effect = self.method_with_exception

        # Three phases at the top level, not grouped into a pipeline.
        phases_schema = (self.p1, self.p2, self.p3)
        self._assert_weaver_fails(phases_schema)

        self.assertFinalized(self.p1)
        self.assertInterrupted(self.p2)
        self.assertFinalized(self.p3)

    def test_multiple_fail(self):
        """With two failing phases in a pipeline, only the first is started."""
        self.p2.start.side_effect = self.method_with_exception
        self.p3.start.side_effect = self.method_with_exception

        phases_schema = ((self.p1, self.p2, self.p3),)  # one pipeline
        self._assert_weaver_fails(phases_schema)

        self.assertFinalized(self.p1)
        self.assertInterrupted(self.p2)
        self.assertMissed(self.p3)

    def test_multi_pipeline(self):
        """A failure in one pipeline leaves the other schema entries intact."""
        self.p2.start.side_effect = self.method_with_exception

        phases_schema = (
            self.p1,
            (self.p2, self.p3, self.p4),
            (self.p5, self.p6),
        )
        self._assert_weaver_fails(phases_schema)

        self.assertFinalized(self.p1)
        self.assertInterrupted(self.p2)
        self.assertMissed(self.p3)
        self.assertMissed(self.p4)
        self.assertFinalized(self.p5)
        self.assertFinalized(self.p6)