diff options
author | Christian Hopps <chopps@gmail.com> | 2021-05-20 08:50:34 +0200 |
---|---|---|
committer | Christian Hopps <chopps@gmail.com> | 2021-06-01 17:27:54 +0200 |
commit | b680134e1122cdbb67f5c6ed158ee712255ee3cc (patch) | |
tree | f6079a4f6b7a7239c990ef0b6f14ccaa606f0c18 /lib/northbound_grpc.cpp | |
parent | Merge pull request #8769 from ton31337/fix/time_to_remove (diff) | |
download | frr-b680134e1122cdbb67f5c6ed158ee712255ee3cc.tar.xz frr-b680134e1122cdbb67f5c6ed158ee712255ee3cc.zip |
lib: fix threading bug in GRPC code
The code that actually calls FRR northbound functions needs to be running in the
master thread. The previous code was running on a GRPC pthread. While fixing
moved to more functional vs OOP to make this easier to see.
Also fix ly merge to merge siblings not throw the originals away.
Signed-off-by: Christian Hopps <chopps@labn.net>
Diffstat (limited to 'lib/northbound_grpc.cpp')
-rw-r--r-- | lib/northbound_grpc.cpp | 2313 |
1 files changed, 1138 insertions, 1175 deletions
diff --git a/lib/northbound_grpc.cpp b/lib/northbound_grpc.cpp index c61effdda..807d1252c 100644 --- a/lib/northbound_grpc.cpp +++ b/lib/northbound_grpc.cpp @@ -1,6 +1,7 @@ // // Copyright (C) 2019 NetDEF, Inc. // Renato Westphal +// Copyright (c) 2021, LabN Consulting, L.L.C // // This program is free software; you can redistribute it and/or modify it // under the terms of the GNU General Public License as published by the Free @@ -24,6 +25,7 @@ #include "log.h" #include "libfrr.h" #include "lib/version.h" +#include "lib/thread.h" #include "command.h" #include "lib_errors.h" #include "northbound.h" @@ -37,1315 +39,1286 @@ #define GRPC_DEFAULT_PORT 50051 -static void *grpc_pthread_start(void *arg); - /* * NOTE: we can't use the FRR debugging infrastructure here since it uses * atomics and C++ has a different atomics API. Enable gRPC debugging * unconditionally until we figure out a way to solve this problem. */ -static bool nb_dbg_client_grpc = 1; +static bool nb_dbg_client_grpc = 0; + +static struct thread_master *main_master; static struct frr_pthread *fpt; -/* Default frr_pthread attributes */ -static const struct frr_pthread_attr attr = { - .start = grpc_pthread_start, - .stop = NULL, +#define grpc_debug(...) \ + do { \ + if (nb_dbg_client_grpc) \ + zlog_debug(__VA_ARGS__); \ + } while (0) + +// ------------------------------------------------------ +// New Types +// ------------------------------------------------------ + +enum CallState { CREATE, PROCESS, MORE, FINISH, DELETED }; +const char *call_states[] = {"CREATE", "PROCESS", "MORE", "FINISH", "DELETED"}; + +struct candidate { + uint64_t id; + struct nb_config *config; + struct nb_transaction *transaction; }; -enum CallStatus { CREATE, PROCESS, FINISH }; +class Candidates +{ + public: + ~Candidates(void) + { + // Delete candidates. + for (auto it = _cdb.begin(); it != _cdb.end(); it++) + delete_candidate(&it->second); + } + + struct candidate *create_candidate(void) + { + uint64_t id = ++_next_id; + assert(id); // TODO: implement an algorithm for unique reusable + // IDs. + struct candidate *c = &_cdb[id]; + c->id = id; + c->config = nb_config_dup(running_config); + c->transaction = NULL; + + return c; + } + + void delete_candidate(struct candidate *c) + { + char errmsg[BUFSIZ] = {0}; + + _cdb.erase(c->id); + nb_config_free(c->config); + if (c->transaction) + nb_candidate_commit_abort(c->transaction, errmsg, + sizeof(errmsg)); + } + + struct candidate *get_candidate(uint32_t id) + { + return _cdb.count(id) == 0 ? NULL : &_cdb[id]; + } + + private: + uint64_t _next_id = 0; + std::map<uint32_t, struct candidate> _cdb; +}; -/* Thanks gooble */ class RpcStateBase { public: - virtual void doCallback() = 0; + virtual CallState doCallback() = 0; + virtual void do_request(::frr::Northbound::AsyncService *service, + ::grpc::ServerCompletionQueue *cq) = 0; }; -class NorthboundImpl; - -template <typename Q, typename S> class RpcState : RpcStateBase +/* + * The RPC state class is used to track the execution of an RPC. + */ +template <typename Q, typename S> class NewRpcState : RpcStateBase { + typedef void (frr::Northbound::AsyncService::*reqfunc_t)( + ::grpc::ServerContext *, Q *, + ::grpc::ServerAsyncResponseWriter<S> *, + ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *, + void *); + typedef void (frr::Northbound::AsyncService::*reqsfunc_t)( + ::grpc::ServerContext *, Q *, ::grpc::ServerAsyncWriter<S> *, + ::grpc::CompletionQueue *, ::grpc::ServerCompletionQueue *, + void *); + public: - RpcState(NorthboundImpl *svc, - void (NorthboundImpl::*cb)(RpcState<Q, S> *)) - : callback(cb), responder(&ctx), async_responder(&ctx), - service(svc){}; + NewRpcState(Candidates *cdb, reqfunc_t rfunc, + void (*cb)(NewRpcState<Q, S> *), const char *name) + : requestf(rfunc), callback(cb), responder(&ctx), + async_responder(&ctx), name(name), cdb(cdb){}; + NewRpcState(Candidates *cdb, reqsfunc_t rfunc, + void (*cb)(NewRpcState<Q, S> *), const char *name) + : requestsf(rfunc), callback(cb), responder(&ctx), + async_responder(&ctx), name(name), cdb(cdb){}; + + CallState doCallback() override + { + CallState enter_state = this->state; + CallState new_state; + if (enter_state == FINISH) { + grpc_debug("%s RPC FINISH -> DELETED", name); + new_state = FINISH; + } else { + grpc_debug("%s RPC: %s -> PROCESS", name, + call_states[this->state]); + new_state = PROCESS; + } + /* + * We are either in state CREATE, MORE or FINISH. If CREATE or + * MORE move back to PROCESS, otherwise we are cleaning up + * (FINISH) so leave it in that state. Run the callback on the + * main threadmaster/pthread; and wait for expected transition + * from main thread. If transition is to FINISH->DELETED. + * delete us. + * + * We update the state prior to scheduling the callback which + * may then update the state in the master pthread. Then we + * obtain the lock in the condvar-check-loop as the callback + * will be modifying updating the state value. + */ + this->state = new_state; + thread_add_event(main_master, c_callback, (void *)this, 0, + NULL); + pthread_mutex_lock(&this->cmux); + while (this->state == new_state) + pthread_cond_wait(&this->cond, &this->cmux); + pthread_mutex_unlock(&this->cmux); + + if (this->state == DELETED) { + grpc_debug("%s RPC: -> [DELETED]", name); + delete this; + return DELETED; + } + return this->state; + } - void doCallback() override + void do_request(::frr::Northbound::AsyncService *service, + ::grpc::ServerCompletionQueue *cq) override { - (service->*callback)(this); + grpc_debug("%s, posting a request for: %s", __func__, name); + if (requestf) { + NewRpcState<Q, S> *copy = + new NewRpcState(cdb, requestf, callback, name); + (service->*requestf)(©->ctx, ©->request, + ©->responder, cq, cq, copy); + } else { + NewRpcState<Q, S> *copy = + new NewRpcState(cdb, requestsf, callback, name); + (service->*requestsf)(©->ctx, ©->request, + ©->async_responder, cq, cq, + copy); + } + } + + + static int c_callback(struct thread *thread) + { + auto _tag = static_cast<NewRpcState<Q, S> *>(thread->arg); + /* + * We hold the lock until the callback finishes and has updated + * _tag->state, then we signal done and release. + */ + pthread_mutex_lock(&_tag->cmux); + + CallState enter_state = _tag->state; + grpc_debug("%s RPC running on main thread", _tag->name); + + _tag->callback(_tag); + + grpc_debug("%s RPC: %s -> %s", _tag->name, + call_states[enter_state], call_states[_tag->state]); + + pthread_cond_signal(&_tag->cond); + pthread_mutex_unlock(&_tag->cmux); + return 0; } + NewRpcState<Q, S> *orig; + const char *name; grpc::ServerContext ctx; Q request; S response; grpc::ServerAsyncResponseWriter<S> responder; grpc::ServerAsyncWriter<S> async_responder; - NorthboundImpl *service; - void (NorthboundImpl::*callback)(RpcState<Q, S> *); + Candidates *cdb; + void (*callback)(NewRpcState<Q, S> *); + reqfunc_t requestf; + reqsfunc_t requestsf; + pthread_mutex_t cmux = PTHREAD_MUTEX_INITIALIZER; + pthread_cond_t cond = PTHREAD_COND_INITIALIZER; void *context; - CallStatus state = CREATE; + + CallState state = CREATE; }; -#define REQUEST_RPC(NAME) \ - do { \ - auto _rpcState = \ - new RpcState<frr::NAME##Request, frr::NAME##Response>( \ - this, &NorthboundImpl::Handle##NAME); \ - _service->Request##NAME(&_rpcState->ctx, &_rpcState->request, \ - &_rpcState->responder, _cq, _cq, \ - _rpcState); \ - } while (0) +// ------------------------------------------------------ +// Utility Functions +// ------------------------------------------------------ -#define REQUEST_RPC_STREAMING(NAME) \ - do { \ - auto _rpcState = \ - new RpcState<frr::NAME##Request, frr::NAME##Response>( \ - this, &NorthboundImpl::Handle##NAME); \ - _service->Request##NAME(&_rpcState->ctx, &_rpcState->request, \ - &_rpcState->async_responder, _cq, _cq, \ - _rpcState); \ - } while (0) +static LYD_FORMAT encoding2lyd_format(enum frr::Encoding encoding) +{ + switch (encoding) { + case frr::JSON: + return LYD_JSON; + case frr::XML: + return LYD_XML; + default: + flog_err(EC_LIB_DEVELOPMENT, + "%s: unknown data encoding format (%u)", __func__, + encoding); + exit(1); + } +} -class NorthboundImpl +static int yang_dnode_edit(struct lyd_node *dnode, const std::string &path, + const std::string &value) { - public: - NorthboundImpl(void) - { - _nextCandidateId = 0; - _service = new frr::Northbound::AsyncService(); + LY_ERR err = lyd_new_path(dnode, ly_native_ctx, path.c_str(), + value.c_str(), LYD_NEW_PATH_UPDATE, &dnode); + if (err != LY_SUCCESS) { + flog_warn(EC_LIB_LIBYANG, "%s: lyd_new_path() failed: %s", + __func__, ly_errmsg(ly_native_ctx)); + return -1; } - ~NorthboundImpl(void) - { - // Delete candidates. - for (auto it = _candidates.begin(); it != _candidates.end(); - it++) - delete_candidate(&it->second); + return 0; +} + +static int yang_dnode_delete(struct lyd_node *dnode, const std::string &path) +{ + dnode = yang_dnode_get(dnode, path.c_str()); + if (!dnode) + return -1; + + lyd_free_tree(dnode); + + return 0; +} + +static LY_ERR data_tree_from_dnode(frr::DataTree *dt, + const struct lyd_node *dnode, + LYD_FORMAT lyd_format, bool with_defaults) +{ + char *strp; + int options = 0; + + SET_FLAG(options, LYD_PRINT_WITHSIBLINGS); + if (with_defaults) + SET_FLAG(options, LYD_PRINT_WD_ALL); + else + SET_FLAG(options, LYD_PRINT_WD_TRIM); + + LY_ERR err = lyd_print_mem(&strp, dnode, lyd_format, options); + if (err == LY_SUCCESS) { + if (strp) { + dt->set_data(strp); + free(strp); + } } + return err; +} - void Run(unsigned long port) - { - grpc::ServerBuilder builder; - std::stringstream server_address; - - server_address << "0.0.0.0:" << port; - - builder.AddListeningPort(server_address.str(), - grpc::InsecureServerCredentials()); - builder.RegisterService(_service); - - auto cq = builder.AddCompletionQueue(); - _cq = cq.get(); - auto _server = builder.BuildAndStart(); - - /* Schedule all RPC handlers */ - REQUEST_RPC(GetCapabilities); - REQUEST_RPC(CreateCandidate); - REQUEST_RPC(DeleteCandidate); - REQUEST_RPC(UpdateCandidate); - REQUEST_RPC(EditCandidate); - REQUEST_RPC(LoadToCandidate); - REQUEST_RPC(Commit); - REQUEST_RPC(GetTransaction); - REQUEST_RPC(LockConfig); - REQUEST_RPC(UnlockConfig); - REQUEST_RPC(Execute); - REQUEST_RPC_STREAMING(Get); - REQUEST_RPC_STREAMING(ListTransactions); - - zlog_notice("gRPC server listening on %s", - server_address.str().c_str()); - - /* Process inbound RPCs */ - void *tag; - bool ok; - while (true) { - _cq->Next(&tag, &ok); - GPR_ASSERT(ok); - static_cast<RpcStateBase *>(tag)->doCallback(); - tag = nullptr; +static struct lyd_node *dnode_from_data_tree(const frr::DataTree *dt, + bool config_only) +{ + struct lyd_node *dnode; + int options, opt2; + LY_ERR err; + + if (config_only) { + options = LYD_PARSE_NO_STATE; + opt2 = LYD_VALIDATE_NO_STATE; + } else { + options = LYD_PARSE_STRICT; + opt2 = 0; + } + + err = lyd_parse_data_mem(ly_native_ctx, dt->data().c_str(), + encoding2lyd_format(dt->encoding()), options, + opt2, &dnode); + if (err != LY_SUCCESS) { + flog_warn(EC_LIB_LIBYANG, "%s: lyd_parse_mem() failed: %s", + __func__, ly_errmsg(ly_native_ctx)); + } + return dnode; +} + +static struct lyd_node *get_dnode_config(const std::string &path) +{ + struct lyd_node *dnode; + + dnode = yang_dnode_get(running_config->dnode, + path.empty() ? NULL : path.c_str()); + if (dnode) + dnode = yang_dnode_dup(dnode); + + return dnode; +} + +static int get_oper_data_cb(const struct lysc_node *snode, + struct yang_translator *translator, + struct yang_data *data, void *arg) +{ + struct lyd_node *dnode = static_cast<struct lyd_node *>(arg); + int ret = yang_dnode_edit(dnode, data->xpath, data->value); + yang_data_free(data); + + return (ret == 0) ? NB_OK : NB_ERR; +} + +static struct lyd_node *get_dnode_state(const std::string &path) +{ + struct lyd_node *dnode = yang_dnode_new(ly_native_ctx, false); + if (nb_oper_data_iterate(path.c_str(), NULL, 0, get_oper_data_cb, dnode) + != NB_OK) { + yang_dnode_free(dnode); + return NULL; + } + + return dnode; +} + +static grpc::Status get_path(frr::DataTree *dt, const std::string &path, + int type, LYD_FORMAT lyd_format, + bool with_defaults) +{ + struct lyd_node *dnode_config = NULL; + struct lyd_node *dnode_state = NULL; + struct lyd_node *dnode_final; + + // Configuration data. + if (type == frr::GetRequest_DataType_ALL + || type == frr::GetRequest_DataType_CONFIG) { + dnode_config = get_dnode_config(path); + if (!dnode_config) + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "Data path not found"); + } + + // Operational data. + if (type == frr::GetRequest_DataType_ALL + || type == frr::GetRequest_DataType_STATE) { + dnode_state = get_dnode_state(path); + if (!dnode_state) { + if (dnode_config) + yang_dnode_free(dnode_config); + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "Failed to fetch operational data"); + } + } + + switch (type) { + case frr::GetRequest_DataType_ALL: + // + // Combine configuration and state data into a single + // dnode. + // + if (lyd_merge_siblings(&dnode_state, dnode_config, + LYD_MERGE_DESTRUCT) + != LY_SUCCESS) { + yang_dnode_free(dnode_state); + yang_dnode_free(dnode_config); + return grpc::Status( + grpc::StatusCode::INTERNAL, + "Failed to merge configuration and state data", + ly_errmsg(ly_native_ctx)); } + + dnode_final = dnode_state; + break; + case frr::GetRequest_DataType_CONFIG: + dnode_final = dnode_config; + break; + case frr::GetRequest_DataType_STATE: + dnode_final = dnode_state; + break; } - void HandleGetCapabilities(RpcState<frr::GetCapabilitiesRequest, + // Validate data to create implicit default nodes if necessary. + int validate_opts = 0; + if (type == frr::GetRequest_DataType_CONFIG) + validate_opts = LYD_VALIDATE_NO_STATE; + else + validate_opts = 0; + + LY_ERR err = lyd_validate_all(&dnode_final, ly_native_ctx, + validate_opts, NULL); + + if (err) + flog_warn(EC_LIB_LIBYANG, "%s: lyd_validate_all() failed: %s", + __func__, ly_errmsg(ly_native_ctx)); + // Dump data using the requested format. + if (!err) + err = data_tree_from_dnode(dt, dnode_final, lyd_format, + with_defaults); + yang_dnode_free(dnode_final); + if (err) + return grpc::Status(grpc::StatusCode::INTERNAL, + "Failed to dump data"); + return grpc::Status::OK; +} + + +// ------------------------------------------------------ +// RPC Callback Functions: run on main thread +// ------------------------------------------------------ + +void HandleUnaryGetCapabilities(NewRpcState<frr::GetCapabilitiesRequest, frr::GetCapabilitiesResponse> *tag) - { - switch (tag->state) { - case CREATE: - REQUEST_RPC(GetCapabilities); - tag->state = PROCESS; - case PROCESS: { - if (nb_dbg_client_grpc) - zlog_debug("received RPC GetCapabilities()"); - - // Response: string frr_version = 1; - tag->response.set_frr_version(FRR_VERSION); - - // Response: bool rollback_support = 2; +{ + grpc_debug("%s: state: %s", __func__, call_states[tag->state]); + + if (tag->state == FINISH) { + tag->state = DELETED; + return; + } + + // Response: string frr_version = 1; + tag->response.set_frr_version(FRR_VERSION); + + // Response: bool rollback_support = 2; #ifdef HAVE_CONFIG_ROLLBACKS - tag->response.set_rollback_support(true); + tag->response.set_rollback_support(true); #else - tag->response.set_rollback_support(false); + tag->response.set_rollback_support(false); #endif + // Response: repeated ModuleData supported_modules = 3; + struct yang_module *module; + RB_FOREACH (module, yang_modules, &yang_modules) { + auto m = tag->response.add_supported_modules(); + + m->set_name(module->name); + if (module->info->revision) + m->set_revision(module->info->revision); + m->set_organization(module->info->org); + } - // Response: repeated ModuleData supported_modules = 3; - struct yang_module *module; - RB_FOREACH (module, yang_modules, &yang_modules) { - auto m = tag->response.add_supported_modules(); + // Response: repeated Encoding supported_encodings = 4; + tag->response.add_supported_encodings(frr::JSON); + tag->response.add_supported_encodings(frr::XML); - m->set_name(module->name); - if (module->info->revision) - m->set_revision(module->info->revision); - m->set_organization(module->info->org); - } + /* Should we do this in the async process call? */ + tag->responder.Finish(tag->response, grpc::Status::OK, tag); - // Response: repeated Encoding supported_encodings = 4; - tag->response.add_supported_encodings(frr::JSON); - tag->response.add_supported_encodings(frr::XML); + /* Indicate we are done. */ + tag->state = FINISH; +} - tag->responder.Finish(tag->response, grpc::Status::OK, - tag); - tag->state = FINISH; - break; - } - case FINISH: - delete tag; - } +void HandleStreamingGet(NewRpcState<frr::GetRequest, frr::GetResponse> *tag) +{ + grpc_debug("%s: state: %s", __func__, call_states[tag->state]); + + if (tag->state == FINISH) { + delete static_cast<std::list<std::string> *>(tag->context); + tag->state = DELETED; + return; } - void HandleGet(RpcState<frr::GetRequest, frr::GetResponse> *tag) - { - switch (tag->state) { - case CREATE: { - auto mypaths = new std::list<std::string>(); - tag->context = mypaths; - auto paths = tag->request.path(); - for (const std::string &path : paths) { - mypaths->push_back(std::string(path)); - } - REQUEST_RPC_STREAMING(Get); - tag->state = PROCESS; - } - case PROCESS: { - // Request: DataType type = 1; - int type = tag->request.type(); - // Request: Encoding encoding = 2; - frr::Encoding encoding = tag->request.encoding(); - // Request: bool with_defaults = 3; - bool with_defaults = tag->request.with_defaults(); - - if (nb_dbg_client_grpc) - zlog_debug( - "received RPC Get(type: %u, encoding: %u, with_defaults: %u)", - type, encoding, with_defaults); - - auto mypaths = static_cast<std::list<std::string> *>( - tag->context); - - if (mypaths->empty()) { - tag->async_responder.Finish(grpc::Status::OK, - tag); - tag->state = FINISH; - return; - } - - - frr::GetResponse response; - grpc::Status status; - - // Response: int64 timestamp = 1; - response.set_timestamp(time(NULL)); - - // Response: DataTree data = 2; - auto *data = response.mutable_data(); - data->set_encoding(tag->request.encoding()); - status = get_path(data, mypaths->back().c_str(), type, - encoding2lyd_format(encoding), - with_defaults); - - // Something went wrong... - if (!status.ok()) { - tag->async_responder.WriteAndFinish( - response, grpc::WriteOptions(), status, - tag); - tag->state = FINISH; - return; - } - - mypaths->pop_back(); - - tag->async_responder.Write(response, tag); - - break; + if (!tag->context) { + /* Creating, first time called for this RPC */ + auto mypaths = new std::list<std::string>(); + tag->context = mypaths; + auto paths = tag->request.path(); + for (const std::string &path : paths) { + mypaths->push_back(std::string(path)); } - case FINISH: - if (nb_dbg_client_grpc) - zlog_debug("received RPC Get() end"); + } - delete static_cast<std::list<std::string> *>( - tag->context); - delete tag; - } + // Request: DataType type = 1; + int type = tag->request.type(); + // Request: Encoding encoding = 2; + frr::Encoding encoding = tag->request.encoding(); + // Request: bool with_defaults = 3; + bool with_defaults = tag->request.with_defaults(); + + auto mypathps = static_cast<std::list<std::string> *>(tag->context); + if (mypathps->empty()) { + tag->async_responder.Finish(grpc::Status::OK, tag); + tag->state = FINISH; + return; } - void HandleCreateCandidate(RpcState<frr::CreateCandidateRequest, + frr::GetResponse response; + grpc::Status status; + + // Response: int64 timestamp = 1; + response.set_timestamp(time(NULL)); + + // Response: DataTree data = 2; + auto *data = response.mutable_data(); + data->set_encoding(tag->request.encoding()); + status = get_path(data, mypathps->back().c_str(), type, + encoding2lyd_format(encoding), with_defaults); + + if (!status.ok()) { + tag->async_responder.WriteAndFinish( + response, grpc::WriteOptions(), status, tag); + tag->state = FINISH; + return; + } + + mypathps->pop_back(); + if (mypathps->empty()) { + tag->async_responder.WriteAndFinish( + response, grpc::WriteOptions(), grpc::Status::OK, tag); + tag->state = FINISH; + } else { + tag->async_responder.Write(response, tag); + tag->state = MORE; + } +} + +void HandleUnaryCreateCandidate(NewRpcState<frr::CreateCandidateRequest, frr::CreateCandidateResponse> *tag) - { - switch (tag->state) { - case CREATE: - REQUEST_RPC(CreateCandidate); - tag->state = PROCESS; - case PROCESS: { - if (nb_dbg_client_grpc) - zlog_debug("received RPC CreateCandidate()"); - - struct candidate *candidate = create_candidate(); - if (!candidate) { - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode:: - RESOURCE_EXHAUSTED, - "Can't create candidate configuration"), - tag); - } else { - tag->response.set_candidate_id(candidate->id); - tag->responder.Finish(tag->response, - grpc::Status::OK, tag); - } +{ + grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - tag->state = FINISH; + if (tag->state == FINISH) { + tag->state = DELETED; + return; + } - break; - } - case FINISH: - delete tag; - } + struct candidate *candidate = tag->cdb->create_candidate(); + if (!candidate) { + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, + "Can't create candidate configuration"), + tag); + } else { + tag->response.set_candidate_id(candidate->id); + tag->responder.Finish(tag->response, grpc::Status::OK, tag); } - void HandleDeleteCandidate(RpcState<frr::DeleteCandidateRequest, + tag->state = FINISH; +} + +void HandleUnaryDeleteCandidate(NewRpcState<frr::DeleteCandidateRequest, frr::DeleteCandidateResponse> *tag) - { - switch (tag->state) { - case CREATE: - REQUEST_RPC(DeleteCandidate); - tag->state = PROCESS; - case PROCESS: { - - // Request: uint32 candidate_id = 1; - uint32_t candidate_id = tag->request.candidate_id(); - - if (nb_dbg_client_grpc) - zlog_debug( - "received RPC DeleteCandidate(candidate_id: %u)", - candidate_id); - - struct candidate *candidate = - get_candidate(candidate_id); - if (!candidate) { - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"), - tag); - tag->state = FINISH; - return; - } else { - delete_candidate(candidate); - tag->responder.Finish(tag->response, - grpc::Status::OK, tag); - tag->state = FINISH; - return; - } - tag->state = FINISH; - break; - } - case FINISH: - delete tag; - } +{ + grpc_debug("%s: state: %s", __func__, call_states[tag->state]); + + if (tag->state == FINISH) { + tag->state = DELETED; + return; } - void HandleUpdateCandidate(RpcState<frr::UpdateCandidateRequest, + // Request: uint32 candidate_id = 1; + uint32_t candidate_id = tag->request.candidate_id(); + + grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); + + struct candidate *candidate = tag->cdb->get_candidate(candidate_id); + if (!candidate) { + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"), + tag); + } else { + tag->cdb->delete_candidate(candidate); + tag->responder.Finish(tag->response, grpc::Status::OK, tag); + } + tag->state = FINISH; +} + +void HandleUnaryUpdateCandidate(NewRpcState<frr::UpdateCandidateRequest, frr::UpdateCandidateResponse> *tag) - { - switch (tag->state) { - case CREATE: - REQUEST_RPC(UpdateCandidate); - tag->state = PROCESS; - case PROCESS: { - - // Request: uint32 candidate_id = 1; - uint32_t candidate_id = tag->request.candidate_id(); - - if (nb_dbg_client_grpc) - zlog_debug( - "received RPC UpdateCandidate(candidate_id: %u)", - candidate_id); - - struct candidate *candidate = - get_candidate(candidate_id); - - if (!candidate) - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"), - tag); - else if (candidate->transaction) - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode:: - FAILED_PRECONDITION, - "candidate is in the middle of a transaction"), - tag); - else if (nb_candidate_update(candidate->config) - != NB_OK) - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::INTERNAL, - "failed to update candidate configuration"), - tag); - - else - tag->responder.Finish(tag->response, - grpc::Status::OK, tag); +{ + grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - tag->state = FINISH; + if (tag->state == FINISH) { + tag->state = DELETED; + return; + } - break; - } - case FINISH: - delete tag; - } + // Request: uint32 candidate_id = 1; + uint32_t candidate_id = tag->request.candidate_id(); + + grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); + + struct candidate *candidate = tag->cdb->get_candidate(candidate_id); + + if (!candidate) + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"), + tag); + else if (candidate->transaction) + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::FAILED_PRECONDITION, + "candidate is in the middle of a transaction"), + tag); + else if (nb_candidate_update(candidate->config) != NB_OK) + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::INTERNAL, + "failed to update candidate configuration"), + tag); + + else + tag->responder.Finish(tag->response, grpc::Status::OK, tag); + + tag->state = FINISH; +} + +void HandleUnaryEditCandidate( + NewRpcState<frr::EditCandidateRequest, frr::EditCandidateResponse> *tag) +{ + grpc_debug("%s: state: %s", __func__, call_states[tag->state]); + + if (tag->state == FINISH) { + tag->state = DELETED; + return; } - void HandleEditCandidate(RpcState<frr::EditCandidateRequest, - frr::EditCandidateResponse> *tag) - { - switch (tag->state) { - case CREATE: - REQUEST_RPC(EditCandidate); - tag->state = PROCESS; - case PROCESS: { - - // Request: uint32 candidate_id = 1; - uint32_t candidate_id = tag->request.candidate_id(); - - if (nb_dbg_client_grpc) - zlog_debug( - "received RPC EditCandidate(candidate_id: %u)", - candidate_id); - - struct candidate *candidate = - get_candidate(candidate_id); - - if (!candidate) { - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"), - tag); - tag->state = FINISH; - break; - } - - struct nb_config *candidate_tmp = - nb_config_dup(candidate->config); - - auto pvs = tag->request.update(); - for (const frr::PathValue &pv : pvs) { - if (yang_dnode_edit(candidate_tmp->dnode, - pv.path(), pv.value()) - != 0) { - nb_config_free(candidate_tmp); - - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode:: - INVALID_ARGUMENT, - "Failed to update \"" - + pv.path() - + "\""), - tag); - - tag->state = FINISH; - return; - } - } - - pvs = tag->request.delete_(); - for (const frr::PathValue &pv : pvs) { - if (yang_dnode_delete(candidate_tmp->dnode, - pv.path()) - != 0) { - nb_config_free(candidate_tmp); - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode:: - INVALID_ARGUMENT, - "Failed to remove \"" - + pv.path() - + "\""), - tag); - tag->state = FINISH; - return; - } - } - - // No errors, accept all changes. - nb_config_replace(candidate->config, candidate_tmp, - false); - - tag->responder.Finish(tag->response, grpc::Status::OK, - tag); + // Request: uint32 candidate_id = 1; + uint32_t candidate_id = tag->request.candidate_id(); - tag->state = FINISH; + grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); - break; - } - case FINISH: - delete tag; - } + struct candidate *candidate = tag->cdb->get_candidate(candidate_id); + + if (!candidate) { + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"), + tag); + tag->state = FINISH; + return; } - void HandleLoadToCandidate(RpcState<frr::LoadToCandidateRequest, - frr::LoadToCandidateResponse> *tag) - { - switch (tag->state) { - case CREATE: - REQUEST_RPC(LoadToCandidate); - tag->state = PROCESS; - case PROCESS: { - // Request: uint32 candidate_id = 1; - uint32_t candidate_id = tag->request.candidate_id(); - - if (nb_dbg_client_grpc) - zlog_debug( - "received RPC LoadToCandidate(candidate_id: %u)", - candidate_id); - - // Request: LoadType type = 2; - int load_type = tag->request.type(); - // Request: DataTree config = 3; - auto config = tag->request.config(); - - - struct candidate *candidate = - get_candidate(candidate_id); - - if (!candidate) { - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"), - tag); - tag->state = FINISH; - return; - } - - struct lyd_node *dnode = - dnode_from_data_tree(&config, true); - if (!dnode) { - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::INTERNAL, - "Failed to parse the configuration"), - tag); - tag->state = FINISH; - return; - } - - struct nb_config *loaded_config = nb_config_new(dnode); - - if (load_type == frr::LoadToCandidateRequest::REPLACE) - nb_config_replace(candidate->config, - loaded_config, false); - else if (nb_config_merge(candidate->config, - loaded_config, false) - != NB_OK) { - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::INTERNAL, - "Failed to merge the loaded configuration"), - tag); - tag->state = FINISH; - return; - } - - tag->responder.Finish(tag->response, grpc::Status::OK, - tag); + struct nb_config *candidate_tmp = nb_config_dup(candidate->config); + + auto pvs = tag->request.update(); + for (const frr::PathValue &pv : pvs) { + if (yang_dnode_edit(candidate_tmp->dnode, pv.path(), pv.value()) + != 0) { + nb_config_free(candidate_tmp); + + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "Failed to update \"" + pv.path() + + "\""), + tag); + tag->state = FINISH; - break; - } - case FINISH: - delete tag; + return; } } - void - HandleCommit(RpcState<frr::CommitRequest, frr::CommitResponse> *tag) - { - switch (tag->state) { - case CREATE: - REQUEST_RPC(Commit); - tag->state = PROCESS; - case PROCESS: { - // Request: uint32 candidate_id = 1; - uint32_t candidate_id = tag->request.candidate_id(); - if (nb_dbg_client_grpc) - zlog_debug( - "received RPC Commit(candidate_id: %u)", - candidate_id); - - // Request: Phase phase = 2; - int phase = tag->request.phase(); - // Request: string comment = 3; - const std::string comment = tag->request.comment(); - - // Find candidate configuration. - struct candidate *candidate = - get_candidate(candidate_id); - if (!candidate) { - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode::NOT_FOUND, - "candidate configuration not found"), - tag); - tag->state = FINISH; - return; - } - - int ret = NB_OK; - uint32_t transaction_id = 0; - - // Check for misuse of the two-phase commit protocol. - switch (phase) { - case frr::CommitRequest::PREPARE: - case frr::CommitRequest::ALL: - if (candidate->transaction) { - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode:: - FAILED_PRECONDITION, - "candidate is in the middle of a transaction"), - tag); - tag->state = FINISH; - return; - } - break; - case frr::CommitRequest::ABORT: - case frr::CommitRequest::APPLY: - if (!candidate->transaction) { - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode:: - FAILED_PRECONDITION, - "no transaction in progress"), - tag); - tag->state = FINISH; - return; - } - break; - default: - break; - } - - - // Execute the user request. - struct nb_context context = {}; - context.client = NB_CLIENT_GRPC; - char errmsg[BUFSIZ] = {0}; - - switch (phase) { - case frr::CommitRequest::VALIDATE: - zlog_debug("`-> Performing VALIDATE"); - ret = nb_candidate_validate( - &context, candidate->config, errmsg, - sizeof(errmsg)); - break; - case frr::CommitRequest::PREPARE: - zlog_debug("`-> Performing PREPARE"); - ret = nb_candidate_commit_prepare( - &context, candidate->config, - comment.c_str(), - &candidate->transaction, errmsg, - sizeof(errmsg)); - break; - case frr::CommitRequest::ABORT: - zlog_debug("`-> Performing ABORT"); - nb_candidate_commit_abort( - candidate->transaction, errmsg, - sizeof(errmsg)); - break; - case frr::CommitRequest::APPLY: - zlog_debug("`-> Performing ABORT"); - nb_candidate_commit_apply( - candidate->transaction, true, - &transaction_id, errmsg, - sizeof(errmsg)); - break; - case frr::CommitRequest::ALL: - zlog_debug("`-> Performing ALL"); - ret = nb_candidate_commit( - &context, candidate->config, true, - comment.c_str(), &transaction_id, - errmsg, sizeof(errmsg)); - break; - } - - // Map northbound error codes to gRPC status codes. - grpc::Status status; - switch (ret) { - case NB_OK: - status = grpc::Status::OK; - break; - case NB_ERR_NO_CHANGES: - status = grpc::Status(grpc::StatusCode::ABORTED, - errmsg); - break; - case NB_ERR_LOCKED: - status = grpc::Status( - grpc::StatusCode::UNAVAILABLE, errmsg); - break; - case NB_ERR_VALIDATION: - status = grpc::Status( - grpc::StatusCode::INVALID_ARGUMENT, - errmsg); - break; - case NB_ERR_RESOURCE: - status = grpc::Status( - grpc::StatusCode::RESOURCE_EXHAUSTED, - errmsg); - break; - case NB_ERR: - default: - status = grpc::Status( - grpc::StatusCode::INTERNAL, errmsg); - break; - } - - if (nb_dbg_client_grpc) - zlog_debug("`-> Result: %s (message: '%s')", - nb_err_name((enum nb_error)ret), - errmsg); - - if (ret == NB_OK) { - // Response: uint32 transaction_id = 1; - if (transaction_id) - tag->response.set_transaction_id( - transaction_id); - } - if (strlen(errmsg) > 0) - tag->response.set_error_message(errmsg); - - tag->responder.Finish(tag->response, status, tag); + pvs = tag->request.delete_(); + for (const frr::PathValue &pv : pvs) { + if (yang_dnode_delete(candidate_tmp->dnode, pv.path()) != 0) { + nb_config_free(candidate_tmp); + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "Failed to remove \"" + pv.path() + + "\""), + tag); tag->state = FINISH; - break; - } - case FINISH: - delete tag; + return; } } - void - HandleListTransactions(RpcState<frr::ListTransactionsRequest, - frr::ListTransactionsResponse> *tag) - { - switch (tag->state) { - case CREATE: - REQUEST_RPC_STREAMING(ListTransactions); - tag->context = new std::list<std::tuple< - int, std::string, std::string, std::string>>(); - nb_db_transactions_iterate(list_transactions_cb, - tag->context); - tag->state = PROCESS; - case PROCESS: { - if (nb_dbg_client_grpc) - zlog_debug("received RPC ListTransactions()"); - - auto list = static_cast<std::list<std::tuple< - int, std::string, std::string, std::string>> *>( - tag->context); - if (list->empty()) { - tag->async_responder.Finish(grpc::Status::OK, - tag); - tag->state = FINISH; - return; - } - auto item = list->back(); - - - frr::ListTransactionsResponse response; - - // Response: uint32 id = 1; - response.set_id(std::get<0>(item)); - - // Response: string client = 2; - response.set_client(std::get<1>(item).c_str()); - - // Response: string date = 3; - response.set_date(std::get<2>(item).c_str()); - - // Response: string comment = 4; - response.set_comment(std::get<3>(item).c_str()); - - list->pop_back(); - - tag->async_responder.Write(response, tag); - break; - } - case FINISH: - delete static_cast<std::list<std::tuple< - int, std::string, std::string, std::string>> *>( - tag->context); - delete tag; - } + // No errors, accept all changes. + nb_config_replace(candidate->config, candidate_tmp, false); + + tag->responder.Finish(tag->response, grpc::Status::OK, tag); + + tag->state = FINISH; +} + +void HandleUnaryLoadToCandidate(NewRpcState<frr::LoadToCandidateRequest, + frr::LoadToCandidateResponse> *tag) +{ + grpc_debug("%s: state: %s", __func__, call_states[tag->state]); + + if (tag->state == FINISH) { + tag->state = DELETED; + return; } - void HandleGetTransaction(RpcState<frr::GetTransactionRequest, - frr::GetTransactionResponse> *tag) - { - switch (tag->state) { - case CREATE: - REQUEST_RPC(GetTransaction); - tag->state = PROCESS; - case PROCESS: { - // Request: uint32 transaction_id = 1; - uint32_t transaction_id = tag->request.transaction_id(); - // Request: Encoding encoding = 2; - frr::Encoding encoding = tag->request.encoding(); - // Request: bool with_defaults = 3; - bool with_defaults = tag->request.with_defaults(); - - if (nb_dbg_client_grpc) - zlog_debug( - "received RPC GetTransaction(transaction_id: %u, encoding: %u)", - transaction_id, encoding); - - struct nb_config *nb_config; - - // Load configuration from the transactions database. - nb_config = nb_db_transaction_load(transaction_id); - if (!nb_config) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode:: - INVALID_ARGUMENT, - "Transaction not found"), - tag); - tag->state = FINISH; - return; - } - - // Response: DataTree config = 1; - auto config = tag->response.mutable_config(); - config->set_encoding(encoding); - - // Dump data using the requested format. - if (data_tree_from_dnode(config, nb_config->dnode, - encoding2lyd_format(encoding), - with_defaults) - != 0) { - nb_config_free(nb_config); - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::INTERNAL, - "Failed to dump data"), - tag); - tag->state = FINISH; - return; - } - - nb_config_free(nb_config); - - tag->responder.Finish(tag->response, grpc::Status::OK, - tag); - tag->state = FINISH; - break; - } - case FINISH: - delete tag; - } + // Request: uint32 candidate_id = 1; + uint32_t candidate_id = tag->request.candidate_id(); + + grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); + + // Request: LoadType type = 2; + int load_type = tag->request.type(); + // Request: DataTree config = 3; + auto config = tag->request.config(); + + + struct candidate *candidate = tag->cdb->get_candidate(candidate_id); + + if (!candidate) { + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"), + tag); + tag->state = FINISH; + return; } - void HandleLockConfig( - RpcState<frr::LockConfigRequest, frr::LockConfigResponse> *tag) - { - switch (tag->state) { - case CREATE: - REQUEST_RPC(LockConfig); - tag->state = PROCESS; - case PROCESS: { - if (nb_dbg_client_grpc) - zlog_debug("received RPC LockConfig()"); - - if (nb_running_lock(NB_CLIENT_GRPC, NULL)) { - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode:: - FAILED_PRECONDITION, - "running configuration is locked already"), - tag); - tag->state = FINISH; - return; - } - - tag->responder.Finish(tag->response, grpc::Status::OK, - tag); - tag->state = FINISH; - break; - } - case FINISH: - delete tag; - } + struct lyd_node *dnode = dnode_from_data_tree(&config, true); + if (!dnode) { + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode::INTERNAL, + "Failed to parse the configuration"), + tag); + tag->state = FINISH; + return; } - void HandleUnlockConfig(RpcState<frr::UnlockConfigRequest, - frr::UnlockConfigResponse> *tag) - { - switch (tag->state) { - case CREATE: - REQUEST_RPC(UnlockConfig); - tag->state = PROCESS; - case PROCESS: { - if (nb_dbg_client_grpc) - zlog_debug("received RPC UnlockConfig()"); - - if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) { - tag->responder.Finish( - tag->response, - grpc::Status( - grpc::StatusCode:: - FAILED_PRECONDITION, - "failed to unlock the running configuration"), - tag); - tag->state = FINISH; - return; - } - - tag->responder.Finish(tag->response, grpc::Status::OK, - tag); - tag->state = FINISH; - break; - } - case FINISH: - delete tag; - } + struct nb_config *loaded_config = nb_config_new(dnode); + + if (load_type == frr::LoadToCandidateRequest::REPLACE) + nb_config_replace(candidate->config, loaded_config, false); + else if (nb_config_merge(candidate->config, loaded_config, false) + != NB_OK) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::INTERNAL, + "Failed to merge the loaded configuration"), + tag); + tag->state = FINISH; + return; } - void - HandleExecute(RpcState<frr::ExecuteRequest, frr::ExecuteResponse> *tag) - { - struct nb_node *nb_node; - struct list *input_list; - struct list *output_list; - struct listnode *node; - struct yang_data *data; - const char *xpath; - char errmsg[BUFSIZ] = {0}; + tag->responder.Finish(tag->response, grpc::Status::OK, tag); + tag->state = FINISH; +} + +void HandleUnaryCommit( + NewRpcState<frr::CommitRequest, frr::CommitResponse> *tag) +{ + grpc_debug("%s: state: %s", __func__, call_states[tag->state]); + + if (tag->state == FINISH) { + tag->state = DELETED; + return; + } - switch (tag->state) { - case CREATE: - REQUEST_RPC(Execute); - tag->state = PROCESS; - case PROCESS: { - // Request: string path = 1; - xpath = tag->request.path().c_str(); - - if (nb_dbg_client_grpc) - zlog_debug("received RPC Execute(path: \"%s\")", - xpath); - - if (tag->request.path().empty()) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode:: - INVALID_ARGUMENT, - "Data path is empty"), - tag); - tag->state = FINISH; - return; - } - - nb_node = nb_node_find(xpath); - if (!nb_node) { - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode:: - INVALID_ARGUMENT, - "Unknown data path"), - tag); - tag->state = FINISH; - return; - } - - input_list = yang_data_list_new(); - output_list = yang_data_list_new(); - - // Read input parameters. - auto input = tag->request.input(); - for (const frr::PathValue &pv : input) { - // Request: repeated PathValue input = 2; - data = yang_data_new(pv.path().c_str(), - pv.value().c_str()); - listnode_add(input_list, data); - } - - // Execute callback registered for this XPath. - if (nb_callback_rpc(nb_node, xpath, input_list, - output_list, errmsg, sizeof(errmsg)) - != NB_OK) { - flog_warn(EC_LIB_NB_CB_RPC, - "%s: rpc callback failed: %s", - __func__, xpath); - list_delete(&input_list); - list_delete(&output_list); - - tag->responder.Finish( - tag->response, - grpc::Status(grpc::StatusCode::INTERNAL, - "RPC failed"), - tag); - tag->state = FINISH; - return; - } - - // Process output parameters. - for (ALL_LIST_ELEMENTS_RO(output_list, node, data)) { - // Response: repeated PathValue output = 1; - frr::PathValue *pv = tag->response.add_output(); - pv->set_path(data->xpath); - pv->set_value(data->value); - } - - // Release memory. - list_delete(&input_list); - list_delete(&output_list); - - tag->responder.Finish(tag->response, grpc::Status::OK, - tag); + // Request: uint32 candidate_id = 1; + uint32_t candidate_id = tag->request.candidate_id(); + + grpc_debug("%s(candidate_id: %u)", __func__, candidate_id); + + // Request: Phase phase = 2; + int phase = tag->request.phase(); + // Request: string comment = 3; + const std::string comment = tag->request.comment(); + + // Find candidate configuration. + struct candidate *candidate = tag->cdb->get_candidate(candidate_id); + if (!candidate) { + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode::NOT_FOUND, + "candidate configuration not found"), + tag); + tag->state = FINISH; + return; + } + + int ret = NB_OK; + uint32_t transaction_id = 0; + + // Check for misuse of the two-phase commit protocol. + switch (phase) { + case frr::CommitRequest::PREPARE: + case frr::CommitRequest::ALL: + if (candidate->transaction) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::FAILED_PRECONDITION, + "candidate is in the middle of a transaction"), + tag); tag->state = FINISH; - break; + return; } - case FINISH: - delete tag; + break; + case frr::CommitRequest::ABORT: + case frr::CommitRequest::APPLY: + if (!candidate->transaction) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::FAILED_PRECONDITION, + "no transaction in progress"), + tag); + tag->state = FINISH; + return; } + break; + default: + break; } - private: - frr::Northbound::AsyncService *_service; - grpc::ServerCompletionQueue *_cq; - struct candidate { - uint32_t id; - struct nb_config *config; - struct nb_transaction *transaction; - }; - std::map<uint32_t, struct candidate> _candidates; - uint32_t _nextCandidateId; + // Execute the user request. + struct nb_context context = {}; + context.client = NB_CLIENT_GRPC; + char errmsg[BUFSIZ] = {0}; + + switch (phase) { + case frr::CommitRequest::VALIDATE: + grpc_debug("`-> Performing VALIDATE"); + ret = nb_candidate_validate(&context, candidate->config, errmsg, + sizeof(errmsg)); + break; + case frr::CommitRequest::PREPARE: + grpc_debug("`-> Performing PREPARE"); + ret = nb_candidate_commit_prepare( + &context, candidate->config, comment.c_str(), + &candidate->transaction, errmsg, sizeof(errmsg)); + break; + case frr::CommitRequest::ABORT: + grpc_debug("`-> Performing ABORT"); + nb_candidate_commit_abort(candidate->transaction, errmsg, + sizeof(errmsg)); + break; + case frr::CommitRequest::APPLY: + grpc_debug("`-> Performing APPLY"); + nb_candidate_commit_apply(candidate->transaction, true, + &transaction_id, errmsg, + sizeof(errmsg)); + break; + case frr::CommitRequest::ALL: + grpc_debug("`-> Performing ALL"); + ret = nb_candidate_commit(&context, candidate->config, true, + comment.c_str(), &transaction_id, + errmsg, sizeof(errmsg)); + break; + } - static int yang_dnode_edit(struct lyd_node *dnode, - const std::string &path, - const std::string &value) - { - LY_ERR err = lyd_new_path(dnode, ly_native_ctx, path.c_str(), - value.c_str(), LYD_NEW_PATH_UPDATE, - &dnode); - if (err != LY_SUCCESS) { - flog_warn(EC_LIB_LIBYANG, - "%s: lyd_new_path() failed: %s", __func__, - ly_errmsg(ly_native_ctx)); - return -1; - } + // Map northbound error codes to gRPC status codes. + grpc::Status status; + switch (ret) { + case NB_OK: + status = grpc::Status::OK; + break; + case NB_ERR_NO_CHANGES: + status = grpc::Status(grpc::StatusCode::ABORTED, errmsg); + break; + case NB_ERR_LOCKED: + status = grpc::Status(grpc::StatusCode::UNAVAILABLE, errmsg); + break; + case NB_ERR_VALIDATION: + status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + errmsg); + break; + case NB_ERR_RESOURCE: + status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, + errmsg); + break; + case NB_ERR: + default: + status = grpc::Status(grpc::StatusCode::INTERNAL, errmsg); + break; + } - return 0; + grpc_debug("`-> Result: %s (message: '%s')", + nb_err_name((enum nb_error)ret), errmsg); + + if (ret == NB_OK) { + // Response: uint32 transaction_id = 1; + if (transaction_id) + tag->response.set_transaction_id(transaction_id); } + if (strlen(errmsg) > 0) + tag->response.set_error_message(errmsg); - static int yang_dnode_delete(struct lyd_node *dnode, - const std::string &path) - { - dnode = yang_dnode_get(dnode, path.c_str()); - if (!dnode) - return -1; + tag->responder.Finish(tag->response, status, tag); + tag->state = FINISH; +} - lyd_free_tree(dnode); +void HandleUnaryLockConfig( + NewRpcState<frr::LockConfigRequest, frr::LockConfigResponse> *tag) +{ + grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - return 0; + if (tag->state == FINISH) { + tag->state = DELETED; + return; } - static LYD_FORMAT encoding2lyd_format(enum frr::Encoding encoding) - { - switch (encoding) { - case frr::JSON: - return LYD_JSON; - case frr::XML: - return LYD_XML; - default: - flog_err(EC_LIB_DEVELOPMENT, - "%s: unknown data encoding format (%u)", - __func__, encoding); - exit(1); - } + if (nb_running_lock(NB_CLIENT_GRPC, NULL)) { + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, + "running configuration is locked already"), + tag); + } else { + tag->responder.Finish(tag->response, grpc::Status::OK, tag); } + tag->state = FINISH; +} - static int get_oper_data_cb(const struct lysc_node *snode, - struct yang_translator *translator, - struct yang_data *data, void *arg) - { - struct lyd_node *dnode = static_cast<struct lyd_node *>(arg); - int ret = yang_dnode_edit(dnode, data->xpath, data->value); - yang_data_free(data); +void HandleUnaryUnlockConfig( + NewRpcState<frr::UnlockConfigRequest, frr::UnlockConfigResponse> *tag) +{ + grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - return (ret == 0) ? NB_OK : NB_ERR; + if (tag->state == FINISH) { + tag->state = DELETED; + return; } - static void list_transactions_cb(void *arg, int transaction_id, - const char *client_name, - const char *date, const char *comment) - { + if (nb_running_unlock(NB_CLIENT_GRPC, NULL)) { + tag->responder.Finish( + tag->response, + grpc::Status( + grpc::StatusCode::FAILED_PRECONDITION, + "failed to unlock the running configuration"), + tag); + } else { + tag->responder.Finish(tag->response, grpc::Status::OK, tag); + } + tag->state = FINISH; +} - auto list = static_cast<std::list<std::tuple< - int, std::string, std::string, std::string>> *>(arg); - list->push_back(std::make_tuple( - transaction_id, std::string(client_name), - std::string(date), std::string(comment))); +static void list_transactions_cb(void *arg, int transaction_id, + const char *client_name, const char *date, + const char *comment) +{ + auto list = static_cast<std::list< + std::tuple<int, std::string, std::string, std::string>> *>(arg); + list->push_back( + std::make_tuple(transaction_id, std::string(client_name), + std::string(date), std::string(comment))); +} + +void HandleStreamingListTransactions( + NewRpcState<frr::ListTransactionsRequest, frr::ListTransactionsResponse> + *tag) +{ + grpc_debug("%s: state: %s", __func__, call_states[tag->state]); + + if (tag->state == FINISH) { + delete static_cast<std::list<std::tuple< + int, std::string, std::string, std::string>> *>( + tag->context); + tag->state = DELETED; + return; } - static LY_ERR data_tree_from_dnode(frr::DataTree *dt, - const struct lyd_node *dnode, - LYD_FORMAT lyd_format, - bool with_defaults) - { - char *strp; - int options = 0; - - SET_FLAG(options, LYD_PRINT_WITHSIBLINGS); - if (with_defaults) - SET_FLAG(options, LYD_PRINT_WD_ALL); - else - SET_FLAG(options, LYD_PRINT_WD_TRIM); - - LY_ERR err = lyd_print_mem(&strp, dnode, lyd_format, options); - if (err == LY_SUCCESS) { - if (strp) { - dt->set_data(strp); - free(strp); - } - } - return err; + if (!tag->context) { + /* Creating, first time called for this RPC */ + auto new_list = + new std::list<std::tuple<int, std::string, std::string, + std::string>>(); + tag->context = new_list; + nb_db_transactions_iterate(list_transactions_cb, tag->context); + + new_list->push_back(std::make_tuple( + 0xFFFF, std::string("fake client"), + std::string("fake date"), std::string("fake comment"))); + new_list->push_back( + std::make_tuple(0xFFFE, std::string("fake client2"), + std::string("fake date"), + std::string("fake comment2"))); } - static struct lyd_node *dnode_from_data_tree(const frr::DataTree *dt, - bool config_only) - { - struct lyd_node *dnode; - int options, opt2; - LY_ERR err; + auto list = static_cast<std::list< + std::tuple<int, std::string, std::string, std::string>> *>( + tag->context); - if (config_only) { - options = LYD_PARSE_STRICT | LYD_PARSE_NO_STATE; - opt2 = LYD_VALIDATE_NO_STATE; - } else { - options = LYD_PARSE_STRICT; - opt2 = 0; - } - - err = lyd_parse_data_mem(ly_native_ctx, dt->data().c_str(), - encoding2lyd_format(dt->encoding()), - options, opt2, &dnode); - if (err != LY_SUCCESS) { - flog_warn(EC_LIB_LIBYANG, - "%s: lyd_parse_mem() failed: %s", __func__, - ly_errmsg(ly_native_ctx)); - } - return dnode; + if (list->empty()) { + tag->async_responder.Finish(grpc::Status::OK, tag); + tag->state = FINISH; + return; } - static struct lyd_node *get_dnode_config(const std::string &path) - { - struct lyd_node *dnode; + auto item = list->back(); - dnode = yang_dnode_get(running_config->dnode, - path.empty() ? NULL : path.c_str()); - if (dnode) - dnode = yang_dnode_dup(dnode); + frr::ListTransactionsResponse response; - return dnode; - } + // Response: uint32 id = 1; + response.set_id(std::get<0>(item)); - static struct lyd_node *get_dnode_state(const std::string &path) - { - struct lyd_node *dnode; - - dnode = yang_dnode_new(ly_native_ctx, false); - if (nb_oper_data_iterate(path.c_str(), NULL, 0, - get_oper_data_cb, dnode) - != NB_OK) { - yang_dnode_free(dnode); - return NULL; - } + // Response: string client = 2; + response.set_client(std::get<1>(item).c_str()); + + // Response: string date = 3; + response.set_date(std::get<2>(item).c_str()); + + // Response: string comment = 4; + response.set_comment(std::get<3>(item).c_str()); - return dnode; + list->pop_back(); + if (list->empty()) { + tag->async_responder.WriteAndFinish( + response, grpc::WriteOptions(), grpc::Status::OK, tag); + tag->state = FINISH; + } else { + tag->async_responder.Write(response, tag); + tag->state = MORE; } +} - static grpc::Status get_path(frr::DataTree *dt, const std::string &path, - int type, LYD_FORMAT lyd_format, - bool with_defaults) - { - struct lyd_node *dnode_config = NULL; - struct lyd_node *dnode_state = NULL; - struct lyd_node *dnode_final; - - // Configuration data. - if (type == frr::GetRequest_DataType_ALL - || type == frr::GetRequest_DataType_CONFIG) { - dnode_config = get_dnode_config(path); - if (!dnode_config) - return grpc::Status( - grpc::StatusCode::INVALID_ARGUMENT, - "Data path not found"); - } +void HandleUnaryGetTransaction(NewRpcState<frr::GetTransactionRequest, + frr::GetTransactionResponse> *tag) +{ + grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - // Operational data. - if (type == frr::GetRequest_DataType_ALL - || type == frr::GetRequest_DataType_STATE) { - dnode_state = get_dnode_state(path); - if (!dnode_state) { - if (dnode_config) - yang_dnode_free(dnode_config); - return grpc::Status( - grpc::StatusCode::INVALID_ARGUMENT, - "Failed to fetch operational data"); - } - } + if (tag->state == FINISH) { + tag->state = DELETED; + return; + } - switch (type) { - case frr::GetRequest_DataType_ALL: - // - // Combine configuration and state data into a single - // dnode. - // - if (lyd_merge_tree(&dnode_state, dnode_config, - LYD_MERGE_DESTRUCT) - != LY_SUCCESS) { - yang_dnode_free(dnode_state); - yang_dnode_free(dnode_config); - return grpc::Status( - grpc::StatusCode::INTERNAL, - "Failed to merge configuration and state data", - ly_errmsg(ly_native_ctx)); - } - - dnode_final = dnode_state; - break; - case frr::GetRequest_DataType_CONFIG: - dnode_final = dnode_config; - break; - case frr::GetRequest_DataType_STATE: - dnode_final = dnode_state; - break; - } + // Request: uint32 transaction_id = 1; + uint32_t transaction_id = tag->request.transaction_id(); + // Request: Encoding encoding = 2; + frr::Encoding encoding = tag->request.encoding(); + // Request: bool with_defaults = 3; + bool with_defaults = tag->request.with_defaults(); + + grpc_debug("%s(transaction_id: %u, encoding: %u)", __func__, + transaction_id, encoding); + + struct nb_config *nb_config; + + // Load configuration from the transactions database. + nb_config = nb_db_transaction_load(transaction_id); + if (!nb_config) { + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "Transaction not found"), + tag); + tag->state = FINISH; + return; + } - // Validate data to create implicit default nodes if necessary. - int validate_opts = 0; - if (type == frr::GetRequest_DataType_CONFIG) - validate_opts = LYD_VALIDATE_NO_STATE; - else - validate_opts = 0; - - LY_ERR err = lyd_validate_all(&dnode_final, ly_native_ctx, - validate_opts, NULL); - - if (err) - flog_warn(EC_LIB_LIBYANG, - "%s: lyd_validate_all() failed: %s", __func__, - ly_errmsg(ly_native_ctx)); - // Dump data using the requested format. - if (!err) - err = data_tree_from_dnode(dt, dnode_final, lyd_format, - with_defaults); - yang_dnode_free(dnode_final); - if (err) - return grpc::Status(grpc::StatusCode::INTERNAL, - "Failed to dump data"); - return grpc::Status::OK; + // Response: DataTree config = 1; + auto config = tag->response.mutable_config(); + config->set_encoding(encoding); + + // Dump data using the requested format. + if (data_tree_from_dnode(config, nb_config->dnode, + encoding2lyd_format(encoding), with_defaults) + != 0) { + nb_config_free(nb_config); + tag->responder.Finish(tag->response, + grpc::Status(grpc::StatusCode::INTERNAL, + "Failed to dump data"), + tag); + tag->state = FINISH; + return; } - struct candidate *create_candidate(void) - { - uint32_t candidate_id = ++_nextCandidateId; + nb_config_free(nb_config); - // Check for overflow. - // TODO: implement an algorithm for unique reusable IDs. - if (candidate_id == 0) - return NULL; + tag->responder.Finish(tag->response, grpc::Status::OK, tag); + tag->state = FINISH; +} - struct candidate *candidate = &_candidates[candidate_id]; - candidate->id = candidate_id; - candidate->config = nb_config_dup(running_config); - candidate->transaction = NULL; +void HandleUnaryExecute( + NewRpcState<frr::ExecuteRequest, frr::ExecuteResponse> *tag) +{ + grpc_debug("%s: state: %s", __func__, call_states[tag->state]); - return candidate; + if (tag->state == FINISH) { + tag->state = DELETED; + return; } - void delete_candidate(struct candidate *candidate) - { - char errmsg[BUFSIZ] = {0}; + struct nb_node *nb_node; + struct list *input_list; + struct list *output_list; + struct listnode *node; + struct yang_data *data; + const char *xpath; + char errmsg[BUFSIZ] = {0}; + + // Request: string path = 1; + xpath = tag->request.path().c_str(); + + grpc_debug("%s(path: \"%s\")", __func__, xpath); + + if (tag->request.path().empty()) { + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "Data path is empty"), + tag); + tag->state = FINISH; + return; + } - _candidates.erase(candidate->id); - nb_config_free(candidate->config); - if (candidate->transaction) - nb_candidate_commit_abort(candidate->transaction, - errmsg, sizeof(errmsg)); + nb_node = nb_node_find(xpath); + if (!nb_node) { + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "Unknown data path"), + tag); + tag->state = FINISH; + return; } - struct candidate *get_candidate(uint32_t candidate_id) - { - struct candidate *candidate; + input_list = yang_data_list_new(); + output_list = yang_data_list_new(); - if (_candidates.count(candidate_id) == 0) - return NULL; + // Read input parameters. + auto input = tag->request.input(); + for (const frr::PathValue &pv : input) { + // Request: repeated PathValue input = 2; + data = yang_data_new(pv.path().c_str(), pv.value().c_str()); + listnode_add(input_list, data); + } - return &_candidates[candidate_id]; + // Execute callback registered for this XPath. + if (nb_callback_rpc(nb_node, xpath, input_list, output_list, errmsg, + sizeof(errmsg)) + != NB_OK) { + flog_warn(EC_LIB_NB_CB_RPC, "%s: rpc callback failed: %s", + __func__, xpath); + list_delete(&input_list); + list_delete(&output_list); + + tag->responder.Finish( + tag->response, + grpc::Status(grpc::StatusCode::INTERNAL, "RPC failed"), + tag); + tag->state = FINISH; + return; } + + // Process output parameters. + for (ALL_LIST_ELEMENTS_RO(output_list, node, data)) { + // Response: repeated PathValue output = 1; + frr::PathValue *pv = tag->response.add_output(); + pv->set_path(data->xpath); + pv->set_value(data->value); + } + + // Release memory. + list_delete(&input_list); + list_delete(&output_list); + + tag->responder.Finish(tag->response, grpc::Status::OK, tag); + tag->state = FINISH; +} + +// ------------------------------------------------------ +// Thread Initialization and Run Functions +// ------------------------------------------------------ + + +#define REQUEST_NEWRPC(NAME, cdb) \ + do { \ + auto _rpcState = new NewRpcState<frr::NAME##Request, \ + frr::NAME##Response>( \ + (cdb), &frr::Northbound::AsyncService::Request##NAME, \ + &HandleUnary##NAME, #NAME); \ + _rpcState->do_request(service, _cq); \ + } while (0) + +#define REQUEST_NEWRPC_STREAMING(NAME, cdb) \ + do { \ + auto _rpcState = new NewRpcState<frr::NAME##Request, \ + frr::NAME##Response>( \ + (cdb), &frr::Northbound::AsyncService::Request##NAME, \ + &HandleStreaming##NAME, #NAME); \ + _rpcState->do_request(service, _cq); \ + } while (0) + +struct grpc_pthread_attr { + struct frr_pthread_attr attr; + unsigned long port; }; static void *grpc_pthread_start(void *arg) { struct frr_pthread *fpt = static_cast<frr_pthread *>(arg); - unsigned long *port = static_cast<unsigned long *>(fpt->data); + uint port = (uint) reinterpret_cast<intptr_t>(fpt->data); + + Candidates candidates; + grpc::ServerBuilder builder; + std::stringstream server_address; + frr::Northbound::AsyncService *service = + new frr::Northbound::AsyncService(); + grpc::ServerCompletionQueue *_cq; frr_pthread_set_name(fpt); - NorthboundImpl nb; - nb.Run(*port); + server_address << "0.0.0.0:" << port; + builder.AddListeningPort(server_address.str(), + grpc::InsecureServerCredentials()); + builder.RegisterService(service); + auto cq = builder.AddCompletionQueue(); + _cq = cq.get(); + auto server = builder.BuildAndStart(); + + /* Schedule all RPC handlers */ + REQUEST_NEWRPC(GetCapabilities, NULL); + REQUEST_NEWRPC(CreateCandidate, &candidates); + REQUEST_NEWRPC(DeleteCandidate, &candidates); + REQUEST_NEWRPC(UpdateCandidate, &candidates); + REQUEST_NEWRPC(EditCandidate, &candidates); + REQUEST_NEWRPC(LoadToCandidate, &candidates); + REQUEST_NEWRPC(Commit, &candidates); + REQUEST_NEWRPC(GetTransaction, NULL); + REQUEST_NEWRPC(LockConfig, NULL); + REQUEST_NEWRPC(UnlockConfig, NULL); + REQUEST_NEWRPC(Execute, NULL); + REQUEST_NEWRPC_STREAMING(Get, NULL); + REQUEST_NEWRPC_STREAMING(ListTransactions, NULL); + + zlog_notice("gRPC server listening on %s", + server_address.str().c_str()); + + /* Process inbound RPCs */ + while (true) { + void *tag; + bool ok; + + _cq->Next(&tag, &ok); + grpc_debug("%s: Got next from CompletionQueue, %p %d", __func__, + tag, ok); + GPR_ASSERT(ok); + + RpcStateBase *rpc = static_cast<RpcStateBase *>(tag); + CallState state = rpc->doCallback(); + grpc_debug("%s: Callback returned RPC State: %s", __func__, + call_states[state]); + + /* + * Our side is done (FINISH) receive new requests of this type + * We could do this earlier but that would mean we could be + * handling multiple same type requests in parallel. We expect + * to be called back once more in the FINISH state (from the + * user indicating Finish() for cleanup. + */ + if (state == FINISH) + rpc->do_request(service, _cq); + } + /*NOTREACHED*/ return NULL; } -static int frr_grpc_init(unsigned long *port) + +static int frr_grpc_init(uint port) { + struct frr_pthread_attr attr = { + .start = grpc_pthread_start, + .stop = NULL, + }; + fpt = frr_pthread_new(&attr, "frr-grpc", "frr-grpc"); - fpt->data = static_cast<void *>(port); + fpt->data = reinterpret_cast<void *>((intptr_t)port); /* Create a pthread for gRPC since it runs its own event loop. */ if (frr_pthread_run(fpt, NULL) < 0) { @@ -1363,7 +1336,6 @@ static int frr_grpc_finish(void) if (fpt) frr_pthread_destroy(fpt); // TODO: cancel the gRPC pthreads gracefully. - return 0; } @@ -1376,28 +1348,20 @@ static int frr_grpc_finish(void) */ static int frr_grpc_module_very_late_init(struct thread *thread) { - static unsigned long port = GRPC_DEFAULT_PORT; const char *args = THIS_MODULE->load_args; + uint port = GRPC_DEFAULT_PORT; - // Parse port number. if (args) { - try { - port = std::stoul(args); - if (port < 1024) - throw std::invalid_argument( - "can't use privileged port"); - if (port > UINT16_MAX) - throw std::invalid_argument( - "port number is too big"); - } catch (std::exception &e) { + port = std::stoul(args); + if (port < 1024 || port > UINT16_MAX) { flog_err(EC_LIB_GRPC_INIT, - "%s: failed to parse port number: %s", - __func__, e.what()); + "%s: port number must be between 1025 and %d", + __func__, UINT16_MAX); goto error; } } - if (frr_grpc_init(&port) < 0) + if (frr_grpc_init(port) < 0) goto error; return 0; @@ -1409,9 +1373,9 @@ error: static int frr_grpc_module_late_init(struct thread_master *tm) { - thread_add_event(tm, frr_grpc_module_very_late_init, NULL, 0, NULL); + main_master = tm; hook_register(frr_fini, frr_grpc_finish); - + thread_add_event(tm, frr_grpc_module_very_late_init, NULL, 0, NULL); return 0; } @@ -1424,5 +1388,4 @@ static int frr_grpc_module_init(void) FRR_MODULE_SETUP(.name = "frr_grpc", .version = FRR_VERSION, .description = "FRR gRPC northbound module", - .init = frr_grpc_module_init, -); + .init = frr_grpc_module_init, ); |