# -*- coding: utf-8 -*-

import itertools
import mock
import os
import six

try:
    import unittest2 as unittest
except ImportError:
    import unittest

from pungi.wrappers import iso

# Canned output returned by the mocked ``run`` for the
# ``checkisomd5 --md5sumonly dummy.iso`` command. The tests expect the
# checksum on the first line to be the value that gets extracted.
CORRECT_OUTPUT = """dummy.iso:   31ff3e405e26ad01c63b62f6b11d30f6
Fragment sums: 6eb92e7bda221d7fe5f19b4d21468c9bf261d84c96d145d96c76444b9cbc
Fragment count: 20
Supported ISO: no
"""

# Canned output that contains no checksum line. The tests expect ``None`` to
# be returned for it and at least one call to be made on the logger.
INCORRECT_OUTPUT = """This should never happen: File not found"""

# Reference to the real os.listdir, saved at import time so that fake_listdir
# can still delegate to it while os.listdir itself is patched in a test.
orig_listdir = os.listdir


def fake_listdir(pattern, result=None, exc=None):
    """Build a stand-in for os.listdir that only fakes selected paths.

    For a string path that contains ``pattern`` the replacement raises ``exc``
    when one is given and otherwise returns ``result``. Every other path is
    passed through to the real os.listdir.
    """

    # Falling back to the real implementation is needed because on Python 2
    # isdir() apparently goes through listdir(), so a blanket mock breaks it.
    def patched_listdir(path):
        is_faked = isinstance(path, six.string_types) and pattern in path
        if not is_faked:
            return orig_listdir(path)
        if exc:
            raise exc
        return result

    return patched_listdir


class TestIsoUtils(unittest.TestCase):
    """Tests for the implanted-MD5 and mount helpers in ``pungi.wrappers.iso``.

    ``pungi.wrappers.iso.run`` is mocked in every test and
    ``pungi.util.run_unmount_cmd`` in every mount test, so the tests only
    inspect which commands would have been run.
    """

    def _assert_checkisomd5_called(self, mock_run):
        """Check that the only command run was checkisomd5 on ``dummy.iso``."""
        self.assertEqual(
            mock_run.call_args_list,
            [
                mock.call(
                    ["/usr/bin/checkisomd5", "--md5sumonly", "dummy.iso"],
                    universal_newlines=True,
                )
            ],
        )

    def _check_mount(self, mock_run, mock_unmount, mount_cmd, unmount_cmd):
        """Mount ``dummy`` and verify the mount/unmount command pair.

        ``mock_run`` must already be primed with two results: one for the
        ``which guestmount`` probe and one for the mount command itself.
        ``mount_cmd`` and ``unmount_cmd`` are the expected first elements of
        the command lists passed to ``run`` and ``run_unmount_cmd``.
        """
        with iso.mount("dummy") as temp_dir:
            self.assertTrue(os.path.isdir(temp_dir))
        # Exactly two run() calls: the probe first, then the mount command.
        self.assertEqual(len(mock_run.call_args_list), 2)
        mount_call_str = str(mock_run.call_args_list[1])
        self.assertTrue(mount_call_str.startswith("call(['%s'" % mount_cmd))
        self.assertEqual(len(mock_unmount.call_args_list), 1)
        unmount_call_str = str(mock_unmount.call_args_list[0])
        self.assertTrue(unmount_call_str.startswith("call(['%s'" % unmount_cmd))
        # The mount point must be gone once the context manager exits.
        self.assertFalse(os.path.isdir(temp_dir))

    @mock.patch("pungi.wrappers.iso.run")
    def test_get_implanted_md5_correct(self, mock_run):
        mock_run.return_value = (0, CORRECT_OUTPUT)
        logger = mock.Mock()
        self.assertEqual(
            iso.get_implanted_md5("dummy.iso", logger=logger),
            "31ff3e405e26ad01c63b62f6b11d30f6",
        )
        self._assert_checkisomd5_called(mock_run)
        # Nothing should be logged when the checksum is found.
        self.assertEqual(logger.mock_calls, [])

    @mock.patch("pungi.wrappers.iso.run")
    def test_get_implanted_md5_incorrect(self, mock_run):
        mock_run.return_value = (0, INCORRECT_OUTPUT)
        logger = mock.Mock()
        self.assertIsNone(iso.get_implanted_md5("dummy.iso", logger=logger))
        self._assert_checkisomd5_called(mock_run)
        # The failure to find a checksum has to be reported through the logger.
        self.assertGreater(len(logger.mock_calls), 0)

    @mock.patch("pungi.util.run_unmount_cmd")
    @mock.patch("pungi.wrappers.iso.run")
    def test_mount_iso(self, mock_run, mock_unmount):
        # first tuple is return value for command 'which guestmount'
        # value determines type of the mount/unmount
        # command ('1' - guestmount is not available)
        # for approach as a root, pair commands mount-umount are used
        mock_run.side_effect = [(1, ""), (0, "")]
        self._check_mount(mock_run, mock_unmount, "mount", "umount")

    @mock.patch("pungi.util.rmtree")
    @mock.patch("os.listdir", new=fake_listdir("guestfs", ["root"]))
    @mock.patch("pungi.util.run_unmount_cmd")
    @mock.patch("pungi.wrappers.iso.run")
    def test_guestmount(self, mock_run, mock_unmount, mock_rmtree):
        # first tuple is return value for command 'which guestmount'
        # value determines type of the mount/unmount
        # command ('0' - guestmount is available)
        # for approach as a non-root, pair commands guestmount-fusermount are used
        mock_run.side_effect = [(0, ""), (0, "")]
        self._check_mount(mock_run, mock_unmount, "guestmount", "fusermount")

    @mock.patch("pungi.util.rmtree")
    @mock.patch("os.listdir", new=fake_listdir("guestfs", []))
    @mock.patch("pungi.util.run_unmount_cmd")
    @mock.patch("pungi.wrappers.iso.run")
    def test_guestmount_cleans_up_cache(self, mock_run, mock_unmount, mock_rmtree):
        # Same setup as test_guestmount, but listing any path containing
        # "guestfs" yields an empty directory.
        mock_run.side_effect = [(0, ""), (0, "")]
        self._check_mount(mock_run, mock_unmount, "guestmount", "fusermount")

    @mock.patch("pungi.util.rmtree")
    @mock.patch(
        "os.listdir", new=fake_listdir("guestfs", exc=OSError("No such file"))
    )
    @mock.patch("pungi.util.run_unmount_cmd")
    @mock.patch("pungi.wrappers.iso.run")
    def test_guestmount_handles_missing_cache(
        self, mock_run, mock_unmount, mock_rmtree
    ):
        # Same setup as test_guestmount, but listing any path containing
        # "guestfs" raises OSError. The exception has to be passed as ``exc``:
        # given positionally it would become the *return value* of the fake
        # listdir and the error path would never be exercised.
        mock_run.side_effect = [(0, ""), (0, "")]
        self._check_mount(mock_run, mock_unmount, "guestmount", "fusermount")

    @mock.patch("pungi.util.run_unmount_cmd")
    @mock.patch("pungi.wrappers.iso.run")
    def test_mount_iso_always_unmounts(self, mock_run, mock_unmount):
        # guestmount not available (probe returns 1), mount succeeds.
        mock_run.side_effect = [(1, ""), (0, "")]
        # The error raised inside the block must propagate to the caller...
        with self.assertRaises(RuntimeError):
            with iso.mount("dummy") as temp_dir:
                self.assertTrue(os.path.isdir(temp_dir))
                raise RuntimeError()
        # ...and the image must still be unmounted and the directory removed.
        self.assertEqual(len(mock_run.call_args_list), 2)
        self.assertEqual(len(mock_unmount.call_args_list), 1)
        self.assertFalse(os.path.isdir(temp_dir))

    @mock.patch("pungi.util.run_unmount_cmd")
    @mock.patch("pungi.wrappers.iso.run")
    def test_mount_iso_raises_on_error(self, mock_run, mock_unmount):
        log = mock.Mock()
        # guestmount not available (probe returns 1), then the mount command
        # itself fails with return code 1.
        mock_run.side_effect = [(1, ""), (1, "Boom")]
        with self.assertRaises(RuntimeError):
            with iso.mount("dummy", logger=log) as temp_dir:
                self.assertTrue(os.path.isdir(temp_dir))
        self.assertEqual(len(mock_run.call_args_list), 2)
        # Nothing was mounted, so nothing may be unmounted; the failure is
        # logged exactly once.
        self.assertEqual(len(mock_unmount.call_args_list), 0)
        self.assertEqual(len(log.mock_calls), 1)


class TestCmpGraftPoints(unittest.TestCase):
    """Ordering tests for ``iso.graft_point_sort_key``."""

    def assertSorted(self, *args):
        """Assert that every ordering of ``args`` sorts back into ``args``.

        The arguments are given in the expected final order; each permutation
        of them is sorted with ``iso.graft_point_sort_key`` and compared
        against that order.
        """
        expected = list(args)
        for shuffled in itertools.permutations(args):
            actual = sorted(shuffled, key=iso.graft_point_sort_key)
            self.assertEqual(actual, expected)

    def test_eq(self):
        # Two identical entries: either order is already "sorted".
        same = "pkgs/foo.rpm"
        self.assertSorted(same, same)

    def test_rpms_sorted_alphabetically(self):
        first, second = "pkgs/bar.rpm", "pkgs/foo.rpm"
        self.assertSorted(first, second)

    def test_images_sorted_alphabetically(self):
        expected_order = ("aaa.img", "images/foo", "isolinux/foo")
        self.assertSorted(*expected_order)

    def test_other_files_sorted_alphabetically(self):
        first, second = "bar.txt", "foo.txt"
        self.assertSorted(first, second)

    def test_rpms_after_images(self):
        # "foo.ins" must come first even though "bar.rpm" is alphabetically
        # smaller.
        image, rpm = "foo.ins", "bar.rpm"
        self.assertSorted(image, rpm)

    def test_other_after_images(self):
        image, other = "EFI/anything", "zzz.txt"
        self.assertSorted(image, other)

    def test_rpm_after_other(self):
        # "bbb.txt" must come first even though "aaa.rpm" is alphabetically
        # smaller.
        other, rpm = "bbb.txt", "aaa.rpm"
        self.assertSorted(other, rpm)

    def test_all_kinds(self):
        # Expected overall order; judging by the other tests this is images,
        # then other files, then RPMs, each group alphabetical.
        expected_order = (
            "etc/file",
            "ppc/file",
            "c.txt",
            "d.txt",
            "a.rpm",
            "b.rpm",
        )
        self.assertSorted(*expected_order)