From d0e4238471c7bcb0757a877622fea593e8906fcc Mon Sep 17 00:00:00 2001 From: "Richard W.M. Jones" Date: Thu, 27 Jul 2023 17:08:25 +0100 Subject: [PATCH] curl: Use curl multi interface See the comment at the top of plugins/curl/pool.c for general information about how this works. This makes a very large difference to performance over the previous implementation. Note for the tests below I also applied the next commit changing the behaviour of the connections parameter. Using this test case: $ http=https://cloud-images.ubuntu.com/lunar/current/lunar-server-cloudimg-amd64.img $ nbdkit -r -U - curl $http ipresolve=v4 --run 'nbdcopy -p $uri null' The times are as follows: multi, connections=64 17.2s multi, connections=32 21.7s wget 28.4s multi, connections=16 41.3s before this commit 180s (cherry picked from commit a74b289ee15a7c75dceb8a96403f1fa8ebd72e88) --- plugins/curl/config.c | 246 -------------------------- plugins/curl/curl.c | 366 +++++++++++++++++++++++++++++++++----- plugins/curl/curldefs.h | 38 ++-- plugins/curl/pool.c | 381 ++++++++++++++++++++++++++++++++-------- 4 files changed, 652 insertions(+), 379 deletions(-) diff --git a/plugins/curl/config.c b/plugins/curl/config.c index 276c79d5..ce82d5f9 100644 --- a/plugins/curl/config.c +++ b/plugins/curl/config.c @@ -48,8 +48,6 @@ #include -#include "ascii-ctype.h" -#include "ascii-string.h" #include "cleanup.h" #include "curldefs.h" @@ -89,12 +87,6 @@ static const char *user_agent = NULL; static int debug_cb (CURL *handle, curl_infotype type, const char *data, size_t size, void *); -static size_t write_cb (char *ptr, size_t size, size_t nmemb, void *opaque); -static size_t read_cb (void *ptr, size_t size, size_t nmemb, void *opaque); -static int get_content_length_accept_range (struct curl_handle *ch); -static bool try_fallback_GET_method (struct curl_handle *ch); -static size_t header_cb (void *ptr, size_t size, size_t nmemb, void *opaque); -static size_t error_cb (char *ptr, size_t size, size_t nmemb, void *opaque); /* Use '-D curl.verbose=1' to set. */ NBDKIT_DLL_PUBLIC int curl_debug_verbose = 0; @@ -671,17 +663,9 @@ allocate_handle (void) if (user_agent) curl_easy_setopt (ch->c, CURLOPT_USERAGENT, user_agent); - if (get_content_length_accept_range (ch) == -1) - goto err; - /* Get set up for reading and writing. */ curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, NULL); curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, NULL); - curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, write_cb); - curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch); - /* These are only used if !readonly but we always register them. */ - curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, read_cb); - curl_easy_setopt (ch->c, CURLOPT_READDATA, ch); return ch; @@ -771,233 +755,3 @@ debug_cb (CURL *handle, curl_infotype type, out: return 0; } - -/* NB: The terminology used by libcurl is confusing! - * - * WRITEFUNCTION / write_cb is used when reading from the remote server - * READFUNCTION / read_cb is used when writing to the remote server. - * - * We use the same terminology as libcurl here. - */ - -static size_t -write_cb (char *ptr, size_t size, size_t nmemb, void *opaque) -{ - struct curl_handle *ch = opaque; - size_t orig_realsize = size * nmemb; - size_t realsize = orig_realsize; - - assert (ch->write_buf); - - /* Don't read more than the requested amount of data, even if the - * server or libcurl sends more. - */ - if (realsize > ch->write_count) - realsize = ch->write_count; - - memcpy (ch->write_buf, ptr, realsize); - - ch->write_count -= realsize; - ch->write_buf += realsize; - - return orig_realsize; -} - -static size_t -read_cb (void *ptr, size_t size, size_t nmemb, void *opaque) -{ - struct curl_handle *ch = opaque; - size_t realsize = size * nmemb; - - assert (ch->read_buf); - if (realsize > ch->read_count) - realsize = ch->read_count; - - memcpy (ptr, ch->read_buf, realsize); - - ch->read_count -= realsize; - ch->read_buf += realsize; - - return realsize; -} - -/* Get the file size and also whether the remote HTTP server - * supports byte ranges. - */ -static int -get_content_length_accept_range (struct curl_handle *ch) -{ - CURLcode r; - long code; -#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T - curl_off_t o; -#else - double d; -#endif - - /* We must run the scripts if necessary and set headers in the - * handle. - */ - if (do_scripts (ch) == -1) - return -1; - - /* Set this flag in the handle to false. The callback should set it - * to true if byte ranges are supported, which we check below. - */ - ch->accept_range = false; - - /* No Body, not nobody! This forces a HEAD request. */ - curl_easy_setopt (ch->c, CURLOPT_NOBODY, 1L); - curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb); - curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch); - r = curl_easy_perform (ch->c); - update_times (ch->c); - if (r != CURLE_OK) { - display_curl_error (ch, r, - "problem doing HEAD request to fetch size of URL [%s]", - url); - - /* Get the HTTP status code, if available. */ - r = curl_easy_getinfo (ch->c, CURLINFO_RESPONSE_CODE, &code); - if (r == CURLE_OK) - nbdkit_debug ("HTTP status code: %ld", code); - else - code = -1; - - /* See comment on try_fallback_GET_method below. */ - if (code != 403 || !try_fallback_GET_method (ch)) - return -1; - } - - /* Get the content length. - * - * Note there is some subtlety here: For web servers using chunked - * encoding, either the Content-Length header will not be present, - * or if present it should be ignored. (For such servers the only - * way to find out the true length would be to read all of the - * content, which we don't want to do). - * - * Curl itself resolves this for us. It will ignore the - * Content-Length header if chunked encoding is used, returning the - * length as -1 which we check below (see also - * curl:lib/http.c:Curl_http_size). - */ -#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T - r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &o); - if (r != CURLE_OK) { - display_curl_error (ch, r, - "could not get length of remote file [%s]", url); - return -1; - } - - if (o == -1) { - nbdkit_error ("could not get length of remote file [%s], " - "is the URL correct?", url); - return -1; - } - - ch->exportsize = o; -#else - r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &d); - if (r != CURLE_OK) { - display_curl_error (ch, r, - "could not get length of remote file [%s]", url); - return -1; - } - - if (d == -1) { - nbdkit_error ("could not get length of remote file [%s], " - "is the URL correct?", url); - return -1; - } - - ch->exportsize = d; -#endif - nbdkit_debug ("content length: %" PRIi64, ch->exportsize); - - /* If this is HTTP, check that byte ranges are supported. */ - if (ascii_strncasecmp (url, "http://", strlen ("http://")) == 0 || - ascii_strncasecmp (url, "https://", strlen ("https://")) == 0) { - if (!ch->accept_range) { - nbdkit_error ("server does not support 'range' (byte range) requests"); - return -1; - } - - nbdkit_debug ("accept range supported (for HTTP/HTTPS)"); - } - - return 0; -} - -/* S3 servers can return 403 Forbidden for HEAD but still respond - * to GET, so we give it a second chance in that case. - * https://github.com/kubevirt/containerized-data-importer/issues/2737 - * - * This function issues a GET request with a writefunction that always - * returns an error, thus effectively getting the headers but - * abandoning the transfer as soon as possible after. - */ -static bool -try_fallback_GET_method (struct curl_handle *ch) -{ - CURLcode r; - - nbdkit_debug ("attempting to fetch headers using GET method"); - - curl_easy_setopt (ch->c, CURLOPT_HTTPGET, 1L); - curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb); - curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch); - curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, error_cb); - curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch); - r = curl_easy_perform (ch->c); - update_times (ch->c); - - /* We expect CURLE_WRITE_ERROR here, but CURLE_OK is possible too - * (eg if the remote has zero length). Other errors might happen - * but we ignore them since it is a fallback path. - */ - return r == CURLE_OK || r == CURLE_WRITE_ERROR; -} - -static size_t -header_cb (void *ptr, size_t size, size_t nmemb, void *opaque) -{ - struct curl_handle *ch = opaque; - size_t realsize = size * nmemb; - const char *header = ptr; - const char *end = header + realsize; - const char *accept_ranges = "accept-ranges:"; - const char *bytes = "bytes"; - - if (realsize >= strlen (accept_ranges) && - ascii_strncasecmp (header, accept_ranges, strlen (accept_ranges)) == 0) { - const char *p = strchr (header, ':') + 1; - - /* Skip whitespace between the header name and value. */ - while (p < end && *p && ascii_isspace (*p)) - p++; - - if (end - p >= strlen (bytes) - && strncmp (p, bytes, strlen (bytes)) == 0) { - /* Check that there is nothing but whitespace after the value. */ - p += strlen (bytes); - while (p < end && *p && ascii_isspace (*p)) - p++; - - if (p == end || !*p) - ch->accept_range = true; - } - } - - return realsize; -} - -static size_t -error_cb (char *ptr, size_t size, size_t nmemb, void *opaque) -{ -#ifdef CURL_WRITEFUNC_ERROR - return CURL_WRITEFUNC_ERROR; -#else - return 0; /* in older curl, any size < requested will also be an error */ -#endif -} diff --git a/plugins/curl/curl.c b/plugins/curl/curl.c index be42de36..28cc7bbe 100644 --- a/plugins/curl/curl.c +++ b/plugins/curl/curl.c @@ -48,7 +48,8 @@ #include -#include "cleanup.h" +#include "ascii-ctype.h" +#include "ascii-string.h" #include "curldefs.h" @@ -118,32 +119,6 @@ curl_close (void *handle) #define THREAD_MODEL NBDKIT_THREAD_MODEL_PARALLEL -/* Calls get_handle() ... put_handle() to get a handle for the length - * of the current scope. - */ -#define GET_HANDLE_FOR_CURRENT_SCOPE(ch) \ - CLEANUP_PUT_HANDLE struct curl_handle *ch = get_handle (); -#define CLEANUP_PUT_HANDLE __attribute__ ((cleanup (cleanup_put_handle))) -static void -cleanup_put_handle (void *chp) -{ - struct curl_handle *ch = * (struct curl_handle **) chp; - - if (ch != NULL) - put_handle (ch); -} - -/* Get the file size. */ -static int64_t -curl_get_size (void *handle) -{ - GET_HANDLE_FOR_CURRENT_SCOPE (ch); - if (ch == NULL) - return -1; - - return ch->exportsize; -} - /* Multi-conn is safe for read-only connections, but HTTP does not * have any concept of flushing so we cannot use it for read-write * connections. @@ -156,23 +131,253 @@ curl_can_multi_conn (void *handle) return !! h->readonly; } +/* Get the file size. */ +static int get_content_length_accept_range (struct curl_handle *ch); +static bool try_fallback_GET_method (struct curl_handle *ch); +static size_t header_cb (void *ptr, size_t size, size_t nmemb, void *opaque); +static size_t error_cb (char *ptr, size_t size, size_t nmemb, void *opaque); + +static int64_t +curl_get_size (void *handle) +{ + struct curl_handle *ch; + CURLcode r; + long code; +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T + curl_off_t o; +#else + double d; +#endif + int64_t exportsize; + + /* Get a curl easy handle. */ + ch = allocate_handle (); + if (ch == NULL) goto err; + + /* Prepare to read the headers. */ + if (get_content_length_accept_range (ch) == -1) + goto err; + + /* Send the command to the worker thread and wait. */ + struct command cmd = { + .type = EASY_HANDLE, + .ch = ch, + }; + + r = send_command_and_wait (&cmd); + update_times (ch->c); + if (r != CURLE_OK) { + display_curl_error (ch, r, + "problem doing HEAD request to fetch size of URL [%s]", + url); + + /* Get the HTTP status code, if available. */ + r = curl_easy_getinfo (ch->c, CURLINFO_RESPONSE_CODE, &code); + if (r == CURLE_OK) + nbdkit_debug ("HTTP status code: %ld", code); + else + code = -1; + + /* See comment on try_fallback_GET_method below. */ + if (code != 403 || !try_fallback_GET_method (ch)) + goto err; + } + + /* Get the content length. + * + * Note there is some subtlety here: For web servers using chunked + * encoding, either the Content-Length header will not be present, + * or if present it should be ignored. (For such servers the only + * way to find out the true length would be to read all of the + * content, which we don't want to do). + * + * Curl itself resolves this for us. It will ignore the + * Content-Length header if chunked encoding is used, returning the + * length as -1 which we check below (see also + * curl:lib/http.c:Curl_http_size). + */ +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T + r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &o); + if (r != CURLE_OK) { + display_curl_error (ch, r, + "could not get length of remote file [%s]", url); + goto err; + } + + if (o == -1) { + nbdkit_error ("could not get length of remote file [%s], " + "is the URL correct?", url); + goto err; + } + + exportsize = o; +#else + r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &d); + if (r != CURLE_OK) { + display_curl_error (ch, r, + "could not get length of remote file [%s]", url); + goto err; + } + + if (d == -1) { + nbdkit_error ("could not get length of remote file [%s], " + "is the URL correct?", url); + goto err; + } + + exportsize = d; +#endif + nbdkit_debug ("content length: %" PRIi64, exportsize); + + /* If this is HTTP, check that byte ranges are supported. */ + if (ascii_strncasecmp (url, "http://", strlen ("http://")) == 0 || + ascii_strncasecmp (url, "https://", strlen ("https://")) == 0) { + if (!ch->accept_range) { + nbdkit_error ("server does not support 'range' (byte range) requests"); + goto err; + } + + nbdkit_debug ("accept range supported (for HTTP/HTTPS)"); + } + + free_handle (ch); + return exportsize; + + err: + if (ch) + free_handle (ch); + return -1; +} + +/* Get the file size and also whether the remote HTTP server + * supports byte ranges. + */ +static int +get_content_length_accept_range (struct curl_handle *ch) +{ + /* We must run the scripts if necessary and set headers in the + * handle. + */ + if (do_scripts (ch) == -1) + return -1; + + /* Set this flag in the handle to false. The callback should set it + * to true if byte ranges are supported, which we check below. + */ + ch->accept_range = false; + + /* No Body, not nobody! This forces a HEAD request. */ + curl_easy_setopt (ch->c, CURLOPT_NOBODY, 1L); + curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb); + curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch); + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, NULL); + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, NULL); + curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, NULL); + curl_easy_setopt (ch->c, CURLOPT_READDATA, NULL); + return 0; +} + +/* S3 servers can return 403 Forbidden for HEAD but still respond + * to GET, so we give it a second chance in that case. + * https://github.com/kubevirt/containerized-data-importer/issues/2737 + * + * This function issues a GET request with a writefunction that always + * returns an error, thus effectively getting the headers but + * abandoning the transfer as soon as possible after. + */ +static bool +try_fallback_GET_method (struct curl_handle *ch) +{ + CURLcode r; + + nbdkit_debug ("attempting to fetch headers using GET method"); + + curl_easy_setopt (ch->c, CURLOPT_HTTPGET, 1L); + curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb); + curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch); + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, error_cb); + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch); + + struct command cmd = { + .type = EASY_HANDLE, + .ch = ch, + }; + + r = send_command_and_wait (&cmd); + update_times (ch->c); + + /* We expect CURLE_WRITE_ERROR here, but CURLE_OK is possible too + * (eg if the remote has zero length). Other errors might happen + * but we ignore them since it is a fallback path. + */ + return r == CURLE_OK || r == CURLE_WRITE_ERROR; +} + +static size_t +header_cb (void *ptr, size_t size, size_t nmemb, void *opaque) +{ + struct curl_handle *ch = opaque; + size_t realsize = size * nmemb; + const char *header = ptr; + const char *end = header + realsize; + const char *accept_ranges = "accept-ranges:"; + const char *bytes = "bytes"; + + if (realsize >= strlen (accept_ranges) && + ascii_strncasecmp (header, accept_ranges, strlen (accept_ranges)) == 0) { + const char *p = strchr (header, ':') + 1; + + /* Skip whitespace between the header name and value. */ + while (p < end && *p && ascii_isspace (*p)) + p++; + + if (end - p >= strlen (bytes) + && strncmp (p, bytes, strlen (bytes)) == 0) { + /* Check that there is nothing but whitespace after the value. */ + p += strlen (bytes); + while (p < end && *p && ascii_isspace (*p)) + p++; + + if (p == end || !*p) + ch->accept_range = true; + } + } + + return realsize; +} + +static size_t +error_cb (char *ptr, size_t size, size_t nmemb, void *opaque) +{ +#ifdef CURL_WRITEFUNC_ERROR + return CURL_WRITEFUNC_ERROR; +#else + return 0; /* in older curl, any size < requested will also be an error */ +#endif +} + /* Read data from the remote server. */ +static size_t write_cb (char *ptr, size_t size, size_t nmemb, void *opaque); + static int curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset) { CURLcode r; + struct curl_handle *ch; char range[128]; - GET_HANDLE_FOR_CURRENT_SCOPE (ch); - if (ch == NULL) - return -1; + /* Get a curl easy handle. */ + ch = allocate_handle (); + if (ch == NULL) goto err; /* Run the scripts if necessary and set headers in the handle. */ - if (do_scripts (ch) == -1) return -1; + if (do_scripts (ch) == -1) goto err; /* Tell the write_cb where we want the data to be written. write_cb * will update this if the data comes in multiple sections. */ + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, write_cb); + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch); ch->write_buf = buf; ch->write_count = count; @@ -183,11 +388,16 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset) offset, offset + count); curl_easy_setopt (ch->c, CURLOPT_RANGE, range); - /* The assumption here is that curl will look after timeouts. */ - r = curl_easy_perform (ch->c); + /* Send the command to the worker thread and wait. */ + struct command cmd = { + .type = EASY_HANDLE, + .ch = ch, + }; + + r = send_command_and_wait (&cmd); if (r != CURLE_OK) { - display_curl_error (ch, r, "pread: curl_easy_perform"); - return -1; + display_curl_error (ch, r, "pread"); + goto err; } update_times (ch->c); @@ -198,26 +408,67 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset) /* As far as I understand the cURL API, this should never happen. */ assert (ch->write_count == 0); + free_handle (ch); return 0; + + err: + if (ch) + free_handle (ch); + return -1; +} + +/* NB: The terminology used by libcurl is confusing! + * + * WRITEFUNCTION / write_cb is used when reading from the remote server + * READFUNCTION / read_cb is used when writing to the remote server. + * + * We use the same terminology as libcurl here. + */ +static size_t +write_cb (char *ptr, size_t size, size_t nmemb, void *opaque) +{ + struct curl_handle *ch = opaque; + size_t orig_realsize = size * nmemb; + size_t realsize = orig_realsize; + + assert (ch->write_buf); + + /* Don't read more than the requested amount of data, even if the + * server or libcurl sends more. + */ + if (realsize > ch->write_count) + realsize = ch->write_count; + + memcpy (ch->write_buf, ptr, realsize); + + ch->write_count -= realsize; + ch->write_buf += realsize; + + return orig_realsize; } /* Write data to the remote server. */ +static size_t read_cb (void *ptr, size_t size, size_t nmemb, void *opaque); + static int curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset) { CURLcode r; + struct curl_handle *ch; char range[128]; - GET_HANDLE_FOR_CURRENT_SCOPE (ch); - if (ch == NULL) - return -1; + /* Get a curl easy handle. */ + ch = allocate_handle (); + if (ch == NULL) goto err; /* Run the scripts if necessary and set headers in the handle. */ - if (do_scripts (ch) == -1) return -1; + if (do_scripts (ch) == -1) goto err; /* Tell the read_cb where we want the data to be read from. read_cb * will update this if the data comes in multiple sections. */ + curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, read_cb); + curl_easy_setopt (ch->c, CURLOPT_READDATA, ch); ch->read_buf = buf; ch->read_count = count; @@ -228,11 +479,16 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset) offset, offset + count); curl_easy_setopt (ch->c, CURLOPT_RANGE, range); - /* The assumption here is that curl will look after timeouts. */ - r = curl_easy_perform (ch->c); + /* Send the command to the worker thread and wait. */ + struct command cmd = { + .type = EASY_HANDLE, + .ch = ch, + }; + + r = send_command_and_wait (&cmd); if (r != CURLE_OK) { - display_curl_error (ch, r, "pwrite: curl_easy_perform"); - return -1; + display_curl_error (ch, r, "pwrite"); + goto err; } update_times (ch->c); @@ -243,7 +499,31 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset) /* As far as I understand the cURL API, this should never happen. */ assert (ch->read_count == 0); + free_handle (ch); return 0; + + err: + if (ch) + free_handle (ch); + return -1; +} + +static size_t +read_cb (void *ptr, size_t size, size_t nmemb, void *opaque) +{ + struct curl_handle *ch = opaque; + size_t realsize = size * nmemb; + + assert (ch->read_buf); + if (realsize > ch->read_count) + realsize = ch->read_count; + + memcpy (ptr, ch->read_buf, realsize); + + ch->read_count -= realsize; + ch->read_buf += realsize; + + return realsize; } static struct nbdkit_plugin plugin = { diff --git a/plugins/curl/curldefs.h b/plugins/curl/curldefs.h index 939c8d37..6b158d85 100644 --- a/plugins/curl/curldefs.h +++ b/plugins/curl/curldefs.h @@ -62,6 +62,9 @@ #define HAVE_CURLINFO_TOTAL_TIME_T #define HAVE_CURLINFO_REDIRECT_TIME_T #endif +#if CURL_AT_LEAST_VERSION (7, 66, 0) +#define HAVE_CURL_MULTI_POLL +#endif #if CURL_AT_LEAST_VERSION (8, 2, 0) #define HAVE_CURLINFO_CONN_ID #define HAVE_CURLINFO_XFER_ID @@ -89,16 +92,6 @@ struct curl_handle { /* The underlying curl handle. */ CURL *c; - /* Index of this handle in the pool (for debugging). */ - size_t i; - - /* True if the handle is in use by a thread. */ - bool in_use; - - /* These fields are used/initialized when we create the handle. */ - bool accept_range; - int64_t exportsize; - char errbuf[CURL_ERROR_SIZE]; /* Before doing a read or write operation, set these to point to the @@ -111,8 +104,30 @@ struct curl_handle { const char *read_buf; uint32_t read_count; + /* This field is used by curl_get_size. */ + bool accept_range; + /* Used by scripts.c */ struct curl_slist *headers_copy; + + /* Used by pool.c */ + struct command *cmd; +}; + +/* Asynchronous commands that can be sent to the pool thread. */ +enum command_type { EASY_HANDLE, STOP }; +struct command { + /* These fields are set by the caller. */ + enum command_type type; /* command */ + struct curl_handle *ch; /* for EASY_HANDLE, the easy handle */ + + /* This field is set to a unique value by send_command_and_wait. */ + uint64_t id; /* serial number */ + + /* These fields are used to signal back that the command finished. */ + pthread_mutex_t mutex; /* completion mutex */ + pthread_cond_t cond; /* completion condition */ + CURLcode status; /* status code (CURLE_OK = succeeded) */ }; /* config.c */ @@ -127,8 +142,7 @@ extern void free_handle (struct curl_handle *); extern int pool_get_ready (void); extern int pool_after_fork (void); extern void pool_unload (void); -extern struct curl_handle *get_handle (void); -extern void put_handle (struct curl_handle *ch); +extern CURLcode send_command_and_wait (struct command *cmd); /* scripts.c */ extern int do_scripts (struct curl_handle *ch); diff --git a/plugins/curl/pool.c b/plugins/curl/pool.c index eb2d330e..254951d1 100644 --- a/plugins/curl/pool.c +++ b/plugins/curl/pool.c @@ -30,11 +30,29 @@ * SUCH DAMAGE. */ -/* Curl handle pool. +/* Worker thread which processes the curl multi interface. * - * To get a libcurl handle, call get_handle(). When you hold the - * handle, it is yours exclusively to use. After you have finished - * with the handle, put it back into the pool by calling put_handle(). + * The main nbdkit threads (see curl.c) create curl easy handles + * initialized with the work they want to carry out. Note there is + * one easy handle per task (eg. per pread/pwrite request). The easy + * handles are not reused. + * + * The commands + optional easy handle are submitted to the worker + * thread over a self-pipe (it's easy to use a pipe here because the + * way curl multi works is it can listen on an extra fd, but not on + * anything else like a pthread condition). The curl multi performs + * the work of the outstanding easy handles. + * + * When an easy handle finishes work or errors, we retire the command + * by signalling back to the waiting nbdkit thread using a pthread + * condition. + * + * In my experiments, we're almost always I/O bound so I haven't seen + * any strong need to use more than one curl multi / worker thread, + * although it would be possible to add more in future. + * + * See also this extremely useful thread: + * https://curl.se/mail/lib-2019-03/0100.html */ #include @@ -45,6 +63,7 @@ #include #include #include +#include #include #include @@ -62,115 +81,321 @@ NBDKIT_DLL_PUBLIC int curl_debug_pool = 0; unsigned connections = 4; -/* This lock protects access to the curl_handles vector below. */ -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; +/* Pipe used to notify background thread that a command is pending in + * the queue. A pointer to the 'struct command' is sent over the + * pipe. + */ +static int self_pipe[2] = { -1, -1 }; -/* List of curl handles. This is allocated dynamically as more - * handles are requested. Currently it does not shrink. It may grow - * up to 'connections' in length. +/* The curl multi handle. */ +static CURLM *multi; + +/* List of running easy handles. We only need to maintain this so we + * can remove them from the multi handle when cleaning up. */ DEFINE_VECTOR_TYPE (curl_handle_list, struct curl_handle *); static curl_handle_list curl_handles = empty_vector; -/* The condition is used when the curl handles vector is full and - * we're waiting for a thread to put_handle. - */ -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; -static size_t in_use = 0, waiting = 0; +static const char * +command_type_to_string (enum command_type type) +{ + switch (type) { + case EASY_HANDLE: return "EASY_HANDLE"; + case STOP: return "STOP"; + default: abort (); + } +} int pool_get_ready (void) { + multi = curl_multi_init (); + if (multi == NULL) { + nbdkit_error ("curl_multi_init failed: %m"); + return -1; + } + return 0; } +/* Start and stop the background thread. */ +static pthread_t thread; +static bool thread_running; +static void *pool_worker (void *); + int pool_after_fork (void) { + int err; + + if (pipe (self_pipe) == -1) { + nbdkit_error ("pipe: %m"); + return -1; + } + + /* Start the pool background thread where all the curl work is done. */ + err = pthread_create (&thread, NULL, pool_worker, NULL); + if (err != 0) { + errno = err; + nbdkit_error ("pthread_create: %m"); + return -1; + } + thread_running = true; + return 0; } -/* Close and free all handles in the pool. */ +/* Unload the background thread. */ void pool_unload (void) { - size_t i; + if (thread_running) { + /* Stop the background thread. */ + struct command cmd = { .type = STOP }; + send_command_and_wait (&cmd); + pthread_join (thread, NULL); + thread_running = false; + } - if (curl_debug_pool) - nbdkit_debug ("unload_pool: number of curl handles allocated: %zu", - curl_handles.len); + if (self_pipe[0] >= 0) { + close (self_pipe[0]); + self_pipe[0] = -1; + } + if (self_pipe[1] >= 0) { + close (self_pipe[1]); + self_pipe[1] = -1; + } - for (i = 0; i < curl_handles.len; ++i) - free_handle (curl_handles.ptr[i]); - curl_handle_list_reset (&curl_handles); + if (multi) { + size_t i; + + /* Remove and free any easy handles in the multi. */ + for (i = 0; i < curl_handles.len; ++i) { + curl_multi_remove_handle (multi, curl_handles.ptr[i]->c); + free_handle (curl_handles.ptr[i]); + } + + curl_multi_cleanup (multi); + multi = NULL; + } } -/* Get a handle from the pool. - * - * It is owned exclusively by the caller until they call put_handle. +/* Command queue. */ +static _Atomic uint64_t id; /* next command ID */ + +/* Send command to the background thread and wait for completion. + * This is only called by one of the nbdkit threads. */ -struct curl_handle * -get_handle (void) +CURLcode +send_command_and_wait (struct command *cmd) { - ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock); - size_t i; - struct curl_handle *ch; + cmd->id = id++; - again: - /* Look for a handle which is not in_use. */ - for (i = 0; i < curl_handles.len; ++i) { - ch = curl_handles.ptr[i]; - if (!ch->in_use) { - ch->in_use = true; - in_use++; - if (curl_debug_pool) - nbdkit_debug ("get_handle: %zu", ch->i); - return ch; - } - } - - /* If more connections are allowed, then allocate a new handle. */ - if (curl_handles.len < connections) { - ch = allocate_handle (); - if (ch == NULL) - return NULL; - if (curl_handle_list_append (&curl_handles, ch) == -1) { - free_handle (ch); - return NULL; - } - ch->i = curl_handles.len - 1; - ch->in_use = true; - in_use++; - if (curl_debug_pool) - nbdkit_debug ("get_handle: %zu", ch->i); - return ch; - } - - /* Otherwise we have run out of connections so we must wait until - * another thread calls put_handle. + /* CURLcode is 0 (CURLE_OK) or > 0, so use -1 as a sentinel to + * indicate that the command has not yet been completed and status + * set. */ - assert (in_use == connections); - waiting++; - while (in_use == connections) - pthread_cond_wait (&cond, &lock); - waiting--; + cmd->status = -1; - goto again; + /* This will be used to signal command completion back to us. */ + pthread_mutex_init (&cmd->mutex, NULL); + pthread_cond_init (&cmd->cond, NULL); + + /* Send the command to the background thread. */ + if (write (self_pipe[1], &cmd, sizeof cmd) != sizeof cmd) + abort (); + + /* Wait for the command to be completed by the background thread. */ + { + ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex); + while (cmd->status == -1) /* for -1, see above */ + pthread_cond_wait (&cmd->cond, &cmd->mutex); + } + + pthread_mutex_destroy (&cmd->mutex); + pthread_cond_destroy (&cmd->cond); + + /* Note the main thread must call nbdkit_error on error! */ + return cmd->status; } -/* Return the handle to the pool. */ -void -put_handle (struct curl_handle *ch) +/* The background thread. */ +static struct command *process_multi_handle (void); +static void check_for_finished_handles (void); +static void retire_command (struct command *cmd, CURLcode code); +static void do_easy_handle (struct command *cmd); + +static void * +pool_worker (void *vp) { - ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock); + bool stop = false; + + if (curl_debug_pool) + nbdkit_debug ("curl: background thread started"); + + while (!stop) { + struct command *cmd = NULL; + + cmd = process_multi_handle (); + if (cmd == NULL) + continue; /* or die?? */ + + if (curl_debug_pool) + nbdkit_debug ("curl: dispatching %s command %" PRIu64, + command_type_to_string (cmd->type), cmd->id); + + switch (cmd->type) { + case STOP: + stop = true; + retire_command (cmd, CURLE_OK); + break; + + case EASY_HANDLE: + do_easy_handle (cmd); + break; + } + } /* while (!stop) */ if (curl_debug_pool) - nbdkit_debug ("put_handle: %zu", ch->i); + nbdkit_debug ("curl: background thread stopped"); + + return NULL; +} + +/* Process the multi handle, and look out for new commands. Returns + * when there is a new command. + */ +static struct command * +process_multi_handle (void) +{ + struct curl_waitfd extra_fds[1] = + { { .fd = self_pipe[0], .events = CURL_WAIT_POLLIN } }; + CURLMcode mc; + int numfds, running_handles; + struct command *cmd = NULL; +#ifndef HAVE_CURL_MULTI_POLL + int repeats = 0; +#endif + + while (!cmd) { + /* Process the multi handle. */ + mc = curl_multi_perform (multi, &running_handles); + if (mc != CURLM_OK) { + nbdkit_error ("curl_multi_perform: %s", curl_multi_strerror (mc)); + return NULL; + } + + check_for_finished_handles (); + +#ifdef HAVE_CURL_MULTI_POLL + mc = curl_multi_poll (multi, extra_fds, 1, 1000000, &numfds); + if (mc != CURLM_OK) { + nbdkit_error ("curl_multi_poll: %s", curl_multi_strerror (mc)); + return NULL; + } +#else + /* This is the older curl_multi_wait function. For unclear + * reasons this often gets "stuck" in the nbdkit_nanosleep case + * below, wasting large amounts of time. Luckily the newer curl + * no longer uses this function. + */ + mc = curl_multi_wait (multi, extra_fds, 1, 1000000, &numfds); + if (mc != CURLM_OK) { + nbdkit_error ("curl_multi_wait: %s", curl_multi_strerror (mc)); + return NULL; + } + + if (numfds == 0) { + repeats++; + if (repeats > 1) + nbdkit_nanosleep (1, 0); + continue; + } + else + repeats = 0; +#endif + + if (curl_debug_pool) + nbdkit_debug ( +#ifdef HAVE_CURL_MULTI_POLL + "curl_multi_poll" +#else + "curl_multi_wait" +#endif + ": running_handles=%d numfds=%d", + running_handles, numfds); + + if (extra_fds[0].revents == CURL_WAIT_POLLIN) { + /* There's a command waiting. */ + if (read (self_pipe[0], &cmd, sizeof cmd) != sizeof cmd) + abort (); + } + } + + return cmd; +} + +/* This checks if any easy handles in the multi have + * finished and retires the associated commands. + */ +static void +check_for_finished_handles (void) +{ + CURLMsg *msg; + int msgs_in_queue; + + while ((msg = curl_multi_info_read (multi, &msgs_in_queue)) != NULL) { + size_t i; + struct curl_handle *ch = NULL; + + if (msg->msg == CURLMSG_DONE) { + /* Find this curl_handle. */ + for (i = 0; i < curl_handles.len; ++i) { + if (curl_handles.ptr[i]->c == msg->easy_handle) { + ch = curl_handles.ptr[i]; + curl_handle_list_remove (&curl_handles, i); + break; + } + } + if (ch == NULL) abort (); + curl_multi_remove_handle (multi, ch->c); + + retire_command (ch->cmd, msg->data.result); + } + } +} + +/* Retire a command. status is a CURLcode. */ +static void +retire_command (struct command *cmd, CURLcode status) +{ + if (curl_debug_pool) + nbdkit_debug ("curl: retiring %s command %" PRIu64, + command_type_to_string (cmd->type), cmd->id); + + ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex); + cmd->status = status; + pthread_cond_signal (&cmd->cond); +} + +static void +do_easy_handle (struct command *cmd) +{ + CURLMcode mc; + + cmd->ch->cmd = cmd; + + /* Add the handle to the multi. */ + mc = curl_multi_add_handle (multi, cmd->ch->c); + if (mc != CURLM_OK) { + nbdkit_error ("curl_multi_add_handle: %s", curl_multi_strerror (mc)); + goto err; + } - ch->in_use = false; - in_use--; + if (curl_handle_list_append (&curl_handles, cmd->ch) == -1) + goto err; + return; - /* Signal the next thread which is waiting. */ - if (waiting > 0) - pthread_cond_signal (&cond); + err: + retire_command (cmd, CURLE_OUT_OF_MEMORY); } -- 2.39.3