286 lines
11 KiB
Diff
286 lines
11 KiB
Diff
From 72cc54bc2798125a7e0ca3eb08fcc8b973b8844e Mon Sep 17 00:00:00 2001
|
|
From: Jan Cholasta <jcholast@redhat.com>
|
|
Date: Tue, 31 Jul 2012 06:37:14 -0400
|
|
Subject: [PATCH 78/79] Make --{set,add,del}attr more robust.
|
|
|
|
This fixes --addattr on single-value attributes in add commands and --delattr
|
|
on non-unicode attributes in mod commands.
|
|
|
|
ticket 2954
|
|
---
|
|
ipalib/plugins/baseldap.py | 89 +++++++++++++++++++++++-------------------
|
|
tests/test_xmlrpc/test_attr.py | 85 ++++++++++++++++++++++++++++++++++++++--
|
|
2 files changed, 131 insertions(+), 43 deletions(-)
|
|
|
|
diff --git a/ipalib/plugins/baseldap.py b/ipalib/plugins/baseldap.py
|
|
index 6a37995c57cd9d57280186d15f209426aa0776f2..32dae5160a00c927855990211305706f332e77db 100644
|
|
--- a/ipalib/plugins/baseldap.py
|
|
+++ b/ipalib/plugins/baseldap.py
|
|
@@ -784,36 +784,45 @@ last, after all sets and adds."""),
|
|
:param attrs: A list of name/value pair strings, in the "name=value"
|
|
format. May also be a single string, or None.
|
|
"""
|
|
-
|
|
- newdict = {}
|
|
if attrs is None:
|
|
- attrs = []
|
|
- elif not type(attrs) in (list, tuple):
|
|
+ return {}
|
|
+
|
|
+ if not isinstance(attrs, (tuple, list)):
|
|
attrs = [attrs]
|
|
+
|
|
+ newdict = {}
|
|
for a in attrs:
|
|
- m = re.match("\s*(.*?)\s*=\s*(.*?)\s*$", a)
|
|
- attr = str(m.group(1)).lower()
|
|
- value = m.group(2)
|
|
+ m = re.match("^\s*(?P<attr>.*?)\s*=\s*(?P<value>.*?)\s*$", a)
|
|
+ attr = str(m.group('attr').lower())
|
|
+ value = m.group('value')
|
|
+
|
|
if attr in self.obj.params and attr not in self.params:
|
|
# The attribute is managed by IPA, but it didn't get cloned
|
|
# to the command. This happens with no_update/no_create attrs.
|
|
raise errors.ValidationError(
|
|
name=attr, error=_('attribute is not configurable'))
|
|
- if len(value) == 0:
|
|
- # None means "delete this attribute"
|
|
- value = None
|
|
- if attr in newdict:
|
|
- if type(value) in (tuple,):
|
|
- newdict[attr] += list(value)
|
|
- else:
|
|
- newdict[attr].append(value)
|
|
- else:
|
|
- if type(value) in (tuple,):
|
|
- newdict[attr] = list(value)
|
|
- else:
|
|
- newdict[attr] = [value]
|
|
+
|
|
+ newdict.setdefault(attr, []).append(value)
|
|
+
|
|
return newdict
|
|
|
|
+ def _convert_entry(self, entry_attrs):
|
|
+ result = {}
|
|
+ for attr, val in entry_attrs.iteritems():
|
|
+ if val is None:
|
|
+ val = []
|
|
+ elif not isinstance(val, (tuple, list)):
|
|
+ val = [val]
|
|
+
|
|
+ result[attr] = []
|
|
+ for v in val:
|
|
+ if isinstance(v, str):
|
|
+ # This is a Binary value, base64 encode it
|
|
+ v = base64.b64encode(v)
|
|
+ result[attr].append(unicode(v))
|
|
+
|
|
+ return result
|
|
+
|
|
def process_attr_options(self, entry_attrs, dn, keys, options):
|
|
"""
|
|
Process all --setattr, --addattr, and --delattr options and add the
|
|
@@ -860,19 +869,20 @@ last, after all sets and adds."""),
|
|
direct_del = setattrs & delattrs
|
|
needldapattrs = list((addattrs | delattrs) - setattrs)
|
|
|
|
+ mod_attrs = self._convert_entry(entry_attrs)
|
|
+
|
|
for attr, val in setdict.iteritems():
|
|
- entry_attrs[attr] = val
|
|
+ mod_attrs[attr] = val
|
|
|
|
for attr in direct_add:
|
|
- entry_attrs.setdefault(attr, []).extend(adddict[attr])
|
|
+ mod_attrs.setdefault(attr, []).extend(adddict[attr])
|
|
|
|
for attr in direct_del:
|
|
for delval in deldict[attr]:
|
|
try:
|
|
- entry_attrs[attr].remove(delval)
|
|
+ mod_attrs[attr].remove(delval)
|
|
except ValueError:
|
|
- raise errors.AttrValueNotFound(attr=attr,
|
|
- value=delval)
|
|
+ raise errors.AttrValueNotFound(attr=attr, value=delval)
|
|
|
|
if needldapattrs:
|
|
try:
|
|
@@ -891,28 +901,27 @@ last, after all sets and adds."""),
|
|
raise errors.ValidationError(name=del_nonexisting.pop(),
|
|
error=_('No such attribute on this entry'))
|
|
|
|
+ old_entry = self._convert_entry(old_entry)
|
|
+
|
|
for attr in needldapattrs:
|
|
- entry_attrs[attr] = old_entry.get(attr, [])
|
|
+ mod_attrs[attr] = old_entry.get(attr, [])
|
|
|
|
if attr in addattrs:
|
|
- entry_attrs[attr].extend(adddict.get(attr, []))
|
|
+ mod_attrs[attr].extend(adddict.get(attr, []))
|
|
|
|
for delval in deldict.get(attr, []):
|
|
try:
|
|
- entry_attrs[attr].remove(delval)
|
|
+ mod_attrs[attr].remove(delval)
|
|
except ValueError:
|
|
- if isinstance(delval, str):
|
|
- # This is a Binary value, base64 encode it
|
|
- delval = unicode(base64.b64encode(delval))
|
|
raise errors.AttrValueNotFound(attr=attr, value=delval)
|
|
|
|
# normalize all values
|
|
changedattrs = setattrs | addattrs | delattrs
|
|
for attr in changedattrs:
|
|
+ value = mod_attrs[attr]
|
|
if attr in self.params and self.params[attr].attribute:
|
|
- # convert single-value params to scalars
|
|
param = self.params[attr]
|
|
- value = entry_attrs[attr]
|
|
+ # convert single-value params to scalars
|
|
if not param.multivalue:
|
|
if len(value) == 1:
|
|
value = value[0]
|
|
@@ -922,19 +931,19 @@ last, after all sets and adds."""),
|
|
raise errors.OnlyOneValueAllowed(attr=attr)
|
|
# validate, convert and encode params
|
|
try:
|
|
- value = param(value)
|
|
+ value = param(value)
|
|
except errors.ValidationError, err:
|
|
raise errors.ValidationError(name=attr, error=err.error)
|
|
except errors.ConversionError, err:
|
|
raise errors.ConversionError(name=attr, error=err.error)
|
|
- entry_attrs[attr] = value
|
|
else:
|
|
# unknown attribute: remove duplicite and invalid values
|
|
- entry_attrs[attr] = list(set([val for val in entry_attrs[attr] if val]))
|
|
- if not entry_attrs[attr]:
|
|
- entry_attrs[attr] = None
|
|
- elif isinstance(entry_attrs[attr], (tuple, list)) and len(entry_attrs[attr]) == 1:
|
|
- entry_attrs[attr] = entry_attrs[attr][0]
|
|
+ value = list(set([val for val in value if val]))
|
|
+ if not value:
|
|
+ value = None
|
|
+ elif isinstance(value, (tuple, list)) and len(value) == 1:
|
|
+ value = value[0]
|
|
+ entry_attrs[attr] = value
|
|
|
|
@classmethod
|
|
def register_pre_callback(cls, callback, first=False):
|
|
diff --git a/tests/test_xmlrpc/test_attr.py b/tests/test_xmlrpc/test_attr.py
|
|
index 8b78c97b460421d36134e4a545cad8738e96f8ce..f5003c403b733abcfbd54f9f13f0b252cbf31959 100644
|
|
--- a/tests/test_xmlrpc/test_attr.py
|
|
+++ b/tests/test_xmlrpc/test_attr.py
|
|
@@ -21,17 +21,29 @@
|
|
Test --setattr and --addattr and other attribute-specific issues
|
|
"""
|
|
|
|
-from ipalib import api, errors
|
|
+from ipalib import api, errors, x509
|
|
from tests.test_xmlrpc import objectclasses
|
|
-from xmlrpc_test import Declarative, fuzzy_digits, fuzzy_uuid
|
|
+from xmlrpc_test import (Declarative, fuzzy_digits, fuzzy_uuid, fuzzy_date,
|
|
+ fuzzy_hex, fuzzy_hash, fuzzy_issuer)
|
|
from ipalib.dn import *
|
|
+import base64
|
|
|
|
-user1=u'tuser1'
|
|
+user1 = u'tuser1'
|
|
+fqdn1 = u'testhost1.%s' % api.env.domain
|
|
+
|
|
+# We can use the same cert we generated for the service tests
|
|
+fd = open('tests/test_xmlrpc/service.crt', 'r')
|
|
+servercert = fd.readlines()
|
|
+servercert = u''.join(servercert)
|
|
+servercert = x509.strip_header(servercert)
|
|
+servercert = servercert.replace('\n', '')
|
|
+fd.close()
|
|
|
|
class test_attr(Declarative):
|
|
|
|
cleanup_commands = [
|
|
('user_del', [user1], {}),
|
|
+ ('host_del', [fqdn1], {}),
|
|
]
|
|
|
|
tests = [
|
|
@@ -551,4 +563,71 @@ class test_attr(Declarative):
|
|
desc='Server is unwilling to perform', info=''),
|
|
),
|
|
|
|
+ dict(
|
|
+ desc='Try to create %r with description and --addattr description' % fqdn1,
|
|
+ command=('host_add', [fqdn1],
|
|
+ dict(
|
|
+ description=u'Test host 1',
|
|
+ addattr=u'description=Test host 2',
|
|
+ force=True,
|
|
+ ),
|
|
+ ),
|
|
+ expected=errors.OnlyOneValueAllowed(attr='description'),
|
|
+ ),
|
|
+
|
|
+ dict(
|
|
+ desc='Create %r with a certificate' % fqdn1,
|
|
+ command=('host_add', [fqdn1],
|
|
+ dict(
|
|
+ usercertificate=servercert,
|
|
+ force=True,
|
|
+ ),
|
|
+ ),
|
|
+ expected=dict(
|
|
+ value=fqdn1,
|
|
+ summary=u'Added host "%s"' % fqdn1,
|
|
+ result=dict(
|
|
+ dn=lambda x: DN(x) == DN(('fqdn',fqdn1),('cn','computers'),
|
|
+ ('cn','accounts'),api.env.basedn),
|
|
+ fqdn=[fqdn1],
|
|
+ krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)],
|
|
+ objectclass=objectclasses.host,
|
|
+ ipauniqueid=[fuzzy_uuid],
|
|
+ managedby_host=[fqdn1],
|
|
+ usercertificate=[base64.b64decode(servercert)],
|
|
+ valid_not_before=fuzzy_date,
|
|
+ valid_not_after=fuzzy_date,
|
|
+ subject=lambda x: DN(x) == \
|
|
+ DN(('CN',api.env.host),x509.subject_base()),
|
|
+ serial_number=fuzzy_digits,
|
|
+ serial_number_hex=fuzzy_hex,
|
|
+ md5_fingerprint=fuzzy_hash,
|
|
+ sha1_fingerprint=fuzzy_hash,
|
|
+ issuer=fuzzy_issuer,
|
|
+ has_keytab=False,
|
|
+ has_password=False,
|
|
+ ),
|
|
+ ),
|
|
+ ),
|
|
+
|
|
+ dict(
|
|
+ desc='Remove %r certificate using --delattr' % fqdn1,
|
|
+ command=('host_mod', [fqdn1],
|
|
+ dict(
|
|
+ delattr=u'usercertificate=%s' % servercert,
|
|
+ ),
|
|
+ ),
|
|
+ expected=dict(
|
|
+ value=fqdn1,
|
|
+ summary=u'Modified host "%s"' % fqdn1,
|
|
+ result=dict(
|
|
+ fqdn=[fqdn1],
|
|
+ krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)],
|
|
+ managedby_host=[fqdn1],
|
|
+ has_keytab=False,
|
|
+ has_password=False,
|
|
+ ),
|
|
+ ),
|
|
+ ),
|
|
+
|
|
]
|
|
--
|
|
1.7.11.2
|
|
|