squid/squid-5.5-CVE-2023-5824.patch

3927 lines
142 KiB
Diff

diff --git a/src/HttpHeader.cc b/src/HttpHeader.cc
index dfd17ae..539c5da 100644
--- a/src/HttpHeader.cc
+++ b/src/HttpHeader.cc
@@ -9,6 +9,7 @@
/* DEBUG: section 55 HTTP Header */
#include "squid.h"
+#include "base/Assure.h"
#include "base/CharacterSet.h"
#include "base/EnumIterator.h"
#include "base64.h"
diff --git a/src/HttpReply.cc b/src/HttpReply.cc
index f002a6f..25099bc 100644
--- a/src/HttpReply.cc
+++ b/src/HttpReply.cc
@@ -21,7 +21,9 @@
#include "HttpReply.h"
#include "HttpRequest.h"
#include "MemBuf.h"
+#include "sbuf/Stream.h"
#include "SquidConfig.h"
+#include "SquidMath.h"
#include "SquidTime.h"
#include "Store.h"
#include "StrList.h"
@@ -457,6 +459,38 @@ HttpReply::parseFirstLine(const char *blk_start, const char *blk_end)
return sline.parse(protoPrefix, blk_start, blk_end);
}
+size_t
+HttpReply::parseTerminatedPrefix(const char * const terminatedBuf, const size_t bufSize)
+{
+ auto error = Http::scNone;
+ const bool eof = false; // TODO: Remove after removing atEnd from HttpHeader::parse()
+ if (parse(terminatedBuf, bufSize, eof, &error)) {
+ debugs(58, 7, "success after accumulating " << bufSize << " bytes and parsing " << hdr_sz);
+ Assure(pstate == Http::Message::psParsed);
+ Assure(hdr_sz > 0);
+ Assure(!Less(bufSize, hdr_sz)); // cannot parse more bytes than we have
+ return hdr_sz; // success
+ }
+
+ Assure(pstate != Http::Message::psParsed);
+ hdr_sz = 0;
+
+ if (error) {
+ throw TextException(ToSBuf("failed to parse HTTP headers",
+ Debug::Extra, "parser error code: ", error,
+ Debug::Extra, "accumulated unparsed bytes: ", bufSize,
+ Debug::Extra, "reply_header_max_size: ", Config.maxReplyHeaderSize),
+ Here());
+ }
+
+ debugs(58, 3, "need more bytes after accumulating " << bufSize << " out of " << Config.maxReplyHeaderSize);
+
+ // the parse() call above enforces Config.maxReplyHeaderSize limit
+ // XXX: Make this a strict comparison after fixing Http::Message::parse() enforcement
+ Assure(bufSize <= Config.maxReplyHeaderSize);
+ return 0; // parsed nothing, need more data
+}
+
void
HttpReply::configureContentLengthInterpreter(Http::ContentLengthInterpreter &interpreter)
{
diff --git a/src/HttpReply.h b/src/HttpReply.h
index 878d4f1..77ef7e6 100644
--- a/src/HttpReply.h
+++ b/src/HttpReply.h
@@ -132,6 +132,13 @@ public:
/// parses reply header using Parser
bool parseHeader(Http1::Parser &hp);
+ /// Parses response status line and headers at the start of the given
+ /// NUL-terminated buffer of the given size. Respects reply_header_max_size.
+ /// Assures pstate becomes Http::Message::psParsed on (and only on) success.
+ /// \returns the number of bytes in a successfully parsed prefix (or zero)
+ /// \retval 0 implies that more data is needed to parse the response prefix
+ size_t parseTerminatedPrefix(const char *, size_t);
+
private:
/** initialize */
void init();
diff --git a/src/MemObject.cc b/src/MemObject.cc
index e0e44c4..7f154ca 100644
--- a/src/MemObject.cc
+++ b/src/MemObject.cc
@@ -187,8 +187,8 @@ struct LowestMemReader : public unary_function<store_client, void> {
LowestMemReader(int64_t seed):current(seed) {}
void operator() (store_client const &x) {
- if (x.memReaderHasLowerOffset(current))
- current = x.copyInto.offset;
+ if (x.getType() == STORE_MEM_CLIENT)
+ current = std::min(current, x.discardableHttpEnd());
}
int64_t current;
@@ -374,6 +374,12 @@ MemObject::policyLowestOffsetToKeep(bool swap) const
*/
int64_t lowest_offset = lowestMemReaderOffset();
+ // XXX: Remove the last (Config.onoff.memory_cache_first-based) condition
+ // and update keepForLocalMemoryCache() accordingly. The caller wants to
+ // remove all local memory that is safe to remove. Honoring caching
+ // preferences is its responsibility. Our responsibility is safety. The
+ // situation was different when ff4b33f added that condition -- there was no
+ // keepInLocalMemory/keepForLocalMemoryCache() call guard back then.
if (endOffset() < lowest_offset ||
endOffset() - inmem_lo > (int64_t)Config.Store.maxInMemObjSize ||
(swap && !Config.onoff.memory_cache_first))
@@ -497,7 +503,7 @@ MemObject::mostBytesAllowed() const
#endif
- j = sc->delayId.bytesWanted(0, sc->copyInto.length);
+ j = sc->bytesWanted();
if (j > jmax) {
jmax = j;
diff --git a/src/MemObject.h b/src/MemObject.h
index 1b9b6f0..56bcd46 100644
--- a/src/MemObject.h
+++ b/src/MemObject.h
@@ -89,6 +89,15 @@ public:
bool appliedUpdates = false;
void stat (MemBuf * mb) const;
+
+ /// The offset of the last memory-stored HTTP response byte plus one.
+ /// * HTTP response headers (if any) are stored at offset zero.
+ /// * HTTP response body byte[n] usually has offset (hdr_sz + n), where
+ /// hdr_sz is the size of stored HTTP response headers (zero if none); and
+ /// n is the corresponding byte offset in the whole resource body.
+ /// However, some 206 (Partial Content) response bodies are stored (and
+ /// retrieved) as regular 200 response bodies, disregarding offsets of
+ /// their body parts. \sa HttpStateData::decideIfWeDoRanges().
int64_t endOffset () const;
/// sets baseReply().hdr_sz (i.e. written reply headers size) to endOffset()
diff --git a/src/MemStore.cc b/src/MemStore.cc
index 99806df..fb26eb0 100644
--- a/src/MemStore.cc
+++ b/src/MemStore.cc
@@ -17,6 +17,8 @@
#include "MemObject.h"
#include "MemStore.h"
#include "mime_header.h"
+#include "sbuf/SBuf.h"
+#include "sbuf/Stream.h"
#include "SquidConfig.h"
#include "SquidMath.h"
#include "StoreStats.h"
@@ -311,19 +313,25 @@ MemStore::get(const cache_key *key)
// create a brand new store entry and initialize it with stored info
StoreEntry *e = new StoreEntry();
- // XXX: We do not know the URLs yet, only the key, but we need to parse and
- // store the response for the Root().find() callers to be happy because they
- // expect IN_MEMORY entries to already have the response headers and body.
- e->createMemObject();
-
- anchorEntry(*e, index, *slot);
-
- const bool copied = copyFromShm(*e, index, *slot);
-
- if (copied)
- return e;
+ try {
+ // XXX: We do not know the URLs yet, only the key, but we need to parse and
+ // store the response for the Root().find() callers to be happy because they
+ // expect IN_MEMORY entries to already have the response headers and body.
+ e->createMemObject();
+
+ anchorEntry(*e, index, *slot);
+
+ // TODO: make copyFromShm() throw on all failures, simplifying this code
+ if (copyFromShm(*e, index, *slot))
+ return e;
+ debugs(20, 3, "failed for " << *e);
+ } catch (...) {
+ // see store_client::parseHttpHeadersFromDisk() for problems this may log
+ debugs(20, DBG_IMPORTANT, "ERROR: Cannot load a cache hit from shared memory" <<
+ Debug::Extra << "exception: " << CurrentException <<
+ Debug::Extra << "cache_mem entry: " << *e);
+ }
- debugs(20, 3, "failed for " << *e);
map->freeEntry(index); // do not let others into the same trap
destroyStoreEntry(static_cast<hash_link *>(e));
return NULL;
@@ -465,6 +473,8 @@ MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc
Ipc::StoreMapSliceId sid = anchor.start; // optimize: remember the last sid
bool wasEof = anchor.complete() && sid < 0;
int64_t sliceOffset = 0;
+
+ SBuf httpHeaderParsingBuffer;
while (sid >= 0) {
const Ipc::StoreMapSlice &slice = map->readableSlice(index, sid);
// slice state may change during copying; take snapshots now
@@ -487,10 +497,18 @@ MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc
const StoreIOBuffer sliceBuf(wasSize - prefixSize,
e.mem_obj->endOffset(),
page + prefixSize);
- if (!copyFromShmSlice(e, sliceBuf, wasEof))
- return false;
+
+ copyFromShmSlice(e, sliceBuf);
debugs(20, 8, "entry " << index << " copied slice " << sid <<
" from " << extra.page << '+' << prefixSize);
+
+ // parse headers if needed; they might span multiple slices!
+ auto &reply = e.mem().adjustableBaseReply();
+ if (reply.pstate != Http::Message::psParsed) {
+ httpHeaderParsingBuffer.append(sliceBuf.data, sliceBuf.length);
+ if (reply.parseTerminatedPrefix(httpHeaderParsingBuffer.c_str(), httpHeaderParsingBuffer.length()))
+ httpHeaderParsingBuffer = SBuf(); // we do not need these bytes anymore
+ }
}
// else skip a [possibly incomplete] slice that we copied earlier
@@ -516,6 +534,9 @@ MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc
debugs(20, 5, "mem-loaded all " << e.mem_obj->endOffset() << '/' <<
anchor.basics.swap_file_sz << " bytes of " << e);
+ if (e.mem().adjustableBaseReply().pstate != Http::Message::psParsed)
+ throw TextException(ToSBuf("truncated mem-cached headers; accumulated: ", httpHeaderParsingBuffer.length()), Here());
+
// from StoreEntry::complete()
e.mem_obj->object_sz = e.mem_obj->endOffset();
e.store_status = STORE_OK;
@@ -531,32 +552,11 @@ MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc
}
/// imports one shared memory slice into local memory
-bool
-MemStore::copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf, bool eof)
+void
+MemStore::copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf)
{
debugs(20, 7, "buf: " << buf.offset << " + " << buf.length);
- // from store_client::readBody()
- // parse headers if needed; they might span multiple slices!
- const auto rep = &e.mem().adjustableBaseReply();
- if (rep->pstate < Http::Message::psParsed) {
- // XXX: have to copy because httpMsgParseStep() requires 0-termination
- MemBuf mb;
- mb.init(buf.length+1, buf.length+1);
- mb.append(buf.data, buf.length);
- mb.terminate();
- const int result = rep->httpMsgParseStep(mb.buf, buf.length, eof);
- if (result > 0) {
- assert(rep->pstate == Http::Message::psParsed);
- } else if (result < 0) {
- debugs(20, DBG_IMPORTANT, "Corrupted mem-cached headers: " << e);
- return false;
- } else { // more slices are needed
- assert(!eof);
- }
- }
- debugs(20, 7, "rep pstate: " << rep->pstate);
-
// local memory stores both headers and body so copy regardless of pstate
const int64_t offBefore = e.mem_obj->endOffset();
assert(e.mem_obj->data_hdr.write(buf)); // from MemObject::write()
@@ -564,7 +564,6 @@ MemStore::copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf, bool eof)
// expect to write the entire buf because StoreEntry::write() never fails
assert(offAfter >= 0 && offBefore <= offAfter &&
static_cast<size_t>(offAfter - offBefore) == buf.length);
- return true;
}
/// whether we should cache the entry
diff --git a/src/MemStore.h b/src/MemStore.h
index 306dc32..d7e8bf2 100644
--- a/src/MemStore.h
+++ b/src/MemStore.h
@@ -80,7 +80,7 @@ protected:
void copyToShm(StoreEntry &e);
void copyToShmSlice(StoreEntry &e, Ipc::StoreMapAnchor &anchor, Ipc::StoreMap::Slice &slice);
bool copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor);
- bool copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf, bool eof);
+ void copyFromShmSlice(StoreEntry &, const StoreIOBuffer &);
void updateHeadersOrThrow(Ipc::StoreMapUpdate &update);
diff --git a/src/SquidMath.h b/src/SquidMath.h
index f4ec462..23b7525 100644
--- a/src/SquidMath.h
+++ b/src/SquidMath.h
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
+ * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
*
* Squid software is distributed under GPLv2+ license and includes
* contributions from numerous individuals and organizations.
@@ -9,6 +9,14 @@
#ifndef _SQUID_SRC_SQUIDMATH_H
#define _SQUID_SRC_SQUIDMATH_H
+#include "base/forward.h"
+#include "base/TypeTraits.h"
+
+#include <limits>
+#include <optional>
+
+// TODO: Move to src/base/Math.h and drop the Math namespace
+
/* Math functions we define locally for Squid */
namespace Math
{
@@ -21,5 +29,164 @@ double doubleAverage(const double, const double, int, const int);
} // namespace Math
+// If Sum() performance becomes important, consider using GCC and clang
+// built-ins like __builtin_add_overflow() instead of manual overflow checks.
+
+/// detects a pair of unsigned types
+/// reduces code duplication in declarations further below
+template <typename T, typename U>
+using AllUnsigned = typename std::conditional<
+ std::is_unsigned<T>::value && std::is_unsigned<U>::value,
+ std::true_type,
+ std::false_type
+ >::type;
+
+// TODO: Replace with std::cmp_less() after migrating to C++20.
+/// whether integer a is less than integer b, with correct overflow handling
+template <typename A, typename B>
+constexpr bool
+Less(const A a, const B b) {
+ // The casts below make standard C++ integer conversions explicit. They
+ // quell compiler warnings about signed/unsigned comparison. The first two
+ // lines exclude different-sign a and b, making the casts/comparison safe.
+ using AB = typename std::common_type<A, B>::type;
+ return
+ (a >= 0 && b < 0) ? false :
+ (a < 0 && b >= 0) ? true :
+ /* (a >= 0) == (b >= 0) */ static_cast<AB>(a) < static_cast<AB>(b);
+}
+
+/// ensure that T is supported by NaturalSum() and friends
+template<typename T>
+constexpr void
+AssertNaturalType()
+{
+ static_assert(std::numeric_limits<T>::is_bounded, "std::numeric_limits<T>::max() is meaningful");
+ static_assert(std::numeric_limits<T>::is_exact, "no silent loss of precision");
+ static_assert(!std::is_enum<T>::value, "no silent creation of non-enumerated values");
+}
+
+// TODO: Investigate whether this optimization can be expanded to [signed] types
+// A and B when std::numeric_limits<decltype(A(0)+B(0))>::is_modulo is true.
+/// This IncreaseSumInternal() overload is optimized for speed.
+/// \returns a non-overflowing sum of the two unsigned arguments (or nothing)
+/// \prec both argument types are unsigned
+template <typename S, typename A, typename B, std::enable_if_t<AllUnsigned<A,B>::value, int> = 0>
+std::optional<S>
+IncreaseSumInternal(const A a, const B b) {
+ // paranoid: AllUnsigned<A,B> precondition established that already
+ static_assert(std::is_unsigned<A>::value, "AllUnsigned dispatch worked for A");
+ static_assert(std::is_unsigned<B>::value, "AllUnsigned dispatch worked for B");
+
+ AssertNaturalType<S>();
+ AssertNaturalType<A>();
+ AssertNaturalType<B>();
+
+ // we should only be called by IncreaseSum(); it forces integer promotion
+ static_assert(std::is_same<A, decltype(+a)>::value, "a will not be promoted");
+ static_assert(std::is_same<B, decltype(+b)>::value, "b will not be promoted");
+ // and without integer promotions, a sum of unsigned integers is unsigned
+ static_assert(std::is_unsigned<decltype(a+b)>::value, "a+b is unsigned");
+
+ // with integer promotions ruled out, a or b can only undergo integer
+ // conversion to the higher rank type (A or B, we do not know which)
+ using AB = typename std::common_type<A, B>::type;
+ static_assert(std::is_same<AB, A>::value || std::is_same<AB, B>::value, "no unexpected conversions");
+ static_assert(std::is_same<AB, decltype(a+b)>::value, "lossless assignment");
+ const AB sum = a + b;
+
+ static_assert(std::numeric_limits<AB>::is_modulo, "we can detect overflows");
+ // 1. modulo math: overflowed sum is smaller than any of its operands
+ // 2. the sum may overflow S (i.e. the return base type)
+ // We do not need Less() here because we compare promoted unsigned types.
+ return (sum >= a && sum <= std::numeric_limits<S>::max()) ?
+ std::optional<S>(sum) : std::optional<S>();
+}
+
+/// This IncreaseSumInternal() overload supports a larger variety of types.
+/// \returns a non-overflowing sum of the two arguments (or nothing)
+/// \returns nothing if at least one of the arguments is negative
+/// \prec at least one of the argument types is signed
+template <typename S, typename A, typename B, std::enable_if_t<!AllUnsigned<A,B>::value, int> = 0>
+std::optional<S> constexpr
+IncreaseSumInternal(const A a, const B b) {
+ AssertNaturalType<S>();
+ AssertNaturalType<A>();
+ AssertNaturalType<B>();
+
+ // we should only be called by IncreaseSum() that does integer promotion
+ static_assert(std::is_same<A, decltype(+a)>::value, "a will not be promoted");
+ static_assert(std::is_same<B, decltype(+b)>::value, "b will not be promoted");
+
+ return
+ // We could support a non-under/overflowing sum of negative numbers, but
+ // our callers use negative values specially (e.g., for do-not-use or
+ // do-not-limit settings) and are not supposed to do math with them.
+ (a < 0 || b < 0) ? std::optional<S>() :
+ // To avoid undefined behavior of signed overflow, we must not compute
+ // the raw a+b sum if it may overflow. When A is not B, a or b undergoes
+ // (safe for non-negatives) integer conversion in these expressions, so
+ // we do not know the resulting a+b type AB and its maximum. We must
+ // also detect subsequent casting-to-S overflows.
+ // Overflow condition: (a + b > maxAB) or (a + b > maxS).
+ // A is an integer promotion of S, so maxS <= maxA <= maxAB.
+ // Since maxS <= maxAB, it is sufficient to just check: a + b > maxS,
+ // which is the same as the overflow-safe condition here: maxS - a < b.
+ // Finally, (maxS - a) cannot overflow because a is not negative and
+ // cannot underflow because a is a promotion of s: 0 <= a <= maxS.
+ Less(std::numeric_limits<S>::max() - a, b) ? std::optional<S>() :
+ std::optional<S>(a + b);
+}
+
+/// argument pack expansion termination for IncreaseSum<S, T, Args...>()
+template <typename S, typename T>
+std::optional<S>
+IncreaseSum(const S s, const T t)
+{
+ // Force (always safe) integer promotions now, to give std::enable_if_t<>
+ // promoted types instead of entering IncreaseSumInternal<AllUnsigned>(s,t)
+ // but getting a _signed_ promoted value of s or t in s + t.
+ return IncreaseSumInternal<S>(+s, +t);
+}
+
+/// \returns a non-overflowing sum of the arguments (or nothing)
+template <typename S, typename T, typename... Args>
+std::optional<S>
+IncreaseSum(const S sum, const T t, const Args... args) {
+ if (const auto head = IncreaseSum(sum, t)) {
+ return IncreaseSum(head.value(), args...);
+ } else {
+ // std::optional<S>() triggers bogus -Wmaybe-uninitialized warnings in GCC v10.3
+ return std::nullopt;
+ }
+}
+
+/// \returns an exact, non-overflowing sum of the arguments (or nothing)
+template <typename SummationType, typename... Args>
+std::optional<SummationType>
+NaturalSum(const Args... args) {
+ return IncreaseSum<SummationType>(0, args...);
+}
+
+/// Safely resets the given variable to NaturalSum() of the given arguments.
+/// If the sum overflows, resets to variable's maximum possible value.
+/// \returns the new variable value (like an assignment operator would)
+template <typename S, typename... Args>
+S
+SetToNaturalSumOrMax(S &var, const Args... args)
+{
+ var = NaturalSum<S>(args...).value_or(std::numeric_limits<S>::max());
+ return var;
+}
+
+/// converts a given non-negative integer into an integer of a given type
+/// without loss of information or undefined behavior
+template <typename Result, typename Source>
+Result
+NaturalCast(const Source s)
+{
+ return NaturalSum<Result>(s).value();
+}
+
#endif /* _SQUID_SRC_SQUIDMATH_H */
diff --git a/src/StoreClient.h b/src/StoreClient.h
index a046ac9..484b413 100644
--- a/src/StoreClient.h
+++ b/src/StoreClient.h
@@ -10,11 +10,25 @@
#define SQUID_STORECLIENT_H
#include "base/forward.h"
+#include "base/AsyncCall.h"
#include "dlink.h"
+#include "store/ParsingBuffer.h"
#include "StoreIOBuffer.h"
#include "StoreIOState.h"
-typedef void STCB(void *, StoreIOBuffer); /* store callback */
+/// A storeClientCopy() callback function.
+///
+/// Upon storeClientCopy() success, StoreIOBuffer::flags.error is zero, and
+/// * HTTP response headers (if any) are available via MemObject::freshestReply();
+/// * HTTP response body bytes (if any) are available via StoreIOBuffer.
+///
+/// STCB callbacks may use response semantics to detect certain EOF conditions.
+/// Callbacks that expect HTTP headers may call store_client::atEof(). Similar
+/// to clientStreamCallback() callbacks, callbacks dedicated to receiving HTTP
+/// bodies may use zero StoreIOBuffer::length as an EOF condition.
+///
+/// Errors are indicated by setting StoreIOBuffer::flags.error.
+using STCB = void (void *, StoreIOBuffer);
class StoreEntry;
class ACLFilledChecklist;
@@ -67,17 +81,34 @@ class store_client
public:
store_client(StoreEntry *);
~store_client();
- bool memReaderHasLowerOffset(int64_t) const;
+
+ /// the client will not use HTTP response bytes with lower offsets (if any)
+ auto discardableHttpEnd() const { return discardableHttpEnd_; }
+
int getType() const;
- void fail();
- void callback(ssize_t len, bool error = false);
+
+ /// React to the end of reading the response from disk. There will be no
+ /// more readHeader() and readBody() callbacks for the current storeRead()
+ /// swapin after this notification.
+ void noteSwapInDone(bool error);
+
void doCopy (StoreEntry *e);
void readHeader(const char *buf, ssize_t len);
void readBody(const char *buf, ssize_t len);
+
+ /// Request StoreIOBuffer-described response data via an asynchronous STCB
+ /// callback. At most one outstanding request is allowed per store_client.
void copy(StoreEntry *, StoreIOBuffer, STCB *, void *);
+
void dumpStats(MemBuf * output, int clientNumber) const;
- int64_t cmp_offset;
+ // TODO: When STCB gets a dedicated Answer type, move this info there.
+ /// Whether the last successful storeClientCopy() answer was known to
+ /// contain the last body bytes of the HTTP response
+ /// \retval true requesting bytes at higher offsets is futile
+ /// \sa STCB
+ bool atEof() const { return atEof_; }
+
#if STORE_CLIENT_LIST_DEBUG
void *owner;
@@ -87,33 +118,86 @@ public:
StoreIOState::Pointer swapin_sio;
struct {
+ /// whether we are expecting a response to be swapped in from disk
+ /// (i.e. whether async storeRead() is currently in progress)
+ // TODO: a better name reflecting the 'in' scope of the flag
bool disk_io_pending;
+
+ /// whether the store_client::doCopy()-initiated STCB sequence is
+ /// currently in progress
bool store_copying;
- bool copy_event_pending;
} flags;
#if USE_DELAY_POOLS
DelayId delayId;
+
+ /// The maximum number of bytes the Store client can read/copy next without
+ /// overflowing its buffer and without violating delay pool limits. Store
+ /// I/O is not rate-limited, but we assume that the same number of bytes may
+ /// be read from the Squid-to-server connection that may be rate-limited.
+ int bytesWanted() const;
+
void setDelayId(DelayId delay_id);
#endif
dlink_node node;
- /* Below here is private - do no alter outside storeClient calls */
- StoreIOBuffer copyInto;
private:
- bool moreToSend() const;
+ bool moreToRead() const;
+ bool canReadFromMemory() const;
+ bool answeredOnce() const { return answers >= 1; }
+ bool sendingHttpHeaders() const;
+ int64_t nextHttpReadOffset() const;
void fileRead();
void scheduleDiskRead();
- void scheduleMemRead();
+ void readFromMemory();
void scheduleRead();
bool startSwapin();
bool unpackHeader(char const *buf, ssize_t len);
+ void handleBodyFromDisk();
+ void maybeWriteFromDiskToMemory(const StoreIOBuffer &);
+
+ bool parseHttpHeadersFromDisk();
+ bool tryParsingHttpHeaders();
+ void skipHttpHeadersFromDisk();
+
+ void fail();
+ void callback(ssize_t);
+ void noteCopiedBytes(size_t);
+ void noteNews();
+ void finishCallback();
+ static void FinishCallback(store_client *);
int type;
bool object_ok;
+ /// \copydoc atEof()
+ bool atEof_;
+
+ /// Storage and metadata associated with the current copy() request. Ought
+ /// to be ignored when not answering a copy() request.
+ /// * copyInto.offset is the requested HTTP response body offset;
+ /// * copyInto.data is the client-owned, client-provided result buffer;
+ /// * copyInto.length is the size of the .data result buffer;
+ /// * copyInto.flags are unused by this class.
+ StoreIOBuffer copyInto;
+
+ // TODO: Convert to uint64_t after fixing mem_hdr::endOffset() and friends.
+ /// \copydoc discardableHttpEnd()
+ int64_t discardableHttpEnd_ = 0;
+
+ /// the total number of finishCallback() calls
+ uint64_t answers;
+
+ /// Accumulates raw bytes read from Store while answering the current copy()
+ /// request. Buffer contents depend on the source and parsing stage; it may
+ /// hold (parts of) swap metadata, HTTP response headers, and/or HTTP
+ /// response body bytes.
+ std::optional<Store::ParsingBuffer> parsingBuffer;
+
+ StoreIOBuffer lastDiskRead; ///< buffer used for the last storeRead() call
+
/* Until we finish stuffing code into store_client */
public:
@@ -122,14 +206,33 @@ public:
Callback ():callback_handler(NULL), callback_data(NULL) {}
Callback (STCB *, void *);
+
+ /// Whether the copy() answer is needed/expected (by the client) and has
+ /// not been computed (by us). False during (asynchronous) answer
+ /// delivery to the STCB callback_handler.
bool pending() const;
+
STCB *callback_handler;
void *callback_data;
CodeContextPointer codeContext; ///< Store client context
+
+ /// a scheduled asynchronous finishCallback() call (or nil)
+ AsyncCall::Pointer notifier;
} _callback;
};
+/// Asynchronously read HTTP response headers and/or body bytes from Store.
+///
+/// The requested zero-based HTTP body offset is specified via the
+/// StoreIOBuffer::offset field. The first call (for a given store_client
+/// object) must specify zero offset.
+///
+/// The requested HTTP body portion size is specified via the
+/// StoreIOBuffer::length field. The function may return fewer body bytes.
+///
+/// See STCB for result delivery details.
void storeClientCopy(store_client *, StoreEntry *, StoreIOBuffer, STCB *, void *);
+
store_client* storeClientListAdd(StoreEntry * e, void *data);
int storeClientCopyPending(store_client *, StoreEntry * e, void *data);
int storeUnregister(store_client * sc, StoreEntry * e, void *data);
diff --git a/src/StoreIOBuffer.h b/src/StoreIOBuffer.h
index 859eda6..984713a 100644
--- a/src/StoreIOBuffer.h
+++ b/src/StoreIOBuffer.h
@@ -43,6 +43,9 @@ public:
return Range<int64_t>(offset, offset + length);
}
+ /// convenience method for changing the offset of a being-configured buffer
+ StoreIOBuffer &positionAt(const int64_t newOffset) { offset = newOffset; return *this; }
+
void dump() const {
if (fwrite(data, length, 1, stderr)) {}
if (fwrite("\n", 1, 1, stderr)) {}
diff --git a/src/acl/Asn.cc b/src/acl/Asn.cc
index ea9c065..629bf23 100644
--- a/src/acl/Asn.cc
+++ b/src/acl/Asn.cc
@@ -16,22 +16,21 @@
#include "acl/DestinationIp.h"
#include "acl/SourceAsn.h"
#include "acl/Strategised.h"
+#include "base/CharacterSet.h"
#include "FwdState.h"
#include "HttpReply.h"
#include "HttpRequest.h"
#include "ipcache.h"
#include "MasterXaction.h"
#include "mgr/Registration.h"
+#include "parser/Tokenizer.h"
#include "radix.h"
#include "RequestFlags.h"
+#include "sbuf/SBuf.h"
#include "SquidConfig.h"
#include "Store.h"
#include "StoreClient.h"
-#ifndef AS_REQBUF_SZ
-#define AS_REQBUF_SZ 4096
-#endif
-
/* BEGIN of definitions for radix tree entries */
/* 32/128 bits address in memory with length */
@@ -71,9 +70,8 @@ class ASState
CBDATA_CLASS(ASState);
public:
- ASState() {
- memset(reqbuf, 0, sizeof(reqbuf));
- }
+ ASState() = default;
+
~ASState() {
if (entry) {
debugs(53, 3, entry->url());
@@ -87,10 +85,9 @@ public:
store_client *sc = nullptr;
HttpRequest::Pointer request;
int as_number = 0;
- int64_t offset = 0;
- int reqofs = 0;
- char reqbuf[AS_REQBUF_SZ];
- bool dataRead = false;
+
+ /// for receiving a WHOIS reply body from Store and interpreting it
+ Store::ParsingBuffer parsingBuffer;
};
CBDATA_CLASS_INIT(ASState);
@@ -103,7 +100,7 @@ struct rtentry_t {
m_ADDR e_mask;
};
-static int asnAddNet(char *, int);
+static int asnAddNet(const SBuf &, int);
static void asnCacheStart(int as);
@@ -258,8 +255,7 @@ asnCacheStart(int as)
xfree(asres);
asState->entry = e;
- StoreIOBuffer readBuffer (AS_REQBUF_SZ, asState->offset, asState->reqbuf);
- storeClientCopy(asState->sc, e, readBuffer, asHandleReply, asState);
+ storeClientCopy(asState->sc, e, asState->parsingBuffer.makeInitialSpace(), asHandleReply, asState);
}
static void
@@ -267,13 +263,8 @@ asHandleReply(void *data, StoreIOBuffer result)
{
ASState *asState = (ASState *)data;
StoreEntry *e = asState->entry;
- char *s;
- char *t;
- char *buf = asState->reqbuf;
- int leftoversz = -1;
- debugs(53, 3, "asHandleReply: Called with size=" << (unsigned int)result.length);
- debugs(53, 3, "asHandleReply: buffer='" << buf << "'");
+ debugs(53, 3, result << " for " << asState->as_number << " with " << *e);
/* First figure out whether we should abort the request */
@@ -282,11 +273,7 @@ asHandleReply(void *data, StoreIOBuffer result)
return;
}
- if (result.length == 0 && asState->dataRead) {
- debugs(53, 3, "asHandleReply: Done: " << e->url());
- delete asState;
- return;
- } else if (result.flags.error) {
+ if (result.flags.error) {
debugs(53, DBG_IMPORTANT, "asHandleReply: Called with Error set and size=" << (unsigned int) result.length);
delete asState;
return;
@@ -296,117 +283,79 @@ asHandleReply(void *data, StoreIOBuffer result)
return;
}
- /*
- * Next, attempt to parse our request
- * Remembering that the actual buffer size is retsize + reqofs!
- */
- s = buf;
+ asState->parsingBuffer.appended(result.data, result.length);
+ Parser::Tokenizer tok(SBuf(asState->parsingBuffer.content().data, asState->parsingBuffer.contentSize()));
+ SBuf address;
+ // Word delimiters in WHOIS ASN replies. RFC 3912 mentions SP, CR, and LF.
+ // Others are added to mimic an earlier isspace()-based implementation.
+ static const auto WhoisSpaces = CharacterSet("ASCII_spaces", " \f\r\n\t\v");
+ while (tok.token(address, WhoisSpaces)) {
+ (void)asnAddNet(address, asState->as_number);
+ }
+ asState->parsingBuffer.consume(tok.parsedSize());
+ const auto leftoverBytes = asState->parsingBuffer.contentSize();
- while ((size_t)(s - buf) < result.length + asState->reqofs && *s != '\0') {
- while (*s && xisspace(*s))
- ++s;
- for (t = s; *t; ++t) {
- if (xisspace(*t))
- break;
- }
+ if (asState->sc->atEof()) {
+ if (leftoverBytes)
+ debugs(53, 2, "WHOIS: Discarding the last " << leftoverBytes << " received bytes of a truncated AS response");
+ delete asState;
+ return;
+ }
- if (*t == '\0') {
- /* oof, word should continue on next block */
- break;
- }
+ const auto remainingSpace = asState->parsingBuffer.space().positionAt(result.offset + result.length);
- *t = '\0';
- debugs(53, 3, "asHandleReply: AS# " << s << " (" << asState->as_number << ")");
- asnAddNet(s, asState->as_number);
- s = t + 1;
- asState->dataRead = true;
+ if (!remainingSpace.length) {
+ Assure(leftoverBytes);
+ debugs(53, DBG_IMPORTANT, "WARNING: Ignoring the tail of a WHOIS AS response" <<
+ " with an unparsable section of " << leftoverBytes <<
+ " bytes ending at offset " << remainingSpace.offset);
+ delete asState;
+ return;
}
- /*
- * Next, grab the end of the 'valid data' in the buffer, and figure
- * out how much data is left in our buffer, which we need to keep
- * around for the next request
- */
- leftoversz = (asState->reqofs + result.length) - (s - buf);
-
- assert(leftoversz >= 0);
-
- /*
- * Next, copy the left over data, from s to s + leftoversz to the
- * beginning of the buffer
- */
- memmove(buf, s, leftoversz);
-
- /*
- * Next, update our offset and reqofs, and kick off a copy if required
- */
- asState->offset += result.length;
-
- asState->reqofs = leftoversz;
-
- debugs(53, 3, "asState->offset = " << asState->offset);
-
- if (e->store_status == STORE_PENDING) {
- debugs(53, 3, "asHandleReply: store_status == STORE_PENDING: " << e->url() );
- StoreIOBuffer tempBuffer (AS_REQBUF_SZ - asState->reqofs,
- asState->offset,
- asState->reqbuf + asState->reqofs);
- storeClientCopy(asState->sc,
- e,
- tempBuffer,
- asHandleReply,
- asState);
- } else {
- StoreIOBuffer tempBuffer;
- debugs(53, 3, "asHandleReply: store complete, but data received " << e->url() );
- tempBuffer.offset = asState->offset;
- tempBuffer.length = AS_REQBUF_SZ - asState->reqofs;
- tempBuffer.data = asState->reqbuf + asState->reqofs;
- storeClientCopy(asState->sc,
- e,
- tempBuffer,
- asHandleReply,
- asState);
- }
+ const decltype(StoreIOBuffer::offset) stillReasonableOffset = 100000; // an arbitrary limit in bytes
+ if (remainingSpace.offset > stillReasonableOffset) {
+ // stop suspicious accumulation of parsed addresses and/or work
+ debugs(53, DBG_IMPORTANT, "WARNING: Ignoring the tail of a suspiciously large WHOIS AS response" <<
+ " exceeding " << stillReasonableOffset << " bytes");
+ delete asState;
+ return;
+ }
+
+ storeClientCopy(asState->sc, e, remainingSpace, asHandleReply, asState);
}
/**
* add a network (addr, mask) to the radix tree, with matching AS number
*/
static int
-asnAddNet(char *as_string, int as_number)
+asnAddNet(const SBuf &addressAndMask, const int as_number)
{
struct squid_radix_node *rn;
CbDataList<int> **Tail = NULL;
CbDataList<int> *q = NULL;
as_info *asinfo = NULL;
- Ip::Address mask;
- Ip::Address addr;
- char *t;
- int bitl;
-
- t = strchr(as_string, '/');
-
- if (t == NULL) {
+ static const CharacterSet NonSlashSet = CharacterSet("slash", "/").complement("non-slash");
+ Parser::Tokenizer tok(addressAndMask);
+ SBuf addressToken;
+ if (!(tok.prefix(addressToken, NonSlashSet) && tok.skip('/'))) {
debugs(53, 3, "asnAddNet: failed, invalid response from whois server.");
return 0;
}
- *t = '\0';
- addr = as_string;
- bitl = atoi(t + 1);
-
- if (bitl < 0)
- bitl = 0;
+ const Ip::Address addr = addressToken.c_str();
// INET6 TODO : find a better way of identifying the base IPA family for mask than this.
- t = strchr(as_string, '.');
+ const auto addrFamily = (addressToken.find('.') != SBuf::npos) ? AF_INET : AF_INET6;
// generate Netbits Format Mask
+ Ip::Address mask;
mask.setNoAddr();
- mask.applyMask(bitl, (t!=NULL?AF_INET:AF_INET6) );
+ int64_t bitl = 0;
+ if (tok.int64(bitl, 10, false))
+ mask.applyMask(bitl, addrFamily);
debugs(53, 3, "asnAddNet: called for " << addr << "/" << mask );
diff --git a/src/base/Assure.cc b/src/base/Assure.cc
new file mode 100644
index 0000000..cb69fc5
--- /dev/null
+++ b/src/base/Assure.cc
@@ -0,0 +1,24 @@
+/*
+ * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+#include "base/Assure.h"
+#include "base/TextException.h"
+#include "sbuf/Stream.h"
+
+[[ noreturn ]] void
+ReportAndThrow_(const int debugLevel, const char *description, const SourceLocation &location)
+{
+ const TextException ex(description, location);
+ const auto label = debugLevel <= DBG_IMPORTANT ? "ERROR: Squid BUG: " : "";
+ // TODO: Consider also printing the number of BUGs reported so far. It would
+ // require GC, but we could even print the number of same-location reports.
+ debugs(0, debugLevel, label << ex);
+ throw ex;
+}
+
diff --git a/src/base/Assure.h b/src/base/Assure.h
new file mode 100644
index 0000000..bb571d2
--- /dev/null
+++ b/src/base/Assure.h
@@ -0,0 +1,52 @@
+/*
+ * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef SQUID_SRC_BASE_ASSURE_H
+#define SQUID_SRC_BASE_ASSURE_H
+
+#include "base/Here.h"
+
+/// Reports the description (at the given debugging level) and throws
+/// the corresponding exception. Reduces compiled code size of Assure() and
+/// Must() callers. Do not call directly; use Assure() instead.
+/// \param description explains the condition (i.e. what MUST happen)
+[[ noreturn ]] void ReportAndThrow_(int debugLevel, const char *description, const SourceLocation &);
+
+/// Calls ReportAndThrow_() if needed. Reduces caller code duplication.
+/// Do not call directly; use Assure() instead.
+/// \param description c-string explaining the condition (i.e. what MUST happen)
+#define Assure_(debugLevel, condition, description, location) \
+ while (!(condition)) \
+ ReportAndThrow_((debugLevel), (description), (location))
+
+#if !defined(NDEBUG)
+
+/// Like assert() but throws an exception instead of aborting the process. Use
+/// this macro to detect code logic mistakes (i.e. bugs) where aborting the
+/// current AsyncJob or a similar task is unlikely to jeopardize Squid service
+/// integrity. For example, this macro is _not_ appropriate for detecting bugs
+/// that indicate a dangerous global state corruption which may go unnoticed by
+/// other jobs after the current job or task is aborted.
+#define Assure(condition) \
+ Assure2((condition), #condition)
+
+/// Like Assure() but allows the caller to customize the exception message.
+/// \param description string literal describing the condition (i.e. what MUST happen)
+#define Assure2(condition, description) \
+ Assure_(0, (condition), ("assurance failed: " description), Here())
+
+#else
+
+/* do-nothing implementations for NDEBUG builds */
+#define Assure(condition) ((void)0)
+#define Assure2(condition, description) ((void)0)
+
+#endif /* NDEBUG */
+
+#endif /* SQUID_SRC_BASE_ASSURE_H */
+
diff --git a/src/base/Makefile.am b/src/base/Makefile.am
index a2b4527..4602998 100644
--- a/src/base/Makefile.am
+++ b/src/base/Makefile.am
@@ -11,6 +11,8 @@ include $(top_srcdir)/src/TestHeaders.am
noinst_LTLIBRARIES = libbase.la
libbase_la_SOURCES = \
+ Assure.cc \
+ Assure.h \
AsyncCall.cc \
AsyncCall.h \
AsyncCallQueue.cc \
diff --git a/src/base/Makefile.in b/src/base/Makefile.in
index 66ad8a4..a578941 100644
--- a/src/base/Makefile.in
+++ b/src/base/Makefile.in
@@ -164,7 +164,7 @@ CONFIG_CLEAN_FILES =
CONFIG_CLEAN_VPATH_FILES =
LTLIBRARIES = $(noinst_LTLIBRARIES)
libbase_la_LIBADD =
-am_libbase_la_OBJECTS = AsyncCall.lo AsyncCallQueue.lo AsyncJob.lo \
+am_libbase_la_OBJECTS = Assure.lo AsyncCall.lo AsyncCallQueue.lo AsyncJob.lo \
CharacterSet.lo CodeContext.lo File.lo Here.lo InstanceId.lo \
JobWait.lo RegexPattern.lo RunnersRegistry.lo TextException.lo
libbase_la_OBJECTS = $(am_libbase_la_OBJECTS)
@@ -187,7 +187,7 @@ am__v_at_1 =
DEFAULT_INCLUDES =
depcomp = $(SHELL) $(top_srcdir)/cfgaux/depcomp
am__maybe_remake_depfiles = depfiles
-am__depfiles_remade = ./$(DEPDIR)/AsyncCall.Plo \
+am__depfiles_remade = ./$(DEPDIR)/Assure.Plo ./$(DEPDIR)/AsyncCall.Plo \
./$(DEPDIR)/AsyncCallQueue.Plo ./$(DEPDIR)/AsyncJob.Plo \
./$(DEPDIR)/CharacterSet.Plo ./$(DEPDIR)/CodeContext.Plo \
./$(DEPDIR)/File.Plo ./$(DEPDIR)/Here.Plo \
@@ -738,6 +738,8 @@ COMPAT_LIB = $(top_builddir)/compat/libcompatsquid.la $(LIBPROFILER)
subst_perlshell = sed -e 's,[@]PERL[@],$(PERL),g' <$(srcdir)/$@.pl.in >$@ || ($(RM) -f $@ ; exit 1)
noinst_LTLIBRARIES = libbase.la
libbase_la_SOURCES = \
+ Assure.cc \
+ Assure.h \
AsyncCall.cc \
AsyncCall.h \
AsyncCallQueue.cc \
@@ -846,6 +848,7 @@ mostlyclean-compile:
distclean-compile:
-rm -f *.tab.c
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Assure.Plo@am__quote@ # am--include-marker
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/AsyncCall.Plo@am__quote@ # am--include-marker
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/AsyncCallQueue.Plo@am__quote@ # am--include-marker
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/AsyncJob.Plo@am__quote@ # am--include-marker
@@ -1188,7 +1191,8 @@ clean-am: clean-checkPROGRAMS clean-generic clean-libtool \
clean-noinstLTLIBRARIES mostlyclean-am
distclean: distclean-am
- -rm -f ./$(DEPDIR)/AsyncCall.Plo
+ -rm -f ./$(DEPDIR)/Assure.Plo
+ -rm -f ./$(DEPDIR)/AsyncCall.Plo
-rm -f ./$(DEPDIR)/AsyncCallQueue.Plo
-rm -f ./$(DEPDIR)/AsyncJob.Plo
-rm -f ./$(DEPDIR)/CharacterSet.Plo
@@ -1245,7 +1249,8 @@ install-ps-am:
installcheck-am:
maintainer-clean: maintainer-clean-am
- -rm -f ./$(DEPDIR)/AsyncCall.Plo
+ -rm -f ./$(DEPDIR)/Assure.Plo
+ -rm -f ./$(DEPDIR)/AsyncCall.Plo
-rm -f ./$(DEPDIR)/AsyncCallQueue.Plo
-rm -f ./$(DEPDIR)/AsyncJob.Plo
-rm -f ./$(DEPDIR)/CharacterSet.Plo
diff --git a/src/base/TextException.h b/src/base/TextException.h
index 412783e..e96cede 100644
--- a/src/base/TextException.h
+++ b/src/base/TextException.h
@@ -9,6 +9,7 @@
#ifndef SQUID__TEXTEXCEPTION_H
#define SQUID__TEXTEXCEPTION_H
+#include "base/Assure.h"
#include "base/Here.h"
#include <stdexcept>
@@ -57,19 +58,16 @@ std::ostream &operator <<(std::ostream &, const TextException &);
/// legacy convenience macro; it is not difficult to type Here() now
#define TexcHere(msg) TextException((msg), Here())
-/// Like assert() but throws an exception instead of aborting the process
-/// and allows the caller to customize the exception message and location.
+/// Like Must() but supports custom exception message and location.
/// \param description string literal describing the condition; what MUST happen
+/// Deprecated: Use Assure2() for code logic checks and throw explicitly when
+/// input validation fails.
#define Must3(condition, description, location) \
- do { \
- if (!(condition)) { \
- const TextException Must_ex_(("check failed: " description), (location)); \
- debugs(0, 3, Must_ex_.what()); \
- throw Must_ex_; \
- } \
- } while (/*CONSTCOND*/ false)
-
-/// Like assert() but throws an exception instead of aborting the process.
+ Assure_(3, (condition), ("check failed: " description), (location))
+
+/// Like Assure() but only logs the exception if level-3 debugging is enabled
+/// and runs even when NDEBUG macro is defined. Deprecated: Use Assure() for
+/// code logic checks and throw explicitly when input validation fails.
#define Must(condition) Must3((condition), #condition, Here())
/// Reports and swallows all exceptions to prevent compiler warnings and runtime
diff --git a/src/clientStream.cc b/src/clientStream.cc
index 7aadb83..06ab8b1 100644
--- a/src/clientStream.cc
+++ b/src/clientStream.cc
@@ -154,8 +154,7 @@ clientStreamCallback(clientStreamNode * thisObject, ClientHttpRequest * http,
assert(thisObject && http && thisObject->node.next);
next = thisObject->next();
- debugs(87, 3, "clientStreamCallback: Calling " << next->callback << " with cbdata " <<
- next->data.getRaw() << " from node " << thisObject);
+ debugs(87, 3, thisObject << " gives " << next->data << ' ' << replyBuffer);
next->callback(next, http, rep, replyBuffer);
}
diff --git a/src/client_side_reply.cc b/src/client_side_reply.cc
index baf0612..27746c6 100644
--- a/src/client_side_reply.cc
+++ b/src/client_side_reply.cc
@@ -34,6 +34,7 @@
#include "RequestFlags.h"
#include "SquidConfig.h"
#include "SquidTime.h"
+#include "SquidMath.h"
#include "Store.h"
#include "StrList.h"
#include "tools.h"
@@ -76,11 +77,7 @@ clientReplyContext::clientReplyContext(ClientHttpRequest *clientContext) :
purgeStatus(Http::scNone),
lookingforstore(0),
http(cbdataReference(clientContext)),
- headers_sz(0),
sc(NULL),
- old_reqsize(0),
- reqsize(0),
- reqofs(0),
#if USE_CACHE_DIGESTS
lookup_type(NULL),
#endif
@@ -166,8 +163,6 @@ void clientReplyContext::setReplyToStoreEntry(StoreEntry *entry, const char *rea
#if USE_DELAY_POOLS
sc->setDelayId(DelayId::DelayClient(http));
#endif
- reqofs = 0;
- reqsize = 0;
if (http->request)
http->request->ignoreRange(reason);
flags.storelogiccomplete = 1;
@@ -206,13 +201,10 @@ clientReplyContext::saveState()
old_sc = sc;
old_lastmod = http->request->lastmod;
old_etag = http->request->etag;
- old_reqsize = reqsize;
- tempBuffer.offset = reqofs;
+
/* Prevent accessing the now saved entries */
http->storeEntry(NULL);
sc = NULL;
- reqsize = 0;
- reqofs = 0;
}
void
@@ -223,8 +215,6 @@ clientReplyContext::restoreState()
removeClientStoreReference(&sc, http);
http->storeEntry(old_entry);
sc = old_sc;
- reqsize = old_reqsize;
- reqofs = tempBuffer.offset;
http->request->lastmod = old_lastmod;
http->request->etag = old_etag;
/* Prevent accessed the old saved entries */
@@ -232,7 +222,7 @@ clientReplyContext::restoreState()
old_sc = NULL;
old_lastmod = -1;
old_etag.clean();
- old_reqsize = 0;
+
tempBuffer.offset = 0;
}
@@ -250,18 +240,27 @@ clientReplyContext::getNextNode() const
return (clientStreamNode *)ourNode->node.next->data;
}
-/* This function is wrong - the client parameters don't include the
- * header offset
- */
+/// Request HTTP response headers from Store, to be sent to the given recipient.
+/// That recipient also gets zero, some, or all HTTP response body bytes (into
+/// next()->readBuffer).
void
-clientReplyContext::triggerInitialStoreRead()
+clientReplyContext::triggerInitialStoreRead(STCB recipient)
{
- /* when confident, 0 becomes reqofs, and then this factors into
- * startSendProcess
- */
- assert(reqofs == 0);
+ Assure(recipient != HandleIMSReply);
+ lastStreamBufferedBytes = StoreIOBuffer(); // storeClientCopy(next()->readBuffer) invalidates
StoreIOBuffer localTempBuffer (next()->readBuffer.length, 0, next()->readBuffer.data);
- storeClientCopy(sc, http->storeEntry(), localTempBuffer, SendMoreData, this);
+ ::storeClientCopy(sc, http->storeEntry(), localTempBuffer, recipient, this);
+}
+
+/// Request HTTP response body bytes from Store into next()->readBuffer. This
+/// method requests body bytes at next()->readBuffer.offset and, hence, it should only
+/// be called after we triggerInitialStoreRead() and get the requested HTTP
+/// response headers (using zero offset).
+void
+clientReplyContext::requestMoreBodyFromStore()
+{
+ lastStreamBufferedBytes = StoreIOBuffer(); // storeClientCopy(next()->readBuffer) invalidates
+ ::storeClientCopy(sc, http->storeEntry(), next()->readBuffer, SendMoreData, this);
}
/* there is an expired entry in the store.
@@ -368,30 +367,23 @@ clientReplyContext::processExpired()
{
/* start counting the length from 0 */
StoreIOBuffer localTempBuffer(HTTP_REQBUF_SZ, 0, tempbuf);
- storeClientCopy(sc, entry, localTempBuffer, HandleIMSReply, this);
+ // keep lastStreamBufferedBytes: tempbuf is not a Client Stream buffer
+ ::storeClientCopy(sc, entry, localTempBuffer, HandleIMSReply, this);
}
}
void
-clientReplyContext::sendClientUpstreamResponse()
+clientReplyContext::sendClientUpstreamResponse(const StoreIOBuffer &upstreamResponse)
{
- StoreIOBuffer tempresult;
removeStoreReference(&old_sc, &old_entry);
if (collapsedRevalidation)
http->storeEntry()->clearPublicKeyScope();
/* here the data to send is the data we just received */
- tempBuffer.offset = 0;
- old_reqsize = 0;
- /* sendMoreData tracks the offset as well.
- * Force it back to zero */
- reqofs = 0;
assert(!EBIT_TEST(http->storeEntry()->flags, ENTRY_ABORTED));
- /* TODO: provide sendMoreData with the ready parsed reply */
- tempresult.length = reqsize;
- tempresult.data = tempbuf;
- sendMoreData(tempresult);
+
+ sendMoreData(upstreamResponse);
}
void
@@ -408,11 +400,9 @@ clientReplyContext::sendClientOldEntry()
restoreState();
/* here the data to send is in the next nodes buffers already */
assert(!EBIT_TEST(http->storeEntry()->flags, ENTRY_ABORTED));
- /* sendMoreData tracks the offset as well.
- * Force it back to zero */
- reqofs = 0;
- StoreIOBuffer tempresult (reqsize, reqofs, next()->readBuffer.data);
- sendMoreData(tempresult);
+ Assure(matchesStreamBodyBuffer(lastStreamBufferedBytes));
+ Assure(!lastStreamBufferedBytes.offset);
+ sendMoreData(lastStreamBufferedBytes);
}
/* This is the workhorse of the HandleIMSReply callback.
@@ -426,11 +416,11 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result)
if (deleting)
return;
- debugs(88, 3, http->storeEntry()->url() << ", " << (long unsigned) result.length << " bytes");
-
if (http->storeEntry() == NULL)
return;
+ debugs(88, 3, http->storeEntry()->url() << " got " << result);
+
if (result.flags.error && !EBIT_TEST(http->storeEntry()->flags, ENTRY_ABORTED))
return;
@@ -443,9 +433,6 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result)
return;
}
- /* update size of the request */
- reqsize = result.length + reqofs;
-
// request to origin was aborted
if (EBIT_TEST(http->storeEntry()->flags, ENTRY_ABORTED)) {
debugs(88, 3, "request to origin aborted '" << http->storeEntry()->url() << "', sending old entry to client");
@@ -474,7 +461,7 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result)
if (http->request->flags.ims && !old_entry->modifiedSince(http->request->ims, http->request->imslen)) {
// forward the 304 from origin
debugs(88, 3, "origin replied 304, revalidated existing entry and forwarding 304 to client");
- sendClientUpstreamResponse();
+ sendClientUpstreamResponse(result);
return;
}
@@ -497,7 +484,7 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result)
http->logType.update(LOG_TCP_REFRESH_MODIFIED);
debugs(88, 3, "origin replied " << status << ", forwarding to client");
- sendClientUpstreamResponse();
+ sendClientUpstreamResponse(result);
return;
}
@@ -505,7 +492,7 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result)
if (http->request->flags.failOnValidationError) {
http->logType.update(LOG_TCP_REFRESH_FAIL_ERR);
debugs(88, 3, "origin replied with error " << status << ", forwarding to client due to fail_on_validation_err");
- sendClientUpstreamResponse();
+ sendClientUpstreamResponse(result);
return;
}
@@ -518,13 +505,7 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result)
SQUIDCEXTERN CSR clientGetMoreData;
SQUIDCEXTERN CSD clientReplyDetach;
-/**
- * clientReplyContext::cacheHit Should only be called until the HTTP reply headers
- * have been parsed. Normally this should be a single call, but
- * it might take more than one. As soon as we have the headers,
- * we hand off to clientSendMoreData, processExpired, or
- * processMiss.
- */
+/// \copydoc clientReplyContext::cacheHit()
void
clientReplyContext::CacheHit(void *data, StoreIOBuffer result)
{
@@ -532,11 +513,11 @@ clientReplyContext::CacheHit(void *data, StoreIOBuffer result)
context->cacheHit(result);
}
-/**
- * Process a possible cache HIT.
- */
+/// Processes HTTP response headers received from Store on a suspected cache hit
+/// path. May be called several times (e.g., a Vary marker object hit followed
+/// by the corresponding variant hit).
void
-clientReplyContext::cacheHit(StoreIOBuffer result)
+clientReplyContext::cacheHit(const StoreIOBuffer result)
{
/** Ignore if the HIT object is being deleted. */
if (deleting) {
@@ -548,7 +529,7 @@ clientReplyContext::cacheHit(StoreIOBuffer result)
HttpRequest *r = http->request;
- debugs(88, 3, "clientCacheHit: " << http->uri << ", " << result.length << " bytes");
+ debugs(88, 3, http->uri << " got " << result);
if (http->storeEntry() == NULL) {
debugs(88, 3, "clientCacheHit: request aborted");
@@ -572,20 +553,7 @@ clientReplyContext::cacheHit(StoreIOBuffer result)
return;
}
- if (result.length == 0) {
- debugs(88, 5, "store IO buffer has no content. MISS");
- /* the store couldn't get enough data from the file for us to id the
- * object
- */
- /* treat as a miss */
- http->logType.update(LOG_TCP_MISS);
- processMiss();
- return;
- }
-
assert(!EBIT_TEST(e->flags, ENTRY_ABORTED));
- /* update size of the request */
- reqsize = result.length + reqofs;
/*
* Got the headers, now grok them
@@ -599,6 +567,8 @@ clientReplyContext::cacheHit(StoreIOBuffer result)
return;
}
+ noteStreamBufferredBytes(result);
+
switch (varyEvaluateMatch(e, r)) {
case VARY_NONE:
@@ -699,7 +669,7 @@ clientReplyContext::cacheHit(StoreIOBuffer result)
return;
} else if (r->conditional()) {
debugs(88, 5, "conditional HIT");
- if (processConditional(result))
+ if (processConditional())
return;
}
@@ -818,7 +788,7 @@ clientReplyContext::processOnlyIfCachedMiss()
/// process conditional request from client
bool
-clientReplyContext::processConditional(StoreIOBuffer &result)
+clientReplyContext::processConditional()
{
StoreEntry *const e = http->storeEntry();
@@ -1001,16 +971,7 @@ clientReplyContext::purgeFoundObject(StoreEntry *entry)
http->logType.update(LOG_TCP_HIT);
- reqofs = 0;
-
- localTempBuffer.offset = http->out.offset;
-
- localTempBuffer.length = next()->readBuffer.length;
-
- localTempBuffer.data = next()->readBuffer.data;
-
- storeClientCopy(sc, http->storeEntry(),
- localTempBuffer, CacheHit, this);
+ triggerInitialStoreRead(CacheHit);
}
void
@@ -1124,16 +1085,10 @@ clientReplyContext::purgeDoPurgeHead(StoreEntry *newEntry)
}
void
-clientReplyContext::traceReply(clientStreamNode * node)
+clientReplyContext::traceReply()
{
- clientStreamNode *nextNode = (clientStreamNode *)node->node.next->data;
- StoreIOBuffer localTempBuffer;
createStoreEntry(http->request->method, RequestFlags());
- localTempBuffer.offset = nextNode->readBuffer.offset + headers_sz;
- localTempBuffer.length = nextNode->readBuffer.length;
- localTempBuffer.data = nextNode->readBuffer.data;
- storeClientCopy(sc, http->storeEntry(),
- localTempBuffer, SendMoreData, this);
+ triggerInitialStoreRead();
http->storeEntry()->releaseRequest();
http->storeEntry()->buffer();
const HttpReplyPointer rep(new HttpReply);
@@ -1182,16 +1137,16 @@ int
clientReplyContext::storeOKTransferDone() const
{
assert(http->storeEntry()->objectLen() >= 0);
+ const auto headers_sz = http->storeEntry()->mem().baseReply().hdr_sz;
assert(http->storeEntry()->objectLen() >= headers_sz);
- if (http->out.offset >= http->storeEntry()->objectLen() - headers_sz) {
- debugs(88,3,HERE << "storeOKTransferDone " <<
- " out.offset=" << http->out.offset <<
- " objectLen()=" << http->storeEntry()->objectLen() <<
- " headers_sz=" << headers_sz);
- return 1;
- }
- return 0;
+ const auto done = http->out.offset >= http->storeEntry()->objectLen() - headers_sz;
+ const auto debugLevel = done ? 3 : 5;
+ debugs(88, debugLevel, done <<
+ " out.offset=" << http->out.offset <<
+ " objectLen()=" << http->storeEntry()->objectLen() <<
+ " headers_sz=" << headers_sz);
+ return done ? 1 : 0;
}
int
@@ -1204,8 +1159,7 @@ clientReplyContext::storeNotOKTransferDone() const
assert(mem != NULL);
assert(http->request != NULL);
- /* mem->reply was wrong because it uses the UPSTREAM header length!!! */
- if (headers_sz == 0)
+ if (mem->baseReply().pstate != Http::Message::psParsed)
/* haven't found end of headers yet */
return 0;
@@ -1226,16 +1180,12 @@ clientReplyContext::storeNotOKTransferDone() const
if (expectedBodySize < 0)
return 0;
- const uint64_t expectedLength = expectedBodySize + http->out.headers_sz;
-
- if (http->out.size < expectedLength)
- return 0;
- else {
- debugs(88,3,HERE << "storeNotOKTransferDone " <<
- " out.size=" << http->out.size <<
- " expectedLength=" << expectedLength);
- return 1;
- }
+ const auto done = http->out.offset >= expectedBodySize;
+ const auto debugLevel = done ? 3 : 5;
+ debugs(88, debugLevel, done <<
+ " out.offset=" << http->out.offset <<
+ " expectedBodySize=" << expectedBodySize);
+ return done ? 1 : 0;
}
/* A write has completed, what is the next status based on the
@@ -1804,20 +1754,12 @@ clientGetMoreData(clientStreamNode * aNode, ClientHttpRequest * http)
assert (context);
assert(context->http == http);
- clientStreamNode *next = ( clientStreamNode *)aNode->node.next->data;
-
if (!context->ourNode)
context->ourNode = aNode;
/* no cbdatareference, this is only used once, and safely */
if (context->flags.storelogiccomplete) {
- StoreIOBuffer tempBuffer;
- tempBuffer.offset = next->readBuffer.offset + context->headers_sz;
- tempBuffer.length = next->readBuffer.length;
- tempBuffer.data = next->readBuffer.data;
-
- storeClientCopy(context->sc, http->storeEntry(),
- tempBuffer, clientReplyContext::SendMoreData, context);
+ context->requestMoreBodyFromStore();
return;
}
@@ -1830,7 +1772,7 @@ clientGetMoreData(clientStreamNode * aNode, ClientHttpRequest * http)
if (context->http->request->method == Http::METHOD_TRACE) {
if (context->http->request->header.getInt64(Http::HdrType::MAX_FORWARDS) == 0) {
- context->traceReply(aNode);
+ context->traceReply();
return;
}
@@ -1860,7 +1802,6 @@ clientReplyContext::doGetMoreData()
#endif
assert(http->logType.oldType == LOG_TCP_HIT);
- reqofs = 0;
/* guarantee nothing has been sent yet! */
assert(http->out.size == 0);
assert(http->out.offset == 0);
@@ -1875,10 +1816,7 @@ clientReplyContext::doGetMoreData()
}
}
- localTempBuffer.offset = reqofs;
- localTempBuffer.length = getNextNode()->readBuffer.length;
- localTempBuffer.data = getNextNode()->readBuffer.data;
- storeClientCopy(sc, http->storeEntry(), localTempBuffer, CacheHit, this);
+ triggerInitialStoreRead(CacheHit);
} else {
/* MISS CASE, http->logType is already set! */
processMiss();
@@ -1913,12 +1851,11 @@ clientReplyContext::makeThisHead()
}
bool
-clientReplyContext::errorInStream(StoreIOBuffer const &result, size_t const &sizeToProcess)const
+clientReplyContext::errorInStream(const StoreIOBuffer &result) const
{
return /* aborted request */
(http->storeEntry() && EBIT_TEST(http->storeEntry()->flags, ENTRY_ABORTED)) ||
- /* Upstream read error */ (result.flags.error) ||
- /* Upstream EOF */ (sizeToProcess == 0);
+ /* Upstream read error */ (result.flags.error);
}
void
@@ -1939,24 +1876,17 @@ clientReplyContext::sendStreamError(StoreIOBuffer const &result)
}
void
-clientReplyContext::pushStreamData(StoreIOBuffer const &result, char *source)
+clientReplyContext::pushStreamData(const StoreIOBuffer &result)
{
- StoreIOBuffer localTempBuffer;
-
if (result.length == 0) {
debugs(88, 5, "clientReplyContext::pushStreamData: marking request as complete due to 0 length store result");
flags.complete = 1;
}
- assert(result.offset - headers_sz == next()->readBuffer.offset);
- localTempBuffer.offset = result.offset - headers_sz;
- localTempBuffer.length = result.length;
-
- if (localTempBuffer.length)
- localTempBuffer.data = source;
+ assert(!result.length || result.offset == next()->readBuffer.offset);
clientStreamCallback((clientStreamNode*)http->client_stream.head->data, http, NULL,
- localTempBuffer);
+ result);
}
clientStreamNode *
@@ -2048,7 +1978,6 @@ clientReplyContext::processReplyAccess ()
if (http->logType.oldType == LOG_TCP_DENIED ||
http->logType.oldType == LOG_TCP_DENIED_REPLY ||
alwaysAllowResponse(reply->sline.status())) {
- headers_sz = reply->hdr_sz;
processReplyAccessResult(ACCESS_ALLOWED);
return;
}
@@ -2059,8 +1988,6 @@ clientReplyContext::processReplyAccess ()
return;
}
- headers_sz = reply->hdr_sz;
-
/** check for absent access controls (permit by default) */
if (!Config.accessList.reply) {
processReplyAccessResult(ACCESS_ALLOWED);
@@ -2117,11 +2044,9 @@ clientReplyContext::processReplyAccessResult(const Acl::Answer &accessAllowed)
/* Ok, the reply is allowed, */
http->loggingEntry(http->storeEntry());
- ssize_t body_size = reqofs - reply->hdr_sz;
- if (body_size < 0) {
- reqofs = reply->hdr_sz;
- body_size = 0;
- }
+ Assure(matchesStreamBodyBuffer(lastStreamBufferedBytes));
+ Assure(!lastStreamBufferedBytes.offset);
+ auto body_size = lastStreamBufferedBytes.length; // may be zero
debugs(88, 3, "clientReplyContext::sendMoreData: Appending " <<
(int) body_size << " bytes after " << reply->hdr_sz <<
@@ -2149,19 +2074,27 @@ clientReplyContext::processReplyAccessResult(const Acl::Answer &accessAllowed)
assert (!flags.headersSent);
flags.headersSent = true;
+ // next()->readBuffer.offset may be positive for Range requests, but our
+ // localTempBuffer initialization code assumes that next()->readBuffer.data
+ // points to the response body at offset 0 because the first
+ // storeClientCopy() request always has offset 0 (i.e. our first Store
+ // request ignores next()->readBuffer.offset).
+ //
+ // XXX: We cannot fully check that assumption: readBuffer.offset field is
+ // often out of sync with the buffer content, and if some buggy code updates
+ // the buffer while we were waiting for the processReplyAccessResult()
+ // callback, we may not notice.
+
StoreIOBuffer localTempBuffer;
- char *buf = next()->readBuffer.data;
- char *body_buf = buf + reply->hdr_sz;
+ const auto body_buf = next()->readBuffer.data;
//Server side may disable ranges under some circumstances.
if ((!http->request->range))
next()->readBuffer.offset = 0;
- body_buf -= next()->readBuffer.offset;
-
- if (next()->readBuffer.offset != 0) {
- if (next()->readBuffer.offset > body_size) {
+ if (next()->readBuffer.offset > 0) {
+ if (Less(body_size, next()->readBuffer.offset)) {
/* Can't use any of the body we received. send nothing */
localTempBuffer.length = 0;
localTempBuffer.data = NULL;
@@ -2174,7 +2107,6 @@ clientReplyContext::processReplyAccessResult(const Acl::Answer &accessAllowed)
localTempBuffer.data = body_buf;
}
- /* TODO??: move the data in the buffer back by the request header size */
clientStreamCallback((clientStreamNode *)http->client_stream.head->data,
http, reply, localTempBuffer);
@@ -2187,6 +2119,8 @@ clientReplyContext::sendMoreData (StoreIOBuffer result)
if (deleting)
return;
+ debugs(88, 5, http->uri << " got " << result);
+
StoreEntry *entry = http->storeEntry();
if (ConnStateData * conn = http->getConn()) {
@@ -2199,7 +2133,9 @@ clientReplyContext::sendMoreData (StoreIOBuffer result)
return;
}
- if (reqofs==0 && !http->logType.isTcpHit()) {
+ if (!flags.headersSent && !http->logType.isTcpHit()) {
+ // We get here twice if processReplyAccessResult() calls startError().
+ // TODO: Revise when we check/change QoS markings to reduce syscalls.
if (Ip::Qos::TheConfig.isHitTosActive()) {
Ip::Qos::doTosLocalMiss(conn->clientConnection, http->request->hier.code);
}
@@ -2213,21 +2149,9 @@ clientReplyContext::sendMoreData (StoreIOBuffer result)
" out.offset=" << http->out.offset);
}
- char *buf = next()->readBuffer.data;
-
- if (buf != result.data) {
- /* we've got to copy some data */
- assert(result.length <= next()->readBuffer.length);
- memcpy(buf, result.data, result.length);
- }
-
/* We've got the final data to start pushing... */
flags.storelogiccomplete = 1;
- reqofs += result.length;
-
- assert(reqofs <= HTTP_REQBUF_SZ || flags.headersSent);
-
assert(http->request != NULL);
/* ESI TODO: remove this assert once everything is stable */
@@ -2236,20 +2160,25 @@ clientReplyContext::sendMoreData (StoreIOBuffer result)
makeThisHead();
- debugs(88, 5, "clientReplyContext::sendMoreData: " << http->uri << ", " <<
- reqofs << " bytes (" << result.length <<
- " new bytes)");
-
- /* update size of the request */
- reqsize = reqofs;
-
- if (errorInStream(result, reqofs)) {
+ if (errorInStream(result)) {
sendStreamError(result);
return;
}
+ if (!matchesStreamBodyBuffer(result)) {
+ // Subsequent processing expects response body bytes to be at the start
+ // of our Client Stream buffer. When given something else (e.g., bytes
+ // in our tempbuf), we copy and adjust to meet those expectations.
+ const auto &ourClientStreamsBuffer = next()->readBuffer;
+ assert(result.length <= ourClientStreamsBuffer.length);
+ memcpy(ourClientStreamsBuffer.data, result.data, result.length);
+ result.data = ourClientStreamsBuffer.data;
+ }
+
+ noteStreamBufferredBytes(result);
+
if (flags.headersSent) {
- pushStreamData (result, buf);
+ pushStreamData(result);
return;
}
@@ -2265,6 +2194,32 @@ clientReplyContext::sendMoreData (StoreIOBuffer result)
return;
}
+/// Whether the given body area describes the start of our Client Stream buffer.
+/// An empty area does.
+bool
+clientReplyContext::matchesStreamBodyBuffer(const StoreIOBuffer &their) const
+{
+ // the answer is undefined for errors; they are not really "body buffers"
+ Assure(!their.flags.error);
+
+ if (!their.length)
+ return true; // an empty body area always matches our body area
+
+ if (their.data != next()->readBuffer.data) {
+ debugs(88, 7, "no: " << their << " vs. " << next()->readBuffer);
+ return false;
+ }
+
+ return true;
+}
+
+void
+clientReplyContext::noteStreamBufferredBytes(const StoreIOBuffer &result)
+{
+ Assure(matchesStreamBodyBuffer(result));
+ lastStreamBufferedBytes = result; // may be unchanged and/or zero-length
+}
+
void
clientReplyContext::fillChecklist(ACLFilledChecklist &checklist) const
{
@@ -2310,13 +2265,6 @@ clientReplyContext::createStoreEntry(const HttpRequestMethod& m, RequestFlags re
sc->setDelayId(DelayId::DelayClient(http));
#endif
- reqofs = 0;
-
- reqsize = 0;
-
- /* I don't think this is actually needed! -- adrian */
- /* http->reqbuf = http->norm_reqbuf; */
- // assert(http->reqbuf == http->norm_reqbuf);
/* The next line is illegal because we don't know if the client stream
* buffers have been set up
*/
diff --git a/src/client_side_reply.h b/src/client_side_reply.h
index 8a901af..45fdf22 100644
--- a/src/client_side_reply.h
+++ b/src/client_side_reply.h
@@ -39,7 +39,6 @@ public:
void purgeFoundGet(StoreEntry *newEntry);
void purgeFoundHead(StoreEntry *newEntry);
void purgeFoundObject(StoreEntry *entry);
- void sendClientUpstreamResponse();
void purgeDoPurgeGet(StoreEntry *entry);
void purgeDoPurgeHead(StoreEntry *entry);
void doGetMoreData();
@@ -67,7 +66,7 @@ public:
void processExpired();
clientStream_status_t replyStatus();
void processMiss();
- void traceReply(clientStreamNode * node);
+ void traceReply();
const char *storeId() const { return (http->store_id.size() > 0 ? http->store_id.termedBuf() : http->uri); }
Http::StatusCode purgeStatus;
@@ -80,16 +79,15 @@ public:
virtual LogTags *loggingTags();
ClientHttpRequest *http;
- /// Base reply header bytes received from Store.
- /// Compatible with ClientHttpRequest::Out::offset.
- /// Not to be confused with ClientHttpRequest::Out::headers_sz.
- int headers_sz;
store_client *sc; /* The store_client we're using */
StoreIOBuffer tempBuffer; /* For use in validating requests via IMS */
int old_reqsize; /* ... again, for the buffer */
- size_t reqsize;
- size_t reqofs;
- char tempbuf[HTTP_REQBUF_SZ]; ///< a temporary buffer if we need working storage
+
+ /// Buffer dedicated to receiving storeClientCopy() responses to generated
+ /// revalidation requests. These requests cannot use next()->readBuffer
+ /// because the latter keeps the contents of the stale HTTP response during
+ /// revalidation. sendClientOldEntry() uses those contents.
+ char tempbuf[HTTP_REQBUF_SZ];
#if USE_CACHE_DIGESTS
const char *lookup_type; /* temporary hack: storeGet() result: HIT/MISS/NONE */
@@ -110,9 +108,10 @@ private:
clientStreamNode *getNextNode() const;
void makeThisHead();
- bool errorInStream(StoreIOBuffer const &result, size_t const &sizeToProcess)const ;
+ bool errorInStream(const StoreIOBuffer &result) const;
+ bool matchesStreamBodyBuffer(const StoreIOBuffer &) const;
void sendStreamError(StoreIOBuffer const &result);
- void pushStreamData(StoreIOBuffer const &result, char *source);
+ void pushStreamData(const StoreIOBuffer &);
clientStreamNode * next() const;
StoreIOBuffer holdingBuffer;
HttpReply *reply;
@@ -124,11 +123,13 @@ private:
bool alwaysAllowResponse(Http::StatusCode sline) const;
int checkTransferDone();
void processOnlyIfCachedMiss();
- bool processConditional(StoreIOBuffer &result);
+ bool processConditional();
+ void noteStreamBufferredBytes(const StoreIOBuffer &);
void cacheHit(StoreIOBuffer result);
void handleIMSReply(StoreIOBuffer result);
void sendMoreData(StoreIOBuffer result);
- void triggerInitialStoreRead();
+ void triggerInitialStoreRead(STCB = SendMoreData);
+ void requestMoreBodyFromStore();
void sendClientOldEntry();
void purgeAllCached();
void forgetHit();
@@ -138,6 +139,13 @@ private:
void sendPreconditionFailedError();
void sendNotModified();
void sendNotModifiedOrPreconditionFailedError();
+ void sendClientUpstreamResponse(const StoreIOBuffer &upstreamResponse);
+
+ /// Reduces the chance of an accidental direct storeClientCopy() call that
+ /// (should but) forgets to invalidate our lastStreamBufferedBytes. This
+ /// function is not defined; decltype() syntax prohibits "= delete", but
+ /// function usage will trigger deprecation warnings and linking errors.
+ static decltype(::storeClientCopy) storeClientCopy [[deprecated]];
StoreEntry *old_entry;
/* ... for entry to be validated */
@@ -154,6 +162,12 @@ private:
} CollapsedRevalidation;
CollapsedRevalidation collapsedRevalidation;
+
+ /// HTTP response body bytes stored in our Client Stream buffer (if any)
+ StoreIOBuffer lastStreamBufferedBytes;
+
+ // TODO: Remove after moving the meat of this function into a method.
+ friend CSR clientGetMoreData;
};
#endif /* SQUID_CLIENTSIDEREPLY_H */
diff --git a/src/client_side_request.h b/src/client_side_request.h
index 40e0a2e..8f6802e 100644
--- a/src/client_side_request.h
+++ b/src/client_side_request.h
@@ -112,7 +112,6 @@ public:
/// Response header and body bytes written to the client connection.
uint64_t size;
/// Response header bytes written to the client connection.
- /// Not to be confused with clientReplyContext::headers_sz.
size_t headers_sz;
} out;
diff --git a/src/enums.h b/src/enums.h
index 237b5c1..36b11c5 100644
--- a/src/enums.h
+++ b/src/enums.h
@@ -202,7 +202,6 @@ enum {
typedef enum {
DIGEST_READ_NONE,
DIGEST_READ_REPLY,
- DIGEST_READ_HEADERS,
DIGEST_READ_CBLOCK,
DIGEST_READ_MASK,
DIGEST_READ_DONE
diff --git a/src/icmp/net_db.cc b/src/icmp/net_db.cc
index 0a12a8e..3fba418 100644
--- a/src/icmp/net_db.cc
+++ b/src/icmp/net_db.cc
@@ -33,6 +33,7 @@
#include "mime_header.h"
#include "neighbors.h"
#include "PeerSelectState.h"
+#include "sbuf/SBuf.h"
#include "SquidConfig.h"
#include "SquidTime.h"
#include "Store.h"
@@ -49,8 +50,6 @@
#include "ipcache.h"
#include "StoreClient.h"
-#define NETDB_REQBUF_SZ 4096
-
typedef enum {
STATE_NONE,
STATE_HEADER,
@@ -66,7 +65,6 @@ public:
p(aPeer),
r(theReq)
{
- *buf = 0;
assert(r);
// TODO: check if we actually need to do this. should be implicit
r->http_ver = Http::ProtocolVersion();
@@ -82,10 +80,10 @@ public:
StoreEntry *e = nullptr;
store_client *sc = nullptr;
HttpRequestPointer r;
- int64_t used = 0;
- size_t buf_sz = NETDB_REQBUF_SZ;
- char buf[NETDB_REQBUF_SZ];
- int buf_ofs = 0;
+
+ /// for receiving a NetDB reply body from Store and interpreting it
+ Store::ParsingBuffer parsingBuffer;
+
netdb_conn_state_t connstate = STATE_HEADER;
};
@@ -688,23 +686,19 @@ netdbExchangeHandleReply(void *data, StoreIOBuffer receivedData)
Ip::Address addr;
netdbExchangeState *ex = (netdbExchangeState *)data;
- int rec_sz = 0;
- int o;
struct in_addr line_addr;
double rtt;
double hops;
- char *p;
int j;
- size_t hdr_sz;
int nused = 0;
- int size;
- int oldbufofs = ex->buf_ofs;
- rec_sz = 0;
+ size_t rec_sz = 0; // received record size (TODO: make const)
rec_sz += 1 + sizeof(struct in_addr);
rec_sz += 1 + sizeof(int);
rec_sz += 1 + sizeof(int);
+ // to make progress without growing buffer space, we must parse at least one record per call
+ Assure(rec_sz <= ex->parsingBuffer.capacity());
debugs(38, 3, "netdbExchangeHandleReply: " << receivedData.length << " read bytes");
if (!ex->p.valid()) {
@@ -715,64 +709,28 @@ netdbExchangeHandleReply(void *data, StoreIOBuffer receivedData)
debugs(38, 3, "netdbExchangeHandleReply: for '" << ex->p->host << ":" << ex->p->http_port << "'");
- if (receivedData.length == 0 && !receivedData.flags.error) {
- debugs(38, 3, "netdbExchangeHandleReply: Done");
+ if (receivedData.flags.error) {
delete ex;
return;
}
- p = ex->buf;
-
- /* Get the size of the buffer now */
- size = ex->buf_ofs + receivedData.length;
- debugs(38, 3, "netdbExchangeHandleReply: " << size << " bytes buf");
-
- /* Check if we're still doing headers */
-
if (ex->connstate == STATE_HEADER) {
-
- ex->buf_ofs += receivedData.length;
-
- /* skip reply headers */
-
- if ((hdr_sz = headersEnd(p, ex->buf_ofs))) {
- debugs(38, 5, "netdbExchangeHandleReply: hdr_sz = " << hdr_sz);
- const auto scode = ex->e->mem().baseReply().sline.status();
- assert(scode != Http::scNone);
- debugs(38, 3, "netdbExchangeHandleReply: reply status " << scode);
-
- if (scode != Http::scOkay) {
- delete ex;
- return;
- }
-
- assert((size_t)ex->buf_ofs >= hdr_sz);
-
- /*
- * Now, point p to the part of the buffer where the data
- * starts, and update the size accordingly
- */
- assert(ex->used == 0);
- ex->used = hdr_sz;
- size = ex->buf_ofs - hdr_sz;
- p += hdr_sz;
-
- /* Finally, set the conn state mode to STATE_BODY */
- ex->connstate = STATE_BODY;
- } else {
- StoreIOBuffer tempBuffer;
- tempBuffer.offset = ex->buf_ofs;
- tempBuffer.length = ex->buf_sz - ex->buf_ofs;
- tempBuffer.data = ex->buf + ex->buf_ofs;
- /* Have more headers .. */
- storeClientCopy(ex->sc, ex->e, tempBuffer,
- netdbExchangeHandleReply, ex);
+ const auto scode = ex->e->mem().baseReply().sline.status();
+ assert(scode != Http::scNone);
+ debugs(38, 3, "reply status " << scode);
+ if (scode != Http::scOkay) {
+ delete ex;
return;
}
+ ex->connstate = STATE_BODY;
}
assert(ex->connstate == STATE_BODY);
+ ex->parsingBuffer.appended(receivedData.data, receivedData.length);
+ auto p = ex->parsingBuffer.c_str(); // current parsing position
+ auto size = ex->parsingBuffer.contentSize(); // bytes we still need to parse
+
/* If we get here, we have some body to parse .. */
debugs(38, 5, "netdbExchangeHandleReply: start parsing loop, size = " << size);
@@ -781,6 +739,7 @@ netdbExchangeHandleReply(void *data, StoreIOBuffer receivedData)
addr.setAnyAddr();
hops = rtt = 0.0;
+ size_t o; // current record parsing offset
for (o = 0; o < rec_sz;) {
switch ((int) *(p + o)) {
@@ -818,8 +777,6 @@ netdbExchangeHandleReply(void *data, StoreIOBuffer receivedData)
assert(o == rec_sz);
- ex->used += rec_sz;
-
size -= rec_sz;
p += rec_sz;
@@ -827,32 +784,8 @@ netdbExchangeHandleReply(void *data, StoreIOBuffer receivedData)
++nused;
}
- /*
- * Copy anything that is left over to the beginning of the buffer,
- * and adjust buf_ofs accordingly
- */
-
- /*
- * Evilly, size refers to the buf size left now,
- * ex->buf_ofs is the original buffer size, so just copy that
- * much data over
- */
- memmove(ex->buf, ex->buf + (ex->buf_ofs - size), size);
-
- ex->buf_ofs = size;
-
- /*
- * And don't re-copy the remaining data ..
- */
- ex->used += size;
-
- /*
- * Now the tricky bit - size _included_ the leftover bit from the _last_
- * storeClientCopy. We don't want to include that, or our offset will be wrong.
- * So, don't count the size of the leftover buffer we began with.
- * This can _disappear_ when we're not tracking offsets ..
- */
- ex->used -= oldbufofs;
+ const auto parsedSize = ex->parsingBuffer.contentSize() - size;
+ ex->parsingBuffer.consume(parsedSize);
debugs(38, 3, "netdbExchangeHandleReply: size left over in this buffer: " << size << " bytes");
@@ -860,20 +793,26 @@ netdbExchangeHandleReply(void *data, StoreIOBuffer receivedData)
" entries, (x " << rec_sz << " bytes) == " << nused * rec_sz <<
" bytes total");
- debugs(38, 3, "netdbExchangeHandleReply: used " << ex->used);
-
if (EBIT_TEST(ex->e->flags, ENTRY_ABORTED)) {
debugs(38, 3, "netdbExchangeHandleReply: ENTRY_ABORTED");
delete ex;
- } else if (ex->e->store_status == STORE_PENDING) {
- StoreIOBuffer tempBuffer;
- tempBuffer.offset = ex->used;
- tempBuffer.length = ex->buf_sz - ex->buf_ofs;
- tempBuffer.data = ex->buf + ex->buf_ofs;
- debugs(38, 3, "netdbExchangeHandleReply: EOF not received");
- storeClientCopy(ex->sc, ex->e, tempBuffer,
- netdbExchangeHandleReply, ex);
+ return;
}
+
+ if (ex->sc->atEof()) {
+ if (const auto leftoverBytes = ex->parsingBuffer.contentSize())
+ debugs(38, 2, "discarding a partially received record due to Store EOF: " << leftoverBytes);
+ delete ex;
+ return;
+ }
+
+ // TODO: To protect us from a broken peer sending an "infinite" stream of
+ // new addresses, limit the cumulative number of received bytes or records?
+
+ const auto remainingSpace = ex->parsingBuffer.space().positionAt(receivedData.offset + receivedData.length);
+ // rec_sz is at most buffer capacity, and we consume all fully loaded records
+ Assure(remainingSpace.length);
+ storeClientCopy(ex->sc, ex->e, remainingSpace, netdbExchangeHandleReply, ex);
}
#endif /* USE_ICMP */
@@ -1285,14 +1224,9 @@ netdbExchangeStart(void *data)
ex->e = storeCreateEntry(uri, uri, RequestFlags(), Http::METHOD_GET);
assert(NULL != ex->e);
- StoreIOBuffer tempBuffer;
- tempBuffer.length = ex->buf_sz;
- tempBuffer.data = ex->buf;
-
ex->sc = storeClientListAdd(ex->e, ex);
+ storeClientCopy(ex->sc, ex->e, ex->parsingBuffer.makeInitialSpace(), netdbExchangeHandleReply, ex);
- storeClientCopy(ex->sc, ex->e, tempBuffer,
- netdbExchangeHandleReply, ex);
ex->r->flags.loopDetected = true; /* cheat! -- force direct */
// XXX: send as Proxy-Authenticate instead
diff --git a/src/internal.cc b/src/internal.cc
index 849d129..14d4e81 100644
--- a/src/internal.cc
+++ b/src/internal.cc
@@ -10,6 +10,7 @@
#include "squid.h"
#include "AccessLogEntry.h"
+#include "base/Assure.h"
#include "CacheManager.h"
#include "comm/Connection.h"
#include "errorpage.h"
diff --git a/src/peer_digest.cc b/src/peer_digest.cc
index 82bed13..e1706f7 100644
--- a/src/peer_digest.cc
+++ b/src/peer_digest.cc
@@ -39,7 +39,6 @@ static EVH peerDigestCheck;
static void peerDigestRequest(PeerDigest * pd);
static STCB peerDigestHandleReply;
static int peerDigestFetchReply(void *, char *, ssize_t);
-int peerDigestSwapInHeaders(void *, char *, ssize_t);
int peerDigestSwapInCBlock(void *, char *, ssize_t);
int peerDigestSwapInMask(void *, char *, ssize_t);
static int peerDigestFetchedEnough(DigestFetchState * fetch, char *buf, ssize_t size, const char *step_name);
@@ -375,6 +374,9 @@ peerDigestRequest(PeerDigest * pd)
fetch->sc = storeClientListAdd(e, fetch);
/* set lastmod to trigger IMS request if possible */
+ // TODO: Also check for fetch->pd->cd presence as a precondition for sending
+ // IMS requests because peerDigestFetchReply() does not accept 304 responses
+ // without an in-memory cache digest.
if (old_e)
e->lastModified(old_e->lastModified());
@@ -409,11 +411,16 @@ peerDigestHandleReply(void *data, StoreIOBuffer receivedData)
digest_read_state_t prevstate;
int newsize;
- assert(fetch->pd && receivedData.data);
+ if (receivedData.flags.error) {
+ peerDigestFetchAbort(fetch, fetch->buf, "failure loading digest reply from Store");
+ return;
+ }
+
+ assert(fetch->pd);
/* The existing code assumes that the received pointer is
* where we asked the data to be put
*/
- assert(fetch->buf + fetch->bufofs == receivedData.data);
+ assert(!receivedData.data || fetch->buf + fetch->bufofs == receivedData.data);
/* Update the buffer size */
fetch->bufofs += receivedData.length;
@@ -445,10 +452,6 @@ peerDigestHandleReply(void *data, StoreIOBuffer receivedData)
retsize = peerDigestFetchReply(fetch, fetch->buf, fetch->bufofs);
break;
- case DIGEST_READ_HEADERS:
- retsize = peerDigestSwapInHeaders(fetch, fetch->buf, fetch->bufofs);
- break;
-
case DIGEST_READ_CBLOCK:
retsize = peerDigestSwapInCBlock(fetch, fetch->buf, fetch->bufofs);
break;
@@ -488,7 +491,7 @@ peerDigestHandleReply(void *data, StoreIOBuffer receivedData)
// checking at the beginning of this function. However, in this case, we would have to require
// that the parser does not regard EOF as a special condition (it is true now but may change
// in the future).
- if (!receivedData.length) { // EOF
+ if (fetch->sc->atEof()) {
peerDigestFetchAbort(fetch, fetch->buf, "premature end of digest reply");
return;
}
@@ -507,19 +510,12 @@ peerDigestHandleReply(void *data, StoreIOBuffer receivedData)
}
}
-/* wait for full http headers to be received then parse them */
-/*
- * This routine handles parsing the reply line.
- * If the reply line indicates an OK, the same data is thrown
- * to SwapInHeaders(). If the reply line is a NOT_MODIFIED,
- * we simply stop parsing.
- */
+/// handle HTTP response headers in the initial storeClientCopy() response
static int
peerDigestFetchReply(void *data, char *buf, ssize_t size)
{
DigestFetchState *fetch = (DigestFetchState *)data;
PeerDigest *pd = fetch->pd;
- size_t hdr_size;
assert(pd && buf);
assert(!fetch->offset);
@@ -528,7 +524,7 @@ peerDigestFetchReply(void *data, char *buf, ssize_t size)
if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestFetchReply"))
return -1;
- if ((hdr_size = headersEnd(buf, size))) {
+ {
const auto &reply = fetch->entry->mem().freshestReply();
const auto status = reply.sline.status();
assert(status != Http::scNone);
@@ -561,6 +557,15 @@ peerDigestFetchReply(void *data, char *buf, ssize_t size)
/* preserve request -- we need its size to update counters */
/* requestUnlink(r); */
/* fetch->entry->mem_obj->request = NULL; */
+
+ if (!fetch->pd->cd) {
+ peerDigestFetchAbort(fetch, buf, "304 without the old in-memory digest");
+ return -1;
+ }
+
+ // stay with the old in-memory digest
+ peerDigestFetchStop(fetch, buf, "Not modified");
+ fetch->state = DIGEST_READ_DONE;
} else if (status == Http::scOkay) {
/* get rid of old entry if any */
@@ -571,70 +576,16 @@ peerDigestFetchReply(void *data, char *buf, ssize_t size)
fetch->old_entry->unlock("peerDigestFetchReply 200");
fetch->old_entry = NULL;
}
+
+ fetch->state = DIGEST_READ_CBLOCK;
} else {
/* some kind of a bug */
peerDigestFetchAbort(fetch, buf, reply.sline.reason());
return -1; /* XXX -1 will abort stuff in ReadReply! */
}
-
- /* must have a ready-to-use store entry if we got here */
- /* can we stay with the old in-memory digest? */
- if (status == Http::scNotModified && fetch->pd->cd) {
- peerDigestFetchStop(fetch, buf, "Not modified");
- fetch->state = DIGEST_READ_DONE;
- } else {
- fetch->state = DIGEST_READ_HEADERS;
- }
- } else {
- /* need more data, do we have space? */
-
- if (size >= SM_PAGE_SIZE)
- peerDigestFetchAbort(fetch, buf, "reply header too big");
- }
-
- /* We don't want to actually ack that we've handled anything,
- * otherwise SwapInHeaders() won't get the reply line .. */
- return 0;
-}
-
-/* fetch headers from disk, pass on to SwapInCBlock */
-int
-peerDigestSwapInHeaders(void *data, char *buf, ssize_t size)
-{
- DigestFetchState *fetch = (DigestFetchState *)data;
- size_t hdr_size;
-
- assert(fetch->state == DIGEST_READ_HEADERS);
-
- if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInHeaders"))
- return -1;
-
- assert(!fetch->offset);
-
- if ((hdr_size = headersEnd(buf, size))) {
- const auto &reply = fetch->entry->mem().freshestReply();
- const auto status = reply.sline.status();
- assert(status != Http::scNone);
-
- if (status != Http::scOkay) {
- debugs(72, DBG_IMPORTANT, "peerDigestSwapInHeaders: " << fetch->pd->host <<
- " status " << status << " got cached!");
-
- peerDigestFetchAbort(fetch, buf, "internal status error");
- return -1;
- }
-
- fetch->state = DIGEST_READ_CBLOCK;
- return hdr_size; /* Say how much data we read */
- }
-
- /* need more data, do we have space? */
- if (size >= SM_PAGE_SIZE) {
- peerDigestFetchAbort(fetch, buf, "stored header too big");
- return -1;
}
- return 0; /* We need to read more to parse .. */
+ return 0; // we consumed/used no buffered bytes
}
int
@@ -756,7 +707,7 @@ peerDigestFetchedEnough(DigestFetchState * fetch, char *buf, ssize_t size, const
}
/* continue checking (maybe-successful eof case) */
- if (!reason && !size) {
+ if (!reason && !size && fetch->state != DIGEST_READ_REPLY) {
if (!pd->cd)
reason = "null digest?!";
else if (fetch->mask_offset != pd->cd->mask_size)
diff --git a/src/stmem.cc b/src/stmem.cc
index 8483f86..95218af 100644
--- a/src/stmem.cc
+++ b/src/stmem.cc
@@ -96,8 +96,6 @@ mem_hdr::freeDataUpto(int64_t target_offset)
break;
}
- assert (lowestOffset () <= target_offset);
-
return lowestOffset ();
}
diff --git a/src/store.cc b/src/store.cc
index edc9d41..57eb920 100644
--- a/src/store.cc
+++ b/src/store.cc
@@ -275,6 +275,8 @@ StoreEntry::storeClientType() const
assert(mem_obj);
+ debugs(20, 7, *this << " inmem_lo=" << mem_obj->inmem_lo);
+
if (mem_obj->inmem_lo)
return STORE_DISK_CLIENT;
@@ -302,6 +304,7 @@ StoreEntry::storeClientType() const
return STORE_MEM_CLIENT;
}
}
+ debugs(20, 7, "STORE_OK STORE_DISK_CLIENT");
return STORE_DISK_CLIENT;
}
@@ -321,10 +324,18 @@ StoreEntry::storeClientType() const
if (swap_status == SWAPOUT_NONE)
return STORE_MEM_CLIENT;
+ // TODO: The above "must make this a mem client" logic contradicts "Slight
+ // weirdness" logic in store_client::doCopy() that converts hits to misses
+ // on startSwapin() failures. We should probably attempt to open a swapin
+ // file _here_ instead (and avoid STORE_DISK_CLIENT designation for clients
+ // that fail to do so). That would also address a similar problem with Rock
+ // store that does not yet support swapin during SWAPOUT_WRITING.
+
/*
* otherwise, make subsequent clients read from disk so they
* can not delay the first, and vice-versa.
*/
+ debugs(20, 7, "STORE_PENDING STORE_DISK_CLIENT");
return STORE_DISK_CLIENT;
}
diff --git a/src/store/Makefile.am b/src/store/Makefile.am
index e71cd71..7bc6557 100644
--- a/src/store/Makefile.am
+++ b/src/store/Makefile.am
@@ -22,5 +22,7 @@ libstore_la_SOURCES = \
Disks.h \
LocalSearch.cc \
LocalSearch.h \
+ ParsingBuffer.cc \
+ ParsingBuffer.h \
Storage.h \
forward.h
diff --git a/src/store/Makefile.in b/src/store/Makefile.in
index 819f398..779916f 100644
--- a/src/store/Makefile.in
+++ b/src/store/Makefile.in
@@ -164,7 +164,7 @@ CONFIG_CLEAN_FILES =
CONFIG_CLEAN_VPATH_FILES =
LTLIBRARIES = $(noinst_LTLIBRARIES)
libstore_la_LIBADD =
-am_libstore_la_OBJECTS = Controller.lo Disk.lo Disks.lo LocalSearch.lo
+am_libstore_la_OBJECTS = Controller.lo Disk.lo Disks.lo LocalSearch.lo ParsingBuffer.lo
libstore_la_OBJECTS = $(am_libstore_la_OBJECTS)
AM_V_lt = $(am__v_lt_@AM_V@)
am__v_lt_ = $(am__v_lt_@AM_DEFAULT_V@)
@@ -186,7 +186,7 @@ DEFAULT_INCLUDES =
depcomp = $(SHELL) $(top_srcdir)/cfgaux/depcomp
am__maybe_remake_depfiles = depfiles
am__depfiles_remade = ./$(DEPDIR)/Controller.Plo ./$(DEPDIR)/Disk.Plo \
- ./$(DEPDIR)/Disks.Plo ./$(DEPDIR)/LocalSearch.Plo
+ ./$(DEPDIR)/Disks.Plo ./$(DEPDIR)/LocalSearch.Plo ./$(DEPDIR)/ParsingBuffer.Plo
am__mv = mv -f
CXXCOMPILE = $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) \
$(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS)
@@ -782,6 +782,8 @@ libstore_la_SOURCES = \
Disks.h \
LocalSearch.cc \
LocalSearch.h \
+ ParsingBuffer.cc \
+ ParsingBuffer.h \
Storage.h \
forward.h
@@ -853,6 +855,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Disk.Plo@am__quote@ # am--include-marker
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Disks.Plo@am__quote@ # am--include-marker
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/LocalSearch.Plo@am__quote@ # am--include-marker
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ParsingBuffer.Plo@am__quote@ # am--include-marker
$(am__depfiles_remade):
@$(MKDIR_P) $(@D)
@@ -1260,6 +1263,7 @@ distclean: distclean-recursive
-rm -f ./$(DEPDIR)/Disk.Plo
-rm -f ./$(DEPDIR)/Disks.Plo
-rm -f ./$(DEPDIR)/LocalSearch.Plo
+ -rm -f ./$(DEPDIR)/ParsingBuffer.Plo
-rm -f Makefile
distclean-am: clean-am distclean-compile distclean-generic \
distclean-tags
@@ -1309,6 +1313,7 @@ maintainer-clean: maintainer-clean-recursive
-rm -f ./$(DEPDIR)/Disk.Plo
-rm -f ./$(DEPDIR)/Disks.Plo
-rm -f ./$(DEPDIR)/LocalSearch.Plo
+ -rm -f ./$(DEPDIR)/ParsingBuffer.Plo
-rm -f Makefile
maintainer-clean-am: distclean-am maintainer-clean-generic
diff --git a/src/store/ParsingBuffer.cc b/src/store/ParsingBuffer.cc
new file mode 100644
index 0000000..77b5740
--- /dev/null
+++ b/src/store/ParsingBuffer.cc
@@ -0,0 +1,198 @@
+/*
+ * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+#include "sbuf/Stream.h"
+#include "SquidMath.h"
+#include "store/ParsingBuffer.h"
+
+#include <iostream>
+
+// Several Store::ParsingBuffer methods use assert() because the corresponding
+// failure means there is a good chance that somebody has already read from (or
+// written to) the wrong memory location. Since this buffer is used for storing
+// HTTP response bytes, such failures may corrupt traffic. No Assure() handling
+// code can safely recover from such failures.
+
+Store::ParsingBuffer::ParsingBuffer(StoreIOBuffer &initialSpace):
+ readerSuppliedMemory_(initialSpace)
+{
+}
+
+/// a read-only content start (or nil for some zero-size buffers)
+const char *
+Store::ParsingBuffer::memory() const
+{
+ return extraMemory_ ? extraMemory_->rawContent() : readerSuppliedMemory_.data;
+}
+
+size_t
+Store::ParsingBuffer::capacity() const
+{
+ return extraMemory_ ? (extraMemory_->length() + extraMemory_->spaceSize()) : readerSuppliedMemory_.length;
+}
+
+size_t
+Store::ParsingBuffer::contentSize() const
+{
+ return extraMemory_ ? extraMemory_->length() : readerSuppliedMemoryContentSize_;
+}
+
+void
+Store::ParsingBuffer::appended(const char * const newBytes, const size_t newByteCount)
+{
+ // a positive newByteCount guarantees that, after the first assertion below
+ // succeeds, the second assertion will not increment a nil memory() pointer
+ if (!newByteCount)
+ return;
+
+ // the order of these checks guarantees that memory() is not nil in the second assertion
+ assert(newByteCount <= spaceSize()); // the new bytes end in our space
+ assert(memory() + contentSize() == newBytes); // the new bytes start in our space
+ // and now we know that newBytes is not nil either
+
+ if (extraMemory_)
+ extraMemory_->rawAppendFinish(newBytes, newByteCount);
+ else
+ readerSuppliedMemoryContentSize_ = *IncreaseSum(readerSuppliedMemoryContentSize_, newByteCount);
+
+ assert(contentSize() <= capacity()); // paranoid
+}
+
+void
+Store::ParsingBuffer::consume(const size_t parsedBytes)
+{
+ Assure(contentSize() >= parsedBytes); // more conservative than extraMemory_->consume()
+ if (extraMemory_) {
+ extraMemory_->consume(parsedBytes);
+ } else {
+ readerSuppliedMemoryContentSize_ -= parsedBytes;
+ if (parsedBytes && readerSuppliedMemoryContentSize_)
+ memmove(readerSuppliedMemory_.data, memory() + parsedBytes, readerSuppliedMemoryContentSize_);
+ }
+}
+
+StoreIOBuffer
+Store::ParsingBuffer::space()
+{
+ const auto size = spaceSize();
+ const auto start = extraMemory_ ?
+ extraMemory_->rawAppendStart(size) :
+ (readerSuppliedMemory_.data + readerSuppliedMemoryContentSize_);
+ return StoreIOBuffer(spaceSize(), 0, start);
+}
+
+StoreIOBuffer
+Store::ParsingBuffer::makeSpace(const size_t pageSize)
+{
+ growSpace(pageSize);
+ auto result = space();
+ Assure(result.length >= pageSize);
+ result.length = pageSize;
+ return result;
+}
+
+StoreIOBuffer
+Store::ParsingBuffer::content() const
+{
+ // This const_cast is a StoreIOBuffer API limitation: That class does not
+ // support a "constant content view", even though it is used as such a view.
+ return StoreIOBuffer(contentSize(), 0, const_cast<char*>(memory()));
+}
+
+/// makes sure we have the requested number of bytes, allocates enough memory if needed
+void
+Store::ParsingBuffer::growSpace(const size_t minimumSpaceSize)
+{
+ const auto capacityIncreaseAttempt = IncreaseSum(contentSize(), minimumSpaceSize);
+ if (!capacityIncreaseAttempt)
+ throw TextException(ToSBuf("no support for a single memory block of ", contentSize(), '+', minimumSpaceSize, " bytes"), Here());
+ const auto newCapacity = *capacityIncreaseAttempt;
+
+ if (newCapacity <= capacity())
+ return; // already have enough space; no reallocation is needed
+
+ debugs(90, 7, "growing to provide " << minimumSpaceSize << " in " << *this);
+
+ if (extraMemory_) {
+ extraMemory_->reserveCapacity(newCapacity);
+ } else {
+ SBuf newStorage;
+ newStorage.reserveCapacity(newCapacity);
+ newStorage.append(readerSuppliedMemory_.data, readerSuppliedMemoryContentSize_);
+ extraMemory_ = std::move(newStorage);
+ }
+ Assure(spaceSize() >= minimumSpaceSize);
+}
+
+SBuf
+Store::ParsingBuffer::toSBuf() const
+{
+ return extraMemory_ ? *extraMemory_ : SBuf(content().data, content().length);
+}
+
+size_t
+Store::ParsingBuffer::spaceSize() const
+{
+ if (extraMemory_)
+ return extraMemory_->spaceSize();
+
+ assert(readerSuppliedMemoryContentSize_ <= readerSuppliedMemory_.length);
+ return readerSuppliedMemory_.length - readerSuppliedMemoryContentSize_;
+}
+
+/// 0-terminates stored byte sequence, allocating more memory if needed, but
+/// without increasing the number of stored content bytes
+void
+Store::ParsingBuffer::terminate()
+{
+ *makeSpace(1).data = 0;
+}
+
+StoreIOBuffer
+Store::ParsingBuffer::packBack()
+{
+ const auto bytesToPack = contentSize();
+ // required until our callers no longer have to work around legacy code expectations
+ Assure(bytesToPack);
+
+ // if we accumulated more bytes at some point, any extra metadata should
+ // have been consume()d by now, allowing readerSuppliedMemory_.data reuse
+ Assure(bytesToPack <= readerSuppliedMemory_.length);
+
+ auto result = readerSuppliedMemory_;
+ result.length = bytesToPack;
+ Assure(result.data);
+
+ if (!extraMemory_) {
+ // no accumulated bytes copying because they are in readerSuppliedMemory_
+ debugs(90, 7, "quickly exporting " << result.length << " bytes via " << readerSuppliedMemory_);
+ } else {
+ debugs(90, 7, "slowly exporting " << result.length << " bytes from " << extraMemory_->id << " back into " << readerSuppliedMemory_);
+ memmove(result.data, extraMemory_->rawContent(), result.length);
+ }
+
+ return result;
+}
+
+void
+Store::ParsingBuffer::print(std::ostream &os) const
+{
+ os << "size=" << contentSize();
+
+ if (extraMemory_) {
+ os << " capacity=" << capacity();
+ os << " extra=" << extraMemory_->id;
+ }
+
+ // report readerSuppliedMemory_ (if any) even if we are no longer using it
+ // for content storage; it affects packBack() and related parsing logic
+ if (readerSuppliedMemory_.length)
+ os << ' ' << readerSuppliedMemory_;
+}
+
diff --git a/src/store/ParsingBuffer.h b/src/store/ParsingBuffer.h
new file mode 100644
index 0000000..b8aa957
--- /dev/null
+++ b/src/store/ParsingBuffer.h
@@ -0,0 +1,128 @@
+/*
+ * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef SQUID_SRC_STORE_PARSINGBUFFER_H
+#define SQUID_SRC_STORE_PARSINGBUFFER_H
+
+#include "sbuf/SBuf.h"
+#include "StoreIOBuffer.h"
+
+#include <optional>
+
+namespace Store
+{
+
+/// A continuous buffer for efficient accumulation and NUL-termination of
+/// Store-read bytes. The buffer accommodates two kinds of Store readers:
+///
+/// * Readers that do not have any external buffer to worry about but need to
+/// accumulate, terminate, and/or consume buffered content read by Store.
+/// These readers use the default constructor and then allocate the initial
+/// buffer space for their first read (if any).
+///
+/// * Readers that supply their StoreIOBuffer at construction time. That buffer
+/// is enough to handle the majority of use cases. However, the supplied
+/// StoreIOBuffer capacity may be exceeded when parsing requires accumulating
+/// multiple Store read results and/or NUL-termination of a full buffer.
+///
+/// This buffer seamlessly grows as needed, reducing memory over-allocation and,
+/// in case of StoreIOBuffer-seeded construction, memory copies.
+class ParsingBuffer
+{
+public:
+ /// creates buffer without any space or content
+ ParsingBuffer() = default;
+
+ /// seeds this buffer with the caller-supplied buffer space
+ explicit ParsingBuffer(StoreIOBuffer &);
+
+ /// a NUL-terminated version of content(); same lifetime as content()
+ const char *c_str() { terminate(); return memory(); }
+
+ /// export content() into SBuf, avoiding content copying when possible
+ SBuf toSBuf() const;
+
+ /// the total number of appended() bytes that were not consume()d
+ size_t contentSize() const;
+
+ /// the number of bytes in the space() buffer
+ size_t spaceSize() const;
+
+ /// the maximum number of bytes we can store without allocating more space
+ size_t capacity() const;
+
+ /// Stored appended() bytes that have not been consume()d. The returned
+ /// buffer offset is set to zero; the caller is responsible for adjusting
+ /// the offset if needed (TODO: Add/return a no-offset Mem::View instead).
+ /// The returned buffer is invalidated by calling a non-constant method or
+ /// by changing the StoreIOBuffer contents given to our constructor.
+ StoreIOBuffer content() const;
+
+ /// A (possibly empty) buffer for reading the next byte(s). The returned
+ /// buffer offset is set to zero; the caller is responsible for adjusting
+ /// the offset if needed (TODO: Add/return a no-offset Mem::Area instead).
+ /// The returned buffer is invalidated by calling a non-constant method or
+ /// by changing the StoreIOBuffer contents given to our constructor.
+ StoreIOBuffer space();
+
+ /// A buffer for reading the exact number of next byte(s). The method may
+ /// allocate new memory and copy previously appended() bytes as needed.
+ /// \param pageSize the exact number of bytes the caller wants to read
+ /// \returns space() after any necessary allocations
+ StoreIOBuffer makeSpace(size_t pageSize);
+
+ /// A buffer suitable for the first storeClientCopy() call. The method may
+ /// allocate new memory and copy previously appended() bytes as needed.
+ /// \returns space() after any necessary allocations
+ /// \deprecated New clients should call makeSpace() with client-specific
+ /// pageSize instead of this one-size-fits-all legacy method.
+ StoreIOBuffer makeInitialSpace() { return makeSpace(4096); }
+
+ /// remember the new bytes received into the previously provided space()
+ void appended(const char *, size_t);
+
+ /// get rid of previously appended() prefix of a given size
+ void consume(size_t);
+
+ /// Returns stored content, reusing the StoreIOBuffer given at the
+ /// construction time. Copying is avoided if we did not allocate extra
+ /// memory since construction. Not meant for default-constructed buffers.
+ /// \pre positive contentSize() (\sa store_client::finishCallback())
+ StoreIOBuffer packBack();
+
+ /// summarizes object state (for debugging)
+ void print(std::ostream &) const;
+
+private:
+ const char *memory() const;
+ void terminate();
+ void growSpace(size_t);
+
+private:
+ /// externally allocated buffer we were seeded with (or a zero-size one)
+ StoreIOBuffer readerSuppliedMemory_;
+
+ /// bytes appended() to readerSuppliedMemory_ that were not consume()d
+ size_t readerSuppliedMemoryContentSize_ = 0;
+
+ /// our internal buffer that takes over readerSuppliedMemory_ when the
+ /// latter becomes full and more memory is needed
+ std::optional<SBuf> extraMemory_;
+};
+
+inline std::ostream &
+operator <<(std::ostream &os, const ParsingBuffer &b)
+{
+ b.print(os);
+ return os;
+}
+
+} // namespace Store
+
+#endif /* SQUID_SRC_STORE_PARSINGBUFFER_H */
+
diff --git a/src/store/forward.h b/src/store/forward.h
index 0c870ec..f0869ca 100644
--- a/src/store/forward.h
+++ b/src/store/forward.h
@@ -46,6 +46,7 @@ class Disks;
class Disk;
class DiskConfig;
class EntryGuard;
+class ParsingBuffer;
typedef ::StoreEntry Entry;
typedef ::MemStore Memory;
diff --git a/src/store_client.cc b/src/store_client.cc
index 5180ec6..13ead49 100644
--- a/src/store_client.cc
+++ b/src/store_client.cc
@@ -10,6 +10,7 @@
#include "squid.h"
#include "acl/FilledChecklist.h"
+#include "base/AsyncCbdataCalls.h"
#include "base/CodeContext.h"
#include "event.h"
#include "globals.h"
@@ -18,8 +19,10 @@
#include "MemBuf.h"
#include "MemObject.h"
#include "mime_header.h"
+#include "sbuf/Stream.h"
#include "profiler/Profiler.h"
#include "SquidConfig.h"
+#include "SquidMath.h"
#include "StatCounters.h"
#include "Store.h"
#include "store_swapin.h"
@@ -41,7 +44,6 @@
static StoreIOState::STRCB storeClientReadBody;
static StoreIOState::STRCB storeClientReadHeader;
static void storeClientCopy2(StoreEntry * e, store_client * sc);
-static EVH storeClientCopyEvent;
static bool CheckQuickAbortIsReasonable(StoreEntry * entry);
CBDATA_CLASS_INIT(store_client);
@@ -93,12 +95,6 @@ StoreClient::fillChecklist(ACLFilledChecklist &checklist) const
/* store_client */
-bool
-store_client::memReaderHasLowerOffset(int64_t anOffset) const
-{
- return getType() == STORE_MEM_CLIENT && copyInto.offset < anOffset;
-}
-
int
store_client::getType() const
{
@@ -153,22 +149,44 @@ storeClientListAdd(StoreEntry * e, void *data)
return sc;
}
+/// finishCallback() wrapper; TODO: Add NullaryMemFunT for non-jobs.
void
-store_client::callback(ssize_t sz, bool error)
+store_client::FinishCallback(store_client * const sc)
{
- size_t bSz = 0;
+ sc->finishCallback();
+}
- if (sz >= 0 && !error)
- bSz = sz;
+/// finishes a copy()-STCB sequence by synchronously calling STCB
+void
+store_client::finishCallback()
+{
+ Assure(_callback.callback_handler);
+ Assure(_callback.notifier);
+
+ // XXX: Some legacy code relies on zero-length buffers having nil data
+ // pointers. Some other legacy code expects "correct" result.offset even
+ // when there is no body to return. Accommodate all those expectations.
+ auto result = StoreIOBuffer(0, copyInto.offset, nullptr);
+ if (object_ok && parsingBuffer && parsingBuffer->contentSize())
+ result = parsingBuffer->packBack();
+ result.flags.error = object_ok ? 0 : 1;
+
+ // TODO: Move object_ok handling above into this `if` statement.
+ if (object_ok) {
+ // works for zero hdr_sz cases as well; see also: nextHttpReadOffset()
+ discardableHttpEnd_ = NaturalSum<int64_t>(entry->mem().baseReply().hdr_sz, result.offset, result.length).value();
+ } else {
+ // object_ok is sticky, so we will not be able to use any response bytes
+ discardableHttpEnd_ = entry->mem().endOffset();
+ }
+ debugs(90, 7, "with " << result << "; discardableHttpEnd_=" << discardableHttpEnd_);
- StoreIOBuffer result(bSz, 0,copyInto.data);
+ // no HTTP headers and no body bytes (but not because there was no space)
+ atEof_ = !sendingHttpHeaders() && !result.length && copyInto.length;
- if (sz < 0 || error)
- result.flags.error = 1;
+ parsingBuffer.reset();
+ ++answers;
- result.offset = cmp_offset;
- assert(_callback.pending());
- cmp_offset = copyInto.offset + bSz;
STCB *temphandler = _callback.callback_handler;
void *cbdata = _callback.callback_data;
_callback = Callback(NULL, NULL);
@@ -180,32 +198,18 @@ store_client::callback(ssize_t sz, bool error)
cbdataReferenceDone(cbdata);
}
-static void
-storeClientCopyEvent(void *data)
-{
- store_client *sc = (store_client *)data;
- debugs(90, 3, "storeClientCopyEvent: Running");
- assert (sc->flags.copy_event_pending);
- sc->flags.copy_event_pending = false;
-
- if (!sc->_callback.pending())
- return;
-
- storeClientCopy2(sc->entry, sc);
-}
-
store_client::store_client(StoreEntry *e) :
- cmp_offset(0),
#if STORE_CLIENT_LIST_DEBUG
owner(cbdataReference(data)),
#endif
entry(e),
type(e->storeClientType()),
- object_ok(true)
+ object_ok(true),
+ atEof_(false),
+ answers(0)
{
flags.disk_io_pending = false;
flags.store_copying = false;
- flags.copy_event_pending = false;
++ entry->refcount;
if (getType() == STORE_DISK_CLIENT) {
@@ -251,16 +255,32 @@ store_client::copy(StoreEntry * anEntry,
#endif
assert(!_callback.pending());
-#if ONLYCONTIGUOUSREQUESTS
-
- assert(cmp_offset == copyRequest.offset);
-#endif
- /* range requests will skip into the body */
- cmp_offset = copyRequest.offset;
_callback = Callback (callback_fn, cbdataReference(data));
copyInto.data = copyRequest.data;
copyInto.length = copyRequest.length;
copyInto.offset = copyRequest.offset;
+ Assure(copyInto.offset >= 0);
+
+ if (!copyInto.length) {
+ // During the first storeClientCopy() call, a zero-size buffer means
+ // that we will have to drop any HTTP response body bytes we read (with
+ // the HTTP headers from disk). After that, it means we cannot return
+ // anything to the caller at all.
+ debugs(90, 2, "WARNING: zero-size storeClientCopy() buffer: " << copyInto);
+ // keep going; moreToRead() should prevent any from-Store reading
+ }
+
+ // Our nextHttpReadOffset() expects the first copy() call to have zero
+ // offset. More complex code could handle a positive first offset, but it
+ // would only be useful when reading responses from memory: We would not
+ // _delay_ the response (to read the requested HTTP body bytes from disk)
+ // when we already can respond with HTTP headers.
+ Assure(!copyInto.offset || answeredOnce());
+
+ parsingBuffer.emplace(copyInto);
+
+ discardableHttpEnd_ = nextHttpReadOffset();
+ debugs(90, 7, "discardableHttpEnd_=" << discardableHttpEnd_);
static bool copying (false);
assert (!copying);
@@ -288,50 +308,41 @@ store_client::copy(StoreEntry * anEntry,
// Add no code here. This object may no longer exist.
}
-/// Whether there is (or will be) more entry data for us.
+/// Whether Store has (or possibly will have) more entry data for us.
bool
-store_client::moreToSend() const
+store_client::moreToRead() const
{
+ if (!copyInto.length)
+ return false; // the client supplied a zero-size buffer
+
if (entry->store_status == STORE_PENDING)
return true; // there may be more coming
/* STORE_OK, including aborted entries: no more data is coming */
- const int64_t len = entry->objectLen();
+ if (canReadFromMemory())
+ return true; // memory has the first byte wanted by the client
- // If we do not know the entry length, then we have to open the swap file.
- const bool canSwapIn = entry->hasDisk();
- if (len < 0)
- return canSwapIn;
+ if (!entry->hasDisk())
+ return false; // cannot read anything from disk either
- if (copyInto.offset >= len)
- return false; // sent everything there is
+ if (entry->objectLen() >= 0 && copyInto.offset >= entry->contentLen())
+ return false; // the disk cannot have byte(s) wanted by the client
- if (canSwapIn)
- return true; // if we lack prefix, we can swap it in
-
- // If we cannot swap in, make sure we have what we want in RAM. Otherwise,
- // scheduleRead calls scheduleDiskRead which asserts without a swap file.
- const MemObject *mem = entry->mem_obj;
- return mem &&
- mem->inmem_lo <= copyInto.offset && copyInto.offset < mem->endOffset();
+ // we cannot be sure until we swap in metadata and learn contentLen(),
+ // but the disk may have the byte(s) wanted by the client
+ return true;
}
static void
storeClientCopy2(StoreEntry * e, store_client * sc)
{
/* reentrancy not allowed - note this could lead to
- * dropped events
+ * dropped notifications about response data availability
*/
- if (sc->flags.copy_event_pending) {
- return;
- }
-
if (sc->flags.store_copying) {
- sc->flags.copy_event_pending = true;
- debugs(90, 3, "storeClientCopy2: Queueing storeClientCopyEvent()");
- eventAdd("storeClientCopyEvent", storeClientCopyEvent, sc, 0.0, 0);
+ debugs(90, 3, "prevented recursive copying for " << *e);
return;
}
@@ -344,39 +355,44 @@ storeClientCopy2(StoreEntry * e, store_client * sc)
* if the peer aborts, we want to give the client(s)
* everything we got before the abort condition occurred.
*/
- /* Warning: doCopy may indirectly free itself in callbacks,
- * hence the lock to keep it active for the duration of
- * this function
- * XXX: Locking does not prevent calling sc destructor (it only prevents
- * freeing sc memory) so sc may become invalid from C++ p.o.v.
- */
- CbcPointer<store_client> tmpLock = sc;
- assert (!sc->flags.store_copying);
sc->doCopy(e);
- assert(!sc->flags.store_copying);
+}
+
+/// Whether our answer, if sent right now, will announce the availability of
+/// HTTP response headers (to the STCB callback) for the first time.
+bool
+store_client::sendingHttpHeaders() const
+{
+ return !answeredOnce() && entry->mem().baseReply().hdr_sz > 0;
}
void
store_client::doCopy(StoreEntry *anEntry)
{
+ Assure(_callback.pending());
+ Assure(!flags.disk_io_pending);
+ Assure(!flags.store_copying);
+
assert (anEntry == entry);
flags.store_copying = true;
MemObject *mem = entry->mem_obj;
- debugs(33, 5, "store_client::doCopy: co: " <<
- copyInto.offset << ", hi: " <<
- mem->endOffset());
+ debugs(33, 5, this << " into " << copyInto <<
+ " hi: " << mem->endOffset() <<
+ " objectLen: " << entry->objectLen() <<
+ " past_answers: " << answers);
- if (!moreToSend()) {
+ const auto sendHttpHeaders = sendingHttpHeaders();
+
+ if (!sendHttpHeaders && !moreToRead()) {
/* There is no more to send! */
debugs(33, 3, HERE << "There is no more to send!");
- callback(0);
+ noteNews();
flags.store_copying = false;
return;
}
- /* Check that we actually have data */
- if (anEntry->store_status == STORE_PENDING && copyInto.offset >= mem->endOffset()) {
+ if (!sendHttpHeaders && anEntry->store_status == STORE_PENDING && nextHttpReadOffset() >= mem->endOffset()) {
debugs(90, 3, "store_client::doCopy: Waiting for more");
flags.store_copying = false;
return;
@@ -398,7 +414,24 @@ store_client::doCopy(StoreEntry *anEntry)
if (!startSwapin())
return; // failure
}
- scheduleRead();
+
+ // send any immediately available body bytes even if we also sendHttpHeaders
+ if (canReadFromMemory()) {
+ readFromMemory();
+ noteNews(); // will sendHttpHeaders (if needed) as well
+ flags.store_copying = false;
+ return;
+ }
+
+ if (sendHttpHeaders) {
+ debugs(33, 5, "just send HTTP headers: " << mem->baseReply().hdr_sz);
+ noteNews();
+ flags.store_copying = false;
+ return;
+ }
+
+ // no information that the client needs is available immediately
+ scheduleDiskRead();
}
/// opens the swapin "file" if possible; otherwise, fail()s and returns false
@@ -432,14 +465,13 @@ store_client::startSwapin()
}
void
-store_client::scheduleRead()
+store_client::noteSwapInDone(const bool error)
{
- MemObject *mem = entry->mem_obj;
-
- if (copyInto.offset >= mem->inmem_lo && copyInto.offset < mem->endOffset())
- scheduleMemRead();
+ Assure(_callback.pending());
+ if (error)
+ fail();
else
- scheduleDiskRead();
+ noteNews();
}
void
@@ -464,15 +496,44 @@ store_client::scheduleDiskRead()
flags.store_copying = false;
}
+/// whether at least one byte wanted by the client is in memory
+bool
+store_client::canReadFromMemory() const
+{
+ const auto &mem = entry->mem();
+ const auto memReadOffset = nextHttpReadOffset();
+ return mem.inmem_lo <= memReadOffset && memReadOffset < mem.endOffset() &&
+ parsingBuffer->spaceSize();
+}
+
+/// The offset of the next stored HTTP response byte wanted by the client.
+int64_t
+store_client::nextHttpReadOffset() const
+{
+ Assure(parsingBuffer);
+ const auto &mem = entry->mem();
+ const auto hdr_sz = mem.baseReply().hdr_sz;
+ // Certain SMP cache manager transactions do not store HTTP headers in
+ // mem_hdr; they store just a kid-specific piece of the future report body.
+ // In such cases, hdr_sz ought to be zero. In all other (known) cases,
+ // mem_hdr contains HTTP response headers (positive hdr_sz if parsed)
+ // followed by HTTP response body. The math below accommodates all cases.
+ return NaturalSum<int64_t>(hdr_sz, copyInto.offset, parsingBuffer->contentSize()).value();
+}
+
+/// Copies at least some of the requested body bytes from MemObject memory,
+/// satisfying the copy() request.
+/// \pre canReadFromMemory() is true
void
-store_client::scheduleMemRead()
+store_client::readFromMemory()
{
- /* What the client wants is in memory */
- /* Old style */
- debugs(90, 3, "store_client::doCopy: Copying normal from memory");
- size_t sz = entry->mem_obj->data_hdr.copy(copyInto);
- callback(sz);
- flags.store_copying = false;
+ Assure(parsingBuffer);
+ const auto readInto = parsingBuffer->space().positionAt(nextHttpReadOffset());
+
+ debugs(90, 3, "copying HTTP body bytes from memory into " << readInto);
+ const auto sz = entry->mem_obj->data_hdr.copy(readInto);
+ Assure(sz > 0); // our canReadFromMemory() precondition guarantees that
+ parsingBuffer->appended(readInto.data, sz);
}
void
@@ -484,63 +545,150 @@ store_client::fileRead()
assert(!flags.disk_io_pending);
flags.disk_io_pending = true;
+ // mem->swap_hdr_sz is zero here during initial read(s)
+ const auto nextStoreReadOffset = NaturalSum<int64_t>(mem->swap_hdr_sz, nextHttpReadOffset()).value();
+
+ // XXX: If fileRead() is called when we do not yet know mem->swap_hdr_sz,
+ // then we must start reading from disk offset zero to learn it: we cannot
+ // compute correct HTTP response start offset on disk without it. However,
+ // late startSwapin() calls imply that the assertion below might fail.
+ Assure(mem->swap_hdr_sz > 0 || !nextStoreReadOffset);
+
+ // TODO: Remove this assertion. Introduced in 1998 commit 3157c72, it
+ // assumes that swapped out memory is freed unconditionally, but we no
+ // longer do that because trimMemory() path checks lowestMemReaderOffset().
+ // It is also misplaced: We are not swapping out anything here and should
+ // not care about any swapout invariants.
if (mem->swap_hdr_sz != 0)
if (entry->swappingOut())
- assert(mem->swapout.sio->offset() > copyInto.offset + (int64_t)mem->swap_hdr_sz);
+ assert(mem->swapout.sio->offset() > nextStoreReadOffset);
+
+ // XXX: We should let individual cache_dirs limit the read size instead, but
+ // we cannot do that without more fixes and research because:
+ // * larger reads corrupt responses when cache_dir uses SharedMemory::get();
+ // * we do not know how to find all I/O code that assumes this limit;
+ // * performance effects of larger disk reads may be negative somewhere.
+ const decltype(StoreIOBuffer::length) maxReadSize = SM_PAGE_SIZE;
+
+ Assure(parsingBuffer);
+ // also, do not read more than we can return (via a copyInto.length buffer)
+ const auto readSize = std::min(copyInto.length, maxReadSize);
+ lastDiskRead = parsingBuffer->makeSpace(readSize).positionAt(nextStoreReadOffset);
+ debugs(90, 5, "into " << lastDiskRead);
storeRead(swapin_sio,
- copyInto.data,
- copyInto.length,
- copyInto.offset + mem->swap_hdr_sz,
+ lastDiskRead.data,
+ lastDiskRead.length,
+ lastDiskRead.offset,
mem->swap_hdr_sz == 0 ? storeClientReadHeader
: storeClientReadBody,
this);
}
void
-store_client::readBody(const char *, ssize_t len)
+store_client::readBody(const char * const buf, const ssize_t lastIoResult)
{
- int parsed_header = 0;
-
- // Don't assert disk_io_pending here.. may be called by read_header
+ Assure(flags.disk_io_pending);
flags.disk_io_pending = false;
assert(_callback.pending());
- debugs(90, 3, "storeClientReadBody: len " << len << "");
+ Assure(parsingBuffer);
+ debugs(90, 3, "got " << lastIoResult << " using " << *parsingBuffer);
- if (len < 0)
+ if (lastIoResult < 0)
return fail();
- const auto rep = entry->mem_obj ? &entry->mem().baseReply() : nullptr;
- if (copyInto.offset == 0 && len > 0 && rep && rep->sline.status() == Http::scNone) {
- /* Our structure ! */
- if (!entry->mem_obj->adjustableBaseReply().parseCharBuf(copyInto.data, headersEnd(copyInto.data, len))) {
- debugs(90, DBG_CRITICAL, "Could not parse headers from on disk object");
- } else {
- parsed_header = 1;
- }
+ if (!lastIoResult) {
+ if (answeredOnce())
+ return noteNews();
+
+ debugs(90, DBG_CRITICAL, "ERROR: Truncated HTTP headers in on-disk object");
+ return fail();
}
- if (len > 0 && rep && entry->mem_obj->inmem_lo == 0 && entry->objectLen() <= (int64_t)Config.Store.maxInMemObjSize && Config.onoff.memory_cache_disk) {
- storeGetMemSpace(len);
+ assert(lastDiskRead.data == buf);
+ lastDiskRead.length = lastIoResult;
+
+ parsingBuffer->appended(buf, lastIoResult);
+
+ // we know swap_hdr_sz by now and were reading beyond swap metadata because
+ // readHeader() would have been called otherwise (to read swap metadata)
+ const auto swap_hdr_sz = entry->mem().swap_hdr_sz;
+ Assure(swap_hdr_sz > 0);
+ Assure(!Less(lastDiskRead.offset, swap_hdr_sz));
+
+ // Map lastDiskRead (i.e. the disk area we just read) to an HTTP reply part.
+ // The bytes are the same, but disk and HTTP offsets differ by swap_hdr_sz.
+ const auto httpOffset = lastDiskRead.offset - swap_hdr_sz;
+ const auto httpPart = StoreIOBuffer(lastDiskRead).positionAt(httpOffset);
+
+ maybeWriteFromDiskToMemory(httpPart);
+ handleBodyFromDisk();
+}
+
+/// de-serializes HTTP response (partially) read from disk storage
+void
+store_client::handleBodyFromDisk()
+{
+ // We cannot de-serialize on-disk HTTP response without MemObject because
+ // without MemObject::swap_hdr_sz we cannot know where that response starts.
+ Assure(entry->mem_obj);
+ Assure(entry->mem_obj->swap_hdr_sz > 0);
+
+ if (!answeredOnce()) {
+ // All on-disk responses have HTTP headers. First disk body read(s)
+ // include HTTP headers that we must parse (if needed) and skip.
+ const auto haveHttpHeaders = entry->mem_obj->baseReply().pstate == Http::Message::psParsed;
+ if (!haveHttpHeaders && !parseHttpHeadersFromDisk())
+ return;
+ skipHttpHeadersFromDisk();
+ }
+
+ noteNews();
+}
+
+/// Adds HTTP response data loaded from disk to the memory cache (if
+/// needed/possible). The given part may contain portions of HTTP response
+/// headers and/or HTTP response body.
+void
+store_client::maybeWriteFromDiskToMemory(const StoreIOBuffer &httpResponsePart)
+{
+ // XXX: Reject [memory-]uncachable/unshareable responses instead of assuming
+ // that an HTTP response should be written to MemObject's data_hdr (and that
+ // it may purge already cached entries) just because it "fits" and was
+ // loaded from disk. For example, this response may already be marked for
+ // release. The (complex) cachability decision(s) should be made outside
+ // (and obeyed by) this low-level code.
+ if (httpResponsePart.length && entry->mem_obj->inmem_lo == 0 && entry->objectLen() <= (int64_t)Config.Store.maxInMemObjSize && Config.onoff.memory_cache_disk) {
+ storeGetMemSpace(httpResponsePart.length);
+ // XXX: This "recheck" is not needed because storeGetMemSpace() cannot
+ // purge mem_hdr bytes of a locked entry, and we do lock ours. And
+ // inmem_lo offset itself should not be relevant to appending new bytes.
+ //
// The above may start to free our object so we need to check again
if (entry->mem_obj->inmem_lo == 0) {
- /* Copy read data back into memory.
- * copyInto.offset includes headers, which is what mem cache needs
- */
- int64_t mem_offset = entry->mem_obj->endOffset();
- if ((copyInto.offset == mem_offset) || (parsed_header && mem_offset == rep->hdr_sz)) {
- entry->mem_obj->write(StoreIOBuffer(len, copyInto.offset, copyInto.data));
- }
+ // XXX: This code assumes a non-shared memory cache.
+ if (httpResponsePart.offset == entry->mem_obj->endOffset())
+ entry->mem_obj->write(httpResponsePart);
}
}
-
- callback(len);
}
void
store_client::fail()
{
+ debugs(90, 3, (object_ok ? "once" : "again"));
+ if (!object_ok)
+ return; // we failed earlier; nothing to do now
+
object_ok = false;
+
+ noteNews();
+}
+
+/// if necessary and possible, informs the Store reader about copy() result
+void
+store_client::noteNews()
+{
/* synchronous open failures callback from the store,
* before startSwapin detects the failure.
* TODO: fix this inconsistent behaviour - probably by
@@ -548,8 +696,20 @@ store_client::fail()
* not synchronous
*/
- if (_callback.pending())
- callback(0, true);
+ if (!_callback.callback_handler) {
+ debugs(90, 5, "client lost interest");
+ return;
+ }
+
+ if (_callback.notifier) {
+ debugs(90, 5, "earlier news is being delivered by " << _callback.notifier);
+ return;
+ }
+
+ _callback.notifier = asyncCall(90, 4, "store_client::FinishCallback", cbdataDialer(store_client::FinishCallback, this));
+ ScheduleCallHere(_callback.notifier);
+
+ Assure(!_callback.pending());
}
static void
@@ -620,38 +780,22 @@ store_client::readHeader(char const *buf, ssize_t len)
if (!object_ok)
return;
+ Assure(parsingBuffer);
+ debugs(90, 3, "got " << len << " using " << *parsingBuffer);
+
if (len < 0)
return fail();
+ Assure(!parsingBuffer->contentSize());
+ parsingBuffer->appended(buf, len);
if (!unpackHeader(buf, len)) {
fail();
return;
}
+ parsingBuffer->consume(mem->swap_hdr_sz);
- /*
- * If our last read got some data the client wants, then give
- * it to them, otherwise schedule another read.
- */
- size_t body_sz = len - mem->swap_hdr_sz;
-
- if (copyInto.offset < static_cast<int64_t>(body_sz)) {
- /*
- * we have (part of) what they want
- */
- size_t copy_sz = min(copyInto.length, body_sz);
- debugs(90, 3, "storeClientReadHeader: copying " << copy_sz << " bytes of body");
- memmove(copyInto.data, copyInto.data + mem->swap_hdr_sz, copy_sz);
-
- readBody(copyInto.data, copy_sz);
-
- return;
- }
-
- /*
- * we don't have what the client wants, but at least we now
- * know the swap header size.
- */
- fileRead();
+ maybeWriteFromDiskToMemory(parsingBuffer->content());
+ handleBodyFromDisk();
}
int
@@ -720,10 +864,12 @@ storeUnregister(store_client * sc, StoreEntry * e, void *data)
++statCounter.swap.ins;
}
- if (sc->_callback.pending()) {
- /* callback with ssize = -1 to indicate unexpected termination */
- debugs(90, 3, "store_client for " << *e << " has a callback");
- sc->fail();
+ if (sc->_callback.callback_handler || sc->_callback.notifier) {
+ debugs(90, 3, "forgetting store_client callback for " << *e);
+ // Do not notify: Callers want to stop copying and forget about this
+ // pending copy request. Some would mishandle a notification from here.
+ if (sc->_callback.notifier)
+ sc->_callback.notifier->cancel("storeUnregister");
}
#if STORE_CLIENT_LIST_DEBUG
@@ -731,6 +877,8 @@ storeUnregister(store_client * sc, StoreEntry * e, void *data)
#endif
+ // XXX: We might be inside sc store_client method somewhere up the call
+ // stack. TODO: Convert store_client to AsyncJob to make destruction async.
delete sc;
assert(e->locked());
@@ -788,6 +936,16 @@ StoreEntry::invokeHandlers()
if (sc->flags.disk_io_pending)
continue;
+ if (sc->flags.store_copying)
+ continue;
+
+ // XXX: If invokeHandlers() is (indirectly) called from a store_client
+ // method, then the above three conditions may not be sufficient to
+ // prevent us from reentering the same store_client object! This
+ // probably does not happen in the current code, but no observed
+ // invariant prevents this from (accidentally) happening in the future.
+
+ // TODO: Convert store_client into AsyncJob; make this call asynchronous
CodeContext::Reset(sc->_callback.codeContext);
debugs(90, 3, "checking client #" << i);
storeClientCopy2(this, sc);
@@ -906,6 +1064,63 @@ CheckQuickAbortIsReasonable(StoreEntry * entry)
return true;
}
+/// parses HTTP header bytes loaded from disk
+/// \returns false if fail() or scheduleDiskRead() has been called and, hence,
+/// the caller should just quit without any further action
+bool
+store_client::parseHttpHeadersFromDisk()
+{
+ try {
+ return tryParsingHttpHeaders();
+ } catch (...) {
+ // XXX: Our parser enforces Config.maxReplyHeaderSize limit, but our
+ // packer does not. Since packing might increase header size, we may
+ // cache a header that we cannot parse and get here. Same for MemStore.
+ debugs(90, DBG_CRITICAL, "ERROR: Cannot parse on-disk HTTP headers" <<
+ Debug::Extra << "exception: " << CurrentException <<
+ Debug::Extra << "raw input size: " << parsingBuffer->contentSize() << " bytes" <<
+ Debug::Extra << "current buffer capacity: " << parsingBuffer->capacity() << " bytes");
+ fail();
+ return false;
+ }
+}
+
+/// parseHttpHeadersFromDisk() helper
+ /// \copydoc parseHttpHeadersFromDisk()
+bool
+store_client::tryParsingHttpHeaders()
+{
+ Assure(parsingBuffer);
+ Assure(!copyInto.offset); // otherwise, parsingBuffer cannot have HTTP response headers
+ auto &adjustableReply = entry->mem().adjustableBaseReply();
+ if (adjustableReply.parseTerminatedPrefix(parsingBuffer->c_str(), parsingBuffer->contentSize()))
+ return true;
+
+ // TODO: Optimize by checking memory as well. For simplicity's sake, we
+ // continue on the disk-reading path, but readFromMemory() can give us the
+ // missing header bytes immediately if a concurrent request put those bytes
+ // into memory while we were waiting for our disk response.
+ scheduleDiskRead();
+ return false;
+}
+
+/// skips HTTP header bytes previously loaded from disk
+void
+store_client::skipHttpHeadersFromDisk()
+{
+ const auto hdr_sz = entry->mem_obj->baseReply().hdr_sz;
+ Assure(hdr_sz > 0); // all on-disk responses have HTTP headers
+ if (Less(parsingBuffer->contentSize(), hdr_sz)) {
+ debugs(90, 5, "discovered " << hdr_sz << "-byte HTTP headers in memory after reading some of them from disk: " << *parsingBuffer);
+ parsingBuffer->consume(parsingBuffer->contentSize()); // skip loaded HTTP header prefix
+ } else {
+ parsingBuffer->consume(hdr_sz); // skip loaded HTTP headers
+ const auto httpBodyBytesAfterHeader = parsingBuffer->contentSize(); // may be zero
+ Assure(httpBodyBytesAfterHeader <= copyInto.length);
+ debugs(90, 5, "read HTTP body prefix: " << httpBodyBytesAfterHeader);
+ }
+}
+
void
store_client::dumpStats(MemBuf * output, int clientNumber) const
{
@@ -923,8 +1138,8 @@ store_client::dumpStats(MemBuf * output, int clientNumber) const
if (flags.store_copying)
output->append(" store_copying", 14);
- if (flags.copy_event_pending)
- output->append(" copy_event_pending", 19);
+ if (_callback.notifier)
+ output->append(" notifying", 10);
output->append("\n",1);
}
@@ -932,7 +1147,7 @@ store_client::dumpStats(MemBuf * output, int clientNumber) const
bool
store_client::Callback::pending() const
{
- return callback_handler && callback_data;
+ return callback_handler && !notifier;
}
store_client::Callback::Callback(STCB *function, void *data):
@@ -943,6 +1158,13 @@ store_client::Callback::Callback(STCB *function, void *data):
}
#if USE_DELAY_POOLS
+int
+store_client::bytesWanted() const
+{
+ // TODO: To avoid using stale copyInto, return zero if !_callback.pending()?
+ return delayId.bytesWanted(0, copyInto.length);
+}
+
void
store_client::setDelayId(DelayId delay_id)
{
diff --git a/src/store_swapin.cc b/src/store_swapin.cc
index 968a86a..cf77f3d 100644
--- a/src/store_swapin.cc
+++ b/src/store_swapin.cc
@@ -56,7 +56,7 @@ storeSwapInFileClosed(void *data, int errflag, StoreIOState::Pointer)
if (sc->_callback.pending()) {
assert (errflag <= 0);
- sc->callback(0, errflag ? true : false);
+ sc->noteSwapInDone(errflag);
}
++statCounter.swap.ins;
diff --git a/src/tests/stub_HttpReply.cc b/src/tests/stub_HttpReply.cc
index d460722..3248f75 100644
--- a/src/tests/stub_HttpReply.cc
+++ b/src/tests/stub_HttpReply.cc
@@ -25,6 +25,7 @@ void httpBodyPackInto(const HttpBody *, Packable *) STUB
bool HttpReply::sanityCheckStartLine(const char *buf, const size_t hdr_len, Http::StatusCode *error) STUB_RETVAL(false)
int HttpReply::httpMsgParseError() STUB_RETVAL(0)
bool HttpReply::expectingBody(const HttpRequestMethod&, int64_t&) const STUB_RETVAL(false)
+size_t HttpReply::parseTerminatedPrefix(const char *, size_t) STUB_RETVAL(0)
bool HttpReply::parseFirstLine(const char *start, const char *end) STUB_RETVAL(false)
void HttpReply::hdrCacheInit() STUB
HttpReply * HttpReply::clone() const STUB_RETVAL(NULL)
diff --git a/src/tests/stub_store_client.cc b/src/tests/stub_store_client.cc
index 91c98cd..794d573 100644
--- a/src/tests/stub_store_client.cc
+++ b/src/tests/stub_store_client.cc
@@ -34,7 +34,12 @@ void storeLogOpen(void) STUB
void storeDigestInit(void) STUB
void storeRebuildStart(void) STUB
void storeReplSetup(void) STUB
-bool store_client::memReaderHasLowerOffset(int64_t anOffset) const STUB_RETVAL(false)
void store_client::dumpStats(MemBuf * output, int clientNumber) const STUB
int store_client::getType() const STUB_RETVAL(0)
+void store_client::noteSwapInDone(bool) STUB
+#if USE_DELAY_POOLS
+int store_client::bytesWanted() const STUB_RETVAL(0)
+#endif
+
+
diff --git a/src/urn.cc b/src/urn.cc
index 64b6f2f..0b97540 100644
--- a/src/urn.cc
+++ b/src/urn.cc
@@ -28,8 +28,6 @@
#include "tools.h"
#include "urn.h"
-#define URN_REQBUF_SZ 4096
-
class UrnState : public StoreClient
{
CBDATA_CLASS(UrnState);
@@ -50,8 +48,8 @@ public:
HttpRequest::Pointer urlres_r;
AccessLogEntry::Pointer ale; ///< details of the requesting transaction
- char reqbuf[URN_REQBUF_SZ] = { '\0' };
- int reqofs = 0;
+ /// for receiving a URN resolver reply body from Store and interpreting it
+ Store::ParsingBuffer parsingBuffer;
private:
/* StoreClient API */
@@ -72,7 +70,7 @@ typedef struct {
} url_entry;
static STCB urnHandleReply;
-static url_entry *urnParseReply(const char *inbuf, const HttpRequestMethod&);
+static url_entry *urnParseReply(const SBuf &, const HttpRequestMethod &);
static const char *const crlf = "\r\n";
CBDATA_CLASS_INIT(UrnState);
@@ -201,13 +199,8 @@ UrnState::created(StoreEntry *e)
sc = storeClientListAdd(urlres_e, this);
}
- reqofs = 0;
- StoreIOBuffer tempBuffer;
- tempBuffer.offset = reqofs;
- tempBuffer.length = URN_REQBUF_SZ;
- tempBuffer.data = reqbuf;
storeClientCopy(sc, urlres_e,
- tempBuffer,
+ parsingBuffer.makeInitialSpace(),
urnHandleReply,
this);
}
@@ -242,19 +235,14 @@ urnHandleReply(void *data, StoreIOBuffer result)
UrnState *urnState = static_cast<UrnState *>(data);
StoreEntry *e = urnState->entry;
StoreEntry *urlres_e = urnState->urlres_e;
- char *s = NULL;
- size_t k;
- HttpReply *rep;
url_entry *urls;
url_entry *u;
url_entry *min_u;
ErrorState *err;
int i;
int urlcnt = 0;
- char *buf = urnState->reqbuf;
- StoreIOBuffer tempBuffer;
- debugs(52, 3, "urnHandleReply: Called with size=" << result.length << ".");
+ debugs(52, 3, result << " with " << *e);
if (EBIT_TEST(urlres_e->flags, ENTRY_ABORTED) || result.flags.error) {
delete urnState;
@@ -267,59 +255,39 @@ urnHandleReply(void *data, StoreIOBuffer result)
return;
}
- /* Update reqofs to point to where in the buffer we'd be */
- urnState->reqofs += result.length;
-
- /* Handle reqofs being bigger than normal */
- if (urnState->reqofs >= URN_REQBUF_SZ) {
- delete urnState;
- return;
- }
+ urnState->parsingBuffer.appended(result.data, result.length);
/* If we haven't received the entire object (urn), copy more */
- if (urlres_e->store_status == STORE_PENDING) {
- Must(result.length > 0); // zero length ought to imply STORE_OK
- tempBuffer.offset = urnState->reqofs;
- tempBuffer.length = URN_REQBUF_SZ - urnState->reqofs;
- tempBuffer.data = urnState->reqbuf + urnState->reqofs;
+ if (!urnState->sc->atEof()) {
+ const auto bufferedBytes = urnState->parsingBuffer.contentSize();
+ const auto remainingSpace = urnState->parsingBuffer.space().positionAt(bufferedBytes);
+
+ if (!remainingSpace.length) {
+ debugs(52, 3, "ran out of buffer space after " << bufferedBytes << " bytes");
+ // TODO: Here and in other error cases, send ERR_URN_RESOLVE to client.
+ delete urnState;
+ return;
+ }
+
storeClientCopy(urnState->sc, urlres_e,
- tempBuffer,
+ remainingSpace,
urnHandleReply,
urnState);
return;
}
- /* we know its STORE_OK */
- k = headersEnd(buf, urnState->reqofs);
-
- if (0 == k) {
- debugs(52, DBG_IMPORTANT, "urnHandleReply: didn't find end-of-headers for " << e->url() );
- delete urnState;
- return;
- }
-
- s = buf + k;
- // TODO: Check whether we should parse urlres_e reply, as before 528b2c61.
- rep = new HttpReply;
- rep->parseCharBuf(buf, k);
- debugs(52, 3, "reply exists, code=" << rep->sline.status() << ".");
-
- if (rep->sline.status() != Http::scOkay) {
+ const auto &peerReply = urlres_e->mem().baseReply();
+ debugs(52, 3, "got reply, code=" << peerReply.sline.status());
+ if (peerReply.sline.status() != Http::scOkay) {
debugs(52, 3, "urnHandleReply: failed.");
err = new ErrorState(ERR_URN_RESOLVE, Http::scNotFound, urnState->request.getRaw(), urnState->ale);
err->url = xstrdup(e->url());
errorAppendEntry(e, err);
- delete rep;
delete urnState;
return;
}
- delete rep;
-
- while (xisspace(*s))
- ++s;
-
- urls = urnParseReply(s, urnState->request->method);
+ urls = urnParseReply(urnState->parsingBuffer.toSBuf(), urnState->request->method);
if (!urls) { /* unknown URN error */
debugs(52, 3, "urnTranslateDone: unknown URN " << e->url());
@@ -367,7 +335,7 @@ urnHandleReply(void *data, StoreIOBuffer result)
"Generated by %s@%s\n"
"</ADDRESS>\n",
APP_FULLNAME, getMyHostname());
- rep = new HttpReply;
+ const auto rep = new HttpReply;
rep->setHeaders(Http::scFound, NULL, "text/html", mb->length(), 0, squid_curtime);
if (min_u) {
@@ -389,9 +357,8 @@ urnHandleReply(void *data, StoreIOBuffer result)
}
static url_entry *
-urnParseReply(const char *inbuf, const HttpRequestMethod& m)
+urnParseReply(const SBuf &inBuf, const HttpRequestMethod &m)
{
- char *buf = xstrdup(inbuf);
char *token;
url_entry *list;
url_entry *old;
@@ -400,6 +367,13 @@ urnParseReply(const char *inbuf, const HttpRequestMethod& m)
debugs(52, 3, "urnParseReply");
list = (url_entry *)xcalloc(n + 1, sizeof(*list));
+ // XXX: Switch to tokenizer-based parsing.
+ const auto allocated = SBufToCstring(inBuf);
+
+ auto buf = allocated;
+ while (xisspace(*buf))
+ ++buf;
+
for (token = strtok(buf, crlf); token; token = strtok(NULL, crlf)) {
debugs(52, 3, "urnParseReply: got '" << token << "'");
@@ -435,7 +409,7 @@ urnParseReply(const char *inbuf, const HttpRequestMethod& m)
}
debugs(52, 3, "urnParseReply: Found " << i << " URLs");
- xfree(buf);
+ xfree(allocated);
return list;
}