summaryrefslogtreecommitdiffstats
path: root/lib/northbound_grpc.cpp
diff options
context:
space:
mode:
authorChristian Hopps <chopps@gmail.com>2021-05-20 08:50:34 +0200
committerChristian Hopps <chopps@gmail.com>2021-06-01 17:27:54 +0200
commitb680134e1122cdbb67f5c6ed158ee712255ee3cc (patch)
treef6079a4f6b7a7239c990ef0b6f14ccaa606f0c18 /lib/northbound_grpc.cpp
parentMerge pull request #8769 from ton31337/fix/time_to_remove (diff)
downloadfrr-b680134e1122cdbb67f5c6ed158ee712255ee3cc.tar.xz
frr-b680134e1122cdbb67f5c6ed158ee712255ee3cc.zip
lib: fix threading bug in GRPC code
The code that actually calls FRR northbound functions needs to be running in the master thread. The previous code was running on a GRPC pthread. While fixing moved to more functional vs OOP to make this easier to see. Also fix ly merge to merge siblings not throw the originals away. Signed-off-by: Christian Hopps <chopps@labn.net>
Diffstat (limited to 'lib/northbound_grpc.cpp')
-rw-r--r--lib/northbound_grpc.cpp2313
1 files changed, 1138 insertions, 1175 deletions
diff --git a/lib/northbound_grpc.cpp b/lib/northbound_grpc.cpp
index c61effdda..807d1252c 100644
--- a/lib/northbound_grpc.cpp
+++ b/lib/northbound_grpc.cpp
@@ -1,6 +1,7 @@
//
// Copyright (C) 2019 NetDEF, Inc.
// Renato Westphal
+// Copyright (c) 2021, LabN Consulting, L.L.C
//
// This program is free software; you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
@@ -24,6 +25,7 @@
#include "log.h"
#include "libfrr.h"
#include "lib/version.h"
+#include "lib/thread.h"
#include "command.h"
#include "lib_errors.h"
#include "northbound.h"
@@ -37,1315 +39,1286 @@
#define GRPC_DEFAULT_PORT 50051
-static void *grpc_pthread_start(void *arg);
-
/*
* NOTE: we can't use the FRR debugging infrastructure here since it uses
* atomics and C++ has a different atomics API. Enable gRPC debugging
* unconditionally until we figure out a way to solve this problem.
*/
-static bool nb_dbg_client_grpc = 1;
+static bool nb_dbg_client_grpc = 0;
+
+static struct thread_master *main_master;
static struct frr_pthread *fpt;
-/* Default frr_pthread attributes */
-static const struct frr_pthread_attr attr = {
- .start = grpc_pthread_start,
- .stop = NULL,
+#define grpc_debug(...) \
+ do { \
+ if (nb_dbg_client_grpc) \
+ zlog_debug(__VA_ARGS__); \
+ } while (0)
+
+// ------------------------------------------------------
+// New Types
+// ------------------------------------------------------
+
+enum CallState { CREATE, PROCESS, MORE, FINISH, DELETED };
+const char *call_states[] = {"CREATE", "PROCESS", "MORE", "FINISH", "DELETED"};
+
+struct candidate {
+ uint64_t id;
+ struct nb_config *config;
+ struct nb_transaction *transaction;
};
-enum CallStatus { CREATE, PROCESS, FINISH };
+class Candidates
+{
+ public:
+ ~Candidates(void)
+ {
+ // Delete candidates.
+ for (auto it = _cdb.begin(); it != _cdb.end(); it++)
+ delete_candidate(&it->second);
+ }
+
+ struct candidate *create_candidate(void)
+ {
+ uint64_t id = ++_next_id;
+ assert(id); // TODO: implement an algorithm for unique reusable
+ // IDs.
+ struct candidate *c = &_cdb[id];
+ c->id = id;
+ c->config = nb_config_dup(running_config);
+ c->transaction = NULL;
+
+ return c;
+ }
+
+ void delete_candidate(struct candidate *c)
+ {
+ char errmsg[BUFSIZ] = {0};
+
+ _cdb.erase(c->id);
+ nb_config_free(c->config);
+ if (c->transaction)
+ nb_candidate_commit_abort(c->transaction, errmsg,
+ sizeof(errmsg));
+ }
+
+ struct candidate *get_candidate(uint32_t id)
+ {
+ return _cdb.count(id) == 0 ? NULL : &_cdb[id];
+ }
+
+ private:
+ uint64_t _next_id = 0;
+ std::map<uint32_t, struct candidate> _cdb;
+};
-/* Thanks gooble */
class RpcStateBase
{
public:
- virtual void doCallback() = 0;
+ virtual CallState doCallback() = 0;
+ virtual void do_request(::frr::Northbound::AsyncService *service,
+ ::grpc::ServerCompletionQueue *cq) = 0;
};
-class NorthboundImpl;
-
-template <typename Q, typename S> class RpcState : RpcStateBase
+/*
+ * The RPC state class is used to track the execution of an RPC.
+ */
+template <typename Q, typename S> class NewRpcState : RpcStateBase
{
+ typedef void (frr::Northbound::AsyncService::*reqfunc_t)(
+ ::grpc::ServerContext *, Q *,
+ ::grpc::ServerAsyncResponseWriter<S> *,
+ ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *,
+ void *);
+ typedef void (frr::Northbound::AsyncService::*reqsfunc_t)(
+ ::grpc::ServerContext *, Q *, ::grpc::ServerAsyncWriter<S> *,
+ ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *,
+ void *);
+
public:
- RpcState(NorthboundImpl *svc,
- void (NorthboundImpl::*cb)(RpcState<Q, S> *))
- : callback(cb), responder(&ctx), async_responder(&ctx),
- service(svc){};
+ NewRpcState(Candidates *cdb, reqfunc_t rfunc,
+ void (*cb)(NewRpcState<Q, S> *), const char *name)
+ : requestf(rfunc), callback(cb), responder(&ctx),
+ async_responder(&ctx), name(name), cdb(cdb){};
+ NewRpcState(Candidates *cdb, reqsfunc_t rfunc,
+ void (*cb)(NewRpcState<Q, S> *), const char *name)
+ : requestsf(rfunc), callback(cb), responder(&ctx),
+ async_responder(&ctx), name(name), cdb(cdb){};
+
+ CallState doCallback() override
+ {
+ CallState enter_state = this->state;
+ CallState new_state;
+ if (enter_state == FINISH) {
+ grpc_debug("%s RPC FINISH -> DELETED", name);
+ new_state = FINISH;
+ } else {
+ grpc_debug("%s RPC: %s -> PROCESS", name,
+ call_states[this->state]);
+ new_state = PROCESS;
+ }
+ /*
+ * We are either in state CREATE, MORE or FINISH. If CREATE or
+ * MORE move back to PROCESS, otherwise we are cleaning up
+ * (FINISH) so leave it in that state. Run the callback on the
+ * main threadmaster/pthread; and wait for expected transition
+ * from main thread. If transition is to FINISH->DELETED.
+ * delete us.
+ *
+ * We update the state prior to scheduling the callback which
+ * may then update the state in the master pthread. Then we
+ * obtain the lock in the condvar-check-loop as the callback
+ * will be modifying updating the state value.
+ */
+ this->state = new_state;
+ thread_add_event(main_master, c_callback, (void *)this, 0,
+ NULL);
+ pthread_mutex_lock(&this->cmux);
+ while (this->state == new_state)
+ pthread_cond_wait(&this->cond, &this->cmux);
+ pthread_mutex_unlock(&this->cmux);
+
+ if (this->state == DELETED) {
+ grpc_debug("%s RPC: -> [DELETED]", name);
+ delete this;
+ return DELETED;
+ }
+ return this->state;
+ }
- void doCallback() override
+ void do_request(::frr::Northbound::AsyncService *service,
+ ::grpc::ServerCompletionQueue *cq) override
{
- (service->*callback)(this);
+ grpc_debug("%s, posting a request for: %s", __func__, name);
+ if (requestf) {
+ NewRpcState<Q, S> *copy =
+ new NewRpcState(cdb, requestf, callback, name);
+ (service->*requestf)(&copy->ctx, &copy->request,
+ &copy->responder, cq, cq, copy);
+ } else {
+ NewRpcState<Q, S> *copy =
+ new NewRpcState(cdb, requestsf, callback, name);
+ (service->*requestsf)(&copy->ctx, &copy->request,
+ &copy->async_responder, cq, cq,
+ copy);
+ }
+ }
+
+
+ static int c_callback(struct thread *thread)
+ {
+ auto _tag = static_cast<NewRpcState<Q, S> *>(thread->arg);
+ /*
+ * We hold the lock until the callback finishes and has updated
+ * _tag->state, then we signal done and release.
+ */
+ pthread_mutex_lock(&_tag->cmux);
+
+ CallState enter_state = _tag->state;
+ grpc_debug("%s RPC running on main thread", _tag->name);
+
+ _tag->callback(_tag);
+
+ grpc_debug("%s RPC: %s -> %s", _tag->name,
+ call_states[enter_state], call_states[_tag->state]);
+
+ pthread_cond_signal(&_tag->cond);
+ pthread_mutex_unlock(&_tag->cmux);
+ return 0;
}
+ NewRpcState<Q, S> *orig;
+ const char *name;
grpc::ServerContext ctx;
Q request;
S response;
grpc::ServerAsyncResponseWriter<S> responder;
grpc::ServerAsyncWriter<S> async_responder;
- NorthboundImpl *service;
- void (NorthboundImpl::*callback)(RpcState<Q, S> *);
+ Candidates *cdb;
+ void (*callback)(NewRpcState<Q, S> *);
+ reqfunc_t requestf;
+ reqsfunc_t requestsf;
+ pthread_mutex_t cmux = PTHREAD_MUTEX_INITIALIZER;
+ pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
void *context;
- CallStatus state = CREATE;
+
+ CallState state = CREATE;
};
-#define REQUEST_RPC(NAME) \
- do { \
- auto _rpcState = \
- new RpcState<frr::NAME##Request, frr::NAME##Response>( \
- this, &NorthboundImpl::Handle##NAME); \
- _service->Request##NAME(&_rpcState->ctx, &_rpcState->request, \
- &_rpcState->responder, _cq, _cq, \
- _rpcState); \
- } while (0)
+// ------------------------------------------------------
+// Utility Functions
+// ------------------------------------------------------
-#define REQUEST_RPC_STREAMING(NAME) \
- do { \
- auto _rpcState = \
- new RpcState<frr::NAME##Request, frr::NAME##Response>( \
- this, &NorthboundImpl::Handle##NAME); \
- _service->Request##NAME(&_rpcState->ctx, &_rpcState->request, \
- &_rpcState->async_responder, _cq, _cq, \
- _rpcState); \
- } while (0)
+static LYD_FORMAT encoding2lyd_format(enum frr::Encoding encoding)
+{
+ switch (encoding) {
+ case frr::JSON:
+ return LYD_JSON;
+ case frr::XML:
+ return LYD_XML;
+ default:
+ flog_err(EC_LIB_DEVELOPMENT,
+ "%s: unknown data encoding format (%u)", __func__,
+ encoding);
+ exit(1);
+ }
+}
-class NorthboundImpl
+static int yang_dnode_edit(struct lyd_node *dnode, const std::string &path,
+ const std::string &value)
{
- public:
- NorthboundImpl(void)
- {
- _nextCandidateId = 0;
- _service = new frr::Northbound::AsyncService();
+ LY_ERR err = lyd_new_path(dnode, ly_native_ctx, path.c_str(),
+ value.c_str(), LYD_NEW_PATH_UPDATE, &dnode);
+ if (err != LY_SUCCESS) {
+ flog_warn(EC_LIB_LIBYANG, "%s: lyd_new_path() failed: %s",
+ __func__, ly_errmsg(ly_native_ctx));
+ return -1;
}
- ~NorthboundImpl(void)
- {
- // Delete candidates.
- for (auto it = _candidates.begin(); it != _candidates.end();
- it++)
- delete_candidate(&it->second);
+ return 0;
+}
+
+static int yang_dnode_delete(struct lyd_node *dnode, const std::string &path)
+{
+ dnode = yang_dnode_get(dnode, path.c_str());
+ if (!dnode)
+ return -1;
+
+ lyd_free_tree(dnode);
+
+ return 0;
+}
+
+static LY_ERR data_tree_from_dnode(frr::DataTree *dt,
+ const struct lyd_node *dnode,
+ LYD_FORMAT lyd_format, bool with_defaults)
+{
+ char *strp;
+ int options = 0;
+
+ SET_FLAG(options, LYD_PRINT_WITHSIBLINGS);
+ if (with_defaults)
+ SET_FLAG(options, LYD_PRINT_WD_ALL);
+ else
+ SET_FLAG(options, LYD_PRINT_WD_TRIM);
+
+ LY_ERR err = lyd_print_mem(&strp, dnode, lyd_format, options);
+ if (err == LY_SUCCESS) {
+ if (strp) {
+ dt->set_data(strp);
+ free(strp);
+ }
}
+ return err;
+}
- void Run(unsigned long port)
- {
- grpc::ServerBuilder builder;
- std::stringstream server_address;
-
- server_address << "0.0.0.0:" << port;
-
- builder.AddListeningPort(server_address.str(),
- grpc::InsecureServerCredentials());
- builder.RegisterService(_service);
-
- auto cq = builder.AddCompletionQueue();
- _cq = cq.get();
- auto _server = builder.BuildAndStart();
-
- /* Schedule all RPC handlers */
- REQUEST_RPC(GetCapabilities);
- REQUEST_RPC(CreateCandidate);
- REQUEST_RPC(DeleteCandidate);
- REQUEST_RPC(UpdateCandidate);
- REQUEST_RPC(EditCandidate);
- REQUEST_RPC(LoadToCandidate);
- REQUEST_RPC(Commit);
- REQUEST_RPC(GetTransaction);
- REQUEST_RPC(LockConfig);
- REQUEST_RPC(UnlockConfig);
- REQUEST_RPC(Execute);
- REQUEST_RPC_STREAMING(Get);
- REQUEST_RPC_STREAMING(ListTransactions);
-
- zlog_notice("gRPC server listening on %s",
- server_address.str().c_str());
-
- /* Process inbound RPCs */
- void *tag;
- bool ok;
- while (true) {
- _cq->Next(&tag, &ok);
- GPR_ASSERT(ok);
- static_cast<RpcStateBase *>(tag)->doCallback();
- tag = nullptr;
+static struct lyd_node *dnode_from_data_tree(const frr::DataTree *dt,
+ bool config_only)
+{
+ struct lyd_node *dnode;
+ int options, opt2;
+ LY_ERR err;
+
+ if (config_only) {
+ options = LYD_PARSE_NO_STATE;
+ opt2 = LYD_VALIDATE_NO_STATE;
+ } else {
+ options = LYD_PARSE_STRICT;
+ opt2 = 0;
+ }
+
+ err = lyd_parse_data_mem(ly_native_ctx, dt->data().c_str(),
+ encoding2lyd_format(dt->encoding()), options,
+ opt2, &dnode);
+ if (err != LY_SUCCESS) {
+ flog_warn(EC_LIB_LIBYANG, "%s: lyd_parse_mem() failed: %s",
+ __func__, ly_errmsg(ly_native_ctx));
+ }
+ return dnode;
+}
+
+static struct lyd_node *get_dnode_config(const std::string &path)
+{
+ struct lyd_node *dnode;
+
+ dnode = yang_dnode_get(running_config->dnode,
+ path.empty() ? NULL : path.c_str());
+ if (dnode)
+ dnode = yang_dnode_dup(dnode);
+
+ return dnode;
+}
+
+static int get_oper_data_cb(const struct lysc_node *snode,
+ struct yang_translator *translator,
+ struct yang_data *data, void *arg)
+{
+ struct lyd_node *dnode = static_cast<struct lyd_node *>(arg);
+ int ret = yang_dnode_edit(dnode, data->xpath, data->value);
+ yang_data_free(data);
+
+ return (ret == 0) ? NB_OK : NB_ERR;
+}
+
+static struct lyd_node *get_dnode_state(const std::string &path)
+{
+ struct lyd_node *dnode = yang_dnode_new(ly_native_ctx, false);
+ if (nb_oper_data_iterate(path.c_str(), NULL, 0, get_oper_data_cb, dnode)
+ != NB_OK) {
+ yang_dnode_free(dnode);
+ return NULL;
+ }
+
+ return dnode;
+}
+
+static grpc::Status get_path(frr::DataTree *dt, const std::string &path,
+ int type, LYD_FORMAT lyd_format,
+ bool with_defaults)
+{
+ struct lyd_node *dnode_config = NULL;
+ struct lyd_node *dnode_state = NULL;
+ struct lyd_node *dnode_final;
+
+ // Configuration data.
+ if (type == frr::GetRequest_DataType_ALL
+ || type == frr::GetRequest_DataType_CONFIG) {
+ dnode_config = get_dnode_config(path);
+ if (!dnode_config)
+ return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+ "Data path not found");
+ }
+
+ // Operational data.
+ if (type == frr::GetRequest_DataType_ALL
+ || type == frr::GetRequest_DataType_STATE) {
+ dnode_state = get_dnode_state(path);
+ if (!dnode_state) {
+ if (dnode_config)
+ yang_dnode_free(dnode_config);
+ return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+ "Failed to fetch operational data");
+ }
+ }
+
+ switch (type) {
+ case frr::GetRequest_DataType_ALL:
+ //
+ // Combine configuration and state data into a single
+ // dnode.
+ //
+ if (lyd_merge_siblings(&dnode_state, dnode_config,
+ LYD_MERGE_DESTRUCT)
+ != LY_SUCCESS) {
+ yang_dnode_free(dnode_state);
+ yang_dnode_free(dnode_config);
+ return grpc::Status(
+ grpc::StatusCode::INTERNAL,
+ "Failed to merge configuration and state data",
+ ly_errmsg(ly_native_ctx));
}
+
+ dnode_final = dnode_state;
+ break;
+ case frr::GetRequest_DataType_CONFIG:
+ dnode_final = dnode_config;
+ break;
+ case frr::GetRequest_DataType_STATE:
+ dnode_final = dnode_state;
+ break;
}
- void HandleGetCapabilities(RpcState<frr::GetCapabilitiesRequest,
+ // Validate data to create implicit default nodes if necessary.
+ int validate_opts = 0;
+ if (type == frr::GetRequest_DataType_CONFIG)
+ validate_opts = LYD_VALIDATE_NO_STATE;
+ else
+ validate_opts = 0;
+
+ LY_ERR err = lyd_validate_all(&dnode_final, ly_native_ctx,
+ validate_opts, NULL);
+
+ if (err)
+ flog_warn(EC_LIB_LIBYANG, "%s: lyd_validate_all() failed: %s",
+ __func__, ly_errmsg(ly_native_ctx));
+ // Dump data using the requested format.
+ if (!err)
+ err = data_tree_from_dnode(dt, dnode_final, lyd_format,
+ with_defaults);
+ yang_dnode_free(dnode_final);
+ if (err)
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ "Failed to dump data");
+ return grpc::Status::OK;
+}
+
+
+// ------------------------------------------------------
+// RPC Callback Functions: run on main thread
+// ------------------------------------------------------
+
+void HandleUnaryGetCapabilities(NewRpcState<frr::GetCapabilitiesRequest,
frr::GetCapabilitiesResponse> *tag)
- {
- switch (tag->state) {
- case CREATE:
- REQUEST_RPC(GetCapabilities);
- tag->state = PROCESS;
- case PROCESS: {
- if (nb_dbg_client_grpc)
- zlog_debug("received RPC GetCapabilities()");
-
- // Response: string frr_version = 1;
- tag->response.set_frr_version(FRR_VERSION);
-
- // Response: bool rollback_support = 2;
+{
+ grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
+
+ if (tag->state == FINISH) {
+ tag->state = DELETED;
+ return;
+ }
+
+ // Response: string frr_version = 1;
+ tag->response.set_frr_version(FRR_VERSION);
+
+ // Response: bool rollback_support = 2;
#ifdef HAVE_CONFIG_ROLLBACKS
- tag->response.set_rollback_support(true);
+ tag->response.set_rollback_support(true);
#else
- tag->response.set_rollback_support(false);
+ tag->response.set_rollback_support(false);
#endif
+ // Response: repeated ModuleData supported_modules = 3;
+ struct yang_module *module;
+ RB_FOREACH (module, yang_modules, &yang_modules) {
+ auto m = tag->response.add_supported_modules();
+
+ m->set_name(module->name);
+ if (module->info->revision)
+ m->set_revision(module->info->revision);
+ m->set_organization(module->info->org);
+ }
- // Response: repeated ModuleData supported_modules = 3;
- struct yang_module *module;
- RB_FOREACH (module, yang_modules, &yang_modules) {
- auto m = tag->response.add_supported_modules();
+ // Response: repeated Encoding supported_encodings = 4;
+ tag->response.add_supported_encodings(frr::JSON);
+ tag->response.add_supported_encodings(frr::XML);
- m->set_name(module->name);
- if (module->info->revision)
- m->set_revision(module->info->revision);
- m->set_organization(module->info->org);
- }
+ /* Should we do this in the async process call? */
+ tag->responder.Finish(tag->response, grpc::Status::OK, tag);
- // Response: repeated Encoding supported_encodings = 4;
- tag->response.add_supported_encodings(frr::JSON);
- tag->response.add_supported_encodings(frr::XML);
+ /* Indicate we are done. */
+ tag->state = FINISH;
+}
- tag->responder.Finish(tag->response, grpc::Status::OK,
- tag);
- tag->state = FINISH;
- break;
- }
- case FINISH:
- delete tag;
- }
+void HandleStreamingGet(NewRpcState<frr::GetRequest, frr::GetResponse> *tag)
+{
+ grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
+
+ if (tag->state == FINISH) {
+ delete static_cast<std::list<std::string> *>(tag->context);
+ tag->state = DELETED;
+ return;
}
- void HandleGet(RpcState<frr::GetRequest, frr::GetResponse> *tag)
- {
- switch (tag->state) {
- case CREATE: {
- auto mypaths = new std::list<std::string>();
- tag->context = mypaths;
- auto paths = tag->request.path();
- for (const std::string &path : paths) {
- mypaths->push_back(std::string(path));
- }
- REQUEST_RPC_STREAMING(Get);
- tag->state = PROCESS;
- }
- case PROCESS: {
- // Request: DataType type = 1;
- int type = tag->request.type();
- // Request: Encoding encoding = 2;
- frr::Encoding encoding = tag->request.encoding();
- // Request: bool with_defaults = 3;
- bool with_defaults = tag->request.with_defaults();
-
- if (nb_dbg_client_grpc)
- zlog_debug(
- "received RPC Get(type: %u, encoding: %u, with_defaults: %u)",
- type, encoding, with_defaults);
-
- auto mypaths = static_cast<std::list<std::string> *>(
- tag->context);
-
- if (mypaths->empty()) {
- tag->async_responder.Finish(grpc::Status::OK,
- tag);
- tag->state = FINISH;
- return;
- }
-
-
- frr::GetResponse response;
- grpc::Status status;
-
- // Response: int64 timestamp = 1;
- response.set_timestamp(time(NULL));
-
- // Response: DataTree data = 2;
- auto *data = response.mutable_data();
- data->set_encoding(tag->request.encoding());
- status = get_path(data, mypaths->back().c_str(), type,
- encoding2lyd_format(encoding),
- with_defaults);
-
- // Something went wrong...
- if (!status.ok()) {
- tag->async_responder.WriteAndFinish(
- response, grpc::WriteOptions(), status,
- tag);
- tag->state = FINISH;
- return;
- }
-
- mypaths->pop_back();
-
- tag->async_responder.Write(response, tag);
-
- break;
+ if (!tag->context) {
+ /* Creating, first time called for this RPC */
+ auto mypaths = new std::list<std::string>();
+ tag->context = mypaths;
+ auto paths = tag->request.path();
+ for (const std::string &path : paths) {
+ mypaths->push_back(std::string(path));
}
- case FINISH:
- if (nb_dbg_client_grpc)
- zlog_debug("received RPC Get() end");
+ }
- delete static_cast<std::list<std::string> *>(
- tag->context);
- delete tag;
- }
+ // Request: DataType type = 1;
+ int type = tag->request.type();
+ // Request: Encoding encoding = 2;
+ frr::Encoding encoding = tag->request.encoding();
+ // Request: bool with_defaults = 3;
+ bool with_defaults = tag->request.with_defaults();
+
+ auto mypathps = static_cast<std::list<std::string> *>(tag->context);
+ if (mypathps->empty()) {
+ tag->async_responder.Finish(grpc::Status::OK, tag);
+ tag->state = FINISH;
+ return;
}
- void HandleCreateCandidate(RpcState<frr::CreateCandidateRequest,
+ frr::GetResponse response;
+ grpc::Status status;
+
+ // Response: int64 timestamp = 1;
+ response.set_timestamp(time(NULL));
+
+ // Response: DataTree data = 2;
+ auto *data = response.mutable_data();
+ data->set_encoding(tag->request.encoding());
+ status = get_path(data, mypathps->back().c_str(), type,
+ encoding2lyd_format(encoding), with_defaults);
+
+ if (!status.ok()) {
+ tag->async_responder.WriteAndFinish(
+ response, grpc::WriteOptions(), status, tag);
+ tag->state = FINISH;
+ return;
+ }
+
+ mypathps->pop_back();
+ if (mypathps->empty()) {
+ tag->async_responder.WriteAndFinish(
+ response, grpc::WriteOptions(), grpc::Status::OK, tag);
+ tag->state = FINISH;
+ } else {
+ tag->async_responder.Write(response, tag);
+ tag->state = MORE;
+ }
+}
+
+void HandleUnaryCreateCandidate(NewRpcState<frr::CreateCandidateRequest,
frr::CreateCandidateResponse> *tag)
- {
- switch (tag->state) {
- case CREATE:
- REQUEST_RPC(CreateCandidate);
- tag->state = PROCESS;
- case PROCESS: {
- if (nb_dbg_client_grpc)
- zlog_debug("received RPC CreateCandidate()");
-
- struct candidate *candidate = create_candidate();
- if (!candidate) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(
- grpc::StatusCode::
- RESOURCE_EXHAUSTED,
- "Can't create candidate configuration"),
- tag);
- } else {
- tag->response.set_candidate_id(candidate->id);
- tag->responder.Finish(tag->response,
- grpc::Status::OK, tag);
- }
+{
+ grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
- tag->state = FINISH;
+ if (tag->state == FINISH) {
+ tag->state = DELETED;
+ return;
+ }
- break;
- }
- case FINISH:
- delete tag;
- }
+ struct candidate *candidate = tag->cdb->create_candidate();
+ if (!candidate) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
+ "Can't create candidate configuration"),
+ tag);
+ } else {
+ tag->response.set_candidate_id(candidate->id);
+ tag->responder.Finish(tag->response, grpc::Status::OK, tag);
}
- void HandleDeleteCandidate(RpcState<frr::DeleteCandidateRequest,
+ tag->state = FINISH;
+}
+
+void HandleUnaryDeleteCandidate(NewRpcState<frr::DeleteCandidateRequest,
frr::DeleteCandidateResponse> *tag)
- {
- switch (tag->state) {
- case CREATE:
- REQUEST_RPC(DeleteCandidate);
- tag->state = PROCESS;
- case PROCESS: {
-
- // Request: uint32 candidate_id = 1;
- uint32_t candidate_id = tag->request.candidate_id();
-
- if (nb_dbg_client_grpc)
- zlog_debug(
- "received RPC DeleteCandidate(candidate_id: %u)",
- candidate_id);
-
- struct candidate *candidate =
- get_candidate(candidate_id);
- if (!candidate) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(
- grpc::StatusCode::NOT_FOUND,
- "candidate configuration not found"),
- tag);
- tag->state = FINISH;
- return;
- } else {
- delete_candidate(candidate);
- tag->responder.Finish(tag->response,
- grpc::Status::OK, tag);
- tag->state = FINISH;
- return;
- }
- tag->state = FINISH;
- break;
- }
- case FINISH:
- delete tag;
- }
+{
+ grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
+
+ if (tag->state == FINISH) {
+ tag->state = DELETED;
+ return;
}
- void HandleUpdateCandidate(RpcState<frr::UpdateCandidateRequest,
+ // Request: uint32 candidate_id = 1;
+ uint32_t candidate_id = tag->request.candidate_id();
+
+ grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
+
+ struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
+ if (!candidate) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(grpc::StatusCode::NOT_FOUND,
+ "candidate configuration not found"),
+ tag);
+ } else {
+ tag->cdb->delete_candidate(candidate);
+ tag->responder.Finish(tag->response, grpc::Status::OK, tag);
+ }
+ tag->state = FINISH;
+}
+
+void HandleUnaryUpdateCandidate(NewRpcState<frr::UpdateCandidateRequest,
frr::UpdateCandidateResponse> *tag)
- {
- switch (tag->state) {
- case CREATE:
- REQUEST_RPC(UpdateCandidate);
- tag->state = PROCESS;
- case PROCESS: {
-
- // Request: uint32 candidate_id = 1;
- uint32_t candidate_id = tag->request.candidate_id();
-
- if (nb_dbg_client_grpc)
- zlog_debug(
- "received RPC UpdateCandidate(candidate_id: %u)",
- candidate_id);
-
- struct candidate *candidate =
- get_candidate(candidate_id);
-
- if (!candidate)
- tag->responder.Finish(
- tag->response,
- grpc::Status(
- grpc::StatusCode::NOT_FOUND,
- "candidate configuration not found"),
- tag);
- else if (candidate->transaction)
- tag->responder.Finish(
- tag->response,
- grpc::Status(
- grpc::StatusCode::
- FAILED_PRECONDITION,
- "candidate is in the middle of a transaction"),
- tag);
- else if (nb_candidate_update(candidate->config)
- != NB_OK)
- tag->responder.Finish(
- tag->response,
- grpc::Status(
- grpc::StatusCode::INTERNAL,
- "failed to update candidate configuration"),
- tag);
-
- else
- tag->responder.Finish(tag->response,
- grpc::Status::OK, tag);
+{
+ grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
- tag->state = FINISH;
+ if (tag->state == FINISH) {
+ tag->state = DELETED;
+ return;
+ }
- break;
- }
- case FINISH:
- delete tag;
- }
+ // Request: uint32 candidate_id = 1;
+ uint32_t candidate_id = tag->request.candidate_id();
+
+ grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
+
+ struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
+
+ if (!candidate)
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(grpc::StatusCode::NOT_FOUND,
+ "candidate configuration not found"),
+ tag);
+ else if (candidate->transaction)
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(
+ grpc::StatusCode::FAILED_PRECONDITION,
+ "candidate is in the middle of a transaction"),
+ tag);
+ else if (nb_candidate_update(candidate->config) != NB_OK)
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(
+ grpc::StatusCode::INTERNAL,
+ "failed to update candidate configuration"),
+ tag);
+
+ else
+ tag->responder.Finish(tag->response, grpc::Status::OK, tag);
+
+ tag->state = FINISH;
+}
+
+void HandleUnaryEditCandidate(
+ NewRpcState<frr::EditCandidateRequest, frr::EditCandidateResponse> *tag)
+{
+ grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
+
+ if (tag->state == FINISH) {
+ tag->state = DELETED;
+ return;
}
- void HandleEditCandidate(RpcState<frr::EditCandidateRequest,
- frr::EditCandidateResponse> *tag)
- {
- switch (tag->state) {
- case CREATE:
- REQUEST_RPC(EditCandidate);
- tag->state = PROCESS;
- case PROCESS: {
-
- // Request: uint32 candidate_id = 1;
- uint32_t candidate_id = tag->request.candidate_id();
-
- if (nb_dbg_client_grpc)
- zlog_debug(
- "received RPC EditCandidate(candidate_id: %u)",
- candidate_id);
-
- struct candidate *candidate =
- get_candidate(candidate_id);
-
- if (!candidate) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(
- grpc::StatusCode::NOT_FOUND,
- "candidate configuration not found"),
- tag);
- tag->state = FINISH;
- break;
- }
-
- struct nb_config *candidate_tmp =
- nb_config_dup(candidate->config);
-
- auto pvs = tag->request.update();
- for (const frr::PathValue &pv : pvs) {
- if (yang_dnode_edit(candidate_tmp->dnode,
- pv.path(), pv.value())
- != 0) {
- nb_config_free(candidate_tmp);
-
- tag->responder.Finish(
- tag->response,
- grpc::Status(
- grpc::StatusCode::
- INVALID_ARGUMENT,
- "Failed to update \""
- + pv.path()
- + "\""),
- tag);
-
- tag->state = FINISH;
- return;
- }
- }
-
- pvs = tag->request.delete_();
- for (const frr::PathValue &pv : pvs) {
- if (yang_dnode_delete(candidate_tmp->dnode,
- pv.path())
- != 0) {
- nb_config_free(candidate_tmp);
- tag->responder.Finish(
- tag->response,
- grpc::Status(
- grpc::StatusCode::
- INVALID_ARGUMENT,
- "Failed to remove \""
- + pv.path()
- + "\""),
- tag);
- tag->state = FINISH;
- return;
- }
- }
-
- // No errors, accept all changes.
- nb_config_replace(candidate->config, candidate_tmp,
- false);
-
- tag->responder.Finish(tag->response, grpc::Status::OK,
- tag);
+ // Request: uint32 candidate_id = 1;
+ uint32_t candidate_id = tag->request.candidate_id();
- tag->state = FINISH;
+ grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
- break;
- }
- case FINISH:
- delete tag;
- }
+ struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
+
+ if (!candidate) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(grpc::StatusCode::NOT_FOUND,
+ "candidate configuration not found"),
+ tag);
+ tag->state = FINISH;
+ return;
}
- void HandleLoadToCandidate(RpcState<frr::LoadToCandidateRequest,
- frr::LoadToCandidateResponse> *tag)
- {
- switch (tag->state) {
- case CREATE:
- REQUEST_RPC(LoadToCandidate);
- tag->state = PROCESS;
- case PROCESS: {
- // Request: uint32 candidate_id = 1;
- uint32_t candidate_id = tag->request.candidate_id();
-
- if (nb_dbg_client_grpc)
- zlog_debug(
- "received RPC LoadToCandidate(candidate_id: %u)",
- candidate_id);
-
- // Request: LoadType type = 2;
- int load_type = tag->request.type();
- // Request: DataTree config = 3;
- auto config = tag->request.config();
-
-
- struct candidate *candidate =
- get_candidate(candidate_id);
-
- if (!candidate) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(
- grpc::StatusCode::NOT_FOUND,
- "candidate configuration not found"),
- tag);
- tag->state = FINISH;
- return;
- }
-
- struct lyd_node *dnode =
- dnode_from_data_tree(&config, true);
- if (!dnode) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(
- grpc::StatusCode::INTERNAL,
- "Failed to parse the configuration"),
- tag);
- tag->state = FINISH;
- return;
- }
-
- struct nb_config *loaded_config = nb_config_new(dnode);
-
- if (load_type == frr::LoadToCandidateRequest::REPLACE)
- nb_config_replace(candidate->config,
- loaded_config, false);
- else if (nb_config_merge(candidate->config,
- loaded_config, false)
- != NB_OK) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(
- grpc::StatusCode::INTERNAL,
- "Failed to merge the loaded configuration"),
- tag);
- tag->state = FINISH;
- return;
- }
-
- tag->responder.Finish(tag->response, grpc::Status::OK,
- tag);
+ struct nb_config *candidate_tmp = nb_config_dup(candidate->config);
+
+ auto pvs = tag->request.update();
+ for (const frr::PathValue &pv : pvs) {
+ if (yang_dnode_edit(candidate_tmp->dnode, pv.path(), pv.value())
+ != 0) {
+ nb_config_free(candidate_tmp);
+
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+ "Failed to update \"" + pv.path()
+ + "\""),
+ tag);
+
tag->state = FINISH;
- break;
- }
- case FINISH:
- delete tag;
+ return;
}
}
- void
- HandleCommit(RpcState<frr::CommitRequest, frr::CommitResponse> *tag)
- {
- switch (tag->state) {
- case CREATE:
- REQUEST_RPC(Commit);
- tag->state = PROCESS;
- case PROCESS: {
- // Request: uint32 candidate_id = 1;
- uint32_t candidate_id = tag->request.candidate_id();
- if (nb_dbg_client_grpc)
- zlog_debug(
- "received RPC Commit(candidate_id: %u)",
- candidate_id);
-
- // Request: Phase phase = 2;
- int phase = tag->request.phase();
- // Request: string comment = 3;
- const std::string comment = tag->request.comment();
-
- // Find candidate configuration.
- struct candidate *candidate =
- get_candidate(candidate_id);
- if (!candidate) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(
- grpc::StatusCode::NOT_FOUND,
- "candidate configuration not found"),
- tag);
- tag->state = FINISH;
- return;
- }
-
- int ret = NB_OK;
- uint32_t transaction_id = 0;
-
- // Check for misuse of the two-phase commit protocol.
- switch (phase) {
- case frr::CommitRequest::PREPARE:
- case frr::CommitRequest::ALL:
- if (candidate->transaction) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(
- grpc::StatusCode::
- FAILED_PRECONDITION,
- "candidate is in the middle of a transaction"),
- tag);
- tag->state = FINISH;
- return;
- }
- break;
- case frr::CommitRequest::ABORT:
- case frr::CommitRequest::APPLY:
- if (!candidate->transaction) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(
- grpc::StatusCode::
- FAILED_PRECONDITION,
- "no transaction in progress"),
- tag);
- tag->state = FINISH;
- return;
- }
- break;
- default:
- break;
- }
-
-
- // Execute the user request.
- struct nb_context context = {};
- context.client = NB_CLIENT_GRPC;
- char errmsg[BUFSIZ] = {0};
-
- switch (phase) {
- case frr::CommitRequest::VALIDATE:
- zlog_debug("`-> Performing VALIDATE");
- ret = nb_candidate_validate(
- &context, candidate->config, errmsg,
- sizeof(errmsg));
- break;
- case frr::CommitRequest::PREPARE:
- zlog_debug("`-> Performing PREPARE");
- ret = nb_candidate_commit_prepare(
- &context, candidate->config,
- comment.c_str(),
- &candidate->transaction, errmsg,
- sizeof(errmsg));
- break;
- case frr::CommitRequest::ABORT:
- zlog_debug("`-> Performing ABORT");
- nb_candidate_commit_abort(
- candidate->transaction, errmsg,
- sizeof(errmsg));
- break;
- case frr::CommitRequest::APPLY:
- zlog_debug("`-> Performing ABORT");
- nb_candidate_commit_apply(
- candidate->transaction, true,
- &transaction_id, errmsg,
- sizeof(errmsg));
- break;
- case frr::CommitRequest::ALL:
- zlog_debug("`-> Performing ALL");
- ret = nb_candidate_commit(
- &context, candidate->config, true,
- comment.c_str(), &transaction_id,
- errmsg, sizeof(errmsg));
- break;
- }
-
- // Map northbound error codes to gRPC status codes.
- grpc::Status status;
- switch (ret) {
- case NB_OK:
- status = grpc::Status::OK;
- break;
- case NB_ERR_NO_CHANGES:
- status = grpc::Status(grpc::StatusCode::ABORTED,
- errmsg);
- break;
- case NB_ERR_LOCKED:
- status = grpc::Status(
- grpc::StatusCode::UNAVAILABLE, errmsg);
- break;
- case NB_ERR_VALIDATION:
- status = grpc::Status(
- grpc::StatusCode::INVALID_ARGUMENT,
- errmsg);
- break;
- case NB_ERR_RESOURCE:
- status = grpc::Status(
- grpc::StatusCode::RESOURCE_EXHAUSTED,
- errmsg);
- break;
- case NB_ERR:
- default:
- status = grpc::Status(
- grpc::StatusCode::INTERNAL, errmsg);
- break;
- }
-
- if (nb_dbg_client_grpc)
- zlog_debug("`-> Result: %s (message: '%s')",
- nb_err_name((enum nb_error)ret),
- errmsg);
-
- if (ret == NB_OK) {
- // Response: uint32 transaction_id = 1;
- if (transaction_id)
- tag->response.set_transaction_id(
- transaction_id);
- }
- if (strlen(errmsg) > 0)
- tag->response.set_error_message(errmsg);
-
- tag->responder.Finish(tag->response, status, tag);
+ pvs = tag->request.delete_();
+ for (const frr::PathValue &pv : pvs) {
+ if (yang_dnode_delete(candidate_tmp->dnode, pv.path()) != 0) {
+ nb_config_free(candidate_tmp);
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+ "Failed to remove \"" + pv.path()
+ + "\""),
+ tag);
tag->state = FINISH;
- break;
- }
- case FINISH:
- delete tag;
+ return;
}
}
- void
- HandleListTransactions(RpcState<frr::ListTransactionsRequest,
- frr::ListTransactionsResponse> *tag)
- {
- switch (tag->state) {
- case CREATE:
- REQUEST_RPC_STREAMING(ListTransactions);
- tag->context = new std::list<std::tuple<
- int, std::string, std::string, std::string>>();
- nb_db_transactions_iterate(list_transactions_cb,
- tag->context);
- tag->state = PROCESS;
- case PROCESS: {
- if (nb_dbg_client_grpc)
- zlog_debug("received RPC ListTransactions()");
-
- auto list = static_cast<std::list<std::tuple<
- int, std::string, std::string, std::string>> *>(
- tag->context);
- if (list->empty()) {
- tag->async_responder.Finish(grpc::Status::OK,
- tag);
- tag->state = FINISH;
- return;
- }
- auto item = list->back();
-
-
- frr::ListTransactionsResponse response;
-
- // Response: uint32 id = 1;
- response.set_id(std::get<0>(item));
-
- // Response: string client = 2;
- response.set_client(std::get<1>(item).c_str());
-
- // Response: string date = 3;
- response.set_date(std::get<2>(item).c_str());
-
- // Response: string comment = 4;
- response.set_comment(std::get<3>(item).c_str());
-
- list->pop_back();
-
- tag->async_responder.Write(response, tag);
- break;
- }
- case FINISH:
- delete static_cast<std::list<std::tuple<
- int, std::string, std::string, std::string>> *>(
- tag->context);
- delete tag;
- }
+ // No errors, accept all changes.
+ nb_config_replace(candidate->config, candidate_tmp, false);
+
+ tag->responder.Finish(tag->response, grpc::Status::OK, tag);
+
+ tag->state = FINISH;
+}
+
+void HandleUnaryLoadToCandidate(NewRpcState<frr::LoadToCandidateRequest,
+ frr::LoadToCandidateResponse> *tag)
+{
+ grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
+
+ if (tag->state == FINISH) {
+ tag->state = DELETED;
+ return;
}
- void HandleGetTransaction(RpcState<frr::GetTransactionRequest,
- frr::GetTransactionResponse> *tag)
- {
- switch (tag->state) {
- case CREATE:
- REQUEST_RPC(GetTransaction);
- tag->state = PROCESS;
- case PROCESS: {
- // Request: uint32 transaction_id = 1;
- uint32_t transaction_id = tag->request.transaction_id();
- // Request: Encoding encoding = 2;
- frr::Encoding encoding = tag->request.encoding();
- // Request: bool with_defaults = 3;
- bool with_defaults = tag->request.with_defaults();
-
- if (nb_dbg_client_grpc)
- zlog_debug(
- "received RPC GetTransaction(transaction_id: %u, encoding: %u)",
- transaction_id, encoding);
-
- struct nb_config *nb_config;
-
- // Load configuration from the transactions database.
- nb_config = nb_db_transaction_load(transaction_id);
- if (!nb_config) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(grpc::StatusCode::
- INVALID_ARGUMENT,
- "Transaction not found"),
- tag);
- tag->state = FINISH;
- return;
- }
-
- // Response: DataTree config = 1;
- auto config = tag->response.mutable_config();
- config->set_encoding(encoding);
-
- // Dump data using the requested format.
- if (data_tree_from_dnode(config, nb_config->dnode,
- encoding2lyd_format(encoding),
- with_defaults)
- != 0) {
- nb_config_free(nb_config);
- tag->responder.Finish(
- tag->response,
- grpc::Status(grpc::StatusCode::INTERNAL,
- "Failed to dump data"),
- tag);
- tag->state = FINISH;
- return;
- }
-
- nb_config_free(nb_config);
-
- tag->responder.Finish(tag->response, grpc::Status::OK,
- tag);
- tag->state = FINISH;
- break;
- }
- case FINISH:
- delete tag;
- }
+ // Request: uint32 candidate_id = 1;
+ uint32_t candidate_id = tag->request.candidate_id();
+
+ grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
+
+ // Request: LoadType type = 2;
+ int load_type = tag->request.type();
+ // Request: DataTree config = 3;
+ auto config = tag->request.config();
+
+
+ struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
+
+ if (!candidate) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(grpc::StatusCode::NOT_FOUND,
+ "candidate configuration not found"),
+ tag);
+ tag->state = FINISH;
+ return;
}
- void HandleLockConfig(
- RpcState<frr::LockConfigRequest, frr::LockConfigResponse> *tag)
- {
- switch (tag->state) {
- case CREATE:
- REQUEST_RPC(LockConfig);
- tag->state = PROCESS;
- case PROCESS: {
- if (nb_dbg_client_grpc)
- zlog_debug("received RPC LockConfig()");
-
- if (nb_running_lock(NB_CLIENT_GRPC, NULL)) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(
- grpc::StatusCode::
- FAILED_PRECONDITION,
- "running configuration is locked already"),
- tag);
- tag->state = FINISH;
- return;
- }
-
- tag->responder.Finish(tag->response, grpc::Status::OK,
- tag);
- tag->state = FINISH;
- break;
- }
- case FINISH:
- delete tag;
- }
+ struct lyd_node *dnode = dnode_from_data_tree(&config, true);
+ if (!dnode) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(grpc::StatusCode::INTERNAL,
+ "Failed to parse the configuration"),
+ tag);
+ tag->state = FINISH;
+ return;
}
- void HandleUnlockConfig(RpcState<frr::UnlockConfigRequest,
- frr::UnlockConfigResponse> *tag)
- {
- switch (tag->state) {
- case CREATE:
- REQUEST_RPC(UnlockConfig);
- tag->state = PROCESS;
- case PROCESS: {
- if (nb_dbg_client_grpc)
- zlog_debug("received RPC UnlockConfig()");
-
- if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(
- grpc::StatusCode::
- FAILED_PRECONDITION,
- "failed to unlock the running configuration"),
- tag);
- tag->state = FINISH;
- return;
- }
-
- tag->responder.Finish(tag->response, grpc::Status::OK,
- tag);
- tag->state = FINISH;
- break;
- }
- case FINISH:
- delete tag;
- }
+ struct nb_config *loaded_config = nb_config_new(dnode);
+
+ if (load_type == frr::LoadToCandidateRequest::REPLACE)
+ nb_config_replace(candidate->config, loaded_config, false);
+ else if (nb_config_merge(candidate->config, loaded_config, false)
+ != NB_OK) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(
+ grpc::StatusCode::INTERNAL,
+ "Failed to merge the loaded configuration"),
+ tag);
+ tag->state = FINISH;
+ return;
}
- void
- HandleExecute(RpcState<frr::ExecuteRequest, frr::ExecuteResponse> *tag)
- {
- struct nb_node *nb_node;
- struct list *input_list;
- struct list *output_list;
- struct listnode *node;
- struct yang_data *data;
- const char *xpath;
- char errmsg[BUFSIZ] = {0};
+ tag->responder.Finish(tag->response, grpc::Status::OK, tag);
+ tag->state = FINISH;
+}
+
+void HandleUnaryCommit(
+ NewRpcState<frr::CommitRequest, frr::CommitResponse> *tag)
+{
+ grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
+
+ if (tag->state == FINISH) {
+ tag->state = DELETED;
+ return;
+ }
- switch (tag->state) {
- case CREATE:
- REQUEST_RPC(Execute);
- tag->state = PROCESS;
- case PROCESS: {
- // Request: string path = 1;
- xpath = tag->request.path().c_str();
-
- if (nb_dbg_client_grpc)
- zlog_debug("received RPC Execute(path: \"%s\")",
- xpath);
-
- if (tag->request.path().empty()) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(grpc::StatusCode::
- INVALID_ARGUMENT,
- "Data path is empty"),
- tag);
- tag->state = FINISH;
- return;
- }
-
- nb_node = nb_node_find(xpath);
- if (!nb_node) {
- tag->responder.Finish(
- tag->response,
- grpc::Status(grpc::StatusCode::
- INVALID_ARGUMENT,
- "Unknown data path"),
- tag);
- tag->state = FINISH;
- return;
- }
-
- input_list = yang_data_list_new();
- output_list = yang_data_list_new();
-
- // Read input parameters.
- auto input = tag->request.input();
- for (const frr::PathValue &pv : input) {
- // Request: repeated PathValue input = 2;
- data = yang_data_new(pv.path().c_str(),
- pv.value().c_str());
- listnode_add(input_list, data);
- }
-
- // Execute callback registered for this XPath.
- if (nb_callback_rpc(nb_node, xpath, input_list,
- output_list, errmsg, sizeof(errmsg))
- != NB_OK) {
- flog_warn(EC_LIB_NB_CB_RPC,
- "%s: rpc callback failed: %s",
- __func__, xpath);
- list_delete(&input_list);
- list_delete(&output_list);
-
- tag->responder.Finish(
- tag->response,
- grpc::Status(grpc::StatusCode::INTERNAL,
- "RPC failed"),
- tag);
- tag->state = FINISH;
- return;
- }
-
- // Process output parameters.
- for (ALL_LIST_ELEMENTS_RO(output_list, node, data)) {
- // Response: repeated PathValue output = 1;
- frr::PathValue *pv = tag->response.add_output();
- pv->set_path(data->xpath);
- pv->set_value(data->value);
- }
-
- // Release memory.
- list_delete(&input_list);
- list_delete(&output_list);
-
- tag->responder.Finish(tag->response, grpc::Status::OK,
- tag);
+ // Request: uint32 candidate_id = 1;
+ uint32_t candidate_id = tag->request.candidate_id();
+
+ grpc_debug("%s(candidate_id: %u)", __func__, candidate_id);
+
+ // Request: Phase phase = 2;
+ int phase = tag->request.phase();
+ // Request: string comment = 3;
+ const std::string comment = tag->request.comment();
+
+ // Find candidate configuration.
+ struct candidate *candidate = tag->cdb->get_candidate(candidate_id);
+ if (!candidate) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(grpc::StatusCode::NOT_FOUND,
+ "candidate configuration not found"),
+ tag);
+ tag->state = FINISH;
+ return;
+ }
+
+ int ret = NB_OK;
+ uint32_t transaction_id = 0;
+
+ // Check for misuse of the two-phase commit protocol.
+ switch (phase) {
+ case frr::CommitRequest::PREPARE:
+ case frr::CommitRequest::ALL:
+ if (candidate->transaction) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(
+ grpc::StatusCode::FAILED_PRECONDITION,
+ "candidate is in the middle of a transaction"),
+ tag);
tag->state = FINISH;
- break;
+ return;
}
- case FINISH:
- delete tag;
+ break;
+ case frr::CommitRequest::ABORT:
+ case frr::CommitRequest::APPLY:
+ if (!candidate->transaction) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(
+ grpc::StatusCode::FAILED_PRECONDITION,
+ "no transaction in progress"),
+ tag);
+ tag->state = FINISH;
+ return;
}
+ break;
+ default:
+ break;
}
- private:
- frr::Northbound::AsyncService *_service;
- grpc::ServerCompletionQueue *_cq;
- struct candidate {
- uint32_t id;
- struct nb_config *config;
- struct nb_transaction *transaction;
- };
- std::map<uint32_t, struct candidate> _candidates;
- uint32_t _nextCandidateId;
+ // Execute the user request.
+ struct nb_context context = {};
+ context.client = NB_CLIENT_GRPC;
+ char errmsg[BUFSIZ] = {0};
+
+ switch (phase) {
+ case frr::CommitRequest::VALIDATE:
+ grpc_debug("`-> Performing VALIDATE");
+ ret = nb_candidate_validate(&context, candidate->config, errmsg,
+ sizeof(errmsg));
+ break;
+ case frr::CommitRequest::PREPARE:
+ grpc_debug("`-> Performing PREPARE");
+ ret = nb_candidate_commit_prepare(
+ &context, candidate->config, comment.c_str(),
+ &candidate->transaction, errmsg, sizeof(errmsg));
+ break;
+ case frr::CommitRequest::ABORT:
+ grpc_debug("`-> Performing ABORT");
+ nb_candidate_commit_abort(candidate->transaction, errmsg,
+ sizeof(errmsg));
+ break;
+ case frr::CommitRequest::APPLY:
+ grpc_debug("`-> Performing APPLY");
+ nb_candidate_commit_apply(candidate->transaction, true,
+ &transaction_id, errmsg,
+ sizeof(errmsg));
+ break;
+ case frr::CommitRequest::ALL:
+ grpc_debug("`-> Performing ALL");
+ ret = nb_candidate_commit(&context, candidate->config, true,
+ comment.c_str(), &transaction_id,
+ errmsg, sizeof(errmsg));
+ break;
+ }
- static int yang_dnode_edit(struct lyd_node *dnode,
- const std::string &path,
- const std::string &value)
- {
- LY_ERR err = lyd_new_path(dnode, ly_native_ctx, path.c_str(),
- value.c_str(), LYD_NEW_PATH_UPDATE,
- &dnode);
- if (err != LY_SUCCESS) {
- flog_warn(EC_LIB_LIBYANG,
- "%s: lyd_new_path() failed: %s", __func__,
- ly_errmsg(ly_native_ctx));
- return -1;
- }
+ // Map northbound error codes to gRPC status codes.
+ grpc::Status status;
+ switch (ret) {
+ case NB_OK:
+ status = grpc::Status::OK;
+ break;
+ case NB_ERR_NO_CHANGES:
+ status = grpc::Status(grpc::StatusCode::ABORTED, errmsg);
+ break;
+ case NB_ERR_LOCKED:
+ status = grpc::Status(grpc::StatusCode::UNAVAILABLE, errmsg);
+ break;
+ case NB_ERR_VALIDATION:
+ status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+ errmsg);
+ break;
+ case NB_ERR_RESOURCE:
+ status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
+ errmsg);
+ break;
+ case NB_ERR:
+ default:
+ status = grpc::Status(grpc::StatusCode::INTERNAL, errmsg);
+ break;
+ }
- return 0;
+ grpc_debug("`-> Result: %s (message: '%s')",
+ nb_err_name((enum nb_error)ret), errmsg);
+
+ if (ret == NB_OK) {
+ // Response: uint32 transaction_id = 1;
+ if (transaction_id)
+ tag->response.set_transaction_id(transaction_id);
}
+ if (strlen(errmsg) > 0)
+ tag->response.set_error_message(errmsg);
- static int yang_dnode_delete(struct lyd_node *dnode,
- const std::string &path)
- {
- dnode = yang_dnode_get(dnode, path.c_str());
- if (!dnode)
- return -1;
+ tag->responder.Finish(tag->response, status, tag);
+ tag->state = FINISH;
+}
- lyd_free_tree(dnode);
+void HandleUnaryLockConfig(
+ NewRpcState<frr::LockConfigRequest, frr::LockConfigResponse> *tag)
+{
+ grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
- return 0;
+ if (tag->state == FINISH) {
+ tag->state = DELETED;
+ return;
}
- static LYD_FORMAT encoding2lyd_format(enum frr::Encoding encoding)
- {
- switch (encoding) {
- case frr::JSON:
- return LYD_JSON;
- case frr::XML:
- return LYD_XML;
- default:
- flog_err(EC_LIB_DEVELOPMENT,
- "%s: unknown data encoding format (%u)",
- __func__, encoding);
- exit(1);
- }
+ if (nb_running_lock(NB_CLIENT_GRPC, NULL)) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(grpc::StatusCode::FAILED_PRECONDITION,
+ "running configuration is locked already"),
+ tag);
+ } else {
+ tag->responder.Finish(tag->response, grpc::Status::OK, tag);
}
+ tag->state = FINISH;
+}
- static int get_oper_data_cb(const struct lysc_node *snode,
- struct yang_translator *translator,
- struct yang_data *data, void *arg)
- {
- struct lyd_node *dnode = static_cast<struct lyd_node *>(arg);
- int ret = yang_dnode_edit(dnode, data->xpath, data->value);
- yang_data_free(data);
+void HandleUnaryUnlockConfig(
+ NewRpcState<frr::UnlockConfigRequest, frr::UnlockConfigResponse> *tag)
+{
+ grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
- return (ret == 0) ? NB_OK : NB_ERR;
+ if (tag->state == FINISH) {
+ tag->state = DELETED;
+ return;
}
- static void list_transactions_cb(void *arg, int transaction_id,
- const char *client_name,
- const char *date, const char *comment)
- {
+ if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(
+ grpc::StatusCode::FAILED_PRECONDITION,
+ "failed to unlock the running configuration"),
+ tag);
+ } else {
+ tag->responder.Finish(tag->response, grpc::Status::OK, tag);
+ }
+ tag->state = FINISH;
+}
- auto list = static_cast<std::list<std::tuple<
- int, std::string, std::string, std::string>> *>(arg);
- list->push_back(std::make_tuple(
- transaction_id, std::string(client_name),
- std::string(date), std::string(comment)));
+static void list_transactions_cb(void *arg, int transaction_id,
+ const char *client_name, const char *date,
+ const char *comment)
+{
+ auto list = static_cast<std::list<
+ std::tuple<int, std::string, std::string, std::string>> *>(arg);
+ list->push_back(
+ std::make_tuple(transaction_id, std::string(client_name),
+ std::string(date), std::string(comment)));
+}
+
+void HandleStreamingListTransactions(
+ NewRpcState<frr::ListTransactionsRequest, frr::ListTransactionsResponse>
+ *tag)
+{
+ grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
+
+ if (tag->state == FINISH) {
+ delete static_cast<std::list<std::tuple<
+ int, std::string, std::string, std::string>> *>(
+ tag->context);
+ tag->state = DELETED;
+ return;
}
- static LY_ERR data_tree_from_dnode(frr::DataTree *dt,
- const struct lyd_node *dnode,
- LYD_FORMAT lyd_format,
- bool with_defaults)
- {
- char *strp;
- int options = 0;
-
- SET_FLAG(options, LYD_PRINT_WITHSIBLINGS);
- if (with_defaults)
- SET_FLAG(options, LYD_PRINT_WD_ALL);
- else
- SET_FLAG(options, LYD_PRINT_WD_TRIM);
-
- LY_ERR err = lyd_print_mem(&strp, dnode, lyd_format, options);
- if (err == LY_SUCCESS) {
- if (strp) {
- dt->set_data(strp);
- free(strp);
- }
- }
- return err;
+ if (!tag->context) {
+ /* Creating, first time called for this RPC */
+ auto new_list =
+ new std::list<std::tuple<int, std::string, std::string,
+ std::string>>();
+ tag->context = new_list;
+ nb_db_transactions_iterate(list_transactions_cb, tag->context);
+
+ new_list->push_back(std::make_tuple(
+ 0xFFFF, std::string("fake client"),
+ std::string("fake date"), std::string("fake comment")));
+ new_list->push_back(
+ std::make_tuple(0xFFFE, std::string("fake client2"),
+ std::string("fake date"),
+ std::string("fake comment2")));
}
- static struct lyd_node *dnode_from_data_tree(const frr::DataTree *dt,
- bool config_only)
- {
- struct lyd_node *dnode;
- int options, opt2;
- LY_ERR err;
+ auto list = static_cast<std::list<
+ std::tuple<int, std::string, std::string, std::string>> *>(
+ tag->context);
- if (config_only) {
- options = LYD_PARSE_STRICT | LYD_PARSE_NO_STATE;
- opt2 = LYD_VALIDATE_NO_STATE;
- } else {
- options = LYD_PARSE_STRICT;
- opt2 = 0;
- }
-
- err = lyd_parse_data_mem(ly_native_ctx, dt->data().c_str(),
- encoding2lyd_format(dt->encoding()),
- options, opt2, &dnode);
- if (err != LY_SUCCESS) {
- flog_warn(EC_LIB_LIBYANG,
- "%s: lyd_parse_mem() failed: %s", __func__,
- ly_errmsg(ly_native_ctx));
- }
- return dnode;
+ if (list->empty()) {
+ tag->async_responder.Finish(grpc::Status::OK, tag);
+ tag->state = FINISH;
+ return;
}
- static struct lyd_node *get_dnode_config(const std::string &path)
- {
- struct lyd_node *dnode;
+ auto item = list->back();
- dnode = yang_dnode_get(running_config->dnode,
- path.empty() ? NULL : path.c_str());
- if (dnode)
- dnode = yang_dnode_dup(dnode);
+ frr::ListTransactionsResponse response;
- return dnode;
- }
+ // Response: uint32 id = 1;
+ response.set_id(std::get<0>(item));
- static struct lyd_node *get_dnode_state(const std::string &path)
- {
- struct lyd_node *dnode;
-
- dnode = yang_dnode_new(ly_native_ctx, false);
- if (nb_oper_data_iterate(path.c_str(), NULL, 0,
- get_oper_data_cb, dnode)
- != NB_OK) {
- yang_dnode_free(dnode);
- return NULL;
- }
+ // Response: string client = 2;
+ response.set_client(std::get<1>(item).c_str());
+
+ // Response: string date = 3;
+ response.set_date(std::get<2>(item).c_str());
+
+ // Response: string comment = 4;
+ response.set_comment(std::get<3>(item).c_str());
- return dnode;
+ list->pop_back();
+ if (list->empty()) {
+ tag->async_responder.WriteAndFinish(
+ response, grpc::WriteOptions(), grpc::Status::OK, tag);
+ tag->state = FINISH;
+ } else {
+ tag->async_responder.Write(response, tag);
+ tag->state = MORE;
}
+}
- static grpc::Status get_path(frr::DataTree *dt, const std::string &path,
- int type, LYD_FORMAT lyd_format,
- bool with_defaults)
- {
- struct lyd_node *dnode_config = NULL;
- struct lyd_node *dnode_state = NULL;
- struct lyd_node *dnode_final;
-
- // Configuration data.
- if (type == frr::GetRequest_DataType_ALL
- || type == frr::GetRequest_DataType_CONFIG) {
- dnode_config = get_dnode_config(path);
- if (!dnode_config)
- return grpc::Status(
- grpc::StatusCode::INVALID_ARGUMENT,
- "Data path not found");
- }
+void HandleUnaryGetTransaction(NewRpcState<frr::GetTransactionRequest,
+ frr::GetTransactionResponse> *tag)
+{
+ grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
- // Operational data.
- if (type == frr::GetRequest_DataType_ALL
- || type == frr::GetRequest_DataType_STATE) {
- dnode_state = get_dnode_state(path);
- if (!dnode_state) {
- if (dnode_config)
- yang_dnode_free(dnode_config);
- return grpc::Status(
- grpc::StatusCode::INVALID_ARGUMENT,
- "Failed to fetch operational data");
- }
- }
+ if (tag->state == FINISH) {
+ tag->state = DELETED;
+ return;
+ }
- switch (type) {
- case frr::GetRequest_DataType_ALL:
- //
- // Combine configuration and state data into a single
- // dnode.
- //
- if (lyd_merge_tree(&dnode_state, dnode_config,
- LYD_MERGE_DESTRUCT)
- != LY_SUCCESS) {
- yang_dnode_free(dnode_state);
- yang_dnode_free(dnode_config);
- return grpc::Status(
- grpc::StatusCode::INTERNAL,
- "Failed to merge configuration and state data",
- ly_errmsg(ly_native_ctx));
- }
-
- dnode_final = dnode_state;
- break;
- case frr::GetRequest_DataType_CONFIG:
- dnode_final = dnode_config;
- break;
- case frr::GetRequest_DataType_STATE:
- dnode_final = dnode_state;
- break;
- }
+ // Request: uint32 transaction_id = 1;
+ uint32_t transaction_id = tag->request.transaction_id();
+ // Request: Encoding encoding = 2;
+ frr::Encoding encoding = tag->request.encoding();
+ // Request: bool with_defaults = 3;
+ bool with_defaults = tag->request.with_defaults();
+
+ grpc_debug("%s(transaction_id: %u, encoding: %u)", __func__,
+ transaction_id, encoding);
+
+ struct nb_config *nb_config;
+
+ // Load configuration from the transactions database.
+ nb_config = nb_db_transaction_load(transaction_id);
+ if (!nb_config) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+ "Transaction not found"),
+ tag);
+ tag->state = FINISH;
+ return;
+ }
- // Validate data to create implicit default nodes if necessary.
- int validate_opts = 0;
- if (type == frr::GetRequest_DataType_CONFIG)
- validate_opts = LYD_VALIDATE_NO_STATE;
- else
- validate_opts = 0;
-
- LY_ERR err = lyd_validate_all(&dnode_final, ly_native_ctx,
- validate_opts, NULL);
-
- if (err)
- flog_warn(EC_LIB_LIBYANG,
- "%s: lyd_validate_all() failed: %s", __func__,
- ly_errmsg(ly_native_ctx));
- // Dump data using the requested format.
- if (!err)
- err = data_tree_from_dnode(dt, dnode_final, lyd_format,
- with_defaults);
- yang_dnode_free(dnode_final);
- if (err)
- return grpc::Status(grpc::StatusCode::INTERNAL,
- "Failed to dump data");
- return grpc::Status::OK;
+ // Response: DataTree config = 1;
+ auto config = tag->response.mutable_config();
+ config->set_encoding(encoding);
+
+ // Dump data using the requested format.
+ if (data_tree_from_dnode(config, nb_config->dnode,
+ encoding2lyd_format(encoding), with_defaults)
+ != 0) {
+ nb_config_free(nb_config);
+ tag->responder.Finish(tag->response,
+ grpc::Status(grpc::StatusCode::INTERNAL,
+ "Failed to dump data"),
+ tag);
+ tag->state = FINISH;
+ return;
}
- struct candidate *create_candidate(void)
- {
- uint32_t candidate_id = ++_nextCandidateId;
+ nb_config_free(nb_config);
- // Check for overflow.
- // TODO: implement an algorithm for unique reusable IDs.
- if (candidate_id == 0)
- return NULL;
+ tag->responder.Finish(tag->response, grpc::Status::OK, tag);
+ tag->state = FINISH;
+}
- struct candidate *candidate = &_candidates[candidate_id];
- candidate->id = candidate_id;
- candidate->config = nb_config_dup(running_config);
- candidate->transaction = NULL;
+void HandleUnaryExecute(
+ NewRpcState<frr::ExecuteRequest, frr::ExecuteResponse> *tag)
+{
+ grpc_debug("%s: state: %s", __func__, call_states[tag->state]);
- return candidate;
+ if (tag->state == FINISH) {
+ tag->state = DELETED;
+ return;
}
- void delete_candidate(struct candidate *candidate)
- {
- char errmsg[BUFSIZ] = {0};
+ struct nb_node *nb_node;
+ struct list *input_list;
+ struct list *output_list;
+ struct listnode *node;
+ struct yang_data *data;
+ const char *xpath;
+ char errmsg[BUFSIZ] = {0};
+
+ // Request: string path = 1;
+ xpath = tag->request.path().c_str();
+
+ grpc_debug("%s(path: \"%s\")", __func__, xpath);
+
+ if (tag->request.path().empty()) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+ "Data path is empty"),
+ tag);
+ tag->state = FINISH;
+ return;
+ }
- _candidates.erase(candidate->id);
- nb_config_free(candidate->config);
- if (candidate->transaction)
- nb_candidate_commit_abort(candidate->transaction,
- errmsg, sizeof(errmsg));
+ nb_node = nb_node_find(xpath);
+ if (!nb_node) {
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+ "Unknown data path"),
+ tag);
+ tag->state = FINISH;
+ return;
}
- struct candidate *get_candidate(uint32_t candidate_id)
- {
- struct candidate *candidate;
+ input_list = yang_data_list_new();
+ output_list = yang_data_list_new();
- if (_candidates.count(candidate_id) == 0)
- return NULL;
+ // Read input parameters.
+ auto input = tag->request.input();
+ for (const frr::PathValue &pv : input) {
+ // Request: repeated PathValue input = 2;
+ data = yang_data_new(pv.path().c_str(), pv.value().c_str());
+ listnode_add(input_list, data);
+ }
- return &_candidates[candidate_id];
+ // Execute callback registered for this XPath.
+ if (nb_callback_rpc(nb_node, xpath, input_list, output_list, errmsg,
+ sizeof(errmsg))
+ != NB_OK) {
+ flog_warn(EC_LIB_NB_CB_RPC, "%s: rpc callback failed: %s",
+ __func__, xpath);
+ list_delete(&input_list);
+ list_delete(&output_list);
+
+ tag->responder.Finish(
+ tag->response,
+ grpc::Status(grpc::StatusCode::INTERNAL, "RPC failed"),
+ tag);
+ tag->state = FINISH;
+ return;
}
+
+ // Process output parameters.
+ for (ALL_LIST_ELEMENTS_RO(output_list, node, data)) {
+ // Response: repeated PathValue output = 1;
+ frr::PathValue *pv = tag->response.add_output();
+ pv->set_path(data->xpath);
+ pv->set_value(data->value);
+ }
+
+ // Release memory.
+ list_delete(&input_list);
+ list_delete(&output_list);
+
+ tag->responder.Finish(tag->response, grpc::Status::OK, tag);
+ tag->state = FINISH;
+}
+
+// ------------------------------------------------------
+// Thread Initialization and Run Functions
+// ------------------------------------------------------
+
+
+#define REQUEST_NEWRPC(NAME, cdb) \
+ do { \
+ auto _rpcState = new NewRpcState<frr::NAME##Request, \
+ frr::NAME##Response>( \
+ (cdb), &frr::Northbound::AsyncService::Request##NAME, \
+ &HandleUnary##NAME, #NAME); \
+ _rpcState->do_request(service, _cq); \
+ } while (0)
+
+#define REQUEST_NEWRPC_STREAMING(NAME, cdb) \
+ do { \
+ auto _rpcState = new NewRpcState<frr::NAME##Request, \
+ frr::NAME##Response>( \
+ (cdb), &frr::Northbound::AsyncService::Request##NAME, \
+ &HandleStreaming##NAME, #NAME); \
+ _rpcState->do_request(service, _cq); \
+ } while (0)
+
+struct grpc_pthread_attr {
+ struct frr_pthread_attr attr;
+ unsigned long port;
};
static void *grpc_pthread_start(void *arg)
{
struct frr_pthread *fpt = static_cast<frr_pthread *>(arg);
- unsigned long *port = static_cast<unsigned long *>(fpt->data);
+ uint port = (uint) reinterpret_cast<intptr_t>(fpt->data);
+
+ Candidates candidates;
+ grpc::ServerBuilder builder;
+ std::stringstream server_address;
+ frr::Northbound::AsyncService *service =
+ new frr::Northbound::AsyncService();
+ grpc::ServerCompletionQueue *_cq;
frr_pthread_set_name(fpt);
- NorthboundImpl nb;
- nb.Run(*port);
+ server_address << "0.0.0.0:" << port;
+ builder.AddListeningPort(server_address.str(),
+ grpc::InsecureServerCredentials());
+ builder.RegisterService(service);
+ auto cq = builder.AddCompletionQueue();
+ _cq = cq.get();
+ auto server = builder.BuildAndStart();
+
+ /* Schedule all RPC handlers */
+ REQUEST_NEWRPC(GetCapabilities, NULL);
+ REQUEST_NEWRPC(CreateCandidate, &candidates);
+ REQUEST_NEWRPC(DeleteCandidate, &candidates);
+ REQUEST_NEWRPC(UpdateCandidate, &candidates);
+ REQUEST_NEWRPC(EditCandidate, &candidates);
+ REQUEST_NEWRPC(LoadToCandidate, &candidates);
+ REQUEST_NEWRPC(Commit, &candidates);
+ REQUEST_NEWRPC(GetTransaction, NULL);
+ REQUEST_NEWRPC(LockConfig, NULL);
+ REQUEST_NEWRPC(UnlockConfig, NULL);
+ REQUEST_NEWRPC(Execute, NULL);
+ REQUEST_NEWRPC_STREAMING(Get, NULL);
+ REQUEST_NEWRPC_STREAMING(ListTransactions, NULL);
+
+ zlog_notice("gRPC server listening on %s",
+ server_address.str().c_str());
+
+ /* Process inbound RPCs */
+ while (true) {
+ void *tag;
+ bool ok;
+
+ _cq->Next(&tag, &ok);
+ grpc_debug("%s: Got next from CompletionQueue, %p %d", __func__,
+ tag, ok);
+ GPR_ASSERT(ok);
+
+ RpcStateBase *rpc = static_cast<RpcStateBase *>(tag);
+ CallState state = rpc->doCallback();
+ grpc_debug("%s: Callback returned RPC State: %s", __func__,
+ call_states[state]);
+
+ /*
+ * Our side is done (FINISH) receive new requests of this type
+ * We could do this earlier but that would mean we could be
+ * handling multiple same type requests in parallel. We expect
+ * to be called back once more in the FINISH state (from the
+ * user indicating Finish() for cleanup.
+ */
+ if (state == FINISH)
+ rpc->do_request(service, _cq);
+ }
+ /*NOTREACHED*/
return NULL;
}
-static int frr_grpc_init(unsigned long *port)
+
+static int frr_grpc_init(uint port)
{
+ struct frr_pthread_attr attr = {
+ .start = grpc_pthread_start,
+ .stop = NULL,
+ };
+
fpt = frr_pthread_new(&attr, "frr-grpc", "frr-grpc");
- fpt->data = static_cast<void *>(port);
+ fpt->data = reinterpret_cast<void *>((intptr_t)port);
/* Create a pthread for gRPC since it runs its own event loop. */
if (frr_pthread_run(fpt, NULL) < 0) {
@@ -1363,7 +1336,6 @@ static int frr_grpc_finish(void)
if (fpt)
frr_pthread_destroy(fpt);
// TODO: cancel the gRPC pthreads gracefully.
-
return 0;
}
@@ -1376,28 +1348,20 @@ static int frr_grpc_finish(void)
*/
static int frr_grpc_module_very_late_init(struct thread *thread)
{
- static unsigned long port = GRPC_DEFAULT_PORT;
const char *args = THIS_MODULE->load_args;
+ uint port = GRPC_DEFAULT_PORT;
- // Parse port number.
if (args) {
- try {
- port = std::stoul(args);
- if (port < 1024)
- throw std::invalid_argument(
- "can't use privileged port");
- if (port > UINT16_MAX)
- throw std::invalid_argument(
- "port number is too big");
- } catch (std::exception &e) {
+ port = std::stoul(args);
+ if (port < 1024 || port > UINT16_MAX) {
flog_err(EC_LIB_GRPC_INIT,
- "%s: failed to parse port number: %s",
- __func__, e.what());
+ "%s: port number must be between 1025 and %d",
+ __func__, UINT16_MAX);
goto error;
}
}
- if (frr_grpc_init(&port) < 0)
+ if (frr_grpc_init(port) < 0)
goto error;
return 0;
@@ -1409,9 +1373,9 @@ error:
static int frr_grpc_module_late_init(struct thread_master *tm)
{
- thread_add_event(tm, frr_grpc_module_very_late_init, NULL, 0, NULL);
+ main_master = tm;
hook_register(frr_fini, frr_grpc_finish);
-
+ thread_add_event(tm, frr_grpc_module_very_late_init, NULL, 0, NULL);
return 0;
}
@@ -1424,5 +1388,4 @@ static int frr_grpc_module_init(void)
FRR_MODULE_SETUP(.name = "frr_grpc", .version = FRR_VERSION,
.description = "FRR gRPC northbound module",
- .init = frr_grpc_module_init,
-);
+ .init = frr_grpc_module_init, );