diff --git a/ci-Stop-copying-ssh-system-keys-and-check-folder-permis.patch b/ci-Stop-copying-ssh-system-keys-and-check-folder-permis.patch new file mode 100644 index 0000000..f29354c --- /dev/null +++ b/ci-Stop-copying-ssh-system-keys-and-check-folder-permis.patch @@ -0,0 +1,1383 @@ +From 56a18921655fa829b7b76d6d515a45dd7c733620 Mon Sep 17 00:00:00 2001 +From: Emanuele Giuseppe Esposito +Date: Tue, 10 Aug 2021 00:05:25 +0200 +Subject: [PATCH 1/2] Stop copying ssh system keys and check folder permissions + (#956) + +RH-Author: Emanuele Giuseppe Esposito +RH-MergeRequest: 7: Stop copying ssh system keys and check folder permissions (#956) +RH-Commit: [1/1] e475216a77e3ae6ba9b699fb3ea50bb9c0a88dae (eesposit/cloud-init-centos-) +RH-Bugzilla: 1979099 +RH-Acked-by: Mohamed Gamal Morsy +RH-Acked-by: Eduardo Otubo + +This is a continuation of previous MR 25 and upstream PR #937. +There were still issues when using non-standard file paths like +/etc/ssh/userkeys/%u or /etc/ssh/authorized_keys, and the choice +of storing the keys of all authorized_keys files into a single +one was not ideal. This fix modifies cloudinit to support +all different cases of authorized_keys file locations, and +picks a user-specific file where to copy the new keys that +complies with ssh permissions. + +commit 00dbaf1e9ab0e59d81662f0f3561897bef499a3f +Author: Emanuele Giuseppe Esposito +Date: Mon Aug 9 16:49:56 2021 +0200 + + Stop copying ssh system keys and check folder permissions (#956) + + In /etc/ssh/sshd_config, it is possible to define a custom + authorized_keys file that will contain the keys allowed to access the + machine via the AuthorizedKeysFile option. Cloudinit is able to add + user-specific keys to the existing ones, but we need to be careful on + which of the authorized_keys files listed to pick. + Chosing a file that is shared by all user will cause security + issues, because the owner of that key can then access also other users. + + We therefore pick an authorized_keys file only if it satisfies the + following conditions: + 1. it is not a "global" file, ie it must be defined in + AuthorizedKeysFile with %u, %h or be in /home/. This avoids + security issues. + 2. it must comply with ssh permission requirements, otherwise the ssh + agent won't use that file. + + If it doesn't meet either of those conditions, write to + ~/.ssh/authorized_keys + + We also need to consider the case when the chosen authorized_keys file + does not exist. In this case, the existing behavior of cloud-init is + to create the new file. We therefore need to be sure that the file + complies with ssh permissions too, by setting: + - the actual file to permission 600, and owned by the user + - the directories in the path that do not exist must be root owned and + with permission 755. + +Signed-off-by: Emanuele Giuseppe Esposito +Signed-off-by: Miroslav Rezanina +--- + cloudinit/ssh_util.py | 133 ++++- + cloudinit/util.py | 51 +- + tests/unittests/test_sshutil.py | 952 +++++++++++++++++++++++++------- + 3 files changed, 920 insertions(+), 216 deletions(-) + +diff --git a/cloudinit/ssh_util.py b/cloudinit/ssh_util.py +index 89057262..b8a3c8f7 100644 +--- a/cloudinit/ssh_util.py ++++ b/cloudinit/ssh_util.py +@@ -249,6 +249,113 @@ def render_authorizedkeysfile_paths(value, homedir, username): + return rendered + + ++# Inspired from safe_path() in openssh source code (misc.c). ++def check_permissions(username, current_path, full_path, is_file, strictmodes): ++ """Check if the file/folder in @current_path has the right permissions. ++ ++ We need to check that: ++ 1. If StrictMode is enabled, the owner is either root or the user ++ 2. the user can access the file/folder, otherwise ssh won't use it ++ 3. If StrictMode is enabled, no write permission is given to group ++ and world users (022) ++ """ ++ ++ # group/world can only execute the folder (access) ++ minimal_permissions = 0o711 ++ if is_file: ++ # group/world can only read the file ++ minimal_permissions = 0o644 ++ ++ # 1. owner must be either root or the user itself ++ owner = util.get_owner(current_path) ++ if strictmodes and owner != username and owner != "root": ++ LOG.debug("Path %s in %s must be own by user %s or" ++ " by root, but instead is own by %s. Ignoring key.", ++ current_path, full_path, username, owner) ++ return False ++ ++ parent_permission = util.get_permissions(current_path) ++ # 2. the user can access the file/folder, otherwise ssh won't use it ++ if owner == username: ++ # need only the owner permissions ++ minimal_permissions &= 0o700 ++ else: ++ group_owner = util.get_group(current_path) ++ user_groups = util.get_user_groups(username) ++ ++ if group_owner in user_groups: ++ # need only the group permissions ++ minimal_permissions &= 0o070 ++ else: ++ # need only the world permissions ++ minimal_permissions &= 0o007 ++ ++ if parent_permission & minimal_permissions == 0: ++ LOG.debug("Path %s in %s must be accessible by user %s," ++ " check its permissions", ++ current_path, full_path, username) ++ return False ++ ++ # 3. no write permission (w) is given to group and world users (022) ++ # Group and world user can still have +rx. ++ if strictmodes and parent_permission & 0o022 != 0: ++ LOG.debug("Path %s in %s must not give write" ++ "permission to group or world users. Ignoring key.", ++ current_path, full_path) ++ return False ++ ++ return True ++ ++ ++def check_create_path(username, filename, strictmodes): ++ user_pwent = users_ssh_info(username)[1] ++ root_pwent = users_ssh_info("root")[1] ++ try: ++ # check the directories first ++ directories = filename.split("/")[1:-1] ++ ++ # scan in order, from root to file name ++ parent_folder = "" ++ # this is to comply also with unit tests, and ++ # strange home directories ++ home_folder = os.path.dirname(user_pwent.pw_dir) ++ for directory in directories: ++ parent_folder += "/" + directory ++ if home_folder.startswith(parent_folder): ++ continue ++ ++ if not os.path.isdir(parent_folder): ++ # directory does not exist, and permission so far are good: ++ # create the directory, and make it accessible by everyone ++ # but owned by root, as it might be used by many users. ++ with util.SeLinuxGuard(parent_folder): ++ os.makedirs(parent_folder, mode=0o755, exist_ok=True) ++ util.chownbyid(parent_folder, root_pwent.pw_uid, ++ root_pwent.pw_gid) ++ ++ permissions = check_permissions(username, parent_folder, ++ filename, False, strictmodes) ++ if not permissions: ++ return False ++ ++ # check the file ++ if not os.path.exists(filename): ++ # if file does not exist: we need to create it, since the ++ # folders at this point exist and have right permissions ++ util.write_file(filename, '', mode=0o600, ensure_dir_exists=True) ++ util.chownbyid(filename, user_pwent.pw_uid, user_pwent.pw_gid) ++ ++ permissions = check_permissions(username, filename, ++ filename, True, strictmodes) ++ if not permissions: ++ return False ++ except (IOError, OSError) as e: ++ util.logexc(LOG, str(e)) ++ return False ++ ++ return True ++ ++ + def extract_authorized_keys(username, sshd_cfg_file=DEF_SSHD_CFG): + (ssh_dir, pw_ent) = users_ssh_info(username) + default_authorizedkeys_file = os.path.join(ssh_dir, 'authorized_keys') +@@ -259,6 +366,7 @@ def extract_authorized_keys(username, sshd_cfg_file=DEF_SSHD_CFG): + ssh_cfg = parse_ssh_config_map(sshd_cfg_file) + key_paths = ssh_cfg.get("authorizedkeysfile", + "%h/.ssh/authorized_keys") ++ strictmodes = ssh_cfg.get("strictmodes", "yes") + auth_key_fns = render_authorizedkeysfile_paths( + key_paths, pw_ent.pw_dir, username) + +@@ -269,31 +377,31 @@ def extract_authorized_keys(username, sshd_cfg_file=DEF_SSHD_CFG): + "config from %r, using 'AuthorizedKeysFile' file " + "%r instead", DEF_SSHD_CFG, auth_key_fns[0]) + +- # check if one of the keys is the user's one ++ # check if one of the keys is the user's one and has the right permissions + for key_path, auth_key_fn in zip(key_paths.split(), auth_key_fns): + if any([ + '%u' in key_path, + '%h' in key_path, + auth_key_fn.startswith('{}/'.format(pw_ent.pw_dir)) + ]): +- user_authorizedkeys_file = auth_key_fn ++ permissions_ok = check_create_path(username, auth_key_fn, ++ strictmodes == "yes") ++ if permissions_ok: ++ user_authorizedkeys_file = auth_key_fn ++ break + + if user_authorizedkeys_file != default_authorizedkeys_file: + LOG.debug( + "AuthorizedKeysFile has an user-specific authorized_keys, " + "using %s", user_authorizedkeys_file) + +- # always store all the keys in the user's private file +- return (user_authorizedkeys_file, parse_authorized_keys(auth_key_fns)) ++ return ( ++ user_authorizedkeys_file, ++ parse_authorized_keys([user_authorizedkeys_file]) ++ ) + + + def setup_user_keys(keys, username, options=None): +- # Make sure the users .ssh dir is setup accordingly +- (ssh_dir, pwent) = users_ssh_info(username) +- if not os.path.isdir(ssh_dir): +- util.ensure_dir(ssh_dir, mode=0o700) +- util.chownbyid(ssh_dir, pwent.pw_uid, pwent.pw_gid) +- + # Turn the 'update' keys given into actual entries + parser = AuthKeyLineParser() + key_entries = [] +@@ -302,11 +410,10 @@ def setup_user_keys(keys, username, options=None): + + # Extract the old and make the new + (auth_key_fn, auth_key_entries) = extract_authorized_keys(username) ++ ssh_dir = os.path.dirname(auth_key_fn) + with util.SeLinuxGuard(ssh_dir, recursive=True): + content = update_authorized_keys(auth_key_entries, key_entries) +- util.ensure_dir(os.path.dirname(auth_key_fn), mode=0o700) +- util.write_file(auth_key_fn, content, mode=0o600) +- util.chownbyid(auth_key_fn, pwent.pw_uid, pwent.pw_gid) ++ util.write_file(auth_key_fn, content, preserve_mode=True) + + + class SshdConfigLine(object): +diff --git a/cloudinit/util.py b/cloudinit/util.py +index 4e0a72db..343976ad 100644 +--- a/cloudinit/util.py ++++ b/cloudinit/util.py +@@ -35,6 +35,7 @@ from base64 import b64decode, b64encode + from errno import ENOENT + from functools import lru_cache + from urllib import parse ++from typing import List + + from cloudinit import importer + from cloudinit import log as logging +@@ -1830,6 +1831,53 @@ def chmod(path, mode): + os.chmod(path, real_mode) + + ++def get_permissions(path: str) -> int: ++ """ ++ Returns the octal permissions of the file/folder pointed by the path, ++ encoded as an int. ++ ++ @param path: The full path of the file/folder. ++ """ ++ ++ return stat.S_IMODE(os.stat(path).st_mode) ++ ++ ++def get_owner(path: str) -> str: ++ """ ++ Returns the owner of the file/folder pointed by the path. ++ ++ @param path: The full path of the file/folder. ++ """ ++ st = os.stat(path) ++ return pwd.getpwuid(st.st_uid).pw_name ++ ++ ++def get_group(path: str) -> str: ++ """ ++ Returns the group of the file/folder pointed by the path. ++ ++ @param path: The full path of the file/folder. ++ """ ++ st = os.stat(path) ++ return grp.getgrgid(st.st_gid).gr_name ++ ++ ++def get_user_groups(username: str) -> List[str]: ++ """ ++ Returns a list of all groups to which the user belongs ++ ++ @param username: the user we want to check ++ """ ++ groups = [] ++ for group in grp.getgrall(): ++ if username in group.gr_mem: ++ groups.append(group.gr_name) ++ ++ gid = pwd.getpwnam(username).pw_gid ++ groups.append(grp.getgrgid(gid).gr_name) ++ return groups ++ ++ + def write_file( + filename, + content, +@@ -1856,8 +1904,7 @@ def write_file( + + if preserve_mode: + try: +- file_stat = os.stat(filename) +- mode = stat.S_IMODE(file_stat.st_mode) ++ mode = get_permissions(filename) + except OSError: + pass + +diff --git a/tests/unittests/test_sshutil.py b/tests/unittests/test_sshutil.py +index bcb8044f..a66788bf 100644 +--- a/tests/unittests/test_sshutil.py ++++ b/tests/unittests/test_sshutil.py +@@ -1,6 +1,9 @@ + # This file is part of cloud-init. See LICENSE file for license information. + ++import os ++ + from collections import namedtuple ++from functools import partial + from unittest.mock import patch + + from cloudinit import ssh_util +@@ -8,13 +11,48 @@ from cloudinit.tests import helpers as test_helpers + from cloudinit import util + + # https://stackoverflow.com/questions/11351032/ +-FakePwEnt = namedtuple( +- 'FakePwEnt', +- ['pw_dir', 'pw_gecos', 'pw_name', 'pw_passwd', 'pw_shell', 'pwd_uid']) ++FakePwEnt = namedtuple('FakePwEnt', [ ++ 'pw_name', ++ 'pw_passwd', ++ 'pw_uid', ++ 'pw_gid', ++ 'pw_gecos', ++ 'pw_dir', ++ 'pw_shell', ++]) + FakePwEnt.__new__.__defaults__ = tuple( + "UNSET_%s" % n for n in FakePwEnt._fields) + + ++def mock_get_owner(updated_permissions, value): ++ try: ++ return updated_permissions[value][0] ++ except ValueError: ++ return util.get_owner(value) ++ ++ ++def mock_get_group(updated_permissions, value): ++ try: ++ return updated_permissions[value][1] ++ except ValueError: ++ return util.get_group(value) ++ ++ ++def mock_get_user_groups(username): ++ return username ++ ++ ++def mock_get_permissions(updated_permissions, value): ++ try: ++ return updated_permissions[value][2] ++ except ValueError: ++ return util.get_permissions(value) ++ ++ ++def mock_getpwnam(users, username): ++ return users[username] ++ ++ + # Do not use these public keys, most of them are fetched from + # the testdata for OpenSSH, and their private keys are available + # https://github.com/openssh/openssh-portable/tree/master/regress/unittests/sshkey/testdata +@@ -552,12 +590,30 @@ class TestBasicAuthorizedKeyParse(test_helpers.CiTestCase): + ssh_util.render_authorizedkeysfile_paths( + "/opt/%u/keys", "/home/bobby", "bobby")) + ++ def test_user_file(self): ++ self.assertEqual( ++ ["/opt/bobby"], ++ ssh_util.render_authorizedkeysfile_paths( ++ "/opt/%u", "/home/bobby", "bobby")) ++ ++ def test_user_file2(self): ++ self.assertEqual( ++ ["/opt/bobby/bobby"], ++ ssh_util.render_authorizedkeysfile_paths( ++ "/opt/%u/%u", "/home/bobby", "bobby")) ++ + def test_multiple(self): + self.assertEqual( + ["/keys/path1", "/keys/path2"], + ssh_util.render_authorizedkeysfile_paths( + "/keys/path1 /keys/path2", "/home/bobby", "bobby")) + ++ def test_multiple2(self): ++ self.assertEqual( ++ ["/keys/path1", "/keys/bobby"], ++ ssh_util.render_authorizedkeysfile_paths( ++ "/keys/path1 /keys/%u", "/home/bobby", "bobby")) ++ + def test_relative(self): + self.assertEqual( + ["/home/bobby/.secret/keys"], +@@ -581,269 +637,763 @@ class TestBasicAuthorizedKeyParse(test_helpers.CiTestCase): + + class TestMultipleSshAuthorizedKeysFile(test_helpers.CiTestCase): + +- @patch("cloudinit.ssh_util.pwd.getpwnam") +- def test_multiple_authorizedkeys_file_order1(self, m_getpwnam): +- fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home2/bobby') +- m_getpwnam.return_value = fpw +- user_ssh_folder = "%s/.ssh" % fpw.pw_dir +- +- # /tmp/home2/bobby/.ssh/authorized_keys = rsa +- authorized_keys = self.tmp_path('authorized_keys', dir=user_ssh_folder) +- util.write_file(authorized_keys, VALID_CONTENT['rsa']) +- +- # /tmp/home2/bobby/.ssh/user_keys = dsa +- user_keys = self.tmp_path('user_keys', dir=user_ssh_folder) +- util.write_file(user_keys, VALID_CONTENT['dsa']) +- +- # /tmp/sshd_config ++ def create_fake_users(self, names, mock_permissions, ++ m_get_group, m_get_owner, m_get_permissions, ++ m_getpwnam, users): ++ homes = [] ++ ++ root = '/tmp/root' ++ fpw = FakePwEnt(pw_name="root", pw_dir=root) ++ users["root"] = fpw ++ ++ for name in names: ++ home = '/tmp/home/' + name ++ fpw = FakePwEnt(pw_name=name, pw_dir=home) ++ users[name] = fpw ++ homes.append(home) ++ ++ m_get_permissions.side_effect = partial( ++ mock_get_permissions, mock_permissions) ++ m_get_owner.side_effect = partial(mock_get_owner, mock_permissions) ++ m_get_group.side_effect = partial(mock_get_group, mock_permissions) ++ m_getpwnam.side_effect = partial(mock_getpwnam, users) ++ return homes ++ ++ def create_user_authorized_file(self, home, filename, content_key, keys): ++ user_ssh_folder = "%s/.ssh" % home ++ # /tmp/home//.ssh/authorized_keys = content_key ++ authorized_keys = self.tmp_path(filename, dir=user_ssh_folder) ++ util.write_file(authorized_keys, VALID_CONTENT[content_key]) ++ keys[authorized_keys] = content_key ++ return authorized_keys ++ ++ def create_global_authorized_file(self, filename, content_key, keys): ++ authorized_keys = self.tmp_path(filename, dir='/tmp') ++ util.write_file(authorized_keys, VALID_CONTENT[content_key]) ++ keys[authorized_keys] = content_key ++ return authorized_keys ++ ++ def create_sshd_config(self, authorized_keys_files): + sshd_config = self.tmp_path('sshd_config', dir="/tmp") + util.write_file( + sshd_config, +- "AuthorizedKeysFile %s %s" % (authorized_keys, user_keys) ++ "AuthorizedKeysFile " + authorized_keys_files + ) ++ return sshd_config + ++ def execute_and_check(self, user, sshd_config, solution, keys, ++ delete_keys=True): + (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( +- fpw.pw_name, sshd_config) ++ user, sshd_config) + content = ssh_util.update_authorized_keys(auth_key_entries, []) + +- self.assertEqual(user_keys, auth_key_fn) +- self.assertTrue(VALID_CONTENT['rsa'] in content) +- self.assertTrue(VALID_CONTENT['dsa'] in content) ++ self.assertEqual(auth_key_fn, solution) ++ for path, key in keys.items(): ++ if path == solution: ++ self.assertTrue(VALID_CONTENT[key] in content) ++ else: ++ self.assertFalse(VALID_CONTENT[key] in content) ++ ++ if delete_keys and os.path.isdir("/tmp/home/"): ++ util.delete_dir_contents("/tmp/home/") + + @patch("cloudinit.ssh_util.pwd.getpwnam") +- def test_multiple_authorizedkeys_file_order2(self, m_getpwnam): +- fpw = FakePwEnt(pw_name='suzie', pw_dir='/tmp/home/suzie') +- m_getpwnam.return_value = fpw +- user_ssh_folder = "%s/.ssh" % fpw.pw_dir ++ @patch("cloudinit.util.get_permissions") ++ @patch("cloudinit.util.get_owner") ++ @patch("cloudinit.util.get_group") ++ def test_single_user_two_local_files( ++ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam ++ ): ++ user_bobby = 'bobby' ++ keys = {} ++ users = {} ++ mock_permissions = { ++ '/tmp/home/bobby': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh/user_keys': ('bobby', 'bobby', 0o600), ++ '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600), ++ } ++ ++ homes = self.create_fake_users( ++ [user_bobby], mock_permissions, m_get_group, m_get_owner, ++ m_get_permissions, m_getpwnam, users ++ ) ++ home = homes[0] + +- # /tmp/home/suzie/.ssh/authorized_keys = rsa +- authorized_keys = self.tmp_path('authorized_keys', dir=user_ssh_folder) +- util.write_file(authorized_keys, VALID_CONTENT['rsa']) ++ # /tmp/home/bobby/.ssh/authorized_keys = rsa ++ authorized_keys = self.create_user_authorized_file( ++ home, 'authorized_keys', 'rsa', keys ++ ) + +- # /tmp/home/suzie/.ssh/user_keys = dsa +- user_keys = self.tmp_path('user_keys', dir=user_ssh_folder) +- util.write_file(user_keys, VALID_CONTENT['dsa']) ++ # /tmp/home/bobby/.ssh/user_keys = dsa ++ user_keys = self.create_user_authorized_file( ++ home, 'user_keys', 'dsa', keys ++ ) + + # /tmp/sshd_config +- sshd_config = self.tmp_path('sshd_config', dir="/tmp") +- util.write_file( +- sshd_config, +- "AuthorizedKeysFile %s %s" % (user_keys, authorized_keys) ++ options = "%s %s" % (authorized_keys, user_keys) ++ sshd_config = self.create_sshd_config(options) ++ ++ self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys) ++ ++ @patch("cloudinit.ssh_util.pwd.getpwnam") ++ @patch("cloudinit.util.get_permissions") ++ @patch("cloudinit.util.get_owner") ++ @patch("cloudinit.util.get_group") ++ def test_single_user_two_local_files_inverted( ++ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam ++ ): ++ user_bobby = 'bobby' ++ keys = {} ++ users = {} ++ mock_permissions = { ++ '/tmp/home/bobby': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh/user_keys': ('bobby', 'bobby', 0o600), ++ '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600), ++ } ++ ++ homes = self.create_fake_users( ++ [user_bobby], mock_permissions, m_get_group, m_get_owner, ++ m_get_permissions, m_getpwnam, users + ) ++ home = homes[0] + +- (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( +- fpw.pw_name, sshd_config) +- content = ssh_util.update_authorized_keys(auth_key_entries, []) ++ # /tmp/home/bobby/.ssh/authorized_keys = rsa ++ authorized_keys = self.create_user_authorized_file( ++ home, 'authorized_keys', 'rsa', keys ++ ) + +- self.assertEqual(authorized_keys, auth_key_fn) +- self.assertTrue(VALID_CONTENT['rsa'] in content) +- self.assertTrue(VALID_CONTENT['dsa'] in content) ++ # /tmp/home/bobby/.ssh/user_keys = dsa ++ user_keys = self.create_user_authorized_file( ++ home, 'user_keys', 'dsa', keys ++ ) + +- @patch("cloudinit.ssh_util.pwd.getpwnam") +- def test_multiple_authorizedkeys_file_local_global(self, m_getpwnam): +- fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home2/bobby') +- m_getpwnam.return_value = fpw +- user_ssh_folder = "%s/.ssh" % fpw.pw_dir ++ # /tmp/sshd_config ++ options = "%s %s" % (user_keys, authorized_keys) ++ sshd_config = self.create_sshd_config(options) + +- # /tmp/home2/bobby/.ssh/authorized_keys = rsa +- authorized_keys = self.tmp_path('authorized_keys', dir=user_ssh_folder) +- util.write_file(authorized_keys, VALID_CONTENT['rsa']) ++ self.execute_and_check(user_bobby, sshd_config, user_keys, keys) + +- # /tmp/home2/bobby/.ssh/user_keys = dsa +- user_keys = self.tmp_path('user_keys', dir=user_ssh_folder) +- util.write_file(user_keys, VALID_CONTENT['dsa']) ++ @patch("cloudinit.ssh_util.pwd.getpwnam") ++ @patch("cloudinit.util.get_permissions") ++ @patch("cloudinit.util.get_owner") ++ @patch("cloudinit.util.get_group") ++ def test_single_user_local_global_files( ++ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam ++ ): ++ user_bobby = 'bobby' ++ keys = {} ++ users = {} ++ mock_permissions = { ++ '/tmp/home/bobby': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh/user_keys': ('bobby', 'bobby', 0o600), ++ '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600), ++ } ++ ++ homes = self.create_fake_users( ++ [user_bobby], mock_permissions, m_get_group, m_get_owner, ++ m_get_permissions, m_getpwnam, users ++ ) ++ home = homes[0] + +- # /tmp/etc/ssh/authorized_keys = ecdsa +- authorized_keys_global = self.tmp_path('etc/ssh/authorized_keys', +- dir="/tmp") +- util.write_file(authorized_keys_global, VALID_CONTENT['ecdsa']) ++ # /tmp/home/bobby/.ssh/authorized_keys = rsa ++ authorized_keys = self.create_user_authorized_file( ++ home, 'authorized_keys', 'rsa', keys ++ ) + +- # /tmp/sshd_config +- sshd_config = self.tmp_path('sshd_config', dir="/tmp") +- util.write_file( +- sshd_config, +- "AuthorizedKeysFile %s %s %s" % (authorized_keys_global, +- user_keys, authorized_keys) ++ # /tmp/home/bobby/.ssh/user_keys = dsa ++ user_keys = self.create_user_authorized_file( ++ home, 'user_keys', 'dsa', keys + ) + +- (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( +- fpw.pw_name, sshd_config) +- content = ssh_util.update_authorized_keys(auth_key_entries, []) ++ authorized_keys_global = self.create_global_authorized_file( ++ 'etc/ssh/authorized_keys', 'ecdsa', keys ++ ) + +- self.assertEqual(authorized_keys, auth_key_fn) +- self.assertTrue(VALID_CONTENT['rsa'] in content) +- self.assertTrue(VALID_CONTENT['ecdsa'] in content) +- self.assertTrue(VALID_CONTENT['dsa'] in content) ++ options = "%s %s %s" % (authorized_keys_global, user_keys, ++ authorized_keys) ++ sshd_config = self.create_sshd_config(options) + +- @patch("cloudinit.ssh_util.pwd.getpwnam") +- def test_multiple_authorizedkeys_file_local_global2(self, m_getpwnam): +- fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home2/bobby') +- m_getpwnam.return_value = fpw +- user_ssh_folder = "%s/.ssh" % fpw.pw_dir ++ self.execute_and_check(user_bobby, sshd_config, user_keys, keys) + +- # /tmp/home2/bobby/.ssh/authorized_keys2 = rsa +- authorized_keys = self.tmp_path('authorized_keys2', +- dir=user_ssh_folder) +- util.write_file(authorized_keys, VALID_CONTENT['rsa']) ++ @patch("cloudinit.ssh_util.pwd.getpwnam") ++ @patch("cloudinit.util.get_permissions") ++ @patch("cloudinit.util.get_owner") ++ @patch("cloudinit.util.get_group") ++ def test_single_user_local_global_files_inverted( ++ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam ++ ): ++ user_bobby = 'bobby' ++ keys = {} ++ users = {} ++ mock_permissions = { ++ '/tmp/home/bobby': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh/user_keys3': ('bobby', 'bobby', 0o600), ++ '/tmp/home/bobby/.ssh/authorized_keys2': ('bobby', 'bobby', 0o600), ++ } ++ ++ homes = self.create_fake_users( ++ [user_bobby], mock_permissions, m_get_group, m_get_owner, ++ m_get_permissions, m_getpwnam, users ++ ) ++ home = homes[0] + +- # /tmp/home2/bobby/.ssh/user_keys3 = dsa +- user_keys = self.tmp_path('user_keys3', dir=user_ssh_folder) +- util.write_file(user_keys, VALID_CONTENT['dsa']) ++ # /tmp/home/bobby/.ssh/authorized_keys = rsa ++ authorized_keys = self.create_user_authorized_file( ++ home, 'authorized_keys2', 'rsa', keys ++ ) + +- # /tmp/etc/ssh/authorized_keys = ecdsa +- authorized_keys_global = self.tmp_path('etc/ssh/authorized_keys', +- dir="/tmp") +- util.write_file(authorized_keys_global, VALID_CONTENT['ecdsa']) ++ # /tmp/home/bobby/.ssh/user_keys = dsa ++ user_keys = self.create_user_authorized_file( ++ home, 'user_keys3', 'dsa', keys ++ ) + +- # /tmp/sshd_config +- sshd_config = self.tmp_path('sshd_config', dir="/tmp") +- util.write_file( +- sshd_config, +- "AuthorizedKeysFile %s %s %s" % (authorized_keys_global, +- authorized_keys, user_keys) ++ authorized_keys_global = self.create_global_authorized_file( ++ 'etc/ssh/authorized_keys', 'ecdsa', keys + ) + +- (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( +- fpw.pw_name, sshd_config) +- content = ssh_util.update_authorized_keys(auth_key_entries, []) ++ options = "%s %s %s" % (authorized_keys_global, authorized_keys, ++ user_keys) ++ sshd_config = self.create_sshd_config(options) + +- self.assertEqual(user_keys, auth_key_fn) +- self.assertTrue(VALID_CONTENT['rsa'] in content) +- self.assertTrue(VALID_CONTENT['ecdsa'] in content) +- self.assertTrue(VALID_CONTENT['dsa'] in content) ++ self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys) + + @patch("cloudinit.ssh_util.pwd.getpwnam") +- def test_multiple_authorizedkeys_file_global(self, m_getpwnam): +- fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home2/bobby') +- m_getpwnam.return_value = fpw ++ @patch("cloudinit.util.get_permissions") ++ @patch("cloudinit.util.get_owner") ++ @patch("cloudinit.util.get_group") ++ def test_single_user_global_file( ++ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam ++ ): ++ user_bobby = 'bobby' ++ keys = {} ++ users = {} ++ mock_permissions = { ++ '/tmp/home/bobby': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600), ++ } ++ ++ homes = self.create_fake_users( ++ [user_bobby], mock_permissions, m_get_group, m_get_owner, ++ m_get_permissions, m_getpwnam, users ++ ) ++ home = homes[0] + + # /tmp/etc/ssh/authorized_keys = rsa +- authorized_keys_global = self.tmp_path('etc/ssh/authorized_keys', +- dir="/tmp") +- util.write_file(authorized_keys_global, VALID_CONTENT['rsa']) ++ authorized_keys_global = self.create_global_authorized_file( ++ 'etc/ssh/authorized_keys', 'rsa', keys ++ ) + +- # /tmp/sshd_config +- sshd_config = self.tmp_path('sshd_config') +- util.write_file( +- sshd_config, +- "AuthorizedKeysFile %s" % (authorized_keys_global) ++ options = "%s" % authorized_keys_global ++ sshd_config = self.create_sshd_config(options) ++ ++ default = "%s/.ssh/authorized_keys" % home ++ self.execute_and_check(user_bobby, sshd_config, default, keys) ++ ++ @patch("cloudinit.ssh_util.pwd.getpwnam") ++ @patch("cloudinit.util.get_permissions") ++ @patch("cloudinit.util.get_owner") ++ @patch("cloudinit.util.get_group") ++ def test_two_users_local_file_standard( ++ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam ++ ): ++ keys = {} ++ users = {} ++ mock_permissions = { ++ '/tmp/home/bobby': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600), ++ '/tmp/home/suzie': ('suzie', 'suzie', 0o700), ++ '/tmp/home/suzie/.ssh': ('suzie', 'suzie', 0o700), ++ '/tmp/home/suzie/.ssh/authorized_keys': ('suzie', 'suzie', 0o600), ++ } ++ ++ user_bobby = 'bobby' ++ user_suzie = 'suzie' ++ homes = self.create_fake_users( ++ [user_bobby, user_suzie], mock_permissions, m_get_group, ++ m_get_owner, m_get_permissions, m_getpwnam, users + ) ++ home_bobby = homes[0] ++ home_suzie = homes[1] + +- (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( +- fpw.pw_name, sshd_config) +- content = ssh_util.update_authorized_keys(auth_key_entries, []) ++ # /tmp/home/bobby/.ssh/authorized_keys = rsa ++ authorized_keys = self.create_user_authorized_file( ++ home_bobby, 'authorized_keys', 'rsa', keys ++ ) + +- self.assertEqual("%s/.ssh/authorized_keys" % fpw.pw_dir, auth_key_fn) +- self.assertTrue(VALID_CONTENT['rsa'] in content) ++ # /tmp/home/suzie/.ssh/authorized_keys = rsa ++ authorized_keys2 = self.create_user_authorized_file( ++ home_suzie, 'authorized_keys', 'ssh-xmss@openssh.com', keys ++ ) ++ ++ options = ".ssh/authorized_keys" ++ sshd_config = self.create_sshd_config(options) ++ ++ self.execute_and_check( ++ user_bobby, sshd_config, authorized_keys, keys, delete_keys=False ++ ) ++ self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys) + + @patch("cloudinit.ssh_util.pwd.getpwnam") +- def test_multiple_authorizedkeys_file_multiuser(self, m_getpwnam): +- fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home2/bobby') +- m_getpwnam.return_value = fpw +- user_ssh_folder = "%s/.ssh" % fpw.pw_dir +- # /tmp/home2/bobby/.ssh/authorized_keys2 = rsa +- authorized_keys = self.tmp_path('authorized_keys2', +- dir=user_ssh_folder) +- util.write_file(authorized_keys, VALID_CONTENT['rsa']) +- # /tmp/home2/bobby/.ssh/user_keys3 = dsa +- user_keys = self.tmp_path('user_keys3', dir=user_ssh_folder) +- util.write_file(user_keys, VALID_CONTENT['dsa']) +- +- fpw2 = FakePwEnt(pw_name='suzie', pw_dir='/tmp/home/suzie') +- user_ssh_folder = "%s/.ssh" % fpw2.pw_dir +- # /tmp/home/suzie/.ssh/authorized_keys2 = ssh-xmss@openssh.com +- authorized_keys2 = self.tmp_path('authorized_keys2', +- dir=user_ssh_folder) +- util.write_file(authorized_keys2, +- VALID_CONTENT['ssh-xmss@openssh.com']) ++ @patch("cloudinit.util.get_permissions") ++ @patch("cloudinit.util.get_owner") ++ @patch("cloudinit.util.get_group") ++ def test_two_users_local_file_custom( ++ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam ++ ): ++ keys = {} ++ users = {} ++ mock_permissions = { ++ '/tmp/home/bobby': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh/authorized_keys2': ('bobby', 'bobby', 0o600), ++ '/tmp/home/suzie': ('suzie', 'suzie', 0o700), ++ '/tmp/home/suzie/.ssh': ('suzie', 'suzie', 0o700), ++ '/tmp/home/suzie/.ssh/authorized_keys2': ('suzie', 'suzie', 0o600), ++ } ++ ++ user_bobby = 'bobby' ++ user_suzie = 'suzie' ++ homes = self.create_fake_users( ++ [user_bobby, user_suzie], mock_permissions, m_get_group, ++ m_get_owner, m_get_permissions, m_getpwnam, users ++ ) ++ home_bobby = homes[0] ++ home_suzie = homes[1] + +- # /tmp/etc/ssh/authorized_keys = ecdsa +- authorized_keys_global = self.tmp_path('etc/ssh/authorized_keys2', +- dir="/tmp") +- util.write_file(authorized_keys_global, VALID_CONTENT['ecdsa']) ++ # /tmp/home/bobby/.ssh/authorized_keys2 = rsa ++ authorized_keys = self.create_user_authorized_file( ++ home_bobby, 'authorized_keys2', 'rsa', keys ++ ) + +- # /tmp/sshd_config +- sshd_config = self.tmp_path('sshd_config', dir="/tmp") +- util.write_file( +- sshd_config, +- "AuthorizedKeysFile %s %%h/.ssh/authorized_keys2 %s" % +- (authorized_keys_global, user_keys) ++ # /tmp/home/suzie/.ssh/authorized_keys2 = rsa ++ authorized_keys2 = self.create_user_authorized_file( ++ home_suzie, 'authorized_keys2', 'ssh-xmss@openssh.com', keys + ) + +- # process first user +- (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( +- fpw.pw_name, sshd_config) +- content = ssh_util.update_authorized_keys(auth_key_entries, []) ++ options = ".ssh/authorized_keys2" ++ sshd_config = self.create_sshd_config(options) ++ ++ self.execute_and_check( ++ user_bobby, sshd_config, authorized_keys, keys, delete_keys=False ++ ) ++ self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys) + +- self.assertEqual(user_keys, auth_key_fn) +- self.assertTrue(VALID_CONTENT['rsa'] in content) +- self.assertTrue(VALID_CONTENT['ecdsa'] in content) +- self.assertTrue(VALID_CONTENT['dsa'] in content) +- self.assertFalse(VALID_CONTENT['ssh-xmss@openssh.com'] in content) ++ @patch("cloudinit.ssh_util.pwd.getpwnam") ++ @patch("cloudinit.util.get_permissions") ++ @patch("cloudinit.util.get_owner") ++ @patch("cloudinit.util.get_group") ++ def test_two_users_local_global_files( ++ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam ++ ): ++ keys = {} ++ users = {} ++ mock_permissions = { ++ '/tmp/home/bobby': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh/authorized_keys2': ('bobby', 'bobby', 0o600), ++ '/tmp/home/bobby/.ssh/user_keys3': ('bobby', 'bobby', 0o600), ++ '/tmp/home/suzie': ('suzie', 'suzie', 0o700), ++ '/tmp/home/suzie/.ssh': ('suzie', 'suzie', 0o700), ++ '/tmp/home/suzie/.ssh/authorized_keys2': ('suzie', 'suzie', 0o600), ++ '/tmp/home/suzie/.ssh/user_keys3': ('suzie', 'suzie', 0o600), ++ } ++ ++ user_bobby = 'bobby' ++ user_suzie = 'suzie' ++ homes = self.create_fake_users( ++ [user_bobby, user_suzie], mock_permissions, m_get_group, ++ m_get_owner, m_get_permissions, m_getpwnam, users ++ ) ++ home_bobby = homes[0] ++ home_suzie = homes[1] + +- m_getpwnam.return_value = fpw2 +- # process second user +- (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( +- fpw2.pw_name, sshd_config) +- content = ssh_util.update_authorized_keys(auth_key_entries, []) ++ # /tmp/home/bobby/.ssh/authorized_keys2 = rsa ++ self.create_user_authorized_file( ++ home_bobby, 'authorized_keys2', 'rsa', keys ++ ) ++ # /tmp/home/bobby/.ssh/user_keys3 = dsa ++ user_keys = self.create_user_authorized_file( ++ home_bobby, 'user_keys3', 'dsa', keys ++ ) ++ ++ # /tmp/home/suzie/.ssh/authorized_keys2 = rsa ++ authorized_keys2 = self.create_user_authorized_file( ++ home_suzie, 'authorized_keys2', 'ssh-xmss@openssh.com', keys ++ ) ++ ++ # /tmp/etc/ssh/authorized_keys = ecdsa ++ authorized_keys_global = self.create_global_authorized_file( ++ 'etc/ssh/authorized_keys2', 'ecdsa', keys ++ ) ++ ++ options = "%s %s %%h/.ssh/authorized_keys2" % \ ++ (authorized_keys_global, user_keys) ++ sshd_config = self.create_sshd_config(options) + +- self.assertEqual(authorized_keys2, auth_key_fn) +- self.assertTrue(VALID_CONTENT['ssh-xmss@openssh.com'] in content) +- self.assertTrue(VALID_CONTENT['ecdsa'] in content) +- self.assertTrue(VALID_CONTENT['dsa'] in content) +- self.assertFalse(VALID_CONTENT['rsa'] in content) ++ self.execute_and_check( ++ user_bobby, sshd_config, user_keys, keys, delete_keys=False ++ ) ++ self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys) + ++ @patch("cloudinit.util.get_user_groups") + @patch("cloudinit.ssh_util.pwd.getpwnam") +- def test_multiple_authorizedkeys_file_multiuser2(self, m_getpwnam): +- fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home/bobby') +- m_getpwnam.return_value = fpw +- user_ssh_folder = "%s/.ssh" % fpw.pw_dir ++ @patch("cloudinit.util.get_permissions") ++ @patch("cloudinit.util.get_owner") ++ @patch("cloudinit.util.get_group") ++ def test_two_users_local_global_files_badguy( ++ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, ++ m_get_user_groups ++ ): ++ keys = {} ++ users = {} ++ mock_permissions = { ++ '/tmp/home/bobby': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh/authorized_keys2': ('bobby', 'bobby', 0o600), ++ '/tmp/home/bobby/.ssh/user_keys3': ('bobby', 'bobby', 0o600), ++ '/tmp/home/badguy': ('root', 'root', 0o755), ++ '/tmp/home/badguy/home': ('root', 'root', 0o755), ++ '/tmp/home/badguy/home/bobby': ('root', 'root', 0o655), ++ } ++ ++ user_bobby = 'bobby' ++ user_badguy = 'badguy' ++ home_bobby, *_ = self.create_fake_users( ++ [user_bobby, user_badguy], mock_permissions, m_get_group, ++ m_get_owner, m_get_permissions, m_getpwnam, users ++ ) ++ m_get_user_groups.side_effect = mock_get_user_groups ++ + # /tmp/home/bobby/.ssh/authorized_keys2 = rsa +- authorized_keys = self.tmp_path('authorized_keys2', +- dir=user_ssh_folder) +- util.write_file(authorized_keys, VALID_CONTENT['rsa']) ++ authorized_keys = self.create_user_authorized_file( ++ home_bobby, 'authorized_keys2', 'rsa', keys ++ ) + # /tmp/home/bobby/.ssh/user_keys3 = dsa +- user_keys = self.tmp_path('user_keys3', dir=user_ssh_folder) +- util.write_file(user_keys, VALID_CONTENT['dsa']) ++ user_keys = self.create_user_authorized_file( ++ home_bobby, 'user_keys3', 'dsa', keys ++ ) + +- fpw2 = FakePwEnt(pw_name='badguy', pw_dir='/tmp/home/badguy') +- user_ssh_folder = "%s/.ssh" % fpw2.pw_dir + # /tmp/home/badguy/home/bobby = "" + authorized_keys2 = self.tmp_path('home/bobby', dir="/tmp/home/badguy") ++ util.write_file(authorized_keys2, '') + + # /tmp/etc/ssh/authorized_keys = ecdsa +- authorized_keys_global = self.tmp_path('etc/ssh/authorized_keys2', +- dir="/tmp") +- util.write_file(authorized_keys_global, VALID_CONTENT['ecdsa']) ++ authorized_keys_global = self.create_global_authorized_file( ++ 'etc/ssh/authorized_keys2', 'ecdsa', keys ++ ) + + # /tmp/sshd_config +- sshd_config = self.tmp_path('sshd_config', dir="/tmp") +- util.write_file( +- sshd_config, +- "AuthorizedKeysFile %s %%h/.ssh/authorized_keys2 %s %s" % +- (authorized_keys_global, user_keys, authorized_keys2) ++ options = "%s %%h/.ssh/authorized_keys2 %s %s" % \ ++ (authorized_keys2, authorized_keys_global, user_keys) ++ sshd_config = self.create_sshd_config(options) ++ ++ self.execute_and_check( ++ user_bobby, sshd_config, authorized_keys, keys, delete_keys=False ++ ) ++ self.execute_and_check( ++ user_badguy, sshd_config, authorized_keys2, keys + ) + +- # process first user +- (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( +- fpw.pw_name, sshd_config) +- content = ssh_util.update_authorized_keys(auth_key_entries, []) ++ @patch("cloudinit.util.get_user_groups") ++ @patch("cloudinit.ssh_util.pwd.getpwnam") ++ @patch("cloudinit.util.get_permissions") ++ @patch("cloudinit.util.get_owner") ++ @patch("cloudinit.util.get_group") ++ def test_two_users_unaccessible_file( ++ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, ++ m_get_user_groups ++ ): ++ keys = {} ++ users = {} ++ mock_permissions = { ++ '/tmp/home/bobby': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600), ++ ++ '/tmp/etc': ('root', 'root', 0o755), ++ '/tmp/etc/ssh': ('root', 'root', 0o755), ++ '/tmp/etc/ssh/userkeys': ('root', 'root', 0o700), ++ '/tmp/etc/ssh/userkeys/bobby': ('bobby', 'bobby', 0o600), ++ '/tmp/etc/ssh/userkeys/badguy': ('badguy', 'badguy', 0o600), ++ ++ '/tmp/home/badguy': ('badguy', 'badguy', 0o700), ++ '/tmp/home/badguy/.ssh': ('badguy', 'badguy', 0o700), ++ '/tmp/home/badguy/.ssh/authorized_keys': ++ ('badguy', 'badguy', 0o600), ++ } ++ ++ user_bobby = 'bobby' ++ user_badguy = 'badguy' ++ homes = self.create_fake_users( ++ [user_bobby, user_badguy], mock_permissions, m_get_group, ++ m_get_owner, m_get_permissions, m_getpwnam, users ++ ) ++ m_get_user_groups.side_effect = mock_get_user_groups ++ home_bobby = homes[0] ++ home_badguy = homes[1] + +- self.assertEqual(user_keys, auth_key_fn) +- self.assertTrue(VALID_CONTENT['rsa'] in content) +- self.assertTrue(VALID_CONTENT['ecdsa'] in content) +- self.assertTrue(VALID_CONTENT['dsa'] in content) ++ # /tmp/home/bobby/.ssh/authorized_keys = rsa ++ authorized_keys = self.create_user_authorized_file( ++ home_bobby, 'authorized_keys', 'rsa', keys ++ ) ++ # /tmp/etc/ssh/userkeys/bobby = dsa ++ # assume here that we can bypass userkeys, despite permissions ++ self.create_global_authorized_file( ++ 'etc/ssh/userkeys/bobby', 'dsa', keys ++ ) + +- m_getpwnam.return_value = fpw2 +- # process second user +- (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( +- fpw2.pw_name, sshd_config) +- content = ssh_util.update_authorized_keys(auth_key_entries, []) ++ # /tmp/home/badguy/.ssh/authorized_keys = ssh-xmss@openssh.com ++ authorized_keys2 = self.create_user_authorized_file( ++ home_badguy, 'authorized_keys', 'ssh-xmss@openssh.com', keys ++ ) + +- # badguy should not take the key from the other user! +- self.assertEqual(authorized_keys2, auth_key_fn) +- self.assertTrue(VALID_CONTENT['ecdsa'] in content) +- self.assertTrue(VALID_CONTENT['dsa'] in content) +- self.assertFalse(VALID_CONTENT['rsa'] in content) ++ # /tmp/etc/ssh/userkeys/badguy = ecdsa ++ self.create_global_authorized_file( ++ 'etc/ssh/userkeys/badguy', 'ecdsa', keys ++ ) ++ ++ # /tmp/sshd_config ++ options = "/tmp/etc/ssh/userkeys/%u .ssh/authorized_keys" ++ sshd_config = self.create_sshd_config(options) ++ ++ self.execute_and_check( ++ user_bobby, sshd_config, authorized_keys, keys, delete_keys=False ++ ) ++ self.execute_and_check( ++ user_badguy, sshd_config, authorized_keys2, keys ++ ) ++ ++ @patch("cloudinit.util.get_user_groups") ++ @patch("cloudinit.ssh_util.pwd.getpwnam") ++ @patch("cloudinit.util.get_permissions") ++ @patch("cloudinit.util.get_owner") ++ @patch("cloudinit.util.get_group") ++ def test_two_users_accessible_file( ++ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, ++ m_get_user_groups ++ ): ++ keys = {} ++ users = {} ++ mock_permissions = { ++ '/tmp/home/bobby': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600), ++ ++ '/tmp/etc': ('root', 'root', 0o755), ++ '/tmp/etc/ssh': ('root', 'root', 0o755), ++ '/tmp/etc/ssh/userkeys': ('root', 'root', 0o755), ++ '/tmp/etc/ssh/userkeys/bobby': ('bobby', 'bobby', 0o600), ++ '/tmp/etc/ssh/userkeys/badguy': ('badguy', 'badguy', 0o600), ++ ++ '/tmp/home/badguy': ('badguy', 'badguy', 0o700), ++ '/tmp/home/badguy/.ssh': ('badguy', 'badguy', 0o700), ++ '/tmp/home/badguy/.ssh/authorized_keys': ++ ('badguy', 'badguy', 0o600), ++ } ++ ++ user_bobby = 'bobby' ++ user_badguy = 'badguy' ++ homes = self.create_fake_users( ++ [user_bobby, user_badguy], mock_permissions, m_get_group, ++ m_get_owner, m_get_permissions, m_getpwnam, users ++ ) ++ m_get_user_groups.side_effect = mock_get_user_groups ++ home_bobby = homes[0] ++ home_badguy = homes[1] ++ ++ # /tmp/home/bobby/.ssh/authorized_keys = rsa ++ self.create_user_authorized_file( ++ home_bobby, 'authorized_keys', 'rsa', keys ++ ) ++ # /tmp/etc/ssh/userkeys/bobby = dsa ++ # assume here that we can bypass userkeys, despite permissions ++ authorized_keys = self.create_global_authorized_file( ++ 'etc/ssh/userkeys/bobby', 'dsa', keys ++ ) ++ ++ # /tmp/home/badguy/.ssh/authorized_keys = ssh-xmss@openssh.com ++ self.create_user_authorized_file( ++ home_badguy, 'authorized_keys', 'ssh-xmss@openssh.com', keys ++ ) ++ ++ # /tmp/etc/ssh/userkeys/badguy = ecdsa ++ authorized_keys2 = self.create_global_authorized_file( ++ 'etc/ssh/userkeys/badguy', 'ecdsa', keys ++ ) ++ ++ # /tmp/sshd_config ++ options = "/tmp/etc/ssh/userkeys/%u .ssh/authorized_keys" ++ sshd_config = self.create_sshd_config(options) ++ ++ self.execute_and_check( ++ user_bobby, sshd_config, authorized_keys, keys, delete_keys=False ++ ) ++ self.execute_and_check( ++ user_badguy, sshd_config, authorized_keys2, keys ++ ) ++ ++ @patch("cloudinit.util.get_user_groups") ++ @patch("cloudinit.ssh_util.pwd.getpwnam") ++ @patch("cloudinit.util.get_permissions") ++ @patch("cloudinit.util.get_owner") ++ @patch("cloudinit.util.get_group") ++ def test_two_users_hardcoded_single_user_file( ++ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, ++ m_get_user_groups ++ ): ++ keys = {} ++ users = {} ++ mock_permissions = { ++ '/tmp/home/bobby': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600), ++ ++ '/tmp/home/suzie': ('suzie', 'suzie', 0o700), ++ '/tmp/home/suzie/.ssh': ('suzie', 'suzie', 0o700), ++ '/tmp/home/suzie/.ssh/authorized_keys': ('suzie', 'suzie', 0o600), ++ } ++ ++ user_bobby = 'bobby' ++ user_suzie = 'suzie' ++ homes = self.create_fake_users( ++ [user_bobby, user_suzie], mock_permissions, m_get_group, ++ m_get_owner, m_get_permissions, m_getpwnam, users ++ ) ++ home_bobby = homes[0] ++ home_suzie = homes[1] ++ m_get_user_groups.side_effect = mock_get_user_groups ++ ++ # /tmp/home/bobby/.ssh/authorized_keys = rsa ++ authorized_keys = self.create_user_authorized_file( ++ home_bobby, 'authorized_keys', 'rsa', keys ++ ) ++ ++ # /tmp/home/suzie/.ssh/authorized_keys = ssh-xmss@openssh.com ++ self.create_user_authorized_file( ++ home_suzie, 'authorized_keys', 'ssh-xmss@openssh.com', keys ++ ) ++ ++ # /tmp/sshd_config ++ options = "%s" % (authorized_keys) ++ sshd_config = self.create_sshd_config(options) ++ ++ self.execute_and_check( ++ user_bobby, sshd_config, authorized_keys, keys, delete_keys=False ++ ) ++ default = "%s/.ssh/authorized_keys" % home_suzie ++ self.execute_and_check(user_suzie, sshd_config, default, keys) ++ ++ @patch("cloudinit.util.get_user_groups") ++ @patch("cloudinit.ssh_util.pwd.getpwnam") ++ @patch("cloudinit.util.get_permissions") ++ @patch("cloudinit.util.get_owner") ++ @patch("cloudinit.util.get_group") ++ def test_two_users_hardcoded_single_user_file_inverted( ++ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, ++ m_get_user_groups ++ ): ++ keys = {} ++ users = {} ++ mock_permissions = { ++ '/tmp/home/bobby': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600), ++ ++ '/tmp/home/suzie': ('suzie', 'suzie', 0o700), ++ '/tmp/home/suzie/.ssh': ('suzie', 'suzie', 0o700), ++ '/tmp/home/suzie/.ssh/authorized_keys': ('suzie', 'suzie', 0o600), ++ } ++ ++ user_bobby = 'bobby' ++ user_suzie = 'suzie' ++ homes = self.create_fake_users( ++ [user_bobby, user_suzie], mock_permissions, m_get_group, ++ m_get_owner, m_get_permissions, m_getpwnam, users ++ ) ++ home_bobby = homes[0] ++ home_suzie = homes[1] ++ m_get_user_groups.side_effect = mock_get_user_groups ++ ++ # /tmp/home/bobby/.ssh/authorized_keys = rsa ++ self.create_user_authorized_file( ++ home_bobby, 'authorized_keys', 'rsa', keys ++ ) ++ ++ # /tmp/home/suzie/.ssh/authorized_keys = ssh-xmss@openssh.com ++ authorized_keys2 = self.create_user_authorized_file( ++ home_suzie, 'authorized_keys', 'ssh-xmss@openssh.com', keys ++ ) ++ ++ # /tmp/sshd_config ++ options = "%s" % (authorized_keys2) ++ sshd_config = self.create_sshd_config(options) ++ ++ default = "%s/.ssh/authorized_keys" % home_bobby ++ self.execute_and_check( ++ user_bobby, sshd_config, default, keys, delete_keys=False ++ ) ++ self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys) ++ ++ @patch("cloudinit.util.get_user_groups") ++ @patch("cloudinit.ssh_util.pwd.getpwnam") ++ @patch("cloudinit.util.get_permissions") ++ @patch("cloudinit.util.get_owner") ++ @patch("cloudinit.util.get_group") ++ def test_two_users_hardcoded_user_files( ++ self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, ++ m_get_user_groups ++ ): ++ keys = {} ++ users = {} ++ mock_permissions = { ++ '/tmp/home/bobby': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), ++ '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600), ++ ++ '/tmp/home/suzie': ('suzie', 'suzie', 0o700), ++ '/tmp/home/suzie/.ssh': ('suzie', 'suzie', 0o700), ++ '/tmp/home/suzie/.ssh/authorized_keys': ('suzie', 'suzie', 0o600), ++ } ++ ++ user_bobby = 'bobby' ++ user_suzie = 'suzie' ++ homes = self.create_fake_users( ++ [user_bobby, user_suzie], mock_permissions, m_get_group, ++ m_get_owner, m_get_permissions, m_getpwnam, users ++ ) ++ home_bobby = homes[0] ++ home_suzie = homes[1] ++ m_get_user_groups.side_effect = mock_get_user_groups ++ ++ # /tmp/home/bobby/.ssh/authorized_keys = rsa ++ authorized_keys = self.create_user_authorized_file( ++ home_bobby, 'authorized_keys', 'rsa', keys ++ ) ++ ++ # /tmp/home/suzie/.ssh/authorized_keys = ssh-xmss@openssh.com ++ authorized_keys2 = self.create_user_authorized_file( ++ home_suzie, 'authorized_keys', 'ssh-xmss@openssh.com', keys ++ ) ++ ++ # /tmp/etc/ssh/authorized_keys = ecdsa ++ authorized_keys_global = self.create_global_authorized_file( ++ 'etc/ssh/authorized_keys', 'ecdsa', keys ++ ) ++ ++ # /tmp/sshd_config ++ options = "%s %s %s" % \ ++ (authorized_keys_global, authorized_keys, authorized_keys2) ++ sshd_config = self.create_sshd_config(options) ++ ++ self.execute_and_check( ++ user_bobby, sshd_config, authorized_keys, keys, delete_keys=False ++ ) ++ self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys) + + # vi: ts=4 expandtab +-- +2.27.0 + diff --git a/cloud-init.spec b/cloud-init.spec index bd9b16c..fd76b0e 100644 --- a/cloud-init.spec +++ b/cloud-init.spec @@ -1,6 +1,6 @@ Name: cloud-init Version: 21.1 -Release: 6%{?dist} +Release: 7%{?dist} Summary: Cloud instance init scripts License: ASL 2.0 or GPLv3 URL: http://launchpad.net/cloud-init @@ -18,6 +18,8 @@ Patch5: ci-Fix-requiring-device-number-on-EC2-derivatives-836.patch Patch6: ci-write-passwords-only-to-serial-console-lock-down-clo.patch # For bz#1979099 - [cloud-init]Customize ssh AuthorizedKeysFile causes login failure[RHEL-9.0] Patch7: ci-ssh-util-allow-cloudinit-to-merge-all-ssh-keys-into-.patch +# For bz#1979099 - [cloud-init]Customize ssh AuthorizedKeysFile causes login failure[RHEL-9.0] +Patch8: ci-Stop-copying-ssh-system-keys-and-check-folder-permis.patch # Source-git patches @@ -96,6 +98,8 @@ python3 tools/render-cloudcfg --variant fedora > $RPM_BUILD_ROOT/%{_sysconfdir}/ cp -p rhel/cloud.cfg $RPM_BUILD_ROOT/%{_sysconfdir}/cloud/cloud.cfg %endif +sed -i "s,@@PACKAGED_VERSION@@,%{version}-%{release}," $RPM_BUILD_ROOT/%{python3_sitelib}/cloudinit/version.py + mkdir -p $RPM_BUILD_ROOT/var/lib/cloud # /run/cloud-init needs a tmpfiles.d entry @@ -213,6 +217,14 @@ fi %config(noreplace) %{_sysconfdir}/rsyslog.d/21-cloudinit.conf %changelog +* Mon Aug 16 2021 Miroslav Rezanina - 21.1-7 +- ci-Stop-copying-ssh-system-keys-and-check-folder-permis.patch [bz#1979099] +- ci-Report-full-specific-version-with-cloud-init-version.patch [bz#1971002] +- Resolves: bz#1979099 + ([cloud-init]Customize ssh AuthorizedKeysFile causes login failure[RHEL-9.0]) +- Resolves: bz#1971002 + (cloud-init should report full specific full version with "cloud-init --version" [rhel-9]) + * Mon Aug 09 2021 Mohan Boddu - 21.1-6 - Rebuilt for IMA sigs, glibc 2.34, aarch64 flags Related: rhbz#1991688