534 lines
19 KiB
Diff
534 lines
19 KiB
Diff
From b0db5e666aaa43eadff3e60a1ada704f33b03074 Mon Sep 17 00:00:00 2001
|
|
From: "Dr. David Alan Gilbert" <dgilbert@redhat.com>
|
|
Date: Mon, 27 Jan 2020 19:02:19 +0100
|
|
Subject: [PATCH 108/116] virtiofsd: process requests in a thread pool
|
|
MIME-Version: 1.0
|
|
Content-Type: text/plain; charset=UTF-8
|
|
Content-Transfer-Encoding: 8bit
|
|
|
|
RH-Author: Dr. David Alan Gilbert <dgilbert@redhat.com>
|
|
Message-id: <20200127190227.40942-105-dgilbert@redhat.com>
|
|
Patchwork-id: 93554
|
|
O-Subject: [RHEL-AV-8.2 qemu-kvm PATCH 104/112] virtiofsd: process requests in a thread pool
|
|
Bugzilla: 1694164
|
|
RH-Acked-by: Philippe Mathieu-Daudé <philmd@redhat.com>
|
|
RH-Acked-by: Stefan Hajnoczi <stefanha@redhat.com>
|
|
RH-Acked-by: Sergio Lopez Pascual <slp@redhat.com>
|
|
|
|
From: Stefan Hajnoczi <stefanha@redhat.com>
|
|
|
|
Introduce a thread pool so that fv_queue_thread() just pops
|
|
VuVirtqElements and hands them to the thread pool. For the time being
|
|
only one worker thread is allowed since passthrough_ll.c is not
|
|
thread-safe yet. Future patches will lift this restriction so that
|
|
multiple FUSE requests can be processed in parallel.
|
|
|
|
The main new concept is struct FVRequest, which contains both
|
|
VuVirtqElement and struct fuse_chan. We now have fv_VuDev for a device,
|
|
fv_QueueInfo for a virtqueue, and FVRequest for a request. Some of
|
|
fv_QueueInfo's fields are moved into FVRequest because they are
|
|
per-request. The name FVRequest conforms to QEMU coding style and I
|
|
expect the struct fv_* types will be renamed in a future refactoring.
|
|
|
|
This patch series is not optimal. fbuf reuse is dropped so each request
|
|
does malloc(se->bufsize), but there is no clean and cheap way to keep
|
|
this with a thread pool. The vq_lock mutex is held for longer than
|
|
necessary, especially during the eventfd_write() syscall. Performance
|
|
can be improved in the future.
|
|
|
|
prctl(2) had to be added to the seccomp whitelist because glib invokes
|
|
it.
|
|
|
|
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
|
|
Reviewed-by: Misono Tomohiro <misono.tomohiro@jp.fujitsu.com>
|
|
Signed-off-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
|
|
(cherry picked from commit a3d756c5aecccc4c0e51060a7e2f1c87bf8f1180)
|
|
Signed-off-by: Miroslav Rezanina <mrezanin@redhat.com>
|
|
---
|
|
tools/virtiofsd/fuse_virtio.c | 359 +++++++++++++++++++++++-------------------
|
|
1 file changed, 201 insertions(+), 158 deletions(-)
|
|
|
|
diff --git a/tools/virtiofsd/fuse_virtio.c b/tools/virtiofsd/fuse_virtio.c
|
|
index f6242f9..0dcf2ef 100644
|
|
--- a/tools/virtiofsd/fuse_virtio.c
|
|
+++ b/tools/virtiofsd/fuse_virtio.c
|
|
@@ -22,6 +22,7 @@
|
|
|
|
#include <assert.h>
|
|
#include <errno.h>
|
|
+#include <glib.h>
|
|
#include <stdint.h>
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
@@ -37,17 +38,28 @@
|
|
struct fv_VuDev;
|
|
struct fv_QueueInfo {
|
|
pthread_t thread;
|
|
+ /*
|
|
+ * This lock protects the VuVirtq preventing races between
|
|
+ * fv_queue_thread() and fv_queue_worker().
|
|
+ */
|
|
+ pthread_mutex_t vq_lock;
|
|
+
|
|
struct fv_VuDev *virtio_dev;
|
|
|
|
/* Our queue index, corresponds to array position */
|
|
int qidx;
|
|
int kick_fd;
|
|
int kill_fd; /* For killing the thread */
|
|
+};
|
|
|
|
- /* The element for the command currently being processed */
|
|
- VuVirtqElement *qe;
|
|
+/* A FUSE request */
|
|
+typedef struct {
|
|
+ VuVirtqElement elem;
|
|
+ struct fuse_chan ch;
|
|
+
|
|
+ /* Used to complete requests that involve no reply */
|
|
bool reply_sent;
|
|
-};
|
|
+} FVRequest;
|
|
|
|
/*
|
|
* We pass the dev element into libvhost-user
|
|
@@ -191,8 +203,11 @@ static void copy_iov(struct iovec *src_iov, int src_count,
|
|
int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
|
|
struct iovec *iov, int count)
|
|
{
|
|
- VuVirtqElement *elem;
|
|
- VuVirtq *q;
|
|
+ FVRequest *req = container_of(ch, FVRequest, ch);
|
|
+ struct fv_QueueInfo *qi = ch->qi;
|
|
+ VuDev *dev = &se->virtio_dev->dev;
|
|
+ VuVirtq *q = vu_get_queue(dev, qi->qidx);
|
|
+ VuVirtqElement *elem = &req->elem;
|
|
int ret = 0;
|
|
|
|
assert(count >= 1);
|
|
@@ -205,11 +220,7 @@ int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
|
|
|
|
/* unique == 0 is notification, which we don't support */
|
|
assert(out->unique);
|
|
- /* For virtio we always have ch */
|
|
- assert(ch);
|
|
- assert(!ch->qi->reply_sent);
|
|
- elem = ch->qi->qe;
|
|
- q = &ch->qi->virtio_dev->dev.vq[ch->qi->qidx];
|
|
+ assert(!req->reply_sent);
|
|
|
|
/* The 'in' part of the elem is to qemu */
|
|
unsigned int in_num = elem->in_num;
|
|
@@ -236,9 +247,15 @@ int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
|
|
}
|
|
|
|
copy_iov(iov, count, in_sg, in_num, tosend_len);
|
|
- vu_queue_push(&se->virtio_dev->dev, q, elem, tosend_len);
|
|
- vu_queue_notify(&se->virtio_dev->dev, q);
|
|
- ch->qi->reply_sent = true;
|
|
+
|
|
+ pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
|
|
+ pthread_mutex_lock(&qi->vq_lock);
|
|
+ vu_queue_push(dev, q, elem, tosend_len);
|
|
+ vu_queue_notify(dev, q);
|
|
+ pthread_mutex_unlock(&qi->vq_lock);
|
|
+ pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
|
|
+
|
|
+ req->reply_sent = true;
|
|
|
|
err:
|
|
return ret;
|
|
@@ -254,9 +271,12 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
|
|
struct iovec *iov, int count, struct fuse_bufvec *buf,
|
|
size_t len)
|
|
{
|
|
+ FVRequest *req = container_of(ch, FVRequest, ch);
|
|
+ struct fv_QueueInfo *qi = ch->qi;
|
|
+ VuDev *dev = &se->virtio_dev->dev;
|
|
+ VuVirtq *q = vu_get_queue(dev, qi->qidx);
|
|
+ VuVirtqElement *elem = &req->elem;
|
|
int ret = 0;
|
|
- VuVirtqElement *elem;
|
|
- VuVirtq *q;
|
|
|
|
assert(count >= 1);
|
|
assert(iov[0].iov_len >= sizeof(struct fuse_out_header));
|
|
@@ -275,11 +295,7 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
|
|
/* unique == 0 is notification which we don't support */
|
|
assert(out->unique);
|
|
|
|
- /* For virtio we always have ch */
|
|
- assert(ch);
|
|
- assert(!ch->qi->reply_sent);
|
|
- elem = ch->qi->qe;
|
|
- q = &ch->qi->virtio_dev->dev.vq[ch->qi->qidx];
|
|
+ assert(!req->reply_sent);
|
|
|
|
/* The 'in' part of the elem is to qemu */
|
|
unsigned int in_num = elem->in_num;
|
|
@@ -395,33 +411,175 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
|
|
|
|
ret = 0;
|
|
|
|
- vu_queue_push(&se->virtio_dev->dev, q, elem, tosend_len);
|
|
- vu_queue_notify(&se->virtio_dev->dev, q);
|
|
+ pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
|
|
+ pthread_mutex_lock(&qi->vq_lock);
|
|
+ vu_queue_push(dev, q, elem, tosend_len);
|
|
+ vu_queue_notify(dev, q);
|
|
+ pthread_mutex_unlock(&qi->vq_lock);
|
|
+ pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
|
|
|
|
err:
|
|
if (ret == 0) {
|
|
- ch->qi->reply_sent = true;
|
|
+ req->reply_sent = true;
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
+/* Process one FVRequest in a thread pool */
|
|
+static void fv_queue_worker(gpointer data, gpointer user_data)
|
|
+{
|
|
+ struct fv_QueueInfo *qi = user_data;
|
|
+ struct fuse_session *se = qi->virtio_dev->se;
|
|
+ struct VuDev *dev = &qi->virtio_dev->dev;
|
|
+ FVRequest *req = data;
|
|
+ VuVirtqElement *elem = &req->elem;
|
|
+ struct fuse_buf fbuf = {};
|
|
+ bool allocated_bufv = false;
|
|
+ struct fuse_bufvec bufv;
|
|
+ struct fuse_bufvec *pbufv;
|
|
+
|
|
+ assert(se->bufsize > sizeof(struct fuse_in_header));
|
|
+
|
|
+ /*
|
|
+ * An element contains one request and the space to send our response
|
|
+ * They're spread over multiple descriptors in a scatter/gather set
|
|
+ * and we can't trust the guest to keep them still; so copy in/out.
|
|
+ */
|
|
+ fbuf.mem = malloc(se->bufsize);
|
|
+ assert(fbuf.mem);
|
|
+
|
|
+ fuse_mutex_init(&req->ch.lock);
|
|
+ req->ch.fd = -1;
|
|
+ req->ch.qi = qi;
|
|
+
|
|
+ /* The 'out' part of the elem is from qemu */
|
|
+ unsigned int out_num = elem->out_num;
|
|
+ struct iovec *out_sg = elem->out_sg;
|
|
+ size_t out_len = iov_size(out_sg, out_num);
|
|
+ fuse_log(FUSE_LOG_DEBUG,
|
|
+ "%s: elem %d: with %d out desc of length %zd\n",
|
|
+ __func__, elem->index, out_num, out_len);
|
|
+
|
|
+ /*
|
|
+ * The elem should contain a 'fuse_in_header' (in to fuse)
|
|
+ * plus the data based on the len in the header.
|
|
+ */
|
|
+ if (out_len < sizeof(struct fuse_in_header)) {
|
|
+ fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
|
|
+ __func__, elem->index);
|
|
+ assert(0); /* TODO */
|
|
+ }
|
|
+ if (out_len > se->bufsize) {
|
|
+ fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n", __func__,
|
|
+ elem->index);
|
|
+ assert(0); /* TODO */
|
|
+ }
|
|
+ /* Copy just the first element and look at it */
|
|
+ copy_from_iov(&fbuf, 1, out_sg);
|
|
+
|
|
+ pbufv = NULL; /* Compiler thinks an unitialised path */
|
|
+ if (out_num > 2 &&
|
|
+ out_sg[0].iov_len == sizeof(struct fuse_in_header) &&
|
|
+ ((struct fuse_in_header *)fbuf.mem)->opcode == FUSE_WRITE &&
|
|
+ out_sg[1].iov_len == sizeof(struct fuse_write_in)) {
|
|
+ /*
|
|
+ * For a write we don't actually need to copy the
|
|
+ * data, we can just do it straight out of guest memory
|
|
+ * but we must still copy the headers in case the guest
|
|
+ * was nasty and changed them while we were using them.
|
|
+ */
|
|
+ fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);
|
|
+
|
|
+ /* copy the fuse_write_in header afte rthe fuse_in_header */
|
|
+ fbuf.mem += out_sg->iov_len;
|
|
+ copy_from_iov(&fbuf, 1, out_sg + 1);
|
|
+ fbuf.mem -= out_sg->iov_len;
|
|
+ fbuf.size = out_sg[0].iov_len + out_sg[1].iov_len;
|
|
+
|
|
+ /* Allocate the bufv, with space for the rest of the iov */
|
|
+ pbufv = malloc(sizeof(struct fuse_bufvec) +
|
|
+ sizeof(struct fuse_buf) * (out_num - 2));
|
|
+ if (!pbufv) {
|
|
+ fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
|
|
+ __func__);
|
|
+ goto out;
|
|
+ }
|
|
+
|
|
+ allocated_bufv = true;
|
|
+ pbufv->count = 1;
|
|
+ pbufv->buf[0] = fbuf;
|
|
+
|
|
+ size_t iovindex, pbufvindex;
|
|
+ iovindex = 2; /* 2 headers, separate iovs */
|
|
+ pbufvindex = 1; /* 2 headers, 1 fusebuf */
|
|
+
|
|
+ for (; iovindex < out_num; iovindex++, pbufvindex++) {
|
|
+ pbufv->count++;
|
|
+ pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
|
|
+ pbufv->buf[pbufvindex].flags = 0;
|
|
+ pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
|
|
+ pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;
|
|
+ }
|
|
+ } else {
|
|
+ /* Normal (non fast write) path */
|
|
+
|
|
+ /* Copy the rest of the buffer */
|
|
+ fbuf.mem += out_sg->iov_len;
|
|
+ copy_from_iov(&fbuf, out_num - 1, out_sg + 1);
|
|
+ fbuf.mem -= out_sg->iov_len;
|
|
+ fbuf.size = out_len;
|
|
+
|
|
+ /* TODO! Endianness of header */
|
|
+
|
|
+ /* TODO: Add checks for fuse_session_exited */
|
|
+ bufv.buf[0] = fbuf;
|
|
+ bufv.count = 1;
|
|
+ pbufv = &bufv;
|
|
+ }
|
|
+ pbufv->idx = 0;
|
|
+ pbufv->off = 0;
|
|
+ fuse_session_process_buf_int(se, pbufv, &req->ch);
|
|
+
|
|
+out:
|
|
+ if (allocated_bufv) {
|
|
+ free(pbufv);
|
|
+ }
|
|
+
|
|
+ /* If the request has no reply, still recycle the virtqueue element */
|
|
+ if (!req->reply_sent) {
|
|
+ struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
|
|
+
|
|
+ fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n", __func__,
|
|
+ elem->index);
|
|
+
|
|
+ pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
|
|
+ pthread_mutex_lock(&qi->vq_lock);
|
|
+ vu_queue_push(dev, q, elem, 0);
|
|
+ vu_queue_notify(dev, q);
|
|
+ pthread_mutex_unlock(&qi->vq_lock);
|
|
+ pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
|
|
+ }
|
|
+
|
|
+ pthread_mutex_destroy(&req->ch.lock);
|
|
+ free(fbuf.mem);
|
|
+ free(req);
|
|
+}
|
|
+
|
|
/* Thread function for individual queues, created when a queue is 'started' */
|
|
static void *fv_queue_thread(void *opaque)
|
|
{
|
|
struct fv_QueueInfo *qi = opaque;
|
|
struct VuDev *dev = &qi->virtio_dev->dev;
|
|
struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
|
|
- struct fuse_session *se = qi->virtio_dev->se;
|
|
- struct fuse_chan ch;
|
|
- struct fuse_buf fbuf;
|
|
+ GThreadPool *pool;
|
|
|
|
- fbuf.mem = NULL;
|
|
- fbuf.flags = 0;
|
|
-
|
|
- fuse_mutex_init(&ch.lock);
|
|
- ch.fd = (int)0xdaff0d111;
|
|
- ch.qi = qi;
|
|
+ pool = g_thread_pool_new(fv_queue_worker, qi, 1 /* TODO max_threads */,
|
|
+ TRUE, NULL);
|
|
+ if (!pool) {
|
|
+ fuse_log(FUSE_LOG_ERR, "%s: g_thread_pool_new failed\n", __func__);
|
|
+ return NULL;
|
|
+ }
|
|
|
|
fuse_log(FUSE_LOG_INFO, "%s: Start for queue %d kick_fd %d\n", __func__,
|
|
qi->qidx, qi->kick_fd);
|
|
@@ -478,6 +636,7 @@ static void *fv_queue_thread(void *opaque)
|
|
/* Mutual exclusion with virtio_loop() */
|
|
ret = pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
|
|
assert(ret == 0); /* there is no possible error case */
|
|
+ pthread_mutex_lock(&qi->vq_lock);
|
|
/* out is from guest, in is too guest */
|
|
unsigned int in_bytes, out_bytes;
|
|
vu_queue_get_avail_bytes(dev, q, &in_bytes, &out_bytes, ~0, ~0);
|
|
@@ -486,141 +645,22 @@ static void *fv_queue_thread(void *opaque)
|
|
"%s: Queue %d gave evalue: %zx available: in: %u out: %u\n",
|
|
__func__, qi->qidx, (size_t)evalue, in_bytes, out_bytes);
|
|
|
|
-
|
|
while (1) {
|
|
- bool allocated_bufv = false;
|
|
- struct fuse_bufvec bufv;
|
|
- struct fuse_bufvec *pbufv;
|
|
-
|
|
- /*
|
|
- * An element contains one request and the space to send our
|
|
- * response They're spread over multiple descriptors in a
|
|
- * scatter/gather set and we can't trust the guest to keep them
|
|
- * still; so copy in/out.
|
|
- */
|
|
- VuVirtqElement *elem = vu_queue_pop(dev, q, sizeof(VuVirtqElement));
|
|
- if (!elem) {
|
|
+ FVRequest *req = vu_queue_pop(dev, q, sizeof(FVRequest));
|
|
+ if (!req) {
|
|
break;
|
|
}
|
|
|
|
- qi->qe = elem;
|
|
- qi->reply_sent = false;
|
|
+ req->reply_sent = false;
|
|
|
|
- if (!fbuf.mem) {
|
|
- fbuf.mem = malloc(se->bufsize);
|
|
- assert(fbuf.mem);
|
|
- assert(se->bufsize > sizeof(struct fuse_in_header));
|
|
- }
|
|
- /* The 'out' part of the elem is from qemu */
|
|
- unsigned int out_num = elem->out_num;
|
|
- struct iovec *out_sg = elem->out_sg;
|
|
- size_t out_len = iov_size(out_sg, out_num);
|
|
- fuse_log(FUSE_LOG_DEBUG,
|
|
- "%s: elem %d: with %d out desc of length %zd\n", __func__,
|
|
- elem->index, out_num, out_len);
|
|
-
|
|
- /*
|
|
- * The elem should contain a 'fuse_in_header' (in to fuse)
|
|
- * plus the data based on the len in the header.
|
|
- */
|
|
- if (out_len < sizeof(struct fuse_in_header)) {
|
|
- fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
|
|
- __func__, elem->index);
|
|
- assert(0); /* TODO */
|
|
- }
|
|
- if (out_len > se->bufsize) {
|
|
- fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n",
|
|
- __func__, elem->index);
|
|
- assert(0); /* TODO */
|
|
- }
|
|
- /* Copy just the first element and look at it */
|
|
- copy_from_iov(&fbuf, 1, out_sg);
|
|
-
|
|
- if (out_num > 2 &&
|
|
- out_sg[0].iov_len == sizeof(struct fuse_in_header) &&
|
|
- ((struct fuse_in_header *)fbuf.mem)->opcode == FUSE_WRITE &&
|
|
- out_sg[1].iov_len == sizeof(struct fuse_write_in)) {
|
|
- /*
|
|
- * For a write we don't actually need to copy the
|
|
- * data, we can just do it straight out of guest memory
|
|
- * but we must still copy the headers in case the guest
|
|
- * was nasty and changed them while we were using them.
|
|
- */
|
|
- fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);
|
|
-
|
|
- /* copy the fuse_write_in header after the fuse_in_header */
|
|
- fbuf.mem += out_sg->iov_len;
|
|
- copy_from_iov(&fbuf, 1, out_sg + 1);
|
|
- fbuf.mem -= out_sg->iov_len;
|
|
- fbuf.size = out_sg[0].iov_len + out_sg[1].iov_len;
|
|
-
|
|
- /* Allocate the bufv, with space for the rest of the iov */
|
|
- allocated_bufv = true;
|
|
- pbufv = malloc(sizeof(struct fuse_bufvec) +
|
|
- sizeof(struct fuse_buf) * (out_num - 2));
|
|
- if (!pbufv) {
|
|
- vu_queue_unpop(dev, q, elem, 0);
|
|
- free(elem);
|
|
- fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
|
|
- __func__);
|
|
- goto out;
|
|
- }
|
|
-
|
|
- pbufv->count = 1;
|
|
- pbufv->buf[0] = fbuf;
|
|
-
|
|
- size_t iovindex, pbufvindex;
|
|
- iovindex = 2; /* 2 headers, separate iovs */
|
|
- pbufvindex = 1; /* 2 headers, 1 fusebuf */
|
|
-
|
|
- for (; iovindex < out_num; iovindex++, pbufvindex++) {
|
|
- pbufv->count++;
|
|
- pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
|
|
- pbufv->buf[pbufvindex].flags = 0;
|
|
- pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
|
|
- pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;
|
|
- }
|
|
- } else {
|
|
- /* Normal (non fast write) path */
|
|
-
|
|
- /* Copy the rest of the buffer */
|
|
- fbuf.mem += out_sg->iov_len;
|
|
- copy_from_iov(&fbuf, out_num - 1, out_sg + 1);
|
|
- fbuf.mem -= out_sg->iov_len;
|
|
- fbuf.size = out_len;
|
|
-
|
|
- /* TODO! Endianness of header */
|
|
-
|
|
- /* TODO: Add checks for fuse_session_exited */
|
|
- bufv.buf[0] = fbuf;
|
|
- bufv.count = 1;
|
|
- pbufv = &bufv;
|
|
- }
|
|
- pbufv->idx = 0;
|
|
- pbufv->off = 0;
|
|
- fuse_session_process_buf_int(se, pbufv, &ch);
|
|
-
|
|
- if (allocated_bufv) {
|
|
- free(pbufv);
|
|
- }
|
|
-
|
|
- if (!qi->reply_sent) {
|
|
- fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n",
|
|
- __func__, elem->index);
|
|
- /* I think we've still got to recycle the element */
|
|
- vu_queue_push(dev, q, elem, 0);
|
|
- vu_queue_notify(dev, q);
|
|
- }
|
|
- qi->qe = NULL;
|
|
- free(elem);
|
|
- elem = NULL;
|
|
+ g_thread_pool_push(pool, req, NULL);
|
|
}
|
|
|
|
+ pthread_mutex_unlock(&qi->vq_lock);
|
|
pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
|
|
}
|
|
-out:
|
|
- pthread_mutex_destroy(&ch.lock);
|
|
- free(fbuf.mem);
|
|
+
|
|
+ g_thread_pool_free(pool, FALSE, TRUE);
|
|
|
|
return NULL;
|
|
}
|
|
@@ -643,6 +683,7 @@ static void fv_queue_cleanup_thread(struct fv_VuDev *vud, int qidx)
|
|
fuse_log(FUSE_LOG_ERR, "%s: Failed to join thread idx %d err %d\n",
|
|
__func__, qidx, ret);
|
|
}
|
|
+ pthread_mutex_destroy(&ourqi->vq_lock);
|
|
close(ourqi->kill_fd);
|
|
ourqi->kick_fd = -1;
|
|
free(vud->qi[qidx]);
|
|
@@ -696,6 +737,8 @@ static void fv_queue_set_started(VuDev *dev, int qidx, bool started)
|
|
|
|
ourqi->kill_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
|
|
assert(ourqi->kill_fd != -1);
|
|
+ pthread_mutex_init(&ourqi->vq_lock, NULL);
|
|
+
|
|
if (pthread_create(&ourqi->thread, NULL, fv_queue_thread, ourqi)) {
|
|
fuse_log(FUSE_LOG_ERR, "%s: Failed to create thread for queue %d\n",
|
|
__func__, qidx);
|
|
--
|
|
1.8.3.1
|
|
|