From 7739c2a802c1dddb6757ff75cf7f6582a89bd518 Mon Sep 17 00:00:00 2001 From: id Date: Fri, 31 May 2024 09:00:18 +0200 Subject: [PATCH] azure-events-az: update to API versions, add retry functionality for metadata requests, update tests --- heartbeat/azure-events-az.in | 117 ++++++++++++++++++++++++----------- heartbeat/ocf.py | 50 +++++++++++++-- 2 files changed, 126 insertions(+), 41 deletions(-)
NOTE(review): reviewer notes, placed after the diffstat where git am / git apply ignore free text; drop them before applying.
Purpose: this patch bumps the agent VERSION and gives each IMDS endpoint its own api-version.
It also adds retry_count / retry_wait / request_timeout parameters with a retry loop in _sendMetadataRequest.
monitor() now logs the full JSON of each local event.
An ocf.py unit test is added for the new parameters.
1. _sendMetadataRequest: `if retryCount > 1 and retry != retryCount:` skips both the warning and
   time.sleep(retryWaitTime) when retry_count=1, so that single retry fires immediately.
   `if retry < retryCount:` would presumably cover every retry_count >= 1.
2. _sendMetadataRequest: failures are classified by class-name equality
   (e.__class__.__name__ == URLError.__name__ / TimeoutError.__name__), which never matches a subclass.
   For example, an HTTPError (a URLError subclass) is retried without any log line; isinstance() would match it.
   NOTE(review): on Python < 3.10 a socket timeout may surface as socket.timeout (class name "timeout").
   The TimeoutError name check would not match that either; confirm against the supported interpreters.
3. _sendMetadataRequest: any other exception caught by `except Exception` is retried with no log message at all.
4. _sendMetadataRequest: a negative retry_count makes range(retryCount+1) empty.
   No request is sent, and the pull state is set to IDLE.
   Clamping with max(retryCount, 0) would guarantee at least one attempt.
5. _sendMetadataRequest: `successful` is used as a None / 1 flag; False / True would read more clearly.
6. pullScheduledEvents: now calls sys.exit(ocf.OCF_ERR_GENERIC) when no JSON comes back.
   An IMDS outage that outlasts all retries therefore fails the monitor action; confirm this is intended.
7. monitor: updateNodeStateAndEvents() now receives localAzEventIds.keys() (a dict view) instead of a set.
   Its body is not part of this patch; verify that it accepts any iterable.
8. ocf.py test: shortdesc "Azure ims webservice retry count" differs from the agent's
   "Azure IMDS webservice retry count" (likely a typo; the expected string would need the same change).
   The `import logging` added under `if __name__ == "__main__":` duplicates the module-level
   `import sys, os, logging, syslog`.
9. Commit subject: "update to API versions" presumably means "update API versions".
10. This copy of the patch is collapsed onto a few lines.
    The XML in the ocf.py expected strings appears to have been stripped (e.g. str(m.actions[0]) is compared with '\n').
    Regenerate the patch with `git format-patch` before applying it.
diff --git a/heartbeat/azure-events-az.in b/heartbeat/azure-events-az.in index 46d4d1f3d9..6d31e5abae 100644 --- a/heartbeat/azure-events-az.in +++ b/heartbeat/azure-events-az.in @@ -27,7 +27,7 @@ import ocf ############################################################################## -VERSION = "0.10" +VERSION = "0.20" USER_AGENT = "Pacemaker-ResourceAgent/%s %s" % (VERSION, ocf.distro()) attr_globalPullState = "azure-events-az_globalPullState" @@ -39,9 +39,6 @@ attr_healthstate = "#health-azure" default_loglevel = ocf.logging.INFO default_relevantEventTypes = set(["Reboot", "Redeploy"]) -global_pullMaxAttempts = 3 -global_pullDelaySecs = 1 - ############################################################################## class attrDict(defaultdict): @@ -71,16 +68,22 @@ class azHelper: metadata_host = "http://169.254.169.254/metadata" instance_api = "instance" events_api = "scheduledevents" - api_version = "2019-08-01" + events_api_version = "2020-07-01" + instance_api_version = "2021-12-13" @staticmethod - def _sendMetadataRequest(endpoint, postData=None): + def _sendMetadataRequest(endpoint, postData=None, api_version="2019-08-01"): """ Send a request to Azure's Azure Metadata Service API """ - url = "%s/%s?api-version=%s" % (azHelper.metadata_host, endpoint, azHelper.api_version) + + retryCount = int(ocf.get_parameter("retry_count",3)) + retryWaitTime = int(ocf.get_parameter("retry_wait",20)) + requestTimeout = int(ocf.get_parameter("request_timeout",15)) + + url = "%s/%s?api-version=%s" % (azHelper.metadata_host, endpoint, api_version) data = "" - 
ocf.logger.debug("_sendMetadataRequest: begin; endpoint = %s, postData = %s" % (endpoint, postData)) + ocf.logger.debug("_sendMetadataRequest: begin; endpoint = %s, postData = %s, retry_count = %s, retry_wait time = %s, request_timeout = %s" % (endpoint, postData, retryCount, retryWaitTime, requestTimeout)) ocf.logger.debug("_sendMetadataRequest: url = %s" % url) if postData and type(postData) != bytes: @@ -89,18 +92,37 @@ class azHelper: req = urllib2.Request(url, postData) req.add_header("Metadata", "true") req.add_header("User-Agent", USER_AGENT) - try: - resp = urllib2.urlopen(req) - except URLError as e: - if hasattr(e, 'reason'): - ocf.logger.warning("Failed to reach the server: %s" % e.reason) - clusterHelper.setAttr(attr_globalPullState, "IDLE") - elif hasattr(e, 'code'): - ocf.logger.warning("The server couldn\'t fulfill the request. Error code: %s" % e.code) - clusterHelper.setAttr(attr_globalPullState, "IDLE") - else: - data = resp.read() - ocf.logger.debug("_sendMetadataRequest: response = %s" % data) + + if retryCount > 0: + ocf.logger.debug("_sendMetadataRequest: retry enabled") + + successful = None + for retry in range(retryCount+1): + try: + resp = urllib2.urlopen(req, timeout=requestTimeout) + except Exception as e: + excType = e.__class__.__name__ + if excType == TimeoutError.__name__: + ocf.logger.warning("Request timed out after %s seconds Error: %s" % (requestTimeout, e)) + if excType == URLError.__name__: + if hasattr(e, 'reason'): + ocf.logger.warning("Failed to reach the server: %s" % e.reason) + elif hasattr(e, 'code'): + ocf.logger.warning("The server couldn\'t fulfill the request. 
Error code: %s" % e.code) + + if retryCount > 1 and retry != retryCount: + ocf.logger.warning("Request failed, retry (%s/%s) wait %s seconds before retry (wait time)" % (retry + 1,retryCount,retryWaitTime)) + time.sleep(retryWaitTime) + + else: + data = resp.read() + ocf.logger.debug("_sendMetadataRequest: response = %s" % data) + successful = 1 + break + + # When no request was successful also with retry enabled, set the cluster to idle + if successful is None: + clusterHelper.setAttr(attr_globalPullState, "IDLE") if data: data = json.loads(data) @@ -115,14 +137,15 @@ class azHelper: """ ocf.logger.debug("getInstanceInfo: begin") - jsondata = azHelper._sendMetadataRequest(azHelper.instance_api) + jsondata = azHelper._sendMetadataRequest(azHelper.instance_api, None, azHelper.instance_api_version) ocf.logger.debug("getInstanceInfo: json = %s" % jsondata) if jsondata: ocf.logger.debug("getInstanceInfo: finished, returning {}".format(jsondata["compute"])) return attrDict(jsondata["compute"]) else: - ocf.ocf_exit_reason("getInstanceInfo: Unable to get instance info") + apiCall = "%s/%s?api-version=%s" % (azHelper.metadata_host, azHelper.instance_api, azHelper.instance_api_version) + ocf.ocf_exit_reason("getInstanceInfo: Unable to get instance info - call: %s" % apiCall) sys.exit(ocf.OCF_ERR_GENERIC) @staticmethod @@ -132,11 +155,17 @@ class azHelper: """ ocf.logger.debug("pullScheduledEvents: begin") - jsondata = azHelper._sendMetadataRequest(azHelper.events_api) + jsondata = azHelper._sendMetadataRequest(azHelper.events_api, None, azHelper.events_api_version) ocf.logger.debug("pullScheduledEvents: json = %s" % jsondata) - ocf.logger.debug("pullScheduledEvents: finished") - return attrDict(jsondata) + if jsondata: + ocf.logger.debug("pullScheduledEvents: finished") + return attrDict(jsondata) + else: + apiCall = "%s/%s?api-version=%s" % (azHelper.metadata_host, azHelper.events_api, azHelper.events_api_version) + ocf.ocf_exit_reason("pullScheduledEvents: Unable to get 
scheduledevents info - call: %s" % apiCall) + sys.exit(ocf.OCF_ERR_GENERIC) + @staticmethod def forceEvents(eventIDs): @@ -534,7 +563,7 @@ class Node: except ValueError: # Handle the exception ocf.logger.warn("Health attribute %s on node %s cannot be converted to an integer value" % (healthAttributeStr, node)) - + ocf.logger.debug("isNodeInStandby: finished - result %s" % isInStandy) return isInStandy @@ -584,7 +613,7 @@ class raAzEvents: def monitor(self): ocf.logger.debug("monitor: begin") - + events = azHelper.pullScheduledEvents() # get current document version @@ -600,21 +629,21 @@ class raAzEvents: ocf.logger.info("monitor: already handled curDocVersion, skip") return ocf.OCF_SUCCESS - localAzEventIDs = set() + localAzEventIds = dict() for e in localEvents: - localAzEventIDs.add(e.EventId) + localAzEventIds[e.EventId] = json.dumps(e) curState = self.node.getState() clusterEventIDs = self.node.getEventIDs() ocf.logger.debug("monitor: curDocVersion has not been handled yet") - + if clusterEventIDs: # there are pending events set, so our state must be STOPPING or IN_EVENT i = 0; touchedEventIDs = False while i < len(clusterEventIDs): # clean up pending events that are already finished according to AZ - if clusterEventIDs[i] not in localAzEventIDs: + if clusterEventIDs[i] not in localAzEventIds.keys(): ocf.logger.info("monitor: remove finished local clusterEvent %s" % (clusterEventIDs[i])) clusterEventIDs.pop(i) touchedEventIDs = True @@ -644,12 +673,12 @@ class raAzEvents: ocf.logger.info("monitor: all local events finished, but some resources have not completed startup yet -> wait") else: if curState == AVAILABLE: - if len(localAzEventIDs) > 0: + if len(localAzEventIds) > 0: if clusterHelper.otherNodesAvailable(self.node): - ocf.logger.info("monitor: can handle local events %s -> set state STOPPING" % (str(localAzEventIDs))) - curState = self.node.updateNodeStateAndEvents(STOPPING, localAzEventIDs) + ocf.logger.info("monitor: can handle local events %s -> set 
state STOPPING - %s" % (str(list(localAzEventIds.keys())), str(list(localAzEventIds.values())))) + curState = self.node.updateNodeStateAndEvents(STOPPING, localAzEventIds.keys()) else: - ocf.logger.info("monitor: cannot handle azEvents %s (only node available) -> set state ON_HOLD" % str(localAzEventIDs)) + ocf.logger.info("monitor: cannot handle azEvents %s (only node available) -> set state ON_HOLD - %s" % (str(list(localAzEventIds.keys())), str(list(localAzEventIds.values())))) self.node.setState(ON_HOLD) else: ocf.logger.debug("monitor: no local azEvents to handle") @@ -761,6 +790,24 @@ def main(): longdesc="Set to true to enable verbose logging", content_type="boolean", default="false") + agent.add_parameter( + "retry_count", + shortdesc="Azure IMDS webservice retry count", + longdesc="Set to any number bigger than zero to enable retry count", + content_type="integer", + default="3") + agent.add_parameter( + "retry_wait", + shortdesc="Configure a retry wait time", + longdesc="Set retry wait time in seconds", + content_type="integer", + default="20") + agent.add_parameter( + "request_timeout", + shortdesc="Configure a request timeout", + longdesc="Set request timeout in seconds", + content_type="integer", + default="15") agent.add_action("start", timeout=10, handler=lambda: ocf.OCF_SUCCESS) agent.add_action("stop", timeout=10, handler=lambda: ocf.OCF_SUCCESS) agent.add_action("validate-all", timeout=20, handler=validate_action) diff --git a/heartbeat/ocf.py b/heartbeat/ocf.py index dda2fed4bb..571cd19664 100644 --- a/heartbeat/ocf.py +++ b/heartbeat/ocf.py @@ -16,7 +16,7 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# +# import sys, os, logging, syslog @@ -42,19 +42,19 @@ # OCF does not include the concept of master/slave resources so we # need to extend it so we can discover a resource's 
complete state. # -# OCF_RUNNING_MASTER: +# OCF_RUNNING_MASTER: # The resource is in "master" mode and fully operational # OCF_FAILED_MASTER: # The resource is in "master" mode but in a failed state -# +# # The extra two values should only be used during a probe. # # Probes are used to discover resources that were started outside of # the CRM and/or left behind if the LRM fails. -# +# # They can be identified in RA scripts by checking for: # [ "${__OCF_ACTION}" = "monitor" -a "${OCF_RESKEY_CRM_meta_interval}" = "0" ] -# +# # Failed "slaves" should continue to use: OCF_ERR_GENERIC # Fully operational "slaves" should continue to use: OCF_SUCCESS # @@ -451,15 +451,17 @@ def value_for_parameter(param): sys.exit(OCF_ERR_UNIMPLEMENTED) + if __name__ == "__main__": import unittest + import logging class TestMetadata(unittest.TestCase): def test_noparams_noactions(self): m = Agent("foo", shortdesc="shortdesc", longdesc="longdesc") self.assertEqual(""" - + 1.0 longdesc @@ -483,4 +485,40 @@ def test_params_actions(self): m.add_action("start") self.assertEqual(str(m.actions[0]), '\n') + def test_retry_params_actions(self): + log= logging.getLogger( "test_retry_params_actions" ) + + m = Agent("foo", shortdesc="shortdesc", longdesc="longdesc") + m.add_parameter( + "retry_count", + shortdesc="Azure ims webservice retry count", + longdesc="Set to any number bigger than zero to enable retry count", + content_type="integer", + default="0") + m.add_parameter( + "retry_wait", + shortdesc="Configure a retry wait time", + longdesc="Set retry wait time in seconds", + content_type="integer", + default="20") + m.add_parameter( + "request_timeout", + shortdesc="Configure a request timeout", + longdesc="Set request timeout in seconds", + content_type="integer", + default="15") + + m.add_action("start") + + log.debug( "actions= %s", str(m.actions[0] )) + self.assertEqual(str(m.actions[0]), '\n') + + log.debug( "parameters= %s", str(m.parameters[0] )) + log.debug( "parameters= %s", 
str(m.parameters[1] )) + log.debug( "parameters= %s", str(m.parameters[2] )) + self.assertEqual(str(m.parameters[0]), '\nSet to any number bigger than zero to enable retry count\nAzure ims webservice retry count\n\n\n') + self.assertEqual(str(m.parameters[1]), '\nSet retry wait time in seconds\nConfigure a retry wait time\n\n\n') + self.assertEqual(str(m.parameters[2]), '\nSet request timeout in seconds\nConfigure a request timeout\n\n\n') + + logging.basicConfig( stream=sys.stderr ) unittest.main()