#!/usr/bin/env python2
# -*- coding: utf-8 -*-

import mock
import os
import sys
try:
    import unittest2 as unittest
except ImportError:
    import unittest
import tempfile
import shutil
import subprocess

sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

from pungi import compose
from pungi import util

from tests.helpers import touch, PungiTestCase


class TestGitRefResolver(unittest.TestCase):

    @mock.patch('pungi.util.run')
    def test_successful_resolve(self, run):
        run.return_value = (0, 'CAFEBABE\tHEAD\n')

        url = util.resolve_git_url('https://git.example.com/repo.git?somedir#HEAD')

        self.assertEqual(url, 'https://git.example.com/repo.git?somedir#CAFEBABE')
        run.assert_called_once_with(['git', 'ls-remote', 'https://git.example.com/repo.git', 'HEAD'])

    @mock.patch('pungi.util.run')
    def test_successful_resolve_branch(self, run):
        run.return_value = (0, 'CAFEBABE\trefs/heads/f24\n')

        url = util.resolve_git_url('https://git.example.com/repo.git?somedir#origin/f24')

        self.assertEqual(url, 'https://git.example.com/repo.git?somedir#CAFEBABE')
        run.assert_called_once_with(['git', 'ls-remote', 'https://git.example.com/repo.git', 'refs/heads/f24'])

    @mock.patch('pungi.util.run')
    def test_resolve_missing_spec(self, run):
        url = util.resolve_git_url('https://git.example.com/repo.git')

        self.assertEqual(url, 'https://git.example.com/repo.git')
        self.assertEqual(run.mock_calls, [])

    @mock.patch('pungi.util.run')
    def test_resolve_non_head_spec(self, run):
        url = util.resolve_git_url('https://git.example.com/repo.git#some-tag')

        self.assertEqual(url, 'https://git.example.com/repo.git#some-tag')
        self.assertEqual(run.mock_calls, [])

    @mock.patch('pungi.util.run')
    def test_resolve_ambiguous(self, run):
        run.return_value = (0, 'CAFEBABE\tF11\nDEADBEEF\tF10\n')

        with self.assertRaises(RuntimeError):
            util.resolve_git_url('https://git.example.com/repo.git?somedir#HEAD')

        run.assert_called_once_with(['git', 'ls-remote', 'https://git.example.com/repo.git', 'HEAD'])

    @mock.patch('pungi.util.run')
    def test_resolve_keep_empty_query_string(self, run):
        run.return_value = (0, 'CAFEBABE\tHEAD\n')

        url = util.resolve_git_url('https://git.example.com/repo.git?#HEAD')

        run.assert_called_once_with(['git', 'ls-remote', 'https://git.example.com/repo.git', 'HEAD'])
        self.assertEqual(url, 'https://git.example.com/repo.git?#CAFEBABE')

    @mock.patch('pungi.util.run')
    def test_resolve_strip_git_plus_prefix(self, run):
        run.return_value = (0, 'CAFEBABE\tHEAD\n')

        url = util.resolve_git_url('git+https://git.example.com/repo.git#HEAD')

        run.assert_called_once_with(['git', 'ls-remote', 'https://git.example.com/repo.git', 'HEAD'])
        self.assertEqual(url, 'git+https://git.example.com/repo.git#CAFEBABE')

    @mock.patch('pungi.util.run')
    def test_resolve_no_branch_in_remote(self, run):
        run.return_value = (0, '')

        with self.assertRaises(RuntimeError) as ctx:
            util.resolve_git_url('https://git.example.com/repo.git?somedir#origin/my-branch')

        run.assert_called_once_with(
            ['git', 'ls-remote', 'https://git.example.com/repo.git', 'refs/heads/my-branch'])
        self.assertIn('ref does not exist in remote repo', str(ctx.exception))


class TestGetVariantData(unittest.TestCase):
    def test_get_simple(self):
        conf = {
            'foo': {
                '^Client$': 1
            }
        }
        result = util.get_variant_data(conf, 'foo', mock.Mock(uid='Client'))
        self.assertEqual(result, [1])

    def test_get_make_list(self):
        conf = {
            'foo': {
                '^Client$': [1, 2],
                '^.*$': 3,
            }
        }
        result = util.get_variant_data(conf, 'foo', mock.Mock(uid='Client'))
        self.assertItemsEqual(result, [1, 2, 3])

    def test_not_matching_arch(self):
        conf = {
            'foo': {
                '^Client$': [1, 2],
            }
        }
        result = util.get_variant_data(conf, 'foo', mock.Mock(uid='Server'))
        self.assertItemsEqual(result, [])

    def test_handle_missing_config(self):
        result = util.get_variant_data({}, 'foo', mock.Mock(uid='Client'))
        self.assertItemsEqual(result, [])


class TestVolumeIdGenerator(unittest.TestCase):
    def setUp(self):
        self.tmp_dir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tmp_dir)

    @mock.patch('pungi.compose.ComposeInfo')
    def test_get_volid(self, ci):
        all_keys = [
            (['arch', 'compose_id', 'date', 'disc_type'], 'x86_64-compose_id-20160107-'),
            (['label', 'label_major_version', 'release_short', 'respin'], 'RC-1.0-1-rel_short2-2'),
            (['type', 'type_suffix', 'variant', 'version'], 'nightly-.n-Server-6.0')
        ]
        for keys, expected in all_keys:
            format = '-'.join(['%(' + k + ')s' for k in keys])
            conf = {
                'release_short': 'rel_short2',
                'release_version': '6.0',
                'release_is_layered': False,
                'image_volid_formats': [format],
                'image_volid_layered_product_formats': [],
                'volume_id_substitutions': {},
            }
            variant = mock.Mock(uid='Server', type='variant')
            ci.return_value.compose.respin = 2
            ci.return_value.compose.id = 'compose_id'
            ci.return_value.compose.date = '20160107'
            ci.return_value.compose.type = 'nightly'
            ci.return_value.compose.type_suffix = '.n'
            ci.return_value.compose.label = 'RC-1.0'
            ci.return_value.compose.label_major_version = '1'

            ci.return_value.release.version = '3.0'
            ci.return_value.release.short = 'rel_short'

            c = compose.Compose(conf, self.tmp_dir)

            volid = util.get_volid(c, 'x86_64', variant, escape_spaces=False, disc_type=False)

            self.assertEqual(volid, expected)

    @mock.patch('pungi.compose.ComposeInfo')
    def test_get_volid_too_long(self, ci):
        conf = {
            'release_short': 'rel_short2',
            'release_version': '6.0',
            'release_is_layered': False,
            'image_volid_formats': [
                'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',   # 34 chars
                'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb',    # 33 chars
            ],
            'image_volid_layered_product_formats': [],
            'volume_id_substitutions': {},
        }
        variant = mock.Mock(uid='Server', type='variant')
        c = compose.Compose(conf, self.tmp_dir)

        with self.assertRaises(ValueError) as ctx:
            util.get_volid(c, 'x86_64', variant, escape_spaces=False, disc_type=False)

        self.assertIn('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb', str(ctx.exception))
        self.assertIn('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa', str(ctx.exception))


class TestFindOldCompose(unittest.TestCase):
    def setUp(self):
        self.tmp_dir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tmp_dir)

    def test_finds_single(self):
        touch(self.tmp_dir + '/Fedora-Rawhide-20160229.0/STATUS', 'FINISHED')
        old = util.find_old_compose(self.tmp_dir, 'Fedora', 'Rawhide')
        self.assertEqual(old, self.tmp_dir + '/Fedora-Rawhide-20160229.0')

    def test_ignores_in_progress(self):
        touch(self.tmp_dir + '/Fedora-Rawhide-20160229.0/STATUS', 'STARTED')
        old = util.find_old_compose(self.tmp_dir, 'Fedora', 'Rawhide')
        self.assertIsNone(old)

    def test_finds_latest(self):
        touch(self.tmp_dir + '/Fedora-Rawhide-20160228.0/STATUS', 'DOOMED')
        touch(self.tmp_dir + '/Fedora-Rawhide-20160229.0/STATUS', 'FINISHED')
        touch(self.tmp_dir + '/Fedora-Rawhide-20160229.1/STATUS', 'FINISHED_INCOMPLETE')
        old = util.find_old_compose(self.tmp_dir, 'Fedora', 'Rawhide')
        self.assertEqual(old, self.tmp_dir + '/Fedora-Rawhide-20160229.1')

    def test_finds_ignores_other_files(self):
        touch(self.tmp_dir + '/Fedora-Rawhide-20160229.0', 'not a compose')
        touch(self.tmp_dir + '/Fedora-Rawhide-20160228.0/STATUS/file', 'also not a compose')
        touch(self.tmp_dir + '/Fedora-24-20160229.0/STATUS', 'FINISHED')
        touch(self.tmp_dir + '/Another-Rawhide-20160229.0/STATUS', 'FINISHED')
        old = util.find_old_compose(self.tmp_dir, 'Fedora', 'Rawhide')
        self.assertIsNone(old)

    def test_search_in_file(self):
        touch(self.tmp_dir + '/file')
        old = util.find_old_compose(self.tmp_dir + '/file', 'Fedora', 'Rawhide')
        self.assertIsNone(old)

    def test_skips_symlink(self):
        os.symlink(self.tmp_dir, self.tmp_dir + '/Fedora-Rawhide-20160229.0')
        old = util.find_old_compose(self.tmp_dir, 'Fedora', 'Rawhide')
        self.assertIsNone(old)

    def test_finds_layered_product(self):
        touch(self.tmp_dir + '/Fedora-Rawhide-Base-1-20160229.0/STATUS', 'FINISHED')
        old = util.find_old_compose(self.tmp_dir, 'Fedora', 'Rawhide',
                                    base_product_short='Base', base_product_version='1')
        self.assertEqual(old, self.tmp_dir + '/Fedora-Rawhide-Base-1-20160229.0')


class TestHelpers(PungiTestCase):
    def test_process_args(self):
        self.assertEqual(util.process_args('--opt=%s', None), [])
        self.assertEqual(util.process_args('--opt=%s', []), [])
        self.assertEqual(util.process_args('--opt=%s', ['foo', 'bar']),
                         ['--opt=foo', '--opt=bar'])
        self.assertEqual(util.process_args('--opt=%s', 'foo'), ['--opt=foo'])

    def test_makedirs(self):
        util.makedirs(self.topdir + '/foo/bar/baz')
        self.assertTrue(os.path.isdir(self.topdir + '/foo/bar/baz'))

    def test_makedirs_on_existing(self):
        os.makedirs(self.topdir + '/foo/bar/baz')
        try:
            util.makedirs(self.topdir + '/foo/bar/baz')
        except OSError:
            self.fail('makedirs raised exception on existing directory')


class TestLevenshtein(unittest.TestCase):
    def test_edit_dist_empty_str(self):
        self.assertEqual(util.levenshtein('', ''), 0)

    def test_edit_dist_same_str(self):
        self.assertEqual(util.levenshtein('aaa', 'aaa'), 0)

    def test_edit_dist_one_change(self):
        self.assertEqual(util.levenshtein('aab', 'aaa'), 1)

    def test_edit_dist_different_words(self):
        self.assertEqual(util.levenshtein('kitten', 'sitting'), 3)


class TestRecursiveFileList(unittest.TestCase):

    def setUp(self):
        self.tmp_dir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tmp_dir)

    def test_flat_file_list(self):
        """Build a directory containing files and assert they are listed."""
        expected_files = sorted(['file1', 'file2', 'file3'])
        for expected_file in [os.path.join(self.tmp_dir, f) for f in expected_files]:
            touch(expected_file)

        actual_files = sorted(util.recursive_file_list(self.tmp_dir))
        self.assertEqual(expected_files, actual_files)

    def test_nested_file_list(self):
        """Build a directory containing files and assert they are listed."""
        expected_files = sorted(['file1', 'subdir/file2', 'sub/subdir/file3'])
        for expected_file in [os.path.join(self.tmp_dir, f) for f in expected_files]:
            touch(expected_file)

        actual_files = sorted(util.recursive_file_list(self.tmp_dir))
        self.assertEqual(expected_files, actual_files)


class TestTempFiles(unittest.TestCase):
    def test_temp_dir_ok(self):
        with util.temp_dir() as tmp:
            self.assertTrue(os.path.isdir(tmp))
        self.assertFalse(os.path.exists(tmp))

    def test_temp_dir_fail(self):
        with self.assertRaises(RuntimeError):
            with util.temp_dir() as tmp:
                self.assertTrue(os.path.isdir(tmp))
                raise RuntimeError('BOOM')
        self.assertFalse(os.path.exists(tmp))

    def test_temp_dir_in_non_existing_dir(self):
        with util.temp_dir() as playground:
            root = os.path.join(playground, 'missing')
            with util.temp_dir(dir=root) as tmp:
                self.assertTrue(os.path.isdir(tmp))
            self.assertTrue(os.path.isdir(root))
            self.assertFalse(os.path.exists(tmp))


class TestUnmountCmd(unittest.TestCase):

    def _fakeProc(self, ret, err='', out=''):
        proc = mock.Mock(returncode=ret)
        proc.communicate.return_value = (out, err)
        return proc

    @mock.patch('subprocess.Popen')
    def test_unmount_cmd_success(self, mockPopen):
        cmd = 'unmount'
        mockPopen.side_effect = [self._fakeProc(0, '')]
        util.run_unmount_cmd(cmd)
        self.assertEqual(mockPopen.call_args_list,
                         [mock.call(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE)])

    @mock.patch('subprocess.Popen')
    def test_unmount_cmd_fail_other_reason(self, mockPopen):
        cmd = 'unmount'
        mockPopen.side_effect = [self._fakeProc(1, 'It is broken')]
        with self.assertRaises(RuntimeError) as ctx:
            util.run_unmount_cmd(cmd)
        self.assertEqual(str(ctx.exception),
                         "Unhandled error when running 'unmount': 'It is broken'")
        self.assertEqual(mockPopen.call_args_list,
                         [mock.call(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE)])

    @mock.patch('time.sleep')
    @mock.patch('subprocess.Popen')
    def test_unmount_cmd_fail_then_retry(self, mockPopen, mock_sleep):
        cmd = 'unmount'
        mockPopen.side_effect = [self._fakeProc(1, 'Device or resource busy'),
                                 self._fakeProc(1, 'Device or resource busy'),
                                 self._fakeProc(0, '')]
        util.run_unmount_cmd(cmd)
        self.assertEqual(mockPopen.call_args_list,
                         [mock.call(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE)] * 3)
        self.assertEqual(mock_sleep.call_args_list,
                         [mock.call(0), mock.call(1)])

    @mock.patch('time.sleep')
    @mock.patch('subprocess.Popen')
    def test_unmount_cmd_fail_then_retry_and_fail(self, mockPopen, mock_sleep):
        cmd = 'unmount'
        mockPopen.side_effect = [self._fakeProc(1, 'Device or resource busy'),
                                 self._fakeProc(1, 'Device or resource busy'),
                                 self._fakeProc(1, 'Device or resource busy')]
        with self.assertRaises(RuntimeError) as ctx:
            util.run_unmount_cmd(cmd, max_retries=3)
        self.assertEqual(mockPopen.call_args_list,
                         [mock.call(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE)] * 3)
        self.assertEqual(mock_sleep.call_args_list,
                         [mock.call(0), mock.call(1), mock.call(2)])
        self.assertEqual(str(ctx.exception), "Failed to run 'unmount': Device or resource busy.")

    @mock.patch('time.sleep')
    @mock.patch('subprocess.Popen')
    def test_fusermount_fail_then_retry_and_fail_with_debug(self, mockPopen, mock_sleep):
        logger = mock.Mock()
        mockPopen.side_effect = [self._fakeProc(1, 'Device or resource busy'),
                                 self._fakeProc(1, 'Device or resource busy'),
                                 self._fakeProc(1, 'Device or resource busy'),
                                 self._fakeProc(0, out='list of files'),
                                 self._fakeProc(0, out='It is very busy'),
                                 self._fakeProc(1, out='lsof output')]
        with self.assertRaises(RuntimeError) as ctx:
            util.fusermount('/path', max_retries=3, logger=logger)
        cmd = ['fusermount', '-u', '/path']
        expected = [mock.call(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE),
                    mock.call(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE),
                    mock.call(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE),
                    mock.call(['ls', '-lA', '/path'],
                              stderr=subprocess.STDOUT, stdout=subprocess.PIPE),
                    mock.call(['fuser', '-vm', '/path'],
                              stderr=subprocess.STDOUT, stdout=subprocess.PIPE),
                    mock.call(['lsof', '+D', '/path'],
                              stderr=subprocess.STDOUT, stdout=subprocess.PIPE)]
        self.assertEqual(mockPopen.call_args_list, expected)
        self.assertEqual(mock_sleep.call_args_list,
                         [mock.call(0), mock.call(1), mock.call(2)])
        self.assertEqual(str(ctx.exception),
                         "Failed to run ['fusermount', '-u', '/path']: Device or resource busy.")
        self.assertEqual(logger.mock_calls,
                         [mock.call.debug('`%s` exited with %s and following output:\n%s',
                                          'ls -lA /path', 0, 'list of files'),
                          mock.call.debug('`%s` exited with %s and following output:\n%s',
                                          'fuser -vm /path', 0, 'It is very busy'),
                          mock.call.debug('`%s` exited with %s and following output:\n%s',
                                          'lsof +D /path', 1, 'lsof output')])


class TranslatePathTestCase(unittest.TestCase):
    def test_does_nothing_without_config(self):
        compose = mock.Mock(conf={'translate_paths': []})
        ret = util.translate_path(compose, '/mnt/koji/compose/rawhide/XYZ')
        self.assertEqual(ret, '/mnt/koji/compose/rawhide/XYZ')

    def test_translates_prefix(self):
        compose = mock.Mock(conf={
            'translate_paths': [('/mnt/koji', 'http://example.com')]
        })
        ret = util.translate_path(compose, '/mnt/koji/compose/rawhide/XYZ')
        self.assertEqual(ret, 'http://example.com/compose/rawhide/XYZ')

    def test_does_not_translate_not_matching(self):
        compose = mock.Mock(conf={
            'translate_paths': [('/mnt/koji', 'http://example.com')]
        })
        ret = util.translate_path(compose, '/mnt/fedora_koji/compose/rawhide/XYZ')
        self.assertEqual(ret, '/mnt/fedora_koji/compose/rawhide/XYZ')


class GetRepoFuncsTestCase(unittest.TestCase):
    @mock.patch('pungi.compose.ComposeInfo')
    def setUp(self, ci):
        self.tmp_dir = tempfile.mkdtemp()
        conf = {
            'translate_paths': [(self.tmp_dir, 'http://example.com')]
        }
        ci.return_value.compose.respin = 0
        ci.return_value.compose.id = 'RHEL-8.0-20180101.n.0'
        ci.return_value.compose.date = '20160101'
        ci.return_value.compose.type = 'nightly'
        ci.return_value.compose.type_suffix = '.n'
        ci.return_value.compose.label = 'RC-1.0'
        ci.return_value.compose.label_major_version = '1'

        compose_dir = os.path.join(self.tmp_dir, ci.return_value.compose.id)
        self.compose = compose.Compose(conf, compose_dir)
        server_variant = mock.Mock(uid='Server', type='variant')
        client_variant = mock.Mock(uid='Client', type='variant')
        self.compose.all_variants = {
            'Server': server_variant,
            'Client': client_variant,
        }

    def tearDown(self):
        shutil.rmtree(self.tmp_dir)

    def test_get_repo_url_from_normal_url(self):
        url = util.get_repo_url(self.compose, 'http://example.com/repo')
        self.assertEqual(url, 'http://example.com/repo')

    def test_get_repo_url_from_variant_uid(self):
        url = util.get_repo_url(self.compose, 'Server')
        self.assertEqual(url, 'http://example.com/RHEL-8.0-20180101.n.0/compose/Server/$basearch/os')

    def test_get_repo_url_from_repo_dict(self):
        repo = {'baseurl': 'http://example.com/repo'}
        url = util.get_repo_url(self.compose, repo)
        self.assertEqual(url, 'http://example.com/repo')

        repo = {'baseurl': 'Server'}
        url = util.get_repo_url(self.compose, repo)
        self.assertEqual(url, 'http://example.com/RHEL-8.0-20180101.n.0/compose/Server/$basearch/os')

    def test_get_repo_urls(self):
        repos = [
            'http://example.com/repo',
            'Server',
            {'baseurl': 'Client'},
            {'baseurl': 'ftp://example.com/linux/repo'},
        ]

        expect = [
            'http://example.com/repo',
            'http://example.com/RHEL-8.0-20180101.n.0/compose/Server/$basearch/os',
            'http://example.com/RHEL-8.0-20180101.n.0/compose/Client/$basearch/os',
            'ftp://example.com/linux/repo',
        ]

        self.assertEqual(util.get_repo_urls(self.compose, repos), expect)

    def test_get_repo_dict_from_normal_url(self):
        repo_dict = util.get_repo_dict(self.compose, 'http://example.com/repo')
        expect = {'name': 'http:__example.com_repo', 'baseurl': 'http://example.com/repo'}
        self.assertEqual(repo_dict, expect)

    def test_get_repo_dict_from_variant_uid(self):
        repo_dict = util.get_repo_dict(self.compose, 'Server')
        expect = {
            'name': "%s-%s" % (self.compose.compose_id, 'Server'),
            'baseurl': 'http://example.com/RHEL-8.0-20180101.n.0/compose/Server/$basearch/os',
        }
        self.assertEqual(repo_dict, expect)

    def test_get_repo_dict_from_repo_dict(self):
        repo = {'baseurl': 'Server'}
        expect = {
            'name': '%s-%s' % (self.compose.compose_id, 'Server'),
            'baseurl': 'http://example.com/RHEL-8.0-20180101.n.0/compose/Server/$basearch/os'
        }
        repo_dict = util.get_repo_dict(self.compose, repo)
        self.assertEqual(repo_dict, expect)

    def test_get_repo_dicts(self):
        repos = [
            'http://example.com/repo',
            'Server',
            {'baseurl': 'Client'},
            {'baseurl': 'ftp://example.com/linux/repo'},
            {'name': 'testrepo', 'baseurl': 'ftp://example.com/linux/repo'},
        ]
        expect = [
            {'name': 'http:__example.com_repo', 'baseurl': 'http://example.com/repo'},
            {'name': '%s-%s' % (self.compose.compose_id, 'Server'), 'baseurl': 'http://example.com/RHEL-8.0-20180101.n.0/compose/Server/$basearch/os'},
            {'name': '%s-%s' % (self.compose.compose_id, 'Client'), 'baseurl': 'http://example.com/RHEL-8.0-20180101.n.0/compose/Client/$basearch/os'},
            {'name': 'ftp:__example.com_linux_repo', 'baseurl': 'ftp://example.com/linux/repo'},
            {'name': 'testrepo', 'baseurl': 'ftp://example.com/linux/repo'},
        ]
        repos = util.get_repo_dicts(self.compose, repos)
        self.assertEqual(repos, expect)


class TestVersionGenerator(unittest.TestCase):
    def test_unknown_generator(self):
        compose = mock.Mock()
        with self.assertRaises(RuntimeError) as ctx:
            util.version_generator(compose, '!GIMME_VERSION')

        self.assertEqual(str(ctx.exception),
                         "Unknown version generator '!GIMME_VERSION'")

    def test_passthrough_value(self):
        compose = mock.Mock()
        self.assertEqual(util.version_generator(compose, '1.2.3'), '1.2.3')

    def test_passthrough_none(self):
        compose = mock.Mock()
        self.assertEqual(util.version_generator(compose, None), None)


if __name__ == "__main__":
    unittest.main()