summaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorDonald Sharp <donaldsharp72@gmail.com>2022-03-07 17:49:48 +0100
committerGitHub <noreply@github.com>2022-03-07 17:49:48 +0100
commitdb0a45d0d6469a94ef9bd53de7f84b7f0e9e2c9b (patch)
treead3f33ea679c4673af1265819c3dd83ecc3d3649 /lib
parentMerge pull request #10745 from mjstapp/fix_doc_star_again (diff)
parentlib: grpc: fix handling of "empty" yang type (diff)
downloadfrr-db0a45d0d6469a94ef9bd53de7f84b7f0e9e2c9b.tar.xz
frr-db0a45d0d6469a94ef9bd53de7f84b7f0e9e2c9b.zip
Merge pull request #10741 from LabNConsulting/chopps/critfixgrpc
critical fixes for grpc
Diffstat (limited to 'lib')
-rw-r--r--lib/northbound_grpc.cpp142
1 files changed, 92 insertions, 50 deletions
diff --git a/lib/northbound_grpc.cpp b/lib/northbound_grpc.cpp
index 34bb1e498..e2a629003 100644
--- a/lib/northbound_grpc.cpp
+++ b/lib/northbound_grpc.cpp
@@ -1,7 +1,7 @@
//
+// Copyright (c) 2021-2022, LabN Consulting, L.L.C
// Copyright (C) 2019 NetDEF, Inc.
// Renato Westphal
-// Copyright (c) 2021, LabN Consulting, L.L.C
//
// This program is free software; you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
@@ -50,6 +50,8 @@ static struct thread_master *main_master;
static struct frr_pthread *fpt;
+static bool grpc_running;
+
#define grpc_debug(...) \
do { \
if (nb_dbg_client_grpc) \
@@ -96,11 +98,11 @@ class Candidates
{
char errmsg[BUFSIZ] = {0};
- _cdb.erase(c->id);
nb_config_free(c->config);
if (c->transaction)
nb_candidate_commit_abort(c->transaction, errmsg,
sizeof(errmsg));
+ _cdb.erase(c->id);
}
struct candidate *get_candidate(uint32_t id)
@@ -116,9 +118,11 @@ class Candidates
class RpcStateBase
{
public:
+ virtual ~RpcStateBase() = default;
virtual CallState doCallback() = 0;
virtual void do_request(::frr::Northbound::AsyncService *service,
- ::grpc::ServerCompletionQueue *cq) = 0;
+ ::grpc::ServerCompletionQueue *cq,
+ bool no_copy) = 0;
};
/*
@@ -188,17 +192,22 @@ template <typename Q, typename S> class NewRpcState : RpcStateBase
}
void do_request(::frr::Northbound::AsyncService *service,
- ::grpc::ServerCompletionQueue *cq) override
+ ::grpc::ServerCompletionQueue *cq,
+ bool no_copy) override
{
grpc_debug("%s, posting a request for: %s", __func__, name);
if (requestf) {
NewRpcState<Q, S> *copy =
- new NewRpcState(cdb, requestf, callback, name);
+ no_copy ? this
+ : new NewRpcState(cdb, requestf,
+ callback, name);
(service->*requestf)(&copy->ctx, &copy->request,
&copy->responder, cq, cq, copy);
} else {
NewRpcState<Q, S> *copy =
- new NewRpcState(cdb, requestsf, callback, name);
+ no_copy ? this
+ : new NewRpcState(cdb, requestsf,
+ callback, name);
(service->*requestsf)(&copy->ctx, &copy->request,
&copy->async_responder, cq, cq,
copy);
@@ -227,7 +236,6 @@ template <typename Q, typename S> class NewRpcState : RpcStateBase
pthread_mutex_unlock(&_tag->cmux);
return;
}
- NewRpcState<Q, S> *orig;
const char *name;
grpc::ServerContext ctx;
@@ -238,12 +246,12 @@ template <typename Q, typename S> class NewRpcState : RpcStateBase
Candidates *cdb;
void (*callback)(NewRpcState<Q, S> *);
- reqfunc_t requestf;
- reqsfunc_t requestsf;
+ reqfunc_t requestf = NULL;
+ reqsfunc_t requestsf = NULL;
pthread_mutex_t cmux = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
- void *context;
+ void *context = 0;
CallState state = CREATE;
};
@@ -268,10 +276,10 @@ static LYD_FORMAT encoding2lyd_format(enum frr::Encoding encoding)
}
static int yang_dnode_edit(struct lyd_node *dnode, const std::string &path,
- const std::string &value)
+ const char *value)
{
- LY_ERR err = lyd_new_path(dnode, ly_native_ctx, path.c_str(),
- value.c_str(), LYD_NEW_PATH_UPDATE, &dnode);
+ LY_ERR err = lyd_new_path(dnode, ly_native_ctx, path.c_str(), value,
+ LYD_NEW_PATH_UPDATE, &dnode);
if (err != LY_SUCCESS) {
flog_warn(EC_LIB_LIBYANG, "%s: lyd_new_path() failed: %s",
__func__, ly_errmsg(ly_native_ctx));
@@ -698,8 +706,8 @@ void HandleUnaryEditCandidate(
auto pvs = tag->request.update();
for (const frr::PathValue &pv : pvs) {
- if (yang_dnode_edit(candidate_tmp->dnode, pv.path(), pv.value())
- != 0) {
+ if (yang_dnode_edit(candidate_tmp->dnode, pv.path(),
+ pv.value().c_str()) != 0) {
nb_config_free(candidate_tmp);
tag->responder.Finish(
@@ -1226,7 +1234,7 @@ void HandleUnaryExecute(
frr::NAME##Response>( \
(cdb), &frr::Northbound::AsyncService::Request##NAME, \
&HandleUnary##NAME, #NAME); \
- _rpcState->do_request(service, s_cq); \
+ _rpcState->do_request(&service, cq.get(), true); \
} while (0)
#define REQUEST_NEWRPC_STREAMING(NAME, cdb) \
@@ -1235,7 +1243,7 @@ void HandleUnaryExecute(
frr::NAME##Response>( \
(cdb), &frr::Northbound::AsyncService::Request##NAME, \
&HandleStreaming##NAME, #NAME); \
- _rpcState->do_request(service, s_cq); \
+ _rpcState->do_request(&service, cq.get(), true); \
} while (0)
struct grpc_pthread_attr {
@@ -1244,8 +1252,8 @@ struct grpc_pthread_attr {
};
// Capture these objects so we can try to shut down cleanly
-static std::unique_ptr<grpc::Server> s_server;
-static grpc::ServerCompletionQueue *s_cq;
+static pthread_mutex_t s_server_lock = PTHREAD_MUTEX_INITIALIZER;
+static grpc::Server *s_server;
static void *grpc_pthread_start(void *arg)
{
@@ -1255,18 +1263,22 @@ static void *grpc_pthread_start(void *arg)
Candidates candidates;
grpc::ServerBuilder builder;
std::stringstream server_address;
- frr::Northbound::AsyncService *service =
- new frr::Northbound::AsyncService();
+ frr::Northbound::AsyncService service;
frr_pthread_set_name(fpt);
server_address << "0.0.0.0:" << port;
builder.AddListeningPort(server_address.str(),
grpc::InsecureServerCredentials());
- builder.RegisterService(service);
- auto cq = builder.AddCompletionQueue();
- s_cq = cq.get();
- s_server = builder.BuildAndStart();
+ builder.RegisterService(&service);
+ builder.AddChannelArgument(
+ GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 5000);
+ std::unique_ptr<grpc::ServerCompletionQueue> cq =
+ builder.AddCompletionQueue();
+ std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
+ s_server = server.get();
+
+ grpc_running = true;
/* Schedule all RPC handlers */
REQUEST_NEWRPC(GetCapabilities, NULL);
@@ -1287,20 +1299,25 @@ static void *grpc_pthread_start(void *arg)
server_address.str().c_str());
/* Process inbound RPCs */
- while (true) {
- void *tag;
- bool ok;
-
- s_cq->Next(&tag, &ok);
- if (!ok)
+ bool ok;
+ void *tag;
+ while (grpc_running) {
+ if (!cq->Next(&tag, &ok)) {
+ grpc_debug("%s: CQ empty exiting", __func__);
break;
+ }
- grpc_debug("%s: Got next from CompletionQueue, %p %d", __func__,
- tag, ok);
+ grpc_debug("%s: got next from CQ tag: %p ok: %d", __func__, tag,
+ ok);
+
+ if (!ok || !grpc_running) {
+ delete static_cast<RpcStateBase *>(tag);
+ break;
+ }
RpcStateBase *rpc = static_cast<RpcStateBase *>(tag);
CallState state = rpc->doCallback();
- grpc_debug("%s: Callback returned RPC State: %s", __func__,
+ grpc_debug("%s: callback returned RPC State: %s", __func__,
call_states[state]);
/*
@@ -1310,10 +1327,30 @@ static void *grpc_pthread_start(void *arg)
* to be called back once more in the FINISH state (from the
* user indicating Finish() for cleanup.
*/
- if (state == FINISH)
- rpc->do_request(service, s_cq);
+ if (state == FINISH && grpc_running)
+ rpc->do_request(&service, cq.get(), false);
+ }
+
+ /* This was probably done for us to get here, but let's be safe */
+ pthread_mutex_lock(&s_server_lock);
+ grpc_running = false;
+ if (s_server) {
+ grpc_debug("%s: shutdown server and CQ", __func__);
+ server->Shutdown();
+ s_server = NULL;
+ }
+ pthread_mutex_unlock(&s_server_lock);
+
+ grpc_debug("%s: shutting down CQ", __func__);
+ cq->Shutdown();
+
+ grpc_debug("%s: draining the CQ", __func__);
+ while (cq->Next(&tag, &ok)) {
+ grpc_debug("%s: drain tag %p", __func__, tag);
+ delete static_cast<RpcStateBase *>(tag);
}
+ zlog_info("%s: exiting from grpc pthread", __func__);
return NULL;
}
@@ -1325,6 +1362,8 @@ static int frr_grpc_init(uint port)
.stop = NULL,
};
+ grpc_debug("%s: entered", __func__);
+
fpt = frr_pthread_new(&attr, "frr-grpc", "frr-grpc");
fpt->data = reinterpret_cast<void *>((intptr_t)port);
@@ -1340,24 +1379,27 @@ static int frr_grpc_init(uint port)
static int frr_grpc_finish(void)
{
- // Shutdown the grpc server
- if (s_server) {
- s_server->Shutdown();
- s_cq->Shutdown();
-
- // And drain the queue
- void *ignore;
- bool ok;
+ grpc_debug("%s: entered", __func__);
- while (s_cq->Next(&ignore, &ok))
- ;
- }
+ if (!fpt)
+ return 0;
- if (fpt) {
- pthread_join(fpt->thread, NULL);
- frr_pthread_destroy(fpt);
+ /*
+ * Shut the server down here in main thread. This will cause the wait on
+ * the completion queue (cq.Next()) to exit and cleanup everything else.
+ */
+ pthread_mutex_lock(&s_server_lock);
+ grpc_running = false;
+ if (s_server) {
+ grpc_debug("%s: shutdown server", __func__);
+ s_server->Shutdown();
+ s_server = NULL;
}
+ pthread_mutex_unlock(&s_server_lock);
+ grpc_debug("%s: joining and destroy grpc thread", __func__);
+ pthread_join(fpt->thread, NULL);
+ frr_pthread_destroy(fpt);
return 0;
}