diff --git a/cib/callbacks.c b/cib/callbacks.c index 754e218..77853d9 100644 --- a/cib/callbacks.c +++ b/cib/callbacks.c @@ -1391,7 +1391,6 @@ initiate_exit(void) extern int remote_fd; extern int remote_tls_fd; -extern void terminate_cs_connection(void); void terminate_cib(const char *caller, gboolean fast) diff --git a/cib/main.c b/cib/main.c index 6b56274..3328558 100644 --- a/cib/main.c +++ b/cib/main.c @@ -371,15 +371,25 @@ ccm_connect(void) #endif #if SUPPORT_COROSYNC -static gboolean -cib_ais_dispatch(int kind, const char *from, const char *data) +static void +cib_cs_dispatch(cpg_handle_t handle, + const struct cpg_name *groupName, + uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) { + uint32_t kind = 0; xmlNode *xml = NULL; + const char *from = NULL; + char *data = pcmk_message_common_cs(handle, nodeid, pid, msg, &kind, &from); + if(data == NULL) { + return; + } if (kind == crm_class_cluster) { xml = string2xml(data); if (xml == NULL) { - goto bail; + crm_err("Invalid XML: '%.120s'", data); + free(data); + return; } crm_xml_add(xml, F_ORIG, from); /* crm_xml_add_int(xml, F_SEQ, wrapper->id); */ @@ -387,16 +397,11 @@ cib_ais_dispatch(int kind, const char *from, const char *data) } free_xml(xml); - return TRUE; - - bail: - crm_err("Invalid XML: '%.120s'", data); - return TRUE; - + free(data); } static void -cib_ais_destroy(gpointer user_data) +cib_cs_destroy(gpointer user_data) { if (cib_shutdown_flag) { crm_info("Corosync disconnection complete"); @@ -463,8 +468,9 @@ cib_init(void) { if (is_openais_cluster()) { #if SUPPORT_COROSYNC - crm_cluster.destroy = cib_ais_destroy; - crm_cluster.cs_dispatch = cib_ais_dispatch; + crm_cluster.destroy = cib_cs_destroy; + crm_cluster.cpg.cpg_deliver_fn = cib_cs_dispatch; + crm_cluster.cpg.cpg_confchg_fn = pcmk_cpg_membership; #endif } else if (is_heartbeat_cluster()) { #if SUPPORT_HEARTBEAT diff --git a/configure.ac b/configure.ac index be8261a..7d2e384 100644 --- a/configure.ac +++ b/configure.ac @@ -132,7 +132,7 @@ try_extract_header_define() { AC_MSG_RESULT($value) fi printf $value - rm -rf ${Cfile}.cc ${Cfile} ${Cfile}.dSYM ${Cfile}.gcno + rm -rf ${Cfile}.c ${Cfile} ${Cfile}.dSYM ${Cfile}.gcno } extract_header_define() { @@ -669,14 +669,6 @@ else fi AC_MSG_RESULT(using $GLIBCONFIG) -if - $PKGCONFIG --exists systemd -then - systemdunitdir=`$PKGCONFIG --variable=systemdsystemunitdir systemd` - AC_SUBST(systemdunitdir) -fi -AM_CONDITIONAL(HAVE_SYSTEMD, test -n "$systemdunitdir" -a "x$systemdunitdir" != xno) - # # Where is dlopen? # @@ -965,50 +957,37 @@ dnl ======================================================================== dnl Profiling and GProf dnl ======================================================================== -case $SUPPORT_PROFILING in +case $SUPPORT_GCOV in 1|yes|true) SUPPORT_PROFILING=1 - - dnl Enable gprof - #LIBS="$LIBS -pg" - #CFLAGS="$CFLAGS -pg" - - dnl Disable various compiler optimizations - CFLAGS="$CFLAGS -fno-omit-frame-pointer" - #CFLAGS="$CFLAGS -fno-inline-functions -fno-inline-functions-called-once -fno-optimize-sibling-calls" - dnl CFLAGS="$CFLAGS -fno-default-inline -fno-inline" - - dnl Update features - PCMK_FEATURES="$PCMK_FEATURES gprof" ;; - *) SUPPORT_PROFILING=0;; esac -AC_DEFINE_UNQUOTED(SUPPORT_PROFILING, $SUPPORT_PROFILING, Support for gprof profiling) -case $SUPPORT_GCOV in +case $SUPPORT_PROFILING in 1|yes|true) - SUPPORT_GCOV=1 + SUPPORT_PROFILING=1 dnl Enable gprof #LIBS="$LIBS -pg" #CFLAGS="$CFLAGS -pg" dnl Disable various compiler optimizations - CFLAGS="$CFLAGS -fprofile-arcs -ftest-coverage -fno-inline" + CFLAGS="$CFLAGS -fno-omit-frame-pointer -fprofile-arcs -ftest-coverage -fno-inline" + #CFLAGS="$CFLAGS -fno-inline-functions -fno-inline-functions-called-once -fno-optimize-sibling-calls" + dnl CFLAGS="$CFLAGS -fno-default-inline -fno-inline" - dnl Turn off optimization so code coverage tool - dnl can get accurate line numbers + dnl Turn off optimization so code coverage tool can get accurate line numbers AC_MSG_NOTICE(Old CFLAGS: $CFLAGS) - CFLAGS=`echo $CFLAGS | sed -e 's/-O.\ //g' -e 's/-Wp,-D_FORTIFY_SOURCE=.\ //g'` + CFLAGS=`echo $CFLAGS | sed -e 's/-O.\ //g' -e 's/-Wp,-D_FORTIFY_SOURCE=.\ //g' -e 's/-D_FORTIFY_SOURCE=.\ //g'` CFLAGS="$CFLAGS -O0" AC_MSG_NOTICE(New CFLAGS: $CFLAGS) dnl Update features - PCMK_FEATURES="$PCMK_FEATURES gcov" + PCMK_FEATURES="$PCMK_FEATURES profile" ;; *) SUPPORT_PROFILING=0;; esac -AC_DEFINE_UNQUOTED(SUPPORT_GCOV, $SUPPORT_GCOV, Support for gcov coverage testing) +AC_DEFINE_UNQUOTED(SUPPORT_PROFILING, $SUPPORT_PROFILING, Support for profiling) dnl ======================================================================== dnl Cluster infrastructure - Heartbeat / LibQB @@ -1192,14 +1171,25 @@ fi AC_DEFINE_UNQUOTED(SUPPORT_UPSTART, $HAVE_upstart, Support upstart based system services) AM_CONDITIONAL(BUILD_UPSTART, test $HAVE_upstart = 1) +if + $PKGCONFIG --exists systemd +then + systemdunitdir=`$PKGCONFIG --variable=systemdsystemunitdir systemd` + AC_SUBST(systemdunitdir) +else + enable_systemd=no +fi + if test $HAVE_gio = 1 -a "x${enable_systemd}" != xno; then - HAVE_systemd=1 - PCMK_FEATURES="$PCMK_FEATURES systemd" + if test -n "$systemdunitdir" -a "x$systemdunitdir" != xno; then + HAVE_systemd=1 + PCMK_FEATURES="$PCMK_FEATURES systemd" + fi fi + AC_DEFINE_UNQUOTED(SUPPORT_SYSTEMD, $HAVE_systemd, Support systemd based system services) AM_CONDITIONAL(BUILD_SYSTEMD, test $HAVE_systemd = 1) - case $SUPPORT_NAGIOS in 1|yes|true|try) SUPPORT_NAGIOS=1;; diff --git a/crmd/control.c b/crmd/control.c index 7f423db..0808f56 100644 --- a/crmd/control.c +++ b/crmd/control.c @@ -915,7 +915,7 @@ config_query_callback(xmlNode * msg, int call_id, int rc, xmlNode * output, void if (is_classic_ais_cluster()) { value = crmd_pref(config_hash, XML_ATTR_EXPECTED_VOTES); crm_debug("Sending expected-votes=%s to corosync", value); - send_ais_text(crm_class_quorum, value, TRUE, NULL, crm_msg_ais); + send_cluster_text(crm_class_quorum, value, TRUE, NULL, crm_msg_ais); } #endif diff --git a/crmd/corosync.c b/crmd/corosync.c index 6385780..c4aef38 100644 --- a/crmd/corosync.c +++ b/crmd/corosync.c @@ -41,8 +41,10 @@ extern void crmd_ha_connection_destroy(gpointer user_data); /* A_HA_CONNECT */ #if SUPPORT_COROSYNC -static gboolean -crmd_ais_dispatch(int kind, const char *from, const char *data) +static void +crmd_cs_dispatch(cpg_handle_t handle, + const struct cpg_name *groupName, + uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) { int seq = 0; xmlNode *xml = NULL; @@ -50,10 +52,18 @@ crmd_ais_dispatch(int kind, const char *from, const char *data) crm_node_t *peer = NULL; enum crm_proc_flag flag = crm_proc_cpg; + uint32_t kind = 0; + const char *from = NULL; + char *data = pcmk_message_common_cs(handle, nodeid, pid, msg, &kind, &from); + + if(data == NULL) { + return; + } xml = string2xml(data); if (xml == NULL) { crm_err("Could not parse message content (%d): %.100s", kind, data); - return TRUE; + free(data); + return; } switch (kind) { @@ -103,8 +113,8 @@ crmd_ais_dispatch(int kind, const char *from, const char *data) /* If we can still talk to our peer process on that node, * then its also part of the corosync membership */ - crm_err("Recieving messages from a node we think is dead: %s[%d]", peer->uname, - peer->id); + crm_warn("Recieving messages from a node we think is dead: %s[%d]", peer->uname, + peer->id); crm_update_peer_proc(__FUNCTION__, peer, flag, ONLINESTATUS); } crmd_ha_msg_filter(xml); @@ -123,8 +133,8 @@ crmd_ais_dispatch(int kind, const char *from, const char *data) crm_err("Invalid message class (%d): %.100s", kind, data); } + free(data); free_xml(xml); - return TRUE; } static gboolean @@ -148,7 +158,7 @@ crmd_quorum_destroy(gpointer user_data) } static void -crmd_ais_destroy(gpointer user_data) +crmd_cs_destroy(gpointer user_data) { if (is_not_set(fsa_input_register, R_HA_DISCONNECTED)) { crm_err("connection terminated"); @@ -182,8 +192,9 @@ crm_connect_corosync(crm_cluster_t * cluster) if (is_openais_cluster()) { crm_set_status_callback(&peer_update_callback); - cluster->cs_dispatch = crmd_ais_dispatch; - cluster->destroy = crmd_ais_destroy; + cluster->cpg.cpg_deliver_fn = crmd_cs_dispatch; + cluster->cpg.cpg_confchg_fn = pcmk_cpg_membership; + cluster->destroy = crmd_cs_destroy; rc = crm_cluster_connect(cluster); } diff --git a/crmd/election.c b/crmd/election.c index 1946858..25cb647 100644 --- a/crmd/election.c +++ b/crmd/election.c @@ -518,7 +518,7 @@ do_dc_takeover(long long action, #if SUPPORT_COROSYNC if (is_classic_ais_cluster()) { - send_ais_text(crm_class_quorum, NULL, TRUE, NULL, crm_msg_ais); + send_cluster_text(crm_class_quorum, NULL, TRUE, NULL, crm_msg_ais); } #endif diff --git a/crmd/lrm.c b/crmd/lrm.c index 31f00d7..15bad88 100644 --- a/crmd/lrm.c +++ b/crmd/lrm.c @@ -1929,6 +1929,7 @@ do_update_resource(lrm_state_t * lrm_state, lrmd_rsc_info_t * rsc, lrmd_event_da } else { crm_warn("Resource %s no longer exists in the lrmd", op->rsc_id); + send_direct_ack(NULL, NULL, rsc, op, op->rsc_id); goto cleanup; } diff --git a/doc/Pacemaker_Remote/en-US/Revision_History.xml b/doc/Pacemaker_Remote/en-US/Revision_History.xml index 26d8ab6..257ecbd 100644 --- a/doc/Pacemaker_Remote/en-US/Revision_History.xml +++ b/doc/Pacemaker_Remote/en-US/Revision_History.xml @@ -8,13 +8,13 @@ - 1 + 1-0 Tue Mar 19 2013 DavidVosseldvossel@redhat.com Import from Pages.app - 2 + 2-0 Tue May 13 2013 DavidVosseldvossel@redhat.com Added Future Features Section diff --git a/extra/cluster-init b/extra/cluster-init index 5dc71c2..fe0ff61 100755 --- a/extra/cluster-init +++ b/extra/cluster-init @@ -294,10 +294,10 @@ esac case $DATE in [Yy][Ee][Ss]|[Yy]) - now=`date` for host in $host_list; do echo "Setting time on ${host}" scp /etc/localtime root@${host}:/etc + now=`date` ssh -l root ${host} -- date -s "'$now'" echo "" done diff --git a/fencing/fence_dummy b/fencing/fence_dummy index b202977..8cf5103 100644 --- a/fencing/fence_dummy +++ b/fencing/fence_dummy @@ -5,7 +5,7 @@ # Virsh 0.3.3 on RHEL 5.2 with xen-3.0.3-51 # -import sys, time, random +import sys, time, random, os, atexit, getopt, re #BEGIN_VERSION_GENERATION RELEASE_VERSION="3.1.6" @@ -42,14 +42,28 @@ all_opt = { "debug" : { "getopt" : "D:", "longopt" : "debug-file", - "help" : "-D, --debug-file= Debugging to output file", + "help" : "-D, --debug-file=[debugfile] Debugging to output file", "required" : "0", "shortdesc" : "Write debug information to given file", "order" : 52 }, + "random_sleep_range": { + "getopt" : "R:", + "required" : "0", + "longopt" : "random_sleep_range", + "help" : "--random_sleep-range=[seconds] Issue a sleep between 1 and [seconds]. Used for testing.", + "shortdesc" : "Issue a sleep between 1 and [seconds]", + "order" : 1 }, + "mode": { + "getopt" : "M:", + "longopt" : "mode", + "required" : "0", + "help" : "--mode=(pass|fail|random). Used for testing.", + "shortdesc" : "Should operations always pass, always fail or fail at random", + "order" : 1 }, "delay" : { "getopt" : "f:", "longopt" : "delay", - "help" : "--delay Wait X seconds before fencing is started", + "help" : "--delay [seconds] Wait X seconds before fencing is started", "required" : "0", "shortdesc" : "Wait X seconds before fencing is started", "default" : "0", @@ -57,7 +71,7 @@ all_opt = { "action" : { "getopt" : "o:", "longopt" : "action", - "help" : "-o, --action= Action: status, reboot (default), off or on", + "help" : "-o, --action=[action] Action: status, reboot (default), off or on", "required" : "1", "shortdesc" : "Fencing Action", "default" : "reboot", @@ -65,7 +79,7 @@ all_opt = { "port" : { "getopt" : "n:", "longopt" : "plug", - "help" : "-n, --plug= Physical plug number on device or\n" + + "help" : "-n, --plug=[id] Physical plug number on device or\n" + " name of virtual machine", "required" : "1", "shortdesc" : "Physical plug number or name of virtual machine", @@ -73,7 +87,7 @@ all_opt = { "switch" : { "getopt" : "s:", "longopt" : "switch", - "help" : "-s, --switch= Physical switch number on device", + "help" : "-s, --switch=[id] Physical switch number on device", "required" : "0", "shortdesc" : "Physical switch number on device", "order" : 1 }, @@ -86,8 +100,6 @@ all_opt = { "order" : 1} } -common_opt = [ "retry_on", "delay" ] - def show_docs(options, docs = None): device_opt = options["device_opt"] @@ -189,12 +201,6 @@ def metadata(avail_opt, options, docs): def process_input(avail_opt): global all_opt - global common_opt - - ## - ## Add options which are available for every fence agent - ##### - avail_opt.extend(common_opt) ## ## Set standard environment @@ -290,24 +296,11 @@ def atexit_handler(): os.close(1) except IOError: sys.stderr.write("%s failed to close standard output\n"%(sys.argv[0])) - sys.exit(EC_GENERIC_ERROR) + sys.exit(1) def main(): global all_opt - device_opt = [ "help", "version", "verbose", "debug", "action", "port", - "power_timeout", "random_sleep_range"] - - all_opt["random_sleep_range"] = { - "getopt" : "R:", - "longopt" : "random_sleep_range", - "help" : "--random_sleep-range=Issue a sleep between 1 and . Used for testing.", - "order" : 1 } - - all_opt["mode"] = { - "getopt" : "M:", - "longopt" : "mode", - "help" : "--mode=(pass|fail|random). Used for testing.", - "order" : 1 } + device_opt = [ "help", "version", "verbose", "debug", "action", "port", "mode", "random_sleep_range"] ## Defaults for fence agent docs = { } @@ -316,6 +309,7 @@ def main(): atexit.register(atexit_handler) options = process_input(device_opt) + options["device_opt"] = device_opt show_docs(options, docs) # random sleep for testing diff --git a/fencing/main.c b/fencing/main.c index c7b67a1..fee9f7a 100644 --- a/fencing/main.c +++ b/fencing/main.c @@ -190,15 +190,25 @@ stonith_peer_hb_destroy(gpointer user_data) #endif #if SUPPORT_COROSYNC -static gboolean -stonith_peer_ais_callback(int kind, const char *from, const char *data) +static void +stonith_peer_ais_callback(cpg_handle_t handle, + const struct cpg_name *groupName, + uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) { + uint32_t kind = 0; xmlNode *xml = NULL; + const char *from = NULL; + char *data = pcmk_message_common_cs(handle, nodeid, pid, msg, &kind, &from); + if(data == NULL) { + return; + } if (kind == crm_class_cluster) { xml = string2xml(data); if (xml == NULL) { - goto bail; + crm_err("Invalid XML: '%.120s'", data); + free(data); + return; } crm_xml_add(xml, F_ORIG, from); /* crm_xml_add_int(xml, F_SEQ, wrapper->id); */ @@ -206,18 +216,14 @@ stonith_peer_ais_callback(int kind, const char *from, const char *data) } free_xml(xml); - return TRUE; - - bail: - crm_err("Invalid XML: '%.120s'", data); - return TRUE; - + free(data); + return; } static void -stonith_peer_ais_destroy(gpointer user_data) +stonith_peer_cs_destroy(gpointer user_data) { - crm_err("AIS connection terminated"); + crm_err("Corosync connection terminated"); stonith_shutdown(0); } #endif @@ -1084,8 +1090,9 @@ main(int argc, char **argv) if (is_openais_cluster()) { #if SUPPORT_COROSYNC - cluster.destroy = stonith_peer_ais_destroy; - cluster.cs_dispatch = stonith_peer_ais_callback; + cluster.destroy = stonith_peer_cs_destroy; + cluster.cpg.cpg_deliver_fn = stonith_peer_ais_callback; + cluster.cpg.cpg_confchg_fn = pcmk_cpg_membership; #endif } diff --git a/include/crm/cluster.h b/include/crm/cluster.h index cac863f..c999367 100644 --- a/include/crm/cluster.h +++ b/include/crm/cluster.h @@ -26,9 +26,12 @@ # include # endif +# if SUPPORT_COROSYNC +# include +# endif + extern gboolean crm_have_quorum; extern GHashTable *crm_peer_cache; -extern GHashTable *crm_peer_id_cache; extern unsigned long long crm_peer_seq; # ifndef CRM_SERVICE @@ -73,21 +76,24 @@ typedef struct crm_peer_node_s { void crm_peer_init(void); void crm_peer_destroy(void); -char *get_corosync_uuid(crm_node_t *peer); -int get_corosync_id(int id, const char *uuid); typedef struct crm_cluster_s { char *uuid; char *uname; uint32_t nodeid; + void (*destroy) (gpointer); + # if SUPPORT_HEARTBEAT ll_cluster_t *hb_conn; void (*hb_dispatch) (HA_Message * msg, void *private); # endif - gboolean(*cs_dispatch) (int kind, const char *from, const char *data); - void (*destroy) (gpointer); +# if SUPPORT_COROSYNC + struct cpg_name group; + cpg_callbacks_t cpg; + cpg_handle_t cpg_handle; +# endif } crm_cluster_t; @@ -122,8 +128,6 @@ enum crm_ais_msg_types { gboolean send_cluster_message(crm_node_t * node, enum crm_ais_msg_types service, xmlNode * data, gboolean ordered); -void destroy_crm_node(gpointer /* crm_node_t* */ data); - crm_node_t *crm_get_peer(unsigned int id, const char *uname); guint crm_active_peers(void); @@ -138,8 +142,18 @@ gboolean crm_is_heartbeat_peer_active(const crm_node_t * node); # if SUPPORT_COROSYNC extern int ais_fd_sync; +uint32_t get_local_nodeid(cpg_handle_t handle); + +gboolean cluster_connect_cpg(crm_cluster_t *cluster); +void cluster_disconnect_cpg(crm_cluster_t * cluster); + +void pcmk_cpg_membership(cpg_handle_t handle, + const struct cpg_name *groupName, + const struct cpg_address *member_list, size_t member_list_entries, + const struct cpg_address *left_list, size_t left_list_entries, + const struct cpg_address *joined_list, size_t joined_list_entries); gboolean crm_is_corosync_peer_active(const crm_node_t * node); -gboolean send_ais_text(int class, const char *data, gboolean local, +gboolean send_cluster_text(int class, const char *data, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest); # endif @@ -180,4 +194,7 @@ gboolean is_heartbeat_cluster(void); const char *get_local_node_name(void); char *get_node_name(uint32_t nodeid); +char *pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *msg, + uint32_t *kind, const char **from); + #endif diff --git a/include/crm/cluster/internal.h b/include/crm/cluster/internal.h index 2fa8e08..791a1f9 100644 --- a/include/crm/cluster/internal.h +++ b/include/crm/cluster/internal.h @@ -349,20 +349,24 @@ gboolean heartbeat_initialize_nodelist(void *cluster, gboolean force_member, xml # if SUPPORT_COROSYNC +gboolean send_cpg_iov(struct iovec * iov); + # if SUPPORT_PLUGIN char *classic_node_name(uint32_t nodeid); +void plugin_handle_membership(AIS_Message *msg); +bool send_plugin_text(int class, struct iovec *iov); # else char *corosync_node_name(uint64_t /*cmap_handle_t */ cmap_handle, uint32_t nodeid); # endif gboolean corosync_initialize_nodelist(void *cluster, gboolean force_member, xmlNode * xml_parent); -gboolean send_ais_message(xmlNode * msg, gboolean local, - crm_node_t * node, enum crm_ais_msg_types dest); +gboolean send_cluster_message_cs(xmlNode * msg, gboolean local, + crm_node_t * node, enum crm_ais_msg_types dest); enum cluster_type_e find_corosync_variant(void); -void terminate_cs_connection(void); +void terminate_cs_connection(crm_cluster_t * cluster); gboolean init_cs_connection(crm_cluster_t * cluster); gboolean init_cs_connection_once(crm_cluster_t * cluster); # endif @@ -377,6 +381,8 @@ enum crm_quorum_source { crm_quorum_pacemaker, }; +int get_corosync_id(int id, const char *uuid); +char *get_corosync_uuid(crm_node_t *peer); enum crm_quorum_source get_quorum_source(void); void crm_update_peer_proc(const char *source, crm_node_t * peer, uint32_t flag, const char *status); diff --git a/lib/cluster/Makefile.am b/lib/cluster/Makefile.am index a5a70ff..744ff27 100644 --- a/lib/cluster/Makefile.am +++ b/lib/cluster/Makefile.am @@ -33,6 +33,7 @@ libcrmcluster_la_LIBADD = $(top_builddir)/lib/common/libcrmcommon.la $(top_buil libcrmcluster_la_DEPENDENCIES = $(top_builddir)/lib/common/libcrmcommon.la $(top_builddir)/lib/fencing/libstonithd.la if BUILD_CS_SUPPORT +libcrmcluster_la_SOURCES += cpg.c if BUILD_CS_PLUGIN libcrmcluster_la_SOURCES += legacy.c else diff --git a/lib/cluster/cluster.c b/lib/cluster/cluster.c index 9538816..5820c8d 100644 --- a/lib/cluster/cluster.c +++ b/lib/cluster/cluster.c @@ -240,7 +240,7 @@ crm_cluster_disconnect(crm_cluster_t * cluster) #if SUPPORT_COROSYNC if (is_openais_cluster()) { crm_peer_destroy(); - terminate_cs_connection(); + terminate_cs_connection(cluster); crm_info("Disconnected from %s", type_str); return; } @@ -274,7 +274,7 @@ send_cluster_message(crm_node_t * node, enum crm_ais_msg_types service, xmlNode #if SUPPORT_COROSYNC if (is_openais_cluster()) { - return send_ais_message(data, FALSE, node, service); + return send_cluster_message_cs(data, FALSE, node, service); } #endif #if SUPPORT_HEARTBEAT diff --git a/lib/cluster/corosync.c b/lib/cluster/corosync.c index 83a0c78..5a64fe1 100644 --- a/lib/cluster/corosync.c +++ b/lib/cluster/corosync.c @@ -34,69 +34,16 @@ #include #include #include -#include #include #include #include #include -cpg_handle_t pcmk_cpg_handle = 0; - -struct cpg_name pcmk_cpg_group = { - .length = 0, - .value[0] = 0, -}; - quorum_handle_t pcmk_quorum_handle = 0; gboolean(*quorum_app_callback) (unsigned long long seq, gboolean quorate) = NULL; -#define cs_repeat(counter, max, code) do { \ - code; \ - if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { \ - counter++; \ - crm_debug("Retrying operation after %ds", counter); \ - sleep(counter); \ - } else { \ - break; \ - } \ - } while(counter < max) - -static uint32_t get_local_nodeid(cpg_handle_t handle) -{ - int rc = CS_OK; - int retries = 0; - static uint32_t local_nodeid = 0; - cpg_handle_t local_handle = handle; - cpg_callbacks_t cb = { }; - - if(local_nodeid != 0) { - return local_nodeid; - } - - if(handle == 0) { - crm_trace("Creating connection"); - cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb)); - } - - if (rc == CS_OK) { - retries = 0; - crm_trace("Performing lookup"); - cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid)); - } - - if (rc != CS_OK) { - crm_err("Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc); - } - if(handle == 0) { - crm_trace("Closing connection"); - cpg_finalize(local_handle); - } - crm_debug("Local nodeid is %u", local_nodeid); - return local_nodeid; -} - /* * CFG functionality stolen from node_name() in corosync-quorumtool.c * This resolves the first address assigned to a node and returns the name or IP address. @@ -189,281 +136,12 @@ corosync_node_name(uint64_t /*cmap_handle_t */ cmap_handle, uint32_t nodeid) return name; } -enum crm_ais_msg_types -text2msg_type(const char *text) -{ - int type = crm_msg_none; - - CRM_CHECK(text != NULL, return type); - if (safe_str_eq(text, "ais")) { - type = crm_msg_ais; - } else if (safe_str_eq(text, "crm_plugin")) { - type = crm_msg_ais; - } else if (safe_str_eq(text, CRM_SYSTEM_CIB)) { - type = crm_msg_cib; - } else if (safe_str_eq(text, CRM_SYSTEM_CRMD)) { - type = crm_msg_crmd; - } else if (safe_str_eq(text, CRM_SYSTEM_DC)) { - type = crm_msg_crmd; - } else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) { - type = crm_msg_te; - } else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) { - type = crm_msg_pe; - } else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) { - type = crm_msg_lrmd; - } else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) { - type = crm_msg_stonithd; - } else if (safe_str_eq(text, "stonith-ng")) { - type = crm_msg_stonith_ng; - } else if (safe_str_eq(text, "attrd")) { - type = crm_msg_attrd; - - } else { - /* This will normally be a transient client rather than - * a cluster daemon. Set the type to the pid of the client - */ - int scan_rc = sscanf(text, "%d", &type); - - if (scan_rc != 1) { - /* Ensure its sane */ - type = crm_msg_none; - } - } - return type; -} - -GListPtr cs_message_queue = NULL; -int cs_message_timer = 0; - -static ssize_t crm_cs_flush(void); - -static gboolean -crm_cs_flush_cb(gpointer data) -{ - cs_message_timer = 0; - crm_cs_flush(); - return FALSE; -} - -#define CS_SEND_MAX 200 -static ssize_t -crm_cs_flush(void) -{ - int sent = 0; - ssize_t rc = 0; - int queue_len = 0; - static unsigned int last_sent = 0; - - if (pcmk_cpg_handle == 0) { - crm_trace("Connection is dead"); - return pcmk_ok; - } - - queue_len = g_list_length(cs_message_queue); - if ((queue_len % 1000) == 0 && queue_len > 1) { - crm_err("CPG queue has grown to %d", queue_len); - - } else if (queue_len == CS_SEND_MAX) { - crm_warn("CPG queue has grown to %d", queue_len); - } - - if (cs_message_timer) { - /* There is already a timer, wait until it goes off */ - crm_trace("Timer active %d", cs_message_timer); - return pcmk_ok; - } - - while (cs_message_queue && sent < CS_SEND_MAX) { - AIS_Message *header = NULL; - struct iovec *iov = cs_message_queue->data; - - errno = 0; - rc = cpg_mcast_joined(pcmk_cpg_handle, CPG_TYPE_AGREED, iov, 1); - - if (rc != CS_OK) { - break; - } - - sent++; - header = iov->iov_base; - last_sent = header->id; - if (header->compressed_size) { - crm_trace("CPG message %d (%d compressed bytes) sent", - header->id, header->compressed_size); - } else { - crm_trace("CPG message %d (%d bytes) sent: %.200s", - header->id, header->size, header->data); - } - - cs_message_queue = g_list_remove(cs_message_queue, iov); - free(iov[0].iov_base); - free(iov); - } - - queue_len -= sent; - if (sent > 1 || cs_message_queue) { - crm_info("Sent %d CPG messages (%d remaining, last=%u): %s (%d)", - sent, queue_len, last_sent, ais_error2text(rc), rc); - } else { - crm_trace("Sent %d CPG messages (%d remaining, last=%u): %s (%d)", - sent, queue_len, last_sent, ais_error2text(rc), rc); - } - - if (cs_message_queue) { - uint32_t delay_ms = 100; - if(rc != CS_OK) { - /* Proportionally more if sending failed but cap at 1s */ - delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len)); - } - cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, NULL); - } - - return rc; -} - -gboolean -send_ais_text(int class, const char *data, - gboolean local, crm_node_t * node, enum crm_ais_msg_types dest) -{ - static int msg_id = 0; - static int local_pid = 0; - static int local_name_len = 0; - static const char *local_name = NULL; - - char *target = NULL; - struct iovec *iov; - AIS_Message *ais_msg = NULL; - enum crm_ais_msg_types sender = text2msg_type(crm_system_name); - - /* There are only 6 handlers registered to crm_lib_service in plugin.c */ - CRM_CHECK(class < 6, crm_err("Invalid message class: %d", class); - return FALSE); - - CRM_CHECK(dest != crm_msg_ais, return FALSE); - - if(local_name == NULL) { - local_name = get_local_node_name(); - } - if(local_name_len == 0 && local_name) { - local_name_len = strlen(local_name); - } - - if (data == NULL) { - data = ""; - } - - if (local_pid == 0) { - local_pid = getpid(); - } - - if (sender == crm_msg_none) { - sender = local_pid; - } - - ais_msg = calloc(1, sizeof(AIS_Message)); - - ais_msg->id = msg_id++; - ais_msg->header.id = class; - ais_msg->header.error = CS_OK; - - ais_msg->host.type = dest; - ais_msg->host.local = local; - - if (node) { - if (node->uname) { - target = strdup(node->uname); - ais_msg->host.size = strlen(node->uname); - memset(ais_msg->host.uname, 0, MAX_NAME); - memcpy(ais_msg->host.uname, node->uname, ais_msg->host.size); - } else { - target = g_strdup_printf("%u", node->id); - } - ais_msg->host.id = node->id; - } else { - target = strdup("all"); - } - - ais_msg->sender.id = 0; - ais_msg->sender.type = sender; - ais_msg->sender.pid = local_pid; - ais_msg->sender.size = local_name_len; - memset(ais_msg->sender.uname, 0, MAX_NAME); - memcpy(ais_msg->sender.uname, local_name, ais_msg->sender.size); - - ais_msg->size = 1 + strlen(data); - ais_msg->header.size = sizeof(AIS_Message) + ais_msg->size; - - if (ais_msg->size < CRM_BZ2_THRESHOLD) { - ais_msg = realloc(ais_msg, ais_msg->header.size); - memcpy(ais_msg->data, data, ais_msg->size); - - } else { - char *compressed = NULL; - unsigned int new_size = 0; - char *uncompressed = strdup(data); - - if (crm_compress_string(uncompressed, ais_msg->size, 0, &compressed, &new_size)) { - - ais_msg->header.size = sizeof(AIS_Message) + new_size + 1; - ais_msg = realloc(ais_msg, ais_msg->header.size); - memcpy(ais_msg->data, compressed, new_size); - ais_msg->data[new_size] = 0; - - ais_msg->is_compressed = TRUE; - ais_msg->compressed_size = new_size; - - } else { - ais_msg = realloc(ais_msg, ais_msg->header.size); - memcpy(ais_msg->data, data, ais_msg->size); - } - - free(uncompressed); - free(compressed); - } - - if (ais_msg->compressed_size) { - crm_trace("Queueing CPG message %u to %s (%d compressed bytes)", - ais_msg->id, target, ais_msg->compressed_size); - } else { - crm_trace("Queueing CPG message %u to %s (%d bytes)", - ais_msg->id, target, ais_msg->size); - } - - iov = calloc(1, sizeof(struct iovec)); - iov->iov_base = ais_msg; - iov->iov_len = ais_msg->header.size; - cs_message_queue = g_list_append(cs_message_queue, iov); - crm_cs_flush(); - - free(target); - return TRUE; -} - -gboolean -send_ais_message(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest) -{ - gboolean rc = TRUE; - char *data = dump_xml_unformatted(msg); - - rc = send_ais_text(crm_class_cluster, data, local, node, dest); - free(data); - return rc; -} - void -terminate_cs_connection(void) +terminate_cs_connection(crm_cluster_t *cluster) { crm_notice("Disconnecting from Corosync"); - if (pcmk_cpg_handle) { - crm_trace("Disconnecting CPG"); - cpg_leave(pcmk_cpg_handle, &pcmk_cpg_group); - cpg_finalize(pcmk_cpg_handle); - pcmk_cpg_handle = 0; - - } else { - crm_info("No CPG connection"); - } + cluster_disconnect_cpg(cluster); if (pcmk_quorum_handle) { crm_trace("Disconnecting quorum"); @@ -478,284 +156,6 @@ terminate_cs_connection(void) int ais_membership_timer = 0; gboolean ais_membership_force = FALSE; -static gboolean -ais_dispatch_message(AIS_Message * msg, - gboolean(*dispatch) (int kind, const char *from, const char *data)) -{ - char *data = NULL; - char *uncompressed = NULL; - - xmlNode *xml = NULL; - - CRM_ASSERT(msg != NULL); - - crm_trace("Got new%s message (size=%d, %d, %d)", - msg->is_compressed ? " compressed" : "", - ais_data_len(msg), msg->size, msg->compressed_size); - - data = msg->data; - if (msg->is_compressed && msg->size > 0) { - int rc = BZ_OK; - unsigned int new_size = msg->size + 1; - - if (check_message_sanity(msg, NULL) == FALSE) { - goto badmsg; - } - - crm_trace("Decompressing message data"); - uncompressed = calloc(1, new_size); - rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, data, msg->compressed_size, 1, 0); - - if (rc != BZ_OK) { - crm_err("Decompression failed: %d", rc); - goto badmsg; - } - - CRM_ASSERT(rc == BZ_OK); - CRM_ASSERT(new_size == msg->size); - - data = uncompressed; - - } else if (check_message_sanity(msg, data) == FALSE) { - goto badmsg; - - } else if (safe_str_eq("identify", data)) { - int pid = getpid(); - char *pid_s = crm_itoa(pid); - - send_ais_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais); - free(pid_s); - goto done; - } - - if (msg->header.id != crm_class_members) { - /* Is this even needed anymore? */ - crm_get_peer(msg->sender.id, msg->sender.uname); - } - - if (msg->header.id == crm_class_rmpeer) { - uint32_t id = crm_int_helper(data, NULL); - - crm_info("Removing peer %s/%u", data, id); - reap_crm_member(id, NULL); - goto done; - } - - crm_trace("Payload: %.200s", data); - if (dispatch != NULL) { - dispatch(msg->header.id, msg->sender.uname, data); - } - - done: - free(uncompressed); - free_xml(xml); - return TRUE; - - badmsg: - crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):" - " min=%d, total=%d, size=%d, bz2_size=%d", - msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type), - ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), - msg->sender.pid, (int)sizeof(AIS_Message), - msg->header.size, msg->size, msg->compressed_size); - goto done; -} - -static bool cpg_evicted = FALSE; -gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL; - -static int -pcmk_cpg_dispatch(gpointer user_data) -{ - int rc = 0; - - pcmk_cpg_dispatch_fn = user_data; - rc = cpg_dispatch(pcmk_cpg_handle, CS_DISPATCH_ALL); - if (rc != CS_OK) { - crm_err("Connection to the CPG API failed: %d", rc); - pcmk_cpg_handle = 0; - return -1; - - } else if(cpg_evicted) { - crm_err("Evicted from CPG membership"); - return -1; - } - return 0; -} - -static void -pcmk_cpg_deliver(cpg_handle_t handle, - const struct cpg_name *groupName, - uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) -{ - AIS_Message *ais_msg = (AIS_Message *) msg; - uint32_t local_nodeid = get_local_nodeid(handle); - const char *local_name = get_local_node_name(); - - if (ais_msg->sender.id > 0 && ais_msg->sender.id != nodeid) { - crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, ais_msg->sender.id); - return; - - } else if (ais_msg->host.id != 0 && (local_nodeid != ais_msg->host.id)) { - /* Not for us */ - crm_trace("Not for us: %u != %u", ais_msg->host.id, local_nodeid); - return; - } else if (ais_msg->host.size != 0 && safe_str_neq(ais_msg->host.uname, local_name)) { - /* Not for us */ - crm_trace("Not for us: %s != %s", ais_msg->host.uname, local_name); - return; - } - - ais_msg->sender.id = nodeid; - if (ais_msg->sender.size == 0) { - crm_node_t *peer = crm_get_peer(nodeid, NULL); - - if (peer == NULL) { - crm_err("Peer with nodeid=%u is unknown", nodeid); - - } else if (peer->uname == NULL) { - crm_err("No uname for peer with nodeid=%u", nodeid); - - } else { - crm_notice("Fixing uname for peer with nodeid=%u", nodeid); - ais_msg->sender.size = strlen(peer->uname); - memset(ais_msg->sender.uname, 0, MAX_NAME); - memcpy(ais_msg->sender.uname, peer->uname, ais_msg->sender.size); - } - } - - ais_dispatch_message(ais_msg, pcmk_cpg_dispatch_fn); -} - -static void -pcmk_cpg_membership(cpg_handle_t handle, - const struct cpg_name *groupName, - const struct cpg_address *member_list, size_t member_list_entries, - const struct cpg_address *left_list, size_t left_list_entries, - const struct cpg_address *joined_list, size_t joined_list_entries) -{ - int i; - gboolean found = FALSE; - static int counter = 0; - uint32_t local_nodeid = get_local_nodeid(handle); - - for (i = 0; i < left_list_entries; i++) { - crm_node_t *peer = crm_get_peer(left_list[i].nodeid, NULL); - - crm_info("Left[%d.%d] %s.%u ", counter, i, groupName->value, left_list[i].nodeid); - crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, OFFLINESTATUS); - } - - for (i = 0; i < joined_list_entries; i++) { - crm_info("Joined[%d.%d] %s.%u ", counter, i, groupName->value, joined_list[i].nodeid); - } - - for (i = 0; i < member_list_entries; i++) { - crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL); - - crm_info("Member[%d.%d] %s.%u ", counter, i, groupName->value, member_list[i].nodeid); - - /* Anyone that is sending us CPG messages must also be a _CPG_ member. - * But its _not_ safe to assume its in the quorum membership. - * We may have just found out its dead and are processing the last couple of messages it sent - */ - crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS); - if(peer && peer->state && crm_is_peer_active(peer) == FALSE) { - time_t now = time(NULL); - - /* Co-opt the otherwise unused votes field */ - if(peer->votes == 0) { - peer->votes = now; - - } else if(now > (60 + peer->votes)) { - /* On the otherhand, if we're still getting messages, at a certain point - * we need to acknowledge our internal cache is probably wrong - * - * Set the threshold to 1 minute - */ - crm_err("Node %s[%u] appears to be online even though we think it is dead", peer->uname, peer->id); - crm_update_peer_state(__FUNCTION__, peer, CRM_NODE_MEMBER, 0); - peer->votes = 0; - } - } - - if (local_nodeid == member_list[i].nodeid) { - found = TRUE; - } - } - - if (!found) { - crm_err("We're not part of CPG group '%s' anymore!", groupName->value); - cpg_evicted = TRUE; - } - - counter++; -} - -cpg_callbacks_t cpg_callbacks = { - .cpg_deliver_fn = pcmk_cpg_deliver, - .cpg_confchg_fn = pcmk_cpg_membership, -}; - -static gboolean -init_cpg_connection(gboolean(*dispatch) (int kind, const char *from, const char *data), - void (*destroy) (gpointer), uint32_t * nodeid) -{ - int rc = -1; - int fd = 0; - int retries = 0; - uint32_t id = 0; - crm_node_t *peer = NULL; - - struct mainloop_fd_callbacks cpg_fd_callbacks = { - .dispatch = pcmk_cpg_dispatch, - .destroy = destroy, - }; - - cpg_evicted = FALSE; - strncpy(pcmk_cpg_group.value, crm_system_name, 128); - pcmk_cpg_group.length = strlen(crm_system_name) + 1; - - cs_repeat(retries, 30, rc = cpg_initialize(&pcmk_cpg_handle, &cpg_callbacks)); - if (rc != CS_OK) { - crm_err("Could not connect to the Cluster Process Group API: %d\n", rc); - goto bail; - } - - id = get_local_nodeid(pcmk_cpg_handle); - if (id == 0) { - crm_err("Could not get local node id from the CPG API"); - goto bail; - - } else if(nodeid) { - *nodeid = id; - } - - retries = 0; - cs_repeat(retries, 30, rc = cpg_join(pcmk_cpg_handle, &pcmk_cpg_group)); - if (rc != CS_OK) { - crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc); - goto bail; - } - - rc = cpg_fd_get(pcmk_cpg_handle, &fd); - if (rc != CS_OK) { - crm_err("Could not obtain the CPG API connection: %d\n", rc); - goto bail; - } - - mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, dispatch, &cpg_fd_callbacks); - - bail: - if (rc != CS_OK) { - cpg_finalize(pcmk_cpg_handle); - return FALSE; - } - - peer = crm_get_peer(id, NULL); - crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS); - return TRUE; -} static int pcmk_quorum_dispatch(gpointer user_data) @@ -940,7 +340,7 @@ init_cs_connection_once(crm_cluster_t * cluster) return FALSE; } - if (init_cpg_connection(cluster->cs_dispatch, cluster->destroy, NULL) == FALSE) { + if (cluster_connect_cpg(cluster) == FALSE) { return FALSE; } crm_info("Connection to '%s': established", name_for_cluster_type(stack)); diff --git a/lib/cluster/cpg.c b/lib/cluster/cpg.c new file mode 100644 index 0000000..903576e --- /dev/null +++ b/lib/cluster/cpg.c @@ -0,0 +1,689 @@ +/* + * Copyright (C) 2004 Andrew Beekhof + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include + +#include + +cpg_handle_t pcmk_cpg_handle = 0; /* TODO: Remove, use cluster.cpg_handle */ + +static bool cpg_evicted = FALSE; +gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL; + +#define cs_repeat(counter, max, code) do { \ + code; \ + if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { \ + counter++; \ + crm_debug("Retrying operation after %ds", counter); \ + sleep(counter); \ + } else { \ + break; \ + } \ + } while(counter < max) + +void +cluster_disconnect_cpg(crm_cluster_t *cluster) +{ + pcmk_cpg_handle = 0; + if (cluster->cpg_handle) { + crm_trace("Disconnecting CPG"); + cpg_leave(cluster->cpg_handle, &cluster->group); + cpg_finalize(cluster->cpg_handle); + cluster->cpg_handle = 0; + + } else { + crm_info("No CPG connection"); + } +} + +uint32_t get_local_nodeid(cpg_handle_t handle) +{ + int rc = CS_OK; + int retries = 0; + static uint32_t local_nodeid = 0; + cpg_handle_t local_handle = handle; + cpg_callbacks_t cb = { }; + + if(local_nodeid != 0) { + return local_nodeid; + } + +#if 0 + /* Should not be necessary */ + if(get_cluster_type() == pcmk_cluster_classic_ais) { + get_ais_details(&local_nodeid, NULL); + goto done; + } +#endif + + if(handle == 0) { + crm_trace("Creating connection"); + cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb)); + } + + if (rc == CS_OK) { + retries = 0; + crm_trace("Performing lookup"); + cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid)); + } + + if (rc != CS_OK) { + crm_err("Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc); + } + if(handle == 0) { + crm_trace("Closing connection"); + cpg_finalize(local_handle); + } + crm_debug("Local nodeid is %u", local_nodeid); + return local_nodeid; +} + + +GListPtr cs_message_queue = NULL; +int cs_message_timer = 0; + +static ssize_t crm_cs_flush(gpointer data); + +static gboolean +crm_cs_flush_cb(gpointer data) +{ + cs_message_timer = 0; + crm_cs_flush(data); + return FALSE; +} + +#define CS_SEND_MAX 200 +static ssize_t +crm_cs_flush(gpointer data) +{ + int sent = 0; + ssize_t rc = 0; + int queue_len = 0; + static unsigned int last_sent = 0; + cpg_handle_t *handle = (cpg_handle_t *)data; + + if (*handle == 0) { + crm_trace("Connection is dead"); + return pcmk_ok; + } + + queue_len = g_list_length(cs_message_queue); + if ((queue_len % 1000) == 0 && queue_len > 1) { + crm_err("CPG queue has grown to %d", queue_len); + + } else if (queue_len == CS_SEND_MAX) { + crm_warn("CPG queue has grown to %d", queue_len); + } + + if (cs_message_timer) { + /* There is already a timer, wait until it goes off */ + crm_trace("Timer active %d", cs_message_timer); + return pcmk_ok; + } + + while (cs_message_queue && sent < CS_SEND_MAX) { + struct iovec *iov = cs_message_queue->data; + + errno = 0; + rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1); + + if (rc != CS_OK) { + break; + } + + sent++; + last_sent++; + crm_trace("CPG message sent, size=%d", iov->iov_len); + + cs_message_queue = g_list_remove(cs_message_queue, iov); + free(iov[0].iov_base); + free(iov); + } + + queue_len -= sent; + if (sent > 1 || cs_message_queue) { + crm_info("Sent %d CPG messages (%d remaining, last=%u): %s (%d)", + sent, queue_len, last_sent, ais_error2text(rc), rc); + } else { + crm_trace("Sent %d CPG messages (%d remaining, last=%u): %s (%d)", + sent, queue_len, last_sent, ais_error2text(rc), rc); + } + + if (cs_message_queue) { + uint32_t delay_ms = 100; + if(rc != CS_OK) { + /* Proportionally more if sending failed but cap at 1s */ + delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len)); + } + cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data); + } + + return rc; +} + +gboolean +send_cpg_iov(struct iovec * iov) +{ + static unsigned int queued = 0; + + queued++; + crm_trace("Queueing CPG message %u (%d bytes)", queued, iov->iov_len); + cs_message_queue = g_list_append(cs_message_queue, iov); + crm_cs_flush(&pcmk_cpg_handle); + return TRUE; +} + +static int +pcmk_cpg_dispatch(gpointer user_data) +{ + int rc = 0; + crm_cluster_t *cluster = (crm_cluster_t*) user_data; + + rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ALL); + if (rc != CS_OK) { + crm_err("Connection to the CPG API failed: %s (%d)", ais_error2text(rc), rc); + cluster->cpg_handle = 0; + return -1; + + } else if(cpg_evicted) { + crm_err("Evicted from CPG membership"); + return -1; + } + return 0; +} + +/* +static void +pcmk_cpg_deliver_message(cpg_handle_t handle, + const struct cpg_name *groupName, + uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) +{ + uint32_t kind = 0; + const char *from = NULL; + char *data = pcmk_message_common_cs(handle, nodeid, pid, msg, &kind, &from); + + free(data); +} +*/ + +char * +pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, + uint32_t *kind, const char **from) +{ + char *data = NULL; + AIS_Message *msg = (AIS_Message *) content; + + if(handle) { + /* 'msg' came from CPG not the plugin + * Do filtering and field massaging + */ + uint32_t local_nodeid = get_local_nodeid(handle); + const char *local_name = get_local_node_name(); + + if (msg->sender.id > 0 && msg->sender.id != nodeid) { + crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id); + return NULL; + + } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) { + /* Not for us */ + crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid); + return NULL; + } else if (msg->host.size != 0 && safe_str_neq(msg->host.uname, local_name)) { + /* Not for us */ + crm_trace("Not for us: %s != %s", msg->host.uname, local_name); + return NULL; + } + + msg->sender.id = nodeid; + if (msg->sender.size == 0) { + crm_node_t *peer = crm_get_peer(nodeid, NULL); + + if (peer == NULL) { + crm_err("Peer with nodeid=%u is unknown", nodeid); + + } else if (peer->uname == NULL) { + crm_err("No uname for peer with nodeid=%u", nodeid); + + } else { + crm_notice("Fixing uname for peer with nodeid=%u", nodeid); + msg->sender.size = strlen(peer->uname); + memset(msg->sender.uname, 0, MAX_NAME); + memcpy(msg->sender.uname, peer->uname, msg->sender.size); + } + } + } + + crm_trace("Got new%s message (size=%d, %d, %d)", + msg->is_compressed ? " compressed" : "", + ais_data_len(msg), msg->size, msg->compressed_size); + + if (kind != NULL) { + *kind = msg->header.id; + } + if (from != NULL) { + *from = msg->sender.uname; + } + + if (msg->is_compressed && msg->size > 0) { + int rc = BZ_OK; + char *uncompressed = NULL; + unsigned int new_size = msg->size + 1; + + if (check_message_sanity(msg, NULL) == FALSE) { + goto badmsg; + } + + crm_trace("Decompressing message data"); + uncompressed = calloc(1, new_size); + rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0); + + if (rc != BZ_OK) { + crm_err("Decompression failed: %d", rc); + goto badmsg; + } + + CRM_ASSERT(rc == BZ_OK); + CRM_ASSERT(new_size == msg->size); + + data = uncompressed; + + } else if (check_message_sanity(msg, data) == FALSE) { + goto badmsg; + + } else if (safe_str_eq("identify", data)) { + int pid = getpid(); + char *pid_s = crm_itoa(pid); + + send_cluster_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais); + free(pid_s); + return NULL; + + } else { + data = strdup(msg->data); + } + + if (msg->header.id != crm_class_members) { + /* Is this even needed anymore? */ + crm_get_peer(msg->sender.id, msg->sender.uname); + } + + if (msg->header.id == crm_class_rmpeer) { + uint32_t id = crm_int_helper(data, NULL); + + crm_info("Removing peer %s/%u", data, id); + reap_crm_member(id, NULL); + free(data); + return NULL; + +#if SUPPORT_PLUGIN + } else if (is_classic_ais_cluster()) { + plugin_handle_membership(msg); +#endif + } + + crm_trace("Payload: %.200s", data); + return data; + + badmsg: + crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):" + " min=%d, total=%d, size=%d, bz2_size=%d", + msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type), + ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), + msg->sender.pid, (int)sizeof(AIS_Message), + msg->header.size, msg->size, msg->compressed_size); + + free(data); + return NULL; +} + +void +pcmk_cpg_membership(cpg_handle_t handle, + const struct cpg_name *groupName, + const struct cpg_address *member_list, size_t member_list_entries, + const struct cpg_address *left_list, size_t left_list_entries, + const struct cpg_address *joined_list, size_t joined_list_entries) +{ + int i; + gboolean found = FALSE; + static int counter = 0; + uint32_t local_nodeid = get_local_nodeid(handle); + + for (i = 0; i < left_list_entries; i++) { + crm_node_t *peer = crm_get_peer(left_list[i].nodeid, NULL); + + crm_info("Left[%d.%d] %s.%u ", counter, i, groupName->value, left_list[i].nodeid); + crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, OFFLINESTATUS); + } + + for (i = 0; i < joined_list_entries; i++) { + crm_info("Joined[%d.%d] %s.%u ", counter, i, groupName->value, joined_list[i].nodeid); + } + + for (i = 0; i < member_list_entries; i++) { + crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL); + + crm_info("Member[%d.%d] %s.%u ", counter, i, groupName->value, member_list[i].nodeid); + + /* Anyone that is sending us CPG messages must also be a _CPG_ member. + * But its _not_ safe to assume its in the quorum membership. + * We may have just found out its dead and are processing the last couple of messages it sent + */ + crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS); + if(peer && peer->state && crm_is_peer_active(peer) == FALSE) { + time_t now = time(NULL); + + /* Co-opt the otherwise unused votes field */ + if(peer->votes == 0) { + peer->votes = now; + + } else if(now > (60 + peer->votes)) { + /* On the otherhand, if we're still getting messages, at a certain point + * we need to acknowledge our internal cache is probably wrong + * + * Set the threshold to 1 minute + */ + crm_err("Node %s[%u] appears to be online even though we think it is dead", peer->uname, peer->id); + crm_update_peer_state(__FUNCTION__, peer, CRM_NODE_MEMBER, 0); + peer->votes = 0; + } + } + + if (local_nodeid == member_list[i].nodeid) { + found = TRUE; + } + } + + if (!found) { + crm_err("We're not part of CPG group '%s' anymore!", groupName->value); + cpg_evicted = TRUE; + } + + counter++; +} + +gboolean +cluster_connect_cpg(crm_cluster_t *cluster) +{ + int rc = -1; + int fd = 0; + int retries = 0; + uint32_t id = 0; + crm_node_t *peer = NULL; + cpg_handle_t handle = 0; + + struct mainloop_fd_callbacks cpg_fd_callbacks = { + .dispatch = pcmk_cpg_dispatch, + .destroy = cluster->destroy, + }; + + cpg_callbacks_t cpg_callbacks = { + .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn, + .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn, + /* .cpg_deliver_fn = pcmk_cpg_deliver, */ + /* .cpg_confchg_fn = pcmk_cpg_membership, */ + }; + + cpg_evicted = FALSE; + cluster->group.length = 0; + cluster->group.value[0] = 0; + + strncpy(cluster->group.value, crm_system_name, 128); + cluster->group.length = strlen(crm_system_name) + 1; + + cs_repeat(retries, 30, rc = cpg_initialize(&handle, &cpg_callbacks)); + if (rc != CS_OK) { + crm_err("Could not connect to the Cluster Process Group API: %d\n", rc); + goto bail; + } + + id = get_local_nodeid(handle); + if (id == 0) { + crm_err("Could not get local node id from the CPG API"); + goto bail; + + } + cluster->nodeid = id; + + retries = 0; + cs_repeat(retries, 30, rc = cpg_join(handle, &cluster->group)); + if (rc != CS_OK) { + crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc); + goto bail; + } + + rc = cpg_fd_get(handle, &fd); + if (rc != CS_OK) { + crm_err("Could not obtain the CPG API connection: %d\n", rc); + goto bail; + } + + pcmk_cpg_handle = handle; + cluster->cpg_handle = handle; + mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks); + + bail: + if (rc != CS_OK) { + cpg_finalize(handle); + return FALSE; + } + + peer = crm_get_peer(id, NULL); + crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS); + return TRUE; +} + +gboolean +send_cluster_message_cs(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest) +{ + gboolean rc = TRUE; + char *data = NULL; + + data = dump_xml_unformatted(msg); + rc = send_cluster_text(crm_class_cluster, data, local, node, dest); + free(data); + return rc; +} + +gboolean +send_cluster_text(int class, const char *data, + gboolean local, crm_node_t * node, enum crm_ais_msg_types dest) +{ + static int msg_id = 0; + static int local_pid = 0; + static int local_name_len = 0; + static const char *local_name = NULL; + + char *target = NULL; + struct iovec *iov; + AIS_Message *msg = NULL; + enum crm_ais_msg_types sender = text2msg_type(crm_system_name); + + /* There are only 6 handlers registered to crm_lib_service in plugin.c */ + CRM_CHECK(class < 6, crm_err("Invalid message class: %d", class); + return FALSE); + +#if !SUPPORT_PLUGIN + CRM_CHECK(dest != crm_msg_ais, return FALSE); +#endif + + if(local_name == NULL) { + local_name = get_local_node_name(); + } + if(local_name_len == 0 && local_name) { + local_name_len = strlen(local_name); + } + + if (data == NULL) { + data = ""; + } + + if (local_pid == 0) { + local_pid = getpid(); + } + + if (sender == crm_msg_none) { + sender = local_pid; + } + + msg = calloc(1, sizeof(AIS_Message)); + + msg_id++; + msg->id = msg_id; + msg->header.id = class; + msg->header.error = CS_OK; + + msg->host.type = dest; + msg->host.local = local; + + if (node) { + if (node->uname) { + target = strdup(node->uname); + msg->host.size = strlen(node->uname); + memset(msg->host.uname, 0, MAX_NAME); + memcpy(msg->host.uname, node->uname, msg->host.size); + } else { + target = g_strdup_printf("%u", node->id); + } + msg->host.id = node->id; + } else { + target = strdup("all"); + } + + msg->sender.id = 0; + msg->sender.type = sender; + msg->sender.pid = local_pid; + msg->sender.size = local_name_len; + memset(msg->sender.uname, 0, MAX_NAME); + memcpy(msg->sender.uname, local_name, msg->sender.size); + + msg->size = 1 + strlen(data); + msg->header.size = sizeof(AIS_Message) + msg->size; + + if (msg->size < CRM_BZ2_THRESHOLD) { + msg = realloc(msg, msg->header.size); + memcpy(msg->data, data, msg->size); + + } else { + char *compressed = NULL; + unsigned int new_size = 0; + char *uncompressed = strdup(data); + + if (crm_compress_string(uncompressed, msg->size, 0, &compressed, &new_size)) { + + msg->header.size = sizeof(AIS_Message) + new_size + 1; + msg = realloc(msg, msg->header.size); + memcpy(msg->data, compressed, new_size); + msg->data[new_size] = 0; + + msg->is_compressed = TRUE; + msg->compressed_size = new_size; + + } else { + msg = realloc(msg, msg->header.size); + memcpy(msg->data, data, msg->size); + } + + free(uncompressed); + free(compressed); + } + + iov = calloc(1, sizeof(struct iovec)); + iov->iov_base = msg; + iov->iov_len = msg->header.size; + + if (msg->compressed_size) { + crm_trace("Queueing CPG message %u to %s (%d bytes, %d bytes compressed payload): %.200s", + msg->id, target, iov->iov_len, msg->compressed_size, data); + } else { + crm_trace("Queueing CPG message %u to %s (%d bytes, %d bytes payload): %.200s", + msg->id, target, iov->iov_len, msg->size, data); + } + +#if SUPPORT_PLUGIN + /* The plugin is the only time we dont use CPG messaging */ + if(get_cluster_type() == pcmk_cluster_classic_ais) { + return send_plugin_text(class, iov); + } +#endif + + send_cpg_iov(iov); + + free(target); + return TRUE; +} + +enum crm_ais_msg_types +text2msg_type(const char *text) +{ + int type = crm_msg_none; + + CRM_CHECK(text != NULL, return type); + if (safe_str_eq(text, "ais")) { + type = crm_msg_ais; + } else if (safe_str_eq(text, "crm_plugin")) { + type = crm_msg_ais; + } else if (safe_str_eq(text, CRM_SYSTEM_CIB)) { + type = crm_msg_cib; + } else if (safe_str_eq(text, CRM_SYSTEM_CRMD)) { + type = crm_msg_crmd; + } else if (safe_str_eq(text, CRM_SYSTEM_DC)) { + type = crm_msg_crmd; + } else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) { + type = crm_msg_te; + } else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) { + type = crm_msg_pe; + } else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) { + type = crm_msg_lrmd; + } else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) { + type = crm_msg_stonithd; + } else if (safe_str_eq(text, "stonith-ng")) { + type = crm_msg_stonith_ng; + } else if (safe_str_eq(text, "attrd")) { + type = crm_msg_attrd; + + } else { + /* This will normally be a transient client rather than + * a cluster daemon. Set the type to the pid of the client + */ + int scan_rc = sscanf(text, "%d", &type); + + if (scan_rc != 1) { + /* Ensure its sane */ + type = crm_msg_none; + } + } + return type; +} diff --git a/lib/cluster/legacy.c b/lib/cluster/legacy.c index 14749e4..8b16f7e 100644 --- a/lib/cluster/legacy.c +++ b/lib/cluster/legacy.c @@ -31,12 +31,6 @@ # include # include # include -cpg_handle_t pcmk_cpg_handle = 0; - -struct cpg_name pcmk_cpg_group = { - .length = 0, - .value[0] = 0, -}; #endif #if HAVE_CMAP @@ -50,88 +44,8 @@ cman_handle_t pcmk_cman_handle = NULL; int ais_membership_timer = 0; gboolean ais_membership_force = FALSE; -int ais_dispatch(gpointer user_data); - -#define cs_repeat(counter, max, code) do { \ - code; \ - if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { \ - counter++; \ - crm_debug("Retrying operation after %ds", counter); \ - sleep(counter); \ - } else { \ - break; \ - } \ - } while(counter < max) - -enum crm_ais_msg_types -text2msg_type(const char *text) -{ - int type = crm_msg_none; - - CRM_CHECK(text != NULL, return type); - if (safe_str_eq(text, "ais")) { - type = crm_msg_ais; - } else if (safe_str_eq(text, "crm_plugin")) { - type = crm_msg_ais; - } else if (safe_str_eq(text, CRM_SYSTEM_CIB)) { - type = crm_msg_cib; - } else if (safe_str_eq(text, CRM_SYSTEM_CRMD)) { - type = crm_msg_crmd; - } else if (safe_str_eq(text, CRM_SYSTEM_DC)) { - type = crm_msg_crmd; - } else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) { - type = crm_msg_te; - } else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) { - type = crm_msg_pe; - } else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) { - type = crm_msg_lrmd; - } else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) { - type = crm_msg_stonithd; - } else if (safe_str_eq(text, "stonith-ng")) { - type = crm_msg_stonith_ng; - } else if (safe_str_eq(text, "attrd")) { - type = crm_msg_attrd; - - } else { - /* This will normally be a transient client rather than - * a cluster daemon. Set the type to the pid of the client - */ - int scan_rc = sscanf(text, "%d", &type); - - if (scan_rc != 1 || type <= crm_msg_stonith_ng) { - /* Ensure its sane */ - type = crm_msg_none; - } - } - return type; -} +int plugin_dispatch(gpointer user_data); -char * -get_ais_data(const AIS_Message * msg) -{ - int rc = BZ_OK; - char *uncompressed = NULL; - unsigned int new_size = msg->size + 1; - - if (msg->is_compressed == FALSE) { - crm_trace("Returning uncompressed message data"); - uncompressed = strdup(msg->data); - - } else { - crm_trace("Decompressing message data"); - uncompressed = calloc(1, new_size); - - rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, (char *)msg->data, - msg->compressed_size, 1, 0); - - CRM_ASSERT(rc == BZ_OK); - CRM_ASSERT(new_size == msg->size); - } - - return uncompressed; -} - -#if SUPPORT_COROSYNC int ais_fd_sync = -1; int ais_fd_async = -1; /* never send messages via this channel */ void *ais_ipc_ctx = NULL; @@ -160,9 +74,6 @@ get_ais_details(uint32_t * id, char **uname) header.id = crm_class_nodeid; header.size = sizeof(cs_ipc_header_response_t); - CRM_CHECK(id != NULL, return FALSE); - CRM_CHECK(uname != NULL, return FALSE); - iov.iov_base = &header; iov.iov_len = header.size; @@ -203,140 +114,7 @@ get_ais_details(uint32_t * id, char **uname) return TRUE; } -static uint32_t get_local_nodeid(cpg_handle_t handle) -{ - int rc = CS_OK; - int retries = 0; - static uint32_t local_nodeid = 0; - cpg_handle_t local_handle = handle; - cpg_callbacks_t cb = { }; - - if(local_nodeid != 0) { - return local_nodeid; - } - -#if 0 - /* Should not be necessary */ - if(get_cluster_type() == pcmk_cluster_classic_ais) { - get_ais_details(&local_nodeid, NULL); - goto done; - } -#endif - - if(local_handle == 0) { - crm_trace("Creating connection"); - cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb)); - } - - if (rc == CS_OK) { - retries = 0; - crm_trace("Performing lookup"); - cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid)); - } - - if (rc != CS_OK) { - crm_err("Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc); - } - - if(handle != local_handle) { - crm_trace("Closing connection %u", local_handle); - cpg_finalize(local_handle); - } - - crm_debug("Local nodeid is %u", local_nodeid); - return local_nodeid; -} - -GListPtr cs_message_queue = NULL; -int cs_message_timer = 0; - -static ssize_t crm_cs_flush(void); - -static gboolean -crm_cs_flush_cb(gpointer data) -{ - cs_message_timer = 0; - crm_cs_flush(); - return FALSE; -} - -#define CS_SEND_MAX 200 -static ssize_t -crm_cs_flush(void) -{ - int sent = 0; - ssize_t rc = 0; - int queue_len = 0; - static unsigned int last_sent = 0; - - if (pcmk_cpg_handle == 0) { - crm_trace("Connection is dead"); - return pcmk_ok; - } - - queue_len = g_list_length(cs_message_queue); - if ((queue_len % 1000) == 0 && queue_len > 1) { - crm_err("CPG queue has grown to %d", queue_len); - - } else if (queue_len == CS_SEND_MAX) { - crm_warn("CPG queue has grown to %d", queue_len); - } - - if (cs_message_timer) { - /* There is already a timer, wait until it goes off */ - crm_trace("Timer active %d", cs_message_timer); - return pcmk_ok; - } - - while (cs_message_queue && sent < CS_SEND_MAX) { - AIS_Message *header = NULL; - struct iovec *iov = cs_message_queue->data; - - errno = 0; - rc = cpg_mcast_joined(pcmk_cpg_handle, CPG_TYPE_AGREED, iov, 1); - - if (rc != CS_OK) { - break; - } - - sent++; - header = iov->iov_base; - last_sent = header->id; - if (header->compressed_size) { - crm_trace("CPG message %d (%d compressed bytes) sent", - header->id, header->compressed_size); - } else { - crm_trace("CPG message %d (%d bytes) sent: %.200s", - header->id, header->size, header->data); - } - - cs_message_queue = g_list_remove(cs_message_queue, iov); - free(iov[0].iov_base); - free(iov); - } - - queue_len -= sent; - if (sent > 1 || cs_message_queue) { - crm_info("Sent %d CPG messages (%d remaining, last=%u): %s (%d)", - sent, queue_len, last_sent, ais_error2text(rc), rc); - } else { - crm_trace("Sent %d CPG messages (%d remaining, last=%u): %s (%d)", - sent, queue_len, last_sent, ais_error2text(rc), rc); - } - - if (cs_message_queue) { - uint32_t delay_ms = 100; - if(rc != CS_OK) { - /* Proportionally more if sending failed but cap at 1s */ - delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len)); - } - cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, NULL); - } - - return rc; -} - -static bool +bool send_plugin_text(int class, struct iovec *iov) { int rc = CS_OK; @@ -386,154 +164,8 @@ send_plugin_text(int class, struct iovec *iov) return (rc == CS_OK); } -gboolean -send_ais_text(int class, const char *data, - gboolean local, crm_node_t * node, enum crm_ais_msg_types dest) -{ - static int msg_id = 0; - static int local_pid = 0; - static int local_name_len = 0; - static const char *local_name = NULL; - - char *target = NULL; - struct iovec *iov; - AIS_Message *ais_msg = NULL; - enum cluster_type_e cluster_type = get_cluster_type(); - enum crm_ais_msg_types sender = text2msg_type(crm_system_name); - - /* There are only 6 handlers registered to crm_lib_service in plugin.c */ - CRM_CHECK(class < 6, crm_err("Invalid message class: %d", class); - return FALSE); - - CRM_CHECK(dest != crm_msg_ais, return FALSE); - - if(local_name == NULL) { - local_name = get_local_node_name(); - } - if(local_name_len == 0 && local_name) { - local_name_len = strlen(local_name); - } - - if (data == NULL) { - data = ""; - } - - if (local_pid == 0) { - local_pid = getpid(); - } - - if (sender == crm_msg_none) { - sender = local_pid; - } - - ais_msg = calloc(1, sizeof(AIS_Message)); - - ais_msg->id = msg_id++; - ais_msg->header.id = class; - ais_msg->header.error = CS_OK; - - ais_msg->host.type = dest; - ais_msg->host.local = local; - - if (node) { - if (node->uname) { - target = strdup(node->uname); - ais_msg->host.size = strlen(node->uname); - memset(ais_msg->host.uname, 0, MAX_NAME); - memcpy(ais_msg->host.uname, node->uname, ais_msg->host.size); - } else { - target = g_strdup_printf("%u", node->id); - } - ais_msg->host.id = node->id; - } else { - target = strdup("all"); - } - - ais_msg->sender.id = 0; - ais_msg->sender.type = sender; - ais_msg->sender.pid = local_pid; - ais_msg->sender.size = local_name_len; - memset(ais_msg->sender.uname, 0, MAX_NAME); - memcpy(ais_msg->sender.uname, local_name, ais_msg->sender.size); - - ais_msg->size = 1 + strlen(data); - ais_msg->header.size = sizeof(AIS_Message) + ais_msg->size; - - if (ais_msg->size < CRM_BZ2_THRESHOLD) { - ais_msg = realloc(ais_msg, ais_msg->header.size); - memcpy(ais_msg->data, data, ais_msg->size); - - } else { - char *compressed = NULL; - unsigned int new_size = 0; - char *uncompressed = strdup(data); - - if (crm_compress_string(uncompressed, ais_msg->size, 0, &compressed, &new_size)) { - - ais_msg->header.size = sizeof(AIS_Message) + new_size + 1; - ais_msg = realloc(ais_msg, ais_msg->header.size); - memcpy(ais_msg->data, compressed, new_size); - ais_msg->data[new_size] = 0; - - ais_msg->is_compressed = TRUE; - ais_msg->compressed_size = new_size; - - } else { - ais_msg = realloc(ais_msg, ais_msg->header.size); - memcpy(ais_msg->data, data, ais_msg->size); - } - - free(uncompressed); - free(compressed); - } - - iov = calloc(1, sizeof(struct iovec)); - iov->iov_base = ais_msg; - iov->iov_len = ais_msg->header.size; - - if (ais_msg->compressed_size) { - crm_trace("Queueing %s message %u to %s (%d compressed bytes)", - cluster_type == pcmk_cluster_classic_ais?"plugin":"CPG", - ais_msg->id, target, ais_msg->compressed_size); - } else { - crm_trace("Queueing %s message %u to %s (%d bytes)", - cluster_type == pcmk_cluster_classic_ais?"plugin":"CPG", - ais_msg->id, target, ais_msg->size); - } - - /* The plugin is the only time we dont use CPG messaging */ - if(cluster_type == pcmk_cluster_classic_ais) { - return send_plugin_text(class, iov); - } - - cs_message_queue = g_list_append(cs_message_queue, iov); - crm_cs_flush(); - - free(target); - return TRUE; -} - -gboolean -send_ais_message(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest) -{ - gboolean rc = TRUE; - char *data = NULL; - - if (is_classic_ais_cluster()) { - if (ais_fd_async < 0) { - crm_err("Not connected to AIS: %d", ais_fd_async); - return FALSE; - } - } - - data = dump_xml_unformatted(msg); - rc = send_ais_text(crm_class_cluster, data, local, node, dest); - free(data); - return rc; -} - void -terminate_cs_connection(void) +terminate_cs_connection(crm_cluster_t *cluster) { crm_notice("Disconnecting from Corosync"); @@ -545,20 +177,8 @@ terminate_cs_connection(void) } else { crm_info("No plugin connection"); } - - } else { - if (pcmk_cpg_handle) { - crm_info("Disconnecting CPG"); - if (cpg_leave(pcmk_cpg_handle, &pcmk_cpg_group) == CS_OK) { - crm_info("Destroying CPG"); - cpg_finalize(pcmk_cpg_handle); - } - pcmk_cpg_handle = 0; - - } else { - crm_info("No CPG connection"); - } } + cluster_disconnect_cpg(cluster); # if SUPPORT_CMAN if (is_cman_cluster()) { @@ -578,155 +198,66 @@ terminate_cs_connection(void) ais_fd_sync = -1; } -static crm_node_t * -crm_update_ais_node(xmlNode * member, long long seq) -{ - const char *id_s = crm_element_value(member, "id"); - const char *addr = crm_element_value(member, "addr"); - const char *uname = crm_element_value(member, "uname"); - const char *state = crm_element_value(member, "state"); - const char *born_s = crm_element_value(member, "born"); - const char *seen_s = crm_element_value(member, "seen"); - const char *votes_s = crm_element_value(member, "votes"); - const char *procs_s = crm_element_value(member, "processes"); - - int votes = crm_int_helper(votes_s, NULL); - unsigned int id = crm_int_helper(id_s, NULL); - unsigned int procs = crm_int_helper(procs_s, NULL); - - /* TODO: These values will contain garbage if version < 0.7.1 */ - uint64_t born = crm_int_helper(born_s, NULL); - uint64_t seen = crm_int_helper(seen_s, NULL); - - return crm_update_peer(__FUNCTION__, id, born, seen, votes, procs, uname, uname, addr, state); -} - -static gboolean -ais_dispatch_message(AIS_Message * msg, - gboolean(*dispatch) (int kind, const char *from, const char *data)) +void +plugin_handle_membership(AIS_Message *msg) { - char *data = NULL; - char *uncompressed = NULL; - - xmlNode *xml = NULL; + if (msg->header.id == crm_class_members || msg->header.id == crm_class_quorum) { + xmlNode *member = NULL; + const char *value = NULL; + gboolean quorate = FALSE; + xmlNode *xml = string2xml(msg->data); - CRM_ASSERT(msg != NULL); - - crm_trace("Got new%s message (size=%d, %d, %d)", - msg->is_compressed ? " compressed" : "", - ais_data_len(msg), msg->size, msg->compressed_size); - - data = msg->data; - if (msg->is_compressed && msg->size > 0) { - int rc = BZ_OK; - unsigned int new_size = msg->size + 1; - - if (check_message_sanity(msg, NULL) == FALSE) { - goto badmsg; + if (xml == NULL) { + crm_err("Invalid membership update: %s", msg->data); + return; } - crm_trace("Decompressing message data"); - uncompressed = calloc(1, new_size); - rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, data, msg->compressed_size, 1, 0); - - if (rc != BZ_OK) { - crm_err("Decompression failed: %d", rc); - goto badmsg; + value = crm_element_value(xml, "quorate"); + CRM_CHECK(value != NULL, crm_log_xml_err(xml, "No quorum value:"); return); + if (crm_is_true(value)) { + quorate = TRUE; } - CRM_ASSERT(rc == BZ_OK); - CRM_ASSERT(new_size == msg->size); - - data = uncompressed; - - } else if (check_message_sanity(msg, data) == FALSE) { - goto badmsg; - - } else if (safe_str_eq("identify", data)) { - int pid = getpid(); - char *pid_s = crm_itoa(pid); - - send_ais_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais); - free(pid_s); - goto done; - } - - if (msg->header.id != crm_class_members) { - crm_get_peer(msg->sender.id, msg->sender.uname); - } - - if (msg->header.id == crm_class_rmpeer) { - uint32_t id = crm_int_helper(data, NULL); - - crm_info("Removing peer %s/%u", data, id); - reap_crm_member(id, NULL); - goto done; - - } else if (is_classic_ais_cluster()) { - if (msg->header.id == crm_class_members || msg->header.id == crm_class_quorum) { - xmlNode *node = NULL; - const char *value = NULL; - gboolean quorate = FALSE; - - xml = string2xml(data); - if (xml == NULL) { - crm_err("Invalid membership update: %s", data); - goto badmsg; - } - - value = crm_element_value(xml, "quorate"); - CRM_CHECK(value != NULL, crm_log_xml_err(xml, "No quorum value:"); - goto badmsg); - if (crm_is_true(value)) { - quorate = TRUE; - } - - value = crm_element_value(xml, "id"); - CRM_CHECK(value != NULL, crm_log_xml_err(xml, "No membership id"); - goto badmsg); - crm_peer_seq = crm_int_helper(value, NULL); + value = crm_element_value(xml, "id"); + CRM_CHECK(value != NULL, crm_log_xml_err(xml, "No membership id"); return); + crm_peer_seq = crm_int_helper(value, NULL); - if (quorate != crm_have_quorum) { - crm_notice("Membership %s: quorum %s", value, quorate ? "acquired" : "lost"); - crm_have_quorum = quorate; + if (quorate != crm_have_quorum) { + crm_notice("Membership %s: quorum %s", value, quorate ? "acquired" : "lost"); + crm_have_quorum = quorate; - } else { - crm_info("Membership %s: quorum %s", value, quorate ? "retained" : "still lost"); - } - - for (node = __xml_first_child(xml); node != NULL; node = __xml_next(node)) { - crm_update_ais_node(node, crm_peer_seq); - } + } else { + crm_info("Membership %s: quorum %s", value, quorate ? "retained" : "still lost"); } - } - crm_trace("Payload: %s", data); - if (dispatch != NULL) { - dispatch(msg->header.id, msg->sender.uname, data); + for (member = __xml_first_child(xml); member != NULL; member = __xml_next(member)) { + const char *id_s = crm_element_value(member, "id"); + const char *addr = crm_element_value(member, "addr"); + const char *uname = crm_element_value(member, "uname"); + const char *state = crm_element_value(member, "state"); + const char *born_s = crm_element_value(member, "born"); + const char *seen_s = crm_element_value(member, "seen"); + const char *votes_s = crm_element_value(member, "votes"); + const char *procs_s = crm_element_value(member, "processes"); + + int votes = crm_int_helper(votes_s, NULL); + unsigned int id = crm_int_helper(id_s, NULL); + unsigned int procs = crm_int_helper(procs_s, NULL); + + /* TODO: These values will contain garbage if version < 0.7.1 */ + uint64_t born = crm_int_helper(born_s, NULL); + uint64_t seen = crm_int_helper(seen_s, NULL); + + crm_update_peer(__FUNCTION__, id, born, seen, votes, procs, uname, uname, addr, state); + } } - - done: - free(uncompressed); - free_xml(xml); - return TRUE; - - badmsg: - crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):" - " min=%d, total=%d, size=%d, bz2_size=%d", - msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type), - ais_dest(&(msg->sender)), msg_type2text(msg->sender.type), - msg->sender.pid, (int)sizeof(AIS_Message), - msg->header.size, msg->size, msg->compressed_size); - goto done; } int -ais_dispatch(gpointer user_data) +plugin_dispatch(gpointer user_data) { int rc = CS_OK; - gboolean good = TRUE; - - gboolean(*dispatch) (int kind, const char *from, const char *data) = user_data; + crm_cluster_t *cluster = (crm_cluster_t *) user_data; do { char *buffer = NULL; @@ -743,20 +274,20 @@ ais_dispatch(gpointer user_data) /* NULL is a legal "no message afterall" value */ return 0; } - good = ais_dispatch_message((AIS_Message *) buffer, dispatch); + /* + cpg_deliver_fn_t(cpg_handle_t handle, const struct cpg_name *group_name, + uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len); + */ + cluster->cpg.cpg_deliver_fn(0, NULL, 0, 0, buffer, 0); coroipcc_dispatch_put(ais_ipc_handle); - } while (good && ais_ipc_handle); - - if (good) { - return 0; - } + } while (ais_ipc_handle); - return -1; + return 0; } static void -ais_destroy(gpointer user_data) +plugin_destroy(gpointer user_data) { crm_err("AIS connection terminated"); ais_fd_sync = -1; @@ -896,179 +427,6 @@ init_cman_connection(gboolean(*dispatch) (unsigned long long, gboolean), void (* } # ifdef SUPPORT_COROSYNC -gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL; -static bool cpg_evicted = FALSE; - -static int -pcmk_cpg_dispatch(gpointer user_data) -{ - int rc = 0; - - pcmk_cpg_dispatch_fn = user_data; - rc = cpg_dispatch(pcmk_cpg_handle, CS_DISPATCH_ALL); - if (rc != CS_OK) { - crm_err("Connection to the CPG API failed: %d", rc); - pcmk_cpg_handle = 0; - return -1; - - } else if(cpg_evicted) { - crm_err("Evicted from CPG membership"); - return -1; - } - return 0; -} - -static void -pcmk_cpg_deliver(cpg_handle_t handle, - const struct cpg_name *groupName, - uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) -{ - AIS_Message *ais_msg = (AIS_Message *) msg; - uint32_t local_nodeid = get_local_nodeid(handle); - const char *local_name = get_local_node_name(); - - if (ais_msg->sender.id > 0 && ais_msg->sender.id != nodeid) { - crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, ais_msg->sender.id); - return; - - } else if (ais_msg->host.id != 0 && (local_nodeid != ais_msg->host.id)) { - /* Not for us */ - return; - - } else if (ais_msg->host.size != 0 && safe_str_neq(ais_msg->host.uname, local_name)) { - /* Not for us */ - return; - } - - ais_msg->sender.id = nodeid; - if (ais_msg->sender.size == 0) { - crm_node_t *peer = crm_get_peer(nodeid, NULL); - - if (peer == NULL) { - crm_err("Peer with nodeid=%u is unknown", nodeid); - - } else if (peer->uname == NULL) { - crm_err("No uname for peer with nodeid=%u", nodeid); - - } else { - crm_notice("Fixing uname for peer with nodeid=%u", nodeid); - ais_msg->sender.size = strlen(peer->uname); - memset(ais_msg->sender.uname, 0, MAX_NAME); - memcpy(ais_msg->sender.uname, peer->uname, ais_msg->sender.size); - } - } - - ais_dispatch_message(ais_msg, pcmk_cpg_dispatch_fn); -} - -static void -pcmk_cpg_membership(cpg_handle_t handle, - const struct cpg_name *groupName, - const struct cpg_address *member_list, size_t member_list_entries, - const struct cpg_address *left_list, size_t left_list_entries, - const struct cpg_address *joined_list, size_t joined_list_entries) -{ - int i; - gboolean found = FALSE; - static int counter = 0; - uint32_t local_nodeid = get_local_nodeid(handle); - - for (i = 0; i < left_list_entries; i++) { - crm_node_t *peer = crm_get_peer(left_list[i].nodeid, NULL); - - crm_info("Left[%d.%d] %s.%u ", counter, i, groupName->value, left_list[i].nodeid); - crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, OFFLINESTATUS); - } - - for (i = 0; i < joined_list_entries; i++) { - crm_info("Joined[%d.%d] %s.%u ", counter, i, groupName->value, joined_list[i].nodeid); - } - - for (i = 0; i < member_list_entries; i++) { - crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL); - - crm_info("Member[%d.%d] %s.%u ", counter, i, groupName->value, member_list[i].nodeid); - crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS); - if (local_nodeid == member_list[i].nodeid) { - found = TRUE; - } - } - - if (!found) { - crm_err("We're not part of CPG group %s anymore!", groupName->value); - cpg_evicted = TRUE; - } - - counter++; -} - -cpg_callbacks_t cpg_callbacks = { - .cpg_deliver_fn = pcmk_cpg_deliver, - .cpg_confchg_fn = pcmk_cpg_membership, -}; -# endif - -static gboolean -init_cpg_connection(crm_cluster_t * cluster) -{ -# ifdef SUPPORT_COROSYNC - int rc = -1; - int fd = 0; - int retries = 0; - crm_node_t *peer = NULL; - - struct mainloop_fd_callbacks cpg_fd_callbacks = { - .dispatch = pcmk_cpg_dispatch, - .destroy = cluster->destroy, - }; - - cpg_evicted = FALSE; - strcpy(pcmk_cpg_group.value, crm_system_name); - pcmk_cpg_group.length = strlen(crm_system_name) + 1; - - cs_repeat(retries, 30, rc = cpg_initialize(&pcmk_cpg_handle, &cpg_callbacks)); - if (rc != CS_OK) { - crm_err("Could not connect to the Cluster Process Group API: %d\n", rc); - goto bail; - } - - retries = 0; - cs_repeat(retries, 30, rc = cpg_local_get(pcmk_cpg_handle, (unsigned int *)&cluster->nodeid)); - if (rc != CS_OK) { - crm_err("Could not get local node id from the CPG API"); - goto bail; - } - - retries = 0; - cs_repeat(retries, 30, rc = cpg_join(pcmk_cpg_handle, &pcmk_cpg_group)); - if (rc != CS_OK) { - crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc); - goto bail; - } - - rc = cpg_fd_get(pcmk_cpg_handle, &fd); - if (rc != CS_OK) { - crm_err("Could not obtain the CPG API connection: %d\n", rc); - goto bail; - } - - mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster->cs_dispatch, &cpg_fd_callbacks); - - bail: - if (rc != CS_OK) { - cpg_finalize(pcmk_cpg_handle); - return FALSE; - } - - peer = crm_get_peer(cluster->nodeid, NULL); - crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS); - -# else - crm_err("The Corosync CPG API is not supported in this build"); - crm_exit(DAEMON_RESPAWN_STOP); -# endif - return TRUE; -} gboolean init_quorum_connection(gboolean(*dispatch) (unsigned long long, gboolean), @@ -1086,9 +444,11 @@ init_cs_connection_classic(crm_cluster_t * cluster) int pid = 0; char *pid_s = NULL; const char *name = NULL; + crm_node_t *peer = NULL; + enum crm_proc_flag proc = 0; struct mainloop_fd_callbacks ais_fd_callbacks = { - .dispatch = ais_dispatch, + .dispatch = plugin_dispatch, .destroy = cluster->destroy, }; @@ -1099,7 +459,7 @@ init_cs_connection_classic(crm_cluster_t * cluster) if (ais_ipc_handle) { coroipcc_fd_get(ais_ipc_handle, &ais_fd_async); } else { - crm_info("Connection to our AIS plugin (%d) failed: %s (%d)", + crm_info("Connection to our Corosync plugin (%d) failed: %s (%d)", PCMK_SERVICE_ID, strerror(errno), errno); return FALSE; } @@ -1108,7 +468,7 @@ init_cs_connection_classic(crm_cluster_t * cluster) rc = CS_ERR_LIBRARY; } if (rc != CS_OK) { - crm_info("Connection to our AIS plugin (%d) failed: %s (%d)", PCMK_SERVICE_ID, + crm_info("Connection to our Corosync plugin (%d) failed: %s (%d)", PCMK_SERVICE_ID, ais_error2text(rc), rc); } @@ -1117,16 +477,15 @@ init_cs_connection_classic(crm_cluster_t * cluster) } if (ais_fd_callbacks.destroy == NULL) { - ais_fd_callbacks.destroy = ais_destroy; + ais_fd_callbacks.destroy = plugin_destroy; } - mainloop_add_fd("corosync-plugin", G_PRIORITY_MEDIUM, ais_fd_async, cluster->cs_dispatch, - &ais_fd_callbacks); + mainloop_add_fd("corosync-plugin", G_PRIORITY_MEDIUM, ais_fd_async, cluster, &ais_fd_callbacks); crm_info("AIS connection established"); pid = getpid(); pid_s = crm_itoa(pid); - send_ais_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais); + send_cluster_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais); free(pid_s); cluster->nodeid = get_local_nodeid(0); @@ -1141,6 +500,9 @@ init_cs_connection_classic(crm_cluster_t * cluster) crm_exit(ENOTUNIQ); } + proc = text2proc(crm_system_name); + peer = crm_get_peer(cluster->nodeid, cluster->uname); + crm_update_peer_proc(__FUNCTION__, peer, proc|crm_proc_plugin, ONLINESTATUS); return TRUE; } @@ -1275,7 +637,7 @@ init_cs_connection_once(crm_cluster_t * cluster) } break; case pcmk_cluster_cman: - if (init_cpg_connection(cluster) == FALSE) { + if (cluster_connect_cpg(cluster) == FALSE) { return FALSE; } cluster->uname = cman_node_name(0 /* CMAN_NODEID_US */ ); diff --git a/lib/cluster/membership.c b/lib/cluster/membership.c index a1e044c..875c1c8 100644 --- a/lib/cluster/membership.c +++ b/lib/cluster/membership.c @@ -125,7 +125,7 @@ crm_active_peers(void) return count; } -void +static void destroy_crm_node(gpointer data) { crm_node_t *node = data; @@ -143,14 +143,6 @@ destroy_crm_node(gpointer data) void crm_peer_init(void) { - static gboolean initialized = FALSE; - - if (initialized) { - return; - } - initialized = TRUE; - - crm_peer_destroy(); if (crm_peer_cache == NULL) { crm_peer_cache = g_hash_table_new_full(crm_str_hash, g_str_equal, free, destroy_crm_node); } diff --git a/lib/common/logging.c b/lib/common/logging.c index a1b01f2..155a068 100644 --- a/lib/common/logging.c +++ b/lib/common/logging.c @@ -95,6 +95,10 @@ crm_glib_handler(const gchar * log_domain, GLogLevelFlags flags, const gchar * m static void crm_trigger_blackbox(int nsig) { + if(nsig == SIGTRAP) { + /* Turn it on if it wasn't already */ + crm_enable_blackbox(nsig); + } crm_write_blackbox(nsig, NULL); } @@ -344,15 +348,6 @@ crm_enable_blackbox(int nsig) crm_update_callsites(); - /* Original meanings from signal(7) - * - * Signal Value Action Comment - * SIGTRAP 5 Core Trace/breakpoint trap - * - * Our usage is as similar as possible - */ - mainloop_add_signal(SIGTRAP, crm_trigger_blackbox); - blackbox_trigger = qb_log_custom_open(blackbox_logger, NULL, NULL, NULL); qb_log_ctl(blackbox_trigger, QB_LOG_CONF_ENABLED, QB_TRUE); crm_trace("Trigger: %d is %d %d", blackbox_trigger, @@ -762,7 +757,17 @@ crm_log_init(const char *entity, int level, gboolean daemon, gboolean to_stderr, } #endif } + + /* Original meanings from signal(7) + * + * Signal Value Action Comment + * SIGTRAP 5 Core Trace/breakpoint trap + * SIGUSR1 30,10,16 Term User-defined signal 1 + * + * Our usage is as similar as possible + */ mainloop_add_signal(SIGUSR1, crm_enable_blackbox); + mainloop_add_signal(SIGTRAP, crm_trigger_blackbox); } crm_xml_init(); /* Sets buffer allocation strategy */ diff --git a/lib/services/systemd.c b/lib/services/systemd.c index 886cb35..2a66da5 100644 --- a/lib/services/systemd.c +++ b/lib/services/systemd.c @@ -407,6 +407,8 @@ systemd_unit_exec_done(GObject * source_object, GAsyncResult * res, gpointer use } } +#define SYSTEMD_OVERRIDE_ROOT "/run/systemd/system/" + gboolean systemd_unit_exec(svc_action_t * op, gboolean synchronous) { @@ -453,9 +455,42 @@ systemd_unit_exec(svc_action_t * op, gboolean synchronous) goto cleanup; } else if (g_strcmp0(action, "start") == 0) { + FILE *file_strm = NULL; + char *override_dir = g_strdup_printf("%s/%s", SYSTEMD_OVERRIDE_ROOT, unit); + char *override_file = g_strdup_printf("%s/50-pacemaker.conf", override_dir); + action = "StartUnit"; + crm_build_path(override_dir, 0755); + + file_strm = fopen(override_file, "w"); + if (file_strm != NULL) { + int rc = fprintf(file_strm, "[Service]\nRestart=no"); + if (rc < 0) { + crm_perror(LOG_ERR, "Cannot write to systemd override file %s: %s (%d)", override_file, pcmk_strerror(errno), errno); + } + + } else { + crm_err("Cannot open systemd override file %s for writing: %s (%d)", override_file, pcmk_strerror(errno), errno); + } + + if (file_strm != NULL) { + fflush(file_strm); + fclose(file_strm); + } + systemd_daemon_reload(systemd_proxy, &error); + g_error_free(error); error = NULL; + free(override_file); + free(override_dir); + } else if (g_strcmp0(action, "stop") == 0) { + char *override_file = g_strdup_printf("%s/%s/50-pacemaker.conf", SYSTEMD_OVERRIDE_ROOT, unit); + action = "StopUnit"; + unlink(override_file); + free(override_file); + systemd_daemon_reload(systemd_proxy, &error); + g_error_free(error); error = NULL; + } else if (g_strcmp0(action, "restart") == 0) { action = "RestartUnit"; } else { diff --git a/lrmd/Makefile.am b/lrmd/Makefile.am index 73f1d7e..82cb65f 100644 --- a/lrmd/Makefile.am +++ b/lrmd/Makefile.am @@ -27,7 +27,7 @@ initdir = $(INITDIR) init_SCRIPTS = pacemaker_remote sbin_PROGRAMS = pacemaker_remoted -if HAVE_SYSTEMD +if BUILD_SYSTEMD systemdunit_DATA = pacemaker_remote.service endif diff --git a/mcp/Makefile.am b/mcp/Makefile.am index 73a71c4..f98f286 100644 --- a/mcp/Makefile.am +++ b/mcp/Makefile.am @@ -29,7 +29,7 @@ if BUILD_HELP man8_MANS = $(sbin_PROGRAMS:%=%.8) endif -if HAVE_SYSTEMD +if BUILD_SYSTEMD systemdunit_DATA = pacemaker.service endif diff --git a/mcp/corosync.c b/mcp/corosync.c index 64d6eb5..ca37871 100644 --- a/mcp/corosync.c +++ b/mcp/corosync.c @@ -43,13 +43,7 @@ # include #endif -static struct cpg_name cpg_group = { - .length = 0, - .value[0] = 0, -}; - enum cluster_type_e stack = pcmk_cluster_unknown; -static cpg_handle_t cpg_handle; static corosync_cfg_handle_t cfg_handle; /* =::=::=::= CFG - Shutdown stuff =::=::=::= */ @@ -155,169 +149,6 @@ cluster_connect_cfg(uint32_t * nodeid) return FALSE; } -/* =::=::=::= CPG - Closed Process Group Messaging =::=::=::= */ - -static int -pcmk_cpg_dispatch(gpointer user_data) -{ - cpg_handle_t *handle = (cpg_handle_t *) user_data; - cs_error_t rc = cpg_dispatch(*handle, CS_DISPATCH_ALL); - - if (rc != CS_OK) { - return -1; - } - return 0; -} - -static void -cpg_connection_destroy(gpointer user_data) -{ - crm_err("Connection destroyed"); - cpg_handle = 0; - crm_exit(ENOTCONN); -} - -static void -pcmk_cpg_deliver(cpg_handle_t handle, - const struct cpg_name *groupName, - uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) -{ - if (nodeid != local_nodeid) { - uint32_t procs = 0; - xmlNode *xml = string2xml(msg); - const char *uname = crm_element_value(xml, "uname"); - - crm_element_value_int(xml, "proclist", (int *)&procs); - /* crm_debug("Got proclist %.32x from %s", procs, uname); */ - if (update_node_processes(nodeid, uname, procs)) { - update_process_clients(); - } - } -} - -static void -pcmk_cpg_membership(cpg_handle_t handle, - const struct cpg_name *groupName, - const struct cpg_address *member_list, size_t member_list_entries, - const struct cpg_address *left_list, size_t left_list_entries, - const struct cpg_address *joined_list, size_t joined_list_entries) -{ - /* Don't care about CPG membership */ - update_process_peers(); -} - -cpg_callbacks_t cpg_callbacks = { - .cpg_deliver_fn = pcmk_cpg_deliver, - .cpg_confchg_fn = pcmk_cpg_membership, -}; - -gboolean -cluster_disconnect_cpg(void) -{ - if (cpg_handle) { - cpg_finalize(cpg_handle); - cpg_handle = 0; - } - return TRUE; -} - -gboolean -cluster_connect_cpg(void) -{ - cs_error_t rc; - unsigned int nodeid; - int fd; - int retries = 0; - - static struct mainloop_fd_callbacks cpg_fd_callbacks = { - .dispatch = pcmk_cpg_dispatch, - .destroy = cpg_connection_destroy, - }; - - strcpy(cpg_group.value, "pcmk"); - cpg_group.length = strlen(cpg_group.value) + 1; - - retries = 0; - cs_repeat(retries, 30, rc = cpg_initialize(&cpg_handle, &cpg_callbacks)); - if (rc != CS_OK) { - crm_err("corosync cpg init error %d", rc); - return FALSE; - } - - rc = cpg_fd_get(cpg_handle, &fd); - if (rc != CS_OK) { - crm_err("corosync cpg fd_get error %d", rc); - goto bail; - } - - retries = 0; - cs_repeat(retries, 30, rc = cpg_local_get(cpg_handle, &nodeid)); - if (rc != CS_OK) { - crm_err("corosync cpg local_get error %d", rc); - goto bail; - } - - crm_debug("Our nodeid: %d", nodeid); - - retries = 0; - cs_repeat(retries, 30, rc = cpg_join(cpg_handle, &cpg_group)); - - if (rc != CS_OK) { - crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc); - goto bail; - } - - mainloop_add_fd("corosync-cpg", G_PRIORITY_DEFAULT, fd, &cpg_handle, &cpg_fd_callbacks); - return TRUE; - - bail: - cpg_finalize(cpg_handle); - return FALSE; -} - -gboolean -send_cpg_message(struct iovec * iov) -{ - int rc = CS_OK; - int retries = 0; - - errno = 0; - - do { - rc = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, 1); - if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { - cpg_flow_control_state_t fc_state = CPG_FLOW_CONTROL_DISABLED; - int rc2 = cpg_flow_control_state_get(cpg_handle, &fc_state); - - if (rc2 == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) { - crm_debug("Attempting to clear cpg dispatch queue"); - rc2 = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL); - } - - if (rc2 != CS_OK) { - crm_warn("Could not check/clear the cpg connection"); - goto bail; - - } else { - retries++; - crm_debug("Retrying operation after %ds", retries); - sleep(retries); - } - } else { - break; - } - - /* 5 retires is plenty, we'll resend once the membership reforms anyway */ - } while (retries < 5); - - bail: - if (rc != CS_OK) { - crm_err("Sending message via cpg FAILED: (rc=%d) %s", rc, ais_error2text(rc)); - } - - return (rc == CS_OK); -} - /* =::=::=::= Configuration =::=::=::= */ #if HAVE_CONFDB static int @@ -447,7 +278,7 @@ read_config(void) #if HAVE_CONFDB char *value = NULL; - confdb_handle_t config; + confdb_handle_t config = 0; confdb_handle_t top_handle = 0; hdb_handle_t local_handle; static confdb_callbacks_t callbacks = { }; @@ -456,7 +287,8 @@ read_config(void) rc = confdb_initialize(&config, &callbacks); if (rc != CS_OK) { retries++; - printf("Connection setup failed: %d. Retrying in %ds\n", rc, retries); + printf("confdb connection setup failed: %s. Retrying in %ds\n", ais_error2text(rc), retries); + crm_info("confdb connection setup failed: %s. Retrying in %ds", ais_error2text(rc), retries); sleep(retries); } else { @@ -473,8 +305,8 @@ read_config(void) rc = cmap_initialize(&local_handle); if (rc != CS_OK) { retries++; - printf("API connection setup failed: %s. Retrying in %ds\n", cs_strerror(rc), retries); - crm_info("API connection setup failed: %s. Retrying in %ds", cs_strerror(rc), retries); + printf("cmap connection setup failed: %s. Retrying in %ds\n", cs_strerror(rc), retries); + crm_info("cmap connection setup failed: %s. Retrying in %ds", cs_strerror(rc), retries); sleep(retries); } else { diff --git a/mcp/pacemaker.c b/mcp/pacemaker.c index 47fdd68..6f8d9b9 100644 --- a/mcp/pacemaker.c +++ b/mcp/pacemaker.c @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -44,22 +45,6 @@ uint32_t local_nodeid = 0; crm_trigger_t *shutdown_trigger = NULL; const char *pid_file = "/var/run/pacemaker.pid"; -/* *INDENT-OFF* */ -enum crm_proc_flag { - crm_proc_none = 0x00000001, - crm_proc_plugin = 0x00000002, - crm_proc_lrmd = 0x00000010, - crm_proc_cib = 0x00000100, - crm_proc_crmd = 0x00000200, - crm_proc_attrd = 0x00001000, - crm_proc_stonithd = 0x00002000, - crm_proc_pe = 0x00010000, - crm_proc_te = 0x00020000, - crm_proc_mgmtd = 0x00040000, - crm_proc_stonith_ng = 0x00100000, -}; -/* *INDENT-ON* */ - typedef struct pcmk_child_s { int pid; long flag; @@ -539,8 +524,10 @@ update_process_clients(void) void update_process_peers(void) { + /* Do nothing for corosync-2 based clusters */ + char buffer[1024]; - struct iovec iov; + struct iovec *iov; int rc = 0; memset(buffer, 0, SIZEOF(buffer)); @@ -552,11 +539,11 @@ update_process_peers(void) rc = snprintf(buffer, SIZEOF(buffer) - 1, "", get_process_list()); } - iov.iov_base = buffer; - iov.iov_len = rc + 1; - crm_trace("Sending %s", buffer); - send_cpg_message(&iov); + iov = calloc(1, sizeof(struct iovec)); + iov->iov_base = strdup(buffer); + iov->iov_len = rc + 1; + send_cpg_iov(iov); } gboolean @@ -619,6 +606,7 @@ update_node_processes(uint32_t id, const char *uname, uint32_t procs) return changed; } + /* *INDENT-OFF* */ static struct crm_option long_options[] = { /* Top-level Options */ @@ -779,6 +767,42 @@ init_children_processes(void) } } +static void +mcp_cpg_destroy(gpointer user_data) +{ + crm_err("Connection destroyed"); + crm_exit(ENOTCONN); +} + +static void +mcp_cpg_deliver(cpg_handle_t handle, + const struct cpg_name *groupName, + uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) +{ + if (nodeid != local_nodeid) { + uint32_t procs = 0; + xmlNode *xml = string2xml(msg); + const char *uname = crm_element_value(xml, "uname"); + + crm_element_value_int(xml, "proclist", (int *)&procs); + /* crm_debug("Got proclist %.32x from %s", procs, uname); */ + if (update_node_processes(nodeid, uname, procs)) { + update_process_clients(); + } + } +} + +static void +mcp_cpg_membership(cpg_handle_t handle, + const struct cpg_name *groupName, + const struct cpg_address *member_list, size_t member_list_entries, + const struct cpg_address *left_list, size_t left_list_entries, + const struct cpg_address *joined_list, size_t joined_list_entries) +{ + /* Don't care about CPG membership, but we do want to broadcast our own presence */ + update_process_peers(); +} + int main(int argc, char **argv) { @@ -795,6 +819,7 @@ main(int argc, char **argv) crm_ipc_t *old_instance = NULL; qb_ipcs_service_t *ipcs = NULL; const char *facility = daemon_option("logfacility"); + static crm_cluster_t cluster; setenv("LC_ALL", "C", 1); setenv("HA_LOGD", "no", 1); @@ -951,12 +976,17 @@ main(int argc, char **argv) crm_exit(EIO); } + /* Allows us to block shutdown */ if (cluster_connect_cfg(&local_nodeid) == FALSE) { crm_err("Couldn't connect to Corosync's CFG service"); crm_exit(ENOPROTOOPT); } - if (cluster_connect_cpg() == FALSE) { + cluster.destroy = mcp_cpg_destroy; + cluster.cpg.cpg_deliver_fn = mcp_cpg_deliver; + cluster.cpg.cpg_confchg_fn = mcp_cpg_membership; + + if(cluster_connect_cpg(&cluster) == FALSE) { crm_err("Couldn't connect to Corosync's CPG service"); crm_exit(ENOPROTOOPT); } @@ -982,7 +1012,7 @@ main(int argc, char **argv) g_main_destroy(mainloop); - cluster_disconnect_cpg(); + cluster_disconnect_cpg(&cluster); cluster_disconnect_cfg(); crm_info("Exiting %s", crm_system_name); diff --git a/mcp/pacemaker.h b/mcp/pacemaker.h index 224df93..8967966 100644 --- a/mcp/pacemaker.h +++ b/mcp/pacemaker.h @@ -41,20 +41,16 @@ typedef struct pcmk_peer_s { char *uname; } pcmk_peer_t; -extern gboolean read_config(void); +gboolean read_config(void); -extern gboolean cluster_connect_cfg(uint32_t * nodeid); -extern gboolean cluster_disconnect_cfg(void); +gboolean cluster_connect_cfg(uint32_t * nodeid); +gboolean cluster_disconnect_cfg(void); -extern gboolean cluster_connect_cpg(void); -extern gboolean cluster_disconnect_cpg(void); -extern gboolean send_cpg_message(struct iovec *iov); +void update_process_clients(void); +void update_process_peers(void); +gboolean update_node_processes(uint32_t node, const char *uname, uint32_t procs); -extern void update_process_clients(void); -extern void update_process_peers(void); -extern gboolean update_node_processes(uint32_t node, const char *uname, uint32_t procs); +void enable_mgmtd(gboolean enable); +void enable_crmd_as_root(gboolean enable); -extern void enable_mgmtd(gboolean enable); -extern void enable_crmd_as_root(gboolean enable); - -extern void pcmk_shutdown(int nsig); +void pcmk_shutdown(int nsig); diff --git a/mcp/pacemaker.in b/mcp/pacemaker.in index a6647fe..c96f1d1 100644 --- a/mcp/pacemaker.in +++ b/mcp/pacemaker.in @@ -111,6 +111,7 @@ cman_pre_start() pid=$(pidof corosync 2>/dev/null) if [ $? -ne 0 ]; then service cman start + sleep 2 fi } diff --git a/tools/attrd.c b/tools/attrd.c index 1e834ea..2d485f9 100644 --- a/tools/attrd.c +++ b/tools/attrd.c @@ -325,11 +325,19 @@ attrd_ha_callback(HA_Message * msg, void *private_data) #endif #if SUPPORT_COROSYNC -static gboolean -attrd_ais_dispatch(int kind, const char *from, const char *data) +static void +attrd_cs_dispatch(cpg_handle_t handle, + const struct cpg_name *groupName, + uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) { + uint32_t kind = 0; xmlNode *xml = NULL; + const char *from = NULL; + char *data = pcmk_message_common_cs(handle, nodeid, pid, msg, &kind, &from); + if(data == NULL) { + return; + } if (kind == crm_class_cluster) { xml = string2xml(data); if (xml == NULL) { @@ -360,11 +368,11 @@ attrd_ais_dispatch(int kind, const char *from, const char *data) free_xml(xml); } - return TRUE; + free(data); } static void -attrd_ais_destroy(gpointer unused) +attrd_cs_destroy(gpointer unused) { if (need_shutdown) { /* we signed out, so this is expected */ @@ -405,7 +413,7 @@ update_for_hash_entry(gpointer key, gpointer value, gpointer user_data) { attr_hash_entry_t *entry = value; - if (entry->value != NULL) { + if (entry->value != NULL || entry->stored_value != NULL) { attrd_timer_callback(value); } } @@ -537,8 +545,9 @@ main(int argc, char **argv) #if SUPPORT_COROSYNC if (is_openais_cluster()) { - cluster.destroy = attrd_ais_destroy; - cluster.cs_dispatch = attrd_ais_dispatch; + cluster.destroy = attrd_cs_destroy; + cluster.cpg.cpg_deliver_fn = attrd_cs_dispatch; + cluster.cpg.cpg_confchg_fn = pcmk_cpg_membership; } #endif diff --git a/tools/crm_node.c b/tools/crm_node.c index a25b3b4..aacea76 100644 --- a/tools/crm_node.c +++ b/tools/crm_node.c @@ -500,16 +500,23 @@ crm_add_member(gpointer key, gpointer value, gpointer user_data) } } -static gboolean -ais_membership_dispatch(int kind, const char *from, const char *data) +static void +ais_membership_dispatch(cpg_handle_t handle, + const struct cpg_name *groupName, + uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len) { + uint32_t kind = 0; + const char *from = NULL; + char *data = pcmk_message_common_cs(handle, nodeid, pid, msg, &kind, &from); + switch (kind) { case crm_class_members: case crm_class_notify: case crm_class_quorum: break; default: - return TRUE; + free(data); + return; break; } @@ -548,9 +555,10 @@ ais_membership_dispatch(int kind, const char *from, const char *data) fprintf(stdout, "\n"); } + free(data); crm_exit(pcmk_ok); - return TRUE; + return; } #endif @@ -695,7 +703,8 @@ try_openais(int command, enum cluster_type_e stack) static crm_cluster_t cluster; cluster.destroy = ais_membership_destroy; - cluster.cs_dispatch = ais_membership_dispatch; + cluster.cpg.cpg_deliver_fn = ais_membership_dispatch; + cluster.cpg.cpg_confchg_fn = NULL; if (init_cs_connection_once(&cluster)) { @@ -703,7 +712,7 @@ try_openais(int command, enum cluster_type_e stack) switch (command) { case 'R': - send_ais_text(crm_class_rmpeer, target_uname, TRUE, NULL, crm_msg_ais); + send_cluster_text(crm_class_rmpeer, target_uname, TRUE, NULL, crm_msg_ais); cib_remove_node(0, target_uname); crm_exit(pcmk_ok); @@ -713,13 +722,13 @@ try_openais(int command, enum cluster_type_e stack) crm_exit(pcmk_ok); case 'q': - send_ais_text(crm_class_quorum, NULL, TRUE, NULL, crm_msg_ais); + send_cluster_text(crm_class_quorum, NULL, TRUE, NULL, crm_msg_ais); break; case 'l': case 'p': crm_info("Requesting the list of configured nodes"); - send_ais_text(crm_class_members, __FUNCTION__, TRUE, NULL, crm_msg_ais); + send_cluster_text(crm_class_members, __FUNCTION__, TRUE, NULL, crm_msg_ais); break; case 'i':