795 lines
26 KiB
Diff
795 lines
26 KiB
Diff
|
From ac40ae11bc9983e11185749b23e793568cb366cc Mon Sep 17 00:00:00 2001
|
||
|
From: "Richard W.M. Jones" <rjones@redhat.com>
|
||
|
Date: Sat, 16 Apr 2022 18:39:13 +0100
|
||
|
Subject: [PATCH] readahead: Rewrite this filter so it prefetches using .cache
|
||
|
|
||
|
The previous readahead filter did not work well and we have stopped
|
||
|
using it in virt-v2v. However the concept is a good one if we can
|
||
|
make it work. This commit completely rethinks the filter.
|
||
|
|
||
|
Now, in parallel with the ordinary pread command, we issue a prefetch
|
||
|
(ie. .cache) to the underlying plugin for the data immediately
|
||
|
following the pread. For example a simple sequence of operations:
|
||
|
|
||
|
t=1 pread (offset=0, count=65536)
|
||
|
t=2 pread (offset=65536, count=65536)
|
||
|
t=3 pread (offset=131072, count=65536)
|
||
|
|
||
|
would become:
|
||
|
|
||
|
t=1 pread (offset=0, count=65536) <--\ issued
|
||
|
cache (offset=65536, count=65536) <--/ in parallel
|
||
|
t=2 pread (offset=65536, count=65536)
|
||
|
cache (offset=131072, count=65536)
|
||
|
t=3 pread (offset=131072, count=65536)
|
||
|
cache (offset=196608, count=65536)
|
||
|
|
||
|
This requires that the underlying filter(s) and plugin chain can
|
||
|
actually do something with the .cache request. If this is not the
|
||
|
case then the filter does nothing (but it will print a warning). For
|
||
|
plugins which don't have native support for prefetching, it is
|
||
|
sufficient to insert nbdkit-cache-filter after this filter.
|
||
|
(nbdkit-cow-filter can also be used with cow-on-cache=true, but that
|
||
|
is more useful for advanced users who are already using the cow
|
||
|
filter).
|
||
|
|
||
|
The implementation creates a background thread per connection to issue
|
||
|
the parallel .cache requests. This is safer than the alternative (one
|
||
|
background thread in total) since we don't have to deal with the
|
||
|
problem of cache requests being issued with the wrong export name or
|
||
|
stale next pointer. The background thread is controlled by a queue of
|
||
|
commands, with the only possible commands being "cache" or "quit".
|
||
|
|
||
|
Because the background thread issues parallel requests on the same
|
||
|
connection, the underlying plugin must support the parallel thread
|
||
|
model, otherwise we would be violating the plugin's thread model. It
|
||
|
may be possible in future to open a new connection from the background
|
||
|
thread (but with the same exportname), which would lift this
|
||
|
restriction to at least serialize_requests. Because of the current
|
||
|
limitation, nbdkit-curl-plugin cannot use prefetch.
|
||
|
|
||
|
(cherry picked from commit 2ff548d66ad3eae87868402ec5b3319edd12090f)
|
||
|
---
|
||
|
TODO | 22 +-
|
||
|
filters/readahead/Makefile.am | 2 +
|
||
|
filters/readahead/bgthread.c | 76 ++++
|
||
|
filters/readahead/nbdkit-readahead-filter.pod | 70 +++-
|
||
|
filters/readahead/readahead.c | 338 +++++++++---------
|
||
|
filters/readahead/readahead.h | 60 ++++
|
||
|
tests/test-readahead.sh | 2 +-
|
||
|
7 files changed, 367 insertions(+), 203 deletions(-)
|
||
|
create mode 100644 filters/readahead/bgthread.c
|
||
|
create mode 100644 filters/readahead/readahead.h
|
||
|
|
||
|
diff --git a/TODO b/TODO
|
||
|
index 5ae21db5..4d2a9796 100644
|
||
|
--- a/TODO
|
||
|
+++ b/TODO
|
||
|
@@ -62,16 +62,6 @@ General ideas for improvements
|
||
|
continue to keep their non-standard handshake while utilizing nbdkit
|
||
|
to prototype new behaviors in serving the kernel.
|
||
|
|
||
|
-* Background thread for filters. Some filters (readahead, cache and
|
||
|
- proposed scan filter - see below) could be more effective if they
|
||
|
- were able to defer work to a background thread. We finally have
|
||
|
- nbdkit_next_context_open and friends for allowing a background
|
||
|
- thread to have access into the plugin, but still need to worry about
|
||
|
- thread-safety (how much must the filter do vs. nbdkit, to avoid
|
||
|
- calling into the plugin too many times at once) and cleanup
|
||
|
- (spawning the thread during .after_fork is viable, but cleaning it
|
||
|
- up during .unload is too late).
|
||
|
-
|
||
|
* "nbdkit.so": nbdkit as a loadable shared library. The aim of nbdkit
|
||
|
is to make it reusable from other programs (see nbdkit-captive(1)).
|
||
|
If it was a loadable shared library it would be even more reusable.
|
||
|
@@ -228,6 +218,8 @@ Suggestions for filters
|
||
|
* nbdkit-cache-filter should handle ENOSPC errors automatically by
|
||
|
reclaiming blocks from the cache
|
||
|
|
||
|
+* nbdkit-cache-filter could use a background thread for reclaiming.
|
||
|
+
|
||
|
* zstd filter was requested as a way to do what we currently do with
|
||
|
xz but saving many hours on compression (at the cost of hundreds of
|
||
|
MBs of extra data)
|
||
|
@@ -240,6 +232,16 @@ Suggestions for filters
|
||
|
could inject a flush after pausing. However this requires that
|
||
|
filter background threads have access to the plugin (see above).
|
||
|
|
||
|
+nbdkit-readahead-filter:
|
||
|
+
|
||
|
+* The filter should open a new connection to the plugin per background
|
||
|
+ thread so it is able to work with plugins that use the
|
||
|
+ serialize_requests thread model (like curl). At the moment it makes
|
||
|
+ requests on the same connection, so it requires plugins to use the
|
||
|
+ parallel thread model.
|
||
|
+
|
||
|
+* It should combine (or avoid) overlapping cache requests.
|
||
|
+
|
||
|
nbdkit-rate-filter:
|
||
|
|
||
|
* allow other kinds of traffic shaping such as VBR
|
||
|
diff --git a/filters/readahead/Makefile.am b/filters/readahead/Makefile.am
|
||
|
index ee5bb3fb..187993ae 100644
|
||
|
--- a/filters/readahead/Makefile.am
|
||
|
+++ b/filters/readahead/Makefile.am
|
||
|
@@ -37,6 +37,8 @@ filter_LTLIBRARIES = nbdkit-readahead-filter.la
|
||
|
|
||
|
nbdkit_readahead_filter_la_SOURCES = \
|
||
|
readahead.c \
|
||
|
+ readahead.h \
|
||
|
+ bgthread.c \
|
||
|
$(top_srcdir)/include/nbdkit-filter.h \
|
||
|
$(NULL)
|
||
|
|
||
|
diff --git a/filters/readahead/bgthread.c b/filters/readahead/bgthread.c
|
||
|
new file mode 100644
|
||
|
index 00000000..5894bb5f
|
||
|
--- /dev/null
|
||
|
+++ b/filters/readahead/bgthread.c
|
||
|
@@ -0,0 +1,76 @@
|
||
|
+/* nbdkit
|
||
|
+ * Copyright (C) 2019-2022 Red Hat Inc.
|
||
|
+ *
|
||
|
+ * Redistribution and use in source and binary forms, with or without
|
||
|
+ * modification, are permitted provided that the following conditions are
|
||
|
+ * met:
|
||
|
+ *
|
||
|
+ * * Redistributions of source code must retain the above copyright
|
||
|
+ * notice, this list of conditions and the following disclaimer.
|
||
|
+ *
|
||
|
+ * * Redistributions in binary form must reproduce the above copyright
|
||
|
+ * notice, this list of conditions and the following disclaimer in the
|
||
|
+ * documentation and/or other materials provided with the distribution.
|
||
|
+ *
|
||
|
+ * * Neither the name of Red Hat nor the names of its contributors may be
|
||
|
+ * used to endorse or promote products derived from this software without
|
||
|
+ * specific prior written permission.
|
||
|
+ *
|
||
|
+ * THIS SOFTWARE IS PROVIDED BY RED HAT AND CONTRIBUTORS ''AS IS'' AND
|
||
|
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
|
||
|
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
|
||
|
+ * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL RED HAT OR
|
||
|
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||
|
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||
|
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
||
|
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
|
||
|
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||
|
+ * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
|
||
|
+ * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
|
||
|
+ * SUCH DAMAGE.
|
||
|
+ */
|
||
|
+
|
||
|
+#include <config.h>
|
||
|
+
|
||
|
+#include <stdio.h>
|
||
|
+#include <stdlib.h>
|
||
|
+#include <stdint.h>
|
||
|
+#include <pthread.h>
|
||
|
+
|
||
|
+#include <nbdkit-filter.h>
|
||
|
+
|
||
|
+#include "readahead.h"
|
||
|
+
|
||
|
+#include "cleanup.h"
|
||
|
+
|
||
|
+void *
|
||
|
+readahead_thread (void *vp)
|
||
|
+{
|
||
|
+ struct bgthread_ctrl *ctrl = vp;
|
||
|
+
|
||
|
+ for (;;) {
|
||
|
+ struct command cmd;
|
||
|
+
|
||
|
+ /* Wait until we are sent at least one command. */
|
||
|
+ {
|
||
|
+ ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&ctrl->lock);
|
||
|
+ while (ctrl->cmds.len == 0)
|
||
|
+ pthread_cond_wait (&ctrl->cond, &ctrl->lock);
|
||
|
+ cmd = ctrl->cmds.ptr[0];
|
||
|
+ command_queue_remove (&ctrl->cmds, 0);
|
||
|
+ }
|
||
|
+
|
||
|
+ switch (cmd.type) {
|
||
|
+ case CMD_QUIT:
|
||
|
+ /* Finish processing and exit the thread. */
|
||
|
+ return NULL;
|
||
|
+
|
||
|
+ case CMD_CACHE:
|
||
|
+ /* Issue .cache (readahead) to underlying plugin. We ignore any
|
||
|
+ * errors because there's no way to communicate that back to the
|
||
|
+ * client, and readahead is only advisory.
|
||
|
+ */
|
||
|
+ cmd.next->cache (cmd.next, cmd.count, cmd.offset, 0, NULL);
|
||
|
+ }
|
||
|
+ }
|
||
|
+}
|
||
|
diff --git a/filters/readahead/nbdkit-readahead-filter.pod b/filters/readahead/nbdkit-readahead-filter.pod
|
||
|
index c220d379..630e5924 100644
|
||
|
--- a/filters/readahead/nbdkit-readahead-filter.pod
|
||
|
+++ b/filters/readahead/nbdkit-readahead-filter.pod
|
||
|
@@ -1,28 +1,66 @@
|
||
|
=head1 NAME
|
||
|
|
||
|
-nbdkit-readahead-filter - prefetch data when reading sequentially
|
||
|
+nbdkit-readahead-filter - prefetch data ahead of sequential reads
|
||
|
|
||
|
=head1 SYNOPSIS
|
||
|
|
||
|
- nbdkit --filter=readahead plugin
|
||
|
+ nbdkit --filter=readahead PLUGIN
|
||
|
+
|
||
|
+ nbdkit --filter=readahead --filter=cache PLUGIN
|
||
|
+
|
||
|
+ nbdkit --filter=readahead --filter=cow PLUGIN cow-on-cache=true
|
||
|
|
||
|
=head1 DESCRIPTION
|
||
|
|
||
|
C<nbdkit-readahead-filter> is a filter that prefetches data when the
|
||
|
-client is reading sequentially.
|
||
|
+client is reading.
|
||
|
|
||
|
-A common use for this filter is to accelerate sequential copy
|
||
|
-operations (like S<C<qemu-img convert>>) when plugin requests have a
|
||
|
-high overhead (like L<nbdkit-curl-plugin(1)>). For example:
|
||
|
-
|
||
|
- nbdkit -U - --filter=readahead curl https://example.com/disk.img \
|
||
|
- --run 'qemu-img convert $nbd disk.img'
|
||
|
+When the client issues a read, this filter issues a parallel prefetch
|
||
|
+(C<.cache>) for subsequent data. Plugins which support this command
|
||
|
+will prefetch the data, making sequential reads faster. For plugins
|
||
|
+which do not support this command, you can inject
|
||
|
+L<nbdkit-cache-filter(1)> below (after) this filter, giving
|
||
|
+approximately the same effect. L<nbdkit-cow-filter(1)> can be used
|
||
|
+instead of nbdkit-cache-filter, if you add the C<cow-on-cache=true>
|
||
|
+option.
|
||
|
|
||
|
The filter uses a simple adaptive algorithm which accelerates
|
||
|
-sequential reads, but has a small penalty if the client does random
|
||
|
-reads. If the client mixes reads with writes or write-like operations
|
||
|
-(trimming, zeroing) then it will work but there can be a large
|
||
|
-performance penalty.
|
||
|
+sequential reads and requires no further configuration.
|
||
|
+
|
||
|
+=head2 Limitations
|
||
|
+
|
||
|
+In a number of significant cases this filter will do nothing. The
|
||
|
+filter will print a warning message if this happens.
|
||
|
+
|
||
|
+=over 4
|
||
|
+
|
||
|
+=item Thread model must be parallel
|
||
|
+
|
||
|
+For example L<nbdkit-curl-plugin(1)> only supports
|
||
|
+C<serialize_requests>, and so this filter cannot perform prefetches in
|
||
|
+parallel with the read requests.
|
||
|
+
|
||
|
+We may be able to lift this restriction in future.
|
||
|
+
|
||
|
+=item Underlying filters or plugin must support C<.cache> (prefetch)
|
||
|
+
|
||
|
+Very many plugins do not have the concept of prefetching and/or
|
||
|
+do not implement the C<.cache> callback, and so there is no
|
||
|
+way for this filter to issue prefetches.
|
||
|
+
|
||
|
+You can usually get around this by adding I<--filter=cache> after this
|
||
|
+filter as explained above. It may be necessary to limit the total
|
||
|
+size of the cache (see L<nbdkit-cache-filter(1)/CACHE MAXIMUM SIZE>).
|
||
|
+
|
||
|
+=item Clients and kernels may do readahead already
|
||
|
+
|
||
|
+It may be the case that NBD clients are already issuing
|
||
|
+C<NBD_CMD_CACHE> (NBD prefetch) commands. It may also be the case
|
||
|
+that your plugin is using local file functions where the kernel is
|
||
|
+doing readahead. In such cases this filter is not necessary and may
|
||
|
+be pessimal.
|
||
|
+
|
||
|
+=back
|
||
|
|
||
|
=head1 PARAMETERS
|
||
|
|
||
|
@@ -50,9 +88,9 @@ C<nbdkit-readahead-filter> first appeared in nbdkit 1.12.
|
||
|
|
||
|
L<nbdkit(1)>,
|
||
|
L<nbdkit-cache-filter(1)>,
|
||
|
-L<nbdkit-curl-plugin(1)>,
|
||
|
+L<nbdkit-cow-filter(1)>,
|
||
|
+L<nbdkit-file-plugin(1)>,
|
||
|
L<nbdkit-retry-filter(1)>,
|
||
|
-L<nbdkit-ssh-plugin(1)>,
|
||
|
L<nbdkit-torrent-plugin(1)>,
|
||
|
L<nbdkit-vddk-plugin(1)>,
|
||
|
L<nbdkit-filter(3)>,
|
||
|
@@ -64,4 +102,4 @@ Richard W.M. Jones
|
||
|
|
||
|
=head1 COPYRIGHT
|
||
|
|
||
|
-Copyright (C) 2019 Red Hat Inc.
|
||
|
+Copyright (C) 2019-2022 Red Hat Inc.
|
||
|
diff --git a/filters/readahead/readahead.c b/filters/readahead/readahead.c
|
||
|
index f5552d4c..1d7ae111 100644
|
||
|
--- a/filters/readahead/readahead.c
|
||
|
+++ b/filters/readahead/readahead.c
|
||
|
@@ -1,5 +1,5 @@
|
||
|
/* nbdkit
|
||
|
- * Copyright (C) 2019-2021 Red Hat Inc.
|
||
|
+ * Copyright (C) 2019-2022 Red Hat Inc.
|
||
|
*
|
||
|
* Redistribution and use in source and binary forms, with or without
|
||
|
* modification, are permitted provided that the following conditions are
|
||
|
@@ -34,232 +34,218 @@
|
||
|
|
||
|
#include <stdio.h>
|
||
|
#include <stdlib.h>
|
||
|
+#include <stdbool.h>
|
||
|
#include <stdint.h>
|
||
|
#include <string.h>
|
||
|
#include <errno.h>
|
||
|
-
|
||
|
#include <pthread.h>
|
||
|
|
||
|
#include <nbdkit-filter.h>
|
||
|
|
||
|
+#include "readahead.h"
|
||
|
+
|
||
|
#include "cleanup.h"
|
||
|
#include "minmax.h"
|
||
|
-
|
||
|
-/* Copied from server/plugins.c. */
|
||
|
-#define MAX_REQUEST_SIZE (64 * 1024 * 1024)
|
||
|
+#include "vector.h"
|
||
|
|
||
|
/* These could be made configurable in future. */
|
||
|
-#define READAHEAD_MIN 65536
|
||
|
-#define READAHEAD_MAX MAX_REQUEST_SIZE
|
||
|
-
|
||
|
-/* This lock protects the global state. */
|
||
|
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
|
||
|
-
|
||
|
-/* The real size of the underlying plugin. */
|
||
|
-static uint64_t size;
|
||
|
+#define READAHEAD_MIN 32768
|
||
|
+#define READAHEAD_MAX (4*1024*1024)
|
||
|
|
||
|
/* Size of the readahead window. */
|
||
|
+static pthread_mutex_t window_lock = PTHREAD_MUTEX_INITIALIZER;
|
||
|
static uint64_t window = READAHEAD_MIN;
|
||
|
+static uint64_t last_offset = 0, last_readahead = 0;
|
||
|
|
||
|
-/* The single prefetch buffer shared by all threads, and its virtual
|
||
|
- * location in the virtual disk. The prefetch buffer grows
|
||
|
- * dynamically as required, but never shrinks.
|
||
|
+static int thread_model = -1; /* Thread model of the underlying plugin. */
|
||
|
+
|
||
|
+/* Per-connection data. */
|
||
|
+struct readahead_handle {
|
||
|
+ int can_cache; /* Can the underlying plugin cache? */
|
||
|
+ pthread_t thread; /* The background thread, one per connection. */
|
||
|
+ struct bgthread_ctrl ctrl;
|
||
|
+};
|
||
|
+
|
||
|
+/* We have various requirements of the underlying filter(s) + plugin:
|
||
|
+ * - They must support NBDKIT_CACHE_NATIVE (otherwise our requests
|
||
|
+ * would not do anything useful).
|
||
|
+ * - They must use the PARALLEL thread model (otherwise we could
|
||
|
+ * violate their thread model).
|
||
|
+ */
|
||
|
+static bool
|
||
|
+filter_working (struct readahead_handle *h)
|
||
|
+{
|
||
|
+ return
|
||
|
+ h->can_cache == NBDKIT_CACHE_NATIVE &&
|
||
|
+ thread_model == NBDKIT_THREAD_MODEL_PARALLEL;
|
||
|
+}
|
||
|
+
|
||
|
+static bool
|
||
|
+suggest_cache_filter (struct readahead_handle *h)
|
||
|
+{
|
||
|
+ return
|
||
|
+ h->can_cache != NBDKIT_CACHE_NATIVE &&
|
||
|
+ thread_model == NBDKIT_THREAD_MODEL_PARALLEL;
|
||
|
+}
|
||
|
+
|
||
|
+/* We need to hook into .get_ready() so we can read the final thread
|
||
|
+ * model (of the whole server).
|
||
|
*/
|
||
|
-static char *buffer = NULL;
|
||
|
-static size_t bufsize = 0;
|
||
|
-static uint64_t position;
|
||
|
-static uint32_t length = 0;
|
||
|
+static int
|
||
|
+readahead_get_ready (int final_thread_model)
|
||
|
+{
|
||
|
+ thread_model = final_thread_model;
|
||
|
+ return 0;
|
||
|
+}
|
||
|
+
|
||
|
+static int
|
||
|
+send_command_to_background_thread (struct bgthread_ctrl *ctrl,
|
||
|
+ const struct command cmd)
|
||
|
+{
|
||
|
+ ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&ctrl->lock);
|
||
|
+ if (command_queue_append (&ctrl->cmds, cmd) == -1)
|
||
|
+ return -1;
|
||
|
+ /* Signal the thread if it could be sleeping on an empty queue. */
|
||
|
+ if (ctrl->cmds.len == 1)
|
||
|
+ pthread_cond_signal (&ctrl->cond);
|
||
|
+ return 0;
|
||
|
+}
|
||
|
+
|
||
|
+static void *
|
||
|
+readahead_open (nbdkit_next_open *next, nbdkit_context *nxdata,
|
||
|
+ int readonly, const char *exportname, int is_tls)
|
||
|
+{
|
||
|
+ struct readahead_handle *h;
|
||
|
+ int err;
|
||
|
+
|
||
|
+ if (next (nxdata, readonly, exportname) == -1)
|
||
|
+ return NULL;
|
||
|
+
|
||
|
+ h = malloc (sizeof *h);
|
||
|
+ if (h == NULL) {
|
||
|
+ nbdkit_error ("malloc: %m");
|
||
|
+ return NULL;
|
||
|
+ }
|
||
|
+
|
||
|
+ h->ctrl.cmds = (command_queue) empty_vector;
|
||
|
+ pthread_mutex_init (&h->ctrl.lock, NULL);
|
||
|
+ pthread_cond_init (&h->ctrl.cond, NULL);
|
||
|
+
|
||
|
+ /* Create the background thread. */
|
||
|
+ err = pthread_create (&h->thread, NULL, readahead_thread, &h->ctrl);
|
||
|
+ if (err != 0) {
|
||
|
+ errno = err;
|
||
|
+ nbdkit_error ("pthread_create: %m");
|
||
|
+ pthread_cond_destroy (&h->ctrl.cond);
|
||
|
+ pthread_mutex_destroy (&h->ctrl.lock);
|
||
|
+ free (h);
|
||
|
+ return NULL;
|
||
|
+ }
|
||
|
+
|
||
|
+ return h;
|
||
|
+}
|
||
|
|
||
|
static void
|
||
|
-readahead_unload (void)
|
||
|
+readahead_close (void *handle)
|
||
|
{
|
||
|
- free (buffer);
|
||
|
+ struct readahead_handle *h = handle;
|
||
|
+ const struct command quit_cmd = { .type = CMD_QUIT };
|
||
|
+
|
||
|
+ send_command_to_background_thread (&h->ctrl, quit_cmd);
|
||
|
+ pthread_join (h->thread, NULL);
|
||
|
+ pthread_cond_destroy (&h->ctrl.cond);
|
||
|
+ pthread_mutex_destroy (&h->ctrl.lock);
|
||
|
+ command_queue_reset (&h->ctrl.cmds);
|
||
|
+ free (h);
|
||
|
}
|
||
|
|
||
|
-static int64_t readahead_get_size (nbdkit_next *next, void *handle);
|
||
|
-
|
||
|
-/* In prepare, force a call to get_size which sets the size global. */
|
||
|
static int
|
||
|
-readahead_prepare (nbdkit_next *next, void *handle, int readonly)
|
||
|
+readahead_can_cache (nbdkit_next *next, void *handle)
|
||
|
{
|
||
|
- int64_t r;
|
||
|
+ struct readahead_handle *h = handle;
|
||
|
+ int r;
|
||
|
|
||
|
- r = readahead_get_size (next, handle);
|
||
|
- return r >= 0 ? 0 : -1;
|
||
|
-}
|
||
|
-
|
||
|
-/* Get the size. */
|
||
|
-static int64_t
|
||
|
-readahead_get_size (nbdkit_next *next, void *handle)
|
||
|
-{
|
||
|
- int64_t r;
|
||
|
-
|
||
|
- r = next->get_size (next);
|
||
|
+ /* Call next->can_cache to read the underlying 'can_cache'. */
|
||
|
+ r = next->can_cache (next);
|
||
|
if (r == -1)
|
||
|
return -1;
|
||
|
+ h->can_cache = r;
|
||
|
|
||
|
- ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
|
||
|
- size = r;
|
||
|
+ if (!filter_working (h)) {
|
||
|
+ nbdkit_error ("readahead: warning: underlying plugin does not support "
|
||
|
+ "NBD_CMD_CACHE or PARALLEL thread model, so the filter "
|
||
|
+ "won't do anything");
|
||
|
+ if (suggest_cache_filter (h))
|
||
|
+ nbdkit_error ("readahead: try adding --filter=cache "
|
||
|
+ "after this filter");
|
||
|
+ /* This is an error, but that's just to ensure that the warning
|
||
|
+ * above is seen. We don't need to return -1 here.
|
||
|
+ */
|
||
|
+ }
|
||
|
|
||
|
return r;
|
||
|
}
|
||
|
|
||
|
-/* Cache */
|
||
|
-static int
|
||
|
-readahead_can_cache (nbdkit_next *next, void *handle)
|
||
|
-{
|
||
|
- /* We are already operating as a cache regardless of the plugin's
|
||
|
- * underlying .can_cache, but it's easiest to just rely on nbdkit's
|
||
|
- * behavior of calling .pread for caching.
|
||
|
- */
|
||
|
- return NBDKIT_CACHE_EMULATE;
|
||
|
-}
|
||
|
-
|
||
|
/* Read data. */
|
||
|
-
|
||
|
-static int
|
||
|
-fill_readahead (nbdkit_next *next,
|
||
|
- uint32_t count, uint64_t offset, uint32_t flags, int *err)
|
||
|
-{
|
||
|
- position = offset;
|
||
|
-
|
||
|
- /* Read at least window bytes, but if count is larger read that.
|
||
|
- * Note that the count cannot be bigger than the buffer size.
|
||
|
- */
|
||
|
- length = MAX (count, window);
|
||
|
-
|
||
|
- /* Don't go beyond the end of the underlying file. */
|
||
|
- length = MIN (length, size - position);
|
||
|
-
|
||
|
- /* Grow the buffer if necessary. */
|
||
|
- if (bufsize < length) {
|
||
|
- char *new_buffer = realloc (buffer, length);
|
||
|
- if (new_buffer == NULL) {
|
||
|
- *err = errno;
|
||
|
- nbdkit_error ("realloc: %m");
|
||
|
- return -1;
|
||
|
- }
|
||
|
- buffer = new_buffer;
|
||
|
- bufsize = length;
|
||
|
- }
|
||
|
-
|
||
|
- if (next->pread (next, buffer, length, offset, flags, err) == -1) {
|
||
|
- length = 0; /* failed to fill the prefetch buffer */
|
||
|
- return -1;
|
||
|
- }
|
||
|
-
|
||
|
- return 0;
|
||
|
-}
|
||
|
-
|
||
|
static int
|
||
|
readahead_pread (nbdkit_next *next,
|
||
|
void *handle, void *buf, uint32_t count, uint64_t offset,
|
||
|
uint32_t flags, int *err)
|
||
|
{
|
||
|
- ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
|
||
|
+ struct readahead_handle *h = handle;
|
||
|
|
||
|
- while (count > 0) {
|
||
|
- if (length == 0) {
|
||
|
- /* We don't have a prefetch buffer at all. This could be the
|
||
|
- * first request or reset after a miss.
|
||
|
- */
|
||
|
- window = READAHEAD_MIN;
|
||
|
- if (fill_readahead (next, count, offset, flags, err) == -1)
|
||
|
- return -1;
|
||
|
- }
|
||
|
+ /* If the underlying plugin doesn't support caching then skip that
|
||
|
+ * step completely. The filter will do nothing.
|
||
|
+ */
|
||
|
+ if (filter_working (h)) {
|
||
|
+ struct command ra_cmd = { .type = CMD_CACHE, .next = NULL };
|
||
|
+ int64_t size;
|
||
|
|
||
|
- /* Can we satisfy this request partly or entirely from the prefetch
|
||
|
- * buffer?
|
||
|
- */
|
||
|
- else if (position <= offset && offset < position + length) {
|
||
|
- uint32_t n = MIN (position - offset + length, count);
|
||
|
- memcpy (buf, &buffer[offset-position], n);
|
||
|
- buf += n;
|
||
|
- offset += n;
|
||
|
- count -= n;
|
||
|
- }
|
||
|
+ size = next->get_size (next);
|
||
|
+ if (size >= 0) {
|
||
|
+ ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&window_lock);
|
||
|
|
||
|
- /* Does the request start immediately after the prefetch buffer?
|
||
|
- * This is a “hit” allowing us to double the window size.
|
||
|
- */
|
||
|
- else if (offset == position + length) {
|
||
|
- window = MIN (window * 2, READAHEAD_MAX);
|
||
|
- if (fill_readahead (next, count, offset, flags, err) == -1)
|
||
|
- return -1;
|
||
|
+ /* Generate the asynchronous (background) cache command for
|
||
|
+ * the readahead window.
|
||
|
+ */
|
||
|
+ ra_cmd.offset = offset + count;
|
||
|
+ if (ra_cmd.offset < size) {
|
||
|
+ ra_cmd.count = MIN (window, size - ra_cmd.offset);
|
||
|
+ ra_cmd.next = next; /* If .next is non-NULL, we'll send it below. */
|
||
|
+ }
|
||
|
+
|
||
|
+ /* Should we change the window size?
|
||
|
+ * If the last readahead < current offset, double the window.
|
||
|
+ * If not, but we're still making forward progress, keep the window.
|
||
|
+ * If we're not making forward progress, reduce the window to minimum.
|
||
|
+ */
|
||
|
+ if (last_readahead < offset)
|
||
|
+ window = MIN (window * 2, READAHEAD_MAX);
|
||
|
+ else if (last_offset < offset)
|
||
|
+ /* leave window unchanged */ ;
|
||
|
+ else
|
||
|
+ window = READAHEAD_MIN;
|
||
|
+ last_offset = offset;
|
||
|
+ last_readahead = ra_cmd.offset + ra_cmd.count;
|
||
|
}
|
||
|
|
||
|
- /* Else it's a “miss”. Reset everything and start again. */
|
||
|
- else
|
||
|
- length = 0;
|
||
|
+ if (ra_cmd.next &&
|
||
|
+ send_command_to_background_thread (&h->ctrl, ra_cmd) == -1)
|
||
|
+ return -1;
|
||
|
}
|
||
|
|
||
|
- return 0;
|
||
|
-}
|
||
|
-
|
||
|
-/* Any writes or write-like operations kill the prefetch buffer.
|
||
|
- *
|
||
|
- * We could do better here, but for the current use case of this
|
||
|
- * filter it doesn't matter. XXX
|
||
|
- */
|
||
|
-
|
||
|
-static void
|
||
|
-kill_readahead (void)
|
||
|
-{
|
||
|
- ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
|
||
|
- window = READAHEAD_MIN;
|
||
|
- length = 0;
|
||
|
-}
|
||
|
-
|
||
|
-static int
|
||
|
-readahead_pwrite (nbdkit_next *next,
|
||
|
- void *handle,
|
||
|
- const void *buf, uint32_t count, uint64_t offset,
|
||
|
- uint32_t flags, int *err)
|
||
|
-{
|
||
|
- kill_readahead ();
|
||
|
- return next->pwrite (next, buf, count, offset, flags, err);
|
||
|
-}
|
||
|
-
|
||
|
-static int
|
||
|
-readahead_trim (nbdkit_next *next,
|
||
|
- void *handle,
|
||
|
- uint32_t count, uint64_t offset, uint32_t flags,
|
||
|
- int *err)
|
||
|
-{
|
||
|
- kill_readahead ();
|
||
|
- return next->trim (next, count, offset, flags, err);
|
||
|
-}
|
||
|
-
|
||
|
-static int
|
||
|
-readahead_zero (nbdkit_next *next,
|
||
|
- void *handle,
|
||
|
- uint32_t count, uint64_t offset, uint32_t flags,
|
||
|
- int *err)
|
||
|
-{
|
||
|
- kill_readahead ();
|
||
|
- return next->zero (next, count, offset, flags, err);
|
||
|
-}
|
||
|
-
|
||
|
-static int
|
||
|
-readahead_flush (nbdkit_next *next,
|
||
|
- void *handle, uint32_t flags, int *err)
|
||
|
-{
|
||
|
- kill_readahead ();
|
||
|
- return next->flush (next, flags, err);
|
||
|
+ /* Issue the synchronous read. */
|
||
|
+ return next->pread (next, buf, count, offset, flags, err);
|
||
|
}
|
||
|
|
||
|
static struct nbdkit_filter filter = {
|
||
|
.name = "readahead",
|
||
|
.longname = "nbdkit readahead filter",
|
||
|
- .unload = readahead_unload,
|
||
|
- .prepare = readahead_prepare,
|
||
|
- .get_size = readahead_get_size,
|
||
|
+ .get_ready = readahead_get_ready,
|
||
|
+ .open = readahead_open,
|
||
|
+ .close = readahead_close,
|
||
|
.can_cache = readahead_can_cache,
|
||
|
.pread = readahead_pread,
|
||
|
- .pwrite = readahead_pwrite,
|
||
|
- .trim = readahead_trim,
|
||
|
- .zero = readahead_zero,
|
||
|
- .flush = readahead_flush,
|
||
|
};
|
||
|
|
||
|
NBDKIT_REGISTER_FILTER(filter)
|
||
|
diff --git a/filters/readahead/readahead.h b/filters/readahead/readahead.h
|
||
|
new file mode 100644
|
||
|
index 00000000..a68204d5
|
||
|
--- /dev/null
|
||
|
+++ b/filters/readahead/readahead.h
|
||
|
@@ -0,0 +1,60 @@
|
||
|
+/* nbdkit
|
||
|
+ * Copyright (C) 2019-2022 Red Hat Inc.
|
||
|
+ *
|
||
|
+ * Redistribution and use in source and binary forms, with or without
|
||
|
+ * modification, are permitted provided that the following conditions are
|
||
|
+ * met:
|
||
|
+ *
|
||
|
+ * * Redistributions of source code must retain the above copyright
|
||
|
+ * notice, this list of conditions and the following disclaimer.
|
||
|
+ *
|
||
|
+ * * Redistributions in binary form must reproduce the above copyright
|
||
|
+ * notice, this list of conditions and the following disclaimer in the
|
||
|
+ * documentation and/or other materials provided with the distribution.
|
||
|
+ *
|
||
|
+ * * Neither the name of Red Hat nor the names of its contributors may be
|
||
|
+ * used to endorse or promote products derived from this software without
|
||
|
+ * specific prior written permission.
|
||
|
+ *
|
||
|
+ * THIS SOFTWARE IS PROVIDED BY RED HAT AND CONTRIBUTORS ''AS IS'' AND
|
||
|
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
|
||
|
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
|
||
|
+ * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL RED HAT OR
|
||
|
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||
|
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||
|
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
||
|
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
|
||
|
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||
|
+ * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
|
||
|
+ * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
|
||
|
+ * SUCH DAMAGE.
|
||
|
+ */
|
||
|
+
|
||
|
+#ifndef NBDKIT_READAHEAD_H
|
||
|
+#define NBDKIT_READAHEAD_H
|
||
|
+
|
||
|
+#include <pthread.h>
|
||
|
+
|
||
|
+#include <nbdkit-filter.h>
|
||
|
+
|
||
|
+#include "vector.h"
|
||
|
+
|
||
|
+/* List of commands issued to the background thread. */
|
||
|
+struct command {
|
||
|
+ enum { CMD_QUIT, CMD_CACHE } type;
|
||
|
+ nbdkit_next *next;
|
||
|
+ uint64_t offset;
|
||
|
+ uint32_t count;
|
||
|
+};
|
||
|
+DEFINE_VECTOR_TYPE(command_queue, struct command);
|
||
|
+
|
||
|
+struct bgthread_ctrl {
|
||
|
+ command_queue cmds; /* Command queue. */
|
||
|
+ pthread_mutex_t lock; /* Lock for queue. */
|
||
|
+ pthread_cond_t cond; /* Condition queue size 0 -> 1. */
|
||
|
+};
|
||
|
+
|
||
|
+/* Start background thread (one per connection). */
|
||
|
+extern void *readahead_thread (void *vp);
|
||
|
+
|
||
|
+#endif /* NBDKIT_READAHEAD_H */
|
||
|
diff --git a/tests/test-readahead.sh b/tests/test-readahead.sh
|
||
|
index 7ec7f8e9..17126e5a 100755
|
||
|
--- a/tests/test-readahead.sh
|
||
|
+++ b/tests/test-readahead.sh
|
||
|
@@ -59,7 +59,7 @@ for i in range(0, 512*10, 512):
|
||
|
echo $((end_t - start_t))
|
||
|
}
|
||
|
|
||
|
-t1=$(test --filter=readahead)
|
||
|
+t1=$(test --filter=readahead --filter=cache)
|
||
|
t2=$(test)
|
||
|
|
||
|
# In the t1 case we should make only 1 request into the plugin,
|
||
|
--
|
||
|
2.31.1
|
||
|
|