squid/SOURCES/0002-Remove-serialized-HTTP...

From e4e1a48a6d53cad77c8bab561addb1ed48abba4f Mon Sep 17 00:00:00 2001
From: Alex Rousskov <rousskov@measurement-factory.com>
Date: Thu, 7 Dec 2023 17:58:49 +0000
Subject: [PATCH 2/7] Remove serialized HTTP headers from storeClientCopy()
(#1335)

Do not send serialized HTTP response header bytes in storeClientCopy()
answers. Ignore serialized header size when calling storeClientCopy().

This complex change adjusts storeClientCopy() API to address several
related problems with storeClientCopy() and its callers. The sections
below summarize storeClientCopy() changes and then move on to callers.

Squid incorrectly assumed that serialized HTTP response headers are read
from disk in a single storeRead() request. In reality, many situations
lead to store_client::readBody() receiving partial HTTP headers,
resulting in parseCharBuf() failure and a level-0 cache.log message:

    Could not parse headers from on disk object

Inadequate handling of this failure resulted in a variety of problems.
Squid now accumulates storeRead() results to parse larger headers and
also handles parsing failures better, but we could not just stop there.
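
The accumulation idea described above can be illustrated with a minimal
sketch. This is not Squid code: HeaderAccumulator, its add() method, and
the size limit parameter are hypothetical names chosen for illustration,
and the sketch merely searches for the CRLF CRLF terminator instead of
running a real HTTP parser.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>

// Hypothetical helper (not part of Squid): accumulates raw read results
// until a complete HTTP header block (ending with an empty line) is seen.
class HeaderAccumulator {
public:
    explicit HeaderAccumulator(const std::size_t maxSize): maxSize_(maxSize) {}

    // Appends one read result.
    // Returns the header block size (terminator included) once the block is
    // complete and fits the limit; returns zero if more data is needed.
    // Throws when the accumulated bytes exceed the limit without that.
    std::size_t add(const std::string &chunk) {
        buf_.append(chunk);
        const auto end = buf_.find("\r\n\r\n");
        if (end != std::string::npos && end + 4 <= maxSize_)
            return end + 4; // headers are complete; the rest is body
        if (buf_.size() > maxSize_)
            throw std::length_error("header block exceeds the size limit");
        return 0; // partial headers: keep accumulating
    }

    // All bytes accumulated so far (headers followed by any body bytes).
    const std::string &bytes() const { return buf_; }

private:
    std::string buf_;
    std::size_t maxSize_;
};
```

In this sketch, a caller would feed every read result to add() and treat
the bytes after the returned header size as body bytes only once add()
returns a non-zero value.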

With the storeRead() accumulation in place, it is no longer possible to
send parsed serialized HTTP headers to storeClientCopy() callers because
those callers do not provide enough buffer space to fit larger headers.
Increasing caller buffer capacity does not work well because the actual
size of the serialized header is unknown in advance and may be quite
large. Always allocating large buffers "just in case" is bad for
performance. Finally, larger buffers may jeopardize hard-to-find code
that uses hard-coded 4KB buffers without using HTTP_REQBUF_SZ macro.

Fortunately, storeClientCopy() callers either do not care about
serialized HTTP response headers or should not care about them! The API
forced callers to deal with serialized headers, but callers could (and
some did) just use the parsed headers available in the corresponding
MemObject. With this API change, storeClientCopy() callers no longer
receive serialized headers and do not need to parse or skip them.
Consequently, callers also do not need to account for response headers
size when computing offsets for subsequent storeClientCopy() requests.

Restricting storeClientCopy() API to HTTP _body_ bytes removed a lot of
problematic caller code. Caller changes are summarized further below.
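
The effect on caller offset arithmetic can be sketched as follows. The
names BodyAnswer, legacyNextOffset(), and nextBodyOffset() are
hypothetical and exist only for this illustration; they are not Squid
APIs. The sketch assumes that an answer reports the body offset and
length of the bytes it delivered.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical stand-in for the offset/length pair carried by an answer.
struct BodyAnswer {
    std::int64_t offset; // zero-based offset of the first delivered byte
    std::size_t length;  // number of delivered bytes
};

// Before the change (illustrative): requests used raw Store offsets, so a
// caller had to add the serialized header size to its body position.
inline std::int64_t legacyNextOffset(const std::int64_t headerSize,
                                     const std::int64_t bodyBytesSeen) {
    return headerSize + bodyBytesSeen;
}

// After the change (illustrative): requests use body offsets only, so the
// next offset is simply the end of the previously delivered body bytes.
inline std::int64_t nextBodyOffset(const BodyAnswer &last) {
    return last.offset + static_cast<std::int64_t>(last.length);
}
```

The second form needs no knowledge of the header size, which is the
bookkeeping that the paragraph above says callers can drop.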

A similar HTTP response header parsing problem existed in shared memory
cache code. That code was actually aware that headers may span multiple
cache slices but incorrectly assumed that httpMsgParseStep() accumulates
input as needed (to make another parsing "step"). It does not. Large
response headers cached in shared memory triggered a level-1 message:

    Corrupted mem-cached headers: e:...

Fixed MemStore code now accumulates serialized HTTP response headers as
needed to parse them, sharing high-level parsing code with store_client.

Old clientReplyContext methods worked hard to skip received serialized
HTTP headers. The code contained dangerous and often complex/unreadable
manipulation of various raw offsets and buffer pointers, aggravated by
the perceived need to save/restore those offsets across asynchronous
checks (see below). That header skipping code is gone now. Several stale
and misleading comments related to Store buffer management were also
removed or updated.

We replaced reqofs/reqsize with simpler/safer lastStreamBufferedBytes,
while becoming more consistent with that "cached" info invalidation. We
still need this info to resume HTTP body processing after asynchronous
http_reply_access checks and cache hit validations, but we no longer
save/restore this info for hit validation: No need to save/restore
information about the buffer that hit validation does not use and must
never touch!

The API change also moved from-Store StoreIOBuffer usage closer to
StoreIOBuffers manipulated by Client Streams code. Buffers in both
categories now contain just the body bytes, and both now treat zero
length as EOF only _after_ processing the response headers.

These changes improve overall code quality, but this code path and these
changes still suffer from utterly unsafe legacy interfaces like
StoreIOBuffer and clientStreamNode. We cannot rely on the compiler to
check our work. The risk of these changes exposing/causing bugs is high.

asHandleReply() expected WHOIS response body bytes where serialized HTTP
headers were! The code also had multiple problems typical for manually
written C parsers dealing with raw input buffers. That parser is now
replaced with Tokenizer-based code.
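
The general shape of such incremental, delimiter-based parsing can be
sketched without Squid classes. The function takeCompleteWords() below is
a hypothetical helper written for this illustration; it is not
Parser::Tokenizer and does not claim to match its exact semantics. The
delimiter set mirrors the WhoisSpaces characters used in the Asn.cc hunk
of this patch.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical sketch: extracts complete delimiter-terminated words from
// the accumulated buffer, removes the parsed bytes, and leaves a possibly
// incomplete trailing word in the buffer for the next read.
inline std::vector<std::string> takeCompleteWords(std::string &buffer) {
    static const char * const spaces = " \f\r\n\t\v";
    std::vector<std::string> words;
    std::size_t parsed = 0; // number of leading bytes that are safe to drop
    while (true) {
        const auto start = buffer.find_first_not_of(spaces, parsed);
        if (start == std::string::npos) {
            parsed = buffer.size(); // only delimiters remain
            break;
        }
        const auto end = buffer.find_first_of(spaces, start);
        if (end == std::string::npos) {
            parsed = start; // the word may continue in the next chunk
            break;
        }
        words.push_back(buffer.substr(start, end - start));
        parsed = end;
    }
    buffer.erase(0, parsed);
    return words;
}
```

A caller would append each newly received body chunk to the buffer, call
the helper, and decide at EOF what to do with any leftover bytes.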

To skip received HTTP response headers, peerDigestHandleReply() helper
functions called headersEnd() on the received buffer. Twice. We have now
merged those two parsing helper functions into one (that just checks the
already parsed headers). This merger preserved "304s must come with
fetch->pd->cd" logic that was hidden/spread across those two functions.

urnHandleReply() re-parsed received HTTP response headers. We left its
HTTP body parsing code unchanged except for polishing NUL-termination.

netdbExchangeHandleReply() re-parsed received HTTP response headers to
find where they end (via headersEnd()). We improved handling of corner
cases and replaced some "tricky bits" code, reusing the new
Store::ParsingBuffer class. The net_db record parsing code is unchanged.
[root@ol8-gcc squid]# cat 0002-Remove-serialized-HTTP-headers-from-storeClientCopy-.patch |head -n 140
From 8d50a09c3a0e9500becd21624ea62eb02660cc6d Mon Sep 17 00:00:00 2001
From: Alex Rousskov <rousskov@measurement-factory.com>
Date: Thu, 23 Nov 2023 18:26:33 +0000
Subject: [PATCH 2/6] Remove serialized HTTP headers from storeClientCopy()
(#1335)
Mgr::StoreToCommWriter::noteStoreCopied() is a very special case. It
actually worked OK because, unlike all other storeClientCopy() callers,
this code does not get serialized HTTP headers from Store: The code
adding bytes to the corresponding StoreEntry does not write serialized
HTTP headers at all. StoreToCommWriter is used to deliver kid-specific
pieces of an HTTP body of an SMP cache manager response. The HTTP
headers of that response are handled elsewhere. We left this code
unchanged, but the existence of the special no-headers case does
complicate storeClientCopy() API documentation, implementation, and
understanding.
Co-authored-by: Eduard Bagdasaryan <eduard.bagdasaryan@measurement-factory.com>
Modified-by: Alex Burmashev <alexander.burmashev@oracle.com>
Signed-off-by: Alex Burmashev <alexander.burmashev@oracle.com>
---
 src/HttpReply.cc            |  34 +++
 src/HttpReply.h             |   7 +
 src/MemObject.cc            |   6 +
 src/MemObject.h             |   9 +
 src/MemStore.cc             |  75 ++++---
 src/MemStore.h              |   2 +-
 src/StoreClient.h           |  65 +++++-
 src/StoreIOBuffer.h         |   3 +
 src/acl/Asn.cc              | 163 +++++---------
 src/clientStream.cc         |   3 +-
 src/client_side_reply.cc    | 322 +++++++++++----------------
 src/client_side_reply.h     |  38 +++-
 src/enums.h                 |   1 -
 src/icmp/net_db.cc          | 144 ++++--------
 src/peer_digest.cc          |  96 ++------
 src/store.cc                |  11 +
 src/store/Makefile.am       |   2 +
 src/store/Makefile.in       |   9 +-
 src/store/ParsingBuffer.cc  | 198 +++++++++++++++++
 src/store/ParsingBuffer.h   | 128 +++++++++++
 src/store/forward.h         |   1 +
 src/store_client.cc         | 429 ++++++++++++++++++++++++------------
 src/tests/stub_HttpReply.cc |   1 +
 src/urn.cc                  |  89 +++-----
 24 files changed, 1094 insertions(+), 742 deletions(-)
 create mode 100644 src/store/ParsingBuffer.cc
 create mode 100644 src/store/ParsingBuffer.h
diff --git a/src/HttpReply.cc b/src/HttpReply.cc
index 6feb262..af2bd4d 100644
--- a/src/HttpReply.cc
+++ b/src/HttpReply.cc
@@ -20,7 +20,9 @@
#include "HttpReply.h"
#include "HttpRequest.h"
#include "MemBuf.h"
+#include "sbuf/Stream.h"
#include "SquidConfig.h"
+#include "SquidMath.h"
#include "SquidTime.h"
#include "Store.h"
#include "StrList.h"
@@ -524,6 +526,38 @@ HttpReply::expectedBodyTooLarge(HttpRequest& request)
return expectedSize > bodySizeMax;
}
+size_t
+HttpReply::parseTerminatedPrefix(const char * const terminatedBuf, const size_t bufSize)
+{
+ auto error = Http::scNone;
+ const bool eof = false; // TODO: Remove after removing atEnd from HttpHeader::parse()
+ if (parse(terminatedBuf, bufSize, eof, &error)) {
+ debugs(58, 7, "success after accumulating " << bufSize << " bytes and parsing " << hdr_sz);
+ Assure(pstate == Http::Message::psParsed);
+ Assure(hdr_sz > 0);
+ Assure(!Less(bufSize, hdr_sz)); // cannot parse more bytes than we have
+ return hdr_sz; // success
+ }
+
+ Assure(pstate != Http::Message::psParsed);
+ hdr_sz = 0;
+
+ if (error) {
+ throw TextException(ToSBuf("failed to parse HTTP headers",
+ Debug::Extra, "parser error code: ", error,
+ Debug::Extra, "accumulated unparsed bytes: ", bufSize,
+ Debug::Extra, "reply_header_max_size: ", Config.maxReplyHeaderSize),
+ Here());
+ }
+
+ debugs(58, 3, "need more bytes after accumulating " << bufSize << " out of " << Config.maxReplyHeaderSize);
+
+ // the parse() call above enforces Config.maxReplyHeaderSize limit
+ // XXX: Make this a strict comparison after fixing Http::Message::parse() enforcement
+ Assure(bufSize <= Config.maxReplyHeaderSize);
+ return 0; // parsed nothing, need more data
+}
+
void
HttpReply::calcMaxBodySize(HttpRequest& request) const
{
diff --git a/src/HttpReply.h b/src/HttpReply.h
index 6c90e20..4301cfd 100644
--- a/src/HttpReply.h
+++ b/src/HttpReply.h
@@ -121,6 +121,13 @@ public:
/// \returns false if any information is missing
bool olderThan(const HttpReply *them) const;
+ /// Parses response status line and headers at the start of the given
+ /// NUL-terminated buffer of the given size. Respects reply_header_max_size.
+ /// Assures pstate becomes Http::Message::psParsed on (and only on) success.
+ /// \returns the number of bytes in a successfully parsed prefix (or zero)
+ /// \retval 0 implies that more data is needed to parse the response prefix
+ size_t parseTerminatedPrefix(const char *, size_t);
+
private:
/** initialize */
void init();
diff --git a/src/MemObject.cc b/src/MemObject.cc
index 4ba63cc..d7aaf5e 100644
--- a/src/MemObject.cc
+++ b/src/MemObject.cc
@@ -369,6 +369,12 @@ MemObject::policyLowestOffsetToKeep(bool swap) const
*/
int64_t lowest_offset = lowestMemReaderOffset();
+ // XXX: Remove the last (Config.onoff.memory_cache_first-based) condition
+ // and update keepForLocalMemoryCache() accordingly. The caller wants to
+ // remove all local memory that is safe to remove. Honoring caching
+ // preferences is its responsibility. Our responsibility is safety. The
+ // situation was different when ff4b33f added that condition -- there was no
+ // keepInLocalMemory/keepForLocalMemoryCache() call guard back then.
if (endOffset() < lowest_offset ||
endOffset() - inmem_lo > (int64_t)Config.Store.maxInMemObjSize ||
(swap && !Config.onoff.memory_cache_first))
diff --git a/src/MemObject.h b/src/MemObject.h
index 711966d..ba6646f 100644
--- a/src/MemObject.h
+++ b/src/MemObject.h
@@ -59,6 +59,15 @@ public:
HttpReply const *getReply() const;
void replaceHttpReply(HttpReply *newrep);
void stat (MemBuf * mb) const;
+
+ /// The offset of the last memory-stored HTTP response byte plus one.
+ /// * HTTP response headers (if any) are stored at offset zero.
+ /// * HTTP response body byte[n] usually has offset (hdr_sz + n), where
+ /// hdr_sz is the size of stored HTTP response headers (zero if none); and
+ /// n is the corresponding byte offset in the whole resource body.
+ /// However, some 206 (Partial Content) response bodies are stored (and
+ /// retrieved) as regular 200 response bodies, disregarding offsets of
+ /// their body parts. \sa HttpStateData::decideIfWeDoRanges().
int64_t endOffset () const;
void markEndOfReplyHeaders(); ///< sets _reply->hdr_sz to endOffset()
/// negative if unknown; otherwise, expected object_sz, expected endOffset
diff --git a/src/MemStore.cc b/src/MemStore.cc
index a4a6ab2..fe7af2f 100644
--- a/src/MemStore.cc
+++ b/src/MemStore.cc
@@ -17,6 +17,8 @@
#include "MemObject.h"
#include "MemStore.h"
#include "mime_header.h"
+#include "sbuf/SBuf.h"
+#include "sbuf/Stream.h"
#include "SquidConfig.h"
#include "SquidMath.h"
#include "StoreStats.h"
@@ -316,19 +318,25 @@ MemStore::get(const cache_key *key)
// create a brand new store entry and initialize it with stored info
StoreEntry *e = new StoreEntry();
- // XXX: We do not know the URLs yet, only the key, but we need to parse and
- // store the response for the Root().find() callers to be happy because they
- // expect IN_MEMORY entries to already have the response headers and body.
- e->createMemObject();
-
- anchorEntry(*e, index, *slot);
-
- const bool copied = copyFromShm(*e, index, *slot);
-
- if (copied)
- return e;
+ try {
+ // XXX: We do not know the URLs yet, only the key, but we need to parse and
+ // store the response for the Root().find() callers to be happy because they
+ // expect IN_MEMORY entries to already have the response headers and body.
+ e->createMemObject();
+
+ anchorEntry(*e, index, *slot);
+
+ // TODO: make copyFromShm() throw on all failures, simplifying this code
+ if (copyFromShm(*e, index, *slot))
+ return e;
+ debugs(20, 3, "failed for " << *e);
+ } catch (...) {
+ // see store_client::parseHttpHeadersFromDisk() for problems this may log
+ debugs(20, DBG_IMPORTANT, "ERROR: Cannot load a cache hit from shared memory" <<
+ Debug::Extra << "exception: " << CurrentException <<
+ Debug::Extra << "cache_mem entry: " << *e);
+ }
- debugs(20, 3, "failed for " << *e);
map->freeEntry(index); // do not let others into the same trap
destroyStoreEntry(static_cast<hash_link *>(e));
return NULL;
@@ -473,6 +481,8 @@ MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc
Ipc::StoreMapSliceId sid = anchor.start; // optimize: remember the last sid
bool wasEof = anchor.complete() && sid < 0;
int64_t sliceOffset = 0;
+
+ SBuf httpHeaderParsingBuffer;
while (sid >= 0) {
const Ipc::StoreMapSlice &slice = map->readableSlice(index, sid);
// slice state may change during copying; take snapshots now
@@ -495,10 +505,18 @@ MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc
const StoreIOBuffer sliceBuf(wasSize - prefixSize,
e.mem_obj->endOffset(),
page + prefixSize);
- if (!copyFromShmSlice(e, sliceBuf, wasEof))
- return false;
+
+ copyFromShmSlice(e, sliceBuf);
debugs(20, 8, "entry " << index << " copied slice " << sid <<
" from " << extra.page << '+' << prefixSize);
+
+ // parse headers if needed; they might span multiple slices!
+ auto &reply = e.mem().adjustableBaseReply();
+ if (reply.pstate != Http::Message::psParsed) {
+ httpHeaderParsingBuffer.append(sliceBuf.data, sliceBuf.length);
+ if (reply.parseTerminatedPrefix(httpHeaderParsingBuffer.c_str(), httpHeaderParsingBuffer.length()))
+ httpHeaderParsingBuffer = SBuf(); // we do not need these bytes anymore
+ }
}
// else skip a [possibly incomplete] slice that we copied earlier
@@ -524,6 +542,9 @@ MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc
debugs(20, 5, "mem-loaded all " << e.mem_obj->endOffset() << '/' <<
anchor.basics.swap_file_sz << " bytes of " << e);
+ if (e.mem().adjustableBaseReply().pstate != Http::Message::psParsed)
+ throw TextException(ToSBuf("truncated mem-cached headers; accumulated: ", httpHeaderParsingBuffer.length()), Here());
+
// from StoreEntry::complete()
e.mem_obj->object_sz = e.mem_obj->endOffset();
e.store_status = STORE_OK;
@@ -539,32 +560,11 @@ MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc
}
/// imports one shared memory slice into local memory
-bool
-MemStore::copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf, bool eof)
+void
+MemStore::copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf)
{
debugs(20, 7, "buf: " << buf.offset << " + " << buf.length);
- // from store_client::readBody()
- // parse headers if needed; they might span multiple slices!
- HttpReply *rep = (HttpReply *)e.getReply();
- if (rep->pstate < psParsed) {
- // XXX: have to copy because httpMsgParseStep() requires 0-termination
- MemBuf mb;
- mb.init(buf.length+1, buf.length+1);
- mb.append(buf.data, buf.length);
- mb.terminate();
- const int result = rep->httpMsgParseStep(mb.buf, buf.length, eof);
- if (result > 0) {
- assert(rep->pstate == psParsed);
- } else if (result < 0) {
- debugs(20, DBG_IMPORTANT, "Corrupted mem-cached headers: " << e);
- return false;
- } else { // more slices are needed
- assert(!eof);
- }
- }
- debugs(20, 7, "rep pstate: " << rep->pstate);
-
// local memory stores both headers and body so copy regardless of pstate
const int64_t offBefore = e.mem_obj->endOffset();
assert(e.mem_obj->data_hdr.write(buf)); // from MemObject::write()
@@ -572,7 +572,6 @@ MemStore::copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf, bool eof)
// expect to write the entire buf because StoreEntry::write() never fails
assert(offAfter >= 0 && offBefore <= offAfter &&
static_cast<size_t>(offAfter - offBefore) == buf.length);
- return true;
}
/// whether we should cache the entry
diff --git a/src/MemStore.h b/src/MemStore.h
index 516da3c..31a2015 100644
--- a/src/MemStore.h
+++ b/src/MemStore.h
@@ -76,7 +76,7 @@ protected:
void copyToShm(StoreEntry &e);
void copyToShmSlice(StoreEntry &e, Ipc::StoreMapAnchor &anchor, Ipc::StoreMap::Slice &slice);
bool copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor);
- bool copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf, bool eof);
+ void copyFromShmSlice(StoreEntry &, const StoreIOBuffer &);
void updateHeadersOrThrow(Ipc::StoreMapUpdate &update);
diff --git a/src/StoreClient.h b/src/StoreClient.h
index 457844a..1d90e5a 100644
--- a/src/StoreClient.h
+++ b/src/StoreClient.h
@@ -10,11 +10,24 @@
#define SQUID_STORECLIENT_H
#include "dlink.h"
+#include "store/ParsingBuffer.h"
#include "StoreIOBuffer.h"
#include "StoreIOState.h"
#include "base/AsyncCall.h"
-typedef void STCB(void *, StoreIOBuffer); /* store callback */
+/// A storeClientCopy() callback function.
+///
+/// Upon storeClientCopy() success, StoreIOBuffer::flags.error is zero, and
+/// * HTTP response headers (if any) are available via MemObject::freshestReply();
+/// * HTTP response body bytes (if any) are available via StoreIOBuffer.
+///
+/// STCB callbacks may use response semantics to detect certain EOF conditions.
+/// Callbacks that expect HTTP headers may call store_client::atEof(). Similar
+/// to clientStreamCallback() callbacks, callbacks dedicated to receiving HTTP
+/// bodies may use zero StoreIOBuffer::length as an EOF condition.
+///
+/// Errors are indicated by setting StoreIOBuffer flags.error.
+using STCB = void (void *, StoreIOBuffer);
class StoreEntry;
@@ -68,7 +81,13 @@ public:
void dumpStats(MemBuf * output, int clientNumber) const;
- int64_t cmp_offset;
+ // TODO: When STCB gets a dedicated Answer type, move this info there.
+ /// Whether the last successful storeClientCopy() answer was known to
+ /// contain the last body bytes of the HTTP response
+ /// \retval true requesting bytes at higher offsets is futile
+ /// \sa STCB
+ bool atEof() const { return atEof_; }
+
#if STORE_CLIENT_LIST_DEBUG
void *owner;
@@ -103,19 +122,28 @@ public:
dlink_node node;
private:
- bool moreToSend() const;
+ bool moreToRead() const;
+ bool canReadFromMemory() const;
+ bool answeredOnce() const { return answers >= 1; }
+ bool sendingHttpHeaders() const;
+ int64_t nextHttpReadOffset() const;
void fileRead();
void scheduleDiskRead();
- void scheduleMemRead();
+ void readFromMemory();
void scheduleRead();
bool startSwapin();
bool unpackHeader(char const *buf, ssize_t len);
+ void handleBodyFromDisk();
+ void maybeWriteFromDiskToMemory(const StoreIOBuffer &);
+
+ bool parseHttpHeadersFromDisk();
+ bool tryParsingHttpHeaders();
+ void skipHttpHeadersFromDisk();
void fail();
void callback(ssize_t);
void noteCopiedBytes(size_t);
- void noteEof();
void noteNews();
void finishCallback();
static void FinishCallback(store_client *);
@@ -123,13 +151,23 @@ private:
int type;
bool object_ok;
+ /// \copydoc atEof()
+ bool atEof_;
+
/// Storage and metadata associated with the current copy() request. Ought
/// to be ignored when not answering a copy() request.
StoreIOBuffer copyInto;
- /// The number of bytes loaded from Store into copyInto while answering the
- /// current copy() request. Ought to be ignored when not answering.
- size_t copiedSize;
+ /// the total number of finishCallback() calls
+ uint64_t answers;
+
+ /// Accumulates raw bytes read from Store while answering the current copy()
+ /// request. Buffer contents depend on the source and parsing stage; it may
+ /// hold (parts of) swap metadata, HTTP response headers, and/or HTTP
+ /// response body bytes.
+ std::optional<Store::ParsingBuffer> parsingBuffer;
+
+ StoreIOBuffer lastDiskRead; ///< buffer used for the last storeRead() call
/* Until we finish stuffing code into store_client */
@@ -152,7 +190,18 @@ public:
} _callback;
};
+/// Asynchronously read HTTP response headers and/or body bytes from Store.
+///
+/// The requested zero-based HTTP body offset is specified via the
+/// StoreIOBuffer::offset field. The first call (for a given store_client
+/// object) must specify zero offset.
+///
+/// The requested HTTP body portion size is specified via the
+/// StoreIOBuffer::length field. The function may return fewer body bytes.
+///
+/// See STCB for result delivery details.
void storeClientCopy(store_client *, StoreEntry *, StoreIOBuffer, STCB *, void *);
+
store_client* storeClientListAdd(StoreEntry * e, void *data);
int storeClientCopyPending(store_client *, StoreEntry * e, void *data);
int storeUnregister(store_client * sc, StoreEntry * e, void *data);
diff --git a/src/StoreIOBuffer.h b/src/StoreIOBuffer.h
index 009aafe..ad1c491 100644
--- a/src/StoreIOBuffer.h
+++ b/src/StoreIOBuffer.h
@@ -43,6 +43,9 @@ public:
return Range<int64_t>(offset, offset + length);
}
+ /// convenience method for changing the offset of a being-configured buffer
+ StoreIOBuffer &positionAt(const int64_t newOffset) { offset = newOffset; return *this; }
+
void dump() const {
if (fwrite(data, length, 1, stderr)) {}
if (fwrite("\n", 1, 1, stderr)) {}
diff --git a/src/acl/Asn.cc b/src/acl/Asn.cc
index 94ec862..ad450c0 100644
--- a/src/acl/Asn.cc
+++ b/src/acl/Asn.cc
@@ -16,20 +16,22 @@
#include "acl/DestinationIp.h"
#include "acl/SourceAsn.h"
#include "acl/Strategised.h"
+#include "base/CharacterSet.h"
#include "FwdState.h"
#include "HttpReply.h"
#include "HttpRequest.h"
#include "ipcache.h"
#include "MasterXaction.h"
#include "mgr/Registration.h"
+#include "parser/Tokenizer.h"
#include "radix.h"
#include "RequestFlags.h"
+#include "sbuf/SBuf.h"
#include "SquidConfig.h"
#include "Store.h"
#include "StoreClient.h"
#define WHOIS_PORT 43
-#define AS_REQBUF_SZ 4096
/* BEGIN of definitions for radix tree entries */
@@ -77,10 +79,9 @@ public:
store_client *sc;
HttpRequest::Pointer request;
int as_number;
- int64_t offset;
- int reqofs;
- char reqbuf[AS_REQBUF_SZ];
- bool dataRead;
+
+ /// for receiving a WHOIS reply body from Store and interpreting it
+ Store::ParsingBuffer parsingBuffer;
};
CBDATA_CLASS_INIT(ASState);
@@ -112,7 +113,7 @@ struct rtentry_t {
m_ADDR e_mask;
};
-static int asnAddNet(char *, int);
+static int asnAddNet(const SBuf &, int);
static void asnCacheStart(int as);
@@ -256,8 +257,7 @@ asnCacheStart(int as)
}
asState->entry = e;
- StoreIOBuffer readBuffer (AS_REQBUF_SZ, asState->offset, asState->reqbuf);
- storeClientCopy(asState->sc, e, readBuffer, asHandleReply, asState);
+ storeClientCopy(asState->sc, e, asState->parsingBuffer.makeInitialSpace(), asHandleReply, asState);
}
static void
@@ -265,13 +265,8 @@ asHandleReply(void *data, StoreIOBuffer result)
{
ASState *asState = (ASState *)data;
StoreEntry *e = asState->entry;
- char *s;
- char *t;
- char *buf = asState->reqbuf;
- int leftoversz = -1;
- debugs(53, 3, "asHandleReply: Called with size=" << (unsigned int)result.length);
- debugs(53, 3, "asHandleReply: buffer='" << buf << "'");
+ debugs(53, 3, result << " for " << asState->as_number << " with " << *e);
/* First figure out whether we should abort the request */
@@ -280,11 +275,7 @@ asHandleReply(void *data, StoreIOBuffer result)
return;
}
- if (result.length == 0 && asState->dataRead) {
- debugs(53, 3, "asHandleReply: Done: " << e->url());
- delete asState;
- return;
- } else if (result.flags.error) {
+ if (result.flags.error) {
debugs(53, DBG_IMPORTANT, "asHandleReply: Called with Error set and size=" << (unsigned int) result.length);
delete asState;
return;
@@ -294,78 +285,39 @@ asHandleReply(void *data, StoreIOBuffer result)
return;
}
- /*
- * Next, attempt to parse our request
- * Remembering that the actual buffer size is retsize + reqofs!
- */
- s = buf;
-
- while ((size_t)(s - buf) < result.length + asState->reqofs && *s != '\0') {
- while (*s && xisspace(*s))
- ++s;
-
- for (t = s; *t; ++t) {
- if (xisspace(*t))
- break;
- }
-
- if (*t == '\0') {
- /* oof, word should continue on next block */
- break;
- }
-
- *t = '\0';
- debugs(53, 3, "asHandleReply: AS# " << s << " (" << asState->as_number << ")");
- asnAddNet(s, asState->as_number);
- s = t + 1;
- asState->dataRead = true;
+ asState->parsingBuffer.appended(result.data, result.length);
+ Parser::Tokenizer tok(SBuf(asState->parsingBuffer.content().data, asState->parsingBuffer.contentSize()));
+ SBuf address;
+ // Word delimiters in WHOIS ASN replies. RFC 3912 mentions SP, CR, and LF.
+ // Others are added to mimic an earlier isspace()-based implementation.
+ static const auto WhoisSpaces = CharacterSet("ASCII_spaces", " \f\r\n\t\v");
+ while (tok.token(address, WhoisSpaces)) {
+ (void)asnAddNet(address, asState->as_number);
}
-
- /*
- * Next, grab the end of the 'valid data' in the buffer, and figure
- * out how much data is left in our buffer, which we need to keep
- * around for the next request
- */
- leftoversz = (asState->reqofs + result.length) - (s - buf);
-
- assert(leftoversz >= 0);
-
- /*
- * Next, copy the left over data, from s to s + leftoversz to the
- * beginning of the buffer
- */
- memmove(buf, s, leftoversz);
-
- /*
- * Next, update our offset and reqofs, and kick off a copy if required
- */
- asState->offset += result.length;
-
- asState->reqofs = leftoversz;
-
- debugs(53, 3, "asState->offset = " << asState->offset);
-
- if (e->store_status == STORE_PENDING) {
- debugs(53, 3, "asHandleReply: store_status == STORE_PENDING: " << e->url() );
- StoreIOBuffer tempBuffer (AS_REQBUF_SZ - asState->reqofs,
- asState->offset,
- asState->reqbuf + asState->reqofs);
- storeClientCopy(asState->sc,
- e,
- tempBuffer,
- asHandleReply,
- asState);
- } else {
- StoreIOBuffer tempBuffer;
- debugs(53, 3, "asHandleReply: store complete, but data received " << e->url() );
- tempBuffer.offset = asState->offset;
- tempBuffer.length = AS_REQBUF_SZ - asState->reqofs;
- tempBuffer.data = asState->reqbuf + asState->reqofs;
- storeClientCopy(asState->sc,
- e,
- tempBuffer,
- asHandleReply,
- asState);
+ asState->parsingBuffer.consume(tok.parsedSize());
+ const auto leftoverBytes = asState->parsingBuffer.contentSize();
+ if (asState->sc->atEof()) {
+ if (leftoverBytes)
+ debugs(53, 2, "WHOIS: Discarding the last " << leftoverBytes << " received bytes of a truncated AS response");
+ delete asState;
+ return;
+ }
+ const auto remainingSpace = asState->parsingBuffer.space().positionAt(result.offset + result.length);
+ if (!remainingSpace.length) {
+ Assure(leftoverBytes);
+ debugs(53, DBG_IMPORTANT, "WARNING: Ignoring the tail of a WHOIS AS response" <<
+ " with an unparsable section of " << leftoverBytes <<
+ " bytes ending at offset " << remainingSpace.offset);
+ delete asState;
+ return;
+ }
+ const decltype(StoreIOBuffer::offset) stillReasonableOffset = 100000; // an arbitrary limit in bytes
+ if (remainingSpace.offset > stillReasonableOffset) {
+ // stop suspicious accumulation of parsed addresses and/or work
+ debugs(53, DBG_IMPORTANT, "WARNING: Ignoring the tail of a suspiciously large WHOIS AS response" <<
+ " exceeding " << stillReasonableOffset << " bytes");
+ delete asState;
+ return;
}
}
@@ -373,38 +325,29 @@ asHandleReply(void *data, StoreIOBuffer result)
* add a network (addr, mask) to the radix tree, with matching AS number
*/
static int
-asnAddNet(char *as_string, int as_number)
+asnAddNet(const SBuf &addressAndMask, const int as_number)
{
struct squid_radix_node *rn;
CbDataList<int> **Tail = NULL;
CbDataList<int> *q = NULL;
as_info *asinfo = NULL;
- Ip::Address mask;
- Ip::Address addr;
- char *t;
- int bitl;
-
- t = strchr(as_string, '/');
-
- if (t == NULL) {
+ static const CharacterSet NonSlashSet = CharacterSet("slash", "/").complement("non-slash");
+ Parser::Tokenizer tok(addressAndMask);
+ SBuf addressToken;
+ if (!(tok.prefix(addressToken, NonSlashSet) && tok.skip('/'))) {
debugs(53, 3, "asnAddNet: failed, invalid response from whois server.");
return 0;
}
-
- *t = '\0';
- addr = as_string;
- bitl = atoi(t + 1);
-
- if (bitl < 0)
- bitl = 0;
-
+ const Ip::Address addr = addressToken.c_str();
// INET6 TODO : find a better way of identifying the base IPA family for mask than this.
- t = strchr(as_string, '.');
-
+ const auto addrFamily = (addressToken.find('.') != SBuf::npos) ? AF_INET : AF_INET6;
// generate Netbits Format Mask
+ Ip::Address mask;
mask.setNoAddr();
- mask.applyMask(bitl, (t!=NULL?AF_INET:AF_INET6) );
+ int64_t bitl = 0;
+ if (tok.int64(bitl, 10, false))
+ mask.applyMask(bitl, addrFamily);
debugs(53, 3, "asnAddNet: called for " << addr << "/" << mask );
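Note on the asn.cc hunks above: the new asHandleReply() appends each Store answer to a parsing buffer, extracts only words that are already terminated by a delimiter, and keeps an unterminated tail for the next call. The following is a minimal standalone sketch of that accumulate/tokenize/consume cycle. It uses std::string in place of Store::ParsingBuffer and Parser::Tokenizer, and the WordAccumulator name and its members are hypothetical, not Squid API.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for the accumulate/tokenize/consume cycle; not Squid code.
struct WordAccumulator {
    std::string pending;            // unparsed bytes carried over between chunks
    std::vector<std::string> words; // complete, delimiter-terminated words seen so far

    // Appends a chunk and extracts every word that is followed by a delimiter.
    // A trailing word without a delimiter stays in `pending` because it may
    // continue in the next chunk.
    void feed(const std::string &chunk) {
        // Same delimiter set as WhoisSpaces in the patch.
        static const std::string spaces = " \f\r\n\t\v";
        pending += chunk;
        std::size_t consumed = 0;
        for (;;) {
            const auto start = pending.find_first_not_of(spaces, consumed);
            if (start == std::string::npos) {
                consumed = pending.size(); // only delimiters left; drop them
                break;
            }
            const auto end = pending.find_first_of(spaces, start);
            if (end == std::string::npos) {
                consumed = start; // incomplete word; wait for more input
                break;
            }
            words.push_back(pending.substr(start, end - start));
            consumed = end;
        }
        pending.erase(0, consumed);
    }
};
```

Feeding "10.0.0.0/8\n192.168." and then "0.0/16\n" yields two words; the partial "192.168." survives between the calls instead of being parsed as a truncated prefix. The real code additionally gives up when the buffer fills without a delimiter or when the response grows past an arbitrary offset limit.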
diff --git a/src/clientStream.cc b/src/clientStream.cc
index 04d89c0..bd5dd09 100644
--- a/src/clientStream.cc
+++ b/src/clientStream.cc
@@ -154,8 +154,7 @@ clientStreamCallback(clientStreamNode * thisObject, ClientHttpRequest * http,
assert(thisObject && http && thisObject->node.next);
next = thisObject->next();
- debugs(87, 3, "clientStreamCallback: Calling " << next->callback << " with cbdata " <<
- next->data.getRaw() << " from node " << thisObject);
+ debugs(87, 3, thisObject << " gives " << next->data << ' ' << replyBuffer);
next->callback(next, http, rep, replyBuffer);
}
diff --git a/src/client_side_reply.cc b/src/client_side_reply.cc
index c919af4..861f4b4 100644
--- a/src/client_side_reply.cc
+++ b/src/client_side_reply.cc
@@ -33,6 +33,7 @@
#include "refresh.h"
#include "RequestFlags.h"
#include "SquidConfig.h"
+#include "SquidMath.h"
#include "SquidTime.h"
#include "Store.h"
#include "StrList.h"
@@ -76,11 +77,7 @@ clientReplyContext::clientReplyContext(ClientHttpRequest *clientContext) :
purgeStatus(Http::scNone),
lookingforstore(0),
http(cbdataReference(clientContext)),
- headers_sz(0),
sc(NULL),
- old_reqsize(0),
- reqsize(0),
- reqofs(0),
#if USE_CACHE_DIGESTS
lookup_type(NULL),
#endif
@@ -166,8 +163,6 @@ void clientReplyContext::setReplyToStoreEntry(StoreEntry *entry, const char *rea
#if USE_DELAY_POOLS
sc->setDelayId(DelayId::DelayClient(http));
#endif
- reqofs = 0;
- reqsize = 0;
if (http->request)
http->request->ignoreRange(reason);
flags.storelogiccomplete = 1;
@@ -206,13 +201,9 @@ clientReplyContext::saveState()
old_sc = sc;
old_lastmod = http->request->lastmod;
old_etag = http->request->etag;
- old_reqsize = reqsize;
- tempBuffer.offset = reqofs;
/* Prevent accessing the now saved entries */
http->storeEntry(NULL);
sc = NULL;
- reqsize = 0;
- reqofs = 0;
}
void
@@ -223,8 +214,6 @@ clientReplyContext::restoreState()
removeClientStoreReference(&sc, http);
http->storeEntry(old_entry);
sc = old_sc;
- reqsize = old_reqsize;
- reqofs = tempBuffer.offset;
http->request->lastmod = old_lastmod;
http->request->etag = old_etag;
/* Prevent accessed the old saved entries */
@@ -232,7 +221,6 @@ clientReplyContext::restoreState()
old_sc = NULL;
old_lastmod = -1;
old_etag.clean();
- old_reqsize = 0;
tempBuffer.offset = 0;
}
@@ -250,18 +238,27 @@ clientReplyContext::getNextNode() const
return (clientStreamNode *)ourNode->node.next->data;
}
-/* This function is wrong - the client parameters don't include the
- * header offset
- */
+/// Request HTTP response headers from Store, to be sent to the given recipient.
+/// That recipient also gets zero, some, or all HTTP response body bytes (into
+/// next()->readBuffer).
void
-clientReplyContext::triggerInitialStoreRead()
+clientReplyContext::triggerInitialStoreRead(STCB recipient)
{
- /* when confident, 0 becomes reqofs, and then this factors into
- * startSendProcess
- */
- assert(reqofs == 0);
+ Assure(recipient != HandleIMSReply);
+ lastStreamBufferedBytes = StoreIOBuffer(); // invalidated by the storeClientCopy(next()->readBuffer) call below
StoreIOBuffer localTempBuffer (next()->readBuffer.length, 0, next()->readBuffer.data);
- storeClientCopy(sc, http->storeEntry(), localTempBuffer, SendMoreData, this);
+ ::storeClientCopy(sc, http->storeEntry(), localTempBuffer, recipient, this);
+}
+
+/// Request HTTP response body bytes from Store into next()->readBuffer. This
+/// method requests body bytes at readBuffer.offset and, hence, it should only
+/// be called after we triggerInitialStoreRead() and get the requested HTTP
+/// response headers (using zero offset).
+void
+clientReplyContext::requestMoreBodyFromStore()
+{
+ lastStreamBufferedBytes = StoreIOBuffer(); // invalidated by the storeClientCopy(next()->readBuffer) call below
+ ::storeClientCopy(sc, http->storeEntry(), next()->readBuffer, SendMoreData, this);
}
/* there is an expired entry in the store.
@@ -358,30 +355,22 @@ clientReplyContext::processExpired()
{
/* start counting the length from 0 */
StoreIOBuffer localTempBuffer(HTTP_REQBUF_SZ, 0, tempbuf);
- storeClientCopy(sc, entry, localTempBuffer, HandleIMSReply, this);
+ // keep lastStreamBufferedBytes: tempbuf is not a Client Stream buffer
+ ::storeClientCopy(sc, entry, localTempBuffer, HandleIMSReply, this);
}
}
void
-clientReplyContext::sendClientUpstreamResponse()
+clientReplyContext::sendClientUpstreamResponse(const StoreIOBuffer &upstreamResponse)
{
- StoreIOBuffer tempresult;
removeStoreReference(&old_sc, &old_entry);
if (collapsedRevalidation)
http->storeEntry()->clearPublicKeyScope();
/* here the data to send is the data we just received */
- tempBuffer.offset = 0;
- old_reqsize = 0;
- /* sendMoreData tracks the offset as well.
- * Force it back to zero */
- reqofs = 0;
assert(!EBIT_TEST(http->storeEntry()->flags, ENTRY_ABORTED));
- /* TODO: provide sendMoreData with the ready parsed reply */
- tempresult.length = reqsize;
- tempresult.data = tempbuf;
- sendMoreData(tempresult);
+ sendMoreData(upstreamResponse);
}
void
@@ -398,11 +387,9 @@ clientReplyContext::sendClientOldEntry()
restoreState();
/* here the data to send is in the next nodes buffers already */
assert(!EBIT_TEST(http->storeEntry()->flags, ENTRY_ABORTED));
- /* sendMoreData tracks the offset as well.
- * Force it back to zero */
- reqofs = 0;
- StoreIOBuffer tempresult (reqsize, reqofs, next()->readBuffer.data);
- sendMoreData(tempresult);
+ Assure(matchesStreamBodyBuffer(lastStreamBufferedBytes));
+ Assure(!lastStreamBufferedBytes.offset);
+ sendMoreData(lastStreamBufferedBytes);
}
/* This is the workhorse of the HandleIMSReply callback.
@@ -411,16 +398,16 @@ clientReplyContext::sendClientOldEntry()
* IMS request to revalidate a stale entry.
*/
void
-clientReplyContext::handleIMSReply(StoreIOBuffer result)
+clientReplyContext::handleIMSReply(const StoreIOBuffer result)
{
if (deleting)
return;
- debugs(88, 3, http->storeEntry()->url() << ", " << (long unsigned) result.length << " bytes");
-
if (http->storeEntry() == NULL)
return;
+ debugs(88, 3, http->storeEntry()->url() << " got " << result);
+
if (result.flags.error && !EBIT_TEST(http->storeEntry()->flags, ENTRY_ABORTED))
return;
@@ -433,9 +420,6 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result)
return;
}
- /* update size of the request */
- reqsize = result.length + reqofs;
-
const Http::StatusCode status = http->storeEntry()->getReply()->sline.status();
// request to origin was aborted
@@ -460,7 +444,7 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result)
if (http->request->flags.ims && !old_entry->modifiedSince(http->request->ims, http->request->imslen)) {
// forward the 304 from origin
debugs(88, 3, "origin replied 304, revalidating existing entry and forwarding 304 to client");
- sendClientUpstreamResponse();
+ sendClientUpstreamResponse(result);
} else {
// send existing entry, it's still valid
debugs(88, 3, "origin replied 304, revalidating existing entry and sending " <<
@@ -484,7 +468,7 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result)
http->logType = LOG_TCP_REFRESH_MODIFIED;
debugs(88, 3, "origin replied " << status <<
", replacing existing entry and forwarding to client");
- sendClientUpstreamResponse();
+ sendClientUpstreamResponse(result);
}
}
@@ -493,7 +477,7 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result)
http->logType = LOG_TCP_REFRESH_FAIL_ERR;
debugs(88, 3, "origin replied with error " << status <<
", forwarding to client due to fail_on_validation_err");
- sendClientUpstreamResponse();
+ sendClientUpstreamResponse(result);
} else {
// ignore and let client have old entry
http->logType = LOG_TCP_REFRESH_FAIL_OLD;
@@ -506,13 +490,7 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result)
SQUIDCEXTERN CSR clientGetMoreData;
SQUIDCEXTERN CSD clientReplyDetach;
-/**
- * clientReplyContext::cacheHit Should only be called until the HTTP reply headers
- * have been parsed. Normally this should be a single call, but
- * it might take more than one. As soon as we have the headers,
- * we hand off to clientSendMoreData, processExpired, or
- * processMiss.
- */
+/// \copydoc clientReplyContext::cacheHit()
void
clientReplyContext::CacheHit(void *data, StoreIOBuffer result)
{
@@ -520,11 +498,11 @@ clientReplyContext::CacheHit(void *data, StoreIOBuffer result)
context->cacheHit(result);
}
-/**
- * Process a possible cache HIT.
- */
+/// Processes HTTP response headers received from Store on a suspected cache hit
+/// path. May be called several times (e.g., a Vary marker object hit followed
+/// by the corresponding variant hit).
void
-clientReplyContext::cacheHit(StoreIOBuffer result)
+clientReplyContext::cacheHit(const StoreIOBuffer result)
{
/** Ignore if the HIT object is being deleted. */
if (deleting) {
@@ -536,7 +514,7 @@ clientReplyContext::cacheHit(StoreIOBuffer result)
HttpRequest *r = http->request;
- debugs(88, 3, "clientCacheHit: " << http->uri << ", " << result.length << " bytes");
+ debugs(88, 3, http->uri << " got " << result);
if (http->storeEntry() == NULL) {
debugs(88, 3, "clientCacheHit: request aborted");
@@ -560,20 +538,7 @@ clientReplyContext::cacheHit(StoreIOBuffer result)
return;
}
- if (result.length == 0) {
- debugs(88, 5, "store IO buffer has no content. MISS");
- /* the store couldn't get enough data from the file for us to id the
- * object
- */
- /* treat as a miss */
- http->logType = LOG_TCP_MISS;
- processMiss();
- return;
- }
-
assert(!EBIT_TEST(e->flags, ENTRY_ABORTED));
- /* update size of the request */
- reqsize = result.length + reqofs;
/*
* Got the headers, now grok them
@@ -587,6 +552,8 @@ clientReplyContext::cacheHit(StoreIOBuffer result)
return;
}
+ noteStreamBufferredBytes(result);
+
switch (varyEvaluateMatch(e, r)) {
case VARY_NONE:
@@ -687,7 +654,7 @@ clientReplyContext::cacheHit(StoreIOBuffer result)
return;
} else if (r->conditional()) {
debugs(88, 5, "conditional HIT");
- if (processConditional(result))
+ if (processConditional())
return;
}
@@ -806,7 +773,7 @@ clientReplyContext::processOnlyIfCachedMiss()
/// process conditional request from client
bool
-clientReplyContext::processConditional(StoreIOBuffer &result)
+clientReplyContext::processConditional()
{
StoreEntry *const e = http->storeEntry();
@@ -984,16 +951,7 @@ clientReplyContext::purgeFoundObject(StoreEntry *entry)
http->logType = LOG_TCP_HIT;
- reqofs = 0;
-
- localTempBuffer.offset = http->out.offset;
-
- localTempBuffer.length = next()->readBuffer.length;
-
- localTempBuffer.data = next()->readBuffer.data;
-
- storeClientCopy(sc, http->storeEntry(),
- localTempBuffer, CacheHit, this);
+ triggerInitialStoreRead(CacheHit);
}
void
@@ -1111,16 +1069,10 @@ clientReplyContext::purgeDoPurgeHead(StoreEntry *newEntry)
}
void
-clientReplyContext::traceReply(clientStreamNode * node)
+clientReplyContext::traceReply()
{
- clientStreamNode *nextNode = (clientStreamNode *)node->node.next->data;
- StoreIOBuffer localTempBuffer;
createStoreEntry(http->request->method, RequestFlags());
- localTempBuffer.offset = nextNode->readBuffer.offset + headers_sz;
- localTempBuffer.length = nextNode->readBuffer.length;
- localTempBuffer.data = nextNode->readBuffer.data;
- storeClientCopy(sc, http->storeEntry(),
- localTempBuffer, SendMoreData, this);
+ triggerInitialStoreRead();
http->storeEntry()->releaseRequest();
http->storeEntry()->buffer();
HttpReply *rep = new HttpReply;
@@ -1169,16 +1121,15 @@ int
clientReplyContext::storeOKTransferDone() const
{
assert(http->storeEntry()->objectLen() >= 0);
+ const auto headers_sz = http->storeEntry()->mem().baseReply().hdr_sz;
assert(http->storeEntry()->objectLen() >= headers_sz);
- if (http->out.offset >= http->storeEntry()->objectLen() - headers_sz) {
- debugs(88,3,HERE << "storeOKTransferDone " <<
- " out.offset=" << http->out.offset <<
- " objectLen()=" << http->storeEntry()->objectLen() <<
- " headers_sz=" << headers_sz);
- return 1;
- }
-
- return 0;
+ const auto done = http->out.offset >= http->storeEntry()->objectLen() - headers_sz;
+ const auto debugLevel = done ? 3 : 5;
+ debugs(88, debugLevel, done <<
+ " out.offset=" << http->out.offset <<
+ " objectLen()=" << http->storeEntry()->objectLen() <<
+ " headers_sz=" << headers_sz);
+ return done ? 1 : 0;
}
int
@@ -1190,11 +1141,8 @@ clientReplyContext::storeNotOKTransferDone() const
MemObject *mem = http->storeEntry()->mem_obj;
assert(mem != NULL);
assert(http->request != NULL);
- /* mem->reply was wrong because it uses the UPSTREAM header length!!! */
- HttpReply const *curReply = mem->getReply();
- if (headers_sz == 0)
- /* haven't found end of headers yet */
+ if (mem->baseReply().pstate != Http::Message::psParsed)
return 0;
/*
@@ -1202,19 +1150,12 @@ clientReplyContext::storeNotOKTransferDone() const
* If we are sending a body and we don't have a content-length,
* then we must wait for the object to become STORE_OK.
*/
- if (curReply->content_length < 0)
- return 0;
-
- uint64_t expectedLength = curReply->content_length + http->out.headers_sz;
-
- if (http->out.size < expectedLength)
- return 0;
- else {
- debugs(88,3,HERE << "storeNotOKTransferDone " <<
- " out.size=" << http->out.size <<
- " expectedLength=" << expectedLength);
- return 1;
- }
+ const auto done = http->out.offset >= expectedBodySize;
+ const auto debugLevel = done ? 3 : 5;
+ debugs(88, debugLevel, done <<
+ " out.offset=" << http->out.offset <<
+ " expectedBodySize=" << expectedBodySize);
+ return done ? 1 : 0;
}
/* A write has completed, what is the next status based on the
@@ -1778,20 +1719,12 @@ clientGetMoreData(clientStreamNode * aNode, ClientHttpRequest * http)
assert (context);
assert(context->http == http);
- clientStreamNode *next = ( clientStreamNode *)aNode->node.next->data;
-
if (!context->ourNode)
context->ourNode = aNode;
/* no cbdatareference, this is only used once, and safely */
if (context->flags.storelogiccomplete) {
- StoreIOBuffer tempBuffer;
- tempBuffer.offset = next->readBuffer.offset + context->headers_sz;
- tempBuffer.length = next->readBuffer.length;
- tempBuffer.data = next->readBuffer.data;
-
- storeClientCopy(context->sc, http->storeEntry(),
- tempBuffer, clientReplyContext::SendMoreData, context);
+ context->requestMoreBodyFromStore();
return;
}
@@ -1804,7 +1737,7 @@ clientGetMoreData(clientStreamNode * aNode, ClientHttpRequest * http)
if (context->http->request->method == Http::METHOD_TRACE) {
if (context->http->request->header.getInt64(Http::HdrType::MAX_FORWARDS) == 0) {
- context->traceReply(aNode);
+ context->traceReply();
return;
}
@@ -1834,7 +1767,6 @@ clientReplyContext::doGetMoreData()
#endif
assert(http->logType.oldType == LOG_TCP_HIT);
- reqofs = 0;
/* guarantee nothing has been sent yet! */
assert(http->out.size == 0);
assert(http->out.offset == 0);
@@ -1849,10 +1781,7 @@ clientReplyContext::doGetMoreData()
}
}
- localTempBuffer.offset = reqofs;
- localTempBuffer.length = getNextNode()->readBuffer.length;
- localTempBuffer.data = getNextNode()->readBuffer.data;
- storeClientCopy(sc, http->storeEntry(), localTempBuffer, CacheHit, this);
+ triggerInitialStoreRead(CacheHit);
} else {
/* MISS CASE, http->logType is already set! */
processMiss();
@@ -1878,6 +1807,32 @@ clientReplyContext::SendMoreData(void *data, StoreIOBuffer result)
context->sendMoreData (result);
}
+/// Whether the given body area describes the start of our Client Stream buffer.
+/// An empty area does.
+bool
+clientReplyContext::matchesStreamBodyBuffer(const StoreIOBuffer &their) const
+{
+ // the answer is undefined for errors; they are not really "body buffers"
+ Assure(!their.flags.error);
+
+ if (!their.length)
+ return true; // an empty body area always matches our body area
+
+ if (their.data != next()->readBuffer.data) {
+ debugs(88, 7, "no: " << their << " vs. " << next()->readBuffer);
+ return false;
+ }
+
+ return true;
+}
+
+void
+clientReplyContext::noteStreamBufferredBytes(const StoreIOBuffer &result)
+{
+ Assure(matchesStreamBodyBuffer(result));
+ lastStreamBufferedBytes = result; // may be unchanged and/or zero-length
+}
+
void
clientReplyContext::makeThisHead()
{
@@ -1887,12 +1842,11 @@ clientReplyContext::makeThisHead()
}
bool
-clientReplyContext::errorInStream(StoreIOBuffer const &result, size_t const &sizeToProcess)const
+clientReplyContext::errorInStream(const StoreIOBuffer &result) const
{
return /* aborted request */
(http->storeEntry() && EBIT_TEST(http->storeEntry()->flags, ENTRY_ABORTED)) ||
- /* Upstream read error */ (result.flags.error) ||
- /* Upstream EOF */ (sizeToProcess == 0);
+ /* Upstream read error */ (result.flags.error);
}
void
@@ -1913,24 +1867,16 @@ clientReplyContext::sendStreamError(StoreIOBuffer const &result)
}
void
-clientReplyContext::pushStreamData(StoreIOBuffer const &result, char *source)
+clientReplyContext::pushStreamData(const StoreIOBuffer &result)
{
- StoreIOBuffer localTempBuffer;
-
if (result.length == 0) {
debugs(88, 5, "clientReplyContext::pushStreamData: marking request as complete due to 0 length store result");
flags.complete = 1;
}
- assert(result.offset - headers_sz == next()->readBuffer.offset);
- localTempBuffer.offset = result.offset - headers_sz;
- localTempBuffer.length = result.length;
-
- if (localTempBuffer.length)
- localTempBuffer.data = source;
-
+ assert(!result.length || result.offset == next()->readBuffer.offset);
clientStreamCallback((clientStreamNode*)http->client_stream.head->data, http, NULL,
- localTempBuffer);
+ result);
}
clientStreamNode *
@@ -2022,7 +1968,6 @@ clientReplyContext::processReplyAccess ()
if (http->logType.oldType == LOG_TCP_DENIED ||
http->logType.oldType == LOG_TCP_DENIED_REPLY ||
alwaysAllowResponse(reply->sline.status())) {
- headers_sz = reply->hdr_sz;
processReplyAccessResult(ACCESS_ALLOWED);
return;
}
@@ -2033,8 +1978,6 @@ clientReplyContext::processReplyAccess ()
return;
}
- headers_sz = reply->hdr_sz;
-
/** check for absent access controls (permit by default) */
if (!Config.accessList.reply) {
processReplyAccessResult(ACCESS_ALLOWED);
@@ -2091,11 +2034,9 @@ clientReplyContext::processReplyAccessResult(const allow_t &accessAllowed)
/* Ok, the reply is allowed, */
http->loggingEntry(http->storeEntry());
- ssize_t body_size = reqofs - reply->hdr_sz;
- if (body_size < 0) {
- reqofs = reply->hdr_sz;
- body_size = 0;
- }
+ Assure(matchesStreamBodyBuffer(lastStreamBufferedBytes));
+ Assure(!lastStreamBufferedBytes.offset);
+ auto body_size = lastStreamBufferedBytes.length; // may be zero
debugs(88, 3, "clientReplyContext::sendMoreData: Appending " <<
(int) body_size << " bytes after " << reply->hdr_sz <<
@@ -2123,19 +2064,27 @@ clientReplyContext::processReplyAccessResult(const allow_t &accessAllowed)
assert (!flags.headersSent);
flags.headersSent = true;
+ // next()->readBuffer.offset may be positive for Range requests, but our
+ // localTempBuffer initialization code assumes that next()->readBuffer.data
+ // points to the response body at offset 0 because the first
+ // storeClientCopy() request always has offset 0 (i.e. our first Store
+ // request ignores next()->readBuffer.offset).
+ //
+ // XXX: We cannot fully check that assumption: readBuffer.offset field is
+ // often out of sync with the buffer content, and if some buggy code updates
+ // the buffer while we were waiting for the processReplyAccessResult()
+ // callback, we may not notice.
+
StoreIOBuffer localTempBuffer;
- char *buf = next()->readBuffer.data;
- char *body_buf = buf + reply->hdr_sz;
+ const auto body_buf = next()->readBuffer.data;
//Server side may disable ranges under some circumstances.
if ((!http->request->range))
next()->readBuffer.offset = 0;
- body_buf -= next()->readBuffer.offset;
-
- if (next()->readBuffer.offset != 0) {
- if (next()->readBuffer.offset > body_size) {
+ if (next()->readBuffer.offset > 0) {
+ if (Less(body_size, next()->readBuffer.offset)) {
/* Can't use any of the body we received. send nothing */
localTempBuffer.length = 0;
localTempBuffer.data = NULL;
@@ -2148,7 +2097,6 @@ clientReplyContext::processReplyAccessResult(const allow_t &accessAllowed)
localTempBuffer.data = body_buf;
}
- /* TODO??: move the data in the buffer back by the request header size */
clientStreamCallback((clientStreamNode *)http->client_stream.head->data,
http, reply, localTempBuffer);
@@ -2161,6 +2109,8 @@ clientReplyContext::sendMoreData (StoreIOBuffer result)
if (deleting)
return;
+ debugs(88, 5, http->uri << " got " << result);
+
StoreEntry *entry = http->storeEntry();
if (ConnStateData * conn = http->getConn()) {
@@ -2173,7 +2123,9 @@ clientReplyContext::sendMoreData (StoreIOBuffer result)
return;
}
- if (reqofs==0 && !http->logType.isTcpHit()) {
+ if (!flags.headersSent && !http->logType.isTcpHit()) {
+ // We get here twice if processReplyAccessResult() calls startError().
+ // TODO: Revise when we check/change QoS markings to reduce syscalls.
if (Ip::Qos::TheConfig.isHitTosActive()) {
Ip::Qos::doTosLocalMiss(conn->clientConnection, http->request->hier.code);
}
@@ -2187,21 +2139,9 @@ clientReplyContext::sendMoreData (StoreIOBuffer result)
" out.offset=" << http->out.offset);
}
- char *buf = next()->readBuffer.data;
-
- if (buf != result.data) {
- /* we've got to copy some data */
- assert(result.length <= next()->readBuffer.length);
- memcpy(buf, result.data, result.length);
- }
-
/* We've got the final data to start pushing... */
flags.storelogiccomplete = 1;
- reqofs += result.length;
-
- assert(reqofs <= HTTP_REQBUF_SZ || flags.headersSent);
-
assert(http->request != NULL);
/* ESI TODO: remove this assert once everything is stable */
@@ -2210,20 +2150,25 @@ clientReplyContext::sendMoreData (StoreIOBuffer result)
makeThisHead();
- debugs(88, 5, "clientReplyContext::sendMoreData: " << http->uri << ", " <<
- reqofs << " bytes (" << result.length <<
- " new bytes)");
-
- /* update size of the request */
- reqsize = reqofs;
-
- if (errorInStream(result, reqofs)) {
+ if (errorInStream(result)) {
sendStreamError(result);
return;
}
+ if (!matchesStreamBodyBuffer(result)) {
+ // Subsequent processing expects response body bytes to be at the start
+ // of our Client Stream buffer. When given something else (e.g., bytes
+ // in our tempbuf), we copy and adjust to meet those expectations.
+ const auto &ourClientStreamsBuffer = next()->readBuffer;
+ assert(result.length <= ourClientStreamsBuffer.length);
+ memcpy(ourClientStreamsBuffer.data, result.data, result.length);
+ result.data = ourClientStreamsBuffer.data;
+ }
+
+ noteStreamBufferredBytes(result);
+
if (flags.headersSent) {
- pushStreamData (result, buf);
+ pushStreamData(result);
return;
}
@@ -2289,13 +2234,6 @@ clientReplyContext::createStoreEntry(const HttpRequestMethod& m, RequestFlags re
sc->setDelayId(DelayId::DelayClient(http));
#endif
- reqofs = 0;
-
- reqsize = 0;
-
- /* I don't think this is actually needed! -- adrian */
- /* http->reqbuf = http->norm_reqbuf; */
- // assert(http->reqbuf == http->norm_reqbuf);
/* The next line is illegal because we don't know if the client stream
* buffers have been set up
*/
diff --git a/src/client_side_reply.h b/src/client_side_reply.h
index dddab1a..bc702e3 100644
--- a/src/client_side_reply.h
+++ b/src/client_side_reply.h
@@ -39,7 +39,6 @@ public:
void purgeFoundGet(StoreEntry *newEntry);
void purgeFoundHead(StoreEntry *newEntry);
void purgeFoundObject(StoreEntry *entry);
- void sendClientUpstreamResponse();
void purgeDoPurgeGet(StoreEntry *entry);
void purgeDoPurgeHead(StoreEntry *entry);
void doGetMoreData();
@@ -67,7 +66,7 @@ public:
void processExpired();
clientStream_status_t replyStatus();
void processMiss();
- void traceReply(clientStreamNode * node);
+ void traceReply();
const char *storeId() const { return (http->store_id.size() > 0 ? http->store_id.termedBuf() : http->uri); }
Http::StatusCode purgeStatus;
@@ -77,13 +76,14 @@ public:
virtual void created (StoreEntry *newEntry);
ClientHttpRequest *http;
- int headers_sz;
store_client *sc; /* The store_client we're using */
StoreIOBuffer tempBuffer; /* For use in validating requests via IMS */
- int old_reqsize; /* ... again, for the buffer */
- size_t reqsize;
- size_t reqofs;
- char tempbuf[HTTP_REQBUF_SZ]; ///< a temporary buffer if we need working storage
+
+ /// Buffer dedicated to receiving storeClientCopy() responses to generated
+ /// revalidation requests. These requests cannot use next()->readBuffer
+ /// because the latter keeps the contents of the stale HTTP response during
+ /// revalidation. sendClientOldEntry() uses those contents.
+ char tempbuf[HTTP_REQBUF_SZ];
#if USE_CACHE_DIGESTS
const char *lookup_type; /* temporary hack: storeGet() result: HIT/MISS/NONE */
@@ -101,9 +101,10 @@ public:
private:
clientStreamNode *getNextNode() const;
void makeThisHead();
- bool errorInStream(StoreIOBuffer const &result, size_t const &sizeToProcess)const ;
+ bool errorInStream(const StoreIOBuffer &result) const;
+ bool matchesStreamBodyBuffer(const StoreIOBuffer &) const;
void sendStreamError(StoreIOBuffer const &result);
- void pushStreamData(StoreIOBuffer const &result, char *source);
+ void pushStreamData(const StoreIOBuffer &);
clientStreamNode * next() const;
StoreIOBuffer holdingBuffer;
HttpReply *reply;
@@ -115,11 +116,13 @@ private:
bool alwaysAllowResponse(Http::StatusCode sline) const;
int checkTransferDone();
void processOnlyIfCachedMiss();
- bool processConditional(StoreIOBuffer &result);
+ bool processConditional();
+ void noteStreamBufferredBytes(const StoreIOBuffer &);
void cacheHit(StoreIOBuffer result);
void handleIMSReply(StoreIOBuffer result);
void sendMoreData(StoreIOBuffer result);
- void triggerInitialStoreRead();
+ void triggerInitialStoreRead(STCB = SendMoreData);
+ void requestMoreBodyFromStore();
void sendClientOldEntry();
void purgeAllCached();
void forgetHit();
@@ -129,6 +132,13 @@ private:
void sendPreconditionFailedError();
void sendNotModified();
void sendNotModifiedOrPreconditionFailedError();
+ void sendClientUpstreamResponse(const StoreIOBuffer &upstreamResponse);
+
+ /// Reduces the chance of an accidental direct storeClientCopy() call that
+ /// (should but) forgets to invalidate our lastStreamBufferedBytes. This
+ /// function is not defined; decltype() syntax prohibits "= delete", but
+ /// function usage will trigger deprecation warnings and linking errors.
+ static decltype(::storeClientCopy) storeClientCopy [[deprecated]];
StoreEntry *old_entry;
/* ... for entry to be validated */
@@ -145,6 +155,12 @@ private:
} CollapsedRevalidation;
CollapsedRevalidation collapsedRevalidation;
+
+ /// HTTP response body bytes stored in our Client Stream buffer (if any)
+ StoreIOBuffer lastStreamBufferedBytes;
+
+ // TODO: Remove after moving the meat of this function into a method.
+ friend CSR clientGetMoreData;
};
#endif /* SQUID_CLIENTSIDEREPLY_H */
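Note on the client_side_reply.h hunks above: the private, never-defined storeClientCopy member shadows the global function so that an unqualified call from inside the class is flagged, while explicit ::storeClientCopy() calls still work. A minimal standalone sketch of the same guard follows. copyFromStore(), Reply, and cachedBytes are hypothetical names chosen for illustration; only the declaration pattern is taken from the patch.

```cpp
#include <cassert>

// Hypothetical free function standing in for a global API such as ::storeClientCopy().
int copyFromStore(int bytes) { return bytes; }

class Reply {
public:
    // The approved path: reset state that the copy invalidates, then call
    // the global function with an explicit :: qualifier.
    int requestMore(int bytes) {
        cachedBytes = 0; // stale once a new copy is requested
        cachedBytes = ::copyFromStore(bytes);
        return cachedBytes;
    }

    int cachedBytes = 0;

private:
    // Declared but never defined. An unqualified copyFromStore() call inside
    // this class would resolve to this member, so the compiler would warn
    // about a deprecated function and the linker would report an undefined
    // reference.
    static decltype(::copyFromStore) copyFromStore [[deprecated]];
};
```

Because the member is declared through decltype(), it cannot be marked "= delete", which is why the patch relies on the deprecation warning plus the link error instead.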
diff --git a/src/enums.h b/src/enums.h
index 4a860d8..262d62c 100644
--- a/src/enums.h
+++ b/src/enums.h
@@ -203,7 +203,6 @@ enum {
typedef enum {
DIGEST_READ_NONE,
DIGEST_READ_REPLY,
- DIGEST_READ_HEADERS,
DIGEST_READ_CBLOCK,
DIGEST_READ_MASK,
DIGEST_READ_DONE
diff --git a/src/icmp/net_db.cc b/src/icmp/net_db.cc
index 7dc42a2..ce8067a 100644
--- a/src/icmp/net_db.cc
+++ b/src/icmp/net_db.cc
@@ -33,6 +33,7 @@
#include "mgr/Registration.h"
#include "mime_header.h"
#include "neighbors.h"
+#include "sbuf/SBuf.h"
#include "SquidConfig.h"
#include "SquidTime.h"
#include "Store.h"
@@ -49,8 +50,6 @@
#include "ipcache.h"
#include "StoreClient.h"
-#define NETDB_REQBUF_SZ 4096
-
typedef enum {
STATE_NONE,
STATE_HEADER,
@@ -72,7 +71,6 @@ public:
buf_ofs(0),
connstate(STATE_HEADER)
{
- *buf = 0;
assert(NULL != r);
HTTPMSGLOCK(r);
@@ -92,10 +90,10 @@ public:
StoreEntry *e;
store_client *sc;
HttpRequest *r;
- int64_t used;
- size_t buf_sz;
- char buf[NETDB_REQBUF_SZ];
- int buf_ofs;
+
+ /// for receiving a NetDB reply body from Store and interpreting it
+ Store::ParsingBuffer parsingBuffer;
+
netdb_conn_state_t connstate;
};
@@ -698,24 +696,20 @@ netdbExchangeHandleReply(void *data, StoreIOBuffer receivedData)
Ip::Address addr;
netdbExchangeState *ex = (netdbExchangeState *)data;
- int rec_sz = 0;
- int o;
struct in_addr line_addr;
double rtt;
double hops;
- char *p;
int j;
HttpReply const *rep;
- size_t hdr_sz;
int nused = 0;
- int size;
- int oldbufofs = ex->buf_ofs;
- rec_sz = 0;
+ size_t rec_sz = 0; // received record size (TODO: make const)
rec_sz += 1 + sizeof(struct in_addr);
rec_sz += 1 + sizeof(int);
rec_sz += 1 + sizeof(int);
+ // to make progress without growing buffer space, we must parse at least one record per call
+ Assure(rec_sz <= ex->parsingBuffer.capacity());
debugs(38, 3, "netdbExchangeHandleReply: " << receivedData.length << " read bytes");
if (!cbdataReferenceValid(ex->p)) {
@@ -726,64 +720,28 @@ netdbExchangeHandleReply(void *data, StoreIOBuffer receivedData)
debugs(38, 3, "netdbExchangeHandleReply: for '" << ex->p->host << ":" << ex->p->http_port << "'");
- if (receivedData.length == 0 && !receivedData.flags.error) {
- debugs(38, 3, "netdbExchangeHandleReply: Done");
+ if (receivedData.flags.error) {
delete ex;
return;
}
- p = ex->buf;
-
- /* Get the size of the buffer now */
- size = ex->buf_ofs + receivedData.length;
- debugs(38, 3, "netdbExchangeHandleReply: " << size << " bytes buf");
-
- /* Check if we're still doing headers */
-
if (ex->connstate == STATE_HEADER) {
-
- ex->buf_ofs += receivedData.length;
-
- /* skip reply headers */
-
- if ((hdr_sz = headersEnd(p, ex->buf_ofs))) {
- debugs(38, 5, "netdbExchangeHandleReply: hdr_sz = " << hdr_sz);
- rep = ex->e->getReply();
- assert(rep->sline.status() != Http::scNone);
- debugs(38, 3, "netdbExchangeHandleReply: reply status " << rep->sline.status());
-
- if (rep->sline.status() != Http::scOkay) {
- delete ex;
- return;
- }
-
- assert((size_t)ex->buf_ofs >= hdr_sz);
-
- /*
- * Now, point p to the part of the buffer where the data
- * starts, and update the size accordingly
- */
- assert(ex->used == 0);
- ex->used = hdr_sz;
- size = ex->buf_ofs - hdr_sz;
- p += hdr_sz;
-
- /* Finally, set the conn state mode to STATE_BODY */
- ex->connstate = STATE_BODY;
- } else {
- StoreIOBuffer tempBuffer;
- tempBuffer.offset = ex->buf_ofs;
- tempBuffer.length = ex->buf_sz - ex->buf_ofs;
- tempBuffer.data = ex->buf + ex->buf_ofs;
- /* Have more headers .. */
- storeClientCopy(ex->sc, ex->e, tempBuffer,
- netdbExchangeHandleReply, ex);
+ const auto scode = ex->e->mem().baseReply().sline.status();
+ assert(scode != Http::scNone);
+ debugs(38, 3, "reply status " << scode);
+ if (scode != Http::scOkay) {
+ delete ex;
return;
}
+ ex->connstate = STATE_BODY;
}
assert(ex->connstate == STATE_BODY);
+ ex->parsingBuffer.appended(receivedData.data, receivedData.length);
+ auto p = ex->parsingBuffer.c_str(); // current parsing position
+ auto size = ex->parsingBuffer.contentSize(); // bytes we still need to parse
+
/* If we get here, we have some body to parse .. */
debugs(38, 5, "netdbExchangeHandleReply: start parsing loop, size = " << size);
@@ -792,6 +750,7 @@ netdbExchangeHandleReply(void *data, StoreIOBuffer receivedData)
addr.setAnyAddr();
hops = rtt = 0.0;
+ size_t o; // current record parsing offset
for (o = 0; o < rec_sz;) {
switch ((int) *(p + o)) {
@@ -829,8 +788,6 @@ netdbExchangeHandleReply(void *data, StoreIOBuffer receivedData)
assert(o == rec_sz);
- ex->used += rec_sz;
-
size -= rec_sz;
p += rec_sz;
@@ -838,32 +795,8 @@ netdbExchangeHandleReply(void *data, StoreIOBuffer receivedData)
++nused;
}
- /*
- * Copy anything that is left over to the beginning of the buffer,
- * and adjust buf_ofs accordingly
- */
-
- /*
- * Evilly, size refers to the buf size left now,
- * ex->buf_ofs is the original buffer size, so just copy that
- * much data over
- */
- memmove(ex->buf, ex->buf + (ex->buf_ofs - size), size);
-
- ex->buf_ofs = size;
-
- /*
- * And don't re-copy the remaining data ..
- */
- ex->used += size;
-
- /*
- * Now the tricky bit - size _included_ the leftover bit from the _last_
- * storeClientCopy. We don't want to include that, or our offset will be wrong.
- * So, don't count the size of the leftover buffer we began with.
- * This can _disappear_ when we're not tracking offsets ..
- */
- ex->used -= oldbufofs;
+ const auto parsedSize = ex->parsingBuffer.contentSize() - size;
+ ex->parsingBuffer.consume(parsedSize);
debugs(38, 3, "netdbExchangeHandleReply: size left over in this buffer: " << size << " bytes");
@@ -871,20 +804,26 @@ netdbExchangeHandleReply(void *data, StoreIOBuffer receivedData)
" entries, (x " << rec_sz << " bytes) == " << nused * rec_sz <<
" bytes total");
- debugs(38, 3, "netdbExchangeHandleReply: used " << ex->used);
-
if (EBIT_TEST(ex->e->flags, ENTRY_ABORTED)) {
debugs(38, 3, "netdbExchangeHandleReply: ENTRY_ABORTED");
delete ex;
- } else if (ex->e->store_status == STORE_PENDING) {
- StoreIOBuffer tempBuffer;
- tempBuffer.offset = ex->used;
- tempBuffer.length = ex->buf_sz - ex->buf_ofs;
- tempBuffer.data = ex->buf + ex->buf_ofs;
- debugs(38, 3, "netdbExchangeHandleReply: EOF not received");
- storeClientCopy(ex->sc, ex->e, tempBuffer,
- netdbExchangeHandleReply, ex);
+ return;
}
+
+ if (ex->sc->atEof()) {
+ if (const auto leftoverBytes = ex->parsingBuffer.contentSize())
+ debugs(38, 2, "discarding a partially received record due to Store EOF: " << leftoverBytes);
+ delete ex;
+ return;
+ }
+
+ // TODO: To protect us from a broken peer sending an "infinite" stream of
+ // new addresses, limit the cumulative number of received bytes or records?
+
+ const auto remainingSpace = ex->parsingBuffer.space().positionAt(receivedData.offset + receivedData.length);
+ // rec_sz is at most buffer capacity, and we consume all fully loaded records
+ Assure(remainingSpace.length);
+ storeClientCopy(ex->sc, ex->e, remainingSpace, netdbExchangeHandleReply, ex);
}
#endif /* USE_ICMP */
@@ -1296,14 +1235,9 @@ netdbExchangeStart(void *data)
ex->e = storeCreateEntry(uri, uri, RequestFlags(), Http::METHOD_GET);
assert(NULL != ex->e);
- StoreIOBuffer tempBuffer;
- tempBuffer.length = ex->buf_sz;
- tempBuffer.data = ex->buf;
-
ex->sc = storeClientListAdd(ex->e, ex);
+ storeClientCopy(ex->sc, ex->e, ex->parsingBuffer.makeInitialSpace(), netdbExchangeHandleReply, ex);
- storeClientCopy(ex->sc, ex->e, tempBuffer,
- netdbExchangeHandleReply, ex);
ex->r->flags.loopDetected = true; /* cheat! -- force direct */
// XXX: send as Proxy-Authenticate instead
diff --git a/src/peer_digest.cc b/src/peer_digest.cc
index 7b6314d..abfea4a 100644
--- a/src/peer_digest.cc
+++ b/src/peer_digest.cc
@@ -39,7 +39,6 @@ static EVH peerDigestCheck;
static void peerDigestRequest(PeerDigest * pd);
static STCB peerDigestHandleReply;
static int peerDigestFetchReply(void *, char *, ssize_t);
-int peerDigestSwapInHeaders(void *, char *, ssize_t);
int peerDigestSwapInCBlock(void *, char *, ssize_t);
int peerDigestSwapInMask(void *, char *, ssize_t);
static int peerDigestFetchedEnough(DigestFetchState * fetch, char *buf, ssize_t size, const char *step_name);
@@ -374,6 +373,9 @@ peerDigestRequest(PeerDigest * pd)
fetch->sc = storeClientListAdd(e, fetch);
/* set lastmod to trigger IMS request if possible */
+ // TODO: Also check for fetch->pd->cd presence as a precondition for sending
+ // IMS requests because peerDigestFetchReply() does not accept 304 responses
+ // without an in-memory cache digest.
if (old_e)
e->lastModified(old_e->lastModified());
@@ -408,6 +410,11 @@ peerDigestHandleReply(void *data, StoreIOBuffer receivedData)
digest_read_state_t prevstate;
int newsize;
+ if (receivedData.flags.error) {
+ peerDigestFetchAbort(fetch, fetch->buf, "failure loading digest reply from Store");
+ return;
+ }
+
assert(fetch->pd && receivedData.data);
/* The existing code assumes that the received pointer is
* where we asked the data to be put
@@ -444,10 +451,6 @@ peerDigestHandleReply(void *data, StoreIOBuffer receivedData)
retsize = peerDigestFetchReply(fetch, fetch->buf, fetch->bufofs);
break;
- case DIGEST_READ_HEADERS:
- retsize = peerDigestSwapInHeaders(fetch, fetch->buf, fetch->bufofs);
- break;
-
case DIGEST_READ_CBLOCK:
retsize = peerDigestSwapInCBlock(fetch, fetch->buf, fetch->bufofs);
break;
@@ -487,7 +490,7 @@ peerDigestHandleReply(void *data, StoreIOBuffer receivedData)
// checking at the beginning of this function. However, in this case, we would have to require
// that the parser does not regard EOF as a special condition (it is true now but may change
// in the future).
- if (!receivedData.length) { // EOF
+ if (fetch->sc->atEof()) {
peerDigestFetchAbort(fetch, fetch->buf, "premature end of digest reply");
return;
}
@@ -506,19 +509,12 @@ peerDigestHandleReply(void *data, StoreIOBuffer receivedData)
}
}
-/* wait for full http headers to be received then parse them */
-/*
- * This routine handles parsing the reply line.
- * If the reply line indicates an OK, the same data is thrown
- * to SwapInHeaders(). If the reply line is a NOT_MODIFIED,
- * we simply stop parsing.
- */
+/// handle HTTP response headers in the initial storeClientCopy() response
static int
peerDigestFetchReply(void *data, char *buf, ssize_t size)
{
DigestFetchState *fetch = (DigestFetchState *)data;
PeerDigest *pd = fetch->pd;
- size_t hdr_size;
assert(pd && buf);
assert(!fetch->offset);
@@ -527,7 +523,7 @@ peerDigestFetchReply(void *data, char *buf, ssize_t size)
if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestFetchReply"))
return -1;
- if ((hdr_size = headersEnd(buf, size))) {
+ {
HttpReply const *reply = fetch->entry->getReply();
assert(reply);
assert(reply->sline.status() != Http::scNone);
@@ -563,6 +559,15 @@ peerDigestFetchReply(void *data, char *buf, ssize_t size)
/* preserve request -- we need its size to update counters */
/* requestUnlink(r); */
/* fetch->entry->mem_obj->request = NULL; */
+
+ if (!fetch->pd->cd) {
+ peerDigestFetchAbort(fetch, buf, "304 without the old in-memory digest");
+ return -1;
+ }
+
+ // stay with the old in-memory digest
+ peerDigestFetchStop(fetch, buf, "Not modified");
+ fetch->state = DIGEST_READ_DONE;
} else if (status == Http::scOkay) {
/* get rid of old entry if any */
@@ -573,70 +578,15 @@ peerDigestFetchReply(void *data, char *buf, ssize_t size)
fetch->old_entry->unlock("peerDigestFetchReply 200");
fetch->old_entry = NULL;
}
+
+ fetch->state = DIGEST_READ_CBLOCK;
} else {
/* some kind of a bug */
peerDigestFetchAbort(fetch, buf, reply->sline.reason());
return -1; /* XXX -1 will abort stuff in ReadReply! */
}
- /* must have a ready-to-use store entry if we got here */
- /* can we stay with the old in-memory digest? */
- if (status == Http::scNotModified && fetch->pd->cd) {
- peerDigestFetchStop(fetch, buf, "Not modified");
- fetch->state = DIGEST_READ_DONE;
- } else {
- fetch->state = DIGEST_READ_HEADERS;
- }
- } else {
- /* need more data, do we have space? */
-
- if (size >= SM_PAGE_SIZE)
- peerDigestFetchAbort(fetch, buf, "reply header too big");
- }
-
- /* We don't want to actually ack that we've handled anything,
- * otherwise SwapInHeaders() won't get the reply line .. */
- return 0;
-}
-
-/* fetch headers from disk, pass on to SwapInCBlock */
-int
-peerDigestSwapInHeaders(void *data, char *buf, ssize_t size)
-{
- DigestFetchState *fetch = (DigestFetchState *)data;
- size_t hdr_size;
-
- assert(fetch->state == DIGEST_READ_HEADERS);
-
- if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInHeaders"))
- return -1;
-
- assert(!fetch->offset);
-
- if ((hdr_size = headersEnd(buf, size))) {
- assert(fetch->entry->getReply());
- assert(fetch->entry->getReply()->sline.status() != Http::scNone);
-
- if (fetch->entry->getReply()->sline.status() != Http::scOkay) {
- debugs(72, DBG_IMPORTANT, "peerDigestSwapInHeaders: " << fetch->pd->host <<
- " status " << fetch->entry->getReply()->sline.status() <<
- " got cached!");
-
- peerDigestFetchAbort(fetch, buf, "internal status error");
- return -1;
- }
-
- fetch->state = DIGEST_READ_CBLOCK;
- return hdr_size; /* Say how much data we read */
- }
-
- /* need more data, do we have space? */
- if (size >= SM_PAGE_SIZE) {
- peerDigestFetchAbort(fetch, buf, "stored header too big");
- return -1;
- }
-
- return 0; /* We need to read more to parse .. */
+ return 0; // we consumed/used no buffered bytes
}
int
diff --git a/src/store.cc b/src/store.cc
index 1948447..b4c7f82 100644
--- a/src/store.cc
+++ b/src/store.cc
@@ -273,6 +273,8 @@ StoreEntry::storeClientType() const
assert(mem_obj);
+ debugs(20, 7, *this << " inmem_lo=" << mem_obj->inmem_lo);
+
if (mem_obj->inmem_lo)
return STORE_DISK_CLIENT;
@@ -300,6 +302,7 @@ StoreEntry::storeClientType() const
return STORE_MEM_CLIENT;
}
}
+ debugs(20, 7, "STORE_OK STORE_DISK_CLIENT");
return STORE_DISK_CLIENT;
}
@@ -319,10 +322,18 @@ StoreEntry::storeClientType() const
if (swap_status == SWAPOUT_NONE)
return STORE_MEM_CLIENT;
+ // TODO: The above "must make this a mem client" logic contradicts "Slight
+ // weirdness" logic in store_client::doCopy() that converts hits to misses
+ // on startSwapin() failures. We should probably attempt to open a swapin
+ // file _here_ instead (and avoid STORE_DISK_CLIENT designation for clients
+ // that fail to do so). That would also address a similar problem with Rock
+ // store that does not yet support swapin during SWAPOUT_WRITING.
+
/*
* otherwise, make subsequent clients read from disk so they
* can not delay the first, and vice-versa.
*/
+ debugs(20, 7, "STORE_PENDING STORE_DISK_CLIENT");
return STORE_DISK_CLIENT;
}
diff --git a/src/store/Makefile.am b/src/store/Makefile.am
index be177d8..ccfc2dd 100644
--- a/src/store/Makefile.am
+++ b/src/store/Makefile.am
@@ -23,4 +23,6 @@ libstore_la_SOURCES= \
forward.h \
LocalSearch.cc \
LocalSearch.h \
+ ParsingBuffer.cc \
+ ParsingBuffer.h \
Storage.h
diff --git a/src/store/Makefile.in b/src/store/Makefile.in
index bb4387d..1ea6c45 100644
--- a/src/store/Makefile.in
+++ b/src/store/Makefile.in
@@ -163,7 +163,7 @@ CONFIG_CLEAN_FILES =
CONFIG_CLEAN_VPATH_FILES =
LTLIBRARIES = $(noinst_LTLIBRARIES)
libstore_la_LIBADD =
-am_libstore_la_OBJECTS = Controller.lo Disk.lo Disks.lo LocalSearch.lo
+am_libstore_la_OBJECTS = Controller.lo Disk.lo Disks.lo LocalSearch.lo ParsingBuffer.lo
libstore_la_OBJECTS = $(am_libstore_la_OBJECTS)
AM_V_lt = $(am__v_lt_@AM_V@)
am__v_lt_ = $(am__v_lt_@AM_DEFAULT_V@)
@@ -184,7 +184,7 @@ am__v_at_1 =
DEFAULT_INCLUDES =
depcomp = $(SHELL) $(top_srcdir)/cfgaux/depcomp
am__maybe_remake_depfiles = depfiles
-am__depfiles_remade = ./$(DEPDIR)/Controller.Plo ./$(DEPDIR)/Disk.Plo \
+am__depfiles_remade = ./$(DEPDIR)/Controller.Plo ./$(DEPDIR)/Disk.Plo ./$(DEPDIR)/ParsingBuffer.Plo \
./$(DEPDIR)/Disks.Plo ./$(DEPDIR)/LocalSearch.Plo
am__mv = mv -f
CXXCOMPILE = $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) \
@@ -776,6 +776,8 @@ libstore_la_SOURCES = \
forward.h \
LocalSearch.cc \
LocalSearch.h \
+ ParsingBuffer.cc \
+ ParsingBuffer.h \
Storage.h
all: all-recursive
@@ -846,6 +848,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Disk.Plo@am__quote@ # am--include-marker
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Disks.Plo@am__quote@ # am--include-marker
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/LocalSearch.Plo@am__quote@ # am--include-marker
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ParsingBuffer.Plo@am__quote@ # am--include-marker
$(am__depfiles_remade):
@$(MKDIR_P) $(@D)
@@ -1254,6 +1257,7 @@ distclean: distclean-recursive
-rm -f ./$(DEPDIR)/Disk.Plo
-rm -f ./$(DEPDIR)/Disks.Plo
-rm -f ./$(DEPDIR)/LocalSearch.Plo
+ -rm -f ./$(DEPDIR)/ParsingBuffer.Plo
-rm -f Makefile
distclean-am: clean-am distclean-compile distclean-generic \
distclean-tags
@@ -1303,6 +1307,7 @@ maintainer-clean: maintainer-clean-recursive
-rm -f ./$(DEPDIR)/Disk.Plo
-rm -f ./$(DEPDIR)/Disks.Plo
-rm -f ./$(DEPDIR)/LocalSearch.Plo
+ -rm -f ./$(DEPDIR)/ParsingBuffer.Plo
-rm -f Makefile
maintainer-clean-am: distclean-am maintainer-clean-generic
diff --git a/src/store/ParsingBuffer.cc b/src/store/ParsingBuffer.cc
new file mode 100644
index 0000000..e948fe2
--- /dev/null
+++ b/src/store/ParsingBuffer.cc
@@ -0,0 +1,198 @@
+/*
+ * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+#include "sbuf/Stream.h"
+#include "SquidMath.h"
+#include "store/ParsingBuffer.h"
+#include "base/Assure.h"
+#include <iostream>
+
+// Several Store::ParsingBuffer() methods use assert() because the corresponding
+// failure means there is a good chance that somebody has already read from (or
+// written to) the wrong memory location. Since this buffer is used for storing
+// HTTP response bytes, such failures may corrupt traffic. No Assure() handling
+// code can safely recover from such failures.
+
+Store::ParsingBuffer::ParsingBuffer(StoreIOBuffer &initialSpace):
+ readerSuppliedMemory_(initialSpace)
+{
+}
+
+/// a read-only content start (or nil for some zero-size buffers)
+const char *
+Store::ParsingBuffer::memory() const
+{
+ return extraMemory_ ? extraMemory_->rawContent() : readerSuppliedMemory_.data;
+}
+
+size_t
+Store::ParsingBuffer::capacity() const
+{
+ return extraMemory_ ? (extraMemory_->length() + extraMemory_->spaceSize()) : readerSuppliedMemory_.length;
+}
+
+size_t
+Store::ParsingBuffer::contentSize() const
+{
+ return extraMemory_ ? extraMemory_->length() : readerSuppliedMemoryContentSize_;
+}
+
+void
+Store::ParsingBuffer::appended(const char * const newBytes, const size_t newByteCount)
+{
+ // a positive newByteCount guarantees that, after the first assertion below
+ // succeeds, the second assertion will not increment a nil memory() pointer
+ if (!newByteCount)
+ return;
+
+ // this order of checks guarantees that memory() is not nil in the second assertion
+ assert(newByteCount <= spaceSize()); // the new bytes end in our space
+ assert(memory() + contentSize() == newBytes); // the new bytes start in our space
+ // and now we know that newBytes is not nil either
+
+ if (extraMemory_)
+ extraMemory_->rawAppendFinish(newBytes, newByteCount);
+ else
+ readerSuppliedMemoryContentSize_ = *IncreaseSum(readerSuppliedMemoryContentSize_, newByteCount);
+
+ assert(contentSize() <= capacity()); // paranoid
+}
+
+void
+Store::ParsingBuffer::consume(const size_t parsedBytes)
+{
+ Assure(contentSize() >= parsedBytes); // more conservative than extraMemory_->consume()
+ if (extraMemory_) {
+ extraMemory_->consume(parsedBytes);
+ } else {
+ readerSuppliedMemoryContentSize_ -= parsedBytes;
+ if (parsedBytes && readerSuppliedMemoryContentSize_)
+ memmove(readerSuppliedMemory_.data, memory() + parsedBytes, readerSuppliedMemoryContentSize_);
+ }
+}
+
+StoreIOBuffer
+Store::ParsingBuffer::space()
+{
+ const auto size = spaceSize();
+ const auto start = extraMemory_ ?
+ extraMemory_->rawAppendStart(size) :
+ (readerSuppliedMemory_.data + readerSuppliedMemoryContentSize_);
+ return StoreIOBuffer(spaceSize(), 0, start);
+}
+
+StoreIOBuffer
+Store::ParsingBuffer::makeSpace(const size_t pageSize)
+{
+ growSpace(pageSize);
+ auto result = space();
+ Assure(result.length >= pageSize);
+ result.length = pageSize;
+ return result;
+}
+
+StoreIOBuffer
+Store::ParsingBuffer::content() const
+{
+ // This const_cast is a StoreIOBuffer API limitation: That class does not
+ // support a "constant content view", even though it is used as such a view.
+ return StoreIOBuffer(contentSize(), 0, const_cast<char*>(memory()));
+}
+
+/// makes sure we have the requested number of bytes, allocates enough memory if needed
+void
+Store::ParsingBuffer::growSpace(const size_t minimumSpaceSize)
+{
+ const auto capacityIncreaseAttempt = IncreaseSum(contentSize(), minimumSpaceSize);
+ if (!capacityIncreaseAttempt)
+ throw TextException(ToSBuf("no support for a single memory block of ", contentSize(), '+', minimumSpaceSize, " bytes"), Here());
+ const auto newCapacity = *capacityIncreaseAttempt;
+
+ if (newCapacity <= capacity())
+ return; // already have enough space; no reallocation is needed
+
+ debugs(90, 7, "growing to provide " << minimumSpaceSize << " in " << *this);
+
+ if (extraMemory_) {
+ extraMemory_->reserveCapacity(newCapacity);
+ } else {
+ SBuf newStorage;
+ newStorage.reserveCapacity(newCapacity);
+ newStorage.append(readerSuppliedMemory_.data, readerSuppliedMemoryContentSize_);
+ extraMemory_ = std::move(newStorage);
+ }
+ Assure(spaceSize() >= minimumSpaceSize);
+}
+
+SBuf
+Store::ParsingBuffer::toSBuf() const
+{
+ return extraMemory_ ? *extraMemory_ : SBuf(content().data, content().length);
+}
+
+size_t
+Store::ParsingBuffer::spaceSize() const
+{
+ if (extraMemory_)
+ return extraMemory_->spaceSize();
+
+ assert(readerSuppliedMemoryContentSize_ <= readerSuppliedMemory_.length);
+ return readerSuppliedMemory_.length - readerSuppliedMemoryContentSize_;
+}
+
+/// 0-terminates stored byte sequence, allocating more memory if needed, but
+/// without increasing the number of stored content bytes
+void
+Store::ParsingBuffer::terminate()
+{
+ *makeSpace(1).data = 0;
+}
+
+StoreIOBuffer
+Store::ParsingBuffer::packBack()
+{
+ const auto bytesToPack = contentSize();
+ // required until our callers no longer have to work around legacy code expectations
+ Assure(bytesToPack);
+
+ // if we accumulated more bytes at some point, any extra metadata should
+ // have been consume()d by now, allowing readerSuppliedMemory_.data reuse
+ Assure(bytesToPack <= readerSuppliedMemory_.length);
+
+ auto result = readerSuppliedMemory_;
+ result.length = bytesToPack;
+ Assure(result.data);
+
+ if (!extraMemory_) {
+ // no need to copy accumulated bytes because they are already in readerSuppliedMemory_
+ debugs(90, 7, "quickly exporting " << result.length << " bytes via " << readerSuppliedMemory_);
+ } else {
+ debugs(90, 7, "slowly exporting " << result.length << " bytes from " << extraMemory_->id << " back into " << readerSuppliedMemory_);
+ memmove(result.data, extraMemory_->rawContent(), result.length);
+ }
+
+ return result;
+}
+
+void
+Store::ParsingBuffer::print(std::ostream &os) const
+{
+ os << "size=" << contentSize();
+
+ if (extraMemory_) {
+ os << " capacity=" << capacity();
+ os << " extra=" << extraMemory_->id;
+ }
+
+ // report readerSuppliedMemory_ (if any) even if we are no longer using it
+ // for content storage; it affects packBack() and related parsing logic
+ if (readerSuppliedMemory_.length)
+ os << ' ' << readerSuppliedMemory_;
+}
+
diff --git a/src/store/ParsingBuffer.h b/src/store/ParsingBuffer.h
new file mode 100644
index 0000000..b8aa957
--- /dev/null
+++ b/src/store/ParsingBuffer.h
@@ -0,0 +1,128 @@
+/*
+ * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef SQUID_SRC_STORE_PARSINGBUFFER_H
+#define SQUID_SRC_STORE_PARSINGBUFFER_H
+
+#include "sbuf/SBuf.h"
+#include "StoreIOBuffer.h"
+
+#include <optional>
+
+namespace Store
+{
+
+/// A continuous buffer for efficient accumulation and NUL-termination of
+/// Store-read bytes. The buffer accommodates two kinds of Store readers:
+///
+/// * Readers that do not have any external buffer to worry about but need to
+/// accumulate, terminate, and/or consume buffered content read by Store.
+/// These readers use the default constructor and then allocate the initial
+/// buffer space for their first read (if any).
+///
+/// * Readers that supply their StoreIOBuffer at construction time. That buffer
+/// is enough to handle the majority of use cases. However, the supplied
+/// StoreIOBuffer capacity may be exceeded when parsing requires accumulating
+/// multiple Store read results and/or NUL-termination of a full buffer.
+///
+/// This buffer seamlessly grows as needed, reducing memory over-allocation and,
+/// in case of StoreIOBuffer-seeded construction, memory copies.
+class ParsingBuffer
+{
+public:
+ /// creates buffer without any space or content
+ ParsingBuffer() = default;
+
+ /// seeds this buffer with the caller-supplied buffer space
+ explicit ParsingBuffer(StoreIOBuffer &);
+
+ /// a NUL-terminated version of content(); same lifetime as content()
+ const char *c_str() { terminate(); return memory(); }
+
+ /// export content() into SBuf, avoiding content copying when possible
+ SBuf toSBuf() const;
+
+ /// the total number of append()ed bytes that were not consume()d
+ size_t contentSize() const;
+
+ /// the number of bytes in the space() buffer
+ size_t spaceSize() const;
+
+ /// the maximum number of bytes we can store without allocating more space
+ size_t capacity() const;
+
+ /// Stored append()ed bytes that have not been consume()d. The returned
+ /// buffer offset is set to zero; the caller is responsible for adjusting
+ /// the offset if needed (TODO: Add/return a no-offset Mem::View instead).
+ /// The returned buffer is invalidated by calling a non-constant method or
+ /// by changing the StoreIOBuffer contents given to our constructor.
+ StoreIOBuffer content() const;
+
+ /// A (possibly empty) buffer for reading the next byte(s). The returned
+ /// buffer offset is set to zero; the caller is responsible for adjusting
+ /// the offset if needed (TODO: Add/return a no-offset Mem::Area instead).
+ /// The returned buffer is invalidated by calling a non-constant method or
+ /// by changing the StoreIOBuffer contents given to our constructor.
+ StoreIOBuffer space();
+
+ /// A buffer for reading the exact number of next byte(s). The method may
+ /// allocate new memory and copy previously appended() bytes as needed.
+ /// \param pageSize the exact number of bytes the caller wants to read
+ /// \returns space() after any necessary allocations
+ StoreIOBuffer makeSpace(size_t pageSize);
+
+ /// A buffer suitable for the first storeClientCopy() call. The method may
+ /// allocate new memory and copy previously appended() bytes as needed.
+ /// \returns space() after any necessary allocations
+ /// \deprecated New clients should call makeSpace() with client-specific
+ /// pageSize instead of this one-size-fits-all legacy method.
+ StoreIOBuffer makeInitialSpace() { return makeSpace(4096); }
+
+ /// remember the new bytes received into the previously provided space()
+ void appended(const char *, size_t);
+
+ /// get rid of previously appended() prefix of a given size
+ void consume(size_t);
+
+ /// Returns stored content, reusing the StoreIOBuffer given at
+ /// construction time. Copying is avoided if we did not allocate extra
+ /// memory since construction. Not meant for default-constructed buffers.
+ /// \prec positive contentSize() (\sa store_client::finishCallback())
+ StoreIOBuffer packBack();
+
+ /// summarizes object state (for debugging)
+ void print(std::ostream &) const;
+
+private:
+ const char *memory() const;
+ void terminate();
+ void growSpace(size_t);
+
+private:
+ /// externally allocated buffer we were seeded with (or a zero-size one)
+ StoreIOBuffer readerSuppliedMemory_;
+
+ /// bytes appended() to readerSuppliedMemory_ that were not consume()d
+ size_t readerSuppliedMemoryContentSize_ = 0;
+
+ /// our internal buffer that takes over readerSuppliedMemory_ when the
+ /// latter becomes full and more memory is needed
+ std::optional<SBuf> extraMemory_;
+};
+
+inline std::ostream &
+operator <<(std::ostream &os, const ParsingBuffer &b)
+{
+ b.print(os);
+ return os;
+}
+
+} // namespace Store
+
+#endif /* SQUID_SRC_STORE_PARSINGBUFFER_H */
+
diff --git a/src/store/forward.h b/src/store/forward.h
index 1422a85..db5ee1c 100644
--- a/src/store/forward.h
+++ b/src/store/forward.h
@@ -46,6 +46,7 @@ class Disks;
class Disk;
class DiskConfig;
class EntryGuard;
+class ParsingBuffer;
typedef ::StoreEntry Entry;
typedef ::MemStore Memory;
diff --git a/src/store_client.cc b/src/store_client.cc
index 207c96b..1731c4c 100644
--- a/src/store_client.cc
+++ b/src/store_client.cc
@@ -16,9 +16,11 @@
#include "HttpRequest.h"
#include "MemBuf.h"
#include "MemObject.h"
+#include "sbuf/Stream.h"
#include "mime_header.h"
#include "profiler/Profiler.h"
#include "SquidConfig.h"
+#include "SquidMath.h"
#include "StatCounters.h"
#include "Store.h"
#include "store_swapin.h"
@@ -98,19 +100,6 @@ storeClientListAdd(StoreEntry * e, void *data)
return sc;
}
-/// schedules asynchronous STCB call to relay disk or memory read results
-/// \param outcome an error signal (if negative), an EOF signal (if zero), or the number of bytes read
-void
-store_client::callback(const ssize_t outcome)
-{
- if (outcome > 0)
- return noteCopiedBytes(outcome);
-
- if (outcome < 0)
- return fail();
-
- noteEof();
-}
/// finishCallback() wrapper; TODO: Add NullaryMemFunT for non-jobs.
void
store_client::FinishCallback(store_client * const sc)
@@ -125,14 +114,20 @@ store_client::finishCallback()
Assure(_callback.callback_handler);
Assure(_callback.notifier);
- // callers are not ready to handle a content+error combination
- Assure(object_ok || !copiedSize);
-
- StoreIOBuffer result(copiedSize, copyInto.offset, copyInto.data);
+ // XXX: Some legacy code relies on zero-length buffers having nil data
+ // pointers. Some other legacy code expects "correct" result.offset even
+ // when there is no body to return. Accommodate all those expectations.
+ auto result = StoreIOBuffer(0, copyInto.offset, nullptr);
+ if (object_ok && parsingBuffer && parsingBuffer->contentSize())
+ result = parsingBuffer->packBack();
result.flags.error = object_ok ? 0 : 1;
- copiedSize = 0;
- cmp_offset = result.offset + result.length;
+ // no HTTP headers and no body bytes (but not because there was no space)
+ atEof_ = !sendingHttpHeaders() && !result.length && copyInto.length;
+
+ parsingBuffer.reset();
+ ++answers;
+
STCB *temphandler = _callback.callback_handler;
void *cbdata = _callback.callback_data;
_callback = Callback(NULL, NULL);
@@ -144,35 +139,15 @@ store_client::finishCallback()
cbdataReferenceDone(cbdata);
}
-/// schedules asynchronous STCB call to relay a successful disk or memory read
-/// \param bytesCopied the number of response bytes copied into copyInto
-void
-store_client::noteCopiedBytes(const size_t bytesCopied)
-{
- debugs(90, 5, bytesCopied);
- Assure(bytesCopied > 0);
- Assure(!copiedSize);
- copiedSize = bytesCopied;
- noteNews();
-}
-
-void
-store_client::noteEof()
-{
- debugs(90, 5, copiedSize);
- Assure(!copiedSize);
- noteNews();
-}
-
store_client::store_client(StoreEntry *e) :
- cmp_offset(0),
#if STORE_CLIENT_LIST_DEBUG
owner(cbdataReference(data)),
#endif
entry(e),
type(e->storeClientType()),
object_ok(true),
- copiedSize(0)
+ atEof_(false),
+ answers(0)
{
flags.disk_io_pending = false;
flags.store_copying = false;
@@ -221,16 +196,29 @@ store_client::copy(StoreEntry * anEntry,
#endif
assert(!_callback.pending());
-#if ONLYCONTIGUOUSREQUESTS
-
- assert(cmp_offset == copyRequest.offset);
-#endif
- /* range requests will skip into the body */
- cmp_offset = copyRequest.offset;
_callback = Callback (callback_fn, cbdataReference(data));
copyInto.data = copyRequest.data;
copyInto.length = copyRequest.length;
copyInto.offset = copyRequest.offset;
+ Assure(copyInto.offset >= 0);
+
+ if (!copyInto.length) {
+ // During the first storeClientCopy() call, a zero-size buffer means
+ // that we will have to drop any HTTP response body bytes we read (with
+ // the HTTP headers from disk). After that, it means we cannot return
+ // anything to the caller at all.
+ debugs(90, 2, "WARNING: zero-size storeClientCopy() buffer: " << copyInto);
+ // keep going; moreToRead() should prevent any from-Store reading
+ }
+
+ // Our nextHttpReadOffset() expects the first copy() call to have zero
+ // offset. More complex code could handle a positive first offset, but it
+ // would only be useful when reading responses from memory: We would not
+ // _delay_ the response (to read the requested HTTP body bytes from disk)
+ // when we already can respond with HTTP headers.
+ Assure(!copyInto.offset || answeredOnce());
+
+ parsingBuffer.emplace(copyInto);
static bool copying (false);
assert (!copying);
@@ -258,33 +246,30 @@ store_client::copy(StoreEntry * anEntry,
// Add no code here. This object may no longer exist.
}
-/// Whether there is (or will be) more entry data for us.
+/// Whether Store has (or possibly will have) more entry data for us.
bool
-store_client::moreToSend() const
+store_client::moreToRead() const
{
+ if (!copyInto.length)
+ return false; // the client supplied a zero-size buffer
+
if (entry->store_status == STORE_PENDING)
return true; // there may be more coming
/* STORE_OK, including aborted entries: no more data is coming */
- const int64_t len = entry->objectLen();
+ if (canReadFromMemory())
+ return true; // memory has the first byte wanted by the client
- // If we do not know the entry length, then we have to open the swap file.
- const bool canSwapIn = entry->hasDisk();
- if (len < 0)
- return canSwapIn;
-
- if (copyInto.offset >= len)
- return false; // sent everything there is
+ if (!entry->hasDisk())
+ return false; // cannot read anything from disk either
- if (canSwapIn)
- return true; // if we lack prefix, we can swap it in
+ if (entry->objectLen() >= 0 && copyInto.offset >= entry->contentLen())
+ return false; // the disk cannot have byte(s) wanted by the client
- // If we cannot swap in, make sure we have what we want in RAM. Otherwise,
- // scheduleRead calls scheduleDiskRead which asserts without a swap file.
- const MemObject *mem = entry->mem_obj;
- return mem &&
- mem->inmem_lo <= copyInto.offset && copyInto.offset < mem->endOffset();
+ // we cannot be sure until we swap in metadata and learn contentLen(),
+ // but the disk may have the byte(s) wanted by the client
+ return true;
}
static void
@@ -311,6 +296,14 @@ storeClientCopy2(StoreEntry * e, store_client * sc)
sc->doCopy(e);
}
+/// Whether our answer, if sent right now, will announce the availability of
+/// HTTP response headers (to the STCB callback) for the first time.
+bool
+store_client::sendingHttpHeaders() const
+{
+ return !answeredOnce() && entry->mem().baseReply().hdr_sz > 0;
+}
+
void
store_client::doCopy(StoreEntry *anEntry)
{
@@ -322,20 +315,22 @@ store_client::doCopy(StoreEntry *anEntry)
flags.store_copying = true;
MemObject *mem = entry->mem_obj;
- debugs(33, 5, "store_client::doCopy: co: " <<
- copyInto.offset << ", hi: " <<
- mem->endOffset());
+ debugs(33, 5, this << " into " << copyInto <<
+ " hi: " << mem->endOffset() <<
+ " objectLen: " << entry->objectLen() <<
+ " past_answers: " << answers);
- if (!moreToSend()) {
+ const auto sendHttpHeaders = sendingHttpHeaders();
+
+ if (!sendHttpHeaders && !moreToRead()) {
/* There is no more to send! */
debugs(33, 3, HERE << "There is no more to send!");
- noteEof();
+ noteNews();
flags.store_copying = false;
return;
}
- /* Check that we actually have data */
- if (anEntry->store_status == STORE_PENDING && copyInto.offset >= mem->endOffset()) {
+ if (!sendHttpHeaders && anEntry->store_status == STORE_PENDING && nextHttpReadOffset() >= mem->endOffset()) {
debugs(90, 3, "store_client::doCopy: Waiting for more");
flags.store_copying = false;
return;
@@ -357,7 +352,24 @@ store_client::doCopy(StoreEntry *anEntry)
if (!startSwapin())
return; // failure
}
- scheduleRead();
+
+ // send any immediately available body bytes even if we also sendHttpHeaders
+ if (canReadFromMemory()) {
+ readFromMemory();
+ noteNews(); // will sendHttpHeaders (if needed) as well
+ flags.store_copying = false;
+ return;
+ }
+
+ if (sendHttpHeaders) {
+ debugs(33, 5, "just send HTTP headers: " << mem->baseReply().hdr_sz);
+ noteNews();
+ flags.store_copying = false;
+ return;
+ }
+
+ // no information that the client needs is available immediately
+ scheduleDiskRead();
}
/// opens the swapin "file" if possible; otherwise, fail()s and returns false
@@ -397,18 +409,7 @@ store_client::noteSwapInDone(const bool error)
if (error)
fail();
else
- noteEof();
-}
-
-void
-store_client::scheduleRead()
-{
- MemObject *mem = entry->mem_obj;
-
- if (copyInto.offset >= mem->inmem_lo && copyInto.offset < mem->endOffset())
- scheduleMemRead();
- else
- scheduleDiskRead();
+ noteNews();
}
void
@@ -433,15 +434,44 @@ store_client::scheduleDiskRead()
flags.store_copying = false;
}
+/// whether at least one byte wanted by the client is in memory
+bool
+store_client::canReadFromMemory() const
+{
+ const auto &mem = entry->mem();
+ const auto memReadOffset = nextHttpReadOffset();
+ return mem.inmem_lo <= memReadOffset && memReadOffset < mem.endOffset() &&
+ parsingBuffer->spaceSize();
+}
+
+/// The offset of the next stored HTTP response byte wanted by the client.
+int64_t
+store_client::nextHttpReadOffset() const
+{
+ Assure(parsingBuffer);
+ const auto &mem = entry->mem();
+ const auto hdr_sz = mem.baseReply().hdr_sz;
+ // Certain SMP cache manager transactions do not store HTTP headers in
+ // mem_hdr; they store just a kid-specific piece of the future report body.
+ // In such cases, hdr_sz ought to be zero. In all other (known) cases,
+ // mem_hdr contains HTTP response headers (positive hdr_sz if parsed)
+ // followed by HTTP response body. The sum below accommodates all cases.
+ return NaturalSum<int64_t>(hdr_sz, copyInto.offset, parsingBuffer->contentSize()).value();
+}
+
+/// Copies at least some of the requested body bytes from MemObject memory,
+/// satisfying the copy() request.
+/// \pre canReadFromMemory() is true
void
-store_client::scheduleMemRead()
+store_client::readFromMemory()
{
- /* What the client wants is in memory */
- /* Old style */
- debugs(90, 3, "store_client::doCopy: Copying normal from memory");
- const auto sz = entry->mem_obj->data_hdr.copy(copyInto); // may be <= 0 per copy() API
- callback(sz);
- flags.store_copying = false;
+ Assure(parsingBuffer);
+ const auto readInto = parsingBuffer->space().positionAt(nextHttpReadOffset());
+
+ debugs(90, 3, "copying HTTP body bytes from memory into " << readInto);
+ const auto sz = entry->mem_obj->data_hdr.copy(readInto);
+ Assure(sz > 0); // our canReadFromMemory() precondition guarantees that
+ parsingBuffer->appended(readInto.data, sz);
}
void
@@ -453,59 +483,136 @@ store_client::fileRead()
assert(!flags.disk_io_pending);
flags.disk_io_pending = true;
+ // mem->swap_hdr_sz is zero here during initial read(s)
+ const auto nextStoreReadOffset = NaturalSum<int64_t>(mem->swap_hdr_sz, nextHttpReadOffset()).value();
+
+ // XXX: If fileRead() is called when we do not yet know mem->swap_hdr_sz,
+ // then we must start reading from disk offset zero to learn it: we cannot
+ // compute correct HTTP response start offset on disk without it. However,
+ // late startSwapin() calls imply that the assertion below might fail.
+ Assure(mem->swap_hdr_sz > 0 || !nextStoreReadOffset);
+
+ // TODO: Remove this assertion. Introduced in 1998 commit 3157c72, it
+ // assumes that swapped out memory is freed unconditionally, but we no
+ // longer do that because trimMemory() path checks lowestMemReaderOffset().
+ // It is also misplaced: We are not swapping out anything here and should
+ // not care about any swapout invariants.
+
if (mem->swap_hdr_sz != 0)
if (entry->swappingOut())
- assert(mem->swapout.sio->offset() > copyInto.offset + (int64_t)mem->swap_hdr_sz);
+ assert(mem->swapout.sio->offset() > nextStoreReadOffset);
+
+ // XXX: We should let individual cache_dirs limit the read size instead, but
+ // we cannot do that without more fixes and research because:
+ // * larger reads corrupt responses when cache_dir uses SharedMemory::get();
+ // * we do not know how to find all I/O code that assumes this limit;
+ // * performance effects of larger disk reads may be negative somewhere.
+ const decltype(StoreIOBuffer::length) maxReadSize = SM_PAGE_SIZE;
+
+ Assure(parsingBuffer);
+ // also, do not read more than we can return (via a copyInto.length buffer)
+ const auto readSize = std::min(copyInto.length, maxReadSize);
+ lastDiskRead = parsingBuffer->makeSpace(readSize).positionAt(nextStoreReadOffset);
+ debugs(90, 5, "into " << lastDiskRead);
storeRead(swapin_sio,
- copyInto.data,
- copyInto.length,
- copyInto.offset + mem->swap_hdr_sz,
+ lastDiskRead.data,
+ lastDiskRead.length,
+ lastDiskRead.offset,
mem->swap_hdr_sz == 0 ? storeClientReadHeader
: storeClientReadBody,
this);
}
void
-store_client::readBody(const char *, ssize_t len)
+store_client::readBody(const char * const buf, const ssize_t lastIoResult)
{
int parsed_header = 0;
- // Don't assert disk_io_pending here.. may be called by read_header
+ Assure(flags.disk_io_pending);
flags.disk_io_pending = false;
assert(_callback.pending());
- debugs(90, 3, "storeClientReadBody: len " << len << "");
+ Assure(parsingBuffer);
+ debugs(90, 3, "got " << lastIoResult << " using " << *parsingBuffer);
+ if (lastIoResult < 0)
+ return fail();
- if (len < 0)
+ if (!lastIoResult) {
+ if (answeredOnce())
+ return noteNews();
+
+ debugs(90, DBG_CRITICAL, "ERROR: Truncated HTTP headers in on-disk object");
return fail();
+ }
+ assert(lastDiskRead.data == buf);
+ lastDiskRead.length = lastIoResult;
- if (copyInto.offset == 0 && len > 0 && entry->getReply()->sline.status() == Http::scNone) {
- /* Our structure ! */
- HttpReply *rep = (HttpReply *) entry->getReply(); // bypass const
+ parsingBuffer->appended(buf, lastIoResult);
- if (!rep->parseCharBuf(copyInto.data, headersEnd(copyInto.data, len))) {
- debugs(90, DBG_CRITICAL, "Could not parse headers from on disk object");
- } else {
- parsed_header = 1;
- }
+ // we know swap_hdr_sz by now and were reading beyond swap metadata because
+ // readHeader() would have been called otherwise (to read swap metadata)
+ const auto swap_hdr_sz = entry->mem().swap_hdr_sz;
+ Assure(swap_hdr_sz > 0);
+ Assure(!Less(lastDiskRead.offset, swap_hdr_sz));
+
+ // Map lastDiskRead (i.e. the disk area we just read) to an HTTP reply part.
+ // The bytes are the same, but disk and HTTP offsets differ by swap_hdr_sz.
+ const auto httpOffset = lastDiskRead.offset - swap_hdr_sz;
+ const auto httpPart = StoreIOBuffer(lastDiskRead).positionAt(httpOffset);
+
+ maybeWriteFromDiskToMemory(httpPart);
+ handleBodyFromDisk();
+}
+
+/// de-serializes HTTP response (partially) read from disk storage
+void
+store_client::handleBodyFromDisk()
+{
+ // We cannot de-serialize on-disk HTTP response without MemObject because
+ // without MemObject::swap_hdr_sz we cannot know where that response starts.
+ Assure(entry->mem_obj);
+ Assure(entry->mem_obj->swap_hdr_sz > 0);
+
+ if (!answeredOnce()) {
+ // All on-disk responses have HTTP headers. First disk body read(s)
+ // include HTTP headers that we must parse (if needed) and skip.
+ const auto haveHttpHeaders = entry->mem_obj->baseReply().pstate == Http::Message::psParsed;
+ if (!haveHttpHeaders && !parseHttpHeadersFromDisk())
+ return;
+ skipHttpHeadersFromDisk();
}
const HttpReply *rep = entry->getReply();
- if (len > 0 && rep && entry->mem_obj->inmem_lo == 0 && entry->objectLen() <= (int64_t)Config.Store.maxInMemObjSize && Config.onoff.memory_cache_disk) {
- storeGetMemSpace(len);
+ noteNews();
+}
+
+/// Adds HTTP response data loaded from disk to the memory cache (if
+/// needed/possible). The given part may contain portions of HTTP response
+/// headers and/or HTTP response body.
+void
+store_client::maybeWriteFromDiskToMemory(const StoreIOBuffer &httpResponsePart)
+{
+ // XXX: Reject [memory-]uncachable/unshareable responses instead of assuming
+ // that an HTTP response should be written to MemObject's data_hdr (and that
+ // it may purge already cached entries) just because it "fits" and was
+ // loaded from disk. For example, this response may already be marked for
+ // release. The (complex) cachability decision(s) should be made outside
+ // (and obeyed by) this low-level code.
+ if (httpResponsePart.length && entry->mem_obj->inmem_lo == 0 && entry->objectLen() <= (int64_t)Config.Store.maxInMemObjSize && Config.onoff.memory_cache_disk) {
+ storeGetMemSpace(httpResponsePart.length);
+ // XXX: This "recheck" is not needed because storeGetMemSpace() cannot
+ // purge mem_hdr bytes of a locked entry, and we do lock ours. And
+ // inmem_lo offset itself should not be relevant to appending new bytes.
+ //
// The above may start to free our object so we need to check again
if (entry->mem_obj->inmem_lo == 0) {
- /* Copy read data back into memory.
- * copyInto.offset includes headers, which is what mem cache needs
- */
- int64_t mem_offset = entry->mem_obj->endOffset();
- if ((copyInto.offset == mem_offset) || (parsed_header && mem_offset == rep->hdr_sz)) {
- entry->mem_obj->write(StoreIOBuffer(len, copyInto.offset, copyInto.data));
+ // XXX: This code assumes a non-shared memory cache.
+ if (httpResponsePart.offset == entry->mem_obj->endOffset())
+ entry->mem_obj->write(httpResponsePart);
}
}
}
- callback(len);
}
void
@@ -615,38 +722,21 @@ store_client::readHeader(char const *buf, ssize_t len)
if (!object_ok)
return;
+ Assure(parsingBuffer);
+ debugs(90, 3, "got " << len << " using " << *parsingBuffer);
+
if (len < 0)
return fail();
+ Assure(!parsingBuffer->contentSize());
+ parsingBuffer->appended(buf, len);
if (!unpackHeader(buf, len)) {
fail();
return;
}
-
- /*
- * If our last read got some data the client wants, then give
- * it to them, otherwise schedule another read.
- */
- size_t body_sz = len - mem->swap_hdr_sz;
-
- if (copyInto.offset < static_cast<int64_t>(body_sz)) {
- /*
- * we have (part of) what they want
- */
- size_t copy_sz = min(copyInto.length, body_sz);
- debugs(90, 3, "storeClientReadHeader: copying " << copy_sz << " bytes of body");
- memmove(copyInto.data, copyInto.data + mem->swap_hdr_sz, copy_sz);
-
- readBody(copyInto.data, copy_sz);
-
- return;
- }
-
- /*
- * we don't have what the client wants, but at least we now
- * know the swap header size.
- */
- fileRead();
+ parsingBuffer->consume(mem->swap_hdr_sz);
+ maybeWriteFromDiskToMemory(parsingBuffer->content());
+ handleBodyFromDisk();
}
int
@@ -903,6 +993,63 @@ CheckQuickAbortIsReasonable(StoreEntry * entry)
return true;
}
+/// parses HTTP header bytes loaded from disk
+/// \returns false if fail() or scheduleDiskRead() has been called and, hence,
+/// the caller should just quit without any further action
+bool
+store_client::parseHttpHeadersFromDisk()
+{
+ try {
+ return tryParsingHttpHeaders();
+ } catch (...) {
+ // XXX: Our parser enforces Config.maxReplyHeaderSize limit, but our
+ // packer does not. Since packing might increase header size, we may
+ // cache a header that we cannot parse and get here. Same for MemStore.
+ debugs(90, DBG_CRITICAL, "ERROR: Cannot parse on-disk HTTP headers" <<
+ Debug::Extra << "exception: " << CurrentException <<
+ Debug::Extra << "raw input size: " << parsingBuffer->contentSize() << " bytes" <<
+ Debug::Extra << "current buffer capacity: " << parsingBuffer->capacity() << " bytes");
+ fail();
+ return false;
+ }
+}
+
+/// parseHttpHeadersFromDisk() helper
+/// \copydoc parseHttpHeadersFromDisk()
+bool
+store_client::tryParsingHttpHeaders()
+{
+ Assure(parsingBuffer);
+ Assure(!copyInto.offset); // otherwise, parsingBuffer cannot have HTTP response headers
+ auto &adjustableReply = entry->mem().adjustableBaseReply();
+ if (adjustableReply.parseTerminatedPrefix(parsingBuffer->c_str(), parsingBuffer->contentSize()))
+ return true;
+
+ // TODO: Optimize by checking memory as well. For simplicity's sake, we
+ // continue on the disk-reading path, but readFromMemory() can give us the
+ // missing header bytes immediately if a concurrent request put those bytes
+ // into memory while we were waiting for our disk response.
+ scheduleDiskRead();
+ return false;
+}
+
+/// skips HTTP header bytes previously loaded from disk
+void
+store_client::skipHttpHeadersFromDisk()
+{
+ const auto hdr_sz = entry->mem_obj->baseReply().hdr_sz;
+ Assure(hdr_sz > 0); // all on-disk responses have HTTP headers
+ if (Less(parsingBuffer->contentSize(), hdr_sz)) {
+ debugs(90, 5, "discovered " << hdr_sz << "-byte HTTP headers in memory after reading some of them from disk: " << *parsingBuffer);
+ parsingBuffer->consume(parsingBuffer->contentSize()); // skip loaded HTTP header prefix
+ } else {
+ parsingBuffer->consume(hdr_sz); // skip loaded HTTP headers
+ const auto httpBodyBytesAfterHeader = parsingBuffer->contentSize(); // may be zero
+ Assure(httpBodyBytesAfterHeader <= copyInto.length);
+ debugs(90, 5, "read HTTP body prefix: " << httpBodyBytesAfterHeader);
+ }
+}
+
void
store_client::dumpStats(MemBuf * output, int clientNumber) const
{
diff --git a/src/tests/stub_HttpReply.cc b/src/tests/stub_HttpReply.cc
index 8ca7f9e..5cde8e6 100644
--- a/src/tests/stub_HttpReply.cc
+++ b/src/tests/stub_HttpReply.cc
@@ -25,6 +25,7 @@ void httpBodyPackInto(const HttpBody *, Packable *) STUB
bool HttpReply::sanityCheckStartLine(const char *buf, const size_t hdr_len, Http::StatusCode *error) STUB_RETVAL(false)
int HttpReply::httpMsgParseError() STUB_RETVAL(0)
bool HttpReply::expectingBody(const HttpRequestMethod&, int64_t&) const STUB_RETVAL(false)
+size_t HttpReply::parseTerminatedPrefix(const char *, size_t) STUB_RETVAL(0)
bool HttpReply::parseFirstLine(const char *start, const char *end) STUB_RETVAL(false)
void HttpReply::hdrCacheInit() STUB
HttpReply * HttpReply::clone() const STUB_RETVAL(NULL)
diff --git a/src/urn.cc b/src/urn.cc
index 74453e1..9f5e89d 100644
--- a/src/urn.cc
+++ b/src/urn.cc
@@ -26,8 +26,6 @@
#include "tools.h"
#include "urn.h"
-#define URN_REQBUF_SZ 4096
-
class UrnState : public StoreClient
{
CBDATA_CLASS(UrnState);
@@ -45,8 +43,8 @@ public:
HttpRequest::Pointer request;
HttpRequest::Pointer urlres_r;
- char reqbuf[URN_REQBUF_SZ] = { '\0' };
- int reqofs = 0;
+ /// for receiving a URN resolver reply body from Store and interpreting it
+ Store::ParsingBuffer parsingBuffer;
private:
char *urlres;
@@ -63,7 +61,7 @@ typedef struct {
} url_entry;
static STCB urnHandleReply;
-static url_entry *urnParseReply(const char *inbuf, const HttpRequestMethod&);
+static url_entry *urnParseReply(const SBuf &, const HttpRequestMethod &);
static const char *const crlf = "\r\n";
CBDATA_CLASS_INIT(UrnState);
@@ -183,13 +181,8 @@ UrnState::created(StoreEntry *newEntry)
sc = storeClientListAdd(urlres_e, this);
}
- reqofs = 0;
- StoreIOBuffer tempBuffer;
- tempBuffer.offset = reqofs;
- tempBuffer.length = URN_REQBUF_SZ;
- tempBuffer.data = reqbuf;
storeClientCopy(sc, urlres_e,
- tempBuffer,
+ parsingBuffer.makeInitialSpace(),
urnHandleReply,
this);
}
@@ -224,9 +217,6 @@ urnHandleReply(void *data, StoreIOBuffer result)
UrnState *urnState = static_cast<UrnState *>(data);
StoreEntry *e = urnState->entry;
StoreEntry *urlres_e = urnState->urlres_e;
- char *s = NULL;
- size_t k;
- HttpReply *rep;
url_entry *urls;
url_entry *u;
url_entry *min_u;
@@ -234,10 +224,8 @@ urnHandleReply(void *data, StoreIOBuffer result)
ErrorState *err;
int i;
int urlcnt = 0;
- char *buf = urnState->reqbuf;
- StoreIOBuffer tempBuffer;
- debugs(52, 3, "urnHandleReply: Called with size=" << result.length << ".");
+ debugs(52, 3, result << " with " << *e);
if (EBIT_TEST(urlres_e->flags, ENTRY_ABORTED) || result.flags.error) {
delete urnState;
@@ -250,59 +238,38 @@ urnHandleReply(void *data, StoreIOBuffer result)
return;
}
- /* Update reqofs to point to where in the buffer we'd be */
- urnState->reqofs += result.length;
-
- /* Handle reqofs being bigger than normal */
- if (urnState->reqofs >= URN_REQBUF_SZ) {
- delete urnState;
- return;
- }
+ urnState->parsingBuffer.appended(result.data, result.length);
/* If we haven't received the entire object (urn), copy more */
- if (urlres_e->store_status == STORE_PENDING) {
- Must(result.length > 0); // zero length ought to imply STORE_OK
- tempBuffer.offset = urnState->reqofs;
- tempBuffer.length = URN_REQBUF_SZ - urnState->reqofs;
- tempBuffer.data = urnState->reqbuf + urnState->reqofs;
+ if (!urnState->sc->atEof()) {
+ const auto bufferedBytes = urnState->parsingBuffer.contentSize();
+ const auto remainingSpace = urnState->parsingBuffer.space().positionAt(bufferedBytes);
+
+ if (!remainingSpace.length) {
+ debugs(52, 3, "ran out of buffer space after " << bufferedBytes << " bytes");
+ // TODO: Here and in other error cases, send ERR_URN_RESOLVE to client.
+ delete urnState;
+ return;
+ }
storeClientCopy(urnState->sc, urlres_e,
- tempBuffer,
+ remainingSpace,
urnHandleReply,
urnState);
return;
}
- /* we know its STORE_OK */
- k = headersEnd(buf, urnState->reqofs);
-
- if (0 == k) {
- debugs(52, DBG_IMPORTANT, "urnHandleReply: didn't find end-of-headers for " << e->url() );
- delete urnState;
- return;
- }
-
- s = buf + k;
- assert(urlres_e->getReply());
- rep = new HttpReply;
- rep->parseCharBuf(buf, k);
- debugs(52, 3, "reply exists, code=" << rep->sline.status() << ".");
-
- if (rep->sline.status() != Http::scOkay) {
+ const auto &peerReply = urlres_e->mem().baseReply();
+ debugs(52, 3, "got reply, code=" << peerReply.sline.status());
+ if (peerReply.sline.status() != Http::scOkay) {
debugs(52, 3, "urnHandleReply: failed.");
err = new ErrorState(ERR_URN_RESOLVE, Http::scNotFound, urnState->request.getRaw());
err->url = xstrdup(e->url());
errorAppendEntry(e, err);
- delete rep;
delete urnState;
return;
}
- delete rep;
-
- while (xisspace(*s))
- ++s;
-
- urls = urnParseReply(s, urnState->request->method);
+ urls = urnParseReply(urnState->parsingBuffer.toSBuf(), urnState->request->method);
if (!urls) { /* unknown URN error */
debugs(52, 3, "urnTranslateDone: unknown URN " << e->url());
@@ -350,7 +317,7 @@ urnHandleReply(void *data, StoreIOBuffer result)
"Generated by %s@%s\n"
"</ADDRESS>\n",
APP_FULLNAME, getMyHostname());
- rep = new HttpReply;
+ const auto rep = new HttpReply;
rep->setHeaders(Http::scFound, NULL, "text/html", mb->contentSize(), 0, squid_curtime);
if (min_u) {
@@ -372,9 +339,8 @@ urnHandleReply(void *data, StoreIOBuffer result)
}
static url_entry *
-urnParseReply(const char *inbuf, const HttpRequestMethod& m)
+urnParseReply(const SBuf &inBuf, const HttpRequestMethod &m)
{
- char *buf = xstrdup(inbuf);
char *token;
url_entry *list;
url_entry *old;
@@ -383,6 +349,13 @@ urnParseReply(const char *inbuf, const HttpRequestMethod& m)
debugs(52, 3, "urnParseReply");
list = (url_entry *)xcalloc(n + 1, sizeof(*list));
+ // XXX: Switch to tokenizer-based parsing.
+ const auto allocated = SBufToCstring(inBuf);
+
+ auto buf = allocated;
+ while (xisspace(*buf))
+ ++buf;
+
for (token = strtok(buf, crlf); token; token = strtok(NULL, crlf)) {
debugs(52, 3, "urnParseReply: got '" << token << "'");
@@ -418,7 +391,7 @@ urnParseReply(const char *inbuf, const HttpRequestMethod& m)
}
debugs(52, 3, "urnParseReply: Found " << i << " URLs");
- xfree(buf);
+ xfree(allocated);
return list;
}
--
2.39.3
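
Reviewer note (not part of the patch): the offset arithmetic that this change
introduces in nextHttpReadOffset(), fileRead(), and readBody() can be sketched
as a small standalone model. This is a minimal sketch under stated
assumptions, not Squid code: checkedSum() is a hypothetical stand-in for
NaturalSum(), and the Offsets struct and all three offset helpers are invented
here for illustration only.

```cpp
#include <cassert>
#include <cstdint>
#include <initializer_list>
#include <limits>
#include <optional>

// Hypothetical stand-in for NaturalSum(): adds non-negative terms and
// returns std::nullopt on a negative term or on int64_t overflow.
inline std::optional<int64_t> checkedSum(std::initializer_list<int64_t> terms)
{
    int64_t sum = 0;
    for (const auto term : terms) {
        if (term < 0 || term > std::numeric_limits<int64_t>::max() - sum)
            return std::nullopt;
        sum += term;
    }
    return sum;
}

// Illustrative model of the quantities involved (all names are assumptions).
struct Offsets {
    int64_t swapHdrSz;  // swap metadata bytes preceding the HTTP response on disk
    int64_t httpHdrSz;  // serialized HTTP response header bytes
    int64_t bodyOffset; // body offset requested by the client (copyInto.offset)
    int64_t buffered;   // bytes already accumulated for the current answer
};

// Offset of the next wanted byte within the stored HTTP response.
inline std::optional<int64_t> nextHttpOffset(const Offsets &o)
{
    return checkedSum({o.httpHdrSz, o.bodyOffset, o.buffered});
}

// Offset of that same byte within the on-disk entry.
inline std::optional<int64_t> nextDiskOffset(const Offsets &o)
{
    const auto http = nextHttpOffset(o);
    if (!http)
        return std::nullopt;
    return checkedSum({o.swapHdrSz, *http});
}

// Maps a disk offset back to an HTTP response offset. The bytes are the
// same; the two offsets differ by the swap metadata size.
inline int64_t diskToHttpOffset(const int64_t diskOffset, const int64_t swapHdrSz)
{
    assert(diskOffset >= swapHdrSz); // we never read HTTP bytes from swap metadata
    return diskOffset - swapHdrSz;
}
```

In this model, a first body request (bodyOffset of zero, nothing buffered)
starts reading right after the HTTP headers, and an overflowing sum is
reported to the caller instead of wrapping around silently.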