#!/bin/python3 import argparse import json import logging import os import re SKIPS = [ # we haven't really figured out how to do NFS in openQA yet 'nfs-repo-and-addon', # the prep for this one probably needs to be in fedora_openqa_schedule 'liveimg', # i'll figure this one out later 'basic-ostree', # FIXMEs: # keyboard - changes keyboard layout, fucks with openQA's script runner # hostname - changes hostname, breaks 'text_console_login' needle # packages-and-groups-1 - boots to GUI login # encrypt-device - need to set ENCRYPT_PASSWORD, also 'kickstart insufficient'? ] SUBSTS = [ ('@KSTEST_URL@', '--mirrorlist=https://mirrors.fedoraproject.org/mirrorlist?repo=fedora-$releasever&arch=$basearch'), ('@KSTEST_FTP_URL@', 'ftp://mirror.utexas.edu/pub/fedora/linux/development/rawhide/Everything/$basearch/os/'), # we need to reboot not shutdown for openQA ('shutdown', 'reboot') ] def _find_tests(dir): """Find the tests to run from a directory name (should be a checkout of kickstart-tests). """ # find .ks.in files ksins = [ks for ks in os.listdir(dir) if ks.endswith('ks.in')] # filter tests we can't do yet ksins = [ks for ks in ksins if not any(ks.startswith(skip) for skip in SKIPS)] # strip .ks.in return [ksin.replace('.ks.in', '') for ksin in ksins] def prep_kickstarts(indir, outdir): """Produce kickstarts in 'outdir' from .ks.in files in 'indir'. """ if not os.path.isdir(outdir): raise ValueError("Output directory {0} does not exist!".format(outdir)) tests = _find_tests(indir) if not tests: raise ValueError("No tests found!") for test in tests: with open('{0}/{1}.ks.in'.format(indir, test), 'r') as ksinfh: kstext = ksinfh.read() ksout = "{0}.ks".format(test) for (orig, repl) in SUBSTS: kstext = kstext.replace(orig, repl) with open('{0}/{1}'.format(outdir, ksout), 'w') as ksoutfh: ksoutfh.write(kstext) def merge_templates(indir, baseurl, tempfile, outfile): """Produce openQA test suites and job templates for all tests in indir, merge them with the existing openQA templates file 'tempfile', and write the combined templates file as 'outfile'. 'baseurl' is the URL to the path where the kickstart files for the tests can be found. """ tests = _find_tests(indir) if not tests: raise ValueError("No tests found!") testsuites = [create_testsuite(test, indir, baseurl) for test in tests] with open(tempfile, 'r') as tempfh: templates = json.loads(tempfh.read()) templates = merge_testsuites(templates, testsuites) with open(outfile, 'w') as outfh: outfh.write(json.dumps(templates, sort_keys=True, indent=4, separators=(',', ': '))) def _get_disk_settings(test): """Given text of a kickstart_tests test (.sh file) as test, return a list of appropriate openQA settings dicts for hard disks. """ # most prepare_disks just create empty disks of a given size in # a standard way. in practice it's good enough to just create the # right number of disks at openQA's standard size (10GB), that'll # make things work. This is a rough assumption that may break in # future. A missing openQA feature here is you can't tell it to # create multiple clean disk images with different sizes. settings = [] simpre = re.compile(r'qemu-img create -q -f qcow2 \$\{(tmp|disk)dir\}/disk-.\.img \d+G') numdisks = len(simpre.findall(test)) # the one tricky case that exists so far is the driverdisk case. # it gets created elsewhere. here we just point to it. if 'mkdud.py' in test: numdisks += 1 settings.append({'key': 'HDD_{0}'.format(str(numdisks)), 'value': "driverdisk.img"}) if numdisks > 0: settings.append({'key': 'NUMDISKS', 'value': str(numdisks)}) return settings def create_testsuite(test, path, baseurl): """Create an openQA 'test suite' for a given kickstart_test. test is the test name, path is the directory the test files are in. """ with open('{0}/{1}.sh'.format(path, test), 'r') as testfh: sh = testfh.read() with open('{0}/{1}.ks.in'.format(path, test), 'r') as ksfh: ks = ksfh.read() settings = _get_disk_settings(sh) settings.append({'key': 'KICKSTART', 'value': '1'}) # just one test checks for RESULT in /home instead of /root if '/home/RESULT' in sh: settings.append({'key': 'POSTINSTALL', 'value': 'kstest_home'}) else: settings.append({'key': 'POSTINSTALL', 'value': 'kstest_root'}) # for some goddamn reason there are two different root passwords. rootpatt = re.compile(r'rootpw (.+)') settings.append({'key': 'ROOT_PASSWORD', 'value': rootpatt.search(ks).group(1)}) settings.append({'key': 'GRUB', 'value': "inst.ks={0}/{1}.ks".format(baseurl.strip('/'), test)}) # we never want to do a user login for these. settings.append({'key': 'USER_LOGIN', 'value': "false"}) return {'name': "kstest_{0}".format(test), 'settings': settings} def merge_testsuites(templates, testsuites, machine='64bit', arch='x86_64', distri='fedora', prio=50, flavor='kstests', version='*'): """Merge some test suites (as produced by create_testsuite) into 'templates', which is expected to be an openQA templates file parsed into a dict. Returns the merged dict. """ for testsuite in testsuites: templates['TestSuites'].append(testsuite) jobt = { 'machine': {'name': machine}, 'prio': prio, 'test_suite': {'name': testsuite['name']}, 'product': { 'arch': arch, 'distri': distri, 'flavor': flavor, 'version': version, }, } templates['JobTemplates'].append(jobt) foundprod = False for product in templates['Products']: if ( product['flavor'] == flavor and product['distri'] == distri and product['version'] == version and product['arch'] == arch ): foundprod = True break if not foundprod: # add a Product for the flavor templates['Products'].append( { 'arch': arch, 'distri': distri, 'flavor': flavor, 'name': "", 'settings': [], 'version': version } ) return templates def cmd_kickstarts(args): """kickstarts subcommand function: produce kickstarts from .ks.in files. """ try: prep_kickstarts(args.indir, args.outdir) except ValueError as err: sys.exit(err) def cmd_templates(args): """templates subcommand function: produce openQA test suites and job templates and merge into a templates file. """ try: merge_templates(args.indir, args.baseurl, args.tempfile, args.outfile) except ValueError as err: sys.exit(err) def parse_args(): """Parse arguments with argparse.""" parser = argparse.ArgumentParser(description=( "Translation layer to convert anaconda kickstart-tests into " "openQA tests. 'kickstarts' parses kickstart-tests .ks.in " "files to kickstarts. 'templates' produces openQA test suites " "from kickstart-tests and merges them into a pre-existing " "openQA templats file.")) parser.add_argument( '-l', '--loglevel', help="The level of log messages to show", choices=('debug', 'info', 'warning', 'error', 'critical'), default='info') # This is a workaround for a somewhat infamous argparse bug # in Python 3. See: # https://stackoverflow.com/questions/23349349/argparse-with-required-subparser # http://bugs.python.org/issue16308 subparsers = parser.add_subparsers(dest='subcommand') subparsers.required = True parser_kickstarts = subparsers.add_parser( 'kickstarts', description="Produce kickstarts from .ks.in " "files and write them to a specified directory.") parser_kickstarts.add_argument( 'indir', help="Input directory (containing .ks.in files)") parser_kickstarts.add_argument( 'outdir', help="Output directory (where .ks files are written") parser_kickstarts.set_defaults(func=cmd_kickstarts) parser_templates = subparsers.add_parser( 'templates', description="Produce openQA 'test suites' and " "'job templates' from anaconda-kickstarts tests and merge " "them into an existing openQA templates file.") parser_templates.add_argument( 'indir', help="Input directory (containing .ks.in and .sh files)") parser_templates.add_argument( 'baseurl', help="URL to a directory containing .ks files (as " "produced by 'kickstarts' subcommand)") parser_templates.add_argument( 'tempfile', help="Path to openQA templates file (must be JSON " "format, not Perl)") parser_templates.add_argument('outfile', help="Path to output file") parser_templates.set_defaults(func=cmd_templates) return parser.parse_args() def main(): """Main loop.""" try: args = parse_args() loglevel = getattr( logging, args.loglevel.upper(), logging.INFO) logging.basicConfig(level=loglevel) args.func(args) except KeyboardInterrupt: sys.stderr.write("Interrupted, exiting...\n") sys.exit(1) if __name__ == '__main__': main()