lorax-composer: Add support for dnf variables to repo sources

If system repos are enabled, this loads the system dnf variables from /etc/dnf/vars/ at startup
and substitutes their values into the sources when they are loaded and when a new source
is added.
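
As a rough sketch of the effect (the values shown are examples, not output from any particular system):

    import dnf

    # Mirrors what get_base_object() does in the first hunk below when system repos are enabled
    dbo = dnf.Base()
    dbo.conf.substitutions.update_from_etc("/")
    print(dbo.conf.substitutions["releasever"], dbo.conf.substitutions["basearch"])

    # A Weldr source can then reference the variables in its URLs, e.g.
    #   url = "file:///tmp/lorax-empty-repo-$releasever-$basearch/"
    # and dnf expands them when the source is loaded, e.g. (example values)
    #   file:///tmp/lorax-empty-repo-30-x86_64/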

Also includes tests.
Brian C. Lane 2019-08-13 18:24:21 -07:00
parent b8c1e706bb
commit 6f686ff9d6
8 changed files with 272 additions and 124 deletions


@@ -104,6 +104,11 @@ def get_base_object(conf):
     if conf.has_option("dnf", "sslverify") and not conf.getboolean("dnf", "sslverify"):
         dbc.sslverify = False
 
+    # If the system repos are enabled read the dnf vars from /etc/dnf/vars/
+    if not conf.has_option("repos", "use_system_repos") or conf.getboolean("repos", "use_system_repos"):
+        dbc.substitutions.update_from_etc("/")
+        log.info("dnf vars: %s", dbc.substitutions)
+
     _releasever = conf.get_default("composer", "releasever", None)
     if not _releasever:
         # Use the releasever of the host system
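
dnf reads these variables from files under /etc/dnf/vars/, one file per variable, named after the variable and containing its value. A hypothetical illustration (the variable name below is made up for the example):

    import dnf

    dbc = dnf.Base().conf
    dbc.substitutions.update_from_etc("/")

    # A file /etc/dnf/vars/lorax_test containing the single line "rawhide"
    # would now be available as a substitution:
    if "lorax_test" in dbc.substitutions:
        print("lorax_test =", dbc.substitutions["lorax_test"])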


@@ -24,6 +24,7 @@ import os
 import time
 
 from pylorax.api.bisect import insort_left
+from pylorax.sysutils import joinpaths
 
 TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
@@ -477,6 +478,58 @@ def repo_to_source(repo, system_source, api=1):
 
     return source
 
+def source_to_repodict(source):
+    """Return a tuple suitable for use with dnf.add_new_repo
+
+    :param source: A Weldr source dict
+    :type source: dict
+    :returns: A tuple of dnf.Repo attributes
+    :rtype: (str, list, dict)
+
+    Return a tuple with (id, baseurl|(), kwargs) that can be used
+    with dnf.repos.add_new_repo
+    """
+    kwargs = {}
+    if "id" in source:
+        # This is an API v1 source definition
+        repoid = source["id"]
+        if "name" in source:
+            kwargs["name"] = source["name"]
+    else:
+        repoid = source["name"]
+
+    # This will allow errors to be raised so we can catch them
+    # without this they are logged, but the repo is silently disabled
+    kwargs["skip_if_unavailable"] = False
+
+    if source["type"] == "yum-baseurl":
+        baseurl = [source["url"]]
+    elif source["type"] == "yum-metalink":
+        kwargs["metalink"] = source["url"]
+        baseurl = ()
+    elif source["type"] == "yum-mirrorlist":
+        kwargs["mirrorlist"] = source["url"]
+        baseurl = ()
+
+    if "proxy" in source:
+        kwargs["proxy"] = source["proxy"]
+
+    if source["check_ssl"]:
+        kwargs["sslverify"] = True
+    else:
+        kwargs["sslverify"] = False
+
+    if source["check_gpg"]:
+        kwargs["gpgcheck"] = True
+    else:
+        kwargs["gpgcheck"] = False
+
+    if "gpgkey_urls" in source:
+        kwargs["gpgkey"] = tuple(source["gpgkey_urls"])
+
+    return (repoid, baseurl, kwargs)
+
 def source_to_repo(source, dnf_conf):
     """Return a dnf Repo object created from a source dict
@@ -506,39 +559,14 @@ def source_to_repo(source, dnf_conf):
     If the ``id`` field is included it is used for the repo id, otherwise ``name`` is used.
     v0 of the API only used ``name``, v1 added the distinction between ``id`` and ``name``.
     """
-    if "id" in source:
-        # This is an API v1 source definition
-        repo = dnf.repo.Repo(source["id"], dnf_conf)
-        if "name" in source:
-            repo.name = source["name"]
-    else:
-        repo = dnf.repo.Repo(source["name"], dnf_conf)
-
-    # This will allow errors to be raised so we can catch them
-    # without this they are logged, but the repo is silently disabled
-    repo.skip_if_unavailable = False
-
-    if source["type"] == "yum-baseurl":
-        repo.baseurl = source["url"]
-    elif source["type"] == "yum-metalink":
-        repo.metalink = source["url"]
-    elif source["type"] == "yum-mirrorlist":
-        repo.mirrorlist = source["url"]
-
-    if "proxy" in source:
-        repo.proxy = source["proxy"]
-
-    if source["check_ssl"]:
-        repo.sslverify = True
-    else:
-        repo.sslverify = False
-
-    if source["check_gpg"]:
-        repo.gpgcheck = True
-    else:
-        repo.gpgcheck = False
-
-    if "gpgkey_urls" in source:
-        repo.gpgkey = tuple(source["gpgkey_urls"])
+    repoid, baseurl, kwargs = source_to_repodict(source)
+    repo = dnf.repo.Repo(repoid, dnf_conf)
+    if baseurl:
+        repo.baseurl = baseurl
+
+    # Apply the rest of the kwargs to the Repo object
+    for k, v in kwargs.items():
+        setattr(repo, k, v)
 
     repo.enable()
@@ -607,3 +635,63 @@
         raise ProjectsError("Problem deleting repo source %s: %s" % (source_id, str(e)))
     if not found:
         raise ProjectsError("source %s not found" % source_id)
+
+def new_repo_source(dbo, repoid, source, repo_dir):
+    """Add a new repo source from a Weldr source dict
+
+    :param dbo: dnf base object
+    :type dbo: dnf.Base
+    :param id: The repo id (API v0 uses the name, v1 uses the id)
+    :type id: str
+    :param source: A Weldr source dict
+    :type source: dict
+    :returns: None
+    :raises: ...
+
+    Make sure access to the dbo has been locked before calling this.
+    The `id` parameter will be the 'name' field for API v0, and the 'id' field for API v1
+
+    DNF variables will be substituted at load time, and on restart.
+    """
+    try:
+        # Remove it from the RepoDict (NOTE that this isn't explicitly supported by the DNF API)
+        # If this repo already exists, delete it and replace it with the new one
+        repos = list(r.id for r in dbo.repos.iter_enabled())
+        if repoid in repos:
+            del dbo.repos[repoid]
+
+        # Add the repo and substitute any dnf variables
+        _, baseurl, kwargs = source_to_repodict(source)
+        log.debug("repoid=%s, baseurl=%s, kwargs=%s", repoid, baseurl, kwargs)
+        r = dbo.repos.add_new_repo(repoid, dbo.conf, baseurl, **kwargs)
+        r.enable()
+
+        log.info("Updating repository metadata after adding %s", repoid)
+        dbo.fill_sack(load_system_repo=False)
+        dbo.read_comps()
+
+        # Remove any previous sources with this id, ignore it if it isn't found
+        try:
+            delete_repo_source(joinpaths(repo_dir, "*.repo"), repoid)
+        except ProjectsError:
+            pass
+
+        # Make sure the source id can't contain a path traversal by taking the basename
+        source_path = joinpaths(repo_dir, os.path.basename("%s.repo" % repoid))
+        # Write the un-substituted version of the repo to disk
+        with open(source_path, "w") as f:
+            repo = source_to_repo(source, dbo.conf)
+            f.write(dnf_repo_to_file_repo(repo))
+    except Exception as e:
+        log.error("(new_repo_source) adding %s failed: %s", repoid, str(e))
+
+        # Cleanup the mess, if loading it failed we don't want to leave it in memory
+        repos = list(r.id for r in dbo.repos.iter_enabled())
+        if repoid in repos:
+            del dbo.repos[repoid]
+
+        log.info("Updating repository metadata after adding %s failed", repoid)
+        dbo.fill_sack(load_system_repo=False)
+        dbo.read_comps()
+
+        raise
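
To make the division of labor clearer: the repo that add_new_repo() creates has the dnf variables substituted, while the original, un-substituted source is what gets written to disk, so the substitution is repeated on every load and on restart. A minimal sketch of the call pattern, assuming dbo is the locked dnf.Base, source is a Weldr source dict, and repo_dir is the composer repo directory (this mirrors new_repo_source() above):

    # Substituted, in-memory repo used for depsolving
    repoid, baseurl, kwargs = source_to_repodict(source)
    r = dbo.repos.add_new_repo(repoid, dbo.conf, baseurl, **kwargs)
    r.enable()
    dbo.fill_sack(load_system_repo=False)
    dbo.read_comps()

    # Un-substituted settings are what get saved back to the .repo file
    source_path = joinpaths(repo_dir, os.path.basename("%s.repo" % repoid))
    with open(source_path, "w") as f:
        f.write(dnf_repo_to_file_repo(source_to_repo(source, dbo.conf)))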


@@ -62,7 +62,7 @@ from pylorax.api.errors import * # pylint: disable
 from pylorax.api.flask_blueprint import BlueprintSkip
 from pylorax.api.projects import projects_list, projects_info, projects_depsolve
 from pylorax.api.projects import modules_list, modules_info, ProjectsError, repo_to_source
-from pylorax.api.projects import get_repo_sources, delete_repo_source, source_to_repo, dnf_repo_to_file_repo
+from pylorax.api.projects import get_repo_sources, delete_repo_source, new_repo_source
 from pylorax.api.queue import queue_status, build_status, uuid_delete, uuid_status, uuid_info
 from pylorax.api.queue import uuid_tar, uuid_image, uuid_cancel, uuid_log
 from pylorax.api.recipes import list_branch_files, read_recipe_commit, recipe_filename, list_commits
@@ -1206,46 +1206,9 @@ def v0_projects_source_new():
     try:
         # Remove it from the RepoDict (NOTE that this isn't explicitly supported by the DNF API)
         with api.config["DNFLOCK"].lock:
-            dbo = api.config["DNFLOCK"].dbo
-            # If this repo already exists, delete it and replace it with the new one
-            repos = list(r.id for r in dbo.repos.iter_enabled())
-            if source["name"] in repos:
-                del dbo.repos[source["name"]]
-
-            repo = source_to_repo(source, dbo.conf)
-            dbo.repos.add(repo)
-
-            log.info("Updating repository metadata after adding %s", source["name"])
-            dbo.fill_sack(load_system_repo=False)
-            dbo.read_comps()
-
-        # Write the new repo to disk, replacing any existing ones
-        repo_dir = api.config["COMPOSER_CFG"].get("composer", "repo_dir")
-
-        # Remove any previous sources with this name, ignore it if it isn't found
-        try:
-            delete_repo_source(joinpaths(repo_dir, "*.repo"), source["name"])
-        except ProjectsError:
-            pass
-
-        # Make sure the source name can't contain a path traversal by taking the basename
-        source_path = joinpaths(repo_dir, os.path.basename("%s.repo" % source["name"]))
-        with open(source_path, "w") as f:
-            f.write(dnf_repo_to_file_repo(repo))
+            repo_dir = api.config["COMPOSER_CFG"].get("composer", "repo_dir")
+            new_repo_source(api.config["DNFLOCK"].dbo, source["name"], source, repo_dir)
     except Exception as e:
-        log.error("(v0_projects_source_add) adding %s failed: %s", source["name"], str(e))
-
-        # Cleanup the mess, if loading it failed we don't want to leave it in memory
-        repos = list(r.id for r in dbo.repos.iter_enabled())
-        if source["name"] in repos:
-            with api.config["DNFLOCK"].lock:
-                dbo = api.config["DNFLOCK"].dbo
-                del dbo.repos[source["name"]]
-
-            log.info("Updating repository metadata after adding %s failed", source["name"])
-            dbo.fill_sack(load_system_repo=False)
-            dbo.read_comps()
-
         return jsonify(status=False, errors=[{"id": PROJECTS_ERROR, "msg": str(e)}]), 400
 
     return jsonify(status=True)


@@ -19,7 +19,6 @@
 """
 import logging
 log = logging.getLogger("lorax-composer")
 
-import os
 from flask import jsonify, request
 from flask import current_app as api
 
@@ -27,12 +26,9 @@ from flask import current_app as api
 from pylorax.api.checkparams import checkparams
 from pylorax.api.errors import INVALID_CHARS, PROJECTS_ERROR, SYSTEM_SOURCE, UNKNOWN_SOURCE
 from pylorax.api.flask_blueprint import BlueprintSkip
-from pylorax.api.projects import delete_repo_source, dnf_repo_to_file_repo, get_repo_sources, repo_to_source
-from pylorax.api.projects import source_to_repo
-from pylorax.api.projects import ProjectsError
+from pylorax.api.projects import get_repo_sources, new_repo_source, repo_to_source
 from pylorax.api.regexes import VALID_API_STRING
 import pylorax.api.toml as toml
-from pylorax.sysutils import joinpaths
 
 # Create the v1 routes Blueprint with skip_routes support
 v1_api = BlueprintSkip("v1_routes", __name__)
@@ -153,9 +149,7 @@ def v1_projects_source_new():
     else:
         source = request.get_json(cache=False)
 
-    # XXX TODO
     # Check for id in source, return error if not
-    # Add test for that
     if "id" not in source:
         return jsonify(status=False, errors=[{"id": UNKNOWN_SOURCE, "msg": "'id' field is missing from API v1 request."}]), 400
@@ -166,48 +160,9 @@
     try:
         # Remove it from the RepoDict (NOTE that this isn't explicitly supported by the DNF API)
        with api.config["DNFLOCK"].lock:
-            dbo = api.config["DNFLOCK"].dbo
-            # If this repo already exists, delete it and replace it with the new one
-            repos = list(r.id for r in dbo.repos.iter_enabled())
-            if source["id"] in repos:
-                del dbo.repos[source["id"]]
-
-            repo = source_to_repo(source, dbo.conf)
-            dbo.repos.add(repo)
-
-            log.info("Updating repository metadata after adding %s", source["id"])
-            dbo.fill_sack(load_system_repo=False)
-            dbo.read_comps()
-
-        # Write the new repo to disk, replacing any existing ones
-        repo_dir = api.config["COMPOSER_CFG"].get("composer", "repo_dir")
-
-        # Remove any previous sources with this id, ignore it if it isn't found
-        try:
-            delete_repo_source(joinpaths(repo_dir, "*.repo"), source["id"])
-        except ProjectsError:
-            pass
-
-        # Make sure the source id can't contain a path traversal by taking the basename
-        source_path = joinpaths(repo_dir, os.path.basename("%s.repo" % source["id"]))
-        with open(source_path, "w") as f:
-            f.write(dnf_repo_to_file_repo(repo))
+            repo_dir = api.config["COMPOSER_CFG"].get("composer", "repo_dir")
+            new_repo_source(api.config["DNFLOCK"].dbo, source["id"], source, repo_dir)
     except Exception as e:
-        log.error("(v0_projects_source_add) adding %s failed: %s", source["id"], str(e))
-
-        # Cleanup the mess, if loading it failed we don't want to leave it in memory
-        repos = list(r.id for r in dbo.repos.iter_enabled())
-        if source["id"] in repos:
-            with api.config["DNFLOCK"].lock:
-                dbo = api.config["DNFLOCK"].dbo
-                del dbo.repos[source["id"]]
-
-            log.info("Updating repository metadata after adding %s failed", source["id"])
-            dbo.fill_sack(load_system_repo=False)
-            dbo.read_comps()
-
         return jsonify(status=False, errors=[{"id": PROJECTS_ERROR, "msg": str(e)}]), 400
 
     return jsonify(status=True)


@@ -0,0 +1,7 @@
+id = "new-repo-2-v1-vars"
+name = "API v1 toml new repo with vars"
+url = "file:///tmp/lorax-empty-repo-v1-$releasever-$basearch/"
+type = "yum-baseurl"
+check_ssl = true
+check_gpg = true
+gpgkey_urls = ["file:///etc/pki/rpm-gpg/RPM-GPG-KEY-fedora-$releasever-$basearch"]


@@ -0,0 +1,6 @@
+name = "new-repo-2-vars"
+url = "file:///tmp/lorax-empty-repo-$releasever-$basearch/"
+type = "yum-baseurl"
+check_ssl = true
+check_gpg = true
+gpgkey_urls = ["file:///etc/pki/rpm-gpg/RPM-GPG-KEY-fedora-$releasever-$basearch"]


@@ -28,7 +28,7 @@ from pylorax.api.projects import api_time, api_changelog, pkg_to_project, pkg_to
 from pylorax.api.projects import proj_to_module, projects_list, projects_info, projects_depsolve
 from pylorax.api.projects import modules_list, modules_info, ProjectsError, dep_evra, dep_nevra
 from pylorax.api.projects import repo_to_source, get_repo_sources, delete_repo_source, source_to_repo
-from pylorax.api.projects import dnf_repo_to_file_repo
+from pylorax.api.projects import source_to_repodict, dnf_repo_to_file_repo
 from pylorax.api.dnfbase import get_base_object
 
 class Package(object):
@@ -202,7 +202,6 @@
         self.assertTrue("ctags" in names)        # default package
         self.assertFalse("cmake" in names)       # optional package
 
-
 class ConfigureTest(unittest.TestCase):
     @classmethod
     def setUpClass(self):
@@ -341,6 +340,25 @@ def singlerepo_v1():
     d["name"] = "One repo in the file"
     return d
 
+def singlerepo_vars_v0():
+    return {
+        "check_gpg": True,
+        "check_ssl": True,
+        "gpgkey_urls": [
+            "file:///etc/pki/rpm-gpg/RPM-GPG-KEY-fedora-$releasever-$basearch"
+        ],
+        "name": "single-repo",
+        "system": False,
+        "type": "yum-baseurl",
+        "url": "file:///tmp/lorax-empty-repo-$releasever-$basearch/"
+    }
+
+def singlerepo_vars_v1():
+    d = singlerepo_v0()
+    d["id"] = "single-repo"
+    d["name"] = "One repo in the file"
+    return d
+
 class SourceTest(unittest.TestCase):
     @classmethod
     def setUpClass(self):
@@ -443,51 +461,101 @@
         repo = source_to_repo(fakerepo_baseurl_v0(), self.dbo.conf)
         self.assertEqual(repo.baseurl[0], fakerepo_baseurl_v0()["url"])
 
+    def test_source_to_repodict_baseurl(self):
+        """Test creating a repodict with a baseurl API v0"""
+        repo = source_to_repodict(fakerepo_baseurl_v0())
+        self.assertEqual(repo[1][0], fakerepo_baseurl_v0()["url"])
+
     def test_source_to_repo_baseurl_v1(self):
         """Test creating a dnf.Repo with a baseurl API v1"""
         repo = source_to_repo(fakerepo_baseurl_v1(), self.dbo.conf)
         self.assertEqual(repo.baseurl[0], fakerepo_baseurl_v1()["url"])
 
+    def test_source_to_repodict_baseurl_v1(self):
+        """Test creating a repodict with a baseurl API v1"""
+        repo = source_to_repodict(fakerepo_baseurl_v1())
+        self.assertEqual(repo[1][0], fakerepo_baseurl_v1()["url"])
+
     def test_source_to_repo_metalink(self):
         """Test creating a dnf.Repo with a metalink API v0"""
         repo = source_to_repo(fakerepo_metalink_v0(), self.dbo.conf)
         self.assertEqual(repo.metalink, fakerepo_metalink_v0()["url"])
 
+    def test_source_to_repodict_metalink(self):
+        """Test creating a repodict with a metalink API v0"""
+        repo = source_to_repodict(fakerepo_metalink_v0())
+        self.assertEqual(repo[2]["metalink"], fakerepo_metalink_v0()["url"])
+
     def test_source_to_repo_metalink_v1(self):
         """Test creating a dnf.Repo with a metalink API v1"""
         repo = source_to_repo(fakerepo_metalink_v1(), self.dbo.conf)
         self.assertEqual(repo.metalink, fakerepo_metalink_v1()["url"])
 
+    def test_source_to_repodict_metalink_v1(self):
+        """Test creating a repodict with a metalink API v1"""
+        repo = source_to_repodict(fakerepo_metalink_v1())
+        self.assertEqual(repo[2]["metalink"], fakerepo_metalink_v1()["url"])
+
     def test_source_to_repo_mirrorlist(self):
         """Test creating a dnf.Repo with a mirrorlist API v0"""
         repo = source_to_repo(fakerepo_mirrorlist_v0(), self.dbo.conf)
         self.assertEqual(repo.mirrorlist, fakerepo_mirrorlist_v0()["url"])
 
+    def test_source_to_repodict_mirrorlist(self):
+        """Test creating a repodict with a mirrorlist API v0"""
+        repo = source_to_repodict(fakerepo_mirrorlist_v0())
+        self.assertEqual(repo[2]["mirrorlist"], fakerepo_mirrorlist_v0()["url"])
+
     def test_source_to_repo_mirrorlist_v1(self):
         """Test creating a dnf.Repo with a mirrorlist"""
         repo = source_to_repo(fakerepo_mirrorlist_v1(), self.dbo.conf)
         self.assertEqual(repo.mirrorlist, fakerepo_mirrorlist_v1()["url"])
 
+    def test_source_to_repodict_mirrorlist_v1(self):
+        """Test creating a repodict with a mirrorlist"""
+        repo = source_to_repodict(fakerepo_mirrorlist_v1())
+        self.assertEqual(repo[2]["mirrorlist"], fakerepo_mirrorlist_v1()["url"])
+
     def test_source_to_repo_proxy(self):
         """Test creating a dnf.Repo with a proxy API v0"""
         repo = source_to_repo(fakerepo_proxy_v0(), self.dbo.conf)
         self.assertEqual(repo.proxy, fakerepo_proxy_v0()["proxy"])
 
+    def test_source_to_repodict_proxy(self):
+        """Test creating a repodict with a proxy API v0"""
+        repo = source_to_repodict(fakerepo_proxy_v0())
+        self.assertEqual(repo[2]["proxy"], fakerepo_proxy_v0()["proxy"])
+
     def test_source_to_repo_proxy_v1(self):
         """Test creating a dnf.Repo with a proxy API v1"""
         repo = source_to_repo(fakerepo_proxy_v1(), self.dbo.conf)
         self.assertEqual(repo.proxy, fakerepo_proxy_v1()["proxy"])
 
+    def test_source_to_repodict_proxy_v1(self):
+        """Test creating a repodict with a proxy API v1"""
+        repo = source_to_repodict(fakerepo_proxy_v1())
+        self.assertEqual(repo[2]["proxy"], fakerepo_proxy_v1()["proxy"])
+
     def test_source_to_repo_gpgkey(self):
         """Test creating a dnf.Repo with a proxy API v0"""
         repo = source_to_repo(fakerepo_gpgkey_v0(), self.dbo.conf)
         self.assertEqual(repo.gpgkey[0], fakerepo_gpgkey_v0()["gpgkey_urls"][0])
 
+    def test_source_to_repodict_gpgkey(self):
+        """Test creating a repodict with a proxy API v0"""
+        repo = source_to_repodict(fakerepo_gpgkey_v0())
+        self.assertEqual(repo[2]["gpgkey"][0], fakerepo_gpgkey_v0()["gpgkey_urls"][0])
+
     def test_source_to_repo_gpgkey_v1(self):
         """Test creating a dnf.Repo with a proxy API v1"""
         repo = source_to_repo(fakerepo_gpgkey_v1(), self.dbo.conf)
         self.assertEqual(repo.gpgkey[0], fakerepo_gpgkey_v1()["gpgkey_urls"][0])
 
+    def test_source_to_repodict_gpgkey_v1(self):
+        """Test creating a repodict with a proxy API v1"""
+        repo = source_to_repodict(fakerepo_gpgkey_v1())
+        self.assertEqual(repo[2]["gpgkey"][0], fakerepo_gpgkey_v1()["gpgkey_urls"][0])
+
     def test_drtfr_baseurl(self):
         """Test creating a dnf .repo file from a baseurl Repo object"""
         self.assertEqual(dnf_repo_to_file_repo(self.dbo.repos.get("fake-repo-baseurl")),


@@ -18,6 +18,7 @@
 import os
 from configparser import ConfigParser, NoOptionError
 from contextlib import contextmanager
+import dnf
 from glob import glob
 from rpmfluff import SimpleRpmBuild, expectedArch
 import shutil
@@ -127,10 +128,14 @@
         if os.path.exists("/etc/yum.repos.d/fedora-rawhide.repo"):
             self.rawhide = True
 
+        # Need the substitution values to create the directories before we can create the dnf.Base for real
+        dbo = dnf.Base()
+        repo_dirs = ["/tmp/lorax-empty-repo-%s-%s" % (dbo.conf.substitutions["releasever"], dbo.conf.substitutions["basearch"]),
+                     "/tmp/lorax-empty-repo-v1-%s-%s" % (dbo.conf.substitutions["releasever"], dbo.conf.substitutions["basearch"])]
+
         # dnf repo baseurl has to point to an absolute directory, so we use /tmp/lorax-empty-repo/ in the files
         # and create an empty repository. We now remove duplicate repo entries so we need a number of them.
-        for d in ["/tmp/lorax-empty-repo/", "/tmp/lorax-other-empty-repo/", "/tmp/lorax-empty-repo-1/",
+        for d in repo_dirs + ["/tmp/lorax-empty-repo/", "/tmp/lorax-other-empty-repo/", "/tmp/lorax-empty-repo-1/",
                               "/tmp/lorax-empty-repo-2/", "/tmp/lorax-empty-repo-3/", "/tmp/lorax-empty-repo-4/"]:
             os.makedirs(d)
             rc = os.system("createrepo_c %s" % d)
             if rc != 0:
@@ -139,6 +144,13 @@
 
         server.config["DNFLOCK"] = DNFLock(server.config["COMPOSER_CFG"])
 
+        # Grab the substitution values for later
+        with server.config["DNFLOCK"].lock:
+            self.substitutions = server.config["DNFLOCK"].dbo.conf.substitutions
+
+            if "releasever" not in self.substitutions or "basearch" not in self.substitutions:
+                raise RuntimeError("DNF is missing the releasever and basearch substitutions")
+
         # Include a message in /api/status output
         server.config["TEMPLATE_ERRORS"] = ["Test message"]
@@ -831,6 +843,29 @@
         sources = data["sources"]
         self.assertTrue("new-repo-2" in sources)
 
+    def test_projects_source_00_new_toml_vars(self):
+        """Test /api/v0/projects/source/new with a new toml source using vars"""
+        toml_source = open("./tests/pylorax/source/test-repo-vars.toml").read()
+        self.assertTrue(len(toml_source) > 0)
+        resp = self.server.post("/api/v0/projects/source/new",
+                                data=toml_source,
+                                content_type="text/x-toml")
+        data = json.loads(resp.data)
+        self.assertEqual(data, {"status":True})
+
+        # Was it added, and was it correct?
+        resp = self.server.get("/api/v1/projects/source/info/new-repo-2-vars")
+        data = json.loads(resp.data)
+        self.assertNotEqual(data, None)
+
+        sources = data["sources"]
+        print(sources)
+        self.assertTrue("new-repo-2-vars" in sources)
+        self.assertTrue(self.substitutions["releasever"] in sources["new-repo-2-vars"]["url"])
+        self.assertTrue(self.substitutions["basearch"] in sources["new-repo-2-vars"]["url"])
+        self.assertTrue(self.substitutions["releasever"] in sources["new-repo-2-vars"]["gpgkey_urls"][0])
+        self.assertTrue(self.substitutions["basearch"] in sources["new-repo-2-vars"]["gpgkey_urls"][0])
+
     def test_projects_source_01_new_toml(self):
         """Test /api/v1/projects/source/new with a new toml source"""
         toml_source = open("./tests/pylorax/source/test-repo-v1.toml").read()
@@ -852,6 +887,27 @@
         self.assertTrue("name" in sources["new-repo-2-v1"])
         self.assertEqual(sources["new-repo-2-v1"]["name"], "API v1 toml new repo")
 
+    def test_projects_source_01_new_toml_vars(self):
+        """Test /api/v1/projects/source/new with a new toml source using vars"""
+        toml_source = open("./tests/pylorax/source/test-repo-v1-vars.toml").read()
+        self.assertTrue(len(toml_source) > 0)
+        resp = self.server.post("/api/v1/projects/source/new",
+                                data=toml_source,
+                                content_type="text/x-toml")
+        data = json.loads(resp.data)
+        self.assertEqual(data, {"status":True})
+
+        # Was it added, and was it correct?
+        resp = self.server.get("/api/v1/projects/source/info/new-repo-2-v1-vars")
+        data = json.loads(resp.data)
+        self.assertNotEqual(data, None)
+        sources = data["sources"]
+        self.assertTrue("new-repo-2-v1-vars" in sources)
+        self.assertTrue(self.substitutions["releasever"] in sources["new-repo-2-v1-vars"]["url"])
+        self.assertTrue(self.substitutions["basearch"] in sources["new-repo-2-v1-vars"]["url"])
+        self.assertTrue(self.substitutions["releasever"] in sources["new-repo-2-v1-vars"]["gpgkey_urls"][0])
+        self.assertTrue(self.substitutions["basearch"] in sources["new-repo-2-v1-vars"]["gpgkey_urls"][0])
+
     def test_projects_source_02_new_toml(self):
         """Test /api/v1/projects/source/new with a new toml source w/o id field"""
         toml_source = open("./tests/pylorax/source/test-repo.toml").read()