From 72cc54bc2798125a7e0ca3eb08fcc8b973b8844e Mon Sep 17 00:00:00 2001 From: Jan Cholasta Date: Tue, 31 Jul 2012 06:37:14 -0400 Subject: [PATCH 78/79] Make --{set,add,del}attr more robust. This fixes --addattr on single value attributes in add commands and --delattr on non-unicode attributes in mod commands. ticket 2954 --- ipalib/plugins/baseldap.py | 89 +++++++++++++++++++++++------------------- tests/test_xmlrpc/test_attr.py | 85 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 131 insertions(+), 43 deletions(-) diff --git a/ipalib/plugins/baseldap.py b/ipalib/plugins/baseldap.py index 6a37995c57cd9d57280186d15f209426aa0776f2..32dae5160a00c927855990211305706f332e77db 100644 --- a/ipalib/plugins/baseldap.py +++ b/ipalib/plugins/baseldap.py @@ -784,36 +784,45 @@ last, after all sets and adds."""), :param attrs: A list of name/value pair strings, in the "name=value" format. May also be a single string, or None. """ - - newdict = {} if attrs is None: - attrs = [] - elif not type(attrs) in (list, tuple): + return {} + + if not isinstance(attrs, (tuple, list)): attrs = [attrs] + + newdict = {} for a in attrs: - m = re.match("\s*(.*?)\s*=\s*(.*?)\s*$", a) - attr = str(m.group(1)).lower() - value = m.group(2) + m = re.match("^\s*(?P.*?)\s*=\s*(?P.*?)\s*$", a) + attr = str(m.group('attr').lower()) + value = m.group('value') + if attr in self.obj.params and attr not in self.params: # The attribute is managed by IPA, but it didn't get cloned # to the command. This happens with no_update/no_create attrs. raise errors.ValidationError( name=attr, error=_('attribute is not configurable')) - if len(value) == 0: - # None means "delete this attribute" - value = None - if attr in newdict: - if type(value) in (tuple,): - newdict[attr] += list(value) - else: - newdict[attr].append(value) - else: - if type(value) in (tuple,): - newdict[attr] = list(value) - else: - newdict[attr] = [value] + + newdict.setdefault(attr, []).append(value) + return newdict + def _convert_entry(self, entry_attrs): + result = {} + for attr, val in entry_attrs.iteritems(): + if val is None: + val = [] + elif not isinstance(val, (tuple, list)): + val = [val] + + result[attr] = [] + for v in val: + if isinstance(v, str): + # This is a Binary value, base64 encode it + v = base64.b64encode(v) + result[attr].append(unicode(v)) + + return result + def process_attr_options(self, entry_attrs, dn, keys, options): """ Process all --setattr, --addattr, and --delattr options and add the @@ -860,19 +869,20 @@ last, after all sets and adds."""), direct_del = setattrs & delattrs needldapattrs = list((addattrs | delattrs) - setattrs) + mod_attrs = self._convert_entry(entry_attrs) + for attr, val in setdict.iteritems(): - entry_attrs[attr] = val + mod_attrs[attr] = val for attr in direct_add: - entry_attrs.setdefault(attr, []).extend(adddict[attr]) + mod_attrs.setdefault(attr, []).extend(adddict[attr]) for attr in direct_del: for delval in deldict[attr]: try: - entry_attrs[attr].remove(delval) + mod_attrs[attr].remove(delval) except ValueError: - raise errors.AttrValueNotFound(attr=attr, - value=delval) + raise errors.AttrValueNotFound(attr=attr, value=delval) if needldapattrs: try: @@ -891,28 +901,27 @@ last, after all sets and adds."""), raise errors.ValidationError(name=del_nonexisting.pop(), error=_('No such attribute on this entry')) + old_entry = self._convert_entry(old_entry) + for attr in needldapattrs: - entry_attrs[attr] = old_entry.get(attr, []) + mod_attrs[attr] = old_entry.get(attr, []) if attr in addattrs: - entry_attrs[attr].extend(adddict.get(attr, [])) + mod_attrs[attr].extend(adddict.get(attr, [])) for delval in deldict.get(attr, []): try: - entry_attrs[attr].remove(delval) + mod_attrs[attr].remove(delval) except ValueError: - if isinstance(delval, str): - # This is a Binary value, base64 encode it - delval = unicode(base64.b64encode(delval)) raise errors.AttrValueNotFound(attr=attr, value=delval) # normalize all values changedattrs = setattrs | addattrs | delattrs for attr in changedattrs: + value = mod_attrs[attr] if attr in self.params and self.params[attr].attribute: - # convert single-value params to scalars param = self.params[attr] - value = entry_attrs[attr] + # convert single-value params to scalars if not param.multivalue: if len(value) == 1: value = value[0] @@ -922,19 +931,19 @@ last, after all sets and adds."""), raise errors.OnlyOneValueAllowed(attr=attr) # validate, convert and encode params try: - value = param(value) + value = param(value) except errors.ValidationError, err: raise errors.ValidationError(name=attr, error=err.error) except errors.ConversionError, err: raise errors.ConversionError(name=attr, error=err.error) - entry_attrs[attr] = value else: # unknown attribute: remove duplicite and invalid values - entry_attrs[attr] = list(set([val for val in entry_attrs[attr] if val])) - if not entry_attrs[attr]: - entry_attrs[attr] = None - elif isinstance(entry_attrs[attr], (tuple, list)) and len(entry_attrs[attr]) == 1: - entry_attrs[attr] = entry_attrs[attr][0] + value = list(set([val for val in value if val])) + if not value: + value = None + elif isinstance(value, (tuple, list)) and len(value) == 1: + value = value[0] + entry_attrs[attr] = value @classmethod def register_pre_callback(cls, callback, first=False): diff --git a/tests/test_xmlrpc/test_attr.py b/tests/test_xmlrpc/test_attr.py index 8b78c97b460421d36134e4a545cad8738e96f8ce..f5003c403b733abcfbd54f9f13f0b252cbf31959 100644 --- a/tests/test_xmlrpc/test_attr.py +++ b/tests/test_xmlrpc/test_attr.py @@ -21,17 +21,29 @@ Test --setattr and --addattr and other attribute-specific issues """ -from ipalib import api, errors +from ipalib import api, errors, x509 from tests.test_xmlrpc import objectclasses -from xmlrpc_test import Declarative, fuzzy_digits, fuzzy_uuid +from xmlrpc_test import (Declarative, fuzzy_digits, fuzzy_uuid, fuzzy_date, + fuzzy_hex, fuzzy_hash, fuzzy_issuer) from ipalib.dn import * +import base64 -user1=u'tuser1' +user1 = u'tuser1' +fqdn1 = u'testhost1.%s' % api.env.domain + +# We can use the same cert we generated for the service tests +fd = open('tests/test_xmlrpc/service.crt', 'r') +servercert = fd.readlines() +servercert = u''.join(servercert) +servercert = x509.strip_header(servercert) +servercert = servercert.replace('\n', '') +fd.close() class test_attr(Declarative): cleanup_commands = [ ('user_del', [user1], {}), + ('host_del', [fqdn1], {}), ] tests = [ @@ -551,4 +563,71 @@ class test_attr(Declarative): desc='Server is unwilling to perform', info=''), ), + dict( + desc='Try to create %r with description and --addattr description' % fqdn1, + command=('host_add', [fqdn1], + dict( + description=u'Test host 1', + addattr=u'description=Test host 2', + force=True, + ), + ), + expected=errors.OnlyOneValueAllowed(attr='description'), + ), + + dict( + desc='Create %r with a certificate' % fqdn1, + command=('host_add', [fqdn1], + dict( + usercertificate=servercert, + force=True, + ), + ), + expected=dict( + value=fqdn1, + summary=u'Added host "%s"' % fqdn1, + result=dict( + dn=lambda x: DN(x) == DN(('fqdn',fqdn1),('cn','computers'), + ('cn','accounts'),api.env.basedn), + fqdn=[fqdn1], + krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], + objectclass=objectclasses.host, + ipauniqueid=[fuzzy_uuid], + managedby_host=[fqdn1], + usercertificate=[base64.b64decode(servercert)], + valid_not_before=fuzzy_date, + valid_not_after=fuzzy_date, + subject=lambda x: DN(x) == \ + DN(('CN',api.env.host),x509.subject_base()), + serial_number=fuzzy_digits, + serial_number_hex=fuzzy_hex, + md5_fingerprint=fuzzy_hash, + sha1_fingerprint=fuzzy_hash, + issuer=fuzzy_issuer, + has_keytab=False, + has_password=False, + ), + ), + ), + + dict( + desc='Remove %r certificate using --delattr' % fqdn1, + command=('host_mod', [fqdn1], + dict( + delattr=u'usercertificate=%s' % servercert, + ), + ), + expected=dict( + value=fqdn1, + summary=u'Modified host "%s"' % fqdn1, + result=dict( + fqdn=[fqdn1], + krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], + managedby_host=[fqdn1], + has_keytab=False, + has_password=False, + ), + ), + ), + ] -- 1.7.11.2