krb5/SOURCES/Add-tests-for-KCM-ccache-ty...

296 lines
8.6 KiB
Diff

From dd863eb4311310c23ee12961fa8e91703f29a6f5 Mon Sep 17 00:00:00 2001
From: Greg Hudson <ghudson@mit.edu>
Date: Thu, 22 Nov 2018 00:27:35 -0500
Subject: [PATCH] Add tests for KCM ccache type
Using a trivial Python implementation of a KCM server, run the
t_ccache.py tests against the KCM ccache type.
(cherry picked from commit f0bcb86131e385b2603ccf0f3c7d65aa3891b220)
(cherry picked from commit 5ecbe8d3ab4f53c0923a0442273bf18a9ff04fd5)
---
src/tests/kcmserver.py | 246 +++++++++++++++++++++++++++++++++++++++++
src/tests/t_ccache.py | 9 +-
2 files changed, 254 insertions(+), 1 deletion(-)
create mode 100644 src/tests/kcmserver.py
diff --git a/src/tests/kcmserver.py b/src/tests/kcmserver.py
new file mode 100644
index 000000000..57432e5a7
--- /dev/null
+++ b/src/tests/kcmserver.py
@@ -0,0 +1,246 @@
+# This is a simple KCM test server, used to exercise the KCM ccache
+# client code. It will generally throw an uncaught exception if the
+# client sends anything unexpected, so is unsuitable for production.
+# (It also imposes no namespace or access constraints, and blocks
+# while reading requests and writing responses.)
+
+# This code knows nothing about how to marshal and unmarshal principal
+# names and credentials as is required in the KCM protocol; instead,
+# it just remembers the marshalled forms and replays them to the
+# client when asked. This works because marshalled creds and
+# principal names are always the last part of marshalled request
+# arguments, and because we don't need to implement remove_cred (which
+# would need to know how to match a cred tag against previously stored
+# credentials).
+
+# The following code is useful for debugging if anything appears to be
+# going wrong in the server, since daemon output is generally not
+# visible in Python test scripts.
+#
+# import sys, traceback
+# def ehook(etype, value, tb):
+#     with open('/tmp/exception', 'w') as f:
+#         traceback.print_exception(etype, value, tb, file=f)
+# sys.excepthook = ehook
+
+import select
+import socket
+import struct
+import sys
+
+# Server state shared by all of the op handlers below.
+# Cache objects keyed by cache name (bytes).
+caches = {}
+# The same Cache objects keyed by their UUID, as built by make_uuid().
+cache_uuidmap = {}
+# Name reported by GET_DEFAULT_CACHE; replaced by SET_DEFAULT_CACHE.
+defname = b'default'
+# Counter used by op_gen_new() to build "unique<N>" cache names.
+next_unique = 1
+# Counter used by make_uuid(); packed into the last four UUID bytes.
+next_uuid = 1
+
+# Numeric KCM operation codes understood by this server. Each value
+# is the 16-bit opcode that service_request() unpacks from the request
+# header and uses as the key into ophandlers.
+# NOTE(review): the numbering presumably follows the Heimdal KCM
+# protocol -- confirm against the client code before adding entries.
+class KCMOpcodes(object):
+ GEN_NEW = 3
+ INITIALIZE = 4
+ DESTROY = 5
+ STORE = 6
+ GET_PRINCIPAL = 8
+ GET_CRED_UUID_LIST = 9
+ GET_CRED_BY_UUID = 10
+ REMOVE_CRED = 11
+ GET_CACHE_UUID_LIST = 18
+ GET_CACHE_BY_UUID = 19
+ GET_DEFAULT_CACHE = 20
+ SET_DEFAULT_CACHE = 21
+ GET_KDC_OFFSET = 22
+ SET_KDC_OFFSET = 23
+
+
+# krb5 error codes which op handlers return in place of 0 when a
+# request cannot be satisfied.
+class KRB5Errors(object):
+ # Returned when a cred or cache UUID lookup finds nothing.
+ KRB5_CC_END = -1765328242
+ # Returned by op_remove_cred(), which is not implemented.
+ KRB5_CC_NOSUPP = -1765328137
+ # Returned by op_get_principal() when the cache has no principal.
+ KRB5_FCC_NOFILE = -1765328189
+
+
+# Return a fresh 16-byte identifier: twelve zero bytes followed by the
+# current value of next_uuid as a big-endian unsigned 32-bit integer.
+# The counter is advanced so that successive calls differ.
+def make_uuid():
+    global next_uuid
+    # '12x' emits the twelve zero pad bytes ahead of the counter.
+    ident = struct.pack('>12xL', next_uuid)
+    next_uuid += 1
+    return ident
+
+
+# In-memory record for one credential cache: its name, the marshalled
+# default principal (None until the cache is initialized), the cache's
+# own UUID, the stored marshalled creds keyed by UUID together with
+# their insertion order, and the KDC time offset.
+class Cache(object):
+    def __init__(self, name):
+        self.name, self.princ = name, None
+        self.uuid = make_uuid()
+        self.cred_uuids, self.creds = [], {}
+        self.time_offset = 0
+
+
+# Return the Cache called name. If there is none yet, create an empty
+# one and register it in both caches and cache_uuidmap first.
+def get_cache(name):
+    existing = caches.get(name)
+    if existing is not None:
+        return existing
+    created = Cache(name)
+    caches[name] = created
+    cache_uuidmap[created.uuid] = created
+    return created
+
+
+# Split argbytes at its first NUL byte and return (name, rest), where
+# name is everything before the NUL and rest is everything after it.
+# If argbytes contains no NUL at all, the whole input is taken as the
+# name and rest is empty. (Slicing on the result of find() would
+# silently drop the last byte of the name in that case, because find()
+# returns -1.)
+def unmarshal_name(argbytes):
+    name, _sep, rest = argbytes.partition(b'\0')
+    return name, rest
+
+
+# GEN_NEW: hand out the next cache name of the form "unique<N>",
+# NUL-terminated. Nothing checks that the name is not already in use.
+def op_gen_new(argbytes):
+    global next_unique
+    serial = next_unique
+    next_unique = serial + 1
+    generated = ('unique%d' % serial).encode('ascii')
+    return 0, generated + b'\0'
+
+
+# INITIALIZE: set the named cache's principal to the marshalled
+# principal which follows the name, discarding any stored creds and
+# resetting the time offset. The cache is created if necessary.
+def op_initialize(argbytes):
+    cache_name, marshalled_princ = unmarshal_name(argbytes)
+    target = get_cache(cache_name)
+    target.princ = marshalled_princ
+    target.cred_uuids, target.creds = [], {}
+    target.time_offset = 0
+    return 0, b''
+
+
+# DESTROY: remove the named cache from both lookup tables. Because
+# get_cache() creates missing caches, destroying a cache which does
+# not exist also succeeds.
+def op_destroy(argbytes):
+    cache_name, _ = unmarshal_name(argbytes)
+    doomed = get_cache(cache_name)
+    cache_uuidmap.pop(doomed.uuid)
+    caches.pop(cache_name)
+    return 0, b''
+
+
+# STORE: remember the marshalled cred which follows the cache name
+# under a newly generated UUID, preserving insertion order.
+def op_store(argbytes):
+    cache_name, marshalled_cred = unmarshal_name(argbytes)
+    target = get_cache(cache_name)
+    cred_id = make_uuid()
+    target.cred_uuids.append(cred_id)
+    target.creds[cred_id] = marshalled_cred
+    return 0, b''
+
+
+# GET_PRINCIPAL: reply with the stored marshalled principal plus a
+# trailing NUL byte, or KRB5_FCC_NOFILE if the cache has never been
+# initialized.
+def op_get_principal(argbytes):
+    cache_name, _ = unmarshal_name(argbytes)
+    princ = get_cache(cache_name).princ
+    if princ is not None:
+        return 0, princ + b'\0'
+    return KRB5Errors.KRB5_FCC_NOFILE, b''
+
+
+# GET_CRED_UUID_LIST: reply with the UUIDs of the named cache's creds,
+# concatenated in the order the creds were stored.
+def op_get_cred_uuid_list(argbytes):
+    cache_name, _ = unmarshal_name(argbytes)
+    ordered_ids = get_cache(cache_name).cred_uuids
+    return 0, b''.join(ordered_ids)
+
+
+# GET_CRED_BY_UUID: reply with the marshalled cred stored under the
+# UUID which follows the cache name, or KRB5_CC_END if the cache has
+# no cred with that UUID.
+def op_get_cred_by_uuid(argbytes):
+    cache_name, cred_id = unmarshal_name(argbytes)
+    stored = get_cache(cache_name).creds
+    try:
+        return 0, stored[cred_id]
+    except KeyError:
+        return KRB5Errors.KRB5_CC_END, b''
+
+
+# REMOVE_CRED: not implemented; always replies KRB5_CC_NOSUPP with an
+# empty payload.
+def op_remove_cred(argbytes):
+    unsupported = KRB5Errors.KRB5_CC_NOSUPP
+    return unsupported, b''
+
+
+# GET_CACHE_UUID_LIST: reply with the UUIDs of every known cache,
+# concatenated in cache_uuidmap's iteration order.
+def op_get_cache_uuid_list(argbytes):
+    all_ids = b''.join(cache_uuidmap)
+    return 0, all_ids
+
+
+# GET_CACHE_BY_UUID: the whole argument is a cache UUID. Reply with
+# that cache's NUL-terminated name, or KRB5_CC_END if no cache has
+# that UUID.
+def op_get_cache_by_uuid(argbytes):
+    found = cache_uuidmap.get(argbytes)
+    if found is None:
+        return KRB5Errors.KRB5_CC_END, b''
+    return 0, found.name + b'\0'
+
+
+# GET_DEFAULT_CACHE: reply with the current default cache name,
+# NUL-terminated.
+def op_get_default_cache(argbytes):
+    payload = defname + b'\0'
+    return 0, payload
+
+
+# SET_DEFAULT_CACHE: make the name at the start of the arguments the
+# new default cache name. Anything after the name is ignored.
+def op_set_default_cache(argbytes):
+    global defname
+    new_default, _ = unmarshal_name(argbytes)
+    defname = new_default
+    return 0, b''
+
+
+# GET_KDC_OFFSET: reply with the named cache's time offset as a
+# big-endian signed 32-bit integer.
+def op_get_kdc_offset(argbytes):
+    cache_name, _ = unmarshal_name(argbytes)
+    offset = get_cache(cache_name).time_offset
+    return 0, struct.pack('>l', offset)
+
+
+# SET_KDC_OFFSET: the bytes after the cache name are a big-endian
+# signed 32-bit integer; store it as the cache's time offset.
+def op_set_kdc_offset(argbytes):
+    cache_name, packed = unmarshal_name(argbytes)
+    target = get_cache(cache_name)
+    target.time_offset = struct.unpack('>l', packed)[0]
+    return 0, b''
+
+
+# Dispatch table used by service_request(): maps each KCM opcode to
+# the function handling it. Every handler takes the request's
+# argument bytes and returns a (code, payload) pair.
+ophandlers = {
+    KCMOpcodes.GEN_NEW: op_gen_new,
+    KCMOpcodes.INITIALIZE: op_initialize,
+    KCMOpcodes.DESTROY: op_destroy,
+    KCMOpcodes.STORE: op_store,
+    KCMOpcodes.GET_PRINCIPAL: op_get_principal,
+    KCMOpcodes.GET_CRED_UUID_LIST: op_get_cred_uuid_list,
+    KCMOpcodes.GET_CRED_BY_UUID: op_get_cred_by_uuid,
+    KCMOpcodes.REMOVE_CRED: op_remove_cred,
+    KCMOpcodes.GET_CACHE_UUID_LIST: op_get_cache_uuid_list,
+    KCMOpcodes.GET_CACHE_BY_UUID: op_get_cache_by_uuid,
+    KCMOpcodes.GET_DEFAULT_CACHE: op_get_default_cache,
+    KCMOpcodes.SET_DEFAULT_CACHE: op_set_default_cache,
+    KCMOpcodes.GET_KDC_OFFSET: op_get_kdc_offset,
+    KCMOpcodes.SET_KDC_OFFSET: op_set_kdc_offset,
+}
+
+# Read exactly nbytes from the socket s. Returns the bytes read, or
+# None if the peer closes the connection before that many bytes have
+# arrived (recv() returning b'' signals end of stream).
+def _recv_exact(s, nbytes):
+    chunks = []
+    remaining = nbytes
+    while remaining > 0:
+        chunk = s.recv(remaining)
+        if not chunk:
+            return None
+        chunks.append(chunk)
+        remaining -= len(chunk)
+    return b''.join(chunks)
+
+
+# Read and respond to a request from the socket s. Returns True if a
+# request was serviced, or False if the client closed the connection
+# (either between requests or part-way through one), in which case the
+# caller should stop watching the socket. An unknown opcode or a
+# request too short to hold a header raises an exception.
+def service_request(s):
+    # A request is framed as a 4-byte big-endian length followed by
+    # that many bytes. End of stream must be checked on every recv():
+    # a client which disconnects after sending only part of the length
+    # or part of the body would otherwise leave the read loop spinning
+    # forever on empty results.
+    lenbytes = _recv_exact(s, 4)
+    if lenbytes is None:
+        return False
+
+    reqlen, = struct.unpack('>L', lenbytes)
+    req = _recv_exact(s, reqlen)
+    if req is None:
+        return False
+
+    # The request begins with a major version, a minor version, and a
+    # 16-bit opcode; the remainder is the op's marshalled arguments.
+    majver, minver, op = struct.unpack('>BBH', req[:4])
+    argbytes = req[4:]
+    code, payload = ophandlers[op](argbytes)
+
+    # The KCM response is the code (4 bytes) and the response payload.
+    # The Heimdal IPC response is the length of the KCM response (4
+    # bytes), a status code which is essentially always 0 (4 bytes),
+    # and the KCM response.
+    kcm_response = struct.pack('>l', code) + payload
+    hipc_response = struct.pack('>LL', len(kcm_response), 0) + kcm_response
+    s.sendall(hipc_response)
+    return True
+
+
+# Listen on the UNIX-domain socket whose path is the first command
+# line argument, then write 'starting...' to stderr; t_ccache.py
+# passes that string to realm.start_server() when launching this
+# script.
+server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+server.bind(sys.argv[1])
+server.listen(5)
+select_input = [server,]
+sys.stderr.write('starting...\n')
+sys.stderr.flush()
+
+# Serve forever: accept new clients and answer one request at a time
+# from whichever watched socket select() reports as readable.
+while True:
+    readable = select.select(select_input, [], [])[0]
+    for sock in readable:
+        if sock is server:
+            # New connection: start watching it for requests.
+            conn, _addr = server.accept()
+            select_input.append(conn)
+            continue
+        if service_request(sock):
+            continue
+        # The client went away; stop watching and release its socket.
+        select_input.remove(sock)
+        sock.close()
diff --git a/src/tests/t_ccache.py b/src/tests/t_ccache.py
index fcf1a611e..66804afa5 100755
--- a/src/tests/t_ccache.py
+++ b/src/tests/t_ccache.py
@@ -22,7 +22,10 @@
from k5test import *
-realm = K5Realm(create_host=False)
+kcm_socket_path = os.path.join(os.getcwd(), 'testdir', 'kcm')
+conf = {'libdefaults': {'kcm_socket': kcm_socket_path,
+ 'kcm_mach_service': '-'}}
+realm = K5Realm(create_host=False, krb5_conf=conf)
keyctl = which('keyctl')
out = realm.run([klist, '-c', 'KEYRING:process:abcd'], expected_code=1)
@@ -122,6 +125,10 @@ def collection_test(realm, ccname):
collection_test(realm, 'DIR:' + os.path.join(realm.testdir, 'cc'))
+kcmserver_path = os.path.join(srctop, 'tests', 'kcmserver.py')
+realm.start_server([sys.executable, kcmserver_path, kcm_socket_path],
+ 'starting...')
+collection_test(realm, 'KCM:')
if test_keyring:
def cleanup_keyring(anchor, name):
out = realm.run(['keyctl', 'list', anchor])