Add basic composer queue handling
The queue lives in /var/lib/weldr/queue/new by default. The monitor watches that directory for new symlinks (each pointing to /var/lib/weldr/results/<dirname>) and runs anaconda on the final-kickstart.ks found inside the symlinked results directory.
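For orientation, a client queues a compose by dropping a symlink into queue/new/ that points at a prepared results directory containing final-kickstart.ks. The following is a minimal sketch of that enqueue step, illustrative only and not part of this commit; the enqueue_compose helper and the uuid-based directory name are assumptions.

import os
import uuid

def enqueue_compose(lib_dir="/var/lib/weldr"):
    """Illustrative sketch: prepare a results directory and queue it for monitor()."""
    results_dir = os.path.join(lib_dir, "results", str(uuid.uuid4()))
    os.makedirs(results_dir)

    # The caller writes the kickstart that the compose run will hand to anaconda.
    with open(os.path.join(results_dir, "final-kickstart.ks"), "w") as f:
        f.write("# kickstart contents go here\n")

    # Creating the symlink in queue/new/ is what makes the monitor pick the compose up.
    os.symlink(results_dir, os.path.join(lib_dir, "queue", "new", os.path.basename(results_dir)))
    return results_dir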
parent 0ce4197a1e
commit e2f4674fb3

src/pylorax/api/queue.py (new file, 128 lines)
@@ -0,0 +1,128 @@
# Copyright (C) 2018 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
""" Functions to monitor compose queue and run anaconda"""
import logging
log = logging.getLogger("pylorax")

import os
import time

from pykickstart.version import makeVersion, RHEL7
from pykickstart.parser import KickstartParser

from pylorax.base import DataHolder
from pylorax.imgutils import default_image_name
from pylorax.installer import novirt_install
from pylorax.sysutils import joinpaths

# TODO needs a quit queue to cleanly manage quitting
def monitor(cfg, cancel_q):
    """ Monitor the queue for new compose requests

    The queue has 2 subdirectories, new and run. When a compose is ready to be run
    a symlink to the uniquely named results directory should be placed in ./queue/new/

    When it is ready to be run (it is checked every 30 seconds or after a previous
    compose is finished) the symlink will be moved into ./queue/run/ and a STATUS file
    will be created in the results directory.

    STATUS can contain one of: RUNNING, FINISHED, FAILED

    If the system is restarted while a compose is running it will move any old symlinks
    from ./queue/run/ to ./queue/new/ and rerun them.
    """
    def queue_sort(job):
        """Sort the queue entries by their mtime, not their names"""
        return os.stat(joinpaths(cfg.composer_dir, "queue/new", job)).st_mtime

    # Move any symlinks in the run queue back to the new queue
    for link in os.listdir(joinpaths(cfg.composer_dir, "queue/run")):
        src = joinpaths(cfg.composer_dir, "queue/run", link)
        dst = joinpaths(cfg.composer_dir, "queue/new", link)
        os.rename(src, dst)
        log.debug("Moved unfinished compose %s back to new state", src)

    while True:
        jobs = sorted(os.listdir(joinpaths(cfg.composer_dir, "queue/new")), key=queue_sort)
        log.debug("jobs = %s", jobs)

        # Pick the oldest and move it into ./run/
        if not jobs:
            # No composes left to process, sleep for a bit
            time.sleep(30)
        else:
            src = joinpaths(cfg.composer_dir, "queue/new", jobs[0])
            dst = joinpaths(cfg.composer_dir, "queue/run", jobs[0])
            os.rename(src, dst)
            log.info("Starting new compose: %s", dst)
            open(joinpaths(dst, "STATUS"), "w").write("RUNNING\n")

            try:
                make_compose(cfg, os.path.realpath(dst))
                log.info("Finished building %s, results are in %s", dst, os.path.realpath(dst))
                open(joinpaths(dst, "STATUS"), "w").write("FINISHED\n")
            except Exception as e:
                log.error("Error running compose: %s", e)
                open(joinpaths(dst, "STATUS"), "w").write("FAILED\n")
            os.unlink(dst)

def make_compose(cfg, results_dir):
    """Run anaconda with the final-kickstart.ks from results_dir"""

    # Check on the kickstart's presence
    ks_path = joinpaths(results_dir, "final-kickstart.ks")
    if not os.path.exists(ks_path):
        raise RuntimeError("Missing kickstart file at %s" % ks_path)

    # The anaconda logs are copied into ./anaconda/ in this directory
    log_dir = joinpaths(results_dir, "logs/")
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)

    ks_version = makeVersion(RHEL7)
    ks = KickstartParser(ks_version, errorsAreFatal=False, missingIncludeIsFatal=False)
    ks.readKickstart(ks_path)
    repo_url = ks.handler.method.url

    # TODO -- This will change based on the type of image requested
    # Configuration to pass to novirt_install
    install_cfg = DataHolder(
        image_name = default_image_name("xz", "root.tar"),
        compression = "xz",
        #compress_args = ["-9"],
        compress_args = [],
        ks = [ks_path],
        anaconda_args = "",
        proxy = "",
        armplatform = "",

        make_tar = True,
        make_iso = False,
        make_fsimage = False,
        fslabel = "",
        qcow2 = False,

        project = "Red Hat Enterprise Linux",
        releasever = "7",

        logfile=log_dir
    )

    # Some kludges for the 99-copy-logs %post, failure in it will crash the build
    for f in ["/tmp/NOSAVE_INPUT_KS", "/tmp/NOSAVE_LOGS"]:
        open(f, "w")

    log.info("repo_url = %s, cfg = %s", repo_url, install_cfg)
    novirt_install(install_cfg, joinpaths(results_dir, install_cfg.image_name), None, repo_url)
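Given the STATUS protocol described in the monitor() docstring above (RUNNING, FINISHED, or FAILED written into the results directory), a caller can check on a compose by reading that file. A minimal sketch, illustrative only and not part of this commit; the compose_status helper name is an assumption.

import os

def compose_status(results_dir):
    """Illustrative sketch: return RUNNING, FINISHED, or FAILED, or None if not started."""
    status_path = os.path.join(results_dir, "STATUS")
    if not os.path.exists(status_path):
        return None
    with open(status_path) as f:
        return f.read().strip()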
@@ -26,17 +26,24 @@ yum_log = logging.getLogger("yum")

import argparse
import grp
import multiprocessing as mp
import os
import pwd
import sys
from threading import Lock
from gevent import socket
from gevent.wsgi import WSGIServer

from pyanaconda.queue import QueueFactory

from pylorax import vernum
from pylorax.api.config import configure
from pylorax.api.queue import monitor
from pylorax.api.recipes import open_or_create_repo, commit_recipe_directory
from pylorax.api.server import server, GitLock, YumLock
from pylorax.api.yumbase import get_base_object
from pylorax.base import DataHolder
from pylorax.sysutils import joinpaths

VERSION = "{0}-{1}".format(os.path.basename(sys.argv[0]), vernum)

@@ -48,12 +55,16 @@ def get_parser():

    parser.add_argument("--socket", default="/run/weldr/api.socket", metavar="SOCKET",
                        help="Path to the socket file to listen on")
    parser.add_argument("--user", default="weldr", metavar="USER",
                        help="User to use for reduced permissions")
    parser.add_argument("--group", default="weldr", metavar="GROUP",
                        help="Group to set ownership of the socket to")
    parser.add_argument("--log", dest="logfile", default="/var/log/lorax-composer/composer.log", metavar="LOG",
                        help="Path to logfile (/var/log/lorax-composer/composer.log)")
    parser.add_argument("--mockfiles", default="/var/tmp/bdcs-mockfiles/", metavar="MOCKFILES",
                        help="Path to JSON files used for /api/mock/ paths (/var/tmp/bdcs-mockfiles/)")
    parser.add_argument("--libdir", default="/var/lib/weldr", metavar="LIBDIR",
                        help="Path to queue and results directory (/var/lib/weldr/)")
    parser.add_argument("-V", action="store_true", dest="showver",
                        help="show program's version number and exit")
    parser.add_argument("-c", "--config", default="/etc/lorax/composer.conf", metavar="CONFIG",
@@ -131,12 +142,39 @@ if __name__ == '__main__':
    setup_logging(opts.logfile)
    log.debug("opts=%s", opts)

    errors = []
    # Check to make sure the user exists and get its uid
    try:
        uid = pwd.getpwnam(opts.user).pw_uid
    except KeyError:
        errors.append("Missing user '%s'" % opts.user)

    # Check to make sure the group exists and get its gid
    try:
        gid = grp.getgrnam(opts.group).gr_gid
    except KeyError:
        log.error("Missing group '%s'", opts.group)
        sys.exit(1)
        errors.append("Missing group '%s'" % opts.group)

    # Make sure the libdir path is setup correctly
    if not os.path.exists(opts.libdir):
        log.info("%s does not exist, creating it and the required subdirectories.", opts.libdir)
        orig_umask = os.umask(0)
        # Create the directories and set permissions and ownership
        for p in ["queue/run", "queue/new", "results"]:
            p_dir = joinpaths(opts.libdir, p)
            os.makedirs(p_dir, 0o770)
            os.chown(p_dir, 0, gid)
        os.umask(orig_umask)

    # Check ownership and permissions on the libdir tree
    for p in ["queue/run", "queue/new", "results"]:
        p_dir = joinpaths(opts.libdir, p)
        p_stat = os.stat(p_dir)
        if p_stat.st_mode & 0o007 != 0:
            errors.append("Incorrect permissions on %s, no 'other' permissions are allowed." % p_dir)

        if p_stat.st_gid != gid or p_stat.st_uid != 0:
            errors.append("%s should be owned by root:%s" % (p_dir, opts.group))

    # Check the socket path to make sure it exists, and that ownership and permissions are correct.
    socket_dir = os.path.dirname(opts.socket)
@@ -147,17 +185,20 @@ if __name__ == '__main__':

    sockdir_stat = os.stat(socket_dir)
    if sockdir_stat.st_mode & 0o007 != 0:
        log.error("Incorrect permissions on %s, no 'other' permissions are allowed.")
        sys.exit(1)
        errors.append("Incorrect permissions on %s, no 'other' permissions are allowed." % socket_dir)

    if sockdir_stat.st_gid != gid or sockdir_stat.st_uid != 0:
        log.error("%s should be owned by root:%s", socket_dir, opts.group)
        sys.exit(1)
        errors.append("%s should be owned by root:%s" % (socket_dir, opts.group))

    if not os.path.isdir(opts.RECIPES):
        log.warn("Creating empty recipe directory at %s", opts.RECIPES)
        os.makedirs(opts.RECIPES)

    if errors:
        for e in errors:
            log.error(e)
        sys.exit(1)

    server.config["REPO_DIR"] = opts.RECIPES
    repo = open_or_create_repo(server.config["REPO_DIR"])
    server.config["GITLOCK"] = GitLock(repo=repo, lock=Lock(), dir=opts.RECIPES)
@@ -183,6 +224,17 @@ if __name__ == '__main__':
    os.chown(opts.socket, 0, gid)
    listener.listen(1)

    # Start queue monitor thread as root
    cancel_q = QueueFactory("cancel")
    cancel_q.addMessage("cancel", 0)
    cfg = DataHolder(composer_dir=opts.libdir, uid=uid, gid=gid)
    p = mp.Process(target=monitor, args=(cfg, cancel_q))
    p.daemon = True
    p.start()

    # Drop root privileges on the main process
    os.setuid(uid)

    log.info("Starting %s on %s with recipes from %s", VERSION, opts.socket, opts.RECIPES)
    http_server = WSGIServer(listener, server, log=LogWrapper(server_log))
    # The server writes directly to a file object, so point to our log directory