343 lines
14 KiB
Diff
343 lines
14 KiB
Diff
From e0b1163f19ad7f63e92e6ea9ee94d51d5eebeacf Mon Sep 17 00:00:00 2001
|
|
From: Alex Rousskov <rousskov@measurement-factory.com>
|
|
Date: Thu, 27 Nov 2025 10:51:27 +0100
|
|
Subject: [PATCH] Bug 5352: Do not get stuck when RESPMOD is slower than
|
|
read(2) (#1777)
|
|
|
|
... RESPMOD BodyPipe buffer becomes full ...
|
|
maybeMakeSpaceAvailable: will not read up to 0
|
|
The AsyncCall Client::noteDelayAwareReadChance constructed
|
|
|
|
... RESPMOD consumes some buffered virgin body data ...
|
|
entering BodyProducer::noteMoreBodySpaceAvailable
|
|
leaving BodyProducer::noteMoreBodySpaceAvailable
|
|
|
|
... read_timeout seconds later ...
|
|
http.cc(148) httpTimeout
|
|
FwdState.cc(471) fail: ERR_READ_TIMEOUT "Gateway Timeout"
|
|
|
|
When RESPMOD does not empty its adaptation BodyPipe buffer fast enough,
|
|
readReply() may eventually fill that buffer and call delayRead(),
|
|
anticipating a noteDelayAwareReadChance() callback from Store or Server
|
|
delay pools. That callback never happens if Store and Server are not
|
|
getting any data -- they do not even start working until the RESPMOD service
|
|
starts releasing adapted/echoed response back to Squid! Meanwhile, our
|
|
flags.do_next_read (cleared by readReply() caller) remains false.
|
|
|
|
When/if the RESPMOD service eventually frees some BodyPipe buffer space,
|
|
triggering noteMoreBodySpaceAvailable() notification, nothing changes
|
|
because maybeReadVirginBody() quits when flags.do_next_read is false.
|
|
|
|
noteMoreBodySpaceAvailable() could not just make flags.do_next_read true
|
|
because that flag may be false for a variety of other/permanent reasons.
|
|
Instead, we replaced that one-size-fits-all flag with more specific
|
|
checks so that reading can resume if it is safe to resume it. This
|
|
change addresses a couple of flag-related XXXs.
|
|
|
|
The bug was introduced in 2023 commit 50c5af88. Prior to that change,
|
|
delayRead() was not called when RESPMOD BodyPipe buffer became full
|
|
because maybeMakeSpaceAvailable() returned false in that case, blocking
|
|
maybeReadVirginBody() from triggering readReply() via Comm::Read(). We
|
|
missed the flags.do_next_read dependency and the fact that Store-specific delayRead()
|
|
cannot be used to wait for adaptation buffer space to become available.
|
|
|
|
XXX: To reduce risks, this change duplicates a part of
|
|
calcBufferSpaceToReserve() logic. Removing that duplication requires
|
|
significant (and risky) refactoring of several related methods.
|
|
---
|
|
src/adaptation/icap/ModXact.cc | 8 +---
|
|
src/clients/Client.cc | 3 ++
|
|
src/clients/Client.h | 3 ++
|
|
src/clients/FtpClient.cc | 2 +
|
|
src/http.cc | 70 ++++++++++++++++++++++------------
|
|
src/http.h | 3 ++
|
|
src/http/StateFlags.h | 1 -
|
|
7 files changed, 59 insertions(+), 31 deletions(-)
|
|
|
|
diff --git a/src/adaptation/icap/ModXact.cc b/src/adaptation/icap/ModXact.cc
|
|
index d2f9529..9331215 100644
|
|
--- a/src/adaptation/icap/ModXact.cc
|
|
+++ b/src/adaptation/icap/ModXact.cc
|
|
@@ -435,14 +435,10 @@ void Adaptation::Icap::ModXact::virginConsume()
|
|
BodyPipe &bp = *virgin.body_pipe;
|
|
const bool wantToPostpone = isRepeatable || canStartBypass || protectGroupBypass;
|
|
|
|
- // Why > 2? HttpState does not use the last bytes in the buffer
|
|
- // because Client::delayRead() is arguably broken. See
|
|
- // HttpStateData::maybeReadVirginBody for more details.
|
|
- if (wantToPostpone && bp.buf().spaceSize() > 2) {
|
|
+ if (wantToPostpone && bp.buf().potentialSpaceSize() > 0) {
|
|
// Postponing may increase memory footprint and slow the HTTP side
|
|
// down. Not postponing may increase the number of ICAP errors
|
|
- // if the ICAP service fails. We may also use "potential" space to
|
|
- // postpone more aggressively. Should the trade-off be configurable?
|
|
+ // if the ICAP service fails. Should the trade-off be configurable?
|
|
debugs(93, 8, "postponing consumption from " << bp.status());
|
|
return;
|
|
}
|
|
diff --git a/src/clients/Client.cc b/src/clients/Client.cc
|
|
index cbf5693..7de6274 100644
|
|
--- a/src/clients/Client.cc
|
|
+++ b/src/clients/Client.cc
|
|
@@ -1028,6 +1028,9 @@ Client::adjustBodyBytesRead(const int64_t delta)
|
|
void
|
|
Client::delayRead()
|
|
{
|
|
+ Assure(!waitingForDelayAwareReadChance);
|
|
+ waitingForDelayAwareReadChance = true;
|
|
+
|
|
using DeferredReadDialer = NullaryMemFunT<Client>;
|
|
AsyncCall::Pointer call = asyncCall(11, 5, "Client::noteDelayAwareReadChance",
|
|
DeferredReadDialer(this, &Client::noteDelayAwareReadChance));
|
|
diff --git a/src/clients/Client.h b/src/clients/Client.h
|
|
index f60d8ce..6a53315 100644
|
|
--- a/src/clients/Client.h
|
|
+++ b/src/clients/Client.h
|
|
@@ -194,6 +194,9 @@ protected:
|
|
#endif
|
|
bool receivedWholeRequestBody = false; ///< handleRequestBodyProductionEnded called
|
|
|
|
+ /// whether we are waiting for MemObject::delayRead() to call us back
|
|
+ bool waitingForDelayAwareReadChance = false;
|
|
+
|
|
/// whether we should not be talking to FwdState; XXX: clear fwd instead
|
|
/// points to a string literal which is used only for debugging
|
|
const char *doneWithFwd = nullptr;
|
|
diff --git a/src/clients/FtpClient.cc b/src/clients/FtpClient.cc
|
|
index 3630766..6f9d8fa 100644
|
|
--- a/src/clients/FtpClient.cc
|
|
+++ b/src/clients/FtpClient.cc
|
|
@@ -907,6 +907,8 @@ Ftp::Client::dataConnection() const
|
|
void
|
|
Ftp::Client::noteDelayAwareReadChance()
|
|
{
|
|
+ // TODO: Merge with HttpStateData::noteDelayAwareReadChance()
|
|
+ waitingForDelayAwareReadChance = false;
|
|
data.read_pending = false;
|
|
maybeReadVirginBody();
|
|
}
|
|
diff --git a/src/http.cc b/src/http.cc
|
|
index 4916c15..e3f9b4c 100644
|
|
--- a/src/http.cc
|
|
+++ b/src/http.cc
|
|
@@ -1168,17 +1168,15 @@ HttpStateData::persistentConnStatus() const
|
|
void
|
|
HttpStateData::noteDelayAwareReadChance()
|
|
{
|
|
- flags.do_next_read = true;
|
|
+ waitingForDelayAwareReadChance = false;
|
|
maybeReadVirginBody();
|
|
}
|
|
|
|
void
|
|
HttpStateData::readReply(const CommIoCbParams &io)
|
|
{
|
|
- Must(!flags.do_next_read); // XXX: should have been set false by mayReadVirginBody()
|
|
- flags.do_next_read = false;
|
|
-
|
|
debugs(11, 5, io.conn);
|
|
+ waitingForCommRead = false;
|
|
|
|
// Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us
|
|
if (io.flag == Comm::ERR_CLOSING) {
|
|
@@ -1212,7 +1210,27 @@ HttpStateData::readReply(const CommIoCbParams &io)
|
|
const auto readSizeWanted = readSizeMax ? entry->bytesWanted(Range<size_t>(0, readSizeMax)) : 0;
|
|
|
|
if (readSizeWanted <= 0) {
|
|
- delayRead();
|
|
+ // XXX: If we avoid Comm::ReadNow(), we should not Comm::Read() again
|
|
+ // when the wait is over. We should go straight to readReply() instead.
|
|
+
|
|
+#if USE_ADAPTATION
|
|
+ // XXX: We are duplicating Client::calcBufferSpaceToReserve() logic.
|
|
+ // XXX: Some other delayRead() cases may lack kickReads() guarantees.
|
|
+ // TODO: Refactor maybeMakeSpaceAvailable() to properly treat each
|
|
+ // no-read case instead of calling delayRead() for the remaining cases.
|
|
+
|
|
+ if (responseBodyBuffer) {
|
|
+ debugs(11, 5, "avoid delayRead() to give adaptation a chance to drain overflow buffer: " << responseBodyBuffer->contentSize());
|
|
+ return; // wait for Client::noteMoreBodySpaceAvailable()
|
|
+ }
|
|
+
|
|
+ if (virginBodyDestination && !virginBodyDestination->buf().hasPotentialSpace()) {
|
|
+ debugs(11, 5, "avoid delayRead() to give adaptation a chance to drain body pipe buffer: " << virginBodyDestination->buf().contentSize());
|
|
+ return; // wait for Client::noteMoreBodySpaceAvailable()
|
|
+ }
|
|
+#endif
|
|
+
|
|
+ delayRead(); /// wait for Client::noteDelayAwareReadChance()
|
|
return;
|
|
}
|
|
|
|
@@ -1223,7 +1241,6 @@ HttpStateData::readReply(const CommIoCbParams &io)
|
|
case Comm::INPROGRESS:
|
|
if (inBuf.isEmpty())
|
|
debugs(33, 2, io.conn << ": no data to process, " << xstrerr(rd.xerrno));
|
|
- flags.do_next_read = true;
|
|
maybeReadVirginBody();
|
|
return;
|
|
|
|
@@ -1253,7 +1270,6 @@ HttpStateData::readReply(const CommIoCbParams &io)
|
|
|
|
case Comm::ENDFILE: // close detected by 0-byte read
|
|
eof = 1;
|
|
- flags.do_next_read = false;
|
|
|
|
/* Continue to process previously read data */
|
|
break;
|
|
@@ -1264,7 +1280,6 @@ HttpStateData::readReply(const CommIoCbParams &io)
|
|
const auto err = new ErrorState(ERR_READ_ERROR, Http::scBadGateway, fwd->request, fwd->al);
|
|
err->xerrno = rd.xerrno;
|
|
fwd->fail(err);
|
|
- flags.do_next_read = false;
|
|
closeServer();
|
|
mustStop("HttpStateData::readReply");
|
|
return;
|
|
@@ -1318,7 +1333,6 @@ HttpStateData::continueAfterParsingHeader()
|
|
|
|
if (!flags.headers_parsed && !eof) {
|
|
debugs(11, 9, "needs more at " << inBuf.length());
|
|
- flags.do_next_read = true;
|
|
/** \retval false If we have not finished parsing the headers and may get more data.
|
|
* Schedules more reads to retrieve the missing data.
|
|
*/
|
|
@@ -1369,7 +1383,6 @@ HttpStateData::continueAfterParsingHeader()
|
|
assert(error != ERR_NONE);
|
|
entry->reset();
|
|
fwd->fail(new ErrorState(error, Http::scBadGateway, fwd->request, fwd->al));
|
|
- flags.do_next_read = false;
|
|
closeServer();
|
|
mustStop("HttpStateData::continueAfterParsingHeader");
|
|
return false; // quit on error
|
|
@@ -1455,7 +1468,6 @@ HttpStateData::decodeAndWriteReplyBody()
|
|
addVirginReplyBody(decodedData.content(), decodedData.contentSize());
|
|
if (doneParsing) {
|
|
lastChunk = 1;
|
|
- flags.do_next_read = false;
|
|
markParsedVirginReplyAsWhole("http parsed last-chunk");
|
|
} else if (eof) {
|
|
markPrematureReplyBodyEofFailure();
|
|
@@ -1479,7 +1491,6 @@ void
|
|
HttpStateData::processReplyBody()
|
|
{
|
|
if (!flags.headers_parsed) {
|
|
- flags.do_next_read = true;
|
|
maybeReadVirginBody();
|
|
return;
|
|
}
|
|
@@ -1499,7 +1510,6 @@ HttpStateData::processReplyBody()
|
|
if (entry->isAccepting()) {
|
|
if (flags.chunked) {
|
|
if (!decodeAndWriteReplyBody()) {
|
|
- flags.do_next_read = false;
|
|
serverComplete();
|
|
return;
|
|
}
|
|
@@ -1525,8 +1535,6 @@ HttpStateData::processReplyBody()
|
|
} else {
|
|
commSetConnTimeout(serverConnection, Config.Timeout.read, nil);
|
|
}
|
|
-
|
|
- flags.do_next_read = true;
|
|
}
|
|
break;
|
|
|
|
@@ -1536,7 +1544,6 @@ HttpStateData::processReplyBody()
|
|
// TODO: Remove serverConnectionSaved but preserve exception safety.
|
|
|
|
commUnsetConnTimeout(serverConnection);
|
|
- flags.do_next_read = false;
|
|
|
|
comm_remove_close_handler(serverConnection->fd, closeHandler);
|
|
closeHandler = nullptr;
|
|
@@ -1596,29 +1603,45 @@ HttpStateData::mayReadVirginReplyBody() const
|
|
void
|
|
HttpStateData::maybeReadVirginBody()
|
|
{
|
|
- // too late to read
|
|
- if (!Comm::IsConnOpen(serverConnection) || fd_table[serverConnection->fd].closing())
|
|
+ if (!Comm::IsConnOpen(serverConnection) || fd_table[serverConnection->fd].closing()) {
|
|
+ debugs(11, 3, "no, peer connection gone");
|
|
return;
|
|
+ }
|
|
+
|
|
+ if (eof) {
|
|
+ // tolerate hypothetical calls between Comm::ENDFILE and closeServer()
|
|
+ debugs(11, 3, "no, saw EOF");
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ if (lastChunk) {
|
|
+ // tolerate hypothetical calls between setting lastChunk and clearing serverConnection
|
|
+ debugs(11, 3, "no, saw last-chunk");
|
|
+ return;
|
|
+ }
|
|
|
|
if (!canBufferMoreReplyBytes()) {
|
|
abortTransaction("more response bytes required, but the read buffer is full and cannot be drained");
|
|
return;
|
|
}
|
|
|
|
- // XXX: get rid of the do_next_read flag
|
|
- // check for the proper reasons preventing read(2)
|
|
- if (!flags.do_next_read)
|
|
+ if (waitingForDelayAwareReadChance) {
|
|
+ debugs(11, 5, "no, still waiting for noteDelayAwareReadChance()");
|
|
return;
|
|
+ }
|
|
|
|
- flags.do_next_read = false;
|
|
+ if (waitingForCommRead) {
|
|
+ debugs(11, 3, "no, already waiting for readReply()");
|
|
+ return;
|
|
+ }
|
|
|
|
- // must not already be waiting for read(2) ...
|
|
assert(!Comm::MonitorsRead(serverConnection->fd));
|
|
|
|
// wait for read(2) to be possible.
|
|
typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer;
|
|
AsyncCall::Pointer call = JobCallback(11, 5, Dialer, this, HttpStateData::readReply);
|
|
Comm::Read(serverConnection, call);
|
|
+ waitingForCommRead = true;
|
|
}
|
|
|
|
/// Desired inBuf capacity based on various capacity preferences/limits:
|
|
@@ -2406,7 +2429,6 @@ HttpStateData::sendRequest()
|
|
AsyncCall::Pointer timeoutCall = JobCallback(11, 5,
|
|
TimeoutDialer, this, HttpStateData::httpTimeout);
|
|
commSetConnTimeout(serverConnection, Config.Timeout.lifetime, timeoutCall);
|
|
- flags.do_next_read = true;
|
|
maybeReadVirginBody();
|
|
|
|
if (request->body_pipe != nullptr) {
|
|
diff --git a/src/http.h b/src/http.h
|
|
index cb4ae06..a15f103 100644
|
|
--- a/src/http.h
|
|
+++ b/src/http.h
|
|
@@ -152,6 +152,9 @@ private:
|
|
/// positive when we read more than we wanted
|
|
int64_t payloadTruncated = 0;
|
|
|
|
+ /// whether we are waiting for our Comm::Read() handler to be called
|
|
+ bool waitingForCommRead = false;
|
|
+
|
|
/// Whether we received a Date header older than that of a matching
|
|
/// cached response.
|
|
bool sawDateGoBack = false;
|
|
diff --git a/src/http/StateFlags.h b/src/http/StateFlags.h
|
|
index ae3ca99..6d26b3e 100644
|
|
--- a/src/http/StateFlags.h
|
|
+++ b/src/http/StateFlags.h
|
|
@@ -58,7 +58,6 @@ public:
|
|
bool keepalive_broken = false;
|
|
bool abuse_detected = false;
|
|
bool request_sent = false;
|
|
- bool do_next_read = false;
|
|
bool chunked = false; ///< reading a chunked response; TODO: rename
|
|
bool chunked_request = false; ///< writing a chunked request
|
|
bool sentLastChunk = false; ///< do not try to write last-chunk again
|
|
--
|
|
2.44.0
|
|
|