#!/bin/python3
"""Translation layer to convert anaconda kickstart-tests into openQA tests.

Two subcommands are provided:

* ``kickstarts`` turns the ``.ks.in`` templates of a kickstart-tests
  checkout into usable ``.ks`` files.
* ``templates`` generates openQA test suites and job templates for the
  same tests and merges them into an existing (JSON) openQA templates
  file.
"""

import argparse
import json
import logging
import os
import re
import shutil
import subprocess
import sys
import warnings

# Suppress any warnings raised while importing pykickstart. The import
# is tolerated to fail because only kickstart flattening needs it; the
# 'templates' subcommand works without pykickstart installed.
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    try:
        import pykickstart.version
        import pykickstart.parser
    except ImportError:
        pykickstart = None

SKIPS = [
    # we haven't really figured out how to do NFS in openQA yet
    'nfs-repo-and-addon',
    # the prep for this one probably needs to be in fedora_openqa_schedule
    'liveimg',
    # i'll figure this one out later
    'basic-ostree',
    # doesn't install enough bits for the system to be bootable
    'container',
    # FIXMEs:
    # hostname - changes hostname, breaks 'text_console_login' needle
    # raid-1 - only works on Rawhide (not 24)
    # default-fstype, reqpart - trip up on xfs vs. ext4 with Server image
    # driverdisk-disk - I didn't hook up image generation at all yet
    # proxy-cmdline, proxy-auth - require generation of some test repos
]

# Suffix identifying a kickstart-tests kickstart template.
_KSIN_SUFFIX = '.ks.in'

# Replacement texts used for @KSTEST_URL@ / @KSTEST_FTP_URL@ when the
# caller does not provide any.
_DEFAULT_KSTEST_URL = (
    '--mirrorlist=https://mirrors.fedoraproject.org/mirrorlist?'
    'repo=fedora-$releasever&arch=$basearch')
_DEFAULT_KSTEST_FTP_URL = (
    'ftp://mirror.utexas.edu/pub/fedora/linux/development/rawhide/'
    'Everything/$basearch/os/')

# '(non-alpha)sd(alpha)(non-alpha)', used to rewrite sdX disk names.
_SD_DISK_RE = re.compile(r'([^a-zA-Z])s(d[a-z][^a-zA-Z])')

# A 'simple' disk creation command in a test's prepare_disks function.
_SIMPLE_DISK_RE = re.compile(
    r'qemu-img create -q -f qcow2 \$\{(tmp|disk)dir\}/disk-.\.img \d+G')


def _find_tests(dir):
    """Find the tests to run from a directory name (should be a
    checkout of kickstart-tests).

    Returns the sorted list of test names, i.e. the names of all
    '<name>.ks.in' files in 'dir' with the suffix removed, leaving out
    any whose name starts with an entry of SKIPS.
    """
    skips = tuple(SKIPS)
    tests = []
    # sorted so that the generated output does not depend on the
    # arbitrary order os.listdir returns entries in
    for fname in sorted(os.listdir(dir)):
        if not fname.endswith(_KSIN_SUFFIX):
            continue
        # filter tests we can't do yet
        if fname.startswith(skips):
            continue
        # strip only the trailing .ks.in
        name = fname[:-len(_KSIN_SUFFIX)]
        if name:
            tests.append(name)
    return tests


def _get_texts(path, test):
    """Return the text of the .ks.in and .sh files of test 'test'
    found in directory 'path', as a (sh, ksin) tuple.
    """
    with open(os.path.join(path, '{0}.sh'.format(test)), 'r') as shfh:
        sh = shfh.read()
    with open(os.path.join(path, '{0}.ks.in'.format(test)), 'r') as ksinfh:
        ksin = ksinfh.read()
    return (sh, ksin)


def _kickstart_substitutions(ksin, ksurl, httprepo, nfsrepo, kstesturl=None, kstestftpurl=None):
    """Do the various substitutions necessary on .ks.in files. ksin is
    the text of the file. 'ksurl' is the URL to the path where the
    produced kickstarts will be available via HTTP; this is needed for
    the 'ks-include' test which tests including another kickstart file.
    'httprepo' and 'nfsrepo' replace the HTTP and NFS addon repository
    placeholders. 'kstesturl' is the text to replace @KSTEST_URL@ with -
    it should be the '--mirrorlist=(url)' or '--url=(url)' component of
    a kickstart 'repo' line. kstestftpurl is, similarly, the replacement
    text for @KSTEST_FTP_URL@. If not set, defaults are used. Returns
    the substituted text.
    """
    if not kstesturl:
        kstesturl = _DEFAULT_KSTEST_URL
    if not kstestftpurl:
        kstestftpurl = _DEFAULT_KSTEST_FTP_URL
    ksin = ksin.replace('@KSTEST_URL@', kstesturl)
    ksin = ksin.replace('@KSTEST_FTP_URL@', kstestftpurl)
    ksin = ksin.replace('@KSTEST_HTTP_ADDON_REPO@', httprepo)
    # this has two different placeholders, for some reason...
    ksin = ksin.replace('HTTP-ADDON-REPO', httprepo)
    ksin = ksin.replace('@KSTEST_NFS_ADDON_REPO@', nfsrepo)
    # we always want to reboot, not shutdown
    ksin = ksin.replace('shutdown', 'reboot')
    # handle includes. note this means we're testing a slightly
    # different path to the kickstart-tests runner: it tests
    # inclusion of a local file while we test inclusion of an HTTP
    # served file.
    ksin = ksin.replace('KS-TEST-INCLUDE', "{0}/{1}".format(ksurl, 'ks-include-post.ks'))
    return ksin


def _flatten_kickstart(ksin, basedir):
    """Flatten kickstart text 'ksin' (resolve its includes) with
    pykickstart and return the result. This code copies ksflatten
    quite closely. Includes may be relative, so the parsing is done
    with 'basedir' as the working directory; the previous working
    directory is always restored, even if parsing fails. Raises
    ImportError if pykickstart is not available.
    """
    if pykickstart is None:
        raise ImportError("pykickstart is required to flatten kickstarts")
    cwd = os.getcwd()
    os.chdir(basedir)
    try:
        # good grief, pykickstart is chatty
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            handler = pykickstart.version.makeVersion()
            parser = pykickstart.parser.KickstartParser(handler)
            parser.readKickstartFromString(ksin)
            return str(parser.handler)
    finally:
        os.chdir(cwd)


def prep_kickstarts(indir, ksurl, httprepo, nfsrepo, outdir, kstesturl=None, kstestftpurl=None):
    """Produce kickstarts in 'outdir' from .ks.in files in 'indir'.
    'ksurl' is the URL to the path where the produced kickstarts will
    be available via HTTP; this is needed for the 'ks-include' test
    which tests including another kickstart file. 'httprepo' and
    'nfsrepo' replace the addon repository placeholders. 'kstesturl' is
    the text to replace @KSTEST_URL@ with - it should be the
    '--mirrorlist=(url)' or '--url=(url)' component of a kickstart
    'repo' line. kstestftpurl is, similarly, the replacement text for
    @KSTEST_FTP_URL@. If not set, defaults are used.

    Raises ValueError if 'outdir' does not exist or no tests are found,
    and subprocess.CalledProcessError if building proxy-common.ks
    fails.
    """
    if not kstesturl:
        kstesturl = _DEFAULT_KSTEST_URL
    if not kstestftpurl:
        kstestftpurl = _DEFAULT_KSTEST_FTP_URL
    if not os.path.isdir(outdir):
        raise ValueError("Output directory {0} does not exist!".format(outdir))
    # probably safest
    indir = os.path.abspath(indir)

    # build proxy-common.ks
    # FIXME: we should do something more like run_kickstart_tests here
    # but exclude install.img as it doesn't work
    subprocess.check_call(
        ('make', '-s', '-f', 'Makefile.prereqs', 'proxy-common.ks'),
        cwd=os.path.join(indir, 'scripts'))

    tests = _find_tests(indir)
    if not tests:
        raise ValueError("No tests found!")

    for test in tests:
        (sh, ksin) = _get_texts(indir, test)
        # HACK: this test can't handle https repo
        if test == 'proxy-auth':
            _kstesturl = kstesturl.replace('https://', 'http://')
        else:
            _kstesturl = kstesturl
        # do standard substitutions
        ksin = _kickstart_substitutions(
            ksin, ksurl, httprepo, nfsrepo, kstesturl=_kstesturl, kstestftpurl=kstestftpurl)
        # replace '(non-alpha)sd(alpha)(non-alpha)' with '(non-alpha)
        # vd(alpha)(non-alpha)' (sda -> vda etc). This is because
        # openQA's default way of attaching disks results in them
        # being vdX while the kickstart-tests runner's way makes them
        # sdX. we could maybe harmonize those somehow instead of doing
        # this.
        ksin = _SD_DISK_RE.sub(r"\1v\2", ksin)
        # flatten the kickstart (some use includes), but only for
        # tests whose .sh mentions ksflatten
        if 'ksflatten' in sh:
            ksin = _flatten_kickstart(ksin, indir)
        # write out the processed .ks
        ksout = "{0}.ks".format(test)
        with open(os.path.join(outdir, ksout), 'w') as ksoutfh:
            ksoutfh.write(ksin)

    # copy the 'include' test kickstart straight over.
    shutil.copy(os.path.join(indir, 'ks-include-post.ks'),
                os.path.join(outdir, 'ks-include-post.ks'))


def merge_templates(indir, ksurl, tempfile, outfile):
    """Produce openQA test suites and job templates for all tests in
    indir, merge them with the existing openQA templates file
    'tempfile' (JSON), and write the combined templates file as
    'outfile'. 'ksurl' is the URL to the path where the kickstart files
    for the tests can be found. Raises ValueError if no tests are
    found.
    """
    tests = _find_tests(indir)
    if not tests:
        raise ValueError("No tests found!")
    testsuites = [create_testsuite(test, indir, ksurl) for test in tests]
    with open(tempfile, 'r') as tempfh:
        templates = json.loads(tempfh.read())
    templates = merge_testsuites(templates, testsuites)
    with open(outfile, 'w') as outfh:
        outfh.write(json.dumps(templates, sort_keys=True, indent=4,
                               separators=(',', ': ')))


def _get_settings_disk(sh):
    """Given text of a kickstart_tests test (.sh file) as sh, return
    a list of appropriate openQA settings dicts for hard disks.
    """
    # most prepare_disks just create empty disks of a given size in
    # a standard way. in practice it's good enough to just create the
    # right number of disks at openQA's standard size (10GB), that'll
    # make things work. This is a rough assumption that may break in
    # future. A missing openQA feature here is you can't tell it to
    # create multiple clean disk images with different sizes.
    settings = []
    numdisks = len(_SIMPLE_DISK_RE.findall(sh))
    # the one tricky case that exists so far is the driverdisk case.
    # it gets created elsewhere. here we just point to it.
    if 'mkdud.py' in sh:
        numdisks += 1
        settings.append({'key': 'HDD_{0}'.format(str(numdisks)), 'value': "driverdisk.img"})
    if numdisks > 0:
        settings.append({'key': 'NUMDISKS', 'value': str(numdisks)})
    return settings


def _get_settings_postinstall(sh):
    """Given text of a kickstart_tests test (.sh file) as sh, return
    a list of appropriate openQA settings dict for post-install test
    settings (currently a single-item list, but we're future-proofing
    here).
    """
    # just one test checks for RESULT in /home instead of /root
    if '/home/RESULT' in sh:
        return [{'key': 'POSTINSTALL', 'value': 'kstest_home'}]
    return [{'key': 'POSTINSTALL', 'value': 'kstest_root'}]


def _get_settings_keyboard(ksin):
    """Given text of a kickstart_tests .ks.in file as ksin, return
    a list of appropriate openQA settings dicts for keyboard test
    settings (empty if the kickstart sets no keymap).
    """
    # if the kickstart sets a keyboard layout, we set the LOADKEYS
    # var to tell openQA to do 'loadkeys us' after login and LOGIN_
    # KEYMAP to tell it to use the specified keymap for logging in
    # and running loadkeys.
    match = re.search(r'--vckeymap (\S+)', ksin)
    if not match:
        return []
    return [
        {'key': 'LOADKEYS', 'value': 1},
        {'key': 'LOGIN_KEYMAP', 'value': match.group(1)},
    ]


def _get_settings_encrypt(ksin):
    """Given text of a kickstart_tests .ks.in file as ksin, return
    a list of appropriate openQA settings dict for disk encryption
    settings (a single-item list if a passphrase is found, otherwise
    an empty list).
    """
    # the (?:) are non-capturing groups. We only care about the second
    # group. The fun at the end is to capture a passphrase if it's
    # inside quotes, and to stop matching when we reach either a white
    # space character or the end of the line.
    match = re.search(r"(?:logvol|part).*--passphrase='?(\S+?)'?(?:\s|$)", ksin)
    if not match:
        return []
    return [{'key': 'ENCRYPT_PASSWORD', 'value': match.group(1)}]


def _get_settings_boot(sh, test, ksurl):
    """Given text of a kickstart_tests test (.sh file) as sh, the
    test name as test and the kickstart base URL as ksurl, return a
    list of appropriate openQA settings related to the bootloader.
    Usually we just set inst.ks, but kickstart-tests has capacity
    for setting other params, we must handle that; used by
    proxy-cmdline for e.g.
    """
    value = "inst.ks={0}/{1}.ks".format(ksurl.strip('/'), test)
    # ok, this is an awful excuse for parsing. sorry. It does handle
    # comments! it'll break if the function is not just a single line
    # of args, though.
    lines = sh.splitlines()
    args = []
    # walk each line together with the one after it, so a
    # 'kernel_args()' on the very last line has no successor and is
    # simply ignored
    for (line, following) in zip(lines, lines[1:]):
        # start of the 'kernel_args' function that prints the args
        if line.startswith('kernel_args()'):
            # take the next line (we assume that's 'echo arg1 arg2...')
            args = following.strip().split()
    if args:
        # ditch some args we don't want to take
        args = [arg for arg in args if arg not in ('echo', 'vnc')]
        # stick the inst.ks arg on the end of the list, and...
        args.append(value)
        # ...join it back together
        value = ' '.join(args)
    return [{'key': 'GRUB', 'value': value}]


def create_testsuite(test, path, ksurl):
    """Create an openQA 'test suite' for a given kickstart_test. test
    is the test name, path is the directory the test files are in,
    ksurl is the URL of the directory the generated kickstarts are
    served from. Returns a dict with 'name' and 'settings' keys.
    Raises ValueError if the kickstart has no 'rootpw' line.
    """
    # duh. these are all kickstart tests.
    settings = [{'key': 'KICKSTART', 'value': '1'}]
    # get text of .sh and .ks.in files
    (sh, ksin) = _get_texts(path, test)

    # call all the functions for getting different settings.
    settings.extend(_get_settings_disk(sh))
    settings.extend(_get_settings_postinstall(sh))
    settings.extend(_get_settings_keyboard(ksin))
    settings.extend(_get_settings_encrypt(ksin))
    settings.extend(_get_settings_boot(sh, test, ksurl))

    # do some very simple ones ourselves (to avoid this function
    # growing too much, the rule is that if it needs more than two
    # lines, split it out).
    # root password. we need one to be able to log in, so fail with
    # an error the command line wrappers know how to report.
    rootpw = re.search(r'rootpw (.+)', ksin)
    if not rootpw:
        raise ValueError("Test {0} does not set a root password!".format(test))
    settings.append({'key': 'ROOT_PASSWORD', 'value': rootpw.group(1)})
    # we never want to do a user login for these
    settings.append({'key': 'USER_LOGIN', 'value': "false"})
    # these install xfce so they boot to lightdm, force console login
    if test in ('groups-and-envs-2', 'packages-and-groups-1'):
        settings.append({'key': 'FORCE_CONSOLE_LOGIN', 'value': "1"})
    # we have to do an ugly success check workaround for these
    if test in ('reqpart', 'default-fstype'):
        settings.append({'key': 'KSTEST_SERVER_FSTYPE', 'value': "1"})

    return {'name': "kstest_{0}".format(test), 'settings': settings}


def merge_testsuites(templates, testsuites, machine='64bit', arch='x86_64',
                     distri='fedora', prio=50, flavor='kstests', version='*'):
    """Merge some test suites (as produced by create_testsuite) into
    'templates', which is expected to be an openQA templates file
    parsed into a dict. Each test suite is appended to 'TestSuites'
    and gets a job template for the given machine and product; the
    product itself is added to 'Products' if not yet present. The
    dict is modified in place and also returned.
    """
    for testsuite in testsuites:
        templates['TestSuites'].append(testsuite)
        jobt = {
            'machine': {'name': machine},
            'prio': prio,
            'test_suite': {'name': testsuite['name']},
            'product': {
                'arch': arch,
                'distri': distri,
                'flavor': flavor,
                'version': version,
            },
        }
        templates['JobTemplates'].append(jobt)

    foundprod = any(
        product['flavor'] == flavor and
        product['distri'] == distri and
        product['version'] == version and
        product['arch'] == arch
        for product in templates['Products']
    )
    if not foundprod:
        # add a Product for the flavor
        templates['Products'].append(
            {
                'arch': arch,
                'distri': distri,
                'flavor': flavor,
                'name': "",
                'settings': [],
                'version': version
            }
        )

    return templates


def cmd_kickstarts(args):
    """kickstarts subcommand function: produce kickstarts from .ks.in
    files. Exits with the error message if prep_kickstarts raises
    ValueError.
    """
    if args.mirrorlist:
        kstesturl = '--mirrorlist={0}'.format(args.mirrorlist)
    elif args.http:
        kstesturl = '--url={0}'.format(args.http)
    else:
        kstesturl = None
    try:
        prep_kickstarts(args.indir, args.ksurl, args.httprepo, args.nfsrepo,
                        args.outdir, kstesturl, args.ftp)
    except ValueError as err:
        sys.exit(err)


def cmd_templates(args):
    """templates subcommand function: produce openQA test suites and
    job templates and merge into a templates file. Exits with the
    error message if merge_templates raises ValueError.
    """
    try:
        merge_templates(args.indir, args.ksurl, args.tempfile, args.outfile)
    except ValueError as err:
        sys.exit(err)


def parse_args():
    """Parse arguments with argparse and return the resulting
    namespace; its 'func' attribute is the subcommand function to run.
    """
    parser = argparse.ArgumentParser(description=(
        "Translation layer to convert anaconda kickstart-tests into "
        "openQA tests. 'kickstarts' parses kickstart-tests .ks.in "
        "files to kickstarts. 'templates' produces openQA test suites "
        "from kickstart-tests and merges them into a pre-existing "
        "openQA templates file."))
    parser.add_argument(
        '-l', '--loglevel', help="The level of log messages to show",
        choices=('debug', 'info', 'warning', 'error', 'critical'),
        default='info')

    # This is a workaround for a somewhat infamous argparse bug
    # in Python 3. See:
    # https://stackoverflow.com/questions/23349349/argparse-with-required-subparser
    # http://bugs.python.org/issue16308
    subparsers = parser.add_subparsers(dest='subcommand')
    subparsers.required = True

    parser_kickstarts = subparsers.add_parser(
        'kickstarts', description="Produce kickstarts from .ks.in "
        "files and write them to a specified directory.")
    parser_kickstarts.add_argument(
        'indir', help="Input directory (containing .ks.in files)")
    parser_kickstarts.add_argument(
        'ksurl', help="URL to path where generated .ks files will be "
        "available")
    parser_kickstarts.add_argument(
        'httprepo', help="URL for the HTTP repository required for additional "
        "repository tests (created with make-addon-pkgs.py). MUST BE HTTP "
        "not HTTPS for proxy tests to work")
    parser_kickstarts.add_argument(
        'nfsrepo', help="URL for the NFS repository required for additional "
        "repository tests (created with make-addon-pkgs.py)")
    parser_kickstarts.add_argument(
        'outdir', help="Output directory (where .ks files are written)")
    parser_kickstarts.add_argument(
        '--mirrorlist', '-m', help="Mirror list URL to use as the base repo "
        "for HTTP repo tests, instead of the default (the official mirror "
        "list)")
    parser_kickstarts.add_argument(
        '--http', help="Direct mirror URL to use as the base repo for "
        "HTTP repo tests, instead of the default (the official mirror "
        "list)")
    parser_kickstarts.add_argument(
        '--ftp', help="FTP URL to use as the base repo for FTP repo "
        "tests, instead of the default (a server in Texas, USA)")
    parser_kickstarts.set_defaults(func=cmd_kickstarts)

    parser_templates = subparsers.add_parser(
        'templates', description="Produce openQA 'test suites' and "
        "'job templates' from anaconda-kickstarts tests and merge "
        "them into an existing openQA templates file.")
    parser_templates.add_argument(
        'indir', help="Input directory (containing .ks.in and .sh files)")
    parser_templates.add_argument(
        'ksurl', help="URL to a directory containing .ks files (as "
        "produced by 'kickstarts' subcommand)")
    parser_templates.add_argument(
        'tempfile', help="Path to openQA templates file (must be JSON "
        "format, not Perl)")
    parser_templates.add_argument('outfile', help="Path to output file")
    parser_templates.set_defaults(func=cmd_templates)

    return parser.parse_args()


def main():
    """Main loop: check required commands are installed, parse the
    command line, set up logging and run the chosen subcommand.
    """
    try:
        # check for needed commands
        for (cmd, pkg) in (('createrepo_c', 'createrepo_c'), ('make', 'make')):
            if not shutil.which(cmd):
                sys.exit("Required command {0} not found! Please install it (try `dnf install "
                         "{1}`)".format(cmd, pkg))
        args = parse_args()
        loglevel = getattr(
            logging, args.loglevel.upper(), logging.INFO)
        logging.basicConfig(level=loglevel)
        args.func(args)
    except KeyboardInterrupt:
        sys.stderr.write("Interrupted, exiting...\n")
        sys.exit(1)


if __name__ == '__main__':
    main()