#!/bin/python3 import argparse import json import logging import os import re SKIPS = [ # we haven't really figured out how to do NFS in openQA yet 'nfs-repo-and-addon', # the prep for this one probably needs to be in fedora_openqa_schedule 'liveimg', # i'll figure this one out later 'basic-ostree', # FIXMEs: # keyboard - changes keyboard layout, fucks with openQA's script runner # hostname - changes hostname, breaks 'text_console_login' needle # packages-and-groups-1 - boots to GUI login # encrypt-device - need to set ENCRYPT_PASSWORD, also 'kickstart insufficient'? ] SUBSTS = [ ('@KSTEST_URL@', '--mirrorlist=https://mirrors.fedoraproject.org/mirrorlist?repo=fedora-$releasever&arch=$basearch'), ('@KSTEST_FTP_URL@', 'ftp://mirror.utexas.edu/pub/fedora/linux/development/rawhide/Everything/$basearch/os/'), # we need to reboot not shutdown for openQA ('shutdown', 'reboot') ] def _find_tests(dir): """Find the tests to run from a directory name (should be a checkout of kickstart-tests). """ # find .ks.in files ksins = [ks for ks in os.listdir(dir) if ks.endswith('ks.in')] # filter tests we can't do yet ksins = [ks for ks in ksins if not any(ks.startswith(skip) for skip in SKIPS)] # strip .ks.in return [ksin.replace('.ks.in', '') for ksin in ksins] def prep_kickstarts(indir, outdir): """Produce kickstarts in 'outdir' from .ks.in files in 'indir'. """ if not os.path.isdir(outdir): raise ValueError("Output directory {0} does not exist!".format(outdir)) tests = _find_tests(indir) if not tests: raise ValueError("No tests found!") for test in tests: # read in the .ks.in file with open('{0}/{1}.ks.in'.format(indir, test), 'r') as ksinfh: kstext = ksinfh.read() for (orig, repl) in SUBSTS: kstext = kstext.replace(orig, repl) # write out the processed .ks ksout = "{0}.ks".format(test) with open('{0}/{1}'.format(outdir, ksout), 'w') as ksoutfh: ksoutfh.write(kstext) def merge_templates(indir, baseurl, tempfile, outfile): """Produce openQA test suites and job templates for all tests in indir, merge them with the existing openQA templates file 'tempfile', and write the combined templates file as 'outfile'. 'baseurl' is the URL to the path where the kickstart files for the tests can be found. """ tests = _find_tests(indir) if not tests: raise ValueError("No tests found!") testsuites = [create_testsuite(test, indir, baseurl) for test in tests] with open(tempfile, 'r') as tempfh: templates = json.loads(tempfh.read()) templates = merge_testsuites(templates, testsuites) with open(outfile, 'w') as outfh: outfh.write(json.dumps(templates, sort_keys=True, indent=4, separators=(',', ': '))) def _get_settings_disk(sh): """Given text of a kickstart_tests test (.sh file) as sh, return a list of appropriate openQA settings dicts for hard disks. """ # most prepare_disks just create empty disks of a given size in # a standard way. in practice it's good enough to just create the # right number of disks at openQA's standard size (10GB), that'll # make things work. This is a rough assumption that may break in # future. A missing openQA feature here is you can't tell it to # create multiple clean disk images with different sizes. settings = [] simpre = re.compile(r'qemu-img create -q -f qcow2 \$\{(tmp|disk)dir\}/disk-.\.img \d+G') numdisks = len(simpre.findall(sh)) # the one tricky case that exists so far is the driverdisk case. # it gets created elsewhere. here we just point to it. if 'mkdud.py' in sh: numdisks += 1 settings.append({'key': 'HDD_{0}'.format(str(numdisks)), 'value': "driverdisk.img"}) if numdisks > 0: settings.append({'key': 'NUMDISKS', 'value': str(numdisks)}) return settings def _get_settings_postinstall(sh): """Given text of a kickstart_tests test (.sh file) as sh, return a list of appropriate openQA settings dict for post-install test settings (currently a single-item list, but we're future-proofing here). """ # just one test checks for RESULT in /home instead of /root if '/home/RESULT' in sh: return [{'key': 'POSTINSTALL', 'value': 'kstest_home'}] else: return [{'key': 'POSTINSTALL', 'value': 'kstest_root'}] def _get_settings_keyboard(ksin): """Given text of a kickstart_tests .ks.in file as ksin, return a list of appropriate openQA settings dict for keyboard test settings (currently a single-item list, but we're future-proofing here). """ # if the kickstart sets a keyboard layout, we set VNCKB to the # same layout. This tells openQA to run qemu with '-k (layout)', # which makes it use that layout for converting keysyms received # via VNC to keycodes which are passed on to the guest OS, which # converts them back into keysyms. Basically if we want to set a # non-US layout in the guest and have 'type_string qwerty' still # type 'qwerty' and not 'qwertz' or 'azert' or something, this is # how we do that. match = re.search(r'--vckeymap (\S+)', ksin) if match: return [{'key': 'VNCKB', 'value': match.group(1)}] else: return [] def create_testsuite(test, path, baseurl): """Create an openQA 'test suite' for a given kickstart_test. test is the test name, path is the directory the test files are in. """ # duh. these are all kickstart tests. settings = [{'key': 'KICKSTART', 'value': '1'}] # get text of .sh and .ks.in files with open('{0}/{1}.sh'.format(path, test), 'r') as shfh: sh = shfh.read() with open('{0}/{1}.ks.in'.format(path, test), 'r') as ksinfh: ksin = ksinfh.read() # call all the functions for getting different settings. settings.extend(_get_settings_disk(sh)) settings.extend(_get_settings_postinstall(sh)) settings.extend(_get_settings_keyboard(ksin)) # do some very simple ones ourselves (to avoid this function # growing too much, the rule is that if it needs more than one # line, split it out). # root password. for now we assume there is one. settings.append({'key': 'ROOT_PASSWORD', 'value': re.search(r'rootpw (.+)', ksin).group(1)}) # kickstart URL settings.append({'key': 'GRUB', 'value': "inst.ks={0}/{1}.ks".format(baseurl.strip('/'), test)}) # we never want to do a user login for these settings.append({'key': 'USER_LOGIN', 'value': "false"}) return {'name': "kstest_{0}".format(test), 'settings': settings} def merge_testsuites(templates, testsuites, machine='64bit', arch='x86_64', distri='fedora', prio=50, flavor='kstests', version='*'): """Merge some test suites (as produced by create_testsuite) into 'templates', which is expected to be an openQA templates file parsed into a dict. Returns the merged dict. """ for testsuite in testsuites: templates['TestSuites'].append(testsuite) jobt = { 'machine': {'name': machine}, 'prio': prio, 'test_suite': {'name': testsuite['name']}, 'product': { 'arch': arch, 'distri': distri, 'flavor': flavor, 'version': version, }, } templates['JobTemplates'].append(jobt) foundprod = False for product in templates['Products']: if ( product['flavor'] == flavor and product['distri'] == distri and product['version'] == version and product['arch'] == arch ): foundprod = True break if not foundprod: # add a Product for the flavor templates['Products'].append( { 'arch': arch, 'distri': distri, 'flavor': flavor, 'name': "", 'settings': [], 'version': version } ) return templates def cmd_kickstarts(args): """kickstarts subcommand function: produce kickstarts from .ks.in files. """ try: prep_kickstarts(args.indir, args.outdir) except ValueError as err: sys.exit(err) def cmd_templates(args): """templates subcommand function: produce openQA test suites and job templates and merge into a templates file. """ try: merge_templates(args.indir, args.baseurl, args.tempfile, args.outfile) except ValueError as err: sys.exit(err) def parse_args(): """Parse arguments with argparse.""" parser = argparse.ArgumentParser(description=( "Translation layer to convert anaconda kickstart-tests into " "openQA tests. 'kickstarts' parses kickstart-tests .ks.in " "files to kickstarts. 'templates' produces openQA test suites " "from kickstart-tests and merges them into a pre-existing " "openQA templats file.")) parser.add_argument( '-l', '--loglevel', help="The level of log messages to show", choices=('debug', 'info', 'warning', 'error', 'critical'), default='info') # This is a workaround for a somewhat infamous argparse bug # in Python 3. See: # https://stackoverflow.com/questions/23349349/argparse-with-required-subparser # http://bugs.python.org/issue16308 subparsers = parser.add_subparsers(dest='subcommand') subparsers.required = True parser_kickstarts = subparsers.add_parser( 'kickstarts', description="Produce kickstarts from .ks.in " "files and write them to a specified directory.") parser_kickstarts.add_argument( 'indir', help="Input directory (containing .ks.in files)") parser_kickstarts.add_argument( 'outdir', help="Output directory (where .ks files are written") parser_kickstarts.set_defaults(func=cmd_kickstarts) parser_templates = subparsers.add_parser( 'templates', description="Produce openQA 'test suites' and " "'job templates' from anaconda-kickstarts tests and merge " "them into an existing openQA templates file.") parser_templates.add_argument( 'indir', help="Input directory (containing .ks.in and .sh files)") parser_templates.add_argument( 'baseurl', help="URL to a directory containing .ks files (as " "produced by 'kickstarts' subcommand)") parser_templates.add_argument( 'tempfile', help="Path to openQA templates file (must be JSON " "format, not Perl)") parser_templates.add_argument('outfile', help="Path to output file") parser_templates.set_defaults(func=cmd_templates) return parser.parse_args() def main(): """Main loop.""" try: args = parse_args() loglevel = getattr( logging, args.loglevel.upper(), logging.INFO) logging.basicConfig(level=loglevel) args.func(args) except KeyboardInterrupt: sys.stderr.write("Interrupted, exiting...\n") sys.exit(1) if __name__ == '__main__': main()