os-autoinst-distri-fedora/kstest-converter

462 lines
19 KiB
Python
Executable File

#!/bin/python3
import argparse
import json
import logging
import os
import re
import shutil
import subprocess
import sys
import warnings
# Import pykickstart with all warnings silenced; presumably the import itself
# emits warnings that would otherwise clutter this tool's output.
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    import pykickstart.version
    import pykickstart.parser
# Name prefixes of kickstart-tests that are not converted yet. _find_tests()
# drops every .ks.in file whose name starts with one of these strings.
SKIPS = [
    # we haven't really figured out how to do NFS in openQA yet
    'nfs-repo-and-addon',
    # the prep for this one probably needs to be in fedora_openqa_schedule
    'liveimg',
    # I'll figure this one out later
    'basic-ostree',
    # doesn't install enough bits for the system to be bootable
    'container',
    # FIXMEs (noted here as comments only; these tests are NOT skipped):
    # hostname - changes hostname, breaks 'text_console_login' needle
    # raid-1 - only works on Rawhide (not 24)
    # default-fstype, reqpart - trip up on xfs vs. ext4 with Server image
    # driverdisk-disk - I didn't hook up image generation at all yet
    # proxy-cmdline, proxy-auth - require generation of some test repos
]
def _find_tests(dir):
    """Find the tests to run from a directory name (should be a
    checkout of kickstart-tests).

    Returns a sorted list of test names: the name of every file in
    'dir' that ends in '.ks.in', minus that suffix, excluding any
    whose name starts with an entry of SKIPS.
    """
    suffix = '.ks.in'
    # str.startswith wants a tuple when checking several prefixes
    skips = tuple(SKIPS)
    tests = []
    # os.listdir order is arbitrary; sort so the output is reproducible
    for fname in sorted(os.listdir(dir)):
        # require the full '.ks.in' suffix (including the leading dot)
        if not fname.endswith(suffix):
            continue
        # filter tests we can't do yet
        if fname.startswith(skips):
            continue
        # strip only the trailing suffix, not every occurrence of it
        tests.append(fname[:-len(suffix)])
    return tests
def _get_texts(path, test):
"""Return the text of the .ks.in and .sh files."""
# get text of .sh and .ks.in files
with open('{0}/{1}.sh'.format(path, test), 'r') as shfh:
sh = shfh.read()
with open('{0}/{1}.ks.in'.format(path, test), 'r') as ksinfh:
ksin = ksinfh.read()
return (sh, ksin)
def _kickstart_substitutions(ksin, ksurl, httprepo, nfsrepo, kstesturl=None, kstestftpurl=None):
    """Apply the placeholder substitutions a .ks.in file needs and
    return the resulting text. 'ksin' is the text of the file.
    'ksurl' is the URL of the directory the generated kickstarts are
    served from over HTTP (the 'ks-include' test includes another
    kickstart from there). 'httprepo' and 'nfsrepo' replace the HTTP
    and NFS addon repository placeholders. 'kstesturl' replaces
    @KSTEST_URL@ and should be the '--mirrorlist=(url)' or
    '--url=(url)' part of a kickstart 'repo' line; 'kstestftpurl'
    replaces @KSTEST_FTP_URL@. Both fall back to defaults when unset.
    """
    base_repo = kstesturl or (
        '--mirrorlist=https://mirrors.fedoraproject.org/mirrorlist?repo=fedora-$releasever&arch=$basearch')
    ftp_repo = kstestftpurl or (
        'ftp://mirror.utexas.edu/pub/fedora/linux/development/rawhide/Everything/$basearch/os/')
    # includes: the kickstart-tests runner includes a local file, we
    # include one served over HTTP, so the tested path differs slightly
    include_url = "{0}/{1}".format(ksurl, 'ks-include-post.ks')
    # applied strictly in this order
    swaps = (
        ('@KSTEST_URL@', base_repo),
        ('@KSTEST_FTP_URL@', ftp_repo),
        # the HTTP addon repo has two different placeholders
        ('@KSTEST_HTTP_ADDON_REPO@', httprepo),
        ('HTTP-ADDON-REPO', httprepo),
        ('@KSTEST_NFS_ADDON_REPO@', nfsrepo),
        # we always want to reboot, not shutdown
        ('shutdown', 'reboot'),
        ('KS-TEST-INCLUDE', include_url),
    )
    for (placeholder, replacement) in swaps:
        ksin = ksin.replace(placeholder, replacement)
    return ksin
def _sd_to_vd(text):
    """Rename sdX disk references in 'text' to vdX (sda -> vda etc).

    openQA's default way of attaching disks results in them being vdX
    while the kickstart-tests runner's way makes them sdX. We could
    maybe harmonize those somehow instead of doing this.
    """
    # lookarounds (rather than consuming the neighbouring characters)
    # so that adjacent names such as 'sda,sdb' are both renamed, and
    # names at the very start or end of the text are handled too
    return re.sub(r'(?<![a-zA-Z])sd([a-z])(?![a-zA-Z])', r'vd\1', text)


def _flatten_kickstart(ksin, indir):
    """Return kickstart text 'ksin' as re-serialized by pykickstart,
    which inlines any includes. This copies ksflatten quite closely.
    Includes may be relative, so parsing happens with 'indir' as the
    working directory; the previous working directory is always
    restored, even if parsing fails.
    """
    cwd = os.getcwd()
    os.chdir(indir)
    try:
        # good grief, pykickstart is chatty
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            handler = pykickstart.version.makeVersion()
            parser = pykickstart.parser.KickstartParser(handler)
            parser.readKickstartFromString(ksin)
            return str(parser.handler)
    finally:
        os.chdir(cwd)


def prep_kickstarts(indir, ksurl, httprepo, nfsrepo, outdir, kstesturl=None, kstestftpurl=None):
    """Produce kickstarts in 'outdir' from .ks.in files in 'indir'.
    'ksurl' is the URL to the path where the produced kickstarts
    will be available via HTTP; this is needed for the 'ks-include'
    test which tests including another kickstart file. 'httprepo'
    and 'nfsrepo' are substituted for the HTTP and NFS addon
    repository placeholders. 'kstesturl' is the text to replace
    @KSTEST_URL@ with - it should be the '--mirrorlist=(url)' or
    '--url=(url)' component of a kickstart 'repo' line. kstestftpurl
    is, similarly, the replacement text for @KSTEST_FTP_URL@. If not
    set, defaults are used.

    Raises ValueError if 'outdir' does not exist or no tests are
    found in 'indir'. A failing 'make' run raises
    subprocess.CalledProcessError.
    """
    # the defaults are resolved here (not only in
    # _kickstart_substitutions) because the proxy-auth hack below
    # needs a real string to rewrite
    if not kstesturl:
        kstesturl = '--mirrorlist=https://mirrors.fedoraproject.org/mirrorlist?repo=fedora-$releasever&arch=$basearch'
    if not kstestftpurl:
        kstestftpurl = 'ftp://mirror.utexas.edu/pub/fedora/linux/development/rawhide/Everything/$basearch/os/'
    if not os.path.isdir(outdir):
        raise ValueError("Output directory {0} does not exist!".format(outdir))
    # probably safest
    indir = os.path.abspath(indir)
    # build proxy-common.ks
    # FIXME: we should do something more like run_kickstart_tests here
    # but exclude install.img as it doesn't work
    subprocess.check_call(('make', '-s', '-f', 'Makefile.prereqs', 'proxy-common.ks'),
                          cwd="{0}/{1}".format(indir, 'scripts'))
    tests = _find_tests(indir)
    if not tests:
        raise ValueError("No tests found!")
    for test in tests:
        (sh, ksin) = _get_texts(indir, test)
        # HACK: this test can't handle https repo
        if test == 'proxy-auth':
            _kstesturl = kstesturl.replace('https://', 'http://')
        else:
            _kstesturl = kstesturl
        # do standard substitutions
        ksin = _kickstart_substitutions(
            ksin, ksurl, httprepo, nfsrepo, kstesturl=_kstesturl, kstestftpurl=kstestftpurl)
        # sda -> vda etc.
        ksin = _sd_to_vd(ksin)
        # flatten the kickstart (some use includes)
        if 'ksflatten' in sh:
            ksin = _flatten_kickstart(ksin, indir)
        # write out the processed .ks
        ksout = "{0}.ks".format(test)
        with open('{0}/{1}'.format(outdir, ksout), 'w') as ksoutfh:
            ksoutfh.write(ksin)
    # copy the 'include' test kickstart straight over.
    shutil.copy('{0}/{1}'.format(indir, 'ks-include-post.ks'),
                '{0}/{1}'.format(outdir, 'ks-include-post.ks'))
def merge_templates(indir, ksurl, tempfile, outfile):
    """Build openQA test suites and job templates for every test
    found in 'indir', merge them into the existing openQA templates
    file 'tempfile' (JSON), and write the combined result to
    'outfile'. 'ksurl' is the URL of the directory the tests'
    kickstart files are served from.

    Raises ValueError if no tests are found.
    """
    names = _find_tests(indir)
    if not names:
        raise ValueError("No tests found!")
    suites = []
    for name in names:
        suites.append(create_testsuite(name, indir, ksurl))
    with open(tempfile, 'r') as source:
        existing = json.loads(source.read())
    merged = merge_testsuites(existing, suites)
    # stable, human-readable output: sorted keys, 4-space indent
    rendered = json.dumps(merged, sort_keys=True, indent=4, separators=(',', ': '))
    with open(outfile, 'w') as sink:
        sink.write(rendered)
def _get_settings_disk(sh):
    """Derive the openQA hard disk settings for a kickstart_tests
    test from the text of its .sh file ('sh'). Returns a list of
    settings dicts, which is empty when the test uses no disks.
    """
    # Most prepare_disks functions just create empty disks of a given
    # size in a standard way. In practice it is good enough to create
    # the right number of disks at openQA's standard size (10GB). This
    # is a rough assumption that may break in future: openQA has no
    # way to create several clean disk images of different sizes.
    pattern = r'qemu-img create -q -f qcow2 \$\{(tmp|disk)dir\}/disk-.\.img \d+G'
    count = sum(1 for _ in re.finditer(pattern, sh))
    found = []
    # The one tricky case so far is the driver disk: it is created
    # elsewhere, so we just point at it as one extra disk.
    if 'mkdud.py' in sh:
        count += 1
        found.append({'key': 'HDD_{0}'.format(count), 'value': "driverdisk.img"})
    if count > 0:
        found.append({'key': 'NUMDISKS', 'value': str(count)})
    return found
def _get_settings_postinstall(sh):
    """Pick the openQA post-install test for a kickstart_tests test
    from the text of its .sh file ('sh'). Returns a list of settings
    dicts (a single item for now; a list to leave room for more).
    """
    # only tests that look for RESULT under /home need the 'home'
    # variant; everything else checks /root
    check = 'kstest_home' if '/home/RESULT' in sh else 'kstest_root'
    return [{'key': 'POSTINSTALL', 'value': check}]
def _get_settings_keyboard(ksin):
    """Given text of a kickstart_tests .ks.in file as ksin, return
    a list of appropriate openQA settings dicts for the keyboard
    layout. The list is empty when the kickstart sets no console
    keymap.
    """
    # if the kickstart sets a keyboard layout, we set the LOADKEYS
    # var to tell openQA to do 'loadkeys us' after login and
    # LOGIN_KEYMAP to tell it to use the specified keymap for logging
    # in and running loadkeys. Both spellings of the option are
    # accepted: '--vckeymap cz' and '--vckeymap=cz'.
    match = re.search(r'--vckeymap(?:=|[ \t]+)(\S+)', ksin)
    if not match:
        return []
    return [
        {'key': 'LOADKEYS', 'value': 1},
        {'key': 'LOGIN_KEYMAP', 'value': match.group(1)},
    ]
def _get_settings_encrypt(ksin):
    """Extract the disk encryption passphrase from the text of a
    kickstart_tests .ks.in file ('ksin'). Returns a list of openQA
    settings dicts: one ENCRYPT_PASSWORD entry when a passphrase is
    found, otherwise an empty list.
    """
    # The (?:) groups are non-capturing, so the passphrase is the only
    # captured group. It may be wrapped in single quotes, and matching
    # stops at the first whitespace character or the end of the text.
    found = re.search(r"(?:logvol|part).*--passphrase='?(\S+?)'?(?:\s|$)", ksin)
    if found is None:
        return []
    return [{'key': 'ENCRYPT_PASSWORD', 'value': found.group(1)}]
def _get_settings_boot(sh, test, ksurl):
    """Given text of a kickstart_tests test (.sh file) as sh, the test
    name as test and the kickstart base URL as ksurl, return a list
    of appropriate openQA settings related to the bootloader. Usually
    we just set inst.ks, but kickstart-tests has capacity for setting
    other params, we must handle that; used by proxy-cmdline for e.g.
    """
    # only trailing slashes are dropped, so a base that starts with
    # '/' keeps its leading slash
    value = "inst.ks={0}/{1}.ks".format(ksurl.rstrip('/'), test)
    # ok, this is an awful excuse for parsing. sorry. It does handle
    # comments! it'll break if the function is not just a single line
    # of args, though.
    lines = sh.splitlines()
    args = []
    # pair each line with the one after it; a 'kernel_args()' header
    # on the very last line has no body and is simply ignored
    for (line, following) in zip(lines, lines[1:]):
        # start of the 'kernel_args' function that prints the args
        if line.startswith('kernel_args()'):
            # take the next line (we assume that's 'echo arg1 arg2...')
            args = following.strip().split()
    if args:
        # ditch some args we don't want to take
        args = [arg for arg in args if arg not in ('echo', 'vnc')]
        # stick the inst.ks arg on the end of the list, and...
        args.append(value)
        # ...join it back together
        value = ' '.join(args)
    return [{'key': 'GRUB', 'value': value}]
def create_testsuite(test, path, ksurl):
    """Create an openQA 'test suite' for a given kickstart_test. test
    is the test name, path is the directory the test files are in,
    ksurl is the URL of the directory the generated kickstarts are
    served from. Returns a dict with the suite 'name' and its list of
    'settings'.

    Raises ValueError if the test's kickstart has no 'rootpw' line.
    """
    # duh. these are all kickstart tests.
    settings = [{'key': 'KICKSTART', 'value': '1'}]
    # get text of .sh and .ks.in files
    (sh, ksin) = _get_texts(path, test)
    # call all the functions for getting different settings.
    settings.extend(_get_settings_disk(sh))
    settings.extend(_get_settings_postinstall(sh))
    settings.extend(_get_settings_keyboard(ksin))
    settings.extend(_get_settings_encrypt(ksin))
    settings.extend(_get_settings_boot(sh, test, ksurl))
    # do some very simple ones ourselves (to avoid this function
    # growing too much, the rule is that if it needs more than two
    # lines, split it out).
    # root password. we need one to log in; fail with a clear message
    # (rather than an AttributeError on None) when there is none.
    rootpw = re.search(r'rootpw (.+)', ksin)
    if rootpw is None:
        raise ValueError("No 'rootpw' line found in kickstart for test {0}!".format(test))
    settings.append({'key': 'ROOT_PASSWORD', 'value': rootpw.group(1)})
    # we never want to do a user login for these
    settings.append({'key': 'USER_LOGIN', 'value': "false"})
    # these install xfce so they boot to lightdm, force console login
    if test in ('groups-and-envs-2', 'packages-and-groups-1'):
        settings.append({'key': 'FORCE_CONSOLE_LOGIN', 'value': "1"})
    # we have to do an ugly success check workaround for these
    if test in ('reqpart', 'default-fstype'):
        settings.append({'key': 'KSTEST_SERVER_FSTYPE', 'value': "1"})
    return {'name': "kstest_{0}".format(test), 'settings': settings}
def merge_testsuites(templates, testsuites, machine='64bit', arch='x86_64',
                     distri='fedora', prio=50, flavor='kstests', version='*'):
    """Add test suites (as produced by create_testsuite) to
    'templates', an openQA templates file parsed into a dict. Each
    suite is appended to 'TestSuites' and gets a matching entry in
    'JobTemplates'; a 'Products' entry for the flavor is added if
    none matches yet. 'templates' is modified in place and returned.
    """
    for suite in testsuites:
        templates['TestSuites'].append(suite)
        templates['JobTemplates'].append({
            'machine': {'name': machine},
            'prio': prio,
            'test_suite': {'name': suite['name']},
            'product': {
                'arch': arch,
                'distri': distri,
                'flavor': flavor,
                'version': version,
            },
        })
    # fields identifying a product, compared in this order
    wanted = (('flavor', flavor), ('distri', distri), ('version', version), ('arch', arch))
    known = any(
        all(product[field] == expected for (field, expected) in wanted)
        for product in templates['Products']
    )
    if not known:
        # add a Product for the flavor
        templates['Products'].append({
            'arch': arch,
            'distri': distri,
            'flavor': flavor,
            'name': "",
            'settings': [],
            'version': version
        })
    return templates
def cmd_kickstarts(args):
    """Handler for the 'kickstarts' subcommand: turn .ks.in files
    into kickstarts. 'args' is the parsed argparse namespace. A
    ValueError from the conversion ends the program with that
    message.
    """
    # --mirrorlist wins over --http; with neither, None lets
    # prep_kickstarts pick its default
    kstesturl = None
    if args.mirrorlist:
        kstesturl = '--mirrorlist={0}'.format(args.mirrorlist)
    elif args.http:
        kstesturl = '--url={0}'.format(args.http)
    try:
        prep_kickstarts(args.indir, args.ksurl, args.httprepo, args.nfsrepo, args.outdir,
                        kstesturl, args.ftp)
    except ValueError as problem:
        sys.exit(problem)
def cmd_templates(args):
    """Handler for the 'templates' subcommand: generate openQA test
    suites and job templates and merge them into a templates file.
    'args' is the parsed argparse namespace. A ValueError from the
    merge ends the program with that message.
    """
    try:
        merge_templates(args.indir, args.ksurl, args.tempfile, args.outfile)
    except ValueError as problem:
        sys.exit(problem)
def parse_args(argv=None):
    """Parse arguments with argparse.

    'argv' is the list of argument strings to parse; when None (the
    default) argparse reads them from sys.argv. Returns the parsed
    namespace, whose 'func' attribute is the handler function for
    the chosen subcommand.
    """
    parser = argparse.ArgumentParser(description=(
        "Translation layer to convert anaconda kickstart-tests into "
        "openQA tests. 'kickstarts' parses kickstart-tests .ks.in "
        "files to kickstarts. 'templates' produces openQA test suites "
        "from kickstart-tests and merges them into a pre-existing "
        "openQA templates file."))
    parser.add_argument(
        '-l', '--loglevel', help="The level of log messages to show",
        choices=('debug', 'info', 'warning', 'error', 'critical'),
        default='info')

    # This is a workaround for a somewhat infamous argparse bug
    # in Python 3. See:
    # https://stackoverflow.com/questions/23349349/argparse-with-required-subparser
    # http://bugs.python.org/issue16308
    subparsers = parser.add_subparsers(dest='subcommand')
    subparsers.required = True

    parser_kickstarts = subparsers.add_parser(
        'kickstarts', description="Produce kickstarts from .ks.in "
        "files and write them to a specified directory.")
    parser_kickstarts.add_argument(
        'indir', help="Input directory (containing .ks.in files)")
    parser_kickstarts.add_argument(
        'ksurl', help="URL to path where generated .ks files will be "
        "available")
    parser_kickstarts.add_argument(
        'httprepo', help="URL for the HTTP repository required for additional "
        "repository tests (created with make-addon-pkgs.py). MUST BE HTTP "
        "not HTTPS for proxy tests to work")
    parser_kickstarts.add_argument(
        'nfsrepo', help="URL for the NFS repository required for additional "
        "repository tests (created with make-addon-pkgs.py)")
    parser_kickstarts.add_argument(
        'outdir', help="Output directory (where .ks files are written)")
    parser_kickstarts.add_argument(
        '--mirrorlist', '-m', help="Mirror list URL to use as the base repo "
        "for HTTP repo tests, instead of the default (the official mirror "
        "list)")
    parser_kickstarts.add_argument(
        '--http', help="Direct mirror URL to use as the base repo for "
        "HTTP repo tests, instead of the default (the official mirror "
        "list)")
    parser_kickstarts.add_argument(
        '--ftp', help="FTP URL to use as the base repo for FTP repo "
        "tests, instead of the default (a server in Texas, USA)")
    parser_kickstarts.set_defaults(func=cmd_kickstarts)

    parser_templates = subparsers.add_parser(
        'templates', description="Produce openQA 'test suites' and "
        "'job templates' from anaconda-kickstarts tests and merge "
        "them into an existing openQA templates file.")
    parser_templates.add_argument(
        'indir', help="Input directory (containing .ks.in and .sh files)")
    parser_templates.add_argument(
        'ksurl', help="URL to a directory containing .ks files (as "
        "produced by 'kickstarts' subcommand)")
    parser_templates.add_argument(
        'tempfile', help="Path to openQA templates file (must be JSON "
        "format, not Perl)")
    parser_templates.add_argument('outfile', help="Path to output file")
    parser_templates.set_defaults(func=cmd_templates)

    return parser.parse_args(argv)
def main():
    """Script entry point: verify the required external commands
    exist, parse the command line, configure logging and run the
    chosen subcommand. Ctrl-C ends the program with exit status 1.
    """
    # (command, package that provides it)
    required = (
        ('createrepo_c', 'createrepo_c'),
        ('make', 'make'),
    )
    try:
        # check for needed commands
        for (command, package) in required:
            if shutil.which(command):
                continue
            sys.exit("Required command {0} not found! Please install it (try `dnf install "
                     "{1}`)".format(command, package))
        args = parse_args()
        # map the level name onto the logging constant of that name
        level = getattr(logging, args.loglevel.upper(), logging.INFO)
        logging.basicConfig(level=level)
        args.func(args)
    except KeyboardInterrupt:
        sys.stderr.write("Interrupted, exiting...\n")
        sys.exit(1)


if __name__ == '__main__':
    main()