diff --git a/freeipa.spec.in b/freeipa.spec.in index 086e8c7..11a5607 100755 --- a/freeipa.spec.in +++ b/freeipa.spec.in @@ -1395,6 +1395,7 @@ fi %{_sbindir}/ipa-pkinit-manage %{_sbindir}/ipa-crlgen-manage %{_sbindir}/ipa-cert-fix +%{_sbindir}/ipa-idrange-fix %{_sbindir}/ipa-acme-manage %{_libexecdir}/certmonger/dogtag-ipa-ca-renew-agent-submit %{_libexecdir}/certmonger/ipa-server-guard @@ -1470,6 +1471,7 @@ fi %{_mandir}/man1/ipa-pkinit-manage.1* %{_mandir}/man1/ipa-crlgen-manage.1* %{_mandir}/man1/ipa-cert-fix.1* +%{_mandir}/man1/ipa-idrange-fix.1* %{_mandir}/man1/ipa-acme-manage.1* diff --git a/install/tools/Makefile.am b/install/tools/Makefile.am index 5f36742..ad56cfa 100644 --- a/install/tools/Makefile.am +++ b/install/tools/Makefile.am @@ -31,6 +31,7 @@ dist_noinst_DATA = \ ipa-pkinit-manage.in \ ipa-crlgen-manage.in \ ipa-cert-fix.in \ + ipa-idrange-fix.in \ ipa-custodia.in \ ipa-custodia-check.in \ ipa-httpd-kdcproxy.in \ @@ -67,6 +68,7 @@ nodist_sbin_SCRIPTS = \ ipa-pkinit-manage \ ipa-crlgen-manage \ ipa-cert-fix \ + ipa-idrange-fix \ ipa-acme-manage \ $(NULL) diff --git a/install/tools/ipa-idrange-fix.in b/install/tools/ipa-idrange-fix.in new file mode 100644 index 0000000..5994bd2 --- /dev/null +++ b/install/tools/ipa-idrange-fix.in @@ -0,0 +1,8 @@ +#!/usr/bin/python3 +# +# Copyright (C) 2024 FreeIPA Contributors see COPYING for license +# + +from ipaserver.install.ipa_idrange_fix import IPAIDRangeFix + +IPAIDRangeFix.run_cli() diff --git a/install/tools/man/Makefile.am b/install/tools/man/Makefile.am index d1ea046..b9bbd04 100644 --- a/install/tools/man/Makefile.am +++ b/install/tools/man/Makefile.am @@ -29,6 +29,7 @@ dist_man1_MANS = \ ipa-pkinit-manage.1 \ ipa-crlgen-manage.1 \ ipa-cert-fix.1 \ + ipa-idrange-fix.1 \ ipa-acme-manage.1 \ $(NULL) diff --git a/install/tools/man/ipa-idrange-fix.1 b/install/tools/man/ipa-idrange-fix.1 new file mode 100644 index 0000000..178d2e8 --- /dev/null +++ b/install/tools/man/ipa-idrange-fix.1 @@ -0,0 +1,111 @@ +.\" +.\" Copyright (C) 2024 FreeIPA Contributors see COPYING for license +.\" +.TH "ipa-idrange-fix" "1" "May 26 2024" "IPA" "IPA Manual Pages" +.SH "NAME" +ipa\-idrange\-fix \- Analyse and fix IPA ID ranges +.SH "SYNOPSIS" +ipa\-idrange\-fix [options] +.SH "DESCRIPTION" + +\fIipa-idrange-fix\fR is a tool for analysis of existing IPA ranges, users and +groups outside of those ranges, and functionality to propose and apply +remediations to make sure as many users and groups as possible end up in the +IPA-managed ranges. Before any changes are applied, a full backup of the system +is \fBSTRONGLY RECOMMENDED\fR. + +Do not use this program in unattended mode unless you are absolutely sure +you are consenting to the tool's proposals. + +You can apply the proposals manually via \fIipa idrange(1)\fR commands. + +This tool requires it to be run as \fBroot\fR and does not require a kerberos +ticket. The directory server needs to be running. + +\fIipa-idrange-fix\fR will read current ranges from LDAP, then check their +basic constraints, RID bases, etc. If it finds critical issues with ranges, +manual adjustment will be required. + +After analyzing existing ranges, the tool will search for users and groups that +are outside of ipa-local ranges. Then it will attempt to propose new ipa-local +ranges in order to cover users and groups found. + +Finally, the tool will summarize the analysis, and, if there are proposed +changes, will ask if the user wants to apply those. Please read the +proposals carefully before proceeding with changes! + +Important note: By default, \fIipa-idrange-fix\fR will not cover the users and +groups that have IDs under 1000 as these IDs are reserved for system and +service users and groups. We \fBdon't recommend\fR using IDs under 1000 for +IPA users and groups as they can possibly overlap with local ones. Please +consider moving those users out of the range 1..1000, unless they are +absolutely needed. + +.SH "OPTIONS" +.TP +\fB\-\-version\fR +Show the program's version and exit. +.TP +\fB\-h\fR, \fB\-\-help\fR +Show the help for this program. +.TP +\fB\-\-ridoffset \fIINT\fR +An offset for newly proposed base RIDs for ranges. We introduce offset in order +to have an ability to increase ranges in the future, increase to more than +offset will result in RID bases overlapping, and will be denied. If set to 0, +there will be no offset, proposed RID ranges will start directly one after +another. + +Default - \fI100000\fR, allowed values - from \fI0\fR to \fI2^31\fR. +.TP +\fB\-\-rangegap \fIINT\fR +A number of IDs between out of ranges IDs to be considered too big to be inside +a proposed range. If the gap is bigger than this attribute, a new range will be +started. If set to 0, every entity will get its own range, if allowed by +\fI--minrange\fR. + +Default - \fI200000\fR, allowed values - from \fI0\fR to \fI2^31\fR. +.TP +\fB\-\-minrange \fIINT\fR +A minimal amount of IDs the tool considers to be a valid range. All IDs that +would form a range with less than this number will be considered outliers, not +worth creating an IDrange for, and will be listed explicitly to be moved +manually. If set to 1, a range will be proposed for every entity, even if the +entity is single in the middle of an empty space. + +Default - \fI10\fR, allowed values - from \fI1\fR to \fI2^31\fR. +.TP +\fB\-\-allowunder1000\fR +A flag to allow proposing ranges that start with IDs lower than \fI1000\fR. +Remember, this is not recommended - IDs under 1000 are reserved for system and +service users and groups. IDranges with these low IDs may result with +overlapping of IPA and system local users and groups, which can be a serious +security issue and generally produce a lot of issues around these entities' +resolution. +.TP +\fB\-\-norounding\fR +A flag to turn off idrange starting id and size rounding - e.g. if we find +ID 1234, and the size 567, it will stay that way, the proposed range will +start at ID 1234, and have a 567 size. If not specified, basic rounding to +outer margins will be applied. Rounding will be 10^size of the proposed range. +.TP +\fB\-\-unattended\fR +Run the tool in unattended mode, if any changes would be proposed, they will +be applied automatically. +.TP +\fB\-v\fR, \fB\-\-verbose\fR +Print debugging information. +.TP +\fB\-q\fR, \fB\-\-quiet\fR +Output only errors (output from child processes may still be shown). +.TP +\fB\-\-log\-file\fR=\fIFILE\fR +Log to the given file. +.SH "EXIT STATUS" +0 if the command was successful + +1 if an error occurred + +.SH "SEE ALSO" +.BR ipa\ idrange-mod(1) +.BR ipa\ idrange-add(1) diff --git a/ipaserver/install/ipa_idrange_fix.py b/ipaserver/install/ipa_idrange_fix.py new file mode 100644 index 0000000..c6c67ae --- /dev/null +++ b/ipaserver/install/ipa_idrange_fix.py @@ -0,0 +1,1085 @@ +"""Tool to analyze and fix IPA ID ranges""" +# +# Copyright (C) 2024 FreeIPA Contributors see COPYING for license +# + +import logging +import ldap + +from ipalib import api, errors +from ipapython.admintool import AdminTool +from ipapython.dn import DN +from ipapython import ipautil +from typing import List, Tuple + +logger = logging.getLogger(__name__) + + +class IDRange: + """Class for ID range entity""" + + def __init__(self): + self.last_id: int = None + self.last_base_rid: int = None + self.last_secondary_rid: int = None + self.name: str = None + self.size: int = None + self.first_id: int = None + self.base_rid: int = None + self.secondary_base_rid: int = None + self.type: str = None + self.suffix: str = None + self.dn: str = None + + def _count(self) -> None: + """Function to calculate last IDs for the range""" + self.last_id = self.first_id + self.size - 1 + if self.type == "ipa-local": + self.last_base_rid = ( + self.base_rid + self.size + if self.base_rid is not None + else None + ) + self.last_secondary_rid = ( + self.secondary_base_rid + self.size + if self.secondary_base_rid is not None + else None + ) + + def __repr__(self): + return ( + f"IDRange(name='{self.name}', " + f"type={self.type}, " + f"size={self.size}, " + f"first_id={self.first_id}, " + f"base_rid={self.base_rid}, " + f"secondary_base_rid={self.secondary_base_rid})" + ) + + def __eq__(self, other): + return self.first_id == other.first_id + + +class IDentity: + """A generic class for ID entity - users or groups""" + + def __init__(self, **kwargs): + self.dn: str = kwargs.get('dn') + self.name: str = kwargs.get('name') + self.user: str = kwargs.get('user') + self.number: int = kwargs.get('number') + + def __str__(self): + if self.user: + return (f"user '{self.name}', uid={self.number}") + return (f"group '{self.name}', gid={self.number}") + + def debug(self): + if self.user: + return ( + f"user(username='{self.name}', " + f"uid={self.number}, " + f"{self.dn})" + ) + return ( + f"group(groupname='{self.name}', " + f"gid={self.number}, " + f"{self.dn})" + ) + + def __eq__(self, other): + return self.number == other.number and self.user == other.user + + +class IPAIDRangeFix(AdminTool): + """Tool to analyze and fix IPA ID ranges""" + + command_name = "ipa-idrange-fix" + log_file_name = "/var/log/ipa-idrange-fix.log" + usage = "%prog" + description = "Analyze and fix IPA ID ranges" + + @classmethod + def add_options(cls, parser, debug_option=False): + super(IPAIDRangeFix, cls).add_options(parser) + parser.add_option( + "--ridoffset", + dest="ridoffset", + type=int, + default=100000, + metavar=100000, + help="Offset for a next base RID from previous RID range. \ +Needed for future range size expansions. Has to be > 0", + ) + parser.add_option( + "--rangegap", + dest="rangegap", + type=int, + default=200000, + metavar=200000, + help="Threshold for a gap between out-of-range IDs to be \ +considered a different range. Has to be > 0", + ) + parser.add_option( + "--minrange", + dest="minrange", + type=int, + default=10, + metavar=10, + help="Minimal considered range size for out-of-range IDs.\ +All ranges with amount of IDs lower than this number will be discarded and \ +IDs will be listed to be moved manually. Has to be > 1", + ) + parser.add_option( + "--allowunder1000", + dest="allowunder1000", + action="store_true", + default=False, + help="Allow idranges to start below 1000. Be careful to not \ +overlap IPA users/groups with existing system-local ones!", + ) + parser.add_option( + "--norounding", + dest="norounding", + action="store_true", + default=False, + help="Disable IDrange rounding attempt in order to get ranges \ +exactly covering just IDs provided", + ) + parser.add_option( + "--unattended", + dest="unattended", + action="store_true", + default=False, + help="Automatically fix all range issues found without asking \ +for confirmation", + ) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.realm: str = None + self.suffix: DN = None + self.proposals_rid: List[IDRange] = [] + self.proposals_new: List[IDRange] = [] + self.outliers: List[IDentity] = [] + self.under1000: List[IDentity] = [] + self.id_ranges: List[IDRange] = [] + + def validate_options(self, needs_root=True): + super().validate_options(needs_root) + + def run(self): + api.bootstrap(in_server=True) + api.finalize() + + self.realm = api.env.realm + self.suffix = ipautil.realm_to_suffix(self.realm) + try: + api.Backend.ldap2.connect() + + # Reading range data + self.id_ranges = read_ranges(self.suffix) + + # Evaluating existing ranges, if something is off, exit + if self.evaluate_ranges() != 0: + return 1 + + # reading out of range IDs + ids_out_of_range = read_outofrange_identities( + self.suffix, self.id_ranges + ) + + # Evaluating out of range IDs + self.evaluate_identities(ids_out_of_range) + + # Print the proposals + self.print_intentions() + + # If there are no proposals, we have nothing to do, exiting + if (len(self.proposals_rid) == 0 + and len(self.proposals_new) == 0): + logger.info("\nNo changes proposed, nothing to do.") + return 0 + + logger.info("\nID ranges table after proposed changes:") + draw_ascii_table(self.id_ranges) + + if self.options.unattended: + logger.info( + "Unattended mode, proceeding with applying changes!" + ) + else: + response = ipautil.user_input('Enter "yes" to proceed') + if response.lower() != "yes": + logger.info("Not proceeding.") + return 0 + logger.info("Proceeding.") + + # Applying changes + for id_range in self.proposals_rid: + apply_ridbases(id_range) + + for id_range in self.proposals_new: + create_range(id_range) + + logger.info("All changes applied successfully!") + + finally: + if api.Backend.ldap2.isconnected(): + api.Backend.ldap2.disconnect() + + return 0 + + def evaluate_ranges(self) -> int: + """Function to evaluate existing ID ranges""" + if len(self.id_ranges) == 0: + logger.error("No ID ranges found!") + return 1 + + draw_ascii_table(self.id_ranges) + + if not ranges_overlap_check(self.id_ranges): + logger.error( + "Ranges overlap detected, cannot proceed! Please adjust \ +existing ranges manually." + ) + return 1 + + # Checking RID bases for existing ranges + id_ranges_nobase = get_ranges_no_base(self.id_ranges) + + if len(id_ranges_nobase) > 0: + logger.info( + "Found %s ranges without base RIDs", len(id_ranges_nobase) + ) + for id_range in id_ranges_nobase: + logger.debug( + "Range '%s' has RID base %s and secondary RID base %s", + id_range.name, + id_range.base_rid, + id_range.secondary_base_rid, + ) + propose_rid_ranges( + self.id_ranges, + self.options.ridoffset, + self.proposals_rid + ) + else: + logger.info( + "All ID ranges have base RIDs set, RID adjustments are \ +not needed." + ) + return 0 + + def evaluate_identities(self, ids_out_of_range: List[IDentity]) -> None: + """Function to evaluate out of range IDs""" + if len(ids_out_of_range) == 0: + logger.info("No out of range IDs found!") + else: + logger.info( + "Found overall %s IDs out of existing ID ranges.\n", + len(ids_out_of_range), + ) + # ruling out IDs under 1000 if flag is not set + if not self.options.allowunder1000: + self.under1000, ids_out_of_range = separate_under1000( + ids_out_of_range + ) + if len(self.under1000) > 0: + logger.info( + "Found IDs under 1000, which is not recommeneded \ +(if you definitely need ranges proposed for those, use --allowunder1000):" + ) + for identity in self.under1000: + logger.info("%s", identity) + + # Get initial divide of IDs into groups + groups = group_identities_by_threshold( + ids_out_of_range, self.options.rangegap + ) + + # Get outliers from too small groups and clean groups for + # further processing + self.outliers, cleangroups = separate_ranges_and_outliers( + groups, self.options.minrange + ) + + # Print the outliers, they have to be moved manually + if len(self.outliers) > 0: + logger.info( + "\nIdentities that don't fit the criteria to get a new " + "range found! Current attributes:\n" + "Minimal range size: %s\n" + "Maximum gap between IDs: %s\n" + "Try adjusting --minrange, --rangegap or move the " + "following identities into already existing ranges:", + self.options.minrange, + self.options.rangegap + ) + for identity in self.outliers: + logger.info("%s", identity) + + if len(cleangroups) > 0: + # Get IDrange name base + basename = get_rangename_base(self.id_ranges) + + # Create proposals for new ranges from groups + for group in cleangroups: + newrange = propose_range( + group, + self.id_ranges, + self.options.ridoffset, + basename, + self.options.norounding, + self.options.allowunder1000 + ) + if newrange is not None: + self.proposals_new.append(newrange) + self.id_ranges.append(newrange) + self.id_ranges.sort(key=lambda x: x.first_id) + else: + logger.info( + "\nNo IDs fit the criteria for a new ID range to propose!" + ) + + def print_intentions(self) -> None: + """Function to print out the summary of the proposed changes""" + logger.info("\nSummary:") + + if len(self.outliers) > 0: + logger.info("Outlier IDs that are too far away to get a range:") + for identity in self.outliers: + logger.info("%s", identity) + + if len(self.under1000) > 0: + if self.options.allowunder1000: + logger.info("IDs under 1000 were treated like normal IDs.") + else: + logger.info("IDs under 1000:") + for identity in self.under1000: + logger.info("%s", identity) + else: + logger.info("No IDs under 1000 found.") + + if len(self.proposals_rid) > 0: + logger.info("Proposed changes to existing ranges:") + for id_range in self.proposals_rid: + logger.info( + "Range '%s' - base RID: %s, secondary base RID: %s", + id_range.name, + id_range.base_rid, + id_range.secondary_base_rid, + ) + else: + logger.info("No changes proposed for existing ranges.") + + if len(self.proposals_new) > 0: + logger.info("Proposed new ranges:") + for id_range in self.proposals_new: + logger.info("%s", id_range) + else: + logger.info("No new ranges proposed.") + +# Working with output +# region + + +def draw_ascii_table(id_ranges: List[IDRange], stdout: bool = False) -> None: + """Function to draw a table with ID ranges in ASCII""" + table: str = "\n" + # Calculate the maximum width required for each column using column names + max_widths = { + column: max( + len(str(column)), + max( + ( + len(str(getattr(id_range, column))) + if getattr(id_range, column) is not None + else 0 + ) + for id_range in id_ranges + ), + ) + for column in [ + "name", + "type", + "size", + "first_id", + "last_id", + "base_rid", + "last_base_rid", + "secondary_base_rid", + "last_secondary_rid", + ] + } + + # Draw the table header + header = "| " + for column, width in max_widths.items(): + header += f"{column.ljust(width)} | " + horizontal_line = "-" * (len(header) - 1) + table += horizontal_line + "\n" + table += header + "\n" + table += horizontal_line + "\n" + + # Draw the table rows + for id_range in id_ranges: + row = "| " + for column, width in max_widths.items(): + value = getattr(id_range, column) + if value is not None: + row += f"{str(value).rjust(width)} | " + else: + # Adding the separator + row += " " * (width + 1) + "| " + table += row + "\n" + table += horizontal_line + "\n" + if stdout: + print(table) + else: + logger.info(table) +# endregion +# Reading from LDAP +# region + + +def read_ranges(suffix) -> List[IDRange]: + """Function to read ID ranges from LDAP""" + id_ranges: IDRange = [] + try: + ranges = api.Backend.ldap2.get_entries( + DN(api.env.container_ranges, suffix), + ldap.SCOPE_ONELEVEL, + "(objectclass=ipaIDRange)", + ) + except errors.NotFound: + logger.error("LDAPError: No ranges found!") + except errors.ExecutionError as e: + logger.error("Exception while reading users: %s", e) + else: + for entry in ranges: + sv = entry.single_value + id_range = IDRange() + id_range.name = sv.get("cn") + id_range.size = int(sv.get("ipaidrangesize")) + id_range.first_id = int(sv.get("ipabaseid")) + id_range.base_rid = ( + int(sv.get("ipabaserid")) if sv.get("ipabaserid") else None + ) + id_range.secondary_base_rid = ( + int(sv.get("ipasecondarybaserid")) + if sv.get("ipasecondarybaserid") + else None + ) + id_range.suffix = suffix + id_range.type = sv.get("iparangetype") + id_range.dn = entry.dn + + id_range._count() + logger.debug("ID range found: %s", id_range) + + id_ranges.append(id_range) + + id_ranges.sort(key=lambda x: x.first_id) + return id_ranges + + +def read_outofrange_identities(suffix, id_ranges) -> List[IDentity]: + """Function to read out of range users and groups from LDAP""" + users_outofrange = read_ldap_ids( + DN(api.env.container_user, suffix), + True, + id_ranges + ) + logger.info("Users out of range found: %s", len(users_outofrange)) + del_outofrange = read_ldap_ids( + DN(api.env.container_deleteuser, suffix), + True, + id_ranges + ) + logger.info("Preserved users out of range found: %s", len(del_outofrange)) + groups_outofrange = read_ldap_ids( + DN(api.env.container_group, suffix), + False, + id_ranges + ) + logger.info("Groups out of range found: %s", len(groups_outofrange)) + outofrange = users_outofrange + del_outofrange + groups_outofrange + outofrange.sort(key=lambda x: x.number) + return outofrange + + +def read_ldap_ids(container_dn, user: bool, id_ranges) -> List[IDentity]: + """Function to read IDs from containter in LDAP""" + id_entities = [] + if user: + id_name = "user" + ldap_filter = get_outofrange_filter( + id_ranges, + "posixaccount", + "uidNumber" + ) + else: + id_name = "group" + ldap_filter = get_outofrange_filter( + id_ranges, + "posixgroup", + "gidNumber" + ) + + logger.debug("Searching %ss in %s with filter: %s", id_name, container_dn, + ldap_filter) + try: + identities = api.Backend.ldap2.get_entries( + container_dn, + ldap.SCOPE_ONELEVEL, + ldap_filter, + ) + for entry in identities: + id_entities.append(read_identity(entry, user)) + except errors.NotFound: + logger.debug("No out of range %ss found in %s!", id_name, container_dn) + except errors.ExecutionError as e: + logger.error("Exception while reading %s: %s", container_dn, e) + return id_entities + + +def read_identity(ldapentry, user: bool = True) -> IDentity: + """Function to convert LDAP entry to IDentity object""" + sv = ldapentry.single_value + id_entity = IDentity() + id_entity.dn = ldapentry.dn + id_entity.name = sv.get("cn") + id_entity.number = ( + int(sv.get("uidNumber")) if user else int(sv.get("gidNumber")) + ) + id_entity.user = user + logger.debug("Out of range found: %s", id_entity.debug()) + return id_entity + + +def get_outofrange_filter( + id_ranges_all: List[IDRange], object_class: str, posix_id: str +) -> str: + """Function to create LDAP filter for out of range users and groups""" + # we need to look only for ipa-local ranges + id_ranges = get_ipa_local_ranges(id_ranges_all) + + ldap_filter = f"(&(objectClass={object_class})(|" + + # adding gaps in ranges to the filter + for i in range(len(id_ranges) + 1): + if i == 0: + start_condition = f"({posix_id}>=1)" + else: + start_condition = f"({posix_id}>={id_ranges[i - 1].last_id + 1})" + + if i < len(id_ranges): + end_condition = f"({posix_id}<={id_ranges[i].first_id - 1})" + else: + end_condition = f"({posix_id}<=2147483647)" + + ldap_filter += f"(&{start_condition}{end_condition})" + + ldap_filter += "))" + + return ldap_filter +# endregion +# Writing to LDAP +# region + + +def apply_ridbases(id_range: IDRange) -> None: + """Funtion to apply RID bases to the range in LDAP""" + try: + api.Backend.ldap2.modify_s( + id_range.dn, + [ + (ldap.MOD_ADD, "ipaBaseRID", str(id_range.base_rid)), + ( + ldap.MOD_ADD, + "ipaSecondaryBaseRID", + str(id_range.secondary_base_rid), + ), + ], + ) + logger.info("RID bases updated for range '%s'", id_range.name) + + except ldap.CONSTRAINT_VIOLATION as e: + logger.error( + "Failed to add RID bases to the range '%s': %s", + id_range.name, + e + ) + raise RuntimeError("Constraint violation.\n") from e + + except Exception as e: + logger.error( + "Exception while updating RID bases for range '%s': %s", + id_range.name, + e, + ) + raise RuntimeError("Failed to update RID bases.\n") from e + + +def create_range(id_range: IDRange) -> None: + """Function to create a new range in LDAP""" + try: + logger.info("Creating range '%s'...", id_range.name) + + entry = api.Backend.ldap2.make_entry( + DN(id_range.dn), + objectclass=["ipaIDRange", "ipaDomainIDRange"], + ipaidrangesize=[str(id_range.size)], + ipabaseid=[str(id_range.first_id)], + ipabaserid=[str(id_range.base_rid)], + ipasecondarybaserid=[str(id_range.secondary_base_rid)], + iparangetype=[id_range.type], + ) + + api.Backend.ldap2.add_entry(entry) + logger.info("Range '%s' created successfully", id_range.name) + except Exception as e: + logger.error( + "Exception while creating range '%s': %s", + id_range.name, + e + ) + raise RuntimeError("Failed to create range.\n") from e +# endregion +# Working with ranges +# region + + +def get_ipa_local_ranges(id_ranges: List[IDRange]) -> List[IDRange]: + """Function to get only ipa-local ranges from the list of ranges""" + ipa_local_ranges = [] + + for id_range in id_ranges: + if id_range.type == "ipa-local": + ipa_local_ranges.append(id_range) + + return ipa_local_ranges + + +def range_overlap_check( + range1_start: int, range1_end: int, range2_start: int, range2_end: int +) -> bool: + """Function to check if two ranges overlap""" + # False when overlapping + return not (range1_start <= range2_end and range2_start <= range1_end) + + +def range_overlap_check_idrange(range1: IDRange, range2: IDRange) -> bool: + """Function to check if two ranges overlap""" + # False when overlapping + return range_overlap_check( + range1.first_id, range1.last_id, range2.first_id, range2.last_id) + + +def newrange_overlap_check( + id_ranges: List[IDRange], newrange: IDRange +) -> bool: + """Function to check if proposed range overlaps with existing ones""" + for id_range in id_ranges: + if not range_overlap_check_idrange(id_range, newrange): + return False + return True + + +def ranges_overlap_check(id_ranges: List[IDRange]) -> bool: + """Function to check if any of the existing ranges overlap""" + if len(id_ranges) < 2: + return True + for i in range(len(id_ranges) - 1): + for j in range(i + 1, len(id_ranges)): + if not range_overlap_check_idrange(id_ranges[i], id_ranges[j]): + logger.error( + "Ranges '%s' and '%s' overlap!", + id_ranges[i].name, + id_ranges[j].name, + ) + return False + return True +# endregion +# Working with RID bases +# region + + +def propose_rid_ranges( + id_ranges: List[IDRange], delta: int, proposals: List[IDRange] +) -> None: + """ + Function to propose RID bases for ranges that don't have them set. + + - delta represents how far we start new base off existing range, + used in order to allow for future expansion of existing ranges up + to [delta] IDs. + """ + ipa_local_ranges = get_ipa_local_ranges(id_ranges) + + for id_range in ipa_local_ranges: + proposed_base_rid = 0 + proposed_secondary_base_rid = 0 + + # Calculate proposed base RID and secondary base RID + if id_range.base_rid is None: + result, proposed_base_rid = propose_rid_base( + id_range, ipa_local_ranges, delta, True + ) + if result: + id_range.base_rid = proposed_base_rid + id_range.last_base_rid = proposed_base_rid + id_range.size + else: + # if this fails too, we print the warning and abandon the idea + logger.warning( + "Warning: Proposed base RIDs %s for '%s' both failed, \ +please adjust manually", + proposed_base_rid, + id_range.name, + ) + continue + + if id_range.secondary_base_rid is None: + result, proposed_secondary_base_rid = propose_rid_base( + id_range, ipa_local_ranges, delta, False, proposed_base_rid + ) + if result: + id_range.secondary_base_rid = proposed_secondary_base_rid + id_range.last_secondary_rid = ( + proposed_secondary_base_rid + id_range.size + ) + else: + # if this fails too, we print the warning and abandon the idea + logger.warning( + "Warning: Proposed secondary base RIDs %s for '%s' \ +both failed, please adjust manually", + proposed_secondary_base_rid, + id_range.name, + ) + continue + + # Add range to the proposals if we changed something successfully + if proposed_base_rid > 0 or proposed_secondary_base_rid > 0: + logger.debug( + "Proposed RIDs for range '%s': pri %s, sec %s", + id_range.name, + proposed_base_rid, + proposed_secondary_base_rid, + ) + proposals.append(id_range) + + +def propose_rid_base( + idrange: IDRange, + ipa_local_ranges: List[IDRange], + delta: int, + primary: bool = True, + previous_base_rid: int = -1 +) -> Tuple[bool, str]: + """ + Function to propose a base RID for a range, primary or secondary. + We are getting the biggest base RID + size + delta and try + if it's a viable option, check same kind first, then the other. + """ + proposed_base_rid = max_rid(ipa_local_ranges, primary) + delta + if proposed_base_rid == previous_base_rid: + proposed_base_rid += idrange.size + delta + if check_rid_base(ipa_local_ranges, proposed_base_rid, idrange.size): + return True, proposed_base_rid + + # if we fail, we try the same with biggest of a different kind + proposed_base_rid_orig = proposed_base_rid + proposed_base_rid = max_rid(ipa_local_ranges, not primary) + delta + if proposed_base_rid == previous_base_rid: + proposed_base_rid += idrange.size + delta + if check_rid_base(ipa_local_ranges, proposed_base_rid, idrange.size): + return True, proposed_base_rid + + # if it fails, we return both RID proposals for the range + return False, f"{proposed_base_rid_orig} and {proposed_base_rid}" + + +def max_rid(id_ranges: List[IDRange], primary: bool = True) -> int: + """Function to get maximum RID of primary or secondary RIDs""" + maximum_rid = 0 + for id_range in id_ranges: + + # looking only for primary RIDs + if primary: + if id_range.last_base_rid is not None: + maximum_rid = max(maximum_rid, id_range.last_base_rid) + # looking only for secondary RIDs + else: + if id_range.last_secondary_rid is not None: + maximum_rid = max(maximum_rid, id_range.last_secondary_rid) + + return maximum_rid + + +def check_rid_base(id_ranges: List[IDRange], base: int, size: int) -> bool: + """Function to check if proposed RID base is viable""" + end = base + size + 1 + + # Checking sanity of RID range + if base + size > 2147483647: + return False + if base < 1000: + return False + + # Checking RID range overlaps + for id_range in id_ranges: + # we are interested only in ipa-local ranges + if id_range.type != "ipa-local": + continue + + # if there is no base rid set, there is no secondary base rid set, + # so nothing to overlap with + if id_range.base_rid is None: + continue + + # checking for an overlap + if not range_overlap_check( + base, end, id_range.base_rid, id_range.last_base_rid + ): + logger.debug( + "RID check failure: proposed Primary %s + %s, \ +intersects with %s-%s from range '%s'", + base, + size, + id_range.base_rid, + id_range.last_base_rid, + id_range.name, + ) + return False + + # if there is no secondary base rid set, nothing to overlap with + if id_range.secondary_base_rid is None: + continue + + # if either start of end of the range fails inside existing range, + # or existing range is inside proposed one, we have an overlap + if not range_overlap_check( + base, end, id_range.secondary_base_rid, id_range.last_secondary_rid + ): + logger.debug( + "RID check failure: proposed Secondary %s + %s, \ +intersects with %s-%s from range '%s'", + base, + size, + id_range.secondary_base_rid, + id_range.last_secondary_rid, + id_range.name, + ) + return False + + return True + + +def get_ranges_no_base(id_ranges: List[IDRange]) -> List[IDRange]: + """Function to get ranges without either of base RIDs set""" + ipa_local_ranges = get_ipa_local_ranges(id_ranges) + ranges_no_base = [] + for id_range in ipa_local_ranges: + if id_range.base_rid is None or id_range.secondary_base_rid is None: + ranges_no_base.append(id_range) + + return ranges_no_base +# endregion +# Working with IDentities out of range +# region + + +def group_identities_by_threshold( + identities: List[IDentity], threshold: int +) -> List[List[IDentity]]: + """Function to group out of range IDs by threshold""" + groups: List[List[IDentity]] = [] + currentgroup: List[IDentity] = [] + if len(identities) == 0: + return groups + + for i in range(len(identities) - 1): + # add id to current group + currentgroup.append(identities[i]) + + # If the difference with the next one is greater than the threshold, + # start a new group + if identities[i + 1].number - identities[i].number > threshold: + groups.append(currentgroup) + currentgroup = [] + + # Add the last ID number to the last group + currentgroup.append(identities[-1]) + groups.append(currentgroup) + + return groups + + +def separate_under1000( + identities: List[IDentity], +) -> Tuple[List[IDentity], List[IDentity]]: + """Function to separate IDs under 1000, expects sorted list""" + for i, identity in enumerate(identities): + if identity.number >= 1000: + return identities[:i], identities[i:] + return identities, [] + + +def separate_ranges_and_outliers( + groups: List[List[IDentity]], minrangesize=int +) -> Tuple[List[List[IDentity]], List[List[IDentity]]]: + """Function to separate IDs into outliers and IDs that can get ranges""" + outliers = [] + cleangroups = [] + for group in groups: + # if group is smaller than minrangesize, add it's memebers to ourliers + if group[-1].number - group[0].number + 1 < minrangesize: + for identity in group: + outliers.append(identity) + # if the group is OK, add it to cleaned groups + else: + cleangroups.append(group) + + return outliers, cleangroups + + +def round_idrange(start: int, end: int, under1000: bool) -> Tuple[int, int]: + """Function to round up range margins to look pretty""" + # calculating power of the size + sizepower = len(str(end - start + 1)) + # multiplier for the nearest rounded number + multiplier = 10 ** (sizepower - 1) + # getting rounded range margins + rounded_start = (start // multiplier) * multiplier + if not under1000: + rounded_start = max(rounded_start, 1000) + else: + rounded_start = max(rounded_start, 1) + rounded_end = ((end + multiplier) // multiplier) * multiplier - 1 + + return rounded_start, rounded_end + + +def get_rangename_base(id_ranges: List[IDRange]) -> str: + """Function to get a base name for new range proposals""" + base_name = "" + # we want to use default range name as a base for new ranges + for id_range in id_ranges: + if id_range.base_rid == 1000: + base_name = id_range.name + + # if we didn't find it, propose generic name + if base_name == "": + base_name = "Auto_added_range" + + return base_name + + +def get_rangename(id_ranges: List[IDRange], basename: str) -> str: + """ + Function to get a new range name, we add the counter as 3-digit number + extension and make sure it's unique + """ + counter = 1 + full_name = f"{basename}_{counter:03}" + while any(id_range.name == full_name for id_range in id_ranges): + counter += 1 + full_name = f"{basename}_{counter:03}" + return full_name + + +def propose_range( + group: List[IDentity], + id_ranges: List[IDRange], + delta: int, + basename: str, + norounding: bool, + allowunder1000: bool +) -> IDRange: + """Function to propose a new range for group of IDs out of ranges""" + startid = group[0].number + endid = group[-1].number + + logger.debug( + "Proposing a range for existing IDs out of ranges with start id %s \ +and end id %s...", + startid, + endid, + ) + + # creating new range + newrange = IDRange() + newrange.type = "ipa-local" + newrange.name = get_rangename(id_ranges, basename) + newrange.suffix = id_ranges[0].suffix + newrange.dn = f"cn={newrange.name},cn=ranges,cn=etc,{newrange.suffix}" + + if norounding: + newrange.first_id = startid + newrange.last_id = endid + newrange.size = newrange.last_id - newrange.first_id + 1 + else: + # first trying to round up ranges to look pretty + newrange.first_id, newrange.last_id = round_idrange( + startid, + endid, + allowunder1000 + ) + newrange.size = newrange.last_id - newrange.first_id + 1 + + # if this creates an overlap, try without rounding + if not newrange_overlap_check(id_ranges, newrange): + newrange.first_id = startid + newrange.last_id = endid + newrange.size = newrange.last_id - newrange.first_id + 1 + # if we still failed, abandon idea + if not newrange_overlap_check(id_ranges, newrange): + logger.error( + "ERROR! Failed to create idrange for existing IDs out of \ +ranges with start id %s and end id %s, it overlaps with existing range!", + startid, + endid, + ) + return None + + # creating RID bases + ipa_local_ranges = get_ipa_local_ranges(id_ranges) + + result, proposed_base_rid = propose_rid_base( + newrange, ipa_local_ranges, delta, True + ) + if result: + newrange.base_rid = proposed_base_rid + newrange.last_base_rid = proposed_base_rid + newrange.size + else: + # if this fails we print the warning + logger.warning( + "Warning! Proposed base RIDs %s for new range start id %s and \ +end id %s both failed, please adjust manually", + proposed_base_rid, + newrange.first_id, + newrange.last_id, + ) + + result, proposed_secondary_base_rid = propose_rid_base( + newrange, ipa_local_ranges, delta, False, proposed_base_rid + ) + if result: + newrange.secondary_base_rid = proposed_secondary_base_rid + newrange.last_secondary_rid = ( + proposed_secondary_base_rid + newrange.size + ) + else: + # if this fails we print the warning + logger.warning( + "Warning! Proposed secondary base RIDs %s for new range start id \ +%s and end id %s both failed, please adjust manually", + proposed_secondary_base_rid, + newrange.first_id, + newrange.last_id, + ) + + logger.debug("Proposed range: %s", newrange) + return newrange +# endregion diff --git a/ipatests/test_integration/test_ipa_idrange_fix.py b/ipatests/test_integration/test_ipa_idrange_fix.py new file mode 100644 index 0000000..ff8fbda --- /dev/null +++ b/ipatests/test_integration/test_ipa_idrange_fix.py @@ -0,0 +1,189 @@ +# +# Copyright (C) 2024 FreeIPA Contributors see COPYING for license +# + +""" +Module provides tests for ipa-idrange-fix CLI. +""" + +import logging +import re + +from ipatests.pytest_ipa.integration import tasks +from ipatests.test_integration.base import IntegrationTest + + +logger = logging.getLogger(__name__) + + +class TestIpaIdrangeFix(IntegrationTest): + @classmethod + def install(cls, mh): + super(TestIpaIdrangeFix, cls).install(mh) + tasks.kinit_admin(cls.master) + + def test_no_issues(self): + """Test ipa-idrange-fix command with no issues.""" + result = self.master.run_command(["ipa-idrange-fix", "--unattended"]) + + expected_under1000 = "No IDs under 1000 found" + expected_nochanges = "No changes proposed for existing ranges" + expected_newrange = "No new ranges proposed" + expected_noissues = "No changes proposed, nothing to do." + assert expected_under1000 in result.stderr_text + assert expected_nochanges in result.stderr_text + assert expected_newrange in result.stderr_text + assert expected_noissues in result.stderr_text + + def test_idrange_no_rid_bases(self): + """Test ipa-idrange-fix command with IDrange with no RID bases.""" + self.master.run_command([ + "ipa", + "idrange-add", + "idrange_no_rid_bases", + "--base-id", '10000', + "--range-size", '20000', + ]) + + result = self.master.run_command(["ipa-idrange-fix", "--unattended"]) + expected_text = "RID bases updated for range 'idrange_no_rid_bases'" + + # Remove IDrange with no rid bases + self.master.run_command(["ipa", "idrange-del", "idrange_no_rid_bases"]) + + assert expected_text in result.stderr_text + + def test_idrange_no_rid_bases_reversed(self): + """ + Test ipa-idrange-fix command with IDrange with no RID bases, but we + previously had a range with RID bases reversed - secondary lower than + primary. It is a valid configuration, so we should fix no-RID range. + """ + self.master.run_command([ + "ipa", + "idrange-add", + "idrange_no_rid_bases", + "--base-id", '10000', + "--range-size", '20000', + ]) + self.master.run_command([ + "ipa", + "idrange-add", + "idrange_reversed", + "--base-id", '50000', + "--range-size", '20000', + "--rid-base", '100300000', + "--secondary-rid-base", '301000' + ]) + + result = self.master.run_command(["ipa-idrange-fix", "--unattended"]) + expected_text = "RID bases updated for range 'idrange_no_rid_bases'" + + # Remove test IDranges + self.master.run_command(["ipa", "idrange-del", "idrange_no_rid_bases"]) + self.master.run_command(["ipa", "idrange-del", "idrange_reversed"]) + + assert expected_text in result.stderr_text + + def test_users_outofrange(self): + """Test ipa-idrange-fix command with users out of range.""" + for i in range(1, 20): + self.master.run_command([ + "ipa", + "user-add", + "testuser{}".format(i), + "--first", "Test", + "--last", "User {}".format(i), + "--uid", str(100000 + i * 10), + ]) + + result = self.master.run_command(["ipa-idrange-fix", "--unattended"]) + expected_text = r"Range '[\w\.]+_id_range_\d{3}' created successfully" + match = re.search(expected_text, result.stderr_text) + + # Remove users out of range and created IDrange + for i in range(1, 20): + self.master.run_command([ + "ipa", + "user-del", + "testuser{}".format(i) + ]) + if match is not None: + self.master.run_command([ + "ipa", + "idrange-del", + match.group(0).split(" ")[1].replace("'", "") + ]) + + assert match is not None + + def test_user_outlier(self): + """Test ipa-idrange-fix command with outlier user.""" + self.master.run_command([ + "ipa", + "user-add", + "testuser_outlier", + "--first", "Outlier", + "--last", "User", + "--uid", '500000', + ]) + + result = self.master.run_command(["ipa-idrange-fix", "--unattended"]) + expected_text = "Identities that don't fit the criteria to get a new \ +range found!" + expected_user = "user 'Outlier User', uid=500000" + + # Remove outlier user + self.master.run_command(["ipa", "user-del", "testuser_outlier"]) + + assert expected_text in result.stderr_text + assert expected_user in result.stderr_text + + def test_user_under1000(self): + """Test ipa-idrange-fix command with user under 1000.""" + self.master.run_command([ + "ipa", + "user-add", + "testuser_under1000", + "--first", "Under", + "--last", "1000", + "--uid", '999', + ]) + + result = self.master.run_command(["ipa-idrange-fix", "--unattended"]) + expected_text = "IDs under 1000:" + expected_user = "user 'Under 1000', uid=999" + + # Remove user under 1000 + self.master.run_command(["ipa", "user-del", "testuser_under1000"]) + + assert expected_text in result.stderr_text + assert expected_user in result.stderr_text + + def test_user_preserved(self): + """Test ipa-idrange-fix command with preserved user.""" + self.master.run_command([ + "ipa", + "user-add", + "testuser_preserved", + "--first", "Preserved", + "--last", "User", + "--uid", '9999', + ]) + self.master.run_command([ + "ipa", + "user-del", + "testuser_preserved", + "--preserve" + ]) + + result = self.master.run_command(["ipa-idrange-fix", "--unattended"]) + expected_text = "Identities that don't fit the criteria to get a new \ +range found!" + expected_user = "user 'Preserved User', uid=9999" + + # Remove preserved user + self.master.run_command(["ipa", "user-del", "testuser_preserved"]) + + assert expected_text in result.stderr_text + assert expected_user in result.stderr_text diff --git a/ipatests/prci_definitions/nightly_ipa-4-9_latest.yaml b/ipatests/prci_definitions/nightly_ipa-4-9_latest.yaml index 7c03a48..4021d3b 100644 --- a/ipatests/prci_definitions/nightly_ipa-4-9_latest.yaml +++ b/ipatests/prci_definitions/nightly_ipa-4-9_latest.yaml @@ -1766,6 +1766,18 @@ jobs: timeout: 10800 topology: *master_1repl + fedora-latest-ipa-4-9/test_idrange_fix: + requires: [fedora-latest-ipa-4-9/build] + priority: 50 + job: + class: RunPytest + args: + build_url: '{fedora-latest-ipa-4-9/build_url}' + test_suite: test_integration/test_ipa_idrange_fix.py + template: *ci-ipa-4-9-latest + timeout: 3600 + topology: *ipaserver + fedora-latest-ipa-4-9/test_subids: requires: [fedora-latest-ipa-4-9/build] priority: 50 diff --git a/ipatests/prci_definitions/nightly_ipa-4-9_latest_selinux.yaml b/ipatests/prci_definitions/nightly_ipa-4-9_latest_selinux.yaml index 802bd2a..d24aaa8 100644 --- a/ipatests/prci_definitions/nightly_ipa-4-9_latest_selinux.yaml +++ b/ipatests/prci_definitions/nightly_ipa-4-9_latest_selinux.yaml @@ -1906,6 +1906,19 @@ jobs: timeout: 10800 topology: *master_1repl + fedora-latest-ipa-4-9/test_idrange_fix: + requires: [fedora-latest-ipa-4-9/build] + priority: 50 + job: + class: RunPytest + args: + build_url: '{fedora-latest-ipa-4-9/build_url}' + selinux_enforcing: True + test_suite: test_integration/test_ipa_idrange_fix.py + template: *ci-ipa-4-9-latest + timeout: 3600 + topology: *ipaserver + fedora-latest-ipa-4-9/test_subids: requires: [fedora-latest-ipa-4-9/build] priority: 50 diff --git a/ipatests/prci_definitions/nightly_ipa-4-9_previous.yaml b/ipatests/prci_definitions/nightly_ipa-4-9_previous.yaml index 1e1adb8..b0efd57 100644 --- a/ipatests/prci_definitions/nightly_ipa-4-9_previous.yaml +++ b/ipatests/prci_definitions/nightly_ipa-4-9_previous.yaml @@ -1766,6 +1766,18 @@ jobs: timeout: 10800 topology: *master_1repl + fedora-previous-ipa-4-9/test_idrange_fix: + requires: [fedora-previous-ipa-4-9/build] + priority: 50 + job: + class: RunPytest + args: + build_url: '{fedora-previous-ipa-4-9/build_url}' + test_suite: test_integration/test_ipa_idrange_fix.py + template: *ci-ipa-4-9-previous + timeout: 3600 + topology: *ipaserver + fedora-previous-ipa-4-9/test_subids: requires: [fedora-previous-ipa-4-9/build] priority: 50