pungi/pungi/wrappers/scm.py
Lubomír Sedlář a49704b2b8 scm: Stop decoding output of post-clone command
There is no guarantee that it will print any text. We don't even need
the output, we just print it to error log if there is a problem.

Fixes: https://pagure.io/pungi/issue/847
Signed-off-by: Lubomír Sedlář <lsedlar@redhat.com>
2018-02-20 12:18:48 +01:00

302 lines
12 KiB
Python

from __future__ import absolute_import
# -*- coding: utf-8 -*-
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <https://gnu.org/licenses/>.
import os
import shutil
import glob
import six
from six.moves import shlex_quote
import kobo.log
from kobo.shortcuts import run, force_list
from pungi.util import (explode_rpm_package, makedirs, copy_all, temp_dir,
retry)
class ScmBase(kobo.log.LoggingBase):
def __init__(self, logger=None, command=None):
kobo.log.LoggingBase.__init__(self, logger=logger)
self.command = command
@retry(interval=60, timeout=300, wait_on=RuntimeError)
def retry_run(self, cmd, **kwargs):
"""
@param cmd - cmd passed to kobo.shortcuts.run()
@param **kwargs - args passed to kobo.shortcuts.run()
"""
return run(cmd, **kwargs)
def run_process_command(self, cwd):
if self.command:
self.log_debug('Running "%s"' % self.command)
retcode, output = run(self.command, workdir=cwd, can_fail=True)
if retcode != 0:
self.log_error('Output was: %r' % output)
raise RuntimeError('%r failed with exit code %s'
% (self.command, retcode))
class FileWrapper(ScmBase):
def export_dir(self, scm_root, scm_dir, target_dir, scm_branch=None):
self.log_debug("Exporting directory %s from current working directory..."
% (scm_dir))
if scm_root:
raise ValueError("FileWrapper: 'scm_root' should be empty.")
dirs = glob.glob(scm_dir)
if not dirs:
raise RuntimeError('No directories matched, can not export.')
for i in dirs:
copy_all(i, target_dir)
def export_file(self, scm_root, scm_file, target_dir, scm_branch=None):
if scm_root:
raise ValueError("FileWrapper: 'scm_root' should be empty.")
self.log_debug("Exporting file %s from current working directory..."
% (scm_file))
files = glob.glob(scm_file)
if not files:
raise RuntimeError('No files matched, can not export.')
for i in files:
target_path = os.path.join(target_dir, os.path.basename(i))
shutil.copy2(i, target_path)
class CvsWrapper(ScmBase):
def export_dir(self, scm_root, scm_dir, target_dir, scm_branch=None):
scm_dir = scm_dir.lstrip("/")
scm_branch = scm_branch or "HEAD"
with temp_dir() as tmp_dir:
self.log_debug("Exporting directory %s from CVS %s (branch %s)..."
% (scm_dir, scm_root, scm_branch))
self.retry_run(["/usr/bin/cvs", "-q", "-d", scm_root, "export", "-r", scm_branch, scm_dir],
workdir=tmp_dir, show_cmd=True)
copy_all(os.path.join(tmp_dir, scm_dir), target_dir)
def export_file(self, scm_root, scm_file, target_dir, scm_branch=None):
scm_file = scm_file.lstrip("/")
scm_branch = scm_branch or "HEAD"
with temp_dir() as tmp_dir:
target_path = os.path.join(target_dir, os.path.basename(scm_file))
self.log_debug("Exporting file %s from CVS %s (branch %s)..." % (scm_file, scm_root, scm_branch))
self.retry_run(["/usr/bin/cvs", "-q", "-d", scm_root, "export", "-r", scm_branch, scm_file],
workdir=tmp_dir, show_cmd=True)
makedirs(target_dir)
shutil.copy2(os.path.join(tmp_dir, scm_file), target_path)
class GitWrapper(ScmBase):
def export_dir(self, scm_root, scm_dir, target_dir, scm_branch=None):
scm_dir = scm_dir.lstrip("/")
scm_branch = scm_branch or "master"
with temp_dir() as tmp_dir:
if "://" not in scm_root:
scm_root = "file://%s" % scm_root
self.log_debug("Exporting directory %s from git %s (branch %s)..."
% (scm_dir, scm_root, scm_branch))
cmd = ("/usr/bin/git archive --remote=%s %s %s | tar xf -"
% (shlex_quote(scm_root), shlex_quote(scm_branch), shlex_quote(scm_dir)))
# git archive is not supported by http/https
# or by smart http https://git-scm.com/book/en/v2/Git-on-the-Server-Smart-HTTP
if scm_root.startswith("http") or self.command:
cmd = ("/usr/bin/git clone --depth 1 --branch=%s %s %s"
% (shlex_quote(scm_branch), shlex_quote(scm_root), shlex_quote(tmp_dir)))
self.retry_run(cmd, workdir=tmp_dir, show_cmd=True)
self.run_process_command(tmp_dir)
copy_all(os.path.join(tmp_dir, scm_dir), target_dir)
def export_file(self, scm_root, scm_file, target_dir, scm_branch=None):
scm_file = scm_file.lstrip("/")
scm_branch = scm_branch or "master"
with temp_dir() as tmp_dir:
target_path = os.path.join(target_dir, os.path.basename(scm_file))
if "://" not in scm_root:
scm_root = "file://%s" % scm_root
self.log_debug("Exporting file %s from git %s (branch %s)..."
% (scm_file, scm_root, scm_branch))
cmd = ("/usr/bin/git archive --remote=%s %s %s | tar xf -"
% (shlex_quote(scm_root), shlex_quote(scm_branch), shlex_quote(scm_file)))
# git archive is not supported by http/https
# or by smart http https://git-scm.com/book/en/v2/Git-on-the-Server-Smart-HTTP
if scm_root.startswith("http") or self.command:
cmd = ("/usr/bin/git clone --depth 1 --branch=%s %s %s"
% (shlex_quote(scm_branch), shlex_quote(scm_root), shlex_quote(tmp_dir)))
self.retry_run(cmd, workdir=tmp_dir, show_cmd=True)
self.run_process_command(tmp_dir)
makedirs(target_dir)
shutil.copy2(os.path.join(tmp_dir, scm_file), target_path)
class RpmScmWrapper(ScmBase):
def _list_rpms(self, pats):
for pat in force_list(pats):
for rpm in glob.glob(pat):
yield rpm
def export_dir(self, scm_root, scm_dir, target_dir, scm_branch=None):
for rpm in self._list_rpms(scm_root):
scm_dir = scm_dir.lstrip("/")
with temp_dir() as tmp_dir:
self.log_debug("Extracting directory %s from RPM package %s..." % (scm_dir, rpm))
explode_rpm_package(rpm, tmp_dir)
makedirs(target_dir)
# "dir" includes the whole directory while "dir/" includes it's content
if scm_dir.endswith("/"):
copy_all(os.path.join(tmp_dir, scm_dir), target_dir)
else:
run("cp -a %s %s/" % (shlex_quote(os.path.join(tmp_dir, scm_dir)),
shlex_quote(target_dir)))
def export_file(self, scm_root, scm_file, target_dir, scm_branch=None):
for rpm in self._list_rpms(scm_root):
scm_file = scm_file.lstrip("/")
with temp_dir() as tmp_dir:
self.log_debug("Exporting file %s from RPM file %s..." % (scm_file, rpm))
explode_rpm_package(rpm, tmp_dir)
makedirs(target_dir)
for src in glob.glob(os.path.join(tmp_dir, scm_file)):
dst = os.path.join(target_dir, os.path.basename(src))
shutil.copy2(src, dst)
def _get_wrapper(scm_type, *args, **kwargs):
SCM_WRAPPERS = {
"file": FileWrapper,
"cvs": CvsWrapper,
"git": GitWrapper,
"rpm": RpmScmWrapper,
}
try:
return SCM_WRAPPERS[scm_type](*args, **kwargs)
except KeyError:
raise ValueError("Unknown SCM type: %s" % scm_type)
def get_file_from_scm(scm_dict, target_path, logger=None):
"""
Copy one or more files from source control to a target path. A list of files
created in ``target_path`` is returned.
:param scm_dict:
A dictionary describing the source control repository; this can
optionally be a path to a directory on the local filesystem or reference
an RPM. Supported keys for the dictionary are ``scm``, ``repo``,
``file``, and ``branch``. ``scm`` is the type of version control system
used ('git', 'cvs', 'rpm', etc.), ``repo`` is the URL of the repository
(or, if 'rpm' is the ``scm``, the package name), ``file`` is either a
path or list of paths to copy, and ``branch`` is the branch to check
out, if any.
:param target_path:
The destination path for the files being copied.
:param logger:
The logger to use for any logging performed.
Example:
>>> scm_dict = {
>>> 'scm': 'git',
>>> 'repo': 'https://pagure.io/pungi.git',
>>> 'file': ['share/variants.dtd'],
>>> }
>>> target_path = '/tmp/path/'
>>> get_file_from_scm(scm_dict, target_path)
['/tmp/path/share/variants.dtd']
"""
if isinstance(scm_dict, six.string_types):
scm_type = "file"
scm_repo = None
scm_file = os.path.abspath(scm_dict)
scm_branch = None
command = None
else:
scm_type = scm_dict["scm"]
scm_repo = scm_dict["repo"]
scm_file = scm_dict["file"]
scm_branch = scm_dict.get("branch", None)
command = scm_dict.get('command')
scm = _get_wrapper(scm_type, logger=logger, command=command)
files_copied = []
for i in force_list(scm_file):
with temp_dir(prefix="scm_checkout_") as tmp_dir:
scm.export_file(scm_repo, i, scm_branch=scm_branch, target_dir=tmp_dir)
files_copied += copy_all(tmp_dir, target_path)
return files_copied
def get_dir_from_scm(scm_dict, target_path, logger=None):
"""
Copy a directory from source control to a target path. A list of files
created in ``target_path`` is returned.
:param scm_dict:
A dictionary describing the source control repository; this can
optionally be a path to a directory on the local filesystem or reference
an RPM. Supported keys for the dictionary are ``scm``, ``repo``,
``dir``, and ``branch``. ``scm`` is the type of version control system
used ('git', 'cvs', 'rpm', etc.), ``repo`` is the URL of the repository
(or, if 'rpm' is the ``scm``, the package name), ``dir`` is the
directory to copy, and ``branch`` is the branch to check out, if any.
:param target_path:
The destination path for the directory being copied.
:param logger:
The logger to use for any logging performed.
Example:
>>> scm_dict = {
>>> 'scm': 'git',
>>> 'repo': 'https://pagure.io/pungi.git',
>>> 'dir': 'share,
>>> }
>>> target_path = '/tmp/path/'
>>> get_dir_from_scm(scm_dict, target_path)
['/tmp/path/share/variants.dtd', '/tmp/path/share/rawhide-fedora.ks', ...]
"""
if isinstance(scm_dict, six.string_types):
scm_type = "file"
scm_repo = None
scm_dir = os.path.abspath(scm_dict)
scm_branch = None
command = None
else:
scm_type = scm_dict["scm"]
scm_repo = scm_dict.get("repo", None)
scm_dir = scm_dict["dir"]
scm_branch = scm_dict.get("branch", None)
command = scm_dict.get("command")
scm = _get_wrapper(scm_type, logger=logger, command=command)
with temp_dir(prefix="scm_checkout_") as tmp_dir:
scm.export_dir(scm_repo, scm_dir, scm_branch=scm_branch, target_dir=tmp_dir)
files_copied = copy_all(tmp_dir, target_path)
return files_copied