From f0bac40d7f05ae6e25bd3ebaf7d7fbc6c4034cd5 Mon Sep 17 00:00:00 2001 From: "Brian C. Lane" Date: Fri, 30 Nov 2018 15:45:18 -0800 Subject: [PATCH] lorax-composer: Check the queue and results at startup If the system ran out of space, or was rebooted unexpectedly, the state of the queue symlinks, or the results STATUS files may be inconsistent. This checks them and: * Removes broken symlinks from queue/new and queue/run * Removes symlinks from run and sets the build to FAILED * Sets builds w/o a STATUS to FAILED * Sets builds with STATUS of RUNNING to FAILED * Creates missing queue/new symlinks to results with STATUS of WAITING So, any builds that were running during the reboot will be FAILED, and any that were waiting to be started will be started upon rebooting. Resolves: rhbz#1647985 (cherry picked from commit 4dd9004d13d8dca461ac4120339bfc1442f254ff) --- src/pylorax/api/queue.py | 51 ++++++++++++++--- tests/pylorax/test_queue.py | 108 ++++++++++++++++++++++++++++++++++++ 2 files changed, 151 insertions(+), 8 deletions(-) create mode 100644 tests/pylorax/test_queue.py diff --git a/src/pylorax/api/queue.py b/src/pylorax/api/queue.py index b6344e26..2236d423 100644 --- a/src/pylorax/api/queue.py +++ b/src/pylorax/api/queue.py @@ -36,6 +36,47 @@ from pylorax.base import DataHolder from pylorax.creator import run_creator from pylorax.sysutils import joinpaths +def check_queues(cfg): + """Check to make sure the new and run queue symlinks are correct + + :param cfg: Configuration settings + :type cfg: DataHolder + + Also check all of the existing results and make sure any with WAITING + set in STATUS have a symlink in queue/new/ + """ + # Remove broken symlinks from the new and run queues + queue_symlinks = glob(joinpaths(cfg.composer_dir, "queue/new/*")) + \ + glob(joinpaths(cfg.composer_dir, "queue/run/*")) + for link in queue_symlinks: + if not os.path.isdir(os.path.realpath(link)): + log.info("Removing broken symlink %s", link) + os.unlink(link) + + # Write FAILED to the STATUS of any run queue symlinks and remove them + for link in glob(joinpaths(cfg.composer_dir, "queue/run/*")): + log.info("Setting build %s to FAILED, and removing symlink from queue/run/", os.path.basename(link)) + open(joinpaths(link, "STATUS"), "w").write("FAILED\n") + os.unlink(link) + + # Check results STATUS messages + # - If STATUS is missing, set it to FAILED + # - RUNNING should be changed to FAILED + # - WAITING should have a symlink in the new queue + for link in glob(joinpaths(cfg.composer_dir, "results/*")): + if not os.path.exists(joinpaths(link, "STATUS")): + open(joinpaths(link, "STATUS"), "w").write("FAILED\n") + continue + + status = open(joinpaths(link, "STATUS")).read().strip() + if status == "RUNNING": + log.info("Setting build %s to FAILED", os.path.basename(link)) + open(joinpaths(link, "STATUS"), "w").write("FAILED\n") + elif status == "WAITING": + if not os.path.islink(joinpaths(cfg.composer_dir, "queue/new/", os.path.basename(link))): + log.info("Creating missing symlink to new build %s", os.path.basename(link)) + os.symlink(link, joinpaths(cfg.composer_dir, "queue/new/", os.path.basename(link))) + def start_queue_monitor(cfg, uid, gid): """Start the queue monitor as a mp process @@ -69,7 +110,7 @@ def monitor(cfg): compose is finished) the symlink will be moved into ./queue/run/ and a STATUS file will be created in the results directory. - STATUS can contain one of: RUNNING, FINISHED, FAILED + STATUS can contain one of: WAITING, RUNNING, FINISHED, FAILED If the system is restarted while a compose is running it will move any old symlinks from ./queue/run/ to ./queue/new/ and rerun them. @@ -78,13 +119,7 @@ def monitor(cfg): """Sort the queue entries by their mtime, not their names""" return os.stat(joinpaths(cfg.composer_dir, "queue/new", uuid)).st_mtime - # Move any symlinks in the run queue back to the new queue - for link in os.listdir(joinpaths(cfg.composer_dir, "queue/run")): - src = joinpaths(cfg.composer_dir, "queue/run", link) - dst = joinpaths(cfg.composer_dir, "queue/new", link) - os.rename(src, dst) - log.debug("Moved unfinished compose %s back to new state", src) - + check_queues(cfg) while True: uuids = sorted(os.listdir(joinpaths(cfg.composer_dir, "queue/new")), key=queue_sort) diff --git a/tests/pylorax/test_queue.py b/tests/pylorax/test_queue.py new file mode 100644 index 00000000..10dc001d --- /dev/null +++ b/tests/pylorax/test_queue.py @@ -0,0 +1,108 @@ +# +# Copyright (C) 2018 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +import os +import shutil +import tempfile +import unittest +from uuid import uuid4 + +from pylorax.api.config import configure, make_queue_dirs +from pylorax.api.queue import check_queues +from pylorax.base import DataHolder +from pylorax.sysutils import joinpaths + + +class QueueTestCase(unittest.TestCase): + @classmethod + def setUpClass(self): + self.maxDiff = None + self.config = dict() + + repo_dir = tempfile.mkdtemp(prefix="lorax.test.repo.") + self.config["REPO_DIR"] = repo_dir + + self.config["COMPOSER_CFG"] = configure(root_dir=repo_dir, test_config=True) + os.makedirs(joinpaths(self.config["COMPOSER_CFG"].get("composer", "share_dir"), "composer")) + errors = make_queue_dirs(self.config["COMPOSER_CFG"], os.getgid()) + if errors: + raise RuntimeError("\n".join(errors)) + + lib_dir = self.config["COMPOSER_CFG"].get("composer", "lib_dir") + share_dir = self.config["COMPOSER_CFG"].get("composer", "share_dir") + tmp = self.config["COMPOSER_CFG"].get("composer", "tmp") + self.monitor_cfg = DataHolder(composer_dir=lib_dir, share_dir=share_dir, uid=0, gid=0, tmp=tmp) + + @classmethod + def tearDownClass(self): + shutil.rmtree(self.config["REPO_DIR"]) + + def test_broken_run_symlinks(self): + """Put a broken symlink into queue/run and make sure it is removed""" + uuid = str(uuid4()) + os.symlink(joinpaths(self.monitor_cfg.composer_dir, "results", uuid), + joinpaths(self.monitor_cfg.composer_dir, "queue/run", uuid)) + self.assertTrue(os.path.islink(joinpaths(self.monitor_cfg.composer_dir, "queue/run", uuid))) + check_queues(self.monitor_cfg) + self.assertFalse(os.path.islink(joinpaths(self.monitor_cfg.composer_dir, "queue/run", uuid))) + + def test_broken_new_symlinks(self): + """Put a broken symlink into queue/new and make sure it is removed""" + uuid = str(uuid4()) + os.symlink(joinpaths(self.monitor_cfg.composer_dir, "results", uuid), + joinpaths(self.monitor_cfg.composer_dir, "queue/new", uuid)) + self.assertTrue(os.path.islink(joinpaths(self.monitor_cfg.composer_dir, "queue/new", uuid))) + check_queues(self.monitor_cfg) + self.assertFalse(os.path.islink(joinpaths(self.monitor_cfg.composer_dir, "queue/new", uuid))) + + def test_stale_run_symlink(self): + """Put a valid symlink in run, make sure it is set to FAILED and removed""" + uuid = str(uuid4()) + os.makedirs(joinpaths(self.monitor_cfg.composer_dir, "results", uuid)) + os.symlink(joinpaths(self.monitor_cfg.composer_dir, "results", uuid), + joinpaths(self.monitor_cfg.composer_dir, "queue/run", uuid)) + self.assertTrue(os.path.islink(joinpaths(self.monitor_cfg.composer_dir, "queue/run", uuid))) + check_queues(self.monitor_cfg) + self.assertFalse(os.path.islink(joinpaths(self.monitor_cfg.composer_dir, "queue/run", uuid))) + status = open(joinpaths(self.monitor_cfg.composer_dir, "results", uuid, "STATUS")).read().strip() + self.assertEqual(status, "FAILED") + + def test_missing_status(self): + """Create a results dir w/o STATUS and confirm it is set to FAILED""" + uuid = str(uuid4()) + os.makedirs(joinpaths(self.monitor_cfg.composer_dir, "results", uuid)) + check_queues(self.monitor_cfg) + status = open(joinpaths(self.monitor_cfg.composer_dir, "results", uuid, "STATUS")).read().strip() + self.assertEqual(status, "FAILED") + + def test_running_status(self): + """Create a results dir with STATUS set to RUNNING and confirm it is set to FAILED""" + uuid = str(uuid4()) + os.makedirs(joinpaths(self.monitor_cfg.composer_dir, "results", uuid)) + open(joinpaths(self.monitor_cfg.composer_dir, "results", uuid, "STATUS"), "w").write("RUNNING\n") + check_queues(self.monitor_cfg) + status = open(joinpaths(self.monitor_cfg.composer_dir, "results", uuid, "STATUS")).read().strip() + self.assertEqual(status, "FAILED") + + def test_missing_new_symlink(self): + """Create a results dir with STATUS set to WAITING and confirm a symlink is created in queue/new""" + uuid = str(uuid4()) + os.makedirs(joinpaths(self.monitor_cfg.composer_dir, "results", uuid)) + open(joinpaths(self.monitor_cfg.composer_dir, "results", uuid, "STATUS"), "w").write("WAITING\n") + check_queues(self.monitor_cfg) + status = open(joinpaths(self.monitor_cfg.composer_dir, "results", uuid, "STATUS")).read().strip() + self.assertEqual(status, "WAITING") + self.assertTrue(os.path.islink(joinpaths(self.monitor_cfg.composer_dir, "queue/new", uuid)))