From d60970c7cd0a45f48ab11bb66ec7ec4cf60c8266 Mon Sep 17 00:00:00 2001 From: "Kaleb S. KEITHLEY" Date: Sat, 10 Apr 2021 08:12:36 -0400 Subject: [PATCH] 16.2.0, libamqp_mock fix (FTBFS, #1947281), rgw fix Signed-off-by: Kaleb S. KEITHLEY --- 0011-src-test-rgw-amqp_mock.cc.patch | 23 + 0012-rgw.patch | 7721 ++++++++++++++++++++++++++ ceph.spec | 7 +- 3 files changed, 7750 insertions(+), 1 deletion(-) create mode 100644 0011-src-test-rgw-amqp_mock.cc.patch create mode 100644 0012-rgw.patch diff --git a/0011-src-test-rgw-amqp_mock.cc.patch b/0011-src-test-rgw-amqp_mock.cc.patch new file mode 100644 index 0000000..c0c5179 --- /dev/null +++ b/0011-src-test-rgw-amqp_mock.cc.patch @@ -0,0 +1,23 @@ +--- a/src/test/rgw/amqp_mock.cc ++++ b/src/test/rgw/amqp_mock.cc +@@ -291,7 +291,11 @@ amqp_confirm_select_ok_t* amqp_confirm_select(amqp_connection_state_t state, amq + return state->confirm; + } + +-int amqp_simple_wait_frame_noblock(amqp_connection_state_t state, amqp_frame_t *decoded_frame, struct timeval* tv) { ++extern "C" { ++ ++int amqp_simple_wait_frame_noblock(amqp_connection_state_t state, ++ amqp_frame_t *decoded_frame, ++ const struct timeval* tv) { + if (state->socket && state->socket->open_called && + state->login_called && state->channel1 && state->channel2 && state->exchange && + state->queue && state->consume && state->confirm && !FAIL_NEXT_READ) { +@@ -345,6 +349,7 @@ int amqp_simple_wait_frame_noblock(amqp_connection_state_t state, amqp_frame_t * + } + return AMQP_STATUS_CONNECTION_CLOSED; + } ++} // extern "C" + + amqp_basic_consume_ok_t* amqp_basic_consume( + amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, diff --git a/0012-rgw.patch b/0012-rgw.patch new file mode 100644 index 0000000..0d85501 --- /dev/null +++ b/0012-rgw.patch @@ -0,0 +1,7721 @@ +From 483302af2622cb26983c847196b8bad0a80fbd2f Mon Sep 17 00:00:00 2001 +From: "Adam C. 
Emerson" +Date: Sat, 21 Nov 2020 17:04:12 -0500 +Subject: [PATCH 01/26] cls/log: Take const references of things you won't + modify + +Signed-off-by: Adam C. Emerson +(cherry picked from commit 73ea8cec06addc6af2ba354321f1099f657f13c5) +Signed-off-by: Adam C. Emerson +--- + src/cls/log/cls_log_client.cc | 4 ++-- + src/cls/log/cls_log_client.h | 6 +++--- + 2 files changed, 5 insertions(+), 5 deletions(-) + +diff --git a/src/cls/log/cls_log_client.cc b/src/cls/log/cls_log_client.cc +index 418599c8066e4..182bb9fec47e9 100644 +--- a/src/cls/log/cls_log_client.cc ++++ b/src/cls/log/cls_log_client.cc +@@ -113,8 +113,8 @@ class LogListCtx : public ObjectOperationCompletion { + } + }; + +-void cls_log_list(librados::ObjectReadOperation& op, utime_t& from, utime_t& to, +- const string& in_marker, int max_entries, ++void cls_log_list(librados::ObjectReadOperation& op, const utime_t& from, ++ const utime_t& to, const string& in_marker, int max_entries, + list& entries, + string *out_marker, bool *truncated) + { +diff --git a/src/cls/log/cls_log_client.h b/src/cls/log/cls_log_client.h +index b049c2cc01bda..2afdabeb3e0a2 100644 +--- a/src/cls/log/cls_log_client.h ++++ b/src/cls/log/cls_log_client.h +@@ -19,9 +19,9 @@ void cls_log_add(librados::ObjectWriteOperation& op, cls_log_entry& entry); + void cls_log_add(librados::ObjectWriteOperation& op, const utime_t& timestamp, + const std::string& section, const std::string& name, ceph::buffer::list& bl); + +-void cls_log_list(librados::ObjectReadOperation& op, utime_t& from, utime_t& to, +- const std::string& in_marker, int max_entries, +- std::list& entries, ++void cls_log_list(librados::ObjectReadOperation& op, const utime_t& from, ++ const utime_t& to, const std::string& in_marker, ++ int max_entries, std::list& entries, + std::string *out_marker, bool *truncated); + + void cls_log_trim(librados::ObjectWriteOperation& op, const utime_t& from_time, const utime_t& to_time, + +From 35f044f39da713b3bf4c5002aade7b456727190e Mon Sep 17 
00:00:00 2001 +From: "Adam C. Emerson" +Date: Tue, 3 Nov 2020 16:02:26 -0500 +Subject: [PATCH 02/26] rgw: Add AioCompletion* versions for the rest of the + FIFO methods + +Signed-off-by: Adam C. Emerson +(cherry picked from commit 665573ab8905bfa2e1ede6fc3be9bc80a625cb49) +Signed-off-by: Adam C. Emerson +--- + src/rgw/cls_fifo_legacy.cc | 1583 +++++++++++++++++++++----- + src/rgw/cls_fifo_legacy.h | 91 +- + src/rgw/rgw_datalog.cc | 7 +- + src/test/rgw/test_cls_fifo_legacy.cc | 484 +++++++- + 4 files changed, 1826 insertions(+), 339 deletions(-) + +diff --git a/src/rgw/cls_fifo_legacy.cc b/src/rgw/cls_fifo_legacy.cc +index d835aeec76ab8..569a3e77c458f 100644 +--- a/src/rgw/cls_fifo_legacy.cc ++++ b/src/rgw/cls_fifo_legacy.cc +@@ -109,6 +109,7 @@ int get_meta(lr::IoCtx& ioctx, const std::string& oid, + return r; + }; + ++namespace { + void update_meta(lr::ObjectWriteOperation* op, const fifo::objv& objv, + const fifo::update& update) + { +@@ -175,6 +176,27 @@ int push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag, + return retval; + } + ++void push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag, ++ std::deque data_bufs, std::uint64_t tid, ++ lr::AioCompletion* c) ++{ ++ lr::ObjectWriteOperation op; ++ fifo::op::push_part pp; ++ ++ pp.tag = tag; ++ pp.data_bufs = data_bufs; ++ pp.total_len = 0; ++ ++ for (const auto& bl : data_bufs) ++ pp.total_len += bl.length(); ++ ++ cb::list in; ++ encode(pp, in); ++ op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in); ++ auto r = ioctx.aio_operate(oid, c, &op, lr::OPERATION_RETURNVEC); ++ ceph_assert(r >= 0); ++} ++ + void trim_part(lr::ObjectWriteOperation* op, + std::optional tag, + std::uint64_t ofs, bool exclusive) +@@ -232,6 +254,70 @@ int list_part(lr::IoCtx& ioctx, const std::string& oid, + return r; + } + ++struct list_entry_completion : public lr::ObjectOperationCompletion { ++ CephContext* cct; ++ int* r_out; ++ std::vector* entries; ++ bool* more; ++ bool* full_part; ++ 
std::string* ptag; ++ std::uint64_t tid; ++ ++ list_entry_completion(CephContext* cct, int* r_out, std::vector* entries, ++ bool* more, bool* full_part, std::string* ptag, ++ std::uint64_t tid) ++ : cct(cct), r_out(r_out), entries(entries), more(more), ++ full_part(full_part), ptag(ptag), tid(tid) {} ++ virtual ~list_entry_completion() = default; ++ void handle_completion(int r, bufferlist& bl) override { ++ if (r >= 0) try { ++ fifo::op::list_part_reply reply; ++ auto iter = bl.cbegin(); ++ decode(reply, iter); ++ if (entries) *entries = std::move(reply.entries); ++ if (more) *more = reply.more; ++ if (full_part) *full_part = reply.full_part; ++ if (ptag) *ptag = reply.tag; ++ } catch (const cb::error& err) { ++ lderr(cct) ++ << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " decode failed: " << err.what() ++ << " tid=" << tid << dendl; ++ r = from_error_code(err.code()); ++ } else if (r < 0) { ++ lderr(cct) ++ << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " fifo::op::LIST_PART failed r=" << r << " tid=" << tid ++ << dendl; ++ } ++ if (r_out) *r_out = r; ++ } ++}; ++ ++lr::ObjectReadOperation list_part(CephContext* cct, ++ std::optional tag, ++ std::uint64_t ofs, ++ std::uint64_t max_entries, ++ int* r_out, ++ std::vector* entries, ++ bool* more, bool* full_part, ++ std::string* ptag, std::uint64_t tid) ++{ ++ lr::ObjectReadOperation op; ++ fifo::op::list_part lp; ++ ++ lp.tag = tag; ++ lp.ofs = ofs; ++ lp.max_entries = max_entries; ++ ++ cb::list in; ++ encode(lp, in); ++ op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in, ++ new list_entry_completion(cct, r_out, entries, more, full_part, ++ ptag, tid)); ++ return op; ++} ++ + int get_part_info(lr::IoCtx& ioctx, const std::string& oid, + fifo::part_header* header, + std::uint64_t tid, optional_yield y) +@@ -264,29 +350,131 @@ int get_part_info(lr::IoCtx& ioctx, const std::string& oid, + return r; + } + +-static void complete(lr::AioCompletion* c_, int r) ++struct partinfo_completion : public 
lr::ObjectOperationCompletion { ++ CephContext* cct; ++ int* rp; ++ fifo::part_header* h; ++ std::uint64_t tid; ++ partinfo_completion(CephContext* cct, int* rp, fifo::part_header* h, ++ std::uint64_t tid) : ++ cct(cct), rp(rp), h(h), tid(tid) { ++ } ++ virtual ~partinfo_completion() = default; ++ void handle_completion(int r, bufferlist& bl) override { ++ if (r >= 0) try { ++ fifo::op::get_part_info_reply reply; ++ auto iter = bl.cbegin(); ++ decode(reply, iter); ++ if (h) *h = std::move(reply.header); ++ } catch (const cb::error& err) { ++ r = from_error_code(err.code()); ++ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " decode failed: " << err.what() ++ << " tid=" << tid << dendl; ++ } else { ++ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " fifo::op::GET_PART_INFO failed r=" << r << " tid=" << tid ++ << dendl; ++ } ++ if (rp) { ++ *rp = r; ++ } ++ } ++}; ++ ++template ++struct Completion { ++private: ++ lr::AioCompletion* _cur = nullptr; ++ lr::AioCompletion* _super; ++public: ++ ++ using Ptr = std::unique_ptr; ++ ++ lr::AioCompletion* cur() const { ++ return _cur; ++ } ++ lr::AioCompletion* super() const { ++ return _super; ++ } ++ ++ Completion(lr::AioCompletion* super) : _super(super) { ++ super->pc->get(); ++ } ++ ++ ~Completion() { ++ if (_super) { ++ _super->pc->put(); ++ } ++ if (_cur) ++ _cur->release(); ++ _super = nullptr; ++ _cur = nullptr; ++ } ++ ++ // The only times that aio_operate can return an error are: ++ // 1. The completion contains a null pointer. This should just ++ // crash, and in our case it does. ++ // 2. An attempt is made to write to a snapshot. RGW doesn't use ++ // snapshots, so we don't care. ++ // ++ // So we will just assert that initiating an Aio operation succeeds ++ // and not worry about recovering. 
++ static lr::AioCompletion* call(Ptr&& p) { ++ p->_cur = lr::Rados::aio_create_completion(static_cast(p.get()), ++ &cb); ++ auto c = p->_cur; ++ p.release(); ++ return c; ++ } ++ static void complete(Ptr&& p, int r) { ++ auto c = p->_super->pc; ++ p->_super = nullptr; ++ c->lock.lock(); ++ c->rval = r; ++ c->complete = true; ++ c->lock.unlock(); ++ ++ auto cb_complete = c->callback_complete; ++ auto cb_complete_arg = c->callback_complete_arg; ++ if (cb_complete) ++ cb_complete(c, cb_complete_arg); ++ ++ auto cb_safe = c->callback_safe; ++ auto cb_safe_arg = c->callback_safe_arg; ++ if (cb_safe) ++ cb_safe(c, cb_safe_arg); ++ ++ c->lock.lock(); ++ c->callback_complete = nullptr; ++ c->callback_safe = nullptr; ++ c->cond.notify_all(); ++ c->put_unlock(); ++ } ++ ++ static void cb(lr::completion_t, void* arg) { ++ auto t = static_cast(arg); ++ auto r = t->_cur->get_return_value(); ++ t->_cur->release(); ++ t->_cur = nullptr; ++ t->handle(Ptr(t), r); ++ } ++}; ++ ++lr::ObjectReadOperation get_part_info(CephContext* cct, ++ fifo::part_header* header, ++ std::uint64_t tid, int* r = 0) + { +- auto c = c_->pc; +- c->lock.lock(); +- c->rval = r; +- c->complete = true; +- c->lock.unlock(); +- +- auto cb_complete = c->callback_complete; +- auto cb_complete_arg = c->callback_complete_arg; +- if (cb_complete) +- cb_complete(c, cb_complete_arg); +- +- auto cb_safe = c->callback_safe; +- auto cb_safe_arg = c->callback_safe_arg; +- if (cb_safe) +- cb_safe(c, cb_safe_arg); +- +- c->lock.lock(); +- c->callback_complete = NULL; +- c->callback_safe = NULL; +- c->cond.notify_all(); +- c->put_unlock(); ++ lr::ObjectReadOperation op; ++ fifo::op::get_part_info gpi; ++ ++ cb::list in; ++ cb::list bl; ++ encode(gpi, in); ++ op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in, ++ new partinfo_completion(cct, r, header, tid)); ++ return op; ++} + } + + std::optional FIFO::to_marker(std::string_view s) +@@ -385,11 +573,8 @@ int FIFO::_update_meta(const fifo::update& update, + return r; + } 
+ +-struct Updater { ++struct Updater : public Completion { + FIFO* fifo; +- lr::AioCompletion* super; +- lr::AioCompletion* cur = lr::Rados::aio_create_completion( +- static_cast(this), &FIFO::update_callback); + fifo::update update; + fifo::objv version; + bool reread = false; +@@ -398,92 +583,74 @@ struct Updater { + Updater(FIFO* fifo, lr::AioCompletion* super, + const fifo::update& update, fifo::objv version, + bool* pcanceled, std::uint64_t tid) +- : fifo(fifo), super(super), update(update), version(version), +- pcanceled(pcanceled), tid(tid) { +- super->pc->get(); +- } +- ~Updater() { +- cur->release(); +- } +-}; +- +-void FIFO::update_callback(lr::completion_t, void* arg) +-{ +- std::unique_ptr updater(static_cast(arg)); +- auto cct = updater->fifo->cct; +- auto tid = updater->tid; +- ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ +- << " entering: tid=" << tid << dendl; +- if (!updater->reread) { +- ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ +- << " handling async update_meta: tid=" +- << tid << dendl; +- int r = updater->cur->get_return_value(); ++ : Completion(super), fifo(fifo), update(update), version(version), ++ pcanceled(pcanceled) {} ++ ++ void handle(Ptr&& p, int r) { ++ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " entering: tid=" << tid << dendl; ++ if (reread) ++ handle_reread(std::move(p), r); ++ else ++ handle_update(std::move(p), r); ++ } ++ ++ void handle_update(Ptr&& p, int r) { ++ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " handling async update_meta: tid=" ++ << tid << dendl; + if (r < 0 && r != -ECANCELED) { +- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " update failed: r=" << r << " tid=" << tid << dendl; +- complete(updater->super, r); ++ complete(std::move(p), r); + return; + } + bool canceled = (r == -ECANCELED); + if (!canceled) { +- int r = 
updater->fifo->apply_update(&updater->fifo->info, +- updater->version, +- updater->update, tid); ++ int r = fifo->apply_update(&fifo->info, version, update, tid); + if (r < 0) { +- ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ +- << " update failed, marking canceled: r=" << r << " tid=" +- << tid << dendl; ++ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " update failed, marking canceled: r=" << r ++ << " tid=" << tid << dendl; + canceled = true; + } + } + if (canceled) { +- updater->cur->release(); +- updater->cur = lr::Rados::aio_create_completion( +- arg, &FIFO::update_callback); +- updater->reread = true; +- auto r = updater->fifo->read_meta(tid, updater->cur); +- if (r < 0) { +- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ +- << " failed dispatching read_meta: r=" << r << " tid=" +- << tid << dendl; +- complete(updater->super, r); +- } else { +- updater.release(); +- } ++ reread = true; ++ fifo->read_meta(tid, call(std::move(p))); + return; + } +- if (updater->pcanceled) +- *updater->pcanceled = false; +- ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ +- << " completing: tid=" << tid << dendl; +- complete(updater->super, 0); +- return; +- } +- +- ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ +- << " handling async read_meta: tid=" +- << tid << dendl; +- int r = updater->cur->get_return_value(); +- if (r < 0 && updater->pcanceled) { +- *updater->pcanceled = false; +- } else if (r >= 0 && updater->pcanceled) { +- *updater->pcanceled = true; +- } +- if (r < 0) { +- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ +- << " failed dispatching read_meta: r=" << r << " tid=" +- << tid << dendl; +- } else { +- ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ +- << " completing: tid=" << tid << dendl; ++ if (pcanceled) ++ *pcanceled = false; ++ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " completing: tid=" << tid << dendl; ++ complete(std::move(p), 0); ++ } ++ ++ void 
handle_reread(Ptr&& p, int r) { ++ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " handling async read_meta: tid=" ++ << tid << dendl; ++ if (r < 0 && pcanceled) { ++ *pcanceled = false; ++ } else if (r >= 0 && pcanceled) { ++ *pcanceled = true; ++ } ++ if (r < 0) { ++ lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " failed dispatching read_meta: r=" << r << " tid=" ++ << tid << dendl; ++ } else { ++ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " completing: tid=" << tid << dendl; ++ } ++ complete(std::move(p), r); + } +- complete(updater->super, r); +-} ++}; + +-int FIFO::_update_meta(const fifo::update& update, +- fifo::objv version, bool* pcanceled, +- std::uint64_t tid, lr::AioCompletion* c) ++void FIFO::_update_meta(const fifo::update& update, ++ fifo::objv version, bool* pcanceled, ++ std::uint64_t tid, lr::AioCompletion* c) + { + ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " entering: tid=" << tid << dendl; +@@ -491,15 +658,8 @@ int FIFO::_update_meta(const fifo::update& update, + update_meta(&op, info.version, update); + auto updater = std::make_unique(this, c, update, version, pcanceled, + tid); +- auto r = ioctx.aio_operate(oid, updater->cur, &op); +- if (r < 0) { +- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ +- << " failed dispatching update_meta: r=" << r << " tid=" +- << tid << dendl; +- } else { +- updater.release(); +- } +- return r; ++ auto r = ioctx.aio_operate(oid, Updater::call(std::move(updater)), &op); ++ assert(r >= 0); + } + + int FIFO::create_part(int64_t part_num, std::string_view tag, std::uint64_t tid, +@@ -509,7 +669,7 @@ int FIFO::create_part(int64_t part_num, std::string_view tag, std::uint64_t tid, + << " entering: tid=" << tid << dendl; + lr::ObjectWriteOperation op; + op.create(false); /* We don't need exclusivity, part_init ensures +- we're creating from the same journal entry. */ ++ we're creating from the same journal entry. 
*/ + std::unique_lock l(m); + part_init(&op, tag, info.params); + auto oid = info.part_oid(part_num); +@@ -806,6 +966,209 @@ int FIFO::_prepare_new_head(std::uint64_t tid, optional_yield y) + return 0; + } + ++struct NewPartPreparer : public Completion { ++ FIFO* f; ++ std::vector jentries; ++ int i = 0; ++ std::int64_t new_head_part_num; ++ bool canceled = false; ++ uint64_t tid; ++ ++ NewPartPreparer(FIFO* f, lr::AioCompletion* super, ++ std::vector jentries, ++ std::int64_t new_head_part_num, ++ std::uint64_t tid) ++ : Completion(super), f(f), jentries(std::move(jentries)), ++ new_head_part_num(new_head_part_num), tid(tid) {} ++ ++ void handle(Ptr&& p, int r) { ++ ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " entering: tid=" << tid << dendl; ++ if (r < 0) { ++ lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " _update_meta failed: r=" << r ++ << " tid=" << tid << dendl; ++ complete(std::move(p), r); ++ return; ++ } ++ ++ if (canceled) { ++ std::unique_lock l(f->m); ++ auto iter = f->info.journal.find(jentries.front().part_num); ++ auto max_push_part_num = f->info.max_push_part_num; ++ auto head_part_num = f->info.head_part_num; ++ auto version = f->info.version; ++ auto found = (iter != f->info.journal.end()); ++ l.unlock(); ++ if ((max_push_part_num >= jentries.front().part_num && ++ head_part_num >= new_head_part_num)) { ++ ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " raced, but journaled and processed: i=" << i ++ << " tid=" << tid << dendl; ++ complete(std::move(p), 0); ++ return; ++ } ++ if (i >= MAX_RACE_RETRIES) { ++ complete(std::move(p), -ECANCELED); ++ return; ++ } ++ if (!found) { ++ ++i; ++ f->_update_meta(fifo::update{} ++ .journal_entries_add(jentries), ++ version, &canceled, tid, call(std::move(p))); ++ return; ++ } else { ++ ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " raced, journaled but not processed: i=" << i ++ << " tid=" << tid << dendl; ++ canceled = false; ++ 
} ++ // Fall through. We still need to process the journal. ++ } ++ f->process_journal(tid, super()); ++ return; ++ } ++}; ++ ++void FIFO::_prepare_new_part(bool is_head, std::uint64_t tid, ++ lr::AioCompletion* c) ++{ ++ std::unique_lock l(m); ++ std::vector jentries = { info.next_journal_entry(generate_tag()) }; ++ if (info.journal.find(jentries.front().part_num) != info.journal.end()) { ++ l.unlock(); ++ ldout(cct, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " new part journaled, but not processed: tid=" ++ << tid << dendl; ++ process_journal(tid, c); ++ return; ++ } ++ std::int64_t new_head_part_num = info.head_part_num; ++ auto version = info.version; ++ ++ if (is_head) { ++ auto new_head_jentry = jentries.front(); ++ new_head_jentry.op = fifo::journal_entry::Op::set_head; ++ new_head_part_num = jentries.front().part_num; ++ jentries.push_back(std::move(new_head_jentry)); ++ } ++ l.unlock(); ++ ++ auto n = std::make_unique(this, c, jentries, ++ new_head_part_num, tid); ++ auto np = n.get(); ++ _update_meta(fifo::update{}.journal_entries_add(jentries), version, ++ &np->canceled, tid, NewPartPreparer::call(std::move(n))); ++} ++ ++struct NewHeadPreparer : public Completion { ++ FIFO* f; ++ int i = 0; ++ bool newpart; ++ std::int64_t new_head_num; ++ bool canceled = false; ++ std::uint64_t tid; ++ ++ NewHeadPreparer(FIFO* f, lr::AioCompletion* super, ++ bool newpart, std::int64_t new_head_num, std::uint64_t tid) ++ : Completion(super), f(f), newpart(newpart), new_head_num(new_head_num), ++ tid(tid) {} ++ ++ void handle(Ptr&& p, int r) { ++ if (newpart) ++ handle_newpart(std::move(p), r); ++ else ++ handle_update(std::move(p), r); ++ } ++ ++ void handle_newpart(Ptr&& p, int r) { ++ if (r < 0) { ++ lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " _prepare_new_part failed: r=" << r ++ << " tid=" << tid << dendl; ++ complete(std::move(p), r); ++ return; ++ } ++ std::unique_lock l(f->m); ++ if (f->info.max_push_part_num < new_head_num) { ++ 
l.unlock(); ++ lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " _prepare_new_part failed: r=" << r ++ << " tid=" << tid << dendl; ++ complete(std::move(p), -EIO); ++ } else { ++ l.unlock(); ++ complete(std::move(p), 0); ++ } ++ } ++ ++ void handle_update(Ptr&& p, int r) { ++ std::unique_lock l(f->m); ++ auto head_part_num = f->info.head_part_num; ++ auto version = f->info.version; ++ l.unlock(); ++ ++ if (r < 0) { ++ lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " _update_meta failed: r=" << r ++ << " tid=" << tid << dendl; ++ complete(std::move(p), r); ++ return; ++ } ++ if (canceled) { ++ if (i >= MAX_RACE_RETRIES) { ++ lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " canceled too many times, giving up: tid=" << tid << dendl; ++ complete(std::move(p), -ECANCELED); ++ return; ++ } ++ ++ // Raced, but there's still work to do! ++ if (head_part_num < new_head_num) { ++ canceled = false; ++ ++i; ++ ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " updating head: i=" << i << " tid=" << tid << dendl; ++ f->_update_meta(fifo::update{}.head_part_num(new_head_num), ++ version, &this->canceled, tid, call(std::move(p))); ++ return; ++ } ++ } ++ ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " succeeded : i=" << i << " tid=" << tid << dendl; ++ complete(std::move(p), 0); ++ return; ++ } ++}; ++ ++void FIFO::_prepare_new_head(std::uint64_t tid, lr::AioCompletion* c) ++{ ++ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " entering: tid=" << tid << dendl; ++ std::unique_lock l(m); ++ int64_t new_head_num = info.head_part_num + 1; ++ auto max_push_part_num = info.max_push_part_num; ++ auto version = info.version; ++ l.unlock(); ++ ++ if (max_push_part_num < new_head_num) { ++ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " need new part: tid=" << tid << dendl; ++ auto n = std::make_unique(this, c, true, new_head_num, ++ tid); ++ _prepare_new_part(true, tid, 
NewHeadPreparer::call(std::move(n))); ++ } else { ++ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " updating head: tid=" << tid << dendl; ++ auto n = std::make_unique(this, c, false, new_head_num, ++ tid); ++ auto np = n.get(); ++ _update_meta(fifo::update{}.head_part_num(new_head_num), version, ++ &np->canceled, tid, NewHeadPreparer::call(std::move(n))); ++ } ++} ++ + int FIFO::push_entries(const std::deque& data_bufs, + std::uint64_t tid, optional_yield y) + { +@@ -825,6 +1188,18 @@ int FIFO::push_entries(const std::deque& data_bufs, + return r; + } + ++void FIFO::push_entries(const std::deque& data_bufs, ++ std::uint64_t tid, lr::AioCompletion* c) ++{ ++ std::unique_lock l(m); ++ auto head_part_num = info.head_part_num; ++ auto tag = info.head_tag; ++ const auto part_oid = info.part_oid(head_part_num); ++ l.unlock(); ++ ++ push_part(ioctx, part_oid, tag, data_bufs, tid, c); ++} ++ + int FIFO::trim_part(int64_t part_num, uint64_t ofs, + std::optional tag, + bool exclusive, std::uint64_t tid, +@@ -845,10 +1220,10 @@ int FIFO::trim_part(int64_t part_num, uint64_t ofs, + return 0; + } + +-int FIFO::trim_part(int64_t part_num, uint64_t ofs, +- std::optional tag, +- bool exclusive, std::uint64_t tid, +- lr::AioCompletion* c) ++void FIFO::trim_part(int64_t part_num, uint64_t ofs, ++ std::optional tag, ++ bool exclusive, std::uint64_t tid, ++ lr::AioCompletion* c) + { + ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " entering: tid=" << tid << dendl; +@@ -858,12 +1233,7 @@ int FIFO::trim_part(int64_t part_num, uint64_t ofs, + l.unlock(); + rgw::cls::fifo::trim_part(&op, tag, ofs, exclusive); + auto r = ioctx.aio_operate(part_oid, c, &op); +- if (r < 0) { +- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ +- << " failed scheduling trim_part: r=" << r +- << " tid=" << tid << dendl; +- } +- return r; ++ ceph_assert(r >= 0); + } + + int FIFO::open(lr::IoCtx ioctx, std::string oid, std::unique_ptr* fifo, +@@ -960,54 +1330,42 @@ int 
FIFO::read_meta(optional_yield y) { + return read_meta(tid, y); + } + +-struct Reader { ++struct Reader : public Completion { + FIFO* fifo; + cb::list bl; +- lr::AioCompletion* super; + std::uint64_t tid; +- lr::AioCompletion* cur = lr::Rados::aio_create_completion( +- static_cast(this), &FIFO::read_callback); + Reader(FIFO* fifo, lr::AioCompletion* super, std::uint64_t tid) +- : fifo(fifo), super(super), tid(tid) { +- super->pc->get(); +- } +- ~Reader() { +- cur->release(); +- } +-}; ++ : Completion(super), fifo(fifo), tid(tid) {} + +-void FIFO::read_callback(lr::completion_t, void* arg) +-{ +- std::unique_ptr reader(static_cast(arg)); +- auto cct = reader->fifo->cct; +- auto tid = reader->tid; +- ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ +- << " entering: tid=" << tid << dendl; +- auto r = reader->cur->get_return_value(); +- if (r >= 0) try { +- fifo::op::get_meta_reply reply; +- auto iter = reader->bl.cbegin(); +- decode(reply, iter); +- std::unique_lock l(reader->fifo->m); +- if (reply.info.version.same_or_later(reader->fifo->info.version)) { +- reader->fifo->info = std::move(reply.info); +- reader->fifo->part_header_size = reply.part_header_size; +- reader->fifo->part_entry_overhead = reply.part_entry_overhead; +- } +- } catch (const cb::error& err) { ++ void handle(Ptr&& p, int r) { ++ auto cct = fifo->cct; ++ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " entering: tid=" << tid << dendl; ++ if (r >= 0) try { ++ fifo::op::get_meta_reply reply; ++ auto iter = bl.cbegin(); ++ decode(reply, iter); ++ std::unique_lock l(fifo->m); ++ if (reply.info.version.same_or_later(fifo->info.version)) { ++ fifo->info = std::move(reply.info); ++ fifo->part_header_size = reply.part_header_size; ++ fifo->part_entry_overhead = reply.part_entry_overhead; ++ } ++ } catch (const cb::error& err) { ++ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " failed to decode response err=" << err.what() ++ << " tid=" << tid << dendl; ++ r = 
from_error_code(err.code()); ++ } else { + lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ +- << " failed to decode response err=" << err.what() ++ << " read_meta failed r=" << r + << " tid=" << tid << dendl; +- r = from_error_code(err.code()); +- } else { +- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ +- << " read_meta failed r=" << r +- << " tid=" << tid << dendl; ++ } ++ complete(std::move(p), r); + } +- complete(reader->super, r); +-} ++}; + +-int FIFO::read_meta(std::uint64_t tid, lr::AioCompletion* c) ++void FIFO::read_meta(std::uint64_t tid, lr::AioCompletion* c) + { + ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " entering: tid=" << tid << dendl; +@@ -1016,16 +1374,10 @@ int FIFO::read_meta(std::uint64_t tid, lr::AioCompletion* c) + cb::list in; + encode(gm, in); + auto reader = std::make_unique(this, c, tid); +- auto r = ioctx.aio_exec(oid, reader->cur, fifo::op::CLASS, +- fifo::op::GET_META, in, &reader->bl); +- if (r < 0) { +- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ +- << " failed scheduling read_meta r=" << r +- << " tid=" << tid << dendl; +- } else { +- reader.release(); +- } +- return r; ++ auto rp = reader.get(); ++ auto r = ioctx.aio_exec(oid, Reader::call(std::move(reader)), fifo::op::CLASS, ++ fifo::op::GET_META, in, &rp->bl); ++ assert(r >= 0); + } + + const fifo::info& FIFO::meta() const { +@@ -1040,6 +1392,10 @@ int FIFO::push(const cb::list& bl, optional_yield y) { + return push(std::vector{ bl }, y); + } + ++void FIFO::push(const cb::list& bl, lr::AioCompletion* c) { ++ push(std::vector{ bl }, c); ++} ++ + int FIFO::push(const std::vector& data_bufs, optional_yield y) + { + std::unique_lock l(m); +@@ -1153,24 +1509,185 @@ int FIFO::push(const std::vector& data_bufs, optional_yield y) + return 0; + } + +-int FIFO::list(int max_entries, +- std::optional markstr, +- std::vector* presult, bool* pmore, +- optional_yield y) +-{ +- std::unique_lock l(m); +- auto tid = ++next_tid; +- std::int64_t part_num = 
info.tail_part_num; +- l.unlock(); +- ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ +- << " entering: tid=" << tid << dendl; +- std::uint64_t ofs = 0; +- if (markstr) { +- auto marker = to_marker(*markstr); +- if (!marker) { +- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ +- << " invalid marker string: " << markstr +- << " tid= "<< tid << dendl; ++struct Pusher : public Completion { ++ FIFO* f; ++ std::deque remaining; ++ std::deque batch; ++ int i = 0; ++ std::uint64_t tid; ++ bool new_heading = false; ++ ++ void prep_then_push(Ptr&& p, const unsigned successes) { ++ std::unique_lock l(f->m); ++ auto max_part_size = f->info.params.max_part_size; ++ auto part_entry_overhead = f->part_entry_overhead; ++ l.unlock(); ++ ++ ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " preparing push: remaining=" << remaining.size() ++ << " batch=" << batch.size() << " i=" << i ++ << " tid=" << tid << dendl; ++ ++ uint64_t batch_len = 0; ++ if (successes > 0) { ++ if (successes == batch.size()) { ++ batch.clear(); ++ } else { ++ batch.erase(batch.begin(), batch.begin() + successes); ++ for (const auto& b : batch) { ++ batch_len += b.length() + part_entry_overhead; ++ } ++ } ++ } ++ ++ if (batch.empty() && remaining.empty()) { ++ complete(std::move(p), 0); ++ return; ++ } ++ ++ while (!remaining.empty() && ++ (remaining.front().length() + batch_len <= max_part_size)) { ++ ++ /* We can send entries with data_len up to max_entry_size, ++ however, we want to also account the overhead when ++ dealing with multiple entries. Previous check doesn't ++ account for overhead on purpose. 
*/ ++ batch_len += remaining.front().length() + part_entry_overhead; ++ batch.push_back(std::move(remaining.front())); ++ remaining.pop_front(); ++ } ++ ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " prepared push: remaining=" << remaining.size() ++ << " batch=" << batch.size() << " i=" << i ++ << " batch_len=" << batch_len ++ << " tid=" << tid << dendl; ++ push(std::move(p)); ++ } ++ ++ void push(Ptr&& p) { ++ f->push_entries(batch, tid, call(std::move(p))); ++ } ++ ++ void new_head(Ptr&& p) { ++ new_heading = true; ++ f->_prepare_new_head(tid, call(std::move(p))); ++ } ++ ++ void handle(Ptr&& p, int r) { ++ if (!new_heading) { ++ if (r == -ERANGE) { ++ ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " need new head tid=" << tid << dendl; ++ new_head(std::move(p)); ++ return; ++ } ++ if (r < 0) { ++ lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " push_entries failed: r=" << r ++ << " tid=" << tid << dendl; ++ complete(std::move(p), r); ++ return; ++ } ++ i = 0; // We've made forward progress, so reset the race counter! 
++ prep_then_push(std::move(p), r); ++ } else { ++ if (r < 0) { ++ lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " prepare_new_head failed: r=" << r ++ << " tid=" << tid << dendl; ++ complete(std::move(p), r); ++ return; ++ } ++ new_heading = false; ++ handle_new_head(std::move(p), r); ++ } ++ } ++ ++ void handle_new_head(Ptr&& p, int r) { ++ if (r == -ECANCELED) { ++ if (p->i == MAX_RACE_RETRIES) { ++ lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " canceled too many times, giving up: tid=" << tid << dendl; ++ complete(std::move(p), -ECANCELED); ++ return; ++ } ++ ++p->i; ++ } else if (r) { ++ complete(std::move(p), r); ++ return; ++ } ++ ++ if (p->batch.empty()) { ++ prep_then_push(std::move(p), 0); ++ return; ++ } else { ++ push(std::move(p)); ++ return; ++ } ++ } ++ ++ Pusher(FIFO* f, std::deque&& remaining, ++ std::uint64_t tid, lr::AioCompletion* super) ++ : Completion(super), f(f), remaining(std::move(remaining)), ++ tid(tid) {} ++}; ++ ++void FIFO::push(const std::vector& data_bufs, ++ lr::AioCompletion* c) ++{ ++ std::unique_lock l(m); ++ auto tid = ++next_tid; ++ auto max_entry_size = info.params.max_entry_size; ++ auto need_new_head = info.need_new_head(); ++ l.unlock(); ++ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " entering: tid=" << tid << dendl; ++ auto p = std::make_unique(this, std::deque(data_bufs.begin(), data_bufs.end()), ++ tid, c); ++ // Validate sizes ++ for (const auto& bl : data_bufs) { ++ if (bl.length() > max_entry_size) { ++ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " entry bigger than max_entry_size tid=" << tid << dendl; ++ Pusher::complete(std::move(p), -E2BIG); ++ return; ++ } ++ } ++ ++ if (data_bufs.empty() ) { ++ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " empty push, returning success tid=" << tid << dendl; ++ Pusher::complete(std::move(p), 0); ++ return; ++ } ++ ++ if (need_new_head) { ++ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << 
__LINE__ ++ << " need new head tid=" << tid << dendl; ++ p->new_head(std::move(p)); ++ } else { ++ p->prep_then_push(std::move(p), 0); ++ } ++} ++ ++int FIFO::list(int max_entries, ++ std::optional markstr, ++ std::vector* presult, bool* pmore, ++ optional_yield y) ++{ ++ std::unique_lock l(m); ++ auto tid = ++next_tid; ++ std::int64_t part_num = info.tail_part_num; ++ l.unlock(); ++ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " entering: tid=" << tid << dendl; ++ std::uint64_t ofs = 0; ++ if (markstr) { ++ auto marker = to_marker(*markstr); ++ if (!marker) { ++ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " invalid marker string: " << markstr ++ << " tid= "<< tid << dendl; + return -EINVAL; + } + part_num = marker->num; +@@ -1340,157 +1857,116 @@ int FIFO::trim(std::string_view markstr, bool exclusive, optional_yield y) + return 0; + } + +-struct Trimmer { ++struct Trimmer : public Completion { + FIFO* fifo; + std::int64_t part_num; + std::uint64_t ofs; + std::int64_t pn; + bool exclusive; +- lr::AioCompletion* super; + std::uint64_t tid; +- lr::AioCompletion* cur = lr::Rados::aio_create_completion( +- static_cast(this), &FIFO::trim_callback); + bool update = false; + bool canceled = false; + int retries = 0; + + Trimmer(FIFO* fifo, std::int64_t part_num, std::uint64_t ofs, std::int64_t pn, + bool exclusive, lr::AioCompletion* super, std::uint64_t tid) +- : fifo(fifo), part_num(part_num), ofs(ofs), pn(pn), exclusive(exclusive), +- super(super), tid(tid) { +- super->pc->get(); +- } +- ~Trimmer() { +- cur->release(); +- } +-}; ++ : Completion(super), fifo(fifo), part_num(part_num), ofs(ofs), pn(pn), ++ exclusive(exclusive), tid(tid) {} + +-void FIFO::trim_callback(lr::completion_t, void* arg) +-{ +- std::unique_ptr trimmer(static_cast(arg)); +- auto cct = trimmer->fifo->cct; +- auto tid = trimmer->tid; +- ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ +- << " entering: tid=" << tid << dendl; +- int r = 
trimmer->cur->get_return_value(); +- if (r == -ENOENT) { +- r = 0; +- } +- +- if (r < 0) { +- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ +- << " trim failed: r=" << r << " tid=" << tid << dendl; +- complete(trimmer->super, r); +- return; +- } +- +- if (!trimmer->update) { ++ void handle(Ptr&& p, int r) { ++ auto cct = fifo->cct; + ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ +- << " handling preceding trim callback: tid=" << tid << dendl; +- trimmer->retries = 0; +- if (trimmer->pn < trimmer->part_num) { +- std::unique_lock l(trimmer->fifo->m); +- const auto max_part_size = trimmer->fifo->info.params.max_part_size; +- l.unlock(); +- trimmer->cur->release(); +- trimmer->cur = lr::Rados::aio_create_completion(arg, &FIFO::trim_callback); +- r = trimmer->fifo->trim_part(trimmer->pn++, max_part_size, std::nullopt, +- false, tid, trimmer->cur); +- if (r < 0) { +- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ +- << " trim failed: r=" << r << " tid=" << tid << dendl; +- complete(trimmer->super, r); +- } else { +- trimmer.release(); +- } +- return; ++ << " entering: tid=" << tid << dendl; ++ if (r == -ENOENT) { ++ r = 0; + } + +- std::unique_lock l(trimmer->fifo->m); +- const auto tail_part_num = trimmer->fifo->info.tail_part_num; +- l.unlock(); +- trimmer->cur->release(); +- trimmer->cur = lr::Rados::aio_create_completion(arg, &FIFO::trim_callback); +- trimmer->update = true; +- trimmer->canceled = tail_part_num < trimmer->part_num; +- r = trimmer->fifo->trim_part(trimmer->part_num, trimmer->ofs, +- std::nullopt, trimmer->exclusive, tid, trimmer->cur); + if (r < 0) { + lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ +- << " failed scheduling trim: r=" << r << " tid=" << tid << dendl; +- complete(trimmer->super, r); +- } else { +- trimmer.release(); ++ << (update ? 
" update_meta " : " trim ") << "failed: r=" ++ << r << " tid=" << tid << dendl; ++ complete(std::move(p), r); ++ return; + } +- return; +- } + +- ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ +- << " handling update-needed callback: tid=" << tid << dendl; +- std::unique_lock l(trimmer->fifo->m); +- auto tail_part_num = trimmer->fifo->info.tail_part_num; +- auto objv = trimmer->fifo->info.version; +- l.unlock(); +- if ((tail_part_num < trimmer->part_num) && +- trimmer->canceled) { +- if (trimmer->retries > MAX_RACE_RETRIES) { +- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ +- << " canceled too many times, giving up: tid=" << tid << dendl; +- complete(trimmer->super, -EIO); ++ if (!update) { ++ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " handling preceding trim callback: tid=" << tid << dendl; ++ retries = 0; ++ if (pn < part_num) { ++ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " pn=" << pn << " tid=" << tid << dendl; ++ std::unique_lock l(fifo->m); ++ const auto max_part_size = fifo->info.params.max_part_size; ++ l.unlock(); ++ fifo->trim_part(pn++, max_part_size, std::nullopt, ++ false, tid, call(std::move(p))); ++ return; ++ } ++ ++ std::unique_lock l(fifo->m); ++ const auto tail_part_num = fifo->info.tail_part_num; ++ l.unlock(); ++ update = true; ++ canceled = tail_part_num < part_num; ++ fifo->trim_part(part_num, ofs, std::nullopt, exclusive, tid, ++ call(std::move(p))); + return; + } +- trimmer->cur->release(); +- trimmer->cur = lr::Rados::aio_create_completion(arg, +- &FIFO::trim_callback); +- ++trimmer->retries; +- r = trimmer->fifo->_update_meta(fifo::update{} +- .tail_part_num(trimmer->part_num), +- objv, &trimmer->canceled, +- tid, trimmer->cur); +- if (r < 0) { +- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ +- << " failed scheduling _update_meta: r=" +- << r << " tid=" << tid << dendl; +- complete(trimmer->super, r); ++ ++ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " 
handling update-needed callback: tid=" << tid << dendl; ++ std::unique_lock l(fifo->m); ++ auto tail_part_num = fifo->info.tail_part_num; ++ auto objv = fifo->info.version; ++ l.unlock(); ++ if ((tail_part_num < part_num) && ++ canceled) { ++ if (retries > MAX_RACE_RETRIES) { ++ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " canceled too many times, giving up: tid=" << tid << dendl; ++ complete(std::move(p), -EIO); ++ return; ++ } ++ ++retries; ++ fifo->_update_meta(fifo::update{} ++ .tail_part_num(part_num), objv, &canceled, ++ tid, call(std::move(p))); + } else { +- trimmer.release(); ++ complete(std::move(p), 0); + } +- } else { +- complete(trimmer->super, 0); + } +-} ++}; + +-int FIFO::trim(std::string_view markstr, bool exclusive, lr::AioCompletion* c) { ++void FIFO::trim(std::string_view markstr, bool exclusive, ++ lr::AioCompletion* c) { + auto marker = to_marker(markstr); +- if (!marker) { +- return -EINVAL; +- } ++ auto realmark = marker.value_or(::rgw::cls::fifo::marker{}); + std::unique_lock l(m); + const auto max_part_size = info.params.max_part_size; + const auto pn = info.tail_part_num; + const auto part_oid = info.part_oid(pn); + auto tid = ++next_tid; + l.unlock(); +- auto trimmer = std::make_unique(this, marker->num, marker->ofs, pn, exclusive, c, +- tid); ++ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " entering: tid=" << tid << dendl; ++ auto trimmer = std::make_unique(this, realmark.num, realmark.ofs, ++ pn, exclusive, c, tid); ++ if (!marker) { ++ Trimmer::complete(std::move(trimmer), -EINVAL); ++ return; ++ } + ++trimmer->pn; + auto ofs = marker->ofs; + if (pn < marker->num) { ++ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " pn=" << pn << " tid=" << tid << dendl; + ofs = max_part_size; + } else { + trimmer->update = true; + } +- auto r = trim_part(pn, ofs, std::nullopt, exclusive, +- tid, trimmer->cur); +- if (r < 0) { +- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ +- << " failed 
scheduling trim_part: r=" +- << r << " tid=" << tid << dendl; +- complete(trimmer->super, r); +- } else { +- trimmer.release(); +- } +- return r; ++ trim_part(pn, ofs, std::nullopt, exclusive, ++ tid, Trimmer::call(std::move(trimmer))); + } + + int FIFO::get_part_info(int64_t part_num, +@@ -1509,4 +1985,521 @@ int FIFO::get_part_info(int64_t part_num, + } + return r; + } ++ ++void FIFO::get_part_info(int64_t part_num, ++ fifo::part_header* header, ++ lr::AioCompletion* c) ++{ ++ std::unique_lock l(m); ++ const auto part_oid = info.part_oid(part_num); ++ auto tid = ++next_tid; ++ l.unlock(); ++ auto op = rgw::cls::fifo::get_part_info(cct, header, tid); ++ auto r = ioctx.aio_operate(part_oid, c, &op, nullptr); ++ ceph_assert(r >= 0); ++} ++ ++struct InfoGetter : Completion { ++ FIFO* fifo; ++ fifo::part_header header; ++ fu2::function f; ++ std::uint64_t tid; ++ bool headerread = false; ++ ++ InfoGetter(FIFO* fifo, fu2::function f, ++ std::uint64_t tid, lr::AioCompletion* super) ++ : Completion(super), fifo(fifo), f(std::move(f)), tid(tid) {} ++ void handle(Ptr&& p, int r) { ++ if (!headerread) { ++ if (r < 0) { ++ lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " read_meta failed: r=" ++ << r << " tid=" << tid << dendl; ++ if (f) ++ f(r, {}); ++ complete(std::move(p), r); ++ return; ++ } ++ ++ auto info = fifo->meta(); ++ auto hpn = info.head_part_num; ++ if (hpn < 0) { ++ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " no head, returning empty partinfo r=" ++ << r << " tid=" << tid << dendl; ++ if (f) ++ f(0, {}); ++ complete(std::move(p), r); ++ return; ++ } ++ headerread = true; ++ auto op = rgw::cls::fifo::get_part_info(fifo->cct, &header, tid); ++ std::unique_lock l(fifo->m); ++ auto oid = fifo->info.part_oid(hpn); ++ l.unlock(); ++ r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op, ++ nullptr); ++ ceph_assert(r >= 0); ++ return; ++ } ++ ++ if (r < 0) { ++ lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ 
++ << " get_part_info failed: r=" ++ << r << " tid=" << tid << dendl; ++ } ++ ++ if (f) ++ f(r, std::move(header)); ++ complete(std::move(p), r); ++ return; ++ } ++}; ++ ++void FIFO::get_head_info(fu2::unique_function f, ++ lr::AioCompletion* c) ++{ ++ std::unique_lock l(m); ++ auto tid = ++next_tid; ++ l.unlock(); ++ auto ig = std::make_unique(this, std::move(f), tid, c); ++ read_meta(tid, InfoGetter::call(std::move(ig))); ++} ++ ++struct JournalProcessor : public Completion { ++private: ++ FIFO* const fifo; ++ ++ std::vector processed; ++ std::multimap journal; ++ std::multimap::iterator iter; ++ std::int64_t new_tail; ++ std::int64_t new_head; ++ std::int64_t new_max; ++ int race_retries = 0; ++ bool first_pp = true; ++ bool canceled = false; ++ std::uint64_t tid; ++ ++ enum { ++ entry_callback, ++ pp_callback, ++ } state; ++ ++ void create_part(Ptr&& p, int64_t part_num, ++ std::string_view tag) { ++ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " entering: tid=" << tid << dendl; ++ state = entry_callback; ++ lr::ObjectWriteOperation op; ++ op.create(false); /* We don't need exclusivity, part_init ensures ++ we're creating from the same journal entry. 
*/ ++ std::unique_lock l(fifo->m); ++ part_init(&op, tag, fifo->info.params); ++ auto oid = fifo->info.part_oid(part_num); ++ l.unlock(); ++ auto r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op); ++ ceph_assert(r >= 0); ++ return; ++ } ++ ++ void remove_part(Ptr&& p, int64_t part_num, ++ std::string_view tag) { ++ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " entering: tid=" << tid << dendl; ++ state = entry_callback; ++ lr::ObjectWriteOperation op; ++ op.remove(); ++ std::unique_lock l(fifo->m); ++ auto oid = fifo->info.part_oid(part_num); ++ l.unlock(); ++ auto r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op); ++ ceph_assert(r >= 0); ++ return; ++ } ++ ++ void finish_je(Ptr&& p, int r, ++ const fifo::journal_entry& entry) { ++ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " entering: tid=" << tid << dendl; ++ ++ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " finishing entry: entry=" << entry ++ << " tid=" << tid << dendl; ++ ++ if (entry.op == fifo::journal_entry::Op::remove && r == -ENOENT) ++ r = 0; ++ ++ if (r < 0) { ++ lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " processing entry failed: entry=" << entry ++ << " r=" << r << " tid=" << tid << dendl; ++ complete(std::move(p), r); ++ return; ++ } else { ++ switch (entry.op) { ++ case fifo::journal_entry::Op::unknown: ++ case fifo::journal_entry::Op::set_head: ++ // Can't happen. Filtered out in process. 
++ complete(std::move(p), -EIO); ++ return; ++ ++ case fifo::journal_entry::Op::create: ++ if (entry.part_num > new_max) { ++ new_max = entry.part_num; ++ } ++ break; ++ case fifo::journal_entry::Op::remove: ++ if (entry.part_num >= new_tail) { ++ new_tail = entry.part_num + 1; ++ } ++ break; ++ } ++ processed.push_back(entry); ++ } ++ ++iter; ++ process(std::move(p)); ++ } ++ ++ void postprocess(Ptr&& p) { ++ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " entering: tid=" << tid << dendl; ++ if (processed.empty()) { ++ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " nothing to update any more: race_retries=" ++ << race_retries << " tid=" << tid << dendl; ++ complete(std::move(p), 0); ++ return; ++ } ++ pp_run(std::move(p), 0, false); ++ } ++ ++public: ++ ++ JournalProcessor(FIFO* fifo, std::uint64_t tid, lr::AioCompletion* super) ++ : Completion(super), fifo(fifo), tid(tid) { ++ std::unique_lock l(fifo->m); ++ journal = fifo->info.journal; ++ iter = journal.begin(); ++ new_tail = fifo->info.tail_part_num; ++ new_head = fifo->info.head_part_num; ++ new_max = fifo->info.max_push_part_num; ++ } ++ ++ void pp_run(Ptr&& p, int r, bool canceled) { ++ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " entering: tid=" << tid << dendl; ++ std::optional tail_part_num; ++ std::optional head_part_num; ++ std::optional max_part_num; ++ ++ if (r < 0) { ++ lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " failed, r=: " << r << " tid=" << tid << dendl; ++ complete(std::move(p), r); return; /* p was handed to complete(); stop here instead of falling through and completing or re-arming it a second time */ ++ } ++ ++ ++ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " postprocessing: race_retries=" ++ << race_retries << " tid=" << tid << dendl; ++ ++ if (!first_pp && r == 0 && !canceled) { ++ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " nothing to update any more: race_retries=" ++ << race_retries << " tid=" << tid << dendl; ++ complete(std::move(p), 0); ++ return; ++ } ++ ++ 
first_pp = false; ++ ++ if (canceled) { ++ if (race_retries >= MAX_RACE_RETRIES) { ++ lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " canceled too many times, giving up: tid=" ++ << tid << dendl; ++ complete(std::move(p), -ECANCELED); ++ return; ++ } ++ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " update canceled, retrying: race_retries=" ++ << race_retries << " tid=" << tid << dendl; ++ ++ ++race_retries; ++ ++ std::vector new_processed; ++ std::unique_lock l(fifo->m); ++ for (auto& e : processed) { ++ auto jiter = fifo->info.journal.find(e.part_num); ++ /* journal entry was already processed */ ++ if (jiter == fifo->info.journal.end() || ++ !(jiter->second == e)) { ++ continue; ++ } ++ new_processed.push_back(e); ++ } ++ processed = std::move(new_processed); ++ } ++ ++ std::unique_lock l(fifo->m); ++ auto objv = fifo->info.version; ++ if (new_tail > fifo->info.tail_part_num) { ++ tail_part_num = new_tail; ++ } ++ ++ if (new_head > fifo->info.head_part_num) { ++ head_part_num = new_head; ++ } ++ ++ if (new_max > fifo->info.max_push_part_num) { ++ max_part_num = new_max; ++ } ++ l.unlock(); ++ ++ if (processed.empty() && ++ !tail_part_num && ++ !max_part_num) { ++ /* nothing to update anymore */ ++ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " nothing to update any more: race_retries=" ++ << race_retries << " tid=" << tid << dendl; ++ complete(std::move(p), 0); ++ return; ++ } ++ state = pp_callback; ++ fifo->_update_meta(fifo::update{} ++ .tail_part_num(tail_part_num) ++ .head_part_num(head_part_num) ++ .max_push_part_num(max_part_num) ++ .journal_entries_rm(processed), ++ objv, &this->canceled, tid, call(std::move(p))); ++ return; ++ } ++ ++ JournalProcessor(const JournalProcessor&) = delete; ++ JournalProcessor& operator =(const JournalProcessor&) = delete; ++ JournalProcessor(JournalProcessor&&) = delete; ++ JournalProcessor& operator =(JournalProcessor&&) = delete; ++ ++ void process(Ptr&& 
p) { ++ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " entering: tid=" << tid << dendl; ++ while (iter != journal.end()) { ++ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " processing entry: entry=" << *iter ++ << " tid=" << tid << dendl; ++ const auto entry = iter->second; ++ switch (entry.op) { ++ case fifo::journal_entry::Op::create: ++ create_part(std::move(p), entry.part_num, entry.part_tag); ++ return; ++ case fifo::journal_entry::Op::set_head: ++ if (entry.part_num > new_head) { ++ new_head = entry.part_num; ++ } ++ processed.push_back(entry); ++ ++iter; ++ continue; ++ case fifo::journal_entry::Op::remove: ++ remove_part(std::move(p), entry.part_num, entry.part_tag); ++ return; ++ default: ++ lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " unknown journaled op: entry=" << entry << " tid=" ++ << tid << dendl; ++ complete(std::move(p), -EIO); ++ return; ++ } ++ } ++ postprocess(std::move(p)); ++ return; ++ } ++ ++ void handle(Ptr&& p, int r) { ++ ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " entering: tid=" << tid << dendl; ++ switch (state) { ++ case entry_callback: ++ finish_je(std::move(p), r, iter->second); ++ return; ++ case pp_callback: ++ auto c = canceled; ++ canceled = false; ++ pp_run(std::move(p), r, c); ++ return; ++ } ++ ++ abort(); ++ } ++ ++}; ++ ++void FIFO::process_journal(std::uint64_t tid, lr::AioCompletion* c) { ++ auto p = std::make_unique(this, tid, c); ++ p->process(std::move(p)); ++} ++ ++struct Lister : Completion { ++ FIFO* f; ++ std::vector result; ++ bool more = false; ++ std::int64_t part_num; ++ std::uint64_t ofs; ++ int max_entries; ++ int r_out = 0; ++ std::vector entries; ++ bool part_more = false; ++ bool part_full = false; ++ std::vector* entries_out; ++ bool* more_out; ++ std::uint64_t tid; ++ ++ bool read = false; ++ ++ void complete(Ptr&& p, int r) { ++ if (r >= 0) { ++ if (more_out) *more_out = more; ++ if (entries_out) 
*entries_out = std::move(result); ++ } ++ Completion::complete(std::move(p), r); ++ } ++ ++public: ++ Lister(FIFO* f, std::int64_t part_num, std::uint64_t ofs, int max_entries, ++ std::vector* entries_out, bool* more_out, ++ std::uint64_t tid, lr::AioCompletion* super) ++ : Completion(super), f(f), part_num(part_num), ofs(ofs), max_entries(max_entries), ++ entries_out(entries_out), more_out(more_out), tid(tid) { ++ if (max_entries > 0) result.reserve(max_entries); /* list() accepts max_entries <= 0; a negative int must not reach reserve() as a huge size_t */ ++ } ++ ++ Lister(const Lister&) = delete; ++ Lister& operator =(const Lister&) = delete; ++ Lister(Lister&&) = delete; ++ Lister& operator =(Lister&&) = delete; ++ ++ void handle(Ptr&& p, int r) { ++ if (read) ++ handle_read(std::move(p), r); ++ else ++ handle_list(std::move(p), r); ++ } ++ ++ void list(Ptr&& p) { ++ if (max_entries > 0) { ++ part_more = false; ++ part_full = false; ++ entries.clear(); ++ ++ std::unique_lock l(f->m); ++ auto part_oid = f->info.part_oid(part_num); ++ l.unlock(); ++ ++ read = false; ++ auto op = list_part(f->cct, {}, ofs, max_entries, &r_out, ++ &entries, &part_more, &part_full, ++ nullptr, tid); ++ auto r = f->ioctx.aio_operate(part_oid, call(std::move(p)), &op, nullptr); ceph_assert(r >= 0); /* p is already released to the callback, so check submission like the other aio_operate callers in this file */ ++ } else { ++ complete(std::move(p), 0); ++ } ++ } ++ ++ void handle_read(Ptr&& p, int r) { ++ read = false; ++ if (r >= 0) r = r_out; ++ r_out = 0; ++ ++ if (r < 0) { ++ complete(std::move(p), r); ++ return; ++ } ++ ++ if (part_num < f->info.tail_part_num) { ++ /* raced with trim? 
restart */ ++ max_entries += result.size(); ++ result.clear(); ++ part_num = f->info.tail_part_num; ++ ofs = 0; ++ list(std::move(p)); ++ return; ++ } ++ /* assuming part was not written yet, so end of data */ ++ more = false; ++ complete(std::move(p), 0); ++ return; ++ } ++ ++ void handle_list(Ptr&& p, int r) { ++ if (r >= 0) r = r_out; ++ r_out = 0; ++ std::unique_lock l(f->m); ++ auto part_oid = f->info.part_oid(part_num); ++ l.unlock(); ++ if (r == -ENOENT) { ++ read = true; ++ f->read_meta(tid, call(std::move(p))); ++ return; ++ } ++ if (r < 0) { ++ complete(std::move(p), r); ++ return; ++ } ++ ++ more = part_full || part_more; ++ for (auto& entry : entries) { ++ list_entry e; ++ e.data = std::move(entry.data); ++ e.marker = marker{part_num, entry.ofs}.to_string(); ++ e.mtime = entry.mtime; ++ result.push_back(std::move(e)); ++ } ++ max_entries -= entries.size(); ++ entries.clear(); ++ if (max_entries > 0 && part_more) { ++ list(std::move(p)); ++ return; ++ } ++ ++ if (!part_full) { /* head part is not full */ ++ complete(std::move(p), 0); ++ return; ++ } ++ ++part_num; ++ ofs = 0; ++ list(std::move(p)); ++ } ++}; ++ ++void FIFO::list(int max_entries, ++ std::optional markstr, ++ std::vector* out, ++ bool* more, ++ lr::AioCompletion* c) { ++ std::unique_lock l(m); ++ auto tid = ++next_tid; ++ std::int64_t part_num = info.tail_part_num; ++ l.unlock(); ++ std::uint64_t ofs = 0; ++ std::optional<::rgw::cls::fifo::marker> marker; ++ ++ if (markstr) { ++ marker = to_marker(*markstr); ++ if (marker) { ++ part_num = marker->num; ++ ofs = marker->ofs; ++ } ++ } ++ ++ auto ls = std::make_unique(this, part_num, ofs, max_entries, out, ++ more, tid, c); ++ if (markstr && !marker) { ++ auto l = ls.get(); ++ l->complete(std::move(ls), -EINVAL); ++ } else { ++ ls->list(std::move(ls)); ++ } ++} + } +diff --git a/src/rgw/cls_fifo_legacy.h b/src/rgw/cls_fifo_legacy.h +index 1f8d3f3fc95d8..b6b5f04bb30ad 100644 +--- a/src/rgw/cls_fifo_legacy.h ++++ b/src/rgw/cls_fifo_legacy.h +@@ 
-31,6 +31,7 @@ + + #include "include/rados/librados.hpp" + #include "include/buffer.h" ++#include "include/function2.hpp" + + #include "common/async/yield_context.h" + +@@ -57,24 +58,6 @@ int get_meta(lr::IoCtx& ioctx, const std::string& oid, + std::uint32_t* part_entry_overhead, + std::uint64_t tid, optional_yield y, + bool probe = false); +-void update_meta(lr::ObjectWriteOperation* op, const fifo::objv& objv, +- const fifo::update& update); +-void part_init(lr::ObjectWriteOperation* op, std::string_view tag, +- fifo::data_params params); +-int push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag, +- std::deque data_bufs, std::uint64_t tid, optional_yield y); +-void trim_part(lr::ObjectWriteOperation* op, +- std::optional tag, std::uint64_t ofs, +- bool exclusive); +-int list_part(lr::IoCtx& ioctx, const std::string& oid, +- std::optional tag, std::uint64_t ofs, +- std::uint64_t max_entries, +- std::vector* entries, +- bool* more, bool* full_part, std::string* ptag, +- std::uint64_t tid, optional_yield y); +-int get_part_info(lr::IoCtx& ioctx, const std::string& oid, +- fifo::part_header* header, std::uint64_t, +- optional_yield y); + + struct marker { + std::int64_t num = 0; +@@ -117,6 +100,12 @@ class FIFO { + friend struct Reader; + friend struct Updater; + friend struct Trimmer; ++ friend struct InfoGetter; ++ friend struct Pusher; ++ friend struct NewPartPreparer; ++ friend struct NewHeadPreparer; ++ friend struct JournalProcessor; ++ friend struct Lister; + + mutable lr::IoCtx ioctx; + CephContext* cct = static_cast(ioctx.cct()); +@@ -144,32 +133,34 @@ class FIFO { + int _update_meta(const fifo::update& update, + fifo::objv version, bool* pcanceled, + std::uint64_t tid, optional_yield y); +- int _update_meta(const fifo::update& update, +- fifo::objv version, bool* pcanceled, +- std::uint64_t tid, lr::AioCompletion* c); ++ void _update_meta(const fifo::update& update, ++ fifo::objv version, bool* pcanceled, ++ std::uint64_t tid, 
lr::AioCompletion* c); + int create_part(int64_t part_num, std::string_view tag, std::uint64_t tid, + optional_yield y); + int remove_part(int64_t part_num, std::string_view tag, std::uint64_t tid, + optional_yield y); + int process_journal(std::uint64_t tid, optional_yield y); ++ void process_journal(std::uint64_t tid, lr::AioCompletion* c); + int _prepare_new_part(bool is_head, std::uint64_t tid, optional_yield y); ++ void _prepare_new_part(bool is_head, std::uint64_t tid, lr::AioCompletion* c); + int _prepare_new_head(std::uint64_t tid, optional_yield y); ++ void _prepare_new_head(std::uint64_t tid, lr::AioCompletion* c); + int push_entries(const std::deque& data_bufs, + std::uint64_t tid, optional_yield y); ++ void push_entries(const std::deque& data_bufs, ++ std::uint64_t tid, lr::AioCompletion* c); + int trim_part(int64_t part_num, uint64_t ofs, + std::optional tag, bool exclusive, + std::uint64_t tid, optional_yield y); +- int trim_part(int64_t part_num, uint64_t ofs, +- std::optional tag, bool exclusive, +- std::uint64_t tid, lr::AioCompletion* c); ++ void trim_part(int64_t part_num, uint64_t ofs, ++ std::optional tag, bool exclusive, ++ std::uint64_t tid, lr::AioCompletion* c); + +- static void trim_callback(lr::completion_t, void* arg); +- static void update_callback(lr::completion_t, void* arg); +- static void read_callback(lr::completion_t, void* arg); + /// Force refresh of metadata, yielding/blocking style + int read_meta(std::uint64_t tid, optional_yield y); + /// Force refresh of metadata, with a librados Completion +- int read_meta(std::uint64_t tid, lr::AioCompletion* c); ++ void read_meta(std::uint64_t tid, lr::AioCompletion* c); + + public: + +@@ -215,12 +206,20 @@ class FIFO { + int push(const cb::list& bl, //< Entry to push + optional_yield y //< Optional yield + ); +- /// Push entres to the FIFO ++ /// Push an entry to the FIFO ++ void push(const cb::list& bl, //< Entry to push ++ lr::AioCompletion* c //< Async Completion ++ ); ++ /// Push 
entries to the FIFO + int push(const std::vector& data_bufs, //< Entries to push +- /// Optional yield +- optional_yield y); ++ optional_yield y //< Optional yield ++ ); ++ /// Push entries to the FIFO ++ void push(const std::vector& data_bufs, //< Entries to push ++ lr::AioCompletion* c //< Async Completion ++ ); + /// List entries +- int list(int max_entries, /// Maximum entries to list ++ int list(int max_entries, //< Maximum entries to list + /// Point after which to begin listing. Start at tail if null + std::optional markstr, + std::vector* out, //< OUT: entries +@@ -228,6 +227,14 @@ class FIFO { + bool* more, + optional_yield y //< Optional yield + ); ++ void list(int max_entries, //< Maximum entries to list ++ /// Point after which to begin listing. Start at tail if null ++ std::optional markstr, ++ std::vector* out, //< OUT: entries ++ /// OUT: True if more entries in FIFO beyond the last returned ++ bool* more, ++ lr::AioCompletion* c //< Async Completion ++ ); + /// Trim entries, coroutine/block style + int trim(std::string_view markstr, //< Position to which to trim, inclusive + bool exclusive, //< If true, do not trim the target entry +@@ -235,16 +242,28 @@ class FIFO { + optional_yield y //< Optional yield + ); + /// Trim entries, librados AioCompletion style +- int trim(std::string_view markstr, //< Position to which to trim, inclusive +- bool exclusive, //< If true, do not trim the target entry +- //< itself, just all those before it. +- lr::AioCompletion* c //< librados AIO Completion ++ void trim(std::string_view markstr, //< Position to which to trim, inclusive ++ bool exclusive, //< If true, do not trim the target entry ++ //< itself, just all those before it. 
++ lr::AioCompletion* c //< librados AIO Completion + ); + /// Get part info + int get_part_info(int64_t part_num, /// Part number + fifo::part_header* header, //< OUT: Information + optional_yield y //< Optional yield + ); ++ /// Get part info ++ void get_part_info(int64_t part_num, //< Part number ++ fifo::part_header* header, //< OUT: Information ++ lr::AioCompletion* c //< AIO Completion ++ ); ++ /// A convenience method to fetch the part information for the FIFO ++ /// head, using librados::AioCompletion, since ++ /// librados::AioCompletions compose lousily. ++ void get_head_info(fu2::unique_function< //< Function to receive info ++ void(int r, fifo::part_header&&)>, ++ lr::AioCompletion* c //< AIO Completion ++ ); + }; + } + +diff --git a/src/rgw/rgw_datalog.cc b/src/rgw/rgw_datalog.cc +index a875d075ecade..8142b26e01a8b 100644 +--- a/src/rgw/rgw_datalog.cc ++++ b/src/rgw/rgw_datalog.cc +@@ -469,12 +469,7 @@ class RGWDataChangesFIFO final : public RGWDataChangesBE { + pc->cond.notify_all(); + pc->put_unlock(); + } else { +- r = fifos[index]->trim(marker, false, c); +- if (r < 0) { +- lderr(cct) << __PRETTY_FUNCTION__ +- << ": unable to trim FIFO: " << get_oid(index) +- << ": " << cpp_strerror(-r) << dendl; +- } ++ fifos[index]->trim(marker, false, c); + } + return r; + } +diff --git a/src/test/rgw/test_cls_fifo_legacy.cc b/src/test/rgw/test_cls_fifo_legacy.cc +index dae4980f8dca4..69cee5a887405 100644 +--- a/src/test/rgw/test_cls_fifo_legacy.cc ++++ b/src/test/rgw/test_cls_fifo_legacy.cc +@@ -69,6 +69,8 @@ class LegacyFIFO : public testing::Test { + }; + + using LegacyClsFIFO = LegacyFIFO; ++using AioLegacyFIFO = LegacyFIFO; ++ + + TEST_F(LegacyClsFIFO, TestCreate) + { +@@ -577,8 +579,7 @@ TEST_F(LegacyFIFO, TestAioTrim) + marker = result.front().marker; + std::unique_ptr c(rados.aio_create_completion(nullptr, + nullptr)); +- r = f->trim(*marker, false, c.get()); +- ASSERT_EQ(0, r); ++ f->trim(*marker, false, c.get()); + c->wait_for_complete(); + r = 
c->get_return_value(); + ASSERT_EQ(0, r); +@@ -645,3 +646,482 @@ TEST_F(LegacyFIFO, TestTrimExclusive) { + ASSERT_EQ(result.size(), 1); + ASSERT_EQ(max_entries - 1, val); + } ++ ++TEST_F(AioLegacyFIFO, TestPushListTrim) ++{ ++ std::unique_ptr f; ++ auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield); ++ ASSERT_EQ(0, r); ++ static constexpr auto max_entries = 10u; ++ for (uint32_t i = 0; i < max_entries; ++i) { ++ cb::list bl; ++ encode(i, bl); ++ auto c = R::Rados::aio_create_completion(); ++ f->push(bl, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ c->release(); ++ ASSERT_EQ(0, r); ++ } ++ ++ std::optional marker; ++ /* get entries one by one */ ++ std::vector result; ++ bool more = false; ++ for (auto i = 0u; i < max_entries; ++i) { ++ auto c = R::Rados::aio_create_completion(); ++ f->list(1, marker, &result, &more, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ c->release(); ++ ASSERT_EQ(0, r); ++ ++ bool expected_more = (i != (max_entries - 1)); ++ ASSERT_EQ(expected_more, more); ++ ASSERT_EQ(1, result.size()); ++ ++ std::uint32_t val; ++ std::tie(val, marker) = decode_entry(result.front()); ++ ++ ASSERT_EQ(i, val); ++ result.clear(); ++ } ++ ++ /* get all entries at once */ ++ std::string markers[max_entries]; ++ std::uint32_t min_entry = 0; ++ auto c = R::Rados::aio_create_completion(); ++ f->list(max_entries * 10, std::nullopt, &result, &more, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ c->release(); ++ ASSERT_EQ(0, r); ++ ++ ASSERT_FALSE(more); ++ ASSERT_EQ(max_entries, result.size()); ++ for (auto i = 0u; i < max_entries; ++i) { ++ std::uint32_t val; ++ std::tie(val, markers[i]) = decode_entry(result[i]); ++ ASSERT_EQ(i, val); ++ } ++ ++ /* trim one entry */ ++ c = R::Rados::aio_create_completion(); ++ f->trim(markers[min_entry], false, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ c->release(); ++ ASSERT_EQ(0, r); ++ ++min_entry; ++ ++ c = R::Rados::aio_create_completion(); ++ 
f->list(max_entries * 10, std::nullopt, &result, &more, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ c->release(); ++ ASSERT_EQ(0, r); ++ ASSERT_FALSE(more); ++ ASSERT_EQ(max_entries - min_entry, result.size()); ++ ++ for (auto i = min_entry; i < max_entries; ++i) { ++ std::uint32_t val; ++ std::tie(val, markers[i - min_entry]) = ++ decode_entry(result[i - min_entry]); ++ EXPECT_EQ(i, val); ++ } ++} ++ ++ ++TEST_F(AioLegacyFIFO, TestPushTooBig) ++{ ++ static constexpr auto max_part_size = 2048ull; ++ static constexpr auto max_entry_size = 128ull; ++ ++ std::unique_ptr f; ++ auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt, ++ std::nullopt, false, max_part_size, max_entry_size); ++ ASSERT_EQ(0, r); ++ ++ char buf[max_entry_size + 1]; ++ memset(buf, 0, sizeof(buf)); ++ ++ cb::list bl; ++ bl.append(buf, sizeof(buf)); ++ ++ auto c = R::Rados::aio_create_completion(); ++ f->push(bl, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ ASSERT_EQ(-E2BIG, r); ++ c->release(); ++ ++ c = R::Rados::aio_create_completion(); ++ f->push(std::vector{}, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ c->release(); ++ EXPECT_EQ(0, r); ++} ++ ++ ++TEST_F(AioLegacyFIFO, TestMultipleParts) ++{ ++ static constexpr auto max_part_size = 2048ull; ++ static constexpr auto max_entry_size = 128ull; ++ std::unique_ptr f; ++ auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt, ++ std::nullopt, false, max_part_size, ++ max_entry_size); ++ ASSERT_EQ(0, r); ++ ++ { ++ auto c = R::Rados::aio_create_completion(); ++ f->get_head_info([&](int r, RCf::part_info&& p) { ++ ASSERT_TRUE(p.tag.empty()); ++ ASSERT_EQ(0, p.magic); ++ ASSERT_EQ(0, p.min_ofs); ++ ASSERT_EQ(0, p.last_ofs); ++ ASSERT_EQ(0, p.next_ofs); ++ ASSERT_EQ(0, p.min_index); ++ ASSERT_EQ(0, p.max_index); ++ ASSERT_EQ(ceph::real_time{}, p.max_time); ++ }, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ c->release(); ++ } ++ ++ char 
buf[max_entry_size]; ++ memset(buf, 0, sizeof(buf)); ++ const auto [part_header_size, part_entry_overhead] = ++ f->get_part_layout_info(); ++ const auto entries_per_part = ((max_part_size - part_header_size) / ++ (max_entry_size + part_entry_overhead)); ++ const auto max_entries = entries_per_part * 4 + 1; ++ /* push enough entries */ ++ for (auto i = 0u; i < max_entries; ++i) { ++ cb::list bl; ++ *(int *)buf = i; ++ bl.append(buf, sizeof(buf)); ++ auto c = R::Rados::aio_create_completion(); ++ f->push(bl, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ c->release(); ++ EXPECT_EQ(0, r); ++ } ++ ++ auto info = f->meta(); ++ ASSERT_EQ(info.id, fifo_id); ++ /* head should have advanced */ ++ ASSERT_GT(info.head_part_num, 0); ++ ++ /* list all at once */ ++ std::vector result; ++ bool more = false; ++ auto c = R::Rados::aio_create_completion(); ++ f->list(max_entries, std::nullopt, &result, &more, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ c->release(); ++ EXPECT_EQ(0, r); ++ EXPECT_EQ(false, more); ++ ASSERT_EQ(max_entries, result.size()); ++ ++ for (auto i = 0u; i < max_entries; ++i) { ++ auto& bl = result[i].data; ++ ASSERT_EQ(i, *(int *)bl.c_str()); ++ } ++ ++ std::optional marker; ++ /* get entries one by one */ ++ ++ for (auto i = 0u; i < max_entries; ++i) { ++ c = R::Rados::aio_create_completion(); ++ f->list(1, marker, &result, &more, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ c->release(); ++ EXPECT_EQ(0, r); ++ ASSERT_EQ(result.size(), 1); ++ const bool expected_more = (i != (max_entries - 1)); ++ ASSERT_EQ(expected_more, more); ++ ++ std::uint32_t val; ++ std::tie(val, marker) = decode_entry(result.front()); ++ ++ auto& entry = result.front(); ++ auto& bl = entry.data; ++ ASSERT_EQ(i, *(int *)bl.c_str()); ++ marker = entry.marker; ++ } ++ ++ /* trim one at a time */ ++ marker.reset(); ++ for (auto i = 0u; i < max_entries; ++i) { ++ /* read single entry */ ++ c = R::Rados::aio_create_completion(); ++ 
f->list(1, marker, &result, &more, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ c->release(); ++ EXPECT_EQ(0, r); ++ ASSERT_EQ(result.size(), 1); ++ const bool expected_more = (i != (max_entries - 1)); ++ ASSERT_EQ(expected_more, more); ++ ++ marker = result.front().marker; ++ c = R::Rados::aio_create_completion(); ++ f->trim(*marker, false, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ c->release(); ++ EXPECT_EQ(0, r); ++ ASSERT_EQ(result.size(), 1); ++ ++ /* check tail */ ++ info = f->meta(); ++ ASSERT_EQ(info.tail_part_num, i / entries_per_part); ++ ++ /* try to read all again, see how many entries left */ ++ c = R::Rados::aio_create_completion(); ++ f->list(max_entries, marker, &result, &more, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ c->release(); ++ EXPECT_EQ(0, r); ++ ASSERT_EQ(max_entries - i - 1, result.size()); ++ ASSERT_EQ(false, more); ++ } ++ ++ /* tail now should point at head */ ++ info = f->meta(); ++ ASSERT_EQ(info.head_part_num, info.tail_part_num); ++ ++ /* check old tails are removed */ ++ for (auto i = 0; i < info.tail_part_num; ++i) { ++ c = R::Rados::aio_create_completion(); ++ RCf::part_info partinfo; ++ f->get_part_info(i, &partinfo, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ c->release(); ++ ASSERT_EQ(-ENOENT, r); ++ } ++ /* check current tail exists */ ++ std::uint64_t next_ofs; ++ { ++ c = R::Rados::aio_create_completion(); ++ RCf::part_info partinfo; ++ f->get_part_info(info.tail_part_num, &partinfo, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ c->release(); ++ next_ofs = partinfo.next_ofs; ++ } ++ ASSERT_EQ(0, r); ++ ++ c = R::Rados::aio_create_completion(); ++ f->get_head_info([&](int r, RCf::part_info&& p) { ++ ASSERT_EQ(next_ofs, p.next_ofs); ++ }, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ c->release(); ++ ASSERT_EQ(0, r); ++} ++ ++TEST_F(AioLegacyFIFO, TestTwoPushers) ++{ ++ static constexpr auto max_part_size = 
2048ull; ++ static constexpr auto max_entry_size = 128ull; ++ ++ std::unique_ptr f; ++ auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt, ++ std::nullopt, false, max_part_size, ++ max_entry_size); ++ ASSERT_EQ(0, r); ++ char buf[max_entry_size]; ++ memset(buf, 0, sizeof(buf)); ++ ++ auto [part_header_size, part_entry_overhead] = f->get_part_layout_info(); ++ const auto entries_per_part = ((max_part_size - part_header_size) / ++ (max_entry_size + part_entry_overhead)); ++ const auto max_entries = entries_per_part * 4 + 1; ++ std::unique_ptr f2; ++ r = RCf::FIFO::open(ioctx, fifo_id, &f2, null_yield); ++ std::vector fifos{&f, &f2}; ++ ++ for (auto i = 0u; i < max_entries; ++i) { ++ cb::list bl; ++ *(int *)buf = i; ++ bl.append(buf, sizeof(buf)); ++ auto& f = *fifos[i % fifos.size()]; ++ auto c = R::Rados::aio_create_completion(); ++ f->push(bl, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ c->release(); ++ ASSERT_EQ(0, r); ++ } ++ ++ /* list all by both */ ++ std::vector result; ++ bool more = false; ++ auto c = R::Rados::aio_create_completion(); ++ f2->list(max_entries, std::nullopt, &result, &more, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ c->release(); ++ ASSERT_EQ(0, r); ++ ASSERT_EQ(false, more); ++ ASSERT_EQ(max_entries, result.size()); ++ ++ c = R::Rados::aio_create_completion(); ++ f2->list(max_entries, std::nullopt, &result, &more, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ c->release(); ++ ASSERT_EQ(0, r); ++ ASSERT_EQ(false, more); ++ ASSERT_EQ(max_entries, result.size()); ++ ++ for (auto i = 0u; i < max_entries; ++i) { ++ auto& bl = result[i].data; ++ ASSERT_EQ(i, *(int *)bl.c_str()); ++ } ++} ++ ++TEST_F(AioLegacyFIFO, TestTwoPushersTrim) ++{ ++ static constexpr auto max_part_size = 2048ull; ++ static constexpr auto max_entry_size = 128ull; ++ std::unique_ptr f1; ++ auto r = RCf::FIFO::create(ioctx, fifo_id, &f1, null_yield, std::nullopt, ++ std::nullopt, false, 
max_part_size, ++ max_entry_size); ++ ASSERT_EQ(0, r); ++ ++ char buf[max_entry_size]; ++ memset(buf, 0, sizeof(buf)); ++ ++ auto [part_header_size, part_entry_overhead] = f1->get_part_layout_info(); ++ const auto entries_per_part = ((max_part_size - part_header_size) / ++ (max_entry_size + part_entry_overhead)); ++ const auto max_entries = entries_per_part * 4 + 1; ++ ++ std::unique_ptr f2; ++ r = RCf::FIFO::open(ioctx, fifo_id, &f2, null_yield); ++ ASSERT_EQ(0, r); ++ ++ /* push one entry to f2 and the rest to f1 */ ++ for (auto i = 0u; i < max_entries; ++i) { ++ cb::list bl; ++ *(int *)buf = i; ++ bl.append(buf, sizeof(buf)); ++ auto& f = (i < 1 ? f2 : f1); ++ auto c = R::Rados::aio_create_completion(); ++ f->push(bl, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ c->release(); ++ ASSERT_EQ(0, r); ++ } ++ ++ /* trim half by fifo1 */ ++ auto num = max_entries / 2; ++ std::string marker; ++ std::vector result; ++ bool more = false; ++ auto c = R::Rados::aio_create_completion(); ++ f1->list(num, std::nullopt, &result, &more, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ c->release(); ++ ASSERT_EQ(0, r); ++ ASSERT_EQ(true, more); ++ ASSERT_EQ(num, result.size()); ++ ++ for (auto i = 0u; i < num; ++i) { ++ auto& bl = result[i].data; ++ ASSERT_EQ(i, *(int *)bl.c_str()); ++ } ++ ++ auto& entry = result[num - 1]; ++ marker = entry.marker; ++ c = R::Rados::aio_create_completion(); ++ f1->trim(marker, false, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ c->release(); ++ ASSERT_EQ(0, r); ++ /* list what's left by fifo2 */ ++ ++ const auto left = max_entries - num; ++ c = R::Rados::aio_create_completion(); ++ f2->list(left, marker, &result, &more, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ c->release(); ++ ASSERT_EQ(0, r); ++ ASSERT_EQ(left, result.size()); ++ ASSERT_EQ(false, more); ++ ++ for (auto i = num; i < max_entries; ++i) { ++ auto& bl = result[i - num].data; ++ ASSERT_EQ(i, *(int 
*)bl.c_str()); ++ } ++} ++ ++TEST_F(AioLegacyFIFO, TestPushBatch) ++{ ++ static constexpr auto max_part_size = 2048ull; ++ static constexpr auto max_entry_size = 128ull; ++ ++ std::unique_ptr f; ++ auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt, ++ std::nullopt, false, max_part_size, ++ max_entry_size); ++ ASSERT_EQ(0, r); ++ ++ char buf[max_entry_size]; ++ memset(buf, 0, sizeof(buf)); ++ auto [part_header_size, part_entry_overhead] = f->get_part_layout_info(); ++ auto entries_per_part = ((max_part_size - part_header_size) / ++ (max_entry_size + part_entry_overhead)); ++ auto max_entries = entries_per_part * 4 + 1; /* enough entries to span multiple parts */ ++ std::vector bufs; ++ for (auto i = 0u; i < max_entries; ++i) { ++ cb::list bl; ++ *(int *)buf = i; ++ bl.append(buf, sizeof(buf)); ++ bufs.push_back(bl); ++ } ++ ASSERT_EQ(max_entries, bufs.size()); ++ ++ auto c = R::Rados::aio_create_completion(); ++ f->push(bufs, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ c->release(); ++ ASSERT_EQ(0, r); ++ ++ /* list all */ ++ ++ std::vector result; ++ bool more = false; ++ c = R::Rados::aio_create_completion(); ++ f->list(max_entries, std::nullopt, &result, &more, c); ++ c->wait_for_complete(); ++ r = c->get_return_value(); ++ c->release(); ++ ASSERT_EQ(0, r); ++ ASSERT_EQ(false, more); ++ ASSERT_EQ(max_entries, result.size()); ++ for (auto i = 0u; i < max_entries; ++i) { ++ auto& bl = result[i].data; ++ ASSERT_EQ(i, *(int *)bl.c_str()); ++ } ++ auto& info = f->meta(); ++ ASSERT_EQ(info.head_part_num, 4); ++} + +From aede44ac6667c9a1ec7e813b547f8765754d896f Mon Sep 17 00:00:00 2001 +From: "Adam C. Emerson" +Date: Sat, 21 Nov 2020 01:44:36 -0500 +Subject: [PATCH 03/26] rgw: Factor out tool to deal with different log backing + +Read through the shards of a log and find out what kind it is. + +Also remove a log. + +Signed-off-by: Adam C. 
Emerson +(cherry picked from commit ed15d03f068c6f6e959f04d9d8f99eac82ebbd29) +Signed-off-by: Adam C. Emerson +--- + src/cls/log/cls_log_types.h | 3 + + src/rgw/CMakeLists.txt | 1 + + src/rgw/rgw_log_backing.cc | 215 +++++++++++++++++++++++++++++++ + src/rgw/rgw_log_backing.h | 70 ++++++++++ + src/test/rgw/CMakeLists.txt | 5 + + src/test/rgw/test_log_backing.cc | 176 +++++++++++++++++++++++++ + 6 files changed, 470 insertions(+) + create mode 100644 src/rgw/rgw_log_backing.cc + create mode 100644 src/rgw/rgw_log_backing.h + create mode 100644 src/test/rgw/test_log_backing.cc + +diff --git a/src/cls/log/cls_log_types.h b/src/cls/log/cls_log_types.h +index c5c00766d8156..1746d243e5a14 100644 +--- a/src/cls/log/cls_log_types.h ++++ b/src/cls/log/cls_log_types.h +@@ -65,6 +65,9 @@ inline bool operator ==(const cls_log_header& lhs, const cls_log_header& rhs) { + return (lhs.max_marker == rhs.max_marker && + lhs.max_time == rhs.max_time); + } ++inline bool operator !=(const cls_log_header& lhs, const cls_log_header& rhs) { ++ return !(lhs == rhs); ++} + WRITE_CLASS_ENCODER(cls_log_header) + + +diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt +index 44de25895ea2d..d3d91d4957947 100644 +--- a/src/rgw/CMakeLists.txt ++++ b/src/rgw/CMakeLists.txt +@@ -141,6 +141,7 @@ set(librgw_common_srcs + rgw_tag.cc + rgw_tag_s3.cc + rgw_tools.cc ++ rgw_log_backing.cc + rgw_user.cc + rgw_website.cc + rgw_xml.cc +diff --git a/src/rgw/rgw_log_backing.cc b/src/rgw/rgw_log_backing.cc +new file mode 100644 +index 0000000000000..63edf972a0307 +--- /dev/null ++++ b/src/rgw/rgw_log_backing.cc +@@ -0,0 +1,215 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ft=cpp ++ ++#include "cls/log/cls_log_client.h" ++ ++#include "rgw_log_backing.h" ++#include "rgw_tools.h" ++#include "cls_fifo_legacy.h" ++ ++static constexpr auto dout_subsys = ceph_subsys_rgw; ++ ++enum class shard_check { dne, omap, fifo, corrupt }; ++inline std::ostream& 
operator <<(std::ostream& m, const shard_check& t) { ++ switch (t) { ++ case shard_check::dne: ++ return m << "shard_check::dne"; ++ case shard_check::omap: ++ return m << "shard_check::omap"; ++ case shard_check::fifo: ++ return m << "shard_check::fifo"; ++ case shard_check::corrupt: ++ return m << "shard_check::corrupt"; ++ } ++ ++ return m << "shard_check::UNKNOWN=" << static_cast(t); ++} ++ ++namespace { ++/// Return the shard type, and a bool to see whether it has entries. ++std::pair ++probe_shard(librados::IoCtx& ioctx, const std::string& oid, optional_yield y) ++{ ++ auto cct = static_cast(ioctx.cct()); ++ bool omap = false; ++ { ++ librados::ObjectReadOperation op; ++ cls_log_header header; ++ cls_log_info(op, &header); ++ auto r = rgw_rados_operate(ioctx, oid, &op, nullptr, y); ++ if (r == -ENOENT) { ++ return { shard_check::dne, {} }; ++ } ++ ++ if (r < 0) { ++ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " error probing for omap: r=" << r ++ << ", oid=" << oid << dendl; ++ return { shard_check::corrupt, {} }; ++ } ++ if (header != cls_log_header{}) ++ omap = true; ++ } ++ std::unique_ptr fifo; ++ auto r = rgw::cls::fifo::FIFO::open(ioctx, oid, ++ &fifo, y, ++ std::nullopt, true); ++ if (r < 0 && !(r == -ENOENT || r == -ENODATA)) { ++ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " error probing for fifo: r=" << r ++ << ", oid=" << oid << dendl; ++ return { shard_check::corrupt, {} }; ++ } ++ if (fifo && omap) { ++ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " fifo and omap found: oid=" << oid << dendl; ++ return { shard_check::corrupt, {} }; ++ } ++ if (fifo) { ++ bool more = false; ++ std::vector entries; ++ r = fifo->list(1, nullopt, &entries, &more, y); ++ if (r < 0) { ++ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << ": unable to list entries: r=" << r ++ << ", oid=" << oid << dendl; ++ return { shard_check::corrupt, {} }; ++ } ++ return { shard_check::fifo, !entries.empty() }; ++ } ++ if (omap) 
{ ++ std::list entries; ++ std::string out_marker; ++ bool truncated = false; ++ librados::ObjectReadOperation op; ++ cls_log_list(op, {}, {}, {}, 1, entries, ++ &out_marker, &truncated); ++ auto r = rgw_rados_operate(ioctx, oid, &op, nullptr, y); ++ if (r < 0) { ++ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << ": failed to list: r=" << r << ", oid=" << oid << dendl; ++ return { shard_check::corrupt, {} }; ++ } ++ return { shard_check::omap, !entries.empty() }; ++ } ++ ++ // An object exists, but has never had FIFO or cls_log entries written ++ // to it. Likely just the marker Omap. ++ return { shard_check::dne, {} }; ++} ++ ++tl::expected ++handle_dne(librados::IoCtx& ioctx, ++ log_type def, ++ std::string oid, ++ optional_yield y) ++{ ++ auto cct = static_cast(ioctx.cct()); ++ if (def == log_type::fifo) { ++ std::unique_ptr fifo; ++ auto r = rgw::cls::fifo::FIFO::create(ioctx, oid, ++ &fifo, y, ++ std::nullopt); ++ if (r < 0) { ++ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " error creating FIFO: r=" << r ++ << ", oid=" << oid << dendl; ++ return tl::unexpected(bs::error_code(-r, bs::system_category())); ++ } ++ } ++ return def; ++} ++} ++ ++tl::expected ++log_backing_type(librados::IoCtx& ioctx, ++ log_type def, ++ int shards, ++ const fu2::unique_function& get_oid, ++ optional_yield y) ++{ ++ auto cct = static_cast(ioctx.cct()); ++ auto check = shard_check::dne; ++ for (int i = 0; i < shards; ++i) { ++ auto [c, e] = probe_shard(ioctx, get_oid(i), y); ++ if (c == shard_check::corrupt) ++ return tl::unexpected(bs::error_code(EIO, bs::system_category())); ++ if (c == shard_check::dne) continue; ++ if (check == shard_check::dne) { ++ check = c; ++ continue; ++ } ++ ++ if (check != c) { ++ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " clashing types: check=" << check ++ << ", c=" << c << dendl; ++ return tl::unexpected(bs::error_code(EIO, bs::system_category())); ++ } ++ } ++ if (check == shard_check::corrupt) { ++ 
lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << " should be unreachable!" << dendl; ++ return tl::unexpected(bs::error_code(EIO, bs::system_category())); ++ } ++ ++ if (check == shard_check::dne) ++ return handle_dne(ioctx, ++ def, ++ get_oid(0), ++ y); ++ ++ return (check == shard_check::fifo ? log_type::fifo : log_type::omap); ++} ++ ++bs::error_code log_remove(librados::IoCtx& ioctx, ++ int shards, ++ const fu2::unique_function& get_oid, ++ optional_yield y) ++{ ++ bs::error_code ec; ++ auto cct = static_cast(ioctx.cct()); ++ for (int i = 0; i < shards; ++i) { ++ auto oid = get_oid(i); ++ rados::cls::fifo::info info; ++ uint32_t part_header_size = 0, part_entry_overhead = 0; ++ ++ auto r = rgw::cls::fifo::get_meta(ioctx, oid, nullopt, &info, ++ &part_header_size, &part_entry_overhead, ++ 0, y, true); ++ if (r == -ENOENT) continue; ++ if (r == 0 && info.head_part_num > -1) { ++ for (auto j = info.tail_part_num; j <= info.head_part_num; ++j) { ++ librados::ObjectWriteOperation op; ++ op.remove(); ++ auto part_oid = info.part_oid(j); ++ auto subr = rgw_rados_operate(ioctx, part_oid, &op, null_yield); ++ if (subr < 0 && subr != -ENOENT) { ++ if (!ec) ++ ec = bs::error_code(-subr, bs::system_category()); ++ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << ": failed removing FIFO part: part_oid=" << part_oid ++ << ", subr=" << subr << dendl; ++ } ++ } ++ } ++ if (r < 0 && r != -ENODATA) { ++ if (!ec) ++ ec = bs::error_code(-r, bs::system_category()); ++ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << ": failed checking FIFO part: oid=" << oid ++ << ", r=" << r << dendl; ++ } ++ librados::ObjectWriteOperation op; ++ op.remove(); ++ r = rgw_rados_operate(ioctx, oid, &op, null_yield); ++ if (r < 0 && r != -ENOENT) { ++ if (!ec) ++ ec = bs::error_code(-r, bs::system_category()); ++ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ ++ << ": failed removing shard: oid=" << oid ++ << ", r=" << r << dendl; ++ } ++ } ++ return ec; ++} +diff 
--git a/src/rgw/rgw_log_backing.h b/src/rgw/rgw_log_backing.h +new file mode 100644 +index 0000000000000..d769af48b01fe +--- /dev/null ++++ b/src/rgw/rgw_log_backing.h +@@ -0,0 +1,70 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ft=cpp ++ ++#ifndef CEPH_RGW_LOGBACKING_H ++#define CEPH_RGW_LOGBACKING_H ++ ++#include ++#include ++#include ++#include ++ ++#include ++ ++#include ++ ++#include "include/rados/librados.hpp" ++#include "include/expected.hpp" ++#include "include/function2.hpp" ++ ++#include "common/async/yield_context.h" ++ ++namespace bs = boost::system; ++ ++/// Type of log backing, stored in the mark used in the quick check, ++/// and passed to checking functions. ++enum class log_type { ++ omap = 0, ++ fifo = 1 ++}; ++ ++inline std::optional to_log_type(std::string_view s) { ++ if (strncasecmp(s.data(), "omap", s.length()) == 0) { ++ return log_type::omap; ++ } else if (strncasecmp(s.data(), "fifo", s.length()) == 0) { ++ return log_type::fifo; ++ } else { ++ return std::nullopt; ++ } ++} ++inline std::ostream& operator <<(std::ostream& m, const log_type& t) { ++ switch (t) { ++ case log_type::omap: ++ return m << "log_type::omap"; ++ case log_type::fifo: ++ return m << "log_type::fifo"; ++ } ++ ++ return m << "log_type::UNKNOWN=" << static_cast(t); ++} ++ ++/// Look over the shards in a log and determine the type. ++tl::expected ++log_backing_type(librados::IoCtx& ioctx, ++ log_type def, ++ int shards, //< Total number of shards ++ /// A function taking a shard number and ++ /// returning an oid. ++ const fu2::unique_function& get_oid, ++ optional_yield y); ++ ++/// Remove all log shards and associated parts of fifos. ++bs::error_code log_remove(librados::IoCtx& ioctx, ++ int shards, //< Total number of shards ++ /// A function taking a shard number and ++ /// returning an oid. 
++ const fu2::unique_function& get_oid, ++ optional_yield y); ++ ++ ++#endif +diff --git a/src/test/rgw/CMakeLists.txt b/src/test/rgw/CMakeLists.txt +index 7817a42ef9ab8..c4aa22db81749 100644 +--- a/src/test/rgw/CMakeLists.txt ++++ b/src/test/rgw/CMakeLists.txt +@@ -213,6 +213,11 @@ add_executable(unittest_cls_fifo_legacy test_cls_fifo_legacy.cc) + target_link_libraries(unittest_cls_fifo_legacy radostest-cxx ${UNITTEST_LIBS} + ${rgw_libs}) + ++# unittest_log_backing ++add_executable(unittest_log_backing test_log_backing.cc) ++target_link_libraries(unittest_log_backing radostest-cxx ${UNITTEST_LIBS} ++ ${rgw_libs}) ++ + add_executable(unittest_rgw_lua test_rgw_lua.cc) + add_ceph_unittest(unittest_rgw_lua) + target_link_libraries(unittest_rgw_lua ${rgw_libs} ${LUA_LIBRARIES}) +diff --git a/src/test/rgw/test_log_backing.cc b/src/test/rgw/test_log_backing.cc +new file mode 100644 +index 0000000000000..5180d5fc74fe8 +--- /dev/null ++++ b/src/test/rgw/test_log_backing.cc +@@ -0,0 +1,176 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++/* ++ * Ceph - scalable distributed file system ++ * ++ * Copyright (C) 2019 Red Hat, Inc. ++ * ++ * This is free software; you can redistribute it and/or ++ * modify it under the terms of the GNU Lesser General Public ++ * License version 2.1, as published by the Free Software ++ * Foundation. See file COPYING. 
++ * ++ */ ++ ++#include "rgw_log_backing.h" ++ ++#include ++#include ++#include ++ ++#undef FMT_HEADER_ONLY ++#define FMT_HEADER_ONLY 1 ++#include ++ ++#include "include/types.h" ++#include "include/rados/librados.hpp" ++ ++#include "test/librados/test_cxx.h" ++#include "global/global_context.h" ++ ++#include "cls/log/cls_log_client.h" ++ ++#include "rgw/rgw_tools.h" ++#include "rgw/cls_fifo_legacy.h" ++ ++#include "gtest/gtest.h" ++ ++namespace lr = librados; ++namespace cb = ceph::buffer; ++namespace fifo = rados::cls::fifo; ++namespace RCf = rgw::cls::fifo; ++ ++class LogBacking : public testing::Test { ++protected: ++ static constexpr int SHARDS = 3; ++ const std::string pool_name = get_temp_pool_name(); ++ lr::Rados rados; ++ lr::IoCtx ioctx; ++ ++ void SetUp() override { ++ ASSERT_EQ("", create_one_pool_pp(pool_name, rados)); ++ ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx)); ++ } ++ void TearDown() override { ++ destroy_one_pool_pp(pool_name, rados); ++ } ++ ++ static std::string get_oid(int i) { ++ return fmt::format("shard.{}", i); ++ } ++ ++ void make_omap() { ++ for (int i = 0; i < SHARDS; ++i) { ++ using ceph::encode; ++ lr::ObjectWriteOperation op; ++ cb::list bl; ++ encode(i, bl); ++ cls_log_add(op, ceph_clock_now(), {}, "meow", bl); ++ auto r = rgw_rados_operate(ioctx, get_oid(i), &op, null_yield); ++ ASSERT_GE(r, 0); ++ } ++ } ++ ++ void add_omap(int i) { ++ using ceph::encode; ++ lr::ObjectWriteOperation op; ++ cb::list bl; ++ encode(i, bl); ++ cls_log_add(op, ceph_clock_now(), {}, "meow", bl); ++ auto r = rgw_rados_operate(ioctx, get_oid(i), &op, null_yield); ++ ASSERT_GE(r, 0); ++ } ++ ++ void empty_omap() { ++ for (int i = 0; i < SHARDS; ++i) { ++ auto oid = get_oid(i); ++ std::string to_marker; ++ { ++ lr::ObjectReadOperation op; ++ std::list entries; ++ bool truncated = false; ++ cls_log_list(op, {}, {}, {}, 1, entries, &to_marker, &truncated); ++ auto r = rgw_rados_operate(ioctx, oid, &op, nullptr, null_yield); ++ ASSERT_GE(r, 
0); ++ ASSERT_FALSE(entries.empty()); ++ } ++ { ++ lr::ObjectWriteOperation op; ++ cls_log_trim(op, {}, {}, {}, to_marker); ++ auto r = rgw_rados_operate(ioctx, oid, &op, null_yield); ++ ASSERT_GE(r, 0); ++ } ++ { ++ lr::ObjectReadOperation op; ++ std::list entries; ++ bool truncated = false; ++ cls_log_list(op, {}, {}, {}, 1, entries, &to_marker, &truncated); ++ auto r = rgw_rados_operate(ioctx, oid, &op, nullptr, null_yield); ++ ASSERT_GE(r, 0); ++ ASSERT_TRUE(entries.empty()); ++ } ++ } ++ } ++ ++ void make_fifo() ++ { ++ for (int i = 0; i < SHARDS; ++i) { ++ std::unique_ptr fifo; ++ auto r = RCf::FIFO::create(ioctx, get_oid(i), &fifo, null_yield); ++ ASSERT_EQ(0, r); ++ ASSERT_TRUE(fifo); ++ } ++ } ++ ++ void add_fifo(int i) ++ { ++ using ceph::encode; ++ std::unique_ptr fifo; ++ auto r = RCf::FIFO::open(ioctx, get_oid(i), &fifo, null_yield); ++ ASSERT_GE(0, r); ++ ASSERT_TRUE(fifo); ++ cb::list bl; ++ encode(i, bl); ++ r = fifo->push(bl, null_yield); ++ ASSERT_GE(0, r); ++ } ++ ++ void assert_empty() { ++ std::vector result; ++ lr::ObjectCursor next; ++ auto r = ioctx.object_list(ioctx.object_list_begin(), ioctx.object_list_end(), ++ 100, {}, &result, &next); ++ ASSERT_GE(r, 0); ++ ASSERT_TRUE(result.empty()); ++ } ++}; ++ ++TEST_F(LogBacking, TestOmap) ++{ ++ make_omap(); ++ auto stat = log_backing_type(ioctx, log_type::fifo, SHARDS, ++ get_oid, null_yield); ++ ASSERT_EQ(log_type::omap, *stat); ++} ++ ++TEST_F(LogBacking, TestOmapEmpty) ++{ ++ auto stat = log_backing_type(ioctx, log_type::omap, SHARDS, ++ get_oid, null_yield); ++ ASSERT_EQ(log_type::omap, *stat); ++} ++ ++TEST_F(LogBacking, TestFIFO) ++{ ++ make_fifo(); ++ auto stat = log_backing_type(ioctx, log_type::fifo, SHARDS, ++ get_oid, null_yield); ++ ASSERT_EQ(log_type::fifo, *stat); ++} ++ ++TEST_F(LogBacking, TestFIFOEmpty) ++{ ++ auto stat = log_backing_type(ioctx, log_type::fifo, SHARDS, ++ get_oid, null_yield); ++ ASSERT_EQ(log_type::fifo, *stat); ++} + +From 
8c81b6fa1b2a0f1d409afbd0126d18cfc97315c4 Mon Sep 17 00:00:00 2001 +From: "Adam C. Emerson" +Date: Sat, 21 Nov 2020 15:45:12 -0500 +Subject: [PATCH 04/26] rgw: Use refactored log backing tools + +Signed-off-by: Adam C. Emerson +(cherry picked from commit da6223d281e33e43fa74c50f4d0eedb5ac25ace4) +Signed-off-by: Adam C. Emerson +--- + src/common/options.cc | 16 ++-- + src/rgw/rgw_datalog.cc | 208 +++++------------------------------------ + src/rgw/rgw_datalog.h | 5 +- + 3 files changed, 31 insertions(+), 198 deletions(-) + +diff --git a/src/common/options.cc b/src/common/options.cc +index 75d6589c08296..8fdd62fb14ccb 100644 +--- a/src/common/options.cc ++++ b/src/common/options.cc +@@ -7407,17 +7407,15 @@ std::vector