def check_queues(cfg):
    """Check to make sure the new and run queue symlinks are correct

    :param cfg: Configuration settings
    :type cfg: DataHolder

    Only ``cfg.composer_dir`` is read. The following steps are performed,
    in order:

    * Entries in queue/new/ and queue/run/ that do not resolve to a
      directory are removed.
    * Every remaining entry in queue/run/ has FAILED written to the STATUS
      file of the build it points at, and the symlink is removed.
    * Every directory in results/ is examined: a missing STATUS is set to
      FAILED, RUNNING is changed to FAILED, and WAITING gets a symlink in
      queue/new/ if one is not already present.
    """
    def set_failed(build_dir):
        # Use a context manager so the STATUS file is flushed and closed
        # before anything else reads it back.
        with open(joinpaths(build_dir, "STATUS"), "w") as status_file:
            status_file.write("FAILED\n")

    # Remove broken symlinks from the new and run queues
    queue_symlinks = glob(joinpaths(cfg.composer_dir, "queue/new/*")) + \
                     glob(joinpaths(cfg.composer_dir, "queue/run/*"))
    for link in queue_symlinks:
        if not os.path.isdir(os.path.realpath(link)):
            log.info("Removing broken symlink %s", link)
            os.unlink(link)

    # Write FAILED to the STATUS of any run queue symlinks and remove them.
    # The run queue is listed again because broken links were just removed.
    for link in glob(joinpaths(cfg.composer_dir, "queue/run/*")):
        log.info("Setting build %s to FAILED, and removing symlink from queue/run/", os.path.basename(link))
        set_failed(link)
        os.unlink(link)

    # Check results STATUS messages
    # - If STATUS is missing, set it to FAILED
    # - RUNNING should be changed to FAILED
    # - WAITING should have a symlink in the new queue
    for results_dir in glob(joinpaths(cfg.composer_dir, "results/*")):
        # A stray non-directory entry cannot hold a STATUS file; skip it
        # instead of raising while trying to create one inside it.
        if not os.path.isdir(results_dir):
            continue

        status_path = joinpaths(results_dir, "STATUS")
        if not os.path.exists(status_path):
            set_failed(results_dir)
            continue

        with open(status_path) as status_file:
            status = status_file.read().strip()

        build_id = os.path.basename(results_dir)
        if status == "RUNNING":
            log.info("Setting build %s to FAILED", build_id)
            set_failed(results_dir)
        elif status == "WAITING":
            new_link = joinpaths(cfg.composer_dir, "queue/new/", build_id)
            if not os.path.islink(new_link):
                log.info("Creating missing symlink to new build %s", build_id)
                os.symlink(results_dir, new_link)
class QueueTestCase(unittest.TestCase):
    """Tests for check_queues().

    The composer directories come from a test configuration created with
    a temporary directory as its root. All tests in the class share that
    directory tree; each test uses a fresh uuid for its own build.
    """

    @classmethod
    def setUpClass(cls):
        cls.maxDiff = None
        cls.config = dict()

        repo_dir = tempfile.mkdtemp(prefix="lorax.test.repo.")
        cls.config["REPO_DIR"] = repo_dir

        # tearDownClass is not run when setUpClass raises, so remove the
        # temporary directory here if any of the setup steps fail.
        try:
            composer_cfg = configure(root_dir=repo_dir, test_config=True)
            cls.config["COMPOSER_CFG"] = composer_cfg
            os.makedirs(joinpaths(composer_cfg.get("composer", "share_dir"), "composer"))
            errors = make_queue_dirs(composer_cfg, os.getgid())
            if errors:
                raise RuntimeError("\n".join(errors))

            lib_dir = composer_cfg.get("composer", "lib_dir")
            share_dir = composer_cfg.get("composer", "share_dir")
            tmp = composer_cfg.get("composer", "tmp")
            cls.monitor_cfg = DataHolder(composer_dir=lib_dir, share_dir=share_dir, uid=0, gid=0, tmp=tmp)
        except Exception:
            shutil.rmtree(repo_dir, ignore_errors=True)
            raise

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(cls.config["REPO_DIR"])

    def _path(self, *parts):
        """Return a path below the composer directory used by the tests"""
        return joinpaths(self.monitor_cfg.composer_dir, *parts)

    def _make_results(self, uuid, status=None):
        """Create results/<uuid>, optionally writing status into its STATUS file"""
        os.makedirs(self._path("results", uuid))
        if status is not None:
            with open(self._path("results", uuid, "STATUS"), "w") as status_file:
                status_file.write(status + "\n")

    def _read_status(self, uuid):
        """Return the stripped contents of results/<uuid>/STATUS"""
        with open(self._path("results", uuid, "STATUS")) as status_file:
            return status_file.read().strip()

    def _link_into_queue(self, queue, uuid):
        """Symlink results/<uuid> into the named queue and return the link path"""
        link = self._path(queue, uuid)
        os.symlink(self._path("results", uuid), link)
        self.assertTrue(os.path.islink(link))
        return link

    def test_broken_run_symlinks(self):
        """Put a broken symlink into queue/run and make sure it is removed"""
        uuid = str(uuid4())
        link = self._link_into_queue("queue/run", uuid)
        check_queues(self.monitor_cfg)
        self.assertFalse(os.path.islink(link))

    def test_broken_new_symlinks(self):
        """Put a broken symlink into queue/new and make sure it is removed"""
        uuid = str(uuid4())
        link = self._link_into_queue("queue/new", uuid)
        check_queues(self.monitor_cfg)
        self.assertFalse(os.path.islink(link))

    def test_stale_run_symlink(self):
        """Put a valid symlink in run, make sure it is set to FAILED and removed"""
        uuid = str(uuid4())
        self._make_results(uuid)
        link = self._link_into_queue("queue/run", uuid)
        check_queues(self.monitor_cfg)
        self.assertFalse(os.path.islink(link))
        self.assertEqual(self._read_status(uuid), "FAILED")

    def test_missing_status(self):
        """Create a results dir w/o STATUS and confirm it is set to FAILED"""
        uuid = str(uuid4())
        self._make_results(uuid)
        check_queues(self.monitor_cfg)
        self.assertEqual(self._read_status(uuid), "FAILED")

    def test_running_status(self):
        """Create a results dir with STATUS set to RUNNING and confirm it is set to FAILED"""
        uuid = str(uuid4())
        self._make_results(uuid, status="RUNNING")
        check_queues(self.monitor_cfg)
        self.assertEqual(self._read_status(uuid), "FAILED")

    def test_missing_new_symlink(self):
        """Create a results dir with STATUS set to WAITING and confirm a symlink is created in queue/new"""
        uuid = str(uuid4())
        self._make_results(uuid, status="WAITING")
        check_queues(self.monitor_cfg)
        self.assertEqual(self._read_status(uuid), "WAITING")
        self.assertTrue(os.path.islink(self._path("queue/new", uuid)))