600 lines
20 KiB
Diff
600 lines
20 KiB
Diff
|
From 4896d07bf753683a3dbba4210384b0d862ff2d11 Mon Sep 17 00:00:00 2001
|
||
|
From: Eduard Bagdasaryan <eduard.bagdasaryan@measurement-factory.com>
|
||
|
Date: Thu, 7 Dec 2023 16:47:08 +0000
|
||
|
Subject: [PATCH 1/7] Break long store_client call chains with async calls
|
||
|
(#1056)
|
||
|
|
||
|
The store_client class design created very long call chains spanning
|
||
|
Squid-client and Squid-server processing and multiple transactions.
|
||
|
These call chains also create ideal conditions for dangerous recursive
|
||
|
relationships between communicating classes (a.k.a. "reentrancy" among
|
||
|
Squid developers). For example, storeClientCopy() enters store_client
|
||
|
and triggers disk I/O that triggers invokeHandlers() that re-enters the
|
||
|
same store_client object and starts competing with the original
|
||
|
storeClientCopy() processing state.
|
||
|
|
||
|
The official code prevented the worst recursion cases with three(!)
|
||
|
boolean flags and time-based events abused to break some of the call
|
||
|
chains, but that approach did not solve all of the problems while also
|
||
|
losing transaction context information across time-based events.
|
||
|
|
||
|
This change effectively makes STCB storeClientCopy() callbacks
|
||
|
asynchronous, eliminating the need for time-based events and one of the
|
||
|
flags. It shortens many call chains and preserves transaction context.
|
||
|
The remaining problems can and should be eliminated by converting
|
||
|
store_client into AsyncJob, but those changes deserve a dedicated PR.
|
||
|
|
||
|
store_client orchestrates cooperation of multiple asynchronous players:
|
||
|
|
||
|
* Sink: A Store client requests a STCB callback via a
|
||
|
storeClientCopy()/copy() call. A set _callback.callback_handler
|
||
|
implies that the client is waiting for this callback.
|
||
|
|
||
|
* Source1: A Store disk reading subsystem activated by the storeRead()
|
||
|
call "spontaneously" delivers response bytes via storeClientRead*()
|
||
|
callbacks. The disk_io_pending flag implies waiting for them.
|
||
|
|
||
|
* Source2: Store memory subsystem activated by storeClientListAdd()
|
||
|
"spontaneously" delivers response bytes via invokeHandlers().
|
||
|
|
||
|
* Source3: Store disk subsystem activated by storeSwapInStart()
|
||
|
"spontaneously" notifies of EOF/error by calling noteSwapInDone().
|
||
|
|
||
|
* Source4: A store_client object owner may delete the object by
|
||
|
"spontaneously" calling storeUnregister(). The official code was
|
||
|
converting this event into an error-notifying callback.
|
||
|
|
||
|
We continue to answer each storeClientCopy() request with the first
|
||
|
available information even though several SourceN calls are possible
|
||
|
while we are waiting to complete the STCB callback. The StoreIOBuffer
|
||
|
API and STCB recipients do not support data+error/eof combinations, and
|
||
|
future code will move this wait to the main event loop anyway. This
|
||
|
first-available approach means that the creation of the notifier call
|
||
|
effectively ends answer processing -- store_client just waits for that
|
||
|
call to fire so that it can relay the answer to still-synchronous STCB.
|
||
|
When STCB itself becomes asynchronous, this logic will continue to work.
|
||
|
|
||
|
Also stopped calling STCB from storeUnregister(). Conceptually, the
|
||
|
storeUnregister() and storeClientCopy() callers ought to represent the
|
||
|
same get-content-from-Store task; there should be no need to notify that
|
||
|
task about what it is doing. Technically, analysis of STCB callbacks
|
||
|
showed that many such notifications would be dangerous (if they are or
|
||
|
become reachable). At the time of the storeUnregister() call, the STCB
|
||
|
callbacks are usually unset (e.g., when storeUnregister() is called from
|
||
|
the destructor, after that object has finished copying -- a very common
|
||
|
case) or do not do anything (useful).
|
||
|
|
||
|
Also removed callback_data from the Callback::pending() condition. It is
|
||
|
conceptually wrong to require non-nil callback parameter, and it is
|
||
|
never cleared separately from the callback_handler data member anyway.
|
||
|
|
||
|
Also hid copyInto into the private store_client section to make sure it
|
||
|
is not modified while we are waiting to complete the STCB callback. This
|
||
|
move required adding a couple of read-only wrapper methods like
|
||
|
bytesWanted() and noteSwapInDone().
|
||
|
|
||
|
Also simplified error/EOF/bytes handling on copy()-STCB path using
|
||
|
dedicated methods (e.g., store_client::callback() API is no longer
|
||
|
mixing EOF and error signals).
|
||
|
|
||
|
Modified-by: Alex Burmashev <alexander.burmashev@oracle.com>
|
||
|
Signed-off-by: Alex Burmashev <alexander.burmashev@oracle.com>
|
||
|
---
|
||
|
src/MemObject.cc | 6 +-
|
||
|
src/StoreClient.h | 64 ++++++++++--
|
||
|
src/store_client.cc | 177 ++++++++++++++++++++++-----------
|
||
|
src/store_swapin.cc | 2 +-
|
||
|
src/tests/stub_store_client.cc | 5 +-
|
||
|
5 files changed, 186 insertions(+), 68 deletions(-)
|
||
|
|
||
|
diff --git a/src/MemObject.cc b/src/MemObject.cc
|
||
|
index df7791f..4ba63cc 100644
|
||
|
--- a/src/MemObject.cc
|
||
|
+++ b/src/MemObject.cc
|
||
|
@@ -196,8 +196,8 @@ struct LowestMemReader : public unary_function<store_client, void> {
|
||
|
LowestMemReader(int64_t seed):current(seed) {}
|
||
|
|
||
|
void operator() (store_client const &x) {
|
||
|
- if (x.memReaderHasLowerOffset(current))
|
||
|
- current = x.copyInto.offset;
|
||
|
+ if (x.getType() == STORE_MEM_CLIENT)
|
||
|
+ current = std::min(current, x.readOffset());
|
||
|
}
|
||
|
|
||
|
int64_t current;
|
||
|
@@ -492,7 +492,7 @@ MemObject::mostBytesAllowed() const
|
||
|
|
||
|
#endif
|
||
|
|
||
|
- j = sc->delayId.bytesWanted(0, sc->copyInto.length);
|
||
|
+ j = sc->bytesWanted();
|
||
|
|
||
|
if (j > jmax) {
|
||
|
jmax = j;
|
||
|
diff --git a/src/StoreClient.h b/src/StoreClient.h
|
||
|
index 65472d8..457844a 100644
|
||
|
--- a/src/StoreClient.h
|
||
|
+++ b/src/StoreClient.h
|
||
|
@@ -12,6 +12,7 @@
|
||
|
#include "dlink.h"
|
||
|
#include "StoreIOBuffer.h"
|
||
|
#include "StoreIOState.h"
|
||
|
+#include "base/AsyncCall.h"
|
||
|
|
||
|
typedef void STCB(void *, StoreIOBuffer); /* store callback */
|
||
|
|
||
|
@@ -39,14 +40,32 @@ class store_client
|
||
|
public:
|
||
|
store_client(StoreEntry *);
|
||
|
~store_client();
|
||
|
- bool memReaderHasLowerOffset(int64_t) const;
|
||
|
+
|
||
|
+ /// An offset into the stored response bytes, including the HTTP response
|
||
|
+ /// headers (if any). Note that this offset does not include Store entry
|
||
|
+ /// metadata, because it is not a part of the stored response.
|
||
|
+ /// \retval 0 means the client wants to read HTTP response headers.
|
||
|
+ /// \retval +N the response byte that the client wants to read next.
|
||
|
+ /// \retval -N should not occur.
|
||
|
+ // TODO: Callers do not expect negative offset. Verify that the return
|
||
|
+ // value cannot be negative and convert to unsigned in this case.
|
||
|
+ int64_t readOffset() const { return copyInto.offset; }
|
||
|
+
|
||
|
int getType() const;
|
||
|
- void fail();
|
||
|
- void callback(ssize_t len, bool error = false);
|
||
|
+
|
||
|
+ /// React to the end of reading the response from disk. There will be no
|
||
|
+ /// more readHeader() and readBody() callbacks for the current storeRead()
|
||
|
+ /// swapin after this notification.
|
||
|
+ void noteSwapInDone(bool error);
|
||
|
+
|
||
|
void doCopy (StoreEntry *e);
|
||
|
void readHeader(const char *buf, ssize_t len);
|
||
|
void readBody(const char *buf, ssize_t len);
|
||
|
+
|
||
|
+ /// Request StoreIOBuffer-described response data via an asynchronous STCB
|
||
|
+ /// callback. At most one outstanding request is allowed per store_client.
|
||
|
void copy(StoreEntry *, StoreIOBuffer, STCB *, void *);
|
||
|
+
|
||
|
void dumpStats(MemBuf * output, int clientNumber) const;
|
||
|
|
||
|
int64_t cmp_offset;
|
||
|
@@ -59,19 +78,29 @@ public:
|
||
|
StoreIOState::Pointer swapin_sio;
|
||
|
|
||
|
struct {
|
||
|
+ /// whether we are expecting a response to be swapped in from disk
|
||
|
+ /// (i.e. whether async storeRead() is currently in progress)
|
||
|
+ // TODO: a better name reflecting the 'in' scope of the flag
|
||
|
bool disk_io_pending;
|
||
|
+
|
||
|
+ /// whether the store_client::doCopy()-initiated STCB sequence is
|
||
|
+ /// currently in progress
|
||
|
bool store_copying;
|
||
|
- bool copy_event_pending;
|
||
|
} flags;
|
||
|
|
||
|
#if USE_DELAY_POOLS
|
||
|
DelayId delayId;
|
||
|
+
|
||
|
+ /// The maximum number of bytes the Store client can read/copy next without
|
||
|
+ /// overflowing its buffer and without violating delay pool limits. Store
|
||
|
+ /// I/O is not rate-limited, but we assume that the same number of bytes may
|
||
|
+ /// be read from the Squid-to-server connection that may be rate-limited.
|
||
|
+ int bytesWanted() const;
|
||
|
+
|
||
|
void setDelayId(DelayId delay_id);
|
||
|
#endif
|
||
|
|
||
|
dlink_node node;
|
||
|
- /* Below here is private - do no alter outside storeClient calls */
|
||
|
- StoreIOBuffer copyInto;
|
||
|
|
||
|
private:
|
||
|
bool moreToSend() const;
|
||
|
@@ -83,9 +112,25 @@ private:
|
||
|
bool startSwapin();
|
||
|
bool unpackHeader(char const *buf, ssize_t len);
|
||
|
|
||
|
+ void fail();
|
||
|
+ void callback(ssize_t);
|
||
|
+ void noteCopiedBytes(size_t);
|
||
|
+ void noteEof();
|
||
|
+ void noteNews();
|
||
|
+ void finishCallback();
|
||
|
+ static void FinishCallback(store_client *);
|
||
|
+
|
||
|
int type;
|
||
|
bool object_ok;
|
||
|
|
||
|
+ /// Storage and metadata associated with the current copy() request. Ought
|
||
|
+ /// to be ignored when not answering a copy() request.
|
||
|
+ StoreIOBuffer copyInto;
|
||
|
+
|
||
|
+ /// The number of bytes loaded from Store into copyInto while answering the
|
||
|
+ /// current copy() request. Ought to be ignored when not answering.
|
||
|
+ size_t copiedSize;
|
||
|
+
|
||
|
/* Until we finish stuffing code into store_client */
|
||
|
|
||
|
public:
|
||
|
@@ -94,9 +139,16 @@ public:
|
||
|
Callback ():callback_handler(NULL), callback_data(NULL) {}
|
||
|
|
||
|
Callback (STCB *, void *);
|
||
|
+
|
||
|
+ /// Whether the copy() answer is needed/expected (by the client) and has
|
||
|
+ /// not been computed (by us). False during (asynchronous) answer
|
||
|
+ /// delivery to the STCB callback_handler.
|
||
|
bool pending() const;
|
||
|
STCB *callback_handler;
|
||
|
void *callback_data;
|
||
|
+
|
||
|
+ /// a scheduled asynchronous finishCallback() call (or nil)
|
||
|
+ AsyncCall::Pointer notifier;
|
||
|
} _callback;
|
||
|
};
|
||
|
|
||
|
diff --git a/src/store_client.cc b/src/store_client.cc
|
||
|
index 1b54f04..207c96b 100644
|
||
|
--- a/src/store_client.cc
|
||
|
+++ b/src/store_client.cc
|
||
|
@@ -9,6 +9,7 @@
|
||
|
/* DEBUG: section 90 Storage Manager Client-Side Interface */
|
||
|
|
||
|
#include "squid.h"
|
||
|
+#include "base/AsyncCbdataCalls.h"
|
||
|
#include "event.h"
|
||
|
#include "globals.h"
|
||
|
#include "HttpReply.h"
|
||
|
@@ -39,17 +40,10 @@
|
||
|
static StoreIOState::STRCB storeClientReadBody;
|
||
|
static StoreIOState::STRCB storeClientReadHeader;
|
||
|
static void storeClientCopy2(StoreEntry * e, store_client * sc);
|
||
|
-static EVH storeClientCopyEvent;
|
||
|
static bool CheckQuickAbortIsReasonable(StoreEntry * entry);
|
||
|
|
||
|
CBDATA_CLASS_INIT(store_client);
|
||
|
|
||
|
-bool
|
||
|
-store_client::memReaderHasLowerOffset(int64_t anOffset) const
|
||
|
-{
|
||
|
- return getType() == STORE_MEM_CLIENT && copyInto.offset < anOffset;
|
||
|
-}
|
||
|
-
|
||
|
int
|
||
|
store_client::getType() const
|
||
|
{
|
||
|
@@ -104,22 +98,41 @@ storeClientListAdd(StoreEntry * e, void *data)
|
||
|
return sc;
|
||
|
}
|
||
|
|
||
|
+/// schedules asynchronous STCB call to relay disk or memory read results
|
||
|
+/// \param outcome an error signal (if negative), an EOF signal (if zero), or the number of bytes read
|
||
|
+void
|
||
|
+store_client::callback(const ssize_t outcome)
|
||
|
+{
|
||
|
+ if (outcome > 0)
|
||
|
+ return noteCopiedBytes(outcome);
|
||
|
+
|
||
|
+ if (outcome < 0)
|
||
|
+ return fail();
|
||
|
+
|
||
|
+ noteEof();
|
||
|
+}
|
||
|
+/// finishCallback() wrapper; TODO: Add NullaryMemFunT for non-jobs.
|
||
|
void
|
||
|
-store_client::callback(ssize_t sz, bool error)
|
||
|
+store_client::FinishCallback(store_client * const sc)
|
||
|
{
|
||
|
- size_t bSz = 0;
|
||
|
+ sc->finishCallback();
|
||
|
+}
|
||
|
|
||
|
- if (sz >= 0 && !error)
|
||
|
- bSz = sz;
|
||
|
+/// finishes a copy()-STCB sequence by synchronously calling STCB
|
||
|
+void
|
||
|
+store_client::finishCallback()
|
||
|
+{
|
||
|
+ Assure(_callback.callback_handler);
|
||
|
+ Assure(_callback.notifier);
|
||
|
|
||
|
- StoreIOBuffer result(bSz, 0 ,copyInto.data);
|
||
|
+ // callers are not ready to handle a content+error combination
|
||
|
+ Assure(object_ok || !copiedSize);
|
||
|
|
||
|
- if (sz < 0 || error)
|
||
|
- result.flags.error = 1;
|
||
|
+ StoreIOBuffer result(copiedSize, copyInto.offset, copyInto.data);
|
||
|
+ result.flags.error = object_ok ? 0 : 1;
|
||
|
+ copiedSize = 0;
|
||
|
|
||
|
- result.offset = cmp_offset;
|
||
|
- assert(_callback.pending());
|
||
|
- cmp_offset = copyInto.offset + bSz;
|
||
|
+ cmp_offset = result.offset + result.length;
|
||
|
STCB *temphandler = _callback.callback_handler;
|
||
|
void *cbdata = _callback.callback_data;
|
||
|
_callback = Callback(NULL, NULL);
|
||
|
@@ -131,18 +144,24 @@ store_client::callback(ssize_t sz, bool error)
|
||
|
cbdataReferenceDone(cbdata);
|
||
|
}
|
||
|
|
||
|
-static void
|
||
|
-storeClientCopyEvent(void *data)
|
||
|
+/// schedules asynchronous STCB call to relay a successful disk or memory read
|
||
|
+/// \param bytesCopied the number of response bytes copied into copyInto
|
||
|
+void
|
||
|
+store_client::noteCopiedBytes(const size_t bytesCopied)
|
||
|
{
|
||
|
- store_client *sc = (store_client *)data;
|
||
|
- debugs(90, 3, "storeClientCopyEvent: Running");
|
||
|
- assert (sc->flags.copy_event_pending);
|
||
|
- sc->flags.copy_event_pending = false;
|
||
|
-
|
||
|
- if (!sc->_callback.pending())
|
||
|
- return;
|
||
|
+ debugs(90, 5, bytesCopied);
|
||
|
+ Assure(bytesCopied > 0);
|
||
|
+ Assure(!copiedSize);
|
||
|
+ copiedSize = bytesCopied;
|
||
|
+ noteNews();
|
||
|
+}
|
||
|
|
||
|
- storeClientCopy2(sc->entry, sc);
|
||
|
+void
|
||
|
+store_client::noteEof()
|
||
|
+{
|
||
|
+ debugs(90, 5, copiedSize);
|
||
|
+ Assure(!copiedSize);
|
||
|
+ noteNews();
|
||
|
}
|
||
|
|
||
|
store_client::store_client(StoreEntry *e) :
|
||
|
@@ -152,11 +171,11 @@ store_client::store_client(StoreEntry *e) :
|
||
|
#endif
|
||
|
entry(e),
|
||
|
type(e->storeClientType()),
|
||
|
- object_ok(true)
|
||
|
+ object_ok(true),
|
||
|
+ copiedSize(0)
|
||
|
{
|
||
|
flags.disk_io_pending = false;
|
||
|
flags.store_copying = false;
|
||
|
- flags.copy_event_pending = false;
|
||
|
++ entry->refcount;
|
||
|
|
||
|
if (getType() == STORE_DISK_CLIENT) {
|
||
|
@@ -272,17 +291,11 @@ static void
|
||
|
storeClientCopy2(StoreEntry * e, store_client * sc)
|
||
|
{
|
||
|
/* reentrancy not allowed - note this could lead to
|
||
|
- * dropped events
|
||
|
+ * dropped notifications about response data availability
|
||
|
*/
|
||
|
|
||
|
- if (sc->flags.copy_event_pending) {
|
||
|
- return;
|
||
|
- }
|
||
|
-
|
||
|
if (sc->flags.store_copying) {
|
||
|
- sc->flags.copy_event_pending = true;
|
||
|
- debugs(90, 3, "storeClientCopy2: Queueing storeClientCopyEvent()");
|
||
|
- eventAdd("storeClientCopyEvent", storeClientCopyEvent, sc, 0.0, 0);
|
||
|
+ debugs(90, 3, "prevented recursive copying for " << *e);
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
@@ -295,21 +308,16 @@ storeClientCopy2(StoreEntry * e, store_client * sc)
|
||
|
* if the peer aborts, we want to give the client(s)
|
||
|
* everything we got before the abort condition occurred.
|
||
|
*/
|
||
|
- /* Warning: doCopy may indirectly free itself in callbacks,
|
||
|
- * hence the lock to keep it active for the duration of
|
||
|
- * this function
|
||
|
- * XXX: Locking does not prevent calling sc destructor (it only prevents
|
||
|
- * freeing sc memory) so sc may become invalid from C++ p.o.v.
|
||
|
- */
|
||
|
- CbcPointer<store_client> tmpLock = sc;
|
||
|
- assert (!sc->flags.store_copying);
|
||
|
sc->doCopy(e);
|
||
|
- assert(!sc->flags.store_copying);
|
||
|
}
|
||
|
|
||
|
void
|
||
|
store_client::doCopy(StoreEntry *anEntry)
|
||
|
{
|
||
|
+ Assure(_callback.pending());
|
||
|
+ Assure(!flags.disk_io_pending);
|
||
|
+ Assure(!flags.store_copying);
|
||
|
+
|
||
|
assert (anEntry == entry);
|
||
|
flags.store_copying = true;
|
||
|
MemObject *mem = entry->mem_obj;
|
||
|
@@ -321,7 +329,7 @@ store_client::doCopy(StoreEntry *anEntry)
|
||
|
if (!moreToSend()) {
|
||
|
/* There is no more to send! */
|
||
|
debugs(33, 3, HERE << "There is no more to send!");
|
||
|
- callback(0);
|
||
|
+ noteEof();
|
||
|
flags.store_copying = false;
|
||
|
return;
|
||
|
}
|
||
|
@@ -382,6 +390,16 @@ store_client::startSwapin()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
+void
|
||
|
+store_client::noteSwapInDone(const bool error)
|
||
|
+{
|
||
|
+ Assure(_callback.pending());
|
||
|
+ if (error)
|
||
|
+ fail();
|
||
|
+ else
|
||
|
+ noteEof();
|
||
|
+}
|
||
|
+
|
||
|
void
|
||
|
store_client::scheduleRead()
|
||
|
{
|
||
|
@@ -421,7 +439,7 @@ store_client::scheduleMemRead()
|
||
|
/* What the client wants is in memory */
|
||
|
/* Old style */
|
||
|
debugs(90, 3, "store_client::doCopy: Copying normal from memory");
|
||
|
- size_t sz = entry->mem_obj->data_hdr.copy(copyInto);
|
||
|
+ const auto sz = entry->mem_obj->data_hdr.copy(copyInto); // may be <= 0 per copy() API
|
||
|
callback(sz);
|
||
|
flags.store_copying = false;
|
||
|
}
|
||
|
@@ -493,7 +511,19 @@ store_client::readBody(const char *, ssize_t len)
|
||
|
void
|
||
|
store_client::fail()
|
||
|
{
|
||
|
+ debugs(90, 3, (object_ok ? "once" : "again"));
|
||
|
+ if (!object_ok)
|
||
|
+ return; // we failed earlier; nothing to do now
|
||
|
+
|
||
|
object_ok = false;
|
||
|
+
|
||
|
+ noteNews();
|
||
|
+}
|
||
|
+
|
||
|
+/// if necessary and possible, informs the Store reader about copy() result
|
||
|
+void
|
||
|
+store_client::noteNews()
|
||
|
+{
|
||
|
/* synchronous open failures callback from the store,
|
||
|
* before startSwapin detects the failure.
|
||
|
* TODO: fix this inconsistent behaviour - probably by
|
||
|
@@ -501,8 +531,20 @@ store_client::fail()
|
||
|
* not synchronous
|
||
|
*/
|
||
|
|
||
|
- if (_callback.pending())
|
||
|
- callback(0, true);
|
||
|
+ if (!_callback.callback_handler) {
|
||
|
+ debugs(90, 5, "client lost interest");
|
||
|
+ return;
|
||
|
+ }
|
||
|
+
|
||
|
+ if (_callback.notifier) {
|
||
|
+ debugs(90, 5, "earlier news is being delivered by " << _callback.notifier);
|
||
|
+ return;
|
||
|
+ }
|
||
|
+
|
||
|
+ _callback.notifier = asyncCall(90, 4, "store_client::FinishCallback", cbdataDialer(store_client::FinishCallback, this));
|
||
|
+ ScheduleCallHere(_callback.notifier);
|
||
|
+
|
||
|
+ Assure(!_callback.pending());
|
||
|
}
|
||
|
|
||
|
static void
|
||
|
@@ -673,10 +715,12 @@ storeUnregister(store_client * sc, StoreEntry * e, void *data)
|
||
|
++statCounter.swap.ins;
|
||
|
}
|
||
|
|
||
|
- if (sc->_callback.pending()) {
|
||
|
- /* callback with ssize = -1 to indicate unexpected termination */
|
||
|
- debugs(90, 3, "store_client for " << *e << " has a callback");
|
||
|
- sc->fail();
|
||
|
+ if (sc->_callback.callback_handler || sc->_callback.notifier) {
|
||
|
+ debugs(90, 3, "forgetting store_client callback for " << *e);
|
||
|
+ // Do not notify: Callers want to stop copying and forget about this
|
||
|
+ // pending copy request. Some would mishandle a notification from here.
|
||
|
+ if (sc->_callback.notifier)
|
||
|
+ sc->_callback.notifier->cancel("storeUnregister");
|
||
|
}
|
||
|
|
||
|
#if STORE_CLIENT_LIST_DEBUG
|
||
|
@@ -684,6 +728,8 @@ storeUnregister(store_client * sc, StoreEntry * e, void *data)
|
||
|
|
||
|
#endif
|
||
|
|
||
|
+ // XXX: We might be inside sc store_client method somewhere up the call
|
||
|
+ // stack. TODO: Convert store_client to AsyncJob to make destruction async.
|
||
|
delete sc;
|
||
|
|
||
|
assert(e->locked());
|
||
|
@@ -740,6 +786,16 @@ StoreEntry::invokeHandlers()
|
||
|
|
||
|
if (sc->flags.disk_io_pending)
|
||
|
continue;
|
||
|
+ if (sc->flags.store_copying)
|
||
|
+ continue;
|
||
|
+
|
||
|
+ // XXX: If invokeHandlers() is (indirectly) called from a store_client
|
||
|
+ // method, then the above three conditions may not be sufficient to
|
||
|
+ // prevent us from reentering the same store_client object! This
|
||
|
+ // probably does not happen in the current code, but no observed
|
||
|
+ // invariant prevents this from (accidentally) happening in the future.
|
||
|
+
|
||
|
+ // TODO: Convert store_client into AsyncJob; make this call asynchronous
|
||
|
|
||
|
storeClientCopy2(this, sc);
|
||
|
}
|
||
|
@@ -864,8 +920,8 @@ store_client::dumpStats(MemBuf * output, int clientNumber) const
|
||
|
if (flags.store_copying)
|
||
|
output->append(" store_copying", 14);
|
||
|
|
||
|
- if (flags.copy_event_pending)
|
||
|
- output->append(" copy_event_pending", 19);
|
||
|
+ if (_callback.notifier)
|
||
|
+ output->append(" notifying", 10);
|
||
|
|
||
|
output->append("\n",1);
|
||
|
}
|
||
|
@@ -873,12 +929,19 @@ store_client::dumpStats(MemBuf * output, int clientNumber) const
|
||
|
bool
|
||
|
store_client::Callback::pending() const
|
||
|
{
|
||
|
- return callback_handler && callback_data;
|
||
|
+ return callback_handler && !notifier;
|
||
|
}
|
||
|
|
||
|
store_client::Callback::Callback(STCB *function, void *data) : callback_handler(function), callback_data (data) {}
|
||
|
|
||
|
#if USE_DELAY_POOLS
|
||
|
+int
|
||
|
+store_client::bytesWanted() const
|
||
|
+{
|
||
|
+ // TODO: To avoid using stale copyInto, return zero if !_callback.pending()?
|
||
|
+ return delayId.bytesWanted(0, copyInto.length);
|
||
|
+}
|
||
|
+
|
||
|
void
|
||
|
store_client::setDelayId(DelayId delay_id)
|
||
|
{
|
||
|
diff --git a/src/store_swapin.cc b/src/store_swapin.cc
|
||
|
index a05d7e3..cd32e94 100644
|
||
|
--- a/src/store_swapin.cc
|
||
|
+++ b/src/store_swapin.cc
|
||
|
@@ -56,7 +56,7 @@ storeSwapInFileClosed(void *data, int errflag, StoreIOState::Pointer)
|
||
|
|
||
|
if (sc->_callback.pending()) {
|
||
|
assert (errflag <= 0);
|
||
|
- sc->callback(0, errflag ? true : false);
|
||
|
+ sc->noteSwapInDone(errflag);
|
||
|
}
|
||
|
|
||
|
++statCounter.swap.ins;
|
||
|
diff --git a/src/tests/stub_store_client.cc b/src/tests/stub_store_client.cc
|
||
|
index 2a13874..4a73863 100644
|
||
|
--- a/src/tests/stub_store_client.cc
|
||
|
+++ b/src/tests/stub_store_client.cc
|
||
|
@@ -34,7 +34,10 @@ void storeLogOpen(void) STUB
|
||
|
void storeDigestInit(void) STUB
|
||
|
void storeRebuildStart(void) STUB
|
||
|
void storeReplSetup(void) STUB
|
||
|
-bool store_client::memReaderHasLowerOffset(int64_t anOffset) const STUB_RETVAL(false)
|
||
|
+void store_client::noteSwapInDone(bool) STUB
|
||
|
+#if USE_DELAY_POOLS
|
||
|
+int store_client::bytesWanted() const STUB_RETVAL(0)
|
||
|
+#endif
|
||
|
void store_client::dumpStats(MemBuf * output, int clientNumber) const STUB
|
||
|
int store_client::getType() const STUB_RETVAL(0)
|
||
|
|
||
|
--
|
||
|
2.39.3
|
||
|
|