Add tests using repos.git in blueprints

- Check final-kickstart.ks for the rpm source
- Check final-kickstart.ks for the rpm package name and version
- Make sure depsolve works
- Make sure errors from a bad repo are returned correctly
- Make sure errors from a bad reference are returned correctly

This moves _wait_for_status into a helper function so it can be shared
between the test classes.

(cherry picked from commit 8c2184d59e)
(cherry picked from commit 78f79a94ec)
This commit is contained in:
Brian C. Lane 2019-03-11 17:00:05 -07:00
parent 5042867d19
commit c5108bce9f

View File

@ -28,6 +28,7 @@ import unittest
from flask import json
import pytoml as toml
from ..lib import create_git_repo
from pylorax.api.config import configure, make_dnf_dirs, make_queue_dirs
from pylorax.api.errors import * # pylint: disable=wildcard-import
from pylorax.api.queue import start_queue_monitor
@ -58,6 +59,31 @@ def get_system_repo():
# Failed to find one, fall back to using base
return "base"
def _wait_for_status(self, uuid, wait_status):
"""Helper function that waits for a status
:param uuid: UUID of the build to check
:type uuid: str
:param wait_status: List of statuses to exit on
:type wait_status: list of str
:returns: True if status was found, False if it timed out
:rtype: bool
This will time out after 60 seconds
"""
start = time.time()
while True:
resp = self.server.get("/api/v0/compose/info/%s" % uuid)
data = json.loads(resp.data)
self.assertNotEqual(data, None)
queue_status = data.get("queue_status")
if queue_status in wait_status:
return True
if time.time() > start + 60:
return False
time.sleep(1)
class ServerTestCase(unittest.TestCase):
@classmethod
def setUpClass(self):
@ -814,30 +840,6 @@ class ServerTestCase(unittest.TestCase):
resp = self.server.get("/api/docs/modules.html")
self.assert_documentation(resp)
def wait_for_status(self, uuid, wait_status):
"""Helper function that waits for a status
:param uuid: UUID of the build to check
:type uuid: str
:param wait_status: List of statuses to exit on
:type wait_status: list of str
:returns: True if status was found, False if it timed out
:rtype: bool
This will time out after 60 seconds
"""
start = time.time()
while True:
resp = self.server.get("/api/v0/compose/info/%s" % uuid)
data = json.loads(resp.data)
self.assertNotEqual(data, None)
queue_status = data.get("queue_status")
if queue_status in wait_status:
return True
if time.time() > start + 60:
return False
time.sleep(1)
def test_compose_01_types(self):
"""Test the /api/v0/compose/types route"""
resp = self.server.get("/api/v0/compose/types")
@ -952,10 +954,10 @@ class ServerTestCase(unittest.TestCase):
self.assertEqual(build_id in ids, True, "Failed to add build to the queue")
# Wait for it to start
self.assertEqual(self.wait_for_status(build_id, ["RUNNING"]), True, "Failed to start test compose")
self.assertEqual(_wait_for_status(self, build_id, ["RUNNING"]), True, "Failed to start test compose")
# Wait for it to finish
self.assertEqual(self.wait_for_status(build_id, ["FAILED"]), True, "Failed to finish test compose")
self.assertEqual(_wait_for_status(self, build_id, ["FAILED"]), True, "Failed to finish test compose")
resp = self.server.get("/api/v0/compose/info/%s" % build_id)
data = json.loads(resp.data)
@ -993,7 +995,7 @@ class ServerTestCase(unittest.TestCase):
cancel_id = data["build_id"]
# Wait for it to start
self.assertEqual(self.wait_for_status(cancel_id, ["RUNNING"]), True, "Failed to start test compose")
self.assertEqual(_wait_for_status(self, cancel_id, ["RUNNING"]), True, "Failed to start test compose")
# Cancel the build
resp = self.server.delete("/api/v0/compose/cancel/%s" % cancel_id)
@ -1038,10 +1040,10 @@ class ServerTestCase(unittest.TestCase):
self.assertEqual(build_id in ids, True, "Failed to add build to the queue")
# Wait for it to start
self.assertEqual(self.wait_for_status(build_id, ["RUNNING"]), True, "Failed to start test compose")
self.assertEqual(_wait_for_status(self, build_id, ["RUNNING"]), True, "Failed to start test compose")
# Wait for it to finish
self.assertEqual(self.wait_for_status(build_id, ["FINISHED"]), True, "Failed to finish test compose")
self.assertEqual(_wait_for_status(self, build_id, ["FINISHED"]), True, "Failed to finish test compose")
resp = self.server.get("/api/v0/compose/info/%s" % build_id)
data = json.loads(resp.data)
@ -1139,10 +1141,10 @@ class ServerTestCase(unittest.TestCase):
self.assertEqual(build_id_fail in ids, True, "Failed to add build to the queue")
# Wait for it to start
self.assertEqual(self.wait_for_status(build_id_fail, ["RUNNING"]), True, "Failed to start test compose")
self.assertEqual(_wait_for_status(self, build_id_fail, ["RUNNING"]), True, "Failed to start test compose")
# Wait for it to finish
self.assertEqual(self.wait_for_status(build_id_fail, ["FAILED"]), True, "Failed to finish test compose")
self.assertEqual(_wait_for_status(self, build_id_fail, ["FAILED"]), True, "Failed to finish test compose")
# Fire up the other one
resp = self.server.post("/api/v0/compose?test=2",
@ -1161,10 +1163,10 @@ class ServerTestCase(unittest.TestCase):
self.assertEqual(build_id_success in ids, True, "Failed to add build to the queue")
# Wait for it to start
self.assertEqual(self.wait_for_status(build_id_success, ["RUNNING"]), True, "Failed to start test compose")
self.assertEqual(_wait_for_status(self, build_id_success, ["RUNNING"]), True, "Failed to start test compose")
# Wait for it to finish
self.assertEqual(self.wait_for_status(build_id_success, ["FINISHED"]), True, "Failed to finish test compose")
self.assertEqual(_wait_for_status(self, build_id_success, ["FINISHED"]), True, "Failed to finish test compose")
# Test that both composes appear in /api/v0/compose/status/*
resp = self.server.get("/api/v0/compose/status/*")
@ -1227,10 +1229,10 @@ class ServerTestCase(unittest.TestCase):
self.assertEqual(build_id in ids, True, "Failed to add build to the queue")
# Wait for it to start
self.assertEqual(self.wait_for_status(build_id, ["RUNNING"]), True, "Failed to start test compose")
self.assertEqual(_wait_for_status(self, build_id, ["RUNNING"]), True, "Failed to start test compose")
# Wait for it to finish
self.assertEqual(self.wait_for_status(build_id, ["FINISHED"]), True, "Failed to finish test compose")
self.assertEqual(_wait_for_status(self, build_id, ["FINISHED"]), True, "Failed to finish test compose")
resp = self.server.get("/api/v0/compose/info/%s" % build_id)
data = json.loads(resp.data)
@ -1695,3 +1697,207 @@ class RepoCacheTestCase(unittest.TestCase):
pkg_deps = data["deps"]["packages"]
print(pkg_deps)
self.assertTrue(any([True for d in pkg_deps if d["name"] == "fake-milhouse" and d["version"] == "1.0.2"]))
class GitRPMBlueprintTestCase(unittest.TestCase):
"""Test to make sure that a blueprint with repos.git entry works."""
@classmethod
def setUpClass(self):
(self.gitrpm_repo, self.test_results, self.first_commit) = create_git_repo()
repo_dir = tempfile.mkdtemp(prefix="lorax.test.repo.")
server.config["REPO_DIR"] = repo_dir
repo = open_or_create_repo(server.config["REPO_DIR"])
server.config["GITLOCK"] = GitLock(repo=repo, lock=Lock(), dir=repo_dir)
server.config["COMPOSER_CFG"] = configure(root_dir=repo_dir, test_config=True)
os.makedirs(joinpaths(server.config["COMPOSER_CFG"].get("composer", "share_dir"), "composer"))
errors = make_queue_dirs(server.config["COMPOSER_CFG"], os.getgid())
if errors:
raise RuntimeError("\n".join(errors))
make_dnf_dirs(server.config["COMPOSER_CFG"], os.getuid(), os.getgid())
# Modify fedora vs. rawhide tests when running on rawhide
if os.path.exists("/etc/yum.repos.d/fedora-rawhide.repo"):
self.rawhide = True
server.config["DNFLOCK"] = DNFLock(server.config["COMPOSER_CFG"], expire_secs=10)
# Include a message in /api/status output
server.config["TEMPLATE_ERRORS"] = ["Test message"]
server.config['TESTING'] = True
self.server = server.test_client()
self.repo_dir = repo_dir
# Copy the shared files over to the directory tree we are using
share_path = "./share/composer/"
for f in glob(joinpaths(share_path, "*")):
shutil.copy(f, joinpaths(server.config["COMPOSER_CFG"].get("composer", "share_dir"), "composer"))
start_queue_monitor(server.config["COMPOSER_CFG"], 0, 0)
@classmethod
def tearDownClass(self):
shutil.rmtree(server.config["REPO_DIR"])
shutil.rmtree(self.gitrpm_repo)
def test_01_depsolve_gitrpm(self):
"""Make sure that depsolve works with repos.git"""
# Note that the git rpm isn't built and added until a compose, so it won't be listed
test_blueprint = """
name = "git-rpm-blueprint-test"
description = "A test blueprint including a rpm created from git"
version = "0.0.1"
[[repos.git]]
rpmname="git-rpm-test"
rpmversion="1.0.0"
rpmrelease="1"
summary="Testing the git rpm code"
repo="file://%s"
ref="%s"
destination="/srv/testing-rpm/"
[[packages]]
name="openssh-server"
version="*"
""" % (self.gitrpm_repo, self.first_commit)
resp = self.server.post("/api/v0/blueprints/new",
data=test_blueprint,
content_type="text/x-toml")
data = json.loads(resp.data)
self.assertEqual(data, {"status":True})
resp = self.server.get("/api/v0/blueprints/depsolve/git-rpm-blueprint-test")
data = json.loads(resp.data)
self.assertNotEqual(data, None)
blueprints = data.get("blueprints")
self.assertNotEqual(blueprints, None)
self.assertEqual(len(blueprints), 1)
self.assertEqual(blueprints[0]["blueprint"]["name"], "git-rpm-blueprint-test")
self.assertFalse(data.get("errors"))
deps = blueprints[0]["dependencies"]
print(deps)
self.assertEqual(len(blueprints[0]["dependencies"]) > 10, True)
def test_02_compose_gitrpm(self):
"""Test that the compose includes the git rpm repo and rpm"""
test_compose = {"blueprint_name": "git-rpm-blueprint-test",
"compose_type": "tar",
"branch": "master"}
resp = self.server.post("/api/v0/compose?test=2",
data=json.dumps(test_compose),
content_type="application/json")
data = json.loads(resp.data)
self.assertNotEqual(data, None)
self.assertEqual(data["status"], True, "Failed to start test compose: %s" % data)
build_id = data["build_id"]
# Is it in the queue list (either new or run is fine, based on timing)
resp = self.server.get("/api/v0/compose/queue")
data = json.loads(resp.data)
self.assertNotEqual(data, None)
ids = [e["id"] for e in data["new"] + data["run"]]
self.assertEqual(build_id in ids, True, "Failed to add build to the queue")
# Wait for it to start
self.assertEqual(_wait_for_status(self, build_id, ["RUNNING"]), True, "Failed to start test compose")
# Wait for it to finish
self.assertEqual(_wait_for_status(self, build_id, ["FINISHED"]), True, "Failed to finish test compose")
resp = self.server.get("/api/v0/compose/info/%s" % build_id)
data = json.loads(resp.data)
self.assertNotEqual(data, None)
self.assertEqual(data["queue_status"], "FINISHED", "Build not in FINISHED state")
# Examine the final-kickstart.ks for the customizations
# A bit kludgy since it examines the filesystem directly, but that's better than unpacking the metadata
final_ks = open(joinpaths(self.repo_dir, "var/lib/lorax/composer/results/", build_id, "final-kickstart.ks")).read()
# Is the source in the kickstart?
self.assertTrue('repo --name="gitrpms"' in final_ks)
# Is the rpm in the kickstart?
self.assertTrue("git-rpm-test-1.0.0-1" in final_ks)
def test_03_compose_badref_gitrpm(self):
"""Make sure that compose with a bad reference returns an error"""
test_blueprint = """
name = "git-rpm-blueprint-test"
description = "A test blueprint including a rpm created from git"
version = "0.0.2"
[[repos.git]]
rpmname="git-rpm-test"
rpmversion="1.0.0"
rpmrelease="1"
summary="Testing the git rpm code"
repo="file://%s"
ref="nobody-saw-me-do-it"
destination="/srv/testing-rpm/"
[[packages]]
name="openssh-server"
version="*"
""" % self.gitrpm_repo
resp = self.server.post("/api/v0/blueprints/new",
data=test_blueprint,
content_type="text/x-toml")
data = json.loads(resp.data)
self.assertEqual(data, {"status":True})
test_compose = {"blueprint_name": "git-rpm-blueprint-test",
"compose_type": "tar",
"branch": "master"}
resp = self.server.post("/api/v0/compose?test=2",
data=json.dumps(test_compose),
content_type="application/json")
data = json.loads(resp.data)
self.assertNotEqual(data, None)
self.assertEqual(data["status"], False)
self.assertTrue("errors" in data and len(data["errors"]) > 0)
self.assertEqual(data["errors"][0]["id"], "BuildFailed")
def test_04_compose_badrepo_gitrpm(self):
"""Make sure that compose with a bad repo returns an error"""
test_blueprint = """
name = "git-rpm-blueprint-test"
description = "A test blueprint including a rpm created from git"
version = "0.0.3"
[[repos.git]]
rpmname="git-rpm-test"
rpmversion="1.0.0"
rpmrelease="1"
summary="Testing the git rpm code"
repo="file:///not/a/repo/path/"
ref="origin/master"
destination="/srv/testing-rpm/"
[[packages]]
name="openssh-server"
version="*"
"""
resp = self.server.post("/api/v0/blueprints/new",
data=test_blueprint,
content_type="text/x-toml")
data = json.loads(resp.data)
self.assertEqual(data, {"status":True})
test_compose = {"blueprint_name": "git-rpm-blueprint-test",
"compose_type": "tar",
"branch": "master"}
resp = self.server.post("/api/v0/compose?test=2",
data=json.dumps(test_compose),
content_type="application/json")
data = json.loads(resp.data)
self.assertNotEqual(data, None)
self.assertEqual(data["status"], False)
self.assertTrue("errors" in data and len(data["errors"]) > 0)
self.assertEqual(data["errors"][0]["id"], "BuildFailed")