From e2f4674fb3e2b42e92c709c01b974028090139ed Mon Sep 17 00:00:00 2001 From: "Brian C. Lane" Date: Wed, 24 Jan 2018 12:11:03 -0800 Subject: [PATCH] Add basic composer queue handling The queue is in /var/lib/weldr/queue/new by default. It watches the directory for new symlinks (to /var/lib/weldr/results/) and handles running anaconda on the kickstart found in final-kickstart.ks inside the symlinked directory. --- src/pylorax/api/queue.py | 128 +++++++++++++++++++++++++++++++++++++++ src/sbin/lorax-composer | 64 ++++++++++++++++++-- 2 files changed, 186 insertions(+), 6 deletions(-) create mode 100644 src/pylorax/api/queue.py diff --git a/src/pylorax/api/queue.py b/src/pylorax/api/queue.py new file mode 100644 index 00000000..5bc4b74d --- /dev/null +++ b/src/pylorax/api/queue.py @@ -0,0 +1,128 @@ +# Copyright (C) 2018 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +""" Functions to monitor compose queue and run anaconda""" +import logging +log = logging.getLogger("pylorax") + +import os +import time +from pykickstart.version import makeVersion, RHEL7 +from pykickstart.parser import KickstartParser + +from pylorax.base import DataHolder +from pylorax.imgutils import default_image_name +from pylorax.installer import novirt_install +from pylorax.sysutils import joinpaths + +# TODO needs a quit queue to cleanly manage quitting +def monitor(cfg, cancel_q): + """ Monitor the queue for new compose requests + + The queue has 2 subdirectories, new and run. When a compose is ready to be run + a symlink to the uniquely named results directory should be placed in ./queue/new/ + + When the it is ready to be run (it is checked every 30 seconds or after a previous + compose is finished) the symlink will be moved into ./queue/run/ and a STATUS file + will be created in the results directory. + + STATUS can contain one of: RUNNING, FINISHED, FAILED + + If the system is restarted while a compose is running it will move any old symlinks + from ./queue/run/ to ./queue/new/ and rerun them. + """ + def queue_sort(job): + """Sort the queue entries by their mtime, not their names""" + return os.stat(joinpaths(cfg.composer_dir, "queue/new", job)).st_mtime + + # Move any symlinks in the run queue back to the new queue + for link in os.listdir(joinpaths(cfg.composer_dir, "queue/run")): + src = joinpaths(cfg.composer_dir, "queue/run", link) + dst = joinpaths(cfg.composer_dir, "queue/new", link) + os.rename(src, dst) + log.debug("Moved unfinished compose %s back to new state", src) + + while True: + jobs = sorted(os.listdir(joinpaths(cfg.composer_dir, "queue/new")), key=queue_sort) + log.debug("jobs = %s", jobs) + + # Pick the oldest and move it into ./run/ + if not jobs: + # No composes left to process, sleep for a bit + time.sleep(30) + else: + src = joinpaths(cfg.composer_dir, "queue/new", jobs[0]) + dst = joinpaths(cfg.composer_dir, "queue/run", jobs[0]) + os.rename(src, dst) + log.info("Starting new compose: %s", dst) + open(joinpaths(dst, "STATUS"), "w").write("RUNNING\n") + + try: + make_compose(cfg, os.path.realpath(dst)) + log.info("Finished building %s, results are in %s", dst, os.path.realpath(dst)) + open(joinpaths(dst, "STATUS"), "w").write("FINISHED\n") + except Exception as e: + log.error("Error running compose: %s", e) + open(joinpaths(dst, "STATUS"), "w").write("FAILED\n") + os.unlink(dst) + +def make_compose(cfg, results_dir): + """Run anaconda with the final-kickstart.ks from results_dir""" + + # Check on the ks's presense + ks_path = joinpaths(results_dir, "final-kickstart.ks") + if not os.path.exists(ks_path): + raise RuntimeError("Missing kickstart file at %s" % ks_path) + + # The anaconda logs are copied into ./anaconda/ in this directory + log_dir = joinpaths(results_dir, "logs/") + if not os.path.exists(log_dir): + os.makedirs(log_dir) + + ks_version = makeVersion(RHEL7) + ks = KickstartParser(ks_version, errorsAreFatal=False, missingIncludeIsFatal=False) + ks.readKickstart(ks_path) + repo_url = ks.handler.method.url + + # TODO -- This will change based on the type of image requested + # Configuration to pass to novirt_install + install_cfg = DataHolder( + image_name = default_image_name("xz", "root.tar"), + compression = "xz", + #compress_args = ["-9"], + compress_args = [], + ks = [ks_path], + anaconda_args = "", + proxy = "", + armplatform = "", + + make_tar = True, + make_iso = False, + make_fsimage = False, + fslabel = "", + qcow2 = False, + + project = "Red Hat Enterprise Linux", + releasever = "7", + + logfile=log_dir + ) + + # Some kludges for the 99-copy-logs %post, failure in it will crash the build + for f in ["/tmp/NOSAVE_INPUT_KS", "/tmp/NOSAVE_LOGS"]: + open(f, "w") + + log.info("repo_url = %s, cfg = %s", repo_url, install_cfg) + novirt_install(install_cfg, joinpaths(results_dir, install_cfg.image_name), None, repo_url) diff --git a/src/sbin/lorax-composer b/src/sbin/lorax-composer index 66cfb671..44e1c73a 100755 --- a/src/sbin/lorax-composer +++ b/src/sbin/lorax-composer @@ -26,17 +26,24 @@ yum_log = logging.getLogger("yum") import argparse import grp +import multiprocessing as mp import os +import pwd import sys from threading import Lock from gevent import socket from gevent.wsgi import WSGIServer +from pyanaconda.queue import QueueFactory + from pylorax import vernum from pylorax.api.config import configure +from pylorax.api.queue import monitor from pylorax.api.recipes import open_or_create_repo, commit_recipe_directory from pylorax.api.server import server, GitLock, YumLock from pylorax.api.yumbase import get_base_object +from pylorax.base import DataHolder +from pylorax.sysutils import joinpaths VERSION = "{0}-{1}".format(os.path.basename(sys.argv[0]), vernum) @@ -48,12 +55,16 @@ def get_parser(): parser.add_argument("--socket", default="/run/weldr/api.socket", metavar="SOCKET", help="Path to the socket file to listen on") + parser.add_argument("--user", default="weldr", metavar="USER", + help="User to use for reduced permissions") parser.add_argument("--group", default="weldr", metavar="GROUP", help="Group to set ownership of the socket to") parser.add_argument("--log", dest="logfile", default="/var/log/lorax-composer/composer.log", metavar="LOG", help="Path to logfile (/var/log/lorax-composer/composer.log)") parser.add_argument("--mockfiles", default="/var/tmp/bdcs-mockfiles/", metavar="MOCKFILES", help="Path to JSON files used for /api/mock/ paths (/var/tmp/bdcs-mockfiles/)") + parser.add_argument("--libdir", default="/var/lib/weldr", metavar="LIBDIR", + help="Path to queue and results directory (/var/lib/weldr/)") parser.add_argument("-V", action="store_true", dest="showver", help="show program's version number and exit") parser.add_argument("-c", "--config", default="/etc/lorax/composer.conf", metavar="CONFIG", @@ -131,12 +142,39 @@ if __name__ == '__main__': setup_logging(opts.logfile) log.debug("opts=%s", opts) + errors = [] + # Check to make sure the user exists and get its uid + try: + uid = pwd.getpwnam(opts.user).pw_uid + except KeyError: + errors.append("Missing user '%s'" % opts.user) + # Check to make sure the group exists and get its gid try: gid = grp.getgrnam(opts.group).gr_gid except KeyError: - log.error("Missing group '%s'", opts.group) - sys.exit(1) + errors.append("Missing group '%s'" % opts.group) + + # Make sure the libdir path is setup correctly + if not os.path.exists(opts.libdir): + log.info("%s does not exist, creating it and the required subdirectories.", opts.libdir) + orig_umask = os.umask(0) + # Create the directories and set permissions and ownership + for p in ["queue/run", "queue/new", "results"]: + p_dir = joinpaths(opts.libdir, p) + os.makedirs(p_dir, 0o770) + os.chown(p_dir, 0, gid) + os.umask(orig_umask) + + # Check ownership and permissions on the libdir tree + for p in ["queue/run", "queue/new", "results"]: + p_dir = joinpaths(opts.libdir, p) + p_stat = os.stat(p_dir) + if p_stat.st_mode & 0o007 != 0: + errors.append("Incorrect permissions on %s, no 'other' permissions are allowed." % p_dir) + + if p_stat.st_gid != gid or p_stat.st_uid != 0: + errors.append("%s should be owned by root:%s" % (p_dir, opts.group)) # Check the socket path to make sure it exists, and that ownership and permissions are correct. socket_dir = os.path.dirname(opts.socket) @@ -147,17 +185,20 @@ if __name__ == '__main__': sockdir_stat = os.stat(socket_dir) if sockdir_stat.st_mode & 0o007 != 0: - log.error("Incorrect permissions on %s, no 'other' permissions are allowed.") - sys.exit(1) + errors.append("Incorrect permissions on %s, no 'other' permissions are allowed." % socket_dir) if sockdir_stat.st_gid != gid or sockdir_stat.st_uid != 0: - log.error("%s should be owned by root:%s", socket_dir, opts.group) - sys.exit(1) + errors.append("%s should be owned by root:%s" % (socket_dir, opts.group)) if not os.path.isdir(opts.RECIPES): log.warn("Creating empty recipe directory at %s", opts.RECIPES) os.makedirs(opts.RECIPES) + if errors: + for e in errors: + log.error(e) + sys.exit(1) + server.config["REPO_DIR"] = opts.RECIPES repo = open_or_create_repo(server.config["REPO_DIR"]) server.config["GITLOCK"] = GitLock(repo=repo, lock=Lock(), dir=opts.RECIPES) @@ -183,6 +224,17 @@ if __name__ == '__main__': os.chown(opts.socket, 0, gid) listener.listen(1) + # Start queue monitor thread as root + cancel_q = QueueFactory("cancel") + cancel_q.addMessage("cancel", 0) + cfg = DataHolder(composer_dir=opts.libdir, uid=uid, gid=gid) + p = mp.Process(target=monitor, args=(cfg, cancel_q)) + p.daemon = True + p.start() + + # Drop root privileges on the main process + os.setuid(uid) + log.info("Starting %s on %s with recipes from %s", VERSION, opts.socket, opts.RECIPES) http_server = WSGIServer(listener, server, log=LogWrapper(server_log)) # The server writes directly to a file object, so point to our log directory