python-kdcproxy/SOURCES/0006-Use-DNS-discovery-for-declared-realms-only.patch

From e85152a313fd848a9e1cb51f52a6783d048e25ff Mon Sep 17 00:00:00 2001
From: Julien Rische <jrische@redhat.com>
Date: Fri, 3 Oct 2025 17:40:25 +0200
Subject: [PATCH] Use DNS discovery for declared realms only

Allowing the use of DNS discovery for any requested realm (i.e. querying
SRV records from the DNS zone matching the realm name) created a
server-side request forgery vulnerability (CVE-2025-59088). An attacker
could take advantage of a DNS zone they control to have kdcproxy direct
their request to any IP addresses (including loopback and internal
network) and port, allowing network and firewall rules probing, and data
exfiltration.

This commit mitigates this risk by making the global "use_dns" parameter
apply only to realms declared in the kdcproxy configuration, and other
configurations if their modules are enabled.

To accommodate cases where realm hierarchies (like AD forests) are
proxied, support for wildcards is added for realm section names. This
can be used to have any "sub-realm" considered known, and therefore
allowed to use DNS to discover their KDCs.

The new "dns_realm_discovery" parameter can be enabled (if "use_dns" is
not globally disabled) to allow use of DNS discovery for unknown realms
too, restoring the previous unsafe behavior.

For any KDC address obtained by DNS discovery, a warning is logged if
the port is not a standard Kerberos port. This warning can be silenced
using the "silence_port_warn" configuration parameter.

Signed-off-by: Julien Rische <jrische@redhat.com>
(cherry picked from commit 0254c168919de84867d31a87698287900d560e9f)
---
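Reviewer note (not part of the commit): the gating rule described in the
commit message can be sketched in a few lines of standalone Python. This is
an illustration only, not the patched code. The function names and the flat
`sections` set are hypothetical, and `use_dns` is simplified to a single
flag, whereas the patch resolves it per realm.

```python
def candidate_sections(realm):
    """Yield section names that would declare `realm`, most specific first."""
    yield realm  # exact realm section
    labels = realm.split(".")
    for i in range(len(labels)):
        # Wildcard sections: "*" followed by all, or only the trailing, labels.
        yield "*" + ".".join(labels[i:])


def realm_declared(realm, sections):
    """True if an exact or wildcard section covers `realm`."""
    return any(name in sections for name in candidate_sections(realm))


def dns_discovery_allowed(realm, sections, use_dns=True,
                          dns_realm_discovery=False):
    """DNS discovery needs use_dns AND (declared realm OR explicit opt-in)."""
    return (realm_declared(realm, sections) or dns_realm_discovery) and use_dns
```

With only a `[*EXAMPLE.COM]` section declared, `SUB.EXAMPLE.COM` would be
allowed to use DNS discovery while `MYEXAMPLE.COM` would not, because
matching happens on whole labels rather than on string suffixes.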
README | 80 ++--
kdcproxy/config/__init__.py | 252 ++++++++---
kdcproxy/config/mit.py | 23 +-
tests.py | 843 +++++++++++++++++++++++++++++++++++-
4 files changed, 1102 insertions(+), 96 deletions(-)
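Reviewer note (not part of the commit): the parameter precedence this patch
documents (exact realm section, then wildcard realm section, then [global],
then the built-in default) can be sketched as follows. This is a simplified
illustration. `resolve_param` and the plain-dict `config` are hypothetical
stand-ins, and the check that rejects "global" as a realm name is omitted.

```python
# Built-in defaults for the per-section boolean parameters.
DEFAULTS = {"use_dns": True, "silence_port_warn": False}


def resolve_param(config, realm, name):
    """Return the value of `name` for `realm`, honoring section precedence."""
    labels = realm.split(".")
    # Exact section first, then wildcard rules from most to least specific,
    # then the global section.
    sections = [realm]
    sections += ["*" + ".".join(labels[i:]) for i in range(len(labels))]
    sections.append("global")
    for section in sections:
        if name in config.get(section, {}):
            return config[section][name]
    # Nothing set anywhere: fall back to the built-in default.
    return DEFAULTS[name]
```

A realm-level `use_dns = false` therefore wins over `use_dns = true` in a
matching wildcard section or in `[global]`, which is what
test_realm_specific_use_dns_overrides_global exercises.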
diff --git a/README b/README
index 4b756d2..0d57da8 100644
--- a/README
+++ b/README
@@ -43,24 +43,43 @@ UDP, both will be attempted. TCP will be attempted before UDP. This permits the
use of longer timeouts and prevents possible lockouts when the KDC packets
contain OTP token codes (which should preferably be sent to only one server).
-Automatic Configuration
------------------------
-By default, no configuration is necessary. In this case, kdcproxy will use
-REALM DNS SRV record lookups to determine remote KDC locations.
-
-Master Configuration File
+Main Configuration File
-------------------------
-If you wish to have more detailed configuration, the first place you can
-configure kdcproxy is the master configuration file. This file exists at the
-location specified in the environment variable KDCPROXY_CONFIG. If this
-variable is unspecified, the default location is /etc/kdcproxy.conf. This
-configuration file takes precedence over all other configuration modules. This
-file is an ini-style configuration with a special section **[global]**. Two
-parameters are available in this section: **configs** and **use_dns**.
-
-The **use_dns** allows you to enable or disable use of DNS SRV record lookups.
-
-The **configs** parameter allows you to load other configuration modules for
+The location of kdcproxy's main configuration file is specified by the
+`KDCPROXY_CONFIG` environment variable. If not set, the default locations are
+`/usr/local/etc/kdcproxy.conf` or `/etc/kdcproxy.conf`. This configuration
+file takes precedence over all other configuration modules. This file is an
+ini-style configuration with a special **[global]** section, wildcard realm
+sections, and exact realm sections.
+
+Exact realm sections are named after the realms that kdcproxy is expected to
+receive requests for. Wildcard realm sections differ from exact realm sections
+by being prefixed with a '\*' character. Such a section matches any realm whose
+labels all equal, or whose trailing labels equal, those of the section name. As
+an example, **[\*EXAMPLE.COM]** matches `EXAMPLE.COM`, `SUB.EXAMPLE.COM`,
+and `SUB.SUB.EXAMPLE.COM`, but not `MYEXAMPLE.COM`.
+
+The following parameters can be set on any of these sections, with exact realm
+parameters having higher precedence, followed by wildcard realm parameters, and
+then global parameters:
+
+**use_dns** (boolean): Allows querying DNS SRV records (aka DNS discovery) to
+find KDCs associated with the requested realm in case they are not explicitly
+set in the configuration (main one, or configuration module-provided). By
+default (or if explicitly enabled globally), this mechanism is **activated only
+for realms explicitly declared** in the main configuration (an empty section
+named after the realm, or a matching wildcard realm section, is enough) or in a
+module-provided configuration. To allow use of DNS discovery for any requested
+realm, see the **dns_realm_discovery** parameter.
+
+**silence_port_warn** (boolean): When DNS SRV records are used to discover KDC
+addresses, kdcproxy will write a warning in the logs in case a non-standard
+port is found in the DNS response. Setting this parameter to `true` will
+silence such warnings.
+
+The following parameters are specific to the **[global]** section:
+
+**configs** (string): Allows you to load other configuration modules for
finding configuration in other places. The configuration modules specified in
here will have priority in the order listed. For instance, if you wished to
read configuration from MIT libkrb5, you would set the following:
@@ -68,11 +87,19 @@ read configuration from MIT libkrb5, you would set the following:
[global]
configs = mit
-Aside from the **[global]** section, you may also specify manual configuration
-for realms. In this case, each section is the name of the realm and the
-parameters are **kerberos** or **kpasswd**. These specify the locations of the
-remote servers for krb5 AS requests and kpasswd requests, respectively. For
-example:
+**dns_realm_discovery** (boolean): When **use_dns** is not disabled globally,
+kdcproxy is allowed to query SRV records to find KDCs of the realms declared in
+its configuration only. This protects kdcproxy from attacks based on
+server-side request forgery (CVE-2025-59088). Allowing DNS discovery for
+unknown realms too is possible by also setting **dns_realm_discovery** to true,
+yet heavily discouraged:
+
+ [global]
+ dns_realm_discovery = true
+
+Exact realm sections have 2 specific parameters: **kerberos** and **kpasswd**.
+These specify the locations of the remote servers for Kerberos ticket requests,
+and kpasswd requests, respectively. For example:
[EXAMPLE.COM]
kerberos = kerberos+tcp://kdc.example.com:88
@@ -92,11 +119,10 @@ forwarding requests. The port number is optional. Possible schemes are:
MIT libkrb5
-----------
-If you load the **mit** config module in the master configuration file,
-kdcproxy will also read the config using libkrb5 (usually /etc/krb5.conf). If
-this module is used, kdcproxy will respect the DNS settings from the
-**[libdefaults]** section and the realm configuration from the **[realms]**
-section.
+If you load the **mit** config module in the main configuration file, kdcproxy
+will also read the config using libkrb5 (usually /etc/krb5.conf). If this
+module is used, kdcproxy will respect the realm configuration from the
+**[realms]** section.
For more information, see the documentation for MIT's krb5.conf.
diff --git a/kdcproxy/config/__init__.py b/kdcproxy/config/__init__.py
index 93af69a..0ebb6ea 100644
--- a/kdcproxy/config/__init__.py
+++ b/kdcproxy/config/__init__.py
@@ -20,7 +20,6 @@
# THE SOFTWARE.
import importlib
-import itertools
import logging
import os
@@ -35,38 +34,76 @@ import dns.resolver
logging.basicConfig()
logger = logging.getLogger('kdcproxy')
+SRV_KRB = 'kerberos'
+SRV_KPWD = 'kpasswd'
+SRV_KPWD_ADM = 'kerberos-adm'
+
class IResolver(object):
def lookup(self, realm, kpasswd=False):
+ # type: (str, bool) -> Iterable[str]
"Returns an iterable of remote server URIs."
raise NotImplementedError()
class IConfig(IResolver):
- def use_dns(self):
- "Returns whether or not DNS should be used. Returns None if not set."
+ def realm_configured(self, realm):
+ # type: (str) -> bool
+ """Check if a realm is declared in the configuration."""
+ raise NotImplementedError()
+
+ def param(self, realm, param):
+ # type: (str, str) -> bool
+ """Get a configuration parameter value for a realm.
+
+ None can be passed as realm to query global parameters only.
+ """
raise NotImplementedError()
class KDCProxyConfig(IConfig):
GLOBAL = "global"
- default_filename = "/etc/kdcproxy.conf"
-
- def __init__(self, filename=None):
- self.__cp = configparser.ConfigParser()
- if filename is None:
- filename = os.environ.get("KDCPROXY_CONFIG", None)
- if filename is None:
- filename = self.default_filename
+ default_filenames = ["/etc/kdcproxy.conf"]
+
+ GLOBAL_PARAMS = {
+ 'dns_realm_discovery': False,
+ }
+ GENERAL_PARAMS = {
+ 'use_dns': True,
+ 'silence_port_warn': False,
+ }
+ RESOLV_PARAMS = [SRV_KRB, SRV_KPWD]
+
+ @staticmethod
+ def __get_cfg_param(cp, section, param, typ):
+ """Retrieve a typed parameter from a configuration section."""
+ try:
+ if typ is bool:
+ return cp.getboolean(section, param)
+ elif typ is str:
+ return cp.get(section, param)
+ else:
+ raise ValueError(
+ 'Configuration parameters cannot have "%s" type' %
+ typ.__name__)
+ except configparser.Error:
+ return None
+
+ def __init__(self, filenames=None):
+ cp = configparser.ConfigParser()
+ if filenames is None:
+ filenames = os.environ.get("KDCPROXY_CONFIG", None)
+ if filenames is None:
+ filenames = self.default_filenames
try:
- self.__cp.read(filename)
+ cp.read(filenames)
except configparser.Error:
- logger.error("Unable to read config file: %s", filename)
+ logger.error("Unable to read config file: %s", filenames)
try:
- mod = self.__cp.get(self.GLOBAL, "configs")
+ mod = cp.get(self.GLOBAL, "configs")
try:
importlib.import_module("kdcproxy.config." + mod)
except ImportError as e:
@@ -74,23 +111,98 @@ class KDCProxyConfig(IConfig):
except configparser.Error:
pass
+ self.__config = dict()
+
+ for section in cp.sections():
+ self.__config.setdefault(section, {})
+ for param in self.GENERAL_PARAMS.keys():
+ value = self.__get_cfg_param(cp, section, param, bool)
+ if value is not None:
+ self.__config[section][param] = value
+ if section == self.GLOBAL:
+ for param in self.GLOBAL_PARAMS.keys():
+ value = self.__get_cfg_param(cp, section, param, bool)
+ if value is not None:
+ self.__config[section][param] = value
+ elif not section.startswith('*'):
+ for service in self.RESOLV_PARAMS:
+ servers = self.__get_cfg_param(cp, section, service, str)
+ if servers:
+ self.__config[section][service] = (
+ tuple(servers.split())
+ )
+
+ def __global_forbidden(self, realm):
+ """Raise ValueError if realm name is 'global'."""
+ if realm == self.GLOBAL:
+ raise ValueError('"%s" is not allowed as realm name' % realm)
+
def lookup(self, realm, kpasswd=False):
- service = "kpasswd" if kpasswd else "kerberos"
- try:
- servers = self.__cp.get(realm, service)
- return map(lambda s: s.strip(), servers.strip().split(" "))
- except configparser.Error:
+ self.__global_forbidden(realm)
+ service = SRV_KPWD if kpasswd else SRV_KRB
+ if realm in self.__config and service in self.__config[realm]:
+ return self.__config[realm][service]
+ else:
return ()
- def use_dns(self):
- try:
- return self.__cp.getboolean(self.GLOBAL, "use_dns")
- except configparser.Error:
- return None
+ def realm_configured(self, realm):
+ """Check if a realm is declared in the configuration.
+
+ Matches exact realm sections or wildcard realm sections.
+ """
+ self.__global_forbidden(realm)
+
+ if realm in self.__config:
+ return True
+
+ realm_labels = realm.split('.')
+ for i in range(len(realm_labels)):
+ rule = '*' + '.'.join(realm_labels[i:])
+ if rule in self.__config:
+ return True
+
+ return False
+
+ def param(self, realm, param):
+ """Get a configuration parameter value for a realm.
+
+ None can be passed as realm to query global parameters only.
+ Precedence: exact realm, wildcard realm, global, default.
+ """
+ self.__global_forbidden(realm)
+
+ if realm is not None:
+ if param in self.__config.get(realm, {}):
+ # Parameter found in realm section
+ return self.__config[realm][param]
+
+ realm_labels = realm.split('.')
+ for i in range(len(realm_labels)):
+ rule = '*' + '.'.join(realm_labels[i:])
+ if param in self.__config.get(rule, {}):
+ # Parameter found in realm matching rule
+ return self.__config[rule][param]
+
+ if param in self.__config.get(self.GLOBAL, {}):
+ # Fallback to global section
+ return self.__config[self.GLOBAL][param]
+
+ if param in self.GENERAL_PARAMS:
+ # Fallback to default value if general parameter not set
+ return self.GENERAL_PARAMS[param]
+
+ if param in self.GLOBAL_PARAMS:
+ # Fallback to default value if global parameter not set
+ return self.GLOBAL_PARAMS[param]
+
+ raise ValueError('Configuration parameter "%s" does not exist' % param)
class DNSResolver(IResolver):
+ def __init__(self, log_warning=None):
+ self.__log_warning = log_warning
+
def __dns(self, service, protocol, realm):
query = '_%s._%s.%s' % (service, protocol, realm)
@@ -109,48 +221,38 @@ class DNSResolver(IResolver):
yield (host, entry.port)
def lookup(self, realm, kpasswd=False):
- service = "kpasswd" if kpasswd else "kerberos"
+ service = SRV_KPWD if kpasswd else SRV_KRB
for protocol in ("tcp", "udp"):
- servers = tuple(self.__dns(service, protocol, realm))
+ sv = service
+ servers = tuple(self.__dns(sv, protocol, realm))
if not servers and kpasswd:
- servers = self.__dns("kerberos-adm", protocol, realm)
+ sv = SRV_KPWD_ADM
+ servers = self.__dns(sv, protocol, realm)
for host, port in servers:
+ if self.__log_warning:
+ self.__log_warning(sv, protocol, realm, kpasswd, host,
+ port)
yield "%s://%s:%d" % (service, host, port)
class MetaResolver(IResolver):
- SCHEMES = ("kerberos", "kerberos+tcp", "kerberos+udp",
- "kpasswd", "kpasswd+tcp", "kpasswd+udp",
- "http", "https",)
- def __init__(self):
- self.__resolvers = []
- for i in itertools.count(0):
- allsub = IConfig.__subclasses__()
- if not i < len(allsub):
- break
+ STANDARD_PORTS = {SRV_KRB: 88, SRV_KPWD: 464}
+ def __init__(self):
+ self.__config = KDCProxyConfig()
+ self.__dns_resolver = DNSResolver(self.__log_warning)
+ self.__extra_configs = []
+ for cfgcls in IConfig.__subclasses__():
+ if cfgcls is KDCProxyConfig:
+ continue
try:
- self.__resolvers.append(allsub[i]())
+ self.__extra_configs.append(cfgcls())
except Exception as e:
- fmt = (allsub[i], repr(e))
- logging.log(logging.WARNING,
- "Error instantiating %s due to %s" % fmt)
- assert self.__resolvers
-
- # See if we should use DNS
- dns = None
- for cfg in self.__resolvers:
- tmp = cfg.use_dns()
- if tmp is not None:
- dns = tmp
- break
-
- # If DNS is enabled, append the DNSResolver at the end
- if dns in (None, True):
- self.__resolvers.append(DNSResolver())
+ logging.warning("Error instantiating %s due to %s", cfgcls,
+ repr(e))
def __unique(self, items):
"Removes duplicate items from an iterable while maintaining order."
@@ -161,10 +263,52 @@ class MetaResolver(IResolver):
unique.remove(item)
yield item
+ def __silenced_port_warn(self, realm):
+ """Check if port warnings are silenced for a realm."""
+ return self.__config.param(realm, 'silence_port_warn')
+
+ def __log_warning(self, service, protocol, realm, kpasswd, host, port):
+ """Log a warning if a KDC uses a non-standard port."""
+ if not self.__silenced_port_warn(realm):
+ expected_port = self.STANDARD_PORTS[SRV_KPWD if kpasswd
+ else SRV_KRB]
+ if port != expected_port:
+ logger.warning(
+ 'DNS SRV record _%s._%s.%s. points to KDC %s with '
+ 'non-standard port %i (%i expected)',
+ service, protocol, realm, host, port, expected_port)
+
+ def __realm_configured(self, realm):
+ """Check if realm is declared in any configuration source."""
+ if self.__config.realm_configured(realm):
+ return True
+ for c in self.__extra_configs:
+ if c.realm_configured(realm):
+ return True
+ return False
+
+ def __dns_discovery_allowed(self, realm):
+ """Check if DNS discovery is allowed for a realm."""
+ return (
+ self.__realm_configured(realm)
+ or self.__config.param(None, 'dns_realm_discovery')
+ ) and self.__config.param(realm, 'use_dns')
+
def lookup(self, realm, kpasswd=False):
- for r in self.__resolvers:
- servers = tuple(self.__unique(r.lookup(realm, kpasswd)))
+ servers = tuple(self.__unique(self.__config.lookup(realm, kpasswd)))
+ if servers:
+ return servers
+
+ for c in self.__extra_configs:
+ servers = tuple(self.__unique(c.lookup(realm, kpasswd)))
if servers:
return servers
+ # The scope of realms we are allowed to use DNS discovery for depends
+ # on the configuration
+ if self.__dns_discovery_allowed(realm):
+ servers = tuple(self.__unique(
+ self.__dns_resolver.lookup(realm, kpasswd)))
+ return servers
+
return ()
diff --git a/kdcproxy/config/mit.py b/kdcproxy/config/mit.py
index 1af4167..cd80f6b 100644
--- a/kdcproxy/config/mit.py
+++ b/kdcproxy/config/mit.py
@@ -232,19 +232,9 @@ class MITConfig(IConfig):
def __init__(self, *args, **kwargs):
self.__config = {}
with KRB5Profile() as prof:
- # Load DNS setting
- self.__config["dns"] = prof.get_bool("libdefaults",
- "dns_fallback",
- default=True)
- if "dns_lookup_kdc" in dict(prof.section("libdefaults")):
- self.__config["dns"] = prof.get_bool("libdefaults",
- "dns_lookup_kdc",
- default=True)
-
# Load all configured realms
- self.__config["realms"] = {}
for realm, values in prof.section("realms"):
- rconf = self.__config["realms"].setdefault(realm, {})
+ rconf = self.__config.setdefault(realm, {})
for server, hostport in values:
if server not in self.CONFIG_KEYS:
continue
@@ -261,7 +251,7 @@ class MITConfig(IConfig):
rconf.setdefault(server, []).append(parsed.geturl())
def lookup(self, realm, kpasswd=False):
- rconf = self.__config.get("realms", {}).get(realm, {})
+ rconf = self.__config.get(realm, {})
if kpasswd:
servers = list(rconf.get('kpasswd_server', []))
@@ -271,8 +261,13 @@ class MITConfig(IConfig):
return tuple(servers)
- def use_dns(self, default=True):
- return self.__config["dns"]
+ def realm_configured(self, realm):
+ """Check if a realm is declared in the MIT krb5 configuration."""
+ return realm in self.__config
+
+ def param(self, realm, param):
+ """Always None. MIT krb5 config only provides server addresses."""
+ return None
if __name__ == "__main__":
diff --git a/tests.py b/tests.py
index e3fa4f7..6464494 100644
--- a/tests.py
+++ b/tests.py
@@ -19,10 +19,12 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
+import contextlib
import os
import socket
import struct
import sys
+import tempfile
import unittest
from base64 import b64decode
try:
@@ -314,11 +316,24 @@ class KDCProxyCodecTests(unittest.TestCase):
class KDCProxyConfigTests(unittest.TestCase):
+ @contextlib.contextmanager
+ def temp_config_file(self, content):
+ with tempfile.NamedTemporaryFile(
+ mode="w", delete=False, suffix=".conf"
+ ) as f:
+ f.write(content)
+ config_file = f.name
+
+ try:
+ yield config_file
+ finally:
+ os.remove(config_file)
+
def test_mit_config(self):
with mock.patch.dict('os.environ', {'KRB5_CONFIG': KRB5_CONFIG}):
cfg = mit.MITConfig()
- self.assertIs(cfg.use_dns(), False)
+ self.assertIs(cfg.param('KDCPROXY.TEST', 'use_dns'), None)
self.assertEqual(
cfg.lookup('KDCPROXY.TEST'),
(
@@ -398,6 +413,832 @@ class KDCProxyConfigTests(unittest.TestCase):
m_query.assert_any_call('_kpasswd._udp.KDCPROXY.TEST', RDTYPE_SRV)
m_query.assert_any_call('_kerberos-adm._udp.KDCPROXY.TEST', RDTYPE_SRV)
+ def test_kdcproxy_config_realm_configured(self):
+ with self.temp_config_file(
+ """[REALM1.TEST]
+ kerberos = kerberos://kdc1.realm1.test:88
+ [REALM2.TEST]
+ kpasswd = kpasswd://kpwd.realm2.test:464\n"""
+ ) as config_file:
+ cfg = config.KDCProxyConfig(filenames=[config_file])
+
+ # Test configured realms
+ self.assertTrue(cfg.realm_configured("REALM1.TEST"))
+ self.assertTrue(cfg.realm_configured("REALM2.TEST"))
+
+ # Test unconfigured realm
+ self.assertFalse(cfg.realm_configured("UNKNOWN.TEST"))
+
+ # Test that 'global' cannot be used as realm name
+ with self.assertRaises(ValueError):
+ cfg.realm_configured("global")
+
+ def test_kdcproxy_config_param(self):
+ with self.temp_config_file(
+ """[global]
+ silence_port_warn = true
+ [REALM1.TEST]
+ use_dns = false
+ kerberos = kerberos://kdc1.realm1.test:88
+ [REALM2.TEST]
+ kerberos = kerberos://kdc2.realm2.test:88"""
+ ) as config_file:
+ cfg = config.KDCProxyConfig(filenames=[config_file])
+
+ # Test realm-specific parameter overrides global
+ self.assertFalse(cfg.param("REALM1.TEST", "use_dns"))
+
+ # Test fallback to global parameter
+ self.assertTrue(cfg.param("REALM1.TEST", "silence_port_warn"))
+ self.assertTrue(cfg.param("REALM2.TEST", "use_dns"))
+ self.assertTrue(cfg.param("REALM2.TEST", "silence_port_warn"))
+
+ # Test invalid parameter
+ with self.assertRaises(ValueError):
+ cfg.param("REALM1.TEST", "invalid_param")
+
+ # Test that 'global' cannot be used as realm name
+ with self.assertRaises(ValueError):
+ cfg.param("global", "use_dns")
+
+ def test_kdcproxy_config_lookup(self):
+ with self.temp_config_file(
+ "[REALM.TEST]\n"
+ "kerberos = kerberos://kdc1.test:88 "
+ "kerberos://kdc2.test:88\n"
+ "kpasswd = kpasswd://kpwd.test:464"
+ ) as config_file:
+ cfg = config.KDCProxyConfig(filenames=[config_file])
+
+ # Test kerberos lookup
+ self.assertEqual(
+ cfg.lookup("REALM.TEST"),
+ ("kerberos://kdc1.test:88", "kerberos://kdc2.test:88"),
+ )
+
+ # Test kpasswd lookup
+ self.assertEqual(
+ cfg.lookup("REALM.TEST", kpasswd=True),
+ ("kpasswd://kpwd.test:464",),
+ )
+
+ # Test unconfigured realm
+ self.assertEqual(cfg.lookup("UNKNOWN.TEST"), ())
+
+ # Test that 'global' cannot be used as realm name
+ with self.assertRaises(ValueError):
+ cfg.lookup("global")
+
+ @mock.patch("dns.resolver.query")
+ def test_dns_blocked_for_undeclared_realms(self, m_query):
+ with mock.patch.object(config.KDCProxyConfig, "default_filenames", []):
+ resolver = config.MetaResolver()
+
+ # DNS should NOT be used for unconfigured realm
+ result = resolver.lookup("UNCONFIGURED.TEST")
+ self.assertEqual(result, ())
+ m_query.assert_not_called()
+
+ @mock.patch("dns.resolver.query")
+ def test_use_dns_false_disables_dns_discovery(self, m_query):
+ # Test exact realm section
+ with self.temp_config_file(
+ """[global]
+ use_dns = false
+ [REALM.TEST]
+ ; Exact realm declared but no servers specified"""
+ ) as config_file:
+ with mock.patch.object(
+ config.KDCProxyConfig, "default_filenames", [config_file]
+ ):
+ resolver = config.MetaResolver()
+
+ # DNS should NOT be used when use_dns is false for exact realm
+ result = resolver.lookup("REALM.TEST")
+ self.assertEqual(result, ())
+ m_query.assert_not_called()
+
+ # Test wildcard realm section
+ m_query.reset_mock()
+ with self.temp_config_file(
+ """[global]
+ use_dns = false
+ [*EXAMPLE.COM]
+ ; Wildcard realm declared but no servers specified"""
+ ) as config_file:
+ with mock.patch.object(
+ config.KDCProxyConfig, "default_filenames", [config_file]
+ ):
+ resolver = config.MetaResolver()
+
+ # DNS should NOT be used when use_dns is false for wildcard
+ # realm
+ result = resolver.lookup("SUB.EXAMPLE.COM")
+ self.assertEqual(result, ())
+ m_query.assert_not_called()
+
+ @mock.patch("dns.resolver.query")
+ def test_use_dns_true_enables_dns_for_declared_realms(self, m_query):
+ # Test exact realm section
+ with self.temp_config_file(
+ """[global]
+ use_dns = true
+ [REALM.TEST]
+ ; Exact realm declared but no servers specified"""
+ ) as config_file:
+ tcp_srv = [self.mksrv("0 0 88 kdc.realm.test.")]
+ udp_srv = []
+ m_query.side_effect = [tcp_srv, udp_srv]
+
+ with mock.patch.object(
+ config.KDCProxyConfig, "default_filenames", [config_file]
+ ):
+ resolver = config.MetaResolver()
+
+ # DNS SHOULD be used when exact realm is declared and use_dns
+ # is true
+ result = resolver.lookup("REALM.TEST")
+ self.assertEqual(result, ("kerberos://kdc.realm.test:88",))
+ self.assertEqual(m_query.call_count, 2)
+
+ # Test wildcard realm section
+ m_query.reset_mock()
+ with self.temp_config_file(
+ """[global]
+ use_dns = true
+ [*EXAMPLE.COM]
+ ; Wildcard realm declared but no servers specified"""
+ ) as config_file:
+ tcp_srv = [self.mksrv("0 0 88 kdc.sub.example.com.")]
+ udp_srv = []
+ m_query.side_effect = [tcp_srv, udp_srv]
+
+ with mock.patch.object(
+ config.KDCProxyConfig, "default_filenames", [config_file]
+ ):
+ resolver = config.MetaResolver()
+
+ # DNS SHOULD be used when wildcard realm matches and use_dns
+ # is true
+ result = resolver.lookup("SUB.EXAMPLE.COM")
+ self.assertEqual(
+ result, ("kerberos://kdc.sub.example.com:88",)
+ )
+ self.assertEqual(m_query.call_count, 2)
+
+ @mock.patch("logging.Logger.warning")
+ @mock.patch("dns.resolver.query")
+ def test_dns_discovery_warns_on_nonstandard_port(
+ self, m_query, m_log_warning
+ ):
+ # Test exact realm section
+ with self.temp_config_file(
+ """[REALM.TEST]"""
+ ) as config_file:
+ # DNS returns KDC on non-standard port
+ tcp_srv = [self.mksrv("0 0 1088 kdc.realm.test.")]
+ udp_srv = []
+ m_query.side_effect = [tcp_srv, udp_srv]
+
+ with mock.patch.object(
+ config.KDCProxyConfig, "default_filenames", [config_file]
+ ):
+ resolver = config.MetaResolver()
+ result = resolver.lookup("REALM.TEST")
+
+ # Should return the server
+ self.assertEqual(result, ("kerberos://kdc.realm.test:1088",))
+
+ # Should log warning about non-standard port for exact realm
+ m_log_warning.assert_called_once()
+ args = m_log_warning.call_args[0]
+ self.assertIn("non-standard port", args[0])
+ self.assertEqual(args[5], 1088) # port
+ self.assertEqual(args[6], 88) # expected port
+
+ # Test wildcard realm section
+ m_query.reset_mock()
+ m_log_warning.reset_mock()
+ with self.temp_config_file(
+ """[*EXAMPLE.COM]"""
+ ) as config_file:
+ # DNS returns KDC on non-standard port
+ tcp_srv = [self.mksrv("0 0 1088 kdc.sub.example.com.")]
+ udp_srv = []
+ m_query.side_effect = [tcp_srv, udp_srv]
+
+ with mock.patch.object(
+ config.KDCProxyConfig, "default_filenames", [config_file]
+ ):
+ resolver = config.MetaResolver()
+ result = resolver.lookup("SUB.EXAMPLE.COM")
+
+ # Should return the server
+ self.assertEqual(
+ result, ("kerberos://kdc.sub.example.com:1088",)
+ )
+
+ # Should log warning about non-standard port for wildcard realm
+ m_log_warning.assert_called_once()
+ args = m_log_warning.call_args[0]
+ self.assertIn("non-standard port", args[0])
+ self.assertEqual(args[5], 1088) # port
+ self.assertEqual(args[6], 88) # expected port
+
+ @mock.patch("logging.Logger.warning")
+ @mock.patch("dns.resolver.query")
+ def test_silence_port_warn_suppresses_nonstandard_port_warnings(
+ self, m_query, m_log_warning
+ ):
+ # Test exact realm section
+ with self.temp_config_file(
+ """[REALM.TEST]
+ silence_port_warn = true"""
+ ) as config_file:
+ # DNS returns KDC on non-standard port
+ tcp_srv = [self.mksrv("0 0 1088 kdc.realm.test.")]
+ udp_srv = []
+ m_query.side_effect = [tcp_srv, udp_srv]
+
+ with mock.patch.object(
+ config.KDCProxyConfig, "default_filenames", [config_file]
+ ):
+ resolver = config.MetaResolver()
+ result = resolver.lookup("REALM.TEST")
+
+ # Should return the server
+ self.assertEqual(result, ("kerberos://kdc.realm.test:1088",))
+
+ # Should NOT log warning when silenced for exact realm
+ m_log_warning.assert_not_called()
+
+ # Test wildcard realm section
+ m_query.reset_mock()
+ m_log_warning.reset_mock()
+ with self.temp_config_file(
+ """[*EXAMPLE.COM]
+ silence_port_warn = true"""
+ ) as config_file:
+ # DNS returns KDC on non-standard port
+ tcp_srv = [self.mksrv("0 0 1088 kdc.sub.example.com.")]
+ udp_srv = []
+ m_query.side_effect = [tcp_srv, udp_srv]
+
+ with mock.patch.object(
+ config.KDCProxyConfig, "default_filenames", [config_file]
+ ):
+ resolver = config.MetaResolver()
+ result = resolver.lookup("SUB.EXAMPLE.COM")
+
+ # Should return the server
+ self.assertEqual(
+ result, ("kerberos://kdc.sub.example.com:1088",)
+ )
+
+ # Should NOT log warning when silenced for wildcard realm
+ m_log_warning.assert_not_called()
+
+ @mock.patch("dns.resolver.query")
+ def test_configured_servers_preferred_over_dns_discovery(self, m_query):
+ # Create a config with servers configured
+ with self.temp_config_file(
+ """[REALM.TEST]
+ kerberos = kerberos://configured-kdc.test:88"""
+ ) as config_file:
+ with mock.patch.object(
+ config.KDCProxyConfig, "default_filenames", [config_file]
+ ):
+ resolver = config.MetaResolver()
+ result = resolver.lookup("REALM.TEST")
+
+ # Should return configured server, not DNS
+ self.assertEqual(
+ result, ("kerberos://configured-kdc.test:88",)
+ )
+
+ # DNS should not be queried when servers are configured
+ m_query.assert_not_called()
+
+ @mock.patch("dns.resolver.query")
+ def test_mit_realm_prefers_configured_servers_over_dns(self, m_query):
+ # Test that realm in MIT config uses configured servers even when
+ # use_dns = true
+ with self.temp_config_file(
+ """[global]
+ use_dns = true
+ configs = mit"""
+ ) as config_file:
+ with mock.patch.dict(
+ "os.environ", {"KRB5_CONFIG": KRB5_CONFIG}
+ ), mock.patch.object(
+ config.KDCProxyConfig, "default_filenames", [config_file]
+ ):
+ resolver = config.MetaResolver()
+ result = resolver.lookup("KDCPROXY.TEST")
+
+ # Should return MIT-configured servers (from tests.krb5.conf)
+ self.assertEqual(
+ result,
+ (
+ "kerberos://k1.kdcproxy.test.:88",
+ "kerberos://k2.kdcproxy.test.:1088",
+ ),
+ )
+
+ # DNS should NOT be queried when servers are in MIT config
+ m_query.assert_not_called()
+
+ @mock.patch("dns.resolver.query")
+ def test_mit_realm_uses_configured_servers_when_use_dns_false(
+ self, m_query
+ ):
+ # Test that realm in MIT config uses configured servers when
+ # use_dns = false
+ with self.temp_config_file(
+ """[global]
+ use_dns = false
+ configs = mit"""
+ ) as config_file:
+ with mock.patch.dict(
+ "os.environ", {"KRB5_CONFIG": KRB5_CONFIG}
+ ), mock.patch.object(
+ config.KDCProxyConfig, "default_filenames", [config_file]
+ ):
+ resolver = config.MetaResolver()
+ result = resolver.lookup("KDCPROXY.TEST")
+
+ # Should return MIT-configured servers
+ self.assertEqual(
+ result,
+ (
+ "kerberos://k1.kdcproxy.test.:88",
+ "kerberos://k2.kdcproxy.test.:1088",
+ ),
+ )
+
+ # DNS should NOT be queried
+ m_query.assert_not_called()
+
+ @mock.patch("dns.resolver.query")
+ def test_mit_kpasswd_prefers_configured_servers_over_dns(self, m_query):
+ # Test that kpasswd servers from MIT config are used even when
+ # use_dns = true
+ with self.temp_config_file(
+ """[global]
+ use_dns = true
+ configs = mit"""
+ ) as config_file:
+ with mock.patch.dict(
+ "os.environ", {"KRB5_CONFIG": KRB5_CONFIG}
+ ), mock.patch.object(
+ config.KDCProxyConfig, "default_filenames", [config_file]
+ ):
+ resolver = config.MetaResolver()
+ result = resolver.lookup("KDCPROXY.TEST", kpasswd=True)
+
+ # Should return MIT-configured kpasswd servers
+ self.assertEqual(
+ result,
+ (
+ "kpasswd://adm.kdcproxy.test.:1749",
+ "kpasswd://adm.kdcproxy.test.",
+ ),
+ )
+
+ # DNS should NOT be queried
+ m_query.assert_not_called()
+
+ @mock.patch("dns.resolver.query")
+ def test_kdcproxy_declared_realm_uses_dns_when_no_servers(self, m_query):
+ # Test that a realm in kdcproxy.conf (but not MIT) will use DNS when no
+ # servers are configured
+ with self.temp_config_file(
+ """[global]
+ configs = mit
+ [REALM.TEST]
+ ; Realm section exists but no servers configured"""
+ ) as config_file:
+ tcp_srv = [self.mksrv("0 0 88 kdc.realm.test.")]
+ udp_srv = []
+ m_query.side_effect = [tcp_srv, udp_srv]
+
+ with mock.patch.dict(
+ "os.environ", {"KRB5_CONFIG": KRB5_CONFIG}
+ ), mock.patch.object(
+ config.KDCProxyConfig, "default_filenames", [config_file]
+ ):
+ resolver = config.MetaResolver()
+ result = resolver.lookup("REALM.TEST")
+
+ # Should use DNS since realm is in config but has no servers
+ self.assertEqual(result, ("kerberos://kdc.realm.test:88",))
+ self.assertEqual(m_query.call_count, 2)
+
+ @mock.patch("dns.resolver.query")
+ def test_realm_specific_use_dns_overrides_global(self, m_query):
+ # Test that realm-specific use_dns overrides global setting for a realm
+ # that's in MIT config
+ with self.temp_config_file(
+ """[global]
+ use_dns = true
+ configs = mit
+ [KDCPROXY.TEST]
+ use_dns = false"""
+ ) as config_file:
+ with mock.patch.dict(
+ "os.environ", {"KRB5_CONFIG": KRB5_CONFIG}
+ ), mock.patch.object(
+ config.KDCProxyConfig, "default_filenames", [config_file]
+ ):
+ resolver = config.MetaResolver()
+
+ # Should return MIT servers
+ result = resolver.lookup("KDCPROXY.TEST")
+ self.assertEqual(
+ result,
+ (
+ "kerberos://k1.kdcproxy.test.:88",
+ "kerberos://k2.kdcproxy.test.:1088",
+ ),
+ )
+
+ # DNS should not be queried due to realm override
+ m_query.assert_not_called()
+
+ @mock.patch("dns.resolver.query")
+ def test_kdcproxy_servers_override_mit_servers(self, m_query):
+ # Test that servers configured in kdcproxy.conf take precedence over
+ # MIT config servers
+ with self.temp_config_file(
+ """[global]
+ configs = mit
+ [KDCPROXY.TEST]
+ kerberos = kerberos://override.test:88"""
+ ) as config_file:
+ with mock.patch.dict(
+ "os.environ", {"KRB5_CONFIG": KRB5_CONFIG}
+ ), mock.patch.object(
+ config.KDCProxyConfig, "default_filenames", [config_file]
+ ):
+ resolver = config.MetaResolver()
+ result = resolver.lookup("KDCPROXY.TEST")
+
+ # Should return kdcproxy.conf servers, not MIT servers
+ self.assertEqual(result, ("kerberos://override.test:88",))
+
+ # DNS should not be queried
+ m_query.assert_not_called()
+
+ @mock.patch("dns.resolver.query")
+ def test_undeclared_realm_blocks_dns_despite_use_dns_true(self, m_query):
+ # Test that a realm NOT in MIT and NOT in kdcproxy.conf will NOT use
+ # DNS even with use_dns = true (security restriction)
+ with self.temp_config_file(
+ """[global]
+ use_dns = true
+ configs = mit"""
+ ) as config_file:
+ with mock.patch.dict(
+ "os.environ", {"KRB5_CONFIG": KRB5_CONFIG}
+ ), mock.patch.object(
+ config.KDCProxyConfig, "default_filenames", [config_file]
+ ):
+ resolver = config.MetaResolver()
+ result = resolver.lookup("UNCONFIGURED.REALM")
+
+ # Should return empty - no DNS lookup
+ self.assertEqual(result, ())
+
+ # DNS should NOT be queried for unconfigured realm
+ m_query.assert_not_called()
+
+ @mock.patch("dns.resolver.query")
+ def test_mit_declared_realm_without_servers_uses_dns(self, m_query):
+ # Test that a realm in MIT config but WITHOUT KDC servers configured
+ # will use DNS
+
+ # Create a krb5.conf with a realm section but no kdc entries
+ with tempfile.NamedTemporaryFile(
+ mode="w", delete=False, suffix=".conf"
+ ) as krb5_file:
+ krb5_file.write(
+ """[libdefaults]
+ default_realm = EMPTY.REALM
+
+ [realms]
+ EMPTY.REALM = {
+ default_domain = empty.realm
+ }"""
+ )
+ krb5_conf = krb5_file.name
+
+ # Create kdcproxy.conf
+ with self.temp_config_file(
+ """[global]
+ configs = mit"""
+ ) as config_file:
+ tcp_srv = [self.mksrv("0 0 88 kdc.empty.realm.")]
+ udp_srv = []
+ m_query.side_effect = [tcp_srv, udp_srv]
+
+ with mock.patch.dict(
+ "os.environ", {"KRB5_CONFIG": krb5_conf}
+ ), mock.patch.object(
+ config.KDCProxyConfig, "default_filenames", [config_file]
+ ):
+ resolver = config.MetaResolver()
+ result = resolver.lookup("EMPTY.REALM")
+
+ # Should use DNS because:
+ # 1. Realm is in MIT config (realm_configured returns True)
+ # 2. No servers configured in MIT config
+ # 3. use_dns enabled globally by default
+ self.assertEqual(result, ("kerberos://kdc.empty.realm:88",))
+
+ # DNS SHOULD be queried
+ self.assertEqual(m_query.call_count, 2)
+ m_query.assert_any_call(
+ "_kerberos._tcp.EMPTY.REALM", RDTYPE_SRV
+ )
+ m_query.assert_any_call(
+ "_kerberos._udp.EMPTY.REALM", RDTYPE_SRV
+ )
+ os.remove(krb5_conf)
+
+ def test_kdcproxy_config_realm_wildcard_matching(self):
+ # Test realm matching with wildcard patterns
+ with self.temp_config_file(
+ """[global]
+ use_dns = false
+ [SPECIFIC.SUB.EXAMPLE.COM]
+ kerberos = kerberos://specific.example.com:88
+ [*SUB.EXAMPLE.COM]
+ use_dns = true
+ [*EXAMPLE.COM]
+ silence_port_warn = true"""
+ ) as config_file:
+ cfg = config.KDCProxyConfig(filenames=[config_file])
+
+ # Test exact match
+ self.assertTrue(cfg.realm_configured("SPECIFIC.SUB.EXAMPLE.COM"))
+ self.assertEqual(
+ cfg.lookup("SPECIFIC.SUB.EXAMPLE.COM"),
+ ("kerberos://specific.example.com:88",),
+ )
+
+ # Test wildcard matching for *SUB.EXAMPLE.COM
+ self.assertTrue(cfg.realm_configured("OTHER.SUB.EXAMPLE.COM"))
+ # Wildcard sections don't support kerberos/kpasswd params
+ self.assertEqual(cfg.lookup("OTHER.SUB.EXAMPLE.COM"), ())
+
+ # Test wildcard matching for *EXAMPLE.COM
+ self.assertTrue(cfg.realm_configured("FOO.EXAMPLE.COM"))
+ self.assertEqual(cfg.lookup("FOO.EXAMPLE.COM"), ())
+
+ # Test wildcard matches exact realm name (EXAMPLE.COM matches
+ # *EXAMPLE.COM)
+ self.assertTrue(cfg.realm_configured("EXAMPLE.COM"))
+ self.assertTrue(cfg.param("EXAMPLE.COM", "silence_port_warn"))
+
+ # Test multi-level subdomain matches wildcard
+ self.assertTrue(cfg.realm_configured("A.B.C.EXAMPLE.COM"))
+
+ # Test non-matching realm (MYEXAMPLE.COM should NOT match
+ # *EXAMPLE.COM)
+ self.assertFalse(cfg.realm_configured("MYEXAMPLE.COM"))
+ self.assertEqual(cfg.lookup("MYEXAMPLE.COM"), ())
+
+ # Test other non-matching realm
+ self.assertFalse(cfg.realm_configured("OTHER.DOMAIN"))
+ self.assertEqual(cfg.lookup("OTHER.DOMAIN"), ())
+
+ def test_kdcproxy_config_param_wildcard_matching(self):
+ # Test parameter lookup with wildcard patterns
+ with self.temp_config_file(
+ """[global]
+ use_dns = false
+ silence_port_warn = false
+ [*EXAMPLE.COM]
+ use_dns = true
+ silence_port_warn = true
+ [SPECIFIC.EXAMPLE.COM]
+ silence_port_warn = false"""
+ ) as config_file:
+ cfg = config.KDCProxyConfig(filenames=[config_file])
+
+ # Test exact match takes precedence for parameters
+ self.assertTrue(cfg.param("SPECIFIC.EXAMPLE.COM", "use_dns"))
+ self.assertFalse(
+ cfg.param("SPECIFIC.EXAMPLE.COM", "silence_port_warn")
+ )
+
+ # Test wildcard parameter matching
+ self.assertTrue(cfg.param("OTHER.EXAMPLE.COM", "use_dns"))
+ self.assertTrue(
+ cfg.param("OTHER.EXAMPLE.COM", "silence_port_warn")
+ )
+
+ # Test fallback to global when no wildcard match
+ self.assertFalse(cfg.param("OTHER.DOMAIN", "use_dns"))
+ self.assertFalse(cfg.param("OTHER.DOMAIN", "silence_port_warn"))
+
+ def test_wildcard_specificity_determines_priority(self):
+ # Test that more specific wildcards take precedence
+ with self.temp_config_file(
+ """[global]
+ use_dns = false
+ [*EXAMPLE.COM]
+ silence_port_warn = true
+ [*SUB.EXAMPLE.COM]
+ use_dns = true"""
+ ) as config_file:
+ cfg = config.KDCProxyConfig(filenames=[config_file])
+
+ # More specific wildcard (*SUB.EXAMPLE.COM) should match first
+ self.assertTrue(cfg.param("FOO.SUB.EXAMPLE.COM", "use_dns"))
+ # Should also get parameter from broader wildcard
+ self.assertTrue(
+ cfg.param("FOO.SUB.EXAMPLE.COM", "silence_port_warn")
+ )
+
+ # Broader wildcard should match other subdomains
+ self.assertTrue(
+ cfg.param("FOO.OTHER.EXAMPLE.COM", "silence_port_warn")
+ )
+ # Should fall back to global for use_dns
+ self.assertFalse(cfg.param("FOO.OTHER.EXAMPLE.COM", "use_dns"))
+
+ @mock.patch("dns.resolver.query")
+ def test_kdcproxy_config_exact_realm_priority_over_wildcard(self, m_query):
+ # Test that exact realm sections take precedence over wildcard sections
+ with self.temp_config_file(
+ """[global]
+ use_dns = false
+ silence_port_warn = false
+ [*EXAMPLE.COM]
+ use_dns = true
+ silence_port_warn = true
+ [SPECIFIC.EXAMPLE.COM]
+ kerberos = kerberos://specific-kdc.example.com:88
+ use_dns = false"""
+ ) as config_file:
+ with mock.patch.object(
+ config.KDCProxyConfig, "default_filenames", [config_file]
+ ):
+ resolver = config.MetaResolver()
+
+ # Exact realm section should take priority
+ self.assertTrue(
+ resolver._MetaResolver__config.realm_configured(
+ "SPECIFIC.EXAMPLE.COM"
+ )
+ )
+
+ # Should get kerberos from exact realm section
+ result = resolver.lookup("SPECIFIC.EXAMPLE.COM")
+ self.assertEqual(
+ result,
+ ("kerberos://specific-kdc.example.com:88",),
+ )
+
+ # DNS should NOT be called because:
+ # 1. Exact realm has configured servers
+ # 2. Exact realm has use_dns=false (takes priority over
+ # wildcard)
+ m_query.assert_not_called()
+
+ # Verify exact realm's use_dns=false takes priority
+ self.assertFalse(
+ resolver._MetaResolver__config.param(
+ "SPECIFIC.EXAMPLE.COM", "use_dns"
+ )
+ )
+
+ # Should get silence_port_warn from wildcard since not in exact
+ # section
+ self.assertTrue(
+ resolver._MetaResolver__config.param(
+ "SPECIFIC.EXAMPLE.COM", "silence_port_warn"
+ )
+ )
+
+ def test_dns_realm_discovery_param_defaults_false(self):
+ # Test the dns_realm_discovery global parameter
+ with self.temp_config_file(
+ """[global]
+ dns_realm_discovery = true"""
+ ) as config_file:
+ cfg = config.KDCProxyConfig(filenames=[config_file])
+
+ # Test that dns_realm_discovery can be read
+ self.assertTrue(cfg.param(None, "dns_realm_discovery"))
+
+ # Test default value when not specified
+ cfg2 = config.KDCProxyConfig(filenames=[])
+ self.assertFalse(cfg2.param(None, "dns_realm_discovery"))
+
+ @mock.patch("dns.resolver.query")
+ def test_dns_realm_discovery_true_allows_undeclared_realms(self, m_query):
+ # Test that dns_realm_discovery allows DNS for unconfigured realms
+ with self.temp_config_file(
+ """[global]
+ dns_realm_discovery = true"""
+ ) as config_file:
+ tcp_srv = [self.mksrv("0 0 88 kdc.unconfigured.test.")]
+ udp_srv = []
+ m_query.side_effect = [tcp_srv, udp_srv]
+
+ with mock.patch.object(
+ config.KDCProxyConfig, "default_filenames", [config_file]
+ ):
+ resolver = config.MetaResolver()
+
+ # DNS SHOULD be used for unconfigured realm when
+ # dns_realm_discovery = true
+ result = resolver.lookup("UNCONFIGURED.TEST")
+ self.assertEqual(
+ result, ("kerberos://kdc.unconfigured.test:88",)
+ )
+ self.assertEqual(m_query.call_count, 2)
+
+ @mock.patch("dns.resolver.query")
+ def test_dns_realm_discovery_false_blocks_undeclared_realms(self, m_query):
+ # Test that dns_realm_discovery=false restricts DNS to configured
+ # realms
+ with self.temp_config_file(
+ """[global]
+ dns_realm_discovery = false"""
+ ) as config_file:
+ with mock.patch.object(
+ config.KDCProxyConfig, "default_filenames", [config_file]
+ ):
+ resolver = config.MetaResolver()
+
+ # DNS should NOT be used for unconfigured realm when
+ # dns_realm_discovery = false
+ result = resolver.lookup("UNCONFIGURED.TEST")
+ self.assertEqual(result, ())
+ m_query.assert_not_called()
+
+ @mock.patch("dns.resolver.query")
+ def test_wildcard_realm_uses_dns_despite_dns_realm_discovery_false(
+ self, m_query
+ ):
+ # Test that wildcard-matched realms can use DNS discovery
+ with self.temp_config_file(
+ """[global]
+ dns_realm_discovery = false
+ [*EXAMPLE.COM]"""
+ ) as config_file:
+ tcp_srv = [self.mksrv("0 0 88 kdc.sub.example.com.")]
+ udp_srv = []
+ m_query.side_effect = [tcp_srv, udp_srv]
+
+ with mock.patch.object(
+ config.KDCProxyConfig, "default_filenames", [config_file]
+ ):
+ resolver = config.MetaResolver()
+
+ # DNS SHOULD be used for wildcard-matched realm even when
+ # dns_realm_discovery = false
+ result = resolver.lookup("SUB.EXAMPLE.COM")
+ self.assertEqual(
+ result, ("kerberos://kdc.sub.example.com:88",)
+ )
+ self.assertEqual(m_query.call_count, 2)
+
+ @mock.patch("dns.resolver.query")
+ def test_use_dns_defaults_to_true(self, m_query):
+ # Test that use_dns defaults to true when not set
+ with self.temp_config_file(
+ """[REALM.TEST]
+ ; Realm declared but use_dns not specified"""
+ ) as config_file:
+ tcp_srv = [self.mksrv("0 0 88 kdc.realm.test.")]
+ udp_srv = []
+ m_query.side_effect = [tcp_srv, udp_srv]
+
+ with mock.patch.object(
+ config.KDCProxyConfig, "default_filenames", [config_file]
+ ):
+ resolver = config.MetaResolver()
+
+ # DNS SHOULD be used when use_dns is not set (defaults to true)
+ result = resolver.lookup("REALM.TEST")
+ self.assertEqual(result, ("kerberos://kdc.realm.test:88",))
+ self.assertEqual(m_query.call_count, 2)
+
+ @mock.patch("dns.resolver.query")
+ def test_dns_realm_discovery_defaults_to_false(self, m_query):
+ # Test that dns_realm_discovery defaults to false for security
+ with mock.patch.object(config.KDCProxyConfig, "default_filenames", []):
+ resolver = config.MetaResolver()
+
+ # DNS should NOT be used for unconfigured realm by default
+ result = resolver.lookup("UNCONFIGURED.TEST")
+ self.assertEqual(result, ())
+ m_query.assert_not_called()
+
if __name__ == "__main__":
unittest.main()
--
2.51.0