tests: Add tests for /compose/uploads/schedule

Test it with a profile, and with one-time use settings. It does not
actually upload anything, it only tests that the upload is scheduled.
This commit is contained in:
Brian C. Lane 2019-09-06 15:39:04 -07:00
parent 0eda252d61
commit b307ff0430

View File

@ -3625,8 +3625,74 @@ class ServerAPIV1TestCase(unittest.TestCase):
self.assertEqual(data["status"], True)
self.assertEqual(data["upload_id"], upload_id)
def test_upload_05_uploads_schedule(self):
"""Test schedule upload and upload delete"""
def test_upload_04_providers_delete(self):
# Create a test compose
test_compose = {"blueprint_name": "example-custom-base",
"compose_type": "vhd",
"branch": "master"}
resp = self.server.post("/api/v1/compose?test=2",
data=json.dumps(test_compose),
content_type="application/json")
data = json.loads(resp.data)
self.assertNotEqual(data, None)
self.assertEqual(data["status"], True, "Failed to start test compose: %s" % data)
build_id = data["build_id"]
# Is it in the queue list (either new or run is fine, based on timing)
resp = self.server.get("/api/v1/compose/queue")
data = json.loads(resp.data)
self.assertNotEqual(data, None)
ids = [e["id"] for e in data["new"] + data["run"]]
self.assertEqual(build_id in ids, True, "Failed to add build to the queue")
# V1 API should have the uploads details in the results
uploads = all("uploads" in e for e in data["new"] + data["run"])
self.assertTrue(uploads, "V1 API should include 'uploads' field")
# Wait for it to start
self.assertEqual(_wait_for_status(self, build_id, ["RUNNING"], api=1), True, "Failed to start test compose")
# Wait for it to finish
self.assertEqual(_wait_for_status(self, build_id, ["FINISHED"], api=1), True, "Failed to finish test compose")
resp = self.server.get("/api/v1/compose/info/%s" % build_id)
data = json.loads(resp.data)
self.assertNotEqual(data, None)
self.assertEqual(data["queue_status"], "FINISHED", "Build not in FINISHED state")
# Schedule an upload of this image using settings
upload = {
"image_name": "Azure custom-base",
"provider": "azure",
"settings": test_profiles["azure"][1]
}
resp = self.server.post("/api/v1/compose/uploads/schedule/%s" % build_id,
data=json.dumps(upload),
content_type="application/json")
data = json.loads(resp.data)
self.assertNotEqual(data, None)
self.assertEqual(data["status"], True, "Failed to schedule upload: %s" % data)
self.assertTrue(len(data["upload_id"]) > 0)
# Schedule an upload of this image using settings
upload = {
"image_name": "Azure custom-base",
"provider": "azure",
"profile": test_profiles["azure"][0]
}
resp = self.server.post("/api/v1/compose/uploads/schedule/%s" % build_id,
data=json.dumps(upload),
content_type="application/json")
data = json.loads(resp.data)
self.assertNotEqual(data, None)
self.assertEqual(data["status"], True, "Failed to schedule upload: %s" % data)
self.assertTrue(len(data["upload_id"]) > 0)
def test_upload_06_providers_delete(self):
"""Delete a profile from a provider"""
# /api/v1/upload/providers/delete/provider/profile
resp = self.server.delete("/api/v1/upload/providers/delete/azure/%s" % test_profiles["azure"][0])