pacemaker/pacemaker-7d8acec.patch

3678 lines
113 KiB
Diff
Raw Normal View History

2013-06-20 06:29:04 +00:00
diff --git a/cib/callbacks.c b/cib/callbacks.c
index 754e218..77853d9 100644
--- a/cib/callbacks.c
+++ b/cib/callbacks.c
@@ -1391,7 +1391,6 @@ initiate_exit(void)
extern int remote_fd;
extern int remote_tls_fd;
-extern void terminate_cs_connection(void);
void
terminate_cib(const char *caller, gboolean fast)
diff --git a/cib/main.c b/cib/main.c
index 6b56274..3328558 100644
--- a/cib/main.c
+++ b/cib/main.c
@@ -371,15 +371,25 @@ ccm_connect(void)
#endif
#if SUPPORT_COROSYNC
-static gboolean
-cib_ais_dispatch(int kind, const char *from, const char *data)
+static void
+cib_cs_dispatch(cpg_handle_t handle,
+ const struct cpg_name *groupName,
+ uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
{
+ uint32_t kind = 0;
xmlNode *xml = NULL;
+ const char *from = NULL;
+ char *data = pcmk_message_common_cs(handle, nodeid, pid, msg, &kind, &from);
+ if(data == NULL) {
+ return;
+ }
if (kind == crm_class_cluster) {
xml = string2xml(data);
if (xml == NULL) {
- goto bail;
+ crm_err("Invalid XML: '%.120s'", data);
+ free(data);
+ return;
}
crm_xml_add(xml, F_ORIG, from);
/* crm_xml_add_int(xml, F_SEQ, wrapper->id); */
@@ -387,16 +397,11 @@ cib_ais_dispatch(int kind, const char *from, const char *data)
}
free_xml(xml);
- return TRUE;
-
- bail:
- crm_err("Invalid XML: '%.120s'", data);
- return TRUE;
-
+ free(data);
}
static void
-cib_ais_destroy(gpointer user_data)
+cib_cs_destroy(gpointer user_data)
{
if (cib_shutdown_flag) {
crm_info("Corosync disconnection complete");
@@ -463,8 +468,9 @@ cib_init(void)
{
if (is_openais_cluster()) {
#if SUPPORT_COROSYNC
- crm_cluster.destroy = cib_ais_destroy;
- crm_cluster.cs_dispatch = cib_ais_dispatch;
+ crm_cluster.destroy = cib_cs_destroy;
+ crm_cluster.cpg.cpg_deliver_fn = cib_cs_dispatch;
+ crm_cluster.cpg.cpg_confchg_fn = pcmk_cpg_membership;
#endif
} else if (is_heartbeat_cluster()) {
#if SUPPORT_HEARTBEAT
diff --git a/configure.ac b/configure.ac
index be8261a..7d2e384 100644
--- a/configure.ac
+++ b/configure.ac
@@ -132,7 +132,7 @@ try_extract_header_define() {
AC_MSG_RESULT($value)
fi
printf $value
- rm -rf ${Cfile}.cc ${Cfile} ${Cfile}.dSYM ${Cfile}.gcno
+ rm -rf ${Cfile}.c ${Cfile} ${Cfile}.dSYM ${Cfile}.gcno
}
extract_header_define() {
@@ -669,14 +669,6 @@ else
fi
AC_MSG_RESULT(using $GLIBCONFIG)
-if
- $PKGCONFIG --exists systemd
-then
- systemdunitdir=`$PKGCONFIG --variable=systemdsystemunitdir systemd`
- AC_SUBST(systemdunitdir)
-fi
-AM_CONDITIONAL(HAVE_SYSTEMD, test -n "$systemdunitdir" -a "x$systemdunitdir" != xno)
-
#
# Where is dlopen?
#
@@ -965,50 +957,37 @@ dnl ========================================================================
dnl Profiling and GProf
dnl ========================================================================
-case $SUPPORT_PROFILING in
+case $SUPPORT_GCOV in
1|yes|true)
SUPPORT_PROFILING=1
-
- dnl Enable gprof
- #LIBS="$LIBS -pg"
- #CFLAGS="$CFLAGS -pg"
-
- dnl Disable various compiler optimizations
- CFLAGS="$CFLAGS -fno-omit-frame-pointer"
- #CFLAGS="$CFLAGS -fno-inline-functions -fno-inline-functions-called-once -fno-optimize-sibling-calls"
- dnl CFLAGS="$CFLAGS -fno-default-inline -fno-inline"
-
- dnl Update features
- PCMK_FEATURES="$PCMK_FEATURES gprof"
;;
- *) SUPPORT_PROFILING=0;;
esac
-AC_DEFINE_UNQUOTED(SUPPORT_PROFILING, $SUPPORT_PROFILING, Support for gprof profiling)
-case $SUPPORT_GCOV in
+case $SUPPORT_PROFILING in
1|yes|true)
- SUPPORT_GCOV=1
+ SUPPORT_PROFILING=1
dnl Enable gprof
#LIBS="$LIBS -pg"
#CFLAGS="$CFLAGS -pg"
dnl Disable various compiler optimizations
- CFLAGS="$CFLAGS -fprofile-arcs -ftest-coverage -fno-inline"
+ CFLAGS="$CFLAGS -fno-omit-frame-pointer -fprofile-arcs -ftest-coverage -fno-inline"
+ #CFLAGS="$CFLAGS -fno-inline-functions -fno-inline-functions-called-once -fno-optimize-sibling-calls"
+ dnl CFLAGS="$CFLAGS -fno-default-inline -fno-inline"
- dnl Turn off optimization so code coverage tool
- dnl can get accurate line numbers
+ dnl Turn off optimization so code coverage tool can get accurate line numbers
AC_MSG_NOTICE(Old CFLAGS: $CFLAGS)
- CFLAGS=`echo $CFLAGS | sed -e 's/-O.\ //g' -e 's/-Wp,-D_FORTIFY_SOURCE=.\ //g'`
+ CFLAGS=`echo $CFLAGS | sed -e 's/-O.\ //g' -e 's/-Wp,-D_FORTIFY_SOURCE=.\ //g' -e 's/-D_FORTIFY_SOURCE=.\ //g'`
CFLAGS="$CFLAGS -O0"
AC_MSG_NOTICE(New CFLAGS: $CFLAGS)
dnl Update features
- PCMK_FEATURES="$PCMK_FEATURES gcov"
+ PCMK_FEATURES="$PCMK_FEATURES profile"
;;
*) SUPPORT_PROFILING=0;;
esac
-AC_DEFINE_UNQUOTED(SUPPORT_GCOV, $SUPPORT_GCOV, Support for gcov coverage testing)
+AC_DEFINE_UNQUOTED(SUPPORT_PROFILING, $SUPPORT_PROFILING, Support for profiling)
dnl ========================================================================
dnl Cluster infrastructure - Heartbeat / LibQB
@@ -1192,14 +1171,25 @@ fi
AC_DEFINE_UNQUOTED(SUPPORT_UPSTART, $HAVE_upstart, Support upstart based system services)
AM_CONDITIONAL(BUILD_UPSTART, test $HAVE_upstart = 1)
+if
+ $PKGCONFIG --exists systemd
+then
+ systemdunitdir=`$PKGCONFIG --variable=systemdsystemunitdir systemd`
+ AC_SUBST(systemdunitdir)
+else
+ enable_systemd=no
+fi
+
if test $HAVE_gio = 1 -a "x${enable_systemd}" != xno; then
- HAVE_systemd=1
- PCMK_FEATURES="$PCMK_FEATURES systemd"
+ if test -n "$systemdunitdir" -a "x$systemdunitdir" != xno; then
+ HAVE_systemd=1
+ PCMK_FEATURES="$PCMK_FEATURES systemd"
+ fi
fi
+
AC_DEFINE_UNQUOTED(SUPPORT_SYSTEMD, $HAVE_systemd, Support systemd based system services)
AM_CONDITIONAL(BUILD_SYSTEMD, test $HAVE_systemd = 1)
-
case $SUPPORT_NAGIOS in
1|yes|true|try)
SUPPORT_NAGIOS=1;;
diff --git a/crmd/control.c b/crmd/control.c
index 7f423db..0808f56 100644
--- a/crmd/control.c
+++ b/crmd/control.c
@@ -915,7 +915,7 @@ config_query_callback(xmlNode * msg, int call_id, int rc, xmlNode * output, void
if (is_classic_ais_cluster()) {
value = crmd_pref(config_hash, XML_ATTR_EXPECTED_VOTES);
crm_debug("Sending expected-votes=%s to corosync", value);
- send_ais_text(crm_class_quorum, value, TRUE, NULL, crm_msg_ais);
+ send_cluster_text(crm_class_quorum, value, TRUE, NULL, crm_msg_ais);
}
#endif
diff --git a/crmd/corosync.c b/crmd/corosync.c
index 6385780..c4aef38 100644
--- a/crmd/corosync.c
+++ b/crmd/corosync.c
@@ -41,8 +41,10 @@ extern void crmd_ha_connection_destroy(gpointer user_data);
/* A_HA_CONNECT */
#if SUPPORT_COROSYNC
-static gboolean
-crmd_ais_dispatch(int kind, const char *from, const char *data)
+static void
+crmd_cs_dispatch(cpg_handle_t handle,
+ const struct cpg_name *groupName,
+ uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
{
int seq = 0;
xmlNode *xml = NULL;
@@ -50,10 +52,18 @@ crmd_ais_dispatch(int kind, const char *from, const char *data)
crm_node_t *peer = NULL;
enum crm_proc_flag flag = crm_proc_cpg;
+ uint32_t kind = 0;
+ const char *from = NULL;
+ char *data = pcmk_message_common_cs(handle, nodeid, pid, msg, &kind, &from);
+
+ if(data == NULL) {
+ return;
+ }
xml = string2xml(data);
if (xml == NULL) {
crm_err("Could not parse message content (%d): %.100s", kind, data);
- return TRUE;
+ free(data);
+ return;
}
switch (kind) {
@@ -103,8 +113,8 @@ crmd_ais_dispatch(int kind, const char *from, const char *data)
/* If we can still talk to our peer process on that node,
* then its also part of the corosync membership
*/
- crm_err("Recieving messages from a node we think is dead: %s[%d]", peer->uname,
- peer->id);
+ crm_warn("Recieving messages from a node we think is dead: %s[%d]", peer->uname,
+ peer->id);
crm_update_peer_proc(__FUNCTION__, peer, flag, ONLINESTATUS);
}
crmd_ha_msg_filter(xml);
@@ -123,8 +133,8 @@ crmd_ais_dispatch(int kind, const char *from, const char *data)
crm_err("Invalid message class (%d): %.100s", kind, data);
}
+ free(data);
free_xml(xml);
- return TRUE;
}
static gboolean
@@ -148,7 +158,7 @@ crmd_quorum_destroy(gpointer user_data)
}
static void
-crmd_ais_destroy(gpointer user_data)
+crmd_cs_destroy(gpointer user_data)
{
if (is_not_set(fsa_input_register, R_HA_DISCONNECTED)) {
crm_err("connection terminated");
@@ -182,8 +192,9 @@ crm_connect_corosync(crm_cluster_t * cluster)
if (is_openais_cluster()) {
crm_set_status_callback(&peer_update_callback);
- cluster->cs_dispatch = crmd_ais_dispatch;
- cluster->destroy = crmd_ais_destroy;
+ cluster->cpg.cpg_deliver_fn = crmd_cs_dispatch;
+ cluster->cpg.cpg_confchg_fn = pcmk_cpg_membership;
+ cluster->destroy = crmd_cs_destroy;
rc = crm_cluster_connect(cluster);
}
diff --git a/crmd/election.c b/crmd/election.c
index 1946858..25cb647 100644
--- a/crmd/election.c
+++ b/crmd/election.c
@@ -518,7 +518,7 @@ do_dc_takeover(long long action,
#if SUPPORT_COROSYNC
if (is_classic_ais_cluster()) {
- send_ais_text(crm_class_quorum, NULL, TRUE, NULL, crm_msg_ais);
+ send_cluster_text(crm_class_quorum, NULL, TRUE, NULL, crm_msg_ais);
}
#endif
diff --git a/crmd/lrm.c b/crmd/lrm.c
index 31f00d7..15bad88 100644
--- a/crmd/lrm.c
+++ b/crmd/lrm.c
@@ -1929,6 +1929,7 @@ do_update_resource(lrm_state_t * lrm_state, lrmd_rsc_info_t * rsc, lrmd_event_da
} else {
crm_warn("Resource %s no longer exists in the lrmd", op->rsc_id);
+ send_direct_ack(NULL, NULL, rsc, op, op->rsc_id);
goto cleanup;
}
diff --git a/doc/Pacemaker_Remote/en-US/Revision_History.xml b/doc/Pacemaker_Remote/en-US/Revision_History.xml
index 26d8ab6..257ecbd 100644
--- a/doc/Pacemaker_Remote/en-US/Revision_History.xml
+++ b/doc/Pacemaker_Remote/en-US/Revision_History.xml
@@ -8,13 +8,13 @@
<simpara>
<revhistory>
<revision>
- <revnumber>1</revnumber>
+ <revnumber>1-0</revnumber>
<date>Tue Mar 19 2013</date>
<author><firstname>David</firstname><surname>Vossel</surname><email>dvossel@redhat.com</email></author>
<revdescription><simplelist><member>Import from Pages.app</member></simplelist></revdescription>
</revision>
<revision>
- <revnumber>2</revnumber>
+ <revnumber>2-0</revnumber>
<date>Tue May 13 2013</date>
<author><firstname>David</firstname><surname>Vossel</surname><email>dvossel@redhat.com</email></author>
<revdescription><simplelist><member>Added Future Features Section</member></simplelist></revdescription>
diff --git a/extra/cluster-init b/extra/cluster-init
index 5dc71c2..fe0ff61 100755
--- a/extra/cluster-init
+++ b/extra/cluster-init
@@ -294,10 +294,10 @@ esac
case $DATE in
[Yy][Ee][Ss]|[Yy])
- now=`date`
for host in $host_list; do
echo "Setting time on ${host}"
scp /etc/localtime root@${host}:/etc
+ now=`date`
ssh -l root ${host} -- date -s "'$now'"
echo ""
done
diff --git a/fencing/fence_dummy b/fencing/fence_dummy
index b202977..8cf5103 100644
--- a/fencing/fence_dummy
+++ b/fencing/fence_dummy
@@ -5,7 +5,7 @@
# Virsh 0.3.3 on RHEL 5.2 with xen-3.0.3-51
#
-import sys, time, random
+import sys, time, random, os, atexit, getopt, re
#BEGIN_VERSION_GENERATION
RELEASE_VERSION="3.1.6"
@@ -42,14 +42,28 @@ all_opt = {
"debug" : {
"getopt" : "D:",
"longopt" : "debug-file",
- "help" : "-D, --debug-file=<debugfile> Debugging to output file",
+ "help" : "-D, --debug-file=[debugfile] Debugging to output file",
"required" : "0",
"shortdesc" : "Write debug information to given file",
"order" : 52 },
+ "random_sleep_range": {
+ "getopt" : "R:",
+ "required" : "0",
+ "longopt" : "random_sleep_range",
+ "help" : "--random_sleep-range=[seconds] Issue a sleep between 1 and [seconds]. Used for testing.",
+ "shortdesc" : "Issue a sleep between 1 and [seconds]",
+ "order" : 1 },
+ "mode": {
+ "getopt" : "M:",
+ "longopt" : "mode",
+ "required" : "0",
+ "help" : "--mode=(pass|fail|random). Used for testing.",
+ "shortdesc" : "Should operations always pass, always fail or fail at random",
+ "order" : 1 },
"delay" : {
"getopt" : "f:",
"longopt" : "delay",
- "help" : "--delay <seconds> Wait X seconds before fencing is started",
+ "help" : "--delay [seconds] Wait X seconds before fencing is started",
"required" : "0",
"shortdesc" : "Wait X seconds before fencing is started",
"default" : "0",
@@ -57,7 +71,7 @@ all_opt = {
"action" : {
"getopt" : "o:",
"longopt" : "action",
- "help" : "-o, --action=<action> Action: status, reboot (default), off or on",
+ "help" : "-o, --action=[action] Action: status, reboot (default), off or on",
"required" : "1",
"shortdesc" : "Fencing Action",
"default" : "reboot",
@@ -65,7 +79,7 @@ all_opt = {
"port" : {
"getopt" : "n:",
"longopt" : "plug",
- "help" : "-n, --plug=<id> Physical plug number on device or\n" +
+ "help" : "-n, --plug=[id] Physical plug number on device or\n" +
" name of virtual machine",
"required" : "1",
"shortdesc" : "Physical plug number or name of virtual machine",
@@ -73,7 +87,7 @@ all_opt = {
"switch" : {
"getopt" : "s:",
"longopt" : "switch",
- "help" : "-s, --switch=<id> Physical switch number on device",
+ "help" : "-s, --switch=[id] Physical switch number on device",
"required" : "0",
"shortdesc" : "Physical switch number on device",
"order" : 1 },
@@ -86,8 +100,6 @@ all_opt = {
"order" : 1}
}
-common_opt = [ "retry_on", "delay" ]
-
def show_docs(options, docs = None):
device_opt = options["device_opt"]
@@ -189,12 +201,6 @@ def metadata(avail_opt, options, docs):
def process_input(avail_opt):
global all_opt
- global common_opt
-
- ##
- ## Add options which are available for every fence agent
- #####
- avail_opt.extend(common_opt)
##
## Set standard environment
@@ -290,24 +296,11 @@ def atexit_handler():
os.close(1)
except IOError:
sys.stderr.write("%s failed to close standard output\n"%(sys.argv[0]))
- sys.exit(EC_GENERIC_ERROR)
+ sys.exit(1)
def main():
global all_opt
- device_opt = [ "help", "version", "verbose", "debug", "action", "port",
- "power_timeout", "random_sleep_range"]
-
- all_opt["random_sleep_range"] = {
- "getopt" : "R:",
- "longopt" : "random_sleep_range",
- "help" : "--random_sleep-range=<seconds>Issue a sleep between 1 and <seconds>. Used for testing.",
- "order" : 1 }
-
- all_opt["mode"] = {
- "getopt" : "M:",
- "longopt" : "mode",
- "help" : "--mode=(pass|fail|random). Used for testing.",
- "order" : 1 }
+ device_opt = [ "help", "version", "verbose", "debug", "action", "port", "mode", "random_sleep_range"]
## Defaults for fence agent
docs = { }
@@ -316,6 +309,7 @@ def main():
atexit.register(atexit_handler)
options = process_input(device_opt)
+ options["device_opt"] = device_opt
show_docs(options, docs)
# random sleep for testing
diff --git a/fencing/main.c b/fencing/main.c
index c7b67a1..fee9f7a 100644
--- a/fencing/main.c
+++ b/fencing/main.c
@@ -190,15 +190,25 @@ stonith_peer_hb_destroy(gpointer user_data)
#endif
#if SUPPORT_COROSYNC
-static gboolean
-stonith_peer_ais_callback(int kind, const char *from, const char *data)
+static void
+stonith_peer_ais_callback(cpg_handle_t handle,
+ const struct cpg_name *groupName,
+ uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
{
+ uint32_t kind = 0;
xmlNode *xml = NULL;
+ const char *from = NULL;
+ char *data = pcmk_message_common_cs(handle, nodeid, pid, msg, &kind, &from);
+ if(data == NULL) {
+ return;
+ }
if (kind == crm_class_cluster) {
xml = string2xml(data);
if (xml == NULL) {
- goto bail;
+ crm_err("Invalid XML: '%.120s'", data);
+ free(data);
+ return;
}
crm_xml_add(xml, F_ORIG, from);
/* crm_xml_add_int(xml, F_SEQ, wrapper->id); */
@@ -206,18 +216,14 @@ stonith_peer_ais_callback(int kind, const char *from, const char *data)
}
free_xml(xml);
- return TRUE;
-
- bail:
- crm_err("Invalid XML: '%.120s'", data);
- return TRUE;
-
+ free(data);
+ return;
}
static void
-stonith_peer_ais_destroy(gpointer user_data)
+stonith_peer_cs_destroy(gpointer user_data)
{
- crm_err("AIS connection terminated");
+ crm_err("Corosync connection terminated");
stonith_shutdown(0);
}
#endif
@@ -1084,8 +1090,9 @@ main(int argc, char **argv)
if (is_openais_cluster()) {
#if SUPPORT_COROSYNC
- cluster.destroy = stonith_peer_ais_destroy;
- cluster.cs_dispatch = stonith_peer_ais_callback;
+ cluster.destroy = stonith_peer_cs_destroy;
+ cluster.cpg.cpg_deliver_fn = stonith_peer_ais_callback;
+ cluster.cpg.cpg_confchg_fn = pcmk_cpg_membership;
#endif
}
diff --git a/include/crm/cluster.h b/include/crm/cluster.h
index cac863f..c999367 100644
--- a/include/crm/cluster.h
+++ b/include/crm/cluster.h
@@ -26,9 +26,12 @@
# include <ocf/oc_event.h>
# endif
+# if SUPPORT_COROSYNC
+# include <corosync/cpg.h>
+# endif
+
extern gboolean crm_have_quorum;
extern GHashTable *crm_peer_cache;
-extern GHashTable *crm_peer_id_cache;
extern unsigned long long crm_peer_seq;
# ifndef CRM_SERVICE
@@ -73,21 +76,24 @@ typedef struct crm_peer_node_s {
void crm_peer_init(void);
void crm_peer_destroy(void);
-char *get_corosync_uuid(crm_node_t *peer);
-int get_corosync_id(int id, const char *uuid);
typedef struct crm_cluster_s {
char *uuid;
char *uname;
uint32_t nodeid;
+ void (*destroy) (gpointer);
+
# if SUPPORT_HEARTBEAT
ll_cluster_t *hb_conn;
void (*hb_dispatch) (HA_Message * msg, void *private);
# endif
- gboolean(*cs_dispatch) (int kind, const char *from, const char *data);
- void (*destroy) (gpointer);
+# if SUPPORT_COROSYNC
+ struct cpg_name group;
+ cpg_callbacks_t cpg;
+ cpg_handle_t cpg_handle;
+# endif
} crm_cluster_t;
@@ -122,8 +128,6 @@ enum crm_ais_msg_types {
gboolean send_cluster_message(crm_node_t * node, enum crm_ais_msg_types service,
xmlNode * data, gboolean ordered);
-void destroy_crm_node(gpointer /* crm_node_t* */ data);
-
crm_node_t *crm_get_peer(unsigned int id, const char *uname);
guint crm_active_peers(void);
@@ -138,8 +142,18 @@ gboolean crm_is_heartbeat_peer_active(const crm_node_t * node);
# if SUPPORT_COROSYNC
extern int ais_fd_sync;
+uint32_t get_local_nodeid(cpg_handle_t handle);
+
+gboolean cluster_connect_cpg(crm_cluster_t *cluster);
+void cluster_disconnect_cpg(crm_cluster_t * cluster);
+
+void pcmk_cpg_membership(cpg_handle_t handle,
+ const struct cpg_name *groupName,
+ const struct cpg_address *member_list, size_t member_list_entries,
+ const struct cpg_address *left_list, size_t left_list_entries,
+ const struct cpg_address *joined_list, size_t joined_list_entries);
gboolean crm_is_corosync_peer_active(const crm_node_t * node);
-gboolean send_ais_text(int class, const char *data, gboolean local,
+gboolean send_cluster_text(int class, const char *data, gboolean local,
crm_node_t * node, enum crm_ais_msg_types dest);
# endif
@@ -180,4 +194,7 @@ gboolean is_heartbeat_cluster(void);
const char *get_local_node_name(void);
char *get_node_name(uint32_t nodeid);
+char *pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *msg,
+ uint32_t *kind, const char **from);
+
#endif
diff --git a/include/crm/cluster/internal.h b/include/crm/cluster/internal.h
index 2fa8e08..791a1f9 100644
--- a/include/crm/cluster/internal.h
+++ b/include/crm/cluster/internal.h
@@ -349,20 +349,24 @@ gboolean heartbeat_initialize_nodelist(void *cluster, gboolean force_member, xml
# if SUPPORT_COROSYNC
+gboolean send_cpg_iov(struct iovec * iov);
+
# if SUPPORT_PLUGIN
char *classic_node_name(uint32_t nodeid);
+void plugin_handle_membership(AIS_Message *msg);
+bool send_plugin_text(int class, struct iovec *iov);
# else
char *corosync_node_name(uint64_t /*cmap_handle_t */ cmap_handle, uint32_t nodeid);
# endif
gboolean corosync_initialize_nodelist(void *cluster, gboolean force_member, xmlNode * xml_parent);
-gboolean send_ais_message(xmlNode * msg, gboolean local,
- crm_node_t * node, enum crm_ais_msg_types dest);
+gboolean send_cluster_message_cs(xmlNode * msg, gboolean local,
+ crm_node_t * node, enum crm_ais_msg_types dest);
enum cluster_type_e find_corosync_variant(void);
-void terminate_cs_connection(void);
+void terminate_cs_connection(crm_cluster_t * cluster);
gboolean init_cs_connection(crm_cluster_t * cluster);
gboolean init_cs_connection_once(crm_cluster_t * cluster);
# endif
@@ -377,6 +381,8 @@ enum crm_quorum_source {
crm_quorum_pacemaker,
};
+int get_corosync_id(int id, const char *uuid);
+char *get_corosync_uuid(crm_node_t *peer);
enum crm_quorum_source get_quorum_source(void);
void crm_update_peer_proc(const char *source, crm_node_t * peer, uint32_t flag, const char *status);
diff --git a/lib/cluster/Makefile.am b/lib/cluster/Makefile.am
index a5a70ff..744ff27 100644
--- a/lib/cluster/Makefile.am
+++ b/lib/cluster/Makefile.am
@@ -33,6 +33,7 @@ libcrmcluster_la_LIBADD = $(top_builddir)/lib/common/libcrmcommon.la $(top_buil
libcrmcluster_la_DEPENDENCIES = $(top_builddir)/lib/common/libcrmcommon.la $(top_builddir)/lib/fencing/libstonithd.la
if BUILD_CS_SUPPORT
+libcrmcluster_la_SOURCES += cpg.c
if BUILD_CS_PLUGIN
libcrmcluster_la_SOURCES += legacy.c
else
diff --git a/lib/cluster/cluster.c b/lib/cluster/cluster.c
index 9538816..5820c8d 100644
--- a/lib/cluster/cluster.c
+++ b/lib/cluster/cluster.c
@@ -240,7 +240,7 @@ crm_cluster_disconnect(crm_cluster_t * cluster)
#if SUPPORT_COROSYNC
if (is_openais_cluster()) {
crm_peer_destroy();
- terminate_cs_connection();
+ terminate_cs_connection(cluster);
crm_info("Disconnected from %s", type_str);
return;
}
@@ -274,7 +274,7 @@ send_cluster_message(crm_node_t * node, enum crm_ais_msg_types service, xmlNode
#if SUPPORT_COROSYNC
if (is_openais_cluster()) {
- return send_ais_message(data, FALSE, node, service);
+ return send_cluster_message_cs(data, FALSE, node, service);
}
#endif
#if SUPPORT_HEARTBEAT
diff --git a/lib/cluster/corosync.c b/lib/cluster/corosync.c
index 83a0c78..5a64fe1 100644
--- a/lib/cluster/corosync.c
+++ b/lib/cluster/corosync.c
@@ -34,69 +34,16 @@
#include <corosync/corodefs.h>
#include <corosync/corotypes.h>
#include <corosync/hdb.h>
-#include <corosync/cpg.h>
#include <corosync/cfg.h>
#include <corosync/cmap.h>
#include <corosync/quorum.h>
#include <crm/msg_xml.h>
-cpg_handle_t pcmk_cpg_handle = 0;
-
-struct cpg_name pcmk_cpg_group = {
- .length = 0,
- .value[0] = 0,
-};
-
quorum_handle_t pcmk_quorum_handle = 0;
gboolean(*quorum_app_callback) (unsigned long long seq, gboolean quorate) = NULL;
-#define cs_repeat(counter, max, code) do { \
- code; \
- if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { \
- counter++; \
- crm_debug("Retrying operation after %ds", counter); \
- sleep(counter); \
- } else { \
- break; \
- } \
- } while(counter < max)
-
-static uint32_t get_local_nodeid(cpg_handle_t handle)
-{
- int rc = CS_OK;
- int retries = 0;
- static uint32_t local_nodeid = 0;
- cpg_handle_t local_handle = handle;
- cpg_callbacks_t cb = { };
-
- if(local_nodeid != 0) {
- return local_nodeid;
- }
-
- if(handle == 0) {
- crm_trace("Creating connection");
- cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb));
- }
-
- if (rc == CS_OK) {
- retries = 0;
- crm_trace("Performing lookup");
- cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid));
- }
-
- if (rc != CS_OK) {
- crm_err("Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc);
- }
- if(handle == 0) {
- crm_trace("Closing connection");
- cpg_finalize(local_handle);
- }
- crm_debug("Local nodeid is %u", local_nodeid);
- return local_nodeid;
-}
-
/*
* CFG functionality stolen from node_name() in corosync-quorumtool.c
* This resolves the first address assigned to a node and returns the name or IP address.
@@ -189,281 +136,12 @@ corosync_node_name(uint64_t /*cmap_handle_t */ cmap_handle, uint32_t nodeid)
return name;
}
-enum crm_ais_msg_types
-text2msg_type(const char *text)
-{
- int type = crm_msg_none;
-
- CRM_CHECK(text != NULL, return type);
- if (safe_str_eq(text, "ais")) {
- type = crm_msg_ais;
- } else if (safe_str_eq(text, "crm_plugin")) {
- type = crm_msg_ais;
- } else if (safe_str_eq(text, CRM_SYSTEM_CIB)) {
- type = crm_msg_cib;
- } else if (safe_str_eq(text, CRM_SYSTEM_CRMD)) {
- type = crm_msg_crmd;
- } else if (safe_str_eq(text, CRM_SYSTEM_DC)) {
- type = crm_msg_crmd;
- } else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) {
- type = crm_msg_te;
- } else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) {
- type = crm_msg_pe;
- } else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) {
- type = crm_msg_lrmd;
- } else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) {
- type = crm_msg_stonithd;
- } else if (safe_str_eq(text, "stonith-ng")) {
- type = crm_msg_stonith_ng;
- } else if (safe_str_eq(text, "attrd")) {
- type = crm_msg_attrd;
-
- } else {
- /* This will normally be a transient client rather than
- * a cluster daemon. Set the type to the pid of the client
- */
- int scan_rc = sscanf(text, "%d", &type);
-
- if (scan_rc != 1) {
- /* Ensure its sane */
- type = crm_msg_none;
- }
- }
- return type;
-}
-
-GListPtr cs_message_queue = NULL;
-int cs_message_timer = 0;
-
-static ssize_t crm_cs_flush(void);
-
-static gboolean
-crm_cs_flush_cb(gpointer data)
-{
- cs_message_timer = 0;
- crm_cs_flush();
- return FALSE;
-}
-
-#define CS_SEND_MAX 200
-static ssize_t
-crm_cs_flush(void)
-{
- int sent = 0;
- ssize_t rc = 0;
- int queue_len = 0;
- static unsigned int last_sent = 0;
-
- if (pcmk_cpg_handle == 0) {
- crm_trace("Connection is dead");
- return pcmk_ok;
- }
-
- queue_len = g_list_length(cs_message_queue);
- if ((queue_len % 1000) == 0 && queue_len > 1) {
- crm_err("CPG queue has grown to %d", queue_len);
-
- } else if (queue_len == CS_SEND_MAX) {
- crm_warn("CPG queue has grown to %d", queue_len);
- }
-
- if (cs_message_timer) {
- /* There is already a timer, wait until it goes off */
- crm_trace("Timer active %d", cs_message_timer);
- return pcmk_ok;
- }
-
- while (cs_message_queue && sent < CS_SEND_MAX) {
- AIS_Message *header = NULL;
- struct iovec *iov = cs_message_queue->data;
-
- errno = 0;
- rc = cpg_mcast_joined(pcmk_cpg_handle, CPG_TYPE_AGREED, iov, 1);
-
- if (rc != CS_OK) {
- break;
- }
-
- sent++;
- header = iov->iov_base;
- last_sent = header->id;
- if (header->compressed_size) {
- crm_trace("CPG message %d (%d compressed bytes) sent",
- header->id, header->compressed_size);
- } else {
- crm_trace("CPG message %d (%d bytes) sent: %.200s",
- header->id, header->size, header->data);
- }
-
- cs_message_queue = g_list_remove(cs_message_queue, iov);
- free(iov[0].iov_base);
- free(iov);
- }
-
- queue_len -= sent;
- if (sent > 1 || cs_message_queue) {
- crm_info("Sent %d CPG messages (%d remaining, last=%u): %s (%d)",
- sent, queue_len, last_sent, ais_error2text(rc), rc);
- } else {
- crm_trace("Sent %d CPG messages (%d remaining, last=%u): %s (%d)",
- sent, queue_len, last_sent, ais_error2text(rc), rc);
- }
-
- if (cs_message_queue) {
- uint32_t delay_ms = 100;
- if(rc != CS_OK) {
- /* Proportionally more if sending failed but cap at 1s */
- delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
- }
- cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, NULL);
- }
-
- return rc;
-}
-
-gboolean
-send_ais_text(int class, const char *data,
- gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
-{
- static int msg_id = 0;
- static int local_pid = 0;
- static int local_name_len = 0;
- static const char *local_name = NULL;
-
- char *target = NULL;
- struct iovec *iov;
- AIS_Message *ais_msg = NULL;
- enum crm_ais_msg_types sender = text2msg_type(crm_system_name);
-
- /* There are only 6 handlers registered to crm_lib_service in plugin.c */
- CRM_CHECK(class < 6, crm_err("Invalid message class: %d", class);
- return FALSE);
-
- CRM_CHECK(dest != crm_msg_ais, return FALSE);
-
- if(local_name == NULL) {
- local_name = get_local_node_name();
- }
- if(local_name_len == 0 && local_name) {
- local_name_len = strlen(local_name);
- }
-
- if (data == NULL) {
- data = "";
- }
-
- if (local_pid == 0) {
- local_pid = getpid();
- }
-
- if (sender == crm_msg_none) {
- sender = local_pid;
- }
-
- ais_msg = calloc(1, sizeof(AIS_Message));
-
- ais_msg->id = msg_id++;
- ais_msg->header.id = class;
- ais_msg->header.error = CS_OK;
-
- ais_msg->host.type = dest;
- ais_msg->host.local = local;
-
- if (node) {
- if (node->uname) {
- target = strdup(node->uname);
- ais_msg->host.size = strlen(node->uname);
- memset(ais_msg->host.uname, 0, MAX_NAME);
- memcpy(ais_msg->host.uname, node->uname, ais_msg->host.size);
- } else {
- target = g_strdup_printf("%u", node->id);
- }
- ais_msg->host.id = node->id;
- } else {
- target = strdup("all");
- }
-
- ais_msg->sender.id = 0;
- ais_msg->sender.type = sender;
- ais_msg->sender.pid = local_pid;
- ais_msg->sender.size = local_name_len;
- memset(ais_msg->sender.uname, 0, MAX_NAME);
- memcpy(ais_msg->sender.uname, local_name, ais_msg->sender.size);
-
- ais_msg->size = 1 + strlen(data);
- ais_msg->header.size = sizeof(AIS_Message) + ais_msg->size;
-
- if (ais_msg->size < CRM_BZ2_THRESHOLD) {
- ais_msg = realloc(ais_msg, ais_msg->header.size);
- memcpy(ais_msg->data, data, ais_msg->size);
-
- } else {
- char *compressed = NULL;
- unsigned int new_size = 0;
- char *uncompressed = strdup(data);
-
- if (crm_compress_string(uncompressed, ais_msg->size, 0, &compressed, &new_size)) {
-
- ais_msg->header.size = sizeof(AIS_Message) + new_size + 1;
- ais_msg = realloc(ais_msg, ais_msg->header.size);
- memcpy(ais_msg->data, compressed, new_size);
- ais_msg->data[new_size] = 0;
-
- ais_msg->is_compressed = TRUE;
- ais_msg->compressed_size = new_size;
-
- } else {
- ais_msg = realloc(ais_msg, ais_msg->header.size);
- memcpy(ais_msg->data, data, ais_msg->size);
- }
-
- free(uncompressed);
- free(compressed);
- }
-
- if (ais_msg->compressed_size) {
- crm_trace("Queueing CPG message %u to %s (%d compressed bytes)",
- ais_msg->id, target, ais_msg->compressed_size);
- } else {
- crm_trace("Queueing CPG message %u to %s (%d bytes)",
- ais_msg->id, target, ais_msg->size);
- }
-
- iov = calloc(1, sizeof(struct iovec));
- iov->iov_base = ais_msg;
- iov->iov_len = ais_msg->header.size;
- cs_message_queue = g_list_append(cs_message_queue, iov);
- crm_cs_flush();
-
- free(target);
- return TRUE;
-}
-
-gboolean
-send_ais_message(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
-{
- gboolean rc = TRUE;
- char *data = dump_xml_unformatted(msg);
-
- rc = send_ais_text(crm_class_cluster, data, local, node, dest);
- free(data);
- return rc;
-}
-
void
-terminate_cs_connection(void)
+terminate_cs_connection(crm_cluster_t *cluster)
{
crm_notice("Disconnecting from Corosync");
- if (pcmk_cpg_handle) {
- crm_trace("Disconnecting CPG");
- cpg_leave(pcmk_cpg_handle, &pcmk_cpg_group);
- cpg_finalize(pcmk_cpg_handle);
- pcmk_cpg_handle = 0;
-
- } else {
- crm_info("No CPG connection");
- }
+ cluster_disconnect_cpg(cluster);
if (pcmk_quorum_handle) {
crm_trace("Disconnecting quorum");
@@ -478,284 +156,6 @@ terminate_cs_connection(void)
int ais_membership_timer = 0;
gboolean ais_membership_force = FALSE;
-static gboolean
-ais_dispatch_message(AIS_Message * msg,
- gboolean(*dispatch) (int kind, const char *from, const char *data))
-{
- char *data = NULL;
- char *uncompressed = NULL;
-
- xmlNode *xml = NULL;
-
- CRM_ASSERT(msg != NULL);
-
- crm_trace("Got new%s message (size=%d, %d, %d)",
- msg->is_compressed ? " compressed" : "",
- ais_data_len(msg), msg->size, msg->compressed_size);
-
- data = msg->data;
- if (msg->is_compressed && msg->size > 0) {
- int rc = BZ_OK;
- unsigned int new_size = msg->size + 1;
-
- if (check_message_sanity(msg, NULL) == FALSE) {
- goto badmsg;
- }
-
- crm_trace("Decompressing message data");
- uncompressed = calloc(1, new_size);
- rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, data, msg->compressed_size, 1, 0);
-
- if (rc != BZ_OK) {
- crm_err("Decompression failed: %d", rc);
- goto badmsg;
- }
-
- CRM_ASSERT(rc == BZ_OK);
- CRM_ASSERT(new_size == msg->size);
-
- data = uncompressed;
-
- } else if (check_message_sanity(msg, data) == FALSE) {
- goto badmsg;
-
- } else if (safe_str_eq("identify", data)) {
- int pid = getpid();
- char *pid_s = crm_itoa(pid);
-
- send_ais_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
- free(pid_s);
- goto done;
- }
-
- if (msg->header.id != crm_class_members) {
- /* Is this even needed anymore? */
- crm_get_peer(msg->sender.id, msg->sender.uname);
- }
-
- if (msg->header.id == crm_class_rmpeer) {
- uint32_t id = crm_int_helper(data, NULL);
-
- crm_info("Removing peer %s/%u", data, id);
- reap_crm_member(id, NULL);
- goto done;
- }
-
- crm_trace("Payload: %.200s", data);
- if (dispatch != NULL) {
- dispatch(msg->header.id, msg->sender.uname, data);
- }
-
- done:
- free(uncompressed);
- free_xml(xml);
- return TRUE;
-
- badmsg:
- crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
- " min=%d, total=%d, size=%d, bz2_size=%d",
- msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
- ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
- msg->sender.pid, (int)sizeof(AIS_Message),
- msg->header.size, msg->size, msg->compressed_size);
- goto done;
-}
-
-static bool cpg_evicted = FALSE;
-gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL;
-
-static int
-pcmk_cpg_dispatch(gpointer user_data)
-{
- int rc = 0;
-
- pcmk_cpg_dispatch_fn = user_data;
- rc = cpg_dispatch(pcmk_cpg_handle, CS_DISPATCH_ALL);
- if (rc != CS_OK) {
- crm_err("Connection to the CPG API failed: %d", rc);
- pcmk_cpg_handle = 0;
- return -1;
-
- } else if(cpg_evicted) {
- crm_err("Evicted from CPG membership");
- return -1;
- }
- return 0;
-}
-
-static void
-pcmk_cpg_deliver(cpg_handle_t handle,
- const struct cpg_name *groupName,
- uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
-{
- AIS_Message *ais_msg = (AIS_Message *) msg;
- uint32_t local_nodeid = get_local_nodeid(handle);
- const char *local_name = get_local_node_name();
-
- if (ais_msg->sender.id > 0 && ais_msg->sender.id != nodeid) {
- crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, ais_msg->sender.id);
- return;
-
- } else if (ais_msg->host.id != 0 && (local_nodeid != ais_msg->host.id)) {
- /* Not for us */
- crm_trace("Not for us: %u != %u", ais_msg->host.id, local_nodeid);
- return;
- } else if (ais_msg->host.size != 0 && safe_str_neq(ais_msg->host.uname, local_name)) {
- /* Not for us */
- crm_trace("Not for us: %s != %s", ais_msg->host.uname, local_name);
- return;
- }
-
- ais_msg->sender.id = nodeid;
- if (ais_msg->sender.size == 0) {
- crm_node_t *peer = crm_get_peer(nodeid, NULL);
-
- if (peer == NULL) {
- crm_err("Peer with nodeid=%u is unknown", nodeid);
-
- } else if (peer->uname == NULL) {
- crm_err("No uname for peer with nodeid=%u", nodeid);
-
- } else {
- crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
- ais_msg->sender.size = strlen(peer->uname);
- memset(ais_msg->sender.uname, 0, MAX_NAME);
- memcpy(ais_msg->sender.uname, peer->uname, ais_msg->sender.size);
- }
- }
-
- ais_dispatch_message(ais_msg, pcmk_cpg_dispatch_fn);
-}
-
-static void
-pcmk_cpg_membership(cpg_handle_t handle,
- const struct cpg_name *groupName,
- const struct cpg_address *member_list, size_t member_list_entries,
- const struct cpg_address *left_list, size_t left_list_entries,
- const struct cpg_address *joined_list, size_t joined_list_entries)
-{
- int i;
- gboolean found = FALSE;
- static int counter = 0;
- uint32_t local_nodeid = get_local_nodeid(handle);
-
- for (i = 0; i < left_list_entries; i++) {
- crm_node_t *peer = crm_get_peer(left_list[i].nodeid, NULL);
-
- crm_info("Left[%d.%d] %s.%u ", counter, i, groupName->value, left_list[i].nodeid);
- crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, OFFLINESTATUS);
- }
-
- for (i = 0; i < joined_list_entries; i++) {
- crm_info("Joined[%d.%d] %s.%u ", counter, i, groupName->value, joined_list[i].nodeid);
- }
-
- for (i = 0; i < member_list_entries; i++) {
- crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
-
- crm_info("Member[%d.%d] %s.%u ", counter, i, groupName->value, member_list[i].nodeid);
-
- /* Anyone that is sending us CPG messages must also be a _CPG_ member.
- * But its _not_ safe to assume its in the quorum membership.
- * We may have just found out its dead and are processing the last couple of messages it sent
- */
- crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
- if(peer && peer->state && crm_is_peer_active(peer) == FALSE) {
- time_t now = time(NULL);
-
- /* Co-opt the otherwise unused votes field */
- if(peer->votes == 0) {
- peer->votes = now;
-
- } else if(now > (60 + peer->votes)) {
- /* On the otherhand, if we're still getting messages, at a certain point
- * we need to acknowledge our internal cache is probably wrong
- *
- * Set the threshold to 1 minute
- */
- crm_err("Node %s[%u] appears to be online even though we think it is dead", peer->uname, peer->id);
- crm_update_peer_state(__FUNCTION__, peer, CRM_NODE_MEMBER, 0);
- peer->votes = 0;
- }
- }
-
- if (local_nodeid == member_list[i].nodeid) {
- found = TRUE;
- }
- }
-
- if (!found) {
- crm_err("We're not part of CPG group '%s' anymore!", groupName->value);
- cpg_evicted = TRUE;
- }
-
- counter++;
-}
-
-cpg_callbacks_t cpg_callbacks = {
- .cpg_deliver_fn = pcmk_cpg_deliver,
- .cpg_confchg_fn = pcmk_cpg_membership,
-};
-
-static gboolean
-init_cpg_connection(gboolean(*dispatch) (int kind, const char *from, const char *data),
- void (*destroy) (gpointer), uint32_t * nodeid)
-{
- int rc = -1;
- int fd = 0;
- int retries = 0;
- uint32_t id = 0;
- crm_node_t *peer = NULL;
-
- struct mainloop_fd_callbacks cpg_fd_callbacks = {
- .dispatch = pcmk_cpg_dispatch,
- .destroy = destroy,
- };
-
- cpg_evicted = FALSE;
- strncpy(pcmk_cpg_group.value, crm_system_name, 128);
- pcmk_cpg_group.length = strlen(crm_system_name) + 1;
-
- cs_repeat(retries, 30, rc = cpg_initialize(&pcmk_cpg_handle, &cpg_callbacks));
- if (rc != CS_OK) {
- crm_err("Could not connect to the Cluster Process Group API: %d\n", rc);
- goto bail;
- }
-
- id = get_local_nodeid(pcmk_cpg_handle);
- if (id == 0) {
- crm_err("Could not get local node id from the CPG API");
- goto bail;
-
- } else if(nodeid) {
- *nodeid = id;
- }
-
- retries = 0;
- cs_repeat(retries, 30, rc = cpg_join(pcmk_cpg_handle, &pcmk_cpg_group));
- if (rc != CS_OK) {
- crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc);
- goto bail;
- }
-
- rc = cpg_fd_get(pcmk_cpg_handle, &fd);
- if (rc != CS_OK) {
- crm_err("Could not obtain the CPG API connection: %d\n", rc);
- goto bail;
- }
-
- mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, dispatch, &cpg_fd_callbacks);
-
- bail:
- if (rc != CS_OK) {
- cpg_finalize(pcmk_cpg_handle);
- return FALSE;
- }
-
- peer = crm_get_peer(id, NULL);
- crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
- return TRUE;
-}
static int
pcmk_quorum_dispatch(gpointer user_data)
@@ -940,7 +340,7 @@ init_cs_connection_once(crm_cluster_t * cluster)
return FALSE;
}
- if (init_cpg_connection(cluster->cs_dispatch, cluster->destroy, NULL) == FALSE) {
+ if (cluster_connect_cpg(cluster) == FALSE) {
return FALSE;
}
crm_info("Connection to '%s': established", name_for_cluster_type(stack));
diff --git a/lib/cluster/cpg.c b/lib/cluster/cpg.c
new file mode 100644
index 0000000..903576e
--- /dev/null
+++ b/lib/cluster/cpg.c
@@ -0,0 +1,689 @@
+/*
+ * Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <crm_internal.h>
+#include <bzlib.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+
+#include <crm/common/ipc.h>
+#include <crm/cluster/internal.h>
+#include <crm/common/mainloop.h>
+#include <sys/utsname.h>
+
+#include <qb/qbipcc.h>
+#include <qb/qbutil.h>
+
+#include <corosync/corodefs.h>
+#include <corosync/corotypes.h>
+#include <corosync/hdb.h>
+#include <corosync/cpg.h>
+
+#include <crm/msg_xml.h>
+
+cpg_handle_t pcmk_cpg_handle = 0; /* TODO: Remove, use cluster.cpg_handle */
+
+static bool cpg_evicted = FALSE;
+gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL;
+
+/*
+ * Run 'code' and retry it while the result is a transient corosync error.
+ *
+ * 'code' is expected to assign its result to a variable named 'rc' that
+ * is visible at the point of use. While 'rc' is CS_ERR_TRY_AGAIN or
+ * CS_ERR_QUEUE_FULL, 'counter' is incremented and the caller sleeps for
+ * 'counter' seconds before trying again, so the delay grows with each
+ * attempt. The loop ends on any other result, or once 'counter' reaches
+ * 'max'.
+ */
+#define cs_repeat(counter, max, code) do { \
+ code; \
+ if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { \
+ counter++; \
+ crm_debug("Retrying operation after %ds", counter); \
+ sleep(counter); \
+ } else { \
+ break; \
+ } \
+ } while(counter < max)
+
+/*
+ * Leave the CPG group and close the CPG connection held by 'cluster'.
+ *
+ * The file-level pcmk_cpg_handle is always cleared. If 'cluster' has no
+ * open CPG handle, only an informational message is logged.
+ */
+void
+cluster_disconnect_cpg(crm_cluster_t *cluster)
+{
+    pcmk_cpg_handle = 0;
+
+    if (cluster->cpg_handle == 0) {
+        crm_info("No CPG connection");
+        return;
+    }
+
+    crm_trace("Disconnecting CPG");
+    cpg_leave(cluster->cpg_handle, &cluster->group);
+    cpg_finalize(cluster->cpg_handle);
+    cluster->cpg_handle = 0;
+}
+
+/*
+ * Return the local node's CPG node id.
+ *
+ * The id is looked up once and cached in a function-level static; later
+ * calls return the cached value without touching 'handle'.
+ *
+ * handle: an open CPG handle to query, or 0 to have a temporary
+ *         connection created for the lookup and closed again afterwards.
+ *
+ * Returns the node id, or 0 if it has not been obtained (an error is
+ * logged when the CPG calls fail).
+ */
+uint32_t get_local_nodeid(cpg_handle_t handle)
+{
+int rc = CS_OK;
+int retries = 0;
+static uint32_t local_nodeid = 0;
+cpg_handle_t local_handle = handle;
+cpg_callbacks_t cb = { };
+
+/* Fast path: the id was already resolved by an earlier call */
+if(local_nodeid != 0) {
+return local_nodeid;
+}
+
+#if 0
+/* Should not be necessary */
+if(get_cluster_type() == pcmk_cluster_classic_ais) {
+get_ais_details(&local_nodeid, NULL);
+goto done;
+}
+#endif
+
+/* No handle supplied: open a throw-away connection with empty callbacks */
+if(handle == 0) {
+crm_trace("Creating connection");
+cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb));
+}
+
+/* Only attempt the lookup if we have a usable connection */
+if (rc == CS_OK) {
+retries = 0;
+crm_trace("Performing lookup");
+cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid));
+}
+
+if (rc != CS_OK) {
+crm_err("Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc);
+}
+/* Close the connection only if it was created here, never the caller's */
+if(handle == 0) {
+crm_trace("Closing connection");
+cpg_finalize(local_handle);
+}
+crm_debug("Local nodeid is %u", local_nodeid);
+return local_nodeid;
+}
+
+
+GListPtr cs_message_queue = NULL;
+int cs_message_timer = 0;
+
+static ssize_t crm_cs_flush(gpointer data);
+
+/*
+ * Timer callback scheduled by crm_cs_flush() to retry sending queued
+ * CPG messages.
+ *
+ * data: passed through unchanged to crm_cs_flush().
+ *
+ * Always returns FALSE.
+ */
+static gboolean
+crm_cs_flush_cb(gpointer data)
+{
+/* Clear the timer id first: crm_cs_flush() does nothing while it is set */
+cs_message_timer = 0;
+crm_cs_flush(data);
+return FALSE;
+}
+
+#define CS_SEND_MAX 200
+/*
+ * Send as many queued CPG messages as possible.
+ *
+ * data: pointer to the cpg_handle_t to send on. It is dereferenced on
+ *       every call, so a handle that has since been reset to 0 is
+ *       treated as a dead connection.
+ *
+ * At most CS_SEND_MAX messages are sent per call, and sending stops at
+ * the first failure. If messages remain queued afterwards, a timer is
+ * armed to call crm_cs_flush_cb() later; the delay grows with the queue
+ * length when the last send failed, capped at one second.
+ *
+ * Returns pcmk_ok when the connection is dead or a retry timer is
+ * already pending; otherwise the result of the last cpg_mcast_joined()
+ * call (0 if the queue was empty).
+ */
+static ssize_t
+crm_cs_flush(gpointer data)
+{
+    int sent = 0;
+    ssize_t rc = 0;
+    int queue_len = 0;
+    static unsigned int last_sent = 0;
+    cpg_handle_t *handle = (cpg_handle_t *)data;
+
+    if (*handle == 0) {
+        crm_trace("Connection is dead");
+        return pcmk_ok;
+    }
+
+    queue_len = g_list_length(cs_message_queue);
+    if ((queue_len % 1000) == 0 && queue_len > 1) {
+        crm_err("CPG queue has grown to %d", queue_len);
+
+    } else if (queue_len == CS_SEND_MAX) {
+        crm_warn("CPG queue has grown to %d", queue_len);
+    }
+
+    if (cs_message_timer) {
+        /* There is already a timer, wait until it goes off */
+        crm_trace("Timer active %d", cs_message_timer);
+        return pcmk_ok;
+    }
+
+    while (cs_message_queue && sent < CS_SEND_MAX) {
+        struct iovec *iov = cs_message_queue->data;
+
+        errno = 0;
+        rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
+
+        if (rc != CS_OK) {
+            /* Leave the message at the head of the queue for the retry */
+            break;
+        }
+
+        sent++;
+        last_sent++;
+        /* iov_len is a size_t: cast explicitly rather than print it as %d */
+        crm_trace("CPG message sent, size=%llu", (unsigned long long) iov->iov_len);
+
+        /* The queue owns both the iovec and the message buffer it points to */
+        cs_message_queue = g_list_remove(cs_message_queue, iov);
+        free(iov[0].iov_base);
+        free(iov);
+    }
+
+    queue_len -= sent;
+    if (sent > 1 || cs_message_queue) {
+        crm_info("Sent %d CPG messages (%d remaining, last=%u): %s (%d)",
+                 sent, queue_len, last_sent, ais_error2text(rc), rc);
+    } else {
+        crm_trace("Sent %d CPG messages (%d remaining, last=%u): %s (%d)",
+                  sent, queue_len, last_sent, ais_error2text(rc), rc);
+    }
+
+    if (cs_message_queue) {
+        uint32_t delay_ms = 100;
+        if(rc != CS_OK) {
+            /* Proportionally more if sending failed but cap at 1s */
+            delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
+        }
+        cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
+    }
+
+    return rc;
+}
+
+/*
+ * Queue a fully built message for delivery over CPG and try to flush
+ * the queue straight away.
+ *
+ * iov: heap-allocated iovec whose iov_base is a heap-allocated message.
+ *      Both are freed by crm_cs_flush() once the message has been sent,
+ *      so the caller must not free or reuse them.
+ *
+ * Always returns TRUE; the message may still be waiting in the queue.
+ */
+gboolean
+send_cpg_iov(struct iovec * iov)
+{
+    static unsigned int queued = 0;
+
+    queued++;
+    /* iov_len is a size_t: cast explicitly rather than print it as %d */
+    crm_trace("Queueing CPG message %u (%llu bytes)", queued, (unsigned long long) iov->iov_len);
+    cs_message_queue = g_list_append(cs_message_queue, iov);
+    crm_cs_flush(&pcmk_cpg_handle);
+    return TRUE;
+}
+
+/*
+ * Mainloop dispatch callback for the CPG file descriptor.
+ *
+ * user_data: the crm_cluster_t whose CPG handle should be serviced.
+ *
+ * Returns 0 on success, or -1 when the CPG dispatch call failed (the
+ * cluster's handle is then cleared) or when this process has been
+ * evicted from the CPG membership.
+ */
+static int
+pcmk_cpg_dispatch(gpointer user_data)
+{
+    crm_cluster_t *cluster = (crm_cluster_t *) user_data;
+    int rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ALL);
+
+    if (rc != CS_OK) {
+        crm_err("Connection to the CPG API failed: %s (%d)", ais_error2text(rc), rc);
+        cluster->cpg_handle = 0;
+        return -1;
+    }
+
+    if (cpg_evicted) {
+        crm_err("Evicted from CPG membership");
+        return -1;
+    }
+
+    return 0;
+}
+
+/*
+static void
+pcmk_cpg_deliver_message(cpg_handle_t handle,
+ const struct cpg_name *groupName,
+ uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
+{
+ uint32_t kind = 0;
+ const char *from = NULL;
+ char *data = pcmk_message_common_cs(handle, nodeid, pid, msg, &kind, &from);
+
+ free(data);
+}
+*/
+
+/*
+ * Validate a received cluster message and extract its text payload.
+ *
+ * handle:  CPG handle the message arrived on. When non-zero the message
+ *          is filtered (sender check, destination check) and the sender
+ *          fields are filled in; when 0 those steps are skipped.
+ * nodeid:  sending node as reported by the transport.
+ * pid:     sending process as reported by the transport (logging only).
+ * content: the raw AIS_Message; its sender fields may be modified.
+ * kind:    if non-NULL, receives the message class (msg->header.id).
+ * from:    if non-NULL, receives a pointer to the sender's name. This
+ *          points into 'content' and is not a copy.
+ *
+ * Returns a newly allocated string holding the (decompressed) payload,
+ * which the caller must free(). Returns NULL when the message is not for
+ * this node, was handled internally ("identify" requests and peer
+ * removals), or is invalid.
+ */
+char *
+pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
+                       uint32_t *kind, const char **from)
+{
+    char *data = NULL;
+    AIS_Message *msg = (AIS_Message *) content;
+
+    if(handle) {
+        /* 'msg' came from CPG not the plugin
+         * Do filtering and field massaging
+         */
+        uint32_t local_nodeid = get_local_nodeid(handle);
+        const char *local_name = get_local_node_name();
+
+        if (msg->sender.id > 0 && msg->sender.id != nodeid) {
+            crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
+            return NULL;
+
+        } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
+            /* Not for us */
+            crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
+            return NULL;
+        } else if (msg->host.size != 0 && safe_str_neq(msg->host.uname, local_name)) {
+            /* Not for us */
+            crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
+            return NULL;
+        }
+
+        /* Trust the transport's idea of the sender over the message's own */
+        msg->sender.id = nodeid;
+        if (msg->sender.size == 0) {
+            crm_node_t *peer = crm_get_peer(nodeid, NULL);
+
+            if (peer == NULL) {
+                crm_err("Peer with nodeid=%u is unknown", nodeid);
+
+            } else if (peer->uname == NULL) {
+                crm_err("No uname for peer with nodeid=%u", nodeid);
+
+            } else {
+                crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
+                msg->sender.size = strlen(peer->uname);
+                memset(msg->sender.uname, 0, MAX_NAME);
+                memcpy(msg->sender.uname, peer->uname, msg->sender.size);
+            }
+        }
+    }
+
+    crm_trace("Got new%s message (size=%d, %d, %d)",
+              msg->is_compressed ? " compressed" : "",
+              ais_data_len(msg), msg->size, msg->compressed_size);
+
+    if (kind != NULL) {
+        *kind = msg->header.id;
+    }
+    if (from != NULL) {
+        *from = msg->sender.uname;
+    }
+
+    if (msg->is_compressed && msg->size > 0) {
+        int rc = BZ_OK;
+        char *uncompressed = NULL;
+        unsigned int new_size = msg->size + 1;
+
+        if (check_message_sanity(msg, NULL) == FALSE) {
+            goto badmsg;
+        }
+
+        crm_trace("Decompressing message data");
+        uncompressed = calloc(1, new_size);
+        rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
+
+        if (rc != BZ_OK) {
+            crm_err("Decompression failed: %d", rc);
+            /* 'data' does not own this buffer yet, so release it here */
+            free(uncompressed);
+            goto badmsg;
+        }
+
+        CRM_ASSERT(rc == BZ_OK);
+        CRM_ASSERT(new_size == msg->size);
+
+        data = uncompressed;
+
+    /* The uncompressed payload lives in msg->data; 'data' is still NULL
+     * at this point, so the checks below must look at msg->data directly.
+     */
+    } else if (check_message_sanity(msg, msg->data) == FALSE) {
+        goto badmsg;
+
+    } else if (safe_str_eq("identify", msg->data)) {
+        int pid = getpid();
+        char *pid_s = crm_itoa(pid);
+
+        send_cluster_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
+        free(pid_s);
+        return NULL;
+
+    } else {
+        data = strdup(msg->data);
+    }
+
+    if (msg->header.id != crm_class_members) {
+        /* Is this even needed anymore? */
+        crm_get_peer(msg->sender.id, msg->sender.uname);
+    }
+
+    if (msg->header.id == crm_class_rmpeer) {
+        uint32_t id = crm_int_helper(data, NULL);
+
+        crm_info("Removing peer %s/%u", data, id);
+        reap_crm_member(id, NULL);
+        free(data);
+        return NULL;
+
+#if SUPPORT_PLUGIN
+    } else if (is_classic_ais_cluster()) {
+        plugin_handle_membership(msg);
+#endif
+    }
+
+    crm_trace("Payload: %.200s", data);
+    return data;
+
+  badmsg:
+    crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
+            " min=%d, total=%d, size=%d, bz2_size=%d",
+            msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
+            ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
+            msg->sender.pid, (int)sizeof(AIS_Message),
+            msg->header.size, msg->size, msg->compressed_size);
+
+    free(data);
+    return NULL;
+}
+
+/*
+ * CPG configuration-change callback: update the peer cache from the new
+ * group membership.
+ *
+ * Peers in 'left_list' have their CPG process flag marked offline, peers
+ * in 'member_list' have it marked online, and 'joined_list' is only
+ * logged. If the local node is absent from 'member_list', the file-level
+ * cpg_evicted flag is set.
+ *
+ * The parameters follow the cpg_confchg_fn callback signature; 'counter'
+ * numbers the membership events in the log output.
+ */
+void
+pcmk_cpg_membership(cpg_handle_t handle,
+const struct cpg_name *groupName,
+const struct cpg_address *member_list, size_t member_list_entries,
+const struct cpg_address *left_list, size_t left_list_entries,
+const struct cpg_address *joined_list, size_t joined_list_entries)
+{
+int i;
+gboolean found = FALSE;
+static int counter = 0;
+uint32_t local_nodeid = get_local_nodeid(handle);
+
+for (i = 0; i < left_list_entries; i++) {
+crm_node_t *peer = crm_get_peer(left_list[i].nodeid, NULL);
+
+crm_info("Left[%d.%d] %s.%u ", counter, i, groupName->value, left_list[i].nodeid);
+crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, OFFLINESTATUS);
+}
+
+for (i = 0; i < joined_list_entries; i++) {
+crm_info("Joined[%d.%d] %s.%u ", counter, i, groupName->value, joined_list[i].nodeid);
+}
+
+for (i = 0; i < member_list_entries; i++) {
+crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
+
+crm_info("Member[%d.%d] %s.%u ", counter, i, groupName->value, member_list[i].nodeid);
+
+/* Anyone that is sending us CPG messages must also be a _CPG_ member.
+ * But it is _not_ safe to assume it is in the quorum membership.
+ * We may have just found out it is dead and are processing the last couple of messages it sent
+ */
+crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
+if(peer && peer->state && crm_is_peer_active(peer) == FALSE) {
+time_t now = time(NULL);
+
+/* Co-opt the otherwise unused votes field to remember when we first
+ * saw this inactive peer in the CPG membership
+ */
+if(peer->votes == 0) {
+peer->votes = now;
+
+} else if(now > (60 + peer->votes)) {
+/* On the other hand, if we're still getting messages, at a certain point
+ * we need to acknowledge our internal cache is probably wrong
+ *
+ * Set the threshold to 1 minute
+ */
+crm_err("Node %s[%u] appears to be online even though we think it is dead", peer->uname, peer->id);
+crm_update_peer_state(__FUNCTION__, peer, CRM_NODE_MEMBER, 0);
+peer->votes = 0;
+}
+}
+
+if (local_nodeid == member_list[i].nodeid) {
+found = TRUE;
+}
+}
+
+/* The local node is no longer listed: flag it so the dispatcher can fail */
+if (!found) {
+crm_err("We're not part of CPG group '%s' anymore!", groupName->value);
+cpg_evicted = TRUE;
+}
+
+counter++;
+}
+
+/*
+ * Connect to the corosync CPG service and join the group named after
+ * crm_system_name.
+ *
+ * cluster: supplies the deliver/confchg callbacks (cluster->cpg) and the
+ *          destroy callback, and receives the group name, the local node
+ *          id and the CPG handle on success.
+ *
+ * On success the CPG file descriptor is registered with the mainloop,
+ * the local peer's CPG process flag is marked online, and TRUE is
+ * returned. On any failure, including being unable to determine the
+ * local node id, the handle is finalized and FALSE is returned.
+ */
+gboolean
+cluster_connect_cpg(crm_cluster_t *cluster)
+{
+    int rc = -1;
+    int fd = 0;
+    int retries = 0;
+    uint32_t id = 0;
+    gboolean connected = FALSE;
+    crm_node_t *peer = NULL;
+    cpg_handle_t handle = 0;
+
+    struct mainloop_fd_callbacks cpg_fd_callbacks = {
+        .dispatch = pcmk_cpg_dispatch,
+        .destroy = cluster->destroy,
+    };
+
+    cpg_callbacks_t cpg_callbacks = {
+        .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
+        .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
+        /* .cpg_deliver_fn = pcmk_cpg_deliver, */
+        /* .cpg_confchg_fn = pcmk_cpg_membership, */
+    };
+
+    cpg_evicted = FALSE;
+
+    /* strncpy() does not terminate on truncation: copy one byte less than
+     * the buffer holds, terminate explicitly, and derive the length from
+     * what was actually stored.
+     */
+    strncpy(cluster->group.value, crm_system_name, sizeof(cluster->group.value) - 1);
+    cluster->group.value[sizeof(cluster->group.value) - 1] = 0;
+    cluster->group.length = strlen(cluster->group.value) + 1;
+
+    cs_repeat(retries, 30, rc = cpg_initialize(&handle, &cpg_callbacks));
+    if (rc != CS_OK) {
+        crm_err("Could not connect to the Cluster Process Group API: %d\n", rc);
+        goto bail;
+    }
+
+    id = get_local_nodeid(handle);
+    if (id == 0) {
+        /* rc is still CS_OK here, so success is tracked by 'connected' */
+        crm_err("Could not get local node id from the CPG API");
+        goto bail;
+
+    }
+    cluster->nodeid = id;
+
+    retries = 0;
+    cs_repeat(retries, 30, rc = cpg_join(handle, &cluster->group));
+    if (rc != CS_OK) {
+        crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc);
+        goto bail;
+    }
+
+    rc = cpg_fd_get(handle, &fd);
+    if (rc != CS_OK) {
+        crm_err("Could not obtain the CPG API connection: %d\n", rc);
+        goto bail;
+    }
+
+    pcmk_cpg_handle = handle;
+    cluster->cpg_handle = handle;
+    mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
+    connected = TRUE;
+
+  bail:
+    if (connected == FALSE) {
+        cpg_finalize(handle);
+        return FALSE;
+    }
+
+    peer = crm_get_peer(id, NULL);
+    crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
+    return TRUE;
+}
+
+/*
+ * Serialize an XML message to text and send it as a crm_class_cluster
+ * message via send_cluster_text().
+ *
+ * msg:   XML to send.
+ * local: passed through to send_cluster_text().
+ * node:  destination peer, passed through to send_cluster_text().
+ * dest:  destination subsystem, passed through to send_cluster_text().
+ *
+ * Returns whatever send_cluster_text() returns.
+ */
+gboolean
+send_cluster_message_cs(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
+{
+    char *xml_text = dump_xml_unformatted(msg);
+    gboolean sent = send_cluster_text(crm_class_cluster, xml_text, local, node, dest);
+
+    /* The serialized copy is ours; send_cluster_text() builds its own buffer */
+    free(xml_text);
+    return sent;
+}
+
+/*
+ * Build an AIS_Message around a text payload and hand it to the cluster
+ * transport.
+ *
+ * class: message class; must be below 6.
+ * data:  NUL-terminated payload, or NULL for an empty payload. Payloads
+ *        of CRM_BZ2_THRESHOLD bytes or more are compressed when possible.
+ * local: stored in the message's host.local field.
+ * node:  destination peer, or NULL to address all nodes.
+ * dest:  destination subsystem. crm_msg_ais is rejected unless plugin
+ *        support is compiled in.
+ *
+ * The message is sent through send_plugin_text() on a classic AIS
+ * (plugin) cluster and queued with send_cpg_iov() otherwise.
+ *
+ * Returns FALSE if an argument check fails, the result of
+ * send_plugin_text() on the plugin path, and TRUE otherwise.
+ */
+gboolean
+send_cluster_text(int class, const char *data,
+                  gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
+{
+    static int msg_id = 0;
+    static int local_pid = 0;
+    static int local_name_len = 0;
+    static const char *local_name = NULL;
+
+    char *target = NULL;
+    struct iovec *iov;
+    AIS_Message *msg = NULL;
+    enum crm_ais_msg_types sender = text2msg_type(crm_system_name);
+
+    /* There are only 6 handlers registered to crm_lib_service in plugin.c */
+    CRM_CHECK(class < 6, crm_err("Invalid message class: %d", class);
+              return FALSE);
+
+#if !SUPPORT_PLUGIN
+    CRM_CHECK(dest != crm_msg_ais, return FALSE);
+#endif
+
+    if(local_name == NULL) {
+        local_name = get_local_node_name();
+    }
+    if(local_name_len == 0 && local_name) {
+        local_name_len = strlen(local_name);
+    }
+
+    if (data == NULL) {
+        data = "";
+    }
+
+    if (local_pid == 0) {
+        local_pid = getpid();
+    }
+
+    if (sender == crm_msg_none) {
+        /* Not a known daemon: identify the sender by its pid instead */
+        sender = local_pid;
+    }
+
+    msg = calloc(1, sizeof(AIS_Message));
+
+    msg_id++;
+    msg->id = msg_id;
+    msg->header.id = class;
+    msg->header.error = CS_OK;
+
+    msg->host.type = dest;
+    msg->host.local = local;
+
+    if (node) {
+        if (node->uname) {
+            target = strdup(node->uname);
+            msg->host.size = strlen(node->uname);
+            memset(msg->host.uname, 0, MAX_NAME);
+            memcpy(msg->host.uname, node->uname, msg->host.size);
+        } else {
+            target = g_strdup_printf("%u", node->id);
+        }
+        msg->host.id = node->id;
+    } else {
+        target = strdup("all");
+    }
+
+    msg->sender.id = 0;
+    msg->sender.type = sender;
+    msg->sender.pid = local_pid;
+    msg->sender.size = local_name_len;
+    memset(msg->sender.uname, 0, MAX_NAME);
+    if (local_name != NULL) {
+        /* memcpy() from a NULL source is undefined even for zero bytes */
+        memcpy(msg->sender.uname, local_name, msg->sender.size);
+    }
+
+    msg->size = 1 + strlen(data);
+    msg->header.size = sizeof(AIS_Message) + msg->size;
+
+    if (msg->size < CRM_BZ2_THRESHOLD) {
+        msg = realloc(msg, msg->header.size);
+        memcpy(msg->data, data, msg->size);
+
+    } else {
+        char *compressed = NULL;
+        unsigned int new_size = 0;
+        char *uncompressed = strdup(data);
+
+        if (crm_compress_string(uncompressed, msg->size, 0, &compressed, &new_size)) {
+
+            msg->header.size = sizeof(AIS_Message) + new_size + 1;
+            msg = realloc(msg, msg->header.size);
+            memcpy(msg->data, compressed, new_size);
+            msg->data[new_size] = 0;
+
+            msg->is_compressed = TRUE;
+            msg->compressed_size = new_size;
+
+        } else {
+            /* Compression failed: fall back to sending the plain text */
+            msg = realloc(msg, msg->header.size);
+            memcpy(msg->data, data, msg->size);
+        }
+
+        free(uncompressed);
+        free(compressed);
+    }
+
+    iov = calloc(1, sizeof(struct iovec));
+    iov->iov_base = msg;
+    iov->iov_len = msg->header.size;
+
+    /* iov_len is a size_t: cast explicitly rather than print it as %d */
+    if (msg->compressed_size) {
+        crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
+                  msg->id, target, (unsigned long long) iov->iov_len, msg->compressed_size, data);
+    } else {
+        crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
+                  msg->id, target, (unsigned long long) iov->iov_len, msg->size, data);
+    }
+
+    /* 'target' is only needed for logging; release it before either send
+     * path so the early return below cannot leak it.
+     */
+    free(target);
+
+#if SUPPORT_PLUGIN
+    /* The plugin is the only time we dont use CPG messaging */
+    if(get_cluster_type() == pcmk_cluster_classic_ais) {
+        return send_plugin_text(class, iov);
+    }
+#endif
+
+    send_cpg_iov(iov);
+
+    return TRUE;
+}
+
+/*
+ * Map a subsystem name to its crm_ais_msg_types value.
+ *
+ * text: a daemon name (for example "ais", "stonith-ng", "attrd" or one
+ *       of the CRM_SYSTEM_* names), or the decimal pid of a transient
+ *       client.
+ *
+ * Returns the matching message type for a known name. For any other
+ * text the leading decimal number is returned as the type, provided it
+ * is greater than crm_msg_stonith_ng. Returns crm_msg_none when 'text'
+ * is NULL, is not numeric, or parses to a value at or below
+ * crm_msg_stonith_ng.
+ */
+enum crm_ais_msg_types
+text2msg_type(const char *text)
+{
+    int type = crm_msg_none;
+
+    CRM_CHECK(text != NULL, return type);
+    if (safe_str_eq(text, "ais")) {
+        type = crm_msg_ais;
+    } else if (safe_str_eq(text, "crm_plugin")) {
+        type = crm_msg_ais;
+    } else if (safe_str_eq(text, CRM_SYSTEM_CIB)) {
+        type = crm_msg_cib;
+    } else if (safe_str_eq(text, CRM_SYSTEM_CRMD)) {
+        type = crm_msg_crmd;
+    } else if (safe_str_eq(text, CRM_SYSTEM_DC)) {
+        type = crm_msg_crmd;
+    } else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) {
+        type = crm_msg_te;
+    } else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) {
+        type = crm_msg_pe;
+    } else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) {
+        type = crm_msg_lrmd;
+    } else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) {
+        type = crm_msg_stonithd;
+    } else if (safe_str_eq(text, "stonith-ng")) {
+        type = crm_msg_stonith_ng;
+    } else if (safe_str_eq(text, "attrd")) {
+        type = crm_msg_attrd;
+
+    } else {
+        /* This will normally be a transient client rather than
+         * a cluster daemon. Set the type to the pid of the client
+         */
+        int scan_rc = sscanf(text, "%d", &type);
+
+        if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
+            /* Ensure it is sane: a small number must not be mistaken
+             * for one of the daemon types
+             */
+            type = crm_msg_none;
+        }
+    }
+    return type;
+}
diff --git a/lib/cluster/legacy.c b/lib/cluster/legacy.c
index 14749e4..8b16f7e 100644
--- a/lib/cluster/legacy.c
+++ b/lib/cluster/legacy.c
@@ -31,12 +31,6 @@
# include <corosync/corodefs.h>
# include <corosync/cpg.h>
# include <corosync/cfg.h>
-cpg_handle_t pcmk_cpg_handle = 0;
-
-struct cpg_name pcmk_cpg_group = {
- .length = 0,
- .value[0] = 0,
-};
#endif
#if HAVE_CMAP
@@ -50,88 +44,8 @@ cman_handle_t pcmk_cman_handle = NULL;
int ais_membership_timer = 0;
gboolean ais_membership_force = FALSE;
-int ais_dispatch(gpointer user_data);
-
-#define cs_repeat(counter, max, code) do { \
- code; \
- if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { \
- counter++; \
- crm_debug("Retrying operation after %ds", counter); \
- sleep(counter); \
- } else { \
- break; \
- } \
- } while(counter < max)
-
-enum crm_ais_msg_types
-text2msg_type(const char *text)
-{
- int type = crm_msg_none;
-
- CRM_CHECK(text != NULL, return type);
- if (safe_str_eq(text, "ais")) {
- type = crm_msg_ais;
- } else if (safe_str_eq(text, "crm_plugin")) {
- type = crm_msg_ais;
- } else if (safe_str_eq(text, CRM_SYSTEM_CIB)) {
- type = crm_msg_cib;
- } else if (safe_str_eq(text, CRM_SYSTEM_CRMD)) {
- type = crm_msg_crmd;
- } else if (safe_str_eq(text, CRM_SYSTEM_DC)) {
- type = crm_msg_crmd;
- } else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) {
- type = crm_msg_te;
- } else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) {
- type = crm_msg_pe;
- } else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) {
- type = crm_msg_lrmd;
- } else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) {
- type = crm_msg_stonithd;
- } else if (safe_str_eq(text, "stonith-ng")) {
- type = crm_msg_stonith_ng;
- } else if (safe_str_eq(text, "attrd")) {
- type = crm_msg_attrd;
-
- } else {
- /* This will normally be a transient client rather than
- * a cluster daemon. Set the type to the pid of the client
- */
- int scan_rc = sscanf(text, "%d", &type);
-
- if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
- /* Ensure its sane */
- type = crm_msg_none;
- }
- }
- return type;
-}
+int plugin_dispatch(gpointer user_data);
-char *
-get_ais_data(const AIS_Message * msg)
-{
- int rc = BZ_OK;
- char *uncompressed = NULL;
- unsigned int new_size = msg->size + 1;
-
- if (msg->is_compressed == FALSE) {
- crm_trace("Returning uncompressed message data");
- uncompressed = strdup(msg->data);
-
- } else {
- crm_trace("Decompressing message data");
- uncompressed = calloc(1, new_size);
-
- rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, (char *)msg->data,
- msg->compressed_size, 1, 0);
-
- CRM_ASSERT(rc == BZ_OK);
- CRM_ASSERT(new_size == msg->size);
- }
-
- return uncompressed;
-}
-
-#if SUPPORT_COROSYNC
int ais_fd_sync = -1;
int ais_fd_async = -1; /* never send messages via this channel */
void *ais_ipc_ctx = NULL;
@@ -160,9 +74,6 @@ get_ais_details(uint32_t * id, char **uname)
header.id = crm_class_nodeid;
header.size = sizeof(cs_ipc_header_response_t);
- CRM_CHECK(id != NULL, return FALSE);
- CRM_CHECK(uname != NULL, return FALSE);
-
iov.iov_base = &header;
iov.iov_len = header.size;
@@ -203,140 +114,7 @@ get_ais_details(uint32_t * id, char **uname)
return TRUE;
}
-static uint32_t get_local_nodeid(cpg_handle_t handle)
-{
- int rc = CS_OK;
- int retries = 0;
- static uint32_t local_nodeid = 0;
- cpg_handle_t local_handle = handle;
- cpg_callbacks_t cb = { };
-
- if(local_nodeid != 0) {
- return local_nodeid;
- }
-
-#if 0
- /* Should not be necessary */
- if(get_cluster_type() == pcmk_cluster_classic_ais) {
- get_ais_details(&local_nodeid, NULL);
- goto done;
- }
-#endif
-
- if(local_handle == 0) {
- crm_trace("Creating connection");
- cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb));
- }
-
- if (rc == CS_OK) {
- retries = 0;
- crm_trace("Performing lookup");
- cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid));
- }
-
- if (rc != CS_OK) {
- crm_err("Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc);
- }
-
- if(handle != local_handle) {
- crm_trace("Closing connection %u", local_handle);
- cpg_finalize(local_handle);
- }
-
- crm_debug("Local nodeid is %u", local_nodeid);
- return local_nodeid;
-}
-
-GListPtr cs_message_queue = NULL;
-int cs_message_timer = 0;
-
-static ssize_t crm_cs_flush(void);
-
-static gboolean
-crm_cs_flush_cb(gpointer data)
-{
- cs_message_timer = 0;
- crm_cs_flush();
- return FALSE;
-}
-
-#define CS_SEND_MAX 200
-static ssize_t
-crm_cs_flush(void)
-{
- int sent = 0;
- ssize_t rc = 0;
- int queue_len = 0;
- static unsigned int last_sent = 0;
-
- if (pcmk_cpg_handle == 0) {
- crm_trace("Connection is dead");
- return pcmk_ok;
- }
-
- queue_len = g_list_length(cs_message_queue);
- if ((queue_len % 1000) == 0 && queue_len > 1) {
- crm_err("CPG queue has grown to %d", queue_len);
-
- } else if (queue_len == CS_SEND_MAX) {
- crm_warn("CPG queue has grown to %d", queue_len);
- }
-
- if (cs_message_timer) {
- /* There is already a timer, wait until it goes off */
- crm_trace("Timer active %d", cs_message_timer);
- return pcmk_ok;
- }
-
- while (cs_message_queue && sent < CS_SEND_MAX) {
- AIS_Message *header = NULL;
- struct iovec *iov = cs_message_queue->data;
-
- errno = 0;
- rc = cpg_mcast_joined(pcmk_cpg_handle, CPG_TYPE_AGREED, iov, 1);
-
- if (rc != CS_OK) {
- break;
- }
-
- sent++;
- header = iov->iov_base;
- last_sent = header->id;
- if (header->compressed_size) {
- crm_trace("CPG message %d (%d compressed bytes) sent",
- header->id, header->compressed_size);
- } else {
- crm_trace("CPG message %d (%d bytes) sent: %.200s",
- header->id, header->size, header->data);
- }
-
- cs_message_queue = g_list_remove(cs_message_queue, iov);
- free(iov[0].iov_base);
- free(iov);
- }
-
- queue_len -= sent;
- if (sent > 1 || cs_message_queue) {
- crm_info("Sent %d CPG messages (%d remaining, last=%u): %s (%d)",
- sent, queue_len, last_sent, ais_error2text(rc), rc);
- } else {
- crm_trace("Sent %d CPG messages (%d remaining, last=%u): %s (%d)",
- sent, queue_len, last_sent, ais_error2text(rc), rc);
- }
-
- if (cs_message_queue) {
- uint32_t delay_ms = 100;
- if(rc != CS_OK) {
- /* Proportionally more if sending failed but cap at 1s */
- delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
- }
- cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, NULL);
- }
-
- return rc;
-}
-
-static bool
+bool
send_plugin_text(int class, struct iovec *iov)
{
int rc = CS_OK;
@@ -386,154 +164,8 @@ send_plugin_text(int class, struct iovec *iov)
return (rc == CS_OK);
}
-gboolean
-send_ais_text(int class, const char *data,
- gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
-{
- static int msg_id = 0;
- static int local_pid = 0;
- static int local_name_len = 0;
- static const char *local_name = NULL;
-
- char *target = NULL;
- struct iovec *iov;
- AIS_Message *ais_msg = NULL;
- enum cluster_type_e cluster_type = get_cluster_type();
- enum crm_ais_msg_types sender = text2msg_type(crm_system_name);
-
- /* There are only 6 handlers registered to crm_lib_service in plugin.c */
- CRM_CHECK(class < 6, crm_err("Invalid message class: %d", class);
- return FALSE);
-
- CRM_CHECK(dest != crm_msg_ais, return FALSE);
-
- if(local_name == NULL) {
- local_name = get_local_node_name();
- }
- if(local_name_len == 0 && local_name) {
- local_name_len = strlen(local_name);
- }
-
- if (data == NULL) {
- data = "";
- }
-
- if (local_pid == 0) {
- local_pid = getpid();
- }
-
- if (sender == crm_msg_none) {
- sender = local_pid;
- }
-
- ais_msg = calloc(1, sizeof(AIS_Message));
-
- ais_msg->id = msg_id++;
- ais_msg->header.id = class;
- ais_msg->header.error = CS_OK;
-
- ais_msg->host.type = dest;
- ais_msg->host.local = local;
-
- if (node) {
- if (node->uname) {
- target = strdup(node->uname);
- ais_msg->host.size = strlen(node->uname);
- memset(ais_msg->host.uname, 0, MAX_NAME);
- memcpy(ais_msg->host.uname, node->uname, ais_msg->host.size);
- } else {
- target = g_strdup_printf("%u", node->id);
- }
- ais_msg->host.id = node->id;
- } else {
- target = strdup("all");
- }
-
- ais_msg->sender.id = 0;
- ais_msg->sender.type = sender;
- ais_msg->sender.pid = local_pid;
- ais_msg->sender.size = local_name_len;
- memset(ais_msg->sender.uname, 0, MAX_NAME);
- memcpy(ais_msg->sender.uname, local_name, ais_msg->sender.size);
-
- ais_msg->size = 1 + strlen(data);
- ais_msg->header.size = sizeof(AIS_Message) + ais_msg->size;
-
- if (ais_msg->size < CRM_BZ2_THRESHOLD) {
- ais_msg = realloc(ais_msg, ais_msg->header.size);
- memcpy(ais_msg->data, data, ais_msg->size);
-
- } else {
- char *compressed = NULL;
- unsigned int new_size = 0;
- char *uncompressed = strdup(data);
-
- if (crm_compress_string(uncompressed, ais_msg->size, 0, &compressed, &new_size)) {
-
- ais_msg->header.size = sizeof(AIS_Message) + new_size + 1;
- ais_msg = realloc(ais_msg, ais_msg->header.size);
- memcpy(ais_msg->data, compressed, new_size);
- ais_msg->data[new_size] = 0;
-
- ais_msg->is_compressed = TRUE;
- ais_msg->compressed_size = new_size;
-
- } else {
- ais_msg = realloc(ais_msg, ais_msg->header.size);
- memcpy(ais_msg->data, data, ais_msg->size);
- }
-
- free(uncompressed);
- free(compressed);
- }
-
- iov = calloc(1, sizeof(struct iovec));
- iov->iov_base = ais_msg;
- iov->iov_len = ais_msg->header.size;
-
- if (ais_msg->compressed_size) {
- crm_trace("Queueing %s message %u to %s (%d compressed bytes)",
- cluster_type == pcmk_cluster_classic_ais?"plugin":"CPG",
- ais_msg->id, target, ais_msg->compressed_size);
- } else {
- crm_trace("Queueing %s message %u to %s (%d bytes)",
- cluster_type == pcmk_cluster_classic_ais?"plugin":"CPG",
- ais_msg->id, target, ais_msg->size);
- }
-
- /* The plugin is the only time we dont use CPG messaging */
- if(cluster_type == pcmk_cluster_classic_ais) {
- return send_plugin_text(class, iov);
- }
-
- cs_message_queue = g_list_append(cs_message_queue, iov);
- crm_cs_flush();
-
- free(target);
- return TRUE;
-}
-
-gboolean
-send_ais_message(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
-{
- gboolean rc = TRUE;
- char *data = NULL;
-
- if (is_classic_ais_cluster()) {
- if (ais_fd_async < 0) {
- crm_err("Not connected to AIS: %d", ais_fd_async);
- return FALSE;
- }
- }
-
- data = dump_xml_unformatted(msg);
- rc = send_ais_text(crm_class_cluster, data, local, node, dest);
- free(data);
- return rc;
-}
-
void
-terminate_cs_connection(void)
+terminate_cs_connection(crm_cluster_t *cluster)
{
crm_notice("Disconnecting from Corosync");
@@ -545,20 +177,8 @@ terminate_cs_connection(void)
} else {
crm_info("No plugin connection");
}
-
- } else {
- if (pcmk_cpg_handle) {
- crm_info("Disconnecting CPG");
- if (cpg_leave(pcmk_cpg_handle, &pcmk_cpg_group) == CS_OK) {
- crm_info("Destroying CPG");
- cpg_finalize(pcmk_cpg_handle);
- }
- pcmk_cpg_handle = 0;
-
- } else {
- crm_info("No CPG connection");
- }
}
+ cluster_disconnect_cpg(cluster);
# if SUPPORT_CMAN
if (is_cman_cluster()) {
@@ -578,155 +198,66 @@ terminate_cs_connection(void)
ais_fd_sync = -1;
}
-static crm_node_t *
-crm_update_ais_node(xmlNode * member, long long seq)
-{
- const char *id_s = crm_element_value(member, "id");
- const char *addr = crm_element_value(member, "addr");
- const char *uname = crm_element_value(member, "uname");
- const char *state = crm_element_value(member, "state");
- const char *born_s = crm_element_value(member, "born");
- const char *seen_s = crm_element_value(member, "seen");
- const char *votes_s = crm_element_value(member, "votes");
- const char *procs_s = crm_element_value(member, "processes");
-
- int votes = crm_int_helper(votes_s, NULL);
- unsigned int id = crm_int_helper(id_s, NULL);
- unsigned int procs = crm_int_helper(procs_s, NULL);
-
- /* TODO: These values will contain garbage if version < 0.7.1 */
- uint64_t born = crm_int_helper(born_s, NULL);
- uint64_t seen = crm_int_helper(seen_s, NULL);
-
- return crm_update_peer(__FUNCTION__, id, born, seen, votes, procs, uname, uname, addr, state);
-}
-
-static gboolean
-ais_dispatch_message(AIS_Message * msg,
- gboolean(*dispatch) (int kind, const char *from, const char *data))
+void
+plugin_handle_membership(AIS_Message *msg)
{
- char *data = NULL;
- char *uncompressed = NULL;
-
- xmlNode *xml = NULL;
+ if (msg->header.id == crm_class_members || msg->header.id == crm_class_quorum) {
+ xmlNode *member = NULL;
+ const char *value = NULL;
+ gboolean quorate = FALSE;
+ xmlNode *xml = string2xml(msg->data);
- CRM_ASSERT(msg != NULL);
-
- crm_trace("Got new%s message (size=%d, %d, %d)",
- msg->is_compressed ? " compressed" : "",
- ais_data_len(msg), msg->size, msg->compressed_size);
-
- data = msg->data;
- if (msg->is_compressed && msg->size > 0) {
- int rc = BZ_OK;
- unsigned int new_size = msg->size + 1;
-
- if (check_message_sanity(msg, NULL) == FALSE) {
- goto badmsg;
+ if (xml == NULL) {
+ crm_err("Invalid membership update: %s", msg->data);
+ return;
}
- crm_trace("Decompressing message data");
- uncompressed = calloc(1, new_size);
- rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, data, msg->compressed_size, 1, 0);
-
- if (rc != BZ_OK) {
- crm_err("Decompression failed: %d", rc);
- goto badmsg;
+ value = crm_element_value(xml, "quorate");
+ CRM_CHECK(value != NULL, crm_log_xml_err(xml, "No quorum value:"); return);
+ if (crm_is_true(value)) {
+ quorate = TRUE;
}
- CRM_ASSERT(rc == BZ_OK);
- CRM_ASSERT(new_size == msg->size);
-
- data = uncompressed;
-
- } else if (check_message_sanity(msg, data) == FALSE) {
- goto badmsg;
-
- } else if (safe_str_eq("identify", data)) {
- int pid = getpid();
- char *pid_s = crm_itoa(pid);
-
- send_ais_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
- free(pid_s);
- goto done;
- }
-
- if (msg->header.id != crm_class_members) {
- crm_get_peer(msg->sender.id, msg->sender.uname);
- }
-
- if (msg->header.id == crm_class_rmpeer) {
- uint32_t id = crm_int_helper(data, NULL);
-
- crm_info("Removing peer %s/%u", data, id);
- reap_crm_member(id, NULL);
- goto done;
-
- } else if (is_classic_ais_cluster()) {
- if (msg->header.id == crm_class_members || msg->header.id == crm_class_quorum) {
- xmlNode *node = NULL;
- const char *value = NULL;
- gboolean quorate = FALSE;
-
- xml = string2xml(data);
- if (xml == NULL) {
- crm_err("Invalid membership update: %s", data);
- goto badmsg;
- }
-
- value = crm_element_value(xml, "quorate");
- CRM_CHECK(value != NULL, crm_log_xml_err(xml, "No quorum value:");
- goto badmsg);
- if (crm_is_true(value)) {
- quorate = TRUE;
- }
-
- value = crm_element_value(xml, "id");
- CRM_CHECK(value != NULL, crm_log_xml_err(xml, "No membership id");
- goto badmsg);
- crm_peer_seq = crm_int_helper(value, NULL);
+ value = crm_element_value(xml, "id");
+ CRM_CHECK(value != NULL, crm_log_xml_err(xml, "No membership id"); return);
+ crm_peer_seq = crm_int_helper(value, NULL);
- if (quorate != crm_have_quorum) {
- crm_notice("Membership %s: quorum %s", value, quorate ? "acquired" : "lost");
- crm_have_quorum = quorate;
+ if (quorate != crm_have_quorum) {
+ crm_notice("Membership %s: quorum %s", value, quorate ? "acquired" : "lost");
+ crm_have_quorum = quorate;
- } else {
- crm_info("Membership %s: quorum %s", value, quorate ? "retained" : "still lost");
- }
-
- for (node = __xml_first_child(xml); node != NULL; node = __xml_next(node)) {
- crm_update_ais_node(node, crm_peer_seq);
- }
+ } else {
+ crm_info("Membership %s: quorum %s", value, quorate ? "retained" : "still lost");
}
- }
- crm_trace("Payload: %s", data);
- if (dispatch != NULL) {
- dispatch(msg->header.id, msg->sender.uname, data);
+ for (member = __xml_first_child(xml); member != NULL; member = __xml_next(member)) {
+ const char *id_s = crm_element_value(member, "id");
+ const char *addr = crm_element_value(member, "addr");
+ const char *uname = crm_element_value(member, "uname");
+ const char *state = crm_element_value(member, "state");
+ const char *born_s = crm_element_value(member, "born");
+ const char *seen_s = crm_element_value(member, "seen");
+ const char *votes_s = crm_element_value(member, "votes");
+ const char *procs_s = crm_element_value(member, "processes");
+
+ int votes = crm_int_helper(votes_s, NULL);
+ unsigned int id = crm_int_helper(id_s, NULL);
+ unsigned int procs = crm_int_helper(procs_s, NULL);
+
+ /* TODO: These values will contain garbage if version < 0.7.1 */
+ uint64_t born = crm_int_helper(born_s, NULL);
+ uint64_t seen = crm_int_helper(seen_s, NULL);
+
+ crm_update_peer(__FUNCTION__, id, born, seen, votes, procs, uname, uname, addr, state);
+ }
}
-
- done:
- free(uncompressed);
- free_xml(xml);
- return TRUE;
-
- badmsg:
- crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
- " min=%d, total=%d, size=%d, bz2_size=%d",
- msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
- ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
- msg->sender.pid, (int)sizeof(AIS_Message),
- msg->header.size, msg->size, msg->compressed_size);
- goto done;
}
int
-ais_dispatch(gpointer user_data)
+plugin_dispatch(gpointer user_data)
{
int rc = CS_OK;
- gboolean good = TRUE;
-
- gboolean(*dispatch) (int kind, const char *from, const char *data) = user_data;
+ crm_cluster_t *cluster = (crm_cluster_t *) user_data;
do {
char *buffer = NULL;
@@ -743,20 +274,20 @@ ais_dispatch(gpointer user_data)
/* NULL is a legal "no message afterall" value */
return 0;
}
- good = ais_dispatch_message((AIS_Message *) buffer, dispatch);
+ /*
+ cpg_deliver_fn_t(cpg_handle_t handle, const struct cpg_name *group_name,
+ uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len);
+ */
+ cluster->cpg.cpg_deliver_fn(0, NULL, 0, 0, buffer, 0);
coroipcc_dispatch_put(ais_ipc_handle);
- } while (good && ais_ipc_handle);
-
- if (good) {
- return 0;
- }
+ } while (ais_ipc_handle);
- return -1;
+ return 0;
}
static void
-ais_destroy(gpointer user_data)
+plugin_destroy(gpointer user_data)
{
crm_err("AIS connection terminated");
ais_fd_sync = -1;
@@ -896,179 +427,6 @@ init_cman_connection(gboolean(*dispatch) (unsigned long long, gboolean), void (*
}
# ifdef SUPPORT_COROSYNC
-gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL;
-static bool cpg_evicted = FALSE;
-
-static int
-pcmk_cpg_dispatch(gpointer user_data)
-{
- int rc = 0;
-
- pcmk_cpg_dispatch_fn = user_data;
- rc = cpg_dispatch(pcmk_cpg_handle, CS_DISPATCH_ALL);
- if (rc != CS_OK) {
- crm_err("Connection to the CPG API failed: %d", rc);
- pcmk_cpg_handle = 0;
- return -1;
-
- } else if(cpg_evicted) {
- crm_err("Evicted from CPG membership");
- return -1;
- }
- return 0;
-}
-
-static void
-pcmk_cpg_deliver(cpg_handle_t handle,
- const struct cpg_name *groupName,
- uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
-{
- AIS_Message *ais_msg = (AIS_Message *) msg;
- uint32_t local_nodeid = get_local_nodeid(handle);
- const char *local_name = get_local_node_name();
-
- if (ais_msg->sender.id > 0 && ais_msg->sender.id != nodeid) {
- crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, ais_msg->sender.id);
- return;
-
- } else if (ais_msg->host.id != 0 && (local_nodeid != ais_msg->host.id)) {
- /* Not for us */
- return;
-
- } else if (ais_msg->host.size != 0 && safe_str_neq(ais_msg->host.uname, local_name)) {
- /* Not for us */
- return;
- }
-
- ais_msg->sender.id = nodeid;
- if (ais_msg->sender.size == 0) {
- crm_node_t *peer = crm_get_peer(nodeid, NULL);
-
- if (peer == NULL) {
- crm_err("Peer with nodeid=%u is unknown", nodeid);
-
- } else if (peer->uname == NULL) {
- crm_err("No uname for peer with nodeid=%u", nodeid);
-
- } else {
- crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
- ais_msg->sender.size = strlen(peer->uname);
- memset(ais_msg->sender.uname, 0, MAX_NAME);
- memcpy(ais_msg->sender.uname, peer->uname, ais_msg->sender.size);
- }
- }
-
- ais_dispatch_message(ais_msg, pcmk_cpg_dispatch_fn);
-}
-
-static void
-pcmk_cpg_membership(cpg_handle_t handle,
- const struct cpg_name *groupName,
- const struct cpg_address *member_list, size_t member_list_entries,
- const struct cpg_address *left_list, size_t left_list_entries,
- const struct cpg_address *joined_list, size_t joined_list_entries)
-{
- int i;
- gboolean found = FALSE;
- static int counter = 0;
- uint32_t local_nodeid = get_local_nodeid(handle);
-
- for (i = 0; i < left_list_entries; i++) {
- crm_node_t *peer = crm_get_peer(left_list[i].nodeid, NULL);
-
- crm_info("Left[%d.%d] %s.%u ", counter, i, groupName->value, left_list[i].nodeid);
- crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, OFFLINESTATUS);
- }
-
- for (i = 0; i < joined_list_entries; i++) {
- crm_info("Joined[%d.%d] %s.%u ", counter, i, groupName->value, joined_list[i].nodeid);
- }
-
- for (i = 0; i < member_list_entries; i++) {
- crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
-
- crm_info("Member[%d.%d] %s.%u ", counter, i, groupName->value, member_list[i].nodeid);
- crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
- if (local_nodeid == member_list[i].nodeid) {
- found = TRUE;
- }
- }
-
- if (!found) {
- crm_err("We're not part of CPG group %s anymore!", groupName->value);
- cpg_evicted = TRUE;
- }
-
- counter++;
-}
-
-cpg_callbacks_t cpg_callbacks = {
- .cpg_deliver_fn = pcmk_cpg_deliver,
- .cpg_confchg_fn = pcmk_cpg_membership,
-};
-# endif
-
-static gboolean
-init_cpg_connection(crm_cluster_t * cluster)
-{
-# ifdef SUPPORT_COROSYNC
- int rc = -1;
- int fd = 0;
- int retries = 0;
- crm_node_t *peer = NULL;
-
- struct mainloop_fd_callbacks cpg_fd_callbacks = {
- .dispatch = pcmk_cpg_dispatch,
- .destroy = cluster->destroy,
- };
-
- cpg_evicted = FALSE;
- strcpy(pcmk_cpg_group.value, crm_system_name);
- pcmk_cpg_group.length = strlen(crm_system_name) + 1;
-
- cs_repeat(retries, 30, rc = cpg_initialize(&pcmk_cpg_handle, &cpg_callbacks));
- if (rc != CS_OK) {
- crm_err("Could not connect to the Cluster Process Group API: %d\n", rc);
- goto bail;
- }
-
- retries = 0;
- cs_repeat(retries, 30, rc = cpg_local_get(pcmk_cpg_handle, (unsigned int *)&cluster->nodeid));
- if (rc != CS_OK) {
- crm_err("Could not get local node id from the CPG API");
- goto bail;
- }
-
- retries = 0;
- cs_repeat(retries, 30, rc = cpg_join(pcmk_cpg_handle, &pcmk_cpg_group));
- if (rc != CS_OK) {
- crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc);
- goto bail;
- }
-
- rc = cpg_fd_get(pcmk_cpg_handle, &fd);
- if (rc != CS_OK) {
- crm_err("Could not obtain the CPG API connection: %d\n", rc);
- goto bail;
- }
-
- mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster->cs_dispatch, &cpg_fd_callbacks);
-
- bail:
- if (rc != CS_OK) {
- cpg_finalize(pcmk_cpg_handle);
- return FALSE;
- }
-
- peer = crm_get_peer(cluster->nodeid, NULL);
- crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
-
-# else
- crm_err("The Corosync CPG API is not supported in this build");
- crm_exit(DAEMON_RESPAWN_STOP);
-# endif
- return TRUE;
-}
gboolean
init_quorum_connection(gboolean(*dispatch) (unsigned long long, gboolean),
@@ -1086,9 +444,11 @@ init_cs_connection_classic(crm_cluster_t * cluster)
int pid = 0;
char *pid_s = NULL;
const char *name = NULL;
+ crm_node_t *peer = NULL;
+ enum crm_proc_flag proc = 0;
struct mainloop_fd_callbacks ais_fd_callbacks = {
- .dispatch = ais_dispatch,
+ .dispatch = plugin_dispatch,
.destroy = cluster->destroy,
};
@@ -1099,7 +459,7 @@ init_cs_connection_classic(crm_cluster_t * cluster)
if (ais_ipc_handle) {
coroipcc_fd_get(ais_ipc_handle, &ais_fd_async);
} else {
- crm_info("Connection to our AIS plugin (%d) failed: %s (%d)",
+ crm_info("Connection to our Corosync plugin (%d) failed: %s (%d)",
PCMK_SERVICE_ID, strerror(errno), errno);
return FALSE;
}
@@ -1108,7 +468,7 @@ init_cs_connection_classic(crm_cluster_t * cluster)
rc = CS_ERR_LIBRARY;
}
if (rc != CS_OK) {
- crm_info("Connection to our AIS plugin (%d) failed: %s (%d)", PCMK_SERVICE_ID,
+ crm_info("Connection to our Corosync plugin (%d) failed: %s (%d)", PCMK_SERVICE_ID,
ais_error2text(rc), rc);
}
@@ -1117,16 +477,15 @@ init_cs_connection_classic(crm_cluster_t * cluster)
}
if (ais_fd_callbacks.destroy == NULL) {
- ais_fd_callbacks.destroy = ais_destroy;
+ ais_fd_callbacks.destroy = plugin_destroy;
}
- mainloop_add_fd("corosync-plugin", G_PRIORITY_MEDIUM, ais_fd_async, cluster->cs_dispatch,
- &ais_fd_callbacks);
+ mainloop_add_fd("corosync-plugin", G_PRIORITY_MEDIUM, ais_fd_async, cluster, &ais_fd_callbacks);
crm_info("AIS connection established");
pid = getpid();
pid_s = crm_itoa(pid);
- send_ais_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
+ send_cluster_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
free(pid_s);
cluster->nodeid = get_local_nodeid(0);
@@ -1141,6 +500,9 @@ init_cs_connection_classic(crm_cluster_t * cluster)
crm_exit(ENOTUNIQ);
}
+ proc = text2proc(crm_system_name);
+ peer = crm_get_peer(cluster->nodeid, cluster->uname);
+ crm_update_peer_proc(__FUNCTION__, peer, proc|crm_proc_plugin, ONLINESTATUS);
return TRUE;
}
@@ -1275,7 +637,7 @@ init_cs_connection_once(crm_cluster_t * cluster)
}
break;
case pcmk_cluster_cman:
- if (init_cpg_connection(cluster) == FALSE) {
+ if (cluster_connect_cpg(cluster) == FALSE) {
return FALSE;
}
cluster->uname = cman_node_name(0 /* CMAN_NODEID_US */ );
diff --git a/lib/cluster/membership.c b/lib/cluster/membership.c
index a1e044c..875c1c8 100644
--- a/lib/cluster/membership.c
+++ b/lib/cluster/membership.c
@@ -125,7 +125,7 @@ crm_active_peers(void)
return count;
}
-void
+static void
destroy_crm_node(gpointer data)
{
crm_node_t *node = data;
@@ -143,14 +143,6 @@ destroy_crm_node(gpointer data)
void
crm_peer_init(void)
{
- static gboolean initialized = FALSE;
-
- if (initialized) {
- return;
- }
- initialized = TRUE;
-
- crm_peer_destroy();
if (crm_peer_cache == NULL) {
crm_peer_cache = g_hash_table_new_full(crm_str_hash, g_str_equal, free, destroy_crm_node);
}
diff --git a/lib/common/logging.c b/lib/common/logging.c
index a1b01f2..155a068 100644
--- a/lib/common/logging.c
+++ b/lib/common/logging.c
@@ -95,6 +95,10 @@ crm_glib_handler(const gchar * log_domain, GLogLevelFlags flags, const gchar * m
static void
crm_trigger_blackbox(int nsig)
{
+ if(nsig == SIGTRAP) {
+ /* Turn it on if it wasn't already */
+ crm_enable_blackbox(nsig);
+ }
crm_write_blackbox(nsig, NULL);
}
@@ -344,15 +348,6 @@ crm_enable_blackbox(int nsig)
crm_update_callsites();
- /* Original meanings from signal(7)
- *
- * Signal Value Action Comment
- * SIGTRAP 5 Core Trace/breakpoint trap
- *
- * Our usage is as similar as possible
- */
- mainloop_add_signal(SIGTRAP, crm_trigger_blackbox);
-
blackbox_trigger = qb_log_custom_open(blackbox_logger, NULL, NULL, NULL);
qb_log_ctl(blackbox_trigger, QB_LOG_CONF_ENABLED, QB_TRUE);
crm_trace("Trigger: %d is %d %d", blackbox_trigger,
@@ -762,7 +757,17 @@ crm_log_init(const char *entity, int level, gboolean daemon, gboolean to_stderr,
}
#endif
}
+
+ /* Original meanings from signal(7)
+ *
+ * Signal Value Action Comment
+ * SIGTRAP 5 Core Trace/breakpoint trap
+ * SIGUSR1 30,10,16 Term User-defined signal 1
+ *
+ * Our usage is as similar as possible
+ */
mainloop_add_signal(SIGUSR1, crm_enable_blackbox);
+ mainloop_add_signal(SIGTRAP, crm_trigger_blackbox);
}
crm_xml_init(); /* Sets buffer allocation strategy */
diff --git a/lib/services/systemd.c b/lib/services/systemd.c
index 886cb35..2a66da5 100644
--- a/lib/services/systemd.c
+++ b/lib/services/systemd.c
@@ -407,6 +407,8 @@ systemd_unit_exec_done(GObject * source_object, GAsyncResult * res, gpointer use
}
}
+#define SYSTEMD_OVERRIDE_ROOT "/run/systemd/system/"
+
gboolean
systemd_unit_exec(svc_action_t * op, gboolean synchronous)
{
@@ -453,9 +455,42 @@ systemd_unit_exec(svc_action_t * op, gboolean synchronous)
goto cleanup;
} else if (g_strcmp0(action, "start") == 0) {
+ FILE *file_strm = NULL;
+ char *override_dir = g_strdup_printf("%s/%s", SYSTEMD_OVERRIDE_ROOT, unit);
+ char *override_file = g_strdup_printf("%s/50-pacemaker.conf", override_dir);
+
action = "StartUnit";
+ crm_build_path(override_dir, 0755);
+
+ file_strm = fopen(override_file, "w");
+ if (file_strm != NULL) {
+ int rc = fprintf(file_strm, "[Service]\nRestart=no");
+ if (rc < 0) {
+ crm_perror(LOG_ERR, "Cannot write to systemd override file %s: %s (%d)", override_file, pcmk_strerror(errno), errno);
+ }
+
+ } else {
+ crm_err("Cannot open systemd override file %s for writing: %s (%d)", override_file, pcmk_strerror(errno), errno);
+ }
+
+ if (file_strm != NULL) {
+ fflush(file_strm);
+ fclose(file_strm);
+ }
+ systemd_daemon_reload(systemd_proxy, &error);
+ g_clear_error(&error); /* NULL-safe, unlike g_error_free(); also resets error to NULL */
+ g_free(override_file); /* allocated by g_strdup_printf() */
+ g_free(override_dir); /* allocated by g_strdup_printf() */
+
} else if (g_strcmp0(action, "stop") == 0) {
+ char *override_file = g_strdup_printf("%s/%s/50-pacemaker.conf", SYSTEMD_OVERRIDE_ROOT, unit);
+
action = "StopUnit";
+ unlink(override_file);
+ g_free(override_file); /* allocated by g_strdup_printf() */
+ systemd_daemon_reload(systemd_proxy, &error);
+ g_clear_error(&error); /* NULL-safe, unlike g_error_free(); also resets error to NULL */
+
} else if (g_strcmp0(action, "restart") == 0) {
action = "RestartUnit";
} else {
diff --git a/lrmd/Makefile.am b/lrmd/Makefile.am
index 73f1d7e..82cb65f 100644
--- a/lrmd/Makefile.am
+++ b/lrmd/Makefile.am
@@ -27,7 +27,7 @@ initdir = $(INITDIR)
init_SCRIPTS = pacemaker_remote
sbin_PROGRAMS = pacemaker_remoted
-if HAVE_SYSTEMD
+if BUILD_SYSTEMD
systemdunit_DATA = pacemaker_remote.service
endif
diff --git a/mcp/Makefile.am b/mcp/Makefile.am
index 73a71c4..f98f286 100644
--- a/mcp/Makefile.am
+++ b/mcp/Makefile.am
@@ -29,7 +29,7 @@ if BUILD_HELP
man8_MANS = $(sbin_PROGRAMS:%=%.8)
endif
-if HAVE_SYSTEMD
+if BUILD_SYSTEMD
systemdunit_DATA = pacemaker.service
endif
diff --git a/mcp/corosync.c b/mcp/corosync.c
index 64d6eb5..ca37871 100644
--- a/mcp/corosync.c
+++ b/mcp/corosync.c
@@ -43,13 +43,7 @@
# include <corosync/cmap.h>
#endif
-static struct cpg_name cpg_group = {
- .length = 0,
- .value[0] = 0,
-};
-
enum cluster_type_e stack = pcmk_cluster_unknown;
-static cpg_handle_t cpg_handle;
static corosync_cfg_handle_t cfg_handle;
/* =::=::=::= CFG - Shutdown stuff =::=::=::= */
@@ -155,169 +149,6 @@ cluster_connect_cfg(uint32_t * nodeid)
return FALSE;
}
-/* =::=::=::= CPG - Closed Process Group Messaging =::=::=::= */
-
-static int
-pcmk_cpg_dispatch(gpointer user_data)
-{
- cpg_handle_t *handle = (cpg_handle_t *) user_data;
- cs_error_t rc = cpg_dispatch(*handle, CS_DISPATCH_ALL);
-
- if (rc != CS_OK) {
- return -1;
- }
- return 0;
-}
-
-static void
-cpg_connection_destroy(gpointer user_data)
-{
- crm_err("Connection destroyed");
- cpg_handle = 0;
- crm_exit(ENOTCONN);
-}
-
-static void
-pcmk_cpg_deliver(cpg_handle_t handle,
- const struct cpg_name *groupName,
- uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
-{
- if (nodeid != local_nodeid) {
- uint32_t procs = 0;
- xmlNode *xml = string2xml(msg);
- const char *uname = crm_element_value(xml, "uname");
-
- crm_element_value_int(xml, "proclist", (int *)&procs);
- /* crm_debug("Got proclist %.32x from %s", procs, uname); */
- if (update_node_processes(nodeid, uname, procs)) {
- update_process_clients();
- }
- }
-}
-
-static void
-pcmk_cpg_membership(cpg_handle_t handle,
- const struct cpg_name *groupName,
- const struct cpg_address *member_list, size_t member_list_entries,
- const struct cpg_address *left_list, size_t left_list_entries,
- const struct cpg_address *joined_list, size_t joined_list_entries)
-{
- /* Don't care about CPG membership */
- update_process_peers();
-}
-
-cpg_callbacks_t cpg_callbacks = {
- .cpg_deliver_fn = pcmk_cpg_deliver,
- .cpg_confchg_fn = pcmk_cpg_membership,
-};
-
-gboolean
-cluster_disconnect_cpg(void)
-{
- if (cpg_handle) {
- cpg_finalize(cpg_handle);
- cpg_handle = 0;
- }
- return TRUE;
-}
-
-gboolean
-cluster_connect_cpg(void)
-{
- cs_error_t rc;
- unsigned int nodeid;
- int fd;
- int retries = 0;
-
- static struct mainloop_fd_callbacks cpg_fd_callbacks = {
- .dispatch = pcmk_cpg_dispatch,
- .destroy = cpg_connection_destroy,
- };
-
- strcpy(cpg_group.value, "pcmk");
- cpg_group.length = strlen(cpg_group.value) + 1;
-
- retries = 0;
- cs_repeat(retries, 30, rc = cpg_initialize(&cpg_handle, &cpg_callbacks));
- if (rc != CS_OK) {
- crm_err("corosync cpg init error %d", rc);
- return FALSE;
- }
-
- rc = cpg_fd_get(cpg_handle, &fd);
- if (rc != CS_OK) {
- crm_err("corosync cpg fd_get error %d", rc);
- goto bail;
- }
-
- retries = 0;
- cs_repeat(retries, 30, rc = cpg_local_get(cpg_handle, &nodeid));
- if (rc != CS_OK) {
- crm_err("corosync cpg local_get error %d", rc);
- goto bail;
- }
-
- crm_debug("Our nodeid: %d", nodeid);
-
- retries = 0;
- cs_repeat(retries, 30, rc = cpg_join(cpg_handle, &cpg_group));
-
- if (rc != CS_OK) {
- crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc);
- goto bail;
- }
-
- mainloop_add_fd("corosync-cpg", G_PRIORITY_DEFAULT, fd, &cpg_handle, &cpg_fd_callbacks);
- return TRUE;
-
- bail:
- cpg_finalize(cpg_handle);
- return FALSE;
-}
-
-gboolean
-send_cpg_message(struct iovec * iov)
-{
- int rc = CS_OK;
- int retries = 0;
-
- errno = 0;
-
- do {
- rc = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, 1);
- if (rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) {
- cpg_flow_control_state_t fc_state = CPG_FLOW_CONTROL_DISABLED;
- int rc2 = cpg_flow_control_state_get(cpg_handle, &fc_state);
-
- if (rc2 == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
- crm_debug("Attempting to clear cpg dispatch queue");
- rc2 = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL);
- }
-
- if (rc2 != CS_OK) {
- crm_warn("Could not check/clear the cpg connection");
- goto bail;
-
- } else {
- retries++;
- crm_debug("Retrying operation after %ds", retries);
- sleep(retries);
- }
- } else {
- break;
- }
-
- /* 5 retires is plenty, we'll resend once the membership reforms anyway */
- } while (retries < 5);
-
- bail:
- if (rc != CS_OK) {
- crm_err("Sending message via cpg FAILED: (rc=%d) %s", rc, ais_error2text(rc));
- }
-
- return (rc == CS_OK);
-}
-
/* =::=::=::= Configuration =::=::=::= */
#if HAVE_CONFDB
static int
@@ -447,7 +278,7 @@ read_config(void)
#if HAVE_CONFDB
char *value = NULL;
- confdb_handle_t config;
+ confdb_handle_t config = 0;
confdb_handle_t top_handle = 0;
hdb_handle_t local_handle;
static confdb_callbacks_t callbacks = { };
@@ -456,7 +287,8 @@ read_config(void)
rc = confdb_initialize(&config, &callbacks);
if (rc != CS_OK) {
retries++;
- printf("Connection setup failed: %d. Retrying in %ds\n", rc, retries);
+ printf("confdb connection setup failed: %s. Retrying in %ds\n", ais_error2text(rc), retries);
+ crm_info("confdb connection setup failed: %s. Retrying in %ds", ais_error2text(rc), retries);
sleep(retries);
} else {
@@ -473,8 +305,8 @@ read_config(void)
rc = cmap_initialize(&local_handle);
if (rc != CS_OK) {
retries++;
- printf("API connection setup failed: %s. Retrying in %ds\n", cs_strerror(rc), retries);
- crm_info("API connection setup failed: %s. Retrying in %ds", cs_strerror(rc), retries);
+ printf("cmap connection setup failed: %s. Retrying in %ds\n", cs_strerror(rc), retries);
+ crm_info("cmap connection setup failed: %s. Retrying in %ds", cs_strerror(rc), retries);
sleep(retries);
} else {
diff --git a/mcp/pacemaker.c b/mcp/pacemaker.c
index 47fdd68..6f8d9b9 100644
--- a/mcp/pacemaker.c
+++ b/mcp/pacemaker.c
@@ -29,6 +29,7 @@
#include <crm/msg_xml.h>
#include <crm/common/ipcs.h>
#include <crm/common/mainloop.h>
+#include <crm/cluster/internal.h>
#include <crm/cluster.h>
#include <dirent.h>
@@ -44,22 +45,6 @@ uint32_t local_nodeid = 0;
crm_trigger_t *shutdown_trigger = NULL;
const char *pid_file = "/var/run/pacemaker.pid";
-/* *INDENT-OFF* */
-enum crm_proc_flag {
- crm_proc_none = 0x00000001,
- crm_proc_plugin = 0x00000002,
- crm_proc_lrmd = 0x00000010,
- crm_proc_cib = 0x00000100,
- crm_proc_crmd = 0x00000200,
- crm_proc_attrd = 0x00001000,
- crm_proc_stonithd = 0x00002000,
- crm_proc_pe = 0x00010000,
- crm_proc_te = 0x00020000,
- crm_proc_mgmtd = 0x00040000,
- crm_proc_stonith_ng = 0x00100000,
-};
-/* *INDENT-ON* */
-
typedef struct pcmk_child_s {
int pid;
long flag;
@@ -539,8 +524,10 @@ update_process_clients(void)
void
update_process_peers(void)
{
+ /* Do nothing for corosync-2 based clusters */
+
char buffer[1024];
- struct iovec iov;
+ struct iovec *iov;
int rc = 0;
memset(buffer, 0, SIZEOF(buffer));
@@ -552,11 +539,11 @@ update_process_peers(void)
rc = snprintf(buffer, SIZEOF(buffer) - 1, "<node proclist=\"%u\"/>", get_process_list());
}
- iov.iov_base = buffer;
- iov.iov_len = rc + 1;
-
crm_trace("Sending %s", buffer);
- send_cpg_message(&iov);
+ iov = calloc(1, sizeof(struct iovec));
+ iov->iov_base = strdup(buffer);
+ iov->iov_len = rc + 1;
+ send_cpg_iov(iov);
}
gboolean
@@ -619,6 +606,7 @@ update_node_processes(uint32_t id, const char *uname, uint32_t procs)
return changed;
}
+
/* *INDENT-OFF* */
static struct crm_option long_options[] = {
/* Top-level Options */
@@ -779,6 +767,42 @@ init_children_processes(void)
}
}
+static void
+mcp_cpg_destroy(gpointer user_data)
+{
+ crm_err("Connection destroyed");
+ crm_exit(ENOTCONN);
+}
+
+static void
+mcp_cpg_deliver(cpg_handle_t handle,
+ const struct cpg_name *groupName,
+ uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
+{
+ if (nodeid != local_nodeid) {
+ uint32_t procs = 0;
+ xmlNode *xml = string2xml(msg);
+ const char *uname = crm_element_value(xml, "uname");
+
+ crm_element_value_int(xml, "proclist", (int *)&procs);
+ /* crm_debug("Got proclist %.32x from %s", procs, uname); */
+ if (update_node_processes(nodeid, uname, procs)) {
+ update_process_clients();
+ }
+ }
+}
+
+static void
+mcp_cpg_membership(cpg_handle_t handle,
+ const struct cpg_name *groupName,
+ const struct cpg_address *member_list, size_t member_list_entries,
+ const struct cpg_address *left_list, size_t left_list_entries,
+ const struct cpg_address *joined_list, size_t joined_list_entries)
+{
+ /* Don't care about CPG membership, but we do want to broadcast our own presence */
+ update_process_peers();
+}
+
int
main(int argc, char **argv)
{
@@ -795,6 +819,7 @@ main(int argc, char **argv)
crm_ipc_t *old_instance = NULL;
qb_ipcs_service_t *ipcs = NULL;
const char *facility = daemon_option("logfacility");
+ static crm_cluster_t cluster;
setenv("LC_ALL", "C", 1);
setenv("HA_LOGD", "no", 1);
@@ -951,12 +976,17 @@ main(int argc, char **argv)
crm_exit(EIO);
}
+ /* Allows us to block shutdown */
if (cluster_connect_cfg(&local_nodeid) == FALSE) {
crm_err("Couldn't connect to Corosync's CFG service");
crm_exit(ENOPROTOOPT);
}
- if (cluster_connect_cpg() == FALSE) {
+ cluster.destroy = mcp_cpg_destroy;
+ cluster.cpg.cpg_deliver_fn = mcp_cpg_deliver;
+ cluster.cpg.cpg_confchg_fn = mcp_cpg_membership;
+
+ if(cluster_connect_cpg(&cluster) == FALSE) {
crm_err("Couldn't connect to Corosync's CPG service");
crm_exit(ENOPROTOOPT);
}
@@ -982,7 +1012,7 @@ main(int argc, char **argv)
g_main_destroy(mainloop);
- cluster_disconnect_cpg();
+ cluster_disconnect_cpg(&cluster);
cluster_disconnect_cfg();
crm_info("Exiting %s", crm_system_name);
diff --git a/mcp/pacemaker.h b/mcp/pacemaker.h
index 224df93..8967966 100644
--- a/mcp/pacemaker.h
+++ b/mcp/pacemaker.h
@@ -41,20 +41,16 @@ typedef struct pcmk_peer_s {
char *uname;
} pcmk_peer_t;
-extern gboolean read_config(void);
+gboolean read_config(void);
-extern gboolean cluster_connect_cfg(uint32_t * nodeid);
-extern gboolean cluster_disconnect_cfg(void);
+gboolean cluster_connect_cfg(uint32_t * nodeid);
+gboolean cluster_disconnect_cfg(void);
-extern gboolean cluster_connect_cpg(void);
-extern gboolean cluster_disconnect_cpg(void);
-extern gboolean send_cpg_message(struct iovec *iov);
+void update_process_clients(void);
+void update_process_peers(void);
+gboolean update_node_processes(uint32_t node, const char *uname, uint32_t procs);
-extern void update_process_clients(void);
-extern void update_process_peers(void);
-extern gboolean update_node_processes(uint32_t node, const char *uname, uint32_t procs);
+void enable_mgmtd(gboolean enable);
+void enable_crmd_as_root(gboolean enable);
-extern void enable_mgmtd(gboolean enable);
-extern void enable_crmd_as_root(gboolean enable);
-
-extern void pcmk_shutdown(int nsig);
+void pcmk_shutdown(int nsig);
diff --git a/mcp/pacemaker.in b/mcp/pacemaker.in
index a6647fe..c96f1d1 100644
--- a/mcp/pacemaker.in
+++ b/mcp/pacemaker.in
@@ -111,6 +111,7 @@ cman_pre_start()
pid=$(pidof corosync 2>/dev/null)
if [ $? -ne 0 ]; then
service cman start
+ sleep 2
fi
}
diff --git a/tools/attrd.c b/tools/attrd.c
index 1e834ea..2d485f9 100644
--- a/tools/attrd.c
+++ b/tools/attrd.c
@@ -325,11 +325,19 @@ attrd_ha_callback(HA_Message * msg, void *private_data)
#endif
#if SUPPORT_COROSYNC
-static gboolean
-attrd_ais_dispatch(int kind, const char *from, const char *data)
+static void
+attrd_cs_dispatch(cpg_handle_t handle,
+ const struct cpg_name *groupName,
+ uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
{
+ uint32_t kind = 0;
xmlNode *xml = NULL;
+ const char *from = NULL;
+ char *data = pcmk_message_common_cs(handle, nodeid, pid, msg, &kind, &from);
+ if(data == NULL) {
+ return;
+ }
if (kind == crm_class_cluster) {
xml = string2xml(data);
if (xml == NULL) {
@@ -360,11 +368,11 @@ attrd_ais_dispatch(int kind, const char *from, const char *data)
free_xml(xml);
}
- return TRUE;
+ free(data);
}
static void
-attrd_ais_destroy(gpointer unused)
+attrd_cs_destroy(gpointer unused)
{
if (need_shutdown) {
/* we signed out, so this is expected */
@@ -405,7 +413,7 @@ update_for_hash_entry(gpointer key, gpointer value, gpointer user_data)
{
attr_hash_entry_t *entry = value;
- if (entry->value != NULL) {
+ if (entry->value != NULL || entry->stored_value != NULL) {
attrd_timer_callback(value);
}
}
@@ -537,8 +545,9 @@ main(int argc, char **argv)
#if SUPPORT_COROSYNC
if (is_openais_cluster()) {
- cluster.destroy = attrd_ais_destroy;
- cluster.cs_dispatch = attrd_ais_dispatch;
+ cluster.destroy = attrd_cs_destroy;
+ cluster.cpg.cpg_deliver_fn = attrd_cs_dispatch;
+ cluster.cpg.cpg_confchg_fn = pcmk_cpg_membership;
}
#endif
diff --git a/tools/crm_node.c b/tools/crm_node.c
index a25b3b4..aacea76 100644
--- a/tools/crm_node.c
+++ b/tools/crm_node.c
@@ -500,16 +500,23 @@ crm_add_member(gpointer key, gpointer value, gpointer user_data)
}
}
-static gboolean
-ais_membership_dispatch(int kind, const char *from, const char *data)
+static void
+ais_membership_dispatch(cpg_handle_t handle,
+ const struct cpg_name *groupName,
+ uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
{
+ uint32_t kind = 0;
+ const char *from = NULL;
+ char *data = pcmk_message_common_cs(handle, nodeid, pid, msg, &kind, &from);
+
switch (kind) {
case crm_class_members:
case crm_class_notify:
case crm_class_quorum:
break;
default:
- return TRUE;
+ free(data);
+ return;
break;
}
@@ -548,9 +555,10 @@ ais_membership_dispatch(int kind, const char *from, const char *data)
fprintf(stdout, "\n");
}
+ free(data);
crm_exit(pcmk_ok);
- return TRUE;
+ return;
}
#endif
@@ -695,7 +703,8 @@ try_openais(int command, enum cluster_type_e stack)
static crm_cluster_t cluster;
cluster.destroy = ais_membership_destroy;
- cluster.cs_dispatch = ais_membership_dispatch;
+ cluster.cpg.cpg_deliver_fn = ais_membership_dispatch;
+ cluster.cpg.cpg_confchg_fn = NULL;
if (init_cs_connection_once(&cluster)) {
@@ -703,7 +712,7 @@ try_openais(int command, enum cluster_type_e stack)
switch (command) {
case 'R':
- send_ais_text(crm_class_rmpeer, target_uname, TRUE, NULL, crm_msg_ais);
+ send_cluster_text(crm_class_rmpeer, target_uname, TRUE, NULL, crm_msg_ais);
cib_remove_node(0, target_uname);
crm_exit(pcmk_ok);
@@ -713,13 +722,13 @@ try_openais(int command, enum cluster_type_e stack)
crm_exit(pcmk_ok);
case 'q':
- send_ais_text(crm_class_quorum, NULL, TRUE, NULL, crm_msg_ais);
+ send_cluster_text(crm_class_quorum, NULL, TRUE, NULL, crm_msg_ais);
break;
case 'l':
case 'p':
crm_info("Requesting the list of configured nodes");
- send_ais_text(crm_class_members, __FUNCTION__, TRUE, NULL, crm_msg_ais);
+ send_cluster_text(crm_class_members, __FUNCTION__, TRUE, NULL, crm_msg_ais);
break;
case 'i':