a5bd08701a
- kvm-target-arm-arch_dump-Add-SVE-notes.patch [bz#1725084]
- kvm-vhost-Add-names-to-section-rounded-warning.patch [bz#1779041]
- kvm-vhost-Only-align-sections-for-vhost-user.patch [bz#1779041]
- kvm-vhost-coding-style-fix.patch [bz#1779041]
- kvm-virtio-fs-fix-MSI-X-nvectors-calculation.patch [bz#1694164]
- kvm-vhost-user-fs-remove-vhostfd-property.patch [bz#1694164]
- kvm-build-rename-CONFIG_LIBCAP-to-CONFIG_LIBCAP_NG.patch [bz#1694164]
- kvm-virtiofsd-Pull-in-upstream-headers.patch [bz#1694164]
- kvm-virtiofsd-Pull-in-kernel-s-fuse.h.patch [bz#1694164]
- kvm-virtiofsd-Add-auxiliary-.c-s.patch [bz#1694164]
- kvm-virtiofsd-Add-fuse_lowlevel.c.patch [bz#1694164]
- kvm-virtiofsd-Add-passthrough_ll.patch [bz#1694164]
- kvm-virtiofsd-Trim-down-imported-files.patch [bz#1694164]
- kvm-virtiofsd-Format-imported-files-to-qemu-style.patch [bz#1694164]
- kvm-virtiofsd-remove-mountpoint-dummy-argument.patch [bz#1694164]
- kvm-virtiofsd-remove-unused-notify-reply-support.patch [bz#1694164]
- kvm-virtiofsd-Remove-unused-enum-fuse_buf_copy_flags.patch [bz#1694164]
- kvm-virtiofsd-Fix-fuse_daemonize-ignored-return-values.patch [bz#1694164]
- kvm-virtiofsd-Fix-common-header-and-define-for-QEMU-buil.patch [bz#1694164]
- kvm-virtiofsd-Trim-out-compatibility-code.patch [bz#1694164]
- kvm-vitriofsd-passthrough_ll-fix-fallocate-ifdefs.patch [bz#1694164]
- kvm-virtiofsd-Make-fsync-work-even-if-only-inode-is-pass.patch [bz#1694164]
- kvm-virtiofsd-Add-options-for-virtio.patch [bz#1694164]
- kvm-virtiofsd-add-o-source-PATH-to-help-output.patch [bz#1694164]
- kvm-virtiofsd-Open-vhost-connection-instead-of-mounting.patch [bz#1694164]
- kvm-virtiofsd-Start-wiring-up-vhost-user.patch [bz#1694164]
- kvm-virtiofsd-Add-main-virtio-loop.patch [bz#1694164]
- kvm-virtiofsd-get-set-features-callbacks.patch [bz#1694164]
- kvm-virtiofsd-Start-queue-threads.patch [bz#1694164]
- kvm-virtiofsd-Poll-kick_fd-for-queue.patch [bz#1694164]
- kvm-virtiofsd-Start-reading-commands-from-queue.patch [bz#1694164]
- kvm-virtiofsd-Send-replies-to-messages.patch [bz#1694164]
- kvm-virtiofsd-Keep-track-of-replies.patch [bz#1694164]
- kvm-virtiofsd-Add-Makefile-wiring-for-virtiofsd-contrib.patch [bz#1694164]
- kvm-virtiofsd-Fast-path-for-virtio-read.patch [bz#1694164]
- kvm-virtiofsd-add-fd-FDNUM-fd-passing-option.patch [bz#1694164]
- kvm-virtiofsd-make-f-foreground-the-default.patch [bz#1694164]
- kvm-virtiofsd-add-vhost-user.json-file.patch [bz#1694164]
- kvm-virtiofsd-add-print-capabilities-option.patch [bz#1694164]
- kvm-virtiofs-Add-maintainers-entry.patch [bz#1694164]
- kvm-virtiofsd-passthrough_ll-create-new-files-in-caller-.patch [bz#1694164]
- kvm-virtiofsd-passthrough_ll-add-lo_map-for-ino-fh-indir.patch [bz#1694164]
- kvm-virtiofsd-passthrough_ll-add-ino_map-to-hide-lo_inod.patch [bz#1694164]
- kvm-virtiofsd-passthrough_ll-add-dirp_map-to-hide-lo_dir.patch [bz#1694164]
- kvm-virtiofsd-passthrough_ll-add-fd_map-to-hide-file-des.patch [bz#1694164]
- kvm-virtiofsd-passthrough_ll-add-fallback-for-racy-ops.patch [bz#1694164]
- kvm-virtiofsd-validate-path-components.patch [bz#1694164]
- kvm-virtiofsd-Plumb-fuse_bufvec-through-to-do_write_buf.patch [bz#1694164]
- kvm-virtiofsd-Pass-write-iov-s-all-the-way-through.patch [bz#1694164]
- kvm-virtiofsd-add-fuse_mbuf_iter-API.patch [bz#1694164]
- kvm-virtiofsd-validate-input-buffer-sizes-in-do_write_bu.patch [bz#1694164]
- kvm-virtiofsd-check-input-buffer-size-in-fuse_lowlevel.c.patch [bz#1694164]
- kvm-virtiofsd-prevent-.-escape-in-lo_do_lookup.patch [bz#1694164]
- kvm-virtiofsd-prevent-.-escape-in-lo_do_readdir.patch [bz#1694164]
- kvm-virtiofsd-use-proc-self-fd-O_PATH-file-descriptor.patch [bz#1694164]
- kvm-virtiofsd-sandbox-mount-namespace.patch [bz#1694164]
- kvm-virtiofsd-move-to-an-empty-network-namespace.patch [bz#1694164]
- kvm-virtiofsd-move-to-a-new-pid-namespace.patch [bz#1694164]
- kvm-virtiofsd-add-seccomp-whitelist.patch [bz#1694164]
- kvm-virtiofsd-Parse-flag-FUSE_WRITE_KILL_PRIV.patch [bz#1694164]
- kvm-virtiofsd-cap-ng-helpers.patch [bz#1694164]
- kvm-virtiofsd-Drop-CAP_FSETID-if-client-asked-for-it.patch [bz#1694164]
- kvm-virtiofsd-set-maximum-RLIMIT_NOFILE-limit.patch [bz#1694164]
- kvm-virtiofsd-fix-libfuse-information-leaks.patch [bz#1694164]
- kvm-virtiofsd-add-syslog-command-line-option.patch [bz#1694164]
- kvm-virtiofsd-print-log-only-when-priority-is-high-enoug.patch [bz#1694164]
- kvm-virtiofsd-Add-ID-to-the-log-with-FUSE_LOG_DEBUG-leve.patch [bz#1694164]
- kvm-virtiofsd-Add-timestamp-to-the-log-with-FUSE_LOG_DEB.patch [bz#1694164]
- kvm-virtiofsd-Handle-reinit.patch [bz#1694164]
- kvm-virtiofsd-Handle-hard-reboot.patch [bz#1694164]
- kvm-virtiofsd-Kill-threads-when-queues-are-stopped.patch [bz#1694164]
- kvm-vhost-user-Print-unexpected-slave-message-types.patch [bz#1694164]
- kvm-contrib-libvhost-user-Protect-slave-fd-with-mutex.patch [bz#1694164]
- kvm-virtiofsd-passthrough_ll-add-renameat2-support.patch [bz#1694164]
- kvm-virtiofsd-passthrough_ll-disable-readdirplus-on-cach.patch [bz#1694164]
- kvm-virtiofsd-passthrough_ll-control-readdirplus.patch [bz#1694164]
- kvm-virtiofsd-rename-unref_inode-to-unref_inode_lolocked.patch [bz#1694164]
- kvm-virtiofsd-fail-when-parent-inode-isn-t-known-in-lo_d.patch [bz#1694164]
- kvm-virtiofsd-extract-root-inode-init-into-setup_root.patch [bz#1694164]
- kvm-virtiofsd-passthrough_ll-clean-up-cache-related-opti.patch [bz#1694164]
- kvm-virtiofsd-passthrough_ll-use-hashtable.patch [bz#1694164]
- kvm-virtiofsd-Clean-up-inodes-on-destroy.patch [bz#1694164]
- kvm-virtiofsd-support-nanosecond-resolution-for-file-tim.patch [bz#1694164]
- kvm-virtiofsd-fix-error-handling-in-main.patch [bz#1694164]
- kvm-virtiofsd-cleanup-allocated-resource-in-se.patch [bz#1694164]
- kvm-virtiofsd-fix-memory-leak-on-lo.source.patch [bz#1694164]
- kvm-virtiofsd-add-helper-for-lo_data-cleanup.patch [bz#1694164]
- kvm-virtiofsd-Prevent-multiply-running-with-same-vhost_u.patch [bz#1694164]
- kvm-virtiofsd-enable-PARALLEL_DIROPS-during-INIT.patch [bz#1694164]
- kvm-virtiofsd-fix-incorrect-error-handling-in-lo_do_look.patch [bz#1694164]
- kvm-Virtiofsd-fix-memory-leak-on-fuse-queueinfo.patch [bz#1694164]
- kvm-virtiofsd-Support-remote-posix-locks.patch [bz#1694164]
- kvm-virtiofsd-use-fuse_lowlevel_is_virtio-in-fuse_sessio.patch [bz#1694164]
- kvm-virtiofsd-prevent-fv_queue_thread-vs-virtio_loop-rac.patch [bz#1694164]
- kvm-virtiofsd-make-lo_release-atomic.patch [bz#1694164]
- kvm-virtiofsd-prevent-races-with-lo_dirp_put.patch [bz#1694164]
- kvm-virtiofsd-rename-inode-refcount-to-inode-nlookup.patch [bz#1694164]
- kvm-libvhost-user-Fix-some-memtable-remap-cases.patch [bz#1694164]
- kvm-virtiofsd-passthrough_ll-fix-refcounting-on-remove-r.patch [bz#1694164]
- kvm-virtiofsd-introduce-inode-refcount-to-prevent-use-af.patch [bz#1694164]
- kvm-virtiofsd-do-not-always-set-FUSE_FLOCK_LOCKS.patch [bz#1694164]
- kvm-virtiofsd-convert-more-fprintf-and-perror-to-use-fus.patch [bz#1694164]
- kvm-virtiofsd-Reset-O_DIRECT-flag-during-file-open.patch [bz#1694164]
- kvm-virtiofsd-Fix-data-corruption-with-O_APPEND-write-in.patch [bz#1694164]
- kvm-virtiofsd-passthrough_ll-Use-cache_readdir-for-direc.patch [bz#1694164]
- kvm-virtiofsd-add-definition-of-fuse_buf_writev.patch [bz#1694164]
- kvm-virtiofsd-use-fuse_buf_writev-to-replace-fuse_buf_wr.patch [bz#1694164]
- kvm-virtiofsd-process-requests-in-a-thread-pool.patch [bz#1694164]
- kvm-virtiofsd-prevent-FUSE_INIT-FUSE_DESTROY-races.patch [bz#1694164]
- kvm-virtiofsd-fix-lo_destroy-resource-leaks.patch [bz#1694164]
- kvm-virtiofsd-add-thread-pool-size-NUM-option.patch [bz#1694164]
- kvm-virtiofsd-Convert-lo_destroy-to-take-the-lo-mutex-lo.patch [bz#1694164]
- kvm-virtiofsd-passthrough_ll-Pass-errno-to-fuse_reply_er.patch [bz#1694164]
- kvm-virtiofsd-stop-all-queue-threads-on-exit-in-virtio_l.patch [bz#1694164]
- kvm-virtiofsd-add-some-options-to-the-help-message.patch [bz#1694164]
- kvm-redhat-ship-virtiofsd-vhost-user-device-backend.patch [bz#1694164]
- Resolves: bz#1694164 (virtio-fs: host<->guest shared file system (qemu))
- Resolves: bz#1725084 (aarch64: support dumping SVE registers)
- Resolves: bz#1779041 (netkvm: no connectivity Windows guest with q35 + hugepages + vhost + hv_synic)
From b0db5e666aaa43eadff3e60a1ada704f33b03074 Mon Sep 17 00:00:00 2001
From: "Dr. David Alan Gilbert" <dgilbert@redhat.com>
Date: Mon, 27 Jan 2020 19:02:19 +0100
Subject: [PATCH 108/116] virtiofsd: process requests in a thread pool
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

RH-Author: Dr. David Alan Gilbert <dgilbert@redhat.com>
Message-id: <20200127190227.40942-105-dgilbert@redhat.com>
Patchwork-id: 93554
O-Subject: [RHEL-AV-8.2 qemu-kvm PATCH 104/112] virtiofsd: process requests in a thread pool
Bugzilla: 1694164
RH-Acked-by: Philippe Mathieu-Daudé <philmd@redhat.com>
RH-Acked-by: Stefan Hajnoczi <stefanha@redhat.com>
RH-Acked-by: Sergio Lopez Pascual <slp@redhat.com>

From: Stefan Hajnoczi <stefanha@redhat.com>

Introduce a thread pool so that fv_queue_thread() just pops
VuVirtqElements and hands them to the thread pool. For the time being
only one worker thread is allowed since passthrough_ll.c is not
thread-safe yet. Future patches will lift this restriction so that
multiple FUSE requests can be processed in parallel.

The main new concept is struct FVRequest, which contains both
VuVirtqElement and struct fuse_chan. We now have fv_VuDev for a device,
fv_QueueInfo for a virtqueue, and FVRequest for a request. Some of
fv_QueueInfo's fields are moved into FVRequest because they are
per-request. The name FVRequest conforms to QEMU coding style and I
expect the struct fv_* types will be renamed in a future refactoring.
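
For illustration only, here is a minimal standalone sketch of the
container_of() pattern this layout enables: because struct fuse_chan is
embedded in FVRequest, code that only receives the fuse_chan pointer can
recover the enclosing request, which is how virtio_send_msg() finds its
FVRequest in the diff below. The struct bodies here are simplified
stand-ins, not the real QEMU types.

    /* Simplified sketch; not the actual virtiofsd structures. */
    #include <stddef.h>
    #include <stdio.h>

    #define container_of(ptr, type, member) \
        ((type *) ((char *) (ptr) - offsetof(type, member)))

    struct fuse_chan { int fd; };          /* stand-in for the real struct */

    typedef struct {
        int elem_index;                    /* stand-in for VuVirtqElement */
        struct fuse_chan ch;               /* embedded channel */
        int reply_sent;
    } FVRequest;

    static void complete(struct fuse_chan *ch)
    {
        /* Recover the request that embeds this channel. */
        FVRequest *req = container_of(ch, FVRequest, ch);
        req->reply_sent = 1;
    }

    int main(void)
    {
        FVRequest req = { .elem_index = 7, .ch = { .fd = -1 }, .reply_sent = 0 };
        complete(&req.ch);
        printf("reply_sent=%d\n", req.reply_sent);
        return 0;
    }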

This patch series is not optimal. fbuf reuse is dropped so each request
does malloc(se->bufsize), but there is no clean and cheap way to keep
this with a thread pool. The vq_lock mutex is held for longer than
necessary, especially during the eventfd_write() syscall. Performance
can be improved in the future.

prctl(2) had to be added to the seccomp whitelist because glib invokes
it.
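
For illustration only, a minimal standalone sketch of the GLib thread-pool
pattern fv_queue_thread() adopts (one exclusive worker thread, one push per
popped request, drain the pool on shutdown). The worker body, the "queue0"
user_data string and the integer payloads are placeholders, not the real
virtiofsd types.

    /* Build: gcc sketch.c $(pkg-config --cflags --libs glib-2.0) */
    #include <glib.h>
    #include <stdio.h>

    /* Worker run by the pool for every pushed item (here just an int id). */
    static void worker(gpointer data, gpointer user_data)
    {
        printf("queue %s: processing request %d\n",
               (const char *) user_data, GPOINTER_TO_INT(data));
    }

    int main(void)
    {
        /* One exclusive thread, as in this patch (passthrough_ll.c is not
         * thread-safe yet); a later patch raises the limit. */
        GThreadPool *pool = g_thread_pool_new(worker, (gpointer) "queue0",
                                              1 /* max_threads */, TRUE, NULL);
        if (!pool) {
            return 1;
        }
        for (int i = 1; i <= 4; i++) {
            g_thread_pool_push(pool, GINT_TO_POINTER(i), NULL);
        }
        /* FALSE: finish queued work; TRUE: wait for it before returning. */
        g_thread_pool_free(pool, FALSE, TRUE);
        return 0;
    }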

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
Reviewed-by: Misono Tomohiro <misono.tomohiro@jp.fujitsu.com>
Signed-off-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
(cherry picked from commit a3d756c5aecccc4c0e51060a7e2f1c87bf8f1180)
Signed-off-by: Miroslav Rezanina <mrezanin@redhat.com>
---
 tools/virtiofsd/fuse_virtio.c | 359 +++++++++++++++++++++++-------------------
 1 file changed, 201 insertions(+), 158 deletions(-)

diff --git a/tools/virtiofsd/fuse_virtio.c b/tools/virtiofsd/fuse_virtio.c
index f6242f9..0dcf2ef 100644
--- a/tools/virtiofsd/fuse_virtio.c
+++ b/tools/virtiofsd/fuse_virtio.c
@@ -22,6 +22,7 @@
 
 #include <assert.h>
 #include <errno.h>
+#include <glib.h>
 #include <stdint.h>
 #include <stdio.h>
 #include <stdlib.h>
@@ -37,17 +38,28 @@
 struct fv_VuDev;
 struct fv_QueueInfo {
     pthread_t thread;
+    /*
+     * This lock protects the VuVirtq preventing races between
+     * fv_queue_thread() and fv_queue_worker().
+     */
+    pthread_mutex_t vq_lock;
+
     struct fv_VuDev *virtio_dev;
 
     /* Our queue index, corresponds to array position */
     int qidx;
     int kick_fd;
     int kill_fd; /* For killing the thread */
+};
 
-    /* The element for the command currently being processed */
-    VuVirtqElement *qe;
+/* A FUSE request */
+typedef struct {
+    VuVirtqElement elem;
+    struct fuse_chan ch;
+
+    /* Used to complete requests that involve no reply */
     bool reply_sent;
-};
+} FVRequest;
 
 /*
  * We pass the dev element into libvhost-user
@@ -191,8 +203,11 @@ static void copy_iov(struct iovec *src_iov, int src_count,
 int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
                     struct iovec *iov, int count)
 {
-    VuVirtqElement *elem;
-    VuVirtq *q;
+    FVRequest *req = container_of(ch, FVRequest, ch);
+    struct fv_QueueInfo *qi = ch->qi;
+    VuDev *dev = &se->virtio_dev->dev;
+    VuVirtq *q = vu_get_queue(dev, qi->qidx);
+    VuVirtqElement *elem = &req->elem;
     int ret = 0;
 
     assert(count >= 1);
@@ -205,11 +220,7 @@ int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
 
     /* unique == 0 is notification, which we don't support */
     assert(out->unique);
-    /* For virtio we always have ch */
-    assert(ch);
-    assert(!ch->qi->reply_sent);
-    elem = ch->qi->qe;
-    q = &ch->qi->virtio_dev->dev.vq[ch->qi->qidx];
+    assert(!req->reply_sent);
 
     /* The 'in' part of the elem is to qemu */
     unsigned int in_num = elem->in_num;
@@ -236,9 +247,15 @@ int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
     }
 
     copy_iov(iov, count, in_sg, in_num, tosend_len);
-    vu_queue_push(&se->virtio_dev->dev, q, elem, tosend_len);
-    vu_queue_notify(&se->virtio_dev->dev, q);
-    ch->qi->reply_sent = true;
+
+    pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
+    pthread_mutex_lock(&qi->vq_lock);
+    vu_queue_push(dev, q, elem, tosend_len);
+    vu_queue_notify(dev, q);
+    pthread_mutex_unlock(&qi->vq_lock);
+    pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
+
+    req->reply_sent = true;
 
 err:
     return ret;
@@ -254,9 +271,12 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
                          struct iovec *iov, int count, struct fuse_bufvec *buf,
                          size_t len)
 {
+    FVRequest *req = container_of(ch, FVRequest, ch);
+    struct fv_QueueInfo *qi = ch->qi;
+    VuDev *dev = &se->virtio_dev->dev;
+    VuVirtq *q = vu_get_queue(dev, qi->qidx);
+    VuVirtqElement *elem = &req->elem;
     int ret = 0;
-    VuVirtqElement *elem;
-    VuVirtq *q;
 
     assert(count >= 1);
     assert(iov[0].iov_len >= sizeof(struct fuse_out_header));
@@ -275,11 +295,7 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
     /* unique == 0 is notification which we don't support */
     assert(out->unique);
 
-    /* For virtio we always have ch */
-    assert(ch);
-    assert(!ch->qi->reply_sent);
-    elem = ch->qi->qe;
-    q = &ch->qi->virtio_dev->dev.vq[ch->qi->qidx];
+    assert(!req->reply_sent);
 
     /* The 'in' part of the elem is to qemu */
     unsigned int in_num = elem->in_num;
@@ -395,33 +411,175 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
 
     ret = 0;
 
-    vu_queue_push(&se->virtio_dev->dev, q, elem, tosend_len);
-    vu_queue_notify(&se->virtio_dev->dev, q);
+    pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
+    pthread_mutex_lock(&qi->vq_lock);
+    vu_queue_push(dev, q, elem, tosend_len);
+    vu_queue_notify(dev, q);
+    pthread_mutex_unlock(&qi->vq_lock);
+    pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
 
 err:
     if (ret == 0) {
-        ch->qi->reply_sent = true;
+        req->reply_sent = true;
     }
 
     return ret;
 }
 
+/* Process one FVRequest in a thread pool */
+static void fv_queue_worker(gpointer data, gpointer user_data)
+{
+    struct fv_QueueInfo *qi = user_data;
+    struct fuse_session *se = qi->virtio_dev->se;
+    struct VuDev *dev = &qi->virtio_dev->dev;
+    FVRequest *req = data;
+    VuVirtqElement *elem = &req->elem;
+    struct fuse_buf fbuf = {};
+    bool allocated_bufv = false;
+    struct fuse_bufvec bufv;
+    struct fuse_bufvec *pbufv;
+
+    assert(se->bufsize > sizeof(struct fuse_in_header));
+
+    /*
+     * An element contains one request and the space to send our response
+     * They're spread over multiple descriptors in a scatter/gather set
+     * and we can't trust the guest to keep them still; so copy in/out.
+     */
+    fbuf.mem = malloc(se->bufsize);
+    assert(fbuf.mem);
+
+    fuse_mutex_init(&req->ch.lock);
+    req->ch.fd = -1;
+    req->ch.qi = qi;
+
+    /* The 'out' part of the elem is from qemu */
+    unsigned int out_num = elem->out_num;
+    struct iovec *out_sg = elem->out_sg;
+    size_t out_len = iov_size(out_sg, out_num);
+    fuse_log(FUSE_LOG_DEBUG,
+             "%s: elem %d: with %d out desc of length %zd\n",
+             __func__, elem->index, out_num, out_len);
+
+    /*
+     * The elem should contain a 'fuse_in_header' (in to fuse)
+     * plus the data based on the len in the header.
+     */
+    if (out_len < sizeof(struct fuse_in_header)) {
+        fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
+                 __func__, elem->index);
+        assert(0); /* TODO */
+    }
+    if (out_len > se->bufsize) {
+        fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n", __func__,
+                 elem->index);
+        assert(0); /* TODO */
+    }
+    /* Copy just the first element and look at it */
+    copy_from_iov(&fbuf, 1, out_sg);
+
+    pbufv = NULL; /* Compiler thinks an unitialised path */
+    if (out_num > 2 &&
+        out_sg[0].iov_len == sizeof(struct fuse_in_header) &&
+        ((struct fuse_in_header *)fbuf.mem)->opcode == FUSE_WRITE &&
+        out_sg[1].iov_len == sizeof(struct fuse_write_in)) {
+        /*
+         * For a write we don't actually need to copy the
+         * data, we can just do it straight out of guest memory
+         * but we must still copy the headers in case the guest
+         * was nasty and changed them while we were using them.
+         */
+        fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);
+
+        /* copy the fuse_write_in header afte rthe fuse_in_header */
+        fbuf.mem += out_sg->iov_len;
+        copy_from_iov(&fbuf, 1, out_sg + 1);
+        fbuf.mem -= out_sg->iov_len;
+        fbuf.size = out_sg[0].iov_len + out_sg[1].iov_len;
+
+        /* Allocate the bufv, with space for the rest of the iov */
+        pbufv = malloc(sizeof(struct fuse_bufvec) +
+                       sizeof(struct fuse_buf) * (out_num - 2));
+        if (!pbufv) {
+            fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
+                     __func__);
+            goto out;
+        }
+
+        allocated_bufv = true;
+        pbufv->count = 1;
+        pbufv->buf[0] = fbuf;
+
+        size_t iovindex, pbufvindex;
+        iovindex = 2; /* 2 headers, separate iovs */
+        pbufvindex = 1; /* 2 headers, 1 fusebuf */
+
+        for (; iovindex < out_num; iovindex++, pbufvindex++) {
+            pbufv->count++;
+            pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
+            pbufv->buf[pbufvindex].flags = 0;
+            pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
+            pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;
+        }
+    } else {
+        /* Normal (non fast write) path */
+
+        /* Copy the rest of the buffer */
+        fbuf.mem += out_sg->iov_len;
+        copy_from_iov(&fbuf, out_num - 1, out_sg + 1);
+        fbuf.mem -= out_sg->iov_len;
+        fbuf.size = out_len;
+
+        /* TODO! Endianness of header */
+
+        /* TODO: Add checks for fuse_session_exited */
+        bufv.buf[0] = fbuf;
+        bufv.count = 1;
+        pbufv = &bufv;
+    }
+    pbufv->idx = 0;
+    pbufv->off = 0;
+    fuse_session_process_buf_int(se, pbufv, &req->ch);
+
+out:
+    if (allocated_bufv) {
+        free(pbufv);
+    }
+
+    /* If the request has no reply, still recycle the virtqueue element */
+    if (!req->reply_sent) {
+        struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
+
+        fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n", __func__,
+                 elem->index);
+
+        pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
+        pthread_mutex_lock(&qi->vq_lock);
+        vu_queue_push(dev, q, elem, 0);
+        vu_queue_notify(dev, q);
+        pthread_mutex_unlock(&qi->vq_lock);
+        pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
+    }
+
+    pthread_mutex_destroy(&req->ch.lock);
+    free(fbuf.mem);
+    free(req);
+}
+
 /* Thread function for individual queues, created when a queue is 'started' */
 static void *fv_queue_thread(void *opaque)
 {
     struct fv_QueueInfo *qi = opaque;
     struct VuDev *dev = &qi->virtio_dev->dev;
     struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
-    struct fuse_session *se = qi->virtio_dev->se;
-    struct fuse_chan ch;
-    struct fuse_buf fbuf;
+    GThreadPool *pool;
 
-    fbuf.mem = NULL;
-    fbuf.flags = 0;
-
-    fuse_mutex_init(&ch.lock);
-    ch.fd = (int)0xdaff0d111;
-    ch.qi = qi;
+    pool = g_thread_pool_new(fv_queue_worker, qi, 1 /* TODO max_threads */,
+                             TRUE, NULL);
+    if (!pool) {
+        fuse_log(FUSE_LOG_ERR, "%s: g_thread_pool_new failed\n", __func__);
+        return NULL;
+    }
 
     fuse_log(FUSE_LOG_INFO, "%s: Start for queue %d kick_fd %d\n", __func__,
              qi->qidx, qi->kick_fd);
@@ -478,6 +636,7 @@ static void *fv_queue_thread(void *opaque)
         /* Mutual exclusion with virtio_loop() */
         ret = pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
         assert(ret == 0); /* there is no possible error case */
+        pthread_mutex_lock(&qi->vq_lock);
         /* out is from guest, in is too guest */
         unsigned int in_bytes, out_bytes;
         vu_queue_get_avail_bytes(dev, q, &in_bytes, &out_bytes, ~0, ~0);
@@ -486,141 +645,22 @@ static void *fv_queue_thread(void *opaque)
                  "%s: Queue %d gave evalue: %zx available: in: %u out: %u\n",
                  __func__, qi->qidx, (size_t)evalue, in_bytes, out_bytes);
 
-
         while (1) {
-            bool allocated_bufv = false;
-            struct fuse_bufvec bufv;
-            struct fuse_bufvec *pbufv;
-
-            /*
-             * An element contains one request and the space to send our
-             * response They're spread over multiple descriptors in a
-             * scatter/gather set and we can't trust the guest to keep them
-             * still; so copy in/out.
-             */
-            VuVirtqElement *elem = vu_queue_pop(dev, q, sizeof(VuVirtqElement));
-            if (!elem) {
+            FVRequest *req = vu_queue_pop(dev, q, sizeof(FVRequest));
+            if (!req) {
                 break;
             }
 
-            qi->qe = elem;
-            qi->reply_sent = false;
+            req->reply_sent = false;
 
-            if (!fbuf.mem) {
-                fbuf.mem = malloc(se->bufsize);
-                assert(fbuf.mem);
-                assert(se->bufsize > sizeof(struct fuse_in_header));
-            }
-            /* The 'out' part of the elem is from qemu */
-            unsigned int out_num = elem->out_num;
-            struct iovec *out_sg = elem->out_sg;
-            size_t out_len = iov_size(out_sg, out_num);
-            fuse_log(FUSE_LOG_DEBUG,
-                     "%s: elem %d: with %d out desc of length %zd\n", __func__,
-                     elem->index, out_num, out_len);
-
-            /*
-             * The elem should contain a 'fuse_in_header' (in to fuse)
-             * plus the data based on the len in the header.
-             */
-            if (out_len < sizeof(struct fuse_in_header)) {
-                fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
-                         __func__, elem->index);
-                assert(0); /* TODO */
-            }
-            if (out_len > se->bufsize) {
-                fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n",
-                         __func__, elem->index);
-                assert(0); /* TODO */
-            }
-            /* Copy just the first element and look at it */
-            copy_from_iov(&fbuf, 1, out_sg);
-
-            if (out_num > 2 &&
-                out_sg[0].iov_len == sizeof(struct fuse_in_header) &&
-                ((struct fuse_in_header *)fbuf.mem)->opcode == FUSE_WRITE &&
-                out_sg[1].iov_len == sizeof(struct fuse_write_in)) {
-                /*
-                 * For a write we don't actually need to copy the
-                 * data, we can just do it straight out of guest memory
-                 * but we must still copy the headers in case the guest
-                 * was nasty and changed them while we were using them.
-                 */
-                fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);
-
-                /* copy the fuse_write_in header after the fuse_in_header */
-                fbuf.mem += out_sg->iov_len;
-                copy_from_iov(&fbuf, 1, out_sg + 1);
-                fbuf.mem -= out_sg->iov_len;
-                fbuf.size = out_sg[0].iov_len + out_sg[1].iov_len;
-
-                /* Allocate the bufv, with space for the rest of the iov */
-                allocated_bufv = true;
-                pbufv = malloc(sizeof(struct fuse_bufvec) +
-                               sizeof(struct fuse_buf) * (out_num - 2));
-                if (!pbufv) {
-                    vu_queue_unpop(dev, q, elem, 0);
-                    free(elem);
-                    fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
-                             __func__);
-                    goto out;
-                }
-
-                pbufv->count = 1;
-                pbufv->buf[0] = fbuf;
-
-                size_t iovindex, pbufvindex;
-                iovindex = 2; /* 2 headers, separate iovs */
-                pbufvindex = 1; /* 2 headers, 1 fusebuf */
-
-                for (; iovindex < out_num; iovindex++, pbufvindex++) {
-                    pbufv->count++;
-                    pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
-                    pbufv->buf[pbufvindex].flags = 0;
-                    pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
-                    pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;
-                }
-            } else {
-                /* Normal (non fast write) path */
-
-                /* Copy the rest of the buffer */
-                fbuf.mem += out_sg->iov_len;
-                copy_from_iov(&fbuf, out_num - 1, out_sg + 1);
-                fbuf.mem -= out_sg->iov_len;
-                fbuf.size = out_len;
-
-                /* TODO! Endianness of header */
-
-                /* TODO: Add checks for fuse_session_exited */
-                bufv.buf[0] = fbuf;
-                bufv.count = 1;
-                pbufv = &bufv;
-            }
-            pbufv->idx = 0;
-            pbufv->off = 0;
-            fuse_session_process_buf_int(se, pbufv, &ch);
-
-            if (allocated_bufv) {
-                free(pbufv);
-            }
-
-            if (!qi->reply_sent) {
-                fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n",
-                         __func__, elem->index);
-                /* I think we've still got to recycle the element */
-                vu_queue_push(dev, q, elem, 0);
-                vu_queue_notify(dev, q);
-            }
-            qi->qe = NULL;
-            free(elem);
-            elem = NULL;
+            g_thread_pool_push(pool, req, NULL);
         }
 
+        pthread_mutex_unlock(&qi->vq_lock);
         pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
     }
-out:
-    pthread_mutex_destroy(&ch.lock);
-    free(fbuf.mem);
+
+    g_thread_pool_free(pool, FALSE, TRUE);
 
     return NULL;
 }
@@ -643,6 +683,7 @@ static void fv_queue_cleanup_thread(struct fv_VuDev *vud, int qidx)
         fuse_log(FUSE_LOG_ERR, "%s: Failed to join thread idx %d err %d\n",
                  __func__, qidx, ret);
     }
+    pthread_mutex_destroy(&ourqi->vq_lock);
     close(ourqi->kill_fd);
     ourqi->kick_fd = -1;
     free(vud->qi[qidx]);
@@ -696,6 +737,8 @@ static void fv_queue_set_started(VuDev *dev, int qidx, bool started)
 
         ourqi->kill_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
         assert(ourqi->kill_fd != -1);
+        pthread_mutex_init(&ourqi->vq_lock, NULL);
+
         if (pthread_create(&ourqi->thread, NULL, fv_queue_thread, ourqi)) {
             fuse_log(FUSE_LOG_ERR, "%s: Failed to create thread for queue %d\n",
                      __func__, qidx);
-- 
1.8.3.1
