From 857713c5a9c8e0b62c06dd92e69c09eeb34b2e99 Mon Sep 17 00:00:00 2001 From: Anuja More Date: Mon, 23 May 2022 12:26:34 +0530 Subject: [PATCH] Add end to end integration tests for external IdP Added tests for HBAC and SUDO rule and other test scenarios. Related : https://pagure.io/freeipa/issue/8805 Related: https://pagure.io/freeipa/issue/8803 Related: https://pagure.io/freeipa/issue/8804 Signed-off-by: Anuja More Reviewed-By: Florence Blanc-Renaud Reviewed-By: Florence Blanc-Renaud --- ipatests/test_integration/test_idp.py | 260 ++++++++++++++++++++++---- 1 file changed, 226 insertions(+), 34 deletions(-) diff --git a/ipatests/test_integration/test_idp.py b/ipatests/test_integration/test_idp.py index 8f9e92e6a..2ffe6a208 100644 --- a/ipatests/test_integration/test_idp.py +++ b/ipatests/test_integration/test_idp.py @@ -1,6 +1,8 @@ from __future__ import absolute_import import time +import pytest +import re import textwrap from ipaplatform.paths import paths @@ -22,12 +24,12 @@ driver.get(verification_uri) try: element = WebDriverWait(driver, 90).until( EC.presence_of_element_located((By.ID, "username"))) - driver.find_element_by_id("username").send_keys("testuser1") - driver.find_element_by_id("password").send_keys("{passwd}") - driver.find_element_by_id("kc-login").click() + driver.find_element(By.ID, "username").send_keys("testuser1") + driver.find_element(By.ID, "password").send_keys("{passwd}") + driver.find_element(By.ID, "kc-login").click() element = WebDriverWait(driver, 90).until( EC.presence_of_element_located((By.ID, "kc-login"))) - driver.find_element_by_id("kc-login").click() + driver.find_element(By.ID, "kc-login").click() assert "Device Login Successful" in driver.page_source finally: now = datetime.now().strftime("%M-%S") @@ -39,18 +41,12 @@ finally: def add_user_code(host, verification_uri): contents = user_code_script.format(uri=verification_uri, passwd=host.config.admin_password) - host.put_file_contents("/tmp/add_user_code.py", contents) - tasks.run_repeatedly( - host, ['python3', '/tmp/add_user_code.py']) - - -def get_verification_uri(host, since, keycloak_server_name): - command = textwrap.dedent(""" - journalctl -u ipa-otpd\\* --since="%s" | grep "user_code:" | awk '{ print substr($7,2,9) }'""" % since) # noqa: E501 - user_code = host.run_command(command).stdout_text.rstrip("\r\n") - uri = ("https://{0}:8443/auth/realms/master/device?user_code={1}".format( - keycloak_server_name, user_code)) - return uri + try: + host.put_file_contents("/tmp/add_user_code.py", contents) + tasks.run_repeatedly( + host, ['python3', '/tmp/add_user_code.py']) + finally: + host.run_command(["rm", "-f", "/tmp/add_user_code.py"]) def kinit_idp(host, user, keycloak_server): @@ -58,11 +54,14 @@ def kinit_idp(host, user, keycloak_server): tasks.kdestroy_all(host) # create armor for FAST host.run_command(["kinit", "-n", "-c", ARMOR]) - since = time.strftime('%Y-%m-%d %H:%M:%S') cmd = ["kinit", "-T", ARMOR, user] + with host.spawn_expect(cmd, default_timeout=100) as e: - e.expect('Authenticate at .+: ') - uri = get_verification_uri(host, since, keycloak_server.hostname) + e.expect('Authenticate at (.+) and press ENTER.:') + prompt = e.get_last_output() + uri = re.search(r'Authenticate at (.*?) and press ENTER.:', prompt + ).group(1) + time.sleep(15) if uri: add_user_code(keycloak_server, uri) e.sendline('\n') @@ -74,21 +73,27 @@ def kinit_idp(host, user, keycloak_server): class TestIDPKeycloak(IntegrationTest): - num_replicas = 1 + num_replicas = 2 topology = 'line' @classmethod def install(cls, mh): - tasks.install_master(cls.master, setup_dns=True) - tasks.install_client(cls.master, cls.replicas[0]) - content = cls.master.get_file_contents(paths.IPA_DEFAULT_CONF, - encoding='utf-8') - new_content = content + "\noidc_child_debug_level = 10" - cls.master.put_file_contents(paths.IPA_DEFAULT_CONF, new_content) + cls.client = cls.replicas[0] + cls.replica = cls.replicas[1] + tasks.install_master(cls.master) + tasks.install_client(cls.master, cls.replicas[0], + extra_args=["--mkhomedir"]) + tasks.install_replica(cls.master, cls.replicas[1]) + for host in [cls.master, cls.replicas[0], cls.replicas[1]]: + content = host.get_file_contents(paths.IPA_DEFAULT_CONF, + encoding='utf-8') + new_content = content + "\noidc_child_debug_level = 10" + host.put_file_contents(paths.IPA_DEFAULT_CONF, new_content) with tasks.remote_sssd_config(cls.master) as sssd_config: sssd_config.edit_domain( cls.master.domain, 'krb5_auth_timeout', 1100) tasks.clear_sssd_cache(cls.master) + tasks.clear_sssd_cache(cls.replicas[0]) tasks.kinit_admin(cls.master) cls.master.run_command(["ipa", "config-mod", "--user-auth-type=idp", "--user-auth-type=password"]) @@ -97,20 +102,207 @@ class TestIDPKeycloak(IntegrationTest): cls.replicas[0].run_command(xvfb) def test_auth_keycloak_idp(self): - keycloak_srv = self.replicas[0] - create_quarkus.setup_keycloakserver(keycloak_srv) + """ + Test case to check that OAuth 2.0 Device + Authorization Grant is working as + expected for user configured with external idp. + """ + create_quarkus.setup_keycloakserver(self.client) time.sleep(60) - create_quarkus.setup_keycloak_client(keycloak_srv) + create_quarkus.setup_keycloak_client(self.client) tasks.kinit_admin(self.master) - cmd = ["ipa", "idp-add", "keycloak", "--provider=keycloak", + cmd = ["ipa", "idp-add", "keycloakidp", "--provider=keycloak", "--client-id=ipa_oidc_client", "--org=master", - "--base-url={0}:8443/auth".format(keycloak_srv.hostname)] + "--base-url={0}:8443/auth".format(self.client.hostname)] self.master.run_command(cmd, stdin_text="{0}\n{0}".format( - keycloak_srv.config.admin_password)) + self.client.config.admin_password)) tasks.user_add(self.master, 'keycloakuser', extra_args=["--user-auth-type=idp", "--idp-user-id=testuser1@ipa.test", - "--idp=keycloak"] + "--idp=keycloakidp"] ) + list_user = self.master.run_command( + ["ipa", "user-find", "--idp-user-id=testuser1@ipa.test"] + ) + assert "keycloakuser" in list_user.stdout_text + list_by_idp = self.master.run_command(["ipa", "user-find", + "--idp=keycloakidp"] + ) + assert "keycloakuser" in list_by_idp.stdout_text + list_by_user = self.master.run_command( + ["ipa", "user-find", "--idp-user-id=testuser1@ipa.test", "--all"] + ) + assert "keycloakidp" in list_by_user.stdout_text + tasks.clear_sssd_cache(self.master) + kinit_idp(self.master, 'keycloakuser', keycloak_server=self.client) + + @pytest.fixture + def hbac_setup_teardown(self): + # allow sshd only on given host + tasks.kinit_admin(self.master) + self.master.run_command(["ipa", "hbacrule-disable", "allow_all"]) + self.master.run_command(["ipa", "hbacrule-add", "rule1"]) + self.master.run_command(["ipa", "hbacrule-add-user", "rule1", + "--users=keycloakuser"] + ) + self.master.run_command(["ipa", "hbacrule-add-host", "rule1", + "--hosts", self.replica.hostname]) + self.master.run_command(["ipa", "hbacrule-add-service", "rule1", + "--hbacsvcs=sshd"] + ) + tasks.clear_sssd_cache(self.master) + tasks.clear_sssd_cache(self.replica) + yield + + # cleanup + tasks.kinit_admin(self.master) + self.master.run_command(["ipa", "hbacrule-enable", "allow_all"]) + self.master.run_command(["ipa", "hbacrule-del", "rule1"]) + + def test_auth_hbac(self, hbac_setup_teardown): + """ + Test case to check that hbacrule is working as + expected for user configured with external idp. + """ + kinit_idp(self.master, 'keycloakuser', keycloak_server=self.client) + ssh_cmd = "ssh -q -K -l keycloakuser {0} whoami" + valid_ssh = self.master.run_command( + ssh_cmd.format(self.replica.hostname)) + assert "keycloakuser" in valid_ssh.stdout_text + negative_ssh = self.master.run_command( + ssh_cmd.format(self.master.hostname), raiseonerr=False + ) + assert negative_ssh.returncode == 255 + + def test_auth_sudo_idp(self): + """ + Test case to check that sudorule is working as + expected for user configured with external idp. + """ + tasks.kdestroy_all(self.master) + tasks.kinit_admin(self.master) + # rule: keycloakuser are allowed to execute yum on + # the client machine as root. + cmdlist = [ + ["ipa", "sudocmd-add", "/usr/bin/yum"], + ["ipa", "sudorule-add", "sudorule"], + ['ipa', 'sudorule-add-user', '--users=keycloakuser', + 'sudorule'], + ['ipa', 'sudorule-add-host', '--hosts', + self.client.hostname, 'sudorule'], + ['ipa', 'sudorule-add-runasuser', + '--users=root', 'sudorule'], + ['ipa', 'sudorule-add-allow-command', + '--sudocmds=/usr/bin/yum', 'sudorule'], + ['ipa', 'sudorule-show', 'sudorule', '--all'], + ['ipa', 'sudorule-add-option', + 'sudorule', '--sudooption', "!authenticate"] + ] + for cmd in cmdlist: + self.master.run_command(cmd) + tasks.clear_sssd_cache(self.master) + tasks.clear_sssd_cache(self.client) + try: + cmd = 'sudo -ll -U keycloakuser' + test = self.client.run_command(cmd).stdout_text + assert "User keycloakuser may run the following commands" in test + assert "/usr/bin/yum" in test + kinit_idp(self.client, 'keycloakuser', self.client) + test_sudo = 'su -c "sudo yum list wget" keycloakuser' + self.client.run_command(test_sudo) + list_fail = self.master.run_command(cmd).stdout_text + assert "User keycloakuser is not allowed to run sudo" in list_fail + finally: + tasks.kinit_admin(self.master) + self.master.run_command(['ipa', 'sudorule-del', 'sudorule']) + self.master.run_command(["ipa", "sudocmd-del", "/usr/bin/yum"]) + + def test_auth_replica(self): + """ + Test case to check that OAuth 2.0 Device + Authorization is working as expected on replica. + """ + tasks.clear_sssd_cache(self.master) + tasks.clear_sssd_cache(self.replica) + tasks.kinit_admin(self.replica) + list_user = self.master.run_command( + ["ipa", "user-find", "--idp-user-id=testuser1@ipa.test"] + ) + assert "keycloakuser" in list_user.stdout_text + list_by_idp = self.replica.run_command(["ipa", "user-find", + "--idp=keycloakidp"] + ) + assert "keycloakuser" in list_by_idp.stdout_text + list_by_user = self.replica.run_command( + ["ipa", "user-find", "--idp-user-id=testuser1@ipa.test", "--all"] + ) + assert "keycloakidp" in list_by_user.stdout_text + kinit_idp(self.replica, 'keycloakuser', keycloak_server=self.client) + + def test_idp_with_services(self): + """ + Test case to check that services can be configured + auth indicator as idp. + """ tasks.clear_sssd_cache(self.master) - kinit_idp(self.master, 'keycloakuser', keycloak_srv) + tasks.kinit_admin(self.master) + domain = self.master.domain.name.upper() + services = [ + "DNS/{0}@{1}".format(self.master.hostname, domain), + "HTTP/{0}@{1}".format(self.client.hostname, domain), + "dogtag/{0}@{1}".format(self.master.hostname, domain), + "ipa-dnskeysyncd/{0}@{1}".format(self.master.hostname, domain) + ] + try: + for service in services: + test = self.master.run_command(["ipa", "service-mod", service, + "--auth-ind=idp"] + ) + assert "Authentication Indicators: idp" in test.stdout_text + finally: + for service in services: + self.master.run_command(["ipa", "service-mod", service, + "--auth-ind="]) + + def test_idp_backup_restore(self): + """ + Test case to check that after restore data is retrieved + with related idp configuration. + """ + tasks.kinit_admin(self.master) + user = "backupuser" + cmd = ["ipa", "idp-add", "testidp", "--provider=keycloak", + "--client-id=ipa_oidc_client", "--org=master", + "--base-url={0}:8443/auth".format(self.client.hostname)] + self.master.run_command(cmd, stdin_text="{0}\n{0}".format( + self.client.config.admin_password)) + + tasks.user_add(self.master, user, + extra_args=["--user-auth-type=idp", + "--idp-user-id=testuser1@ipa.test", + "--idp=testidp"] + ) + + backup_path = tasks.get_backup_dir(self.master) + # change data after backup + self.master.run_command(['ipa', 'user-del', user]) + self.master.run_command(['ipa', 'idp-del', 'testidp']) + dirman_password = self.master.config.dirman_password + self.master.run_command(['ipa-restore', backup_path], + stdin_text=dirman_password + '\nyes') + try: + list_user = self.master.run_command( + ['ipa', 'user-show', 'backupuser', '--all'] + ).stdout_text + assert "External IdP configuration: testidp" in list_user + assert "User authentication types: idp" in list_user + assert ("External IdP user identifier: " + "testuser1@ipa.test") in list_user + list_idp = self.master.run_command(['ipa', 'idp-find', 'testidp']) + assert "testidp" in list_idp.stdout_text + kinit_idp(self.master, user, self.client) + finally: + tasks.kdestroy_all(self.master) + tasks.kinit_admin(self.master) + self.master.run_command(["rm", "-rf", backup_path]) + self.master.run_command(["ipa", "idp-del", "testidp"]) -- 2.36.1