pungi/pungi/wrappers/scm.py
Jeremy Cline ee1ee0467b extra-files: Write a metadata file enumerating extra files
Introduces a new metadata file to track arbitrary files added during the
extra-files phase. This file is placed in the root of each tree and is
called ``extra_files.json``. It is a JSON file containing a single
object, which contains a "header" key with an object describing the
metadata, and a "data" key, which is an array of objects, where each
object represents a file. Each object contains the "file", "checksums",
and "size" keys. "file" is the relative path from the tree root to the
extra file. "checksums" is an object containing one or more checksums,
where the key is the digest type and the value of that key is the hex
digest. Finally, the size is the size of the file in bytes.

For example:
{
  "header": {"version": "1.0"},
  "data": [
    {
      "file": "GPL",
      "checksums": {
        "sha256": "8177f97513213526df2cf6184d8ff986c675afb514d4e68a404010521b880643"
      },
      "size": 18092
    },
    {
      "file": "release-notes/notes.html",
      "checksums": {
        "sha256": "82b1ba8db522aadf101dca6404235fba179e559b95ea24ff39ee1e5d9a53bdcb"
      },
      "size": 1120
    }
  ]
}

Signed-off-by: Jeremy Cline <jeremy@jcline.org>
Fixes: #295
2016-09-07 13:02:48 +02:00

315 lines
13 KiB
Python

from __future__ import absolute_import
# -*- coding: utf-8 -*-
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
import os
import tempfile
import shutil
import pipes
import glob
import time
import contextlib
import kobo.log
from kobo.shortcuts import run, force_list
from pungi.util import explode_rpm_package, makedirs, copy_all
class ScmBase(kobo.log.LoggingBase):
    """Shared base of the SCM wrappers: logging, scratch directories and command retries."""

    def __init__(self, logger=None):
        kobo.log.LoggingBase.__init__(self, logger=logger)

    @contextlib.contextmanager
    def _temp_dir(self, tmp_dir=None):
        """
        Context manager yielding the path of a freshly created temporary
        directory.

        The directory is removed when the ``with`` block is left, whether it
        finishes normally or raises. A failure to remove it is only logged as
        a warning.

        @param tmp_dir=None - parent directory for the temporary directory;
                              when given, makedirs() is called on it first
        """
        if tmp_dir is not None:
            makedirs(tmp_dir)
        path = tempfile.mkdtemp(prefix="cvswrapper_", dir=tmp_dir)

        try:
            yield path
        finally:
            # Clean up even when the body of the ``with`` block raised,
            # otherwise every failed export leaves a directory behind.
            self.log_debug("Removing %s" % path)
            try:
                shutil.rmtree(path)
            except OSError as ex:
                self.log_warning("Error removing %s: %s" % (path, ex))

    def retry_run(self, cmd, retries=5, timeout=60, **kwargs):
        """
        Run a command, retrying when it fails with RuntimeError.

        @param cmd - cmd passed to kobo.shortcuts.run()
        @param retries=5 - attempt to execute n times
        @param timeout=60 - seconds before next try
        @param **kwargs - args passed to kobo.shortcuts.run()

        Returns whatever run() returns for the first successful attempt.
        Raises the RuntimeError of the last attempt when all attempts fail.
        """
        for n in range(1, retries + 1):
            try:
                self.log_debug("Retrying execution %s/%s of '%s'" % (n, retries, cmd))
                return run(cmd, **kwargs)
            except RuntimeError:
                if n == retries:
                    # Bare raise keeps the traceback of the failed command.
                    raise
                self.log_debug("Waiting %s seconds to retry execution of '%s'" % (timeout, cmd))
                time.sleep(timeout)

        # Only reachable when ``retries`` is lower than 1 and nothing was run.
        raise RuntimeError("Something went wrong during execution of '%s'" % cmd)
class FileWrapper(ScmBase):
    """Wrapper that "exports" from the local filesystem; paths are glob patterns."""

    def export_dir(self, scm_root, scm_dir, target_dir, scm_branch=None, tmp_dir=None, log_file=None):
        """
        Pass every path matching the glob ``scm_dir`` to copy_all() with
        ``target_dir`` as destination.

        ``scm_branch``, ``tmp_dir`` and ``log_file`` are not used by this
        wrapper.

        Raises ValueError when ``scm_root`` is set and RuntimeError when the
        pattern matches nothing.
        """
        self.log_debug("Exporting directory %s from current working directory..."
                       % (scm_dir))
        if scm_root:
            raise ValueError("FileWrapper: 'scm_root' should be empty.")
        dirs = glob.glob(scm_dir)
        if not dirs:
            raise RuntimeError('No directories matched, can not export.')
        for i in dirs:
            copy_all(i, target_dir)

    def export_file(self, scm_root, scm_file, target_dir, scm_branch=None, tmp_dir=None, log_file=None):
        """
        Copy every file matching the glob ``scm_file`` into ``target_dir``,
        keeping only the base name of each match.

        ``scm_branch``, ``tmp_dir`` and ``log_file`` are not used by this
        wrapper.

        Raises ValueError when ``scm_root`` is set and RuntimeError when the
        pattern matches nothing.
        """
        if scm_root:
            raise ValueError("FileWrapper: 'scm_root' should be empty.")
        self.log_debug("Exporting file %s from current working directory..."
                       % (scm_file))
        files = glob.glob(scm_file)
        if not files:
            raise RuntimeError('No files matched, can not export.')
        # Same as the other wrappers: make sure the destination exists before
        # copying, shutil.copy2() does not create missing directories.
        makedirs(target_dir)
        for i in files:
            target_path = os.path.join(target_dir, os.path.basename(i))
            shutil.copy2(i, target_path)
class CvsWrapper(ScmBase):
    """Wrapper exporting content from a CVS repository via ``cvs export``."""

    def export_dir(self, scm_root, scm_dir, target_dir, scm_branch=None, tmp_dir=None, log_file=None):
        """
        Export ``scm_dir`` from the CVS root ``scm_root`` into a scratch
        directory and hand the result to copy_all() with ``target_dir`` as
        destination. The branch defaults to "HEAD".
        """
        rel_dir = scm_dir.lstrip("/")
        branch = scm_branch or "HEAD"

        with self._temp_dir(tmp_dir=tmp_dir) as workdir:
            self.log_debug("Exporting directory %s from CVS %s (branch %s)..."
                           % (rel_dir, scm_root, branch))
            cvs_cmd = ["/usr/bin/cvs", "-q", "-d", scm_root, "export", "-r", branch, rel_dir]
            self.retry_run(cvs_cmd, workdir=workdir, show_cmd=True, logfile=log_file)
            exported = os.path.join(workdir, rel_dir)
            copy_all(exported, target_dir)

    def export_file(self, scm_root, scm_file, target_dir, scm_branch=None, tmp_dir=None, log_file=None):
        """
        Export ``scm_file`` from the CVS root ``scm_root`` into a scratch
        directory and copy it into ``target_dir`` under its base name.
        The branch defaults to "HEAD".
        """
        rel_file = scm_file.lstrip("/")
        branch = scm_branch or "HEAD"

        with self._temp_dir(tmp_dir=tmp_dir) as workdir:
            destination = os.path.join(target_dir, os.path.basename(rel_file))
            self.log_debug("Exporting file %s from CVS %s (branch %s)..." % (rel_file, scm_root, branch))
            cvs_cmd = ["/usr/bin/cvs", "-q", "-d", scm_root, "export", "-r", branch, rel_file]
            self.retry_run(cvs_cmd, workdir=workdir, show_cmd=True, logfile=log_file)
            makedirs(target_dir)
            exported = os.path.join(workdir, rel_file)
            shutil.copy2(exported, destination)
class GitWrapper(ScmBase):
    """Wrapper exporting content from a git repository."""

    def _fetch_cmd(self, root, branch, path, workdir):
        """Build the shell command that brings ``path`` from ``root`` into ``workdir``."""
        # git archive is not supported by http/https
        # or by smart http https://git-scm.com/book/en/v2/Git-on-the-Server-Smart-HTTP
        if root.startswith("http"):
            return ("/usr/bin/git clone --depth 1 --branch=%s %s %s"
                    % (pipes.quote(branch), pipes.quote(root), pipes.quote(workdir)))
        return ("/usr/bin/git archive --remote=%s %s %s | tar xf -"
                % (pipes.quote(root), pipes.quote(branch), pipes.quote(path)))

    def export_dir(self, scm_root, scm_dir, target_dir, scm_branch=None, tmp_dir=None, log_file=None):
        """
        Fetch ``scm_dir`` from the git repository ``scm_root`` into a scratch
        directory and hand the result to copy_all() with ``target_dir`` as
        destination. The branch defaults to "master"; a root without a URL
        scheme is turned into a ``file://`` URL.
        """
        rel_dir = scm_dir.lstrip("/")
        branch = scm_branch or "master"

        with self._temp_dir(tmp_dir=tmp_dir) as workdir:
            root = scm_root if "://" in scm_root else "file://%s" % scm_root
            self.log_debug("Exporting directory %s from git %s (branch %s)..."
                           % (rel_dir, root, branch))
            shell_cmd = self._fetch_cmd(root, branch, rel_dir, workdir)
            self.retry_run(shell_cmd, workdir=workdir, show_cmd=True, logfile=log_file)
            fetched = os.path.join(workdir, rel_dir)
            copy_all(fetched, target_dir)

    def export_file(self, scm_root, scm_file, target_dir, scm_branch=None, tmp_dir=None, log_file=None):
        """
        Fetch ``scm_file`` from the git repository ``scm_root`` into a scratch
        directory and copy it into ``target_dir`` under its base name. The
        branch defaults to "master"; a root without a URL scheme is turned
        into a ``file://`` URL.
        """
        rel_file = scm_file.lstrip("/")
        branch = scm_branch or "master"

        with self._temp_dir(tmp_dir=tmp_dir) as workdir:
            destination = os.path.join(target_dir, os.path.basename(rel_file))
            root = scm_root if "://" in scm_root else "file://%s" % scm_root
            self.log_debug("Exporting file %s from git %s (branch %s)..."
                           % (rel_file, root, branch))
            shell_cmd = self._fetch_cmd(root, branch, rel_file, workdir)
            self.retry_run(shell_cmd, workdir=workdir, show_cmd=True, logfile=log_file)
            makedirs(target_dir)
            fetched = os.path.join(workdir, rel_file)
            shutil.copy2(fetched, destination)
class RpmScmWrapper(ScmBase):
    """Wrapper extracting files and directories from RPM packages found by glob patterns."""

    def _list_rpms(self, pats):
        """Yield every path matching any of the glob patterns in ``pats`` (one pattern or a list)."""
        for pat in force_list(pats):
            for rpm in glob.glob(pat):
                yield rpm

    def export_dir(self, scm_root, scm_dir, target_dir, scm_branch=None, tmp_dir=None, log_file=None):
        """
        Explode every RPM matching ``scm_root`` and copy ``scm_dir`` from it
        into ``target_dir``.

        ``scm_branch`` and ``log_file`` are not used by this wrapper. Nothing
        is copied (and no error is raised) when no RPM matches.
        """
        scm_dir = scm_dir.lstrip("/")
        for rpm in self._list_rpms(scm_root):
            # Bind the scratch directory to its own name. Rebinding ``tmp_dir``
            # here would make the next iteration use the already removed
            # scratch directory as parent, recreating it and leaving it behind.
            with self._temp_dir(tmp_dir=tmp_dir) as rpm_tmp_dir:
                self.log_debug("Extracting directory %s from RPM package %s..." % (scm_dir, rpm))
                explode_rpm_package(rpm, rpm_tmp_dir)

                makedirs(target_dir)
                # "dir" includes the whole directory while "dir/" includes its content
                if scm_dir.endswith("/"):
                    copy_all(os.path.join(rpm_tmp_dir, scm_dir), target_dir)
                else:
                    run("cp -a %s %s/" % (pipes.quote(os.path.join(rpm_tmp_dir, scm_dir)),
                                          pipes.quote(target_dir)))

    def export_file(self, scm_root, scm_file, target_dir, scm_branch=None, tmp_dir=None, log_file=None):
        """
        Explode every RPM matching ``scm_root`` and copy each file matching
        the glob ``scm_file`` inside it into ``target_dir`` under its base
        name.

        ``scm_branch`` and ``log_file`` are not used by this wrapper. Nothing
        is copied (and no error is raised) when no RPM matches.
        """
        scm_file = scm_file.lstrip("/")
        for rpm in self._list_rpms(scm_root):
            # See export_dir: the scratch directory must not shadow ``tmp_dir``.
            with self._temp_dir(tmp_dir=tmp_dir) as rpm_tmp_dir:
                self.log_debug("Exporting file %s from RPM file %s..." % (scm_file, rpm))
                explode_rpm_package(rpm, rpm_tmp_dir)

                makedirs(target_dir)
                for src in glob.glob(os.path.join(rpm_tmp_dir, scm_file)):
                    dst = os.path.join(target_dir, os.path.basename(src))
                    shutil.copy2(src, dst)
def _get_wrapper(scm_type, *args, **kwargs):
    """
    Instantiate the wrapper class registered for ``scm_type``.

    :param scm_type:
        One of ``file``, ``cvs``, ``git`` or ``rpm``.
    :param args:
        Positional arguments passed to the wrapper constructor.
    :param kwargs:
        Keyword arguments passed to the wrapper constructor.
    :raises ValueError:
        If ``scm_type`` is not a known SCM type.
    """
    SCM_WRAPPERS = {
        "file": FileWrapper,
        "cvs": CvsWrapper,
        "git": GitWrapper,
        "rpm": RpmScmWrapper,
    }
    # Only the lookup decides whether the type is known. The constructor is
    # called outside of any KeyError handling so that a KeyError raised while
    # building the wrapper is not misreported as an unknown SCM type.
    wrapper_class = SCM_WRAPPERS.get(scm_type)
    if wrapper_class is None:
        raise ValueError("Unknown SCM type: %s" % scm_type)
    return wrapper_class(*args, **kwargs)
def get_file_from_scm(scm_dict, target_path, logger=None):
    """
    Copy one or more files from source control to a target path. A list of files
    created in ``target_path`` is returned.

    :param scm_dict:
        A dictionary describing the source control repository; this can
        optionally be a path to a directory on the local filesystem or reference
        an RPM. Supported keys for the dictionary are ``scm``, ``repo``,
        ``file``, and ``branch``. ``scm`` is the type of version control system
        used ('git', 'cvs', 'rpm', etc.), ``repo`` is the URL of the repository
        (or, if 'rpm' is the ``scm``, the package name), ``file`` is either a
        path or list of paths to copy, and ``branch`` is the branch to check
        out, if any.

    :param target_path:
        The destination path for the files being copied.

    :param logger:
        The logger to use for any logging performed.

    Example:
        >>> scm_dict = {
        ...     'scm': 'git',
        ...     'repo': 'https://pagure.io/pungi.git',
        ...     'file': ['share/variants.dtd'],
        ... }
        >>> target_path = '/tmp/path/'
        >>> get_file_from_scm(scm_dict, target_path)
        ['/tmp/path/share/variants.dtd']

    NOTE(review): every wrapper exports a file under its base name, so the
    path returned for the example is presumably '/tmp/path/variants.dtd';
    confirm against copy_all().
    """
    if isinstance(scm_dict, str):
        # A plain string is a path (glob) on the local filesystem.
        scm_type = "file"
        scm_repo = None
        scm_file = os.path.abspath(scm_dict)
        scm_branch = None
    else:
        scm_type = scm_dict["scm"]
        scm_repo = scm_dict["repo"]
        scm_file = scm_dict["file"]
        scm_branch = scm_dict.get("branch", None)

    scm = _get_wrapper(scm_type, logger=logger)

    files_copied = []
    for i in force_list(scm_file):
        # Each file is exported into its own scratch directory first.
        tmp_dir = tempfile.mkdtemp(prefix="scm_checkout_")
        try:
            scm.export_file(scm_repo, i, scm_branch=scm_branch, target_dir=tmp_dir)
            files_copied += copy_all(tmp_dir, target_path)
        finally:
            # Do not leave the scratch directory behind when the export or
            # the copy fails.
            shutil.rmtree(tmp_dir)
    return files_copied
def get_dir_from_scm(scm_dict, target_path, logger=None):
    """
    Copy a directory from source control to a target path. A list of files
    created in ``target_path`` is returned.

    :param scm_dict:
        A dictionary describing the source control repository; this can
        optionally be a path to a directory on the local filesystem or reference
        an RPM. Supported keys for the dictionary are ``scm``, ``repo``,
        ``dir``, and ``branch``. ``scm`` is the type of version control system
        used ('git', 'cvs', 'rpm', etc.), ``repo`` is the URL of the repository
        (or, if 'rpm' is the ``scm``, the package name), ``dir`` is the
        directory to copy, and ``branch`` is the branch to check out, if any.

    :param target_path:
        The destination path for the directory being copied.

    :param logger:
        The logger to use for any logging performed.

    Example:
        >>> scm_dict = {
        ...     'scm': 'git',
        ...     'repo': 'https://pagure.io/pungi.git',
        ...     'dir': 'share',
        ... }
        >>> target_path = '/tmp/path/'
        >>> get_dir_from_scm(scm_dict, target_path)
        ['/tmp/path/share/variants.dtd', '/tmp/path/share/rawhide-fedora.ks', ...]
    """
    if isinstance(scm_dict, str):
        # A plain string is a path (glob) on the local filesystem.
        scm_type = "file"
        scm_repo = None
        scm_dir = os.path.abspath(scm_dict)
        scm_branch = None
    else:
        scm_type = scm_dict["scm"]
        scm_repo = scm_dict.get("repo", None)
        scm_dir = scm_dict["dir"]
        scm_branch = scm_dict.get("branch", None)

    scm = _get_wrapper(scm_type, logger=logger)

    # The directory is exported into a scratch directory first.
    tmp_dir = tempfile.mkdtemp(prefix="scm_checkout_")
    try:
        scm.export_dir(scm_repo, scm_dir, scm_branch=scm_branch, target_dir=tmp_dir)
        files_copied = copy_all(tmp_dir, target_path)
    finally:
        # Do not leave the scratch directory behind when the export or the
        # copy fails.
        shutil.rmtree(tmp_dir)
    return files_copied