300 lines
8.5 KiB
Diff
300 lines
8.5 KiB
Diff
From 35f1bac5ff9ec694e64b65e51f0e7a3226aa3aaf Mon Sep 17 00:00:00 2001
|
|
From: Carlos Garcia Campos <cgarcia@igalia.com>
|
|
Date: Wed, 28 Aug 2019 10:51:18 +0200
|
|
Subject: [PATCH] WebSockets: only poll IO stream when needed
|
|
|
|
Instead of having two pollable sources constantly running, always try to
|
|
read/write without blocking and start polling if the operation returns
|
|
G_IO_ERROR_WOULD_BLOCK. This patch also fixes the test
|
|
/websocket/direct/close-after-close that was passing but not actually
|
|
testing what we wanted, because the client close was never sent. When
|
|
the mutex is released, the frame has only been queued, not yet sent.
|
|
|
|
diff --git libsoup/soup-websocket-connection.c libsoup/soup-websocket-connection.c
|
|
index 345040fe..6afbbe67 100644
|
|
--- a/libsoup/soup-websocket-connection.c
|
|
+++ b/libsoup/soup-websocket-connection.c
|
|
@@ -147,6 +147,7 @@
|
|
};
|
|
|
|
#define MAX_INCOMING_PAYLOAD_SIZE_DEFAULT 128 * 1024
|
|
+#define READ_BUFFER_SIZE 1024
|
|
|
|
G_DEFINE_TYPE_WITH_PRIVATE (SoupWebsocketConnection, soup_websocket_connection, G_TYPE_OBJECT)
|
|
|
|
@@ -155,6 +156,11 @@
|
|
|
|
static void protocol_error_and_close (SoupWebsocketConnection *self);
|
|
|
|
+static gboolean on_web_socket_input (GObject *pollable_stream,
|
|
+ gpointer user_data);
|
|
+static gboolean on_web_socket_output (GObject *pollable_stream,
|
|
+ gpointer user_data);
|
|
+
|
|
/* Code below is based on g_utf8_validate() implementation,
|
|
* but handling NULL characters as valid, as expected by
|
|
* WebSockets and compliant with RFC 3629.
|
|
@@ -283,7 +289,20 @@
|
|
}
|
|
|
|
static void
|
|
-stop_input (SoupWebsocketConnection *self)
|
|
+soup_websocket_connection_start_input_source (SoupWebsocketConnection *self)
|
|
+{
|
|
+ SoupWebsocketConnectionPrivate *pv = self->pv;
|
|
+
|
|
+ if (pv->input_source)
|
|
+ return;
|
|
+
|
|
+ pv->input_source = g_pollable_input_stream_create_source (pv->input, NULL);
|
|
+ g_source_set_callback (pv->input_source, (GSourceFunc)on_web_socket_input, self, NULL);
|
|
+ g_source_attach (pv->input_source, pv->main_context);
|
|
+}
|
|
+
|
|
+static void
|
|
+soup_websocket_connection_stop_input_source (SoupWebsocketConnection *self)
|
|
{
|
|
SoupWebsocketConnectionPrivate *pv = self->pv;
|
|
|
|
@@ -296,7 +315,20 @@
|
|
}
|
|
|
|
static void
|
|
-stop_output (SoupWebsocketConnection *self)
|
|
+soup_websocket_connection_start_output_source (SoupWebsocketConnection *self)
|
|
+{
|
|
+ SoupWebsocketConnectionPrivate *pv = self->pv;
|
|
+
|
|
+ if (pv->output_source)
|
|
+ return;
|
|
+
|
|
+ pv->output_source = g_pollable_output_stream_create_source (pv->output, NULL);
|
|
+ g_source_set_callback (pv->output_source, (GSourceFunc)on_web_socket_output, self, NULL);
|
|
+ g_source_attach (pv->output_source, pv->main_context);
|
|
+}
|
|
+
|
|
+static void
|
|
+soup_websocket_connection_stop_output_source (SoupWebsocketConnection *self)
|
|
{
|
|
SoupWebsocketConnectionPrivate *pv = self->pv;
|
|
|
|
@@ -341,8 +373,8 @@
|
|
close_io_stop_timeout (self);
|
|
|
|
if (!pv->io_closing) {
|
|
- stop_input (self);
|
|
- stop_output (self);
|
|
+ soup_websocket_connection_stop_input_source (self);
|
|
+ soup_websocket_connection_stop_output_source (self);
|
|
pv->io_closing = TRUE;
|
|
g_debug ("closing io stream");
|
|
g_io_stream_close_async (pv->io_stream, G_PRIORITY_DEFAULT,
|
|
@@ -359,7 +391,7 @@
|
|
GSocket *socket;
|
|
GError *error = NULL;
|
|
|
|
- stop_output (self);
|
|
+ soup_websocket_connection_stop_output_source (self);
|
|
|
|
if (G_IS_SOCKET_CONNECTION (pv->io_stream)) {
|
|
socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (pv->io_stream));
|
|
@@ -612,9 +644,6 @@
|
|
self->pv->connection_type == SOUP_WEBSOCKET_CONNECTION_SERVER ? "server" : "client",
|
|
payload_len, self->pv->max_incoming_payload_size);
|
|
emit_error_and_close (self, error, TRUE);
|
|
-
|
|
- /* The input is in an invalid state now */
|
|
- stop_input (self);
|
|
}
|
|
|
|
static void
|
|
@@ -981,32 +1010,31 @@
|
|
;
|
|
}
|
|
|
|
-static gboolean
|
|
-on_web_socket_input (GObject *pollable_stream,
|
|
- gpointer user_data)
|
|
+static void
|
|
+soup_websocket_connection_read (SoupWebsocketConnection *self)
|
|
{
|
|
- SoupWebsocketConnection *self = SOUP_WEBSOCKET_CONNECTION (user_data);
|
|
SoupWebsocketConnectionPrivate *pv = self->pv;
|
|
GError *error = NULL;
|
|
gboolean end = FALSE;
|
|
gssize count;
|
|
gsize len;
|
|
|
|
+ soup_websocket_connection_stop_input_source (self);
|
|
+
|
|
do {
|
|
len = pv->incoming->len;
|
|
- g_byte_array_set_size (pv->incoming, len + 1024);
|
|
+ g_byte_array_set_size (pv->incoming, len + READ_BUFFER_SIZE);
|
|
|
|
count = g_pollable_input_stream_read_nonblocking (pv->input,
|
|
pv->incoming->data + len,
|
|
- 1024, NULL, &error);
|
|
-
|
|
+ READ_BUFFER_SIZE, NULL, &error);
|
|
if (count < 0) {
|
|
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
|
|
g_error_free (error);
|
|
count = 0;
|
|
} else {
|
|
emit_error_and_close (self, error, TRUE);
|
|
- return TRUE;
|
|
+ return;
|
|
}
|
|
} else if (count == 0) {
|
|
end = TRUE;
|
|
@@ -1026,16 +1054,24 @@
|
|
}
|
|
|
|
close_io_stream (self);
|
|
+ return;
|
|
}
|
|
|
|
- return TRUE;
|
|
+ soup_websocket_connection_start_input_source (self);
|
|
}
|
|
|
|
static gboolean
|
|
-on_web_socket_output (GObject *pollable_stream,
|
|
- gpointer user_data)
|
|
+on_web_socket_input (GObject *pollable_stream,
|
|
+ gpointer user_data)
|
|
+{
|
|
+ soup_websocket_connection_read (SOUP_WEBSOCKET_CONNECTION (user_data));
|
|
+
|
|
+ return G_SOURCE_REMOVE;
|
|
+}
|
|
+
|
|
+static void
|
|
+soup_websocket_connection_write (SoupWebsocketConnection *self)
|
|
{
|
|
- SoupWebsocketConnection *self = SOUP_WEBSOCKET_CONNECTION (user_data);
|
|
SoupWebsocketConnectionPrivate *pv = self->pv;
|
|
const guint8 *data;
|
|
GError *error = NULL;
|
|
@@ -1043,19 +1079,18 @@
|
|
gssize count;
|
|
gsize len;
|
|
|
|
+ soup_websocket_connection_stop_output_source (self);
|
|
+
|
|
if (soup_websocket_connection_get_state (self) == SOUP_WEBSOCKET_STATE_CLOSED) {
|
|
g_debug ("Ignoring message since the connection is closed");
|
|
- stop_output (self);
|
|
- return TRUE;
|
|
+ return;
|
|
}
|
|
|
|
frame = g_queue_peek_head (&pv->outgoing);
|
|
|
|
/* No more frames to send */
|
|
- if (frame == NULL) {
|
|
- stop_output (self);
|
|
- return TRUE;
|
|
- }
|
|
+ if (frame == NULL)
|
|
+ return;
|
|
|
|
data = g_bytes_get_data (frame->data, &len);
|
|
g_assert (len > 0);
|
|
@@ -1075,7 +1110,7 @@
|
|
frame->pending = TRUE;
|
|
} else {
|
|
emit_error_and_close (self, error, TRUE);
|
|
- return FALSE;
|
|
+ return;
|
|
}
|
|
}
|
|
|
|
@@ -1093,23 +1128,21 @@
|
|
}
|
|
}
|
|
frame_free (frame);
|
|
+
|
|
+ if (g_queue_is_empty (&pv->outgoing))
|
|
+ return;
|
|
}
|
|
|
|
- return TRUE;
|
|
+ soup_websocket_connection_start_output_source (self);
|
|
}
|
|
|
|
-static void
|
|
-start_output (SoupWebsocketConnection *self)
|
|
+static gboolean
|
|
+on_web_socket_output (GObject *pollable_stream,
|
|
+ gpointer user_data)
|
|
{
|
|
- SoupWebsocketConnectionPrivate *pv = self->pv;
|
|
-
|
|
- if (pv->output_source)
|
|
- return;
|
|
+ soup_websocket_connection_write (SOUP_WEBSOCKET_CONNECTION (user_data));
|
|
|
|
- g_debug ("starting output source");
|
|
- pv->output_source = g_pollable_output_stream_create_source (pv->output, NULL);
|
|
- g_source_set_callback (pv->output_source, (GSourceFunc)on_web_socket_output, self, NULL);
|
|
- g_source_attach (pv->output_source, pv->main_context);
|
|
+ return G_SOURCE_REMOVE;
|
|
}
|
|
|
|
static void
|
|
@@ -1150,7 +1183,7 @@
|
|
g_queue_push_tail (&pv->outgoing, frame);
|
|
}
|
|
|
|
- start_output (self);
|
|
+ soup_websocket_connection_write (self);
|
|
}
|
|
|
|
static void
|
|
@@ -1175,9 +1208,7 @@
|
|
pv->output = G_POLLABLE_OUTPUT_STREAM (os);
|
|
g_return_if_fail (g_pollable_output_stream_can_poll (pv->output));
|
|
|
|
- pv->input_source = g_pollable_input_stream_create_source (pv->input, NULL);
|
|
- g_source_set_callback (pv->input_source, (GSourceFunc)on_web_socket_input, self, NULL);
|
|
- g_source_attach (pv->input_source, pv->main_context);
|
|
+ soup_websocket_connection_start_input_source (self);
|
|
}
|
|
|
|
static void
|
|
diff --git tests/websocket-test.c tests/websocket-test.c
|
|
index 146fdf82..26d064df 100644
|
|
--- a/tests/websocket-test.c
|
|
+++ b/tests/websocket-test.c
|
|
@@ -733,6 +733,7 @@
|
|
const char frames[] =
|
|
"\x88\x09\x03\xe8""reason1"
|
|
"\x88\x09\x03\xe8""reason2";
|
|
+ GSocket *socket;
|
|
GError *error = NULL;
|
|
|
|
g_mutex_lock (&test->mutex);
|
|
@@ -742,7 +743,8 @@
|
|
frames, sizeof (frames) -1, &written, NULL, &error);
|
|
g_assert_no_error (error);
|
|
g_assert_cmpuint (written, ==, sizeof (frames) - 1);
|
|
- g_io_stream_close (test->raw_server, NULL, &error);
|
|
+ socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (test->raw_server));
|
|
+ g_socket_shutdown (socket, FALSE, TRUE, &error);
|
|
g_assert_no_error (error);
|
|
|
|
return NULL;
|
|
@@ -766,6 +768,7 @@
|
|
WAIT_UNTIL (soup_websocket_connection_get_state (test->client) == SOUP_WEBSOCKET_STATE_CLOSED);
|
|
g_assert_cmpuint (soup_websocket_connection_get_close_code (test->client), ==, SOUP_WEBSOCKET_CLOSE_NORMAL);
|
|
g_assert_cmpstr (soup_websocket_connection_get_close_data (test->client), ==, "reason1");
|
|
+ g_io_stream_close (test->raw_server, NULL, NULL);
|
|
}
|
|
|
|
static gpointer
|
|
--
|
|
2.26.2
|
|
|