Add support for git-credential-helper

This patch adds an additional field `options` to scm_dict, which can be
used to provide additional information to the backends.

It implements a single new option for GitWrapper. This option allows
setting a custom git credentials wrapper. This can be useful if Pungi
needs to get files from a git repository that requires authentication.

The helper can be as simple as this (assuming the username is already
provided in the url):

    #!/bin/sh
    echo password=i-am-secret

The helper would need to be referenced by an absolute path from the
pungi configuration, or prefixed with ! to have git interpret it as a
shell script and look it up in PATH.

See https://git-scm.com/docs/gitcredentials for more details.

Signed-off-by: Lubomír Sedlář <lsedlar@redhat.com>
JIRA: RHELCMP-11808
(cherry picked from commit ada8f4e346)
This commit is contained in:
Lubomír Sedlář 2023-06-14 13:08:31 +02:00 committed by Stepan Oksanichenko
parent cccfaea14e
commit f3485410ad
Signed by: soksanichenko
GPG Key ID: AB9983172AB1E45B
8 changed files with 139 additions and 51 deletions

View File

@ -41,6 +41,14 @@ which can contain following keys.
* ``command`` -- defines a shell command to run after Git clone to generate the
needed file (for example to run ``make``). Only supported in Git backend.
* ``options`` -- a dictionary of additional configuration options. These are
specific to different backends.
Currently supported values for Git:
* ``credential_helper`` -- path to a credential helper used to supply
username/password for remotes that require authentication.
Koji examples
-------------

View File

@ -387,6 +387,7 @@ def _extend_with_default_and_alias(validator_class, offline=False):
instance[property]["branch"] = resolver(
instance[property]["repo"],
instance[property].get("branch") or "HEAD",
instance[property].get("options"),
)
for error in _hook_errors(properties, instance, schema):
@ -520,6 +521,13 @@ def make_schema():
"file": {"type": "string"},
"dir": {"type": "string"},
"command": {"type": "string"},
"options": {
"type": "object",
"properties": {
"credential_helper": {"type": "string"},
},
"additionalProperties": False,
},
},
"additionalProperties": False,
},

View File

@ -279,7 +279,7 @@ class GitUrlResolveError(RuntimeError):
pass
def resolve_git_ref(repourl, ref):
def resolve_git_ref(repourl, ref, credential_helper=None):
"""Resolve a reference in a Git repo to a commit.
Raises RuntimeError if there was an error. Most likely cause is failure to
@ -289,7 +289,7 @@ def resolve_git_ref(repourl, ref):
# This looks like a commit ID already.
return ref
try:
_, output = git_ls_remote(repourl, ref)
_, output = git_ls_remote(repourl, ref, credential_helper)
except RuntimeError as e:
raise GitUrlResolveError(
"ref does not exist in remote repo %s with the error %s %s"
@ -316,7 +316,7 @@ def resolve_git_ref(repourl, ref):
return lines[0].split()[0]
def resolve_git_url(url):
def resolve_git_url(url, credential_helper=None):
"""Given a url to a Git repo specifying HEAD or origin/<branch> as a ref,
replace that specifier with actual SHA1 of the commit.
@ -335,7 +335,7 @@ def resolve_git_url(url):
scheme = r.scheme.replace("git+", "")
baseurl = urllib.parse.urlunsplit((scheme, r.netloc, r.path, "", ""))
fragment = resolve_git_ref(baseurl, ref)
fragment = resolve_git_ref(baseurl, ref, credential_helper)
result = urllib.parse.urlunsplit((r.scheme, r.netloc, r.path, r.query, fragment))
if "?#" in url:
@ -354,13 +354,18 @@ class GitUrlResolver(object):
self.offline = offline
self.cache = {}
def __call__(self, url, branch=None):
def __call__(self, url, branch=None, options=None):
credential_helper = options.get("credential_helper") if options else None
if self.offline:
return branch or url
key = (url, branch)
if key not in self.cache:
try:
res = resolve_git_ref(url, branch) if branch else resolve_git_url(url)
res = (
resolve_git_ref(url, branch, credential_helper)
if branch
else resolve_git_url(url, credential_helper)
)
self.cache[key] = res
except GitUrlResolveError as exc:
self.cache[key] = exc
@ -991,8 +996,12 @@ def retry(timeout=120, interval=30, wait_on=Exception):
@retry(wait_on=RuntimeError)
def git_ls_remote(baseurl, ref):
return run(["git", "ls-remote", baseurl, ref], universal_newlines=True)
def git_ls_remote(baseurl, ref, credential_helper=None):
cmd = ["git"]
if credential_helper:
cmd.extend(["-c", "credential.useHttpPath=true"])
cmd.extend(["-c", "credential.helper=%s" % credential_helper])
return run(cmd + ["ls-remote", baseurl, ref], universal_newlines=True)
def get_tz_offset():

View File

@ -31,10 +31,11 @@ from .kojiwrapper import KojiWrapper
class ScmBase(kobo.log.LoggingBase):
def __init__(self, logger=None, command=None, compose=None):
def __init__(self, logger=None, command=None, compose=None, options=None):
kobo.log.LoggingBase.__init__(self, logger=logger)
self.command = command
self.compose = compose
self.options = options or {}
@retry(interval=60, timeout=300, wait_on=RuntimeError)
def retry_run(self, cmd, **kwargs):
@ -156,22 +157,31 @@ class GitWrapper(ScmBase):
if "://" not in repo:
repo = "file://%s" % repo
git_cmd = ["git"]
if "credential_helper" in self.options:
git_cmd.extend(["-c", "credential.useHttpPath=true"])
git_cmd.extend(
["-c", "credential.helper=%s" % self.options["credential_helper"]]
)
run(["git", "init"], workdir=destdir)
try:
run(["git", "fetch", "--depth=1", repo, branch], workdir=destdir)
run(git_cmd + ["fetch", "--depth=1", repo, branch], workdir=destdir)
run(["git", "checkout", "FETCH_HEAD"], workdir=destdir)
except RuntimeError as e:
# Fetch failed, to do a full clone we add a remote to our empty
# repo, get its content and check out the reference we want.
self.log_debug(
"Trying to do a full clone because shallow clone failed: %s %s"
% (e, e.output)
% (e, getattr(e, "output", ""))
)
try:
# Re-run git init in case of previous failure breaking .git dir
run(["git", "init"], workdir=destdir)
run(["git", "remote", "add", "origin", repo], workdir=destdir)
self.retry_run(["git", "remote", "update", "origin"], workdir=destdir)
self.retry_run(
git_cmd + ["remote", "update", "origin"], workdir=destdir
)
run(["git", "checkout", branch], workdir=destdir)
except RuntimeError:
if self.compose:
@ -361,15 +371,19 @@ def get_file_from_scm(scm_dict, target_path, compose=None):
scm_file = os.path.abspath(scm_dict)
scm_branch = None
command = None
options = {}
else:
scm_type = scm_dict["scm"]
scm_repo = scm_dict["repo"]
scm_file = scm_dict["file"]
scm_branch = scm_dict.get("branch", None)
command = scm_dict.get("command")
options = scm_dict.get("options", {})
logger = compose._logger if compose else None
scm = _get_wrapper(scm_type, logger=logger, command=command, compose=compose)
scm = _get_wrapper(
scm_type, logger=logger, command=command, compose=compose, options=options
)
files_copied = []
for i in force_list(scm_file):
@ -450,15 +464,19 @@ def get_dir_from_scm(scm_dict, target_path, compose=None):
scm_dir = os.path.abspath(scm_dict)
scm_branch = None
command = None
options = {}
else:
scm_type = scm_dict["scm"]
scm_repo = scm_dict.get("repo", None)
scm_dir = scm_dict["dir"]
scm_branch = scm_dict.get("branch", None)
command = scm_dict.get("command")
options = scm_dict.get("options", {})
logger = compose._logger if compose else None
scm = _get_wrapper(scm_type, logger=logger, command=command, compose=compose)
scm = _get_wrapper(
scm_type, logger=logger, command=command, compose=compose, options=options
)
with temp_dir(prefix="scm_checkout_") as tmp_dir:
scm.export_dir(scm_repo, scm_dir, scm_branch=scm_branch, target_dir=tmp_dir)

View File

@ -21,6 +21,15 @@ from pungi import paths, checks
from pungi.module_util import Modulemd
GIT_WITH_CREDS = [
"git",
"-c",
"credential.useHttpPath=true",
"-c",
"credential.helper=!ch",
]
class BaseTestCase(unittest.TestCase):
def assertFilesEqual(self, fn1, fn2):
with open(fn1, "rb") as f1:

View File

@ -440,7 +440,7 @@ class LiveMediaConfigTestCase(ConfigTestCase):
live_media_version="Rawhide",
)
resolve_git_url.side_effect = lambda x: x.replace("HEAD", "CAFE")
resolve_git_url.side_effect = lambda x, _helper: x.replace("HEAD", "CAFE")
self.assertValidation(cfg)
self.assertEqual(cfg["live_media_ksurl"], "git://example.com/repo.git#CAFE")

View File

@ -13,8 +13,10 @@ import random
import os
import six
from parameterized import parameterized
from pungi.wrappers import scm
from tests.helpers import touch
from tests.helpers import touch, GIT_WITH_CREDS
from kobo.shortcuts import run
@ -109,37 +111,45 @@ class FileSCMTestCase(SCMBaseTest):
self.assertIn("No directories matched", str(ctx.exception))
CREDENTIALS_CONFIG = {"credential_helper": "!ch"}
class GitSCMTestCase(SCMBaseTest):
def assertCalls(self, mock_run, url, branch, command=None):
def assertCalls(self, mock_run, url, branch, command=None, with_creds=False):
git = GIT_WITH_CREDS if with_creds else ["git"]
command = [command] if command else []
self.assertEqual(
[call[0][0] for call in mock_run.call_args_list],
[
["git", "init"],
["git", "fetch", "--depth=1", url, branch],
git + ["fetch", "--depth=1", url, branch],
["git", "checkout", "FETCH_HEAD"],
]
+ command,
)
@mock.patch("pungi.wrappers.scm.run")
def test_get_file(self, run):
@parameterized.expand([("without_creds", {}), ("with_creds", CREDENTIALS_CONFIG)])
def test_get_file(self, _name, config):
def process(cmd, workdir=None, **kwargs):
touch(os.path.join(workdir, "some_file.txt"))
touch(os.path.join(workdir, "other_file.txt"))
run.side_effect = process
with mock.patch("pungi.wrappers.scm.run") as run:
run.side_effect = process
retval = scm.get_file_from_scm(
{
"scm": "git",
"repo": "git://example.com/git/repo.git",
"file": "some_file.txt",
"options": config,
},
self.destdir,
)
retval = scm.get_file_from_scm(
{
"scm": "git",
"repo": "git://example.com/git/repo.git",
"file": "some_file.txt",
},
self.destdir,
)
self.assertStructure(retval, ["some_file.txt"])
self.assertCalls(run, "git://example.com/git/repo.git", "master")
self.assertCalls(
run, "git://example.com/git/repo.git", "master", with_creds=bool(config)
)
@mock.patch("pungi.wrappers.scm.run")
def test_get_file_function(self, run):
@ -163,9 +173,10 @@ class GitSCMTestCase(SCMBaseTest):
self.assertEqual(retval, destination)
self.assertCalls(run, "git://example.com/git/repo.git", "master")
@mock.patch("pungi.wrappers.scm.run")
def test_get_file_fetch_fails(self, run):
@parameterized.expand([("without_creds", {}), ("with_creds", CREDENTIALS_CONFIG)])
def test_get_file_fetch_fails(self, _name, config):
url = "git://example.com/git/repo.git"
git = GIT_WITH_CREDS if config else ["git"]
def process(cmd, workdir=None, **kwargs):
if "fetch" in cmd:
@ -175,18 +186,20 @@ class GitSCMTestCase(SCMBaseTest):
touch(os.path.join(workdir, "some_file.txt"))
touch(os.path.join(workdir, "other_file.txt"))
run.side_effect = process
with mock.patch("pungi.wrappers.scm.run") as run:
run.side_effect = process
retval = scm.get_file_from_scm(
{"scm": "git", "repo": url, "file": "some_file.txt", "options": config},
self.destdir,
)
retval = scm.get_file_from_scm(
{"scm": "git", "repo": url, "file": "some_file.txt"}, self.destdir
)
self.assertStructure(retval, ["some_file.txt"])
self.assertEqual(
[call[0][0] for call in run.call_args_list],
[
["git", "init"],
[
"git",
git
+ [
"fetch",
"--depth=1",
"git://example.com/git/repo.git",
@ -194,7 +207,7 @@ class GitSCMTestCase(SCMBaseTest):
],
["git", "init"],
["git", "remote", "add", "origin", url],
["git", "remote", "update", "origin"],
git + ["remote", "update", "origin"],
["git", "checkout", "master"],
],
)
@ -243,20 +256,28 @@ class GitSCMTestCase(SCMBaseTest):
self.assertEqual(str(ctx.exception), "'make' failed with exit code 1")
@mock.patch("pungi.wrappers.scm.run")
def test_get_dir(self, run):
@parameterized.expand([("without_creds", {}), ("with_creds", CREDENTIALS_CONFIG)])
def test_get_dir(self, _name, config):
def process(cmd, workdir=None, **kwargs):
touch(os.path.join(workdir, "subdir", "first"))
touch(os.path.join(workdir, "subdir", "second"))
run.side_effect = process
with mock.patch("pungi.wrappers.scm.run") as run:
run.side_effect = process
retval = scm.get_dir_from_scm(
{
"scm": "git",
"repo": "git://example.com/git/repo.git",
"dir": "subdir",
"options": config,
},
self.destdir,
)
retval = scm.get_dir_from_scm(
{"scm": "git", "repo": "git://example.com/git/repo.git", "dir": "subdir"},
self.destdir,
)
self.assertStructure(retval, ["first", "second"])
self.assertCalls(run, "git://example.com/git/repo.git", "master")
self.assertCalls(
run, "git://example.com/git/repo.git", "master", with_creds=bool(config)
)
@mock.patch("pungi.wrappers.scm.run")
def test_get_dir_and_generate(self, run):

View File

@ -16,7 +16,7 @@ import six
from pungi import compose
from pungi import util
from tests.helpers import touch, PungiTestCase, mk_boom
from tests.helpers import touch, PungiTestCase, mk_boom, GIT_WITH_CREDS
class TestGitRefResolver(unittest.TestCase):
@ -32,6 +32,20 @@ class TestGitRefResolver(unittest.TestCase):
universal_newlines=True,
)
@mock.patch("pungi.util.run")
def test_successful_resolve_with_credentials(self, run):
run.return_value = (0, "CAFEBABE\tHEAD\n")
url = util.resolve_git_url(
"https://git.example.com/repo.git?somedir#HEAD", "!ch"
)
self.assertEqual(url, "https://git.example.com/repo.git?somedir#CAFEBABE")
run.assert_called_once_with(
GIT_WITH_CREDS + ["ls-remote", "https://git.example.com/repo.git", "HEAD"],
universal_newlines=True,
)
@mock.patch("pungi.util.run")
def test_successful_resolve_branch(self, run):
run.return_value = (0, "CAFEBABE\trefs/heads/f24\n")
@ -211,11 +225,12 @@ class TestGitRefResolver(unittest.TestCase):
self.assertEqual(resolver(url2), "2")
self.assertEqual(resolver(url3, ref2), "beef")
self.assertEqual(
mock_resolve_url.call_args_list, [mock.call(url1), mock.call(url2)]
mock_resolve_url.call_args_list,
[mock.call(url1, None), mock.call(url2, None)],
)
self.assertEqual(
mock_resolve_ref.call_args_list,
[mock.call(url3, ref1), mock.call(url3, ref2)],
[mock.call(url3, ref1, None), mock.call(url3, ref2, None)],
)
@mock.patch("pungi.util.resolve_git_url")
@ -227,7 +242,7 @@ class TestGitRefResolver(unittest.TestCase):
resolver(url)
with self.assertRaises(util.GitUrlResolveError):
resolver(url)
self.assertEqual(mock_resolve.call_args_list, [mock.call(url)])
self.assertEqual(mock_resolve.call_args_list, [mock.call(url, None)])
class TestGetVariantData(unittest.TestCase):