lorax/test/loraxtest.py

255 lines
9.0 KiB
Python

#!/usr/bin/python3
import argparse
import os
import subprocess
import sys
import time
import traceback
import unittest
# import Cockpit's machinery for test VMs and its browser test API
sys.path.append(os.path.join(os.path.dirname(__file__), "../bots/machine"))
import testvm # pylint: disable=import-error
import machine_core.exceptions as exceptions # pylint: disable=import-error
import machine_core.timeout as timeoutlib # pylint: disable=import-error
#pylint: disable=subprocess-run-check
def print_exception(etype, value, tb):
# only include relevant lines
limit = 0
while tb and '__unittest' in tb.tb_frame.f_globals:
limit += 1
tb = tb.tb_next
traceback.print_exception(etype, value, tb, limit=limit)
class VirtMachineTestCase(unittest.TestCase):
sit = False
network = None
machine = None
ssh_command = None
boot_id = None
def setUpTestMachine(self, image=testvm.DEFAULT_IMAGE, identity_file=None):
self.network = testvm.VirtNetwork(0)
# default overlay directory is not big enough to hold the large composed trees; thus put overlay into /var/tmp/
self.machine = testvm.VirtMachine(image, networking=self.network.host(),
cpus=2, memory_mb=2048,
overlay_dir="/var/tmp",
identity_file=identity_file)
print("Starting virtual machine '{}'".format(image))
self.machine.start()
# Modified wait_boot that doesn't check for /run/nologin
self.wait_boot()
# run a command to force starting the SSH master
self.machine.execute("uptime")
self.ssh_command = ["ssh", "-o", "ControlPath=" + self.machine.ssh_master,
"-p", self.machine.ssh_port,
self.machine.ssh_user + "@" + self.machine.ssh_address]
print("Machine is up. Connect to it via:")
print(" ".join(self.ssh_command))
print()
def tearDownTestMachine(self):
if os.environ.get('TEST_ATTACHMENTS'):
self.machine.download_dir('/var/log/tests', os.environ.get('TEST_ATTACHMENTS'))
# Peek into internal data structure, because there's no way to get the
# TestResult at this point. `errors` is a list of tuples (method, error)
errors = list(e[1] for e in self._outcome.errors if e[1])
if errors and self.sit:
for e in errors:
print_exception(*e)
print()
print(" ".join(self.ssh_command))
input("Press RETURN to continue...")
self.machine.stop()
def execute(self, command, **args):
"""Execute a command on the test machine.
**args and return value are the same as those for subprocess.run().
"""
return subprocess.run(self.ssh_command + command, **args)
def wait_boot(self, timeout_sec=120):
"""Wait until logging in as root works
The cockpit tests assume logging in as non-root, but that isn't always true
when testing things like boot.iso images. So this checks for ssh login without
checking for /run/nologin
raises an error if there was a timeout/failure to connect
"""
# This is mostly a copy of the SSHConnection wait_boot and wait_user_login
start_time = time.time()
boot_id = None
while (time.time() - start_time) < timeout_sec:
if self.machine.wait_execute(timeout_sec=15):
tries_left = 60
while (tries_left > 0):
try:
with timeoutlib.Timeout(seconds=30):
boot_id = self.machine.execute("cat /proc/sys/kernel/random/boot_id", direct=True)
break
except subprocess.CalledProcessError:
pass
except RuntimeError:
# timeout; assume that ssh just went down during reboot, go back to wait_boot()
break
tries_left = tries_left - 1
time.sleep(1)
else:
raise exceptions.Failure("Timed out waiting for boot_id")
if boot_id:
break
if not boot_id:
raise exceptions.Failure("Unable to reach machine {0} via ssh: {1}:{2}".format(self.machine.label, self.machine.ssh_address, self.machine.ssh_port))
self.boot_id = boot_id
class TestCase(VirtMachineTestCase):
def setUp(self):
self.setUpTestMachine()
# Upload the contents of the ./tests/ directory to the machine (it must have beakerlib already installed)
self.machine.upload(["../tests"], "/")
print("Waiting for backend to become ready...")
curl_command = ["curl", "--max-time", "360",
"--silent",
"--unix-socket", "/run/weldr/api.socket",
"http://localhost/api/status"]
r = subprocess.run(self.ssh_command + curl_command, stdout=subprocess.DEVNULL)
self.assertEqual(r.returncode, 0)
def tearDown(self):
self.tearDownVirt()
def tearDownVirt(self, virt_dir=None, local_dir=None):
if os.environ.get('TEST_ATTACHMENTS'):
self.machine.download_dir('/var/log/tests', os.environ.get('TEST_ATTACHMENTS'))
if virt_dir and local_dir:
self.machine.download_dir(virt_dir, local_dir)
self.tearDownTestMachine()
return local_dir
class TestResult(unittest.TestResult):
def name(self, test):
name = test.id().replace("__main__.", "")
if test.shortDescription():
name += ": " + test.shortDescription()
return name
def startTest(self, test):
super().startTest(test)
print("# ----------------------------------------------------------------------")
print("# ", self.name(test))
print("", flush=True)
def stopTest(self, test):
print(flush=True)
def addSuccess(self, test):
super().addSuccess(test)
print("ok {} {}".format(self.testsRun, self.name(test)))
def addError(self, test, err):
super().addError(test, err)
traceback.print_exception(*err, file=sys.stdout)
print("not ok {} {}".format(self.testsRun, self.name(test)))
def addFailure(self, test, err):
super().addError(test, err)
traceback.print_exception(*err, file=sys.stdout)
print("not ok {} {}".format(self.testsRun, self.name(test)))
def addSkip(self, test, reason):
super().addSkip(test, reason)
print("ok {} {} # SKIP {}".format(self.testsRun, self.name(test), reason))
def addExpectedFailure(self, test, err):
super().addExpectedFailure(test, err)
print("ok {} {}".format(self.testsRun, self.name(test)))
def addUnexpectedSuccess(self, test):
super().addUnexpectedSuccess(test)
print("not ok {} {}".format(self.testsRun, self.name(test)))
class TestRunner(object):
"""A test runner that (in combination with TestResult) outputs
results in a way that cockpit's log.html can read and format them.
"""
def __init__(self, failfast=False):
self.failfast = failfast
def run(self, testable):
result = TestResult()
result.failfast = self.failfast
result.startTestRun()
count = testable.countTestCases()
print("1.." + str(count))
try:
testable(result)
finally:
result.stopTestRun()
return result
def print_tests(tests):
for test in tests:
if isinstance(test, unittest.TestSuite):
print_tests(test)
elif isinstance(test, unittest.loader._FailedTest):
name = test.id().replace("unittest.loader._FailedTest.", "")
print("Error: '{}' does not match a test".format(name), file=sys.stderr)
else:
print(test.id().replace("__main__.", ""))
def main():
parser = argparse.ArgumentParser()
parser.add_argument("tests", nargs="*", help="List of tests modules, classes, and methods")
parser.add_argument("-l", "--list", action="store_true", help="Print the list of tests that would be executed")
parser.add_argument("-s", "--sit", action="store_true", help="Halt test execution (but keep VM running) when a test fails")
args = parser.parse_args()
TestCase.sit = args.sit
module = __import__("__main__")
if args.tests:
tests = unittest.defaultTestLoader.loadTestsFromNames(args.tests, module)
else:
tests = unittest.defaultTestLoader.loadTestsFromModule(module)
if args.list:
print_tests(tests)
return 0
runner = TestRunner(failfast=args.sit)
result = runner.run(tests)
if tests.countTestCases() != result.testsRun:
print("Error: unexpected number of tests were run", file=sys.stderr)
sys.exit(1)
sys.exit(not result.wasSuccessful())