Non-blocking IO + Extended request debug logging

This commit is contained in:
Robbie Harwood 2017-08-22 19:27:03 +00:00
parent b9a828d680
commit 21c2f037de
3 changed files with 614 additions and 1 deletions

View File

@ -0,0 +1,485 @@
From 8a453fde5c655ef19663a01f3d1a017a247c3c85 Mon Sep 17 00:00:00 2001
From: Alexander Scheel <ascheel@redhat.com>
Date: Wed, 2 Aug 2017 15:11:49 -0400
Subject: [PATCH] [client] Switch to non-blocking sockets
Switch the gssproxy client library to non-blocking sockets, allowing
for timeout and retry operations. The client will automatically retry
both send() and recv() operations three times on ETIMEDOUT. If the
combined send() and recv() hit the three time limit, ETIMEDOUT will be
exposed to the caller in the minor status.
Signed-off-by: Alexander Scheel <ascheel@redhat.com>
Reviewed-by: Simo Sorce <simo@redhat.com>
[rharwood@redhat.com: commit message cleanups, rebased]
Reviewed-by: Robbie Harwood <rharwood@redhat.com>
(cherry picked from commit d035646c8feb0b78f0c157580ca02c46cd00dd7e)
---
proxy/src/client/gpm_common.c | 317 +++++++++++++++++++++++++++++++++++++++---
1 file changed, 295 insertions(+), 22 deletions(-)
diff --git a/proxy/src/client/gpm_common.c b/proxy/src/client/gpm_common.c
index 12b14ae..9a10c2f 100644
--- a/proxy/src/client/gpm_common.c
+++ b/proxy/src/client/gpm_common.c
@@ -7,9 +7,15 @@
#include <stdlib.h>
#include <time.h>
#include <pthread.h>
+#include <sys/epoll.h>
+#include <fcntl.h>
+#include <sys/timerfd.h>
#define FRAGMENT_BIT (1 << 31)
+#define RESPONSE_TIMEOUT 15
+#define MAX_TIMEOUT_RETRY 3
+
struct gpm_ctx {
pthread_mutex_t lock;
int fd;
@@ -20,6 +26,9 @@ struct gpm_ctx {
gid_t gid;
int next_xid;
+
+ int epollfd;
+ int timerfd;
};
/* a single global struct is not particularly efficient,
@@ -39,6 +48,8 @@ static void gpm_init_once(void)
pthread_mutex_init(&gpm_global_ctx.lock, &attr);
gpm_global_ctx.fd = -1;
+ gpm_global_ctx.epollfd = -1;
+ gpm_global_ctx.timerfd = -1;
seedp = time(NULL) + getpid() + pthread_self();
gpm_global_ctx.next_xid = rand_r(&seedp);
@@ -69,6 +80,7 @@ static int gpm_open_socket(struct gpm_ctx *gpmctx)
struct sockaddr_un addr = {0};
char name[PATH_MAX];
int ret;
+ unsigned flags;
int fd = -1;
ret = get_pipe_name(name);
@@ -86,6 +98,18 @@ static int gpm_open_socket(struct gpm_ctx *gpmctx)
goto done;
}
+ ret = fcntl(fd, F_GETFD, &flags);
+ if (ret != 0) {
+ ret = errno;
+ goto done;
+ }
+
+ ret = fcntl(fd, F_SETFD, flags | O_NONBLOCK);
+ if (ret != 0) {
+ ret = errno;
+ goto done;
+ }
+
ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
if (ret == -1) {
ret = errno;
@@ -161,6 +185,158 @@ static int gpm_release_sock(struct gpm_ctx *gpmctx)
return pthread_mutex_unlock(&gpmctx->lock);
}
+static void gpm_timer_close(struct gpm_ctx *gpmctx) {
+ if (gpmctx->timerfd < 0) {
+ return;
+ }
+
+ close(gpmctx->timerfd);
+ gpmctx->timerfd = -1;
+}
+
+static int gpm_timer_setup(struct gpm_ctx *gpmctx, int timeout_seconds) {
+ int ret;
+ struct itimerspec its;
+
+ if (gpmctx->timerfd >= 0) {
+ gpm_timer_close(gpmctx);
+ }
+
+ gpmctx->timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
+ if (gpmctx->timerfd < 0) {
+ return errno;
+ }
+
+ its.it_interval.tv_sec = timeout_seconds;
+ its.it_interval.tv_nsec = 0;
+ its.it_value.tv_sec = timeout_seconds;
+ its.it_value.tv_nsec = 0;
+
+ ret = timerfd_settime(gpmctx->timerfd, 0, &its, NULL);
+ if (ret) {
+ ret = errno;
+ gpm_timer_close(gpmctx);
+ return ret;
+ }
+
+ return 0;
+}
+
+static void gpm_epoll_close(struct gpm_ctx *gpmctx) {
+ if (gpmctx->epollfd < 0) {
+ return;
+ }
+
+ close(gpmctx->epollfd);
+ gpmctx->epollfd = -1;
+}
+
+static int gpm_epoll_setup(struct gpm_ctx *gpmctx) {
+ struct epoll_event ev;
+ int ret;
+
+ if (gpmctx->epollfd >= 0) {
+ gpm_epoll_close(gpmctx);
+ }
+
+ gpmctx->epollfd = epoll_create1(EPOLL_CLOEXEC);
+ if (gpmctx->epollfd == -1) {
+ return errno;
+ }
+
+ /* Add timer */
+ ev.events = EPOLLIN;
+ ev.data.fd = gpmctx->timerfd;
+ ret = epoll_ctl(gpmctx->epollfd, EPOLL_CTL_ADD, gpmctx->timerfd, &ev);
+ if (ret == -1) {
+ ret = errno;
+ gpm_epoll_close(gpmctx);
+ return ret;
+ }
+
+ return ret;
+}
+
+static int gpm_epoll_wait(struct gpm_ctx *gpmctx, uint32_t event_flags) {
+ int ret;
+ int epoll_ret;
+ struct epoll_event ev;
+ struct epoll_event events[2];
+ uint64_t timer_read;
+
+ if (gpmctx->epollfd < 0) {
+ ret = gpm_epoll_setup(gpmctx);
+ if (ret)
+ return ret;
+ }
+
+ ev.events = event_flags;
+ ev.data.fd = gpmctx->fd;
+ epoll_ret = epoll_ctl(gpmctx->epollfd, EPOLL_CTL_ADD, gpmctx->fd, &ev);
+ if (epoll_ret == -1) {
+ ret = errno;
+ gpm_epoll_close(gpmctx);
+ return ret;
+ }
+
+ do {
+ epoll_ret = epoll_wait(gpmctx->epollfd, events, 2, -1);
+ } while (epoll_ret < 0 && errno == EINTR);
+
+ if (epoll_ret < 0) {
+ /* Error while waiting that isn't EINTR */
+ ret = errno;
+ gpm_epoll_close(gpmctx);
+ } else if (epoll_ret == 0) {
+ /* Shouldn't happen as timeout == -1; treat it like a timeout
+ * occurred. */
+ ret = ETIMEDOUT;
+ gpm_epoll_close(gpmctx);
+ } else if (epoll_ret == 1 && events[0].data.fd == gpmctx->timerfd) {
+ /* Got an event which is only our timer */
+ ret = read(gpmctx->timerfd, &timer_read, sizeof(uint64_t));
+ if (ret == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
+ /* In the case when reading from the timer failed, don't hide the
+ * timer error behind ETIMEDOUT such that it isn't retried */
+ ret = errno;
+ } else {
+ /* If ret == 0, then we definitely timed out. Else, if ret == -1
+ * and errno == EAGAIN or errno == EWOULDBLOCK, we're in a weird
+ * edge case where epoll thinks the timer can be read, but it
+ * is blocking more; treat it like a TIMEOUT and retry, as
+ * nothing around us would handle EAGAIN from timer and retry
+ * it. */
+ ret = ETIMEDOUT;
+ }
+ gpm_epoll_close(gpmctx);
+ } else {
+ /* If ret == 2, then we ignore the timerfd; that way if the next
+ * operation cannot be performed immediately, we timeout and retry.
+ * If ret == 1 and data.fd == gpmctx->fd, return 0. */
+ ret = 0;
+ }
+
+ epoll_ret = epoll_ctl(gpmctx->epollfd, EPOLL_CTL_DEL, gpmctx->fd, NULL);
+ if (epoll_ret == -1) {
+ /* If we previously had an error, expose that error instead of
+ * clobbering it with errno; else if no error, then assume it is
+ * better to notify of the error deleting the event than it is
+ * to continue. */
+ if (ret == 0)
+ ret = errno;
+ gpm_epoll_close(gpmctx);
+ }
+
+ return ret;
+}
+
+static int gpm_retry_socket(struct gpm_ctx *gpmctx)
+{
+ gpm_epoll_close(gpmctx);
+ gpm_close_socket(gpmctx);
+ return gpm_open_socket(gpmctx);
+}
+
/* must be called after the lock has been grabbed */
static int gpm_send_buffer(struct gpm_ctx *gpmctx,
char *buffer, uint32_t length)
@@ -181,8 +357,13 @@ static int gpm_send_buffer(struct gpm_ctx *gpmctx,
retry = false;
do {
do {
+ ret = gpm_epoll_wait(gpmctx, EPOLLOUT);
+ if (ret != 0) {
+ goto done;
+ }
+
ret = 0;
- wn = send(gpmctx->fd, &size, sizeof(uint32_t), MSG_NOSIGNAL);
+ wn = write(gpmctx->fd, &size, sizeof(uint32_t));
if (wn == -1) {
ret = errno;
}
@@ -190,8 +371,7 @@ static int gpm_send_buffer(struct gpm_ctx *gpmctx,
if (wn != 4) {
/* reopen and retry once */
if (retry == false) {
- gpm_close_socket(gpmctx);
- ret = gpm_open_socket(gpmctx);
+ ret = gpm_retry_socket(gpmctx);
if (ret == 0) {
retry = true;
continue;
@@ -206,9 +386,14 @@ static int gpm_send_buffer(struct gpm_ctx *gpmctx,
pos = 0;
while (length > pos) {
- wn = send(gpmctx->fd, buffer + pos, length - pos, MSG_NOSIGNAL);
+ ret = gpm_epoll_wait(gpmctx, EPOLLOUT);
+ if (ret) {
+ goto done;
+ }
+
+ wn = write(gpmctx->fd, buffer + pos, length - pos);
if (wn == -1) {
- if (errno == EINTR) {
+ if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
continue;
}
ret = errno;
@@ -229,7 +414,7 @@ done:
/* must be called after the lock has been grabbed */
static int gpm_recv_buffer(struct gpm_ctx *gpmctx,
- char *buffer, uint32_t *length)
+ char **buffer, uint32_t *length)
{
uint32_t size;
size_t rn;
@@ -237,6 +422,11 @@ static int gpm_recv_buffer(struct gpm_ctx *gpmctx,
int ret;
do {
+ ret = gpm_epoll_wait(gpmctx, EPOLLIN);
+ if (ret) {
+ goto done;
+ }
+
ret = 0;
rn = read(gpmctx->fd, &size, sizeof(uint32_t));
if (rn == -1) {
@@ -256,11 +446,22 @@ static int gpm_recv_buffer(struct gpm_ctx *gpmctx,
goto done;
}
+ *buffer = malloc(*length);
+ if (*buffer == NULL) {
+ ret = ENOMEM;
+ goto done;
+ }
+
pos = 0;
while (*length > pos) {
- rn = read(gpmctx->fd, buffer + pos, *length - pos);
+ ret = gpm_epoll_wait(gpmctx, EPOLLIN);
+ if (ret) {
+ goto done;
+ }
+
+ rn = read(gpmctx->fd, *buffer + pos, *length - pos);
if (rn == -1) {
- if (errno == EINTR) {
+ if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
continue;
}
ret = errno;
@@ -279,6 +480,7 @@ done:
if (ret) {
/* on errors we can only close the fd and return */
gpm_close_socket(gpmctx);
+ gpm_epoll_close(gpmctx);
}
return ret;
}
@@ -312,6 +514,63 @@ static struct gpm_ctx *gpm_get_ctx(void)
return &gpm_global_ctx;
}
+static int gpm_send_recv_loop(struct gpm_ctx *gpmctx, char *send_buffer,
+ uint32_t send_length, char** recv_buffer,
+ uint32_t *recv_length)
+{
+ int ret;
+ int retry_count;
+
+ /* setup timer */
+ ret = gpm_timer_setup(gpmctx, RESPONSE_TIMEOUT);
+ if (ret)
+ return ret;
+
+ for (retry_count = 0; retry_count < MAX_TIMEOUT_RETRY; retry_count++) {
+ /* send to proxy */
+ ret = gpm_send_buffer(gpmctx, send_buffer, send_length);
+
+ if (ret == 0) {
+ /* No error, continue to recv */
+ } else if (ret == ETIMEDOUT) {
+ /* Close and reopen socket before trying again */
+ ret = gpm_retry_socket(gpmctx);
+ if (ret != 0)
+ return ret;
+ ret = ETIMEDOUT;
+
+ /* RETRY entire send */
+ continue;
+ } else {
+ /* Other error */
+ return ret;
+ }
+
+ /* receive answer */
+ ret = gpm_recv_buffer(gpmctx, recv_buffer, recv_length);
+ if (ret == 0) {
+ /* No error */
+ break;
+ } else if (ret == ETIMEDOUT) {
+ /* Close and reopen socket before trying again */
+ ret = gpm_retry_socket(gpmctx);
+
+ /* Free buffer and set it to NULL to prevent free(xdr_reply_ctx) */
+ free(recv_buffer);
+ recv_buffer = NULL;
+
+ if (ret != 0)
+ return ret;
+ ret = ETIMEDOUT;
+ } else {
+ /* Other error */
+ return ret;
+ }
+ }
+
+ return ret;
+}
+
OM_uint32 gpm_release_buffer(OM_uint32 *minor_status,
gss_buffer_t buffer)
{
@@ -402,15 +661,20 @@ int gpm_make_call(int proc, union gp_rpc_arg *arg, union gp_rpc_res *res)
gp_rpc_msg msg;
XDR xdr_call_ctx;
XDR xdr_reply_ctx;
- char buffer[MAX_RPC_SIZE];
- uint32_t length;
+ char *send_buffer = NULL;
+ char *recv_buffer = NULL;
+ uint32_t send_length;
+ uint32_t recv_length;
uint32_t xid;
bool xdrok;
bool sockgrab = false;
int ret;
- xdrmem_create(&xdr_call_ctx, buffer, MAX_RPC_SIZE, XDR_ENCODE);
- xdrmem_create(&xdr_reply_ctx, buffer, MAX_RPC_SIZE, XDR_DECODE);
+ send_buffer = malloc(MAX_RPC_SIZE);
+ if (send_buffer == NULL)
+ return ENOMEM;
+
+ xdrmem_create(&xdr_call_ctx, send_buffer, MAX_RPC_SIZE, XDR_ENCODE);
memset(&msg, 0, sizeof(gp_rpc_msg));
msg.header.type = GP_RPC_CALL;
@@ -453,22 +717,22 @@ int gpm_make_call(int proc, union gp_rpc_arg *arg, union gp_rpc_res *res)
goto done;
}
- /* send to proxy */
- ret = gpm_send_buffer(gpmctx, buffer, xdr_getpos(&xdr_call_ctx));
- if (ret) {
- goto done;
- }
+ /* set send_length */
+ send_length = xdr_getpos(&xdr_call_ctx);
- /* receive answer */
- ret = gpm_recv_buffer(gpmctx, buffer, &length);
- if (ret) {
+ /* Send request, receive response with timeout */
+ ret = gpm_send_recv_loop(gpmctx, send_buffer, send_length, &recv_buffer,
+ &recv_length);
+ if (ret)
goto done;
- }
/* release the lock */
gpm_release_sock(gpmctx);
sockgrab = false;
+ /* Create the reply context */
+ xdrmem_create(&xdr_reply_ctx, recv_buffer, recv_length, XDR_DECODE);
+
/* decode header */
memset(&msg, 0, sizeof(gp_rpc_msg));
xdrok = xdr_gp_rpc_msg(&xdr_reply_ctx, &msg);
@@ -492,12 +756,21 @@ int gpm_make_call(int proc, union gp_rpc_arg *arg, union gp_rpc_res *res)
}
done:
+ gpm_timer_close(gpmctx);
+ gpm_epoll_close(gpmctx);
+
if (sockgrab) {
gpm_release_sock(gpmctx);
}
xdr_free((xdrproc_t)xdr_gp_rpc_msg, (char *)&msg);
xdr_destroy(&xdr_call_ctx);
- xdr_destroy(&xdr_reply_ctx);
+
+ if (recv_buffer != NULL)
+ xdr_destroy(&xdr_reply_ctx);
+
+ free(send_buffer);
+ free(recv_buffer);
+
return ret;
}

View File

@ -1,6 +1,6 @@
Name: gssproxy
Version: 0.7.0
Release: 13%{?dist}
Release: 14%{?dist}
Summary: GSSAPI Proxy
Group: System Environment/Libraries
@ -25,6 +25,8 @@ Patch7: Include-header-for-writev.patch
Patch8: Make-proc-file-failure-loud-but-nonfatal.patch
Patch9: Tolerate-NULL-pointers-in-gp_same.patch
Patch10: Add-Client-ID-to-debug-messages.patch
Patch11: client-Switch-to-non-blocking-sockets.patch
Patch12: server-Add-detailed-request-logging.patch
### Dependencies ###
Requires: krb5-libs >= 1.12.0
@ -117,6 +119,9 @@ rm -rf %{buildroot}
%systemd_postun_with_restart gssproxy.service
%changelog
* Tue Aug 22 2017 Robbie Harwood <rharwood@redhat.com> - 0.7.0-14
- Non-blocking IO + Extended request debug logging
* Sun Aug 20 2017 Ville Skyttä <ville.skytta@iki.fi> - 0.7.0-13
- Own the %%{_libdir}/gssproxy dir
- Mark COPYING as %%license

View File

@ -0,0 +1,123 @@
From fa32378a02d8d8f95e1d3942c7cfc151e0018d4a Mon Sep 17 00:00:00 2001
From: Alexander Scheel <ascheel@redhat.com>
Date: Fri, 4 Aug 2017 16:09:20 -0400
Subject: [PATCH] [server] Add detailed request logging
Add request logging to track requests through gssproxy. Requests are
logged as they are read, processed, handled, and replies sent. These
are identified by buffer memory address and size.
Signed-off-by: Alexander Scheel <ascheel@redhat.com>
Reviewed-by: Simo Sorce <simo@redhat.com>
[rharwood@redhat.com: commit message cleanups, rebase]
Reviewed-by: Robbie Harwood <rharwood@redhat.com>
Merges: #205
(cherry picked from commit 4097dafad3f276c3cf7b1255fe0540e16d59ae03)
---
proxy/src/gp_rpc_process.c | 6 ++++++
proxy/src/gp_socket.c | 12 ++++++++++++
proxy/src/gp_workers.c | 5 +++++
3 files changed, 23 insertions(+)
diff --git a/proxy/src/gp_rpc_process.c b/proxy/src/gp_rpc_process.c
index 0ea17f0..eaffc55 100644
--- a/proxy/src/gp_rpc_process.c
+++ b/proxy/src/gp_rpc_process.c
@@ -372,9 +372,12 @@ int gp_rpc_process_call(struct gp_call_ctx *gpcall,
xdrmem_create(&xdr_reply_ctx, reply_buffer, MAX_RPC_SIZE, XDR_ENCODE);
/* decode request */
+ GPDEBUGN(3, "[status] Processing request [%p (%zu)]\n", inbuf, inlen);
ret = gp_rpc_decode_call(&xdr_call_ctx, &xid, &proc, &arg, &acc, &rej);
if (!ret) {
/* execute request */
+ GPDEBUGN(3, "[status] Executing request %d (%s) from [%p (%zu)]\n",
+ proc, gp_rpc_procname(proc), inbuf, inlen);
ret = gp_rpc_execute(gpcall, proc, &arg, &res);
if (ret) {
acc = GP_RPC_SYSTEM_ERR;
@@ -388,6 +391,9 @@ int gp_rpc_process_call(struct gp_call_ctx *gpcall,
/* return encoded buffer */
ret = gp_rpc_return_buffer(&xdr_reply_ctx,
reply_buffer, outbuf, outlen);
+ GPDEBUGN(3, "[status] Returned buffer %d (%s) from [%p (%zu)]: "
+ "[%p (%zu)]\n", proc, gp_rpc_procname(proc), inbuf, inlen,
+ *outbuf, *outlen);
}
/* free resources */
gp_rpc_free_xdrs(proc, &arg, &res);
diff --git a/proxy/src/gp_socket.c b/proxy/src/gp_socket.c
index 133db9c..1974a28 100644
--- a/proxy/src/gp_socket.c
+++ b/proxy/src/gp_socket.c
@@ -441,6 +441,8 @@ void gp_socket_send_data(verto_ctx *vctx, struct gp_conn *conn,
wbuf = calloc(1, sizeof(struct gp_buffer));
if (!wbuf) {
+ GPDEBUGN(3, "[status] OOM in gp_socket_send_data: %p (%zu)\n",
+ buffer, buflen);
/* too bad, must kill the client connection now */
gp_conn_free(conn);
return;
@@ -467,6 +469,8 @@ static void gp_socket_write(verto_ctx *vctx, verto_ev *ev)
vecs = 0;
+ GPDEBUGN(3, "[status] Sending data: %p (%zu)\n", wbuf->data, wbuf->size);
+
if (wbuf->pos == 0) {
/* first write, send the buffer size as packet header */
size = wbuf->size | FRAGMENT_BIT;
@@ -489,6 +493,9 @@ static void gp_socket_write(verto_ctx *vctx, verto_ev *ev)
gp_socket_schedule_write(vctx, wbuf);
} else {
/* error on socket, close and release it */
+ GPDEBUGN(3, "[status] Error %d in gp_socket_write on writing for "
+ "[%p (%zu:%zu)]\n", errno, wbuf->data, wbuf->pos,
+ wbuf->size);
gp_conn_free(wbuf->conn);
gp_buffer_free(wbuf);
}
@@ -498,6 +505,8 @@ static void gp_socket_write(verto_ctx *vctx, verto_ev *ev)
if (wn < sizeof(size)) {
/* don't bother trying to handle sockets that can't
* buffer even 4 bytes */
+ GPDEBUGN(3, "[status] Sending data [%p (%zu)]: failed with short "
+ "write of %d\n", wbuf->data, wbuf->size, wn);
gp_conn_free(wbuf->conn);
gp_buffer_free(wbuf);
return;
@@ -505,6 +514,9 @@ static void gp_socket_write(verto_ctx *vctx, verto_ev *ev)
wn -= sizeof(size);
}
+ GPDEBUGN(3, "[status] Sending data [%p (%zu)]: successful write of %d\n",
+ wbuf->data, wbuf->size, wn);
+
wbuf->pos += wn;
if (wbuf->size > wbuf->pos) {
/* short write, reschedule */
diff --git a/proxy/src/gp_workers.c b/proxy/src/gp_workers.c
index d37e57c..2a33c21 100644
--- a/proxy/src/gp_workers.c
+++ b/proxy/src/gp_workers.c
@@ -319,6 +319,7 @@ static void gp_handle_reply(verto_ctx *vctx, verto_ev *ev)
break;
case GP_QUERY_OUT:
+ GPDEBUGN(3, "[status] Handling query reply: %p (%zu)\n", q->buffer, q->buflen);
gp_socket_send_data(vctx, q->conn, q->buffer, q->buflen);
gp_query_free(q, false);
break;
@@ -381,7 +382,11 @@ static void *gp_worker_main(void *pvt)
gp_debug_set_conn_id(gp_conn_get_cid(q->conn));
/* handle the client request */
+ GPDEBUGN(3, "[status] Handling query input: %p (%zu)\n", q->buffer,
+ q->buflen);
gp_handle_query(t->pool, q);
+ GPDEBUGN(3 ,"[status] Handling query output: %p (%zu)\n", q->buffer,
+ q->buflen);
/* now get lock on main queue, to play with the reply list */
/* ======> POOL LOCK */