commit bf9a9ec5329bde6acc26797d1fa7a7a165fec01f
Author: Tomas Korbar
Date:   Tue Nov 21 13:21:43 2023 +0100

    Fix CVE-2023-5824 (#1335) (#1561) (#1562)
    Supply ALE with HttpReply before checking http_reply_access (#398)
    Replace adjustable base reply - downstream change necessary for backport

diff --git a/src/AccessLogEntry.cc b/src/AccessLogEntry.cc index 1956c9b..4f1e73e 100644 --- a/src/AccessLogEntry.cc +++ b/src/AccessLogEntry.cc @@ -10,6 +10,7 @@ #include "AccessLogEntry.h" #include "HttpReply.h" #include "HttpRequest.h" +#include "MemBuf.h" #include "SquidConfig.h" #if USE_OPENSSL @@ -89,6 +90,8 @@ AccessLogEntry::getExtUser() const return nullptr; } +AccessLogEntry::AccessLogEntry() {} + AccessLogEntry::~AccessLogEntry() { safe_free(headers.request); @@ -97,14 +100,11 @@ AccessLogEntry::~AccessLogEntry() safe_free(adapt.last_meta); #endif - safe_free(headers.reply); - safe_free(headers.adapted_request); HTTPMSGUNLOCK(adapted_request); safe_free(lastAclName); - HTTPMSGUNLOCK(reply); HTTPMSGUNLOCK(request); #if ICAP_CLIENT HTTPMSGUNLOCK(icap.reply); @@ -124,3 +124,10 @@ AccessLogEntry::effectiveVirginUrl() const return nullptr; } +void +AccessLogEntry::packReplyHeaders(MemBuf &mb) const +{ + if (reply) + reply->packHeadersUsingFastPacker(mb); +} +
diff --git a/src/AccessLogEntry.h b/src/AccessLogEntry.h index 1f29e61..f1d2ecc 100644 --- a/src/AccessLogEntry.h +++ b/src/AccessLogEntry.h @@ -40,13 +40,7 @@ class AccessLogEntry: public RefCountable public: typedef RefCount<AccessLogEntry> Pointer; - AccessLogEntry() : - url(nullptr), - lastAclName(nullptr), - reply(nullptr), - request(nullptr), - adapted_request(nullptr) - {} + AccessLogEntry(); ~AccessLogEntry(); /// Fetch the client IP log string into the given buffer. @@ -63,6 +57,9 @@ public: /// Fetch the transaction method string (ICP opcode, HTCP opcode or HTTP method) SBuf getLogMethod() const; + /// dump all reply headers (for sending or risky logging) + void packReplyHeaders(MemBuf &mb) const; + SBuf url; /// TCP/IP level details about the client connection @@ -187,14 +184,12 @@ public: public: Headers() : request(NULL), - adapted_request(NULL), - reply(NULL) {} + adapted_request(NULL) + {} char *request; //< virgin HTTP request headers char *adapted_request; //< HTTP request headers after adaptation and redirection - - char *reply; } headers; #if USE_ADAPTATION @@ -212,13 +207,13 @@ public: } adapt; #endif - const char *lastAclName; ///< string for external_acl_type %ACL format code + const char *lastAclName = nullptr; ///< string for external_acl_type %ACL format code SBuf lastAclData; ///< string for external_acl_type %DATA format code HierarchyLogEntry hier; - HttpReply *reply; - HttpRequest *request; //< virgin HTTP request - HttpRequest *adapted_request; //< HTTP request after adaptation and redirection + HttpReplyPointer reply; + HttpRequest *request = nullptr; //< virgin HTTP request + HttpRequest *adapted_request = nullptr; //< HTTP request after adaptation and redirection /// key:value pairs set by squid.conf note directive and /// key=value pairs returned from URL rewrite/redirect helper
diff --git a/src/HttpHeader.cc b/src/HttpHeader.cc index 8dcc7e3..21206a9 100644 --- a/src/HttpHeader.cc +++ b/src/HttpHeader.cc @@ -9,6 +9,7 @@ /* DEBUG: section 55 HTTP Header */ #include "squid.h" +#include "base/Assure.h" #include "base/EnumIterator.h" #include "base64.h" #include "globals.h"
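With the AccessLogEntry hunks above, the log entry now holds its reply via a reference-counted HttpReplyPointer instead of a manually locked raw pointer, and reply headers are packed on demand by packReplyHeaders() instead of being duplicated into the removed headers.reply C string. A minimal usage sketch (illustrative only, not part of the patch; it assumes an HttpReplyPointer named reply is in scope):

    AccessLogEntry::Pointer al = new AccessLogEntry();
    // before this patch: al->reply = rep; HTTPMSGLOCK(al->reply);
    al->reply = reply; // smart-pointer assignment; no HTTPMSGLOCK()/HTTPMSGUNLOCK()

    // pack the stored reply headers only when a log format actually needs them
    MemBuf mb;
    mb.init();
    al->packReplyHeaders(mb); // a no-op when no reply has been recorded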
diff --git a/src/HttpHeaderTools.cc b/src/HttpHeaderTools.cc index f1e45a4..1337b8d 100644 --- a/src/HttpHeaderTools.cc +++ b/src/HttpHeaderTools.cc @@ -479,7 +479,7 @@ httpHdrAdd(HttpHeader *heads, HttpRequest *request, const AccessLogEntryPointer checklist.al = al; if (al && al->reply) { - checklist.reply = al->reply; + checklist.reply = al->reply.getRaw(); HTTPMSGLOCK(checklist.reply); }
diff --git a/src/HttpReply.cc b/src/HttpReply.cc index 6feb262..e74960b 100644 --- a/src/HttpReply.cc +++ b/src/HttpReply.cc @@ -20,7 +20,9 @@ #include "HttpReply.h" #include "HttpRequest.h" #include "MemBuf.h" +#include "sbuf/Stream.h" #include "SquidConfig.h" +#include "SquidMath.h" #include "SquidTime.h" #include "Store.h" #include "StrList.h" @@ -524,6 +526,38 @@ HttpReply::expectedBodyTooLarge(HttpRequest& request) return expectedSize > bodySizeMax; } +size_t +HttpReply::parseTerminatedPrefix(const char * const terminatedBuf, const size_t bufSize) +{ + auto error = Http::scNone; + const bool eof = false; // TODO: Remove after removing atEnd from HttpHeader::parse() + if (parse(terminatedBuf, bufSize, eof, &error)) { + debugs(58, 7, "success after accumulating " << bufSize << " bytes and parsing " << hdr_sz); + Assure(pstate == psParsed); + Assure(hdr_sz > 0); + Assure(!Less(bufSize, hdr_sz)); // cannot parse more bytes than we have + return hdr_sz; // success + } + + Assure(pstate != psParsed); + hdr_sz = 0; + + if (error) { + throw TextException(ToSBuf("failed to parse HTTP headers", + Debug::Extra, "parser error code: ", error, + Debug::Extra, "accumulated unparsed bytes: ", bufSize, + Debug::Extra, "reply_header_max_size: ", Config.maxReplyHeaderSize), + Here()); + } + + debugs(58, 3, "need more bytes after accumulating " << bufSize << " out of " << Config.maxReplyHeaderSize); + + // the parse() call above enforces Config.maxReplyHeaderSize limit + // XXX: Make this a strict comparison after fixing Http::Message::parse() enforcement + Assure(bufSize <= Config.maxReplyHeaderSize); + return 0; // parsed nothing, need more data +} + void HttpReply::calcMaxBodySize(HttpRequest& request) const {
diff --git a/src/HttpReply.h b/src/HttpReply.h index 6c90e20..4301cfd 100644 --- a/src/HttpReply.h +++ b/src/HttpReply.h @@ -121,6 +121,13 @@ public: /// \returns false if any information is missing bool olderThan(const HttpReply *them) const; + /// Parses response status line and headers at the start of the given + /// NUL-terminated buffer of the given size. Respects reply_header_max_size. + /// Assures pstate becomes Http::Message::psParsed on (and only on) success. + /// \returns the number of bytes in a successfully parsed prefix (or zero) + /// \retval 0 implies that more data is needed to parse the response prefix + size_t parseTerminatedPrefix(const char *, size_t); + private: /** initialize */ void init();
diff --git a/src/MemObject.cc b/src/MemObject.cc index df7791f..650d3fd 100644 --- a/src/MemObject.cc +++ b/src/MemObject.cc @@ -196,8 +196,8 @@ struct LowestMemReader : public unary_function<store_client, void> { LowestMemReader(int64_t seed):current(seed) {} void operator() (store_client const &x) { - if (x.memReaderHasLowerOffset(current)) - current = x.copyInto.offset; + if (x.getType() == STORE_MEM_CLIENT) + current = std::min(current, x.discardableHttpEnd()); } int64_t current; @@ -369,6 +369,12 @@ MemObject::policyLowestOffsetToKeep(bool swap) const */ int64_t lowest_offset = lowestMemReaderOffset(); + // XXX: Remove the last (Config.onoff.memory_cache_first-based) condition + // and update keepForLocalMemoryCache() accordingly. The caller wants to + // remove all local memory that is safe to remove. Honoring caching + // preferences is its responsibility.
Our responsibility is safety. The + // situation was different when ff4b33f added that condition -- there was no + // keepInLocalMemory/keepForLocalMemoryCache() call guard back then. if (endOffset() < lowest_offset || endOffset() - inmem_lo > (int64_t)Config.Store.maxInMemObjSize || (swap && !Config.onoff.memory_cache_first)) @@ -492,7 +498,7 @@ MemObject::mostBytesAllowed() const #endif - j = sc->delayId.bytesWanted(0, sc->copyInto.length); + j = sc->bytesWanted(); if (j > jmax) { jmax = j; diff --git a/src/MemObject.h b/src/MemObject.h index 711966d..9f4add0 100644 --- a/src/MemObject.h +++ b/src/MemObject.h @@ -56,9 +56,23 @@ public: void write(const StoreIOBuffer &buf); void unlinkRequest(); + + /// HTTP response before 304 (Not Modified) updates + /// starts "empty"; modified via replaceBaseReply() or adjustableBaseReply() + HttpReply &baseReply() const { return *_reply; } + HttpReply const *getReply() const; void replaceHttpReply(HttpReply *newrep); void stat (MemBuf * mb) const; + + /// The offset of the last memory-stored HTTP response byte plus one. + /// * HTTP response headers (if any) are stored at offset zero. + /// * HTTP response body byte[n] usually has offset (hdr_sz + n), where + /// hdr_sz is the size of stored HTTP response headers (zero if none); and + /// n is the corresponding byte offset in the whole resource body. + /// However, some 206 (Partial Content) response bodies are stored (and + /// retrieved) as regular 200 response bodies, disregarding offsets of + /// their body parts. \sa HttpStateData::decideIfWeDoRanges(). int64_t endOffset () const; void markEndOfReplyHeaders(); ///< sets _reply->hdr_sz to endOffset() /// negative if unknown; otherwise, expected object_sz, expected endOffset diff --git a/src/MemStore.cc b/src/MemStore.cc index a4a6ab2..6762c4f 100644 --- a/src/MemStore.cc +++ b/src/MemStore.cc @@ -17,6 +17,8 @@ #include "MemObject.h" #include "MemStore.h" #include "mime_header.h" +#include "sbuf/SBuf.h" +#include "sbuf/Stream.h" #include "SquidConfig.h" #include "SquidMath.h" #include "StoreStats.h" @@ -316,19 +318,25 @@ MemStore::get(const cache_key *key) // create a brand new store entry and initialize it with stored info StoreEntry *e = new StoreEntry(); - // XXX: We do not know the URLs yet, only the key, but we need to parse and - // store the response for the Root().find() callers to be happy because they - // expect IN_MEMORY entries to already have the response headers and body. - e->createMemObject(); - - anchorEntry(*e, index, *slot); - - const bool copied = copyFromShm(*e, index, *slot); - - if (copied) - return e; + try { + // XXX: We do not know the URLs yet, only the key, but we need to parse and + // store the response for the Root().find() callers to be happy because they + // expect IN_MEMORY entries to already have the response headers and body. + e->createMemObject(); + + anchorEntry(*e, index, *slot); + + // TODO: make copyFromShm() throw on all failures, simplifying this code + if (copyFromShm(*e, index, *slot)) + return e; + debugs(20, 3, "failed for " << *e); + } catch (...) 
{ + // see store_client::parseHttpHeadersFromDisk() for problems this may log + debugs(20, DBG_IMPORTANT, "ERROR: Cannot load a cache hit from shared memory" << + Debug::Extra << "exception: " << CurrentException << + Debug::Extra << "cache_mem entry: " << *e); + } - debugs(20, 3, "failed for " << *e); map->freeEntry(index); // do not let others into the same trap destroyStoreEntry(static_cast<hash_link *>(e)); return NULL; @@ -473,6 +481,8 @@ MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc Ipc::StoreMapSliceId sid = anchor.start; // optimize: remember the last sid bool wasEof = anchor.complete() && sid < 0; int64_t sliceOffset = 0; + + SBuf httpHeaderParsingBuffer; while (sid >= 0) { const Ipc::StoreMapSlice &slice = map->readableSlice(index, sid); // slice state may change during copying; take snapshots now @@ -495,10 +505,18 @@ MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc const StoreIOBuffer sliceBuf(wasSize - prefixSize, e.mem_obj->endOffset(), page + prefixSize); - if (!copyFromShmSlice(e, sliceBuf, wasEof)) - return false; + + copyFromShmSlice(e, sliceBuf); debugs(20, 8, "entry " << index << " copied slice " << sid << " from " << extra.page << '+' << prefixSize); + + // parse headers if needed; they might span multiple slices! + auto &reply = e.mem().baseReply(); + if (reply.pstate != psParsed) { + httpHeaderParsingBuffer.append(sliceBuf.data, sliceBuf.length); + if (reply.parseTerminatedPrefix(httpHeaderParsingBuffer.c_str(), httpHeaderParsingBuffer.length())) + httpHeaderParsingBuffer = SBuf(); // we do not need these bytes anymore + } } // else skip a [possibly incomplete] slice that we copied earlier @@ -524,6 +542,9 @@ MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc debugs(20, 5, "mem-loaded all " << e.mem_obj->endOffset() << '/' << anchor.basics.swap_file_sz << " bytes of " << e); + if (e.mem().baseReply().pstate != psParsed) + throw TextException(ToSBuf("truncated mem-cached headers; accumulated: ", httpHeaderParsingBuffer.length()), Here()); + // from StoreEntry::complete() e.mem_obj->object_sz = e.mem_obj->endOffset(); e.store_status = STORE_OK;
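The loop above feeds each copied slice to HttpReply::parseTerminatedPrefix() until the headers parse, because a response's header block may straddle slice boundaries. A minimal, self-contained sketch of that accumulate-then-parse pattern (hypothetical names; std::string stands in for SBuf):

    #include <cstddef>
    #include <stdexcept>
    #include <string>

    // stand-in for HttpReply::parseTerminatedPrefix(): returns the parsed
    // prefix size on success, 0 when more bytes are needed, throws on errors
    std::size_t parseTerminatedPrefix(const char *terminatedBuf, std::size_t bufSize);

    // append one slice; returns header size once the whole header block arrived
    std::size_t addSlice(std::string &accumulated, const char *slice,
                         const std::size_t sliceSize, const std::size_t maxHeaderSize)
    {
        accumulated.append(slice, sliceSize);
        if (accumulated.size() > maxHeaderSize) // mimics reply_header_max_size
            throw std::runtime_error("HTTP response headers too large");
        // c_str() supplies the NUL termination the parser expects
        if (const auto parsedSize = parseTerminatedPrefix(accumulated.c_str(), accumulated.size())) {
            accumulated.clear(); // parsed; the accumulated bytes are no longer needed
            return parsedSize;
        }
        return 0; // headers continue in a later slice
    }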
@@ -539,32 +560,11 @@ MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc } /// imports one shared memory slice into local memory -bool -MemStore::copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf, bool eof) +void +MemStore::copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf) { debugs(20, 7, "buf: " << buf.offset << " + " << buf.length); - // from store_client::readBody() - // parse headers if needed; they might span multiple slices! - HttpReply *rep = (HttpReply *)e.getReply(); - if (rep->pstate < psParsed) { - // XXX: have to copy because httpMsgParseStep() requires 0-termination - MemBuf mb; - mb.init(buf.length+1, buf.length+1); - mb.append(buf.data, buf.length); - mb.terminate(); - const int result = rep->httpMsgParseStep(mb.buf, buf.length, eof); - if (result > 0) { - assert(rep->pstate == psParsed); - } else if (result < 0) { - debugs(20, DBG_IMPORTANT, "Corrupted mem-cached headers: " << e); - return false; - } else { // more slices are needed - assert(!eof); - } - } - debugs(20, 7, "rep pstate: " << rep->pstate); - // local memory stores both headers and body so copy regardless of pstate const int64_t offBefore = e.mem_obj->endOffset(); assert(e.mem_obj->data_hdr.write(buf)); // from MemObject::write() @@ -572,7 +572,6 @@ // expect to write the entire buf because StoreEntry::write() never fails assert(offAfter >= 0 && offBefore <= offAfter && static_cast<size_t>(offAfter - offBefore) == buf.length); - return true; } /// whether we should cache the entry
diff --git a/src/MemStore.h b/src/MemStore.h index 516da3c..31a2015 100644 --- a/src/MemStore.h +++ b/src/MemStore.h @@ -76,7 +76,7 @@ protected: void copyToShm(StoreEntry &e); void copyToShmSlice(StoreEntry &e, Ipc::StoreMapAnchor &anchor, Ipc::StoreMap::Slice &slice); bool copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor); - bool copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf, bool eof); + void copyFromShmSlice(StoreEntry &, const StoreIOBuffer &); void updateHeadersOrThrow(Ipc::StoreMapUpdate &update);
diff --git a/src/SquidMath.h b/src/SquidMath.h index c70acd1..bfca0cc 100644 --- a/src/SquidMath.h +++ b/src/SquidMath.h @@ -9,6 +9,11 @@ #ifndef _SQUID_SRC_SQUIDMATH_H #define _SQUID_SRC_SQUIDMATH_H +#include <limits> +#include <utility> + +// TODO: Move to src/base/Math.h and drop the Math namespace + /* Math functions we define locally for Squid */ namespace Math { @@ -21,5 +26,165 @@ double doubleAverage(const double, const double, int, const int); } // namespace Math +// If Sum() performance becomes important, consider using GCC and clang +// built-ins like __builtin_add_overflow() instead of manual overflow checks. + +/// detects a pair of unsigned types +/// reduces code duplication in declarations further below +template <typename A, typename B> +using AllUnsigned = typename std::conditional< + std::is_unsigned<A>::value && std::is_unsigned<B>::value, + std::true_type, + std::false_type + >::type; + +// TODO: Replace with std::cmp_less() after migrating to C++20. +/// whether integer a is less than integer b, with correct overflow handling +template <typename A, typename B> +constexpr bool +Less(const A a, const B b) { + // The casts below make standard C++ integer conversions explicit. They + // quell compiler warnings about signed/unsigned comparison. The first two + // lines exclude different-sign a and b, making the casts/comparison safe. + using AB = typename std::common_type<A, B>::type; + return + (a >= 0 && b < 0) ? false : + (a < 0 && b >= 0) ? true : + /* (a >= 0) == (b >= 0) */ static_cast<AB>(a) < static_cast<AB>(b); +} + +/// ensure that T is supported by NaturalSum() and friends +template <typename T> +constexpr void +AssertNaturalType() +{ + static_assert(std::numeric_limits<T>::is_bounded, "std::numeric_limits<T>::max() is meaningful"); + static_assert(std::numeric_limits<T>::is_exact, "no silent loss of precision"); + static_assert(!std::is_enum<T>::value, "no silent creation of non-enumerated values"); +} + +// TODO: Investigate whether this optimization can be expanded to [signed] types +// A and B when std::numeric_limits::is_modulo is true. +/// This IncreaseSumInternal() overload is optimized for speed. +/// \returns a non-overflowing sum of the two unsigned arguments (or nothing) +/// \prec both argument types are unsigned +template <typename S, typename A, typename B, std::enable_if_t<AllUnsigned<A,B>::value, int> = 0> +std::pair<S, bool> +IncreaseSumInternal(const A a, const B b) { + // paranoid: AllUnsigned<A,B> precondition established that already + static_assert(std::is_unsigned<A>::value, "AllUnsigned<A,B> dispatch worked for A"); + static_assert(std::is_unsigned<B>::value, "AllUnsigned<A,B> dispatch worked for B"); + + AssertNaturalType<S>(); + AssertNaturalType<A>(); + AssertNaturalType<B>(); + + // we should only be called by IncreaseSum(); it forces integer promotion + static_assert(std::is_same<A, decltype(+a)>::value, "a will not be promoted"); + static_assert(std::is_same<B, decltype(+b)>::value, "b will not be promoted"); + // and without integer promotions, a sum of unsigned integers is unsigned + static_assert(std::is_unsigned<decltype(a+b)>::value, "a+b is unsigned"); + + // with integer promotions ruled out, a or b can only undergo integer + // conversion to the higher rank type (A or B, we do not know which) + using AB = typename std::common_type<A, B>::type; + static_assert(std::is_same<AB, A>::value || std::is_same<AB, B>::value, "no unexpected conversions"); + static_assert(std::is_same<AB, decltype(a+b)>::value, "lossless assignment"); + const AB sum = a + b; + + static_assert(std::numeric_limits<AB>::is_modulo, "we can detect overflows"); + // 1. modulo math: overflowed sum is smaller than any of its operands + // 2. the sum may overflow S (i.e. the return base type) + // We do not need Less() here because we compare promoted unsigned types. + return (sum >= a && sum <= std::numeric_limits<S>::max()) ? + std::make_pair(S(sum), true) : std::make_pair(S(), false); +} + +/// This IncreaseSumInternal() overload supports a larger variety of types. +/// \returns a non-overflowing sum of the two arguments (or nothing) +/// \returns nothing if at least one of the arguments is negative +/// \prec at least one of the argument types is signed +template <typename S, typename A, typename B, std::enable_if_t<!AllUnsigned<A,B>::value, int> = 0> +std::pair<S, bool> constexpr +IncreaseSumInternal(const A a, const B b) { + AssertNaturalType<S>(); + AssertNaturalType<A>(); + AssertNaturalType<B>(); + + // we should only be called by IncreaseSum() that does integer promotion + static_assert(std::is_same<A, decltype(+a)>::value, "a will not be promoted"); + static_assert(std::is_same<B, decltype(+b)>::value, "b will not be promoted"); + + return + // We could support a non-under/overflowing sum of negative numbers, but + // our callers use negative values specially (e.g., for do-not-use or + // do-not-limit settings) and are not supposed to do math with them. + (a < 0 || b < 0) ? std::make_pair(S(), false) : + // To avoid undefined behavior of signed overflow, we must not compute + // the raw a+b sum if it may overflow. When A is not B, a or b undergoes + // (safe for non-negatives) integer conversion in these expressions, so + // we do not know the resulting a+b type AB and its maximum. We must + // also detect subsequent casting-to-S overflows. + // Overflow condition: (a + b > maxAB) or (a + b > maxS). + // A is an integer promotion of S, so maxS <= maxA <= maxAB. + // Since maxS <= maxAB, it is sufficient to just check: a + b > maxS, + // which is the same as the overflow-safe condition here: maxS - a < b. + // Finally, (maxS - a) cannot overflow because a is not negative and + // cannot underflow because a is a promotion of s: 0 <= a <= maxS. + Less(std::numeric_limits<S>::max() - a, b) ? std::make_pair(S(), false) : + std::make_pair(S(a + b), true); +} + +/// argument pack expansion termination for IncreaseSum() +template <typename S, typename T> +std::pair<S, bool> +IncreaseSum(const S s, const T t) +{ + // Force (always safe) integer promotions now, to give std::enable_if_t<> + // promoted types instead of entering IncreaseSumInternal<AllUnsigned>(s,t) + // but getting a _signed_ promoted value of s or t in s + t. + return IncreaseSumInternal<S>(+s, +t); +} + +/// \returns a non-overflowing sum of the arguments (or nothing) +template <typename S, typename T, typename... Args> +std::pair<S, bool> +IncreaseSum(const S sum, const T t, const Args... args) { + const auto head = IncreaseSum(sum, t); + if (head.second) { + return IncreaseSum(head.first, args...); + } else { + // std::optional<S>() triggers bogus -Wmaybe-uninitialized warnings in GCC v10.3 + return std::make_pair(S(), false); + } +} + +/// \returns an exact, non-overflowing sum of the arguments (or nothing) +template <typename S, typename... Args> +std::pair<S, bool> +NaturalSum(const Args... args) { + return IncreaseSum<S>(0, args...); +} + +/// Safely resets the given variable to NaturalSum() of the given arguments. +/// If the sum overflows, resets to variable's maximum possible value. +/// \returns the new variable value (like an assignment operator would) +template <typename S, typename... Args> +S +SetToNaturalSumOrMax(S &var, const Args... args) +{ + const auto sum = NaturalSum<S>(args...); + var = sum.second ? sum.first : std::numeric_limits<S>::max(); + return var; +} + +/// converts a given non-negative integer into an integer of a given type +/// without loss of information or undefined behavior +template <typename Result, typename Source> +Result +NaturalCast(const Source s) +{ + const auto result = NaturalSum<Result>(s); + assert(result.second); // the given value must fit into Result + return result.first; +} + #endif /* _SQUID_SRC_SQUIDMATH_H */
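Since these helpers are template-heavy, a small usage sketch may help. It is illustrative only (the variable names are invented here) and relies on the pair-based return convention of this backport, where .second reports whether the sum fit:

    #include <cstdint>
    #include <limits>
    // plus the patched SquidMath.h shown above

    void overflowExample()
    {
        const uint64_t headerSize = 100;
        const uint64_t bodySize = std::numeric_limits<uint64_t>::max();

        // detected overflow instead of silent wraparound: total.second is false
        const auto total = NaturalSum<uint64_t>(headerSize, bodySize);
        if (!total.second) {
            // reject or clamp the oversized value here
        }

        // clamping variant: offset becomes INT64_MAX instead of overflowing
        int64_t offset = 0;
        SetToNaturalSumOrMax(offset, headerSize, bodySize);

        // mixed-sign comparison without the usual -Wsign-compare pitfalls
        const bool less = Less(-1, 0u); // true; plain (-1 < 0u) would be false
        (void)less;
    }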
diff --git a/src/Store.h b/src/Store.h index 3eb6b84..2475fe0 100644 --- a/src/Store.h +++ b/src/Store.h @@ -49,6 +49,9 @@ public: StoreEntry(); virtual ~StoreEntry(); + MemObject &mem() { assert(mem_obj); return *mem_obj; } + const MemObject &mem() const { assert(mem_obj); return *mem_obj; } + virtual HttpReply const *getReply() const; virtual void write (StoreIOBuffer);
diff --git a/src/StoreClient.h b/src/StoreClient.h index 65472d8..942f9fc 100644 --- a/src/StoreClient.h +++ b/src/StoreClient.h @@ -9,11 +9,13 @@ #ifndef SQUID_STORECLIENT_H #define SQUID_STORECLIENT_H +#include "base/AsyncCall.h" #include "dlink.h" +#include "store/ParsingBuffer.h" #include "StoreIOBuffer.h" #include "StoreIOState.h" -typedef void STCB(void *, StoreIOBuffer); /* store callback */ +using STCB = void (void *, StoreIOBuffer); /* store callback */ class StoreEntry; @@ -39,17 +41,34 @@ class store_client public: store_client(StoreEntry *); ~store_client(); - bool memReaderHasLowerOffset(int64_t) const; + + /// the client will not use HTTP response bytes with lower offsets (if any) + auto discardableHttpEnd() const { return discardableHttpEnd_; } + int getType() const; - void fail(); - void callback(ssize_t len, bool error = false); + + /// React to the end of reading the response from disk. There will be no + /// more readHeader() and readBody() callbacks for the current storeRead() + /// swapin after this notification.
+ void noteSwapInDone(bool error); + void doCopy (StoreEntry *e); void readHeader(const char *buf, ssize_t len); void readBody(const char *buf, ssize_t len); + + /// Request StoreIOBuffer-described response data via an asynchronous STCB + /// callback. At most one outstanding request is allowed per store_client. void copy(StoreEntry *, StoreIOBuffer, STCB *, void *); + void dumpStats(MemBuf * output, int clientNumber) const; - int64_t cmp_offset; + // TODO: When STCB gets a dedicated Answer type, move this info there. + /// Whether the last successful storeClientCopy() answer was known to + /// contain the last body bytes of the HTTP response + /// \retval true requesting bytes at higher offsets is futile + /// \sa STCB + bool atEof() const { return atEof_; } + #if STORE_CLIENT_LIST_DEBUG void *owner; @@ -59,33 +78,86 @@ public: StoreIOState::Pointer swapin_sio; struct { + /// whether we are expecting a response to be swapped in from disk + /// (i.e. whether async storeRead() is currently in progress) + // TODO: a better name reflecting the 'in' scope of the flag bool disk_io_pending; + + /// whether the store_client::doCopy()-initiated STCB sequence is + /// currently in progress bool store_copying; - bool copy_event_pending; } flags; #if USE_DELAY_POOLS DelayId delayId; + + /// The maximum number of bytes the Store client can read/copy next without + /// overflowing its buffer and without violating delay pool limits. Store + /// I/O is not rate-limited, but we assume that the same number of bytes may + /// be read from the Squid-to-server connection that may be rate-limited. + int bytesWanted() const; + void setDelayId(DelayId delay_id); #endif dlink_node node; - /* Below here is private - do no alter outside storeClient calls */ - StoreIOBuffer copyInto; private: - bool moreToSend() const; + bool moreToRead() const; + bool canReadFromMemory() const; + bool answeredOnce() const { return answers >= 1; } + bool sendingHttpHeaders() const; + int64_t nextHttpReadOffset() const; void fileRead(); void scheduleDiskRead(); - void scheduleMemRead(); + void readFromMemory(); void scheduleRead(); bool startSwapin(); bool unpackHeader(char const *buf, ssize_t len); + void handleBodyFromDisk(); + void maybeWriteFromDiskToMemory(const StoreIOBuffer &); + + bool parseHttpHeadersFromDisk(); + bool tryParsingHttpHeaders(); + void skipHttpHeadersFromDisk(); + + void fail(); + void callback(ssize_t); + void noteCopiedBytes(size_t); + void noteNews(); + void finishCallback(); + static void FinishCallback(store_client *); int type; bool object_ok; + /// \copydoc atEof() + bool atEof_; + + /// Storage and metadata associated with the current copy() request. Ought + /// to be ignored when not answering a copy() request. + /// * copyInto.offset is the requested HTTP response body offset; + /// * copyInto.data is the client-owned, client-provided result buffer; + /// * copyInto.length is the size of the .data result buffer; + /// * copyInto.flags are unused by this class. + StoreIOBuffer copyInto; + + // TODO: Convert to uint64_t after fixing mem_hdr::endOffset() and friends. + /// \copydoc discardableHttpEnd() + int64_t discardableHttpEnd_ = 0; + + /// the total number of finishCallback() calls + uint64_t answers; + + /// Accumulates raw bytes read from Store while answering the current copy() + /// request. Buffer contents depends on the source and parsing stage; it may + /// hold (parts of) swap metadata, HTTP response headers, and/or HTTP + /// response body bytes. 
+ std::pair<Store::ParsingBuffer, bool> parsingBuffer = std::make_pair(Store::ParsingBuffer(), false); + + StoreIOBuffer lastDiskRead; ///< buffer used for the last storeRead() call + /* Until we finish stuffing code into store_client */ public: @@ -97,6 +169,7 @@ public: bool pending() const; STCB *callback_handler; void *callback_data; + AsyncCall::Pointer notifier; } _callback; };
diff --git a/src/StoreIOBuffer.h b/src/StoreIOBuffer.h index 009aafe..ad1c491 100644 --- a/src/StoreIOBuffer.h +++ b/src/StoreIOBuffer.h @@ -43,6 +43,9 @@ public: return Range<int64_t>(offset, offset + length); } + /// convenience method for changing the offset of a being-configured buffer + StoreIOBuffer &positionAt(const int64_t newOffset) { offset = newOffset; return *this; } + void dump() const { if (fwrite(data, length, 1, stderr)) {} if (fwrite("\n", 1, 1, stderr)) {}
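The copy()/atEof()/positionAt() pieces above combine into a simple client-side read loop. A sketch of that protocol, with a hypothetical Reader type standing in for real callers such as the WHOIS code below (only storeClientCopy(), StoreIOBuffer, atEof(), flags.error, and positionAt() come from the patch):

    static void handleAnswer(void *data, StoreIOBuffer result)
    {
        const auto reader = static_cast<Reader *>(data); // hypothetical caller state
        if (result.flags.error)
            return reader->fail(); // errors arrive only via flags.error now
        reader->accumulate(result.data, result.length);
        if (reader->sc->atEof())
            return reader->finish(); // EOF is explicit, not a zero-length answer
        // resume reading exactly where this answer ended
        storeClientCopy(reader->sc, reader->entry,
                        reader->freeSpace().positionAt(result.offset + result.length),
                        &handleAnswer, reader);
    }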
diff --git a/src/acl/Asn.cc b/src/acl/Asn.cc index 94ec862..07353d6 100644 --- a/src/acl/Asn.cc +++ b/src/acl/Asn.cc @@ -16,20 +16,22 @@ #include "acl/DestinationIp.h" #include "acl/SourceAsn.h" #include "acl/Strategised.h" +#include "base/CharacterSet.h" #include "FwdState.h" #include "HttpReply.h" #include "HttpRequest.h" #include "ipcache.h" #include "MasterXaction.h" #include "mgr/Registration.h" +#include "parser/Tokenizer.h" #include "radix.h" #include "RequestFlags.h" +#include "sbuf/SBuf.h" #include "SquidConfig.h" #include "Store.h" #include "StoreClient.h" #define WHOIS_PORT 43 -#define AS_REQBUF_SZ 4096 /* BEGIN of definitions for radix tree entries */ @@ -70,33 +72,18 @@ class ASState CBDATA_CLASS(ASState); public: - ASState(); + ASState() = default; ~ASState(); StoreEntry *entry; store_client *sc; HttpRequest::Pointer request; int as_number; - int64_t offset; - int reqofs; - char reqbuf[AS_REQBUF_SZ]; - bool dataRead; + Store::ParsingBuffer parsingBuffer; }; CBDATA_CLASS_INIT(ASState); -ASState::ASState() : - entry(NULL), - sc(NULL), - request(NULL), - as_number(0), - offset(0), - reqofs(0), - dataRead(false) -{ - memset(reqbuf, 0, AS_REQBUF_SZ); -} - ASState::~ASState() { debugs(53, 3, entry->url()); @@ -112,7 +99,7 @@ struct rtentry_t { m_ADDR e_mask; }; -static int asnAddNet(char *, int); +static int asnAddNet(const SBuf &, int); static void asnCacheStart(int as); @@ -256,8 +243,7 @@ asnCacheStart(int as) } asState->entry = e; - StoreIOBuffer readBuffer (AS_REQBUF_SZ, asState->offset, asState->reqbuf); - storeClientCopy(asState->sc, e, readBuffer, asHandleReply, asState); + storeClientCopy(asState->sc, e, asState->parsingBuffer.makeInitialSpace(), asHandleReply, asState); } static void @@ -265,13 +251,8 @@ asHandleReply(void *data, StoreIOBuffer result) { ASState *asState = (ASState *)data; StoreEntry *e = asState->entry; - char *s; - char *t; - char *buf = asState->reqbuf; - int leftoversz = -1; - debugs(53, 3, "asHandleReply: Called with size=" << (unsigned int)result.length); - debugs(53, 3, "asHandleReply: buffer='" << buf << "'"); + debugs(53, 3, result << " for " << asState->as_number << " with " << *e); /* First figure out whether we should abort the request */ @@ -280,11 +261,7 @@ asHandleReply(void *data, StoreIOBuffer result) return; } - if (result.length == 0 && asState->dataRead) { - debugs(53, 3, "asHandleReply: Done: " << e->url()); - delete asState; - return; - } else if (result.flags.error) { + if (result.flags.error) { debugs(53, DBG_IMPORTANT, "asHandleReply: Called with Error set and size=" << (unsigned int) result.length); delete asState; return; @@ -294,117 +271,85 @@ return; } - /* - * Next, attempt to parse our request - * Remembering that the actual buffer size is retsize + reqofs! - */ - s = buf; + asState->parsingBuffer.appended(result.data, result.length); + Parser::Tokenizer tok(SBuf(asState->parsingBuffer.content().data, asState->parsingBuffer.contentSize())); + SBuf address; + // Word delimiters in WHOIS ASN replies. RFC 3912 mentions SP, CR, and LF. + // Others are added to mimic an earlier isspace()-based implementation. + static const auto WhoisSpaces = CharacterSet("ASCII_spaces", " \f\r\n\t\v"); + while (tok.token(address, WhoisSpaces)) { + (void)asnAddNet(address, asState->as_number); + } + asState->parsingBuffer.consume(tok.parsedSize()); + const auto leftoverBytes = asState->parsingBuffer.contentSize(); - while ((size_t)(s - buf) < result.length + asState->reqofs && *s != '\0') { - while (*s && xisspace(*s)) - ++s; + if (asState->sc->atEof()) { + if (leftoverBytes) + debugs(53, 2, "WHOIS: Discarding the last " << leftoverBytes << " received bytes of a truncated AS response"); + delete asState; + return; + } - for (t = s; *t; ++t) { - if (xisspace(*t)) - break; - } - if (*t == '\0') { - /* oof, word should continue on next block */ - break; - } + const auto remainingSpace = asState->parsingBuffer.space().positionAt(result.offset + result.length); - *t = '\0'; - debugs(53, 3, "asHandleReply: AS# " << s << " (" << asState->as_number << ")"); - asnAddNet(s, asState->as_number); - s = t + 1; - asState->dataRead = true; + if (!remainingSpace.length) { + Assure(leftoverBytes); + debugs(53, DBG_IMPORTANT, "WARNING: Ignoring the tail of a WHOIS AS response" << + " with an unparsable section of " << leftoverBytes << + " bytes ending at offset " << remainingSpace.offset); + delete asState; + return; + } - /* - * Next, grab the end of the 'valid data' in the buffer, and figure - * out how much data is left in our buffer, which we need to keep - * around for the next request - */ - leftoversz = (asState->reqofs + result.length) - (s - buf); - - assert(leftoversz >= 0); - - /* - * Next, copy the left over data, from s to s + leftoversz to the - * beginning of the buffer - */ - memmove(buf, s, leftoversz); - - /* - * Next, update our offset and reqofs, and kick off a copy if required - */ - asState->offset += result.length; - - asState->reqofs = leftoversz; - - debugs(53, 3, "asState->offset = " << asState->offset); - - if (e->store_status == STORE_PENDING) { - debugs(53, 3, "asHandleReply: store_status == STORE_PENDING: " << e->url() ); - StoreIOBuffer tempBuffer (AS_REQBUF_SZ - asState->reqofs, - asState->offset, - asState->reqbuf + asState->reqofs); - storeClientCopy(asState->sc, - e, - tempBuffer, - asHandleReply, - asState); - } else { - StoreIOBuffer tempBuffer; - debugs(53, 3, "asHandleReply: store complete, but data received " << e->url() ); - tempBuffer.offset = asState->offset; - tempBuffer.length = AS_REQBUF_SZ - asState->reqofs; - tempBuffer.data = asState->reqbuf + asState->reqofs; - storeClientCopy(asState->sc, - e, - tempBuffer, - asHandleReply, - asState); - } + const decltype(StoreIOBuffer::offset) stillReasonableOffset = 100000; // an arbitrary limit in bytes + if (remainingSpace.offset > stillReasonableOffset) { + // stop suspicious accumulation of parsed addresses and/or work + debugs(53, DBG_IMPORTANT, "WARNING: Ignoring the tail of a suspiciously large WHOIS AS
response" << + " exceeding " << stillReasonableOffset << " bytes"); + delete asState; + return; + } + + storeClientCopy(asState->sc, e, remainingSpace, asHandleReply, asState); } /** * add a network (addr, mask) to the radix tree, with matching AS number */ static int -asnAddNet(char *as_string, int as_number) +asnAddNet(const SBuf &addressAndMask, const int as_number) { struct squid_radix_node *rn; CbDataList **Tail = NULL; CbDataList *q = NULL; as_info *asinfo = NULL; - Ip::Address mask; - Ip::Address addr; - char *t; - int bitl; - - t = strchr(as_string, '/'); - - if (t == NULL) { + static const CharacterSet NonSlashSet = CharacterSet("slash", "/").complement("non-slash"); + Parser::Tokenizer tok(addressAndMask); + SBuf addressToken; + if (!(tok.prefix(addressToken, NonSlashSet) && tok.skip('/'))) { debugs(53, 3, "asnAddNet: failed, invalid response from whois server."); return 0; } - *t = '\0'; - addr = as_string; - bitl = atoi(t + 1); - - if (bitl < 0) - bitl = 0; + const Ip::Address addr = addressToken.c_str(); // INET6 TODO : find a better way of identifying the base IPA family for mask than this. - t = strchr(as_string, '.'); + const auto addrFamily = (addressToken.find('.') != SBuf::npos) ? AF_INET : AF_INET6; // generate Netbits Format Mask + Ip::Address mask; mask.setNoAddr(); - mask.applyMask(bitl, (t!=NULL?AF_INET:AF_INET6) ); + int64_t bitl = 0; + if (tok.int64(bitl, 10, false)) + mask.applyMask(bitl, addrFamily); debugs(53, 3, "asnAddNet: called for " << addr << "/" << mask ); diff --git a/src/acl/FilledChecklist.cc b/src/acl/FilledChecklist.cc index 9826c24..33eeb67 100644 --- a/src/acl/FilledChecklist.cc +++ b/src/acl/FilledChecklist.cc @@ -116,7 +116,6 @@ ACLFilledChecklist::verifyAle() const if (reply && !al->reply) { showDebugWarning("HttpReply object"); al->reply = reply; - HTTPMSGLOCK(al->reply); } #if USE_IDENT diff --git a/src/adaptation/icap/ModXact.cc b/src/adaptation/icap/ModXact.cc index 370f077..2bcc917 100644 --- a/src/adaptation/icap/ModXact.cc +++ b/src/adaptation/icap/ModXact.cc @@ -1292,11 +1292,8 @@ void Adaptation::Icap::ModXact::finalizeLogInfo() al.adapted_request = adapted_request_; HTTPMSGLOCK(al.adapted_request); - if (adapted_reply_) { - al.reply = adapted_reply_; - HTTPMSGLOCK(al.reply); - } else - al.reply = NULL; + // XXX: This reply (and other ALE members!) may have been needed earlier. 
+ al.reply = adapted_reply_; if (h->rfc931.size()) al.cache.rfc931 = h->rfc931.termedBuf(); @@ -1331,12 +1328,6 @@ void Adaptation::Icap::ModXact::finalizeLogInfo() if (replyHttpBodySize >= 0) al.cache.highOffset = replyHttpBodySize; //don't set al.cache.objectSize because it hasn't exist yet - - MemBuf mb; - mb.init(); - adapted_reply_->header.packInto(&mb); - al.headers.reply = xstrdup(mb.buf); - mb.clean(); } prepareLogWithRequestDetails(adapted_request_, alep); Xaction::finalizeLogInfo(); diff --git a/src/adaptation/icap/icap_log.cc b/src/adaptation/icap/icap_log.cc index ecc4baf..6bb5a6d 100644 --- a/src/adaptation/icap/icap_log.cc +++ b/src/adaptation/icap/icap_log.cc @@ -62,7 +62,7 @@ void icapLogLog(AccessLogEntry::Pointer &al) if (IcapLogfileStatus == LOG_ENABLE) { ACLFilledChecklist checklist(NULL, al->adapted_request, NULL); if (al->reply) { - checklist.reply = al->reply; + checklist.reply = al->reply.getRaw(); HTTPMSGLOCK(checklist.reply); } accessLogLogTo(Config.Log.icaplogs, al, &checklist); diff --git a/src/base/Assure.cc b/src/base/Assure.cc new file mode 100644 index 0000000..cb69fc5 --- /dev/null +++ b/src/base/Assure.cc @@ -0,0 +1,24 @@ +/* + * Copyright (C) 1996-2022 The Squid Software Foundation and contributors + * + * Squid software is distributed under GPLv2+ license and includes + * contributions from numerous individuals and organizations. + * Please see the COPYING and CONTRIBUTORS files for details. + */ + +#include "squid.h" +#include "base/Assure.h" +#include "base/TextException.h" +#include "sbuf/Stream.h" + +[[ noreturn ]] void +ReportAndThrow_(const int debugLevel, const char *description, const SourceLocation &location) +{ + const TextException ex(description, location); + const auto label = debugLevel <= DBG_IMPORTANT ? "ERROR: Squid BUG: " : ""; + // TODO: Consider also printing the number of BUGs reported so far. It would + // require GC, but we could even print the number of same-location reports. + debugs(0, debugLevel, label << ex); + throw ex; +} + diff --git a/src/base/Assure.h b/src/base/Assure.h new file mode 100644 index 0000000..bb571d2 --- /dev/null +++ b/src/base/Assure.h @@ -0,0 +1,52 @@ +/* + * Copyright (C) 1996-2022 The Squid Software Foundation and contributors + * + * Squid software is distributed under GPLv2+ license and includes + * contributions from numerous individuals and organizations. + * Please see the COPYING and CONTRIBUTORS files for details. + */ + +#ifndef SQUID_SRC_BASE_ASSURE_H +#define SQUID_SRC_BASE_ASSURE_H + +#include "base/Here.h" + +/// Reports the description (at the given debugging level) and throws +/// the corresponding exception. Reduces compiled code size of Assure() and +/// Must() callers. Do not call directly; use Assure() instead. +/// \param description explains the condition (i.e. what MUST happen) +[[ noreturn ]] void ReportAndThrow_(int debugLevel, const char *description, const SourceLocation &); + +/// Calls ReportAndThrow() if needed. Reduces caller code duplication. +/// Do not call directly; use Assure() instead. +/// \param description c-string explaining the condition (i.e. what MUST happen) +#define Assure_(debugLevel, condition, description, location) \ + while (!(condition)) \ + ReportAndThrow_((debugLevel), (description), (location)) + +#if !defined(NDEBUG) + +/// Like assert() but throws an exception instead of aborting the process. Use +/// this macro to detect code logic mistakes (i.e. 
bugs) where aborting the +/// current AsyncJob or a similar task is unlikely to jeopardize Squid service +/// integrity. For example, this macro is _not_ appropriate for detecting bugs +/// that indicate a dangerous global state corruption which may go unnoticed by +/// other jobs after the current job or task is aborted. +#define Assure(condition) \ + Assure2((condition), #condition) + +/// Like Assure() but allows the caller to customize the exception message. +/// \param description string literal describing the condition (i.e. what MUST happen) +#define Assure2(condition, description) \ + Assure_(0, (condition), ("assurance failed: " description), Here()) + +#else + +/* do-nothing implementations for NDEBUG builds */ +#define Assure(condition) ((void)0) +#define Assure2(condition, description) ((void)0) + +#endif /* NDEBUG */ + +#endif /* SQUID_SRC_BASE_ASSURE_H */ + diff --git a/src/base/Makefile.am b/src/base/Makefile.am index 9b0f4cf..d5f4c01 100644 --- a/src/base/Makefile.am +++ b/src/base/Makefile.am @@ -11,6 +11,8 @@ include $(top_srcdir)/src/TestHeaders.am noinst_LTLIBRARIES = libbase.la libbase_la_SOURCES = \ + Assure.cc \ + Assure.h \ AsyncCall.cc \ AsyncCall.h \ AsyncCallQueue.cc \ diff --git a/src/base/Makefile.in b/src/base/Makefile.in index 90a4f5b..6a83aa4 100644 --- a/src/base/Makefile.in +++ b/src/base/Makefile.in @@ -163,7 +163,7 @@ CONFIG_CLEAN_FILES = CONFIG_CLEAN_VPATH_FILES = LTLIBRARIES = $(noinst_LTLIBRARIES) libbase_la_LIBADD = -am_libbase_la_OBJECTS = AsyncCall.lo AsyncCallQueue.lo AsyncJob.lo \ +am_libbase_la_OBJECTS = Assure.lo AsyncCall.lo AsyncCallQueue.lo AsyncJob.lo \ CharacterSet.lo File.lo Here.lo RegexPattern.lo \ RunnersRegistry.lo TextException.lo libbase_la_OBJECTS = $(am_libbase_la_OBJECTS) @@ -186,7 +186,7 @@ am__v_at_1 = DEFAULT_INCLUDES = depcomp = $(SHELL) $(top_srcdir)/cfgaux/depcomp am__maybe_remake_depfiles = depfiles -am__depfiles_remade = ./$(DEPDIR)/AsyncCall.Plo \ +am__depfiles_remade = ./$(DEPDIR)/Assure.Plo ./$(DEPDIR)/AsyncCall.Plo \ ./$(DEPDIR)/AsyncCallQueue.Plo ./$(DEPDIR)/AsyncJob.Plo \ ./$(DEPDIR)/CharacterSet.Plo ./$(DEPDIR)/File.Plo \ ./$(DEPDIR)/Here.Plo ./$(DEPDIR)/RegexPattern.Plo \ @@ -729,6 +729,8 @@ COMPAT_LIB = $(top_builddir)/compat/libcompatsquid.la $(LIBPROFILER) subst_perlshell = sed -e 's,[@]PERL[@],$(PERL),g' <$(srcdir)/$@.pl.in >$@ || ($(RM) -f $@ ; exit 1) noinst_LTLIBRARIES = libbase.la libbase_la_SOURCES = \ + Assure.cc \ + Assure.h \ AsyncCall.cc \ AsyncCall.h \ AsyncCallQueue.cc \ @@ -827,6 +829,7 @@ mostlyclean-compile: distclean-compile: -rm -f *.tab.c +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Assure.Plo@am__quote@ # am--include-marker @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/AsyncCall.Plo@am__quote@ # am--include-marker @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/AsyncCallQueue.Plo@am__quote@ # am--include-marker @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/AsyncJob.Plo@am__quote@ # am--include-marker @@ -1167,7 +1170,8 @@ clean-am: clean-checkPROGRAMS clean-generic clean-libtool \ clean-noinstLTLIBRARIES mostlyclean-am distclean: distclean-am - -rm -f ./$(DEPDIR)/AsyncCall.Plo + -rm -f ./$(DEPDIR)/Assure.Plo + -rm -f ./$(DEPDIR)/AsyncCall.Plo -rm -f ./$(DEPDIR)/AsyncCallQueue.Plo -rm -f ./$(DEPDIR)/AsyncJob.Plo -rm -f ./$(DEPDIR)/CharacterSet.Plo @@ -1221,7 +1225,8 @@ install-ps-am: installcheck-am: maintainer-clean: maintainer-clean-am - -rm -f ./$(DEPDIR)/AsyncCall.Plo + -rm -f ./$(DEPDIR)/Assure.Plo + -rm -f ./$(DEPDIR)/AsyncCall.Plo -rm -f ./$(DEPDIR)/AsyncCallQueue.Plo -rm -f 
./$(DEPDIR)/AsyncJob.Plo -rm -f ./$(DEPDIR)/CharacterSet.Plo diff --git a/src/base/TextException.cc b/src/base/TextException.cc index 5cfeb26..f895ae9 100644 --- a/src/base/TextException.cc +++ b/src/base/TextException.cc @@ -58,6 +58,13 @@ TextException::what() const throw() return result.what(); } +std::ostream & +operator <<(std::ostream &os, const TextException &ex) +{ + ex.print(os); + return os; +} + std::ostream & CurrentException(std::ostream &os) { diff --git a/src/base/TextException.h b/src/base/TextException.h index 6a79536..1f9ca11 100644 --- a/src/base/TextException.h +++ b/src/base/TextException.h @@ -9,6 +9,7 @@ #ifndef SQUID__TEXTEXCEPTION_H #define SQUID__TEXTEXCEPTION_H +#include "base/Assure.h" #include "base/Here.h" #include @@ -51,11 +52,12 @@ public: /// prints active (i.e., thrown but not yet handled) exception std::ostream &CurrentException(std::ostream &); +/// efficiently prints TextException +std::ostream &operator <<(std::ostream &, const TextException &); + /// legacy convenience macro; it is not difficult to type Here() now #define TexcHere(msg) TextException((msg), Here()) -/// Like assert() but throws an exception instead of aborting the process -/// and allows the caller to specify a custom exception message. #define Must2(condition, message) \ do { \ if (!(condition)) { \ @@ -65,8 +67,13 @@ std::ostream &CurrentException(std::ostream &); } \ } while (/*CONSTCOND*/ false) +/// Like assert() but throws an exception instead of aborting the process +/// and allows the caller to specify a custom exception message. +#define Must3(condition, description, location) \ + Assure_(3, (condition), ("check failed: " description), (location)) + /// Like assert() but throws an exception instead of aborting the process. -#define Must(condition) Must2((condition), "check failed: " #condition) +#define Must(condition) Must3((condition), #condition, Here()) /// Reports and swallows all exceptions to prevent compiler warnings and runtime /// errors related to throwing class destructors. Should be used for most dtors. diff --git a/src/clientStream.cc b/src/clientStream.cc index 04d89c0..bd5dd09 100644 --- a/src/clientStream.cc +++ b/src/clientStream.cc @@ -154,8 +154,7 @@ clientStreamCallback(clientStreamNode * thisObject, ClientHttpRequest * http, assert(thisObject && http && thisObject->node.next); next = thisObject->next(); - debugs(87, 3, "clientStreamCallback: Calling " << next->callback << " with cbdata " << - next->data.getRaw() << " from node " << thisObject); + debugs(87, 3, thisObject << " gives " << next->data << ' ' << replyBuffer); next->callback(next, http, rep, replyBuffer); } diff --git a/src/client_side.cc b/src/client_side.cc index ab393e4..c46a845 100644 --- a/src/client_side.cc +++ b/src/client_side.cc @@ -429,7 +429,7 @@ ClientHttpRequest::logRequest() // The al->notes and request->notes must point to the same object. 
(void)SyncNotes(*al, *request); for (auto i = Config.notes.begin(); i != Config.notes.end(); ++i) { - if (const char *value = (*i)->match(request, al->reply, al)) { + if (const char *value = (*i)->match(request, al->reply.getRaw(), al)) { NotePairs ¬es = SyncNotes(*al, *request); notes.add((*i)->key.termedBuf(), value); debugs(33, 3, (*i)->key.termedBuf() << " " << value); @@ -439,7 +439,7 @@ ClientHttpRequest::logRequest() ACLFilledChecklist checklist(NULL, request, NULL); if (al->reply) { - checklist.reply = al->reply; + checklist.reply = al->reply.getRaw(); HTTPMSGLOCK(checklist.reply); } @@ -457,7 +457,7 @@ ClientHttpRequest::logRequest() ACLFilledChecklist statsCheck(Config.accessList.stats_collection, request, NULL); statsCheck.al = al; if (al->reply) { - statsCheck.reply = al->reply; + statsCheck.reply = al->reply.getRaw(); HTTPMSGLOCK(statsCheck.reply); } updatePerformanceCounters = statsCheck.fastCheck().allowed(); @@ -3844,6 +3844,11 @@ ConnStateData::finishDechunkingRequest(bool withSuccess) void ConnStateData::sendControlMsg(HttpControlMsg msg) { + if (const auto context = pipeline.front()) { + if (context->http) + context->http->al->reply = msg.reply; + } + if (!isOpen()) { debugs(33, 3, HERE << "ignoring 1xx due to earlier closure"); return; diff --git a/src/client_side_reply.cc b/src/client_side_reply.cc index c919af4..fea5ecb 100644 --- a/src/client_side_reply.cc +++ b/src/client_side_reply.cc @@ -34,6 +34,7 @@ #include "RequestFlags.h" #include "SquidConfig.h" #include "SquidTime.h" +#include "SquidMath.h" #include "Store.h" #include "StrList.h" #include "tools.h" @@ -76,11 +77,7 @@ clientReplyContext::clientReplyContext(ClientHttpRequest *clientContext) : purgeStatus(Http::scNone), lookingforstore(0), http(cbdataReference(clientContext)), - headers_sz(0), sc(NULL), - old_reqsize(0), - reqsize(0), - reqofs(0), #if USE_CACHE_DIGESTS lookup_type(NULL), #endif @@ -166,8 +163,6 @@ void clientReplyContext::setReplyToStoreEntry(StoreEntry *entry, const char *rea #if USE_DELAY_POOLS sc->setDelayId(DelayId::DelayClient(http)); #endif - reqofs = 0; - reqsize = 0; if (http->request) http->request->ignoreRange(reason); flags.storelogiccomplete = 1; @@ -206,13 +201,10 @@ clientReplyContext::saveState() old_sc = sc; old_lastmod = http->request->lastmod; old_etag = http->request->etag; - old_reqsize = reqsize; - tempBuffer.offset = reqofs; + /* Prevent accessing the now saved entries */ http->storeEntry(NULL); sc = NULL; - reqsize = 0; - reqofs = 0; } void @@ -223,8 +215,6 @@ clientReplyContext::restoreState() removeClientStoreReference(&sc, http); http->storeEntry(old_entry); sc = old_sc; - reqsize = old_reqsize; - reqofs = tempBuffer.offset; http->request->lastmod = old_lastmod; http->request->etag = old_etag; /* Prevent accessed the old saved entries */ @@ -232,7 +222,7 @@ clientReplyContext::restoreState() old_sc = NULL; old_lastmod = -1; old_etag.clean(); - old_reqsize = 0; + tempBuffer.offset = 0; } @@ -250,18 +240,27 @@ clientReplyContext::getNextNode() const return (clientStreamNode *)ourNode->node.next->data; } -/* This function is wrong - the client parameters don't include the - * header offset - */ +/// Request HTTP response headers from Store, to be sent to the given recipient. +/// That recipient also gets zero, some, or all HTTP response body bytes (into +/// next()->readBuffer). 
void -clientReplyContext::triggerInitialStoreRead() +clientReplyContext::triggerInitialStoreRead(STCB recipient) { - /* when confident, 0 becomes reqofs, and then this factors into - * startSendProcess - */ - assert(reqofs == 0); + Assure(recipient != HandleIMSReply); + lastStreamBufferedBytes = StoreIOBuffer(); // storeClientCopy(next()->readBuffer) invalidates StoreIOBuffer localTempBuffer (next()->readBuffer.length, 0, next()->readBuffer.data); - storeClientCopy(sc, http->storeEntry(), localTempBuffer, SendMoreData, this); + ::storeClientCopy(sc, http->storeEntry(), localTempBuffer, recipient, this); +} + +/// Request HTTP response body bytes from Store into next()->readBuffer. This +/// method requests body bytes at readerBuffer.offset and, hence, it should only +/// be called after we triggerInitialStoreRead() and get the requested HTTP +/// response headers (using zero offset). +void +clientReplyContext::requestMoreBodyFromStore() +{ + lastStreamBufferedBytes = StoreIOBuffer(); // storeClientCopy(next()->readBuffer) invalidates + ::storeClientCopy(sc, http->storeEntry(), next()->readBuffer, SendMoreData, this); } /* there is an expired entry in the store. @@ -358,30 +357,23 @@ clientReplyContext::processExpired() { /* start counting the length from 0 */ StoreIOBuffer localTempBuffer(HTTP_REQBUF_SZ, 0, tempbuf); - storeClientCopy(sc, entry, localTempBuffer, HandleIMSReply, this); + // keep lastStreamBufferedBytes: tempbuf is not a Client Stream buffer + ::storeClientCopy(sc, entry, localTempBuffer, HandleIMSReply, this); } } void -clientReplyContext::sendClientUpstreamResponse() +clientReplyContext::sendClientUpstreamResponse(const StoreIOBuffer &upstreamResponse) { - StoreIOBuffer tempresult; removeStoreReference(&old_sc, &old_entry); if (collapsedRevalidation) http->storeEntry()->clearPublicKeyScope(); /* here the data to send is the data we just received */ - tempBuffer.offset = 0; - old_reqsize = 0; - /* sendMoreData tracks the offset as well. - * Force it back to zero */ - reqofs = 0; assert(!EBIT_TEST(http->storeEntry()->flags, ENTRY_ABORTED)); - /* TODO: provide sendMoreData with the ready parsed reply */ - tempresult.length = reqsize; - tempresult.data = tempbuf; - sendMoreData(tempresult); + + sendMoreData(upstreamResponse); } void @@ -398,11 +390,9 @@ clientReplyContext::sendClientOldEntry() restoreState(); /* here the data to send is in the next nodes buffers already */ assert(!EBIT_TEST(http->storeEntry()->flags, ENTRY_ABORTED)); - /* sendMoreData tracks the offset as well. - * Force it back to zero */ - reqofs = 0; - StoreIOBuffer tempresult (reqsize, reqofs, next()->readBuffer.data); - sendMoreData(tempresult); + Assure(matchesStreamBodyBuffer(lastStreamBufferedBytes)); + Assure(!lastStreamBufferedBytes.offset); + sendMoreData(lastStreamBufferedBytes); } /* This is the workhorse of the HandleIMSReply callback. 
@@ -416,11 +406,11 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result) if (deleting) return; - debugs(88, 3, http->storeEntry()->url() << ", " << (long unsigned) result.length << " bytes"); - if (http->storeEntry() == NULL) return; + debugs(88, 3, http->storeEntry()->url() << " got " << result); + if (result.flags.error && !EBIT_TEST(http->storeEntry()->flags, ENTRY_ABORTED)) return; @@ -433,9 +423,6 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result) return; } - /* update size of the request */ - reqsize = result.length + reqofs; - const Http::StatusCode status = http->storeEntry()->getReply()->sline.status(); // request to origin was aborted @@ -460,7 +447,7 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result) if (http->request->flags.ims && !old_entry->modifiedSince(http->request->ims, http->request->imslen)) { // forward the 304 from origin debugs(88, 3, "origin replied 304, revalidating existing entry and forwarding 304 to client"); - sendClientUpstreamResponse(); + sendClientUpstreamResponse(result); } else { // send existing entry, it's still valid debugs(88, 3, "origin replied 304, revalidating existing entry and sending " << @@ -484,7 +471,7 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result) http->logType = LOG_TCP_REFRESH_MODIFIED; debugs(88, 3, "origin replied " << status << ", replacing existing entry and forwarding to client"); - sendClientUpstreamResponse(); + sendClientUpstreamResponse(result); } } @@ -493,7 +480,7 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result) http->logType = LOG_TCP_REFRESH_FAIL_ERR; debugs(88, 3, "origin replied with error " << status << ", forwarding to client due to fail_on_validation_err"); - sendClientUpstreamResponse(); + sendClientUpstreamResponse(result); } else { // ignore and let client have old entry http->logType = LOG_TCP_REFRESH_FAIL_OLD; @@ -506,13 +493,7 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result) SQUIDCEXTERN CSR clientGetMoreData; SQUIDCEXTERN CSD clientReplyDetach; -/** - * clientReplyContext::cacheHit Should only be called until the HTTP reply headers - * have been parsed. Normally this should be a single call, but - * it might take more than one. As soon as we have the headers, - * we hand off to clientSendMoreData, processExpired, or - * processMiss. - */ +/// \copydoc clientReplyContext::cacheHit() void clientReplyContext::CacheHit(void *data, StoreIOBuffer result) { @@ -520,11 +501,11 @@ clientReplyContext::CacheHit(void *data, StoreIOBuffer result) context->cacheHit(result); } -/** - * Process a possible cache HIT. - */ +/// Processes HTTP response headers received from Store on a suspected cache hit +/// path. May be called several times (e.g., a Vary marker object hit followed +/// by the corresponding variant hit). void -clientReplyContext::cacheHit(StoreIOBuffer result) +clientReplyContext::cacheHit(const StoreIOBuffer result) { /** Ignore if the HIT object is being deleted. */ if (deleting) { @@ -536,7 +517,7 @@ clientReplyContext::cacheHit(StoreIOBuffer result) HttpRequest *r = http->request; - debugs(88, 3, "clientCacheHit: " << http->uri << ", " << result.length << " bytes"); + debugs(88, 3, http->uri << " got " << result); if (http->storeEntry() == NULL) { debugs(88, 3, "clientCacheHit: request aborted"); @@ -560,20 +541,7 @@ clientReplyContext::cacheHit(StoreIOBuffer result) return; } - if (result.length == 0) { - debugs(88, 5, "store IO buffer has no content. 
MISS"); - /* the store couldn't get enough data from the file for us to id the - * object - */ - /* treat as a miss */ - http->logType = LOG_TCP_MISS; - processMiss(); - return; - } - assert(!EBIT_TEST(e->flags, ENTRY_ABORTED)); - /* update size of the request */ - reqsize = result.length + reqofs; /* * Got the headers, now grok them @@ -587,6 +555,8 @@ clientReplyContext::cacheHit(StoreIOBuffer result) return; } + noteStreamBufferredBytes(result); + switch (varyEvaluateMatch(e, r)) { case VARY_NONE: @@ -687,7 +657,7 @@ clientReplyContext::cacheHit(StoreIOBuffer result) return; } else if (r->conditional()) { debugs(88, 5, "conditional HIT"); - if (processConditional(result)) + if (processConditional()) return; } @@ -806,7 +776,7 @@ clientReplyContext::processOnlyIfCachedMiss() /// process conditional request from client bool -clientReplyContext::processConditional(StoreIOBuffer &result) +clientReplyContext::processConditional() { StoreEntry *const e = http->storeEntry(); @@ -984,16 +954,7 @@ clientReplyContext::purgeFoundObject(StoreEntry *entry) http->logType = LOG_TCP_HIT; - reqofs = 0; - - localTempBuffer.offset = http->out.offset; - - localTempBuffer.length = next()->readBuffer.length; - - localTempBuffer.data = next()->readBuffer.data; - - storeClientCopy(sc, http->storeEntry(), - localTempBuffer, CacheHit, this); + triggerInitialStoreRead(CacheHit); } void @@ -1111,16 +1072,10 @@ clientReplyContext::purgeDoPurgeHead(StoreEntry *newEntry) } void -clientReplyContext::traceReply(clientStreamNode * node) +clientReplyContext::traceReply() { - clientStreamNode *nextNode = (clientStreamNode *)node->node.next->data; - StoreIOBuffer localTempBuffer; createStoreEntry(http->request->method, RequestFlags()); - localTempBuffer.offset = nextNode->readBuffer.offset + headers_sz; - localTempBuffer.length = nextNode->readBuffer.length; - localTempBuffer.data = nextNode->readBuffer.data; - storeClientCopy(sc, http->storeEntry(), - localTempBuffer, SendMoreData, this); + triggerInitialStoreRead(); http->storeEntry()->releaseRequest(); http->storeEntry()->buffer(); HttpReply *rep = new HttpReply; @@ -1169,16 +1124,16 @@ int clientReplyContext::storeOKTransferDone() const { assert(http->storeEntry()->objectLen() >= 0); + const auto headers_sz = http->storeEntry()->mem().baseReply().hdr_sz; assert(http->storeEntry()->objectLen() >= headers_sz); - if (http->out.offset >= http->storeEntry()->objectLen() - headers_sz) { - debugs(88,3,HERE << "storeOKTransferDone " << - " out.offset=" << http->out.offset << - " objectLen()=" << http->storeEntry()->objectLen() << - " headers_sz=" << headers_sz); - return 1; - } - return 0; + const auto done = http->out.offset >= http->storeEntry()->objectLen() - headers_sz; + const auto debugLevel = done ? 3 : 5; + debugs(88, debugLevel, done << + " out.offset=" << http->out.offset << + " objectLen()=" << http->storeEntry()->objectLen() << + " headers_sz=" << headers_sz); + return done ? 1 : 0; } int @@ -1190,10 +1145,9 @@ clientReplyContext::storeNotOKTransferDone() const MemObject *mem = http->storeEntry()->mem_obj; assert(mem != NULL); assert(http->request != NULL); - /* mem->reply was wrong because it uses the UPSTREAM header length!!! 
*/ - HttpReply const *curReply = mem->getReply(); + const auto expectedBodySize = mem->baseReply().content_length; - if (headers_sz == 0) + if (mem->baseReply().pstate != psParsed) /* haven't found end of headers yet */ return 0; @@ -1202,19 +1156,14 @@ clientReplyContext::storeNotOKTransferDone() const * If we are sending a body and we don't have a content-length, * then we must wait for the object to become STORE_OK. */ - if (curReply->content_length < 0) - return 0; - - uint64_t expectedLength = curReply->content_length + http->out.headers_sz; - - if (http->out.size < expectedLength) + if (expectedBodySize < 0) return 0; - else { - debugs(88,3,HERE << "storeNotOKTransferDone " << - " out.size=" << http->out.size << - " expectedLength=" << expectedLength); - return 1; - } + const auto done = http->out.offset >= expectedBodySize; + const auto debugLevel = done ? 3 : 5; + debugs(88, debugLevel, done << + " out.offset=" << http->out.offset << + " expectedBodySize=" << expectedBodySize); + return done ? 1 : 0; } /* A write has completed, what is the next status based on the @@ -1632,6 +1581,8 @@ clientReplyContext::cloneReply() reply = http->storeEntry()->getReply()->clone(); HTTPMSGLOCK(reply); + http->al->reply = reply; + if (reply->sline.protocol == AnyP::PROTO_HTTP) { /* RFC 2616 requires us to advertise our version (but only on real HTTP traffic) */ reply->sline.version = Http::ProtocolVersion(); @@ -1778,20 +1729,12 @@ clientGetMoreData(clientStreamNode * aNode, ClientHttpRequest * http) assert (context); assert(context->http == http); - clientStreamNode *next = ( clientStreamNode *)aNode->node.next->data; - if (!context->ourNode) context->ourNode = aNode; /* no cbdatareference, this is only used once, and safely */ if (context->flags.storelogiccomplete) { - StoreIOBuffer tempBuffer; - tempBuffer.offset = next->readBuffer.offset + context->headers_sz; - tempBuffer.length = next->readBuffer.length; - tempBuffer.data = next->readBuffer.data; - - storeClientCopy(context->sc, http->storeEntry(), - tempBuffer, clientReplyContext::SendMoreData, context); + context->requestMoreBodyFromStore(); return; } @@ -1804,7 +1747,7 @@ clientGetMoreData(clientStreamNode * aNode, ClientHttpRequest * http) if (context->http->request->method == Http::METHOD_TRACE) { if (context->http->request->header.getInt64(Http::HdrType::MAX_FORWARDS) == 0) { - context->traceReply(aNode); + context->traceReply(); return; } @@ -1834,7 +1777,6 @@ clientReplyContext::doGetMoreData() #endif assert(http->logType.oldType == LOG_TCP_HIT); - reqofs = 0; /* guarantee nothing has been sent yet! */ assert(http->out.size == 0); assert(http->out.offset == 0); @@ -1849,10 +1791,7 @@ clientReplyContext::doGetMoreData() } } - localTempBuffer.offset = reqofs; - localTempBuffer.length = getNextNode()->readBuffer.length; - localTempBuffer.data = getNextNode()->readBuffer.data; - storeClientCopy(sc, http->storeEntry(), localTempBuffer, CacheHit, this); + triggerInitialStoreRead(CacheHit); } else { /* MISS CASE, http->logType is already set! 
*/ processMiss(); @@ -1887,12 +1826,11 @@ clientReplyContext::makeThisHead() } bool -clientReplyContext::errorInStream(StoreIOBuffer const &result, size_t const &sizeToProcess)const +clientReplyContext::errorInStream(const StoreIOBuffer &result) const { return /* aborted request */ (http->storeEntry() && EBIT_TEST(http->storeEntry()->flags, ENTRY_ABORTED)) || - /* Upstream read error */ (result.flags.error) || - /* Upstream EOF */ (sizeToProcess == 0); + /* Upstream read error */ (result.flags.error); } void @@ -1913,24 +1851,17 @@ clientReplyContext::sendStreamError(StoreIOBuffer const &result) } void -clientReplyContext::pushStreamData(StoreIOBuffer const &result, char *source) +clientReplyContext::pushStreamData(const StoreIOBuffer &result) { - StoreIOBuffer localTempBuffer; - if (result.length == 0) { debugs(88, 5, "clientReplyContext::pushStreamData: marking request as complete due to 0 length store result"); flags.complete = 1; } - assert(result.offset - headers_sz == next()->readBuffer.offset); - localTempBuffer.offset = result.offset - headers_sz; - localTempBuffer.length = result.length; - - if (localTempBuffer.length) - localTempBuffer.data = source; + assert(!result.length || result.offset == next()->readBuffer.offset); clientStreamCallback((clientStreamNode*)http->client_stream.head->data, http, NULL, - localTempBuffer); + result); } clientStreamNode * @@ -2022,7 +1953,6 @@ clientReplyContext::processReplyAccess () if (http->logType.oldType == LOG_TCP_DENIED || http->logType.oldType == LOG_TCP_DENIED_REPLY || alwaysAllowResponse(reply->sline.status())) { - headers_sz = reply->hdr_sz; processReplyAccessResult(ACCESS_ALLOWED); return; } @@ -2033,8 +1963,6 @@ clientReplyContext::processReplyAccess () return; } - headers_sz = reply->hdr_sz; - /** check for absent access controls (permit by default) */ if (!Config.accessList.reply) { processReplyAccessResult(ACCESS_ALLOWED); @@ -2091,11 +2019,9 @@ clientReplyContext::processReplyAccessResult(const allow_t &accessAllowed) /* Ok, the reply is allowed, */ http->loggingEntry(http->storeEntry()); - ssize_t body_size = reqofs - reply->hdr_sz; - if (body_size < 0) { - reqofs = reply->hdr_sz; - body_size = 0; - } + Assure(matchesStreamBodyBuffer(lastStreamBufferedBytes)); + Assure(!lastStreamBufferedBytes.offset); + auto body_size = lastStreamBufferedBytes.length; // may be zero debugs(88, 3, "clientReplyContext::sendMoreData: Appending " << (int) body_size << " bytes after " << reply->hdr_sz << @@ -2123,19 +2049,27 @@ clientReplyContext::processReplyAccessResult(const allow_t &accessAllowed) assert (!flags.headersSent); flags.headersSent = true; + // next()->readBuffer.offset may be positive for Range requests, but our + // localTempBuffer initialization code assumes that next()->readBuffer.data + // points to the response body at offset 0 because the first + // storeClientCopy() request always has offset 0 (i.e. our first Store + // request ignores next()->readBuffer.offset). + // + // XXX: We cannot fully check that assumption: readBuffer.offset field is + // often out of sync with the buffer content, and if some buggy code updates + // the buffer while we were waiting for the processReplyAccessResult() + // callback, we may not notice. + StoreIOBuffer localTempBuffer; - char *buf = next()->readBuffer.data; - char *body_buf = buf + reply->hdr_sz; + const auto body_buf = next()->readBuffer.data; //Server side may disable ranges under some circumstances. 
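// Worked example with illustrative numbers (not from this patch): if the
// checks below run with body_size = 100 and a Range client wants
// next()->readBuffer.offset = 30, the answer is body_buf + 30 with length
// 70; if the client wants offset 130, none of the buffered body bytes are
// usable and a zero-length answer is sent instead.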
if ((!http->request->range)) next()->readBuffer.offset = 0; - body_buf -= next()->readBuffer.offset; - - if (next()->readBuffer.offset != 0) { - if (next()->readBuffer.offset > body_size) { + if (next()->readBuffer.offset > 0) { + if (Less(body_size, next()->readBuffer.offset)) { /* Can't use any of the body we received. send nothing */ localTempBuffer.length = 0; localTempBuffer.data = NULL; @@ -2148,7 +2082,6 @@ clientReplyContext::processReplyAccessResult(const allow_t &accessAllowed) localTempBuffer.data = body_buf; } - /* TODO??: move the data in the buffer back by the request header size */ clientStreamCallback((clientStreamNode *)http->client_stream.head->data, http, reply, localTempBuffer); @@ -2161,6 +2094,8 @@ clientReplyContext::sendMoreData (StoreIOBuffer result) if (deleting) return; + debugs(88, 5, http->uri << " got " << result); + StoreEntry *entry = http->storeEntry(); if (ConnStateData * conn = http->getConn()) { @@ -2173,7 +2108,9 @@ clientReplyContext::sendMoreData (StoreIOBuffer result) return; } - if (reqofs==0 && !http->logType.isTcpHit()) { + if (!flags.headersSent && !http->logType.isTcpHit()) { + // We get here twice if processReplyAccessResult() calls startError(). + // TODO: Revise when we check/change QoS markings to reduce syscalls. if (Ip::Qos::TheConfig.isHitTosActive()) { Ip::Qos::doTosLocalMiss(conn->clientConnection, http->request->hier.code); } @@ -2187,21 +2124,9 @@ clientReplyContext::sendMoreData (StoreIOBuffer result) " out.offset=" << http->out.offset); } - char *buf = next()->readBuffer.data; - - if (buf != result.data) { - /* we've got to copy some data */ - assert(result.length <= next()->readBuffer.length); - memcpy(buf, result.data, result.length); - } - /* We've got the final data to start pushing... */ flags.storelogiccomplete = 1; - reqofs += result.length; - - assert(reqofs <= HTTP_REQBUF_SZ || flags.headersSent); - assert(http->request != NULL); /* ESI TODO: remove this assert once everything is stable */ @@ -2210,20 +2135,25 @@ clientReplyContext::sendMoreData (StoreIOBuffer result) makeThisHead(); - debugs(88, 5, "clientReplyContext::sendMoreData: " << http->uri << ", " << - reqofs << " bytes (" << result.length << - " new bytes)"); - - /* update size of the request */ - reqsize = reqofs; - - if (errorInStream(result, reqofs)) { + if (errorInStream(result)) { sendStreamError(result); return; } + if (!matchesStreamBodyBuffer(result)) { + // Subsequent processing expects response body bytes to be at the start + // of our Client Stream buffer. When given something else (e.g., bytes + // in our tempbuf), we copy and adjust to meet those expectations. + const auto &ourClientStreamsBuffer = next()->readBuffer; + assert(result.length <= ourClientStreamsBuffer.length); + memcpy(ourClientStreamsBuffer.data, result.data, result.length); + result.data = ourClientStreamsBuffer.data; + } + + noteStreamBufferredBytes(result); + if (flags.headersSent) { - pushStreamData (result, buf); + pushStreamData(result); return; } @@ -2234,23 +2164,38 @@ clientReplyContext::sendMoreData (StoreIOBuffer result) sc->setDelayId(DelayId::DelayClient(http,reply)); #endif - /* handle headers */ + holdingBuffer = result; + processReplyAccess(); + return; +} + +/// Whether the given body area describes the start of our Client Stream buffer. +/// An empty area does. 
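// A minimal recap of the calling pattern that sendMoreData() establishes
// above (simplified; error handling omitted):
//
//   if (!matchesStreamBodyBuffer(result)) {     // bytes outside stream buffer?
//       memcpy(next()->readBuffer.data, result.data, result.length);
//       result.data = next()->readBuffer.data;  // realign to the stream buffer
//   }
//   noteStreamBufferredBytes(result);           // remember for later Assure()s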
+bool +clientReplyContext::matchesStreamBodyBuffer(const StoreIOBuffer &their) const +{ + // the answer is undefined for errors; they are not really "body buffers" + Assure(!their.flags.error); - if (Config.onoff.log_mime_hdrs) { - size_t k; + if (!their.length) + return true; // an empty body area always matches our body area - if ((k = headersEnd(buf, reqofs))) { - safe_free(http->al->headers.reply); - http->al->headers.reply = (char *)xcalloc(k + 1, 1); - xstrncpy(http->al->headers.reply, buf, k); - } + if (their.data != next()->readBuffer.data) { + debugs(88, 7, "no: " << their << " vs. " << next()->readBuffer); + return false; } - holdingBuffer = result; - processReplyAccess(); - return; + return true; +} + +void +clientReplyContext::noteStreamBufferredBytes(const StoreIOBuffer &result) +{ + Assure(matchesStreamBodyBuffer(result)); + lastStreamBufferedBytes = result; // may be unchanged and/or zero-length } + /* Using this breaks the client layering just a little! */ void @@ -2289,13 +2234,6 @@ clientReplyContext::createStoreEntry(const HttpRequestMethod& m, RequestFlags re sc->setDelayId(DelayId::DelayClient(http)); #endif - reqofs = 0; - - reqsize = 0; - - /* I don't think this is actually needed! -- adrian */ - /* http->reqbuf = http->norm_reqbuf; */ - // assert(http->reqbuf == http->norm_reqbuf); /* The next line is illegal because we don't know if the client stream * buffers have been set up */ diff --git a/src/client_side_reply.h b/src/client_side_reply.h index dddab1a..bf705a4 100644 --- a/src/client_side_reply.h +++ b/src/client_side_reply.h @@ -39,7 +39,6 @@ public: void purgeFoundGet(StoreEntry *newEntry); void purgeFoundHead(StoreEntry *newEntry); void purgeFoundObject(StoreEntry *entry); - void sendClientUpstreamResponse(); void purgeDoPurgeGet(StoreEntry *entry); void purgeDoPurgeHead(StoreEntry *entry); void doGetMoreData(); @@ -67,7 +66,7 @@ public: void processExpired(); clientStream_status_t replyStatus(); void processMiss(); - void traceReply(clientStreamNode * node); + void traceReply(); const char *storeId() const { return (http->store_id.size() > 0 ? http->store_id.termedBuf() : http->uri); } Http::StatusCode purgeStatus; @@ -77,13 +76,14 @@ public: virtual void created (StoreEntry *newEntry); ClientHttpRequest *http; - int headers_sz; store_client *sc; /* The store_client we're using */ StoreIOBuffer tempBuffer; /* For use in validating requests via IMS */ int old_reqsize; /* ... again, for the buffer */ - size_t reqsize; - size_t reqofs; - char tempbuf[HTTP_REQBUF_SZ]; ///< a temporary buffer if we need working storage + /// Buffer dedicated to receiving storeClientCopy() responses to generated + /// revalidation requests. These requests cannot use next()->readBuffer + /// because the latter keeps the contents of the stale HTTP response during + /// revalidation. sendClientOldEntry() uses that contents. 
+ char tempbuf[HTTP_REQBUF_SZ]; #if USE_CACHE_DIGESTS const char *lookup_type; /* temporary hack: storeGet() result: HIT/MISS/NONE */ @@ -101,9 +101,10 @@ public: private: clientStreamNode *getNextNode() const; void makeThisHead(); - bool errorInStream(StoreIOBuffer const &result, size_t const &sizeToProcess)const ; + bool errorInStream(const StoreIOBuffer &result) const; + bool matchesStreamBodyBuffer(const StoreIOBuffer &) const; void sendStreamError(StoreIOBuffer const &result); - void pushStreamData(StoreIOBuffer const &result, char *source); + void pushStreamData(const StoreIOBuffer &); clientStreamNode * next() const; StoreIOBuffer holdingBuffer; HttpReply *reply; @@ -115,11 +116,13 @@ private: bool alwaysAllowResponse(Http::StatusCode sline) const; int checkTransferDone(); void processOnlyIfCachedMiss(); - bool processConditional(StoreIOBuffer &result); + bool processConditional(); + void noteStreamBufferredBytes(const StoreIOBuffer &); void cacheHit(StoreIOBuffer result); void handleIMSReply(StoreIOBuffer result); void sendMoreData(StoreIOBuffer result); - void triggerInitialStoreRead(); + void triggerInitialStoreRead(STCB = SendMoreData); + void requestMoreBodyFromStore(); void sendClientOldEntry(); void purgeAllCached(); void forgetHit(); @@ -129,6 +132,13 @@ private: void sendPreconditionFailedError(); void sendNotModified(); void sendNotModifiedOrPreconditionFailedError(); + void sendClientUpstreamResponse(const StoreIOBuffer &upstreamResponse); + + /// Reduces a chance of an accidental direct storeClientCopy() call that + /// (should but) forgets to invalidate our lastStreamBufferedBytes. This + /// function is not defined; decltype() syntax prohibits "= delete", but + /// function usage will trigger deprecation warnings and linking errors. + static decltype(::storeClientCopy) storeClientCopy [[deprecated]]; StoreEntry *old_entry; /* ... for entry to be validated */ @@ -145,6 +155,12 @@ private: } CollapsedRevalidation; CollapsedRevalidation collapsedRevalidation; + + /// HTTP response body bytes stored in our Client Stream buffer (if any) + StoreIOBuffer lastStreamBufferedBytes; + + // TODO: Remove after moving the meat of this function into a method. 
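// The deprecated storeClientCopy member declared above works by name hiding.
// A self-contained sketch of the same trick, using hypothetical names f and C:
//
//   void f();                                   // global function
//   struct C {
//       static decltype(::f) f [[deprecated]];  // declared but never defined
//       void bad() { f(); }   // resolves to the member: warns, fails to link
//       void ok()  { ::f(); } // explicit global call still works
//   };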
+    friend CSR clientGetMoreData;
 };
 
 #endif /* SQUID_CLIENTSIDEREPLY_H */
 
diff --git a/src/client_side_request.cc b/src/client_side_request.cc
index ab08fd2..92da530 100644
--- a/src/client_side_request.cc
+++ b/src/client_side_request.cc
@@ -2045,6 +2045,8 @@ ClientHttpRequest::handleAdaptedHeader(HttpMsg *msg)
         storeEntry()->replaceHttpReply(new_rep);
         storeEntry()->timestampsSet();
 
+        al->reply = new_rep;
+
         if (!adaptedBodySource) // no body
             storeEntry()->complete();
         clientGetMoreData(node, this);
diff --git a/src/clients/Client.cc b/src/clients/Client.cc
index f5defbb..cada70e 100644
--- a/src/clients/Client.cc
+++ b/src/clients/Client.cc
@@ -136,6 +136,8 @@ Client::setVirginReply(HttpReply *rep)
     assert(rep);
     theVirginReply = rep;
     HTTPMSGLOCK(theVirginReply);
+    if (fwd->al)
+        fwd->al->reply = theVirginReply;
     return theVirginReply;
 }
 
@@ -155,6 +157,8 @@ Client::setFinalReply(HttpReply *rep)
     assert(rep);
     theFinalReply = rep;
     HTTPMSGLOCK(theFinalReply);
+    if (fwd->al)
+        fwd->al->reply = theFinalReply;
 
     // give entry the reply because haveParsedReplyHeaders() expects it there
     entry->replaceHttpReply(theFinalReply, false); // but do not write yet
@@ -550,6 +554,7 @@ Client::blockCaching()
         ACLFilledChecklist ch(acl, originalRequest(), NULL);
         ch.reply = const_cast<HttpReply*>(entry->getReply()); // ACLFilledChecklist API bug
         HTTPMSGLOCK(ch.reply);
+        ch.al = fwd->al;
         if (!ch.fastCheck().allowed()) { // when in doubt, block
             debugs(20, 3, "store_miss prohibits caching");
             return true;
diff --git a/src/enums.h b/src/enums.h
index 4a860d8..262d62c 100644
--- a/src/enums.h
+++ b/src/enums.h
@@ -203,7 +203,6 @@ enum {
 typedef enum {
     DIGEST_READ_NONE,
     DIGEST_READ_REPLY,
-    DIGEST_READ_HEADERS,
     DIGEST_READ_CBLOCK,
     DIGEST_READ_MASK,
     DIGEST_READ_DONE
diff --git a/src/format/Format.cc b/src/format/Format.cc
index 3b6a44b..689bdf9 100644
--- a/src/format/Format.cc
+++ b/src/format/Format.cc
@@ -330,7 +330,7 @@ log_quoted_string(const char *str, char *out)
 static const HttpMsg *
 actualReplyHeader(const AccessLogEntry::Pointer &al)
 {
-    const HttpMsg *msg = al->reply;
+    const HttpMsg *msg = al->reply.getRaw();
 #if ICAP_CLIENT
     // al->icap.reqMethod is methodNone in access.log context
     if (!msg && al->icap.reqMethod == Adaptation::methodReqmod)
@@ -853,24 +853,35 @@ Format::Format::assemble(MemBuf &mb, const AccessLogEntry::Pointer &al, int logS
             } else
 #endif
             {
+                // just headers without start-line and CRLF
+                // XXX: reconcile with '<h' and '<ha'
                 out = al->headers.request;
                 quote = 1;
             }
             break;
 
         case LFT_ADAPTED_REQUEST_ALL_HEADERS:
+            // just headers without start-line and CRLF
+            // XXX: reconcile with '<h' and '<ha'
             out = al->headers.adapted_request;
             quote = 1;
             break;
 
-        case LFT_REPLY_ALL_HEADERS:
-            out = al->headers.reply;
+        case LFT_REPLY_ALL_HEADERS: {
+            MemBuf allHeaders;
+            allHeaders.init();
+            // status-line + headers + CRLF
+            // XXX: reconcile with '>h' and '>ha'
+            al->packReplyHeaders(allHeaders);
+            sb.assign(allHeaders.content(), allHeaders.contentSize());
+            out = sb.c_str();
 #if ICAP_CLIENT
             if (!out && al->icap.reqMethod == Adaptation::methodReqmod)
                 out = al->headers.adapted_request;
 #endif
             quote = 1;
-            break;
+        }
+        break;
 
         case LFT_USER_NAME:
 #if USE_AUTH
diff --git a/src/http.cc b/src/http.cc
index 017e492..877172d 100644
--- a/src/http.cc
+++ b/src/http.cc
@@ -775,6 +775,9 @@ HttpStateData::processReplyHeader()
 void
 HttpStateData::handle1xx(HttpReply *reply)
 {
+    if (fwd->al)
+        fwd->al->reply = reply;
+
     HttpReply::Pointer msg(reply); // will destroy reply if unused
 
     // one 1xx at a time: we must not be called while waiting for previous 1xx
diff --git a/src/icmp/net_db.cc
b/src/icmp/net_db.cc index 7dc42a2..52595f6 100644 --- a/src/icmp/net_db.cc +++ b/src/icmp/net_db.cc @@ -33,6 +33,7 @@ #include "mgr/Registration.h" #include "mime_header.h" #include "neighbors.h" +#include "sbuf/SBuf.h" #include "SquidConfig.h" #include "SquidTime.h" #include "Store.h" @@ -49,8 +50,6 @@ #include "ipcache.h" #include "StoreClient.h" -#define NETDB_REQBUF_SZ 4096 - typedef enum { STATE_NONE, STATE_HEADER, @@ -67,12 +66,8 @@ public: e(NULL), sc(NULL), r(theReq), - used(0), - buf_sz(NETDB_REQBUF_SZ), - buf_ofs(0), connstate(STATE_HEADER) { - *buf = 0; assert(NULL != r); HTTPMSGLOCK(r); @@ -92,10 +87,10 @@ public: StoreEntry *e; store_client *sc; HttpRequest *r; - int64_t used; - size_t buf_sz; - char buf[NETDB_REQBUF_SZ]; - int buf_ofs; + + /// for receiving a NetDB reply body from Store and interpreting it + Store::ParsingBuffer parsingBuffer; + netdb_conn_state_t connstate; }; @@ -698,24 +693,19 @@ netdbExchangeHandleReply(void *data, StoreIOBuffer receivedData) Ip::Address addr; netdbExchangeState *ex = (netdbExchangeState *)data; - int rec_sz = 0; - int o; struct in_addr line_addr; double rtt; double hops; - char *p; int j; HttpReply const *rep; - size_t hdr_sz; int nused = 0; - int size; - int oldbufofs = ex->buf_ofs; - rec_sz = 0; + size_t rec_sz = 0; // received record size (TODO: make const) rec_sz += 1 + sizeof(struct in_addr); rec_sz += 1 + sizeof(int); rec_sz += 1 + sizeof(int); + Assure(rec_sz <= ex->parsingBuffer.capacity()); debugs(38, 3, "netdbExchangeHandleReply: " << receivedData.length << " read bytes"); if (!cbdataReferenceValid(ex->p)) { @@ -726,64 +716,29 @@ netdbExchangeHandleReply(void *data, StoreIOBuffer receivedData) debugs(38, 3, "netdbExchangeHandleReply: for '" << ex->p->host << ":" << ex->p->http_port << "'"); - if (receivedData.length == 0 && !receivedData.flags.error) { + if (receivedData.flags.error) { debugs(38, 3, "netdbExchangeHandleReply: Done"); delete ex; return; } - p = ex->buf; - - /* Get the size of the buffer now */ - size = ex->buf_ofs + receivedData.length; - debugs(38, 3, "netdbExchangeHandleReply: " << size << " bytes buf"); - - /* Check if we're still doing headers */ - if (ex->connstate == STATE_HEADER) { - - ex->buf_ofs += receivedData.length; - - /* skip reply headers */ - - if ((hdr_sz = headersEnd(p, ex->buf_ofs))) { - debugs(38, 5, "netdbExchangeHandleReply: hdr_sz = " << hdr_sz); - rep = ex->e->getReply(); - assert(rep->sline.status() != Http::scNone); - debugs(38, 3, "netdbExchangeHandleReply: reply status " << rep->sline.status()); - - if (rep->sline.status() != Http::scOkay) { - delete ex; - return; - } - - assert((size_t)ex->buf_ofs >= hdr_sz); - - /* - * Now, point p to the part of the buffer where the data - * starts, and update the size accordingly - */ - assert(ex->used == 0); - ex->used = hdr_sz; - size = ex->buf_ofs - hdr_sz; - p += hdr_sz; - - /* Finally, set the conn state mode to STATE_BODY */ - ex->connstate = STATE_BODY; - } else { - StoreIOBuffer tempBuffer; - tempBuffer.offset = ex->buf_ofs; - tempBuffer.length = ex->buf_sz - ex->buf_ofs; - tempBuffer.data = ex->buf + ex->buf_ofs; - /* Have more headers .. 
*/ - storeClientCopy(ex->sc, ex->e, tempBuffer, - netdbExchangeHandleReply, ex); + const auto scode = ex->e->mem().baseReply().sline.status(); + assert(scode != Http::scNone); + debugs(38, 3, "reply status " << scode); + if (scode != Http::scOkay) { + delete ex; return; - } + } + ex->connstate = STATE_BODY; } assert(ex->connstate == STATE_BODY); + ex->parsingBuffer.appended(receivedData.data, receivedData.length); + auto p = ex->parsingBuffer.c_str(); // current parsing position + auto size = ex->parsingBuffer.contentSize(); // bytes we still need to parse + /* If we get here, we have some body to parse .. */ debugs(38, 5, "netdbExchangeHandleReply: start parsing loop, size = " << size); @@ -792,6 +747,7 @@ netdbExchangeHandleReply(void *data, StoreIOBuffer receivedData) addr.setAnyAddr(); hops = rtt = 0.0; + size_t o; // current record parsing offset for (o = 0; o < rec_sz;) { switch ((int) *(p + o)) { @@ -829,8 +785,6 @@ netdbExchangeHandleReply(void *data, StoreIOBuffer receivedData) assert(o == rec_sz); - ex->used += rec_sz; - size -= rec_sz; p += rec_sz; @@ -838,32 +792,8 @@ netdbExchangeHandleReply(void *data, StoreIOBuffer receivedData) ++nused; } - /* - * Copy anything that is left over to the beginning of the buffer, - * and adjust buf_ofs accordingly - */ - - /* - * Evilly, size refers to the buf size left now, - * ex->buf_ofs is the original buffer size, so just copy that - * much data over - */ - memmove(ex->buf, ex->buf + (ex->buf_ofs - size), size); - - ex->buf_ofs = size; - - /* - * And don't re-copy the remaining data .. - */ - ex->used += size; - - /* - * Now the tricky bit - size _included_ the leftover bit from the _last_ - * storeClientCopy. We don't want to include that, or our offset will be wrong. - * So, don't count the size of the leftover buffer we began with. - * This can _disappear_ when we're not tracking offsets .. - */ - ex->used -= oldbufofs; + const auto parsedSize = ex->parsingBuffer.contentSize() - size; + ex->parsingBuffer.consume(parsedSize); debugs(38, 3, "netdbExchangeHandleReply: size left over in this buffer: " << size << " bytes"); @@ -871,20 +801,26 @@ netdbExchangeHandleReply(void *data, StoreIOBuffer receivedData) " entries, (x " << rec_sz << " bytes) == " << nused * rec_sz << " bytes total"); - debugs(38, 3, "netdbExchangeHandleReply: used " << ex->used); - if (EBIT_TEST(ex->e->flags, ENTRY_ABORTED)) { debugs(38, 3, "netdbExchangeHandleReply: ENTRY_ABORTED"); delete ex; - } else if (ex->e->store_status == STORE_PENDING) { - StoreIOBuffer tempBuffer; - tempBuffer.offset = ex->used; - tempBuffer.length = ex->buf_sz - ex->buf_ofs; - tempBuffer.data = ex->buf + ex->buf_ofs; - debugs(38, 3, "netdbExchangeHandleReply: EOF not received"); - storeClientCopy(ex->sc, ex->e, tempBuffer, - netdbExchangeHandleReply, ex); + return; } + + if (ex->sc->atEof()) { + if (const auto leftoverBytes = ex->parsingBuffer.contentSize()) + debugs(38, 2, "discarding a partially received record due to Store EOF: " << leftoverBytes); + delete ex; + return; + } + + // TODO: To protect us from a broken peer sending an "infinite" stream of + // new addresses, limit the cumulative number of received bytes or records? 
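// Worked example with illustrative numbers: if this callback received bytes
// [1000, 1500) and the parsing loop above consumed 480 of them as complete
// records, the 20-byte tail stays in parsingBuffer, and the storeClientCopy()
// call below requests space positioned at offset 1000 + 500 = 1500.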
+ + const auto remainingSpace = ex->parsingBuffer.space().positionAt(receivedData.offset + receivedData.length); + // rec_sz is at most buffer capacity, and we consume all fully loaded records + Assure(remainingSpace.length); + storeClientCopy(ex->sc, ex->e, remainingSpace, netdbExchangeHandleReply, ex); } #endif /* USE_ICMP */ @@ -1296,14 +1232,9 @@ netdbExchangeStart(void *data) ex->e = storeCreateEntry(uri, uri, RequestFlags(), Http::METHOD_GET); assert(NULL != ex->e); - StoreIOBuffer tempBuffer; - tempBuffer.length = ex->buf_sz; - tempBuffer.data = ex->buf; - ex->sc = storeClientListAdd(ex->e, ex); + storeClientCopy(ex->sc, ex->e, ex->parsingBuffer.makeInitialSpace(), netdbExchangeHandleReply, ex); - storeClientCopy(ex->sc, ex->e, tempBuffer, - netdbExchangeHandleReply, ex); ex->r->flags.loopDetected = true; /* cheat! -- force direct */ // XXX: send as Proxy-Authenticate instead diff --git a/src/internal.cc b/src/internal.cc index 81d5175..3a04ce0 100644 --- a/src/internal.cc +++ b/src/internal.cc @@ -9,6 +9,7 @@ /* DEBUG: section 76 Internal Squid Object handling */ #include "squid.h" +#include "base/Assure.h" #include "CacheManager.h" #include "comm/Connection.h" #include "errorpage.h" diff --git a/src/log/FormatHttpdCombined.cc b/src/log/FormatHttpdCombined.cc index 6639e88..70ea336 100644 --- a/src/log/FormatHttpdCombined.cc +++ b/src/log/FormatHttpdCombined.cc @@ -69,7 +69,10 @@ Log::Format::HttpdCombined(const AccessLogEntry::Pointer &al, Logfile * logfile) if (Config.onoff.log_mime_hdrs) { char *ereq = ::Format::QuoteMimeBlob(al->headers.request); - char *erep = ::Format::QuoteMimeBlob(al->headers.reply); + MemBuf mb; + mb.init(); + al->packReplyHeaders(mb); + auto erep = ::Format::QuoteMimeBlob(mb.content()); logfilePrintf(logfile, " [%s] [%s]\n", ereq, erep); safe_free(ereq); safe_free(erep); diff --git a/src/log/FormatHttpdCommon.cc b/src/log/FormatHttpdCommon.cc index 1613d0e..9e933a0 100644 --- a/src/log/FormatHttpdCommon.cc +++ b/src/log/FormatHttpdCommon.cc @@ -54,7 +54,10 @@ Log::Format::HttpdCommon(const AccessLogEntry::Pointer &al, Logfile * logfile) if (Config.onoff.log_mime_hdrs) { char *ereq = ::Format::QuoteMimeBlob(al->headers.request); - char *erep = ::Format::QuoteMimeBlob(al->headers.reply); + MemBuf mb; + mb.init(); + al->packReplyHeaders(mb); + auto erep = ::Format::QuoteMimeBlob(mb.content()); logfilePrintf(logfile, " [%s] [%s]\n", ereq, erep); safe_free(ereq); safe_free(erep); diff --git a/src/log/FormatSquidNative.cc b/src/log/FormatSquidNative.cc index 0ab97e4..23076b2 100644 --- a/src/log/FormatSquidNative.cc +++ b/src/log/FormatSquidNative.cc @@ -71,7 +71,10 @@ Log::Format::SquidNative(const AccessLogEntry::Pointer &al, Logfile * logfile) if (Config.onoff.log_mime_hdrs) { char *ereq = ::Format::QuoteMimeBlob(al->headers.request); - char *erep = ::Format::QuoteMimeBlob(al->headers.reply); + MemBuf mb; + mb.init(); + al->packReplyHeaders(mb); + auto erep = ::Format::QuoteMimeBlob(mb.content()); logfilePrintf(logfile, " [%s] [%s]\n", ereq, erep); safe_free(ereq); safe_free(erep); diff --git a/src/peer_digest.cc b/src/peer_digest.cc index 7b6314d..8a66277 100644 --- a/src/peer_digest.cc +++ b/src/peer_digest.cc @@ -39,7 +39,6 @@ static EVH peerDigestCheck; static void peerDigestRequest(PeerDigest * pd); static STCB peerDigestHandleReply; static int peerDigestFetchReply(void *, char *, ssize_t); -int peerDigestSwapInHeaders(void *, char *, ssize_t); int peerDigestSwapInCBlock(void *, char *, ssize_t); int peerDigestSwapInMask(void *, char *, ssize_t); static 
int peerDigestFetchedEnough(DigestFetchState * fetch, char *buf, ssize_t size, const char *step_name); @@ -374,6 +373,9 @@ peerDigestRequest(PeerDigest * pd) fetch->sc = storeClientListAdd(e, fetch); /* set lastmod to trigger IMS request if possible */ + // TODO: Also check for fetch->pd->cd presence as a precondition for sending + // IMS requests because peerDigestFetchReply() does not accept 304 responses + // without an in-memory cache digest. if (old_e) e->lastModified(old_e->lastModified()); @@ -408,11 +410,16 @@ peerDigestHandleReply(void *data, StoreIOBuffer receivedData) digest_read_state_t prevstate; int newsize; - assert(fetch->pd && receivedData.data); + if (receivedData.flags.error) { + peerDigestFetchAbort(fetch, fetch->buf, "failure loading digest reply from Store"); + return; + } + + assert(fetch->pd); /* The existing code assumes that the received pointer is * where we asked the data to be put */ - assert(fetch->buf + fetch->bufofs == receivedData.data); + assert(!receivedData.data || fetch->buf + fetch->bufofs == receivedData.data); /* Update the buffer size */ fetch->bufofs += receivedData.length; @@ -444,10 +451,6 @@ peerDigestHandleReply(void *data, StoreIOBuffer receivedData) retsize = peerDigestFetchReply(fetch, fetch->buf, fetch->bufofs); break; - case DIGEST_READ_HEADERS: - retsize = peerDigestSwapInHeaders(fetch, fetch->buf, fetch->bufofs); - break; - case DIGEST_READ_CBLOCK: retsize = peerDigestSwapInCBlock(fetch, fetch->buf, fetch->bufofs); break; @@ -487,7 +490,7 @@ peerDigestHandleReply(void *data, StoreIOBuffer receivedData) // checking at the beginning of this function. However, in this case, we would have to require // that the parser does not regard EOF as a special condition (it is true now but may change // in the future). - if (!receivedData.length) { // EOF + if (fetch->sc->atEof()) { peerDigestFetchAbort(fetch, fetch->buf, "premature end of digest reply"); return; } @@ -506,19 +509,12 @@ peerDigestHandleReply(void *data, StoreIOBuffer receivedData) } } -/* wait for full http headers to be received then parse them */ -/* - * This routine handles parsing the reply line. - * If the reply line indicates an OK, the same data is thrown - * to SwapInHeaders(). If the reply line is a NOT_MODIFIED, - * we simply stop parsing. 
- */ +/// handle HTTP response headers in the initial storeClientCopy() response static int peerDigestFetchReply(void *data, char *buf, ssize_t size) { DigestFetchState *fetch = (DigestFetchState *)data; PeerDigest *pd = fetch->pd; - size_t hdr_size; assert(pd && buf); assert(!fetch->offset); @@ -527,7 +523,7 @@ peerDigestFetchReply(void *data, char *buf, ssize_t size) if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestFetchReply")) return -1; - if ((hdr_size = headersEnd(buf, size))) { + { HttpReply const *reply = fetch->entry->getReply(); assert(reply); assert(reply->sline.status() != Http::scNone); @@ -563,6 +559,15 @@ peerDigestFetchReply(void *data, char *buf, ssize_t size) /* preserve request -- we need its size to update counters */ /* requestUnlink(r); */ /* fetch->entry->mem_obj->request = NULL; */ + + if (!fetch->pd->cd) { + peerDigestFetchAbort(fetch, buf, "304 without the old in-memory digest"); + return -1; + } + + // stay with the old in-memory digest + peerDigestFetchStop(fetch, buf, "Not modified"); + fetch->state = DIGEST_READ_DONE; } else if (status == Http::scOkay) { /* get rid of old entry if any */ @@ -573,67 +578,12 @@ peerDigestFetchReply(void *data, char *buf, ssize_t size) fetch->old_entry->unlock("peerDigestFetchReply 200"); fetch->old_entry = NULL; } + fetch->state = DIGEST_READ_CBLOCK; } else { /* some kind of a bug */ peerDigestFetchAbort(fetch, buf, reply->sline.reason()); return -1; /* XXX -1 will abort stuff in ReadReply! */ } - - /* must have a ready-to-use store entry if we got here */ - /* can we stay with the old in-memory digest? */ - if (status == Http::scNotModified && fetch->pd->cd) { - peerDigestFetchStop(fetch, buf, "Not modified"); - fetch->state = DIGEST_READ_DONE; - } else { - fetch->state = DIGEST_READ_HEADERS; - } - } else { - /* need more data, do we have space? */ - - if (size >= SM_PAGE_SIZE) - peerDigestFetchAbort(fetch, buf, "reply header too big"); - } - - /* We don't want to actually ack that we've handled anything, - * otherwise SwapInHeaders() won't get the reply line .. */ - return 0; -} - -/* fetch headers from disk, pass on to SwapInCBlock */ -int -peerDigestSwapInHeaders(void *data, char *buf, ssize_t size) -{ - DigestFetchState *fetch = (DigestFetchState *)data; - size_t hdr_size; - - assert(fetch->state == DIGEST_READ_HEADERS); - - if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInHeaders")) - return -1; - - assert(!fetch->offset); - - if ((hdr_size = headersEnd(buf, size))) { - assert(fetch->entry->getReply()); - assert(fetch->entry->getReply()->sline.status() != Http::scNone); - - if (fetch->entry->getReply()->sline.status() != Http::scOkay) { - debugs(72, DBG_IMPORTANT, "peerDigestSwapInHeaders: " << fetch->pd->host << - " status " << fetch->entry->getReply()->sline.status() << - " got cached!"); - - peerDigestFetchAbort(fetch, buf, "internal status error"); - return -1; - } - - fetch->state = DIGEST_READ_CBLOCK; - return hdr_size; /* Say how much data we read */ - } - - /* need more data, do we have space? */ - if (size >= SM_PAGE_SIZE) { - peerDigestFetchAbort(fetch, buf, "stored header too big"); - return -1; } return 0; /* We need to read more to parse .. 
*/ @@ -755,7 +705,7 @@ peerDigestFetchedEnough(DigestFetchState * fetch, char *buf, ssize_t size, const } /* continue checking (maybe-successful eof case) */ - if (!reason && !size) { + if (!reason && !size && fetch->state != DIGEST_READ_REPLY) { if (!pd->cd) reason = "null digest?!"; else if (fetch->mask_offset != pd->cd->mask_size) diff --git a/src/servers/FtpServer.cc b/src/servers/FtpServer.cc index fab26cf..d3faa8d 100644 --- a/src/servers/FtpServer.cc +++ b/src/servers/FtpServer.cc @@ -777,12 +777,6 @@ Ftp::Server::handleReply(HttpReply *reply, StoreIOBuffer data) Http::StreamPointer context = pipeline.front(); assert(context != nullptr); - if (context->http && context->http->al != NULL && - !context->http->al->reply && reply) { - context->http->al->reply = reply; - HTTPMSGLOCK(context->http->al->reply); - } - static ReplyHandler handlers[] = { NULL, // fssBegin NULL, // fssConnected diff --git a/src/servers/Http1Server.cc b/src/servers/Http1Server.cc index 7514779..e76fb3e 100644 --- a/src/servers/Http1Server.cc +++ b/src/servers/Http1Server.cc @@ -310,9 +310,6 @@ Http::One::Server::handleReply(HttpReply *rep, StoreIOBuffer receivedData) } assert(rep); - HTTPMSGUNLOCK(http->al->reply); - http->al->reply = rep; - HTTPMSGLOCK(http->al->reply); context->sendStartOfMessage(rep, receivedData); } diff --git a/src/stmem.cc b/src/stmem.cc index d117c15..b627005 100644 --- a/src/stmem.cc +++ b/src/stmem.cc @@ -95,8 +95,6 @@ mem_hdr::freeDataUpto(int64_t target_offset) break; } - assert (lowestOffset () <= target_offset); - return lowestOffset (); } diff --git a/src/store.cc b/src/store.cc index 1948447..b4c7f82 100644 --- a/src/store.cc +++ b/src/store.cc @@ -273,6 +273,8 @@ StoreEntry::storeClientType() const assert(mem_obj); + debugs(20, 7, *this << " inmem_lo=" << mem_obj->inmem_lo); + if (mem_obj->inmem_lo) return STORE_DISK_CLIENT; @@ -300,6 +302,7 @@ StoreEntry::storeClientType() const return STORE_MEM_CLIENT; } } + debugs(20, 7, "STORE_OK STORE_DISK_CLIENT"); return STORE_DISK_CLIENT; } @@ -319,10 +322,18 @@ StoreEntry::storeClientType() const if (swap_status == SWAPOUT_NONE) return STORE_MEM_CLIENT; + // TODO: The above "must make this a mem client" logic contradicts "Slight + // weirdness" logic in store_client::doCopy() that converts hits to misses + // on startSwapin() failures. We should probably attempt to open a swapin + // file _here_ instead (and avoid STORE_DISK_CLIENT designation for clients + // that fail to do so). That would also address a similar problem with Rock + // store that does not yet support swapin during SWAPOUT_WRITING. + /* * otherwise, make subsequent clients read from disk so they * can not delay the first, and vice-versa. 
*/ + debugs(20, 7, "STORE_PENDING STORE_DISK_CLIENT"); return STORE_DISK_CLIENT; } diff --git a/src/store/Makefile.am b/src/store/Makefile.am index be177d8..ccfc2dd 100644 --- a/src/store/Makefile.am +++ b/src/store/Makefile.am @@ -23,4 +23,6 @@ libstore_la_SOURCES= \ forward.h \ LocalSearch.cc \ LocalSearch.h \ + ParsingBuffer.cc \ + ParsingBuffer.h \ Storage.h diff --git a/src/store/Makefile.in b/src/store/Makefile.in index bb4387d..1959c99 100644 --- a/src/store/Makefile.in +++ b/src/store/Makefile.in @@ -163,7 +163,7 @@ CONFIG_CLEAN_FILES = CONFIG_CLEAN_VPATH_FILES = LTLIBRARIES = $(noinst_LTLIBRARIES) libstore_la_LIBADD = -am_libstore_la_OBJECTS = Controller.lo Disk.lo Disks.lo LocalSearch.lo +am_libstore_la_OBJECTS = Controller.lo Disk.lo Disks.lo LocalSearch.lo ParsingBuffer.lo libstore_la_OBJECTS = $(am_libstore_la_OBJECTS) AM_V_lt = $(am__v_lt_@AM_V@) am__v_lt_ = $(am__v_lt_@AM_DEFAULT_V@) @@ -185,7 +185,7 @@ DEFAULT_INCLUDES = depcomp = $(SHELL) $(top_srcdir)/cfgaux/depcomp am__maybe_remake_depfiles = depfiles am__depfiles_remade = ./$(DEPDIR)/Controller.Plo ./$(DEPDIR)/Disk.Plo \ - ./$(DEPDIR)/Disks.Plo ./$(DEPDIR)/LocalSearch.Plo + ./$(DEPDIR)/Disks.Plo ./$(DEPDIR)/LocalSearch.Plo ./$(DEPDIR)/ParsingBuffer.Plo am__mv = mv -f CXXCOMPILE = $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) \ $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) @@ -776,6 +776,8 @@ libstore_la_SOURCES = \ forward.h \ LocalSearch.cc \ LocalSearch.h \ + ParsingBuffer.cc \ + ParsingBuffer.h \ Storage.h all: all-recursive @@ -846,6 +848,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Disk.Plo@am__quote@ # am--include-marker @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Disks.Plo@am__quote@ # am--include-marker @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/LocalSearch.Plo@am__quote@ # am--include-marker +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ParsingBuffer.Plo@am__quote@ # am--include-marker $(am__depfiles_remade): @$(MKDIR_P) $(@D) @@ -1254,6 +1257,7 @@ distclean: distclean-recursive -rm -f ./$(DEPDIR)/Disk.Plo -rm -f ./$(DEPDIR)/Disks.Plo -rm -f ./$(DEPDIR)/LocalSearch.Plo + -rm -f ./$(DEPDIR)/ParsingBuffer.Plo -rm -f Makefile distclean-am: clean-am distclean-compile distclean-generic \ distclean-tags @@ -1303,6 +1307,7 @@ maintainer-clean: maintainer-clean-recursive -rm -f ./$(DEPDIR)/Disk.Plo -rm -f ./$(DEPDIR)/Disks.Plo -rm -f ./$(DEPDIR)/LocalSearch.Plo + -rm -f ./$(DEPDIR)/ParsingBuffer.Plo -rm -f Makefile maintainer-clean-am: distclean-am maintainer-clean-generic diff --git a/src/store/ParsingBuffer.cc b/src/store/ParsingBuffer.cc new file mode 100644 index 0000000..ca6be72 --- /dev/null +++ b/src/store/ParsingBuffer.cc @@ -0,0 +1,199 @@ +/* + * Copyright (C) 1996-2023 The Squid Software Foundation and contributors + * + * Squid software is distributed under GPLv2+ license and includes + * contributions from numerous individuals and organizations. + * Please see the COPYING and CONTRIBUTORS files for details. + */ + +#include "squid.h" +#include "sbuf/Stream.h" +#include "SquidMath.h" +#include "store/ParsingBuffer.h" + +#include + +// Several Store::ParsingBuffer() methods use assert() because the corresponding +// failure means there is a good chance that somebody have already read from (or +// written to) the wrong memory location. Since this buffer is used for storing +// HTTP response bytes, such failures may corrupt traffic. No Assure() handling +// code can safely recover from such failures. 
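// A minimal usage sketch of this class (readRecords(), parse(), and
// parsedBytes are hypothetical helpers; sizes illustrative):
//
//   char ioSpace[4096];
//   StoreIOBuffer initial(sizeof(ioSpace), 0, ioSpace);
//   Store::ParsingBuffer pb(initial);
//   const auto space = pb.space();        // where the next bytes may land
//   const auto n = readRecords(space);    // hypothetical: fills space.data
//   pb.appended(space.data, n);           // record what actually arrived
//   parse(pb.c_str(), pb.contentSize());  // NUL-terminated content view
//   pb.consume(parsedBytes);              // drop parsed prefix, keep the tail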
+ +Store::ParsingBuffer::ParsingBuffer(StoreIOBuffer &initialSpace): + readerSuppliedMemory_(initialSpace) +{ +} + +/// a read-only content start (or nil for some zero-size buffers) +const char * +Store::ParsingBuffer::memory() const +{ + return extraMemory_.second ? extraMemory_.first.rawContent() : readerSuppliedMemory_.data; +} + +size_t +Store::ParsingBuffer::capacity() const +{ + return extraMemory_.second ? (extraMemory_.first.length() + extraMemory_.first.spaceSize()) : readerSuppliedMemory_.length; +} + +size_t +Store::ParsingBuffer::contentSize() const +{ + return extraMemory_.second ? extraMemory_.first.length() : readerSuppliedMemoryContentSize_; +} + +void +Store::ParsingBuffer::appended(const char * const newBytes, const size_t newByteCount) +{ + // a positive newByteCount guarantees that, after the first assertion below + // succeeds, the second assertion will not increment a nil memory() pointer + if (!newByteCount) + return; + + // these checks order guarantees that memory() is not nil in the second assertion + assert(newByteCount <= spaceSize()); // the new bytes end in our space + assert(memory() + contentSize() == newBytes); // the new bytes start in our space + // and now we know that newBytes is not nil either + + if (extraMemory_.second) + extraMemory_.first.rawAppendFinish(newBytes, newByteCount); + else + readerSuppliedMemoryContentSize_ = IncreaseSum(readerSuppliedMemoryContentSize_, newByteCount).first; + + assert(contentSize() <= capacity()); // paranoid +} + +void +Store::ParsingBuffer::consume(const size_t parsedBytes) +{ + Assure(contentSize() >= parsedBytes); // more conservative than extraMemory_->consume() + if (extraMemory_.second) { + extraMemory_.first.consume(parsedBytes); + } else { + readerSuppliedMemoryContentSize_ -= parsedBytes; + if (parsedBytes && readerSuppliedMemoryContentSize_) + memmove(readerSuppliedMemory_.data, memory() + parsedBytes, readerSuppliedMemoryContentSize_); + } +} + +StoreIOBuffer +Store::ParsingBuffer::space() +{ + const auto size = spaceSize(); + const auto start = extraMemory_.second ? + extraMemory_.first.rawAppendStart(size) : + (readerSuppliedMemory_.data + readerSuppliedMemoryContentSize_); + return StoreIOBuffer(spaceSize(), 0, start); +} + +StoreIOBuffer +Store::ParsingBuffer::makeSpace(const size_t pageSize) +{ + growSpace(pageSize); + auto result = space(); + Assure(result.length >= pageSize); + result.length = pageSize; + return result; +} + +StoreIOBuffer +Store::ParsingBuffer::content() const +{ + // This const_cast is a StoreIOBuffer API limitation: That class does not + // support a "constant content view", even though it is used as such a view. 
+    return StoreIOBuffer(contentSize(), 0, const_cast<char*>(memory()));
+}
+
+/// makes sure we have the requested number of bytes, allocates enough memory if needed
+void
+Store::ParsingBuffer::growSpace(const size_t minimumSpaceSize)
+{
+    const auto capacityIncreaseAttempt = IncreaseSum(contentSize(), minimumSpaceSize);
+    if (!capacityIncreaseAttempt.second)
+        throw TextException(ToSBuf("no support for a single memory block of ", contentSize(), '+', minimumSpaceSize, " bytes"), Here());
+    const auto newCapacity = capacityIncreaseAttempt.first;
+
+    if (newCapacity <= capacity())
+        return; // already have enough space; no reallocation is needed
+
+    debugs(90, 7, "growing to provide " << minimumSpaceSize << " in " << *this);
+
+    if (extraMemory_.second) {
+        extraMemory_.first.reserveCapacity(newCapacity);
+    } else {
+        SBuf newStorage;
+        newStorage.reserveCapacity(newCapacity);
+        newStorage.append(readerSuppliedMemory_.data, readerSuppliedMemoryContentSize_);
+        extraMemory_.first = std::move(newStorage);
+        extraMemory_.second = true;
+    }
+    Assure(spaceSize() >= minimumSpaceSize);
+}
+
+SBuf
+Store::ParsingBuffer::toSBuf() const
+{
+    return extraMemory_.second ? extraMemory_.first : SBuf(content().data, content().length);
+}
+
+size_t
+Store::ParsingBuffer::spaceSize() const
+{
+    if (extraMemory_.second)
+        return extraMemory_.first.spaceSize();
+
+    assert(readerSuppliedMemoryContentSize_ <= readerSuppliedMemory_.length);
+    return readerSuppliedMemory_.length - readerSuppliedMemoryContentSize_;
+}
+
+/// 0-terminates stored byte sequence, allocating more memory if needed, but
+/// without increasing the number of stored content bytes
+void
+Store::ParsingBuffer::terminate()
+{
+    *makeSpace(1).data = 0;
+}
+
+StoreIOBuffer
+Store::ParsingBuffer::packBack()
+{
+    const auto bytesToPack = contentSize();
+    // until our callers do not have to work around legacy code expectations
+    Assure(bytesToPack);
+
+    // if we accumulated more bytes at some point, any extra metadata should
+    // have been consume()d by now, allowing readerSuppliedMemory_.data reuse
+    Assure(bytesToPack <= readerSuppliedMemory_.length);
+
+    auto result = readerSuppliedMemory_;
+    result.length = bytesToPack;
+    Assure(result.data);
+
+    if (!extraMemory_.second) {
+        // no copying of accumulated bytes: they are in readerSuppliedMemory_
+        debugs(90, 7, "quickly exporting " << result.length << " bytes via " << readerSuppliedMemory_);
+    } else {
+        debugs(90, 7, "slowly exporting " << result.length << " bytes from " << extraMemory_.first.id << " back into " << readerSuppliedMemory_);
+        memmove(result.data, extraMemory_.first.rawContent(), result.length);
+    }
+
+    return result;
+}
+
+void
+Store::ParsingBuffer::print(std::ostream &os) const
+{
+    os << "size=" << contentSize();
+
+    if (extraMemory_.second) {
+        os << " capacity=" << capacity();
+        os << " extra=" << extraMemory_.first.id;
+    }
+
+    // report readerSuppliedMemory_ (if any) even if we are no longer using it
+    // for content storage; it affects packBack() and related parsing logic
+    if (readerSuppliedMemory_.length)
+        os << ' ' << readerSuppliedMemory_;
+}
+
diff --git a/src/store/ParsingBuffer.h b/src/store/ParsingBuffer.h
new file mode 100644
index 0000000..b473ac6
--- /dev/null
+++ b/src/store/ParsingBuffer.h
@@ -0,0 +1,128 @@
+/*
+ * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef SQUID_SRC_STORE_PARSINGBUFFER_H
+#define SQUID_SRC_STORE_PARSINGBUFFER_H
+
+#include "sbuf/SBuf.h"
+#include "StoreIOBuffer.h"
+
+#include <utility>
+
+namespace Store
+{
+
+/// A continuous buffer for efficient accumulation and NUL-termination of
+/// Store-read bytes. The buffer accommodates two kinds of Store readers:
+///
+/// * Readers that do not have any external buffer to worry about but need to
+///   accumulate, terminate, and/or consume buffered content read by Store.
+///   These readers use the default constructor and then allocate the initial
+///   buffer space for their first read (if any).
+///
+/// * Readers that supply their StoreIOBuffer at construction time. That buffer
+///   is enough to handle the majority of use cases. However, the supplied
+///   StoreIOBuffer capacity may be exceeded when parsing requires accumulating
+///   multiple Store read results and/or NUL-termination of a full buffer.
+///
+/// This buffer seamlessly grows as needed, reducing memory over-allocation and,
+/// in case of StoreIOBuffer-seeded construction, memory copies.
+class ParsingBuffer
+{
+public:
+    /// creates buffer without any space or content
+    ParsingBuffer() = default;
+
+    /// seeds this buffer with the caller-supplied buffer space
+    explicit ParsingBuffer(StoreIOBuffer &);
+
+    /// a NUL-terminated version of content(); same lifetime as content()
+    const char *c_str() { terminate(); return memory(); }
+
+    /// export content() into SBuf, avoiding content copying when possible
+    SBuf toSBuf() const;
+
+    /// the total number of append()ed bytes that were not consume()d
+    size_t contentSize() const;
+
+    /// the number of bytes in the space() buffer
+    size_t spaceSize() const;
+
+    /// the maximum number of bytes we can store without allocating more space
+    size_t capacity() const;
+
+    /// Stored append()ed bytes that have not been consume()d. The returned
+    /// buffer offset is set to zero; the caller is responsible for adjusting
+    /// the offset if needed (TODO: Add/return a no-offset Mem::View instead).
+    /// The returned buffer is invalidated by calling a non-constant method or
+    /// by changing the StoreIOBuffer contents given to our constructor.
+    StoreIOBuffer content() const;
+
+    /// A (possibly empty) buffer for reading the next byte(s). The returned
+    /// buffer offset is set to zero; the caller is responsible for adjusting
+    /// the offset if needed (TODO: Add/return a no-offset Mem::Area instead).
+    /// The returned buffer is invalidated by calling a non-constant method or
+    /// by changing the StoreIOBuffer contents given to our constructor.
+    StoreIOBuffer space();
+
+    /// A buffer for reading the exact number of next byte(s). The method may
+    /// allocate new memory and copy previously appended() bytes as needed.
+    /// \param pageSize the exact number of bytes the caller wants to read
+    /// \returns space() after any necessary allocations
+    StoreIOBuffer makeSpace(size_t pageSize);
+
+    /// A buffer suitable for the first storeClientCopy() call. The method may
+    /// allocate new memory and copy previously appended() bytes as needed.
+    /// \returns space() after any necessary allocations
+    /// \deprecated New clients should call makeSpace() with client-specific
+    /// pageSize instead of this one-size-fits-all legacy method.
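// Illustrative makeSpace() call (hypothetical names pb, sc, entry, rec_sz,
// nextOffset, handler, and data): a client that parses rec_sz-byte records
// can request exactly that much space before each Store read, e.g.:
//
//   storeClientCopy(sc, entry, pb.makeSpace(rec_sz).positionAt(nextOffset),
//                   handler, data);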
+    StoreIOBuffer makeInitialSpace() { return makeSpace(4096); }
+
+    /// remember the new bytes received into the previously provided space()
+    void appended(const char *, size_t);
+
+    /// get rid of previously appended() prefix of a given size
+    void consume(size_t);
+
+    /// Returns stored content, reusing the StoreIOBuffer given at the
+    /// construction time. Copying is avoided if we did not allocate extra
+    /// memory since construction. Not meant for default-constructed buffers.
+    /// \pre positive contentSize() (\sa store_client::finishCallback())
+    StoreIOBuffer packBack();
+
+    /// summarizes object state (for debugging)
+    void print(std::ostream &) const;
+
+private:
+    const char *memory() const;
+    void terminate();
+    void growSpace(size_t);
+
+private:
+    /// externally allocated buffer we were seeded with (or a zero-size one)
+    StoreIOBuffer readerSuppliedMemory_;
+
+    /// append()ed to readerSuppliedMemory_ bytes that were not consume()d
+    size_t readerSuppliedMemoryContentSize_ = 0;
+
+    /// our internal buffer that takes over readerSuppliedMemory_ when the
+    /// latter becomes full and more memory is needed
+    std::pair<SBuf, bool> extraMemory_ = std::make_pair(SBuf(), false);
+};
+
+inline std::ostream &
+operator <<(std::ostream &os, const ParsingBuffer &b)
+{
+    b.print(os);
+    return os;
+}
+
+} // namespace Store
+
+#endif /* SQUID_SRC_STORE_PARSINGBUFFER_H */
+
diff --git a/src/store/forward.h b/src/store/forward.h
index 1422a85..db5ee1c 100644
--- a/src/store/forward.h
+++ b/src/store/forward.h
@@ -46,6 +46,7 @@ class Disks;
 class Disk;
 class DiskConfig;
 class EntryGuard;
+class ParsingBuffer;
 
 typedef ::StoreEntry Entry;
 typedef ::MemStore Memory;
diff --git a/src/store_client.cc b/src/store_client.cc
index 1b54f04..a5f2440 100644
--- a/src/store_client.cc
+++ b/src/store_client.cc
@@ -9,6 +9,7 @@
 /* DEBUG: section 90 Storage Manager Client-Side Interface */
 
 #include "squid.h"
+#include "base/AsyncCbdataCalls.h"
 #include "event.h"
 #include "globals.h"
 #include "HttpReply.h"
@@ -16,8 +17,10 @@
 #include "MemBuf.h"
 #include "MemObject.h"
 #include "mime_header.h"
+#include "sbuf/Stream.h"
 #include "profiler/Profiler.h"
 #include "SquidConfig.h"
+#include "SquidMath.h"
 #include "StatCounters.h"
 #include "Store.h"
 #include "store_swapin.h"
@@ -39,17 +42,10 @@
 static StoreIOState::STRCB storeClientReadBody;
 static StoreIOState::STRCB storeClientReadHeader;
 static void storeClientCopy2(StoreEntry * e, store_client * sc);
-static EVH storeClientCopyEvent;
 static bool CheckQuickAbortIsReasonable(StoreEntry * entry);
 
 CBDATA_CLASS_INIT(store_client);
 
-bool
-store_client::memReaderHasLowerOffset(int64_t anOffset) const
-{
-    return getType() == STORE_MEM_CLIENT && copyInto.offset < anOffset;
-}
-
 int
 store_client::getType() const
 {
@@ -105,25 +101,35 @@ storeClientListAdd(StoreEntry * e, void *data)
 }
 
 void
-store_client::callback(ssize_t sz, bool error)
+store_client::FinishCallback(store_client * const sc)
 {
-    size_t bSz = 0;
+    sc->finishCallback();
+}
 
-    if (sz >= 0 && !error)
-        bSz = sz;
+void
+store_client::finishCallback()
+{
+    Assure(_callback.callback_handler);
+    Assure(_callback.notifier);
 
-    StoreIOBuffer result(bSz, 0 ,copyInto.data);
+    // XXX: Some legacy code relies on zero-length buffers having nil data
+    // pointers. Some other legacy code expects "correct" result.offset even
+    // when there is no body to return. Accommodate all those expectations.
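// Possible answer shapes, with illustrative values: a successful read of 300
// body bytes yields {length=300, offset=copyInto.offset, data=caller buffer};
// a clean end-of-object answer yields {length=0, data=nil}; a failed object
// keeps that empty shape and sets result.flags.error below.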
+ auto result = StoreIOBuffer(0, copyInto.offset, nullptr); + if (object_ok && parsingBuffer.second && parsingBuffer.first.contentSize()) + result = parsingBuffer.first.packBack(); + result.flags.error = object_ok ? 0 : 1; - if (sz < 0 || error) - result.flags.error = 1; + // no HTTP headers and no body bytes (but not because there was no space) + atEof_ = !sendingHttpHeaders() && !result.length && copyInto.length; + + parsingBuffer.second = false; + ++answers; - result.offset = cmp_offset; - assert(_callback.pending()); - cmp_offset = copyInto.offset + bSz; STCB *temphandler = _callback.callback_handler; void *cbdata = _callback.callback_data; - _callback = Callback(NULL, NULL); - copyInto.data = NULL; + _callback = Callback(nullptr, nullptr); + copyInto.data = nullptr; if (cbdataReferenceValid(cbdata)) temphandler(cbdata, result); @@ -131,32 +137,18 @@ store_client::callback(ssize_t sz, bool error) cbdataReferenceDone(cbdata); } -static void -storeClientCopyEvent(void *data) -{ - store_client *sc = (store_client *)data; - debugs(90, 3, "storeClientCopyEvent: Running"); - assert (sc->flags.copy_event_pending); - sc->flags.copy_event_pending = false; - - if (!sc->_callback.pending()) - return; - - storeClientCopy2(sc->entry, sc); -} - store_client::store_client(StoreEntry *e) : - cmp_offset(0), #if STORE_CLIENT_LIST_DEBUG owner(cbdataReference(data)), #endif entry(e), type(e->storeClientType()), - object_ok(true) + object_ok(true), + atEof_(false), + answers(0) { flags.disk_io_pending = false; flags.store_copying = false; - flags.copy_event_pending = false; ++ entry->refcount; if (getType() == STORE_DISK_CLIENT) { @@ -202,16 +194,33 @@ store_client::copy(StoreEntry * anEntry, #endif assert(!_callback.pending()); -#if ONLYCONTIGUOUSREQUESTS - - assert(cmp_offset == copyRequest.offset); -#endif - /* range requests will skip into the body */ - cmp_offset = copyRequest.offset; _callback = Callback (callback_fn, cbdataReference(data)); copyInto.data = copyRequest.data; copyInto.length = copyRequest.length; copyInto.offset = copyRequest.offset; + Assure(copyInto.offset >= 0); + + if (!copyInto.length) { + // During the first storeClientCopy() call, a zero-size buffer means + // that we will have to drop any HTTP response body bytes we read (with + // the HTTP headers from disk). After that, it means we cannot return + // anything to the caller at all. + debugs(90, 2, "WARNING: zero-size storeClientCopy() buffer: " << copyInto); + // keep going; moreToRead() should prevent any from-Store reading + } + + // Our nextHttpReadOffset() expects the first copy() call to have zero + // offset. More complex code could handle a positive first offset, but it + // would only be useful when reading responses from memory: We would not + // _delay_ the response (to read the requested HTTP body bytes from disk) + // when we already can respond with HTTP headers. + Assure(!copyInto.offset || answeredOnce()); + + parsingBuffer.first = Store::ParsingBuffer(copyInto); + parsingBuffer.second = true; + + discardableHttpEnd_ = nextHttpReadOffset(); + debugs(90, 7, "discardableHttpEnd_=" << discardableHttpEnd_); static bool copying (false); assert (!copying); @@ -239,50 +248,41 @@ store_client::copy(StoreEntry * anEntry, // Add no code here. This object may no longer exist. } -/// Whether there is (or will be) more entry data for us. +/// Whether Store has (or possibly will have) more entry data for us. 
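// Illustrative outcomes (hypothetical states): a zero-length copyInto buffer
// yields false immediately; a STORE_PENDING entry yields true; a STORE_OK,
// disk-backed entry with contentLen() = 100 and copyInto.offset = 100 yields
// false because the store holds no byte at or past that offset.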
bool -store_client::moreToSend() const +store_client::moreToRead() const { + if (!copyInto.length) + return false; // the client supplied a zero-size buffer + if (entry->store_status == STORE_PENDING) return true; // there may be more coming /* STORE_OK, including aborted entries: no more data is coming */ - const int64_t len = entry->objectLen(); + if (canReadFromMemory()) + return true; // memory has the first byte wanted by the client - // If we do not know the entry length, then we have to open the swap file. - const bool canSwapIn = entry->hasDisk(); - if (len < 0) - return canSwapIn; + if (!entry->hasDisk()) + return false; // cannot read anything from disk either - if (copyInto.offset >= len) - return false; // sent everything there is + if (entry->objectLen() >= 0 && copyInto.offset >= entry->contentLen()) + return false; // the disk cannot have byte(s) wanted by the client - if (canSwapIn) - return true; // if we lack prefix, we can swap it in - - // If we cannot swap in, make sure we have what we want in RAM. Otherwise, - // scheduleRead calls scheduleDiskRead which asserts without a swap file. - const MemObject *mem = entry->mem_obj; - return mem && - mem->inmem_lo <= copyInto.offset && copyInto.offset < mem->endOffset(); + // we cannot be sure until we swap in metadata and learn contentLen(), + // but the disk may have the byte(s) wanted by the client + return true; } static void storeClientCopy2(StoreEntry * e, store_client * sc) { /* reentrancy not allowed - note this could lead to - * dropped events + * dropped notifications about response data availability */ - if (sc->flags.copy_event_pending) { - return; - } - if (sc->flags.store_copying) { - sc->flags.copy_event_pending = true; - debugs(90, 3, "storeClientCopy2: Queueing storeClientCopyEvent()"); - eventAdd("storeClientCopyEvent", storeClientCopyEvent, sc, 0.0, 0); + debugs(90, 3, "prevented recursive copying for " << *e); return; } @@ -295,39 +295,44 @@ storeClientCopy2(StoreEntry * e, store_client * sc) * if the peer aborts, we want to give the client(s) * everything we got before the abort condition occurred. */ - /* Warning: doCopy may indirectly free itself in callbacks, - * hence the lock to keep it active for the duration of - * this function - * XXX: Locking does not prevent calling sc destructor (it only prevents - * freeing sc memory) so sc may become invalid from C++ p.o.v. - */ - CbcPointer tmpLock = sc; - assert (!sc->flags.store_copying); sc->doCopy(e); - assert(!sc->flags.store_copying); +} + +/// Whether our answer, if sent right now, will announce the availability of +/// HTTP response headers (to the STCB callback) for the first time. +bool +store_client::sendingHttpHeaders() const +{ + return !answeredOnce() && entry->mem().baseReply().hdr_sz > 0; } void store_client::doCopy(StoreEntry *anEntry) { + Assure(_callback.pending()); + Assure(!flags.disk_io_pending); + Assure(!flags.store_copying); + assert (anEntry == entry); flags.store_copying = true; MemObject *mem = entry->mem_obj; - debugs(33, 5, "store_client::doCopy: co: " << - copyInto.offset << ", hi: " << - mem->endOffset()); + debugs(33, 5, this << " into " << copyInto << + " hi: " << mem->endOffset() << + " objectLen: " << entry->objectLen() << + " past_answers: " << answers); - if (!moreToSend()) { + const auto sendHttpHeaders = sendingHttpHeaders(); + + if (!sendHttpHeaders && !moreToRead()) { /* There is no more to send! 
 void
 store_client::doCopy(StoreEntry *anEntry)
 {
+    Assure(_callback.pending());
+    Assure(!flags.disk_io_pending);
+    Assure(!flags.store_copying);
+
     assert (anEntry == entry);
     flags.store_copying = true;
     MemObject *mem = entry->mem_obj;

-    debugs(33, 5, "store_client::doCopy: co: " <<
-           copyInto.offset << ", hi: " <<
-           mem->endOffset());
+    debugs(33, 5, this << " into " << copyInto <<
+           " hi: " << mem->endOffset() <<
+           " objectLen: " << entry->objectLen() <<
+           " past_answers: " << answers);

-    if (!moreToSend()) {
+    const auto sendHttpHeaders = sendingHttpHeaders();
+
+    if (!sendHttpHeaders && !moreToRead()) {
         /* There is no more to send! */
         debugs(33, 3, HERE << "There is no more to send!");
-        callback(0);
+        noteNews();
         flags.store_copying = false;
         return;
     }

-    /* Check that we actually have data */
-    if (anEntry->store_status == STORE_PENDING && copyInto.offset >= mem->endOffset()) {
+    if (!sendHttpHeaders && anEntry->store_status == STORE_PENDING && nextHttpReadOffset() >= mem->endOffset()) {
         debugs(90, 3, "store_client::doCopy: Waiting for more");
         flags.store_copying = false;
         return;
@@ -349,7 +354,24 @@ store_client::doCopy(StoreEntry *anEntry)
         if (!startSwapin())
             return; // failure
     }
-    scheduleRead();
+
+    // send any immediately available body bytes even if we also sendHttpHeaders
+    if (canReadFromMemory()) {
+        readFromMemory();
+        noteNews(); // will sendHttpHeaders (if needed) as well
+        flags.store_copying = false;
+        return;
+    }
+
+    if (sendHttpHeaders) {
+        debugs(33, 5, "just send HTTP headers: " << mem->baseReply().hdr_sz);
+        noteNews();
+        flags.store_copying = false;
+        return;
+    }
+
+    // no information that the client needs is available immediately
+    scheduleDiskRead();
 }

 /// opens the swapin "file" if possible; otherwise, fail()s and returns false
@@ -383,14 +405,13 @@ store_client::startSwapin()
 }

 void
-store_client::scheduleRead()
+store_client::noteSwapInDone(const bool error)
 {
-    MemObject *mem = entry->mem_obj;
-
-    if (copyInto.offset >= mem->inmem_lo && copyInto.offset < mem->endOffset())
-        scheduleMemRead();
+    Assure(_callback.pending());
+    if (error)
+        fail();
     else
-        scheduleDiskRead();
+        noteNews();
 }

 void
@@ -415,15 +436,44 @@ store_client::scheduleDiskRead()
     flags.store_copying = false;
 }

+/// whether at least one byte wanted by the client is in memory
+bool
+store_client::canReadFromMemory() const
+{
+    const auto &mem = entry->mem();
+    const auto memReadOffset = nextHttpReadOffset();
+    return mem.inmem_lo <= memReadOffset && memReadOffset < mem.endOffset() &&
+           parsingBuffer.first.spaceSize();
+}
+
+/// The offset of the next stored HTTP response byte wanted by the client.
+int64_t
+store_client::nextHttpReadOffset() const
+{
+    Assure(parsingBuffer.second);
+    const auto &mem = entry->mem();
+    const auto hdr_sz = mem.baseReply().hdr_sz;
+    // Certain SMP cache manager transactions do not store HTTP headers in
+    // mem_hdr; they store just a kid-specific piece of the future report body.
+    // In such cases, hdr_sz ought to be zero. In all other (known) cases,
+    // mem_hdr contains HTTP response headers (positive hdr_sz if parsed)
+    // followed by HTTP response body. This code math accommodates all cases.
+    return NaturalSum<int64_t>(hdr_sz, copyInto.offset, parsingBuffer.first.contentSize()).first;
+}
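
To make the nextHttpReadOffset() arithmetic above concrete: mem_hdr normally stores the HTTP response headers followed by the body, so the next byte to fetch is the header size plus the body offset the caller requested plus the body bytes already accumulated in parsingBuffer. A standalone sketch with made-up numbers (NaturalSum() is Squid's overflow-checked addition helper; the sketch omits that checking):

    #include <cstdint>

    // mirrors nextHttpReadOffset(), minus the overflow checking
    int64_t nextHttpReadOffsetSketch(int64_t hdrSz, int64_t wantedBodyOffset, int64_t bufferedBodyBytes)
    {
        // headers occupy mem_hdr offsets [0, hdrSz); body bytes follow them
        return hdrSz + wantedBodyOffset + bufferedBodyBytes;
    }

    // e.g. 300 header bytes, a copy() request for body offset 0, and 100 body
    // bytes already buffered: 300 + 0 + 100 == 400, so the next mem_hdr byte
    // to fetch is at offset 400.
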
+/// Copies at least some of the requested body bytes from MemObject memory,
+/// satisfying the copy() request.
+/// \pre canReadFromMemory() is true
 void
-store_client::scheduleMemRead()
+store_client::readFromMemory()
 {
-    /* What the client wants is in memory */
-    /* Old style */
-    debugs(90, 3, "store_client::doCopy: Copying normal from memory");
-    size_t sz = entry->mem_obj->data_hdr.copy(copyInto);
-    callback(sz);
-    flags.store_copying = false;
+    Assure(parsingBuffer.second);
+    const auto readInto = parsingBuffer.first.space().positionAt(nextHttpReadOffset());
+
+    debugs(90, 3, "copying HTTP body bytes from memory into " << readInto);
+    const auto sz = entry->mem_obj->data_hdr.copy(readInto);
+    Assure(sz > 0); // our canReadFromMemory() precondition guarantees that
+    parsingBuffer.first.appended(readInto.data, sz);
 }

 void
@@ -435,65 +485,150 @@ store_client::fileRead()
     assert(!flags.disk_io_pending);
     flags.disk_io_pending = true;

+    // mem->swap_hdr_sz is zero here during initial read(s)
+    const auto nextStoreReadOffset = NaturalSum<int64_t>(mem->swap_hdr_sz, nextHttpReadOffset()).first;
+
+    // XXX: If fileRead() is called when we do not yet know mem->swap_hdr_sz,
+    // then we must start reading from disk offset zero to learn it: we cannot
+    // compute correct HTTP response start offset on disk without it. However,
+    // late startSwapin() calls imply that the assertion below might fail.
+    Assure(mem->swap_hdr_sz > 0 || !nextStoreReadOffset);
+
+    // TODO: Remove this assertion. Introduced in 1998 commit 3157c72, it
+    // assumes that swapped out memory is freed unconditionally, but we no
+    // longer do that because trimMemory() path checks lowestMemReaderOffset().
+    // It is also misplaced: We are not swapping out anything here and should
+    // not care about any swapout invariants.
     if (mem->swap_hdr_sz != 0)
         if (entry->swappingOut())
-            assert(mem->swapout.sio->offset() > copyInto.offset + (int64_t)mem->swap_hdr_sz);
+            assert(mem->swapout.sio->offset() > nextStoreReadOffset);
+
+    // XXX: We should let individual cache_dirs limit the read size instead, but
+    // we cannot do that without more fixes and research because:
+    // * larger reads corrupt responses when cache_dir uses SharedMemory::get();
+    // * we do not know how to find all I/O code that assumes this limit;
+    // * performance effects of larger disk reads may be negative somewhere.
+    const decltype(StoreIOBuffer::length) maxReadSize = SM_PAGE_SIZE;
+
+    Assure(parsingBuffer.second);
+    // also, do not read more than we can return (via a copyInto.length buffer)
+    const auto readSize = std::min(copyInto.length, maxReadSize);
+    lastDiskRead = parsingBuffer.first.makeSpace(readSize).positionAt(nextStoreReadOffset);
+    debugs(90, 5, "into " << lastDiskRead);

     storeRead(swapin_sio,
-              copyInto.data,
-              copyInto.length,
-              copyInto.offset + mem->swap_hdr_sz,
+              lastDiskRead.data,
+              lastDiskRead.length,
+              lastDiskRead.offset,
               mem->swap_hdr_sz == 0 ? storeClientReadHeader : storeClientReadBody,
               this);
 }

 void
-store_client::readBody(const char *, ssize_t len)
+store_client::readBody(const char * const buf, const ssize_t lastIoResult)
 {
-    int parsed_header = 0;
-
-    // Don't assert disk_io_pending here.. may be called by read_header
+    Assure(flags.disk_io_pending);
     flags.disk_io_pending = false;
     assert(_callback.pending());
-    debugs(90, 3, "storeClientReadBody: len " << len << "");
+    Assure(parsingBuffer.second);
+    debugs(90, 3, "got " << lastIoResult << " using " << parsingBuffer.first);

-    if (len < 0)
+    if (lastIoResult < 0)
         return fail();

-    if (copyInto.offset == 0 && len > 0 && entry->getReply()->sline.status() == Http::scNone) {
-        /* Our structure !
*/ - HttpReply *rep = (HttpReply *) entry->getReply(); // bypass const + if (!lastIoResult) { + if (answeredOnce()) + return noteNews(); - if (!rep->parseCharBuf(copyInto.data, headersEnd(copyInto.data, len))) { - debugs(90, DBG_CRITICAL, "Could not parse headers from on disk object"); - } else { - parsed_header = 1; - } + debugs(90, DBG_CRITICAL, "ERROR: Truncated HTTP headers in on-disk object"); + return fail(); } - const HttpReply *rep = entry->getReply(); - if (len > 0 && rep && entry->mem_obj->inmem_lo == 0 && entry->objectLen() <= (int64_t)Config.Store.maxInMemObjSize && Config.onoff.memory_cache_disk) { - storeGetMemSpace(len); - // The above may start to free our object so we need to check again + assert(lastDiskRead.data == buf); + lastDiskRead.length = lastIoResult; + + parsingBuffer.first.appended(buf, lastIoResult); + + // we know swap_hdr_sz by now and were reading beyond swap metadata because + // readHead() would have been called otherwise (to read swap metadata) + const auto swap_hdr_sz = entry->mem().swap_hdr_sz; + Assure(swap_hdr_sz > 0); + Assure(!Less(lastDiskRead.offset, swap_hdr_sz)); + + // Map lastDiskRead (i.e. the disk area we just read) to an HTTP reply part. + // The bytes are the same, but disk and HTTP offsets differ by swap_hdr_sz. + const auto httpOffset = lastDiskRead.offset - swap_hdr_sz; + const auto httpPart = StoreIOBuffer(lastDiskRead).positionAt(httpOffset); + + maybeWriteFromDiskToMemory(httpPart); + handleBodyFromDisk(); +} + +/// de-serializes HTTP response (partially) read from disk storage +void +store_client::handleBodyFromDisk() +{ + // We cannot de-serialize on-disk HTTP response without MemObject because + // without MemObject::swap_hdr_sz we cannot know where that response starts. + Assure(entry->mem_obj); + Assure(entry->mem_obj->swap_hdr_sz > 0); + + if (!answeredOnce()) { + // All on-disk responses have HTTP headers. First disk body read(s) + // include HTTP headers that we must parse (if needed) and skip. + const auto haveHttpHeaders = entry->mem_obj->baseReply().pstate == psParsed; + if (!haveHttpHeaders && !parseHttpHeadersFromDisk()) + return; + skipHttpHeadersFromDisk(); + } + + noteNews(); +} + +/// Adds HTTP response data loaded from disk to the memory cache (if +/// needed/possible). The given part may contain portions of HTTP response +/// headers and/or HTTP response body. +void +store_client::maybeWriteFromDiskToMemory(const StoreIOBuffer &httpResponsePart) +{ + // XXX: Reject [memory-]uncachable/unshareable responses instead of assuming + // that an HTTP response should be written to MemObject's data_hdr (and that + // it may purge already cached entries) just because it "fits" and was + // loaded from disk. For example, this response may already be marked for + // release. The (complex) cachability decision(s) should be made outside + // (and obeyed by) this low-level code. + if (httpResponsePart.length && entry->mem_obj->inmem_lo == 0 && entry->objectLen() <= (int64_t)Config.Store.maxInMemObjSize && Config.onoff.memory_cache_disk) { + storeGetMemSpace(httpResponsePart.length); + // XXX: This "recheck" is not needed because storeGetMemSpace() cannot + // purge mem_hdr bytes of a locked entry, and we do lock ours. And + // inmem_lo offset itself should not be relevant to appending new bytes. + // + // recheck for the above call may purge entry's data from the memory cache if (entry->mem_obj->inmem_lo == 0) { - /* Copy read data back into memory. 
-         * copyInto.offset includes headers, which is what mem cache needs
-         */
-        int64_t mem_offset = entry->mem_obj->endOffset();
-        if ((copyInto.offset == mem_offset) || (parsed_header && mem_offset == rep->hdr_sz)) {
-            entry->mem_obj->write(StoreIOBuffer(len, copyInto.offset, copyInto.data));
-        }
+            // XXX: This code assumes a non-shared memory cache.
+            if (httpResponsePart.offset == entry->mem_obj->endOffset())
+                entry->mem_obj->write(httpResponsePart);
         }
     }
-
-    callback(len);
 }

 void
 store_client::fail()
 {
+    debugs(90, 3, (object_ok ? "once" : "again"));
+    if (!object_ok)
+        return; // we failed earlier; nothing to do now
+
     object_ok = false;
+
+    noteNews();
+}
+
+/// if necessary and possible, informs the Store reader about copy() result
+void
+store_client::noteNews()
+{
     /* synchronous open failures callback from the store,
      * before startSwapin detects the failure.
      * TODO: fix this inconsistent behaviour - probably by
@@ -501,8 +636,20 @@ store_client::fail()
      * not synchronous
      */

-    if (_callback.pending())
-        callback(0, true);
+    if (!_callback.callback_handler) {
+        debugs(90, 5, "client lost interest");
+        return;
+    }
+
+    if (_callback.notifier) {
+        debugs(90, 5, "earlier news is being delivered by " << _callback.notifier);
+        return;
+    }
+
+    _callback.notifier = asyncCall(90, 4, "store_client::FinishCallback", cbdataDialer(store_client::FinishCallback, this));
+    ScheduleCallHere(_callback.notifier);
+
+    Assure(!_callback.pending());
 }

 static void
@@ -573,38 +720,22 @@ store_client::readHeader(char const *buf, ssize_t len)
     if (!object_ok)
         return;

+    Assure(parsingBuffer.second);
+    debugs(90, 3, "got " << len << " using " << parsingBuffer.first);
+
     if (len < 0)
         return fail();

+    Assure(!parsingBuffer.first.contentSize());
+    parsingBuffer.first.appended(buf, len);
     if (!unpackHeader(buf, len)) {
         fail();
         return;
     }
+    parsingBuffer.first.consume(mem->swap_hdr_sz);

-    /*
-     * If our last read got some data the client wants, then give
-     * it to them, otherwise schedule another read.
-     */
-    size_t body_sz = len - mem->swap_hdr_sz;
-
-    if (copyInto.offset < static_cast<int64_t>(body_sz)) {
-        /*
-         * we have (part of) what they want
-         */
-        size_t copy_sz = min(copyInto.length, body_sz);
-        debugs(90, 3, "storeClientReadHeader: copying " << copy_sz << " bytes of body");
-        memmove(copyInto.data, copyInto.data + mem->swap_hdr_sz, copy_sz);
-
-        readBody(copyInto.data, copy_sz);
-
-        return;
-    }
-
-    /*
-     * we don't have what the client wants, but at least we now
-     * know the swap header size.
-     */
-    fileRead();
+    maybeWriteFromDiskToMemory(parsingBuffer.first.content());
+    handleBodyFromDisk();
 }

 int
@@ -673,10 +804,12 @@ storeUnregister(store_client * sc, StoreEntry * e, void *data)
         ++statCounter.swap.ins;
     }

-    if (sc->_callback.pending()) {
-        /* callback with ssize = -1 to indicate unexpected termination */
-        debugs(90, 3, "store_client for " << *e << " has a callback");
-        sc->fail();
+    if (sc->_callback.callback_handler || sc->_callback.notifier) {
+        debugs(90, 3, "forgetting store_client callback for " << *e);
+        // Do not notify: Callers want to stop copying and forget about this
+        // pending copy request. Some would mishandle a notification from here.
+        if (sc->_callback.notifier)
+            sc->_callback.notifier->cancel("storeUnregister");
     }

 #if STORE_CLIENT_LIST_DEBUG
@@ -684,6 +817,8 @@ storeUnregister(store_client * sc, StoreEntry * e, void *data)
 #endif

+    // XXX: We might be inside sc store_client method somewhere up the call
+    // stack. TODO: Convert store_client to AsyncJob to make destruction async.
     delete sc;

     assert(e->locked());
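
noteNews() above is the core of the rework: instead of invoking the STCB callback synchronously (possibly from deep inside Store code), it schedules a single store_client::FinishCallback async call and treats the copy() request as answered from that moment on. A rough sketch of the resulting state machine, with illustrative names (not Squid code):

    struct AnswerChannel {
        void (*handler)(void *) = nullptr; // stands in for Callback::callback_handler
        bool notifierScheduled = false;    // stands in for Callback::notifier

        void noteNews() {
            if (!handler)
                return; // client lost interest
            if (notifierScheduled)
                return; // earlier news is already being delivered
            notifierScheduled = true; // Squid queues an AsyncCall here instead
        }

        // false as soon as the answer is scheduled, even though the
        // handler has not actually run yet
        bool pending() const { return handler && !notifierScheduled; }
    };
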
@@ -740,6 +875,9 @@ StoreEntry::invokeHandlers()

         if (sc->flags.disk_io_pending)
             continue;
+
+        if (sc->flags.store_copying)
+            continue;

         storeClientCopy2(this, sc);
     }
@@ -847,6 +985,63 @@ CheckQuickAbortIsReasonable(StoreEntry * entry)
     return true;
 }

+/// parses HTTP header bytes loaded from disk
+/// \returns false if fail() or scheduleDiskRead() has been called and, hence,
+/// the caller should just quit without any further action
+bool
+store_client::parseHttpHeadersFromDisk()
+{
+    try {
+        return tryParsingHttpHeaders();
+    } catch (...) {
+        // XXX: Our parser enforces Config.maxReplyHeaderSize limit, but our
+        // packer does not. Since packing might increase header size, we may
+        // cache a header that we cannot parse and get here. Same for MemStore.
+        debugs(90, DBG_CRITICAL, "ERROR: Cannot parse on-disk HTTP headers" <<
+               Debug::Extra << "exception: " << CurrentException <<
+               Debug::Extra << "raw input size: " << parsingBuffer.first.contentSize() << " bytes" <<
+               Debug::Extra << "current buffer capacity: " << parsingBuffer.first.capacity() << " bytes");
+        fail();
+        return false;
+    }
+}
+
+/// parseHttpHeadersFromDisk() helper
+/// \copydoc parseHttpHeaders()
+bool
+store_client::tryParsingHttpHeaders()
+{
+    Assure(parsingBuffer.second);
+    Assure(!copyInto.offset); // otherwise, parsingBuffer cannot have HTTP response headers
+    auto &adjustableReply = entry->mem().baseReply();
+    if (adjustableReply.parseTerminatedPrefix(parsingBuffer.first.c_str(), parsingBuffer.first.contentSize()))
+        return true;
+
+    // TODO: Optimize by checking memory as well. For simplicity sake, we
+    // continue on the disk-reading path, but readFromMemory() can give us the
+    // missing header bytes immediately if a concurrent request put those bytes
+    // into memory while we were waiting for our disk response.
+    scheduleDiskRead();
+    return false;
+}
+
+/// skips HTTP header bytes previously loaded from disk
+void
+store_client::skipHttpHeadersFromDisk()
+{
+    const auto hdr_sz = entry->mem_obj->baseReply().hdr_sz;
+    Assure(hdr_sz > 0); // all on-disk responses have HTTP headers
+    if (Less(parsingBuffer.first.contentSize(), hdr_sz)) {
+        debugs(90, 5, "discovered " << hdr_sz << "-byte HTTP headers in memory after reading some of them from disk: " << parsingBuffer.first);
+        parsingBuffer.first.consume(parsingBuffer.first.contentSize()); // skip loaded HTTP header prefix
+    } else {
+        parsingBuffer.first.consume(hdr_sz); // skip loaded HTTP headers
+        const auto httpBodyBytesAfterHeader = parsingBuffer.first.contentSize(); // may be zero
+        Assure(httpBodyBytesAfterHeader <= copyInto.length);
+        debugs(90, 5, "read HTTP body prefix: " << httpBodyBytesAfterHeader);
+    }
+}
+
 void
 store_client::dumpStats(MemBuf * output, int clientNumber) const
 {
@@ -864,8 +1059,8 @@ store_client::dumpStats(MemBuf * output, int clientNumber) const
     if (flags.store_copying)
         output->append(" store_copying", 14);

-    if (flags.copy_event_pending)
-        output->append(" copy_event_pending", 19);
+    if (_callback.notifier)
+        output->append(" notifying", 10);

     output->append("\n",1);
 }
@@ -873,12 +1068,19 @@ store_client::dumpStats(MemBuf * output, int clientNumber) const
 bool
 store_client::Callback::pending() const
 {
-    return callback_handler && callback_data;
+    return callback_handler && !notifier;
 }

 store_client::Callback::Callback(STCB *function, void *data) :
     callback_handler(function),
     callback_data (data)
 {}

 #if USE_DELAY_POOLS
+int
+store_client::bytesWanted() const
+{
+    // TODO: To avoid using stale copyInto, return zero if !_callback.pending()?
+    return delayId.bytesWanted(0, copyInto.length);
+}
+
 void
 store_client::setDelayId(DelayId delay_id)
 {
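
The skipHttpHeadersFromDisk() logic above guarantees that stored HTTP header bytes never reach the copy() caller: when fewer than hdr_sz bytes are buffered, everything buffered is header prefix and gets consumed; otherwise exactly hdr_sz bytes are consumed and the remainder is the body prefix. A tiny standalone illustration with made-up numbers (not Squid code):

    #include <cstddef>

    // mirrors the two skipHttpHeadersFromDisk() branches above
    size_t bodyPrefixAfterSkippingHeaders(size_t bufferedBytes, size_t hdrSz)
    {
        return bufferedBytes < hdrSz ? 0 : bufferedBytes - hdrSz;
    }

    // bodyPrefixAfterSkippingHeaders(120, 300) == 0   (all 120 bytes were header bytes)
    // bodyPrefixAfterSkippingHeaders(450, 300) == 150 (150 body bytes remain for the caller)
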
diff --git a/src/store_swapin.cc b/src/store_swapin.cc
index a05d7e3..cd32e94 100644
--- a/src/store_swapin.cc
+++ b/src/store_swapin.cc
@@ -56,7 +56,7 @@ storeSwapInFileClosed(void *data, int errflag, StoreIOState::Pointer)

     if (sc->_callback.pending()) {
         assert (errflag <= 0);
-        sc->callback(0, errflag ? true : false);
+        sc->noteSwapInDone(errflag);
     }

     ++statCounter.swap.ins;

diff --git a/src/tests/stub_HttpReply.cc b/src/tests/stub_HttpReply.cc
index 8ca7f9e..5cde8e6 100644
--- a/src/tests/stub_HttpReply.cc
+++ b/src/tests/stub_HttpReply.cc
@@ -25,6 +25,7 @@ void httpBodyPackInto(const HttpBody *, Packable *) STUB
 bool HttpReply::sanityCheckStartLine(const char *buf, const size_t hdr_len, Http::StatusCode *error) STUB_RETVAL(false)
 int HttpReply::httpMsgParseError() STUB_RETVAL(0)
 bool HttpReply::expectingBody(const HttpRequestMethod&, int64_t&) const STUB_RETVAL(false)
+size_t HttpReply::parseTerminatedPrefix(const char *, size_t) STUB_RETVAL(0)
 bool HttpReply::parseFirstLine(const char *start, const char *end) STUB_RETVAL(false)
 void HttpReply::hdrCacheInit() STUB
 HttpReply * HttpReply::clone() const STUB_RETVAL(NULL)

diff --git a/src/tests/stub_store_client.cc b/src/tests/stub_store_client.cc
index 2a13874..debe24e 100644
--- a/src/tests/stub_store_client.cc
+++ b/src/tests/stub_store_client.cc
@@ -34,7 +34,12 @@ void storeLogOpen(void) STUB
 void storeDigestInit(void) STUB
 void storeRebuildStart(void) STUB
 void storeReplSetup(void) STUB
-bool store_client::memReaderHasLowerOffset(int64_t anOffset) const STUB_RETVAL(false)
 void store_client::dumpStats(MemBuf * output, int clientNumber) const STUB
 int store_client::getType() const STUB_RETVAL(0)
+void store_client::noteSwapInDone(bool) STUB
+#if USE_DELAY_POOLS
+int store_client::bytesWanted() const STUB_RETVAL(0)
+#endif
+
+

diff --git a/src/urn.cc b/src/urn.cc
index 74453e1..6efdec1 100644
--- a/src/urn.cc
+++ b/src/urn.cc
@@ -26,8 +26,6 @@
 #include "tools.h"
 #include "urn.h"

-#define URN_REQBUF_SZ 4096
-
 class UrnState : public StoreClient
 {
     CBDATA_CLASS(UrnState);

 public:
     HttpRequest::Pointer request;
     HttpRequest::Pointer urlres_r;

-    char reqbuf[URN_REQBUF_SZ] = { '\0' };
-    int reqofs = 0;
+    /// for receiving a URN resolver reply body from Store and interpreting it
+    Store::ParsingBuffer parsingBuffer;

 private:
     char *urlres;
@@ -63,7 +61,7 @@ typedef struct {
 } url_entry;

 static STCB urnHandleReply;
-static url_entry *urnParseReply(const char *inbuf, const HttpRequestMethod&);
+static url_entry *urnParseReply(const SBuf &, const HttpRequestMethod &);
 static const char *const crlf = "\r\n";

 CBDATA_CLASS_INIT(UrnState);
@@ -183,13 +181,8 @@ UrnState::created(StoreEntry *newEntry)
         sc = storeClientListAdd(urlres_e, this);
     }

-    reqofs = 0;
-    StoreIOBuffer tempBuffer;
-    tempBuffer.offset = reqofs;
-    tempBuffer.length = URN_REQBUF_SZ;
-    tempBuffer.data = reqbuf;
     storeClientCopy(sc, urlres_e,
-                    tempBuffer,
+                    parsingBuffer.makeInitialSpace(),
                     urnHandleReply,
                     this);
 }
@@ -224,9 +217,6 @@ urnHandleReply(void *data, StoreIOBuffer result)
     UrnState *urnState = static_cast<UrnState *>(data);
     StoreEntry *e = urnState->entry;
     StoreEntry *urlres_e = urnState->urlres_e;
-    char *s = NULL;
-    size_t k;
-    HttpReply *rep;
     url_entry *urls;
     url_entry *u;
     url_entry *min_u;
     ErrorState *err;
     int i;
     int urlcnt = 0;
-    char *buf = urnState->reqbuf;
-    StoreIOBuffer tempBuffer;
-
-    debugs(52, 3, "urnHandleReply: Called with size=" << result.length << ".");
+    debugs(52, 3, result << " with " << *e);

     if (EBIT_TEST(urlres_e->flags, ENTRY_ABORTED) || result.flags.error) {
         delete urnState;
@@ -250,59 +237,39 @@
         return;
     }

-    /* Update reqofs to point to where in the buffer we'd be */
-    urnState->reqofs += result.length;
-
-    /* Handle reqofs being bigger
than normal */ - if (urnState->reqofs >= URN_REQBUF_SZ) { - delete urnState; - return; - } + urnState->parsingBuffer.appended(result.data, result.length); /* If we haven't received the entire object (urn), copy more */ - if (urlres_e->store_status == STORE_PENDING) { - Must(result.length > 0); // zero length ought to imply STORE_OK - tempBuffer.offset = urnState->reqofs; - tempBuffer.length = URN_REQBUF_SZ - urnState->reqofs; - tempBuffer.data = urnState->reqbuf + urnState->reqofs; + if (!urnState->sc->atEof()) { + const auto bufferedBytes = urnState->parsingBuffer.contentSize(); + const auto remainingSpace = urnState->parsingBuffer.space().positionAt(bufferedBytes); + + if (!remainingSpace.length) { + debugs(52, 3, "ran out of buffer space after " << bufferedBytes << " bytes"); + // TODO: Here and in other error cases, send ERR_URN_RESOLVE to client. + delete urnState; + return; + } + storeClientCopy(urnState->sc, urlres_e, - tempBuffer, + remainingSpace, urnHandleReply, urnState); return; } - /* we know its STORE_OK */ - k = headersEnd(buf, urnState->reqofs); - - if (0 == k) { - debugs(52, DBG_IMPORTANT, "urnHandleReply: didn't find end-of-headers for " << e->url() ); - delete urnState; - return; - } - - s = buf + k; - assert(urlres_e->getReply()); - rep = new HttpReply; - rep->parseCharBuf(buf, k); - debugs(52, 3, "reply exists, code=" << rep->sline.status() << "."); - - if (rep->sline.status() != Http::scOkay) { + const auto &peerReply = urlres_e->mem().baseReply(); + debugs(52, 3, "got reply, code=" << peerReply.sline.status()); + if (peerReply.sline.status() != Http::scOkay) { debugs(52, 3, "urnHandleReply: failed."); err = new ErrorState(ERR_URN_RESOLVE, Http::scNotFound, urnState->request.getRaw()); err->url = xstrdup(e->url()); errorAppendEntry(e, err); - delete rep; delete urnState; return; } - delete rep; - - while (xisspace(*s)) - ++s; - - urls = urnParseReply(s, urnState->request->method); + urls = urnParseReply(urnState->parsingBuffer.toSBuf(), urnState->request->method); if (!urls) { /* unknown URN error */ debugs(52, 3, "urnTranslateDone: unknown URN " << e->url()); @@ -350,7 +317,7 @@ urnHandleReply(void *data, StoreIOBuffer result) "Generated by %s@%s\n" "\n", APP_FULLNAME, getMyHostname()); - rep = new HttpReply; + const auto rep = new HttpReply; rep->setHeaders(Http::scFound, NULL, "text/html", mb->contentSize(), 0, squid_curtime); if (min_u) { @@ -372,9 +339,8 @@ urnHandleReply(void *data, StoreIOBuffer result) } static url_entry * -urnParseReply(const char *inbuf, const HttpRequestMethod& m) +urnParseReply(const SBuf &inBuf, const HttpRequestMethod &m) { - char *buf = xstrdup(inbuf); char *token; url_entry *list; url_entry *old; @@ -383,6 +349,13 @@ urnParseReply(const char *inbuf, const HttpRequestMethod& m) debugs(52, 3, "urnParseReply"); list = (url_entry *)xcalloc(n + 1, sizeof(*list)); + // XXX: Switch to tokenizer-based parsing. + const auto allocated = SBufToCstring(inBuf); + + auto buf = allocated; + while (xisspace(*buf)) + ++buf; + for (token = strtok(buf, crlf); token; token = strtok(NULL, crlf)) { debugs(52, 3, "urnParseReply: got '" << token << "'"); @@ -418,7 +391,7 @@ urnParseReply(const char *inbuf, const HttpRequestMethod& m) } debugs(52, 3, "urnParseReply: Found " << i << " URLs"); - xfree(buf); + xfree(allocated); return list; }
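
The urn.cc conversion above is the template for the storeClientCopy() consumption pattern this fix expects from Store readers: accumulate bytes in a Store::ParsingBuffer, request more space positioned after what is already buffered, and detect end-of-response via store_client::atEof() instead of inferring it from a zero-length read. A condensed sketch modeled on urnHandleReply(); MyState and myHandleReply are hypothetical names and error handling is elided:

    static void
    myHandleReply(void *data, StoreIOBuffer result)
    {
        const auto state = static_cast<MyState *>(data); // MyState is hypothetical
        if (result.flags.error) {
            delete state;
            return;
        }

        state->parsingBuffer.appended(result.data, result.length);

        if (!state->sc->atEof()) { // more response bytes may still arrive
            const auto buffered = state->parsingBuffer.contentSize();
            storeClientCopy(state->sc, state->entry,
                            state->parsingBuffer.space().positionAt(buffered),
                            myHandleReply, state);
            return;
        }

        // the entire reply is now buffered; parse state->parsingBuffer.toSBuf()
    }
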