2f3c8edfdf
Resolves: bz#1390151 bz#1410145 bz#1429190 bz#1510752 bz#1511779
Resolves: bz#1570958 bz#1574490 bz#1595246 bz#1618669 bz#1661393
Resolves: bz#1668989 bz#1669020
Signed-off-by: Milind Changire <mchangir@redhat.com>
1212 lines
47 KiB
Diff
1212 lines
47 KiB
Diff
From 667e92a8dd0a21902cef39a59bc6c6b77d1f3c26 Mon Sep 17 00:00:00 2001
|
|
From: Raghavendra Gowdappa <rgowdapp@redhat.com>
|
|
Date: Mon, 11 Feb 2019 12:32:52 +0530
|
|
Subject: [PATCH 525/529] rpcsvc: provide each request handler thread its own
|
|
queue
|
|
|
|
A single global per program queue is contended by all request handler
|
|
threads and event threads. This can lead to high contention. So,
|
|
reduce the contention by providing each request handler thread its own
|
|
private queue.
|
|
|
|
Thanks to "Manoj Pillai"<mpillai@redhat.com> for the idea of pairing a
|
|
single queue with a fixed request-handler-thread and event-thread,
|
|
which brought down the performance regression due to overhead of
|
|
queuing significantly.
|
|
|
|
Thanks to "Xavi Hernandez"<xhernandez@redhat.com> for discussion on
|
|
how to communicate the event-thread death to request-handler-thread.
|
|
|
|
Thanks to "Karan Sandha"<ksandha@redhat.com> for voluntarily running
|
|
the perf benchmarks to verify that the performance regression introduced
|
|
by ping-timer-fixes is fixed with this patch and patiently running
|
|
many iterations of regression tests while RCAing the issue.
|
|
|
|
Thanks to "Milind Changire"<mchangir@redhat.com> for patiently running
|
|
the many iterations of perf benchmarking tests while RCAing the
|
|
regression caused by ping-timer-expiry fixes.
|
|
|
|
Change-Id: I578c3fc67713f4234bd3abbec5d3fbba19059ea5
|
|
BUG: 1390151
|
|
Signed-off-by: Raghavendra Gowdappa <rgowdapp@redhat.com>
|
|
(cherry picked from commit 95e380eca19b9f0d03a53429535f15556e5724ad)
|
|
Reviewed-on: https://code.engineering.redhat.com/gerrit/162427
|
|
Tested-by: RHGS Build Bot <nigelb@redhat.com>
|
|
---
|
|
cli/src/cli-rl.c | 4 +-
|
|
libglusterfs/src/event-epoll.c | 156 +++++++++---
|
|
libglusterfs/src/event-poll.c | 14 +-
|
|
libglusterfs/src/event.c | 11 +-
|
|
libglusterfs/src/event.h | 19 +-
|
|
rpc/rpc-lib/src/rpc-clnt.c | 6 +
|
|
rpc/rpc-lib/src/rpc-transport.c | 4 +
|
|
rpc/rpc-lib/src/rpc-transport.h | 3 +
|
|
rpc/rpc-lib/src/rpcsvc.c | 339 +++++++++++++++++++++++----
|
|
rpc/rpc-lib/src/rpcsvc.h | 32 ++-
|
|
rpc/rpc-transport/socket/src/socket.c | 29 ++-
|
|
xlators/protocol/server/src/server-helpers.c | 4 +
|
|
xlators/protocol/server/src/server.c | 3 +
|
|
13 files changed, 530 insertions(+), 94 deletions(-)
|
|
|
|
diff --git a/cli/src/cli-rl.c b/cli/src/cli-rl.c
|
|
index 4745cf4..cffd0a8 100644
|
|
--- a/cli/src/cli-rl.c
|
|
+++ b/cli/src/cli-rl.c
|
|
@@ -109,7 +109,7 @@ cli_rl_process_line (char *line)
|
|
|
|
int
|
|
cli_rl_stdin (int fd, int idx, int gen, void *data,
|
|
- int poll_out, int poll_in, int poll_err)
|
|
+ int poll_out, int poll_in, int poll_err, char event_thread_died)
|
|
{
|
|
struct cli_state *state = NULL;
|
|
|
|
@@ -394,7 +394,7 @@ cli_rl_enable (struct cli_state *state)
|
|
}
|
|
|
|
ret = event_register (state->ctx->event_pool, 0, cli_rl_stdin, state,
|
|
- 1, 0);
|
|
+ 1, 0, 0);
|
|
if (ret == -1)
|
|
goto out;
|
|
|
|
diff --git a/libglusterfs/src/event-epoll.c b/libglusterfs/src/event-epoll.c
|
|
index 7fc53ff..310bce3 100644
|
|
--- a/libglusterfs/src/event-epoll.c
|
|
+++ b/libglusterfs/src/event-epoll.c
|
|
@@ -32,6 +32,7 @@ struct event_slot_epoll {
|
|
int fd;
|
|
int events;
|
|
int gen;
|
|
+ int idx;
|
|
int ref;
|
|
int do_close;
|
|
int in_handler;
|
|
@@ -39,6 +40,7 @@ struct event_slot_epoll {
|
|
void *data;
|
|
event_handler_t handler;
|
|
gf_lock_t lock;
|
|
+ struct list_head poller_death;
|
|
};
|
|
|
|
struct event_thread_data {
|
|
@@ -60,6 +62,7 @@ __event_newtable (struct event_pool *event_pool, int table_idx)
|
|
for (i = 0; i < EVENT_EPOLL_SLOTS; i++) {
|
|
table[i].fd = -1;
|
|
LOCK_INIT (&table[i].lock);
|
|
+ INIT_LIST_HEAD(&table[i].poller_death);
|
|
}
|
|
|
|
event_pool->ereg[table_idx] = table;
|
|
@@ -70,7 +73,8 @@ __event_newtable (struct event_pool *event_pool, int table_idx)
|
|
|
|
|
|
static int
|
|
-__event_slot_alloc (struct event_pool *event_pool, int fd)
|
|
+__event_slot_alloc (struct event_pool *event_pool, int fd,
|
|
+ char notify_poller_death)
|
|
{
|
|
int i = 0;
|
|
int table_idx = -1;
|
|
@@ -105,34 +109,42 @@ __event_slot_alloc (struct event_pool *event_pool, int fd)
|
|
|
|
table_idx = i;
|
|
|
|
- for (i = 0; i < EVENT_EPOLL_SLOTS; i++) {
|
|
- if (table[i].fd == -1) {
|
|
- /* wipe everything except bump the generation */
|
|
- gen = table[i].gen;
|
|
- memset (&table[i], 0, sizeof (table[i]));
|
|
- table[i].gen = gen + 1;
|
|
-
|
|
- LOCK_INIT (&table[i].lock);
|
|
+ for (i = 0; i < EVENT_EPOLL_SLOTS; i++) {
|
|
+ if (table[i].fd == -1) {
|
|
+ /* wipe everything except bump the generation */
|
|
+ gen = table[i].gen;
|
|
+ memset (&table[i], 0, sizeof (table[i]));
|
|
+ table[i].gen = gen + 1;
|
|
+
|
|
+ LOCK_INIT (&table[i].lock);
|
|
+ INIT_LIST_HEAD(&table[i].poller_death);
|
|
+
|
|
+ table[i].fd = fd;
|
|
+ if (notify_poller_death) {
|
|
+ table[i].idx = table_idx * EVENT_EPOLL_SLOTS + i;
|
|
+ list_add_tail(&table[i].poller_death,
|
|
+ &event_pool->poller_death);
|
|
+ }
|
|
|
|
- table[i].fd = fd;
|
|
- event_pool->slots_used[table_idx]++;
|
|
+ event_pool->slots_used[table_idx]++;
|
|
|
|
- break;
|
|
- }
|
|
- }
|
|
+ break;
|
|
+ }
|
|
+ }
|
|
|
|
return table_idx * EVENT_EPOLL_SLOTS + i;
|
|
}
|
|
|
|
|
|
static int
|
|
-event_slot_alloc (struct event_pool *event_pool, int fd)
|
|
+event_slot_alloc (struct event_pool *event_pool, int fd,
|
|
+ char notify_poller_death)
|
|
{
|
|
int idx = -1;
|
|
|
|
pthread_mutex_lock (&event_pool->mutex);
|
|
{
|
|
- idx = __event_slot_alloc (event_pool, fd);
|
|
+ idx = __event_slot_alloc (event_pool, fd, notify_poller_death);
|
|
}
|
|
pthread_mutex_unlock (&event_pool->mutex);
|
|
|
|
@@ -162,6 +174,7 @@ __event_slot_dealloc (struct event_pool *event_pool, int idx)
|
|
slot->fd = -1;
|
|
slot->handled_error = 0;
|
|
slot->in_handler = 0;
|
|
+ list_del_init(&slot->poller_death);
|
|
event_pool->slots_used[table_idx]--;
|
|
|
|
return;
|
|
@@ -180,6 +193,23 @@ event_slot_dealloc (struct event_pool *event_pool, int idx)
|
|
return;
|
|
}
|
|
|
|
+static int
|
|
+event_slot_ref(struct event_slot_epoll *slot)
|
|
+{
|
|
+ int ref;
|
|
+
|
|
+ if (!slot)
|
|
+ return -1;
|
|
+
|
|
+ LOCK (&slot->lock);
|
|
+ {
|
|
+ slot->ref++;
|
|
+ ref = slot->ref;
|
|
+ }
|
|
+ UNLOCK (&slot->lock);
|
|
+
|
|
+ return ref;
|
|
+}
|
|
|
|
static struct event_slot_epoll *
|
|
event_slot_get (struct event_pool *event_pool, int idx)
|
|
@@ -198,15 +228,44 @@ event_slot_get (struct event_pool *event_pool, int idx)
|
|
|
|
slot = &table[offset];
|
|
|
|
+ event_slot_ref (slot);
|
|
+ return slot;
|
|
+}
|
|
+
|
|
+static void
|
|
+__event_slot_unref(struct event_pool *event_pool, struct event_slot_epoll *slot,
|
|
+ int idx)
|
|
+{
|
|
+ int ref = -1;
|
|
+ int fd = -1;
|
|
+ int do_close = 0;
|
|
+
|
|
LOCK (&slot->lock);
|
|
{
|
|
- slot->ref++;
|
|
+ --(slot->ref);
|
|
+ ref = slot->ref;
|
|
}
|
|
UNLOCK (&slot->lock);
|
|
|
|
- return slot;
|
|
-}
|
|
+ if (ref)
|
|
+ /* slot still alive */
|
|
+ goto done;
|
|
+
|
|
+ LOCK(&slot->lock);
|
|
+ {
|
|
+ fd = slot->fd;
|
|
+ do_close = slot->do_close;
|
|
+ slot->do_close = 0;
|
|
+ }
|
|
+ UNLOCK(&slot->lock);
|
|
+
|
|
+ __event_slot_dealloc(event_pool, idx);
|
|
|
|
+ if (do_close)
|
|
+ sys_close(fd);
|
|
+done:
|
|
+ return;
|
|
+}
|
|
|
|
static void
|
|
event_slot_unref (struct event_pool *event_pool, struct event_slot_epoll *slot,
|
|
@@ -264,7 +323,7 @@ event_pool_new_epoll (int count, int eventthreadcount)
|
|
event_pool->fd = epfd;
|
|
|
|
event_pool->count = count;
|
|
-
|
|
+ INIT_LIST_HEAD(&event_pool->poller_death);
|
|
event_pool->eventthreadcount = eventthreadcount;
|
|
event_pool->auto_thread_count = 0;
|
|
|
|
@@ -315,7 +374,8 @@ __slot_update_events (struct event_slot_epoll *slot, int poll_in, int poll_out)
|
|
int
|
|
event_register_epoll (struct event_pool *event_pool, int fd,
|
|
event_handler_t handler,
|
|
- void *data, int poll_in, int poll_out)
|
|
+ void *data, int poll_in, int poll_out,
|
|
+ char notify_poller_death)
|
|
{
|
|
int idx = -1;
|
|
int ret = -1;
|
|
@@ -345,7 +405,7 @@ event_register_epoll (struct event_pool *event_pool, int fd,
|
|
if (destroy == 1)
|
|
goto out;
|
|
|
|
- idx = event_slot_alloc (event_pool, fd);
|
|
+ idx = event_slot_alloc (event_pool, fd, notify_poller_death);
|
|
if (idx == -1) {
|
|
gf_msg ("epoll", GF_LOG_ERROR, 0, LG_MSG_SLOT_NOT_FOUND,
|
|
"could not find slot for fd=%d", fd);
|
|
@@ -583,7 +643,7 @@ pre_unlock:
|
|
ret = handler (fd, idx, gen, data,
|
|
(event->events & (EPOLLIN|EPOLLPRI)),
|
|
(event->events & (EPOLLOUT)),
|
|
- (event->events & (EPOLLERR|EPOLLHUP)));
|
|
+ (event->events & (EPOLLERR|EPOLLHUP)), 0);
|
|
}
|
|
out:
|
|
event_slot_unref (event_pool, slot, idx);
|
|
@@ -600,7 +660,10 @@ event_dispatch_epoll_worker (void *data)
|
|
struct event_thread_data *ev_data = data;
|
|
struct event_pool *event_pool;
|
|
int myindex = -1;
|
|
- int timetodie = 0;
|
|
+ int timetodie = 0, gen = 0;
|
|
+ struct list_head poller_death_notify;
|
|
+ struct event_slot_epoll *slot = NULL, *tmp = NULL;
|
|
+
|
|
|
|
GF_VALIDATE_OR_GOTO ("event", ev_data, out);
|
|
|
|
@@ -610,7 +673,7 @@ event_dispatch_epoll_worker (void *data)
|
|
GF_VALIDATE_OR_GOTO ("event", event_pool, out);
|
|
|
|
gf_msg ("epoll", GF_LOG_INFO, 0, LG_MSG_STARTED_EPOLL_THREAD, "Started"
|
|
- " thread with index %d", myindex);
|
|
+ " thread with index %d", myindex - 1);
|
|
|
|
pthread_mutex_lock (&event_pool->mutex);
|
|
{
|
|
@@ -627,21 +690,58 @@ event_dispatch_epoll_worker (void *data)
|
|
* reconfigured always */
|
|
pthread_mutex_lock (&event_pool->mutex);
|
|
{
|
|
- if (event_pool->eventthreadcount <
|
|
- myindex) {
|
|
+ if (event_pool->eventthreadcount < myindex) {
|
|
+ while (event_pool->poller_death_sliced) {
|
|
+ pthread_cond_wait(
|
|
+ &event_pool->cond,
|
|
+ &event_pool->mutex);
|
|
+ }
|
|
+
|
|
+ INIT_LIST_HEAD(&poller_death_notify);
|
|
+
|
|
/* if found true in critical section,
|
|
* die */
|
|
event_pool->pollers[myindex - 1] = 0;
|
|
event_pool->activethreadcount--;
|
|
timetodie = 1;
|
|
+ gen = ++event_pool->poller_gen;
|
|
+ list_for_each_entry(slot, &event_pool->poller_death,
|
|
+ poller_death)
|
|
+ {
|
|
+ event_slot_ref(slot);
|
|
+ }
|
|
+
|
|
+ list_splice_init(&event_pool->poller_death,
|
|
+ &poller_death_notify);
|
|
+ event_pool->poller_death_sliced = 1;
|
|
+
|
|
pthread_cond_broadcast (&event_pool->cond);
|
|
}
|
|
}
|
|
pthread_mutex_unlock (&event_pool->mutex);
|
|
if (timetodie) {
|
|
+ list_for_each_entry(slot, &poller_death_notify, poller_death)
|
|
+ {
|
|
+ slot->handler(slot->fd, 0, gen, slot->data, 0, 0, 0, 1);
|
|
+ }
|
|
+
|
|
+ pthread_mutex_lock(&event_pool->mutex);
|
|
+ {
|
|
+ list_for_each_entry_safe(slot, tmp, &poller_death_notify, poller_death)
|
|
+ {
|
|
+ __event_slot_unref(event_pool, slot, slot->idx);
|
|
+ }
|
|
+
|
|
+ list_splice(&poller_death_notify,
|
|
+ &event_pool->poller_death);
|
|
+ event_pool->poller_death_sliced = 0;
|
|
+ pthread_cond_broadcast(&event_pool->cond);
|
|
+ }
|
|
+ pthread_mutex_unlock(&event_pool->mutex);
|
|
+
|
|
gf_msg ("epoll", GF_LOG_INFO, 0,
|
|
LG_MSG_EXITED_EPOLL_THREAD, "Exited "
|
|
- "thread with index %d", myindex);
|
|
+ "thread with index %d", myindex - 1);
|
|
goto out;
|
|
}
|
|
}
|
|
diff --git a/libglusterfs/src/event-poll.c b/libglusterfs/src/event-poll.c
|
|
index 3bffc47..ca00071 100644
|
|
--- a/libglusterfs/src/event-poll.c
|
|
+++ b/libglusterfs/src/event-poll.c
|
|
@@ -36,12 +36,14 @@ struct event_slot_poll {
|
|
static int
|
|
event_register_poll (struct event_pool *event_pool, int fd,
|
|
event_handler_t handler,
|
|
- void *data, int poll_in, int poll_out);
|
|
+ void *data, int poll_in, int poll_out,
|
|
+ char notify_poller_death);
|
|
|
|
|
|
static int
|
|
__flush_fd (int fd, int idx, int gen, void *data,
|
|
- int poll_in, int poll_out, int poll_err)
|
|
+ int poll_in, int poll_out, int poll_err,
|
|
+ char notify_poller_death)
|
|
{
|
|
char buf[64];
|
|
int ret = -1;
|
|
@@ -153,7 +155,7 @@ event_pool_new_poll (int count, int eventthreadcount)
|
|
}
|
|
|
|
ret = event_register_poll (event_pool, event_pool->breaker[0],
|
|
- __flush_fd, NULL, 1, 0);
|
|
+ __flush_fd, NULL, 1, 0, 0);
|
|
if (ret == -1) {
|
|
gf_msg ("poll", GF_LOG_ERROR, 0, LG_MSG_REGISTER_PIPE_FAILED,
|
|
"could not register pipe fd with poll event loop");
|
|
@@ -180,7 +182,8 @@ event_pool_new_poll (int count, int eventthreadcount)
|
|
static int
|
|
event_register_poll (struct event_pool *event_pool, int fd,
|
|
event_handler_t handler,
|
|
- void *data, int poll_in, int poll_out)
|
|
+ void *data, int poll_in, int poll_out,
|
|
+ char notify_poller_death)
|
|
{
|
|
int idx = -1;
|
|
|
|
@@ -389,7 +392,8 @@ unlock:
|
|
ret = handler (ufds[i].fd, idx, 0, data,
|
|
(ufds[i].revents & (POLLIN|POLLPRI)),
|
|
(ufds[i].revents & (POLLOUT)),
|
|
- (ufds[i].revents & (POLLERR|POLLHUP|POLLNVAL)));
|
|
+ (ufds[i].revents & (POLLERR|POLLHUP|POLLNVAL)),
|
|
+ 0);
|
|
|
|
return ret;
|
|
}
|
|
diff --git a/libglusterfs/src/event.c b/libglusterfs/src/event.c
|
|
index bba6f84..8463c19 100644
|
|
--- a/libglusterfs/src/event.c
|
|
+++ b/libglusterfs/src/event.c
|
|
@@ -58,14 +58,16 @@ event_pool_new (int count, int eventthreadcount)
|
|
int
|
|
event_register (struct event_pool *event_pool, int fd,
|
|
event_handler_t handler,
|
|
- void *data, int poll_in, int poll_out)
|
|
+ void *data, int poll_in, int poll_out,
|
|
+ char notify_poller_death)
|
|
{
|
|
int ret = -1;
|
|
|
|
GF_VALIDATE_OR_GOTO ("event", event_pool, out);
|
|
|
|
ret = event_pool->ops->event_register (event_pool, fd, handler, data,
|
|
- poll_in, poll_out);
|
|
+ poll_in, poll_out,
|
|
+ notify_poller_death);
|
|
out:
|
|
return ret;
|
|
}
|
|
@@ -170,7 +172,8 @@ out:
|
|
|
|
int
|
|
poller_destroy_handler (int fd, int idx, int gen, void *data,
|
|
- int poll_out, int poll_in, int poll_err)
|
|
+ int poll_out, int poll_in, int poll_err,
|
|
+ char event_thread_exit)
|
|
{
|
|
struct event_destroy_data *destroy = NULL;
|
|
int readfd = -1, ret = -1;
|
|
@@ -239,7 +242,7 @@ event_dispatch_destroy (struct event_pool *event_pool)
|
|
/* From the main thread register an event on the pipe fd[0],
|
|
*/
|
|
idx = event_register (event_pool, fd[0], poller_destroy_handler,
|
|
- &data, 1, 0);
|
|
+ &data, 1, 0, 0);
|
|
if (idx < 0)
|
|
goto out;
|
|
|
|
diff --git a/libglusterfs/src/event.h b/libglusterfs/src/event.h
|
|
index c60b14a..875cd7d 100644
|
|
--- a/libglusterfs/src/event.h
|
|
+++ b/libglusterfs/src/event.h
|
|
@@ -12,6 +12,7 @@
|
|
#define _EVENT_H_
|
|
|
|
#include <pthread.h>
|
|
+#include "list.h"
|
|
|
|
struct event_pool;
|
|
struct event_ops;
|
|
@@ -24,7 +25,8 @@ struct event_data {
|
|
|
|
|
|
typedef int (*event_handler_t) (int fd, int idx, int gen, void *data,
|
|
- int poll_in, int poll_out, int poll_err);
|
|
+ int poll_in, int poll_out, int poll_err,
|
|
+ char event_thread_exit);
|
|
|
|
#define EVENT_EPOLL_TABLES 1024
|
|
#define EVENT_EPOLL_SLOTS 1024
|
|
@@ -41,6 +43,13 @@ struct event_pool {
|
|
struct event_slot_epoll *ereg[EVENT_EPOLL_TABLES];
|
|
int slots_used[EVENT_EPOLL_TABLES];
|
|
|
|
+ struct list_head poller_death;
|
|
+ int poller_death_sliced; /* track whether the list of fds interested
|
|
+ * in poller_death is sliced. If yes, a new thread
|
|
+ * death notification has to wait till the
|
|
+ * list is added back
|
|
+ */
|
|
+ int poller_gen;
|
|
int used;
|
|
int changed;
|
|
|
|
@@ -54,7 +63,7 @@ struct event_pool {
|
|
* epoll. */
|
|
int eventthreadcount; /* number of event threads to execute. */
|
|
pthread_t pollers[EVENT_MAX_THREADS]; /* poller thread_id store,
|
|
- * and live status */
|
|
+ * and live status */
|
|
int destroy;
|
|
int activethreadcount;
|
|
|
|
@@ -83,7 +92,8 @@ struct event_ops {
|
|
|
|
int (*event_register) (struct event_pool *event_pool, int fd,
|
|
event_handler_t handler,
|
|
- void *data, int poll_in, int poll_out);
|
|
+ void *data, int poll_in, int poll_out,
|
|
+ char notify_poller_death);
|
|
|
|
int (*event_select_on) (struct event_pool *event_pool, int fd, int idx,
|
|
int poll_in, int poll_out);
|
|
@@ -107,7 +117,8 @@ int event_select_on (struct event_pool *event_pool, int fd, int idx,
|
|
int poll_in, int poll_out);
|
|
int event_register (struct event_pool *event_pool, int fd,
|
|
event_handler_t handler,
|
|
- void *data, int poll_in, int poll_out);
|
|
+ void *data, int poll_in, int poll_out,
|
|
+ char notify_poller_death);
|
|
int event_unregister (struct event_pool *event_pool, int fd, int idx);
|
|
int event_unregister_close (struct event_pool *event_pool, int fd, int idx);
|
|
int event_dispatch (struct event_pool *event_pool);
|
|
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
|
|
index fd7e3ec..fe5e3fd 100644
|
|
--- a/rpc/rpc-lib/src/rpc-clnt.c
|
|
+++ b/rpc/rpc-lib/src/rpc-clnt.c
|
|
@@ -1013,6 +1013,12 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
|
|
*/
|
|
ret = 0;
|
|
break;
|
|
+
|
|
+ case RPC_TRANSPORT_EVENT_THREAD_DIED:
|
|
+ /* only meaningful on a server, no need of handling this event on a
|
|
+ * client */
|
|
+ ret = 0;
|
|
+ break;
|
|
}
|
|
|
|
out:
|
|
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c
|
|
index b737ff2..db02338 100644
|
|
--- a/rpc/rpc-lib/src/rpc-transport.c
|
|
+++ b/rpc/rpc-lib/src/rpc-transport.c
|
|
@@ -294,6 +294,10 @@ rpc_transport_load (glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
|
|
goto fail;
|
|
}
|
|
|
|
+ if (dict_get(options, "notify-poller-death")) {
|
|
+ trans->notify_poller_death = 1;
|
|
+ }
|
|
+
|
|
gf_log ("rpc-transport", GF_LOG_DEBUG,
|
|
"attempt to load file %s", name);
|
|
|
|
diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h
|
|
index c97f98d..cf77c9d 100644
|
|
--- a/rpc/rpc-lib/src/rpc-transport.h
|
|
+++ b/rpc/rpc-lib/src/rpc-transport.h
|
|
@@ -99,6 +99,7 @@ typedef enum {
|
|
RPC_TRANSPORT_MSG_RECEIVED, /* Complete rpc msg has been read */
|
|
RPC_TRANSPORT_CONNECT, /* client is connected to server */
|
|
RPC_TRANSPORT_MSG_SENT,
|
|
+ RPC_TRANSPORT_EVENT_THREAD_DIED /* event-thread has died */
|
|
} rpc_transport_event_t;
|
|
|
|
struct rpc_transport_msg {
|
|
@@ -218,6 +219,8 @@ struct rpc_transport {
|
|
*/
|
|
gf_boolean_t connect_failed;
|
|
gf_atomic_t disconnect_progress;
|
|
+ char notify_poller_death;
|
|
+ char poller_death_accept;
|
|
};
|
|
|
|
struct rpc_transport_ops {
|
|
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
|
|
index faa1956..c769463 100644
|
|
--- a/rpc/rpc-lib/src/rpcsvc.c
|
|
+++ b/rpc/rpc-lib/src/rpcsvc.c
|
|
@@ -8,6 +8,7 @@
|
|
cases as published by the Free Software Foundation.
|
|
*/
|
|
|
|
+#include <math.h>
|
|
#include "rpcsvc.h"
|
|
#include "rpc-transport.h"
|
|
#include "dict.h"
|
|
@@ -56,9 +57,76 @@ int
|
|
rpcsvc_notify (rpc_transport_t *trans, void *mydata,
|
|
rpc_transport_event_t event, void *data, ...);
|
|
|
|
+void *
|
|
+rpcsvc_request_handler(void *arg);
|
|
+
|
|
static int
|
|
rpcsvc_match_subnet_v4 (const char *addrtok, const char *ipaddr);
|
|
|
|
+void
|
|
+rpcsvc_toggle_queue_status(rpcsvc_program_t *prog,
|
|
+ rpcsvc_request_queue_t *queue, char status[])
|
|
+{
|
|
+ int queue_index = 0, status_index = 0, set_bit = 0;
|
|
+
|
|
+ if (queue != &prog->request_queue[0]) {
|
|
+ queue_index = (queue - &prog->request_queue[0]);
|
|
+ }
|
|
+
|
|
+ status_index = queue_index / 8;
|
|
+ set_bit = queue_index % 8;
|
|
+
|
|
+ status[status_index] ^= (1 << set_bit);
|
|
+
|
|
+ return;
|
|
+}
|
|
+
|
|
+static int
|
|
+get_rightmost_set_bit(int n)
|
|
+{
|
|
+ return log2(n & -n);
|
|
+}
|
|
+
|
|
+int
|
|
+rpcsvc_get_free_queue_index(rpcsvc_program_t *prog)
|
|
+{
|
|
+ int queue_index = 0, max_index = 0, i = 0;
|
|
+ unsigned int right_most_unset_bit = 0;
|
|
+
|
|
+ right_most_unset_bit = 8;
|
|
+
|
|
+ max_index = gf_roof(EVENT_MAX_THREADS, 8) / 8;
|
|
+ for (i = 0; i < max_index; i++) {
|
|
+ if (prog->request_queue_status[i] == 0) {
|
|
+ right_most_unset_bit = 0;
|
|
+ break;
|
|
+ } else {
|
|
+ right_most_unset_bit = get_rightmost_set_bit(
|
|
+ ~prog->request_queue_status[i]);
|
|
+ if (right_most_unset_bit < 8) {
|
|
+ break;
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ if (right_most_unset_bit > 7) {
|
|
+ queue_index = -1;
|
|
+ } else {
|
|
+ queue_index = i * 8;
|
|
+ queue_index += right_most_unset_bit;
|
|
+
|
|
+ if (queue_index > EVENT_MAX_THREADS) {
|
|
+ queue_index = -1;
|
|
+ }
|
|
+ }
|
|
+
|
|
+ if (queue_index != -1) {
|
|
+ prog->request_queue_status[i] |= (0x1 << right_most_unset_bit);
|
|
+ }
|
|
+
|
|
+ return queue_index;
|
|
+}
|
|
+
|
|
rpcsvc_notify_wrapper_t *
|
|
rpcsvc_notify_wrapper_alloc (void)
|
|
{
|
|
@@ -412,7 +480,6 @@ rpcsvc_request_init (rpcsvc_t *svc, rpc_transport_t *trans,
|
|
req->progver = rpc_call_progver (callmsg);
|
|
req->procnum = rpc_call_progproc (callmsg);
|
|
req->trans = rpc_transport_ref (trans);
|
|
- gf_client_ref (req->trans->xl_private);
|
|
req->count = msg->count;
|
|
req->msg[0] = progmsg;
|
|
req->iobref = iobref_ref (msg->iobref);
|
|
@@ -570,6 +637,73 @@ rpcsvc_check_and_reply_error (int ret, call_frame_t *frame, void *opaque)
|
|
return 0;
|
|
}
|
|
|
|
+void
|
|
+rpcsvc_queue_event_thread_death(rpcsvc_t *svc, rpcsvc_program_t *prog, int gen)
|
|
+{
|
|
+ rpcsvc_request_queue_t *queue = NULL;
|
|
+ int num = 0;
|
|
+ void *value = NULL;
|
|
+ rpcsvc_request_t *req = NULL;
|
|
+ char empty = 0;
|
|
+
|
|
+ value = pthread_getspecific(prog->req_queue_key);
|
|
+ if (value == NULL) {
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ num = ((unsigned long)value) - 1;
|
|
+
|
|
+ queue = &prog->request_queue[num];
|
|
+
|
|
+ if (queue->gen == gen) {
|
|
+ /* duplicate event */
|
|
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
|
|
+ "not queuing duplicate event thread death. "
|
|
+ "queue %d program %s",
|
|
+ num, prog->progname);
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ rpcsvc_alloc_request(svc, req);
|
|
+ req->prognum = RPCSVC_INFRA_PROGRAM;
|
|
+ req->procnum = RPCSVC_PROC_EVENT_THREAD_DEATH;
|
|
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
|
|
+ "queuing event thread death request to queue %d of program %s", num,
|
|
+ prog->progname);
|
|
+
|
|
+ pthread_mutex_lock(&queue->queue_lock);
|
|
+ {
|
|
+ empty = list_empty(&queue->request_queue);
|
|
+
|
|
+ list_add_tail(&req->request_list, &queue->request_queue);
|
|
+ queue->gen = gen;
|
|
+
|
|
+ if (empty && queue->waiting)
|
|
+ pthread_cond_signal(&queue->queue_cond);
|
|
+ }
|
|
+ pthread_mutex_unlock(&queue->queue_lock);
|
|
+
|
|
+ return;
|
|
+}
|
|
+
|
|
+int
|
|
+rpcsvc_handle_event_thread_death(rpcsvc_t *svc, rpc_transport_t *trans, int gen)
|
|
+{
|
|
+ rpcsvc_program_t *prog = NULL;
|
|
+
|
|
+ pthread_mutex_lock (&svc->rpclock);
|
|
+ {
|
|
+ list_for_each_entry(prog, &svc->programs, program)
|
|
+ {
|
|
+ if (prog->ownthread)
|
|
+ rpcsvc_queue_event_thread_death(svc, prog, gen);
|
|
+ }
|
|
+ }
|
|
+ pthread_mutex_unlock (&svc->rpclock);
|
|
+
|
|
+ return 0;
|
|
+}
|
|
+
|
|
int
|
|
rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,
|
|
rpc_transport_pollin_t *msg)
|
|
@@ -581,8 +715,12 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,
|
|
uint16_t port = 0;
|
|
gf_boolean_t is_unix = _gf_false, empty = _gf_false;
|
|
gf_boolean_t unprivileged = _gf_false;
|
|
+ gf_boolean_t spawn_request_handler = _gf_false;
|
|
drc_cached_op_t *reply = NULL;
|
|
rpcsvc_drc_globals_t *drc = NULL;
|
|
+ rpcsvc_request_queue_t *queue = NULL;
|
|
+ long num = 0;
|
|
+ void *value = NULL;
|
|
|
|
if (!trans || !svc)
|
|
return -1;
|
|
@@ -696,20 +834,83 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,
|
|
rpcsvc_check_and_reply_error, NULL,
|
|
req);
|
|
} else if (req->ownthread) {
|
|
- pthread_mutex_lock (&req->prog->queue_lock);
|
|
+ value = pthread_getspecific(req->prog->req_queue_key);
|
|
+ if (value == NULL) {
|
|
+ pthread_mutex_lock(&req->prog->thr_lock);
|
|
+ {
|
|
+ num = rpcsvc_get_free_queue_index(req->prog);
|
|
+ if (num != -1) {
|
|
+ num++;
|
|
+ value = (void *)num;
|
|
+ ret = pthread_setspecific(req->prog->req_queue_key,
|
|
+ value);
|
|
+ if (ret < 0) {
|
|
+ gf_log(GF_RPCSVC, GF_LOG_WARNING,
|
|
+ "setting request queue in TLS failed");
|
|
+ rpcsvc_toggle_queue_status(
|
|
+ req->prog, &req->prog->request_queue[num - 1],
|
|
+ req->prog->request_queue_status);
|
|
+ num = -1;
|
|
+ } else {
|
|
+ spawn_request_handler = 1;
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ pthread_mutex_unlock(&req->prog->thr_lock);
|
|
+ }
|
|
+
|
|
+ if (num == -1)
|
|
+ goto noqueue;
|
|
+
|
|
+ num = ((unsigned long)value) - 1;
|
|
+
|
|
+ queue = &req->prog->request_queue[num];
|
|
+
|
|
+ if (spawn_request_handler) {
|
|
+ ret = gf_thread_create(&queue->thread, NULL,
|
|
+ rpcsvc_request_handler, queue,
|
|
+ "rpcrqhnd");
|
|
+ if (!ret) {
|
|
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
|
|
+ "spawned a request handler "
|
|
+ "thread for queue %d",
|
|
+ (int)num);
|
|
+
|
|
+ req->prog->threadcount++;
|
|
+ } else {
|
|
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
|
|
+ "spawning a request handler "
|
|
+ "thread for queue %d failed",
|
|
+ (int)num);
|
|
+ ret = pthread_setspecific(req->prog->req_queue_key, 0);
|
|
+ if (ret < 0) {
|
|
+ gf_log(GF_RPCSVC, GF_LOG_WARNING,
|
|
+ "resetting request "
|
|
+ "queue in TLS failed");
|
|
+ }
|
|
+
|
|
+ rpcsvc_toggle_queue_status(
|
|
+ req->prog, &req->prog->request_queue[num - 1],
|
|
+ req->prog->request_queue_status);
|
|
+
|
|
+ goto noqueue;
|
|
+ }
|
|
+ }
|
|
+
|
|
+ pthread_mutex_lock(&queue->queue_lock);
|
|
{
|
|
- empty = list_empty (&req->prog->request_queue);
|
|
+ empty = list_empty(&queue->request_queue);
|
|
|
|
- list_add_tail (&req->request_list,
|
|
- &req->prog->request_queue);
|
|
+ list_add_tail(&req->request_list, &queue->request_queue);
|
|
|
|
- if (empty)
|
|
- pthread_cond_signal (&req->prog->queue_cond);
|
|
+ if (empty && queue->waiting)
|
|
+ pthread_cond_signal(&queue->queue_cond);
|
|
}
|
|
- pthread_mutex_unlock (&req->prog->queue_lock);
|
|
+ pthread_mutex_unlock(&queue->queue_lock);
|
|
|
|
ret = 0;
|
|
} else {
|
|
+noqueue:
|
|
ret = actor_fn (req);
|
|
}
|
|
}
|
|
@@ -838,6 +1039,12 @@ rpcsvc_notify (rpc_transport_t *trans, void *mydata,
|
|
"got MAP_XID event, which should have not come");
|
|
ret = 0;
|
|
break;
|
|
+
|
|
+ case RPC_TRANSPORT_EVENT_THREAD_DIED:
|
|
+ rpcsvc_handle_event_thread_death(svc, trans,
|
|
+ (int)(unsigned long)data);
|
|
+ ret = 0;
|
|
+ break;
|
|
}
|
|
|
|
out:
|
|
@@ -1779,6 +1986,7 @@ rpcsvc_create_listeners (rpcsvc_t *svc, dict_t *options, char *name)
|
|
goto out;
|
|
}
|
|
|
|
+ dict_del(options, "notify-poller-death");
|
|
GF_FREE (transport_name);
|
|
transport_name = NULL;
|
|
count++;
|
|
@@ -1864,50 +2072,87 @@ out:
|
|
void *
|
|
rpcsvc_request_handler (void *arg)
|
|
{
|
|
- rpcsvc_program_t *program = arg;
|
|
- rpcsvc_request_t *req = NULL;
|
|
+ rpcsvc_request_queue_t *queue = NULL;
|
|
+ rpcsvc_program_t *program = NULL;
|
|
+ rpcsvc_request_t *req = NULL, *tmp_req = NULL;
|
|
rpcsvc_actor_t *actor = NULL;
|
|
gf_boolean_t done = _gf_false;
|
|
int ret = 0;
|
|
+ struct list_head tmp_list = {
|
|
+ 0,
|
|
+ };
|
|
+
|
|
+ queue = arg;
|
|
+ program = queue->program;
|
|
+
|
|
+ INIT_LIST_HEAD(&tmp_list);
|
|
|
|
if (!program)
|
|
return NULL;
|
|
|
|
while (1) {
|
|
- pthread_mutex_lock (&program->queue_lock);
|
|
+ pthread_mutex_lock(&queue->queue_lock);
|
|
{
|
|
- if (!program->alive
|
|
- && list_empty (&program->request_queue)) {
|
|
+ if (!program->alive && list_empty(&queue->request_queue)) {
|
|
done = 1;
|
|
goto unlock;
|
|
}
|
|
-
|
|
- while (list_empty (&program->request_queue))
|
|
- pthread_cond_wait (&program->queue_cond,
|
|
- &program->queue_lock);
|
|
-
|
|
- req = list_entry (program->request_queue.next,
|
|
- typeof (*req), request_list);
|
|
-
|
|
- list_del_init (&req->request_list);
|
|
+ while (list_empty(&queue->request_queue)) {
|
|
+ queue->waiting = _gf_true;
|
|
+ pthread_cond_wait(&queue->queue_cond, &queue->queue_lock);
|
|
+ }
|
|
+ queue->waiting = _gf_false;
|
|
+ if (!list_empty(&queue->request_queue)) {
|
|
+ INIT_LIST_HEAD(&tmp_list);
|
|
+ list_splice_init(&queue->request_queue, &tmp_list);
|
|
+ }
|
|
+ }
|
|
+unlock:
|
|
+ pthread_mutex_unlock(&queue->queue_lock);
|
|
+ list_for_each_entry_safe(req, tmp_req, &tmp_list, request_list)
|
|
+ {
|
|
+ list_del_init(&req->request_list);
|
|
+ if (req) {
|
|
+ if (req->prognum == RPCSVC_INFRA_PROGRAM) {
|
|
+ switch (req->procnum) {
|
|
+ case RPCSVC_PROC_EVENT_THREAD_DEATH:
|
|
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
|
|
+ "event thread died, exiting request handler "
|
|
+ "thread for queue %d of program %s",
|
|
+ (int)(queue - &program->request_queue[0]),
|
|
+ program->progname);
|
|
+ done = 1;
|
|
+
|
|
+ pthread_mutex_lock(&program->thr_lock);
|
|
+ {
|
|
+ rpcsvc_toggle_queue_status(
|
|
+ program, queue,
|
|
+ program->request_queue_status);
|
|
+ program->threadcount--;
|
|
+ }
|
|
+ pthread_mutex_unlock(&program->thr_lock);
|
|
+ rpcsvc_request_destroy(req);
|
|
+ break;
|
|
+
|
|
+ default:
|
|
+ break;
|
|
+ }
|
|
+ } else {
|
|
+ THIS = req->svc->xl;
|
|
+ actor = rpcsvc_program_actor(req);
|
|
+ ret = actor->actor(req);
|
|
+
|
|
+ if (ret != 0) {
|
|
+ rpcsvc_check_and_reply_error(ret, NULL, req);
|
|
+ }
|
|
+
|
|
+ req = NULL;
|
|
+ }
|
|
+ }
|
|
}
|
|
- unlock:
|
|
- pthread_mutex_unlock (&program->queue_lock);
|
|
-
|
|
if (done)
|
|
break;
|
|
-
|
|
- THIS = req->svc->xl;
|
|
-
|
|
- actor = rpcsvc_program_actor (req);
|
|
-
|
|
- ret = actor->actor (req);
|
|
-
|
|
- if (ret != 0) {
|
|
- rpcsvc_check_and_reply_error (ret, NULL, req);
|
|
- }
|
|
}
|
|
-
|
|
return NULL;
|
|
}
|
|
|
|
@@ -1917,6 +2162,7 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program)
|
|
int ret = -1;
|
|
rpcsvc_program_t *newprog = NULL;
|
|
char already_registered = 0;
|
|
+ int i = 0;
|
|
|
|
if (!svc) {
|
|
goto out;
|
|
@@ -1951,9 +2197,16 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program)
|
|
memcpy (newprog, program, sizeof (*program));
|
|
|
|
INIT_LIST_HEAD (&newprog->program);
|
|
- INIT_LIST_HEAD (&newprog->request_queue);
|
|
- pthread_mutex_init (&newprog->queue_lock, NULL);
|
|
- pthread_cond_init (&newprog->queue_cond, NULL);
|
|
+
|
|
+ for (i = 0; i < EVENT_MAX_THREADS; i++) {
|
|
+ INIT_LIST_HEAD(&newprog->request_queue[i].request_queue);
|
|
+ pthread_mutex_init(&newprog->request_queue[i].queue_lock, NULL);
|
|
+ pthread_cond_init(&newprog->request_queue[i].queue_cond, NULL);
|
|
+ newprog->request_queue[i].program = newprog;
|
|
+ }
|
|
+
|
|
+ pthread_mutex_init(&newprog->thr_lock, NULL);
|
|
+ pthread_cond_init(&newprog->thr_cond, NULL);
|
|
|
|
newprog->alive = _gf_true;
|
|
|
|
@@ -1962,9 +2215,11 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program)
|
|
newprog->ownthread = _gf_false;
|
|
|
|
if (newprog->ownthread) {
|
|
- gf_thread_create (&newprog->thread, NULL,
|
|
- rpcsvc_request_handler,
|
|
- newprog, "reqhnd");
|
|
+ struct event_pool *ep = svc->ctx->event_pool;
|
|
+ newprog->eventthreadcount = ep->eventthreadcount;
|
|
+
|
|
+ pthread_key_create(&newprog->req_queue_key, NULL);
|
|
+ newprog->thr_queue = 1;
|
|
}
|
|
|
|
pthread_mutex_lock (&svc->rpclock);
|
|
diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h
|
|
index 58c0055..f500bab 100644
|
|
--- a/rpc/rpc-lib/src/rpcsvc.h
|
|
+++ b/rpc/rpc-lib/src/rpcsvc.h
|
|
@@ -33,6 +33,16 @@
|
|
#define MAX_IOVEC 16
|
|
#endif
|
|
|
|
+/* TODO: we should store prognums at a centralized location to avoid conflict
|
|
+ or use a robust random number generator to avoid conflicts
|
|
+*/
|
|
+
|
|
+#define RPCSVC_INFRA_PROGRAM 7712846 /* random number */
|
|
+
|
|
+typedef enum {
|
|
+ RPCSVC_PROC_EVENT_THREAD_DEATH = 0,
|
|
+} rpcsvc_infra_procnum_t;
|
|
+
|
|
#define RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT 64 /* Default for protocol/server */
|
|
#define RPCSVC_DEF_NFS_OUTSTANDING_RPC_LIMIT 16 /* Default for nfs/server */
|
|
#define RPCSVC_MAX_OUTSTANDING_RPC_LIMIT 65536
|
|
@@ -349,6 +359,16 @@ typedef struct rpcsvc_actor_desc {
|
|
drc_op_type_t op_type;
|
|
} rpcsvc_actor_t;
|
|
|
|
+typedef struct rpcsvc_request_queue { /* one request queue; struct rpcsvc_program embeds an array of EVENT_MAX_THREADS of these */
|
|
+ int gen; /* NOTE(review): presumably the event-thread generation this queue is paired with -- confirm */
|
|
+ struct list_head request_queue; /* list head; set up with INIT_LIST_HEAD when the program is registered */
|
|
+ pthread_mutex_t queue_lock; /* initialised at program registration; presumably guards request_queue -- confirm */
|
|
+ pthread_cond_t queue_cond; /* initialised at program registration; presumably signalled when work is queued -- confirm */
|
|
+ pthread_t thread; /* NOTE(review): presumably the request-handler thread serving this queue -- confirm */
|
|
+ struct rpcsvc_program *program; /* back-pointer to the program that owns this queue; assigned at program registration */
|
|
+ gf_boolean_t waiting; /* NOTE(review): meaning not visible in this hunk -- confirm against rpcsvc.c */
|
|
+} rpcsvc_request_queue_t;
|
|
+
|
|
/* Describes a program and its version along with the function pointers
|
|
* required to handle the procedures/actors of each program/version.
|
|
* Never changed ever by any thread so no need for a lock.
|
|
@@ -409,10 +429,14 @@ struct rpcsvc_program {
|
|
gf_boolean_t synctask;
|
|
/* list member to link to list of registered services with rpcsvc */
|
|
struct list_head program;
|
|
- struct list_head request_queue;
|
|
- pthread_mutex_t queue_lock;
|
|
- pthread_cond_t queue_cond;
|
|
- pthread_t thread;
|
|
+ rpcsvc_request_queue_t request_queue[EVENT_MAX_THREADS];
|
|
+ char request_queue_status[EVENT_MAX_THREADS / 8 + 1];
|
|
+ pthread_mutex_t thr_lock;
|
|
+ pthread_cond_t thr_cond;
|
|
+ int thr_queue;
|
|
+ pthread_key_t req_queue_key;
|
|
+ int threadcount;
|
|
+ int eventthreadcount;
|
|
};
|
|
|
|
typedef struct rpcsvc_cbk_program {
|
|
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
|
|
index e28c5cd..df984f8 100644
|
|
--- a/rpc/rpc-transport/socket/src/socket.c
|
|
+++ b/rpc/rpc-transport/socket/src/socket.c
|
|
@@ -2419,7 +2419,8 @@ static int socket_disconnect (rpc_transport_t *this, gf_boolean_t wait);
|
|
/* reads rpc_requests during pollin */
|
|
static int
|
|
socket_event_handler (int fd, int idx, int gen, void *data,
|
|
- int poll_in, int poll_out, int poll_err)
|
|
+ int poll_in, int poll_out, int poll_err,
|
|
+ char event_thread_died)
|
|
{
|
|
rpc_transport_t *this = NULL;
|
|
socket_private_t *priv = NULL;
|
|
@@ -2429,6 +2430,13 @@ socket_event_handler (int fd, int idx, int gen, void *data,
|
|
|
|
this = data;
|
|
|
|
+ if (event_thread_died) {
|
|
+ /* to avoid duplicate notifications,
|
|
+ * notify only for listener sockets
|
|
+ */
|
|
+ return 0;
|
|
+ }
|
|
+
|
|
GF_VALIDATE_OR_GOTO ("socket", this, out);
|
|
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
|
|
GF_VALIDATE_OR_GOTO ("socket", this->xl, out);
|
|
@@ -2720,7 +2728,8 @@ socket_spawn (rpc_transport_t *this)
|
|
|
|
static int
|
|
socket_server_event_handler (int fd, int idx, int gen, void *data,
|
|
- int poll_in, int poll_out, int poll_err)
|
|
+ int poll_in, int poll_out, int poll_err,
|
|
+ char event_thread_died)
|
|
{
|
|
rpc_transport_t *this = NULL;
|
|
socket_private_t *priv = NULL;
|
|
@@ -2742,6 +2751,12 @@ socket_server_event_handler (int fd, int idx, int gen, void *data,
|
|
priv = this->private;
|
|
ctx = this->ctx;
|
|
|
|
+ if (event_thread_died) {
|
|
+ rpc_transport_notify(this, RPC_TRANSPORT_EVENT_THREAD_DIED,
|
|
+ (void *)(unsigned long)gen);
|
|
+ return 0;
|
|
+ }
|
|
+
|
|
/* NOTE:
|
|
* We have done away with the critical section in this function. since
|
|
* there's little that it helps with. There's no other code that
|
|
@@ -2840,6 +2855,7 @@ socket_server_event_handler (int fd, int idx, int gen, void *data,
|
|
new_trans->mydata = this->mydata;
|
|
new_trans->notify = this->notify;
|
|
new_trans->listener = this;
|
|
+ new_trans->notify_poller_death = this->poller_death_accept;
|
|
new_priv = new_trans->private;
|
|
|
|
if (new_sockaddr.ss_family == AF_UNIX) {
|
|
@@ -2935,7 +2951,8 @@ socket_server_event_handler (int fd, int idx, int gen, void *data,
|
|
new_sock,
|
|
socket_event_handler,
|
|
new_trans,
|
|
- 1, 0);
|
|
+ 1, 0,
|
|
+ new_trans->notify_poller_death);
|
|
if (new_priv->idx == -1) {
|
|
ret = -1;
|
|
gf_log(this->name, GF_LOG_ERROR,
|
|
@@ -3388,7 +3405,8 @@ handler:
|
|
else {
|
|
priv->idx = event_register (ctx->event_pool, priv->sock,
|
|
socket_event_handler,
|
|
- this, 1, 1);
|
|
+ this, 1, 1,
|
|
+ this->notify_poller_death);
|
|
if (priv->idx == -1) {
|
|
gf_log ("", GF_LOG_WARNING,
|
|
"failed to register the event");
|
|
@@ -3560,7 +3578,8 @@ socket_listen (rpc_transport_t *this)
|
|
|
|
priv->idx = event_register (ctx->event_pool, priv->sock,
|
|
socket_server_event_handler,
|
|
- this, 1, 0);
|
|
+ this, 1, 0,
|
|
+ this->notify_poller_death);
|
|
|
|
if (priv->idx == -1) {
|
|
gf_log (this->name, GF_LOG_WARNING,
|
|
diff --git a/xlators/protocol/server/src/server-helpers.c b/xlators/protocol/server/src/server-helpers.c
|
|
index 7cc3d15..30045ef 100644
|
|
--- a/xlators/protocol/server/src/server-helpers.c
|
|
+++ b/xlators/protocol/server/src/server-helpers.c
|
|
@@ -557,6 +557,10 @@ get_frame_from_request (rpcsvc_request_t *req)
|
|
}
|
|
}
|
|
|
|
+ /* Add a ref for this fop */
|
|
+ if (client)
|
|
+ gf_client_ref (client);
|
|
+
|
|
frame->root->uid = req->uid;
|
|
frame->root->gid = req->gid;
|
|
frame->root->pid = req->pid;
|
|
diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c
|
|
index ba3b831..d32f5dd 100644
|
|
--- a/xlators/protocol/server/src/server.c
|
|
+++ b/xlators/protocol/server/src/server.c
|
|
@@ -1342,6 +1342,9 @@ init (xlator_t *this)
|
|
ret = -1;
|
|
goto out;
|
|
}
|
|
+
|
|
+ ret = dict_set_int32(this->options, "notify-poller-death", 1);
|
|
+
|
|
ret = rpcsvc_create_listeners (conf->rpc, this->options,
|
|
this->name);
|
|
if (ret < 1) {
|
|
--
|
|
1.8.3.1
|
|
|