7ec409a236
Signed-off-by: Lubos Kocman <lkocman@redhat.com>
271 lines
11 KiB
Python
271 lines
11 KiB
Python
from __future__ import absolute_import
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; version 2 of the License.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Library General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, write to the Free Software
|
|
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
|
|
|
|
|
|
import os
|
|
import tempfile
|
|
import shutil
|
|
import pipes
|
|
import glob
|
|
import time
|
|
|
|
import kobo.log
|
|
from kobo.shortcuts import run, force_list
|
|
from pungi.util import explode_rpm_package, makedirs
|
|
|
|
class ScmBase(kobo.log.LoggingBase):
|
|
def __init__(self, logger=None):
|
|
kobo.log.LoggingBase.__init__(self, logger=logger)
|
|
|
|
def _create_temp_dir(self, tmp_dir=None):
|
|
if tmp_dir is not None:
|
|
makedirs(tmp_dir)
|
|
return tempfile.mkdtemp(prefix="cvswrapper_", dir=tmp_dir)
|
|
|
|
def _delete_temp_dir(self, tmp_dir):
|
|
self.log_debug("Removing %s" % tmp_dir)
|
|
try:
|
|
shutil.rmtree(tmp_dir)
|
|
except OSError as ex:
|
|
self.log_warning("Error removing %s: %s" % (tmp_dir, ex))
|
|
|
|
def export_file(self, scm_root, scm_file, target_dir, scm_branch=None, tmp_dir=None, log_file=None):
|
|
raise NotImplemented
|
|
|
|
def retry_run(self, cmd, retries=5, timeout=60, **kwargs):
|
|
"""
|
|
@param cmd - cmd passed to kobo.shortcuts.run()
|
|
@param retries=5 - attempt to execute n times
|
|
@param timeout=60 - seconds before next try
|
|
@param **kwargs - args passed to kobo.shortcuts.run()
|
|
"""
|
|
|
|
for n in range(1, retries + 1):
|
|
try:
|
|
self.log_debug("Retrying execution %s/%s of '%s'" % (n, retries, cmd))
|
|
return run(cmd, **kwargs)
|
|
except RuntimeError as ex:
|
|
if n == retries:
|
|
raise ex
|
|
self.log_debug("Waiting %s seconds to retry execution of '%s'" % (timeout, cmd))
|
|
time.sleep(timeout)
|
|
|
|
raise RuntimeError("Something went wrong during execution of '%s'" % cmd)
|
|
|
|
|
|
class FileWrapper(ScmBase):
|
|
def export_dir(self, scm_root, scm_dir, target_dir, scm_branch=None, tmp_dir=None, log_file=None):
|
|
if scm_root:
|
|
raise ValueError("FileWrapper: 'scm_root' should be empty.")
|
|
dirs = glob.glob(scm_dir)
|
|
for i in dirs:
|
|
run("cp -a %s/* %s/" % (pipes.quote(i), pipes.quote(target_dir)))
|
|
|
|
def export_file(self, scm_root, scm_file, target_dir, scm_branch=None, tmp_dir=None, log_file=None):
|
|
if scm_root:
|
|
raise ValueError("FileWrapper: 'scm_root' should be empty.")
|
|
files = glob.glob(scm_file)
|
|
for i in files:
|
|
target_path = os.path.join(target_dir, os.path.basename(i))
|
|
shutil.copy2(i, target_path)
|
|
|
|
|
|
class CvsWrapper(ScmBase):
|
|
def export_dir(self, scm_root, scm_dir, target_dir, scm_branch=None, tmp_dir=None, log_file=None):
|
|
scm_dir = scm_dir.lstrip("/")
|
|
scm_branch = scm_branch or "HEAD"
|
|
tmp_dir = self._create_temp_dir(tmp_dir=tmp_dir)
|
|
|
|
self.log_debug("Exporting directory %s from CVS %s (branch %s)..." % (scm_dir, scm_root, scm_branch))
|
|
self.retry_run(["/usr/bin/cvs", "-q", "-d", scm_root, "export", "-r", scm_branch, scm_dir], workdir=tmp_dir, show_cmd=True, logfile=log_file)
|
|
# TODO: hidden files
|
|
run("cp -a %s/* %s/" % (pipes.quote(os.path.join(tmp_dir, scm_dir)), pipes.quote(target_dir)))
|
|
self._delete_temp_dir(tmp_dir)
|
|
|
|
def export_file(self, scm_root, scm_file, target_dir, scm_branch=None, tmp_dir=None, log_file=None):
|
|
scm_file = scm_file.lstrip("/")
|
|
scm_branch = scm_branch or "HEAD"
|
|
tmp_dir = self._create_temp_dir(tmp_dir=tmp_dir)
|
|
|
|
target_path = os.path.join(target_dir, os.path.basename(scm_file))
|
|
self.log_debug("Exporting file %s from CVS %s (branch %s)..." % (scm_file, scm_root, scm_branch))
|
|
self.retry_run(["/usr/bin/cvs", "-q", "-d", scm_root, "export", "-r", scm_branch, scm_file], workdir=tmp_dir, show_cmd=True, logfile=log_file)
|
|
|
|
makedirs(target_dir)
|
|
shutil.copy2(os.path.join(tmp_dir, scm_file), target_path)
|
|
self._delete_temp_dir(tmp_dir)
|
|
|
|
|
|
class GitWrapper(ScmBase):
|
|
def export_dir(self, scm_root, scm_dir, target_dir, scm_branch=None, tmp_dir=None, log_file=None):
|
|
scm_dir = scm_dir.lstrip("/")
|
|
scm_branch = scm_branch or "master"
|
|
tmp_dir = self._create_temp_dir(tmp_dir=tmp_dir)
|
|
|
|
if "://" not in scm_root:
|
|
scm_root = "file://%s" % scm_root
|
|
|
|
self.log_debug("Exporting directory %s from git %s (branch %s)..." % (scm_dir, scm_root, scm_branch))
|
|
cmd = "/usr/bin/git archive --remote=%s %s %s | tar xf -" % (pipes.quote(scm_root), pipes.quote(scm_branch), pipes.quote(scm_dir))
|
|
# git archive is not supported by http/https
|
|
# or by smart http https://git-scm.com/book/en/v2/Git-on-the-Server-Smart-HTTP
|
|
if scm_root.startswith("http"):
|
|
cmd = "/usr/bin/git clone --depth 1 --branch=%s %s %s" % (pipes.quote(scm_branch), pipes.quote(scm_root), pipes.quote(tmp_dir))
|
|
self.retry_run(cmd, workdir=tmp_dir, show_cmd=True, logfile=log_file)
|
|
|
|
run("cp -a %s/* %s/" % (pipes.quote(os.path.join(tmp_dir, scm_dir)), pipes.quote(target_dir)))
|
|
self._delete_temp_dir(tmp_dir)
|
|
|
|
def export_file(self, scm_root, scm_file, target_dir, scm_branch=None, tmp_dir=None, log_file=None):
|
|
scm_file = scm_file.lstrip("/")
|
|
scm_branch = scm_branch or "master"
|
|
tmp_dir = self._create_temp_dir(tmp_dir=tmp_dir)
|
|
|
|
target_path = os.path.join(target_dir, os.path.basename(scm_file))
|
|
|
|
if "://" not in scm_root:
|
|
scm_root = "file://%s" % scm_root
|
|
|
|
self.log_debug("Exporting file %s from git %s (branch %s)..." % (scm_file, scm_root, scm_branch))
|
|
cmd = "/usr/bin/git archive --remote=%s %s %s | tar xf -" % (pipes.quote(scm_root), pipes.quote(scm_branch), pipes.quote(scm_file))
|
|
# git archive is not supported by http/https
|
|
# or by smart http https://git-scm.com/book/en/v2/Git-on-the-Server-Smart-HTTP
|
|
if scm_root.startswith("http"):
|
|
cmd = "/usr/bin/git clone --depth 1 --branch=%s %s %s" % (pipes.quote(scm_branch), pipes.quote(scm_root), pipes.quote(tmp_dir))
|
|
self.retry_run(cmd, workdir=tmp_dir, show_cmd=True, logfile=log_file)
|
|
|
|
makedirs(target_dir)
|
|
shutil.copy2(os.path.join(tmp_dir, scm_file), target_path)
|
|
self._delete_temp_dir(tmp_dir)
|
|
|
|
|
|
class RpmScmWrapper(ScmBase):
|
|
def export_dir(self, scm_root, scm_dir, target_dir, scm_branch=None, tmp_dir=None, log_file=None):
|
|
# if scm_root is a list, recursively process all RPMs
|
|
if isinstance(scm_root, list):
|
|
for i in scm_root:
|
|
self.export_dir(i, scm_dir, target_dir, scm_branch, tmp_dir, log_file)
|
|
return
|
|
|
|
# if scm_root is a glob, recursively process all RPMs
|
|
rpms = glob.glob(scm_root)
|
|
if len(rpms) > 1 or (rpms and rpms[0] != scm_root):
|
|
for i in rpms:
|
|
self.export_dir(i, scm_dir, target_dir, scm_branch, tmp_dir, log_file)
|
|
return
|
|
|
|
scm_dir = scm_dir.lstrip("/")
|
|
tmp_dir = self._create_temp_dir(tmp_dir=tmp_dir)
|
|
self.log_debug("Extracting directory %s from RPM package %s..." % (scm_dir, scm_root))
|
|
explode_rpm_package(scm_root, tmp_dir)
|
|
|
|
makedirs(target_dir)
|
|
# "dir" includes the whole directory while "dir/" includes it's content
|
|
if scm_dir.endswith("/"):
|
|
run("cp -a %s/* %s/" % (pipes.quote(os.path.join(tmp_dir, scm_dir)), pipes.quote(target_dir)))
|
|
else:
|
|
run("cp -a %s %s/" % (pipes.quote(os.path.join(tmp_dir, scm_dir)), pipes.quote(target_dir)))
|
|
self._delete_temp_dir(tmp_dir)
|
|
|
|
def export_file(self, scm_root, scm_file, target_dir, scm_branch=None, tmp_dir=None, log_file=None):
|
|
# if scm_root is a list, recursively process all RPMs
|
|
if isinstance(scm_root, list):
|
|
for i in scm_root:
|
|
self.export_file(i, scm_file, target_dir, scm_branch, tmp_dir, log_file)
|
|
return
|
|
|
|
# if scm_root is a glob, recursively process all RPMs
|
|
rpms = glob.glob(scm_root)
|
|
if len(rpms) > 1 or (rpms and rpms[0] != scm_root):
|
|
for i in rpms:
|
|
self.export_file(i, scm_file, target_dir, scm_branch, tmp_dir, log_file)
|
|
return
|
|
|
|
scm_file = scm_file.lstrip("/")
|
|
tmp_dir = self._create_temp_dir(tmp_dir=tmp_dir)
|
|
|
|
self.log_debug("Exporting file %s from RPM file %s..." % (scm_file, scm_root))
|
|
explode_rpm_package(scm_root, tmp_dir)
|
|
|
|
makedirs(target_dir)
|
|
for src in glob.glob(os.path.join(tmp_dir, scm_file)):
|
|
dst = os.path.join(target_dir, os.path.basename(src))
|
|
shutil.copy2(src, dst)
|
|
self._delete_temp_dir(tmp_dir)
|
|
|
|
|
|
def get_file_from_scm(scm_dict, target_path, logger=None):
|
|
if isinstance(scm_dict, str):
|
|
scm_type = "file"
|
|
scm_repo = None
|
|
scm_file = os.path.abspath(scm_dict)
|
|
scm_branch = None
|
|
else:
|
|
scm_type = scm_dict["scm"]
|
|
scm_repo = scm_dict["repo"]
|
|
scm_file = scm_dict["file"]
|
|
scm_branch = scm_dict.get("branch", None)
|
|
|
|
if scm_type == "file":
|
|
scm = FileWrapper(logger=logger)
|
|
elif scm_type == "cvs":
|
|
scm = CvsWrapper(logger=logger)
|
|
elif scm_type == "git":
|
|
scm = GitWrapper(logger=logger)
|
|
elif scm_type == "rpm":
|
|
scm = RpmScmWrapper(logger=logger)
|
|
else:
|
|
raise ValueError("Unknown SCM type: %s" % scm_type)
|
|
|
|
for i in force_list(scm_file):
|
|
tmp_dir = tempfile.mkdtemp(prefix="scm_checkout_")
|
|
scm.export_file(scm_repo, i, scm_branch=scm_branch, target_dir=tmp_dir)
|
|
makedirs(target_path)
|
|
run("cp -a %s/* %s/" % (pipes.quote(tmp_dir), pipes.quote(target_path)))
|
|
shutil.rmtree(tmp_dir)
|
|
|
|
|
|
def get_dir_from_scm(scm_dict, target_path, logger=None):
|
|
if isinstance(scm_dict, str):
|
|
scm_type = "file"
|
|
scm_repo = None
|
|
scm_dir = os.path.abspath(scm_dict)
|
|
scm_branch = None
|
|
else:
|
|
scm_type = scm_dict["scm"]
|
|
scm_repo = scm_dict.get("repo", None)
|
|
scm_dir = scm_dict["dir"]
|
|
scm_branch = scm_dict.get("branch", None)
|
|
|
|
if scm_type == "file":
|
|
scm = FileWrapper(logger=logger)
|
|
elif scm_type == "cvs":
|
|
scm = CvsWrapper(logger=logger)
|
|
elif scm_type == "git":
|
|
scm = GitWrapper(logger=logger)
|
|
elif scm_type == "rpm":
|
|
scm = RpmScmWrapper(logger=logger)
|
|
else:
|
|
raise ValueError("Unknown SCM type: %s" % scm_type)
|
|
|
|
tmp_dir = tempfile.mkdtemp(prefix="scm_checkout_")
|
|
scm.export_dir(scm_repo, scm_dir, scm_branch=scm_branch, target_dir=tmp_dir)
|
|
# TODO: hidden files
|
|
makedirs(target_path)
|
|
run("cp -a %s/* %s/" % (pipes.quote(tmp_dir), pipes.quote(target_path)))
|
|
shutil.rmtree(tmp_dir)
|