diff options
author | Christian Hopps <chopps@labn.net> | 2023-02-28 16:00:19 +0100 |
---|---|---|
committer | Christian Hopps <chopps@labn.net> | 2023-03-22 03:08:32 +0100 |
commit | 7d65b7b7f441ddb2e730d4274023c0d3d2907c6b (patch) | |
tree | a1a7106459429fb936a493cba746aff105d3ce06 | |
parent | mgmtd: Add MGMT Frontend Interface Framework (diff) | |
download | frr-7d65b7b7f441ddb2e730d4274023c0d3d2907c6b.tar.xz frr-7d65b7b7f441ddb2e730d4274023c0d3d2907c6b.zip |
mgmtd: Add MGMT Backend Interface Framework
This commit introduces the MGMT Backend Interface which can be used
by back-end management client daemons like BGPd, Staticd, Zebra to
connect with new FRR Management daemon (MGMTd) and utilize the new
FRR Management Framework to let any Frontend clients to retrieve any
operational data or manipulate any configuration data owned by the
individual Backend daemon component.
This commit includes the following functionalities in the changeset:
1. Add new Backend server for Backend daemons connect to.
2. Add a C-based Backend client library which can be used by daemons
to communicate with MGMTd via the Backend interface.
3. Maintain a backend adapter for each connection from an appropriate
Backend client to facilitate client requests and track one or more
transactions initiated from Frontend client sessions that involves
the backend client component.
4. Add the following commands to inspect various Backend client
related information
a. show mgmt backend-adapter all
b. show mgmt backend-yang-xpath-registry
c. show mgmt yang-xpath-subscription
Co-authored-by: Pushpasis Sarkar <pushpasis@gmail.com>
Co-authored-by: Abhinay Ramesh <rabhinay@vmware.com>
Co-authored-by: Ujwal P <ujwalp@vmware.com>
Signed-off-by: Yash Ranjan <ranjany@vmware.com>
-rw-r--r-- | .clang-format | 5 | ||||
-rw-r--r-- | lib/mgmt.proto | 32 | ||||
-rw-r--r-- | lib/mgmt_be_client.c | 1490 | ||||
-rw-r--r-- | lib/mgmt_be_client.h | 273 | ||||
-rw-r--r-- | lib/northbound.c | 299 | ||||
-rw-r--r-- | lib/northbound.h | 86 | ||||
-rw-r--r-- | lib/northbound_cli.c | 70 | ||||
-rw-r--r-- | lib/northbound_confd.c | 3 | ||||
-rw-r--r-- | lib/northbound_grpc.cpp | 3 | ||||
-rw-r--r-- | lib/northbound_sysrepo.c | 3 | ||||
-rw-r--r-- | lib/subdir.am | 2 | ||||
-rw-r--r-- | mgmtd/mgmt.c | 10 | ||||
-rw-r--r-- | mgmtd/mgmt_be_adapter.c | 1288 | ||||
-rw-r--r-- | mgmtd/mgmt_be_adapter.h | 231 | ||||
-rw-r--r-- | mgmtd/mgmt_be_server.c | 160 | ||||
-rw-r--r-- | mgmtd/mgmt_be_server.h | 20 | ||||
-rw-r--r-- | mgmtd/mgmt_defines.h | 17 | ||||
-rw-r--r-- | mgmtd/mgmt_main.c | 12 | ||||
-rw-r--r-- | mgmtd/mgmt_memory.c | 1 | ||||
-rw-r--r-- | mgmtd/mgmt_memory.h | 1 | ||||
-rw-r--r-- | mgmtd/mgmt_vty.c | 55 | ||||
-rw-r--r-- | mgmtd/subdir.am | 4 |
22 files changed, 3941 insertions, 124 deletions
diff --git a/.clang-format b/.clang-format index 424662b81..01e909d93 100644 --- a/.clang-format +++ b/.clang-format @@ -52,6 +52,11 @@ ForEachMacros: - FOR_ALL_INTERFACES - FOR_ALL_INTERFACES_ADDRESSES - JSON_FOREACH + - FOREACH_BE_TXN_BATCH_IN_LIST + - FOREACH_BE_APPLY_BATCH_IN_LIST + - FOREACH_BE_TXN_IN_LIST + - FOREACH_SESSION_IN_LIST + - FOREACH_MGMTD_BE_CLIENT_ID # libyang - LY_FOR_KEYS - LY_LIST_FOR diff --git a/lib/mgmt.proto b/lib/mgmt.proto index 437679410..8a11ff0fa 100644 --- a/lib/mgmt.proto +++ b/lib/mgmt.proto @@ -106,18 +106,6 @@ message BeCfgDataCreateReply { optional string error_if_any = 4; } -message BeCfgDataValidateReq { - required uint64 txn_id = 1; - repeated uint64 batch_ids = 2; -} - -message BeCfgDataValidateReply { - required uint64 txn_id = 1; - repeated uint64 batch_ids = 2; - required bool success = 3; - optional string error_if_any = 4; -} - message BeCfgDataApplyReq { required uint64 txn_id = 1; } @@ -181,17 +169,15 @@ message BeMessage { BeTxnReply txn_reply = 5; BeCfgDataCreateReq cfg_data_req = 6; BeCfgDataCreateReply cfg_data_reply = 7; - BeCfgDataValidateReq cfg_validate_req = 8; - BeCfgDataValidateReply cfg_validate_reply = 9; - BeCfgDataApplyReq cfg_apply_req = 10; - BeCfgDataApplyReply cfg_apply_reply = 11; - BeOperDataGetReq get_req = 12; - BeOperDataGetReply get_reply = 13; - BeOperDataNotify notify_data = 14; - BeConfigCmdReq cfg_cmd_req = 15; - BeConfigCmdReply cfg_cmd_reply = 16; - BeShowCmdReq show_cmd_req = 17; - BeShowCmdReply show_cmd_reply = 18; + BeCfgDataApplyReq cfg_apply_req = 8; + BeCfgDataApplyReply cfg_apply_reply = 9; + BeOperDataGetReq get_req = 10; + BeOperDataGetReply get_reply = 11; + BeOperDataNotify notify_data = 12; + BeConfigCmdReq cfg_cmd_req = 13; + BeConfigCmdReply cfg_cmd_reply = 14; + BeShowCmdReq show_cmd_req = 15; + BeShowCmdReply show_cmd_reply = 16; } } diff --git a/lib/mgmt_be_client.c b/lib/mgmt_be_client.c new file mode 100644 index 000000000..d86f3d336 --- /dev/null +++ b/lib/mgmt_be_client.c @@ -0,0 +1,1490 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * MGMTD Backend Client Library api interfaces + * Copyright (C) 2021 Vmware, Inc. + * Pushpasis Sarkar <spushpasis@vmware.com> + */ + +#include <zebra.h> +#include "libfrr.h" +#include "mgmtd/mgmt.h" +#include "mgmt_be_client.h" +#include "mgmt_pb.h" +#include "network.h" +#include "stream.h" +#include "sockopt.h" + +#ifdef REDIRECT_DEBUG_TO_STDERR +#define MGMTD_BE_CLIENT_DBG(fmt, ...) \ + fprintf(stderr, "%s: " fmt "\n", __func__, ##__VA_ARGS__) +#define MGMTD_BE_CLIENT_ERR(fmt, ...) \ + fprintf(stderr, "%s: ERROR, " fmt "\n", __func__, ##__VA_ARGS__) +#else /* REDIRECT_DEBUG_TO_STDERR */ +#define MGMTD_BE_CLIENT_DBG(fmt, ...) \ + do { \ + if (mgmt_debug_be_client) \ + zlog_debug("%s: " fmt, __func__, ##__VA_ARGS__); \ + } while (0) +#define MGMTD_BE_CLIENT_ERR(fmt, ...) \ + zlog_err("%s: ERROR: " fmt, __func__, ##__VA_ARGS__) +#endif /* REDIRECT_DEBUG_TO_STDERR */ + +DEFINE_MTYPE_STATIC(LIB, MGMTD_BE_BATCH, + "MGMTD backend transaction batch data"); +DEFINE_MTYPE_STATIC(LIB, MGMTD_BE_TXN, "MGMTD backend transaction data"); + +enum mgmt_be_txn_event { + MGMTD_BE_TXN_PROC_SETCFG = 1, + MGMTD_BE_TXN_PROC_GETCFG, + MGMTD_BE_TXN_PROC_GETDATA +}; + +struct mgmt_be_set_cfg_req { + struct nb_cfg_change cfg_changes[MGMTD_MAX_CFG_CHANGES_IN_BATCH]; + uint16_t num_cfg_changes; +}; + +struct mgmt_be_get_data_req { + char *xpaths[MGMTD_MAX_NUM_DATA_REQ_IN_BATCH]; + uint16_t num_xpaths; +}; + +struct mgmt_be_txn_req { + enum mgmt_be_txn_event event; + union { + struct mgmt_be_set_cfg_req set_cfg; + struct mgmt_be_get_data_req get_data; + } req; +}; + +PREDECL_LIST(mgmt_be_batches); +struct mgmt_be_batch_ctx { + /* Batch-Id as assigned by MGMTD */ + uint64_t batch_id; + + struct mgmt_be_txn_req txn_req; + + uint32_t flags; + + struct mgmt_be_batches_item list_linkage; +}; +#define MGMTD_BE_BATCH_FLAGS_CFG_PREPARED (1U << 0) +#define MGMTD_BE_TXN_FLAGS_CFG_APPLIED (1U << 1) +DECLARE_LIST(mgmt_be_batches, struct mgmt_be_batch_ctx, list_linkage); + +struct mgmt_be_client_ctx; + +PREDECL_LIST(mgmt_be_txns); +struct mgmt_be_txn_ctx { + /* Txn-Id as assigned by MGMTD */ + uint64_t txn_id; + uint32_t flags; + + struct mgmt_be_client_txn_ctx client_data; + struct mgmt_be_client_ctx *client_ctx; + + /* List of batches belonging to this transaction */ + struct mgmt_be_batches_head cfg_batches; + struct mgmt_be_batches_head apply_cfgs; + + struct mgmt_be_txns_item list_linkage; + + struct nb_transaction *nb_txn; + uint32_t nb_txn_id; +}; +#define MGMTD_BE_TXN_FLAGS_CFGPREP_FAILED (1U << 1) + +DECLARE_LIST(mgmt_be_txns, struct mgmt_be_txn_ctx, list_linkage); + +#define FOREACH_BE_TXN_BATCH_IN_LIST(txn, batch) \ + frr_each_safe (mgmt_be_batches, &(txn)->cfg_batches, (batch)) + +#define FOREACH_BE_APPLY_BATCH_IN_LIST(txn, batch) \ + frr_each_safe (mgmt_be_batches, &(txn)->apply_cfgs, (batch)) + +struct mgmt_be_client_ctx { + int conn_fd; + struct thread_master *tm; + struct thread *conn_retry_tmr; + struct thread *conn_read_ev; + struct thread *conn_write_ev; + struct thread *conn_writes_on; + struct thread *msg_proc_ev; + uint32_t flags; + uint32_t num_msg_tx; + uint32_t num_msg_rx; + + struct stream_fifo *ibuf_fifo; + struct stream *ibuf_work; + struct stream_fifo *obuf_fifo; + struct stream *obuf_work; + uint8_t msg_buf[MGMTD_BE_MSG_MAX_LEN]; + + struct nb_config *candidate_config; + struct nb_config *running_config; + + unsigned long num_batch_find; + unsigned long avg_batch_find_tm; + unsigned long num_edit_nb_cfg; + unsigned long avg_edit_nb_cfg_tm; + unsigned long num_prep_nb_cfg; + unsigned long avg_prep_nb_cfg_tm; + unsigned long num_apply_nb_cfg; + unsigned long avg_apply_nb_cfg_tm; + + struct mgmt_be_txns_head txn_head; + struct mgmt_be_client_params client_params; +}; + +#define MGMTD_BE_CLIENT_FLAGS_WRITES_OFF (1U << 0) + +#define FOREACH_BE_TXN_IN_LIST(client_ctx, txn) \ + frr_each_safe (mgmt_be_txns, &(client_ctx)->txn_head, (txn)) + +static bool mgmt_debug_be_client; + +static struct mgmt_be_client_ctx mgmt_be_client_ctx = {0}; + +const char *mgmt_be_client_names[MGMTD_BE_CLIENT_ID_MAX + 1] = { +#if 0 +#ifdef HAVE_STATICD + [MGMTD_BE_CLIENT_ID_STATICD] = "staticd", +#endif +#endif + [MGMTD_BE_CLIENT_ID_MAX] = "Unknown/Invalid", +}; + +/* Forward declarations */ +static void +mgmt_be_client_register_event(struct mgmt_be_client_ctx *client_ctx, + enum mgmt_be_event event); +static void +mgmt_be_client_schedule_conn_retry(struct mgmt_be_client_ctx *client_ctx, + unsigned long intvl_secs); +static int mgmt_be_client_send_msg(struct mgmt_be_client_ctx *client_ctx, + Mgmtd__BeMessage *be_msg); + +static void +mgmt_be_server_disconnect(struct mgmt_be_client_ctx *client_ctx, + bool reconnect) +{ + /* Notify client through registered callback (if any) */ + if (client_ctx->client_params.client_connect_notify) + (void)(*client_ctx->client_params + .client_connect_notify)( + (uintptr_t)client_ctx, + client_ctx->client_params.user_data, false); + + if (client_ctx->conn_fd) { + close(client_ctx->conn_fd); + client_ctx->conn_fd = 0; + } + + if (reconnect) + mgmt_be_client_schedule_conn_retry( + client_ctx, + client_ctx->client_params.conn_retry_intvl_sec); +} + +static struct mgmt_be_batch_ctx * +mgmt_be_find_batch_by_id(struct mgmt_be_txn_ctx *txn, + uint64_t batch_id) +{ + struct mgmt_be_batch_ctx *batch = NULL; + + FOREACH_BE_TXN_BATCH_IN_LIST (txn, batch) { + if (batch->batch_id == batch_id) + return batch; + } + + return NULL; +} + +static struct mgmt_be_batch_ctx * +mgmt_be_batch_create(struct mgmt_be_txn_ctx *txn, uint64_t batch_id) +{ + struct mgmt_be_batch_ctx *batch = NULL; + + batch = mgmt_be_find_batch_by_id(txn, batch_id); + if (!batch) { + batch = XCALLOC(MTYPE_MGMTD_BE_BATCH, + sizeof(struct mgmt_be_batch_ctx)); + assert(batch); + + batch->batch_id = batch_id; + mgmt_be_batches_add_tail(&txn->cfg_batches, batch); + + MGMTD_BE_CLIENT_DBG("Added new batch 0x%llx to transaction", + (unsigned long long)batch_id); + } + + return batch; +} + +static void mgmt_be_batch_delete(struct mgmt_be_txn_ctx *txn, + struct mgmt_be_batch_ctx **batch) +{ + uint16_t indx; + + if (!batch) + return; + + mgmt_be_batches_del(&txn->cfg_batches, *batch); + if ((*batch)->txn_req.event == MGMTD_BE_TXN_PROC_SETCFG) { + for (indx = 0; indx < MGMTD_MAX_CFG_CHANGES_IN_BATCH; indx++) { + if ((*batch)->txn_req.req.set_cfg.cfg_changes[indx] + .value) { + free((char *)(*batch) + ->txn_req.req.set_cfg + .cfg_changes[indx] + .value); + } + } + } + + XFREE(MTYPE_MGMTD_BE_BATCH, *batch); + *batch = NULL; +} + +static void mgmt_be_cleanup_all_batches(struct mgmt_be_txn_ctx *txn) +{ + struct mgmt_be_batch_ctx *batch = NULL; + + FOREACH_BE_TXN_BATCH_IN_LIST (txn, batch) { + mgmt_be_batch_delete(txn, &batch); + } + + FOREACH_BE_APPLY_BATCH_IN_LIST (txn, batch) { + mgmt_be_batch_delete(txn, &batch); + } +} + +static struct mgmt_be_txn_ctx * +mgmt_be_find_txn_by_id(struct mgmt_be_client_ctx *client_ctx, + uint64_t txn_id) +{ + struct mgmt_be_txn_ctx *txn = NULL; + + FOREACH_BE_TXN_IN_LIST (client_ctx, txn) { + if (txn->txn_id == txn_id) + return txn; + } + + return NULL; +} + +static struct mgmt_be_txn_ctx * +mgmt_be_txn_create(struct mgmt_be_client_ctx *client_ctx, + uint64_t txn_id) +{ + struct mgmt_be_txn_ctx *txn = NULL; + + txn = mgmt_be_find_txn_by_id(client_ctx, txn_id); + if (!txn) { + txn = XCALLOC(MTYPE_MGMTD_BE_TXN, + sizeof(struct mgmt_be_txn_ctx)); + assert(txn); + + txn->txn_id = txn_id; + txn->client_ctx = client_ctx; + mgmt_be_batches_init(&txn->cfg_batches); + mgmt_be_batches_init(&txn->apply_cfgs); + mgmt_be_txns_add_tail(&client_ctx->txn_head, txn); + + MGMTD_BE_CLIENT_DBG("Added new transaction 0x%llx", + (unsigned long long)txn_id); + } + + return txn; +} + +static void mgmt_be_txn_delete(struct mgmt_be_client_ctx *client_ctx, + struct mgmt_be_txn_ctx **txn) +{ + char err_msg[] = "MGMT Transaction Delete"; + + if (!txn) + return; + + /* + * Remove the transaction from the list of transactions + * so that future lookups with the same transaction id + * does not return this one. + */ + mgmt_be_txns_del(&client_ctx->txn_head, *txn); + + /* + * Time to delete the transaction which should also + * take care of cleaning up all batches created via + * CFGDATA_CREATE_REQs. But first notify the client + * about the transaction delete. + */ + if (client_ctx->client_params.txn_notify) + (void)(*client_ctx->client_params + .txn_notify)( + (uintptr_t)client_ctx, + client_ctx->client_params.user_data, + &(*txn)->client_data, true); + + mgmt_be_cleanup_all_batches(*txn); + if ((*txn)->nb_txn) + nb_candidate_commit_abort((*txn)->nb_txn, err_msg, + sizeof(err_msg)); + XFREE(MTYPE_MGMTD_BE_TXN, *txn); + + *txn = NULL; +} + +static void +mgmt_be_cleanup_all_txns(struct mgmt_be_client_ctx *client_ctx) +{ + struct mgmt_be_txn_ctx *txn = NULL; + + FOREACH_BE_TXN_IN_LIST (client_ctx, txn) { + mgmt_be_txn_delete(client_ctx, &txn); + } +} + +static int mgmt_be_send_txn_reply(struct mgmt_be_client_ctx *client_ctx, + uint64_t txn_id, bool create, + bool success) +{ + Mgmtd__BeMessage be_msg; + Mgmtd__BeTxnReply txn_reply; + + mgmtd__be_txn_reply__init(&txn_reply); + txn_reply.create = create; + txn_reply.txn_id = txn_id; + txn_reply.success = success; + + mgmtd__be_message__init(&be_msg); + be_msg.message_case = MGMTD__BE_MESSAGE__MESSAGE_TXN_REPLY; + be_msg.txn_reply = &txn_reply; + + MGMTD_BE_CLIENT_DBG( + "Sending TXN_REPLY message to MGMTD for txn 0x%llx", + (unsigned long long)txn_id); + + return mgmt_be_client_send_msg(client_ctx, &be_msg); +} + +static int mgmt_be_process_txn_req(struct mgmt_be_client_ctx *client_ctx, + uint64_t txn_id, bool create) +{ + struct mgmt_be_txn_ctx *txn; + + txn = mgmt_be_find_txn_by_id(client_ctx, txn_id); + if (create) { + if (txn) { + /* + * Transaction with same txn-id already exists. + * Should not happen under any circumstances. + */ + MGMTD_BE_CLIENT_ERR( + "Transaction 0x%llx already exists!!!", + (unsigned long long)txn_id); + mgmt_be_send_txn_reply(client_ctx, txn_id, create, + false); + } + + MGMTD_BE_CLIENT_DBG("Created new transaction 0x%llx", + (unsigned long long)txn_id); + txn = mgmt_be_txn_create(client_ctx, txn_id); + + if (client_ctx->client_params.txn_notify) + (void)(*client_ctx->client_params + .txn_notify)( + (uintptr_t)client_ctx, + client_ctx->client_params.user_data, + &txn->client_data, false); + } else { + if (!txn) { + /* + * Transaction with same txn-id does not exists. + * Return sucess anyways. + */ + MGMTD_BE_CLIENT_DBG( + "Transaction to delete 0x%llx does NOT exists!!!", + (unsigned long long)txn_id); + } else { + MGMTD_BE_CLIENT_DBG("Delete transaction 0x%llx", + (unsigned long long)txn_id); + mgmt_be_txn_delete(client_ctx, &txn); + } + } + + mgmt_be_send_txn_reply(client_ctx, txn_id, create, true); + + return 0; +} + +static int +mgmt_be_send_cfgdata_create_reply(struct mgmt_be_client_ctx *client_ctx, + uint64_t txn_id, uint64_t batch_id, + bool success, const char *error_if_any) +{ + Mgmtd__BeMessage be_msg; + Mgmtd__BeCfgDataCreateReply cfgdata_reply; + + mgmtd__be_cfg_data_create_reply__init(&cfgdata_reply); + cfgdata_reply.txn_id = (uint64_t)txn_id; + cfgdata_reply.batch_id = (uint64_t)batch_id; + cfgdata_reply.success = success; + if (error_if_any) + cfgdata_reply.error_if_any = (char *)error_if_any; + + mgmtd__be_message__init(&be_msg); + be_msg.message_case = MGMTD__BE_MESSAGE__MESSAGE_CFG_DATA_REPLY; + be_msg.cfg_data_reply = &cfgdata_reply; + + MGMTD_BE_CLIENT_DBG( + "Sending CFGDATA_CREATE_REPLY message to MGMTD for txn 0x%llx batch 0x%llx", + (unsigned long long)txn_id, (unsigned long long)batch_id); + + return mgmt_be_client_send_msg(client_ctx, &be_msg); +} + +static void mgmt_be_txn_cfg_abort(struct mgmt_be_txn_ctx *txn) +{ + char errmsg[BUFSIZ] = {0}; + + assert(txn && txn->client_ctx); + if (txn->nb_txn) { + MGMTD_BE_CLIENT_ERR( + "Aborting configurations after prep for Txn 0x%llx", + (unsigned long long)txn->txn_id); + nb_candidate_commit_abort(txn->nb_txn, errmsg, sizeof(errmsg)); + txn->nb_txn = 0; + } + + /* + * revert candidate back to running + * + * This is one txn ctx but the candidate_config is per client ctx, how + * does that work? + */ + MGMTD_BE_CLIENT_DBG( + "Reset candidate configurations after abort of Txn 0x%llx", + (unsigned long long)txn->txn_id); + nb_config_replace(txn->client_ctx->candidate_config, + txn->client_ctx->running_config, true); +} + +static int mgmt_be_txn_cfg_prepare(struct mgmt_be_txn_ctx *txn) +{ + struct mgmt_be_client_ctx *client_ctx; + struct mgmt_be_txn_req *txn_req = NULL; + struct nb_context nb_ctx = {0}; + struct timeval edit_nb_cfg_start; + struct timeval edit_nb_cfg_end; + unsigned long edit_nb_cfg_tm; + struct timeval prep_nb_cfg_start; + struct timeval prep_nb_cfg_end; + unsigned long prep_nb_cfg_tm; + struct mgmt_be_batch_ctx *batch; + bool error; + char err_buf[BUFSIZ]; + size_t num_processed; + bool debug_be = mgmt_debug_be_client; + int err; + + assert(txn && txn->client_ctx); + client_ctx = txn->client_ctx; + + num_processed = 0; + FOREACH_BE_TXN_BATCH_IN_LIST (txn, batch) { + txn_req = &batch->txn_req; + error = false; + nb_ctx.client = NB_CLIENT_CLI; + nb_ctx.user = (void *)client_ctx->client_params.user_data; + + if (!txn->nb_txn) { + /* + * This happens when the current backend client is only + * interested in consuming the config items but is not + * interested in validating it. + */ + error = false; + if (debug_be) + gettimeofday(&edit_nb_cfg_start, NULL); + nb_candidate_edit_config_changes( + client_ctx->candidate_config, + txn_req->req.set_cfg.cfg_changes, + (size_t)txn_req->req.set_cfg.num_cfg_changes, + NULL, NULL, 0, err_buf, sizeof(err_buf), + &error); + if (error) { + err_buf[sizeof(err_buf) - 1] = 0; + MGMTD_BE_CLIENT_ERR( + "Failed to update configs for Txn %llx Batch %llx to Candidate! Err: '%s'", + (unsigned long long)txn->txn_id, + (unsigned long long)batch->batch_id, + err_buf); + return -1; + } + if (debug_be) { + gettimeofday(&edit_nb_cfg_end, NULL); + edit_nb_cfg_tm = timeval_elapsed( + edit_nb_cfg_end, edit_nb_cfg_start); + client_ctx->avg_edit_nb_cfg_tm = + ((client_ctx->avg_edit_nb_cfg_tm + * client_ctx->num_edit_nb_cfg) + + edit_nb_cfg_tm) + / (client_ctx->num_edit_nb_cfg + 1); + } + client_ctx->num_edit_nb_cfg++; + } + + num_processed++; + } + + if (!num_processed) + return 0; + + /* + * Now prepare all the batches we have applied in one go. + */ + nb_ctx.client = NB_CLIENT_CLI; + nb_ctx.user = (void *)client_ctx->client_params.user_data; + if (debug_be) + gettimeofday(&prep_nb_cfg_start, NULL); + err = nb_candidate_commit_prepare(nb_ctx, client_ctx->candidate_config, + "MGMTD Backend Txn", &txn->nb_txn, +#ifdef MGMTD_LOCAL_VALIDATIONS_ENABLED + true, true, +#else + false, true, +#endif + err_buf, sizeof(err_buf) - 1); + if (err != NB_OK) { + err_buf[sizeof(err_buf) - 1] = 0; + if (err == NB_ERR_VALIDATION) + MGMTD_BE_CLIENT_ERR( + "Failed to validate configs for Txn %llx %u Batches! Err: '%s'", + (unsigned long long)txn->txn_id, + (uint32_t)num_processed, err_buf); + else + MGMTD_BE_CLIENT_ERR( + "Failed to prepare configs for Txn %llx, %u Batches! Err: '%s'", + (unsigned long long)txn->txn_id, + (uint32_t)num_processed, err_buf); + error = true; + SET_FLAG(txn->flags, MGMTD_BE_TXN_FLAGS_CFGPREP_FAILED); + } else + MGMTD_BE_CLIENT_DBG( + "Prepared configs for Txn %llx, %u Batches! successfully!", + (unsigned long long)txn->txn_id, + (uint32_t)num_processed); + if (debug_be) { + gettimeofday(&prep_nb_cfg_end, NULL); + prep_nb_cfg_tm = + timeval_elapsed(prep_nb_cfg_end, prep_nb_cfg_start); + client_ctx->avg_prep_nb_cfg_tm = + ((client_ctx->avg_prep_nb_cfg_tm + * client_ctx->num_prep_nb_cfg) + + prep_nb_cfg_tm) + / (client_ctx->num_prep_nb_cfg + 1); + } + client_ctx->num_prep_nb_cfg++; + + FOREACH_BE_TXN_BATCH_IN_LIST (txn, batch) { + mgmt_be_send_cfgdata_create_reply( + client_ctx, txn->txn_id, batch->batch_id, + error ? false : true, error ? err_buf : NULL); + if (!error) { + SET_FLAG(batch->flags, + MGMTD_BE_BATCH_FLAGS_CFG_PREPARED); + mgmt_be_batches_del(&txn->cfg_batches, batch); + mgmt_be_batches_add_tail(&txn->apply_cfgs, batch); + } + } + + if (debug_be) + MGMTD_BE_CLIENT_DBG( + "Avg-nb-edit-duration %lu uSec, nb-prep-duration %lu (avg: %lu) uSec, batch size %u", + client_ctx->avg_edit_nb_cfg_tm, prep_nb_cfg_tm, + client_ctx->avg_prep_nb_cfg_tm, (uint32_t)num_processed); + + if (error) + mgmt_be_txn_cfg_abort(txn); + + return 0; +} + +/* + * Process all CFG_DATA_REQs received so far and prepare them all in one go. + */ +static int +mgmt_be_update_setcfg_in_batch(struct mgmt_be_client_ctx *client_ctx, + struct mgmt_be_txn_ctx *txn, + uint64_t batch_id, + Mgmtd__YangCfgDataReq * cfg_req[], + int num_req) +{ + struct mgmt_be_batch_ctx *batch = NULL; + struct mgmt_be_txn_req *txn_req = NULL; + int index; + struct nb_cfg_change *cfg_chg; + + batch = mgmt_be_batch_create(txn, batch_id); + if (!batch) { + MGMTD_BE_CLIENT_ERR("Batch create failed!"); + return -1; + } + + txn_req = &batch->txn_req; + txn_req->event = MGMTD_BE_TXN_PROC_SETCFG; + MGMTD_BE_CLIENT_DBG( + "Created Set-Config request for batch 0x%llx, txn id 0x%llx, cfg-items:%d", + (unsigned long long)batch_id, (unsigned long long)txn->txn_id, + num_req); + + txn_req->req.set_cfg.num_cfg_changes = num_req; + for (index = 0; index < num_req; index++) { + cfg_chg = &txn_req->req.set_cfg.cfg_changes[index]; + + if (cfg_req[index]->req_type + == MGMTD__CFG_DATA_REQ_TYPE__DELETE_DATA) + cfg_chg->operation = NB_OP_DESTROY; + else + cfg_chg->operation = NB_OP_CREATE; + + strlcpy(cfg_chg->xpath, cfg_req[index]->data->xpath, + sizeof(cfg_chg->xpath)); + cfg_chg->value = (cfg_req[index]->data->value + && cfg_req[index] + ->data->value + ->encoded_str_val + ? strdup(cfg_req[index] + ->data->value + ->encoded_str_val) + : NULL); + if (cfg_chg->value + && !strncmp(cfg_chg->value, MGMTD_BE_CONTAINER_NODE_VAL, + strlen(MGMTD_BE_CONTAINER_NODE_VAL))) { + free((char *)cfg_chg->value); + cfg_chg->value = NULL; + } + } + + return 0; +} + +static int +mgmt_be_process_cfgdata_req(struct mgmt_be_client_ctx *client_ctx, + uint64_t txn_id, uint64_t batch_id, + Mgmtd__YangCfgDataReq * cfg_req[], int num_req, + bool end_of_data) +{ + struct mgmt_be_txn_ctx *txn; + + txn = mgmt_be_find_txn_by_id(client_ctx, txn_id); + if (!txn) { + MGMTD_BE_CLIENT_ERR( + "Invalid txn-id 0x%llx provided from MGMTD server", + (unsigned long long)txn_id); + mgmt_be_send_cfgdata_create_reply( + client_ctx, txn_id, batch_id, false, + "Transaction context not created yet"); + } else { + mgmt_be_update_setcfg_in_batch(client_ctx, txn, batch_id, + cfg_req, num_req); + } + + if (txn && end_of_data) { + MGMTD_BE_CLIENT_DBG("Triggering CFG_PREPARE_REQ processing"); + mgmt_be_txn_cfg_prepare(txn); + } + + return 0; +} + +static int mgmt_be_send_apply_reply(struct mgmt_be_client_ctx *client_ctx, + uint64_t txn_id, uint64_t batch_ids[], + size_t num_batch_ids, bool success, + const char *error_if_any) +{ + Mgmtd__BeMessage be_msg; + Mgmtd__BeCfgDataApplyReply apply_reply; + + mgmtd__be_cfg_data_apply_reply__init(&apply_reply); + apply_reply.success = success; + apply_reply.txn_id = txn_id; + apply_reply.batch_ids = (uint64_t *)batch_ids; + apply_reply.n_batch_ids = num_batch_ids; + + if (error_if_any) + apply_reply.error_if_any = (char *)error_if_any; + + mgmtd__be_message__init(&be_msg); + be_msg.message_case = MGMTD__BE_MESSAGE__MESSAGE_CFG_APPLY_REPLY; + be_msg.cfg_apply_reply = &apply_reply; + + MGMTD_BE_CLIENT_DBG( + "Sending CFG_APPLY_REPLY message to MGMTD for txn 0x%llx, %d batches [0x%llx - 0x%llx]", + (unsigned long long)txn_id, (int)num_batch_ids, + success && num_batch_ids ? + (unsigned long long)batch_ids[0] : 0, + success && num_batch_ids ? + (unsigned long long)batch_ids[num_batch_ids - 1] : 0); + + return mgmt_be_client_send_msg(client_ctx, &be_msg); +} + +static int mgmt_be_txn_proc_cfgapply(struct mgmt_be_txn_ctx *txn) +{ + struct mgmt_be_client_ctx *client_ctx; + struct timeval apply_nb_cfg_start; + struct timeval apply_nb_cfg_end; + unsigned long apply_nb_cfg_tm; + struct mgmt_be_batch_ctx *batch; + char err_buf[BUFSIZ]; + size_t num_processed; + static uint64_t batch_ids[MGMTD_BE_MAX_BATCH_IDS_IN_REQ]; + bool debug_be = mgmt_debug_be_client; + + assert(txn && txn->client_ctx); + client_ctx = txn->client_ctx; + + assert(txn->nb_txn); + num_processed = 0; + + /* + * Now apply all the batches we have applied in one go. + */ + if (debug_be) + gettimeofday(&apply_nb_cfg_start, NULL); + (void)nb_candidate_commit_apply(txn->nb_txn, true, &txn->nb_txn_id, + err_buf, sizeof(err_buf) - 1); + if (debug_be) { + gettimeofday(&apply_nb_cfg_end, NULL); + apply_nb_cfg_tm = + timeval_elapsed(apply_nb_cfg_end, apply_nb_cfg_start); + client_ctx->avg_apply_nb_cfg_tm = + ((client_ctx->avg_apply_nb_cfg_tm + * client_ctx->num_apply_nb_cfg) + + apply_nb_cfg_tm) + / (client_ctx->num_apply_nb_cfg + 1); + } + client_ctx->num_apply_nb_cfg++; + txn->nb_txn = NULL; + + /* + * Send back CFG_APPLY_REPLY for all batches applied. + */ + FOREACH_BE_APPLY_BATCH_IN_LIST (txn, batch) { + /* + * No need to delete the batch yet. Will be deleted during + * transaction cleanup on receiving TXN_DELETE_REQ. + */ + SET_FLAG(batch->flags, MGMTD_BE_TXN_FLAGS_CFG_APPLIED); + mgmt_be_batches_del(&txn->apply_cfgs, batch); + mgmt_be_batches_add_tail(&txn->cfg_batches, batch); + + batch_ids[num_processed] = batch->batch_id; + num_processed++; + if (num_processed == MGMTD_BE_MAX_BATCH_IDS_IN_REQ) { + mgmt_be_send_apply_reply(client_ctx, txn->txn_id, + batch_ids, num_processed, + true, NULL); + num_processed = 0; + } + } + + mgmt_be_send_apply_reply(client_ctx, txn->txn_id, batch_ids, + num_processed, true, NULL); + + if (debug_be) + MGMTD_BE_CLIENT_DBG("Nb-apply-duration %lu (avg: %lu) uSec", + apply_nb_cfg_tm, + client_ctx->avg_apply_nb_cfg_tm); + + return 0; +} + +static int +mgmt_be_process_cfg_apply(struct mgmt_be_client_ctx *client_ctx, + uint64_t txn_id) +{ + struct mgmt_be_txn_ctx *txn; + + txn = mgmt_be_find_txn_by_id(client_ctx, txn_id); + if (!txn) { + mgmt_be_send_apply_reply(client_ctx, txn_id, NULL, 0, false, + "Transaction not created yet!"); + return -1; + } + + MGMTD_BE_CLIENT_DBG("Trigger CFG_APPLY_REQ processing"); + mgmt_be_txn_proc_cfgapply(txn); + + return 0; +} + +static int +mgmt_be_client_handle_msg(struct mgmt_be_client_ctx *client_ctx, + Mgmtd__BeMessage *be_msg) +{ + switch (be_msg->message_case) { + case MGMTD__BE_MESSAGE__MESSAGE_SUBSCR_REPLY: + MGMTD_BE_CLIENT_DBG("Subscribe Reply Msg from mgmt, status %u", + be_msg->subscr_reply->success); + break; + case MGMTD__BE_MESSAGE__MESSAGE_TXN_REQ: + mgmt_be_process_txn_req(client_ctx, + be_msg->txn_req->txn_id, + be_msg->txn_req->create); + break; + case MGMTD__BE_MESSAGE__MESSAGE_CFG_DATA_REQ: + mgmt_be_process_cfgdata_req( + client_ctx, be_msg->cfg_data_req->txn_id, + be_msg->cfg_data_req->batch_id, + be_msg->cfg_data_req->data_req, + be_msg->cfg_data_req->n_data_req, + be_msg->cfg_data_req->end_of_data); + break; + case MGMTD__BE_MESSAGE__MESSAGE_CFG_APPLY_REQ: + mgmt_be_process_cfg_apply( + client_ctx, (uint64_t)be_msg->cfg_apply_req->txn_id); + break; + case MGMTD__BE_MESSAGE__MESSAGE_GET_REQ: + case MGMTD__BE_MESSAGE__MESSAGE_SUBSCR_REQ: + case MGMTD__BE_MESSAGE__MESSAGE_CFG_CMD_REQ: + case MGMTD__BE_MESSAGE__MESSAGE_SHOW_CMD_REQ: + /* + * TODO: Add handling code in future. + */ + break; + /* + * NOTE: The following messages are always sent from Backend + * clients to MGMTd only and/or need not be handled here. + */ + case MGMTD__BE_MESSAGE__MESSAGE_GET_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_TXN_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_CFG_DATA_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_CFG_APPLY_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_CFG_CMD_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_SHOW_CMD_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_NOTIFY_DATA: + case MGMTD__BE_MESSAGE__MESSAGE__NOT_SET: +#if PROTOBUF_C_VERSION_NUMBER >= 1003000 + case _MGMTD__BE_MESSAGE__MESSAGE_IS_INT_SIZE: +#endif + default: + /* + * A 'default' case is being added contrary to the + * FRR code guidelines to take care of build + * failures on certain build systems (courtesy of + * the proto-c package). + */ + break; + } + + return 0; +} + +static int +mgmt_be_client_process_msg(struct mgmt_be_client_ctx *client_ctx, + uint8_t *msg_buf, int bytes_read) +{ + Mgmtd__BeMessage *be_msg; + struct mgmt_be_msg *msg; + uint16_t bytes_left; + uint16_t processed = 0; + + MGMTD_BE_CLIENT_DBG( + "Got message of %d bytes from MGMTD Backend Server", + bytes_read); + + bytes_left = bytes_read; + for (; bytes_left > MGMTD_BE_MSG_HDR_LEN; + bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) { + msg = (struct mgmt_be_msg *)msg_buf; + if (msg->hdr.marker != MGMTD_BE_MSG_MARKER) { + MGMTD_BE_CLIENT_DBG( + "Marker not found in message from MGMTD '%s'", + client_ctx->client_params.name); + break; + } + + if (bytes_left < msg->hdr.len) { + MGMTD_BE_CLIENT_DBG( + "Incomplete message of %d bytes (epxected: %u) from MGMTD '%s'", + bytes_left, msg->hdr.len, + client_ctx->client_params.name); + break; + } + + be_msg = mgmtd__be_message__unpack( + NULL, (size_t)(msg->hdr.len - MGMTD_BE_MSG_HDR_LEN), + msg->payload); + if (!be_msg) { + MGMTD_BE_CLIENT_DBG( + "Failed to decode %d bytes from MGMTD '%s'", + msg->hdr.len, client_ctx->client_params.name); + continue; + } + + (void)mgmt_be_client_handle_msg(client_ctx, be_msg); + mgmtd__be_message__free_unpacked(be_msg, NULL); + processed++; + client_ctx->num_msg_rx++; + } + + return processed; +} + +static void mgmt_be_client_proc_msgbufs(struct thread *thread) +{ + struct mgmt_be_client_ctx *client_ctx; + struct stream *work; + int processed = 0; + + client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread); + assert(client_ctx); + + if (client_ctx->conn_fd == 0) + return; + + for (; processed < MGMTD_BE_MAX_NUM_MSG_PROC;) { + work = stream_fifo_pop_safe(client_ctx->ibuf_fifo); + if (!work) + break; + + processed += mgmt_be_client_process_msg( + client_ctx, STREAM_DATA(work), stream_get_endp(work)); + + if (work != client_ctx->ibuf_work) { + /* Free it up */ + stream_free(work); + } else { + /* Reset stream buffer for next read */ + stream_reset(work); + } + } + + /* + * If we have more to process, reschedule for processing it. + */ + if (stream_fifo_head(client_ctx->ibuf_fifo)) + mgmt_be_client_register_event(client_ctx, + MGMTD_BE_PROC_MSG); +} + +static void mgmt_be_client_read(struct thread *thread) +{ + struct mgmt_be_client_ctx *client_ctx; + int bytes_read, msg_cnt; + size_t total_bytes, bytes_left; + struct mgmt_be_msg_hdr *msg_hdr; + bool incomplete = false; + + client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread); + assert(client_ctx && client_ctx->conn_fd); + + total_bytes = 0; + bytes_left = STREAM_SIZE(client_ctx->ibuf_work) + - stream_get_endp(client_ctx->ibuf_work); + for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) { + bytes_read = stream_read_try(client_ctx->ibuf_work, + client_ctx->conn_fd, bytes_left); + MGMTD_BE_CLIENT_DBG( + "Got %d bytes of message from MGMTD Backend server", + bytes_read); + /* -2 is normal nothing read, and to retry */ + if (bytes_read == -2) + break; + if (bytes_read <= 0) { + if (bytes_read == 0) { + MGMTD_BE_CLIENT_ERR( + "Got EOF/disconnect while reading from MGMTD Frontend server."); + } else { + MGMTD_BE_CLIENT_ERR( + "Got error (%d) while reading from MGMTD Backend server. Err: '%s'", + bytes_read, safe_strerror(errno)); + } + mgmt_be_server_disconnect(client_ctx, true); + return; + } + total_bytes += bytes_read; + bytes_left -= bytes_read; + } + /* + * Check if we have read complete messages or not. + */ + stream_set_getp(client_ctx->ibuf_work, 0); + total_bytes = 0; + msg_cnt = 0; + bytes_left = stream_get_endp(client_ctx->ibuf_work); + for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) { + msg_hdr = (struct mgmt_be_msg_hdr + *)(STREAM_DATA(client_ctx->ibuf_work) + + total_bytes); + if (msg_hdr->marker != MGMTD_BE_MSG_MARKER) { + /* Corrupted buffer. Force disconnect?? */ + MGMTD_BE_CLIENT_ERR( + "Received corrupted buffer from MGMTD backend server."); + mgmt_be_server_disconnect(client_ctx, true); + return; + } + if (msg_hdr->len > bytes_left) + break; + + total_bytes += msg_hdr->len; + bytes_left -= msg_hdr->len; + msg_cnt++; + } + + if (!msg_cnt) + goto resched; + + if (bytes_left > 0) + incomplete = true; + + /* + * We have read one or several messages. + * Schedule processing them now. + */ + msg_hdr = + (struct mgmt_be_msg_hdr *)(STREAM_DATA(client_ctx->ibuf_work) + + total_bytes); + stream_set_endp(client_ctx->ibuf_work, total_bytes); + stream_fifo_push(client_ctx->ibuf_fifo, client_ctx->ibuf_work); + client_ctx->ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); + if (incomplete) { + stream_put(client_ctx->ibuf_work, msg_hdr, bytes_left); + stream_set_endp(client_ctx->ibuf_work, bytes_left); + } + mgmt_be_client_register_event(client_ctx, MGMTD_BE_PROC_MSG); + +resched: + mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_READ); +} + +static inline void +mgmt_be_client_sched_msg_write(struct mgmt_be_client_ctx *client_ctx) +{ + if (!CHECK_FLAG(client_ctx->flags, MGMTD_BE_CLIENT_FLAGS_WRITES_OFF)) + mgmt_be_client_register_event(client_ctx, + MGMTD_BE_CONN_WRITE); +} + +static inline void +mgmt_be_client_writes_on(struct mgmt_be_client_ctx *client_ctx) +{ + MGMTD_BE_CLIENT_DBG("Resume writing msgs"); + UNSET_FLAG(client_ctx->flags, MGMTD_BE_CLIENT_FLAGS_WRITES_OFF); + if (client_ctx->obuf_work + || stream_fifo_count_safe(client_ctx->obuf_fifo)) + mgmt_be_client_sched_msg_write(client_ctx); +} + +static inline void +mgmt_be_client_writes_off(struct mgmt_be_client_ctx *client_ctx) +{ + SET_FLAG(client_ctx->flags, MGMTD_BE_CLIENT_FLAGS_WRITES_OFF); + MGMTD_BE_CLIENT_DBG("Paused writing msgs"); +} + +static int mgmt_be_client_send_msg(struct mgmt_be_client_ctx *client_ctx, + Mgmtd__BeMessage *be_msg) +{ + size_t msg_size; + uint8_t *msg_buf = client_ctx->msg_buf; + struct mgmt_be_msg *msg; + + if (client_ctx->conn_fd == 0) + return -1; + + msg_size = mgmtd__be_message__get_packed_size(be_msg); + msg_size += MGMTD_BE_MSG_HDR_LEN; + if (msg_size > MGMTD_BE_MSG_MAX_LEN) { + MGMTD_BE_CLIENT_ERR( + "Message size %d more than max size'%d. Not sending!'", + (int)msg_size, (int)MGMTD_BE_MSG_MAX_LEN); + return -1; + } + + msg = (struct mgmt_be_msg *)msg_buf; + msg->hdr.marker = MGMTD_BE_MSG_MARKER; + msg->hdr.len = (uint16_t)msg_size; + mgmtd__be_message__pack(be_msg, msg->payload); + + if (!client_ctx->obuf_work) + client_ctx->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); + if (STREAM_WRITEABLE(client_ctx->obuf_work) < msg_size) { + stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work); + client_ctx->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); + } + stream_write(client_ctx->obuf_work, (void *)msg_buf, msg_size); + + mgmt_be_client_sched_msg_write(client_ctx); + client_ctx->num_msg_tx++; + return 0; +} + +static void mgmt_be_client_write(struct thread *thread) +{ + int bytes_written = 0; + int processed = 0; + int msg_size = 0; + struct stream *s = NULL; + struct stream *free = NULL; + struct mgmt_be_client_ctx *client_ctx; + + client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread); + assert(client_ctx && client_ctx->conn_fd); + + /* Ensure pushing any pending write buffer to FIFO */ + if (client_ctx->obuf_work) { + stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work); + client_ctx->obuf_work = NULL; + } + + for (s = stream_fifo_head(client_ctx->obuf_fifo); + s && processed < MGMTD_BE_MAX_NUM_MSG_WRITE; + s = stream_fifo_head(client_ctx->obuf_fifo)) { + /* msg_size = (int)stream_get_size(s); */ + msg_size = (int)STREAM_READABLE(s); + bytes_written = stream_flush(s, client_ctx->conn_fd); + if (bytes_written == -1 + && (errno == EAGAIN || errno == EWOULDBLOCK)) { + mgmt_be_client_register_event( + client_ctx, MGMTD_BE_CONN_WRITE); + return; + } else if (bytes_written != msg_size) { + MGMTD_BE_CLIENT_ERR( + "Could not write all %d bytes (wrote: %d) to MGMTD Backend client socket. Err: '%s'", + msg_size, bytes_written, safe_strerror(errno)); + if (bytes_written > 0) { + stream_forward_getp(s, (size_t)bytes_written); + stream_pulldown(s); + mgmt_be_client_register_event( + client_ctx, MGMTD_BE_CONN_WRITE); + return; + } + mgmt_be_server_disconnect(client_ctx, true); + return; + } + + free = stream_fifo_pop(client_ctx->obuf_fifo); + stream_free(free); + MGMTD_BE_CLIENT_DBG( + "Wrote %d bytes of message to MGMTD Backend client socket.'", + bytes_written); + processed++; + } + + if (s) { + mgmt_be_client_writes_off(client_ctx); + mgmt_be_client_register_event(client_ctx, + MGMTD_BE_CONN_WRITES_ON); + } +} + +static void mgmt_be_client_resume_writes(struct thread *thread) +{ + struct mgmt_be_client_ctx *client_ctx; + + client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread); + assert(client_ctx && client_ctx->conn_fd); + + mgmt_be_client_writes_on(client_ctx); +} + +static int mgmt_be_send_subscr_req(struct mgmt_be_client_ctx *client_ctx, + bool subscr_xpaths, + uint16_t num_reg_xpaths, + char **reg_xpaths) +{ + Mgmtd__BeMessage be_msg; + Mgmtd__BeSubscribeReq subscr_req; + + mgmtd__be_subscribe_req__init(&subscr_req); + subscr_req.client_name = client_ctx->client_params.name; + subscr_req.n_xpath_reg = num_reg_xpaths; + if (num_reg_xpaths) + subscr_req.xpath_reg = reg_xpaths; + else + subscr_req.xpath_reg = NULL; + subscr_req.subscribe_xpaths = subscr_xpaths; + + mgmtd__be_message__init(&be_msg); + be_msg.message_case = MGMTD__BE_MESSAGE__MESSAGE_SUBSCR_REQ; + be_msg.subscr_req = &subscr_req; + + return mgmt_be_client_send_msg(client_ctx, &be_msg); +} + +static int mgmt_be_server_connect(struct mgmt_be_client_ctx *client_ctx) +{ + int ret, sock, len; + struct sockaddr_un addr; + + MGMTD_BE_CLIENT_DBG("Trying to connect to MGMTD Backend server at %s", + MGMTD_BE_SERVER_PATH); + + assert(!client_ctx->conn_fd); + + sock = socket(AF_UNIX, SOCK_STREAM, 0); + if (sock < 0) { + MGMTD_BE_CLIENT_ERR("Failed to create socket"); + goto mgmt_be_server_connect_failed; + } + + MGMTD_BE_CLIENT_DBG( + "Created MGMTD Backend server socket successfully!"); + + memset(&addr, 0, sizeof(struct sockaddr_un)); + addr.sun_family = AF_UNIX; + strlcpy(addr.sun_path, MGMTD_BE_SERVER_PATH, sizeof(addr.sun_path)); +#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN + len = addr.sun_len = SUN_LEN(&addr); +#else + len = sizeof(addr.sun_family) + strlen(addr.sun_path); +#endif /* HAVE_STRUCT_SOCKADDR_UN_SUN_LEN */ + + ret = connect(sock, (struct sockaddr *)&addr, len); + if (ret < 0) { + MGMTD_BE_CLIENT_ERR( + "Failed to connect to MGMTD Backend Server at %s. Err: %s", + addr.sun_path, safe_strerror(errno)); + close(sock); + goto mgmt_be_server_connect_failed; + } + + MGMTD_BE_CLIENT_DBG( + "Connected to MGMTD Backend Server at %s successfully!", + addr.sun_path); + client_ctx->conn_fd = sock; + + /* Make client socket non-blocking. */ + set_nonblocking(sock); + setsockopt_so_sendbuf(client_ctx->conn_fd, + MGMTD_SOCKET_BE_SEND_BUF_SIZE); + setsockopt_so_recvbuf(client_ctx->conn_fd, + MGMTD_SOCKET_BE_RECV_BUF_SIZE); + + mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_READ); + + /* Notify client through registered callback (if any) */ + if (client_ctx->client_params.client_connect_notify) + (void)(*client_ctx->client_params + .client_connect_notify)( + (uintptr_t)client_ctx, + client_ctx->client_params.user_data, true); + + /* Send SUBSCRIBE_REQ message */ + if (mgmt_be_send_subscr_req(client_ctx, false, 0, NULL) != 0) + goto mgmt_be_server_connect_failed; + + return 0; + +mgmt_be_server_connect_failed: + if (sock && sock != client_ctx->conn_fd) + close(sock); + + mgmt_be_server_disconnect(client_ctx, true); + return -1; +} + +static void mgmt_be_client_conn_timeout(struct thread *thread) +{ + struct mgmt_be_client_ctx *client_ctx; + + client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread); + assert(client_ctx); + + mgmt_be_server_connect(client_ctx); +} + +static void +mgmt_be_client_register_event(struct mgmt_be_client_ctx *client_ctx, + enum mgmt_be_event event) +{ + struct timeval tv = {0}; + + switch (event) { + case MGMTD_BE_CONN_READ: + thread_add_read(client_ctx->tm, mgmt_be_client_read, + client_ctx, client_ctx->conn_fd, + &client_ctx->conn_read_ev); + assert(client_ctx->conn_read_ev); + break; + case MGMTD_BE_CONN_WRITE: + thread_add_write(client_ctx->tm, mgmt_be_client_write, + client_ctx, client_ctx->conn_fd, + &client_ctx->conn_write_ev); + assert(client_ctx->conn_write_ev); + break; + case MGMTD_BE_PROC_MSG: + tv.tv_usec = MGMTD_BE_MSG_PROC_DELAY_USEC; + thread_add_timer_tv(client_ctx->tm, + mgmt_be_client_proc_msgbufs, client_ctx, + &tv, &client_ctx->msg_proc_ev); + assert(client_ctx->msg_proc_ev); + break; + case MGMTD_BE_CONN_WRITES_ON: + thread_add_timer_msec( + client_ctx->tm, mgmt_be_client_resume_writes, + client_ctx, MGMTD_BE_MSG_WRITE_DELAY_MSEC, + &client_ctx->conn_writes_on); + assert(client_ctx->conn_writes_on); + break; + case MGMTD_BE_SERVER: + case MGMTD_BE_CONN_INIT: + case MGMTD_BE_SCHED_CFG_PREPARE: + case MGMTD_BE_RESCHED_CFG_PREPARE: + case MGMTD_BE_SCHED_CFG_APPLY: + case MGMTD_BE_RESCHED_CFG_APPLY: + assert(!"mgmt_be_client_post_event() called incorrectly"); + break; + } +} + +static void +mgmt_be_client_schedule_conn_retry(struct mgmt_be_client_ctx *client_ctx, + unsigned long intvl_secs) +{ + MGMTD_BE_CLIENT_DBG( + "Scheduling MGMTD Backend server connection retry after %lu seconds", + intvl_secs); + thread_add_timer(client_ctx->tm, mgmt_be_client_conn_timeout, + (void *)client_ctx, intvl_secs, + &client_ctx->conn_retry_tmr); +} + +extern struct nb_config *running_config; + +/* + * Initialize library and try connecting with MGMTD. + */ +uintptr_t mgmt_be_client_lib_init(struct mgmt_be_client_params *params, + struct thread_master *master_thread) +{ + assert(master_thread && params && strlen(params->name) + && !mgmt_be_client_ctx.tm); + + mgmt_be_client_ctx.tm = master_thread; + + if (!running_config) + assert(!"MGMTD Be Client lib_init() after frr_init() only!"); + mgmt_be_client_ctx.running_config = running_config; + mgmt_be_client_ctx.candidate_config = nb_config_new(NULL); + + memcpy(&mgmt_be_client_ctx.client_params, params, + sizeof(mgmt_be_client_ctx.client_params)); + if (!mgmt_be_client_ctx.client_params.conn_retry_intvl_sec) + mgmt_be_client_ctx.client_params.conn_retry_intvl_sec = + MGMTD_BE_DEFAULT_CONN_RETRY_INTVL_SEC; + + assert(!mgmt_be_client_ctx.ibuf_fifo && !mgmt_be_client_ctx.ibuf_work && + !mgmt_be_client_ctx.obuf_fifo && !mgmt_be_client_ctx.obuf_work); + + mgmt_be_txns_init(&mgmt_be_client_ctx.txn_head); + mgmt_be_client_ctx.ibuf_fifo = stream_fifo_new(); + mgmt_be_client_ctx.ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); + mgmt_be_client_ctx.obuf_fifo = stream_fifo_new(); + /* mgmt_be_client_ctx.obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); + */ + mgmt_be_client_ctx.obuf_work = NULL; + + /* Start trying to connect to MGMTD backend server immediately */ + mgmt_be_client_schedule_conn_retry(&mgmt_be_client_ctx, 1); + + MGMTD_BE_CLIENT_DBG("Initialized client '%s'", params->name); + + return (uintptr_t)&mgmt_be_client_ctx; +} + +/* + * Subscribe with MGMTD for one or more YANG subtree(s). + */ +enum mgmt_result mgmt_be_subscribe_yang_data(uintptr_t lib_hndl, + char *reg_yang_xpaths[], + int num_reg_xpaths) +{ + struct mgmt_be_client_ctx *client_ctx; + + client_ctx = (struct mgmt_be_client_ctx *)lib_hndl; + if (!client_ctx) + return MGMTD_INVALID_PARAM; + + if (mgmt_be_send_subscr_req(client_ctx, true, num_reg_xpaths, + reg_yang_xpaths) + != 0) + return MGMTD_INTERNAL_ERROR; + + return MGMTD_SUCCESS; +} + +/* + * Unsubscribe with MGMTD for one or more YANG subtree(s). + */ +enum mgmt_result mgmt_be_unsubscribe_yang_data(uintptr_t lib_hndl, + char *reg_yang_xpaths[], + int num_reg_xpaths) +{ + struct mgmt_be_client_ctx *client_ctx; + + client_ctx = (struct mgmt_be_client_ctx *)lib_hndl; + if (!client_ctx) + return MGMTD_INVALID_PARAM; + + + if (mgmt_be_send_subscr_req(client_ctx, false, num_reg_xpaths, + reg_yang_xpaths) + < 0) + return MGMTD_INTERNAL_ERROR; + + return MGMTD_SUCCESS; +} + +/* + * Send one or more YANG notifications to MGMTD daemon. + */ +enum mgmt_result mgmt_be_send_yang_notify(uintptr_t lib_hndl, + Mgmtd__YangData * data_elems[], + int num_elems) +{ + struct mgmt_be_client_ctx *client_ctx; + + client_ctx = (struct mgmt_be_client_ctx *)lib_hndl; + if (!client_ctx) + return MGMTD_INVALID_PARAM; + + return MGMTD_SUCCESS; +} + +/* + * Destroy library and cleanup everything. + */ +void mgmt_be_client_lib_destroy(uintptr_t lib_hndl) +{ + struct mgmt_be_client_ctx *client_ctx; + + client_ctx = (struct mgmt_be_client_ctx *)lib_hndl; + assert(client_ctx); + + MGMTD_BE_CLIENT_DBG("Destroying MGMTD Backend Client '%s'", + client_ctx->client_params.name); + + mgmt_be_server_disconnect(client_ctx, false); + + assert(mgmt_be_client_ctx.ibuf_fifo && mgmt_be_client_ctx.obuf_fifo); + + stream_fifo_free(mgmt_be_client_ctx.ibuf_fifo); + if (mgmt_be_client_ctx.ibuf_work) + stream_free(mgmt_be_client_ctx.ibuf_work); + stream_fifo_free(mgmt_be_client_ctx.obuf_fifo); + if (mgmt_be_client_ctx.obuf_work) + stream_free(mgmt_be_client_ctx.obuf_work); + + THREAD_OFF(client_ctx->conn_retry_tmr); + THREAD_OFF(client_ctx->conn_read_ev); + THREAD_OFF(client_ctx->conn_write_ev); + THREAD_OFF(client_ctx->conn_writes_on); + THREAD_OFF(client_ctx->msg_proc_ev); + mgmt_be_cleanup_all_txns(client_ctx); + mgmt_be_txns_fini(&client_ctx->txn_head); +} diff --git a/lib/mgmt_be_client.h b/lib/mgmt_be_client.h new file mode 100644 index 000000000..75b4ddf01 --- /dev/null +++ b/lib/mgmt_be_client.h @@ -0,0 +1,273 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * MGMTD Backend Client Library api interfaces + * Copyright (C) 2021 Vmware, Inc. + * Pushpasis Sarkar <spushpasis@vmware.com> + */ + +#ifndef _FRR_MGMTD_BE_CLIENT_H_ +#define _FRR_MGMTD_BE_CLIENT_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include "mgmtd/mgmt_defines.h" + +/*************************************************************** + * Client IDs + ***************************************************************/ + +/* + * Add enum value for each supported component, wrap with + * #ifdef HAVE_COMPONENT + */ +enum mgmt_be_client_id { + MGMTD_BE_CLIENT_ID_MIN = 0, + MGMTD_BE_CLIENT_ID_INIT = -1, +#if 0 /* example */ +#ifdef HAVE_STATICD + MGMTD_BE_CLIENT_ID_STATICD, +#endif +#endif + MGMTD_BE_CLIENT_ID_MAX +}; + +#define FOREACH_MGMTD_BE_CLIENT_ID(id) \ + for ((id) = MGMTD_BE_CLIENT_ID_MIN; \ + (id) < MGMTD_BE_CLIENT_ID_MAX; (id)++) + +/*************************************************************** + * Constants + ***************************************************************/ + +#define MGMTD_BE_CLIENT_ERROR_STRING_MAX_LEN 32 + +#define MGMTD_BE_DEFAULT_CONN_RETRY_INTVL_SEC 5 + +#define MGMTD_BE_MSG_PROC_DELAY_USEC 10 +#define MGMTD_BE_MAX_NUM_MSG_PROC 500 + +#define MGMTD_BE_MSG_WRITE_DELAY_MSEC 1 +#define MGMTD_BE_MAX_NUM_MSG_WRITE 1000 + +#define GMGD_BE_MAX_NUM_REQ_ITEMS 64 + +#define MGMTD_BE_MSG_MAX_LEN 16384 + +#define MGMTD_SOCKET_BE_SEND_BUF_SIZE 65535 +#define MGMTD_SOCKET_BE_RECV_BUF_SIZE MGMTD_SOCKET_BE_SEND_BUF_SIZE + +#define MGMTD_MAX_CFG_CHANGES_IN_BATCH \ + ((10 * MGMTD_BE_MSG_MAX_LEN) / \ + (MGMTD_MAX_XPATH_LEN + MGMTD_MAX_YANG_VALUE_LEN)) + +/* + * MGMTD_BE_MSG_MAX_LEN must be used 80% + * since there is overhead of google protobuf + * that gets added to sent message + */ +#define MGMTD_BE_CFGDATA_PACKING_EFFICIENCY 0.8 +#define MGMTD_BE_CFGDATA_MAX_MSG_LEN \ + (MGMTD_BE_MSG_MAX_LEN * MGMTD_BE_CFGDATA_PACKING_EFFICIENCY) + +#define MGMTD_BE_MAX_BATCH_IDS_IN_REQ \ + (MGMTD_BE_MSG_MAX_LEN - 128) / sizeof(uint64_t) + +#define MGMTD_BE_CONTAINER_NODE_VAL "<<container>>" + +/*************************************************************** + * Data-structures + ***************************************************************/ + +#define MGMTD_BE_MAX_CLIENTS_PER_XPATH_REG 32 + +struct mgmt_be_msg_hdr { + uint16_t marker; + uint16_t len; /* Includes header */ +}; +#define MGMTD_BE_MSG_HDR_LEN sizeof(struct mgmt_be_msg_hdr) +#define MGMTD_BE_MSG_MARKER 0xfeed + +struct mgmt_be_msg { + struct mgmt_be_msg_hdr hdr; + uint8_t payload[]; +}; + +struct mgmt_be_client_txn_ctx { + uintptr_t *user_ctx; +}; + +/* + * All the client-specific information this library needs to + * initialize itself, setup connection with MGMTD BackEnd interface + * and carry on all required procedures appropriately. + * + * BackEnd clients need to initialise a instance of this structure + * with appropriate data and pass it while calling the API + * to initialize the library (See mgmt_be_client_lib_init for + * more details). + */ +struct mgmt_be_client_params { + char name[MGMTD_CLIENT_NAME_MAX_LEN]; + uintptr_t user_data; + unsigned long conn_retry_intvl_sec; + + void (*client_connect_notify)(uintptr_t lib_hndl, + uintptr_t usr_data, + bool connected); + + void (*client_subscribe_notify)( + uintptr_t lib_hndl, uintptr_t usr_data, + struct nb_yang_xpath **xpath, + enum mgmt_result subscribe_result[], int num_paths); + + void (*txn_notify)( + uintptr_t lib_hndl, uintptr_t usr_data, + struct mgmt_be_client_txn_ctx *txn_ctx, bool destroyed); + + enum mgmt_result (*data_validate)( + uintptr_t lib_hndl, uintptr_t usr_data, + struct mgmt_be_client_txn_ctx *txn_ctx, + struct nb_yang_xpath *xpath, struct nb_yang_value *data, + bool delete, char *error_if_any); + + enum mgmt_result (*data_apply)( + uintptr_t lib_hndl, uintptr_t usr_data, + struct mgmt_be_client_txn_ctx *txn_ctx, + struct nb_yang_xpath *xpath, struct nb_yang_value *data, + bool delete); + + enum mgmt_result (*get_data_elem)( + uintptr_t lib_hndl, uintptr_t usr_data, + struct mgmt_be_client_txn_ctx *txn_ctx, + struct nb_yang_xpath *xpath, struct nb_yang_xpath_elem *elem); + + enum mgmt_result (*get_data)( + uintptr_t lib_hndl, uintptr_t usr_data, + struct mgmt_be_client_txn_ctx *txn_ctx, + struct nb_yang_xpath *xpath, bool keys_only, + struct nb_yang_xpath_elem **elems, int *num_elems, + int *next_key); + + enum mgmt_result (*get_next_data)( + uintptr_t lib_hndl, uintptr_t usr_data, + struct mgmt_be_client_txn_ctx *txn_ctx, + struct nb_yang_xpath *xpath, bool keys_only, + struct nb_yang_xpath_elem **elems, int *num_elems); +}; + +/*************************************************************** + * Global data exported + ***************************************************************/ + +extern const char *mgmt_be_client_names[MGMTD_BE_CLIENT_ID_MAX + 1]; + +static inline const char *mgmt_be_client_id2name(enum mgmt_be_client_id id) +{ + if (id > MGMTD_BE_CLIENT_ID_MAX) + id = MGMTD_BE_CLIENT_ID_MAX; + return mgmt_be_client_names[id]; +} + +static inline enum mgmt_be_client_id +mgmt_be_client_name2id(const char *name) +{ + enum mgmt_be_client_id id; + + FOREACH_MGMTD_BE_CLIENT_ID (id) { + if (!strncmp(mgmt_be_client_names[id], name, + MGMTD_CLIENT_NAME_MAX_LEN)) + return id; + } + + return MGMTD_BE_CLIENT_ID_MAX; +} + +/*************************************************************** + * API prototypes + ***************************************************************/ + +/* + * Initialize library and try connecting with MGMTD. + * + * params + * Backend client parameters. + * + * master_thread + * Thread master. + * + * Returns: + * Backend client lib handler (nothing but address of mgmt_be_client_ctx) + */ +extern uintptr_t +mgmt_be_client_lib_init(struct mgmt_be_client_params *params, + struct thread_master *master_thread); + +/* + * Subscribe with MGMTD for one or more YANG subtree(s). + * + * lib_hndl + * Client library handler. + * + * reg_yang_xpaths + * Yang xpath(s) that needs to be subscribed to. + * + * num_xpaths + * Number of xpaths + * + * Returns: + * MGMTD_SUCCESS on success, MGMTD_* otherwise. + */ +extern enum mgmt_result mgmt_be_subscribe_yang_data(uintptr_t lib_hndl, + char **reg_yang_xpaths, + int num_xpaths); + +/* + * Send one or more YANG notifications to MGMTD daemon. + * + * lib_hndl + * Client library handler. + * + * data_elems + * Yang data elements from data tree. + * + * num_elems + * Number of data elements. + * + * Returns: + * MGMTD_SUCCESS on success, MGMTD_* otherwise. + */ +extern enum mgmt_result +mgmt_be_send_yang_notify(uintptr_t lib_hndl, Mgmtd__YangData **data_elems, + int num_elems); + +/* + * Un-subscribe with MGMTD for one or more YANG subtree(s). + * + * lib_hndl + * Client library handler. + * + * reg_yang_xpaths + * Yang xpath(s) that needs to be un-subscribed from. + * + * num_reg_xpaths + * Number of subscribed xpaths + * + * Returns: + * MGMTD_SUCCESS on success, MGMTD_* otherwise. + */ +enum mgmt_result mgmt_be_unsubscribe_yang_data(uintptr_t lib_hndl, + char **reg_yang_xpaths, + int num_reg_xpaths); + +/* + * Destroy library and cleanup everything. + */ +extern void mgmt_be_client_lib_destroy(uintptr_t lib_hndl); + +#ifdef __cplusplus +} +#endif + +#endif /* _FRR_MGMTD_BE_CLIENT_H_ */ diff --git a/lib/northbound.c b/lib/northbound.c index 0d4c3f0f0..b9b840a60 100644 --- a/lib/northbound.c +++ b/lib/northbound.c @@ -93,7 +93,9 @@ static int nb_node_new_cb(const struct lysc_node *snode, void *arg) { struct nb_node *nb_node; struct lysc_node *sparent, *sparent_list; + struct frr_yang_module_info *module; + module = (struct frr_yang_module_info *)arg; nb_node = XCALLOC(MTYPE_NB_NODE, sizeof(*nb_node)); yang_snode_get_path(snode, YANG_PATH_DATA, nb_node->xpath, sizeof(nb_node->xpath)); @@ -128,6 +130,9 @@ static int nb_node_new_cb(const struct lysc_node *snode, void *arg) assert(snode->priv == NULL); ((struct lysc_node *)snode)->priv = nb_node; + if (module && module->ignore_cbs) + SET_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS); + return YANG_ITER_CONTINUE; } @@ -230,6 +235,9 @@ static unsigned int nb_node_validate_cbs(const struct nb_node *nb_node) { unsigned int error = 0; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return error; + error += nb_node_validate_cb(nb_node, NB_OP_CREATE, !!nb_node->cbs.create, false); error += nb_node_validate_cb(nb_node, NB_OP_MODIFY, @@ -297,6 +305,8 @@ struct nb_config *nb_config_new(struct lyd_node *dnode) config->dnode = yang_dnode_new(ly_native_ctx, true); config->version = 0; + RB_INIT(nb_config_cbs, &config->cfg_chgs); + return config; } @@ -304,6 +314,7 @@ void nb_config_free(struct nb_config *config) { if (config->dnode) yang_dnode_free(config->dnode); + nb_config_diff_del_changes(&config->cfg_chgs); XFREE(MTYPE_NB_CONFIG, config); } @@ -315,6 +326,8 @@ struct nb_config *nb_config_dup(const struct nb_config *config) dup->dnode = yang_dnode_dup(config->dnode); dup->version = config->version; + RB_INIT(nb_config_cbs, &dup->cfg_chgs); + return dup; } @@ -405,7 +418,7 @@ static void nb_config_diff_add_change(struct nb_config_cbs *changes, RB_INSERT(nb_config_cbs, changes, &change->cb); } -static void nb_config_diff_del_changes(struct nb_config_cbs *changes) +void nb_config_diff_del_changes(struct nb_config_cbs *changes) { while (!RB_EMPTY(nb_config_cbs, changes)) { struct nb_config_change *change; @@ -422,8 +435,8 @@ static void nb_config_diff_del_changes(struct nb_config_cbs *changes) * configurations. Given a new subtree, calculate all new YANG data nodes, * excluding default leafs and leaf-lists. This is a recursive function. */ -static void nb_config_diff_created(const struct lyd_node *dnode, uint32_t *seq, - struct nb_config_cbs *changes) +void nb_config_diff_created(const struct lyd_node *dnode, uint32_t *seq, + struct nb_config_cbs *changes) { enum nb_operation operation; struct lyd_node *child; @@ -525,10 +538,16 @@ static inline void nb_config_diff_dnode_log(const char *context, } #endif -/* Calculate the delta between two different configurations. */ -static void nb_config_diff(const struct nb_config *config1, - const struct nb_config *config2, - struct nb_config_cbs *changes) +/* + * Calculate the delta between two different configurations. + * + * NOTE: 'config1' is the reference DB, while 'config2' is + * the DB being compared against 'config1'. Typically 'config1' + * should be the Running DB and 'config2' is the Candidate DB. + */ +void nb_config_diff(const struct nb_config *config1, + const struct nb_config *config2, + struct nb_config_cbs *changes) { struct lyd_node *diff = NULL; const struct lyd_node *root, *dnode; @@ -734,6 +753,169 @@ int nb_candidate_edit(struct nb_config *candidate, return NB_OK; } +static void nb_update_candidate_changes(struct nb_config *candidate, + struct nb_cfg_change *change, + uint32_t *seq) +{ + enum nb_operation oper = change->operation; + char *xpath = change->xpath; + struct lyd_node *root = NULL; + struct lyd_node *dnode; + struct nb_config_cbs *cfg_chgs = &candidate->cfg_chgs; + int op; + + switch (oper) { + case NB_OP_CREATE: + case NB_OP_MODIFY: + root = yang_dnode_get(candidate->dnode, xpath); + break; + case NB_OP_DESTROY: + root = yang_dnode_get(running_config->dnode, xpath); + /* code */ + break; + case NB_OP_MOVE: + case NB_OP_PRE_VALIDATE: + case NB_OP_APPLY_FINISH: + case NB_OP_GET_ELEM: + case NB_OP_GET_NEXT: + case NB_OP_GET_KEYS: + case NB_OP_LOOKUP_ENTRY: + case NB_OP_RPC: + break; + default: + assert(!"non-enum value, invalid"); + } + + if (!root) + return; + + LYD_TREE_DFS_BEGIN (root, dnode) { + op = nb_lyd_diff_get_op(dnode); + switch (op) { + case 'c': + nb_config_diff_created(dnode, seq, cfg_chgs); + LYD_TREE_DFS_continue = 1; + break; + case 'd': + nb_config_diff_deleted(dnode, seq, cfg_chgs); + LYD_TREE_DFS_continue = 1; + break; + case 'r': + nb_config_diff_add_change(cfg_chgs, NB_OP_MODIFY, seq, + dnode); + break; + default: + break; + } + LYD_TREE_DFS_END(root, dnode); + } +} + +static bool nb_is_operation_allowed(struct nb_node *nb_node, + struct nb_cfg_change *change) +{ + enum nb_operation oper = change->operation; + + if (lysc_is_key(nb_node->snode)) { + if (oper == NB_OP_MODIFY || oper == NB_OP_DESTROY) + return false; + } + return true; +} + +void nb_candidate_edit_config_changes( + struct nb_config *candidate_config, struct nb_cfg_change cfg_changes[], + size_t num_cfg_changes, const char *xpath_base, const char *curr_xpath, + int xpath_index, char *err_buf, int err_bufsize, bool *error) +{ + uint32_t seq = 0; + + if (error) + *error = false; + + if (xpath_base == NULL) + xpath_base = ""; + + /* Edit candidate configuration. */ + for (size_t i = 0; i < num_cfg_changes; i++) { + struct nb_cfg_change *change = &cfg_changes[i]; + struct nb_node *nb_node; + char xpath[XPATH_MAXLEN]; + struct yang_data *data; + int ret; + + /* Handle relative XPaths. */ + memset(xpath, 0, sizeof(xpath)); + if (xpath_index > 0 + && (xpath_base[0] == '.' || change->xpath[0] == '.')) + strlcpy(xpath, curr_xpath, sizeof(xpath)); + if (xpath_base[0]) { + if (xpath_base[0] == '.') + strlcat(xpath, xpath_base + 1, sizeof(xpath)); + else + strlcat(xpath, xpath_base, sizeof(xpath)); + } + if (change->xpath[0] == '.') + strlcat(xpath, change->xpath + 1, sizeof(xpath)); + else + strlcpy(xpath, change->xpath, sizeof(xpath)); + + /* Find the northbound node associated to the data path. */ + nb_node = nb_node_find(xpath); + if (!nb_node) { + flog_warn(EC_LIB_YANG_UNKNOWN_DATA_PATH, + "%s: unknown data path: %s", __func__, xpath); + if (error) + *error = true; + continue; + } + /* Find if the node to be edited is not a key node */ + if (!nb_is_operation_allowed(nb_node, change)) { + zlog_err(" Xpath %s points to key node", xpath); + if (error) + *error = true; + break; + } + + /* If the value is not set, get the default if it exists. */ + if (change->value == NULL) + change->value = yang_snode_get_default(nb_node->snode); + data = yang_data_new(xpath, change->value); + + /* + * Ignore "not found" errors when editing the candidate + * configuration. + */ + ret = nb_candidate_edit(candidate_config, nb_node, + change->operation, xpath, NULL, data); + yang_data_free(data); + if (ret != NB_OK && ret != NB_ERR_NOT_FOUND) { + flog_warn( + EC_LIB_NB_CANDIDATE_EDIT_ERROR, + "%s: failed to edit candidate configuration: operation [%s] xpath [%s]", + __func__, nb_operation_name(change->operation), + xpath); + if (error) + *error = true; + continue; + } + nb_update_candidate_changes(candidate_config, change, &seq); + } + + if (error && *error) { + char buf[BUFSIZ]; + + /* + * Failure to edit the candidate configuration should never + * happen in practice, unless there's a bug in the code. When + * that happens, log the error but otherwise ignore it. + */ + snprintf(err_buf, err_bufsize, + "%% Failed to edit configuration.\n\n%s", + yang_print_errors(ly_native_ctx, buf, sizeof(buf))); + } +} + bool nb_candidate_needs_update(const struct nb_config *candidate) { if (candidate->version < running_config->version) @@ -761,8 +943,8 @@ int nb_candidate_update(struct nb_config *candidate) * WARNING: lyd_validate() can change the configuration as part of the * validation process. */ -static int nb_candidate_validate_yang(struct nb_config *candidate, char *errmsg, - size_t errmsg_len) +int nb_candidate_validate_yang(struct nb_config *candidate, char *errmsg, + size_t errmsg_len) { if (lyd_validate_all(&candidate->dnode, ly_native_ctx, LYD_VALIDATE_NO_STATE, NULL) @@ -775,10 +957,10 @@ static int nb_candidate_validate_yang(struct nb_config *candidate, char *errmsg, } /* Perform code-level validation using the northbound callbacks. */ -static int nb_candidate_validate_code(struct nb_context *context, - struct nb_config *candidate, - struct nb_config_cbs *changes, - char *errmsg, size_t errmsg_len) +int nb_candidate_validate_code(struct nb_context *context, + struct nb_config *candidate, + struct nb_config_cbs *changes, char *errmsg, + size_t errmsg_len) { struct nb_config_cb *cb; struct lyd_node *root, *child; @@ -816,6 +998,21 @@ static int nb_candidate_validate_code(struct nb_context *context, return NB_OK; } +int nb_candidate_diff_and_validate_yang(struct nb_context *context, + struct nb_config *candidate, + struct nb_config_cbs *changes, + char *errmsg, size_t errmsg_len) +{ + if (nb_candidate_validate_yang(candidate, errmsg, sizeof(errmsg_len)) + != NB_OK) + return NB_ERR_VALIDATION; + + RB_INIT(nb_config_cbs, changes); + nb_config_diff(running_config, candidate, changes); + + return NB_OK; +} + int nb_candidate_validate(struct nb_context *context, struct nb_config *candidate, char *errmsg, size_t errmsg_len) @@ -823,11 +1020,11 @@ int nb_candidate_validate(struct nb_context *context, struct nb_config_cbs changes; int ret; - if (nb_candidate_validate_yang(candidate, errmsg, errmsg_len) != NB_OK) - return NB_ERR_VALIDATION; + ret = nb_candidate_diff_and_validate_yang(context, candidate, &changes, + errmsg, errmsg_len); + if (ret != NB_OK) + return ret; - RB_INIT(nb_config_cbs, &changes); - nb_config_diff(running_config, candidate, &changes); ret = nb_candidate_validate_code(context, candidate, &changes, errmsg, errmsg_len); nb_config_diff_del_changes(&changes); @@ -839,12 +1036,14 @@ int nb_candidate_commit_prepare(struct nb_context context, struct nb_config *candidate, const char *comment, struct nb_transaction **transaction, + bool skip_validate, bool ignore_zero_change, char *errmsg, size_t errmsg_len) { struct nb_config_cbs changes; - if (nb_candidate_validate_yang(candidate, errmsg, errmsg_len) - != NB_OK) { + if (!skip_validate + && nb_candidate_validate_yang(candidate, errmsg, errmsg_len) + != NB_OK) { flog_warn(EC_LIB_NB_CANDIDATE_INVALID, "%s: failed to validate candidate configuration", __func__); @@ -853,15 +1052,17 @@ int nb_candidate_commit_prepare(struct nb_context context, RB_INIT(nb_config_cbs, &changes); nb_config_diff(running_config, candidate, &changes); - if (RB_EMPTY(nb_config_cbs, &changes)) { + if (!ignore_zero_change && RB_EMPTY(nb_config_cbs, &changes)) { snprintf( errmsg, errmsg_len, "No changes to apply were found during preparation phase"); return NB_ERR_NO_CHANGES; } - if (nb_candidate_validate_code(&context, candidate, &changes, errmsg, - errmsg_len) != NB_OK) { + if (!skip_validate + && nb_candidate_validate_code(&context, candidate, &changes, errmsg, + errmsg_len) + != NB_OK) { flog_warn(EC_LIB_NB_CANDIDATE_INVALID, "%s: failed to validate candidate configuration", __func__); @@ -869,8 +1070,12 @@ int nb_candidate_commit_prepare(struct nb_context context, return NB_ERR_VALIDATION; } - *transaction = nb_transaction_new(context, candidate, &changes, comment, - errmsg, errmsg_len); + /* + * Re-use an existing transaction if provided. Else allocate a new one. + */ + if (!*transaction) + *transaction = nb_transaction_new(context, candidate, &changes, + comment, errmsg, errmsg_len); if (*transaction == NULL) { flog_warn(EC_LIB_NB_TRANSACTION_CREATION_FAILED, "%s: failed to create transaction: %s", __func__, @@ -921,7 +1126,8 @@ int nb_candidate_commit(struct nb_context context, struct nb_config *candidate, int ret; ret = nb_candidate_commit_prepare(context, candidate, comment, - &transaction, errmsg, errmsg_len); + &transaction, false, false, errmsg, + errmsg_len); /* * Apply the changes if the preparation phase succeeded. Otherwise abort * the transaction. @@ -1015,6 +1221,8 @@ static int nb_callback_create(struct nb_context *context, bool unexpected_error = false; int ret; + assert(!CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)); + nb_log_config_callback(event, NB_OP_CREATE, dnode); args.context = context; @@ -1064,6 +1272,8 @@ static int nb_callback_modify(struct nb_context *context, bool unexpected_error = false; int ret; + assert(!CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)); + nb_log_config_callback(event, NB_OP_MODIFY, dnode); args.context = context; @@ -1113,6 +1323,8 @@ static int nb_callback_destroy(struct nb_context *context, bool unexpected_error = false; int ret; + assert(!CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)); + nb_log_config_callback(event, NB_OP_DESTROY, dnode); args.context = context; @@ -1156,6 +1368,8 @@ static int nb_callback_move(struct nb_context *context, bool unexpected_error = false; int ret; + assert(!CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)); + nb_log_config_callback(event, NB_OP_MOVE, dnode); args.context = context; @@ -1199,6 +1413,9 @@ static int nb_callback_pre_validate(struct nb_context *context, bool unexpected_error = false; int ret; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return 0; + nb_log_config_callback(NB_EV_VALIDATE, NB_OP_PRE_VALIDATE, dnode); args.dnode = dnode; @@ -1230,6 +1447,9 @@ static void nb_callback_apply_finish(struct nb_context *context, { struct nb_cb_apply_finish_args args = {}; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return; + nb_log_config_callback(NB_EV_APPLY, NB_OP_APPLY_FINISH, dnode); args.context = context; @@ -1245,6 +1465,9 @@ struct yang_data *nb_callback_get_elem(const struct nb_node *nb_node, { struct nb_cb_get_elem_args args = {}; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return NULL; + DEBUGD(&nb_dbg_cbs_state, "northbound callback (get_elem): xpath [%s] list_entry [%p]", xpath, list_entry); @@ -1260,6 +1483,9 @@ const void *nb_callback_get_next(const struct nb_node *nb_node, { struct nb_cb_get_next_args args = {}; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return NULL; + DEBUGD(&nb_dbg_cbs_state, "northbound callback (get_next): node [%s] parent_list_entry [%p] list_entry [%p]", nb_node->xpath, parent_list_entry, list_entry); @@ -1274,6 +1500,9 @@ int nb_callback_get_keys(const struct nb_node *nb_node, const void *list_entry, { struct nb_cb_get_keys_args args = {}; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return 0; + DEBUGD(&nb_dbg_cbs_state, "northbound callback (get_keys): node [%s] list_entry [%p]", nb_node->xpath, list_entry); @@ -1289,6 +1518,9 @@ const void *nb_callback_lookup_entry(const struct nb_node *nb_node, { struct nb_cb_lookup_entry_args args = {}; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return NULL; + DEBUGD(&nb_dbg_cbs_state, "northbound callback (lookup_entry): node [%s] parent_list_entry [%p]", nb_node->xpath, parent_list_entry); @@ -1304,6 +1536,9 @@ int nb_callback_rpc(const struct nb_node *nb_node, const char *xpath, { struct nb_cb_rpc_args args = {}; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return 0; + DEBUGD(&nb_dbg_cbs_rpc, "northbound RPC: %s", xpath); args.xpath = xpath; @@ -1330,6 +1565,9 @@ static int nb_callback_configuration(struct nb_context *context, union nb_resource *resource; int ret = NB_ERR; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return NB_OK; + if (event == NB_EV_VALIDATE) resource = NULL; else @@ -1733,7 +1971,7 @@ static int nb_oper_data_iter_list(const struct nb_node *nb_node, /* Iterate over all list entries. */ do { const struct lysc_node_leaf *skey; - struct yang_list_keys list_keys; + struct yang_list_keys list_keys = {}; char xpath[XPATH_MAXLEN * 2]; int ret; @@ -2391,6 +2629,8 @@ const char *nb_client_name(enum nb_client client) return "Pcep"; case NB_CLIENT_MGMTD_SERVER: return "MGMTD Server"; + case NB_CLIENT_MGMTD_BE: + return "MGMT Backend"; case NB_CLIENT_NONE: return "None"; } @@ -2400,6 +2640,10 @@ const char *nb_client_name(enum nb_client client) static void nb_load_callbacks(const struct frr_yang_module_info *module) { + + if (module->ignore_cbs) + return; + for (size_t i = 0; module->nodes[i].xpath; i++) { struct nb_node *nb_node; uint32_t priority; @@ -2473,7 +2717,8 @@ void nb_init(struct thread_master *tm, /* Initialize the compiled nodes with northbound data */ for (size_t i = 0; i < nmodules; i++) { - yang_snodes_iterate(loaded[i]->info, nb_node_new_cb, 0, NULL); + yang_snodes_iterate(loaded[i]->info, nb_node_new_cb, 0, + (void *)modules[i]); nb_load_callbacks(modules[i]); } diff --git a/lib/northbound.h b/lib/northbound.h index 572d669dc..b63b216b0 100644 --- a/lib/northbound.h +++ b/lib/northbound.h @@ -22,6 +22,39 @@ extern "C" { struct vty; struct debug; +struct nb_yang_xpath_tag { + uint32_t ns; + uint32_t id; +}; + +struct nb_yang_value { + struct lyd_value value; + LY_DATA_TYPE value_type; + uint8_t value_flags; +}; + +struct nb_yang_xpath_elem { + struct nb_yang_xpath_tag tag; + struct nb_yang_value val; +}; + +#define NB_MAX_NUM_KEYS UINT8_MAX +#define NB_MAX_NUM_XPATH_TAGS UINT8_MAX + +struct nb_yang_xpath { + uint8_t length; + struct { + uint8_t num_keys; + struct nb_yang_xpath_elem keys[NB_MAX_NUM_KEYS]; + } tags[NB_MAX_NUM_XPATH_TAGS]; +}; + +#define NB_YANG_XPATH_KEY(__xpath, __indx1, __indx2) \ + ((__xpath->num_tags > __indx1) \ + && (__xpath->tags[__indx1].num_keys > __indx2) \ + ? &__xpath->tags[__indx1].keys[__indx2] \ + : NULL) + /* Northbound events. */ enum nb_event { /* @@ -564,6 +597,8 @@ struct nb_node { #define F_NB_NODE_CONFIG_ONLY 0x01 /* The YANG list doesn't contain key leafs. */ #define F_NB_NODE_KEYLESS_LIST 0x02 +/* Ignore callbacks for this node */ +#define F_NB_NODE_IGNORE_CBS 0x04 /* * HACK: old gcc versions (< 5.x) have a bug that prevents C99 flexible arrays @@ -576,6 +611,12 @@ struct frr_yang_module_info { /* YANG module name. */ const char *name; + /* + * Ignore callbacks for this module. Set this to true to + * load module without any callbacks. + */ + bool ignore_cbs; + /* Northbound callbacks. */ const struct { /* Data path of this YANG node. */ @@ -620,6 +661,7 @@ enum nb_client { NB_CLIENT_GRPC, NB_CLIENT_PCEP, NB_CLIENT_MGMTD_SERVER, + NB_CLIENT_MGMTD_BE, }; /* Northbound context. */ @@ -631,12 +673,6 @@ struct nb_context { const void *user; }; -/* Northbound configuration. */ -struct nb_config { - struct lyd_node *dnode; - uint32_t version; -}; - /* Northbound configuration callback. */ struct nb_config_cb { RB_ENTRY(nb_config_cb) entry; @@ -663,6 +699,13 @@ struct nb_transaction { struct nb_config_cbs changes; }; +/* Northbound configuration. */ +struct nb_config { + struct lyd_node *dnode; + uint32_t version; + struct nb_config_cbs cfg_chgs; +}; + /* Callback function used by nb_oper_data_iterate(). */ typedef int (*nb_oper_data_cb)(const struct lysc_node *snode, struct yang_translator *translator, @@ -832,6 +875,9 @@ extern int nb_candidate_edit(struct nb_config *candidate, const struct yang_data *previous, const struct yang_data *data); +extern void nb_config_diff_created(const struct lyd_node *dnode, uint32_t *seq, + struct nb_config_cbs *changes); + /* * Check if a candidate configuration is outdated and needs to be updated. * @@ -843,6 +889,30 @@ extern int nb_candidate_edit(struct nb_config *candidate, */ extern bool nb_candidate_needs_update(const struct nb_config *candidate); +extern void nb_candidate_edit_config_changes( + struct nb_config *candidate_config, struct nb_cfg_change cfg_changes[], + size_t num_cfg_changes, const char *xpath_base, const char *curr_xpath, + int xpath_index, char *err_buf, int err_bufsize, bool *error); + +extern void nb_config_diff_del_changes(struct nb_config_cbs *changes); + +extern int nb_candidate_diff_and_validate_yang(struct nb_context *context, + struct nb_config *candidate, + struct nb_config_cbs *changes, + char *errmsg, size_t errmsg_len); + +extern void nb_config_diff(const struct nb_config *reference, + const struct nb_config *incremental, + struct nb_config_cbs *changes); + +extern int nb_candidate_validate_yang(struct nb_config *candidate, char *errmsg, + size_t errmsg_len); + +extern int nb_candidate_validate_code(struct nb_context *context, + struct nb_config *candidate, + struct nb_config_cbs *changes, + char *errmsg, size_t errmsg_len); + /* * Update a candidate configuration by rebasing the changes on top of the latest * running configuration. Resolve conflicts automatically by giving preference @@ -922,7 +992,9 @@ extern int nb_candidate_commit_prepare(struct nb_context context, struct nb_config *candidate, const char *comment, struct nb_transaction **transaction, - char *errmsg, size_t errmsg_len); + bool skip_validate, + bool ignore_zero_change, char *errmsg, + size_t errmsg_len); /* * Abort a previously created configuration transaction, releasing all resources diff --git a/lib/northbound_cli.c b/lib/northbound_cli.c index 27db2059b..2ead14cc3 100644 --- a/lib/northbound_cli.c +++ b/lib/northbound_cli.c @@ -141,79 +141,21 @@ static int nb_cli_apply_changes_internal(struct vty *vty, bool clear_pending) { bool error = false; - - if (xpath_base == NULL) - xpath_base = ""; + char buf[BUFSIZ]; VTY_CHECK_XPATH; - /* Edit candidate configuration. */ - for (size_t i = 0; i < vty->num_cfg_changes; i++) { - struct nb_cfg_change *change = &vty->cfg_changes[i]; - struct nb_node *nb_node; - char xpath[XPATH_MAXLEN]; - struct yang_data *data; - int ret; - - /* Handle relative XPaths. */ - memset(xpath, 0, sizeof(xpath)); - if (vty->xpath_index > 0 - && (xpath_base[0] == '.' || change->xpath[0] == '.')) - strlcpy(xpath, VTY_CURR_XPATH, sizeof(xpath)); - if (xpath_base[0]) { - if (xpath_base[0] == '.') - strlcat(xpath, xpath_base + 1, sizeof(xpath)); - else - strlcat(xpath, xpath_base, sizeof(xpath)); - } - if (change->xpath[0] == '.') - strlcat(xpath, change->xpath + 1, sizeof(xpath)); - else - strlcpy(xpath, change->xpath, sizeof(xpath)); - - /* Find the northbound node associated to the data path. */ - nb_node = nb_node_find(xpath); - if (!nb_node) { - flog_warn(EC_LIB_YANG_UNKNOWN_DATA_PATH, - "%s: unknown data path: %s", __func__, xpath); - error = true; - continue; - } - - /* If the value is not set, get the default if it exists. */ - if (change->value == NULL) - change->value = yang_snode_get_default(nb_node->snode); - data = yang_data_new(xpath, change->value); - - /* - * Ignore "not found" errors when editing the candidate - * configuration. - */ - ret = nb_candidate_edit(vty->candidate_config, nb_node, - change->operation, xpath, NULL, data); - yang_data_free(data); - if (ret != NB_OK && ret != NB_ERR_NOT_FOUND) { - flog_warn( - EC_LIB_NB_CANDIDATE_EDIT_ERROR, - "%s: failed to edit candidate configuration: operation [%s] xpath [%s]", - __func__, nb_operation_name(change->operation), - xpath); - error = true; - continue; - } - } - + nb_candidate_edit_config_changes( + vty->candidate_config, vty->cfg_changes, vty->num_cfg_changes, + xpath_base, VTY_CURR_XPATH, vty->xpath_index, buf, sizeof(buf), + &error); if (error) { - char buf[BUFSIZ]; - /* * Failure to edit the candidate configuration should never * happen in practice, unless there's a bug in the code. When * that happens, log the error but otherwise ignore it. */ - vty_out(vty, "%% Failed to edit configuration.\n\n"); - vty_out(vty, "%s", - yang_print_errors(ly_native_ctx, buf, sizeof(buf))); + vty_out(vty, "%s", buf); } /* diff --git a/lib/northbound_confd.c b/lib/northbound_confd.c index 2b57ff270..ee1956851 100644 --- a/lib/northbound_confd.c +++ b/lib/northbound_confd.c @@ -312,7 +312,8 @@ static void frr_confd_cdb_read_cb_prepare(int fd, int *subp, int reslen) transaction = NULL; context.client = NB_CLIENT_CONFD; ret = nb_candidate_commit_prepare(context, candidate, NULL, - &transaction, errmsg, sizeof(errmsg)); + &transaction, false, false, errmsg, + sizeof(errmsg)); if (ret != NB_OK && ret != NB_ERR_NO_CHANGES) { enum confd_errcode errcode; diff --git a/lib/northbound_grpc.cpp b/lib/northbound_grpc.cpp index 1459146ea..274a0ca45 100644 --- a/lib/northbound_grpc.cpp +++ b/lib/northbound_grpc.cpp @@ -825,7 +825,8 @@ HandleUnaryCommit(UnaryRpcState<frr::CommitRequest, frr::CommitResponse> *tag) grpc_debug("`-> Performing PREPARE"); ret = nb_candidate_commit_prepare( context, candidate->config, comment.c_str(), - &candidate->transaction, errmsg, sizeof(errmsg)); + &candidate->transaction, false, false, errmsg, + sizeof(errmsg)); break; case frr::CommitRequest::ABORT: grpc_debug("`-> Performing ABORT"); diff --git a/lib/northbound_sysrepo.c b/lib/northbound_sysrepo.c index 096414ff2..337fb690d 100644 --- a/lib/northbound_sysrepo.c +++ b/lib/northbound_sysrepo.c @@ -269,7 +269,8 @@ static int frr_sr_config_change_cb_prepare(sr_session_ctx_t *session, * required to apply them. */ ret = nb_candidate_commit_prepare(context, candidate, NULL, - &transaction, errmsg, sizeof(errmsg)); + &transaction, false, false, errmsg, + sizeof(errmsg)); if (ret != NB_OK && ret != NB_ERR_NO_CHANGES) flog_warn( EC_LIB_LIBSYSREPO, diff --git a/lib/subdir.am b/lib/subdir.am index eb7cbc4f4..84a35417e 100644 --- a/lib/subdir.am +++ b/lib/subdir.am @@ -64,6 +64,7 @@ lib_libfrr_la_SOURCES = \ lib/log_vty.c \ lib/md5.c \ lib/memory.c \ + lib/mgmt_be_client.c \ lib/mgmt_fe_client.c \ lib/mlag.c \ lib/module.c \ @@ -241,6 +242,7 @@ pkginclude_HEADERS += \ lib/md5.h \ lib/memory.h \ lib/mgmt.pb-c.h \ + lib/mgmt_be_client.h \ lib/mgmt_fe_client.h \ lib/mgmt_pb.h \ lib/module.h \ diff --git a/mgmtd/mgmt.c b/mgmtd/mgmt.c index dd0929e0c..34db900af 100644 --- a/mgmtd/mgmt.c +++ b/mgmtd/mgmt.c @@ -8,6 +8,8 @@ #include <zebra.h> #include "mgmtd/mgmt.h" +#include "mgmtd/mgmt_be_server.h" +#include "mgmtd/mgmt_be_adapter.h" #include "mgmtd/mgmt_fe_server.h" #include "mgmtd/mgmt_fe_adapter.h" #include "mgmtd/mgmt_ds.h" @@ -47,9 +49,15 @@ void mgmt_init(void) /* Initialize datastores */ mgmt_ds_init(mm); + /* Initialize the MGMTD Backend Adapter Module */ + mgmt_be_adapter_init(mm->master); + /* Initialize the MGMTD Frontend Adapter Module */ mgmt_fe_adapter_init(mm->master, mm); + /* Start the MGMTD Backend Server for clients to connect */ + mgmt_be_server_init(mm->master); + /* Start the MGMTD Frontend Server for clients to connect */ mgmt_fe_server_init(mm->master); @@ -61,5 +69,7 @@ void mgmt_terminate(void) { mgmt_fe_server_destroy(); mgmt_fe_adapter_destroy(); + mgmt_be_server_destroy(); + mgmt_be_adapter_destroy(); mgmt_ds_destroy(); } diff --git a/mgmtd/mgmt_be_adapter.c b/mgmtd/mgmt_be_adapter.c new file mode 100644 index 000000000..f5385d218 --- /dev/null +++ b/mgmtd/mgmt_be_adapter.c @@ -0,0 +1,1288 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * MGMTD Backend Client Connection Adapter + * + * Copyright (C) 2021 Vmware, Inc. + * Pushpasis Sarkar <spushpasis@vmware.com> + */ + +#include <zebra.h> +#include "thread.h" +#include "sockopt.h" +#include "network.h" +#include "libfrr.h" +#include "mgmt_pb.h" +#include "mgmtd/mgmt.h" +#include "mgmtd/mgmt_memory.h" +#include "mgmt_be_client.h" +#include "mgmtd/mgmt_be_adapter.h" + +#ifdef REDIRECT_DEBUG_TO_STDERR +#define MGMTD_BE_ADAPTER_DBG(fmt, ...) \ + fprintf(stderr, "%s: " fmt "\n", __func__, ##__VA_ARGS__) +#define MGMTD_BE_ADAPTER_ERR(fmt, ...) \ + fprintf(stderr, "%s: ERROR, " fmt "\n", __func__, ##__VA_ARGS__) +#else /* REDIRECT_DEBUG_TO_STDERR */ +#define MGMTD_BE_ADAPTER_DBG(fmt, ...) \ + do { \ + if (mgmt_debug_be) \ + zlog_debug("%s: " fmt, __func__, ##__VA_ARGS__); \ + } while (0) +#define MGMTD_BE_ADAPTER_ERR(fmt, ...) \ + zlog_err("%s: ERROR: " fmt, __func__, ##__VA_ARGS__) +#endif /* REDIRECT_DEBUG_TO_STDERR */ + +#define FOREACH_ADAPTER_IN_LIST(adapter) \ + frr_each_safe (mgmt_be_adapters, &mgmt_be_adapters, (adapter)) + +/* + * Static mapping of YANG XPath regular expressions and + * the corresponding interested backend clients. + * NOTE: Thiis is a static mapping defined by all MGMTD + * backend client modules (for now, till we develop a + * more dynamic way of creating and updating this map). + * A running map is created by MGMTD in run-time to + * handle real-time mapping of YANG xpaths to one or + * more interested backend client adapters. + * + * Please see xpath_map_reg[] in lib/mgmt_be_client.c + * for the actual map + */ +struct mgmt_be_xpath_map_reg { + const char *xpath_regexp; /* Longest matching regular expression */ + enum mgmt_be_client_id *be_clients; /* clients to notify */ +}; + +struct mgmt_be_xpath_regexp_map { + const char *xpath_regexp; + struct mgmt_be_client_subscr_info be_subscrs; +}; + +struct mgmt_be_get_adapter_config_params { + struct mgmt_be_client_adapter *adapter; + struct nb_config_cbs *cfg_chgs; + uint32_t seq; +}; + +/* + * Static mapping of YANG XPath regular expressions and + * the corresponding interested backend clients. + * NOTE: Thiis is a static mapping defined by all MGMTD + * backend client modules (for now, till we develop a + * more dynamic way of creating and updating this map). + * A running map is created by MGMTD in run-time to + * handle real-time mapping of YANG xpaths to one or + * more interested backend client adapters. + */ +static const struct mgmt_be_xpath_map_reg xpath_static_map_reg[] = { + {.xpath_regexp = "/frr-vrf:lib/*", + .be_clients = + (enum mgmt_be_client_id[]){ +#if 0 +#if HAVE_STATICD + MGMTD_BE_CLIENT_ID_STATICD, +#endif +#endif + MGMTD_BE_CLIENT_ID_MAX}}, + {.xpath_regexp = "/frr-interface:lib/*", + .be_clients = + (enum mgmt_be_client_id[]){ +#if 0 +#if HAVE_STATICD + MGMTD_BE_CLIENT_ID_STATICD, +#endif +#endif + MGMTD_BE_CLIENT_ID_MAX}}, + {.xpath_regexp = + "/frr-routing:routing/control-plane-protocols/control-plane-protocol[type='frr-staticd:staticd'][name='staticd'][vrf='default']/frr-staticd:staticd/*", + + .be_clients = + (enum mgmt_be_client_id[]){ +#if 0 +#if HAVE_STATICD + MGMTD_BE_CLIENT_ID_STATICD, +#endif +#endif + MGMTD_BE_CLIENT_ID_MAX}}, +}; + +#define MGMTD_BE_MAX_NUM_XPATH_MAP 256 +static struct mgmt_be_xpath_regexp_map + mgmt_xpath_map[MGMTD_BE_MAX_NUM_XPATH_MAP]; +static int mgmt_num_xpath_maps; + +static struct thread_master *mgmt_be_adapter_tm; + +static struct mgmt_be_adapters_head mgmt_be_adapters; + +static struct mgmt_be_client_adapter + *mgmt_be_adapters_by_id[MGMTD_BE_CLIENT_ID_MAX]; + +/* Forward declarations */ +static void +mgmt_be_adapter_register_event(struct mgmt_be_client_adapter *adapter, + enum mgmt_be_event event); + +static struct mgmt_be_client_adapter * +mgmt_be_find_adapter_by_fd(int conn_fd) +{ + struct mgmt_be_client_adapter *adapter; + + FOREACH_ADAPTER_IN_LIST (adapter) { + if (adapter->conn_fd == conn_fd) + return adapter; + } + + return NULL; +} + +static struct mgmt_be_client_adapter * +mgmt_be_find_adapter_by_name(const char *name) +{ + struct mgmt_be_client_adapter *adapter; + + FOREACH_ADAPTER_IN_LIST (adapter) { + if (!strncmp(adapter->name, name, sizeof(adapter->name))) + return adapter; + } + + return NULL; +} + +static void +mgmt_be_cleanup_adapters(void) +{ + struct mgmt_be_client_adapter *adapter; + + FOREACH_ADAPTER_IN_LIST (adapter) + mgmt_be_adapter_unlock(&adapter); +} + +static void mgmt_be_xpath_map_init(void) +{ + int indx, num_xpath_maps; + uint16_t indx1; + enum mgmt_be_client_id id; + + MGMTD_BE_ADAPTER_DBG("Init XPath Maps"); + + num_xpath_maps = (int)array_size(xpath_static_map_reg); + for (indx = 0; indx < num_xpath_maps; indx++) { + MGMTD_BE_ADAPTER_DBG(" - XPATH: '%s'", + xpath_static_map_reg[indx].xpath_regexp); + mgmt_xpath_map[indx].xpath_regexp = + xpath_static_map_reg[indx].xpath_regexp; + for (indx1 = 0;; indx1++) { + id = xpath_static_map_reg[indx].be_clients[indx1]; + if (id == MGMTD_BE_CLIENT_ID_MAX) + break; + MGMTD_BE_ADAPTER_DBG(" -- Client: %s Id: %u", + mgmt_be_client_id2name(id), + id); + if (id < MGMTD_BE_CLIENT_ID_MAX) { + mgmt_xpath_map[indx] + .be_subscrs.xpath_subscr[id] + .validate_config = 1; + mgmt_xpath_map[indx] + .be_subscrs.xpath_subscr[id] + .notify_config = 1; + mgmt_xpath_map[indx] + .be_subscrs.xpath_subscr[id] + .own_oper_data = 1; + } + } + } + + mgmt_num_xpath_maps = indx; + MGMTD_BE_ADAPTER_DBG("Total XPath Maps: %u", mgmt_num_xpath_maps); +} + +static int mgmt_be_eval_regexp_match(const char *xpath_regexp, + const char *xpath) +{ + int match_len = 0, re_indx = 0, xp_indx = 0; + int rexp_len, xpath_len; + bool match = true, re_wild = false, xp_wild = false; + bool delim = false, enter_wild_match = false; + char wild_delim = 0; + + rexp_len = strlen(xpath_regexp); + xpath_len = strlen(xpath); + + /* + * Remove the trailing wildcard from the regexp and Xpath. + */ + if (rexp_len && xpath_regexp[rexp_len-1] == '*') + rexp_len--; + if (xpath_len && xpath[xpath_len-1] == '*') + xpath_len--; + + if (!rexp_len || !xpath_len) + return 0; + + for (re_indx = 0, xp_indx = 0; + match && re_indx < rexp_len && xp_indx < xpath_len;) { + match = (xpath_regexp[re_indx] == xpath[xp_indx]); + + /* + * Check if we need to enter wildcard matching. + */ + if (!enter_wild_match && !match && + (xpath_regexp[re_indx] == '*' + || xpath[xp_indx] == '*')) { + /* + * Found wildcard + */ + enter_wild_match = + (xpath_regexp[re_indx-1] == '/' + || xpath_regexp[re_indx-1] == '\'' + || xpath[xp_indx-1] == '/' + || xpath[xp_indx-1] == '\''); + if (enter_wild_match) { + if (xpath_regexp[re_indx] == '*') { + /* + * Begin RE wildcard match. + */ + re_wild = true; + wild_delim = xpath_regexp[re_indx-1]; + } else if (xpath[xp_indx] == '*') { + /* + * Begin XP wildcard match. + */ + xp_wild = true; + wild_delim = xpath[xp_indx-1]; + } + } + } + + /* + * Check if we need to exit wildcard matching. + */ + if (enter_wild_match) { + if (re_wild && xpath[xp_indx] == wild_delim) { + /* + * End RE wildcard matching. + */ + re_wild = false; + if (re_indx < rexp_len-1) + re_indx++; + enter_wild_match = false; + } else if (xp_wild + && xpath_regexp[re_indx] == wild_delim) { + /* + * End XP wildcard matching. + */ + xp_wild = false; + if (xp_indx < xpath_len-1) + xp_indx++; + enter_wild_match = false; + } + } + + match = (xp_wild || re_wild + || xpath_regexp[re_indx] == xpath[xp_indx]); + + /* + * Check if we found a delimiter in both the Xpaths + */ + if ((xpath_regexp[re_indx] == '/' + && xpath[xp_indx] == '/') + || (xpath_regexp[re_indx] == ']' + && xpath[xp_indx] == ']') + || (xpath_regexp[re_indx] == '[' + && xpath[xp_indx] == '[')) { + /* + * Increment the match count if we have a + * new delimiter. + */ + if (match && re_indx && xp_indx && !delim) + match_len++; + delim = true; + } else { + delim = false; + } + + /* + * Proceed to the next character in the RE/XP string as + * necessary. + */ + if (!re_wild) + re_indx++; + if (!xp_wild) + xp_indx++; + } + + /* + * If we finished matching and the last token was a full match + * increment the match count appropriately. + */ + if (match && !delim && + (xpath_regexp[re_indx] == '/' + || xpath_regexp[re_indx] == ']')) + match_len++; + + return match_len; +} + +static void mgmt_be_adapter_disconnect(struct mgmt_be_client_adapter *adapter) +{ + if (adapter->conn_fd >= 0) { + close(adapter->conn_fd); + adapter->conn_fd = -1; + } + + /* + * TODO: Notify about client disconnect for appropriate cleanup + * mgmt_txn_notify_be_adapter_conn(adapter, false); + */ + + if (adapter->id < MGMTD_BE_CLIENT_ID_MAX) { + mgmt_be_adapters_by_id[adapter->id] = NULL; + adapter->id = MGMTD_BE_CLIENT_ID_MAX; + } + + mgmt_be_adapters_del(&mgmt_be_adapters, adapter); + + mgmt_be_adapter_unlock(&adapter); +} + +static void +mgmt_be_adapter_cleanup_old_conn(struct mgmt_be_client_adapter *adapter) +{ + struct mgmt_be_client_adapter *old; + + FOREACH_ADAPTER_IN_LIST (old) { + if (old != adapter + && !strncmp(adapter->name, old->name, sizeof(adapter->name))) { + /* + * We have a Zombie lingering around + */ + MGMTD_BE_ADAPTER_DBG( + "Client '%s' (FD:%d) seems to have reconnected. Removing old connection (FD:%d)!", + adapter->name, adapter->conn_fd, old->conn_fd); + mgmt_be_adapter_disconnect(old); + } + } +} + +static int +mgmt_be_adapter_handle_msg(struct mgmt_be_client_adapter *adapter, + Mgmtd__BeMessage *be_msg) +{ + switch (be_msg->message_case) { + case MGMTD__BE_MESSAGE__MESSAGE_SUBSCR_REQ: + MGMTD_BE_ADAPTER_DBG( + "Got Subscribe Req Msg from '%s' to %sregister %u xpaths", + be_msg->subscr_req->client_name, + !be_msg->subscr_req->subscribe_xpaths + && be_msg->subscr_req->n_xpath_reg + ? "de" + : "", + (uint32_t)be_msg->subscr_req->n_xpath_reg); + + if (strlen(be_msg->subscr_req->client_name)) { + strlcpy(adapter->name, be_msg->subscr_req->client_name, + sizeof(adapter->name)); + adapter->id = mgmt_be_client_name2id(adapter->name); + if (adapter->id >= MGMTD_BE_CLIENT_ID_MAX) { + MGMTD_BE_ADAPTER_ERR( + "Unable to resolve adapter '%s' to a valid ID. Disconnecting!", + adapter->name); + mgmt_be_adapter_disconnect(adapter); + } + mgmt_be_adapters_by_id[adapter->id] = adapter; + mgmt_be_adapter_cleanup_old_conn(adapter); + } + break; + case MGMTD__BE_MESSAGE__MESSAGE_TXN_REPLY: + MGMTD_BE_ADAPTER_DBG( + "Got %s TXN_REPLY Msg for Txn-Id 0x%llx from '%s' with '%s'", + be_msg->txn_reply->create ? "Create" : "Delete", + (unsigned long long)be_msg->txn_reply->txn_id, + adapter->name, + be_msg->txn_reply->success ? "success" : "failure"); + /* + * TODO: Forward the TXN_REPLY to txn module. + * mgmt_txn_notify_be_txn_reply( + * be_msg->txn_reply->txn_id, + * be_msg->txn_reply->create, + * be_msg->txn_reply->success, adapter); + */ + break; + case MGMTD__BE_MESSAGE__MESSAGE_CFG_DATA_REPLY: + MGMTD_BE_ADAPTER_DBG( + "Got CFGDATA_REPLY Msg from '%s' for Txn-Id 0x%llx Batch-Id 0x%llx with Err:'%s'", + adapter->name, + (unsigned long long)be_msg->cfg_data_reply->txn_id, + (unsigned long long)be_msg->cfg_data_reply->batch_id, + be_msg->cfg_data_reply->error_if_any + ? be_msg->cfg_data_reply->error_if_any + : "None"); + /* + * TODO: Forward the CGFData-create reply to txn module. + * mgmt_txn_notify_be_cfgdata_reply( + * be_msg->cfg_data_reply->txn_id, + * be_msg->cfg_data_reply->batch_id, + * be_msg->cfg_data_reply->success, + * be_msg->cfg_data_reply->error_if_any, adapter); + */ + break; + case MGMTD__BE_MESSAGE__MESSAGE_CFG_APPLY_REPLY: + MGMTD_BE_ADAPTER_DBG( + "Got %s CFG_APPLY_REPLY Msg from '%s' for Txn-Id 0x%llx for %d batches (Id 0x%llx-0x%llx), Err:'%s'", + be_msg->cfg_apply_reply->success ? "successful" + : "failed", + adapter->name, + (unsigned long long) + be_msg->cfg_apply_reply->txn_id, + (int)be_msg->cfg_apply_reply->n_batch_ids, + (unsigned long long) + be_msg->cfg_apply_reply->batch_ids[0], + (unsigned long long)be_msg->cfg_apply_reply + ->batch_ids[be_msg->cfg_apply_reply + ->n_batch_ids + - 1], + be_msg->cfg_apply_reply->error_if_any + ? be_msg->cfg_apply_reply->error_if_any + : "None"); + /* TODO: Forward the CGFData-apply reply to txn module. + * mgmt_txn_notify_be_cfg_apply_reply( + * be_msg->cfg_apply_reply->txn_id, + * be_msg->cfg_apply_reply->success, + * (uint64_t *)be_msg->cfg_apply_reply->batch_ids, + * be_msg->cfg_apply_reply->n_batch_ids, + * be_msg->cfg_apply_reply->error_if_any, adapter); + */ + break; + case MGMTD__BE_MESSAGE__MESSAGE_GET_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_CFG_CMD_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_SHOW_CMD_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_NOTIFY_DATA: + /* + * TODO: Add handling code in future. + */ + break; + /* + * NOTE: The following messages are always sent from MGMTD to + * Backend clients only and/or need not be handled on MGMTd. + */ + case MGMTD__BE_MESSAGE__MESSAGE_SUBSCR_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_GET_REQ: + case MGMTD__BE_MESSAGE__MESSAGE_TXN_REQ: + case MGMTD__BE_MESSAGE__MESSAGE_CFG_DATA_REQ: + case MGMTD__BE_MESSAGE__MESSAGE_CFG_APPLY_REQ: + case MGMTD__BE_MESSAGE__MESSAGE_CFG_CMD_REQ: + case MGMTD__BE_MESSAGE__MESSAGE_SHOW_CMD_REQ: + case MGMTD__BE_MESSAGE__MESSAGE__NOT_SET: +#if PROTOBUF_C_VERSION_NUMBER >= 1003000 + case _MGMTD__BE_MESSAGE__MESSAGE_IS_INT_SIZE: +#endif + default: + /* + * A 'default' case is being added contrary to the + * FRR code guidelines to take care of build + * failures on certain build systems (courtesy of + * the proto-c package). + */ + break; + } + + return 0; +} + +static inline void +mgmt_be_adapter_sched_msg_write(struct mgmt_be_client_adapter *adapter) +{ + if (!CHECK_FLAG(adapter->flags, MGMTD_BE_ADAPTER_FLAGS_WRITES_OFF)) + mgmt_be_adapter_register_event(adapter, MGMTD_BE_CONN_WRITE); +} + +static inline void +mgmt_be_adapter_writes_on(struct mgmt_be_client_adapter *adapter) +{ + MGMTD_BE_ADAPTER_DBG("Resume writing msgs for '%s'", adapter->name); + UNSET_FLAG(adapter->flags, MGMTD_BE_ADAPTER_FLAGS_WRITES_OFF); + if (adapter->obuf_work || stream_fifo_count_safe(adapter->obuf_fifo)) + mgmt_be_adapter_sched_msg_write(adapter); +} + +static inline void +mgmt_be_adapter_writes_off(struct mgmt_be_client_adapter *adapter) +{ + SET_FLAG(adapter->flags, MGMTD_BE_ADAPTER_FLAGS_WRITES_OFF); + MGMTD_BE_ADAPTER_DBG("Pause writing msgs for '%s'", adapter->name); +} + +static int mgmt_be_adapter_send_msg(struct mgmt_be_client_adapter *adapter, + Mgmtd__BeMessage *be_msg) +{ + size_t msg_size; + uint8_t *msg_buf = adapter->msg_buf; + struct mgmt_be_msg *msg; + + if (adapter->conn_fd < 0) + return -1; + + msg_size = mgmtd__be_message__get_packed_size(be_msg); + msg_size += MGMTD_BE_MSG_HDR_LEN; + if (msg_size > MGMTD_BE_MSG_MAX_LEN) { + MGMTD_BE_ADAPTER_ERR( + "Message size %d more than max size'%d. Not sending!'", + (int)msg_size, (int)MGMTD_BE_MSG_MAX_LEN); + return -1; + } + + msg = (struct mgmt_be_msg *)msg_buf; + msg->hdr.marker = MGMTD_BE_MSG_MARKER; + msg->hdr.len = (uint16_t)msg_size; + mgmtd__be_message__pack(be_msg, msg->payload); + + if (!adapter->obuf_work) + adapter->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); + if (STREAM_WRITEABLE(adapter->obuf_work) < msg_size) { + stream_fifo_push(adapter->obuf_fifo, adapter->obuf_work); + adapter->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); + } + stream_write(adapter->obuf_work, (void *)msg_buf, msg_size); + + mgmt_be_adapter_sched_msg_write(adapter); + adapter->num_msg_tx++; + return 0; +} + +static int mgmt_be_send_txn_req(struct mgmt_be_client_adapter *adapter, + uint64_t txn_id, bool create) +{ + Mgmtd__BeMessage be_msg; + Mgmtd__BeTxnReq txn_req; + + mgmtd__be_txn_req__init(&txn_req); + txn_req.create = create; + txn_req.txn_id = txn_id; + + mgmtd__be_message__init(&be_msg); + be_msg.message_case = MGMTD__BE_MESSAGE__MESSAGE_TXN_REQ; + be_msg.txn_req = &txn_req; + + MGMTD_BE_ADAPTER_DBG( + "Sending TXN_REQ message to Backend client '%s' for Txn-Id %llx", + adapter->name, (unsigned long long)txn_id); + + return mgmt_be_adapter_send_msg(adapter, &be_msg); +} + +static int +mgmt_be_send_cfgdata_create_req(struct mgmt_be_client_adapter *adapter, + uint64_t txn_id, uint64_t batch_id, + Mgmtd__YangCfgDataReq **cfgdata_reqs, + size_t num_reqs, bool end_of_data) +{ + Mgmtd__BeMessage be_msg; + Mgmtd__BeCfgDataCreateReq cfgdata_req; + + mgmtd__be_cfg_data_create_req__init(&cfgdata_req); + cfgdata_req.batch_id = batch_id; + cfgdata_req.txn_id = txn_id; + cfgdata_req.data_req = cfgdata_reqs; + cfgdata_req.n_data_req = num_reqs; + cfgdata_req.end_of_data = end_of_data; + + mgmtd__be_message__init(&be_msg); + be_msg.message_case = MGMTD__BE_MESSAGE__MESSAGE_CFG_DATA_REQ; + be_msg.cfg_data_req = &cfgdata_req; + + MGMTD_BE_ADAPTER_DBG( + "Sending CFGDATA_CREATE_REQ message to Backend client '%s' for Txn-Id %llx, Batch-Id: %llx", + adapter->name, (unsigned long long)txn_id, + (unsigned long long)batch_id); + + return mgmt_be_adapter_send_msg(adapter, &be_msg); +} + +static int mgmt_be_send_cfgapply_req(struct mgmt_be_client_adapter *adapter, + uint64_t txn_id) +{ + Mgmtd__BeMessage be_msg; + Mgmtd__BeCfgDataApplyReq apply_req; + + mgmtd__be_cfg_data_apply_req__init(&apply_req); + apply_req.txn_id = txn_id; + + mgmtd__be_message__init(&be_msg); + be_msg.message_case = MGMTD__BE_MESSAGE__MESSAGE_CFG_APPLY_REQ; + be_msg.cfg_apply_req = &apply_req; + + MGMTD_BE_ADAPTER_DBG( + "Sending CFG_APPLY_REQ message to Backend client '%s' for Txn-Id 0x%llx", + adapter->name, (unsigned long long)txn_id); + + return mgmt_be_adapter_send_msg(adapter, &be_msg); +} + +static uint16_t +mgmt_be_adapter_process_msg(struct mgmt_be_client_adapter *adapter, + uint8_t *msg_buf, uint16_t bytes_read) +{ + Mgmtd__BeMessage *be_msg; + struct mgmt_be_msg *msg; + uint16_t bytes_left; + uint16_t processed = 0; + + bytes_left = bytes_read; + for (; bytes_left > MGMTD_BE_MSG_HDR_LEN; + bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) { + msg = (struct mgmt_be_msg *)msg_buf; + if (msg->hdr.marker != MGMTD_BE_MSG_MARKER) { + MGMTD_BE_ADAPTER_DBG( + "Marker not found in message from MGMTD Backend adapter '%s'", + adapter->name); + break; + } + + if (bytes_left < msg->hdr.len) { + MGMTD_BE_ADAPTER_DBG( + "Incomplete message of %d bytes (epxected: %u) from MGMTD Backend adapter '%s'", + bytes_left, msg->hdr.len, adapter->name); + break; + } + + be_msg = mgmtd__be_message__unpack( + NULL, (size_t)(msg->hdr.len - MGMTD_BE_MSG_HDR_LEN), + msg->payload); + if (!be_msg) { + MGMTD_BE_ADAPTER_DBG( + "Failed to decode %d bytes from MGMTD Backend adapter '%s'", + msg->hdr.len, adapter->name); + continue; + } + + (void)mgmt_be_adapter_handle_msg(adapter, be_msg); + mgmtd__be_message__free_unpacked(be_msg, NULL); + processed++; + adapter->num_msg_rx++; + } + + return processed; +} + +static void mgmt_be_adapter_proc_msgbufs(struct thread *thread) +{ + struct mgmt_be_client_adapter *adapter; + struct stream *work; + int processed = 0; + + adapter = (struct mgmt_be_client_adapter *)THREAD_ARG(thread); + assert(adapter); + + if (adapter->conn_fd < 0) + return; + + for (; processed < MGMTD_BE_MAX_NUM_MSG_PROC;) { + work = stream_fifo_pop_safe(adapter->ibuf_fifo); + if (!work) + break; + + processed += mgmt_be_adapter_process_msg( + adapter, STREAM_DATA(work), stream_get_endp(work)); + + if (work != adapter->ibuf_work) { + /* Free it up */ + stream_free(work); + } else { + /* Reset stream buffer for next read */ + stream_reset(work); + } + } + + /* + * If we have more to process, reschedule for processing it. + */ + if (stream_fifo_head(adapter->ibuf_fifo)) + mgmt_be_adapter_register_event(adapter, MGMTD_BE_PROC_MSG); +} + +static void mgmt_be_adapter_read(struct thread *thread) +{ + struct mgmt_be_client_adapter *adapter; + int bytes_read, msg_cnt; + size_t total_bytes, bytes_left; + struct mgmt_be_msg_hdr *msg_hdr; + bool incomplete = false; + + adapter = (struct mgmt_be_client_adapter *)THREAD_ARG(thread); + assert(adapter && adapter->conn_fd >= 0); + + total_bytes = 0; + bytes_left = STREAM_SIZE(adapter->ibuf_work) - + stream_get_endp(adapter->ibuf_work); + for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) { + bytes_read = stream_read_try(adapter->ibuf_work, + adapter->conn_fd, bytes_left); + MGMTD_BE_ADAPTER_DBG( + "Got %d bytes of message from MGMTD Backend adapter '%s'", + bytes_read, adapter->name); + if (bytes_read <= 0) { + if (bytes_read == -1 + && (errno == EAGAIN || errno == EWOULDBLOCK)) { + mgmt_be_adapter_register_event( + adapter, MGMTD_BE_CONN_READ); + return; + } + + if (!bytes_read) { + /* Looks like connection closed */ + MGMTD_BE_ADAPTER_ERR( + "Got error (%d) while reading from MGMTD Backend adapter '%s'. Err: '%s'", + bytes_read, adapter->name, + safe_strerror(errno)); + mgmt_be_adapter_disconnect(adapter); + return; + } + break; + } + + total_bytes += bytes_read; + bytes_left -= bytes_read; + } + + /* + * Check if we would have read incomplete messages or not. + */ + stream_set_getp(adapter->ibuf_work, 0); + total_bytes = 0; + msg_cnt = 0; + bytes_left = stream_get_endp(adapter->ibuf_work); + for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) { + msg_hdr = + (struct mgmt_be_msg_hdr *)(STREAM_DATA( + adapter->ibuf_work) + + total_bytes); + if (msg_hdr->marker != MGMTD_BE_MSG_MARKER) { + /* Corrupted buffer. Force disconnect?? */ + MGMTD_BE_ADAPTER_ERR( + "Received corrupted buffer from MGMTD Backend client."); + mgmt_be_adapter_disconnect(adapter); + return; + } + if (msg_hdr->len > bytes_left) + break; + + total_bytes += msg_hdr->len; + bytes_left -= msg_hdr->len; + msg_cnt++; + } + + if (bytes_left > 0) + incomplete = true; + + /* + * We would have read one or several messages. + * Schedule processing them now. + */ + msg_hdr = (struct mgmt_be_msg_hdr *)(STREAM_DATA(adapter->ibuf_work) + + total_bytes); + stream_set_endp(adapter->ibuf_work, total_bytes); + stream_fifo_push(adapter->ibuf_fifo, adapter->ibuf_work); + adapter->ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); + if (incomplete) { + stream_put(adapter->ibuf_work, msg_hdr, bytes_left); + stream_set_endp(adapter->ibuf_work, bytes_left); + } + + if (msg_cnt) + mgmt_be_adapter_register_event(adapter, MGMTD_BE_PROC_MSG); + + mgmt_be_adapter_register_event(adapter, MGMTD_BE_CONN_READ); +} + +static void mgmt_be_adapter_write(struct thread *thread) +{ + int bytes_written = 0; + int processed = 0; + int msg_size = 0; + struct stream *s = NULL; + struct stream *free = NULL; + struct mgmt_be_client_adapter *adapter; + + adapter = (struct mgmt_be_client_adapter *)THREAD_ARG(thread); + assert(adapter && adapter->conn_fd >= 0); + + /* Ensure pushing any pending write buffer to FIFO */ + if (adapter->obuf_work) { + stream_fifo_push(adapter->obuf_fifo, adapter->obuf_work); + adapter->obuf_work = NULL; + } + + for (s = stream_fifo_head(adapter->obuf_fifo); + s && processed < MGMTD_BE_MAX_NUM_MSG_WRITE; + s = stream_fifo_head(adapter->obuf_fifo)) { + /* msg_size = (int)stream_get_size(s); */ + msg_size = (int)STREAM_READABLE(s); + bytes_written = stream_flush(s, adapter->conn_fd); + if (bytes_written == -1 + && (errno == EAGAIN || errno == EWOULDBLOCK)) { + mgmt_be_adapter_register_event(adapter, + MGMTD_BE_CONN_WRITE); + return; + } else if (bytes_written != msg_size) { + MGMTD_BE_ADAPTER_ERR( + "Could not write all %d bytes (wrote: %d) to MGMTD Backend client socket. Err: '%s'", + msg_size, bytes_written, safe_strerror(errno)); + if (bytes_written > 0) { + stream_forward_getp(s, (size_t)bytes_written); + stream_pulldown(s); + mgmt_be_adapter_register_event( + adapter, MGMTD_BE_CONN_WRITE); + return; + } + mgmt_be_adapter_disconnect(adapter); + return; + } + + free = stream_fifo_pop(adapter->obuf_fifo); + stream_free(free); + MGMTD_BE_ADAPTER_DBG( + "Wrote %d bytes of message to MGMTD Backend client socket.'", + bytes_written); + processed++; + } + + if (s) { + mgmt_be_adapter_writes_off(adapter); + mgmt_be_adapter_register_event(adapter, + MGMTD_BE_CONN_WRITES_ON); + } +} + +static void mgmt_be_adapter_resume_writes(struct thread *thread) +{ + struct mgmt_be_client_adapter *adapter; + + adapter = (struct mgmt_be_client_adapter *)THREAD_ARG(thread); + assert(adapter && adapter->conn_fd >= 0); + + mgmt_be_adapter_writes_on(adapter); +} + +static void mgmt_be_iter_and_get_cfg(struct mgmt_ds_ctx *ds_ctx, + char *xpath, struct lyd_node *node, + struct nb_node *nb_node, void *ctx) +{ + struct mgmt_be_client_subscr_info subscr_info; + struct mgmt_be_get_adapter_config_params *parms; + struct mgmt_be_client_adapter *adapter; + struct nb_config_cbs *root; + uint32_t *seq; + + if (mgmt_be_get_subscr_info_for_xpath(xpath, &subscr_info) != 0) { + MGMTD_BE_ADAPTER_ERR( + "ERROR: Failed to get subscriber for '%s'", xpath); + return; + } + + parms = (struct mgmt_be_get_adapter_config_params *)ctx; + + adapter = parms->adapter; + if (!subscr_info.xpath_subscr[adapter->id].subscribed) + return; + + root = parms->cfg_chgs; + seq = &parms->seq; + nb_config_diff_created(node, seq, root); +} + +static void mgmt_be_adapter_conn_init(struct thread *thread) +{ + struct mgmt_be_client_adapter *adapter; + + adapter = (struct mgmt_be_client_adapter *)THREAD_ARG(thread); + assert(adapter && adapter->conn_fd >= 0); + + /* + * TODO: Check first if the current session can run a CONFIG + * transaction or not. Reschedule if a CONFIG transaction + * from another session is already in progress. + if (mgmt_config_txn_in_progress() != MGMTD_SESSION_ID_NONE) { + mgmt_be_adapter_register_event(adapter, MGMTD_BE_CONN_INIT); + return 0; + } + */ + + /* + * TODO: Notify TXN module to create a CONFIG transaction and + * download the CONFIGs identified for this new client. + * If the TXN module fails to initiate the CONFIG transaction + * disconnect from the client forcing a reconnect later. + * That should also take care of destroying the adapter. + * + if (mgmt_txn_notify_be_adapter_conn(adapter, true) != 0) { + mgmt_be_adapter_disconnect(adapter); + adapter = NULL; + } + */ +} + +static void +mgmt_be_adapter_register_event(struct mgmt_be_client_adapter *adapter, + enum mgmt_be_event event) +{ + struct timeval tv = {0}; + + switch (event) { + case MGMTD_BE_CONN_INIT: + thread_add_timer_msec(mgmt_be_adapter_tm, + mgmt_be_adapter_conn_init, adapter, + MGMTD_BE_CONN_INIT_DELAY_MSEC, + &adapter->conn_init_ev); + assert(adapter->conn_init_ev); + break; + case MGMTD_BE_CONN_READ: + thread_add_read(mgmt_be_adapter_tm, mgmt_be_adapter_read, + adapter, adapter->conn_fd, &adapter->conn_read_ev); + assert(adapter->conn_read_ev); + break; + case MGMTD_BE_CONN_WRITE: + thread_add_write(mgmt_be_adapter_tm, mgmt_be_adapter_write, + adapter, adapter->conn_fd, &adapter->conn_write_ev); + assert(adapter->conn_write_ev); + break; + case MGMTD_BE_PROC_MSG: + tv.tv_usec = MGMTD_BE_MSG_PROC_DELAY_USEC; + thread_add_timer_tv(mgmt_be_adapter_tm, + mgmt_be_adapter_proc_msgbufs, adapter, &tv, + &adapter->proc_msg_ev); + assert(adapter->proc_msg_ev); + break; + case MGMTD_BE_CONN_WRITES_ON: + thread_add_timer_msec(mgmt_be_adapter_tm, + mgmt_be_adapter_resume_writes, adapter, + MGMTD_BE_MSG_WRITE_DELAY_MSEC, + &adapter->conn_writes_on); + assert(adapter->conn_writes_on); + break; + case MGMTD_BE_SERVER: + case MGMTD_BE_SCHED_CFG_PREPARE: + case MGMTD_BE_RESCHED_CFG_PREPARE: + case MGMTD_BE_SCHED_CFG_APPLY: + case MGMTD_BE_RESCHED_CFG_APPLY: + assert(!"mgmt_be_adapter_post_event() called incorrectly"); + break; + } +} + +void mgmt_be_adapter_lock(struct mgmt_be_client_adapter *adapter) +{ + adapter->refcount++; +} + +extern void mgmt_be_adapter_unlock(struct mgmt_be_client_adapter **adapter) +{ + assert(*adapter && (*adapter)->refcount); + + (*adapter)->refcount--; + if (!(*adapter)->refcount) { + mgmt_be_adapters_del(&mgmt_be_adapters, *adapter); + + stream_fifo_free((*adapter)->ibuf_fifo); + stream_free((*adapter)->ibuf_work); + stream_fifo_free((*adapter)->obuf_fifo); + stream_free((*adapter)->obuf_work); + + THREAD_OFF((*adapter)->conn_init_ev); + THREAD_OFF((*adapter)->conn_read_ev); + THREAD_OFF((*adapter)->conn_write_ev); + THREAD_OFF((*adapter)->conn_writes_on); + THREAD_OFF((*adapter)->proc_msg_ev); + XFREE(MTYPE_MGMTD_BE_ADPATER, *adapter); + } + + *adapter = NULL; +} + +int mgmt_be_adapter_init(struct thread_master *tm) +{ + if (!mgmt_be_adapter_tm) { + mgmt_be_adapter_tm = tm; + memset(mgmt_xpath_map, 0, sizeof(mgmt_xpath_map)); + mgmt_num_xpath_maps = 0; + memset(mgmt_be_adapters_by_id, 0, + sizeof(mgmt_be_adapters_by_id)); + mgmt_be_adapters_init(&mgmt_be_adapters); + mgmt_be_xpath_map_init(); + } + + return 0; +} + +void mgmt_be_adapter_destroy(void) +{ + mgmt_be_cleanup_adapters(); +} + +struct mgmt_be_client_adapter * +mgmt_be_create_adapter(int conn_fd, union sockunion *from) +{ + struct mgmt_be_client_adapter *adapter = NULL; + + adapter = mgmt_be_find_adapter_by_fd(conn_fd); + if (!adapter) { + adapter = XCALLOC(MTYPE_MGMTD_BE_ADPATER, + sizeof(struct mgmt_be_client_adapter)); + assert(adapter); + + adapter->conn_fd = conn_fd; + adapter->id = MGMTD_BE_CLIENT_ID_MAX; + memcpy(&adapter->conn_su, from, sizeof(adapter->conn_su)); + snprintf(adapter->name, sizeof(adapter->name), "Unknown-FD-%d", + adapter->conn_fd); + adapter->ibuf_fifo = stream_fifo_new(); + adapter->ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); + adapter->obuf_fifo = stream_fifo_new(); + /* adapter->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); */ + adapter->obuf_work = NULL; + mgmt_be_adapter_lock(adapter); + + mgmt_be_adapter_register_event(adapter, MGMTD_BE_CONN_READ); + mgmt_be_adapters_add_tail(&mgmt_be_adapters, adapter); + + RB_INIT(nb_config_cbs, &adapter->cfg_chgs); + + MGMTD_BE_ADAPTER_DBG("Added new MGMTD Backend adapter '%s'", + adapter->name); + } + + /* Make client socket non-blocking. */ + set_nonblocking(adapter->conn_fd); + setsockopt_so_sendbuf(adapter->conn_fd, MGMTD_SOCKET_BE_SEND_BUF_SIZE); + setsockopt_so_recvbuf(adapter->conn_fd, MGMTD_SOCKET_BE_RECV_BUF_SIZE); + + /* Trigger resync of config with the new adapter */ + mgmt_be_adapter_register_event(adapter, MGMTD_BE_CONN_INIT); + + return adapter; +} + +struct mgmt_be_client_adapter * +mgmt_be_get_adapter_by_id(enum mgmt_be_client_id id) +{ + return (id < MGMTD_BE_CLIENT_ID_MAX ? mgmt_be_adapters_by_id[id] + : NULL); +} + +struct mgmt_be_client_adapter * +mgmt_be_get_adapter_by_name(const char *name) +{ + return mgmt_be_find_adapter_by_name(name); +} + +int mgmt_be_get_adapter_config(struct mgmt_be_client_adapter *adapter, + struct mgmt_ds_ctx *ds_ctx, + struct nb_config_cbs **cfg_chgs) +{ + char base_xpath[] = "/"; + struct mgmt_be_get_adapter_config_params parms; + + assert(cfg_chgs); + + if (RB_EMPTY(nb_config_cbs, &adapter->cfg_chgs)) { + parms.adapter = adapter; + parms.cfg_chgs = &adapter->cfg_chgs; + parms.seq = 0; + + mgmt_ds_iter_data(ds_ctx, base_xpath, + mgmt_be_iter_and_get_cfg, (void *)&parms, + false); + } + + *cfg_chgs = &adapter->cfg_chgs; + return 0; +} + +int mgmt_be_create_txn(struct mgmt_be_client_adapter *adapter, + uint64_t txn_id) +{ + return mgmt_be_send_txn_req(adapter, txn_id, true); +} + +int mgmt_be_destroy_txn(struct mgmt_be_client_adapter *adapter, + uint64_t txn_id) +{ + return mgmt_be_send_txn_req(adapter, txn_id, false); +} + +int mgmt_be_send_cfg_data_create_req(struct mgmt_be_client_adapter *adapter, + uint64_t txn_id, uint64_t batch_id, + struct mgmt_be_cfgreq *cfg_req, + bool end_of_data) +{ + return mgmt_be_send_cfgdata_create_req( + adapter, txn_id, batch_id, cfg_req->cfgdata_reqs, + cfg_req->num_reqs, end_of_data); +} + +extern int +mgmt_be_send_cfg_apply_req(struct mgmt_be_client_adapter *adapter, + uint64_t txn_id) +{ + return mgmt_be_send_cfgapply_req(adapter, txn_id); +} + +/* + * This function maps a YANG dtata Xpath to one or more + * Backend Clients that should be contacted for various purposes. + */ +int mgmt_be_get_subscr_info_for_xpath( + const char *xpath, struct mgmt_be_client_subscr_info *subscr_info) +{ + int indx, match, max_match = 0, num_reg; + enum mgmt_be_client_id id; + struct mgmt_be_client_subscr_info + *reg_maps[array_size(mgmt_xpath_map)] = {0}; + bool root_xp = false; + + if (!subscr_info) + return -1; + + num_reg = 0; + memset(subscr_info, 0, sizeof(*subscr_info)); + + if (strlen(xpath) <= 2 && xpath[0] == '/' + && (!xpath[1] || xpath[1] == '*')) { + root_xp = true; + } + + MGMTD_BE_ADAPTER_DBG("XPATH: %s", xpath); + for (indx = 0; indx < mgmt_num_xpath_maps; indx++) { + /* + * For Xpaths: '/' and '/ *' all xpath maps should match + * the given xpath. + */ + if (!root_xp) { + match = mgmt_be_eval_regexp_match( + mgmt_xpath_map[indx].xpath_regexp, xpath); + + if (!match || match < max_match) + continue; + + if (match > max_match) { + num_reg = 0; + max_match = match; + } + } + + reg_maps[num_reg] = &mgmt_xpath_map[indx].be_subscrs; + num_reg++; + } + + for (indx = 0; indx < num_reg; indx++) { + FOREACH_MGMTD_BE_CLIENT_ID (id) { + if (reg_maps[indx]->xpath_subscr[id].subscribed) { + MGMTD_BE_ADAPTER_DBG( + "Cient: %s", + mgmt_be_client_id2name(id)); + memcpy(&subscr_info->xpath_subscr[id], + ®_maps[indx]->xpath_subscr[id], + sizeof(subscr_info->xpath_subscr[id])); + } + } + } + + return 0; +} + +void mgmt_be_adapter_status_write(struct vty *vty) +{ + struct mgmt_be_client_adapter *adapter; + + vty_out(vty, "MGMTD Backend Adapters\n"); + + FOREACH_ADAPTER_IN_LIST (adapter) { + vty_out(vty, " Client: \t\t\t%s\n", adapter->name); + vty_out(vty, " Conn-FD: \t\t\t%d\n", adapter->conn_fd); + vty_out(vty, " Client-Id: \t\t\t%d\n", adapter->id); + vty_out(vty, " Ref-Count: \t\t\t%u\n", adapter->refcount); + vty_out(vty, " Msg-Sent: \t\t\t%u\n", adapter->num_msg_tx); + vty_out(vty, " Msg-Recvd: \t\t\t%u\n", adapter->num_msg_rx); + } + vty_out(vty, " Total: %d\n", + (int)mgmt_be_adapters_count(&mgmt_be_adapters)); +} + +void mgmt_be_xpath_register_write(struct vty *vty) +{ + int indx; + enum mgmt_be_client_id id; + struct mgmt_be_client_adapter *adapter; + + vty_out(vty, "MGMTD Backend XPath Registry\n"); + + for (indx = 0; indx < mgmt_num_xpath_maps; indx++) { + vty_out(vty, " - XPATH: '%s'\n", + mgmt_xpath_map[indx].xpath_regexp); + FOREACH_MGMTD_BE_CLIENT_ID (id) { + if (mgmt_xpath_map[indx] + .be_subscrs.xpath_subscr[id] + .subscribed) { + vty_out(vty, + " -- Client: '%s' \t Validate:%s, Notify:%s, Own:%s\n", + mgmt_be_client_id2name(id), + mgmt_xpath_map[indx] + .be_subscrs + .xpath_subscr[id] + .validate_config + ? "T" + : "F", + mgmt_xpath_map[indx] + .be_subscrs + .xpath_subscr[id] + .notify_config + ? "T" + : "F", + mgmt_xpath_map[indx] + .be_subscrs + .xpath_subscr[id] + .own_oper_data + ? "T" + : "F"); + adapter = mgmt_be_get_adapter_by_id(id); + if (adapter) { + vty_out(vty, " -- Adapter: %p\n", + adapter); + } + } + } + } + + vty_out(vty, "Total XPath Registries: %u\n", mgmt_num_xpath_maps); +} + +void mgmt_be_xpath_subscr_info_write(struct vty *vty, const char *xpath) +{ + struct mgmt_be_client_subscr_info subscr; + enum mgmt_be_client_id id; + struct mgmt_be_client_adapter *adapter; + + if (mgmt_be_get_subscr_info_for_xpath(xpath, &subscr) != 0) { + vty_out(vty, "ERROR: Failed to get subscriber for '%s'\n", + xpath); + return; + } + + vty_out(vty, "XPath: '%s'\n", xpath); + FOREACH_MGMTD_BE_CLIENT_ID (id) { + if (subscr.xpath_subscr[id].subscribed) { + vty_out(vty, + " -- Client: '%s' \t Validate:%s, Notify:%s, Own:%s\n", + mgmt_be_client_id2name(id), + subscr.xpath_subscr[id].validate_config ? "T" + : "F", + subscr.xpath_subscr[id].notify_config ? "T" + : "F", + subscr.xpath_subscr[id].own_oper_data ? "T" + : "F"); + adapter = mgmt_be_get_adapter_by_id(id); + if (adapter) + vty_out(vty, " -- Adapter: %p\n", adapter); + } + } +} diff --git a/mgmtd/mgmt_be_adapter.h b/mgmtd/mgmt_be_adapter.h new file mode 100644 index 000000000..5dfc2386d --- /dev/null +++ b/mgmtd/mgmt_be_adapter.h @@ -0,0 +1,231 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * MGMTD Backend Client Connection Adapter + * + * Copyright (C) 2021 Vmware, Inc. + * Pushpasis Sarkar <spushpasis@vmware.com> + */ + +#ifndef _FRR_MGMTD_BE_ADAPTER_H_ +#define _FRR_MGMTD_BE_ADAPTER_H_ + +#include "mgmtd/mgmt_defines.h" +#include "mgmt_be_client.h" +#include "mgmtd/mgmt_ds.h" + +#define MGMTD_BE_CONN_INIT_DELAY_MSEC 50 + +#define MGMTD_FIND_ADAPTER_BY_INDEX(adapter_index) \ + mgmt_adaptr_ref[adapter_index] + +enum mgmt_be_req_type { + MGMTD_BE_REQ_NONE = 0, + MGMTD_BE_REQ_CFG_VALIDATE, + MGMTD_BE_REQ_CFG_APPLY, + MGMTD_BE_REQ_DATA_GET_ELEM, + MGMTD_BE_REQ_DATA_GET_NEXT +}; + +struct mgmt_be_cfgreq { + Mgmtd__YangCfgDataReq **cfgdata_reqs; + size_t num_reqs; +}; + +struct mgmt_be_datareq { + Mgmtd__YangGetDataReq **getdata_reqs; + size_t num_reqs; +}; + +PREDECL_LIST(mgmt_be_adapters); +PREDECL_LIST(mgmt_txn_badapters); + +struct mgmt_be_client_adapter { + enum mgmt_be_client_id id; + int conn_fd; + union sockunion conn_su; + struct thread *conn_init_ev; + struct thread *conn_read_ev; + struct thread *conn_write_ev; + struct thread *conn_writes_on; + struct thread *proc_msg_ev; + uint32_t flags; + char name[MGMTD_CLIENT_NAME_MAX_LEN]; + uint8_t num_xpath_reg; + char xpath_reg[MGMTD_MAX_NUM_XPATH_REG][MGMTD_MAX_XPATH_LEN]; + + /* IO streams for read and write */ + /* pthread_mutex_t ibuf_mtx; */ + struct stream_fifo *ibuf_fifo; + /* pthread_mutex_t obuf_mtx; */ + struct stream_fifo *obuf_fifo; + + /* Private I/O buffers */ + struct stream *ibuf_work; + struct stream *obuf_work; + uint8_t msg_buf[MGMTD_BE_MSG_MAX_LEN]; + + /* Buffer of data waiting to be written to client. */ + /* struct buffer *wb; */ + + int refcount; + uint32_t num_msg_tx; + uint32_t num_msg_rx; + + /* + * List of config items that should be sent to the + * backend during re/connect. This is temporarily + * created and then freed-up as soon as the initial + * config items has been applied onto the backend. + */ + struct nb_config_cbs cfg_chgs; + + struct mgmt_be_adapters_item list_linkage; + struct mgmt_txn_badapters_item txn_list_linkage; +}; + +#define MGMTD_BE_ADAPTER_FLAGS_WRITES_OFF (1U << 0) +#define MGMTD_BE_ADAPTER_FLAGS_CFG_SYNCED (1U << 1) + +DECLARE_LIST(mgmt_be_adapters, struct mgmt_be_client_adapter, list_linkage); +DECLARE_LIST(mgmt_txn_badapters, struct mgmt_be_client_adapter, + txn_list_linkage); + +union mgmt_be_xpath_subscr_info { + uint8_t subscribed; + struct { + uint8_t validate_config : 1; + uint8_t notify_config : 1; + uint8_t own_oper_data : 1; + }; +}; + +struct mgmt_be_client_subscr_info { + union mgmt_be_xpath_subscr_info xpath_subscr[MGMTD_BE_CLIENT_ID_MAX]; +}; + +/* Initialise backend adapter module. */ +extern int mgmt_be_adapter_init(struct thread_master *tm); + +/* Destroy the backend adapter module. */ +extern void mgmt_be_adapter_destroy(void); + +/* Acquire lock for backend adapter. */ +extern void mgmt_be_adapter_lock(struct mgmt_be_client_adapter *adapter); + +/* Remove lock from backend adapter. */ +extern void mgmt_be_adapter_unlock(struct mgmt_be_client_adapter **adapter); + +/* Create backend adapter. */ +extern struct mgmt_be_client_adapter * +mgmt_be_create_adapter(int conn_fd, union sockunion *su); + +/* Fetch backend adapter given an adapter name. */ +extern struct mgmt_be_client_adapter * +mgmt_be_get_adapter_by_name(const char *name); + +/* Fetch backend adapter given an client ID. */ +extern struct mgmt_be_client_adapter * +mgmt_be_get_adapter_by_id(enum mgmt_be_client_id id); + +/* Fetch backend adapter config. */ +extern int +mgmt_be_get_adapter_config(struct mgmt_be_client_adapter *adapter, + struct mgmt_ds_ctx *ds_ctx, + struct nb_config_cbs **cfg_chgs); + +/* Create a transaction. */ +extern int mgmt_be_create_txn(struct mgmt_be_client_adapter *adapter, + uint64_t txn_id); + +/* Destroy a transaction. */ +extern int mgmt_be_destroy_txn(struct mgmt_be_client_adapter *adapter, + uint64_t txn_id); + +/* + * Send config data create request to backend client. + * + * adaptr + * Backend adapter information. + * + * txn_id + * Unique transaction identifier. + * + * batch_id + * Request batch ID. + * + * cfg_req + * Config data request. + * + * end_of_data + * TRUE if the data from last batch, FALSE otherwise. + * + * Returns: + * 0 on success, -1 on failure. + */ +extern int mgmt_be_send_cfg_data_create_req( + struct mgmt_be_client_adapter *adapter, uint64_t txn_id, + uint64_t batch_id, struct mgmt_be_cfgreq *cfg_req, bool end_of_data); + +/* + * Send config validate request to backend client. + * + * adaptr + * Backend adapter information. + * + * txn_id + * Unique transaction identifier. + * + * batch_ids + * List of request batch IDs. + * + * num_batch_ids + * Number of batch ids. + * + * Returns: + * 0 on success, -1 on failure. + */ +extern int +mgmt_be_send_cfg_validate_req(struct mgmt_be_client_adapter *adapter, + uint64_t txn_id, uint64_t batch_ids[], + size_t num_batch_ids); + +/* + * Send config apply request to backend client. + * + * adaptr + * Backend adapter information. + * + * txn_id + * Unique transaction identifier. + * + * Returns: + * 0 on success, -1 on failure. + */ +extern int +mgmt_be_send_cfg_apply_req(struct mgmt_be_client_adapter *adapter, + uint64_t txn_id); + +/* + * Dump backend adapter status to vty. + */ +extern void mgmt_be_adapter_status_write(struct vty *vty); + +/* + * Dump xpath registry for each backend client to vty. + */ +extern void mgmt_be_xpath_register_write(struct vty *vty); + +/* + * Maps a YANG dtata Xpath to one or more + * backend clients that should be contacted for various purposes. + */ +extern int mgmt_be_get_subscr_info_for_xpath( + const char *xpath, struct mgmt_be_client_subscr_info *subscr_info); + +/* + * Dump backend client information for a given xpath to vty. + */ +extern void mgmt_be_xpath_subscr_info_write(struct vty *vty, + const char *xpath); + +#endif /* _FRR_MGMTD_BE_ADAPTER_H_ */ diff --git a/mgmtd/mgmt_be_server.c b/mgmtd/mgmt_be_server.c new file mode 100644 index 000000000..6464b12ae --- /dev/null +++ b/mgmtd/mgmt_be_server.c @@ -0,0 +1,160 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * MGMTD Backend Server + * + * Copyright (C) 2021 Vmware, Inc. + * Pushpasis Sarkar <spushpasis@vmware.com> + */ + +#include <zebra.h> +#include "network.h" +#include "libfrr.h" +#include "mgmtd/mgmt.h" +#include "mgmtd/mgmt_be_server.h" +#include "mgmtd/mgmt_be_adapter.h" + +#ifdef REDIRECT_DEBUG_TO_STDERR +#define MGMTD_BE_SRVR_DBG(fmt, ...) \ + fprintf(stderr, "%s: " fmt "\n", __func__, ##__VA_ARGS__) +#define MGMTD_BE_SRVR_ERR(fmt, ...) \ + fprintf(stderr, "%s: ERROR, " fmt "\n", __func__, ##__VA_ARGS__) +#else /* REDIRECT_DEBUG_TO_STDERR */ +#define MGMTD_BE_SRVR_DBG(fmt, ...) \ + do { \ + if (mgmt_debug_be) \ + zlog_debug("%s: " fmt, __func__, ##__VA_ARGS__); \ + } while (0) +#define MGMTD_BE_SRVR_ERR(fmt, ...) \ + zlog_err("%s: ERROR: " fmt, __func__, ##__VA_ARGS__) +#endif /* REDIRECT_DEBUG_TO_STDERR */ + +static int mgmt_be_listen_fd; +static struct thread_master *mgmt_be_listen_tm; +static struct thread *mgmt_be_listen_ev; +static void mgmt_be_server_register_event(enum mgmt_be_event event); + +static void mgmt_be_conn_accept(struct thread *thread) +{ + int client_conn_fd; + union sockunion su; + + if (mgmt_be_listen_fd < 0) + return; + + /* We continue hearing server listen socket. */ + mgmt_be_server_register_event(MGMTD_BE_SERVER); + + memset(&su, 0, sizeof(union sockunion)); + + /* We can handle IPv4 or IPv6 socket. */ + client_conn_fd = sockunion_accept(mgmt_be_listen_fd, &su); + if (client_conn_fd < 0) { + MGMTD_BE_SRVR_ERR( + "Failed to accept MGMTD Backend client connection : %s", + safe_strerror(errno)); + return; + } + set_nonblocking(client_conn_fd); + set_cloexec(client_conn_fd); + + MGMTD_BE_SRVR_DBG("Got a new MGMTD Backend connection"); + + mgmt_be_create_adapter(client_conn_fd, &su); +} + +static void mgmt_be_server_register_event(enum mgmt_be_event event) +{ + if (event == MGMTD_BE_SERVER) { + thread_add_read(mgmt_be_listen_tm, mgmt_be_conn_accept, + NULL, mgmt_be_listen_fd, + &mgmt_be_listen_ev); + assert(mgmt_be_listen_ev); + } else { + assert(!"mgmt_be_server_post_event() called incorrectly"); + } +} + +static void mgmt_be_server_start(const char *hostname) +{ + int ret; + int sock; + struct sockaddr_un addr; + mode_t old_mask; + + /* Set umask */ + old_mask = umask(0077); + + sock = socket(AF_UNIX, SOCK_STREAM, PF_UNSPEC); + if (sock < 0) { + MGMTD_BE_SRVR_ERR("Failed to create server socket: %s", + safe_strerror(errno)); + goto mgmt_be_server_start_failed; + } + + addr.sun_family = AF_UNIX, + strlcpy(addr.sun_path, MGMTD_BE_SERVER_PATH, sizeof(addr.sun_path)); + unlink(addr.sun_path); + ret = bind(sock, (struct sockaddr *)&addr, sizeof(addr)); + if (ret < 0) { + MGMTD_BE_SRVR_ERR( + "Failed to bind server socket to '%s'. Err: %s", + addr.sun_path, safe_strerror(errno)); + goto mgmt_be_server_start_failed; + } + + ret = listen(sock, MGMTD_BE_MAX_CONN); + if (ret < 0) { + MGMTD_BE_SRVR_ERR("Failed to listen on server socket: %s", + safe_strerror(errno)); + goto mgmt_be_server_start_failed; + } + + /* Restore umask */ + umask(old_mask); + + mgmt_be_listen_fd = sock; + mgmt_be_server_register_event(MGMTD_BE_SERVER); + + MGMTD_BE_SRVR_DBG("Started MGMTD Backend Server!"); + return; + +mgmt_be_server_start_failed: + if (sock) + close(sock); + + mgmt_be_listen_fd = -1; + exit(-1); +} + +int mgmt_be_server_init(struct thread_master *master) +{ + if (mgmt_be_listen_tm) { + MGMTD_BE_SRVR_DBG("MGMTD Backend Server already running!"); + return 0; + } + + mgmt_be_listen_tm = master; + + mgmt_be_server_start("localhost"); + + return 0; +} + +void mgmt_be_server_destroy(void) +{ + if (mgmt_be_listen_tm) { + MGMTD_BE_SRVR_DBG("Closing MGMTD Backend Server!"); + + if (mgmt_be_listen_ev) { + THREAD_OFF(mgmt_be_listen_ev); + mgmt_be_listen_ev = NULL; + } + + if (mgmt_be_listen_fd >= 0) { + close(mgmt_be_listen_fd); + mgmt_be_listen_fd = -1; + } + + mgmt_be_listen_tm = NULL; + } +} diff --git a/mgmtd/mgmt_be_server.h b/mgmtd/mgmt_be_server.h new file mode 100644 index 000000000..5ee57fdf1 --- /dev/null +++ b/mgmtd/mgmt_be_server.h @@ -0,0 +1,20 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * MGMTD Backend Server + * + * Copyright (C) 2021 Vmware, Inc. + * Pushpasis Sarkar + */ + +#ifndef _FRR_MGMTD_BE_SERVER_H_ +#define _FRR_MGMTD_BE_SERVER_H_ + +#define MGMTD_BE_MAX_CONN 32 + +/* Initialise backend server */ +extern int mgmt_be_server_init(struct thread_master *master); + +/* Destroy backend server */ +extern void mgmt_be_server_destroy(void); + +#endif /* _FRR_MGMTD_BE_SERVER_H_ */ diff --git a/mgmtd/mgmt_defines.h b/mgmtd/mgmt_defines.h index 6f593320b..2af8f3f55 100644 --- a/mgmtd/mgmt_defines.h +++ b/mgmtd/mgmt_defines.h @@ -17,6 +17,10 @@ #define MGMTD_MAX_YANG_VALUE_LEN YANG_VALUE_MAXLEN +#define MGMTD_MAX_NUM_XPATH_REG 128 + +#define MGMTD_MAX_NUM_DATA_REQ_IN_BATCH 32 + enum mgmt_result { MGMTD_SUCCESS = 0, MGMTD_INVALID_PARAM, @@ -35,6 +39,19 @@ enum mgmt_fe_event { MGMTD_FE_PROC_MSG }; +enum mgmt_be_event { + MGMTD_BE_SERVER = 1, + MGMTD_BE_CONN_INIT, + MGMTD_BE_CONN_READ, + MGMTD_BE_CONN_WRITE, + MGMTD_BE_CONN_WRITES_ON, + MGMTD_BE_PROC_MSG, + MGMTD_BE_SCHED_CFG_PREPARE, + MGMTD_BE_RESCHED_CFG_PREPARE, + MGMTD_BE_SCHED_CFG_APPLY, + MGMTD_BE_RESCHED_CFG_APPLY, +}; + #define MGMTD_TXN_ID_NONE 0 #endif /* _FRR_MGMTD_DEFINES_H */ diff --git a/mgmtd/mgmt_main.c b/mgmtd/mgmt_main.c index f47b91df6..017263ca8 100644 --- a/mgmtd/mgmt_main.c +++ b/mgmtd/mgmt_main.c @@ -16,6 +16,7 @@ #include "mgmtd/mgmt_ds.h" #include "routing_nb.h" + /* mgmt options, we use GNU getopt library. */ static const struct option longopts[] = { {"skip_runas", no_argument, NULL, 'S'}, @@ -195,6 +196,17 @@ static void mgmt_vrf_terminate(void) static const struct frr_yang_module_info *const mgmt_yang_modules[] = { &frr_filter_info, &frr_interface_info, &frr_route_map_info, &frr_routing_info, &frr_vrf_info, +/* + * YANG module info supported by backend clients get added here. + * NOTE: Always set .ignore_cbs true for to avoid validating + * backend northbound callbacks during loading. + */ +#if 0 +#ifdef HAVE_STATICD + &(struct frr_yang_module_info){.name = "frr-staticd", + .ignore_cbs = true}, +#endif +#endif }; FRR_DAEMON_INFO(mgmtd, MGMTD, .vty_port = MGMTD_VTY_PORT, diff --git a/mgmtd/mgmt_memory.c b/mgmtd/mgmt_memory.c index 1ec1df7ec..39e036c30 100644 --- a/mgmtd/mgmt_memory.c +++ b/mgmtd/mgmt_memory.c @@ -19,5 +19,6 @@ DEFINE_MGROUP(MGMTD, "mgmt"); DEFINE_MTYPE(MGMTD, MGMTD, "MGMTD instance"); +DEFINE_MTYPE(MGMTD, MGMTD_BE_ADPATER, "MGMTD backend adapter"); DEFINE_MTYPE(MGMTD, MGMTD_FE_ADPATER, "MGMTD Frontend adapter"); DEFINE_MTYPE(MGMTD, MGMTD_FE_SESSION, "MGMTD Frontend Client Session"); diff --git a/mgmtd/mgmt_memory.h b/mgmtd/mgmt_memory.h index e6acfc8d6..d8b3ac7e0 100644 --- a/mgmtd/mgmt_memory.h +++ b/mgmtd/mgmt_memory.h @@ -13,6 +13,7 @@ DECLARE_MGROUP(MGMTD); DECLARE_MTYPE(MGMTD); +DECLARE_MTYPE(MGMTD_BE_ADPATER); DECLARE_MTYPE(MGMTD_FE_ADPATER); DECLARE_MTYPE(MGMTD_FE_SESSION); #endif /* _FRR_MGMTD_MEMORY_H */ diff --git a/mgmtd/mgmt_vty.c b/mgmtd/mgmt_vty.c index 4efd38e97..8ee5921db 100644 --- a/mgmtd/mgmt_vty.c +++ b/mgmtd/mgmt_vty.c @@ -11,12 +11,39 @@ #include "command.h" #include "json.h" #include "mgmtd/mgmt.h" +#include "mgmtd/mgmt_be_server.h" +#include "mgmtd/mgmt_be_adapter.h" #include "mgmtd/mgmt_fe_server.h" #include "mgmtd/mgmt_fe_adapter.h" #include "mgmtd/mgmt_ds.h" #include "mgmtd/mgmt_vty_clippy.c" +DEFPY(show_mgmt_be_adapter, + show_mgmt_be_adapter_cmd, + "show mgmt backend-adapter all", + SHOW_STR + MGMTD_STR + MGMTD_BE_ADAPTER_STR + "Display all Backend Adapters\n") +{ + mgmt_be_adapter_status_write(vty); + + return CMD_SUCCESS; +} + +DEFPY(show_mgmt_be_xpath_reg, + show_mgmt_be_xpath_reg_cmd, + "show mgmt backend-yang-xpath-registry", + SHOW_STR + MGMTD_STR + "Backend Adapter YANG Xpath Registry\n") +{ + mgmt_be_xpath_register_write(vty); + + return CMD_SUCCESS; +} + DEFPY(show_mgmt_fe_adapter, show_mgmt_fe_adapter_cmd, "show mgmt frontend-adapter all [detail$detail]", SHOW_STR @@ -199,6 +226,18 @@ DEFPY(show_mgmt_dump_data, return CMD_SUCCESS; } +DEFPY(show_mgmt_map_xpath, + show_mgmt_map_xpath_cmd, + "show mgmt yang-xpath-subscription WORD$path", + SHOW_STR + MGMTD_STR + "Get YANG Backend Subscription\n" + "XPath expression specifying the YANG data path\n") +{ + mgmt_be_xpath_subscr_info_write(vty, path); + return CMD_SUCCESS; +} + DEFPY(mgmt_load_config, mgmt_load_config_cmd, "mgmt load-config WORD$filepath <merge|replace>$type", @@ -331,13 +370,29 @@ DEFPY(debug_mgmt, void mgmt_vty_init(void) { + /* + * Initialize command handling from VTYSH connection. + * Call command initialization routines defined by + * backend components that are moved to new MGMTD infra + * here one by one. + */ +#if 0 +#if HAVE_STATICD + extern void static_vty_init(void); + static_vty_init(); +#endif +#endif + install_node(&debug_node); + install_element(VIEW_NODE, &show_mgmt_be_adapter_cmd); + install_element(VIEW_NODE, &show_mgmt_be_xpath_reg_cmd); install_element(VIEW_NODE, &show_mgmt_fe_adapter_cmd); install_element(VIEW_NODE, &show_mgmt_ds_cmd); install_element(VIEW_NODE, &show_mgmt_get_config_cmd); install_element(VIEW_NODE, &show_mgmt_get_data_cmd); install_element(VIEW_NODE, &show_mgmt_dump_data_cmd); + install_element(VIEW_NODE, &show_mgmt_map_xpath_cmd); install_element(CONFIG_NODE, &mgmt_commit_cmd); install_element(CONFIG_NODE, &mgmt_set_config_data_cmd); diff --git a/mgmtd/subdir.am b/mgmtd/subdir.am index 732d5560b..64228f968 100644 --- a/mgmtd/subdir.am +++ b/mgmtd/subdir.am @@ -18,6 +18,8 @@ noinst_LIBRARIES += mgmtd/libmgmtd.a mgmtd_libmgmtd_a_SOURCES = \ mgmtd/mgmt.c \ mgmtd/mgmt_ds.c \ + mgmtd/mgmt_be_server.c \ + mgmtd/mgmt_be_adapter.c \ mgmtd/mgmt_fe_server.c \ mgmtd/mgmt_fe_adapter.c \ mgmtd/mgmt_memory.c \ @@ -31,6 +33,8 @@ mgmtdheader_HEADERS = \ noinst_HEADERS += \ mgmtd/mgmt.h \ + mgmtd/mgmt_be_server.h \ + mgmtd/mgmt_be_adapter.h \ mgmtd/mgmt_ds.h \ mgmtd/mgmt_fe_server.h \ mgmtd/mgmt_fe_adapter.h \ |