[scm-wrapper] Copy files directly

Instead of spawning `cp x/* y` there is now Python code to the same
thing. This should help with debugging if something fails as the
traceback will be more informative (rather than saying a command
failed). As another benefit the tests get much simpler.

Signed-off-by: Lubomír Sedlář <lsedlar@redhat.com>
This commit is contained in:
Lubomír Sedlář 2016-02-17 16:32:33 +01:00
parent 5d14304dd1
commit 770ca9c202
2 changed files with 36 additions and 44 deletions

View File

@ -74,7 +74,7 @@ class FileWrapper(ScmBase):
raise ValueError("FileWrapper: 'scm_root' should be empty.")
dirs = glob.glob(scm_dir)
for i in dirs:
run("cp -a %s/* %s/" % (pipes.quote(i), pipes.quote(target_dir)))
_copy_all(i, target_dir)
def export_file(self, scm_root, scm_file, target_dir, scm_branch=None, tmp_dir=None, log_file=None):
if scm_root:
@ -93,8 +93,7 @@ class CvsWrapper(ScmBase):
self.log_debug("Exporting directory %s from CVS %s (branch %s)..." % (scm_dir, scm_root, scm_branch))
self.retry_run(["/usr/bin/cvs", "-q", "-d", scm_root, "export", "-r", scm_branch, scm_dir], workdir=tmp_dir, show_cmd=True, logfile=log_file)
# TODO: hidden files
run("cp -a %s/* %s/" % (pipes.quote(os.path.join(tmp_dir, scm_dir)), pipes.quote(target_dir)))
_copy_all(os.path.join(tmp_dir, scm_dir), target_dir)
self._delete_temp_dir(tmp_dir)
def export_file(self, scm_root, scm_file, target_dir, scm_branch=None, tmp_dir=None, log_file=None):
@ -128,7 +127,7 @@ class GitWrapper(ScmBase):
cmd = "/usr/bin/git clone --depth 1 --branch=%s %s %s" % (pipes.quote(scm_branch), pipes.quote(scm_root), pipes.quote(tmp_dir))
self.retry_run(cmd, workdir=tmp_dir, show_cmd=True, logfile=log_file)
run("cp -a %s/* %s/" % (pipes.quote(os.path.join(tmp_dir, scm_dir)), pipes.quote(target_dir)))
_copy_all(os.path.join(tmp_dir, scm_dir), target_dir)
self._delete_temp_dir(tmp_dir)
def export_file(self, scm_root, scm_file, target_dir, scm_branch=None, tmp_dir=None, log_file=None):
@ -177,7 +176,7 @@ class RpmScmWrapper(ScmBase):
makedirs(target_dir)
# "dir" includes the whole directory while "dir/" includes it's content
if scm_dir.endswith("/"):
run("cp -a %s/* %s/" % (pipes.quote(os.path.join(tmp_dir, scm_dir)), pipes.quote(target_dir)))
_copy_all(os.path.join(tmp_dir, scm_dir), target_dir)
else:
run("cp -a %s %s/" % (pipes.quote(os.path.join(tmp_dir, scm_dir)), pipes.quote(target_dir)))
self._delete_temp_dir(tmp_dir)
@ -239,8 +238,7 @@ def get_file_from_scm(scm_dict, target_path, logger=None):
for i in force_list(scm_file):
tmp_dir = tempfile.mkdtemp(prefix="scm_checkout_")
scm.export_file(scm_repo, i, scm_branch=scm_branch, target_dir=tmp_dir)
makedirs(target_path)
run("cp -a %s/* %s/" % (pipes.quote(tmp_dir), pipes.quote(target_path)))
_copy_all(tmp_dir, target_path)
shutil.rmtree(tmp_dir)
@ -260,7 +258,20 @@ def get_dir_from_scm(scm_dict, target_path, logger=None):
tmp_dir = tempfile.mkdtemp(prefix="scm_checkout_")
scm.export_dir(scm_repo, scm_dir, scm_branch=scm_branch, target_dir=tmp_dir)
# TODO: hidden files
makedirs(target_path)
run("cp -a %s/* %s/" % (pipes.quote(tmp_dir), pipes.quote(target_path)))
_copy_all(tmp_dir, target_path)
shutil.rmtree(tmp_dir)
def _copy_all(src, dest):
"""This function is equivalent to running `cp src/* dest`."""
contents = os.listdir(src)
if not contents:
raise RuntimeError('Source directory %s is empty.' % src)
makedirs(dest)
for item in contents:
source = os.path.join(src, item)
destination = os.path.join(dest, item)
if os.path.isdir(source):
shutil.copytree(source, destination)
else:
shutil.copy2(source, destination)

View File

@ -5,7 +5,6 @@ import mock
import unittest
import shutil
import tempfile
from kobo import shortcuts
import os
import sys
@ -83,12 +82,9 @@ class GitSCMTestCase(unittest.TestCase):
commands = []
def process(cmd, workdir=None, **kwargs):
if cmd[:3] == 'cp ':
shortcuts.run(cmd, workdir=workdir, **kwargs)
else:
fname = cmd.split('|')[0].strip().split(' ')[-1]
touch(os.path.join(workdir, fname))
commands.append(cmd)
fname = cmd.split('|')[0].strip().split(' ')[-1]
touch(os.path.join(workdir, fname))
commands.append(cmd)
run.side_effect = process
@ -108,13 +104,10 @@ class GitSCMTestCase(unittest.TestCase):
commands = []
def process(cmd, workdir=None, **kwargs):
if cmd[:3] == 'cp ':
shortcuts.run(cmd, workdir=workdir, **kwargs)
else:
checkout = cmd.split(' ')[-1]
touch(os.path.join(checkout, 'some_file.txt'))
touch(os.path.join(checkout, 'other_file.txt'))
commands.append(cmd)
checkout = cmd.split(' ')[-1]
touch(os.path.join(checkout, 'some_file.txt'))
touch(os.path.join(checkout, 'other_file.txt'))
commands.append(cmd)
run.side_effect = process
@ -135,13 +128,10 @@ class GitSCMTestCase(unittest.TestCase):
commands = []
def process(cmd, workdir=None, **kwargs):
if cmd[:3] == 'cp ':
shortcuts.run(cmd, workdir=workdir, **kwargs)
else:
fname = cmd.split('|')[0].strip().split(' ')[-1]
touch(os.path.join(workdir, fname, 'first'))
touch(os.path.join(workdir, fname, 'second'))
commands.append(cmd)
fname = cmd.split('|')[0].strip().split(' ')[-1]
touch(os.path.join(workdir, fname, 'first'))
touch(os.path.join(workdir, fname, 'second'))
commands.append(cmd)
run.side_effect = process
@ -163,13 +153,10 @@ class GitSCMTestCase(unittest.TestCase):
commands = []
def process(cmd, workdir=None, **kwargs):
if cmd[:3] == 'cp ':
shortcuts.run(cmd, workdir=workdir, **kwargs)
else:
checkout = cmd.split(' ')[-1]
touch(os.path.join(checkout, 'subdir', 'first'))
touch(os.path.join(checkout, 'subdir', 'second'))
commands.append(cmd)
checkout = cmd.split(' ')[-1]
touch(os.path.join(checkout, 'subdir', 'first'))
touch(os.path.join(checkout, 'subdir', 'second'))
commands.append(cmd)
run.side_effect = process
@ -356,9 +343,6 @@ class CvsSCMTestCase(unittest.TestCase):
commands = []
def process(cmd, workdir=None, **kwargs):
if cmd[:3] == 'cp ':
shortcuts.run(cmd, workdir=workdir, **kwargs)
return
fname = cmd[-1]
touch(os.path.join(workdir, fname))
commands.append(' '.join(cmd))
@ -381,9 +365,6 @@ class CvsSCMTestCase(unittest.TestCase):
commands = []
def process(cmd, workdir=None, **kwargs):
if cmd[:3] == 'cp ':
shortcuts.run(cmd, workdir=workdir, **kwargs)
return
fname = cmd[-1]
touch(os.path.join(workdir, fname, 'first'))
touch(os.path.join(workdir, fname, 'second'))