lorax-composer: Add v1 API for projects/source/

This changes the source 'name' field to match the DNF usage of it as a
descriptive string. 'id' is now used as the short name to refer to the
source. The v0 API remains unchanged.

Tests for v1 behavior have been added.

Now that the v1 API is in use the status message will return api: 1
This commit is contained in:
Brian C. Lane 2019-08-07 13:19:59 -07:00
parent 278214ff8e
commit 1e88a99443
8 changed files with 454 additions and 51 deletions

View File

@ -409,13 +409,15 @@ def dnf_repo_to_file_repo(repo):
return repo_str return repo_str
def repo_to_source(repo, system_source): def repo_to_source(repo, system_source, api=1):
"""Return a Weldr Source dict created from the DNF Repository """Return a Weldr Source dict created from the DNF Repository
:param repo: DNF Repository :param repo: DNF Repository
:type repo: dnf.RepoDict :type repo: dnf.RepoDict
:param system_source: True if this source is an immutable system source :param system_source: True if this source is an immutable system source
:type system_source: bool :type system_source: bool
:param api: Select which api version of the dict to return (default 1)
:type api: int
:returns: A dict with Weldr Source fields filled in :returns: A dict with Weldr Source fields filled in
:rtype: dict :rtype: dict
@ -427,15 +429,23 @@ def repo_to_source(repo, system_source):
"gpgkey_url": [ "gpgkey_url": [
"file:///etc/pki/rpm-gpg/RPM-GPG-KEY-fedora-28-x86_64" "file:///etc/pki/rpm-gpg/RPM-GPG-KEY-fedora-28-x86_64"
], ],
"name": "fedora", "id": "fedora",
"name": "Fedora $releasever - $basearch",
"proxy": "http://proxy.brianlane.com:8123", "proxy": "http://proxy.brianlane.com:8123",
"system": true "system": true
"type": "yum-metalink", "type": "yum-metalink",
"url": "https://mirrors.fedoraproject.org/metalink?repo=fedora-28&arch=x86_64" "url": "https://mirrors.fedoraproject.org/metalink?repo=fedora-28&arch=x86_64"
} }
The ``name`` field has changed in v1 of the API.
In v0 of the API ``name`` is the repo.id, in v1 it is the repo.name and a new field,
``id`` has been added for the repo.id
""" """
if api==0:
source = {"name": repo.id, "system": system_source} source = {"name": repo.id, "system": system_source}
else:
source = {"id": repo.id, "name": repo.name, "system": system_source}
if repo.baseurl: if repo.baseurl:
source["url"] = repo.baseurl[0] source["url"] = repo.baseurl[0]
source["type"] = "yum-baseurl" source["type"] = "yum-baseurl"
@ -472,6 +482,8 @@ def source_to_repo(source, dnf_conf):
:param source: A Weldr source dict :param source: A Weldr source dict
:type source: dict :type source: dict
:param dnf_conf: The dnf Config object
:type dnf_conf: dnf.conf
:returns: A dnf Repo object :returns: A dnf Repo object
:rtype: dnf.Repo :rtype: dnf.Repo
@ -483,14 +495,23 @@ def source_to_repo(source, dnf_conf):
"gpgkey_urls": [ "gpgkey_urls": [
"file:///etc/pki/rpm-gpg/RPM-GPG-KEY-fedora-28-x86_64" "file:///etc/pki/rpm-gpg/RPM-GPG-KEY-fedora-28-x86_64"
], ],
"name": "fedora", "id": "fedora",
"name": "Fedora $releasever - $basearch",
"proxy": "http://proxy.brianlane.com:8123", "proxy": "http://proxy.brianlane.com:8123",
"system": True "system": True
"type": "yum-metalink", "type": "yum-metalink",
"url": "https://mirrors.fedoraproject.org/metalink?repo=fedora-28&arch=x86_64" "url": "https://mirrors.fedoraproject.org/metalink?repo=fedora-28&arch=x86_64"
} }
If the ``id`` field is included it is used for the repo id, otherwise ``name`` is used.
v0 of the API only used ``name``, v1 added the distinction between ``id`` and ``name``.
""" """
if "id" in source:
# This is an API v1 source definition
repo = dnf.repo.Repo(source["id"], dnf_conf)
if "name" in source:
repo.name = source["name"]
else:
repo = dnf.repo.Repo(source["name"], dnf_conf) repo = dnf.repo.Repo(source["name"], dnf_conf)
# This will allow errors to be raised so we can catch them # This will allow errors to be raised so we can catch them
# without this they are logged, but the repo is silently disabled # without this they are logged, but the repo is silently disabled
@ -551,11 +572,13 @@ def get_repo_sources(source_glob):
sources.extend(get_source_ids(f)) sources.extend(get_source_ids(f))
return sources return sources
def delete_repo_source(source_glob, source_name): def delete_repo_source(source_glob, source_id):
"""Delete a source from a repo file """Delete a source from a repo file
:param source_glob: A glob of the repo sources to search :param source_glob: A glob of the repo sources to search
:type source_glob: str :type source_glob: str
:param source_id: The repo id to delete
:type source_id: str
:returns: None :returns: None
:raises: ProjectsError if there was a problem :raises: ProjectsError if there was a problem
@ -563,16 +586,16 @@ def delete_repo_source(source_glob, source_name):
If it is the last one in the file, delete the file. If it is the last one in the file, delete the file.
WARNING: This will delete ANY source, the caller needs to ensure that a system WARNING: This will delete ANY source, the caller needs to ensure that a system
source_name isn't passed to it. source_id isn't passed to it.
""" """
found = False found = False
for f in glob(source_glob): for f in glob(source_glob):
try: try:
cfg = ConfigParser() cfg = ConfigParser()
cfg.read(f) cfg.read(f)
if source_name in cfg.sections(): if source_id in cfg.sections():
found = True found = True
cfg.remove_section(source_name) cfg.remove_section(source_id)
# If there are other sections, rewrite the file without the deleted one # If there are other sections, rewrite the file without the deleted one
if len(cfg.sections()) > 0: if len(cfg.sections()) > 0:
with open(f, "w") as cfg_file: with open(f, "w") as cfg_file:
@ -581,6 +604,6 @@ def delete_repo_source(source_glob, source_name):
# No sections left, just delete the file # No sections left, just delete the file
os.unlink(f) os.unlink(f)
except Exception as e: except Exception as e:
raise ProjectsError("Problem deleting repo source %s: %s" % (source_name, str(e))) raise ProjectsError("Problem deleting repo source %s: %s" % (source_id, str(e)))
if not found: if not found:
raise ProjectsError("source %s not found" % source_name) raise ProjectsError("source %s not found" % source_id)

View File

@ -74,7 +74,7 @@ def api_status():
""" """
return jsonify(backend="lorax-composer", return jsonify(backend="lorax-composer",
build=vernum, build=vernum,
api="0", api="1",
db_version="0", db_version="0",
schema_version="0", schema_version="0",
db_supported=True, db_supported=True,
@ -88,4 +88,7 @@ def bad_request(error):
server.register_blueprint(v0_api, url_prefix="/api/v0/") server.register_blueprint(v0_api, url_prefix="/api/v0/")
# Register the v1 API on /api/v1/ # Register the v1 API on /api/v1/
# Use v0 routes by default
server.register_blueprint(v0_api, url_prefix="/api/v1/",
skip_rules=["/projects/source/info/<source_names>", "/projects/source/new"])
server.register_blueprint(v1_api, url_prefix="/api/v1/") server.register_blueprint(v1_api, url_prefix="/api/v1/")

View File

@ -1144,7 +1144,7 @@ def v0_projects_source_info(source_names):
if not repo: if not repo:
errors.append({"id": UNKNOWN_SOURCE, "msg": "%s is not a valid source" % source}) errors.append({"id": UNKNOWN_SOURCE, "msg": "%s is not a valid source" % source})
continue continue
sources[repo.id] = repo_to_source(repo, repo.id in system_sources) sources[repo.id] = repo_to_source(repo, repo.id in system_sources, api=0)
if out_fmt == "toml" and not errors: if out_fmt == "toml" and not errors:
# With TOML output we just want to dump the raw sources, skipping the errors # With TOML output we just want to dump the raw sources, skipping the errors

View File

@ -17,7 +17,197 @@
""" Setup v1 of the API server """ Setup v1 of the API server
""" """
import logging
log = logging.getLogger("lorax-composer")
import os
from flask import jsonify, request
from flask import current_app as api
from pylorax.api.checkparams import checkparams
from pylorax.api.errors import INVALID_CHARS, PROJECTS_ERROR, SYSTEM_SOURCE, UNKNOWN_SOURCE
from pylorax.api.flask_blueprint import BlueprintSkip from pylorax.api.flask_blueprint import BlueprintSkip
from pylorax.api.projects import delete_repo_source, dnf_repo_to_file_repo, get_repo_sources, repo_to_source
from pylorax.api.projects import source_to_repo
from pylorax.api.projects import ProjectsError
from pylorax.api.regexes import VALID_API_STRING
import pylorax.api.toml as toml
from pylorax.sysutils import joinpaths
# Create the v1 routes Blueprint with skip_routes support # Create the v1 routes Blueprint with skip_routes support
v1_api = BlueprintSkip("v1_routes", __name__) v1_api = BlueprintSkip("v1_routes", __name__)
@v1_api.route("/projects/source/info", defaults={'source_ids': ""})
@v1_api.route("/projects/source/info/<source_ids>")
@checkparams([("source_ids", "", "no source names given")])
def v1_projects_source_info(source_ids):
"""Return detailed info about the list of sources
**/api/v1/projects/source/info/<source-ids>**
Return information about the comma-separated list of source ids. Or all of the
sources if '*' is passed. Note that general globbing is not supported, only '*'.
Immutable system sources will have the "system" field set to true. User added sources
will have it set to false. System sources cannot be changed or deleted.
Example::
{
"errors": [],
"sources": {
"fedora": {
"check_gpg": true,
"check_ssl": true,
"gpgkey_urls": [
"file:///etc/pki/rpm-gpg/RPM-GPG-KEY-fedora-28-x86_64"
],
"id": "fedora",
"name": "Fedora $releasever - $basearch",
"proxy": "http://proxy.brianlane.com:8123",
"system": true,
"type": "yum-metalink",
"url": "https://mirrors.fedoraproject.org/metalink?repo=fedora-28&arch=x86_64"
}
}
}
In v0 the ``name`` field was used for the id (a short name for the repo). In v1 ``name`` changed
to ``id`` and ``name`` is now used for the longer descriptive name of the repository.
"""
if VALID_API_STRING.match(source_ids) is None:
return jsonify(status=False, errors=[{"id": INVALID_CHARS, "msg": "Invalid characters in API path"}]), 400
out_fmt = request.args.get("format", "json")
if VALID_API_STRING.match(out_fmt) is None:
return jsonify(status=False, errors=[{"id": INVALID_CHARS, "msg": "Invalid characters in format argument"}]), 400
# Return info on all of the sources
if source_ids == "*":
with api.config["DNFLOCK"].lock:
source_ids = ",".join(r.id for r in api.config["DNFLOCK"].dbo.repos.iter_enabled())
sources = {}
errors = []
system_sources = get_repo_sources("/etc/yum.repos.d/*.repo")
for source in source_ids.split(","):
with api.config["DNFLOCK"].lock:
repo = api.config["DNFLOCK"].dbo.repos.get(source, None)
if not repo:
errors.append({"id": UNKNOWN_SOURCE, "msg": "%s is not a valid source" % source})
continue
sources[repo.id] = repo_to_source(repo, repo.id in system_sources, api=1)
if out_fmt == "toml" and not errors:
# With TOML output we just want to dump the raw sources, skipping the errors
return toml.dumps(sources)
elif out_fmt == "toml" and errors:
# TOML requested, but there was an error
return jsonify(status=False, errors=errors), 400
else:
return jsonify(sources=sources, errors=errors)
@v1_api.route("/projects/source/new", methods=["POST"])
def v1_projects_source_new():
"""Add a new package source. Or change an existing one
**POST /api/v0/projects/source/new**
Add (or change) a source for use when depsolving blueprints and composing images.
The ``proxy`` and ``gpgkey_urls`` entries are optional. All of the others are required. The supported
types for the urls are:
* ``yum-baseurl`` is a URL to a yum repository.
* ``yum-mirrorlist`` is a URL for a mirrorlist.
* ``yum-metalink`` is a URL for a metalink.
If ``check_ssl`` is true the https certificates must be valid. If they are self-signed you can either set
this to false, or add your Certificate Authority to the host system.
If ``check_gpg`` is true the GPG key must either be installed on the host system, or ``gpgkey_urls``
should point to it.
You can edit an existing source (other than system sources), by doing a POST
of the new version of the source. It will overwrite the previous one.
Example::
{
"id": "custom-source-1",
"name": "Custom Package Source #1",
"url": "https://url/path/to/repository/",
"type": "yum-baseurl",
"check_ssl": true,
"check_gpg": true,
"gpgkey_urls": [
"https://url/path/to/gpg-key"
]
}
In v0 the ``name`` field was used for the id (a short name for the repo). In v1 ``name`` changed
to ``id`` and ``name`` is now used for the longer descriptive name of the repository.
"""
if request.headers['Content-Type'] == "text/x-toml":
source = toml.loads(request.data)
else:
source = request.get_json(cache=False)
# XXX TODO
# Check for id in source, return error if not
# Add test for that
if "id" not in source:
return jsonify(status=False, errors=[{"id": UNKNOWN_SOURCE, "msg": "'id' field is missing from API v1 request."}]), 400
system_sources = get_repo_sources("/etc/yum.repos.d/*.repo")
if source["id"] in system_sources:
return jsonify(status=False, errors=[{"id": SYSTEM_SOURCE, "msg": "%s is a system source, it cannot be changed." % source["id"]}]), 400
try:
# Remove it from the RepoDict (NOTE that this isn't explicitly supported by the DNF API)
with api.config["DNFLOCK"].lock:
dbo = api.config["DNFLOCK"].dbo
# If this repo already exists, delete it and replace it with the new one
repos = list(r.id for r in dbo.repos.iter_enabled())
if source["id"] in repos:
del dbo.repos[source["id"]]
repo = source_to_repo(source, dbo.conf)
dbo.repos.add(repo)
log.info("Updating repository metadata after adding %s", source["id"])
dbo.fill_sack(load_system_repo=False)
dbo.read_comps()
# Write the new repo to disk, replacing any existing ones
repo_dir = api.config["COMPOSER_CFG"].get("composer", "repo_dir")
# Remove any previous sources with this id, ignore it if it isn't found
try:
delete_repo_source(joinpaths(repo_dir, "*.repo"), source["id"])
except ProjectsError:
pass
# Make sure the source id can't contain a path traversal by taking the basename
source_path = joinpaths(repo_dir, os.path.basename("%s.repo" % source["id"]))
with open(source_path, "w") as f:
f.write(dnf_repo_to_file_repo(repo))
except Exception as e:
log.error("(v0_projects_source_add) adding %s failed: %s", source["id"], str(e))
# Cleanup the mess, if loading it failed we don't want to leave it in memory
repos = list(r.id for r in dbo.repos.iter_enabled())
if source["id"] in repos:
with api.config["DNFLOCK"].lock:
dbo = api.config["DNFLOCK"].dbo
del dbo.repos[source["id"]]
log.info("Updating repository metadata after adding %s failed", source["id"])
dbo.fill_sack(load_system_repo=False)
dbo.read_comps()
return jsonify(status=False, errors=[{"id": PROJECTS_ERROR, "msg": str(e)}]), 400
return jsonify(status=True)

View File

@ -0,0 +1 @@
{"id": "new-repo-1-v1", "name": "API v1 json new repo", "url": "file:///tmp/lorax-empty-repo/", "type": "yum-baseurl", "check_ssl": true, "check_gpg": true, "gpgkey_urls": ["file:///etc/pki/rpm-gpg/RPM-GPG-KEY-fedora-$releasever-$basearch"]}

View File

@ -0,0 +1,7 @@
id = "new-repo-2-v1"
name = "API v1 toml new repo"
url = "file:///tmp/lorax-empty-repo/"
type = "yum-baseurl"
check_ssl = true
check_gpg = true
gpgkey_urls = ["file:///etc/pki/rpm-gpg/RPM-GPG-KEY-fedora-$releasever-$basearch"]

View File

@ -222,7 +222,7 @@ class ConfigureTest(unittest.TestCase):
config = configure(conf_file=self.conf_file + '.non-existing') config = configure(conf_file=self.conf_file + '.non-existing')
self.assertEqual(config.get('composer', 'cache_dir'), '/var/tmp/composer/cache') self.assertEqual(config.get('composer', 'cache_dir'), '/var/tmp/composer/cache')
def fakerepo_baseurl(): def fakerepo_baseurl_v0():
return { return {
"check_gpg": True, "check_gpg": True,
"check_ssl": True, "check_ssl": True,
@ -232,7 +232,13 @@ def fakerepo_baseurl():
"url": "https://fake-repo.base.url" "url": "https://fake-repo.base.url"
} }
def fakesystem_repo(): def fakerepo_baseurl_v1():
d = fakerepo_baseurl_v0()
d["id"] = "fake-repo-baseurl"
d["name"] = "A fake repo with a baseurl"
return d
def fakesystem_repo_v0():
return { return {
"check_gpg": True, "check_gpg": True,
"check_ssl": True, "check_ssl": True,
@ -242,7 +248,13 @@ def fakesystem_repo():
"url": "https://fake-repo.base.url" "url": "https://fake-repo.base.url"
} }
def fakerepo_metalink(): def fakesystem_repo_v1():
d = fakesystem_repo_v0()
d["id"] = "fake-repo-baseurl"
d["name"] = "A fake repo with a baseurl"
return d
def fakerepo_metalink_v0():
return { return {
"check_gpg": True, "check_gpg": True,
"check_ssl": True, "check_ssl": True,
@ -252,7 +264,13 @@ def fakerepo_metalink():
"url": "https://fake-repo.metalink" "url": "https://fake-repo.metalink"
} }
def fakerepo_mirrorlist(): def fakerepo_metalink_v1():
d = fakerepo_metalink_v0()
d["id"] = "fake-repo-metalink"
d["name"] = "A fake repo with a metalink"
return d
def fakerepo_mirrorlist_v0():
return { return {
"check_gpg": True, "check_gpg": True,
"check_ssl": True, "check_ssl": True,
@ -262,7 +280,13 @@ def fakerepo_mirrorlist():
"url": "https://fake-repo.mirrorlist" "url": "https://fake-repo.mirrorlist"
} }
def fakerepo_proxy(): def fakerepo_mirrorlist_v1():
d = fakerepo_mirrorlist_v0()
d["id"] = "fake-repo-mirrorlist"
d["name"] = "A fake repo with a mirrorlist"
return d
def fakerepo_proxy_v0():
return { return {
"check_gpg": True, "check_gpg": True,
"check_ssl": True, "check_ssl": True,
@ -273,7 +297,13 @@ def fakerepo_proxy():
"url": "https://fake-repo.base.url" "url": "https://fake-repo.base.url"
} }
def fakerepo_gpgkey(): def fakerepo_proxy_v1():
d = fakerepo_proxy_v0()
d["id"] = "fake-repo-proxy"
d["name"] = "A fake repo with a proxy"
return d
def fakerepo_gpgkey_v0():
return { return {
"check_gpg": True, "check_gpg": True,
"check_ssl": True, "check_ssl": True,
@ -286,7 +316,13 @@ def fakerepo_gpgkey():
"url": "https://fake-repo.base.url" "url": "https://fake-repo.base.url"
} }
def singlerepo(): def fakerepo_gpgkey_v1():
d = fakerepo_gpgkey_v0()
d["id"] = "fake-repo-gpgkey"
d["name"] = "A fake repo with a gpgkey"
return d
def singlerepo_v0():
return { return {
"check_gpg": True, "check_gpg": True,
"check_ssl": True, "check_ssl": True,
@ -299,6 +335,12 @@ def singlerepo():
"url": "file:///tmp/lorax-empty-repo/" "url": "file:///tmp/lorax-empty-repo/"
} }
def singlerepo_v1():
d = singlerepo_v0()
d["id"] = "single-repo"
d["name"] = "One repo in the file"
return d
class SourceTest(unittest.TestCase): class SourceTest(unittest.TestCase):
@classmethod @classmethod
def setUpClass(self): def setUpClass(self):
@ -320,28 +362,52 @@ class SourceTest(unittest.TestCase):
return open(joinpaths(self.tmp_dir, repo_file), "r").read() return open(joinpaths(self.tmp_dir, repo_file), "r").read()
def test_repo_to_source_baseurl(self): def test_repo_to_source_baseurl(self):
"""Test a repo with a baseurl""" """Test a repo with a baseurl API v0"""
self.assertEqual(repo_to_source(self.dbo.repos.get("fake-repo-baseurl"), False), fakerepo_baseurl()) self.assertEqual(repo_to_source(self.dbo.repos.get("fake-repo-baseurl"), False, 0), fakerepo_baseurl_v0())
def test_repo_to_source_baseurl_v1(self):
"""Test a repo with a baseurl API v1"""
self.assertEqual(repo_to_source(self.dbo.repos.get("fake-repo-baseurl"), False, 1), fakerepo_baseurl_v1())
def test_system_repo(self): def test_system_repo(self):
"""Test a system repo with a baseurl""" """Test a system repo with a baseurl API v0"""
self.assertEqual(repo_to_source(self.dbo.repos.get("fake-repo-baseurl"), True), fakesystem_repo()) self.assertEqual(repo_to_source(self.dbo.repos.get("fake-repo-baseurl"), True, 0), fakesystem_repo_v0())
def test_system_repo_v1(self):
"""Test a system repo with a baseurl API v1"""
self.assertEqual(repo_to_source(self.dbo.repos.get("fake-repo-baseurl"), True, 1), fakesystem_repo_v1())
def test_repo_to_source_metalink(self): def test_repo_to_source_metalink(self):
"""Test a repo with a metalink""" """Test a repo with a metalink API v0"""
self.assertEqual(repo_to_source(self.dbo.repos.get("fake-repo-metalink"), False), fakerepo_metalink()) self.assertEqual(repo_to_source(self.dbo.repos.get("fake-repo-metalink"), False, 0), fakerepo_metalink_v0())
def test_repo_to_source_metalink_v1(self):
"""Test a repo with a metalink API v1"""
self.assertEqual(repo_to_source(self.dbo.repos.get("fake-repo-metalink"), False, 1), fakerepo_metalink_v1())
def test_repo_to_source_mirrorlist(self): def test_repo_to_source_mirrorlist(self):
"""Test a repo with a mirrorlist""" """Test a repo with a mirrorlist API v0"""
self.assertEqual(repo_to_source(self.dbo.repos.get("fake-repo-mirrorlist"), False), fakerepo_mirrorlist()) self.assertEqual(repo_to_source(self.dbo.repos.get("fake-repo-mirrorlist"), False, 0), fakerepo_mirrorlist_v0())
def test_repo_to_source_mirrorlist_v1(self):
"""Test a repo with a mirrorlist API v1"""
self.assertEqual(repo_to_source(self.dbo.repos.get("fake-repo-mirrorlist"), False, 1), fakerepo_mirrorlist_v1())
def test_repo_to_source_proxy(self): def test_repo_to_source_proxy(self):
"""Test a repo with a proxy""" """Test a repo with a proxy API v0"""
self.assertEqual(repo_to_source(self.dbo.repos.get("fake-repo-proxy"), False), fakerepo_proxy()) self.assertEqual(repo_to_source(self.dbo.repos.get("fake-repo-proxy"), False, 0), fakerepo_proxy_v0())
def test_repo_to_source_proxy_v1(self):
"""Test a repo with a proxy API v1"""
self.assertEqual(repo_to_source(self.dbo.repos.get("fake-repo-proxy"), False, 1), fakerepo_proxy_v1())
def test_repo_to_source_gpgkey(self): def test_repo_to_source_gpgkey(self):
"""Test a repo with a GPG key""" """Test a repo with a GPG key API v0"""
self.assertEqual(repo_to_source(self.dbo.repos.get("fake-repo-gpgkey"), False), fakerepo_gpgkey()) self.assertEqual(repo_to_source(self.dbo.repos.get("fake-repo-gpgkey"), False, 0), fakerepo_gpgkey_v0())
def test_repo_to_source_gpgkey_v1(self):
"""Test a repo with a GPG key API v1"""
self.assertEqual(repo_to_source(self.dbo.repos.get("fake-repo-gpgkey"), False, 1), fakerepo_gpgkey_v1())
def test_get_repo_sources(self): def test_get_repo_sources(self):
"""Test getting a list of sources from a repo directory""" """Test getting a list of sources from a repo directory"""
@ -373,29 +439,54 @@ class SourceTest(unittest.TestCase):
self.assertTrue("other-repo" in sources) self.assertTrue("other-repo" in sources)
def test_source_to_repo_baseurl(self): def test_source_to_repo_baseurl(self):
"""Test creating a dnf.Repo with a baseurl""" """Test creating a dnf.Repo with a baseurl API v0"""
repo = source_to_repo(fakerepo_baseurl(), self.dbo.conf) repo = source_to_repo(fakerepo_baseurl_v0(), self.dbo.conf)
self.assertEqual(repo.baseurl[0], fakerepo_baseurl()["url"]) self.assertEqual(repo.baseurl[0], fakerepo_baseurl_v0()["url"])
def test_source_to_repo_baseurl_v1(self):
"""Test creating a dnf.Repo with a baseurl API v1"""
repo = source_to_repo(fakerepo_baseurl_v1(), self.dbo.conf)
self.assertEqual(repo.baseurl[0], fakerepo_baseurl_v1()["url"])
def test_source_to_repo_metalink(self): def test_source_to_repo_metalink(self):
"""Test creating a dnf.Repo with a metalink""" """Test creating a dnf.Repo with a metalink API v0"""
repo = source_to_repo(fakerepo_metalink(), self.dbo.conf) repo = source_to_repo(fakerepo_metalink_v0(), self.dbo.conf)
self.assertEqual(repo.metalink, fakerepo_metalink()["url"]) self.assertEqual(repo.metalink, fakerepo_metalink_v0()["url"])
def test_source_to_repo_metalink_v1(self):
"""Test creating a dnf.Repo with a metalink API v1"""
repo = source_to_repo(fakerepo_metalink_v1(), self.dbo.conf)
self.assertEqual(repo.metalink, fakerepo_metalink_v1()["url"])
def test_source_to_repo_mirrorlist(self): def test_source_to_repo_mirrorlist(self):
"""Test creating a dnf.Repo with a mirrorlist API v0"""
repo = source_to_repo(fakerepo_mirrorlist_v0(), self.dbo.conf)
self.assertEqual(repo.mirrorlist, fakerepo_mirrorlist_v0()["url"])
def test_source_to_repo_mirrorlist_v1(self):
"""Test creating a dnf.Repo with a mirrorlist""" """Test creating a dnf.Repo with a mirrorlist"""
repo = source_to_repo(fakerepo_mirrorlist(), self.dbo.conf) repo = source_to_repo(fakerepo_mirrorlist_v1(), self.dbo.conf)
self.assertEqual(repo.mirrorlist, fakerepo_mirrorlist()["url"]) self.assertEqual(repo.mirrorlist, fakerepo_mirrorlist_v1()["url"])
def test_source_to_repo_proxy(self): def test_source_to_repo_proxy(self):
"""Test creating a dnf.Repo with a proxy""" """Test creating a dnf.Repo with a proxy API v0"""
repo = source_to_repo(fakerepo_proxy(), self.dbo.conf) repo = source_to_repo(fakerepo_proxy_v0(), self.dbo.conf)
self.assertEqual(repo.proxy, fakerepo_proxy()["proxy"]) self.assertEqual(repo.proxy, fakerepo_proxy_v0()["proxy"])
def test_source_to_repo_proxy_v1(self):
"""Test creating a dnf.Repo with a proxy API v1"""
repo = source_to_repo(fakerepo_proxy_v1(), self.dbo.conf)
self.assertEqual(repo.proxy, fakerepo_proxy_v1()["proxy"])
def test_source_to_repo_gpgkey(self): def test_source_to_repo_gpgkey(self):
"""Test creating a dnf.Repo with a proxy""" """Test creating a dnf.Repo with a proxy API v0"""
repo = source_to_repo(fakerepo_gpgkey(), self.dbo.conf) repo = source_to_repo(fakerepo_gpgkey_v0(), self.dbo.conf)
self.assertEqual(repo.gpgkey[0], fakerepo_gpgkey()["gpgkey_urls"][0]) self.assertEqual(repo.gpgkey[0], fakerepo_gpgkey_v0()["gpgkey_urls"][0])
def test_source_to_repo_gpgkey_v1(self):
"""Test creating a dnf.Repo with a proxy API v1"""
repo = source_to_repo(fakerepo_gpgkey_v1(), self.dbo.conf)
self.assertEqual(repo.gpgkey[0], fakerepo_gpgkey_v1()["gpgkey_urls"][0])
def test_drtfr_baseurl(self): def test_drtfr_baseurl(self):
"""Test creating a dnf .repo file from a baseurl Repo object""" """Test creating a dnf .repo file from a baseurl Repo object"""
@ -423,5 +514,9 @@ class SourceTest(unittest.TestCase):
self._read("gpgkey-test.repo")) self._read("gpgkey-test.repo"))
def test_repo_to_source_json(self): def test_repo_to_source_json(self):
"""Test serializing repo_to_source results""" """Test serializing repo_to_source results API v0"""
self.assertEqual(repo_to_source(self.dbo.repos.get("single-repo"), False), singlerepo()) self.assertEqual(repo_to_source(self.dbo.repos.get("single-repo"), False, 0), singlerepo_v0())
def test_repo_to_source_json_v1(self):
"""Test serializing repo_to_source results API v1"""
self.assertEqual(repo_to_source(self.dbo.repos.get("single-repo"), False, 1), singlerepo_v1())

View File

@ -653,12 +653,27 @@ class ServerTestCase(unittest.TestCase):
def test_projects_source_00_info(self): def test_projects_source_00_info(self):
"""Test /api/v0/projects/source/info""" """Test /api/v0/projects/source/info"""
resp = self.server.get("/api/v0/projects/source/info/single-repo") resp = self.server.get("/api/v0/projects/source/info/lorax-3")
data = json.loads(resp.data) data = json.loads(resp.data)
self.assertNotEqual(data, None) self.assertNotEqual(data, None)
print(data["sources"]) print(data["sources"])
sources = data["sources"] sources = data["sources"]
self.assertTrue("single-repo" in sources) self.assertTrue("lorax-3" in sources)
self.assertTrue("id" not in sources["lorax-3"])
self.assertTrue("name" in sources["lorax-3"])
self.assertEqual(sources["lorax-3"]["name"], "lorax-3")
def test_projects_source_01_info(self):
"""Test /api/v1/projects/source/info"""
resp = self.server.get("/api/v1/projects/source/info/lorax-3")
data = json.loads(resp.data)
self.assertNotEqual(data, None)
sources = data["sources"]
self.assertTrue("lorax-3" in sources)
self.assertTrue("id" in sources["lorax-3"])
self.assertEqual(sources["lorax-3"]["id"], "lorax-3")
self.assertTrue("name" in sources["lorax-3"])
self.assertEqual(sources["lorax-3"]["name"], "Lorax test repo 3")
def test_projects_source_00_new_json(self): def test_projects_source_00_new_json(self):
"""Test /api/v0/projects/source/new with a new json source""" """Test /api/v0/projects/source/new with a new json source"""
@ -677,6 +692,38 @@ class ServerTestCase(unittest.TestCase):
sources = data["sources"] sources = data["sources"]
self.assertTrue("new-repo-1" in sources) self.assertTrue("new-repo-1" in sources)
def test_projects_source_01_new_json(self):
"""Test /api/v1/projects/source/new with a new json source"""
json_source = open("./tests/pylorax/source/test-repo-v1.json").read()
self.assertTrue(len(json_source) > 0)
resp = self.server.post("/api/v1/projects/source/new",
data=json_source,
content_type="application/json")
data = json.loads(resp.data)
self.assertEqual(data, {"status":True})
# Was it added, and was is it correct?
resp = self.server.get("/api/v1/projects/source/info/new-repo-1-v1")
data = json.loads(resp.data)
self.assertNotEqual(data, None)
sources = data["sources"]
self.assertTrue("new-repo-1-v1" in sources)
self.assertTrue("id" in sources["new-repo-1-v1"])
self.assertEqual(sources["new-repo-1-v1"]["id"], "new-repo-1-v1")
self.assertTrue("name" in sources["new-repo-1-v1"])
self.assertEqual(sources["new-repo-1-v1"]["name"], "API v1 json new repo")
def test_projects_source_02_new_json(self):
"""Test /api/v1/projects/source/new with a new json source missing id field"""
json_source = open("./tests/pylorax/source/test-repo.json").read()
self.assertTrue(len(json_source) > 0)
resp = self.server.post("/api/v1/projects/source/new",
data=json_source,
content_type="application/json")
self.assertEqual(resp.status_code, 400)
data = json.loads(resp.data)
self.assertEqual(data["status"], False)
def test_projects_source_00_new_toml(self): def test_projects_source_00_new_toml(self):
"""Test /api/v0/projects/source/new with a new toml source""" """Test /api/v0/projects/source/new with a new toml source"""
toml_source = open("./tests/pylorax/source/test-repo.toml").read() toml_source = open("./tests/pylorax/source/test-repo.toml").read()
@ -694,6 +741,38 @@ class ServerTestCase(unittest.TestCase):
sources = data["sources"] sources = data["sources"]
self.assertTrue("new-repo-2" in sources) self.assertTrue("new-repo-2" in sources)
def test_projects_source_01_new_toml(self):
"""Test /api/v1/projects/source/new with a new toml source"""
toml_source = open("./tests/pylorax/source/test-repo-v1.toml").read()
self.assertTrue(len(toml_source) > 0)
resp = self.server.post("/api/v1/projects/source/new",
data=toml_source,
content_type="text/x-toml")
data = json.loads(resp.data)
self.assertEqual(data, {"status":True})
# Was it added, and was is it correct?
resp = self.server.get("/api/v1/projects/source/info/new-repo-2-v1")
data = json.loads(resp.data)
self.assertNotEqual(data, None)
sources = data["sources"]
self.assertTrue("new-repo-2-v1" in sources)
self.assertTrue("id" in sources["new-repo-2-v1"])
self.assertEqual(sources["new-repo-2-v1"]["id"], "new-repo-2-v1")
self.assertTrue("name" in sources["new-repo-2-v1"])
self.assertEqual(sources["new-repo-2-v1"]["name"], "API v1 toml new repo")
def test_projects_source_02_new_toml(self):
"""Test /api/v1/projects/source/new with a new toml source w/o id field"""
toml_source = open("./tests/pylorax/source/test-repo.toml").read()
self.assertTrue(len(toml_source) > 0)
resp = self.server.post("/api/v1/projects/source/new",
data=toml_source,
content_type="text/x-toml")
self.assertEqual(resp.status_code, 400)
data = json.loads(resp.data)
self.assertEqual(data["status"], False)
def test_projects_source_00_replace(self): def test_projects_source_00_replace(self):
"""Test /api/v0/projects/source/new with a replacement source""" """Test /api/v0/projects/source/new with a replacement source"""
toml_source = open("./tests/pylorax/source/replace-repo.toml").read() toml_source = open("./tests/pylorax/source/replace-repo.toml").read()
@ -1424,10 +1503,15 @@ class ServerTestCase(unittest.TestCase):
self.assertInputError(resp) self.assertInputError(resp)
def test_projects_source_info_input(self): def test_projects_source_info_input(self):
"""Test the projects/source/info input character checking""" """Test the /api/v0/projects/source/info input character checking"""
resp = self.server.get("/api/v0/projects/source/info/" + UTF8_TEST_STRING) resp = self.server.get("/api/v0/projects/source/info/" + UTF8_TEST_STRING)
self.assertInputError(resp) self.assertInputError(resp)
def test_projects_source_info_v1_input(self):
"""Test the /api/v1/projects/source/info input character checking"""
resp = self.server.get("/api/v1/projects/source/info/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_projects_source_delete_input(self): def test_projects_source_delete_input(self):
"""Test the projects/source/delete input character checking""" """Test the projects/source/delete input character checking"""
resp = self.server.delete("/api/v0/projects/source/delete/" + UTF8_TEST_STRING) resp = self.server.delete("/api/v0/projects/source/delete/" + UTF8_TEST_STRING)