134 lines
4.3 KiB
Python
134 lines
4.3 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
from unittest import mock
|
|
|
|
try:
|
|
import unittest2 as unittest
|
|
except ImportError:
|
|
import unittest
|
|
import random
|
|
import time
|
|
|
|
from pungi.phases import weaver
|
|
from tests.helpers import DummyCompose, boom
|
|
|
|
|
|
class TestWeaver(unittest.TestCase):
|
|
def setUp(self):
|
|
# prepare 6 phase mock-objects.
|
|
for test_phase_number in range(1, 7):
|
|
# This is equvalent to:
|
|
# self.pX = mock.Mock()
|
|
# self.pX.name = "phase X"
|
|
# self.pX.start = default_method
|
|
test_phase_name = "p" + repr(test_phase_number)
|
|
tmp = mock.Mock()
|
|
tmp.name = "phase %d" % test_phase_number
|
|
tmp.start.side_effect = self.method_regular
|
|
setattr(self, test_phase_name, tmp)
|
|
|
|
self.compose = DummyCompose(None, {})
|
|
|
|
def method_regular(self):
|
|
"""
|
|
It only have to cause some delay (tens of miliseconds).
|
|
Delay is needed for threads that has enough time to start.
|
|
"""
|
|
multiplier = random.sample(range(1, 10), 1)
|
|
time.sleep(multiplier[0] * 0.01)
|
|
|
|
def method_with_exception(self):
|
|
self.method_regular() # make some delay
|
|
boom() # throw exception
|
|
|
|
def assertFinalized(self, p):
|
|
self.assertEqual(p.mock_calls, [mock.call.start(), mock.call.stop()])
|
|
|
|
def assertInterrupted(self, p):
|
|
self.assertEqual(p.mock_calls, [mock.call.start()])
|
|
|
|
def assertMissed(self, p):
|
|
self.assertEqual(p.mock_calls, [])
|
|
|
|
def test_parallel(self):
|
|
phases_schema = (self.p1, self.p2)
|
|
weaver_phase = weaver.WeaverPhase(self.compose, phases_schema)
|
|
weaver_phase.start()
|
|
weaver_phase.stop()
|
|
|
|
self.assertFinalized(self.p1)
|
|
self.assertFinalized(self.p2)
|
|
|
|
def test_pipeline(self):
|
|
phases_schema = ((self.p1, self.p2),)
|
|
weaver_phase = weaver.WeaverPhase(self.compose, phases_schema)
|
|
weaver_phase.start()
|
|
weaver_phase.stop()
|
|
|
|
self.assertFinalized(self.p1)
|
|
self.assertFinalized(self.p2)
|
|
|
|
def test_stop_on_failure(self):
|
|
self.p2.start.side_effect = self.method_with_exception
|
|
|
|
phases_schema = ((self.p1, self.p2, self.p3),) # one pipeline
|
|
weaver_phase = weaver.WeaverPhase(self.compose, phases_schema)
|
|
with self.assertRaises(Exception) as ctx:
|
|
weaver_phase.start()
|
|
weaver_phase.stop()
|
|
|
|
self.assertEqual("BOOM", str(ctx.exception))
|
|
self.assertFinalized(self.p1)
|
|
self.assertInterrupted(self.p2)
|
|
self.assertMissed(self.p3)
|
|
|
|
def test_parallel_stop_on_failure(self):
|
|
self.p2.start.side_effect = self.method_with_exception
|
|
|
|
phases_schema = (self.p1, self.p2, self.p3) # one pipeline
|
|
weaver_phase = weaver.WeaverPhase(self.compose, phases_schema)
|
|
with self.assertRaises(Exception) as ctx:
|
|
weaver_phase.start()
|
|
weaver_phase.stop()
|
|
|
|
self.assertEqual("BOOM", str(ctx.exception))
|
|
self.assertFinalized(self.p1)
|
|
self.assertInterrupted(self.p2)
|
|
self.assertFinalized(self.p3)
|
|
|
|
def test_multiple_fail(self):
|
|
self.p2.start.side_effect = self.method_with_exception
|
|
self.p3.start.side_effect = self.method_with_exception
|
|
|
|
phases_schema = ((self.p1, self.p2, self.p3),) # one pipeline
|
|
weaver_phase = weaver.WeaverPhase(self.compose, phases_schema)
|
|
with self.assertRaises(Exception) as ctx:
|
|
weaver_phase.start()
|
|
weaver_phase.stop()
|
|
|
|
self.assertEqual("BOOM", str(ctx.exception))
|
|
self.assertFinalized(self.p1)
|
|
self.assertInterrupted(self.p2)
|
|
self.assertMissed(self.p3)
|
|
|
|
def test_multi_pipeline(self):
|
|
self.p2.start.side_effect = self.method_with_exception
|
|
phases_schema = (
|
|
self.p1,
|
|
(self.p2, self.p3, self.p4),
|
|
(self.p5, self.p6),
|
|
)
|
|
|
|
weaver_phase = weaver.WeaverPhase(self.compose, phases_schema)
|
|
with self.assertRaises(Exception) as ctx:
|
|
weaver_phase.start()
|
|
weaver_phase.stop()
|
|
|
|
self.assertEqual("BOOM", str(ctx.exception))
|
|
self.assertFinalized(self.p1)
|
|
self.assertInterrupted(self.p2)
|
|
self.assertMissed(self.p3)
|
|
self.assertMissed(self.p4)
|
|
self.assertFinalized(self.p5)
|
|
self.assertFinalized(self.p6)
|