From 673e4407700b90c5ecbbcec5e06fec07fc1149b1 Mon Sep 17 00:00:00 2001 From: Rafael Zalamena Date: Mon, 21 Feb 2022 06:28:11 -0500 Subject: [PATCH 1/5] lib: tweak northbound gRPC default timeout Don't let open sockets hang for too long. This will fix an issue where an improperly coded client (e.g. socat) could exhaust the amount of open file descriptors. Documentation: https://grpc.github.io/grpc/cpp/md_doc_keepalive.html Signed-off-by: Rafael Zalamena --- lib/northbound_grpc.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/northbound_grpc.cpp b/lib/northbound_grpc.cpp index e227d0385c4..ecab732e928 100644 --- a/lib/northbound_grpc.cpp +++ b/lib/northbound_grpc.cpp @@ -1264,6 +1264,8 @@ static void *grpc_pthread_start(void *arg) builder.AddListeningPort(server_address.str(), grpc::InsecureServerCredentials()); builder.RegisterService(service); + builder.AddChannelArgument( + GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 5000); auto cq = builder.AddCompletionQueue(); s_cq = cq.get(); s_server = builder.BuildAndStart(); From 96d434f8530b9b096c661a7ac30fffe47f57c774 Mon Sep 17 00:00:00 2001 From: Christian Hopps Date: Sun, 6 Mar 2022 06:58:22 -0500 Subject: [PATCH 2/5] lib: grpc: do not remove candidate entry too soon Fix from Rafael Zalamena Signed-off-by: Christian Hopps --- lib/northbound_grpc.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/northbound_grpc.cpp b/lib/northbound_grpc.cpp index ecab732e928..69669a288e1 100644 --- a/lib/northbound_grpc.cpp +++ b/lib/northbound_grpc.cpp @@ -96,11 +96,11 @@ class Candidates { char errmsg[BUFSIZ] = {0}; - _cdb.erase(c->id); nb_config_free(c->config); if (c->transaction) nb_candidate_commit_abort(c->transaction, errmsg, sizeof(errmsg)); + _cdb.erase(c->id); } struct candidate *get_candidate(uint32_t id) From c85ecd64050faccd9796ff9e8b935c3b0868eb7b Mon Sep 17 00:00:00 2001 From: Christian Hopps Date: Sat, 26 Feb 2022 07:55:32 -0500 Subject: [PATCH 3/5] lib: grpc: initialize 
uninitialized member variables fixes #9732, fixes #10578 Signed-off-by: Christian Hopps --- lib/northbound_grpc.cpp | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/lib/northbound_grpc.cpp b/lib/northbound_grpc.cpp index 69669a288e1..2d9b61483bf 100644 --- a/lib/northbound_grpc.cpp +++ b/lib/northbound_grpc.cpp @@ -1,7 +1,7 @@ // +// Copyright (c) 2021-2022, LabN Consulting, L.L.C // Copyright (C) 2019 NetDEF, Inc. // Renato Westphal -// Copyright (c) 2021, LabN Consulting, L.L.C // // This program is free software; you can redistribute it and/or modify it // under the terms of the GNU General Public License as published by the Free @@ -227,7 +227,6 @@ template class NewRpcState : RpcStateBase pthread_mutex_unlock(&_tag->cmux); return 0; } - NewRpcState *orig; const char *name; grpc::ServerContext ctx; @@ -238,12 +237,12 @@ template class NewRpcState : RpcStateBase Candidates *cdb; void (*callback)(NewRpcState *); - reqfunc_t requestf; - reqsfunc_t requestsf; + reqfunc_t requestf = NULL; + reqsfunc_t requestsf = NULL; pthread_mutex_t cmux = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond = PTHREAD_COND_INITIALIZER; - void *context; + void *context = 0; CallState state = CREATE; }; From 83f6fce7d2fece4617bd85e3da0894fc820dab8d Mon Sep 17 00:00:00 2001 From: Christian Hopps Date: Sat, 5 Mar 2022 11:04:43 -0500 Subject: [PATCH 4/5] lib: grpc: fix shutdown code fixes #9732 Signed-off-by: Christian Hopps --- lib/northbound_grpc.cpp | 119 +++++++++++++++++++++++++++------------- 1 file changed, 80 insertions(+), 39 deletions(-) diff --git a/lib/northbound_grpc.cpp b/lib/northbound_grpc.cpp index 2d9b61483bf..ca253031f45 100644 --- a/lib/northbound_grpc.cpp +++ b/lib/northbound_grpc.cpp @@ -50,6 +50,8 @@ static struct thread_master *main_master; static struct frr_pthread *fpt; +static bool grpc_running; + #define grpc_debug(...) 
\ do { \ if (nb_dbg_client_grpc) \ @@ -116,9 +118,11 @@ class Candidates class RpcStateBase { public: + virtual ~RpcStateBase() = default; virtual CallState doCallback() = 0; virtual void do_request(::frr::Northbound::AsyncService *service, - ::grpc::ServerCompletionQueue *cq) = 0; + ::grpc::ServerCompletionQueue *cq, + bool no_copy) = 0; }; /* @@ -188,17 +192,22 @@ template class NewRpcState : RpcStateBase } void do_request(::frr::Northbound::AsyncService *service, - ::grpc::ServerCompletionQueue *cq) override + ::grpc::ServerCompletionQueue *cq, + bool no_copy) override { grpc_debug("%s, posting a request for: %s", __func__, name); if (requestf) { NewRpcState *copy = - new NewRpcState(cdb, requestf, callback, name); + no_copy ? this + : new NewRpcState(cdb, requestf, + callback, name); (service->*requestf)(©->ctx, ©->request, ©->responder, cq, cq, copy); } else { NewRpcState *copy = - new NewRpcState(cdb, requestsf, callback, name); + no_copy ? this + : new NewRpcState(cdb, requestsf, + callback, name); (service->*requestsf)(©->ctx, ©->request, ©->async_responder, cq, cq, copy); @@ -1225,7 +1234,7 @@ void HandleUnaryExecute( frr::NAME##Response>( \ (cdb), &frr::Northbound::AsyncService::Request##NAME, \ &HandleUnary##NAME, #NAME); \ - _rpcState->do_request(service, s_cq); \ + _rpcState->do_request(&service, cq.get(), true); \ } while (0) #define REQUEST_NEWRPC_STREAMING(NAME, cdb) \ @@ -1234,7 +1243,7 @@ void HandleUnaryExecute( frr::NAME##Response>( \ (cdb), &frr::Northbound::AsyncService::Request##NAME, \ &HandleStreaming##NAME, #NAME); \ - _rpcState->do_request(service, s_cq); \ + _rpcState->do_request(&service, cq.get(), true); \ } while (0) struct grpc_pthread_attr { @@ -1243,8 +1252,8 @@ struct grpc_pthread_attr { }; // Capture these objects so we can try to shut down cleanly -static std::unique_ptr s_server; -static grpc::ServerCompletionQueue *s_cq; +static pthread_mutex_t s_server_lock = PTHREAD_MUTEX_INITIALIZER; +static grpc::Server *s_server; static 
void *grpc_pthread_start(void *arg) { @@ -1254,20 +1263,22 @@ static void *grpc_pthread_start(void *arg) Candidates candidates; grpc::ServerBuilder builder; std::stringstream server_address; - frr::Northbound::AsyncService *service = - new frr::Northbound::AsyncService(); + frr::Northbound::AsyncService service; frr_pthread_set_name(fpt); server_address << "0.0.0.0:" << port; builder.AddListeningPort(server_address.str(), grpc::InsecureServerCredentials()); - builder.RegisterService(service); + builder.RegisterService(&service); builder.AddChannelArgument( GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 5000); - auto cq = builder.AddCompletionQueue(); - s_cq = cq.get(); - s_server = builder.BuildAndStart(); + std::unique_ptr cq = + builder.AddCompletionQueue(); + std::unique_ptr server = builder.BuildAndStart(); + s_server = server.get(); + + grpc_running = true; /* Schedule all RPC handlers */ REQUEST_NEWRPC(GetCapabilities, NULL); @@ -1288,20 +1299,25 @@ static void *grpc_pthread_start(void *arg) server_address.str().c_str()); /* Process inbound RPCs */ - while (true) { - void *tag; - bool ok; - - s_cq->Next(&tag, &ok); - if (!ok) + bool ok; + void *tag; + while (grpc_running) { + if (!cq->Next(&tag, &ok)) { + grpc_debug("%s: CQ empty exiting", __func__); break; + } - grpc_debug("%s: Got next from CompletionQueue, %p %d", __func__, - tag, ok); + grpc_debug("%s: got next from CQ tag: %p ok: %d", __func__, tag, + ok); + + if (!ok || !grpc_running) { + delete static_cast(tag); + break; + } RpcStateBase *rpc = static_cast(tag); CallState state = rpc->doCallback(); - grpc_debug("%s: Callback returned RPC State: %s", __func__, + grpc_debug("%s: callback returned RPC State: %s", __func__, call_states[state]); /* @@ -1311,10 +1327,30 @@ static void *grpc_pthread_start(void *arg) * to be called back once more in the FINISH state (from the * user indicating Finish() for cleanup. 
*/ - if (state == FINISH) - rpc->do_request(service, s_cq); + if (state == FINISH && grpc_running) + rpc->do_request(&service, cq.get(), false); } + /* This was probably done for us to get here, but let's be safe */ + pthread_mutex_lock(&s_server_lock); + grpc_running = false; + if (s_server) { + grpc_debug("%s: shutdown server and CQ", __func__); + server->Shutdown(); + s_server = NULL; + } + pthread_mutex_unlock(&s_server_lock); + + grpc_debug("%s: shutting down CQ", __func__); + cq->Shutdown(); + + grpc_debug("%s: draining the CQ", __func__); + while (cq->Next(&tag, &ok)) { + grpc_debug("%s: drain tag %p", __func__, tag); + delete static_cast(tag); + } + + zlog_info("%s: exiting from grpc pthread", __func__); return NULL; } @@ -1326,6 +1362,8 @@ static int frr_grpc_init(uint port) .stop = NULL, }; + grpc_debug("%s: entered", __func__); + fpt = frr_pthread_new(&attr, "frr-grpc", "frr-grpc"); fpt->data = reinterpret_cast((intptr_t)port); @@ -1341,24 +1379,27 @@ static int frr_grpc_init(uint port) static int frr_grpc_finish(void) { - // Shutdown the grpc server - if (s_server) { - s_server->Shutdown(); - s_cq->Shutdown(); + grpc_debug("%s: entered", __func__); - // And drain the queue - void *ignore; - bool ok; - - while (s_cq->Next(&ignore, &ok)) - ; - } + if (!fpt) + return 0; - if (fpt) { - pthread_join(fpt->thread, NULL); - frr_pthread_destroy(fpt); + /* + * Shut the server down here in main thread. This will cause the wait on + * the completion queue (cq.Next()) to exit and cleanup everything else. 
+ */ + pthread_mutex_lock(&s_server_lock); + grpc_running = false; + if (s_server) { + grpc_debug("%s: shutdown server", __func__); + s_server->Shutdown(); + s_server = NULL; } + pthread_mutex_unlock(&s_server_lock); + grpc_debug("%s: joining and destroy grpc thread", __func__); + pthread_join(fpt->thread, NULL); + frr_pthread_destroy(fpt); return 0; } From fe095adc24a91eef3265ac803bb805cd297b3522 Mon Sep 17 00:00:00 2001 From: Christian Hopps Date: Sun, 6 Mar 2022 11:58:26 -0500 Subject: [PATCH 5/5] lib: grpc: fix handling of "empty" yang type - rather than coerce `const char *` to std::string&, just pass the C ptr, as that's what is used anyway. fixes #10578 Signed-off-by: Christian Hopps --- lib/northbound_grpc.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/northbound_grpc.cpp b/lib/northbound_grpc.cpp index ca253031f45..f5c2a91a500 100644 --- a/lib/northbound_grpc.cpp +++ b/lib/northbound_grpc.cpp @@ -276,10 +276,10 @@ static LYD_FORMAT encoding2lyd_format(enum frr::Encoding encoding) } static int yang_dnode_edit(struct lyd_node *dnode, const std::string &path, - const std::string &value) + const char *value) { - LY_ERR err = lyd_new_path(dnode, ly_native_ctx, path.c_str(), - value.c_str(), LYD_NEW_PATH_UPDATE, &dnode); + LY_ERR err = lyd_new_path(dnode, ly_native_ctx, path.c_str(), value, + LYD_NEW_PATH_UPDATE, &dnode); if (err != LY_SUCCESS) { flog_warn(EC_LIB_LIBYANG, "%s: lyd_new_path() failed: %s", __func__, ly_errmsg(ly_native_ctx)); @@ -706,8 +706,8 @@ void HandleUnaryEditCandidate( auto pvs = tag->request.update(); for (const frr::PathValue &pv : pvs) { - if (yang_dnode_edit(candidate_tmp->dnode, pv.path(), pv.value()) - != 0) { + if (yang_dnode_edit(candidate_tmp->dnode, pv.path(), + pv.value().c_str()) != 0) { nb_config_free(candidate_tmp); tag->responder.Finish(