lorax-composer: Check the queue and results at startup

If the system ran out of space, or was rebooted unexpectedly, the state
of the queue symlinks, or the results STATUS files may be inconsistent.
This checks them and:
 * Removes broken symlinks from queue/new and queue/run
 * Removes symlinks from run and sets the build to FAILED
 * Sets builds w/o a STATUS to FAILED
 * Sets builds with STATUS of RUNNING to FAILED
 * Creates missing queue/new symlinks to results with STATUS of WAITING

So, any builds that were running during the reboot will be FAILED, and
any that were waiting to be started will be started upon rebooting.

Resolves: rhbz#1647985
This commit is contained in:
Brian C. Lane 2018-11-30 15:45:18 -08:00
parent 642b909d24
commit 4dd9004d13
2 changed files with 151 additions and 8 deletions

View File

@ -36,6 +36,47 @@ from pylorax.base import DataHolder
from pylorax.creator import run_creator from pylorax.creator import run_creator
from pylorax.sysutils import joinpaths from pylorax.sysutils import joinpaths
def check_queues(cfg):
"""Check to make sure the new and run queue symlinks are correct
:param cfg: Configuration settings
:type cfg: DataHolder
Also check all of the existing results and make sure any with WAITING
set in STATUS have a symlink in queue/new/
"""
# Remove broken symlinks from the new and run queues
queue_symlinks = glob(joinpaths(cfg.composer_dir, "queue/new/*")) + \
glob(joinpaths(cfg.composer_dir, "queue/run/*"))
for link in queue_symlinks:
if not os.path.isdir(os.path.realpath(link)):
log.info("Removing broken symlink %s", link)
os.unlink(link)
# Write FAILED to the STATUS of any run queue symlinks and remove them
for link in glob(joinpaths(cfg.composer_dir, "queue/run/*")):
log.info("Setting build %s to FAILED, and removing symlink from queue/run/", os.path.basename(link))
open(joinpaths(link, "STATUS"), "w").write("FAILED\n")
os.unlink(link)
# Check results STATUS messages
# - If STATUS is missing, set it to FAILED
# - RUNNING should be changed to FAILED
# - WAITING should have a symlink in the new queue
for link in glob(joinpaths(cfg.composer_dir, "results/*")):
if not os.path.exists(joinpaths(link, "STATUS")):
open(joinpaths(link, "STATUS"), "w").write("FAILED\n")
continue
status = open(joinpaths(link, "STATUS")).read().strip()
if status == "RUNNING":
log.info("Setting build %s to FAILED", os.path.basename(link))
open(joinpaths(link, "STATUS"), "w").write("FAILED\n")
elif status == "WAITING":
if not os.path.islink(joinpaths(cfg.composer_dir, "queue/new/", os.path.basename(link))):
log.info("Creating missing symlink to new build %s", os.path.basename(link))
os.symlink(link, joinpaths(cfg.composer_dir, "queue/new/", os.path.basename(link)))
def start_queue_monitor(cfg, uid, gid): def start_queue_monitor(cfg, uid, gid):
"""Start the queue monitor as a mp process """Start the queue monitor as a mp process
@ -69,7 +110,7 @@ def monitor(cfg):
compose is finished) the symlink will be moved into ./queue/run/ and a STATUS file compose is finished) the symlink will be moved into ./queue/run/ and a STATUS file
will be created in the results directory. will be created in the results directory.
STATUS can contain one of: RUNNING, FINISHED, FAILED STATUS can contain one of: WAITING, RUNNING, FINISHED, FAILED
If the system is restarted while a compose is running it will move any old symlinks If the system is restarted while a compose is running it will move any old symlinks
from ./queue/run/ to ./queue/new/ and rerun them. from ./queue/run/ to ./queue/new/ and rerun them.
@ -78,13 +119,7 @@ def monitor(cfg):
"""Sort the queue entries by their mtime, not their names""" """Sort the queue entries by their mtime, not their names"""
return os.stat(joinpaths(cfg.composer_dir, "queue/new", uuid)).st_mtime return os.stat(joinpaths(cfg.composer_dir, "queue/new", uuid)).st_mtime
# Move any symlinks in the run queue back to the new queue check_queues(cfg)
for link in os.listdir(joinpaths(cfg.composer_dir, "queue/run")):
src = joinpaths(cfg.composer_dir, "queue/run", link)
dst = joinpaths(cfg.composer_dir, "queue/new", link)
os.rename(src, dst)
log.debug("Moved unfinished compose %s back to new state", src)
while True: while True:
uuids = sorted(os.listdir(joinpaths(cfg.composer_dir, "queue/new")), key=queue_sort) uuids = sorted(os.listdir(joinpaths(cfg.composer_dir, "queue/new")), key=queue_sort)

108
tests/pylorax/test_queue.py Normal file
View File

@ -0,0 +1,108 @@
#
# Copyright (C) 2018 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import shutil
import tempfile
import unittest
from uuid import uuid4
from pylorax.api.config import configure, make_queue_dirs
from pylorax.api.queue import check_queues
from pylorax.base import DataHolder
from pylorax.sysutils import joinpaths
class QueueTestCase(unittest.TestCase):
@classmethod
def setUpClass(self):
self.maxDiff = None
self.config = dict()
repo_dir = tempfile.mkdtemp(prefix="lorax.test.repo.")
self.config["REPO_DIR"] = repo_dir
self.config["COMPOSER_CFG"] = configure(root_dir=repo_dir, test_config=True)
os.makedirs(joinpaths(self.config["COMPOSER_CFG"].get("composer", "share_dir"), "composer"))
errors = make_queue_dirs(self.config["COMPOSER_CFG"], os.getgid())
if errors:
raise RuntimeError("\n".join(errors))
lib_dir = self.config["COMPOSER_CFG"].get("composer", "lib_dir")
share_dir = self.config["COMPOSER_CFG"].get("composer", "share_dir")
tmp = self.config["COMPOSER_CFG"].get("composer", "tmp")
self.monitor_cfg = DataHolder(composer_dir=lib_dir, share_dir=share_dir, uid=0, gid=0, tmp=tmp)
@classmethod
def tearDownClass(self):
shutil.rmtree(self.config["REPO_DIR"])
def test_broken_run_symlinks(self):
"""Put a broken symlink into queue/run and make sure it is removed"""
uuid = str(uuid4())
os.symlink(joinpaths(self.monitor_cfg.composer_dir, "results", uuid),
joinpaths(self.monitor_cfg.composer_dir, "queue/run", uuid))
self.assertTrue(os.path.islink(joinpaths(self.monitor_cfg.composer_dir, "queue/run", uuid)))
check_queues(self.monitor_cfg)
self.assertFalse(os.path.islink(joinpaths(self.monitor_cfg.composer_dir, "queue/run", uuid)))
def test_broken_new_symlinks(self):
"""Put a broken symlink into queue/new and make sure it is removed"""
uuid = str(uuid4())
os.symlink(joinpaths(self.monitor_cfg.composer_dir, "results", uuid),
joinpaths(self.monitor_cfg.composer_dir, "queue/new", uuid))
self.assertTrue(os.path.islink(joinpaths(self.monitor_cfg.composer_dir, "queue/new", uuid)))
check_queues(self.monitor_cfg)
self.assertFalse(os.path.islink(joinpaths(self.monitor_cfg.composer_dir, "queue/new", uuid)))
def test_stale_run_symlink(self):
"""Put a valid symlink in run, make sure it is set to FAILED and removed"""
uuid = str(uuid4())
os.makedirs(joinpaths(self.monitor_cfg.composer_dir, "results", uuid))
os.symlink(joinpaths(self.monitor_cfg.composer_dir, "results", uuid),
joinpaths(self.monitor_cfg.composer_dir, "queue/run", uuid))
self.assertTrue(os.path.islink(joinpaths(self.monitor_cfg.composer_dir, "queue/run", uuid)))
check_queues(self.monitor_cfg)
self.assertFalse(os.path.islink(joinpaths(self.monitor_cfg.composer_dir, "queue/run", uuid)))
status = open(joinpaths(self.monitor_cfg.composer_dir, "results", uuid, "STATUS")).read().strip()
self.assertEqual(status, "FAILED")
def test_missing_status(self):
"""Create a results dir w/o STATUS and confirm it is set to FAILED"""
uuid = str(uuid4())
os.makedirs(joinpaths(self.monitor_cfg.composer_dir, "results", uuid))
check_queues(self.monitor_cfg)
status = open(joinpaths(self.monitor_cfg.composer_dir, "results", uuid, "STATUS")).read().strip()
self.assertEqual(status, "FAILED")
def test_running_status(self):
"""Create a results dir with STATUS set to RUNNING and confirm it is set to FAILED"""
uuid = str(uuid4())
os.makedirs(joinpaths(self.monitor_cfg.composer_dir, "results", uuid))
open(joinpaths(self.monitor_cfg.composer_dir, "results", uuid, "STATUS"), "w").write("RUNNING\n")
check_queues(self.monitor_cfg)
status = open(joinpaths(self.monitor_cfg.composer_dir, "results", uuid, "STATUS")).read().strip()
self.assertEqual(status, "FAILED")
def test_missing_new_symlink(self):
"""Create a results dir with STATUS set to WAITING and confirm a symlink is created in queue/new"""
uuid = str(uuid4())
os.makedirs(joinpaths(self.monitor_cfg.composer_dir, "results", uuid))
open(joinpaths(self.monitor_cfg.composer_dir, "results", uuid, "STATUS"), "w").write("WAITING\n")
check_queues(self.monitor_cfg)
status = open(joinpaths(self.monitor_cfg.composer_dir, "results", uuid, "STATUS")).read().strip()
self.assertEqual(status, "WAITING")
self.assertTrue(os.path.islink(joinpaths(self.monitor_cfg.composer_dir, "queue/new", uuid)))