diff --git a/0004-src-CMakeLists.txt.patch b/0004-src-CMakeLists.txt.patch deleted file mode 100644 index f3f2e36..0000000 --- a/0004-src-CMakeLists.txt.patch +++ /dev/null @@ -1,12 +0,0 @@ ---- ceph-16.0.0/src/CMakeLists.txt.orig 2021-01-25 13:45:15.316053258 -0500 -+++ ceph-16.0.0/src/CMakeLists.txt 2021-01-25 13:43:34.418305591 -0500 -@@ -29,7 +29,8 @@ - -D_THREAD_SAFE - -D__STDC_FORMAT_MACROS - -D_FILE_OFFSET_BITS=64 -- -DBOOST_ASIO_DISABLE_THREAD_KEYWORD_EXTENSION) -+ -DBOOST_ASIO_DISABLE_THREAD_KEYWORD_EXTENSION -+ -DBOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT) - if(LINUX) - add_definitions("-D_GNU_SOURCE") - endif() diff --git a/0012-rgw.patch b/0012-rgw.patch deleted file mode 100644 index 0d85501..0000000 --- a/0012-rgw.patch +++ /dev/null @@ -1,7721 +0,0 @@ -From 483302af2622cb26983c847196b8bad0a80fbd2f Mon Sep 17 00:00:00 2001 -From: "Adam C. Emerson" -Date: Sat, 21 Nov 2020 17:04:12 -0500 -Subject: [PATCH 01/26] cls/log: Take const references of things you won't - modify - -Signed-off-by: Adam C. Emerson -(cherry picked from commit 73ea8cec06addc6af2ba354321f1099f657f13c5) -Signed-off-by: Adam C. Emerson ---- - src/cls/log/cls_log_client.cc | 4 ++-- - src/cls/log/cls_log_client.h | 6 +++--- - 2 files changed, 5 insertions(+), 5 deletions(-) - -diff --git a/src/cls/log/cls_log_client.cc b/src/cls/log/cls_log_client.cc -index 418599c8066e4..182bb9fec47e9 100644 ---- a/src/cls/log/cls_log_client.cc -+++ b/src/cls/log/cls_log_client.cc -@@ -113,8 +113,8 @@ class LogListCtx : public ObjectOperationCompletion { - } - }; - --void cls_log_list(librados::ObjectReadOperation& op, utime_t& from, utime_t& to, -- const string& in_marker, int max_entries, -+void cls_log_list(librados::ObjectReadOperation& op, const utime_t& from, -+ const utime_t& to, const string& in_marker, int max_entries, - list& entries, - string *out_marker, bool *truncated) - { -diff --git a/src/cls/log/cls_log_client.h b/src/cls/log/cls_log_client.h -index b049c2cc01bda..2afdabeb3e0a2 100644 ---- a/src/cls/log/cls_log_client.h -+++ b/src/cls/log/cls_log_client.h -@@ -19,9 +19,9 @@ void cls_log_add(librados::ObjectWriteOperation& op, cls_log_entry& entry); - void cls_log_add(librados::ObjectWriteOperation& op, const utime_t& timestamp, - const std::string& section, const std::string& name, ceph::buffer::list& bl); - --void cls_log_list(librados::ObjectReadOperation& op, utime_t& from, utime_t& to, -- const std::string& in_marker, int max_entries, -- std::list& entries, -+void cls_log_list(librados::ObjectReadOperation& op, const utime_t& from, -+ const utime_t& to, const std::string& in_marker, -+ int max_entries, std::list& entries, - std::string *out_marker, bool *truncated); - - void cls_log_trim(librados::ObjectWriteOperation& op, const utime_t& from_time, const utime_t& to_time, - -From 35f044f39da713b3bf4c5002aade7b456727190e Mon Sep 17 00:00:00 2001 -From: "Adam C. Emerson" -Date: Tue, 3 Nov 2020 16:02:26 -0500 -Subject: [PATCH 02/26] rgw: Add AioCompletion* versions for the rest of the - FIFO methods - -Signed-off-by: Adam C. Emerson -(cherry picked from commit 665573ab8905bfa2e1ede6fc3be9bc80a625cb49) -Signed-off-by: Adam C. Emerson ---- - src/rgw/cls_fifo_legacy.cc | 1583 +++++++++++++++++++++----- - src/rgw/cls_fifo_legacy.h | 91 +- - src/rgw/rgw_datalog.cc | 7 +- - src/test/rgw/test_cls_fifo_legacy.cc | 484 +++++++- - 4 files changed, 1826 insertions(+), 339 deletions(-) - -diff --git a/src/rgw/cls_fifo_legacy.cc b/src/rgw/cls_fifo_legacy.cc -index d835aeec76ab8..569a3e77c458f 100644 ---- a/src/rgw/cls_fifo_legacy.cc -+++ b/src/rgw/cls_fifo_legacy.cc -@@ -109,6 +109,7 @@ int get_meta(lr::IoCtx& ioctx, const std::string& oid, - return r; - }; - -+namespace { - void update_meta(lr::ObjectWriteOperation* op, const fifo::objv& objv, - const fifo::update& update) - { -@@ -175,6 +176,27 @@ int push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag, - return retval; - } - -+void push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag, -+ std::deque data_bufs, std::uint64_t tid, -+ lr::AioCompletion* c) -+{ -+ lr::ObjectWriteOperation op; -+ fifo::op::push_part pp; -+ -+ pp.tag = tag; -+ pp.data_bufs = data_bufs; -+ pp.total_len = 0; -+ -+ for (const auto& bl : data_bufs) -+ pp.total_len += bl.length(); -+ -+ cb::list in; -+ encode(pp, in); -+ op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in); -+ auto r = ioctx.aio_operate(oid, c, &op, lr::OPERATION_RETURNVEC); -+ ceph_assert(r >= 0); -+} -+ - void trim_part(lr::ObjectWriteOperation* op, - std::optional tag, - std::uint64_t ofs, bool exclusive) -@@ -232,6 +254,70 @@ int list_part(lr::IoCtx& ioctx, const std::string& oid, - return r; - } - -+struct list_entry_completion : public lr::ObjectOperationCompletion { -+ CephContext* cct; -+ int* r_out; -+ std::vector* entries; -+ bool* more; -+ bool* full_part; -+ std::string* ptag; -+ std::uint64_t tid; -+ -+ list_entry_completion(CephContext* cct, int* r_out, std::vector* entries, -+ bool* more, bool* full_part, std::string* ptag, -+ std::uint64_t tid) -+ : cct(cct), r_out(r_out), entries(entries), more(more), -+ full_part(full_part), ptag(ptag), tid(tid) {} -+ virtual ~list_entry_completion() = default; -+ void handle_completion(int r, bufferlist& bl) override { -+ if (r >= 0) try { -+ fifo::op::list_part_reply reply; -+ auto iter = bl.cbegin(); -+ decode(reply, iter); -+ if (entries) *entries = std::move(reply.entries); -+ if (more) *more = reply.more; -+ if (full_part) *full_part = reply.full_part; -+ if (ptag) *ptag = reply.tag; -+ } catch (const cb::error& err) { -+ lderr(cct) -+ << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " decode failed: " << err.what() -+ << " tid=" << tid << dendl; -+ r = from_error_code(err.code()); -+ } else if (r < 0) { -+ lderr(cct) -+ << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " fifo::op::LIST_PART failed r=" << r << " tid=" << tid -+ << dendl; -+ } -+ if (r_out) *r_out = r; -+ } -+}; -+ -+lr::ObjectReadOperation list_part(CephContext* cct, -+ std::optional tag, -+ std::uint64_t ofs, -+ std::uint64_t max_entries, -+ int* r_out, -+ std::vector* entries, -+ bool* more, bool* full_part, -+ std::string* ptag, std::uint64_t tid) -+{ -+ lr::ObjectReadOperation op; -+ fifo::op::list_part lp; -+ -+ lp.tag = tag; -+ lp.ofs = ofs; -+ lp.max_entries = max_entries; -+ -+ cb::list in; -+ encode(lp, in); -+ op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in, -+ new list_entry_completion(cct, r_out, entries, more, full_part, -+ ptag, tid)); -+ return op; -+} -+ - int get_part_info(lr::IoCtx& ioctx, const std::string& oid, - fifo::part_header* header, - std::uint64_t tid, optional_yield y) -@@ -264,29 +350,131 @@ int get_part_info(lr::IoCtx& ioctx, const std::string& oid, - return r; - } - --static void complete(lr::AioCompletion* c_, int r) -+struct partinfo_completion : public lr::ObjectOperationCompletion { -+ CephContext* cct; -+ int* rp; -+ fifo::part_header* h; -+ std::uint64_t tid; -+ partinfo_completion(CephContext* cct, int* rp, fifo::part_header* h, -+ std::uint64_t tid) : -+ cct(cct), rp(rp), h(h), tid(tid) { -+ } -+ virtual ~partinfo_completion() = default; -+ void handle_completion(int r, bufferlist& bl) override { -+ if (r >= 0) try { -+ fifo::op::get_part_info_reply reply; -+ auto iter = bl.cbegin(); -+ decode(reply, iter); -+ if (h) *h = std::move(reply.header); -+ } catch (const cb::error& err) { -+ r = from_error_code(err.code()); -+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " decode failed: " << err.what() -+ << " tid=" << tid << dendl; -+ } else { -+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " fifo::op::GET_PART_INFO failed r=" << r << " tid=" << tid -+ << dendl; -+ } -+ if (rp) { -+ *rp = r; -+ } -+ } -+}; -+ -+template -+struct Completion { -+private: -+ lr::AioCompletion* _cur = nullptr; -+ lr::AioCompletion* _super; -+public: -+ -+ using Ptr = std::unique_ptr; -+ -+ lr::AioCompletion* cur() const { -+ return _cur; -+ } -+ lr::AioCompletion* super() const { -+ return _super; -+ } -+ -+ Completion(lr::AioCompletion* super) : _super(super) { -+ super->pc->get(); -+ } -+ -+ ~Completion() { -+ if (_super) { -+ _super->pc->put(); -+ } -+ if (_cur) -+ _cur->release(); -+ _super = nullptr; -+ _cur = nullptr; -+ } -+ -+ // The only times that aio_operate can return an error are: -+ // 1. The completion contains a null pointer. This should just -+ // crash, and in our case it does. -+ // 2. An attempt is made to write to a snapshot. RGW doesn't use -+ // snapshots, so we don't care. -+ // -+ // So we will just assert that initiating an Aio operation succeeds -+ // and not worry about recovering. -+ static lr::AioCompletion* call(Ptr&& p) { -+ p->_cur = lr::Rados::aio_create_completion(static_cast(p.get()), -+ &cb); -+ auto c = p->_cur; -+ p.release(); -+ return c; -+ } -+ static void complete(Ptr&& p, int r) { -+ auto c = p->_super->pc; -+ p->_super = nullptr; -+ c->lock.lock(); -+ c->rval = r; -+ c->complete = true; -+ c->lock.unlock(); -+ -+ auto cb_complete = c->callback_complete; -+ auto cb_complete_arg = c->callback_complete_arg; -+ if (cb_complete) -+ cb_complete(c, cb_complete_arg); -+ -+ auto cb_safe = c->callback_safe; -+ auto cb_safe_arg = c->callback_safe_arg; -+ if (cb_safe) -+ cb_safe(c, cb_safe_arg); -+ -+ c->lock.lock(); -+ c->callback_complete = nullptr; -+ c->callback_safe = nullptr; -+ c->cond.notify_all(); -+ c->put_unlock(); -+ } -+ -+ static void cb(lr::completion_t, void* arg) { -+ auto t = static_cast(arg); -+ auto r = t->_cur->get_return_value(); -+ t->_cur->release(); -+ t->_cur = nullptr; -+ t->handle(Ptr(t), r); -+ } -+}; -+ -+lr::ObjectReadOperation get_part_info(CephContext* cct, -+ fifo::part_header* header, -+ std::uint64_t tid, int* r = 0) - { -- auto c = c_->pc; -- c->lock.lock(); -- c->rval = r; -- c->complete = true; -- c->lock.unlock(); -- -- auto cb_complete = c->callback_complete; -- auto cb_complete_arg = c->callback_complete_arg; -- if (cb_complete) -- cb_complete(c, cb_complete_arg); -- -- auto cb_safe = c->callback_safe; -- auto cb_safe_arg = c->callback_safe_arg; -- if (cb_safe) -- cb_safe(c, cb_safe_arg); -- -- c->lock.lock(); -- c->callback_complete = NULL; -- c->callback_safe = NULL; -- c->cond.notify_all(); -- c->put_unlock(); -+ lr::ObjectReadOperation op; -+ fifo::op::get_part_info gpi; -+ -+ cb::list in; -+ cb::list bl; -+ encode(gpi, in); -+ op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in, -+ new partinfo_completion(cct, r, header, tid)); -+ return op; -+} - } - - std::optional FIFO::to_marker(std::string_view s) -@@ -385,11 +573,8 @@ int FIFO::_update_meta(const fifo::update& update, - return r; - } - --struct Updater { -+struct Updater : public Completion { - FIFO* fifo; -- lr::AioCompletion* super; -- lr::AioCompletion* cur = lr::Rados::aio_create_completion( -- static_cast(this), &FIFO::update_callback); - fifo::update update; - fifo::objv version; - bool reread = false; -@@ -398,92 +583,74 @@ struct Updater { - Updater(FIFO* fifo, lr::AioCompletion* super, - const fifo::update& update, fifo::objv version, - bool* pcanceled, std::uint64_t tid) -- : fifo(fifo), super(super), update(update), version(version), -- pcanceled(pcanceled), tid(tid) { -- super->pc->get(); -- } -- ~Updater() { -- cur->release(); -- } --}; -- --void FIFO::update_callback(lr::completion_t, void* arg) --{ -- std::unique_ptr updater(static_cast(arg)); -- auto cct = updater->fifo->cct; -- auto tid = updater->tid; -- ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -- << " entering: tid=" << tid << dendl; -- if (!updater->reread) { -- ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -- << " handling async update_meta: tid=" -- << tid << dendl; -- int r = updater->cur->get_return_value(); -+ : Completion(super), fifo(fifo), update(update), version(version), -+ pcanceled(pcanceled) {} -+ -+ void handle(Ptr&& p, int r) { -+ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " entering: tid=" << tid << dendl; -+ if (reread) -+ handle_reread(std::move(p), r); -+ else -+ handle_update(std::move(p), r); -+ } -+ -+ void handle_update(Ptr&& p, int r) { -+ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " handling async update_meta: tid=" -+ << tid << dendl; - if (r < 0 && r != -ECANCELED) { -- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " update failed: r=" << r << " tid=" << tid << dendl; -- complete(updater->super, r); -+ complete(std::move(p), r); - return; - } - bool canceled = (r == -ECANCELED); - if (!canceled) { -- int r = updater->fifo->apply_update(&updater->fifo->info, -- updater->version, -- updater->update, tid); -+ int r = fifo->apply_update(&fifo->info, version, update, tid); - if (r < 0) { -- ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -- << " update failed, marking canceled: r=" << r << " tid=" -- << tid << dendl; -+ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " update failed, marking canceled: r=" << r -+ << " tid=" << tid << dendl; - canceled = true; - } - } - if (canceled) { -- updater->cur->release(); -- updater->cur = lr::Rados::aio_create_completion( -- arg, &FIFO::update_callback); -- updater->reread = true; -- auto r = updater->fifo->read_meta(tid, updater->cur); -- if (r < 0) { -- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -- << " failed dispatching read_meta: r=" << r << " tid=" -- << tid << dendl; -- complete(updater->super, r); -- } else { -- updater.release(); -- } -+ reread = true; -+ fifo->read_meta(tid, call(std::move(p))); - return; - } -- if (updater->pcanceled) -- *updater->pcanceled = false; -- ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -- << " completing: tid=" << tid << dendl; -- complete(updater->super, 0); -- return; -- } -- -- ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -- << " handling async read_meta: tid=" -- << tid << dendl; -- int r = updater->cur->get_return_value(); -- if (r < 0 && updater->pcanceled) { -- *updater->pcanceled = false; -- } else if (r >= 0 && updater->pcanceled) { -- *updater->pcanceled = true; -- } -- if (r < 0) { -- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -- << " failed dispatching read_meta: r=" << r << " tid=" -- << tid << dendl; -- } else { -- ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -- << " completing: tid=" << tid << dendl; -+ if (pcanceled) -+ *pcanceled = false; -+ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " completing: tid=" << tid << dendl; -+ complete(std::move(p), 0); -+ } -+ -+ void handle_reread(Ptr&& p, int r) { -+ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " handling async read_meta: tid=" -+ << tid << dendl; -+ if (r < 0 && pcanceled) { -+ *pcanceled = false; -+ } else if (r >= 0 && pcanceled) { -+ *pcanceled = true; -+ } -+ if (r < 0) { -+ lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " failed dispatching read_meta: r=" << r << " tid=" -+ << tid << dendl; -+ } else { -+ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " completing: tid=" << tid << dendl; -+ } -+ complete(std::move(p), r); - } -- complete(updater->super, r); --} -+}; - --int FIFO::_update_meta(const fifo::update& update, -- fifo::objv version, bool* pcanceled, -- std::uint64_t tid, lr::AioCompletion* c) -+void FIFO::_update_meta(const fifo::update& update, -+ fifo::objv version, bool* pcanceled, -+ std::uint64_t tid, lr::AioCompletion* c) - { - ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; -@@ -491,15 +658,8 @@ int FIFO::_update_meta(const fifo::update& update, - update_meta(&op, info.version, update); - auto updater = std::make_unique(this, c, update, version, pcanceled, - tid); -- auto r = ioctx.aio_operate(oid, updater->cur, &op); -- if (r < 0) { -- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -- << " failed dispatching update_meta: r=" << r << " tid=" -- << tid << dendl; -- } else { -- updater.release(); -- } -- return r; -+ auto r = ioctx.aio_operate(oid, Updater::call(std::move(updater)), &op); -+ assert(r >= 0); - } - - int FIFO::create_part(int64_t part_num, std::string_view tag, std::uint64_t tid, -@@ -509,7 +669,7 @@ int FIFO::create_part(int64_t part_num, std::string_view tag, std::uint64_t tid, - << " entering: tid=" << tid << dendl; - lr::ObjectWriteOperation op; - op.create(false); /* We don't need exclusivity, part_init ensures -- we're creating from the same journal entry. */ -+ we're creating from the same journal entry. */ - std::unique_lock l(m); - part_init(&op, tag, info.params); - auto oid = info.part_oid(part_num); -@@ -806,6 +966,209 @@ int FIFO::_prepare_new_head(std::uint64_t tid, optional_yield y) - return 0; - } - -+struct NewPartPreparer : public Completion { -+ FIFO* f; -+ std::vector jentries; -+ int i = 0; -+ std::int64_t new_head_part_num; -+ bool canceled = false; -+ uint64_t tid; -+ -+ NewPartPreparer(FIFO* f, lr::AioCompletion* super, -+ std::vector jentries, -+ std::int64_t new_head_part_num, -+ std::uint64_t tid) -+ : Completion(super), f(f), jentries(std::move(jentries)), -+ new_head_part_num(new_head_part_num), tid(tid) {} -+ -+ void handle(Ptr&& p, int r) { -+ ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " entering: tid=" << tid << dendl; -+ if (r < 0) { -+ lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " _update_meta failed: r=" << r -+ << " tid=" << tid << dendl; -+ complete(std::move(p), r); -+ return; -+ } -+ -+ if (canceled) { -+ std::unique_lock l(f->m); -+ auto iter = f->info.journal.find(jentries.front().part_num); -+ auto max_push_part_num = f->info.max_push_part_num; -+ auto head_part_num = f->info.head_part_num; -+ auto version = f->info.version; -+ auto found = (iter != f->info.journal.end()); -+ l.unlock(); -+ if ((max_push_part_num >= jentries.front().part_num && -+ head_part_num >= new_head_part_num)) { -+ ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " raced, but journaled and processed: i=" << i -+ << " tid=" << tid << dendl; -+ complete(std::move(p), 0); -+ return; -+ } -+ if (i >= MAX_RACE_RETRIES) { -+ complete(std::move(p), -ECANCELED); -+ return; -+ } -+ if (!found) { -+ ++i; -+ f->_update_meta(fifo::update{} -+ .journal_entries_add(jentries), -+ version, &canceled, tid, call(std::move(p))); -+ return; -+ } else { -+ ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " raced, journaled but not processed: i=" << i -+ << " tid=" << tid << dendl; -+ canceled = false; -+ } -+ // Fall through. We still need to process the journal. -+ } -+ f->process_journal(tid, super()); -+ return; -+ } -+}; -+ -+void FIFO::_prepare_new_part(bool is_head, std::uint64_t tid, -+ lr::AioCompletion* c) -+{ -+ std::unique_lock l(m); -+ std::vector jentries = { info.next_journal_entry(generate_tag()) }; -+ if (info.journal.find(jentries.front().part_num) != info.journal.end()) { -+ l.unlock(); -+ ldout(cct, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " new part journaled, but not processed: tid=" -+ << tid << dendl; -+ process_journal(tid, c); -+ return; -+ } -+ std::int64_t new_head_part_num = info.head_part_num; -+ auto version = info.version; -+ -+ if (is_head) { -+ auto new_head_jentry = jentries.front(); -+ new_head_jentry.op = fifo::journal_entry::Op::set_head; -+ new_head_part_num = jentries.front().part_num; -+ jentries.push_back(std::move(new_head_jentry)); -+ } -+ l.unlock(); -+ -+ auto n = std::make_unique(this, c, jentries, -+ new_head_part_num, tid); -+ auto np = n.get(); -+ _update_meta(fifo::update{}.journal_entries_add(jentries), version, -+ &np->canceled, tid, NewPartPreparer::call(std::move(n))); -+} -+ -+struct NewHeadPreparer : public Completion { -+ FIFO* f; -+ int i = 0; -+ bool newpart; -+ std::int64_t new_head_num; -+ bool canceled = false; -+ std::uint64_t tid; -+ -+ NewHeadPreparer(FIFO* f, lr::AioCompletion* super, -+ bool newpart, std::int64_t new_head_num, std::uint64_t tid) -+ : Completion(super), f(f), newpart(newpart), new_head_num(new_head_num), -+ tid(tid) {} -+ -+ void handle(Ptr&& p, int r) { -+ if (newpart) -+ handle_newpart(std::move(p), r); -+ else -+ handle_update(std::move(p), r); -+ } -+ -+ void handle_newpart(Ptr&& p, int r) { -+ if (r < 0) { -+ lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " _prepare_new_part failed: r=" << r -+ << " tid=" << tid << dendl; -+ complete(std::move(p), r); -+ return; -+ } -+ std::unique_lock l(f->m); -+ if (f->info.max_push_part_num < new_head_num) { -+ l.unlock(); -+ lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " _prepare_new_part failed: r=" << r -+ << " tid=" << tid << dendl; -+ complete(std::move(p), -EIO); -+ } else { -+ l.unlock(); -+ complete(std::move(p), 0); -+ } -+ } -+ -+ void handle_update(Ptr&& p, int r) { -+ std::unique_lock l(f->m); -+ auto head_part_num = f->info.head_part_num; -+ auto version = f->info.version; -+ l.unlock(); -+ -+ if (r < 0) { -+ lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " _update_meta failed: r=" << r -+ << " tid=" << tid << dendl; -+ complete(std::move(p), r); -+ return; -+ } -+ if (canceled) { -+ if (i >= MAX_RACE_RETRIES) { -+ lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " canceled too many times, giving up: tid=" << tid << dendl; -+ complete(std::move(p), -ECANCELED); -+ return; -+ } -+ -+ // Raced, but there's still work to do! -+ if (head_part_num < new_head_num) { -+ canceled = false; -+ ++i; -+ ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " updating head: i=" << i << " tid=" << tid << dendl; -+ f->_update_meta(fifo::update{}.head_part_num(new_head_num), -+ version, &this->canceled, tid, call(std::move(p))); -+ return; -+ } -+ } -+ ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " succeeded : i=" << i << " tid=" << tid << dendl; -+ complete(std::move(p), 0); -+ return; -+ } -+}; -+ -+void FIFO::_prepare_new_head(std::uint64_t tid, lr::AioCompletion* c) -+{ -+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " entering: tid=" << tid << dendl; -+ std::unique_lock l(m); -+ int64_t new_head_num = info.head_part_num + 1; -+ auto max_push_part_num = info.max_push_part_num; -+ auto version = info.version; -+ l.unlock(); -+ -+ if (max_push_part_num < new_head_num) { -+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " need new part: tid=" << tid << dendl; -+ auto n = std::make_unique(this, c, true, new_head_num, -+ tid); -+ _prepare_new_part(true, tid, NewHeadPreparer::call(std::move(n))); -+ } else { -+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " updating head: tid=" << tid << dendl; -+ auto n = std::make_unique(this, c, false, new_head_num, -+ tid); -+ auto np = n.get(); -+ _update_meta(fifo::update{}.head_part_num(new_head_num), version, -+ &np->canceled, tid, NewHeadPreparer::call(std::move(n))); -+ } -+} -+ - int FIFO::push_entries(const std::deque& data_bufs, - std::uint64_t tid, optional_yield y) - { -@@ -825,6 +1188,18 @@ int FIFO::push_entries(const std::deque& data_bufs, - return r; - } - -+void FIFO::push_entries(const std::deque& data_bufs, -+ std::uint64_t tid, lr::AioCompletion* c) -+{ -+ std::unique_lock l(m); -+ auto head_part_num = info.head_part_num; -+ auto tag = info.head_tag; -+ const auto part_oid = info.part_oid(head_part_num); -+ l.unlock(); -+ -+ push_part(ioctx, part_oid, tag, data_bufs, tid, c); -+} -+ - int FIFO::trim_part(int64_t part_num, uint64_t ofs, - std::optional tag, - bool exclusive, std::uint64_t tid, -@@ -845,10 +1220,10 @@ int FIFO::trim_part(int64_t part_num, uint64_t ofs, - return 0; - } - --int FIFO::trim_part(int64_t part_num, uint64_t ofs, -- std::optional tag, -- bool exclusive, std::uint64_t tid, -- lr::AioCompletion* c) -+void FIFO::trim_part(int64_t part_num, uint64_t ofs, -+ std::optional tag, -+ bool exclusive, std::uint64_t tid, -+ lr::AioCompletion* c) - { - ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; -@@ -858,12 +1233,7 @@ int FIFO::trim_part(int64_t part_num, uint64_t ofs, - l.unlock(); - rgw::cls::fifo::trim_part(&op, tag, ofs, exclusive); - auto r = ioctx.aio_operate(part_oid, c, &op); -- if (r < 0) { -- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -- << " failed scheduling trim_part: r=" << r -- << " tid=" << tid << dendl; -- } -- return r; -+ ceph_assert(r >= 0); - } - - int FIFO::open(lr::IoCtx ioctx, std::string oid, std::unique_ptr* fifo, -@@ -960,54 +1330,42 @@ int FIFO::read_meta(optional_yield y) { - return read_meta(tid, y); - } - --struct Reader { -+struct Reader : public Completion { - FIFO* fifo; - cb::list bl; -- lr::AioCompletion* super; - std::uint64_t tid; -- lr::AioCompletion* cur = lr::Rados::aio_create_completion( -- static_cast(this), &FIFO::read_callback); - Reader(FIFO* fifo, lr::AioCompletion* super, std::uint64_t tid) -- : fifo(fifo), super(super), tid(tid) { -- super->pc->get(); -- } -- ~Reader() { -- cur->release(); -- } --}; -+ : Completion(super), fifo(fifo), tid(tid) {} - --void FIFO::read_callback(lr::completion_t, void* arg) --{ -- std::unique_ptr reader(static_cast(arg)); -- auto cct = reader->fifo->cct; -- auto tid = reader->tid; -- ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -- << " entering: tid=" << tid << dendl; -- auto r = reader->cur->get_return_value(); -- if (r >= 0) try { -- fifo::op::get_meta_reply reply; -- auto iter = reader->bl.cbegin(); -- decode(reply, iter); -- std::unique_lock l(reader->fifo->m); -- if (reply.info.version.same_or_later(reader->fifo->info.version)) { -- reader->fifo->info = std::move(reply.info); -- reader->fifo->part_header_size = reply.part_header_size; -- reader->fifo->part_entry_overhead = reply.part_entry_overhead; -- } -- } catch (const cb::error& err) { -+ void handle(Ptr&& p, int r) { -+ auto cct = fifo->cct; -+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " entering: tid=" << tid << dendl; -+ if (r >= 0) try { -+ fifo::op::get_meta_reply reply; -+ auto iter = bl.cbegin(); -+ decode(reply, iter); -+ std::unique_lock l(fifo->m); -+ if (reply.info.version.same_or_later(fifo->info.version)) { -+ fifo->info = std::move(reply.info); -+ fifo->part_header_size = reply.part_header_size; -+ fifo->part_entry_overhead = reply.part_entry_overhead; -+ } -+ } catch (const cb::error& err) { -+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " failed to decode response err=" << err.what() -+ << " tid=" << tid << dendl; -+ r = from_error_code(err.code()); -+ } else { - lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -- << " failed to decode response err=" << err.what() -+ << " read_meta failed r=" << r - << " tid=" << tid << dendl; -- r = from_error_code(err.code()); -- } else { -- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -- << " read_meta failed r=" << r -- << " tid=" << tid << dendl; -+ } -+ complete(std::move(p), r); - } -- complete(reader->super, r); --} -+}; - --int FIFO::read_meta(std::uint64_t tid, lr::AioCompletion* c) -+void FIFO::read_meta(std::uint64_t tid, lr::AioCompletion* c) - { - ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; -@@ -1016,16 +1374,10 @@ int FIFO::read_meta(std::uint64_t tid, lr::AioCompletion* c) - cb::list in; - encode(gm, in); - auto reader = std::make_unique(this, c, tid); -- auto r = ioctx.aio_exec(oid, reader->cur, fifo::op::CLASS, -- fifo::op::GET_META, in, &reader->bl); -- if (r < 0) { -- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -- << " failed scheduling read_meta r=" << r -- << " tid=" << tid << dendl; -- } else { -- reader.release(); -- } -- return r; -+ auto rp = reader.get(); -+ auto r = ioctx.aio_exec(oid, Reader::call(std::move(reader)), fifo::op::CLASS, -+ fifo::op::GET_META, in, &rp->bl); -+ assert(r >= 0); - } - - const fifo::info& FIFO::meta() const { -@@ -1040,6 +1392,10 @@ int FIFO::push(const cb::list& bl, optional_yield y) { - return push(std::vector{ bl }, y); - } - -+void FIFO::push(const cb::list& bl, lr::AioCompletion* c) { -+ push(std::vector{ bl }, c); -+} -+ - int FIFO::push(const std::vector& data_bufs, optional_yield y) - { - std::unique_lock l(m); -@@ -1153,24 +1509,185 @@ int FIFO::push(const std::vector& data_bufs, optional_yield y) - return 0; - } - --int FIFO::list(int max_entries, -- std::optional markstr, -- std::vector* presult, bool* pmore, -- optional_yield y) --{ -- std::unique_lock l(m); -- auto tid = ++next_tid; -- std::int64_t part_num = info.tail_part_num; -- l.unlock(); -- ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -- << " entering: tid=" << tid << dendl; -- std::uint64_t ofs = 0; -- if (markstr) { -- auto marker = to_marker(*markstr); -- if (!marker) { -- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -- << " invalid marker string: " << markstr -- << " tid= "<< tid << dendl; -+struct Pusher : public Completion { -+ FIFO* f; -+ std::deque remaining; -+ std::deque batch; -+ int i = 0; -+ std::uint64_t tid; -+ bool new_heading = false; -+ -+ void prep_then_push(Ptr&& p, const unsigned successes) { -+ std::unique_lock l(f->m); -+ auto max_part_size = f->info.params.max_part_size; -+ auto part_entry_overhead = f->part_entry_overhead; -+ l.unlock(); -+ -+ ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " preparing push: remaining=" << remaining.size() -+ << " batch=" << batch.size() << " i=" << i -+ << " tid=" << tid << dendl; -+ -+ uint64_t batch_len = 0; -+ if (successes > 0) { -+ if (successes == batch.size()) { -+ batch.clear(); -+ } else { -+ batch.erase(batch.begin(), batch.begin() + successes); -+ for (const auto& b : batch) { -+ batch_len += b.length() + part_entry_overhead; -+ } -+ } -+ } -+ -+ if (batch.empty() && remaining.empty()) { -+ complete(std::move(p), 0); -+ return; -+ } -+ -+ while (!remaining.empty() && -+ (remaining.front().length() + batch_len <= max_part_size)) { -+ -+ /* We can send entries with data_len up to max_entry_size, -+ however, we want to also account the overhead when -+ dealing with multiple entries. Previous check doesn't -+ account for overhead on purpose. */ -+ batch_len += remaining.front().length() + part_entry_overhead; -+ batch.push_back(std::move(remaining.front())); -+ remaining.pop_front(); -+ } -+ ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " prepared push: remaining=" << remaining.size() -+ << " batch=" << batch.size() << " i=" << i -+ << " batch_len=" << batch_len -+ << " tid=" << tid << dendl; -+ push(std::move(p)); -+ } -+ -+ void push(Ptr&& p) { -+ f->push_entries(batch, tid, call(std::move(p))); -+ } -+ -+ void new_head(Ptr&& p) { -+ new_heading = true; -+ f->_prepare_new_head(tid, call(std::move(p))); -+ } -+ -+ void handle(Ptr&& p, int r) { -+ if (!new_heading) { -+ if (r == -ERANGE) { -+ ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " need new head tid=" << tid << dendl; -+ new_head(std::move(p)); -+ return; -+ } -+ if (r < 0) { -+ lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " push_entries failed: r=" << r -+ << " tid=" << tid << dendl; -+ complete(std::move(p), r); -+ return; -+ } -+ i = 0; // We've made forward progress, so reset the race counter! -+ prep_then_push(std::move(p), r); -+ } else { -+ if (r < 0) { -+ lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " prepare_new_head failed: r=" << r -+ << " tid=" << tid << dendl; -+ complete(std::move(p), r); -+ return; -+ } -+ new_heading = false; -+ handle_new_head(std::move(p), r); -+ } -+ } -+ -+ void handle_new_head(Ptr&& p, int r) { -+ if (r == -ECANCELED) { -+ if (p->i == MAX_RACE_RETRIES) { -+ lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " canceled too many times, giving up: tid=" << tid << dendl; -+ complete(std::move(p), -ECANCELED); -+ return; -+ } -+ ++p->i; -+ } else if (r) { -+ complete(std::move(p), r); -+ return; -+ } -+ -+ if (p->batch.empty()) { -+ prep_then_push(std::move(p), 0); -+ return; -+ } else { -+ push(std::move(p)); -+ return; -+ } -+ } -+ -+ Pusher(FIFO* f, std::deque&& remaining, -+ std::uint64_t tid, lr::AioCompletion* super) -+ : Completion(super), f(f), remaining(std::move(remaining)), -+ tid(tid) {} -+}; -+ -+void FIFO::push(const std::vector& data_bufs, -+ lr::AioCompletion* c) -+{ -+ std::unique_lock l(m); -+ auto tid = ++next_tid; -+ auto max_entry_size = info.params.max_entry_size; -+ auto need_new_head = info.need_new_head(); -+ l.unlock(); -+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " entering: tid=" << tid << dendl; -+ auto p = std::make_unique(this, std::deque(data_bufs.begin(), data_bufs.end()), -+ tid, c); -+ // Validate sizes -+ for (const auto& bl : data_bufs) { -+ if (bl.length() > max_entry_size) { -+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " entry bigger than max_entry_size tid=" << tid << dendl; -+ Pusher::complete(std::move(p), -E2BIG); -+ return; -+ } -+ } -+ -+ if (data_bufs.empty() ) { -+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " empty push, returning success tid=" << tid << dendl; -+ Pusher::complete(std::move(p), 0); -+ return; -+ } -+ -+ if (need_new_head) { -+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " need new head tid=" << tid << dendl; -+ p->new_head(std::move(p)); -+ } else { -+ p->prep_then_push(std::move(p), 0); -+ } -+} -+ -+int FIFO::list(int max_entries, -+ std::optional markstr, -+ std::vector* presult, bool* pmore, -+ optional_yield y) -+{ -+ std::unique_lock l(m); -+ auto tid = ++next_tid; -+ std::int64_t part_num = info.tail_part_num; -+ l.unlock(); -+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " entering: tid=" << tid << dendl; -+ std::uint64_t ofs = 0; -+ if (markstr) { -+ auto marker = to_marker(*markstr); -+ if (!marker) { -+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " invalid marker string: " << markstr -+ << " tid= "<< tid << dendl; - return -EINVAL; - } - part_num = marker->num; -@@ -1340,157 +1857,116 @@ int FIFO::trim(std::string_view markstr, bool exclusive, optional_yield y) - return 0; - } - --struct Trimmer { -+struct Trimmer : public Completion { - FIFO* fifo; - std::int64_t part_num; - std::uint64_t ofs; - std::int64_t pn; - bool exclusive; -- lr::AioCompletion* super; - std::uint64_t tid; -- lr::AioCompletion* cur = lr::Rados::aio_create_completion( -- static_cast(this), &FIFO::trim_callback); - bool update = false; - bool canceled = false; - int retries = 0; - - Trimmer(FIFO* fifo, std::int64_t part_num, std::uint64_t ofs, std::int64_t pn, - bool exclusive, lr::AioCompletion* super, std::uint64_t tid) -- : fifo(fifo), part_num(part_num), ofs(ofs), pn(pn), exclusive(exclusive), -- super(super), tid(tid) { -- super->pc->get(); -- } -- ~Trimmer() { -- cur->release(); -- } --}; -+ : Completion(super), fifo(fifo), part_num(part_num), ofs(ofs), pn(pn), -+ exclusive(exclusive), tid(tid) {} - --void FIFO::trim_callback(lr::completion_t, void* arg) --{ -- std::unique_ptr trimmer(static_cast(arg)); -- auto cct = trimmer->fifo->cct; -- auto tid = trimmer->tid; -- ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -- << " entering: tid=" << tid << dendl; -- int r = trimmer->cur->get_return_value(); -- if (r == -ENOENT) { -- r = 0; -- } -- -- if (r < 0) { -- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -- << " trim failed: r=" << r << " tid=" << tid << dendl; -- complete(trimmer->super, r); -- return; -- } -- -- if (!trimmer->update) { -+ void handle(Ptr&& p, int r) { -+ auto cct = fifo->cct; - ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -- << " handling preceding trim callback: tid=" << tid << dendl; -- trimmer->retries = 0; -- if (trimmer->pn < trimmer->part_num) { -- std::unique_lock l(trimmer->fifo->m); -- const auto max_part_size = trimmer->fifo->info.params.max_part_size; -- l.unlock(); -- trimmer->cur->release(); -- trimmer->cur = lr::Rados::aio_create_completion(arg, &FIFO::trim_callback); -- r = trimmer->fifo->trim_part(trimmer->pn++, max_part_size, std::nullopt, -- false, tid, trimmer->cur); -- if (r < 0) { -- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -- << " trim failed: r=" << r << " tid=" << tid << dendl; -- complete(trimmer->super, r); -- } else { -- trimmer.release(); -- } -- return; -+ << " entering: tid=" << tid << dendl; -+ if (r == -ENOENT) { -+ r = 0; - } - -- std::unique_lock l(trimmer->fifo->m); -- const auto tail_part_num = trimmer->fifo->info.tail_part_num; -- l.unlock(); -- trimmer->cur->release(); -- trimmer->cur = lr::Rados::aio_create_completion(arg, &FIFO::trim_callback); -- trimmer->update = true; -- trimmer->canceled = tail_part_num < trimmer->part_num; -- r = trimmer->fifo->trim_part(trimmer->part_num, trimmer->ofs, -- std::nullopt, trimmer->exclusive, tid, trimmer->cur); - if (r < 0) { - lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -- << " failed scheduling trim: r=" << r << " tid=" << tid << dendl; -- complete(trimmer->super, r); -- } else { -- trimmer.release(); -+ << (update ? " update_meta " : " trim ") << "failed: r=" -+ << r << " tid=" << tid << dendl; -+ complete(std::move(p), r); -+ return; - } -- return; -- } - -- ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -- << " handling update-needed callback: tid=" << tid << dendl; -- std::unique_lock l(trimmer->fifo->m); -- auto tail_part_num = trimmer->fifo->info.tail_part_num; -- auto objv = trimmer->fifo->info.version; -- l.unlock(); -- if ((tail_part_num < trimmer->part_num) && -- trimmer->canceled) { -- if (trimmer->retries > MAX_RACE_RETRIES) { -- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -- << " canceled too many times, giving up: tid=" << tid << dendl; -- complete(trimmer->super, -EIO); -+ if (!update) { -+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " handling preceding trim callback: tid=" << tid << dendl; -+ retries = 0; -+ if (pn < part_num) { -+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " pn=" << pn << " tid=" << tid << dendl; -+ std::unique_lock l(fifo->m); -+ const auto max_part_size = fifo->info.params.max_part_size; -+ l.unlock(); -+ fifo->trim_part(pn++, max_part_size, std::nullopt, -+ false, tid, call(std::move(p))); -+ return; -+ } -+ -+ std::unique_lock l(fifo->m); -+ const auto tail_part_num = fifo->info.tail_part_num; -+ l.unlock(); -+ update = true; -+ canceled = tail_part_num < part_num; -+ fifo->trim_part(part_num, ofs, std::nullopt, exclusive, tid, -+ call(std::move(p))); - return; - } -- trimmer->cur->release(); -- trimmer->cur = lr::Rados::aio_create_completion(arg, -- &FIFO::trim_callback); -- ++trimmer->retries; -- r = trimmer->fifo->_update_meta(fifo::update{} -- .tail_part_num(trimmer->part_num), -- objv, &trimmer->canceled, -- tid, trimmer->cur); -- if (r < 0) { -- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -- << " failed scheduling _update_meta: r=" -- << r << " tid=" << tid << dendl; -- complete(trimmer->super, r); -+ -+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " handling update-needed callback: tid=" << tid << dendl; -+ std::unique_lock l(fifo->m); -+ auto tail_part_num = fifo->info.tail_part_num; -+ auto objv = fifo->info.version; -+ l.unlock(); -+ if ((tail_part_num < part_num) && -+ canceled) { -+ if (retries > MAX_RACE_RETRIES) { -+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " canceled too many times, giving up: tid=" << tid << dendl; -+ complete(std::move(p), -EIO); -+ return; -+ } -+ ++retries; -+ fifo->_update_meta(fifo::update{} -+ .tail_part_num(part_num), objv, &canceled, -+ tid, call(std::move(p))); - } else { -- trimmer.release(); -+ complete(std::move(p), 0); - } -- } else { -- complete(trimmer->super, 0); - } --} -+}; - --int FIFO::trim(std::string_view markstr, bool exclusive, lr::AioCompletion* c) { -+void FIFO::trim(std::string_view markstr, bool exclusive, -+ lr::AioCompletion* c) { - auto marker = to_marker(markstr); -- if (!marker) { -- return -EINVAL; -- } -+ auto realmark = marker.value_or(::rgw::cls::fifo::marker{}); - std::unique_lock l(m); - const auto max_part_size = info.params.max_part_size; - const auto pn = info.tail_part_num; - const auto part_oid = info.part_oid(pn); - auto tid = ++next_tid; - l.unlock(); -- auto trimmer = std::make_unique(this, marker->num, marker->ofs, pn, exclusive, c, -- tid); -+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " entering: tid=" << tid << dendl; -+ auto trimmer = std::make_unique(this, realmark.num, realmark.ofs, -+ pn, exclusive, c, tid); -+ if (!marker) { -+ Trimmer::complete(std::move(trimmer), -EINVAL); -+ return; -+ } - ++trimmer->pn; - auto ofs = marker->ofs; - if (pn < marker->num) { -+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " pn=" << pn << " tid=" << tid << dendl; - ofs = max_part_size; - } else { - trimmer->update = true; - } -- auto r = trim_part(pn, ofs, std::nullopt, exclusive, -- tid, trimmer->cur); -- if (r < 0) { -- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -- << " failed scheduling trim_part: r=" -- << r << " tid=" << tid << dendl; -- complete(trimmer->super, r); -- } else { -- trimmer.release(); -- } -- return r; -+ trim_part(pn, ofs, std::nullopt, exclusive, -+ tid, Trimmer::call(std::move(trimmer))); - } - - int FIFO::get_part_info(int64_t part_num, -@@ -1509,4 +1985,521 @@ int FIFO::get_part_info(int64_t part_num, - } - return r; - } -+ -+void FIFO::get_part_info(int64_t part_num, -+ fifo::part_header* header, -+ lr::AioCompletion* c) -+{ -+ std::unique_lock l(m); -+ const auto part_oid = info.part_oid(part_num); -+ auto tid = ++next_tid; -+ l.unlock(); -+ auto op = rgw::cls::fifo::get_part_info(cct, header, tid); -+ auto r = ioctx.aio_operate(part_oid, c, &op, nullptr); -+ ceph_assert(r >= 0); -+} -+ -+struct InfoGetter : Completion { -+ FIFO* fifo; -+ fifo::part_header header; -+ fu2::function f; -+ std::uint64_t tid; -+ bool headerread = false; -+ -+ InfoGetter(FIFO* fifo, fu2::function f, -+ std::uint64_t tid, lr::AioCompletion* super) -+ : Completion(super), fifo(fifo), f(std::move(f)), tid(tid) {} -+ void handle(Ptr&& p, int r) { -+ if (!headerread) { -+ if (r < 0) { -+ lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " read_meta failed: r=" -+ << r << " tid=" << tid << dendl; -+ if (f) -+ f(r, {}); -+ complete(std::move(p), r); -+ return; -+ } -+ -+ auto info = fifo->meta(); -+ auto hpn = info.head_part_num; -+ if (hpn < 0) { -+ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " no head, returning empty partinfo r=" -+ << r << " tid=" << tid << dendl; -+ if (f) -+ f(0, {}); -+ complete(std::move(p), r); -+ return; -+ } -+ headerread = true; -+ auto op = rgw::cls::fifo::get_part_info(fifo->cct, &header, tid); -+ std::unique_lock l(fifo->m); -+ auto oid = fifo->info.part_oid(hpn); -+ l.unlock(); -+ r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op, -+ nullptr); -+ ceph_assert(r >= 0); -+ return; -+ } -+ -+ if (r < 0) { -+ lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " get_part_info failed: r=" -+ << r << " tid=" << tid << dendl; -+ } -+ -+ if (f) -+ f(r, std::move(header)); -+ complete(std::move(p), r); -+ return; -+ } -+}; -+ -+void FIFO::get_head_info(fu2::unique_function f, -+ lr::AioCompletion* c) -+{ -+ std::unique_lock l(m); -+ auto tid = ++next_tid; -+ l.unlock(); -+ auto ig = std::make_unique(this, std::move(f), tid, c); -+ read_meta(tid, InfoGetter::call(std::move(ig))); -+} -+ -+struct JournalProcessor : public Completion { -+private: -+ FIFO* const fifo; -+ -+ std::vector processed; -+ std::multimap journal; -+ std::multimap::iterator iter; -+ std::int64_t new_tail; -+ std::int64_t new_head; -+ std::int64_t new_max; -+ int race_retries = 0; -+ bool first_pp = true; -+ bool canceled = false; -+ std::uint64_t tid; -+ -+ enum { -+ entry_callback, -+ pp_callback, -+ } state; -+ -+ void create_part(Ptr&& p, int64_t part_num, -+ std::string_view tag) { -+ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " entering: tid=" << tid << dendl; -+ state = entry_callback; -+ lr::ObjectWriteOperation op; -+ op.create(false); /* We don't need exclusivity, part_init ensures -+ we're creating from the same journal entry. */ -+ std::unique_lock l(fifo->m); -+ part_init(&op, tag, fifo->info.params); -+ auto oid = fifo->info.part_oid(part_num); -+ l.unlock(); -+ auto r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op); -+ ceph_assert(r >= 0); -+ return; -+ } -+ -+ void remove_part(Ptr&& p, int64_t part_num, -+ std::string_view tag) { -+ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " entering: tid=" << tid << dendl; -+ state = entry_callback; -+ lr::ObjectWriteOperation op; -+ op.remove(); -+ std::unique_lock l(fifo->m); -+ auto oid = fifo->info.part_oid(part_num); -+ l.unlock(); -+ auto r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op); -+ ceph_assert(r >= 0); -+ return; -+ } -+ -+ void finish_je(Ptr&& p, int r, -+ const fifo::journal_entry& entry) { -+ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " entering: tid=" << tid << dendl; -+ -+ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " finishing entry: entry=" << entry -+ << " tid=" << tid << dendl; -+ -+ if (entry.op == fifo::journal_entry::Op::remove && r == -ENOENT) -+ r = 0; -+ -+ if (r < 0) { -+ lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " processing entry failed: entry=" << entry -+ << " r=" << r << " tid=" << tid << dendl; -+ complete(std::move(p), r); -+ return; -+ } else { -+ switch (entry.op) { -+ case fifo::journal_entry::Op::unknown: -+ case fifo::journal_entry::Op::set_head: -+ // Can't happen. Filtered out in process. -+ complete(std::move(p), -EIO); -+ return; -+ -+ case fifo::journal_entry::Op::create: -+ if (entry.part_num > new_max) { -+ new_max = entry.part_num; -+ } -+ break; -+ case fifo::journal_entry::Op::remove: -+ if (entry.part_num >= new_tail) { -+ new_tail = entry.part_num + 1; -+ } -+ break; -+ } -+ processed.push_back(entry); -+ } -+ ++iter; -+ process(std::move(p)); -+ } -+ -+ void postprocess(Ptr&& p) { -+ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " entering: tid=" << tid << dendl; -+ if (processed.empty()) { -+ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " nothing to update any more: race_retries=" -+ << race_retries << " tid=" << tid << dendl; -+ complete(std::move(p), 0); -+ return; -+ } -+ pp_run(std::move(p), 0, false); -+ } -+ -+public: -+ -+ JournalProcessor(FIFO* fifo, std::uint64_t tid, lr::AioCompletion* super) -+ : Completion(super), fifo(fifo), tid(tid) { -+ std::unique_lock l(fifo->m); -+ journal = fifo->info.journal; -+ iter = journal.begin(); -+ new_tail = fifo->info.tail_part_num; -+ new_head = fifo->info.head_part_num; -+ new_max = fifo->info.max_push_part_num; -+ } -+ -+ void pp_run(Ptr&& p, int r, bool canceled) { -+ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " entering: tid=" << tid << dendl; -+ std::optional tail_part_num; -+ std::optional head_part_num; -+ std::optional max_part_num; -+ -+ if (r < 0) { -+ lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " failed, r=: " << r << " tid=" << tid << dendl; -+ complete(std::move(p), r); -+ } -+ -+ -+ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " postprocessing: race_retries=" -+ << race_retries << " tid=" << tid << dendl; -+ -+ if (!first_pp && r == 0 && !canceled) { -+ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " nothing to update any more: race_retries=" -+ << race_retries << " tid=" << tid << dendl; -+ complete(std::move(p), 0); -+ return; -+ } -+ -+ first_pp = false; -+ -+ if (canceled) { -+ if (race_retries >= MAX_RACE_RETRIES) { -+ lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " canceled too many times, giving up: tid=" -+ << tid << dendl; -+ complete(std::move(p), -ECANCELED); -+ return; -+ } -+ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " update canceled, retrying: race_retries=" -+ << race_retries << " tid=" << tid << dendl; -+ -+ ++race_retries; -+ -+ std::vector new_processed; -+ std::unique_lock l(fifo->m); -+ for (auto& e : processed) { -+ auto jiter = fifo->info.journal.find(e.part_num); -+ /* journal entry was already processed */ -+ if (jiter == fifo->info.journal.end() || -+ !(jiter->second == e)) { -+ continue; -+ } -+ new_processed.push_back(e); -+ } -+ processed = std::move(new_processed); -+ } -+ -+ std::unique_lock l(fifo->m); -+ auto objv = fifo->info.version; -+ if (new_tail > fifo->info.tail_part_num) { -+ tail_part_num = new_tail; -+ } -+ -+ if (new_head > fifo->info.head_part_num) { -+ head_part_num = new_head; -+ } -+ -+ if (new_max > fifo->info.max_push_part_num) { -+ max_part_num = new_max; -+ } -+ l.unlock(); -+ -+ if (processed.empty() && -+ !tail_part_num && -+ !max_part_num) { -+ /* nothing to update anymore */ -+ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " nothing to update any more: race_retries=" -+ << race_retries << " tid=" << tid << dendl; -+ complete(std::move(p), 0); -+ return; -+ } -+ state = pp_callback; -+ fifo->_update_meta(fifo::update{} -+ .tail_part_num(tail_part_num) -+ .head_part_num(head_part_num) -+ .max_push_part_num(max_part_num) -+ .journal_entries_rm(processed), -+ objv, &this->canceled, tid, call(std::move(p))); -+ return; -+ } -+ -+ JournalProcessor(const JournalProcessor&) = delete; -+ JournalProcessor& operator =(const JournalProcessor&) = delete; -+ JournalProcessor(JournalProcessor&&) = delete; -+ JournalProcessor& operator =(JournalProcessor&&) = delete; -+ -+ void process(Ptr&& p) { -+ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " entering: tid=" << tid << dendl; -+ while (iter != journal.end()) { -+ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " processing entry: entry=" << *iter -+ << " tid=" << tid << dendl; -+ const auto entry = iter->second; -+ switch (entry.op) { -+ case fifo::journal_entry::Op::create: -+ create_part(std::move(p), entry.part_num, entry.part_tag); -+ return; -+ case fifo::journal_entry::Op::set_head: -+ if (entry.part_num > new_head) { -+ new_head = entry.part_num; -+ } -+ processed.push_back(entry); -+ ++iter; -+ continue; -+ case fifo::journal_entry::Op::remove: -+ remove_part(std::move(p), entry.part_num, entry.part_tag); -+ return; -+ default: -+ lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " unknown journaled op: entry=" << entry << " tid=" -+ << tid << dendl; -+ complete(std::move(p), -EIO); -+ return; -+ } -+ } -+ postprocess(std::move(p)); -+ return; -+ } -+ -+ void handle(Ptr&& p, int r) { -+ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " entering: tid=" << tid << dendl; -+ switch (state) { -+ case entry_callback: -+ finish_je(std::move(p), r, iter->second); -+ return; -+ case pp_callback: -+ auto c = canceled; -+ canceled = false; -+ pp_run(std::move(p), r, c); -+ return; -+ } -+ -+ abort(); -+ } -+ -+}; -+ -+void FIFO::process_journal(std::uint64_t tid, lr::AioCompletion* c) { -+ auto p = std::make_unique(this, tid, c); -+ p->process(std::move(p)); -+} -+ -+struct Lister : Completion { -+ FIFO* f; -+ std::vector result; -+ bool more = false; -+ std::int64_t part_num; -+ std::uint64_t ofs; -+ int max_entries; -+ int r_out = 0; -+ std::vector entries; -+ bool part_more = false; -+ bool part_full = false; -+ std::vector* entries_out; -+ bool* more_out; -+ std::uint64_t tid; -+ -+ bool read = false; -+ -+ void complete(Ptr&& p, int r) { -+ if (r >= 0) { -+ if (more_out) *more_out = more; -+ if (entries_out) *entries_out = std::move(result); -+ } -+ Completion::complete(std::move(p), r); -+ } -+ -+public: -+ Lister(FIFO* f, std::int64_t part_num, std::uint64_t ofs, int max_entries, -+ std::vector* entries_out, bool* more_out, -+ std::uint64_t tid, lr::AioCompletion* super) -+ : Completion(super), f(f), part_num(part_num), ofs(ofs), max_entries(max_entries), -+ entries_out(entries_out), more_out(more_out), tid(tid) { -+ result.reserve(max_entries); -+ } -+ -+ Lister(const Lister&) = delete; -+ Lister& operator =(const Lister&) = delete; -+ Lister(Lister&&) = delete; -+ Lister& operator =(Lister&&) = delete; -+ -+ void handle(Ptr&& p, int r) { -+ if (read) -+ handle_read(std::move(p), r); -+ else -+ handle_list(std::move(p), r); -+ } -+ -+ void list(Ptr&& p) { -+ if (max_entries > 0) { -+ part_more = false; -+ part_full = false; -+ entries.clear(); -+ -+ std::unique_lock l(f->m); -+ auto part_oid = f->info.part_oid(part_num); -+ l.unlock(); -+ -+ read = false; -+ auto op = list_part(f->cct, {}, ofs, max_entries, &r_out, -+ &entries, &part_more, &part_full, -+ nullptr, tid); -+ f->ioctx.aio_operate(part_oid, call(std::move(p)), &op, nullptr); -+ } else { -+ complete(std::move(p), 0); -+ } -+ } -+ -+ void handle_read(Ptr&& p, int r) { -+ read = false; -+ if (r >= 0) r = r_out; -+ r_out = 0; -+ -+ if (r < 0) { -+ complete(std::move(p), r); -+ return; -+ } -+ -+ if (part_num < f->info.tail_part_num) { -+ /* raced with trim? restart */ -+ max_entries += result.size(); -+ result.clear(); -+ part_num = f->info.tail_part_num; -+ ofs = 0; -+ list(std::move(p)); -+ return; -+ } -+ /* assuming part was not written yet, so end of data */ -+ more = false; -+ complete(std::move(p), 0); -+ return; -+ } -+ -+ void handle_list(Ptr&& p, int r) { -+ if (r >= 0) r = r_out; -+ r_out = 0; -+ std::unique_lock l(f->m); -+ auto part_oid = f->info.part_oid(part_num); -+ l.unlock(); -+ if (r == -ENOENT) { -+ read = true; -+ f->read_meta(tid, call(std::move(p))); -+ return; -+ } -+ if (r < 0) { -+ complete(std::move(p), r); -+ return; -+ } -+ -+ more = part_full || part_more; -+ for (auto& entry : entries) { -+ list_entry e; -+ e.data = std::move(entry.data); -+ e.marker = marker{part_num, entry.ofs}.to_string(); -+ e.mtime = entry.mtime; -+ result.push_back(std::move(e)); -+ } -+ max_entries -= entries.size(); -+ entries.clear(); -+ if (max_entries > 0 && part_more) { -+ list(std::move(p)); -+ return; -+ } -+ -+ if (!part_full) { /* head part is not full */ -+ complete(std::move(p), 0); -+ return; -+ } -+ ++part_num; -+ ofs = 0; -+ list(std::move(p)); -+ } -+}; -+ -+void FIFO::list(int max_entries, -+ std::optional markstr, -+ std::vector* out, -+ bool* more, -+ lr::AioCompletion* c) { -+ std::unique_lock l(m); -+ auto tid = ++next_tid; -+ std::int64_t part_num = info.tail_part_num; -+ l.unlock(); -+ std::uint64_t ofs = 0; -+ std::optional<::rgw::cls::fifo::marker> marker; -+ -+ if (markstr) { -+ marker = to_marker(*markstr); -+ if (marker) { -+ part_num = marker->num; -+ ofs = marker->ofs; -+ } -+ } -+ -+ auto ls = std::make_unique(this, part_num, ofs, max_entries, out, -+ more, tid, c); -+ if (markstr && !marker) { -+ auto l = ls.get(); -+ l->complete(std::move(ls), -EINVAL); -+ } else { -+ ls->list(std::move(ls)); -+ } -+} - } -diff --git a/src/rgw/cls_fifo_legacy.h b/src/rgw/cls_fifo_legacy.h -index 1f8d3f3fc95d8..b6b5f04bb30ad 100644 ---- a/src/rgw/cls_fifo_legacy.h -+++ b/src/rgw/cls_fifo_legacy.h -@@ -31,6 +31,7 @@ - - #include "include/rados/librados.hpp" - #include "include/buffer.h" -+#include "include/function2.hpp" - - #include "common/async/yield_context.h" - -@@ -57,24 +58,6 @@ int get_meta(lr::IoCtx& ioctx, const std::string& oid, - std::uint32_t* part_entry_overhead, - std::uint64_t tid, optional_yield y, - bool probe = false); --void update_meta(lr::ObjectWriteOperation* op, const fifo::objv& objv, -- const fifo::update& update); --void part_init(lr::ObjectWriteOperation* op, std::string_view tag, -- fifo::data_params params); --int push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag, -- std::deque data_bufs, std::uint64_t tid, optional_yield y); --void trim_part(lr::ObjectWriteOperation* op, -- std::optional tag, std::uint64_t ofs, -- bool exclusive); --int list_part(lr::IoCtx& ioctx, const std::string& oid, -- std::optional tag, std::uint64_t ofs, -- std::uint64_t max_entries, -- std::vector* entries, -- bool* more, bool* full_part, std::string* ptag, -- std::uint64_t tid, optional_yield y); --int get_part_info(lr::IoCtx& ioctx, const std::string& oid, -- fifo::part_header* header, std::uint64_t, -- optional_yield y); - - struct marker { - std::int64_t num = 0; -@@ -117,6 +100,12 @@ class FIFO { - friend struct Reader; - friend struct Updater; - friend struct Trimmer; -+ friend struct InfoGetter; -+ friend struct Pusher; -+ friend struct NewPartPreparer; -+ friend struct NewHeadPreparer; -+ friend struct JournalProcessor; -+ friend struct Lister; - - mutable lr::IoCtx ioctx; - CephContext* cct = static_cast(ioctx.cct()); -@@ -144,32 +133,34 @@ class FIFO { - int _update_meta(const fifo::update& update, - fifo::objv version, bool* pcanceled, - std::uint64_t tid, optional_yield y); -- int _update_meta(const fifo::update& update, -- fifo::objv version, bool* pcanceled, -- std::uint64_t tid, lr::AioCompletion* c); -+ void _update_meta(const fifo::update& update, -+ fifo::objv version, bool* pcanceled, -+ std::uint64_t tid, lr::AioCompletion* c); - int create_part(int64_t part_num, std::string_view tag, std::uint64_t tid, - optional_yield y); - int remove_part(int64_t part_num, std::string_view tag, std::uint64_t tid, - optional_yield y); - int process_journal(std::uint64_t tid, optional_yield y); -+ void process_journal(std::uint64_t tid, lr::AioCompletion* c); - int _prepare_new_part(bool is_head, std::uint64_t tid, optional_yield y); -+ void _prepare_new_part(bool is_head, std::uint64_t tid, lr::AioCompletion* c); - int _prepare_new_head(std::uint64_t tid, optional_yield y); -+ void _prepare_new_head(std::uint64_t tid, lr::AioCompletion* c); - int push_entries(const std::deque& data_bufs, - std::uint64_t tid, optional_yield y); -+ void push_entries(const std::deque& data_bufs, -+ std::uint64_t tid, lr::AioCompletion* c); - int trim_part(int64_t part_num, uint64_t ofs, - std::optional tag, bool exclusive, - std::uint64_t tid, optional_yield y); -- int trim_part(int64_t part_num, uint64_t ofs, -- std::optional tag, bool exclusive, -- std::uint64_t tid, lr::AioCompletion* c); -+ void trim_part(int64_t part_num, uint64_t ofs, -+ std::optional tag, bool exclusive, -+ std::uint64_t tid, lr::AioCompletion* c); - -- static void trim_callback(lr::completion_t, void* arg); -- static void update_callback(lr::completion_t, void* arg); -- static void read_callback(lr::completion_t, void* arg); - /// Force refresh of metadata, yielding/blocking style - int read_meta(std::uint64_t tid, optional_yield y); - /// Force refresh of metadata, with a librados Completion -- int read_meta(std::uint64_t tid, lr::AioCompletion* c); -+ void read_meta(std::uint64_t tid, lr::AioCompletion* c); - - public: - -@@ -215,12 +206,20 @@ class FIFO { - int push(const cb::list& bl, //< Entry to push - optional_yield y //< Optional yield - ); -- /// Push entres to the FIFO -+ /// Push an entry to the FIFO -+ void push(const cb::list& bl, //< Entry to push -+ lr::AioCompletion* c //< Async Completion -+ ); -+ /// Push entries to the FIFO - int push(const std::vector& data_bufs, //< Entries to push -- /// Optional yield -- optional_yield y); -+ optional_yield y //< Optional yield -+ ); -+ /// Push entries to the FIFO -+ void push(const std::vector& data_bufs, //< Entries to push -+ lr::AioCompletion* c //< Async Completion -+ ); - /// List entries -- int list(int max_entries, /// Maximum entries to list -+ int list(int max_entries, //< Maximum entries to list - /// Point after which to begin listing. Start at tail if null - std::optional markstr, - std::vector* out, //< OUT: entries -@@ -228,6 +227,14 @@ class FIFO { - bool* more, - optional_yield y //< Optional yield - ); -+ void list(int max_entries, //< Maximum entries to list -+ /// Point after which to begin listing. Start at tail if null -+ std::optional markstr, -+ std::vector* out, //< OUT: entries -+ /// OUT: True if more entries in FIFO beyond the last returned -+ bool* more, -+ lr::AioCompletion* c //< Async Completion -+ ); - /// Trim entries, coroutine/block style - int trim(std::string_view markstr, //< Position to which to trim, inclusive - bool exclusive, //< If true, do not trim the target entry -@@ -235,16 +242,28 @@ class FIFO { - optional_yield y //< Optional yield - ); - /// Trim entries, librados AioCompletion style -- int trim(std::string_view markstr, //< Position to which to trim, inclusive -- bool exclusive, //< If true, do not trim the target entry -- //< itself, just all those before it. -- lr::AioCompletion* c //< librados AIO Completion -+ void trim(std::string_view markstr, //< Position to which to trim, inclusive -+ bool exclusive, //< If true, do not trim the target entry -+ //< itself, just all those before it. -+ lr::AioCompletion* c //< librados AIO Completion - ); - /// Get part info - int get_part_info(int64_t part_num, /// Part number - fifo::part_header* header, //< OUT: Information - optional_yield y //< Optional yield - ); -+ /// Get part info -+ void get_part_info(int64_t part_num, //< Part number -+ fifo::part_header* header, //< OUT: Information -+ lr::AioCompletion* c //< AIO Completion -+ ); -+ /// A convenience method to fetch the part information for the FIFO -+ /// head, using librados::AioCompletion, since -+ /// libradio::AioCompletions compose lousily. -+ void get_head_info(fu2::unique_function< //< Function to receive info -+ void(int r, fifo::part_header&&)>, -+ lr::AioCompletion* c //< AIO Completion -+ ); - }; - } - -diff --git a/src/rgw/rgw_datalog.cc b/src/rgw/rgw_datalog.cc -index a875d075ecade..8142b26e01a8b 100644 ---- a/src/rgw/rgw_datalog.cc -+++ b/src/rgw/rgw_datalog.cc -@@ -469,12 +469,7 @@ class RGWDataChangesFIFO final : public RGWDataChangesBE { - pc->cond.notify_all(); - pc->put_unlock(); - } else { -- r = fifos[index]->trim(marker, false, c); -- if (r < 0) { -- lderr(cct) << __PRETTY_FUNCTION__ -- << ": unable to trim FIFO: " << get_oid(index) -- << ": " << cpp_strerror(-r) << dendl; -- } -+ fifos[index]->trim(marker, false, c); - } - return r; - } -diff --git a/src/test/rgw/test_cls_fifo_legacy.cc b/src/test/rgw/test_cls_fifo_legacy.cc -index dae4980f8dca4..69cee5a887405 100644 ---- a/src/test/rgw/test_cls_fifo_legacy.cc -+++ b/src/test/rgw/test_cls_fifo_legacy.cc -@@ -69,6 +69,8 @@ class LegacyFIFO : public testing::Test { - }; - - using LegacyClsFIFO = LegacyFIFO; -+using AioLegacyFIFO = LegacyFIFO; -+ - - TEST_F(LegacyClsFIFO, TestCreate) - { -@@ -577,8 +579,7 @@ TEST_F(LegacyFIFO, TestAioTrim) - marker = result.front().marker; - std::unique_ptr c(rados.aio_create_completion(nullptr, - nullptr)); -- r = f->trim(*marker, false, c.get()); -- ASSERT_EQ(0, r); -+ f->trim(*marker, false, c.get()); - c->wait_for_complete(); - r = c->get_return_value(); - ASSERT_EQ(0, r); -@@ -645,3 +646,482 @@ TEST_F(LegacyFIFO, TestTrimExclusive) { - ASSERT_EQ(result.size(), 1); - ASSERT_EQ(max_entries - 1, val); - } -+ -+TEST_F(AioLegacyFIFO, TestPushListTrim) -+{ -+ std::unique_ptr f; -+ auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield); -+ ASSERT_EQ(0, r); -+ static constexpr auto max_entries = 10u; -+ for (uint32_t i = 0; i < max_entries; ++i) { -+ cb::list bl; -+ encode(i, bl); -+ auto c = R::Rados::aio_create_completion(); -+ f->push(bl, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ c->release(); -+ ASSERT_EQ(0, r); -+ } -+ -+ std::optional marker; -+ /* get entries one by one */ -+ std::vector result; -+ bool more = false; -+ for (auto i = 0u; i < max_entries; ++i) { -+ auto c = R::Rados::aio_create_completion(); -+ f->list(1, marker, &result, &more, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ c->release(); -+ ASSERT_EQ(0, r); -+ -+ bool expected_more = (i != (max_entries - 1)); -+ ASSERT_EQ(expected_more, more); -+ ASSERT_EQ(1, result.size()); -+ -+ std::uint32_t val; -+ std::tie(val, marker) = decode_entry(result.front()); -+ -+ ASSERT_EQ(i, val); -+ result.clear(); -+ } -+ -+ /* get all entries at once */ -+ std::string markers[max_entries]; -+ std::uint32_t min_entry = 0; -+ auto c = R::Rados::aio_create_completion(); -+ f->list(max_entries * 10, std::nullopt, &result, &more, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ c->release(); -+ ASSERT_EQ(0, r); -+ -+ ASSERT_FALSE(more); -+ ASSERT_EQ(max_entries, result.size()); -+ for (auto i = 0u; i < max_entries; ++i) { -+ std::uint32_t val; -+ std::tie(val, markers[i]) = decode_entry(result[i]); -+ ASSERT_EQ(i, val); -+ } -+ -+ /* trim one entry */ -+ c = R::Rados::aio_create_completion(); -+ f->trim(markers[min_entry], false, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ c->release(); -+ ASSERT_EQ(0, r); -+ ++min_entry; -+ -+ c = R::Rados::aio_create_completion(); -+ f->list(max_entries * 10, std::nullopt, &result, &more, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ c->release(); -+ ASSERT_EQ(0, r); -+ ASSERT_FALSE(more); -+ ASSERT_EQ(max_entries - min_entry, result.size()); -+ -+ for (auto i = min_entry; i < max_entries; ++i) { -+ std::uint32_t val; -+ std::tie(val, markers[i - min_entry]) = -+ decode_entry(result[i - min_entry]); -+ EXPECT_EQ(i, val); -+ } -+} -+ -+ -+TEST_F(AioLegacyFIFO, TestPushTooBig) -+{ -+ static constexpr auto max_part_size = 2048ull; -+ static constexpr auto max_entry_size = 128ull; -+ -+ std::unique_ptr f; -+ auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt, -+ std::nullopt, false, max_part_size, max_entry_size); -+ ASSERT_EQ(0, r); -+ -+ char buf[max_entry_size + 1]; -+ memset(buf, 0, sizeof(buf)); -+ -+ cb::list bl; -+ bl.append(buf, sizeof(buf)); -+ -+ auto c = R::Rados::aio_create_completion(); -+ f->push(bl, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ ASSERT_EQ(-E2BIG, r); -+ c->release(); -+ -+ c = R::Rados::aio_create_completion(); -+ f->push(std::vector{}, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ c->release(); -+ EXPECT_EQ(0, r); -+} -+ -+ -+TEST_F(AioLegacyFIFO, TestMultipleParts) -+{ -+ static constexpr auto max_part_size = 2048ull; -+ static constexpr auto max_entry_size = 128ull; -+ std::unique_ptr f; -+ auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt, -+ std::nullopt, false, max_part_size, -+ max_entry_size); -+ ASSERT_EQ(0, r); -+ -+ { -+ auto c = R::Rados::aio_create_completion(); -+ f->get_head_info([&](int r, RCf::part_info&& p) { -+ ASSERT_TRUE(p.tag.empty()); -+ ASSERT_EQ(0, p.magic); -+ ASSERT_EQ(0, p.min_ofs); -+ ASSERT_EQ(0, p.last_ofs); -+ ASSERT_EQ(0, p.next_ofs); -+ ASSERT_EQ(0, p.min_index); -+ ASSERT_EQ(0, p.max_index); -+ ASSERT_EQ(ceph::real_time{}, p.max_time); -+ }, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ c->release(); -+ } -+ -+ char buf[max_entry_size]; -+ memset(buf, 0, sizeof(buf)); -+ const auto [part_header_size, part_entry_overhead] = -+ f->get_part_layout_info(); -+ const auto entries_per_part = ((max_part_size - part_header_size) / -+ (max_entry_size + part_entry_overhead)); -+ const auto max_entries = entries_per_part * 4 + 1; -+ /* push enough entries */ -+ for (auto i = 0u; i < max_entries; ++i) { -+ cb::list bl; -+ *(int *)buf = i; -+ bl.append(buf, sizeof(buf)); -+ auto c = R::Rados::aio_create_completion(); -+ f->push(bl, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ c->release(); -+ EXPECT_EQ(0, r); -+ } -+ -+ auto info = f->meta(); -+ ASSERT_EQ(info.id, fifo_id); -+ /* head should have advanced */ -+ ASSERT_GT(info.head_part_num, 0); -+ -+ /* list all at once */ -+ std::vector result; -+ bool more = false; -+ auto c = R::Rados::aio_create_completion(); -+ f->list(max_entries, std::nullopt, &result, &more, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ c->release(); -+ EXPECT_EQ(0, r); -+ EXPECT_EQ(false, more); -+ ASSERT_EQ(max_entries, result.size()); -+ -+ for (auto i = 0u; i < max_entries; ++i) { -+ auto& bl = result[i].data; -+ ASSERT_EQ(i, *(int *)bl.c_str()); -+ } -+ -+ std::optional marker; -+ /* get entries one by one */ -+ -+ for (auto i = 0u; i < max_entries; ++i) { -+ c = R::Rados::aio_create_completion(); -+ f->list(1, marker, &result, &more, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ c->release(); -+ EXPECT_EQ(0, r); -+ ASSERT_EQ(result.size(), 1); -+ const bool expected_more = (i != (max_entries - 1)); -+ ASSERT_EQ(expected_more, more); -+ -+ std::uint32_t val; -+ std::tie(val, marker) = decode_entry(result.front()); -+ -+ auto& entry = result.front(); -+ auto& bl = entry.data; -+ ASSERT_EQ(i, *(int *)bl.c_str()); -+ marker = entry.marker; -+ } -+ -+ /* trim one at a time */ -+ marker.reset(); -+ for (auto i = 0u; i < max_entries; ++i) { -+ /* read single entry */ -+ c = R::Rados::aio_create_completion(); -+ f->list(1, marker, &result, &more, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ c->release(); -+ EXPECT_EQ(0, r); -+ ASSERT_EQ(result.size(), 1); -+ const bool expected_more = (i != (max_entries - 1)); -+ ASSERT_EQ(expected_more, more); -+ -+ marker = result.front().marker; -+ c = R::Rados::aio_create_completion(); -+ f->trim(*marker, false, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ c->release(); -+ EXPECT_EQ(0, r); -+ ASSERT_EQ(result.size(), 1); -+ -+ /* check tail */ -+ info = f->meta(); -+ ASSERT_EQ(info.tail_part_num, i / entries_per_part); -+ -+ /* try to read all again, see how many entries left */ -+ c = R::Rados::aio_create_completion(); -+ f->list(max_entries, marker, &result, &more, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ c->release(); -+ EXPECT_EQ(0, r); -+ ASSERT_EQ(max_entries - i - 1, result.size()); -+ ASSERT_EQ(false, more); -+ } -+ -+ /* tail now should point at head */ -+ info = f->meta(); -+ ASSERT_EQ(info.head_part_num, info.tail_part_num); -+ -+ /* check old tails are removed */ -+ for (auto i = 0; i < info.tail_part_num; ++i) { -+ c = R::Rados::aio_create_completion(); -+ RCf::part_info partinfo; -+ f->get_part_info(i, &partinfo, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ c->release(); -+ ASSERT_EQ(-ENOENT, r); -+ } -+ /* check current tail exists */ -+ std::uint64_t next_ofs; -+ { -+ c = R::Rados::aio_create_completion(); -+ RCf::part_info partinfo; -+ f->get_part_info(info.tail_part_num, &partinfo, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ c->release(); -+ next_ofs = partinfo.next_ofs; -+ } -+ ASSERT_EQ(0, r); -+ -+ c = R::Rados::aio_create_completion(); -+ f->get_head_info([&](int r, RCf::part_info&& p) { -+ ASSERT_EQ(next_ofs, p.next_ofs); -+ }, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ c->release(); -+ ASSERT_EQ(0, r); -+} -+ -+TEST_F(AioLegacyFIFO, TestTwoPushers) -+{ -+ static constexpr auto max_part_size = 2048ull; -+ static constexpr auto max_entry_size = 128ull; -+ -+ std::unique_ptr f; -+ auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt, -+ std::nullopt, false, max_part_size, -+ max_entry_size); -+ ASSERT_EQ(0, r); -+ char buf[max_entry_size]; -+ memset(buf, 0, sizeof(buf)); -+ -+ auto [part_header_size, part_entry_overhead] = f->get_part_layout_info(); -+ const auto entries_per_part = ((max_part_size - part_header_size) / -+ (max_entry_size + part_entry_overhead)); -+ const auto max_entries = entries_per_part * 4 + 1; -+ std::unique_ptr f2; -+ r = RCf::FIFO::open(ioctx, fifo_id, &f2, null_yield); -+ std::vector fifos{&f, &f2}; -+ -+ for (auto i = 0u; i < max_entries; ++i) { -+ cb::list bl; -+ *(int *)buf = i; -+ bl.append(buf, sizeof(buf)); -+ auto& f = *fifos[i % fifos.size()]; -+ auto c = R::Rados::aio_create_completion(); -+ f->push(bl, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ c->release(); -+ ASSERT_EQ(0, r); -+ } -+ -+ /* list all by both */ -+ std::vector result; -+ bool more = false; -+ auto c = R::Rados::aio_create_completion(); -+ f2->list(max_entries, std::nullopt, &result, &more, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ c->release(); -+ ASSERT_EQ(0, r); -+ ASSERT_EQ(false, more); -+ ASSERT_EQ(max_entries, result.size()); -+ -+ c = R::Rados::aio_create_completion(); -+ f2->list(max_entries, std::nullopt, &result, &more, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ c->release(); -+ ASSERT_EQ(0, r); -+ ASSERT_EQ(false, more); -+ ASSERT_EQ(max_entries, result.size()); -+ -+ for (auto i = 0u; i < max_entries; ++i) { -+ auto& bl = result[i].data; -+ ASSERT_EQ(i, *(int *)bl.c_str()); -+ } -+} -+ -+TEST_F(AioLegacyFIFO, TestTwoPushersTrim) -+{ -+ static constexpr auto max_part_size = 2048ull; -+ static constexpr auto max_entry_size = 128ull; -+ std::unique_ptr f1; -+ auto r = RCf::FIFO::create(ioctx, fifo_id, &f1, null_yield, std::nullopt, -+ std::nullopt, false, max_part_size, -+ max_entry_size); -+ ASSERT_EQ(0, r); -+ -+ char buf[max_entry_size]; -+ memset(buf, 0, sizeof(buf)); -+ -+ auto [part_header_size, part_entry_overhead] = f1->get_part_layout_info(); -+ const auto entries_per_part = ((max_part_size - part_header_size) / -+ (max_entry_size + part_entry_overhead)); -+ const auto max_entries = entries_per_part * 4 + 1; -+ -+ std::unique_ptr f2; -+ r = RCf::FIFO::open(ioctx, fifo_id, &f2, null_yield); -+ ASSERT_EQ(0, r); -+ -+ /* push one entry to f2 and the rest to f1 */ -+ for (auto i = 0u; i < max_entries; ++i) { -+ cb::list bl; -+ *(int *)buf = i; -+ bl.append(buf, sizeof(buf)); -+ auto& f = (i < 1 ? f2 : f1); -+ auto c = R::Rados::aio_create_completion(); -+ f->push(bl, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ c->release(); -+ ASSERT_EQ(0, r); -+ } -+ -+ /* trim half by fifo1 */ -+ auto num = max_entries / 2; -+ std::string marker; -+ std::vector result; -+ bool more = false; -+ auto c = R::Rados::aio_create_completion(); -+ f1->list(num, std::nullopt, &result, &more, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ c->release(); -+ ASSERT_EQ(0, r); -+ ASSERT_EQ(true, more); -+ ASSERT_EQ(num, result.size()); -+ -+ for (auto i = 0u; i < num; ++i) { -+ auto& bl = result[i].data; -+ ASSERT_EQ(i, *(int *)bl.c_str()); -+ } -+ -+ auto& entry = result[num - 1]; -+ marker = entry.marker; -+ c = R::Rados::aio_create_completion(); -+ f1->trim(marker, false, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ c->release(); -+ ASSERT_EQ(0, r); -+ /* list what's left by fifo2 */ -+ -+ const auto left = max_entries - num; -+ c = R::Rados::aio_create_completion(); -+ f2->list(left, marker, &result, &more, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ c->release(); -+ ASSERT_EQ(0, r); -+ ASSERT_EQ(left, result.size()); -+ ASSERT_EQ(false, more); -+ -+ for (auto i = num; i < max_entries; ++i) { -+ auto& bl = result[i - num].data; -+ ASSERT_EQ(i, *(int *)bl.c_str()); -+ } -+} -+ -+TEST_F(AioLegacyFIFO, TestPushBatch) -+{ -+ static constexpr auto max_part_size = 2048ull; -+ static constexpr auto max_entry_size = 128ull; -+ -+ std::unique_ptr f; -+ auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt, -+ std::nullopt, false, max_part_size, -+ max_entry_size); -+ ASSERT_EQ(0, r); -+ -+ char buf[max_entry_size]; -+ memset(buf, 0, sizeof(buf)); -+ auto [part_header_size, part_entry_overhead] = f->get_part_layout_info(); -+ auto entries_per_part = ((max_part_size - part_header_size) / -+ (max_entry_size + part_entry_overhead)); -+ auto max_entries = entries_per_part * 4 + 1; /* enough entries to span multiple parts */ -+ std::vector bufs; -+ for (auto i = 0u; i < max_entries; ++i) { -+ cb::list bl; -+ *(int *)buf = i; -+ bl.append(buf, sizeof(buf)); -+ bufs.push_back(bl); -+ } -+ ASSERT_EQ(max_entries, bufs.size()); -+ -+ auto c = R::Rados::aio_create_completion(); -+ f->push(bufs, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ c->release(); -+ ASSERT_EQ(0, r); -+ -+ /* list all */ -+ -+ std::vector result; -+ bool more = false; -+ c = R::Rados::aio_create_completion(); -+ f->list(max_entries, std::nullopt, &result, &more, c); -+ c->wait_for_complete(); -+ r = c->get_return_value(); -+ c->release(); -+ ASSERT_EQ(0, r); -+ ASSERT_EQ(false, more); -+ ASSERT_EQ(max_entries, result.size()); -+ for (auto i = 0u; i < max_entries; ++i) { -+ auto& bl = result[i].data; -+ ASSERT_EQ(i, *(int *)bl.c_str()); -+ } -+ auto& info = f->meta(); -+ ASSERT_EQ(info.head_part_num, 4); -+} - -From aede44ac6667c9a1ec7e813b547f8765754d896f Mon Sep 17 00:00:00 2001 -From: "Adam C. Emerson" -Date: Sat, 21 Nov 2020 01:44:36 -0500 -Subject: [PATCH 03/26] rgw: Factor out tool to deal with different log backing - -Read through the shards of a log and find out what kind it is. - -Also remove a log. - -Signed-off-by: Adam C. Emerson -(cherry picked from commit ed15d03f068c6f6e959f04d9d8f99eac82ebbd29) -Signed-off-by: Adam C. Emerson ---- - src/cls/log/cls_log_types.h | 3 + - src/rgw/CMakeLists.txt | 1 + - src/rgw/rgw_log_backing.cc | 215 +++++++++++++++++++++++++++++++ - src/rgw/rgw_log_backing.h | 70 ++++++++++ - src/test/rgw/CMakeLists.txt | 5 + - src/test/rgw/test_log_backing.cc | 176 +++++++++++++++++++++++++ - 6 files changed, 470 insertions(+) - create mode 100644 src/rgw/rgw_log_backing.cc - create mode 100644 src/rgw/rgw_log_backing.h - create mode 100644 src/test/rgw/test_log_backing.cc - -diff --git a/src/cls/log/cls_log_types.h b/src/cls/log/cls_log_types.h -index c5c00766d8156..1746d243e5a14 100644 ---- a/src/cls/log/cls_log_types.h -+++ b/src/cls/log/cls_log_types.h -@@ -65,6 +65,9 @@ inline bool operator ==(const cls_log_header& lhs, const cls_log_header& rhs) { - return (lhs.max_marker == rhs.max_marker && - lhs.max_time == rhs.max_time); - } -+inline bool operator !=(const cls_log_header& lhs, const cls_log_header& rhs) { -+ return !(lhs == rhs); -+} - WRITE_CLASS_ENCODER(cls_log_header) - - -diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt -index 44de25895ea2d..d3d91d4957947 100644 ---- a/src/rgw/CMakeLists.txt -+++ b/src/rgw/CMakeLists.txt -@@ -141,6 +141,7 @@ set(librgw_common_srcs - rgw_tag.cc - rgw_tag_s3.cc - rgw_tools.cc -+ rgw_log_backing.cc - rgw_user.cc - rgw_website.cc - rgw_xml.cc -diff --git a/src/rgw/rgw_log_backing.cc b/src/rgw/rgw_log_backing.cc -new file mode 100644 -index 0000000000000..63edf972a0307 ---- /dev/null -+++ b/src/rgw/rgw_log_backing.cc -@@ -0,0 +1,215 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab ft=cpp -+ -+#include "cls/log/cls_log_client.h" -+ -+#include "rgw_log_backing.h" -+#include "rgw_tools.h" -+#include "cls_fifo_legacy.h" -+ -+static constexpr auto dout_subsys = ceph_subsys_rgw; -+ -+enum class shard_check { dne, omap, fifo, corrupt }; -+inline std::ostream& operator <<(std::ostream& m, const shard_check& t) { -+ switch (t) { -+ case shard_check::dne: -+ return m << "shard_check::dne"; -+ case shard_check::omap: -+ return m << "shard_check::omap"; -+ case shard_check::fifo: -+ return m << "shard_check::fifo"; -+ case shard_check::corrupt: -+ return m << "shard_check::corrupt"; -+ } -+ -+ return m << "shard_check::UNKNOWN=" << static_cast(t); -+} -+ -+namespace { -+/// Return the shard type, and a bool to see whether it has entries. -+std::pair -+probe_shard(librados::IoCtx& ioctx, const std::string& oid, optional_yield y) -+{ -+ auto cct = static_cast(ioctx.cct()); -+ bool omap = false; -+ { -+ librados::ObjectReadOperation op; -+ cls_log_header header; -+ cls_log_info(op, &header); -+ auto r = rgw_rados_operate(ioctx, oid, &op, nullptr, y); -+ if (r == -ENOENT) { -+ return { shard_check::dne, {} }; -+ } -+ -+ if (r < 0) { -+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " error probing for omap: r=" << r -+ << ", oid=" << oid << dendl; -+ return { shard_check::corrupt, {} }; -+ } -+ if (header != cls_log_header{}) -+ omap = true; -+ } -+ std::unique_ptr fifo; -+ auto r = rgw::cls::fifo::FIFO::open(ioctx, oid, -+ &fifo, y, -+ std::nullopt, true); -+ if (r < 0 && !(r == -ENOENT || r == -ENODATA)) { -+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " error probing for fifo: r=" << r -+ << ", oid=" << oid << dendl; -+ return { shard_check::corrupt, {} }; -+ } -+ if (fifo && omap) { -+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " fifo and omap found: oid=" << oid << dendl; -+ return { shard_check::corrupt, {} }; -+ } -+ if (fifo) { -+ bool more = false; -+ std::vector entries; -+ r = fifo->list(1, nullopt, &entries, &more, y); -+ if (r < 0) { -+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << ": unable to list entries: r=" << r -+ << ", oid=" << oid << dendl; -+ return { shard_check::corrupt, {} }; -+ } -+ return { shard_check::fifo, !entries.empty() }; -+ } -+ if (omap) { -+ std::list entries; -+ std::string out_marker; -+ bool truncated = false; -+ librados::ObjectReadOperation op; -+ cls_log_list(op, {}, {}, {}, 1, entries, -+ &out_marker, &truncated); -+ auto r = rgw_rados_operate(ioctx, oid, &op, nullptr, y); -+ if (r < 0) { -+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << ": failed to list: r=" << r << ", oid=" << oid << dendl; -+ return { shard_check::corrupt, {} }; -+ } -+ return { shard_check::omap, !entries.empty() }; -+ } -+ -+ // An object exists, but has never had FIFO or cls_log entries written -+ // to it. Likely just the marker Omap. -+ return { shard_check::dne, {} }; -+} -+ -+tl::expected -+handle_dne(librados::IoCtx& ioctx, -+ log_type def, -+ std::string oid, -+ optional_yield y) -+{ -+ auto cct = static_cast(ioctx.cct()); -+ if (def == log_type::fifo) { -+ std::unique_ptr fifo; -+ auto r = rgw::cls::fifo::FIFO::create(ioctx, oid, -+ &fifo, y, -+ std::nullopt); -+ if (r < 0) { -+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " error creating FIFO: r=" << r -+ << ", oid=" << oid << dendl; -+ return tl::unexpected(bs::error_code(-r, bs::system_category())); -+ } -+ } -+ return def; -+} -+} -+ -+tl::expected -+log_backing_type(librados::IoCtx& ioctx, -+ log_type def, -+ int shards, -+ const fu2::unique_function& get_oid, -+ optional_yield y) -+{ -+ auto cct = static_cast(ioctx.cct()); -+ auto check = shard_check::dne; -+ for (int i = 0; i < shards; ++i) { -+ auto [c, e] = probe_shard(ioctx, get_oid(i), y); -+ if (c == shard_check::corrupt) -+ return tl::unexpected(bs::error_code(EIO, bs::system_category())); -+ if (c == shard_check::dne) continue; -+ if (check == shard_check::dne) { -+ check = c; -+ continue; -+ } -+ -+ if (check != c) { -+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " clashing types: check=" << check -+ << ", c=" << c << dendl; -+ return tl::unexpected(bs::error_code(EIO, bs::system_category())); -+ } -+ } -+ if (check == shard_check::corrupt) { -+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << " should be unreachable!" << dendl; -+ return tl::unexpected(bs::error_code(EIO, bs::system_category())); -+ } -+ -+ if (check == shard_check::dne) -+ return handle_dne(ioctx, -+ def, -+ get_oid(0), -+ y); -+ -+ return (check == shard_check::fifo ? log_type::fifo : log_type::omap); -+} -+ -+bs::error_code log_remove(librados::IoCtx& ioctx, -+ int shards, -+ const fu2::unique_function& get_oid, -+ optional_yield y) -+{ -+ bs::error_code ec; -+ auto cct = static_cast(ioctx.cct()); -+ for (int i = 0; i < shards; ++i) { -+ auto oid = get_oid(i); -+ rados::cls::fifo::info info; -+ uint32_t part_header_size = 0, part_entry_overhead = 0; -+ -+ auto r = rgw::cls::fifo::get_meta(ioctx, oid, nullopt, &info, -+ &part_header_size, &part_entry_overhead, -+ 0, y, true); -+ if (r == -ENOENT) continue; -+ if (r == 0 && info.head_part_num > -1) { -+ for (auto j = info.tail_part_num; j <= info.head_part_num; ++j) { -+ librados::ObjectWriteOperation op; -+ op.remove(); -+ auto part_oid = info.part_oid(j); -+ auto subr = rgw_rados_operate(ioctx, part_oid, &op, null_yield); -+ if (subr < 0 && subr != -ENOENT) { -+ if (!ec) -+ ec = bs::error_code(-subr, bs::system_category()); -+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << ": failed removing FIFO part: part_oid=" << part_oid -+ << ", subr=" << subr << dendl; -+ } -+ } -+ } -+ if (r < 0 && r != -ENODATA) { -+ if (!ec) -+ ec = bs::error_code(-r, bs::system_category()); -+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << ": failed checking FIFO part: oid=" << oid -+ << ", r=" << r << dendl; -+ } -+ librados::ObjectWriteOperation op; -+ op.remove(); -+ r = rgw_rados_operate(ioctx, oid, &op, null_yield); -+ if (r < 0 && r != -ENOENT) { -+ if (!ec) -+ ec = bs::error_code(-r, bs::system_category()); -+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ -+ << ": failed removing shard: oid=" << oid -+ << ", r=" << r << dendl; -+ } -+ } -+ return ec; -+} -diff --git a/src/rgw/rgw_log_backing.h b/src/rgw/rgw_log_backing.h -new file mode 100644 -index 0000000000000..d769af48b01fe ---- /dev/null -+++ b/src/rgw/rgw_log_backing.h -@@ -0,0 +1,70 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab ft=cpp -+ -+#ifndef CEPH_RGW_LOGBACKING_H -+#define CEPH_RGW_LOGBACKING_H -+ -+#include -+#include -+#include -+#include -+ -+#include -+ -+#include -+ -+#include "include/rados/librados.hpp" -+#include "include/expected.hpp" -+#include "include/function2.hpp" -+ -+#include "common/async/yield_context.h" -+ -+namespace bs = boost::system; -+ -+/// Type of log backing, stored in the mark used in the quick check, -+/// and passed to checking functions. -+enum class log_type { -+ omap = 0, -+ fifo = 1 -+}; -+ -+inline std::optional to_log_type(std::string_view s) { -+ if (strncasecmp(s.data(), "omap", s.length()) == 0) { -+ return log_type::omap; -+ } else if (strncasecmp(s.data(), "fifo", s.length()) == 0) { -+ return log_type::fifo; -+ } else { -+ return std::nullopt; -+ } -+} -+inline std::ostream& operator <<(std::ostream& m, const log_type& t) { -+ switch (t) { -+ case log_type::omap: -+ return m << "log_type::omap"; -+ case log_type::fifo: -+ return m << "log_type::fifo"; -+ } -+ -+ return m << "log_type::UNKNOWN=" << static_cast(t); -+} -+ -+/// Look over the shards in a log and determine the type. -+tl::expected -+log_backing_type(librados::IoCtx& ioctx, -+ log_type def, -+ int shards, //< Total number of shards -+ /// A function taking a shard number and -+ /// returning an oid. -+ const fu2::unique_function& get_oid, -+ optional_yield y); -+ -+/// Remove all log shards and associated parts of fifos. -+bs::error_code log_remove(librados::IoCtx& ioctx, -+ int shards, //< Total number of shards -+ /// A function taking a shard number and -+ /// returning an oid. -+ const fu2::unique_function& get_oid, -+ optional_yield y); -+ -+ -+#endif -diff --git a/src/test/rgw/CMakeLists.txt b/src/test/rgw/CMakeLists.txt -index 7817a42ef9ab8..c4aa22db81749 100644 ---- a/src/test/rgw/CMakeLists.txt -+++ b/src/test/rgw/CMakeLists.txt -@@ -213,6 +213,11 @@ add_executable(unittest_cls_fifo_legacy test_cls_fifo_legacy.cc) - target_link_libraries(unittest_cls_fifo_legacy radostest-cxx ${UNITTEST_LIBS} - ${rgw_libs}) - -+# unittest_log_backing -+add_executable(unittest_log_backing test_log_backing.cc) -+target_link_libraries(unittest_log_backing radostest-cxx ${UNITTEST_LIBS} -+ ${rgw_libs}) -+ - add_executable(unittest_rgw_lua test_rgw_lua.cc) - add_ceph_unittest(unittest_rgw_lua) - target_link_libraries(unittest_rgw_lua ${rgw_libs} ${LUA_LIBRARIES}) -diff --git a/src/test/rgw/test_log_backing.cc b/src/test/rgw/test_log_backing.cc -new file mode 100644 -index 0000000000000..5180d5fc74fe8 ---- /dev/null -+++ b/src/test/rgw/test_log_backing.cc -@@ -0,0 +1,176 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+/* -+ * Ceph - scalable distributed file system -+ * -+ * Copyright (C) 2019 Red Hat, Inc. -+ * -+ * This is free software; you can redistribute it and/or -+ * modify it under the terms of the GNU Lesser General Public -+ * License version 2.1, as published by the Free Software -+ * Foundation. See file COPYING. -+ * -+ */ -+ -+#include "rgw_log_backing.h" -+ -+#include -+#include -+#include -+ -+#undef FMT_HEADER_ONLY -+#define FMT_HEADER_ONLY 1 -+#include -+ -+#include "include/types.h" -+#include "include/rados/librados.hpp" -+ -+#include "test/librados/test_cxx.h" -+#include "global/global_context.h" -+ -+#include "cls/log/cls_log_client.h" -+ -+#include "rgw/rgw_tools.h" -+#include "rgw/cls_fifo_legacy.h" -+ -+#include "gtest/gtest.h" -+ -+namespace lr = librados; -+namespace cb = ceph::buffer; -+namespace fifo = rados::cls::fifo; -+namespace RCf = rgw::cls::fifo; -+ -+class LogBacking : public testing::Test { -+protected: -+ static constexpr int SHARDS = 3; -+ const std::string pool_name = get_temp_pool_name(); -+ lr::Rados rados; -+ lr::IoCtx ioctx; -+ -+ void SetUp() override { -+ ASSERT_EQ("", create_one_pool_pp(pool_name, rados)); -+ ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx)); -+ } -+ void TearDown() override { -+ destroy_one_pool_pp(pool_name, rados); -+ } -+ -+ static std::string get_oid(int i) { -+ return fmt::format("shard.{}", i); -+ } -+ -+ void make_omap() { -+ for (int i = 0; i < SHARDS; ++i) { -+ using ceph::encode; -+ lr::ObjectWriteOperation op; -+ cb::list bl; -+ encode(i, bl); -+ cls_log_add(op, ceph_clock_now(), {}, "meow", bl); -+ auto r = rgw_rados_operate(ioctx, get_oid(i), &op, null_yield); -+ ASSERT_GE(r, 0); -+ } -+ } -+ -+ void add_omap(int i) { -+ using ceph::encode; -+ lr::ObjectWriteOperation op; -+ cb::list bl; -+ encode(i, bl); -+ cls_log_add(op, ceph_clock_now(), {}, "meow", bl); -+ auto r = rgw_rados_operate(ioctx, get_oid(i), &op, null_yield); -+ ASSERT_GE(r, 0); -+ } -+ -+ void empty_omap() { -+ for (int i = 0; i < SHARDS; ++i) { -+ auto oid = get_oid(i); -+ std::string to_marker; -+ { -+ lr::ObjectReadOperation op; -+ std::list entries; -+ bool truncated = false; -+ cls_log_list(op, {}, {}, {}, 1, entries, &to_marker, &truncated); -+ auto r = rgw_rados_operate(ioctx, oid, &op, nullptr, null_yield); -+ ASSERT_GE(r, 0); -+ ASSERT_FALSE(entries.empty()); -+ } -+ { -+ lr::ObjectWriteOperation op; -+ cls_log_trim(op, {}, {}, {}, to_marker); -+ auto r = rgw_rados_operate(ioctx, oid, &op, null_yield); -+ ASSERT_GE(r, 0); -+ } -+ { -+ lr::ObjectReadOperation op; -+ std::list entries; -+ bool truncated = false; -+ cls_log_list(op, {}, {}, {}, 1, entries, &to_marker, &truncated); -+ auto r = rgw_rados_operate(ioctx, oid, &op, nullptr, null_yield); -+ ASSERT_GE(r, 0); -+ ASSERT_TRUE(entries.empty()); -+ } -+ } -+ } -+ -+ void make_fifo() -+ { -+ for (int i = 0; i < SHARDS; ++i) { -+ std::unique_ptr fifo; -+ auto r = RCf::FIFO::create(ioctx, get_oid(i), &fifo, null_yield); -+ ASSERT_EQ(0, r); -+ ASSERT_TRUE(fifo); -+ } -+ } -+ -+ void add_fifo(int i) -+ { -+ using ceph::encode; -+ std::unique_ptr fifo; -+ auto r = RCf::FIFO::open(ioctx, get_oid(i), &fifo, null_yield); -+ ASSERT_GE(0, r); -+ ASSERT_TRUE(fifo); -+ cb::list bl; -+ encode(i, bl); -+ r = fifo->push(bl, null_yield); -+ ASSERT_GE(0, r); -+ } -+ -+ void assert_empty() { -+ std::vector result; -+ lr::ObjectCursor next; -+ auto r = ioctx.object_list(ioctx.object_list_begin(), ioctx.object_list_end(), -+ 100, {}, &result, &next); -+ ASSERT_GE(r, 0); -+ ASSERT_TRUE(result.empty()); -+ } -+}; -+ -+TEST_F(LogBacking, TestOmap) -+{ -+ make_omap(); -+ auto stat = log_backing_type(ioctx, log_type::fifo, SHARDS, -+ get_oid, null_yield); -+ ASSERT_EQ(log_type::omap, *stat); -+} -+ -+TEST_F(LogBacking, TestOmapEmpty) -+{ -+ auto stat = log_backing_type(ioctx, log_type::omap, SHARDS, -+ get_oid, null_yield); -+ ASSERT_EQ(log_type::omap, *stat); -+} -+ -+TEST_F(LogBacking, TestFIFO) -+{ -+ make_fifo(); -+ auto stat = log_backing_type(ioctx, log_type::fifo, SHARDS, -+ get_oid, null_yield); -+ ASSERT_EQ(log_type::fifo, *stat); -+} -+ -+TEST_F(LogBacking, TestFIFOEmpty) -+{ -+ auto stat = log_backing_type(ioctx, log_type::fifo, SHARDS, -+ get_oid, null_yield); -+ ASSERT_EQ(log_type::fifo, *stat); -+} - -From 8c81b6fa1b2a0f1d409afbd0126d18cfc97315c4 Mon Sep 17 00:00:00 2001 -From: "Adam C. Emerson" -Date: Sat, 21 Nov 2020 15:45:12 -0500 -Subject: [PATCH 04/26] rgw: Use refactored log backing tools - -Signed-off-by: Adam C. Emerson -(cherry picked from commit da6223d281e33e43fa74c50f4d0eedb5ac25ace4) -Signed-off-by: Adam C. Emerson ---- - src/common/options.cc | 16 ++-- - src/rgw/rgw_datalog.cc | 208 +++++------------------------------------ - src/rgw/rgw_datalog.h | 5 +- - 3 files changed, 31 insertions(+), 198 deletions(-) - -diff --git a/src/common/options.cc b/src/common/options.cc -index 75d6589c08296..8fdd62fb14ccb 100644 ---- a/src/common/options.cc -+++ b/src/common/options.cc -@@ -7407,17 +7407,15 @@ std::vector