libqb/IPC-avoid-temporary-channel-priority-loss.patch
Jan Pokorný 9825519903
1.0.5-2 - Fix temporary channel priority loss, up to deadlock-worth
(upstream patchset https://github.com/ClusterLabs/libqb/pull/354)

Signed-off-by: Jan Pokorný <jpokorny@redhat.com>
2019-06-12 16:44:33 +02:00


From 039bd76882984ebada5f3e3801a5d6e51d74f172 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Pokorn=C3=BD?= <jpokorny@redhat.com>
Date: Tue, 11 Jun 2019 15:02:59 +0200
Subject: [PATCH 1/8] tests: ipc: avoid problems when UNIX_PATH_MAX (108)
limit is hit
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
There's some slight reserve left for when bigger PID ranges are in use.
The limit imposed on the prefix string was derived from practical
experience rather than from exact calculations.
Signed-off-by: Jan Pokorný <jpokorny@redhat.com>
---
tests/check_ipc.c | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/tests/check_ipc.c b/tests/check_ipc.c
index 71b3a7f..2231efe 100644
--- a/tests/check_ipc.c
+++ b/tests/check_ipc.c
@@ -140,7 +140,7 @@ set_ipc_name(const char *prefix)
t_sec[sizeof(t_sec) - 1] = '\0';
}
- snprintf(ipc_name, sizeof(ipc_name), "%s%s%lX%.4x", prefix, t_sec,
+ snprintf(ipc_name, sizeof(ipc_name), "%.44s%s%lX%.4x", prefix, t_sec,
(unsigned long)getpid(), (unsigned) ((long) time(NULL) % (0x10000)));
}
--
2.22.0.rc3
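
To put the "%.44s" cap in perspective, here is a minimal, self-contained
sketch (not part of the patch; the long prefix and the "-1560264000-"
separator are made-up placeholders, not the test's exact t_sec format) of
the length budget involved: sun_path in struct sockaddr_un is limited to
UNIX_PATH_MAX (108) bytes on Linux, so the capped prefix plus seconds,
PID in hex and the 4-digit suffix have to stay comfortably below that.

#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <sys/un.h>
#include <time.h>
#include <unistd.h>

int main(void)
{
    char ipc_name[256];

    /* mirrors the patched format string: capped prefix, seconds string,
     * PID in hex, 4-digit time-derived suffix */
    snprintf(ipc_name, sizeof(ipc_name), "%.44s%s%lX%.4x",
             "some_potentially_very_long_test_case_prefix_string",
             "-1560264000-", (unsigned long) getpid(),
             (unsigned) ((long) time(NULL) % 0x10000));

    /* leave headroom for whatever libqb itself prepends to the name */
    assert(strlen(ipc_name) < sizeof(((struct sockaddr_un *) 0)->sun_path));
    printf("%zu bytes: %s\n", strlen(ipc_name), ipc_name);
    return 0;
}

The gap between the worst case (roughly 44 + 12 + 16 + 4 bytes here) and
108 is the "slight reserve" the commit message refers to.
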
From 1dc71575dad1c8f62540eaaf4b38ecb2e3ab9065 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Pokorn=C3=BD?= <jpokorny@redhat.com>
Date: Tue, 11 Jun 2019 15:14:16 +0200
Subject: [PATCH 2/8] tests: ipc: speed the suite up by avoiding expendable
sleep(3)s
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Using an i7-6820HQ CPU yields these results:
Before: ~2:54
After: ~2:26
Speedup: ~16%
The main optimization lies in how the run_function_in_new_process helper
is constructed: there is now actual synchronization between the parent
and its child after the fork (the child needs to be prioritized here,
which is furthermore helped by making the parent immediately give up its
processor possession), so that the subsequent sleep can be omitted
entirely -- at worst (and unlikely), additional sleep rounds will be
undertaken as already arranged for (and now just 400 ms is waited per
round rather than an excessive 1 second).
Another slight optimization is likewise the omission of the sleep at the
point control returns to once the waited-for process has been
successfully examined post-mortem, without worrying that its previous
life is still resounding.
Signed-off-by: Jan Pokorný <jpokorny@redhat.com>
---
tests/check_ipc.c | 161 ++++++++++++++++++++++++++++------------------
1 file changed, 99 insertions(+), 62 deletions(-)
diff --git a/tests/check_ipc.c b/tests/check_ipc.c
index 2231efe..37ef74d 100644
--- a/tests/check_ipc.c
+++ b/tests/check_ipc.c
@@ -395,8 +395,30 @@ s1_connection_created(qb_ipcs_connection_t *c)
}
-static void
-run_ipc_server(void)
+static volatile sig_atomic_t usr1_bit;
+
+static void usr1_bit_setter(int signal) {
+ if (signal == SIGUSR1) {
+ usr1_bit = 1;
+ }
+}
+
+#define READY_SIGNALLER(name, data_arg) void (name)(void *data_arg)
+typedef READY_SIGNALLER(ready_signaller_fn, );
+
+static
+READY_SIGNALLER(usr1_signaller, parent_target)
+{
+ kill(*((pid_t *) parent_target), SIGUSR1);
+}
+
+#define NEW_PROCESS_RUNNER(name, ready_signaller_arg, signaller_data_arg) \
+ void (name)(ready_signaller_fn ready_signaller_arg, \
+ void *signaller_data_arg)
+typedef NEW_PROCESS_RUNNER(new_process_runner_fn, , );
+
+static
+NEW_PROCESS_RUNNER(run_ipc_server, ready_signaller, signaller_data)
{
int32_t res;
qb_loop_signal_handle handle;
@@ -419,7 +441,7 @@ run_ipc_server(void)
my_loop = qb_loop_create();
qb_loop_signal_add(my_loop, QB_LOOP_HIGH, SIGTERM,
- NULL, exit_handler, &handle);
+ NULL, exit_handler, &handle);
s1 = qb_ipcs_create(ipc_name, 4, ipc_type, &sh);
@@ -433,14 +455,35 @@ run_ipc_server(void)
res = qb_ipcs_run(s1);
ck_assert_int_eq(res, 0);
+ if (ready_signaller != NULL) {
+ ready_signaller(signaller_data);
+ }
+
qb_loop_run(my_loop);
qb_log(LOG_DEBUG, "loop finished - done ...");
}
static pid_t
-run_function_in_new_process(void (*run_ipc_server_fn)(void))
+run_function_in_new_process(new_process_runner_fn new_process_runner)
{
- pid_t pid = fork ();
+ pid_t parent_target, pid;
+
+ struct sigaction orig_sa, purpose_sa;
+ sigset_t orig_mask, purpose_mask, purpose_clear_mask;
+
+ sigemptyset(&purpose_mask);
+ sigaddset(&purpose_mask, SIGUSR1);
+
+ sigprocmask(SIG_BLOCK, &purpose_mask, &orig_mask);
+ purpose_clear_mask = orig_mask;
+ sigdelset(&purpose_clear_mask, SIGUSR1);
+
+ purpose_sa.sa_handler = usr1_bit_setter;
+ purpose_sa.sa_mask = purpose_mask;
+ purpose_sa.sa_flags = SA_RESTART;
+
+ parent_target = getpid();
+ pid = fork();
if (pid == -1) {
fprintf (stderr, "Can't fork\n");
@@ -448,9 +491,24 @@ run_function_in_new_process(void (*run_ipc_server_fn)(void))
}
if (pid == 0) {
- run_ipc_server_fn();
+ sigprocmask(SIG_SETMASK, &orig_mask, NULL);
+ new_process_runner(usr1_signaller, &parent_target);
exit(0);
}
+
+ usr1_bit = 0;
+ /* XXX assume never fails */
+ sigaction(SIGUSR1, &purpose_sa, &orig_sa);
+
+ do {
+ /* XXX assume never fails with EFAULT */
+ sigsuspend(&purpose_clear_mask);
+ } while (usr1_bit != 1);
+ usr1_bit = 0;
+ sigprocmask(SIG_SETMASK, &orig_mask, NULL);
+ /* give children a slight/non-strict scheduling advantage */
+ sched_yield();
+
return pid;
}
@@ -600,14 +658,13 @@ test_ipc_txrx_timeout(void)
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
- sleep(1);
do {
conn = qb_ipcc_connect(ipc_name, max_size);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
- sleep(1);
+ poll(NULL, 0, 400);
c++;
}
} while (conn == NULL && c < 5);
@@ -631,11 +688,6 @@ test_ipc_txrx_timeout(void)
request_server_exit();
verify_graceful_stop(pid);
- /*
- * wait a bit for the server to die.
- */
- sleep(1);
-
/*
* this needs to free up the shared mem
*/
@@ -654,14 +706,13 @@ test_ipc_txrx(void)
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
- sleep(1);
do {
conn = qb_ipcc_connect(ipc_name, max_size);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
- sleep(1);
+ poll(NULL, 0, 400);
c++;
}
} while (conn == NULL && c < 5);
@@ -705,14 +756,13 @@ test_ipc_exit(void)
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
- sleep(1);
do {
conn = qb_ipcc_connect(ipc_name, max_size);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
- sleep(1);
+ poll(NULL, 0, 400);
c++;
}
} while (conn == NULL && c < 5);
@@ -732,11 +782,6 @@ test_ipc_exit(void)
request_server_exit();
verify_graceful_stop(pid);
- /*
- * wait a bit for the server to die.
- */
- sleep(1);
-
/*
* this needs to free up the shared mem
*/
@@ -873,14 +918,13 @@ test_ipc_dispatch(void)
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
- sleep(1);
do {
conn = qb_ipcc_connect(ipc_name, max_size);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
- sleep(1);
+ poll(NULL, 0, 400);
c++;
}
} while (conn == NULL && c < 5);
@@ -1001,7 +1045,6 @@ test_ipc_stress_connections(void)
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
- sleep(1);
for (connections = 1; connections < 70000; connections++) {
if (conn) {
@@ -1049,14 +1092,13 @@ test_ipc_bulk_events(void)
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
- sleep(1);
do {
conn = qb_ipcc_connect(ipc_name, max_size);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
- sleep(1);
+ poll(NULL, 0, 400);
c++;
}
} while (conn == NULL && c < 5);
@@ -1115,14 +1157,13 @@ test_ipc_stress_test(void)
pid = run_function_in_new_process(run_ipc_server);
enforce_server_buffer = 0;
fail_if(pid == -1);
- sleep(1);
do {
conn = qb_ipcc_connect(ipc_name, client_buf_size);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
- sleep(1);
+ poll(NULL, 0, 400);
c++;
}
} while (conn == NULL && c < 5);
@@ -1217,14 +1258,13 @@ test_ipc_event_on_created(void)
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
- sleep(1);
do {
conn = qb_ipcc_connect(ipc_name, max_size);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
- sleep(1);
+ poll(NULL, 0, 400);
c++;
}
} while (conn == NULL && c < 5);
@@ -1272,14 +1312,13 @@ test_ipc_disconnect_after_created(void)
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
- sleep(1);
do {
conn = qb_ipcc_connect(ipc_name, max_size);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
- sleep(1);
+ poll(NULL, 0, 400);
c++;
}
} while (conn == NULL && c < 5);
@@ -1330,14 +1369,13 @@ test_ipc_server_fail(void)
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
- sleep(1);
do {
conn = qb_ipcc_connect(ipc_name, max_size);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
- sleep(1);
+ poll(NULL, 0, 400);
c++;
}
} while (conn == NULL && c < 5);
@@ -1460,14 +1498,13 @@ test_ipc_service_ref_count(void)
pid = run_function_in_new_process(run_ipc_server);
fail_if(pid == -1);
- sleep(1);
do {
conn = qb_ipcc_connect(ipc_name, max_size);
if (conn == NULL) {
j = waitpid(pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
- sleep(1);
+ poll(NULL, 0, 400);
c++;
}
} while (conn == NULL && c < 5);
@@ -1551,18 +1588,18 @@ make_shm_suite(void)
TCase *tc;
Suite *s = suite_create("shm");
- add_tcase(s, tc, test_ipc_txrx_shm_timeout, 30);
- add_tcase(s, tc, test_ipc_server_fail_shm, 8);
- add_tcase(s, tc, test_ipc_txrx_shm_block, 8);
- add_tcase(s, tc, test_ipc_txrx_shm_tmo, 8);
- add_tcase(s, tc, test_ipc_fc_shm, 8);
- add_tcase(s, tc, test_ipc_dispatch_shm, 16);
- add_tcase(s, tc, test_ipc_stress_test_shm, 16);
- add_tcase(s, tc, test_ipc_bulk_events_shm, 16);
- add_tcase(s, tc, test_ipc_exit_shm, 8);
- add_tcase(s, tc, test_ipc_event_on_created_shm, 10);
- add_tcase(s, tc, test_ipc_service_ref_count_shm, 10);
- add_tcase(s, tc, test_ipc_stress_connections_shm, 3600);
+ add_tcase(s, tc, test_ipc_txrx_shm_timeout, 28);
+ add_tcase(s, tc, test_ipc_server_fail_shm, 7);
+ add_tcase(s, tc, test_ipc_txrx_shm_block, 7);
+ add_tcase(s, tc, test_ipc_txrx_shm_tmo, 7);
+ add_tcase(s, tc, test_ipc_fc_shm, 7);
+ add_tcase(s, tc, test_ipc_dispatch_shm, 15);
+ add_tcase(s, tc, test_ipc_stress_test_shm, 15);
+ add_tcase(s, tc, test_ipc_bulk_events_shm, 15);
+ add_tcase(s, tc, test_ipc_exit_shm, 6);
+ add_tcase(s, tc, test_ipc_event_on_created_shm, 9);
+ add_tcase(s, tc, test_ipc_service_ref_count_shm, 9);
+ add_tcase(s, tc, test_ipc_stress_connections_shm, 3600 /* ? */);
#ifdef HAVE_FAILURE_INJECTION
add_tcase(s, tc, test_ipcc_truncate_when_unlink_fails_shm, 8);
@@ -1577,24 +1614,24 @@ make_soc_suite(void)
Suite *s = suite_create("socket");
TCase *tc;
- add_tcase(s, tc, test_ipc_txrx_us_timeout, 30);
+ add_tcase(s, tc, test_ipc_txrx_us_timeout, 28);
/* Commented out for the moment as space in /dev/shm on the CI machines
causes random failures */
/* add_tcase(s, tc, test_ipc_max_dgram_size, 30); */
- add_tcase(s, tc, test_ipc_server_fail_soc, 8);
- add_tcase(s, tc, test_ipc_txrx_us_block, 8);
- add_tcase(s, tc, test_ipc_txrx_us_tmo, 8);
- add_tcase(s, tc, test_ipc_fc_us, 8);
- add_tcase(s, tc, test_ipc_exit_us, 8);
- add_tcase(s, tc, test_ipc_dispatch_us, 16);
+ add_tcase(s, tc, test_ipc_server_fail_soc, 7);
+ add_tcase(s, tc, test_ipc_txrx_us_block, 7);
+ add_tcase(s, tc, test_ipc_txrx_us_tmo, 7);
+ add_tcase(s, tc, test_ipc_fc_us, 7);
+ add_tcase(s, tc, test_ipc_exit_us, 6);
+ add_tcase(s, tc, test_ipc_dispatch_us, 15);
#ifndef __clang__ /* see variable length array in structure' at the top */
- add_tcase(s, tc, test_ipc_stress_test_us, 60);
+ add_tcase(s, tc, test_ipc_stress_test_us, 58);
#endif
- add_tcase(s, tc, test_ipc_bulk_events_us, 16);
- add_tcase(s, tc, test_ipc_event_on_created_us, 10);
- add_tcase(s, tc, test_ipc_disconnect_after_created_us, 10);
- add_tcase(s, tc, test_ipc_service_ref_count_us, 10);
- add_tcase(s, tc, test_ipc_stress_connections_us, 3600);
+ add_tcase(s, tc, test_ipc_bulk_events_us, 15);
+ add_tcase(s, tc, test_ipc_event_on_created_us, 9);
+ add_tcase(s, tc, test_ipc_disconnect_after_created_us, 9);
+ add_tcase(s, tc, test_ipc_service_ref_count_us, 9);
+ add_tcase(s, tc, test_ipc_stress_connections_us, 3600 /* ? */);
return s;
}
--
2.22.0.rc3
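
The parent/child readiness handshake that replaces the unconditional
sleep(1) can be condensed into the following sketch (an illustration
only: error handling is stripped, and child_main is a stand-in for
run_ipc_server, which invokes the usr1_signaller callback -- i.e.
kill(parent, SIGUSR1) -- once qb_ipcs_run() has succeeded).

#include <sched.h>
#include <signal.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>

static volatile sig_atomic_t usr1_bit;

static void usr1_bit_setter(int sig) { if (sig == SIGUSR1) usr1_bit = 1; }

static pid_t spawn_and_wait_ready(void (*child_main)(pid_t notify_target))
{
    struct sigaction sa = { .sa_handler = usr1_bit_setter,
                            .sa_flags = SA_RESTART };
    sigset_t block, orig, wait_mask;
    pid_t parent = getpid(), pid;

    sigemptyset(&block);
    sigaddset(&block, SIGUSR1);
    sigprocmask(SIG_BLOCK, &block, &orig);  /* can't lose the signal now */
    wait_mask = orig;
    sigdelset(&wait_mask, SIGUSR1);
    sigemptyset(&sa.sa_mask);
    sigaction(SIGUSR1, &sa, NULL);

    if ((pid = fork()) == 0) {
        sigprocmask(SIG_SETMASK, &orig, NULL);
        child_main(parent);  /* signals SIGUSR1 back once it is ready */
        exit(0);
    }

    while (usr1_bit != 1)
        sigsuspend(&wait_mask);  /* atomically unblock SIGUSR1 and wait */
    usr1_bit = 0;
    sigprocmask(SIG_SETMASK, &orig, NULL);
    sched_yield();  /* give the child a slight scheduling advantage */
    return pid;
}

Because SIGUSR1 stays blocked in the parent until sigsuspend() runs, a
child that becomes ready very quickly merely leaves the signal pending;
the wake-up cannot be lost, which is what makes dropping the sleep safe.
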
From 248010aa7164305a3abbd6ee9e09a503e4d4034f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Pokorn=C3=BD?= <jpokorny@redhat.com>
Date: Tue, 11 Jun 2019 15:41:50 +0200
Subject: [PATCH 3/8] tests: ipc: allow for easier test debugging by
discerning PIDs/roles
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Role specifications are currently not applied; they are rather
a preparation for the actual meaningful use to come.
Signed-off-by: Jan Pokorný <jpokorny@redhat.com>
---
tests/check_ipc.c | 35 +++++++++++++++++++++++------------
1 file changed, 23 insertions(+), 12 deletions(-)
diff --git a/tests/check_ipc.c b/tests/check_ipc.c
index 37ef74d..852b8ca 100644
--- a/tests/check_ipc.c
+++ b/tests/check_ipc.c
@@ -464,8 +464,10 @@ NEW_PROCESS_RUNNER(run_ipc_server, ready_signaller, signaller_data)
}
static pid_t
-run_function_in_new_process(new_process_runner_fn new_process_runner)
+run_function_in_new_process(const char *role,
+ new_process_runner_fn new_process_runner)
{
+ char formatbuf[1024];
pid_t parent_target, pid;
struct sigaction orig_sa, purpose_sa;
@@ -492,6 +494,15 @@ run_function_in_new_process(new_process_runner_fn new_process_runner)
if (pid == 0) {
sigprocmask(SIG_SETMASK, &orig_mask, NULL);
+
+ if (role == NULL) {
+ qb_log_format_set(QB_LOG_STDERR, "lib/%f|%l[%P] %b");
+ } else {
+ snprintf(formatbuf, sizeof(formatbuf),
+ "lib/%%f|%%l|%s[%%P] %%b", role);
+ qb_log_format_set(QB_LOG_STDERR, formatbuf);
+ }
+
new_process_runner(usr1_signaller, &parent_target);
exit(0);
}
@@ -656,7 +667,7 @@ test_ipc_txrx_timeout(void)
pid_t pid;
uint32_t max_size = MAX_MSG_SIZE;
- pid = run_function_in_new_process(run_ipc_server);
+ pid = run_function_in_new_process("server", run_ipc_server);
fail_if(pid == -1);
do {
@@ -704,7 +715,7 @@ test_ipc_txrx(void)
pid_t pid;
uint32_t max_size = MAX_MSG_SIZE;
- pid = run_function_in_new_process(run_ipc_server);
+ pid = run_function_in_new_process("server", run_ipc_server);
fail_if(pid == -1);
do {
@@ -754,7 +765,7 @@ test_ipc_exit(void)
pid_t pid;
uint32_t max_size = MAX_MSG_SIZE;
- pid = run_function_in_new_process(run_ipc_server);
+ pid = run_function_in_new_process("server", run_ipc_server);
fail_if(pid == -1);
do {
@@ -916,7 +927,7 @@ test_ipc_dispatch(void)
int32_t size;
uint32_t max_size = MAX_MSG_SIZE;
- pid = run_function_in_new_process(run_ipc_server);
+ pid = run_function_in_new_process("server", run_ipc_server);
fail_if(pid == -1);
do {
@@ -1043,7 +1054,7 @@ test_ipc_stress_connections(void)
QB_LOG_FILTER_FILE, "*", LOG_INFO);
qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
- pid = run_function_in_new_process(run_ipc_server);
+ pid = run_function_in_new_process("server", run_ipc_server);
fail_if(pid == -1);
for (connections = 1; connections < 70000; connections++) {
@@ -1090,7 +1101,7 @@ test_ipc_bulk_events(void)
int32_t fd;
uint32_t max_size = MAX_MSG_SIZE;
- pid = run_function_in_new_process(run_ipc_server);
+ pid = run_function_in_new_process("server", run_ipc_server);
fail_if(pid == -1);
do {
@@ -1154,7 +1165,7 @@ test_ipc_stress_test(void)
int32_t real_buf_size;
enforce_server_buffer = 1;
- pid = run_function_in_new_process(run_ipc_server);
+ pid = run_function_in_new_process("server", run_ipc_server);
enforce_server_buffer = 0;
fail_if(pid == -1);
@@ -1256,7 +1267,7 @@ test_ipc_event_on_created(void)
num_bulk_events = 1;
- pid = run_function_in_new_process(run_ipc_server);
+ pid = run_function_in_new_process("server", run_ipc_server);
fail_if(pid == -1);
do {
@@ -1310,7 +1321,7 @@ test_ipc_disconnect_after_created(void)
int32_t res;
uint32_t max_size = MAX_MSG_SIZE;
- pid = run_function_in_new_process(run_ipc_server);
+ pid = run_function_in_new_process("server", run_ipc_server);
fail_if(pid == -1);
do {
@@ -1367,7 +1378,7 @@ test_ipc_server_fail(void)
pid_t pid;
uint32_t max_size = MAX_MSG_SIZE;
- pid = run_function_in_new_process(run_ipc_server);
+ pid = run_function_in_new_process("server", run_ipc_server);
fail_if(pid == -1);
do {
@@ -1496,7 +1507,7 @@ test_ipc_service_ref_count(void)
reference_count_test = QB_TRUE;
- pid = run_function_in_new_process(run_ipc_server);
+ pid = run_function_in_new_process("server", run_ipc_server);
fail_if(pid == -1);
do {
--
2.22.0.rc3
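
What the role string buys in practice is per-process tagging of the
stderr log lines, so interleaved parent/server/client output can be told
apart by more than a bare PID. A minimal sketch, assuming a program
linked against libqb's logging (the %f/%l/%P/%b format specifiers are
the ones used in the patch; the "role-demo" program name is made up):

#include <stdio.h>
#include <syslog.h>
#include <qb/qbdefs.h>
#include <qb/qblog.h>

static void tag_process_logs(const char *role)
{
    char fmt[1024];

    if (role == NULL) {
        /* %f = source file, %l = line, %P = pid, %b = message body */
        qb_log_format_set(QB_LOG_STDERR, "lib/%f|%l[%P] %b");
    } else {
        snprintf(fmt, sizeof(fmt), "lib/%%f|%%l|%s[%%P] %%b", role);
        qb_log_format_set(QB_LOG_STDERR, fmt);
    }
}

int main(void)
{
    qb_log_init("role-demo", LOG_USER, LOG_DEBUG);
    qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
    tag_process_logs("server");
    qb_log(LOG_DEBUG, "hello from the tagged process");
    qb_log_fini();
    return 0;
}
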
From 8c838db56650ca4b653e3060deeedea9e6c7329c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Pokorn=C3=BD?= <jpokorny@redhat.com>
Date: Thu, 23 May 2019 15:33:20 +0200
Subject: [PATCH 4/8] tests: ipc: refactor/split test_ipc_dispatch part into
client_dispatch
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
This way, this core part can be easily reused where needed.
Note that the "ready_signaller" similarity with run_ipc_server is not
accidental; the following commit will justify it.
Signed-off-by: Jan Pokorný <jpokorny@redhat.com>
---
tests/check_ipc.c | 34 +++++++++++++++++++++++-----------
1 file changed, 23 insertions(+), 11 deletions(-)
diff --git a/tests/check_ipc.c b/tests/check_ipc.c
index 852b8ca..d75352f 100644
--- a/tests/check_ipc.c
+++ b/tests/check_ipc.c
@@ -918,22 +918,19 @@ struct my_res {
char message[1024 * 1024];
};
-static void
-test_ipc_dispatch(void)
+static inline
+NEW_PROCESS_RUNNER(client_dispatch, ready_signaller, signaller_data)
{
- int32_t j;
- int32_t c = 0;
- pid_t pid;
- int32_t size;
uint32_t max_size = MAX_MSG_SIZE;
-
- pid = run_function_in_new_process("server", run_ipc_server);
- fail_if(pid == -1);
+ int32_t size;
+ int32_t c = 0;
+ int32_t j;
+ pid_t server_pid = *((pid_t *) signaller_data);
do {
conn = qb_ipcc_connect(ipc_name, max_size);
if (conn == NULL) {
- j = waitpid(pid, NULL, WNOHANG);
+ j = waitpid(server_pid, NULL, WNOHANG);
ck_assert_int_eq(j, 0);
poll(NULL, 0, 400);
c++;
@@ -941,16 +938,31 @@ test_ipc_dispatch(void)
} while (conn == NULL && c < 5);
fail_if(conn == NULL);
+ if (ready_signaller != NULL) {
+ ready_signaller(signaller_data);
+ }
+
size = QB_MIN(sizeof(struct qb_ipc_request_header), 64);
for (j = 1; j < 19; j++) {
size *= 2;
if (size >= max_size)
break;
if (send_and_check(IPC_MSG_REQ_DISPATCH, size,
- recv_timeout, QB_TRUE) < 0) {
+ recv_timeout, QB_TRUE) < 0) {
break;
}
}
+}
+
+static void
+test_ipc_dispatch(void)
+{
+ pid_t pid;
+
+ pid = run_function_in_new_process(NULL, run_ipc_server);
+ fail_if(pid == -1);
+
+ client_dispatch(NULL, (void *) &pid);
request_server_exit();
qb_ipcc_disconnect(conn);
--
2.22.0.rc3
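
The READY_SIGNALLER/NEW_PROCESS_RUNNER macros used above are a plain C
preprocessor idiom rather than a libqb API: a single macro spells out the
whole function signature, so the typedef used for callback pointers and
every concrete definition cannot drift apart. A stripped-down sketch of
the same pattern outside the test suite (all names here are illustrative):

#include <stdio.h>

#define READY_SIGNALLER(name, data_arg) void (name)(void *data_arg)
typedef READY_SIGNALLER(ready_signaller_fn, );

#define NEW_PROCESS_RUNNER(name, signaller_arg, signaller_data_arg) \
    void (name)(ready_signaller_fn signaller_arg, void *signaller_data_arg)
typedef NEW_PROCESS_RUNNER(new_process_runner_fn, , );

static READY_SIGNALLER(print_signaller, data)
{
    printf("ready: %s\n", (const char *) data);
}

static NEW_PROCESS_RUNNER(demo_runner, ready_signaller, signaller_data)
{
    /* ... do the real work, then announce readiness ... */
    if (ready_signaller != NULL)
        ready_signaller(signaller_data);
}

int main(void)
{
    new_process_runner_fn *runner = demo_runner;

    runner(print_signaller, (void *) "demo");
    return 0;
}

Changing the shared signature (as the next patch in this series does by
adding a data argument) then only requires touching the macro and its
users, never a separately maintained typedef.
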
From 2aefe5318849394d4a49db41938d070cb5d198d2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Pokorn=C3=BD?= <jpokorny@redhat.com>
Date: Tue, 11 Jun 2019 16:09:28 +0200
Subject: [PATCH 5/8] tests: ipc: check deadlock-like situation due to mixing
priorities
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Compared to the outer world, libqb brings a rather unintuitive approach
to priorities within its native event loop (qbloop.h) -- it doesn't do
an exhaustive high-to-low priority pass in a batched (clean-the-level)
manner, but rather linearly increases the chance of picking the handling
task from a higher priority level as opposed to lower priority ones.
This has the advantage of limiting the chances of starvation and
deadlock opportunities in incorrectly constructed SW; on the other
hand, it means that libqb is not truthfully fulfilling the architected
intentions regarding what deserves a priority, so these priorities are
worth just a hint rather than an urgency-based separation.
Consequently, discovery of these deadlocks etc. is deferred to the
(as Murphy's laws have it) least convenient moment, e.g., when said
native event loop is exchanged for another (this time truly priority
abiding, like GLib's) implementation, while retaining the same basic
notion and high-level handling of priorities on the libqb side, in the
IPC server (service handling) context.
Hence, demonstrating such degenerate blocking is not trivial, and we
must resort to such an other event loop implementation. After this
hassle, we are rewarded with a practical proof that said "high-level
handling [...] in IPC server (service handling) context" contains
a bug (which we are going to fix subsequently) -- this is contrasted
with libqb's native loop implementation, which works just fine even
prior to that fix.
Signed-off-by: Jan Pokorný <jpokorny@redhat.com>
---
tests/Makefile.am | 5 +
tests/check_ipc.c | 584 +++++++++++++++++++++++++++++++++++++++++++---
2 files changed, 553 insertions(+), 36 deletions(-)
diff --git a/tests/Makefile.am b/tests/Makefile.am
index df1af81..da8f3a5 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -147,6 +147,11 @@ ipc_test_LDADD = $(top_builddir)/lib/libqb.la @CHECK_LIBS@
if HAVE_FAILURE_INJECTION
ipc_test_LDADD += _failure_injection.la
+if HAVE_GLIB
+ipc_test_CFLAGS += $(GLIB_CFLAGS)
+ipc_test_LDADD += $(GLIB_LIBS)
+endif
+
check_LTLIBRARIES += _failure_injection.la
_failure_injection_la_SOURCES = _failure_injection.c _failure_injection.h
_failure_injection_la_LDFLAGS = -module
diff --git a/tests/check_ipc.c b/tests/check_ipc.c
index d75352f..5ded7db 100644
--- a/tests/check_ipc.c
+++ b/tests/check_ipc.c
@@ -24,6 +24,12 @@
#include "os_base.h"
#include <sys/wait.h>
#include <signal.h>
+#include <stdbool.h>
+#include <fcntl.h>
+
+#ifdef HAVE_GLIB
+#include <glib.h>
+#endif
#include "check_common.h"
@@ -62,9 +68,12 @@ static const int MAX_MSG_SIZE = DEFAULT_MAX_MSG_SIZE;
* this the largests msg we can successfully send. */
#define GIANT_MSG_DATA_SIZE MAX_MSG_SIZE - sizeof(struct qb_ipc_response_header) - 8
-static int enforce_server_buffer=0;
+static int enforce_server_buffer;
static qb_ipcc_connection_t *conn;
static enum qb_ipc_type ipc_type;
+static enum qb_loop_priority global_loop_prio = QB_LOOP_MED;
+static bool global_use_glib;
+static int global_pipefd[2];
enum my_msg_ids {
IPC_MSG_REQ_TX_RX,
@@ -75,12 +84,92 @@ enum my_msg_ids {
IPC_MSG_RES_BULK_EVENTS,
IPC_MSG_REQ_STRESS_EVENT,
IPC_MSG_RES_STRESS_EVENT,
+ IPC_MSG_REQ_SELF_FEED,
+ IPC_MSG_RES_SELF_FEED,
IPC_MSG_REQ_SERVER_FAIL,
IPC_MSG_RES_SERVER_FAIL,
IPC_MSG_REQ_SERVER_DISCONNECT,
IPC_MSG_RES_SERVER_DISCONNECT,
};
+
+/* these 2 functions from pacemaker code */
+static enum qb_ipcs_rate_limit
+conv_libqb_prio2ratelimit(enum qb_loop_priority prio)
+{
+ /* this is an inversion of what libqb's qb_ipcs_request_rate_limit does */
+ enum qb_ipcs_rate_limit ret = QB_IPCS_RATE_NORMAL;
+ switch (prio) {
+ case QB_LOOP_LOW:
+ ret = QB_IPCS_RATE_SLOW;
+ break;
+ case QB_LOOP_HIGH:
+ ret = QB_IPCS_RATE_FAST;
+ break;
+ default:
+ qb_log(LOG_DEBUG, "Invalid libqb's loop priority %d,"
+ " assuming QB_LOOP_MED", prio);
+ /* fall-through */
+ case QB_LOOP_MED:
+ break;
+ }
+ return ret;
+}
+#ifdef HAVE_GLIB
+static gint
+conv_prio_libqb2glib(enum qb_loop_priority prio)
+{
+ gint ret = G_PRIORITY_DEFAULT;
+ switch (prio) {
+ case QB_LOOP_LOW:
+ ret = G_PRIORITY_LOW;
+ break;
+ case QB_LOOP_HIGH:
+ ret = G_PRIORITY_HIGH;
+ break;
+ default:
+ qb_log(LOG_DEBUG, "Invalid libqb's loop priority %d,"
+ " assuming QB_LOOP_MED", prio);
+ /* fall-through */
+ case QB_LOOP_MED:
+ break;
+ }
+ return ret;
+}
+
+/* these 3 glue functions inspired from pacemaker, too */
+static gboolean
+gio_source_prepare(GSource *source, gint *timeout)
+{
+ qb_enter();
+ *timeout = 500;
+ return FALSE;
+}
+static gboolean
+gio_source_check(GSource *source)
+{
+ qb_enter();
+ return TRUE;
+}
+static gboolean
+gio_source_dispatch(GSource *source, GSourceFunc callback, gpointer user_data)
+{
+ gboolean ret = G_SOURCE_CONTINUE;
+ qb_enter();
+ if (callback) {
+ ret = callback(user_data);
+ }
+ return ret;
+}
+static GSourceFuncs gio_source_funcs = {
+ .prepare = gio_source_prepare,
+ .check = gio_source_check,
+ .dispatch = gio_source_dispatch,
+};
+
+#endif
+
+
/* Test Cases
*
* 1) basic send & recv different message sizes
@@ -144,6 +233,61 @@ set_ipc_name(const char *prefix)
(unsigned long)getpid(), (unsigned) ((long) time(NULL) % (0x10000)));
}
+static int
+pipe_writer(int fd, int revents, void *data) {
+ qb_enter();
+ static const char buf[8] = { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h' };
+
+ ssize_t wbytes = 0, wbytes_sum = 0;
+
+ //for (size_t i = 0; i < SIZE_MAX; i++) {
+ for (size_t i = 0; i < 4096; i++) {
+ wbytes_sum += wbytes;
+ if ((wbytes = write(fd, buf, sizeof(buf))) == -1) {
+ if (errno != EAGAIN) {
+ perror("write");
+ exit(-1);
+ }
+ break;
+ }
+ }
+ if (wbytes_sum > 0) {
+ qb_log(LOG_DEBUG, "written %zd bytes", wbytes_sum);
+ }
+ qb_leave();
+ return 1;
+}
+
+static int
+pipe_reader(int fd, int revents, void *data) {
+ qb_enter();
+ ssize_t rbytes, rbytes_sum = 0;
+ size_t cnt = SIZE_MAX;
+ char buf[4096] = { '\0' };
+ while ((rbytes = read(fd, buf, sizeof(buf))) > 0 && rbytes < cnt) {
+ cnt -= rbytes;
+ rbytes_sum += rbytes;
+ }
+ if (rbytes_sum > 0) {
+ fail_if(buf[0] == '\0'); /* avoid dead store elimination */
+ qb_log(LOG_DEBUG, "read %zd bytes", rbytes_sum);
+ sleep(1);
+ }
+ qb_leave();
+ return 1;
+}
+
+#if HAVE_GLIB
+static gboolean
+gio_pipe_reader(void *data) {
+ return (pipe_reader(*((int *) data), 0, NULL) > 0);
+}
+static gboolean
+gio_pipe_writer(void *data) {
+ return (pipe_writer(*((int *) data), 0, NULL) > 0);
+}
+#endif
+
static int32_t
s1_msg_process_fn(qb_ipcs_connection_t *c,
void *data, size_t size)
@@ -264,6 +408,39 @@ s1_msg_process_fn(qb_ipcs_connection_t *c,
giant_event_send.hdr.id++;
}
+ } else if (req_pt->id == IPC_MSG_REQ_SELF_FEED) {
+ if (pipe(global_pipefd) != 0) {
+ perror("pipefd");
+ fail_if(1);
+ }
+ fcntl(global_pipefd[0], F_SETFL, O_NONBLOCK);
+ fcntl(global_pipefd[1], F_SETFL, O_NONBLOCK);
+ if (global_use_glib) {
+#ifdef HAVE_GLIB
+ GSource *source_r, *source_w;
+ source_r = g_source_new(&gio_source_funcs, sizeof(GSource));
+ source_w = g_source_new(&gio_source_funcs, sizeof(GSource));
+ fail_if(source_r == NULL || source_w == NULL);
+ g_source_set_priority(source_r, conv_prio_libqb2glib(QB_LOOP_HIGH));
+ g_source_set_priority(source_w, conv_prio_libqb2glib(QB_LOOP_HIGH));
+ g_source_set_can_recurse(source_r, FALSE);
+ g_source_set_can_recurse(source_w, FALSE);
+ g_source_set_callback(source_r, gio_pipe_reader, &global_pipefd[0], NULL);
+ g_source_set_callback(source_w, gio_pipe_writer, &global_pipefd[1], NULL);
+ g_source_add_unix_fd(source_r, global_pipefd[0], G_IO_IN);
+ g_source_add_unix_fd(source_w, global_pipefd[1], G_IO_OUT);
+ g_source_attach(source_r, NULL);
+ g_source_attach(source_w, NULL);
+#else
+ fail_if(1);
+#endif
+ } else {
+ qb_loop_poll_add(my_loop, QB_LOOP_HIGH, global_pipefd[1],
+ POLLOUT|POLLERR, NULL, pipe_writer);
+ qb_loop_poll_add(my_loop, QB_LOOP_HIGH, global_pipefd[0],
+ POLLIN|POLLERR, NULL, pipe_reader);
+ }
+
} else if (req_pt->id == IPC_MSG_REQ_SERVER_FAIL) {
exit(0);
} else if (req_pt->id == IPC_MSG_REQ_SERVER_DISCONNECT) {
@@ -301,6 +478,122 @@ my_dispatch_del(int32_t fd)
return qb_loop_poll_del(my_loop, fd);
}
+
+/* taken from examples/ipcserver.c, with s/my_g/gio/ */
+#ifdef HAVE_GLIB
+
+#include <qb/qbarray.h>
+
+static qb_array_t *gio_map;
+static GMainLoop *glib_loop;
+
+struct gio_to_qb_poll {
+ int32_t is_used;
+ int32_t events;
+ int32_t source;
+ int32_t fd;
+ void *data;
+ qb_ipcs_dispatch_fn_t fn;
+ enum qb_loop_priority p;
+};
+
+static gboolean
+gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data)
+{
+ struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
+ gint fd = g_io_channel_unix_get_fd(gio);
+
+ qb_enter();
+
+ return (adaptor->fn(fd, condition, adaptor->data) == 0);
+}
+
+static void
+gio_poll_destroy(gpointer data)
+{
+ struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
+
+ adaptor->is_used--;
+ if (adaptor->is_used == 0) {
+ qb_log(LOG_DEBUG, "fd %d adaptor destroyed\n", adaptor->fd);
+ adaptor->fd = 0;
+ adaptor->source = 0;
+ }
+}
+
+static int32_t
+gio_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts,
+ void *data, qb_ipcs_dispatch_fn_t fn, gboolean is_new)
+{
+ struct gio_to_qb_poll *adaptor;
+ GIOChannel *channel;
+ int32_t res = 0;
+
+ qb_enter();
+
+ res = qb_array_index(gio_map, fd, (void **)&adaptor);
+ if (res < 0) {
+ return res;
+ }
+ if (adaptor->is_used && adaptor->source) {
+ if (is_new) {
+ return -EEXIST;
+ }
+ g_source_remove(adaptor->source);
+ adaptor->source = 0;
+ }
+
+ channel = g_io_channel_unix_new(fd);
+ if (!channel) {
+ return -ENOMEM;
+ }
+
+ adaptor->fn = fn;
+ adaptor->events = evts;
+ adaptor->data = data;
+ adaptor->p = p;
+ adaptor->is_used++;
+ adaptor->fd = fd;
+
+ adaptor->source = g_io_add_watch_full(channel, conv_prio_libqb2glib(p),
+ evts, gio_read_socket, adaptor,
+ gio_poll_destroy);
+
+ /* we are handing the channel off to be managed by mainloop now.
+ * remove our reference. */
+ g_io_channel_unref(channel);
+
+ return 0;
+}
+
+static int32_t
+gio_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
+ void *data, qb_ipcs_dispatch_fn_t fn)
+{
+ return gio_dispatch_update(p, fd, evts, data, fn, TRUE);
+}
+
+static int32_t
+gio_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
+ void *data, qb_ipcs_dispatch_fn_t fn)
+{
+ return gio_dispatch_update(p, fd, evts, data, fn, FALSE);
+}
+
+static int32_t
+gio_dispatch_del(int32_t fd)
+{
+ struct gio_to_qb_poll *adaptor;
+ if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) {
+ g_source_remove(adaptor->source);
+ adaptor->source = 0;
+ }
+ return 0;
+}
+
+#endif /* HAVE_GLIB */
+
+
static int32_t
s1_connection_closed(qb_ipcs_connection_t *c)
{
@@ -412,13 +705,13 @@ READY_SIGNALLER(usr1_signaller, parent_target)
kill(*((pid_t *) parent_target), SIGUSR1);
}
-#define NEW_PROCESS_RUNNER(name, ready_signaller_arg, signaller_data_arg) \
+#define NEW_PROCESS_RUNNER(name, ready_signaller_arg, signaller_data_arg, data_arg) \
void (name)(ready_signaller_fn ready_signaller_arg, \
- void *signaller_data_arg)
-typedef NEW_PROCESS_RUNNER(new_process_runner_fn, , );
+ void *signaller_data_arg, void *data_arg)
+typedef NEW_PROCESS_RUNNER(new_process_runner_fn, , , );
static
-NEW_PROCESS_RUNNER(run_ipc_server, ready_signaller, signaller_data)
+NEW_PROCESS_RUNNER(run_ipc_server, ready_signaller, signaller_data, data)
{
int32_t res;
qb_loop_signal_handle handle;
@@ -431,12 +724,7 @@ NEW_PROCESS_RUNNER(run_ipc_server, ready_signaller, signaller_data)
.connection_closed = s1_connection_closed,
};
- struct qb_ipcs_poll_handlers ph = {
- .job_add = my_job_add,
- .dispatch_add = my_dispatch_add,
- .dispatch_mod = my_dispatch_mod,
- .dispatch_del = my_dispatch_del,
- };
+ struct qb_ipcs_poll_handlers ph;
uint32_t max_size = MAX_MSG_SIZE;
my_loop = qb_loop_create();
@@ -447,6 +735,33 @@ NEW_PROCESS_RUNNER(run_ipc_server, ready_signaller, signaller_data)
s1 = qb_ipcs_create(ipc_name, 4, ipc_type, &sh);
fail_if(s1 == 0);
+ if (global_loop_prio != QB_LOOP_MED) {
+ qb_ipcs_request_rate_limit(s1,
+ conv_libqb_prio2ratelimit(global_loop_prio));
+ }
+ if (global_use_glib) {
+#ifdef HAVE_GLIB
+ ph = (struct qb_ipcs_poll_handlers) {
+ .job_add = NULL,
+ .dispatch_add = gio_dispatch_add,
+ .dispatch_mod = gio_dispatch_mod,
+ .dispatch_del = gio_dispatch_del,
+ };
+ glib_loop = g_main_loop_new(NULL, FALSE);
+ gio_map = qb_array_create_2(16, sizeof(struct gio_to_qb_poll), 1);
+ fail_if (gio_map == NULL);
+#else
+ fail_if(1);
+#endif
+ } else {
+ ph = (struct qb_ipcs_poll_handlers) {
+ .job_add = my_job_add,
+ .dispatch_add = my_dispatch_add,
+ .dispatch_mod = my_dispatch_mod,
+ .dispatch_del = my_dispatch_del,
+ };
+ }
+
if (enforce_server_buffer) {
qb_ipcs_enforce_buffer_size(s1, max_size);
}
@@ -459,13 +774,20 @@ NEW_PROCESS_RUNNER(run_ipc_server, ready_signaller, signaller_data)
ready_signaller(signaller_data);
}
- qb_loop_run(my_loop);
+ if (global_use_glib) {
+#ifdef HAVE_GLIB
+ g_main_loop_run(glib_loop);
+#endif
+ } else {
+ qb_loop_run(my_loop);
+ }
qb_log(LOG_DEBUG, "loop finished - done ...");
}
static pid_t
run_function_in_new_process(const char *role,
- new_process_runner_fn new_process_runner)
+ new_process_runner_fn new_process_runner,
+ void *data)
{
char formatbuf[1024];
pid_t parent_target, pid;
@@ -503,7 +825,7 @@ run_function_in_new_process(const char *role,
qb_log_format_set(QB_LOG_STDERR, formatbuf);
}
- new_process_runner(usr1_signaller, &parent_target);
+ new_process_runner(usr1_signaller, &parent_target, data);
exit(0);
}
@@ -667,7 +989,7 @@ test_ipc_txrx_timeout(void)
pid_t pid;
uint32_t max_size = MAX_MSG_SIZE;
- pid = run_function_in_new_process("server", run_ipc_server);
+ pid = run_function_in_new_process("server", run_ipc_server, NULL);
fail_if(pid == -1);
do {
@@ -715,7 +1037,7 @@ test_ipc_txrx(void)
pid_t pid;
uint32_t max_size = MAX_MSG_SIZE;
- pid = run_function_in_new_process("server", run_ipc_server);
+ pid = run_function_in_new_process("server", run_ipc_server, NULL);
fail_if(pid == -1);
do {
@@ -765,7 +1087,7 @@ test_ipc_exit(void)
pid_t pid;
uint32_t max_size = MAX_MSG_SIZE;
- pid = run_function_in_new_process("server", run_ipc_server);
+ pid = run_function_in_new_process("server", run_ipc_server, NULL);
fail_if(pid == -1);
do {
@@ -918,14 +1240,21 @@ struct my_res {
char message[1024 * 1024];
};
+struct dispatch_data {
+ pid_t server_pid;
+ enum my_msg_ids msg_type;
+ uint32_t repetitions;
+};
+
static inline
-NEW_PROCESS_RUNNER(client_dispatch, ready_signaller, signaller_data)
+NEW_PROCESS_RUNNER(client_dispatch, ready_signaller, signaller_data, data)
{
uint32_t max_size = MAX_MSG_SIZE;
int32_t size;
int32_t c = 0;
int32_t j;
- pid_t server_pid = *((pid_t *) signaller_data);
+ pid_t server_pid = ((struct dispatch_data *) data)->server_pid;
+ enum my_msg_ids msg_type = ((struct dispatch_data *) data)->msg_type;
do {
conn = qb_ipcc_connect(ipc_name, max_size);
@@ -943,13 +1272,17 @@ NEW_PROCESS_RUNNER(client_dispatch, ready_signaller, signaller_data)
}
size = QB_MIN(sizeof(struct qb_ipc_request_header), 64);
- for (j = 1; j < 19; j++) {
- size *= 2;
- if (size >= max_size)
- break;
- if (send_and_check(IPC_MSG_REQ_DISPATCH, size,
- recv_timeout, QB_TRUE) < 0) {
- break;
+
+ for (uint32_t r = ((struct dispatch_data *) data)->repetitions;
+ r > 0; r--) {
+ for (j = 1; j < 19; j++) {
+ size *= 2;
+ if (size >= max_size)
+ break;
+ if (send_and_check(msg_type, size,
+ recv_timeout, QB_TRUE) < 0) {
+ break;
+ }
}
}
}
@@ -958,11 +1291,15 @@ static void
test_ipc_dispatch(void)
{
pid_t pid;
+ struct dispatch_data data;
- pid = run_function_in_new_process(NULL, run_ipc_server);
+ pid = run_function_in_new_process(NULL, run_ipc_server, NULL);
fail_if(pid == -1);
+ data = (struct dispatch_data){.server_pid = pid,
+ .msg_type = IPC_MSG_REQ_DISPATCH,
+ .repetitions = 1};
- client_dispatch(NULL, (void *) &pid);
+ client_dispatch(NULL, NULL, (void *) &data);
request_server_exit();
qb_ipcc_disconnect(conn);
@@ -1066,7 +1403,7 @@ test_ipc_stress_connections(void)
QB_LOG_FILTER_FILE, "*", LOG_INFO);
qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
- pid = run_function_in_new_process("server", run_ipc_server);
+ pid = run_function_in_new_process("server", run_ipc_server, NULL);
fail_if(pid == -1);
for (connections = 1; connections < 70000; connections++) {
@@ -1113,7 +1450,7 @@ test_ipc_bulk_events(void)
int32_t fd;
uint32_t max_size = MAX_MSG_SIZE;
- pid = run_function_in_new_process("server", run_ipc_server);
+ pid = run_function_in_new_process("server", run_ipc_server, NULL);
fail_if(pid == -1);
do {
@@ -1177,7 +1514,7 @@ test_ipc_stress_test(void)
int32_t real_buf_size;
enforce_server_buffer = 1;
- pid = run_function_in_new_process("server", run_ipc_server);
+ pid = run_function_in_new_process("server", run_ipc_server, NULL);
enforce_server_buffer = 0;
fail_if(pid == -1);
@@ -1266,6 +1603,93 @@ START_TEST(test_ipc_bulk_events_us)
}
END_TEST
+static
+READY_SIGNALLER(connected_signaller, _)
+{
+ request_server_exit();
+}
+
+START_TEST(test_ipc_dispatch_us_native_prio_deadlock_provoke)
+{
+ pid_t server_pid, alphaclient_pid;
+ struct dispatch_data data;
+
+ qb_enter();
+ ipc_type = QB_IPC_SOCKET;
+ set_ipc_name(__func__);
+
+ /* this is to demonstrate that native event loop can deal even
+ with "extreme" priority disproportions */
+ global_loop_prio = QB_LOOP_LOW;
+ multiple_connections = QB_TRUE;
+ recv_timeout = -1;
+
+ server_pid = run_function_in_new_process("server", run_ipc_server,
+ NULL);
+ fail_if(server_pid == -1);
+ data = (struct dispatch_data){.server_pid = server_pid,
+ .msg_type = IPC_MSG_REQ_SELF_FEED,
+ .repetitions = 1};
+ alphaclient_pid = run_function_in_new_process("alphaclient",
+ client_dispatch,
+ (void *) &data);
+ fail_if(alphaclient_pid == -1);
+
+ //sleep(1);
+ sched_yield();
+
+ data.repetitions = 0;
+ client_dispatch(connected_signaller, NULL, (void *) &data);
+ verify_graceful_stop(server_pid);
+
+ multiple_connections = QB_FALSE;
+ qb_leave();
+}
+END_TEST
+
+#if HAVE_GLIB
+START_TEST(test_ipc_dispatch_us_glib_prio_deadlock_provoke)
+{
+ pid_t server_pid, alphaclient_pid;
+ struct dispatch_data data;
+
+ qb_enter();
+ ipc_type = QB_IPC_SOCKET;
+ set_ipc_name(__func__);
+
+ global_use_glib = QB_TRUE;
+ /* this is to make the test pass at all, since GLib is strict
+ on priorities -- QB_LOOP_MED or lower would fail for sure */
+ global_loop_prio = QB_LOOP_HIGH;
+ multiple_connections = QB_TRUE;
+ recv_timeout = -1;
+
+ server_pid = run_function_in_new_process("server", run_ipc_server,
+ NULL);
+ fail_if(server_pid == -1);
+ data = (struct dispatch_data){.server_pid = server_pid,
+ .msg_type = IPC_MSG_REQ_SELF_FEED,
+ .repetitions = 1};
+ alphaclient_pid = run_function_in_new_process("alphaclient",
+ client_dispatch,
+ (void *) &data);
+ fail_if(alphaclient_pid == -1);
+
+ //sleep(1);
+ sched_yield();
+
+ data.repetitions = 0;
+ client_dispatch(connected_signaller, NULL, (void *) &data);
+ verify_graceful_stop(server_pid);
+
+ multiple_connections = QB_FALSE;
+ global_loop_prio = QB_LOOP_MED;
+ global_use_glib = QB_FALSE;
+ qb_leave();
+}
+END_TEST
+#endif
+
static void
test_ipc_event_on_created(void)
{
@@ -1279,7 +1703,7 @@ test_ipc_event_on_created(void)
num_bulk_events = 1;
- pid = run_function_in_new_process("server", run_ipc_server);
+ pid = run_function_in_new_process("server", run_ipc_server, NULL);
fail_if(pid == -1);
do {
@@ -1333,7 +1757,7 @@ test_ipc_disconnect_after_created(void)
int32_t res;
uint32_t max_size = MAX_MSG_SIZE;
- pid = run_function_in_new_process("server", run_ipc_server);
+ pid = run_function_in_new_process("server", run_ipc_server, NULL);
fail_if(pid == -1);
do {
@@ -1390,7 +1814,7 @@ test_ipc_server_fail(void)
pid_t pid;
uint32_t max_size = MAX_MSG_SIZE;
- pid = run_function_in_new_process("server", run_ipc_server);
+ pid = run_function_in_new_process("server", run_ipc_server, NULL);
fail_if(pid == -1);
do {
@@ -1457,6 +1881,87 @@ START_TEST(test_ipc_stress_connections_shm)
}
END_TEST
+START_TEST(test_ipc_dispatch_shm_native_prio_deadlock_provoke)
+{
+ pid_t server_pid, alphaclient_pid;
+ struct dispatch_data data;
+
+ qb_enter();
+ ipc_type = QB_IPC_SHM;
+ set_ipc_name(__func__);
+
+ /* this is to demonstrate that native event loop can deal even
+ with "extreme" priority disproportions */
+ global_loop_prio = QB_LOOP_LOW;
+ multiple_connections = QB_TRUE;
+ recv_timeout = -1;
+
+ server_pid = run_function_in_new_process("server", run_ipc_server,
+ NULL);
+ fail_if(server_pid == -1);
+ data = (struct dispatch_data){.server_pid = server_pid,
+ .msg_type = IPC_MSG_REQ_SELF_FEED,
+ .repetitions = 1};
+ alphaclient_pid = run_function_in_new_process("alphaclient",
+ client_dispatch,
+ (void *) &data);
+ fail_if(alphaclient_pid == -1);
+
+ //sleep(1);
+ sched_yield();
+
+ data.repetitions = 0;
+ client_dispatch(connected_signaller, NULL, (void *) &data);
+ verify_graceful_stop(server_pid);
+
+ multiple_connections = QB_FALSE;
+ qb_leave();
+}
+END_TEST
+
+#if HAVE_GLIB
+START_TEST(test_ipc_dispatch_shm_glib_prio_deadlock_provoke)
+{
+ pid_t server_pid, alphaclient_pid;
+ struct dispatch_data data;
+
+ qb_enter();
+ ipc_type = QB_IPC_SOCKET;
+ set_ipc_name(__func__);
+
+ global_use_glib = QB_TRUE;
+ /* this is to make the test pass at all, since GLib is strict
+ on priorities -- QB_LOOP_MED or lower would fail for sure */
+ global_loop_prio = QB_LOOP_HIGH;
+ multiple_connections = QB_TRUE;
+ recv_timeout = -1;
+
+ server_pid = run_function_in_new_process("server", run_ipc_server,
+ NULL);
+ fail_if(server_pid == -1);
+ data = (struct dispatch_data){.server_pid = server_pid,
+ .msg_type = IPC_MSG_REQ_SELF_FEED,
+ .repetitions = 1};
+ alphaclient_pid = run_function_in_new_process("alphaclient",
+ client_dispatch,
+ (void *) &data);
+ fail_if(alphaclient_pid == -1);
+
+ //sleep(1);
+ sched_yield();
+
+ data.repetitions = 0;
+ client_dispatch(connected_signaller, NULL, (void *) &data);
+ verify_graceful_stop(server_pid);
+
+ multiple_connections = QB_FALSE;
+ global_loop_prio = QB_LOOP_MED;
+ global_use_glib = QB_FALSE;
+ qb_leave();
+}
+END_TEST
+#endif
+
START_TEST(test_ipc_bulk_events_shm)
{
qb_enter();
@@ -1519,7 +2024,7 @@ test_ipc_service_ref_count(void)
reference_count_test = QB_TRUE;
- pid = run_function_in_new_process("server", run_ipc_server);
+ pid = run_function_in_new_process("server", run_ipc_server, NULL);
fail_if(pid == -1);
do {
@@ -1623,7 +2128,10 @@ make_shm_suite(void)
add_tcase(s, tc, test_ipc_event_on_created_shm, 9);
add_tcase(s, tc, test_ipc_service_ref_count_shm, 9);
add_tcase(s, tc, test_ipc_stress_connections_shm, 3600 /* ? */);
-
+ add_tcase(s, tc, test_ipc_dispatch_shm_native_prio_deadlock_provoke, 15);
+#if HAVE_GLIB
+ add_tcase(s, tc, test_ipc_dispatch_shm_glib_prio_deadlock_provoke, 15);
+#endif
#ifdef HAVE_FAILURE_INJECTION
add_tcase(s, tc, test_ipcc_truncate_when_unlink_fails_shm, 8);
#endif
@@ -1655,6 +2163,10 @@ make_soc_suite(void)
add_tcase(s, tc, test_ipc_disconnect_after_created_us, 9);
add_tcase(s, tc, test_ipc_service_ref_count_us, 9);
add_tcase(s, tc, test_ipc_stress_connections_us, 3600 /* ? */);
+ add_tcase(s, tc, test_ipc_dispatch_us_native_prio_deadlock_provoke, 15);
+#if HAVE_GLIB
+ add_tcase(s, tc, test_ipc_dispatch_us_glib_prio_deadlock_provoke, 15);
+#endif
return s;
}
--
2.22.0.rc3
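
The behavioural contrast the new *_glib_prio_deadlock_provoke tests rely
on can be reproduced with plain GLib, no libqb involved: as long as a
higher-priority source keeps reporting work, GLib never dispatches
lower-priority sources, which is exactly the role the self-feeding pipe
registered at QB_LOOP_HIGH plays against the default-priority client
handling. A minimal sketch (the 100000-round cutoff is arbitrary, only
there so the demo terminates; build against glib-2.0):

#include <glib.h>

static guint busy_rounds;

static gboolean always_busy(gpointer data)
{
    (void) data;
    /* pretend there is more high-priority work for a long while */
    return (++busy_rounds < 100000) ? G_SOURCE_CONTINUE : G_SOURCE_REMOVE;
}

static gboolean low_prio_work(gpointer data)
{
    /* only ever reached once the high-priority source has given up */
    g_print("dispatched after %u high-priority rounds\n", busy_rounds);
    g_main_loop_quit((GMainLoop *) data);
    return G_SOURCE_REMOVE;
}

int main(void)
{
    GMainLoop *loop = g_main_loop_new(NULL, FALSE);

    g_idle_add_full(G_PRIORITY_HIGH, always_busy, NULL, NULL);
    g_idle_add_full(G_PRIORITY_DEFAULT, low_prio_work, loop, NULL);

    g_main_loop_run(loop);
    g_main_loop_unref(loop);
    return 0;
}

With libqb's native loop, the analogous arrangement still lets the
lower-priority work through now and then, which is why only the GLib
variant of the test exposes the server-side bug fixed in the next patch.
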
From 6f6be63800b395506b3d3c0f1f2fc1093b4f863d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Pokorn=C3=BD?= <jpokorny@redhat.com>
Date: Fri, 24 May 2019 22:18:55 +0200
Subject: [PATCH 6/8] IPC: server: avoid temporary channel priority loss, up to
deadlock-worth
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
It turns out that while 7f56f58 allowed for less blocking (thus
throughput increasing) initial handling of connections from clients
within the abstract (out-of-libqb managed) event loop, it unfortunately
subscribes itself back to such a polling mechanism for the
UNIX-socket-check with a default priority, which can be lower than
desired (via explicit qb_ipcs_request_rate_limit() configuration) for
a particular channel (amongst attention-competing siblings in the pool;
the term here refers to the associated communication, that is, both the
server and the on-server abstraction for particular clients). And
priority-based discrepancies are not forgiven in truly priority abiding
systems (that is, unlike with libqb's native event loop harness as
detailed in the previous commit, for which this would be soft-tolerated
and hence the problem would not be spotted in the first place -- but
that's explicitly excluded from further discussion).
On top of that, it violates the natural assumption that once the
(single threaded, which is imposed by libqb, at least between the
initial accept() and after said UNIX-socket-check) server accepts the
connection, it shall rather take care of serving it (at least within
the stated initial scope of the client connection life cycle) than be
rushing to accept new ones -- which is exactly what used to happen
previously once the library user set the effective priority in the
abstract poll above the default one.
It's conceivable, just as with the former case of attention-competing
siblings with higher priority whereby they could _infinitely_ live on
at the expense of starving the client in the initial handling phase
(authentication) despite the library user's as-high-as-siblings
intention (the default priority being used for that unconditionally
instead, which we address here), that the deadlock is imminent also in
this latter accept-to-client-authentication-handling case if there's
an _unlimited_, fast-paced arrival queue (well, limited by the number
of allowable open descriptors within the system, but for the Linux
built-in maximum of 1M, there may be no practical difference, at least
for time-sensitive applications).
The only hope then is that such deadlocks are rather theoretical,
since a "spontaneous" constant stream of either communication on
unrelated, higher-prio sibling channels, or of new connection arrivals,
can just as well testify to the poor design of the libqb IPC
application. That being said, the unconditional default priority in the
isolated context of initial server-side client authentication is
clearly a bug, but such an application shall apply appropriate
rate-limiting measures (exactly on a priority basis) to handle
unexpected flux nonetheless.
The fix makes the test_ipc_dispatch_*_glib_prio_deadlock_provoke tests
pass.
Signed-off-by: Jan Pokorný <jpokorny@redhat.com>
---
lib/ipc_setup.c | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/lib/ipc_setup.c b/lib/ipc_setup.c
index 3f53c4b..9aea411 100644
--- a/lib/ipc_setup.c
+++ b/lib/ipc_setup.c
@@ -843,10 +843,10 @@ qb_ipcs_uc_recv_and_auth(int32_t sock, struct qb_ipcs_service *s)
setsockopt(sock, SOL_SOCKET, SO_PASSCRED, &on, sizeof(on));
#endif
- res = s->poll_fns.dispatch_add(QB_LOOP_MED,
- data->sock,
- POLLIN | POLLPRI | POLLNVAL,
- data, process_auth);
+ res = s->poll_fns.dispatch_add(s->poll_priority,
+ data->sock,
+ POLLIN | POLLPRI | POLLNVAL,
+ data, process_auth);
if (res < 0) {
qb_util_log(LOG_DEBUG, "Failed to process AUTH for fd (%d)", data->sock);
close(sock);
--
2.22.0.rc3
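
From the application side, the effect of this one-liner can be observed
through the poll handlers themselves. A hedged sketch modelled on the
wrappers in check_ipc.c (the logging_* and setup_service names are
illustrative, and the qb_loop/service creation plumbing is omitted):
with the fix, the priority passed to dispatch_add for the very first
descriptor of a new connection -- the UNIX-socket authentication check --
matches what qb_ipcs_request_rate_limit() configured, instead of always
being QB_LOOP_MED.

#include <syslog.h>
#include <qb/qbipcs.h>
#include <qb/qblog.h>
#include <qb/qbloop.h>

static qb_loop_t *loop;  /* assumed to come from qb_loop_create() */

static int32_t
logging_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
                     void *data, qb_ipcs_dispatch_fn_t fn)
{
    /* with the fix, p is QB_LOOP_HIGH here for a QB_IPCS_RATE_FAST
     * service, including the auth fd registered right after accept() */
    qb_log(LOG_DEBUG, "dispatch_add: fd=%d priority=%d", fd, p);
    return qb_loop_poll_add(loop, p, fd, evts, data, fn);
}

static int32_t
logging_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
                     void *data, qb_ipcs_dispatch_fn_t fn)
{
    return qb_loop_poll_mod(loop, p, fd, evts, data, fn);
}

static int32_t
logging_dispatch_del(int32_t fd)
{
    return qb_loop_poll_del(loop, fd);
}

static void
setup_service(qb_ipcs_service_t *s)
{
    struct qb_ipcs_poll_handlers ph = {
        .job_add = NULL,  /* optional */
        .dispatch_add = logging_dispatch_add,
        .dispatch_mod = logging_dispatch_mod,
        .dispatch_del = logging_dispatch_del,
    };

    /* QB_IPCS_RATE_FAST maps to QB_LOOP_HIGH inside libqb */
    qb_ipcs_request_rate_limit(s, QB_IPCS_RATE_FAST);
    qb_ipcs_poll_handlers_set(s, &ph);
}
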
From 87c90ee8a969472c638c83cddd4685d02a284265 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Pokorn=C3=BD?= <jpokorny@redhat.com>
Date: Thu, 16 May 2019 18:59:54 +0200
Subject: [PATCH 7/8] IPC: server: fix debug message wrt. what actually went
wrong
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
The message is misleading to a casual code observer, at least, hiding
the fact that what failed is actually the queuing up of some handling
to be performed asynchronously in the future, rather than invoking it
synchronously right away.
Signed-off-by: Jan Pokorný <jpokorny@redhat.com>
---
lib/ipc_setup.c | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/lib/ipc_setup.c b/lib/ipc_setup.c
index 9aea411..7cd1fd9 100644
--- a/lib/ipc_setup.c
+++ b/lib/ipc_setup.c
@@ -848,7 +848,8 @@ qb_ipcs_uc_recv_and_auth(int32_t sock, struct qb_ipcs_service *s)
POLLIN | POLLPRI | POLLNVAL,
data, process_auth);
if (res < 0) {
- qb_util_log(LOG_DEBUG, "Failed to process AUTH for fd (%d)", data->sock);
+ qb_util_log(LOG_DEBUG, "Failed to arrange for AUTH for fd (%d)",
+ data->sock);
close(sock);
destroy_ipc_auth_data(data);
}
--
2.22.0.rc3
From e2d5be4d1e881fc118d00fe109cf8aff669c00c9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Pokorn=C3=BD?= <jpokorny@redhat.com>
Date: Fri, 24 May 2019 14:52:09 +0200
Subject: [PATCH 8/8] doc: qbloop.h: document pros/cons of using built-in event
loop impl
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Also make the interdependence with the qbipcs.h module clear (shedding
light on some semantic dependencies along the way).
Signed-off-by: Jan Pokorný <jpokorny@redhat.com>
---
include/qb/qbipcs.h | 22 ++++++++++++++++++++++
include/qb/qbloop.h | 20 ++++++++++++++++++++
2 files changed, 42 insertions(+)
diff --git a/include/qb/qbipcs.h b/include/qb/qbipcs.h
index 55c0f81..7cb8586 100644
--- a/include/qb/qbipcs.h
+++ b/include/qb/qbipcs.h
@@ -43,6 +43,12 @@ extern "C" {
* @example ipcserver.c
*/
+/**
+ * Rates to be passed to #qb_ipcs_request_rate_limit. The exact interpretation
+ * depends on how the event loop implementation understands the concept of
+ * priorities, see the discussion at #qb_ipcs_poll_handlers structure -- an
+ * integration point between IPC server instance and the underlying event loop.
+ */
enum qb_ipcs_rate_limit {
QB_IPCS_RATE_FAST,
QB_IPCS_RATE_NORMAL,
@@ -104,6 +110,22 @@ typedef int32_t (*qb_ipcs_job_add_fn)(enum qb_loop_priority p,
void *data,
qb_loop_job_dispatch_fn dispatch_fn);
+/**
+ * A set of callbacks that need to be provided (only #job_add can be #NULL)
+ * whenever the IPC server is to be run (by the means of #qb_ipcs_run).
+ * It is possible to use accordingly named functions defined in qbloop.h module
+ * or integrate with other existing (like GLib's event loop) or entirely new
+ * code -- see the subtle distinction amongst the possible event loops pointed
+ * out in the introductory comment at qbloop.h.
+ *
+ * At that occasion, please note the correlation of #QB_IPCS_RATE_FAST etc.
+ * symbolic names with said advisory effect of the priorities in the native
+ * implementation. This correspondence will not be as intuitively seamless
+ * if some other event loop implementation is hooked in, given that it abides
+ * by them strictly as mentioned (e.g. GLib's event loop over poll'able sources).
+ * Differences between the two paradigms should also be accounted for when
+ * the requirement to swap the event loop implementations arises.
+ */
struct qb_ipcs_poll_handlers {
qb_ipcs_job_add_fn job_add;
qb_ipcs_dispatch_add_fn dispatch_add;
diff --git a/include/qb/qbloop.h b/include/qb/qbloop.h
index 6bded75..db0c480 100644
--- a/include/qb/qbloop.h
+++ b/include/qb/qbloop.h
@@ -36,6 +36,26 @@ extern "C" {
*
* Main loop manages timers, jobs and polling sockets.
*
+ * Only a weaker sense of priorities is implemented, alluding to a distinct
+ * set of pros and cons compared to the stronger, strict approach to them
+ * as widely applied in this problem space (since the latter gives the
+ * application more control as the effect of the former can still be
+ * achieved with some reductions, whereas it is not straightforward the
+ * other way around; cf. static priority task scheduling vs. relative
+ * fine-tuning within a single priority domain with nice(2)):
+ *
+ * + implicit mitigation for deadlock-prone priority arrangements
+ *
+ * - less predictable (proportional probability based, we can talk
+ * about an advisory effect of the priorities) responses to the arrival
+ * of the high-ranked events (i.e. in the process of the picking the next
+ * event to handle from the priority queue when at least two different
+ * priorities are eligible at the moment)
+ *
+ * One practical application of this libqb module is in combination with
+ * IPC servers based on the one published in qbipcs.h (the #qb_ipcs_poll_handlers
+ * structure maps fittingly to the control functions published here).
+ *
* @example tcpserver.c
*/
--
2.22.0.rc3
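
As a counterpart to the strict GLib scheduling exercised earlier in the
series, the "weaker sense of priorities" documented here can be observed
with libqb's native loop directly. A sketch under stated assumptions (the
10000-dispatch cutoff is arbitrary, and libqb only promises a
probabilistic bias towards higher priorities, so the printed ratio will
vary): a constantly re-armed high-priority job does not completely starve
a low-priority one.

#include <stdio.h>
#include <qb/qbloop.h>

static qb_loop_t *loop;
static unsigned int high_runs, low_runs;

static void maybe_stop(void)
{
    if (high_runs + low_runs >= 10000) {
        printf("high: %u  low: %u\n", high_runs, low_runs);
        qb_loop_stop(loop);
    }
}

static void high_job(void *data)
{
    (void) data;
    high_runs++;
    maybe_stop();
    qb_loop_job_add(loop, QB_LOOP_HIGH, NULL, high_job);  /* re-arm */
}

static void low_job(void *data)
{
    (void) data;
    low_runs++;  /* still gets dispatched now and then */
    maybe_stop();
    qb_loop_job_add(loop, QB_LOOP_LOW, NULL, low_job);
}

int main(void)
{
    loop = qb_loop_create();
    qb_loop_job_add(loop, QB_LOOP_HIGH, NULL, high_job);
    qb_loop_job_add(loop, QB_LOOP_LOW, NULL, low_job);
    qb_loop_run(loop);
    qb_loop_destroy(loop);
    return 0;
}

In a strictly priority-abiding loop, low_runs would stay at zero for as
long as high_job keeps re-arming itself -- the trade-off the comment
block added to qbloop.h spells out.
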