diff options
-rw-r--r-- | .clang-format | 5 | ||||
-rw-r--r-- | lib/mgmt.proto | 32 | ||||
-rw-r--r-- | lib/mgmt_be_client.c | 1490 | ||||
-rw-r--r-- | lib/mgmt_be_client.h | 273 | ||||
-rw-r--r-- | lib/northbound.c | 299 | ||||
-rw-r--r-- | lib/northbound.h | 86 | ||||
-rw-r--r-- | lib/northbound_cli.c | 70 | ||||
-rw-r--r-- | lib/northbound_confd.c | 3 | ||||
-rw-r--r-- | lib/northbound_grpc.cpp | 3 | ||||
-rw-r--r-- | lib/northbound_sysrepo.c | 3 | ||||
-rw-r--r-- | lib/subdir.am | 2 | ||||
-rw-r--r-- | mgmtd/mgmt.c | 10 | ||||
-rw-r--r-- | mgmtd/mgmt_be_adapter.c | 1288 | ||||
-rw-r--r-- | mgmtd/mgmt_be_adapter.h | 231 | ||||
-rw-r--r-- | mgmtd/mgmt_be_server.c | 160 | ||||
-rw-r--r-- | mgmtd/mgmt_be_server.h | 20 | ||||
-rw-r--r-- | mgmtd/mgmt_defines.h | 17 | ||||
-rw-r--r-- | mgmtd/mgmt_main.c | 12 | ||||
-rw-r--r-- | mgmtd/mgmt_memory.c | 1 | ||||
-rw-r--r-- | mgmtd/mgmt_memory.h | 1 | ||||
-rw-r--r-- | mgmtd/mgmt_vty.c | 55 | ||||
-rw-r--r-- | mgmtd/subdir.am | 4 |
22 files changed, 3941 insertions, 124 deletions
diff --git a/.clang-format b/.clang-format index 424662b81..01e909d93 100644 --- a/.clang-format +++ b/.clang-format @@ -52,6 +52,11 @@ ForEachMacros: - FOR_ALL_INTERFACES - FOR_ALL_INTERFACES_ADDRESSES - JSON_FOREACH + - FOREACH_BE_TXN_BATCH_IN_LIST + - FOREACH_BE_APPLY_BATCH_IN_LIST + - FOREACH_BE_TXN_IN_LIST + - FOREACH_SESSION_IN_LIST + - FOREACH_MGMTD_BE_CLIENT_ID # libyang - LY_FOR_KEYS - LY_LIST_FOR diff --git a/lib/mgmt.proto b/lib/mgmt.proto index 437679410..8a11ff0fa 100644 --- a/lib/mgmt.proto +++ b/lib/mgmt.proto @@ -106,18 +106,6 @@ message BeCfgDataCreateReply { optional string error_if_any = 4; } -message BeCfgDataValidateReq { - required uint64 txn_id = 1; - repeated uint64 batch_ids = 2; -} - -message BeCfgDataValidateReply { - required uint64 txn_id = 1; - repeated uint64 batch_ids = 2; - required bool success = 3; - optional string error_if_any = 4; -} - message BeCfgDataApplyReq { required uint64 txn_id = 1; } @@ -181,17 +169,15 @@ message BeMessage { BeTxnReply txn_reply = 5; BeCfgDataCreateReq cfg_data_req = 6; BeCfgDataCreateReply cfg_data_reply = 7; - BeCfgDataValidateReq cfg_validate_req = 8; - BeCfgDataValidateReply cfg_validate_reply = 9; - BeCfgDataApplyReq cfg_apply_req = 10; - BeCfgDataApplyReply cfg_apply_reply = 11; - BeOperDataGetReq get_req = 12; - BeOperDataGetReply get_reply = 13; - BeOperDataNotify notify_data = 14; - BeConfigCmdReq cfg_cmd_req = 15; - BeConfigCmdReply cfg_cmd_reply = 16; - BeShowCmdReq show_cmd_req = 17; - BeShowCmdReply show_cmd_reply = 18; + BeCfgDataApplyReq cfg_apply_req = 8; + BeCfgDataApplyReply cfg_apply_reply = 9; + BeOperDataGetReq get_req = 10; + BeOperDataGetReply get_reply = 11; + BeOperDataNotify notify_data = 12; + BeConfigCmdReq cfg_cmd_req = 13; + BeConfigCmdReply cfg_cmd_reply = 14; + BeShowCmdReq show_cmd_req = 15; + BeShowCmdReply show_cmd_reply = 16; } } diff --git a/lib/mgmt_be_client.c b/lib/mgmt_be_client.c new file mode 100644 index 000000000..d86f3d336 --- /dev/null +++ b/lib/mgmt_be_client.c @@ -0,0 +1,1490 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * MGMTD Backend Client Library api interfaces + * Copyright (C) 2021 Vmware, Inc. + * Pushpasis Sarkar <spushpasis@vmware.com> + */ + +#include <zebra.h> +#include "libfrr.h" +#include "mgmtd/mgmt.h" +#include "mgmt_be_client.h" +#include "mgmt_pb.h" +#include "network.h" +#include "stream.h" +#include "sockopt.h" + +#ifdef REDIRECT_DEBUG_TO_STDERR +#define MGMTD_BE_CLIENT_DBG(fmt, ...) \ + fprintf(stderr, "%s: " fmt "\n", __func__, ##__VA_ARGS__) +#define MGMTD_BE_CLIENT_ERR(fmt, ...) \ + fprintf(stderr, "%s: ERROR, " fmt "\n", __func__, ##__VA_ARGS__) +#else /* REDIRECT_DEBUG_TO_STDERR */ +#define MGMTD_BE_CLIENT_DBG(fmt, ...) \ + do { \ + if (mgmt_debug_be_client) \ + zlog_debug("%s: " fmt, __func__, ##__VA_ARGS__); \ + } while (0) +#define MGMTD_BE_CLIENT_ERR(fmt, ...) \ + zlog_err("%s: ERROR: " fmt, __func__, ##__VA_ARGS__) +#endif /* REDIRECT_DEBUG_TO_STDERR */ + +DEFINE_MTYPE_STATIC(LIB, MGMTD_BE_BATCH, + "MGMTD backend transaction batch data"); +DEFINE_MTYPE_STATIC(LIB, MGMTD_BE_TXN, "MGMTD backend transaction data"); + +enum mgmt_be_txn_event { + MGMTD_BE_TXN_PROC_SETCFG = 1, + MGMTD_BE_TXN_PROC_GETCFG, + MGMTD_BE_TXN_PROC_GETDATA +}; + +struct mgmt_be_set_cfg_req { + struct nb_cfg_change cfg_changes[MGMTD_MAX_CFG_CHANGES_IN_BATCH]; + uint16_t num_cfg_changes; +}; + +struct mgmt_be_get_data_req { + char *xpaths[MGMTD_MAX_NUM_DATA_REQ_IN_BATCH]; + uint16_t num_xpaths; +}; + +struct mgmt_be_txn_req { + enum mgmt_be_txn_event event; + union { + struct mgmt_be_set_cfg_req set_cfg; + struct mgmt_be_get_data_req get_data; + } req; +}; + +PREDECL_LIST(mgmt_be_batches); +struct mgmt_be_batch_ctx { + /* Batch-Id as assigned by MGMTD */ + uint64_t batch_id; + + struct mgmt_be_txn_req txn_req; + + uint32_t flags; + + struct mgmt_be_batches_item list_linkage; +}; +#define MGMTD_BE_BATCH_FLAGS_CFG_PREPARED (1U << 0) +#define MGMTD_BE_TXN_FLAGS_CFG_APPLIED (1U << 1) +DECLARE_LIST(mgmt_be_batches, struct mgmt_be_batch_ctx, list_linkage); + +struct mgmt_be_client_ctx; + +PREDECL_LIST(mgmt_be_txns); +struct mgmt_be_txn_ctx { + /* Txn-Id as assigned by MGMTD */ + uint64_t txn_id; + uint32_t flags; + + struct mgmt_be_client_txn_ctx client_data; + struct mgmt_be_client_ctx *client_ctx; + + /* List of batches belonging to this transaction */ + struct mgmt_be_batches_head cfg_batches; + struct mgmt_be_batches_head apply_cfgs; + + struct mgmt_be_txns_item list_linkage; + + struct nb_transaction *nb_txn; + uint32_t nb_txn_id; +}; +#define MGMTD_BE_TXN_FLAGS_CFGPREP_FAILED (1U << 1) + +DECLARE_LIST(mgmt_be_txns, struct mgmt_be_txn_ctx, list_linkage); + +#define FOREACH_BE_TXN_BATCH_IN_LIST(txn, batch) \ + frr_each_safe (mgmt_be_batches, &(txn)->cfg_batches, (batch)) + +#define FOREACH_BE_APPLY_BATCH_IN_LIST(txn, batch) \ + frr_each_safe (mgmt_be_batches, &(txn)->apply_cfgs, (batch)) + +struct mgmt_be_client_ctx { + int conn_fd; + struct thread_master *tm; + struct thread *conn_retry_tmr; + struct thread *conn_read_ev; + struct thread *conn_write_ev; + struct thread *conn_writes_on; + struct thread *msg_proc_ev; + uint32_t flags; + uint32_t num_msg_tx; + uint32_t num_msg_rx; + + struct stream_fifo *ibuf_fifo; + struct stream *ibuf_work; + struct stream_fifo *obuf_fifo; + struct stream *obuf_work; + uint8_t msg_buf[MGMTD_BE_MSG_MAX_LEN]; + + struct nb_config *candidate_config; + struct nb_config *running_config; + + unsigned long num_batch_find; + unsigned long avg_batch_find_tm; + unsigned long num_edit_nb_cfg; + unsigned long avg_edit_nb_cfg_tm; + unsigned long num_prep_nb_cfg; + unsigned long avg_prep_nb_cfg_tm; + unsigned long num_apply_nb_cfg; + unsigned long avg_apply_nb_cfg_tm; + + struct mgmt_be_txns_head txn_head; + struct mgmt_be_client_params client_params; +}; + +#define MGMTD_BE_CLIENT_FLAGS_WRITES_OFF (1U << 0) + +#define FOREACH_BE_TXN_IN_LIST(client_ctx, txn) \ + frr_each_safe (mgmt_be_txns, &(client_ctx)->txn_head, (txn)) + +static bool mgmt_debug_be_client; + +static struct mgmt_be_client_ctx mgmt_be_client_ctx = {0}; + +const char *mgmt_be_client_names[MGMTD_BE_CLIENT_ID_MAX + 1] = { +#if 0 +#ifdef HAVE_STATICD + [MGMTD_BE_CLIENT_ID_STATICD] = "staticd", +#endif +#endif + [MGMTD_BE_CLIENT_ID_MAX] = "Unknown/Invalid", +}; + +/* Forward declarations */ +static void +mgmt_be_client_register_event(struct mgmt_be_client_ctx *client_ctx, + enum mgmt_be_event event); +static void +mgmt_be_client_schedule_conn_retry(struct mgmt_be_client_ctx *client_ctx, + unsigned long intvl_secs); +static int mgmt_be_client_send_msg(struct mgmt_be_client_ctx *client_ctx, + Mgmtd__BeMessage *be_msg); + +static void +mgmt_be_server_disconnect(struct mgmt_be_client_ctx *client_ctx, + bool reconnect) +{ + /* Notify client through registered callback (if any) */ + if (client_ctx->client_params.client_connect_notify) + (void)(*client_ctx->client_params + .client_connect_notify)( + (uintptr_t)client_ctx, + client_ctx->client_params.user_data, false); + + if (client_ctx->conn_fd) { + close(client_ctx->conn_fd); + client_ctx->conn_fd = 0; + } + + if (reconnect) + mgmt_be_client_schedule_conn_retry( + client_ctx, + client_ctx->client_params.conn_retry_intvl_sec); +} + +static struct mgmt_be_batch_ctx * +mgmt_be_find_batch_by_id(struct mgmt_be_txn_ctx *txn, + uint64_t batch_id) +{ + struct mgmt_be_batch_ctx *batch = NULL; + + FOREACH_BE_TXN_BATCH_IN_LIST (txn, batch) { + if (batch->batch_id == batch_id) + return batch; + } + + return NULL; +} + +static struct mgmt_be_batch_ctx * +mgmt_be_batch_create(struct mgmt_be_txn_ctx *txn, uint64_t batch_id) +{ + struct mgmt_be_batch_ctx *batch = NULL; + + batch = mgmt_be_find_batch_by_id(txn, batch_id); + if (!batch) { + batch = XCALLOC(MTYPE_MGMTD_BE_BATCH, + sizeof(struct mgmt_be_batch_ctx)); + assert(batch); + + batch->batch_id = batch_id; + mgmt_be_batches_add_tail(&txn->cfg_batches, batch); + + MGMTD_BE_CLIENT_DBG("Added new batch 0x%llx to transaction", + (unsigned long long)batch_id); + } + + return batch; +} + +static void mgmt_be_batch_delete(struct mgmt_be_txn_ctx *txn, + struct mgmt_be_batch_ctx **batch) +{ + uint16_t indx; + + if (!batch) + return; + + mgmt_be_batches_del(&txn->cfg_batches, *batch); + if ((*batch)->txn_req.event == MGMTD_BE_TXN_PROC_SETCFG) { + for (indx = 0; indx < MGMTD_MAX_CFG_CHANGES_IN_BATCH; indx++) { + if ((*batch)->txn_req.req.set_cfg.cfg_changes[indx] + .value) { + free((char *)(*batch) + ->txn_req.req.set_cfg + .cfg_changes[indx] + .value); + } + } + } + + XFREE(MTYPE_MGMTD_BE_BATCH, *batch); + *batch = NULL; +} + +static void mgmt_be_cleanup_all_batches(struct mgmt_be_txn_ctx *txn) +{ + struct mgmt_be_batch_ctx *batch = NULL; + + FOREACH_BE_TXN_BATCH_IN_LIST (txn, batch) { + mgmt_be_batch_delete(txn, &batch); + } + + FOREACH_BE_APPLY_BATCH_IN_LIST (txn, batch) { + mgmt_be_batch_delete(txn, &batch); + } +} + +static struct mgmt_be_txn_ctx * +mgmt_be_find_txn_by_id(struct mgmt_be_client_ctx *client_ctx, + uint64_t txn_id) +{ + struct mgmt_be_txn_ctx *txn = NULL; + + FOREACH_BE_TXN_IN_LIST (client_ctx, txn) { + if (txn->txn_id == txn_id) + return txn; + } + + return NULL; +} + +static struct mgmt_be_txn_ctx * +mgmt_be_txn_create(struct mgmt_be_client_ctx *client_ctx, + uint64_t txn_id) +{ + struct mgmt_be_txn_ctx *txn = NULL; + + txn = mgmt_be_find_txn_by_id(client_ctx, txn_id); + if (!txn) { + txn = XCALLOC(MTYPE_MGMTD_BE_TXN, + sizeof(struct mgmt_be_txn_ctx)); + assert(txn); + + txn->txn_id = txn_id; + txn->client_ctx = client_ctx; + mgmt_be_batches_init(&txn->cfg_batches); + mgmt_be_batches_init(&txn->apply_cfgs); + mgmt_be_txns_add_tail(&client_ctx->txn_head, txn); + + MGMTD_BE_CLIENT_DBG("Added new transaction 0x%llx", + (unsigned long long)txn_id); + } + + return txn; +} + +static void mgmt_be_txn_delete(struct mgmt_be_client_ctx *client_ctx, + struct mgmt_be_txn_ctx **txn) +{ + char err_msg[] = "MGMT Transaction Delete"; + + if (!txn) + return; + + /* + * Remove the transaction from the list of transactions + * so that future lookups with the same transaction id + * does not return this one. + */ + mgmt_be_txns_del(&client_ctx->txn_head, *txn); + + /* + * Time to delete the transaction which should also + * take care of cleaning up all batches created via + * CFGDATA_CREATE_REQs. But first notify the client + * about the transaction delete. + */ + if (client_ctx->client_params.txn_notify) + (void)(*client_ctx->client_params + .txn_notify)( + (uintptr_t)client_ctx, + client_ctx->client_params.user_data, + &(*txn)->client_data, true); + + mgmt_be_cleanup_all_batches(*txn); + if ((*txn)->nb_txn) + nb_candidate_commit_abort((*txn)->nb_txn, err_msg, + sizeof(err_msg)); + XFREE(MTYPE_MGMTD_BE_TXN, *txn); + + *txn = NULL; +} + +static void +mgmt_be_cleanup_all_txns(struct mgmt_be_client_ctx *client_ctx) +{ + struct mgmt_be_txn_ctx *txn = NULL; + + FOREACH_BE_TXN_IN_LIST (client_ctx, txn) { + mgmt_be_txn_delete(client_ctx, &txn); + } +} + +static int mgmt_be_send_txn_reply(struct mgmt_be_client_ctx *client_ctx, + uint64_t txn_id, bool create, + bool success) +{ + Mgmtd__BeMessage be_msg; + Mgmtd__BeTxnReply txn_reply; + + mgmtd__be_txn_reply__init(&txn_reply); + txn_reply.create = create; + txn_reply.txn_id = txn_id; + txn_reply.success = success; + + mgmtd__be_message__init(&be_msg); + be_msg.message_case = MGMTD__BE_MESSAGE__MESSAGE_TXN_REPLY; + be_msg.txn_reply = &txn_reply; + + MGMTD_BE_CLIENT_DBG( + "Sending TXN_REPLY message to MGMTD for txn 0x%llx", + (unsigned long long)txn_id); + + return mgmt_be_client_send_msg(client_ctx, &be_msg); +} + +static int mgmt_be_process_txn_req(struct mgmt_be_client_ctx *client_ctx, + uint64_t txn_id, bool create) +{ + struct mgmt_be_txn_ctx *txn; + + txn = mgmt_be_find_txn_by_id(client_ctx, txn_id); + if (create) { + if (txn) { + /* + * Transaction with same txn-id already exists. + * Should not happen under any circumstances. + */ + MGMTD_BE_CLIENT_ERR( + "Transaction 0x%llx already exists!!!", + (unsigned long long)txn_id); + mgmt_be_send_txn_reply(client_ctx, txn_id, create, + false); + } + + MGMTD_BE_CLIENT_DBG("Created new transaction 0x%llx", + (unsigned long long)txn_id); + txn = mgmt_be_txn_create(client_ctx, txn_id); + + if (client_ctx->client_params.txn_notify) + (void)(*client_ctx->client_params + .txn_notify)( + (uintptr_t)client_ctx, + client_ctx->client_params.user_data, + &txn->client_data, false); + } else { + if (!txn) { + /* + * Transaction with same txn-id does not exists. + * Return sucess anyways. + */ + MGMTD_BE_CLIENT_DBG( + "Transaction to delete 0x%llx does NOT exists!!!", + (unsigned long long)txn_id); + } else { + MGMTD_BE_CLIENT_DBG("Delete transaction 0x%llx", + (unsigned long long)txn_id); + mgmt_be_txn_delete(client_ctx, &txn); + } + } + + mgmt_be_send_txn_reply(client_ctx, txn_id, create, true); + + return 0; +} + +static int +mgmt_be_send_cfgdata_create_reply(struct mgmt_be_client_ctx *client_ctx, + uint64_t txn_id, uint64_t batch_id, + bool success, const char *error_if_any) +{ + Mgmtd__BeMessage be_msg; + Mgmtd__BeCfgDataCreateReply cfgdata_reply; + + mgmtd__be_cfg_data_create_reply__init(&cfgdata_reply); + cfgdata_reply.txn_id = (uint64_t)txn_id; + cfgdata_reply.batch_id = (uint64_t)batch_id; + cfgdata_reply.success = success; + if (error_if_any) + cfgdata_reply.error_if_any = (char *)error_if_any; + + mgmtd__be_message__init(&be_msg); + be_msg.message_case = MGMTD__BE_MESSAGE__MESSAGE_CFG_DATA_REPLY; + be_msg.cfg_data_reply = &cfgdata_reply; + + MGMTD_BE_CLIENT_DBG( + "Sending CFGDATA_CREATE_REPLY message to MGMTD for txn 0x%llx batch 0x%llx", + (unsigned long long)txn_id, (unsigned long long)batch_id); + + return mgmt_be_client_send_msg(client_ctx, &be_msg); +} + +static void mgmt_be_txn_cfg_abort(struct mgmt_be_txn_ctx *txn) +{ + char errmsg[BUFSIZ] = {0}; + + assert(txn && txn->client_ctx); + if (txn->nb_txn) { + MGMTD_BE_CLIENT_ERR( + "Aborting configurations after prep for Txn 0x%llx", + (unsigned long long)txn->txn_id); + nb_candidate_commit_abort(txn->nb_txn, errmsg, sizeof(errmsg)); + txn->nb_txn = 0; + } + + /* + * revert candidate back to running + * + * This is one txn ctx but the candidate_config is per client ctx, how + * does that work? + */ + MGMTD_BE_CLIENT_DBG( + "Reset candidate configurations after abort of Txn 0x%llx", + (unsigned long long)txn->txn_id); + nb_config_replace(txn->client_ctx->candidate_config, + txn->client_ctx->running_config, true); +} + +static int mgmt_be_txn_cfg_prepare(struct mgmt_be_txn_ctx *txn) +{ + struct mgmt_be_client_ctx *client_ctx; + struct mgmt_be_txn_req *txn_req = NULL; + struct nb_context nb_ctx = {0}; + struct timeval edit_nb_cfg_start; + struct timeval edit_nb_cfg_end; + unsigned long edit_nb_cfg_tm; + struct timeval prep_nb_cfg_start; + struct timeval prep_nb_cfg_end; + unsigned long prep_nb_cfg_tm; + struct mgmt_be_batch_ctx *batch; + bool error; + char err_buf[BUFSIZ]; + size_t num_processed; + bool debug_be = mgmt_debug_be_client; + int err; + + assert(txn && txn->client_ctx); + client_ctx = txn->client_ctx; + + num_processed = 0; + FOREACH_BE_TXN_BATCH_IN_LIST (txn, batch) { + txn_req = &batch->txn_req; + error = false; + nb_ctx.client = NB_CLIENT_CLI; + nb_ctx.user = (void *)client_ctx->client_params.user_data; + + if (!txn->nb_txn) { + /* + * This happens when the current backend client is only + * interested in consuming the config items but is not + * interested in validating it. + */ + error = false; + if (debug_be) + gettimeofday(&edit_nb_cfg_start, NULL); + nb_candidate_edit_config_changes( + client_ctx->candidate_config, + txn_req->req.set_cfg.cfg_changes, + (size_t)txn_req->req.set_cfg.num_cfg_changes, + NULL, NULL, 0, err_buf, sizeof(err_buf), + &error); + if (error) { + err_buf[sizeof(err_buf) - 1] = 0; + MGMTD_BE_CLIENT_ERR( + "Failed to update configs for Txn %llx Batch %llx to Candidate! Err: '%s'", + (unsigned long long)txn->txn_id, + (unsigned long long)batch->batch_id, + err_buf); + return -1; + } + if (debug_be) { + gettimeofday(&edit_nb_cfg_end, NULL); + edit_nb_cfg_tm = timeval_elapsed( + edit_nb_cfg_end, edit_nb_cfg_start); + client_ctx->avg_edit_nb_cfg_tm = + ((client_ctx->avg_edit_nb_cfg_tm + * client_ctx->num_edit_nb_cfg) + + edit_nb_cfg_tm) + / (client_ctx->num_edit_nb_cfg + 1); + } + client_ctx->num_edit_nb_cfg++; + } + + num_processed++; + } + + if (!num_processed) + return 0; + + /* + * Now prepare all the batches we have applied in one go. + */ + nb_ctx.client = NB_CLIENT_CLI; + nb_ctx.user = (void *)client_ctx->client_params.user_data; + if (debug_be) + gettimeofday(&prep_nb_cfg_start, NULL); + err = nb_candidate_commit_prepare(nb_ctx, client_ctx->candidate_config, + "MGMTD Backend Txn", &txn->nb_txn, +#ifdef MGMTD_LOCAL_VALIDATIONS_ENABLED + true, true, +#else + false, true, +#endif + err_buf, sizeof(err_buf) - 1); + if (err != NB_OK) { + err_buf[sizeof(err_buf) - 1] = 0; + if (err == NB_ERR_VALIDATION) + MGMTD_BE_CLIENT_ERR( + "Failed to validate configs for Txn %llx %u Batches! Err: '%s'", + (unsigned long long)txn->txn_id, + (uint32_t)num_processed, err_buf); + else + MGMTD_BE_CLIENT_ERR( + "Failed to prepare configs for Txn %llx, %u Batches! Err: '%s'", + (unsigned long long)txn->txn_id, + (uint32_t)num_processed, err_buf); + error = true; + SET_FLAG(txn->flags, MGMTD_BE_TXN_FLAGS_CFGPREP_FAILED); + } else + MGMTD_BE_CLIENT_DBG( + "Prepared configs for Txn %llx, %u Batches! successfully!", + (unsigned long long)txn->txn_id, + (uint32_t)num_processed); + if (debug_be) { + gettimeofday(&prep_nb_cfg_end, NULL); + prep_nb_cfg_tm = + timeval_elapsed(prep_nb_cfg_end, prep_nb_cfg_start); + client_ctx->avg_prep_nb_cfg_tm = + ((client_ctx->avg_prep_nb_cfg_tm + * client_ctx->num_prep_nb_cfg) + + prep_nb_cfg_tm) + / (client_ctx->num_prep_nb_cfg + 1); + } + client_ctx->num_prep_nb_cfg++; + + FOREACH_BE_TXN_BATCH_IN_LIST (txn, batch) { + mgmt_be_send_cfgdata_create_reply( + client_ctx, txn->txn_id, batch->batch_id, + error ? false : true, error ? err_buf : NULL); + if (!error) { + SET_FLAG(batch->flags, + MGMTD_BE_BATCH_FLAGS_CFG_PREPARED); + mgmt_be_batches_del(&txn->cfg_batches, batch); + mgmt_be_batches_add_tail(&txn->apply_cfgs, batch); + } + } + + if (debug_be) + MGMTD_BE_CLIENT_DBG( + "Avg-nb-edit-duration %lu uSec, nb-prep-duration %lu (avg: %lu) uSec, batch size %u", + client_ctx->avg_edit_nb_cfg_tm, prep_nb_cfg_tm, + client_ctx->avg_prep_nb_cfg_tm, (uint32_t)num_processed); + + if (error) + mgmt_be_txn_cfg_abort(txn); + + return 0; +} + +/* + * Process all CFG_DATA_REQs received so far and prepare them all in one go. + */ +static int +mgmt_be_update_setcfg_in_batch(struct mgmt_be_client_ctx *client_ctx, + struct mgmt_be_txn_ctx *txn, + uint64_t batch_id, + Mgmtd__YangCfgDataReq * cfg_req[], + int num_req) +{ + struct mgmt_be_batch_ctx *batch = NULL; + struct mgmt_be_txn_req *txn_req = NULL; + int index; + struct nb_cfg_change *cfg_chg; + + batch = mgmt_be_batch_create(txn, batch_id); + if (!batch) { + MGMTD_BE_CLIENT_ERR("Batch create failed!"); + return -1; + } + + txn_req = &batch->txn_req; + txn_req->event = MGMTD_BE_TXN_PROC_SETCFG; + MGMTD_BE_CLIENT_DBG( + "Created Set-Config request for batch 0x%llx, txn id 0x%llx, cfg-items:%d", + (unsigned long long)batch_id, (unsigned long long)txn->txn_id, + num_req); + + txn_req->req.set_cfg.num_cfg_changes = num_req; + for (index = 0; index < num_req; index++) { + cfg_chg = &txn_req->req.set_cfg.cfg_changes[index]; + + if (cfg_req[index]->req_type + == MGMTD__CFG_DATA_REQ_TYPE__DELETE_DATA) + cfg_chg->operation = NB_OP_DESTROY; + else + cfg_chg->operation = NB_OP_CREATE; + + strlcpy(cfg_chg->xpath, cfg_req[index]->data->xpath, + sizeof(cfg_chg->xpath)); + cfg_chg->value = (cfg_req[index]->data->value + && cfg_req[index] + ->data->value + ->encoded_str_val + ? strdup(cfg_req[index] + ->data->value + ->encoded_str_val) + : NULL); + if (cfg_chg->value + && !strncmp(cfg_chg->value, MGMTD_BE_CONTAINER_NODE_VAL, + strlen(MGMTD_BE_CONTAINER_NODE_VAL))) { + free((char *)cfg_chg->value); + cfg_chg->value = NULL; + } + } + + return 0; +} + +static int +mgmt_be_process_cfgdata_req(struct mgmt_be_client_ctx *client_ctx, + uint64_t txn_id, uint64_t batch_id, + Mgmtd__YangCfgDataReq * cfg_req[], int num_req, + bool end_of_data) +{ + struct mgmt_be_txn_ctx *txn; + + txn = mgmt_be_find_txn_by_id(client_ctx, txn_id); + if (!txn) { + MGMTD_BE_CLIENT_ERR( + "Invalid txn-id 0x%llx provided from MGMTD server", + (unsigned long long)txn_id); + mgmt_be_send_cfgdata_create_reply( + client_ctx, txn_id, batch_id, false, + "Transaction context not created yet"); + } else { + mgmt_be_update_setcfg_in_batch(client_ctx, txn, batch_id, + cfg_req, num_req); + } + + if (txn && end_of_data) { + MGMTD_BE_CLIENT_DBG("Triggering CFG_PREPARE_REQ processing"); + mgmt_be_txn_cfg_prepare(txn); + } + + return 0; +} + +static int mgmt_be_send_apply_reply(struct mgmt_be_client_ctx *client_ctx, + uint64_t txn_id, uint64_t batch_ids[], + size_t num_batch_ids, bool success, + const char *error_if_any) +{ + Mgmtd__BeMessage be_msg; + Mgmtd__BeCfgDataApplyReply apply_reply; + + mgmtd__be_cfg_data_apply_reply__init(&apply_reply); + apply_reply.success = success; + apply_reply.txn_id = txn_id; + apply_reply.batch_ids = (uint64_t *)batch_ids; + apply_reply.n_batch_ids = num_batch_ids; + + if (error_if_any) + apply_reply.error_if_any = (char *)error_if_any; + + mgmtd__be_message__init(&be_msg); + be_msg.message_case = MGMTD__BE_MESSAGE__MESSAGE_CFG_APPLY_REPLY; + be_msg.cfg_apply_reply = &apply_reply; + + MGMTD_BE_CLIENT_DBG( + "Sending CFG_APPLY_REPLY message to MGMTD for txn 0x%llx, %d batches [0x%llx - 0x%llx]", + (unsigned long long)txn_id, (int)num_batch_ids, + success && num_batch_ids ? + (unsigned long long)batch_ids[0] : 0, + success && num_batch_ids ? + (unsigned long long)batch_ids[num_batch_ids - 1] : 0); + + return mgmt_be_client_send_msg(client_ctx, &be_msg); +} + +static int mgmt_be_txn_proc_cfgapply(struct mgmt_be_txn_ctx *txn) +{ + struct mgmt_be_client_ctx *client_ctx; + struct timeval apply_nb_cfg_start; + struct timeval apply_nb_cfg_end; + unsigned long apply_nb_cfg_tm; + struct mgmt_be_batch_ctx *batch; + char err_buf[BUFSIZ]; + size_t num_processed; + static uint64_t batch_ids[MGMTD_BE_MAX_BATCH_IDS_IN_REQ]; + bool debug_be = mgmt_debug_be_client; + + assert(txn && txn->client_ctx); + client_ctx = txn->client_ctx; + + assert(txn->nb_txn); + num_processed = 0; + + /* + * Now apply all the batches we have applied in one go. + */ + if (debug_be) + gettimeofday(&apply_nb_cfg_start, NULL); + (void)nb_candidate_commit_apply(txn->nb_txn, true, &txn->nb_txn_id, + err_buf, sizeof(err_buf) - 1); + if (debug_be) { + gettimeofday(&apply_nb_cfg_end, NULL); + apply_nb_cfg_tm = + timeval_elapsed(apply_nb_cfg_end, apply_nb_cfg_start); + client_ctx->avg_apply_nb_cfg_tm = + ((client_ctx->avg_apply_nb_cfg_tm + * client_ctx->num_apply_nb_cfg) + + apply_nb_cfg_tm) + / (client_ctx->num_apply_nb_cfg + 1); + } + client_ctx->num_apply_nb_cfg++; + txn->nb_txn = NULL; + + /* + * Send back CFG_APPLY_REPLY for all batches applied. + */ + FOREACH_BE_APPLY_BATCH_IN_LIST (txn, batch) { + /* + * No need to delete the batch yet. Will be deleted during + * transaction cleanup on receiving TXN_DELETE_REQ. + */ + SET_FLAG(batch->flags, MGMTD_BE_TXN_FLAGS_CFG_APPLIED); + mgmt_be_batches_del(&txn->apply_cfgs, batch); + mgmt_be_batches_add_tail(&txn->cfg_batches, batch); + + batch_ids[num_processed] = batch->batch_id; + num_processed++; + if (num_processed == MGMTD_BE_MAX_BATCH_IDS_IN_REQ) { + mgmt_be_send_apply_reply(client_ctx, txn->txn_id, + batch_ids, num_processed, + true, NULL); + num_processed = 0; + } + } + + mgmt_be_send_apply_reply(client_ctx, txn->txn_id, batch_ids, + num_processed, true, NULL); + + if (debug_be) + MGMTD_BE_CLIENT_DBG("Nb-apply-duration %lu (avg: %lu) uSec", + apply_nb_cfg_tm, + client_ctx->avg_apply_nb_cfg_tm); + + return 0; +} + +static int +mgmt_be_process_cfg_apply(struct mgmt_be_client_ctx *client_ctx, + uint64_t txn_id) +{ + struct mgmt_be_txn_ctx *txn; + + txn = mgmt_be_find_txn_by_id(client_ctx, txn_id); + if (!txn) { + mgmt_be_send_apply_reply(client_ctx, txn_id, NULL, 0, false, + "Transaction not created yet!"); + return -1; + } + + MGMTD_BE_CLIENT_DBG("Trigger CFG_APPLY_REQ processing"); + mgmt_be_txn_proc_cfgapply(txn); + + return 0; +} + +static int +mgmt_be_client_handle_msg(struct mgmt_be_client_ctx *client_ctx, + Mgmtd__BeMessage *be_msg) +{ + switch (be_msg->message_case) { + case MGMTD__BE_MESSAGE__MESSAGE_SUBSCR_REPLY: + MGMTD_BE_CLIENT_DBG("Subscribe Reply Msg from mgmt, status %u", + be_msg->subscr_reply->success); + break; + case MGMTD__BE_MESSAGE__MESSAGE_TXN_REQ: + mgmt_be_process_txn_req(client_ctx, + be_msg->txn_req->txn_id, + be_msg->txn_req->create); + break; + case MGMTD__BE_MESSAGE__MESSAGE_CFG_DATA_REQ: + mgmt_be_process_cfgdata_req( + client_ctx, be_msg->cfg_data_req->txn_id, + be_msg->cfg_data_req->batch_id, + be_msg->cfg_data_req->data_req, + be_msg->cfg_data_req->n_data_req, + be_msg->cfg_data_req->end_of_data); + break; + case MGMTD__BE_MESSAGE__MESSAGE_CFG_APPLY_REQ: + mgmt_be_process_cfg_apply( + client_ctx, (uint64_t)be_msg->cfg_apply_req->txn_id); + break; + case MGMTD__BE_MESSAGE__MESSAGE_GET_REQ: + case MGMTD__BE_MESSAGE__MESSAGE_SUBSCR_REQ: + case MGMTD__BE_MESSAGE__MESSAGE_CFG_CMD_REQ: + case MGMTD__BE_MESSAGE__MESSAGE_SHOW_CMD_REQ: + /* + * TODO: Add handling code in future. + */ + break; + /* + * NOTE: The following messages are always sent from Backend + * clients to MGMTd only and/or need not be handled here. + */ + case MGMTD__BE_MESSAGE__MESSAGE_GET_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_TXN_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_CFG_DATA_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_CFG_APPLY_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_CFG_CMD_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_SHOW_CMD_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_NOTIFY_DATA: + case MGMTD__BE_MESSAGE__MESSAGE__NOT_SET: +#if PROTOBUF_C_VERSION_NUMBER >= 1003000 + case _MGMTD__BE_MESSAGE__MESSAGE_IS_INT_SIZE: +#endif + default: + /* + * A 'default' case is being added contrary to the + * FRR code guidelines to take care of build + * failures on certain build systems (courtesy of + * the proto-c package). + */ + break; + } + + return 0; +} + +static int +mgmt_be_client_process_msg(struct mgmt_be_client_ctx *client_ctx, + uint8_t *msg_buf, int bytes_read) +{ + Mgmtd__BeMessage *be_msg; + struct mgmt_be_msg *msg; + uint16_t bytes_left; + uint16_t processed = 0; + + MGMTD_BE_CLIENT_DBG( + "Got message of %d bytes from MGMTD Backend Server", + bytes_read); + + bytes_left = bytes_read; + for (; bytes_left > MGMTD_BE_MSG_HDR_LEN; + bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) { + msg = (struct mgmt_be_msg *)msg_buf; + if (msg->hdr.marker != MGMTD_BE_MSG_MARKER) { + MGMTD_BE_CLIENT_DBG( + "Marker not found in message from MGMTD '%s'", + client_ctx->client_params.name); + break; + } + + if (bytes_left < msg->hdr.len) { + MGMTD_BE_CLIENT_DBG( + "Incomplete message of %d bytes (epxected: %u) from MGMTD '%s'", + bytes_left, msg->hdr.len, + client_ctx->client_params.name); + break; + } + + be_msg = mgmtd__be_message__unpack( + NULL, (size_t)(msg->hdr.len - MGMTD_BE_MSG_HDR_LEN), + msg->payload); + if (!be_msg) { + MGMTD_BE_CLIENT_DBG( + "Failed to decode %d bytes from MGMTD '%s'", + msg->hdr.len, client_ctx->client_params.name); + continue; + } + + (void)mgmt_be_client_handle_msg(client_ctx, be_msg); + mgmtd__be_message__free_unpacked(be_msg, NULL); + processed++; + client_ctx->num_msg_rx++; + } + + return processed; +} + +static void mgmt_be_client_proc_msgbufs(struct thread *thread) +{ + struct mgmt_be_client_ctx *client_ctx; + struct stream *work; + int processed = 0; + + client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread); + assert(client_ctx); + + if (client_ctx->conn_fd == 0) + return; + + for (; processed < MGMTD_BE_MAX_NUM_MSG_PROC;) { + work = stream_fifo_pop_safe(client_ctx->ibuf_fifo); + if (!work) + break; + + processed += mgmt_be_client_process_msg( + client_ctx, STREAM_DATA(work), stream_get_endp(work)); + + if (work != client_ctx->ibuf_work) { + /* Free it up */ + stream_free(work); + } else { + /* Reset stream buffer for next read */ + stream_reset(work); + } + } + + /* + * If we have more to process, reschedule for processing it. + */ + if (stream_fifo_head(client_ctx->ibuf_fifo)) + mgmt_be_client_register_event(client_ctx, + MGMTD_BE_PROC_MSG); +} + +static void mgmt_be_client_read(struct thread *thread) +{ + struct mgmt_be_client_ctx *client_ctx; + int bytes_read, msg_cnt; + size_t total_bytes, bytes_left; + struct mgmt_be_msg_hdr *msg_hdr; + bool incomplete = false; + + client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread); + assert(client_ctx && client_ctx->conn_fd); + + total_bytes = 0; + bytes_left = STREAM_SIZE(client_ctx->ibuf_work) + - stream_get_endp(client_ctx->ibuf_work); + for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) { + bytes_read = stream_read_try(client_ctx->ibuf_work, + client_ctx->conn_fd, bytes_left); + MGMTD_BE_CLIENT_DBG( + "Got %d bytes of message from MGMTD Backend server", + bytes_read); + /* -2 is normal nothing read, and to retry */ + if (bytes_read == -2) + break; + if (bytes_read <= 0) { + if (bytes_read == 0) { + MGMTD_BE_CLIENT_ERR( + "Got EOF/disconnect while reading from MGMTD Frontend server."); + } else { + MGMTD_BE_CLIENT_ERR( + "Got error (%d) while reading from MGMTD Backend server. Err: '%s'", + bytes_read, safe_strerror(errno)); + } + mgmt_be_server_disconnect(client_ctx, true); + return; + } + total_bytes += bytes_read; + bytes_left -= bytes_read; + } + /* + * Check if we have read complete messages or not. + */ + stream_set_getp(client_ctx->ibuf_work, 0); + total_bytes = 0; + msg_cnt = 0; + bytes_left = stream_get_endp(client_ctx->ibuf_work); + for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) { + msg_hdr = (struct mgmt_be_msg_hdr + *)(STREAM_DATA(client_ctx->ibuf_work) + + total_bytes); + if (msg_hdr->marker != MGMTD_BE_MSG_MARKER) { + /* Corrupted buffer. Force disconnect?? */ + MGMTD_BE_CLIENT_ERR( + "Received corrupted buffer from MGMTD backend server."); + mgmt_be_server_disconnect(client_ctx, true); + return; + } + if (msg_hdr->len > bytes_left) + break; + + total_bytes += msg_hdr->len; + bytes_left -= msg_hdr->len; + msg_cnt++; + } + + if (!msg_cnt) + goto resched; + + if (bytes_left > 0) + incomplete = true; + + /* + * We have read one or several messages. + * Schedule processing them now. + */ + msg_hdr = + (struct mgmt_be_msg_hdr *)(STREAM_DATA(client_ctx->ibuf_work) + + total_bytes); + stream_set_endp(client_ctx->ibuf_work, total_bytes); + stream_fifo_push(client_ctx->ibuf_fifo, client_ctx->ibuf_work); + client_ctx->ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); + if (incomplete) { + stream_put(client_ctx->ibuf_work, msg_hdr, bytes_left); + stream_set_endp(client_ctx->ibuf_work, bytes_left); + } + mgmt_be_client_register_event(client_ctx, MGMTD_BE_PROC_MSG); + +resched: + mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_READ); +} + +static inline void +mgmt_be_client_sched_msg_write(struct mgmt_be_client_ctx *client_ctx) +{ + if (!CHECK_FLAG(client_ctx->flags, MGMTD_BE_CLIENT_FLAGS_WRITES_OFF)) + mgmt_be_client_register_event(client_ctx, + MGMTD_BE_CONN_WRITE); +} + +static inline void +mgmt_be_client_writes_on(struct mgmt_be_client_ctx *client_ctx) +{ + MGMTD_BE_CLIENT_DBG("Resume writing msgs"); + UNSET_FLAG(client_ctx->flags, MGMTD_BE_CLIENT_FLAGS_WRITES_OFF); + if (client_ctx->obuf_work + || stream_fifo_count_safe(client_ctx->obuf_fifo)) + mgmt_be_client_sched_msg_write(client_ctx); +} + +static inline void +mgmt_be_client_writes_off(struct mgmt_be_client_ctx *client_ctx) +{ + SET_FLAG(client_ctx->flags, MGMTD_BE_CLIENT_FLAGS_WRITES_OFF); + MGMTD_BE_CLIENT_DBG("Paused writing msgs"); +} + +static int mgmt_be_client_send_msg(struct mgmt_be_client_ctx *client_ctx, + Mgmtd__BeMessage *be_msg) +{ + size_t msg_size; + uint8_t *msg_buf = client_ctx->msg_buf; + struct mgmt_be_msg *msg; + + if (client_ctx->conn_fd == 0) + return -1; + + msg_size = mgmtd__be_message__get_packed_size(be_msg); + msg_size += MGMTD_BE_MSG_HDR_LEN; + if (msg_size > MGMTD_BE_MSG_MAX_LEN) { + MGMTD_BE_CLIENT_ERR( + "Message size %d more than max size'%d. Not sending!'", + (int)msg_size, (int)MGMTD_BE_MSG_MAX_LEN); + return -1; + } + + msg = (struct mgmt_be_msg *)msg_buf; + msg->hdr.marker = MGMTD_BE_MSG_MARKER; + msg->hdr.len = (uint16_t)msg_size; + mgmtd__be_message__pack(be_msg, msg->payload); + + if (!client_ctx->obuf_work) + client_ctx->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); + if (STREAM_WRITEABLE(client_ctx->obuf_work) < msg_size) { + stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work); + client_ctx->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); + } + stream_write(client_ctx->obuf_work, (void *)msg_buf, msg_size); + + mgmt_be_client_sched_msg_write(client_ctx); + client_ctx->num_msg_tx++; + return 0; +} + +static void mgmt_be_client_write(struct thread *thread) +{ + int bytes_written = 0; + int processed = 0; + int msg_size = 0; + struct stream *s = NULL; + struct stream *free = NULL; + struct mgmt_be_client_ctx *client_ctx; + + client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread); + assert(client_ctx && client_ctx->conn_fd); + + /* Ensure pushing any pending write buffer to FIFO */ + if (client_ctx->obuf_work) { + stream_fifo_push(client_ctx->obuf_fifo, client_ctx->obuf_work); + client_ctx->obuf_work = NULL; + } + + for (s = stream_fifo_head(client_ctx->obuf_fifo); + s && processed < MGMTD_BE_MAX_NUM_MSG_WRITE; + s = stream_fifo_head(client_ctx->obuf_fifo)) { + /* msg_size = (int)stream_get_size(s); */ + msg_size = (int)STREAM_READABLE(s); + bytes_written = stream_flush(s, client_ctx->conn_fd); + if (bytes_written == -1 + && (errno == EAGAIN || errno == EWOULDBLOCK)) { + mgmt_be_client_register_event( + client_ctx, MGMTD_BE_CONN_WRITE); + return; + } else if (bytes_written != msg_size) { + MGMTD_BE_CLIENT_ERR( + "Could not write all %d bytes (wrote: %d) to MGMTD Backend client socket. Err: '%s'", + msg_size, bytes_written, safe_strerror(errno)); + if (bytes_written > 0) { + stream_forward_getp(s, (size_t)bytes_written); + stream_pulldown(s); + mgmt_be_client_register_event( + client_ctx, MGMTD_BE_CONN_WRITE); + return; + } + mgmt_be_server_disconnect(client_ctx, true); + return; + } + + free = stream_fifo_pop(client_ctx->obuf_fifo); + stream_free(free); + MGMTD_BE_CLIENT_DBG( + "Wrote %d bytes of message to MGMTD Backend client socket.'", + bytes_written); + processed++; + } + + if (s) { + mgmt_be_client_writes_off(client_ctx); + mgmt_be_client_register_event(client_ctx, + MGMTD_BE_CONN_WRITES_ON); + } +} + +static void mgmt_be_client_resume_writes(struct thread *thread) +{ + struct mgmt_be_client_ctx *client_ctx; + + client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread); + assert(client_ctx && client_ctx->conn_fd); + + mgmt_be_client_writes_on(client_ctx); +} + +static int mgmt_be_send_subscr_req(struct mgmt_be_client_ctx *client_ctx, + bool subscr_xpaths, + uint16_t num_reg_xpaths, + char **reg_xpaths) +{ + Mgmtd__BeMessage be_msg; + Mgmtd__BeSubscribeReq subscr_req; + + mgmtd__be_subscribe_req__init(&subscr_req); + subscr_req.client_name = client_ctx->client_params.name; + subscr_req.n_xpath_reg = num_reg_xpaths; + if (num_reg_xpaths) + subscr_req.xpath_reg = reg_xpaths; + else + subscr_req.xpath_reg = NULL; + subscr_req.subscribe_xpaths = subscr_xpaths; + + mgmtd__be_message__init(&be_msg); + be_msg.message_case = MGMTD__BE_MESSAGE__MESSAGE_SUBSCR_REQ; + be_msg.subscr_req = &subscr_req; + + return mgmt_be_client_send_msg(client_ctx, &be_msg); +} + +static int mgmt_be_server_connect(struct mgmt_be_client_ctx *client_ctx) +{ + int ret, sock, len; + struct sockaddr_un addr; + + MGMTD_BE_CLIENT_DBG("Trying to connect to MGMTD Backend server at %s", + MGMTD_BE_SERVER_PATH); + + assert(!client_ctx->conn_fd); + + sock = socket(AF_UNIX, SOCK_STREAM, 0); + if (sock < 0) { + MGMTD_BE_CLIENT_ERR("Failed to create socket"); + goto mgmt_be_server_connect_failed; + } + + MGMTD_BE_CLIENT_DBG( + "Created MGMTD Backend server socket successfully!"); + + memset(&addr, 0, sizeof(struct sockaddr_un)); + addr.sun_family = AF_UNIX; + strlcpy(addr.sun_path, MGMTD_BE_SERVER_PATH, sizeof(addr.sun_path)); +#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN + len = addr.sun_len = SUN_LEN(&addr); +#else + len = sizeof(addr.sun_family) + strlen(addr.sun_path); +#endif /* HAVE_STRUCT_SOCKADDR_UN_SUN_LEN */ + + ret = connect(sock, (struct sockaddr *)&addr, len); + if (ret < 0) { + MGMTD_BE_CLIENT_ERR( + "Failed to connect to MGMTD Backend Server at %s. Err: %s", + addr.sun_path, safe_strerror(errno)); + close(sock); + goto mgmt_be_server_connect_failed; + } + + MGMTD_BE_CLIENT_DBG( + "Connected to MGMTD Backend Server at %s successfully!", + addr.sun_path); + client_ctx->conn_fd = sock; + + /* Make client socket non-blocking. */ + set_nonblocking(sock); + setsockopt_so_sendbuf(client_ctx->conn_fd, + MGMTD_SOCKET_BE_SEND_BUF_SIZE); + setsockopt_so_recvbuf(client_ctx->conn_fd, + MGMTD_SOCKET_BE_RECV_BUF_SIZE); + + mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_READ); + + /* Notify client through registered callback (if any) */ + if (client_ctx->client_params.client_connect_notify) + (void)(*client_ctx->client_params + .client_connect_notify)( + (uintptr_t)client_ctx, + client_ctx->client_params.user_data, true); + + /* Send SUBSCRIBE_REQ message */ + if (mgmt_be_send_subscr_req(client_ctx, false, 0, NULL) != 0) + goto mgmt_be_server_connect_failed; + + return 0; + +mgmt_be_server_connect_failed: + if (sock && sock != client_ctx->conn_fd) + close(sock); + + mgmt_be_server_disconnect(client_ctx, true); + return -1; +} + +static void mgmt_be_client_conn_timeout(struct thread *thread) +{ + struct mgmt_be_client_ctx *client_ctx; + + client_ctx = (struct mgmt_be_client_ctx *)THREAD_ARG(thread); + assert(client_ctx); + + mgmt_be_server_connect(client_ctx); +} + +static void +mgmt_be_client_register_event(struct mgmt_be_client_ctx *client_ctx, + enum mgmt_be_event event) +{ + struct timeval tv = {0}; + + switch (event) { + case MGMTD_BE_CONN_READ: + thread_add_read(client_ctx->tm, mgmt_be_client_read, + client_ctx, client_ctx->conn_fd, + &client_ctx->conn_read_ev); + assert(client_ctx->conn_read_ev); + break; + case MGMTD_BE_CONN_WRITE: + thread_add_write(client_ctx->tm, mgmt_be_client_write, + client_ctx, client_ctx->conn_fd, + &client_ctx->conn_write_ev); + assert(client_ctx->conn_write_ev); + break; + case MGMTD_BE_PROC_MSG: + tv.tv_usec = MGMTD_BE_MSG_PROC_DELAY_USEC; + thread_add_timer_tv(client_ctx->tm, + mgmt_be_client_proc_msgbufs, client_ctx, + &tv, &client_ctx->msg_proc_ev); + assert(client_ctx->msg_proc_ev); + break; + case MGMTD_BE_CONN_WRITES_ON: + thread_add_timer_msec( + client_ctx->tm, mgmt_be_client_resume_writes, + client_ctx, MGMTD_BE_MSG_WRITE_DELAY_MSEC, + &client_ctx->conn_writes_on); + assert(client_ctx->conn_writes_on); + break; + case MGMTD_BE_SERVER: + case MGMTD_BE_CONN_INIT: + case MGMTD_BE_SCHED_CFG_PREPARE: + case MGMTD_BE_RESCHED_CFG_PREPARE: + case MGMTD_BE_SCHED_CFG_APPLY: + case MGMTD_BE_RESCHED_CFG_APPLY: + assert(!"mgmt_be_client_post_event() called incorrectly"); + break; + } +} + +static void +mgmt_be_client_schedule_conn_retry(struct mgmt_be_client_ctx *client_ctx, + unsigned long intvl_secs) +{ + MGMTD_BE_CLIENT_DBG( + "Scheduling MGMTD Backend server connection retry after %lu seconds", + intvl_secs); + thread_add_timer(client_ctx->tm, mgmt_be_client_conn_timeout, + (void *)client_ctx, intvl_secs, + &client_ctx->conn_retry_tmr); +} + +extern struct nb_config *running_config; + +/* + * Initialize library and try connecting with MGMTD. + */ +uintptr_t mgmt_be_client_lib_init(struct mgmt_be_client_params *params, + struct thread_master *master_thread) +{ + assert(master_thread && params && strlen(params->name) + && !mgmt_be_client_ctx.tm); + + mgmt_be_client_ctx.tm = master_thread; + + if (!running_config) + assert(!"MGMTD Be Client lib_init() after frr_init() only!"); + mgmt_be_client_ctx.running_config = running_config; + mgmt_be_client_ctx.candidate_config = nb_config_new(NULL); + + memcpy(&mgmt_be_client_ctx.client_params, params, + sizeof(mgmt_be_client_ctx.client_params)); + if (!mgmt_be_client_ctx.client_params.conn_retry_intvl_sec) + mgmt_be_client_ctx.client_params.conn_retry_intvl_sec = + MGMTD_BE_DEFAULT_CONN_RETRY_INTVL_SEC; + + assert(!mgmt_be_client_ctx.ibuf_fifo && !mgmt_be_client_ctx.ibuf_work && + !mgmt_be_client_ctx.obuf_fifo && !mgmt_be_client_ctx.obuf_work); + + mgmt_be_txns_init(&mgmt_be_client_ctx.txn_head); + mgmt_be_client_ctx.ibuf_fifo = stream_fifo_new(); + mgmt_be_client_ctx.ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); + mgmt_be_client_ctx.obuf_fifo = stream_fifo_new(); + /* mgmt_be_client_ctx.obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); + */ + mgmt_be_client_ctx.obuf_work = NULL; + + /* Start trying to connect to MGMTD backend server immediately */ + mgmt_be_client_schedule_conn_retry(&mgmt_be_client_ctx, 1); + + MGMTD_BE_CLIENT_DBG("Initialized client '%s'", params->name); + + return (uintptr_t)&mgmt_be_client_ctx; +} + +/* + * Subscribe with MGMTD for one or more YANG subtree(s). + */ +enum mgmt_result mgmt_be_subscribe_yang_data(uintptr_t lib_hndl, + char *reg_yang_xpaths[], + int num_reg_xpaths) +{ + struct mgmt_be_client_ctx *client_ctx; + + client_ctx = (struct mgmt_be_client_ctx *)lib_hndl; + if (!client_ctx) + return MGMTD_INVALID_PARAM; + + if (mgmt_be_send_subscr_req(client_ctx, true, num_reg_xpaths, + reg_yang_xpaths) + != 0) + return MGMTD_INTERNAL_ERROR; + + return MGMTD_SUCCESS; +} + +/* + * Unsubscribe with MGMTD for one or more YANG subtree(s). + */ +enum mgmt_result mgmt_be_unsubscribe_yang_data(uintptr_t lib_hndl, + char *reg_yang_xpaths[], + int num_reg_xpaths) +{ + struct mgmt_be_client_ctx *client_ctx; + + client_ctx = (struct mgmt_be_client_ctx *)lib_hndl; + if (!client_ctx) + return MGMTD_INVALID_PARAM; + + + if (mgmt_be_send_subscr_req(client_ctx, false, num_reg_xpaths, + reg_yang_xpaths) + < 0) + return MGMTD_INTERNAL_ERROR; + + return MGMTD_SUCCESS; +} + +/* + * Send one or more YANG notifications to MGMTD daemon. + */ +enum mgmt_result mgmt_be_send_yang_notify(uintptr_t lib_hndl, + Mgmtd__YangData * data_elems[], + int num_elems) +{ + struct mgmt_be_client_ctx *client_ctx; + + client_ctx = (struct mgmt_be_client_ctx *)lib_hndl; + if (!client_ctx) + return MGMTD_INVALID_PARAM; + + return MGMTD_SUCCESS; +} + +/* + * Destroy library and cleanup everything. + */ +void mgmt_be_client_lib_destroy(uintptr_t lib_hndl) +{ + struct mgmt_be_client_ctx *client_ctx; + + client_ctx = (struct mgmt_be_client_ctx *)lib_hndl; + assert(client_ctx); + + MGMTD_BE_CLIENT_DBG("Destroying MGMTD Backend Client '%s'", + client_ctx->client_params.name); + + mgmt_be_server_disconnect(client_ctx, false); + + assert(mgmt_be_client_ctx.ibuf_fifo && mgmt_be_client_ctx.obuf_fifo); + + stream_fifo_free(mgmt_be_client_ctx.ibuf_fifo); + if (mgmt_be_client_ctx.ibuf_work) + stream_free(mgmt_be_client_ctx.ibuf_work); + stream_fifo_free(mgmt_be_client_ctx.obuf_fifo); + if (mgmt_be_client_ctx.obuf_work) + stream_free(mgmt_be_client_ctx.obuf_work); + + THREAD_OFF(client_ctx->conn_retry_tmr); + THREAD_OFF(client_ctx->conn_read_ev); + THREAD_OFF(client_ctx->conn_write_ev); + THREAD_OFF(client_ctx->conn_writes_on); + THREAD_OFF(client_ctx->msg_proc_ev); + mgmt_be_cleanup_all_txns(client_ctx); + mgmt_be_txns_fini(&client_ctx->txn_head); +} diff --git a/lib/mgmt_be_client.h b/lib/mgmt_be_client.h new file mode 100644 index 000000000..75b4ddf01 --- /dev/null +++ b/lib/mgmt_be_client.h @@ -0,0 +1,273 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * MGMTD Backend Client Library api interfaces + * Copyright (C) 2021 Vmware, Inc. + * Pushpasis Sarkar <spushpasis@vmware.com> + */ + +#ifndef _FRR_MGMTD_BE_CLIENT_H_ +#define _FRR_MGMTD_BE_CLIENT_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include "mgmtd/mgmt_defines.h" + +/*************************************************************** + * Client IDs + ***************************************************************/ + +/* + * Add enum value for each supported component, wrap with + * #ifdef HAVE_COMPONENT + */ +enum mgmt_be_client_id { + MGMTD_BE_CLIENT_ID_MIN = 0, + MGMTD_BE_CLIENT_ID_INIT = -1, +#if 0 /* example */ +#ifdef HAVE_STATICD + MGMTD_BE_CLIENT_ID_STATICD, +#endif +#endif + MGMTD_BE_CLIENT_ID_MAX +}; + +#define FOREACH_MGMTD_BE_CLIENT_ID(id) \ + for ((id) = MGMTD_BE_CLIENT_ID_MIN; \ + (id) < MGMTD_BE_CLIENT_ID_MAX; (id)++) + +/*************************************************************** + * Constants + ***************************************************************/ + +#define MGMTD_BE_CLIENT_ERROR_STRING_MAX_LEN 32 + +#define MGMTD_BE_DEFAULT_CONN_RETRY_INTVL_SEC 5 + +#define MGMTD_BE_MSG_PROC_DELAY_USEC 10 +#define MGMTD_BE_MAX_NUM_MSG_PROC 500 + +#define MGMTD_BE_MSG_WRITE_DELAY_MSEC 1 +#define MGMTD_BE_MAX_NUM_MSG_WRITE 1000 + +#define GMGD_BE_MAX_NUM_REQ_ITEMS 64 + +#define MGMTD_BE_MSG_MAX_LEN 16384 + +#define MGMTD_SOCKET_BE_SEND_BUF_SIZE 65535 +#define MGMTD_SOCKET_BE_RECV_BUF_SIZE MGMTD_SOCKET_BE_SEND_BUF_SIZE + +#define MGMTD_MAX_CFG_CHANGES_IN_BATCH \ + ((10 * MGMTD_BE_MSG_MAX_LEN) / \ + (MGMTD_MAX_XPATH_LEN + MGMTD_MAX_YANG_VALUE_LEN)) + +/* + * MGMTD_BE_MSG_MAX_LEN must be used 80% + * since there is overhead of google protobuf + * that gets added to sent message + */ +#define MGMTD_BE_CFGDATA_PACKING_EFFICIENCY 0.8 +#define MGMTD_BE_CFGDATA_MAX_MSG_LEN \ + (MGMTD_BE_MSG_MAX_LEN * MGMTD_BE_CFGDATA_PACKING_EFFICIENCY) + +#define MGMTD_BE_MAX_BATCH_IDS_IN_REQ \ + (MGMTD_BE_MSG_MAX_LEN - 128) / sizeof(uint64_t) + +#define MGMTD_BE_CONTAINER_NODE_VAL "<<container>>" + +/*************************************************************** + * Data-structures + ***************************************************************/ + +#define MGMTD_BE_MAX_CLIENTS_PER_XPATH_REG 32 + +struct mgmt_be_msg_hdr { + uint16_t marker; + uint16_t len; /* Includes header */ +}; +#define MGMTD_BE_MSG_HDR_LEN sizeof(struct mgmt_be_msg_hdr) +#define MGMTD_BE_MSG_MARKER 0xfeed + +struct mgmt_be_msg { + struct mgmt_be_msg_hdr hdr; + uint8_t payload[]; +}; + +struct mgmt_be_client_txn_ctx { + uintptr_t *user_ctx; +}; + +/* + * All the client-specific information this library needs to + * initialize itself, setup connection with MGMTD BackEnd interface + * and carry on all required procedures appropriately. + * + * BackEnd clients need to initialise a instance of this structure + * with appropriate data and pass it while calling the API + * to initialize the library (See mgmt_be_client_lib_init for + * more details). + */ +struct mgmt_be_client_params { + char name[MGMTD_CLIENT_NAME_MAX_LEN]; + uintptr_t user_data; + unsigned long conn_retry_intvl_sec; + + void (*client_connect_notify)(uintptr_t lib_hndl, + uintptr_t usr_data, + bool connected); + + void (*client_subscribe_notify)( + uintptr_t lib_hndl, uintptr_t usr_data, + struct nb_yang_xpath **xpath, + enum mgmt_result subscribe_result[], int num_paths); + + void (*txn_notify)( + uintptr_t lib_hndl, uintptr_t usr_data, + struct mgmt_be_client_txn_ctx *txn_ctx, bool destroyed); + + enum mgmt_result (*data_validate)( + uintptr_t lib_hndl, uintptr_t usr_data, + struct mgmt_be_client_txn_ctx *txn_ctx, + struct nb_yang_xpath *xpath, struct nb_yang_value *data, + bool delete, char *error_if_any); + + enum mgmt_result (*data_apply)( + uintptr_t lib_hndl, uintptr_t usr_data, + struct mgmt_be_client_txn_ctx *txn_ctx, + struct nb_yang_xpath *xpath, struct nb_yang_value *data, + bool delete); + + enum mgmt_result (*get_data_elem)( + uintptr_t lib_hndl, uintptr_t usr_data, + struct mgmt_be_client_txn_ctx *txn_ctx, + struct nb_yang_xpath *xpath, struct nb_yang_xpath_elem *elem); + + enum mgmt_result (*get_data)( + uintptr_t lib_hndl, uintptr_t usr_data, + struct mgmt_be_client_txn_ctx *txn_ctx, + struct nb_yang_xpath *xpath, bool keys_only, + struct nb_yang_xpath_elem **elems, int *num_elems, + int *next_key); + + enum mgmt_result (*get_next_data)( + uintptr_t lib_hndl, uintptr_t usr_data, + struct mgmt_be_client_txn_ctx *txn_ctx, + struct nb_yang_xpath *xpath, bool keys_only, + struct nb_yang_xpath_elem **elems, int *num_elems); +}; + +/*************************************************************** + * Global data exported + ***************************************************************/ + +extern const char *mgmt_be_client_names[MGMTD_BE_CLIENT_ID_MAX + 1]; + +static inline const char *mgmt_be_client_id2name(enum mgmt_be_client_id id) +{ + if (id > MGMTD_BE_CLIENT_ID_MAX) + id = MGMTD_BE_CLIENT_ID_MAX; + return mgmt_be_client_names[id]; +} + +static inline enum mgmt_be_client_id +mgmt_be_client_name2id(const char *name) +{ + enum mgmt_be_client_id id; + + FOREACH_MGMTD_BE_CLIENT_ID (id) { + if (!strncmp(mgmt_be_client_names[id], name, + MGMTD_CLIENT_NAME_MAX_LEN)) + return id; + } + + return MGMTD_BE_CLIENT_ID_MAX; +} + +/*************************************************************** + * API prototypes + ***************************************************************/ + +/* + * Initialize library and try connecting with MGMTD. + * + * params + * Backend client parameters. + * + * master_thread + * Thread master. + * + * Returns: + * Backend client lib handler (nothing but address of mgmt_be_client_ctx) + */ +extern uintptr_t +mgmt_be_client_lib_init(struct mgmt_be_client_params *params, + struct thread_master *master_thread); + +/* + * Subscribe with MGMTD for one or more YANG subtree(s). + * + * lib_hndl + * Client library handler. + * + * reg_yang_xpaths + * Yang xpath(s) that needs to be subscribed to. + * + * num_xpaths + * Number of xpaths + * + * Returns: + * MGMTD_SUCCESS on success, MGMTD_* otherwise. + */ +extern enum mgmt_result mgmt_be_subscribe_yang_data(uintptr_t lib_hndl, + char **reg_yang_xpaths, + int num_xpaths); + +/* + * Send one or more YANG notifications to MGMTD daemon. + * + * lib_hndl + * Client library handler. + * + * data_elems + * Yang data elements from data tree. + * + * num_elems + * Number of data elements. + * + * Returns: + * MGMTD_SUCCESS on success, MGMTD_* otherwise. + */ +extern enum mgmt_result +mgmt_be_send_yang_notify(uintptr_t lib_hndl, Mgmtd__YangData **data_elems, + int num_elems); + +/* + * Un-subscribe with MGMTD for one or more YANG subtree(s). + * + * lib_hndl + * Client library handler. + * + * reg_yang_xpaths + * Yang xpath(s) that needs to be un-subscribed from. + * + * num_reg_xpaths + * Number of subscribed xpaths + * + * Returns: + * MGMTD_SUCCESS on success, MGMTD_* otherwise. + */ +enum mgmt_result mgmt_be_unsubscribe_yang_data(uintptr_t lib_hndl, + char **reg_yang_xpaths, + int num_reg_xpaths); + +/* + * Destroy library and cleanup everything. + */ +extern void mgmt_be_client_lib_destroy(uintptr_t lib_hndl); + +#ifdef __cplusplus +} +#endif + +#endif /* _FRR_MGMTD_BE_CLIENT_H_ */ diff --git a/lib/northbound.c b/lib/northbound.c index 0d4c3f0f0..b9b840a60 100644 --- a/lib/northbound.c +++ b/lib/northbound.c @@ -93,7 +93,9 @@ static int nb_node_new_cb(const struct lysc_node *snode, void *arg) { struct nb_node *nb_node; struct lysc_node *sparent, *sparent_list; + struct frr_yang_module_info *module; + module = (struct frr_yang_module_info *)arg; nb_node = XCALLOC(MTYPE_NB_NODE, sizeof(*nb_node)); yang_snode_get_path(snode, YANG_PATH_DATA, nb_node->xpath, sizeof(nb_node->xpath)); @@ -128,6 +130,9 @@ static int nb_node_new_cb(const struct lysc_node *snode, void *arg) assert(snode->priv == NULL); ((struct lysc_node *)snode)->priv = nb_node; + if (module && module->ignore_cbs) + SET_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS); + return YANG_ITER_CONTINUE; } @@ -230,6 +235,9 @@ static unsigned int nb_node_validate_cbs(const struct nb_node *nb_node) { unsigned int error = 0; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return error; + error += nb_node_validate_cb(nb_node, NB_OP_CREATE, !!nb_node->cbs.create, false); error += nb_node_validate_cb(nb_node, NB_OP_MODIFY, @@ -297,6 +305,8 @@ struct nb_config *nb_config_new(struct lyd_node *dnode) config->dnode = yang_dnode_new(ly_native_ctx, true); config->version = 0; + RB_INIT(nb_config_cbs, &config->cfg_chgs); + return config; } @@ -304,6 +314,7 @@ void nb_config_free(struct nb_config *config) { if (config->dnode) yang_dnode_free(config->dnode); + nb_config_diff_del_changes(&config->cfg_chgs); XFREE(MTYPE_NB_CONFIG, config); } @@ -315,6 +326,8 @@ struct nb_config *nb_config_dup(const struct nb_config *config) dup->dnode = yang_dnode_dup(config->dnode); dup->version = config->version; + RB_INIT(nb_config_cbs, &dup->cfg_chgs); + return dup; } @@ -405,7 +418,7 @@ static void nb_config_diff_add_change(struct nb_config_cbs *changes, RB_INSERT(nb_config_cbs, changes, &change->cb); } -static void nb_config_diff_del_changes(struct nb_config_cbs *changes) +void nb_config_diff_del_changes(struct nb_config_cbs *changes) { while (!RB_EMPTY(nb_config_cbs, changes)) { struct nb_config_change *change; @@ -422,8 +435,8 @@ static void nb_config_diff_del_changes(struct nb_config_cbs *changes) * configurations. Given a new subtree, calculate all new YANG data nodes, * excluding default leafs and leaf-lists. This is a recursive function. */ -static void nb_config_diff_created(const struct lyd_node *dnode, uint32_t *seq, - struct nb_config_cbs *changes) +void nb_config_diff_created(const struct lyd_node *dnode, uint32_t *seq, + struct nb_config_cbs *changes) { enum nb_operation operation; struct lyd_node *child; @@ -525,10 +538,16 @@ static inline void nb_config_diff_dnode_log(const char *context, } #endif -/* Calculate the delta between two different configurations. */ -static void nb_config_diff(const struct nb_config *config1, - const struct nb_config *config2, - struct nb_config_cbs *changes) +/* + * Calculate the delta between two different configurations. + * + * NOTE: 'config1' is the reference DB, while 'config2' is + * the DB being compared against 'config1'. Typically 'config1' + * should be the Running DB and 'config2' is the Candidate DB. + */ +void nb_config_diff(const struct nb_config *config1, + const struct nb_config *config2, + struct nb_config_cbs *changes) { struct lyd_node *diff = NULL; const struct lyd_node *root, *dnode; @@ -734,6 +753,169 @@ int nb_candidate_edit(struct nb_config *candidate, return NB_OK; } +static void nb_update_candidate_changes(struct nb_config *candidate, + struct nb_cfg_change *change, + uint32_t *seq) +{ + enum nb_operation oper = change->operation; + char *xpath = change->xpath; + struct lyd_node *root = NULL; + struct lyd_node *dnode; + struct nb_config_cbs *cfg_chgs = &candidate->cfg_chgs; + int op; + + switch (oper) { + case NB_OP_CREATE: + case NB_OP_MODIFY: + root = yang_dnode_get(candidate->dnode, xpath); + break; + case NB_OP_DESTROY: + root = yang_dnode_get(running_config->dnode, xpath); + /* code */ + break; + case NB_OP_MOVE: + case NB_OP_PRE_VALIDATE: + case NB_OP_APPLY_FINISH: + case NB_OP_GET_ELEM: + case NB_OP_GET_NEXT: + case NB_OP_GET_KEYS: + case NB_OP_LOOKUP_ENTRY: + case NB_OP_RPC: + break; + default: + assert(!"non-enum value, invalid"); + } + + if (!root) + return; + + LYD_TREE_DFS_BEGIN (root, dnode) { + op = nb_lyd_diff_get_op(dnode); + switch (op) { + case 'c': + nb_config_diff_created(dnode, seq, cfg_chgs); + LYD_TREE_DFS_continue = 1; + break; + case 'd': + nb_config_diff_deleted(dnode, seq, cfg_chgs); + LYD_TREE_DFS_continue = 1; + break; + case 'r': + nb_config_diff_add_change(cfg_chgs, NB_OP_MODIFY, seq, + dnode); + break; + default: + break; + } + LYD_TREE_DFS_END(root, dnode); + } +} + +static bool nb_is_operation_allowed(struct nb_node *nb_node, + struct nb_cfg_change *change) +{ + enum nb_operation oper = change->operation; + + if (lysc_is_key(nb_node->snode)) { + if (oper == NB_OP_MODIFY || oper == NB_OP_DESTROY) + return false; + } + return true; +} + +void nb_candidate_edit_config_changes( + struct nb_config *candidate_config, struct nb_cfg_change cfg_changes[], + size_t num_cfg_changes, const char *xpath_base, const char *curr_xpath, + int xpath_index, char *err_buf, int err_bufsize, bool *error) +{ + uint32_t seq = 0; + + if (error) + *error = false; + + if (xpath_base == NULL) + xpath_base = ""; + + /* Edit candidate configuration. */ + for (size_t i = 0; i < num_cfg_changes; i++) { + struct nb_cfg_change *change = &cfg_changes[i]; + struct nb_node *nb_node; + char xpath[XPATH_MAXLEN]; + struct yang_data *data; + int ret; + + /* Handle relative XPaths. */ + memset(xpath, 0, sizeof(xpath)); + if (xpath_index > 0 + && (xpath_base[0] == '.' || change->xpath[0] == '.')) + strlcpy(xpath, curr_xpath, sizeof(xpath)); + if (xpath_base[0]) { + if (xpath_base[0] == '.') + strlcat(xpath, xpath_base + 1, sizeof(xpath)); + else + strlcat(xpath, xpath_base, sizeof(xpath)); + } + if (change->xpath[0] == '.') + strlcat(xpath, change->xpath + 1, sizeof(xpath)); + else + strlcpy(xpath, change->xpath, sizeof(xpath)); + + /* Find the northbound node associated to the data path. */ + nb_node = nb_node_find(xpath); + if (!nb_node) { + flog_warn(EC_LIB_YANG_UNKNOWN_DATA_PATH, + "%s: unknown data path: %s", __func__, xpath); + if (error) + *error = true; + continue; + } + /* Find if the node to be edited is not a key node */ + if (!nb_is_operation_allowed(nb_node, change)) { + zlog_err(" Xpath %s points to key node", xpath); + if (error) + *error = true; + break; + } + + /* If the value is not set, get the default if it exists. */ + if (change->value == NULL) + change->value = yang_snode_get_default(nb_node->snode); + data = yang_data_new(xpath, change->value); + + /* + * Ignore "not found" errors when editing the candidate + * configuration. + */ + ret = nb_candidate_edit(candidate_config, nb_node, + change->operation, xpath, NULL, data); + yang_data_free(data); + if (ret != NB_OK && ret != NB_ERR_NOT_FOUND) { + flog_warn( + EC_LIB_NB_CANDIDATE_EDIT_ERROR, + "%s: failed to edit candidate configuration: operation [%s] xpath [%s]", + __func__, nb_operation_name(change->operation), + xpath); + if (error) + *error = true; + continue; + } + nb_update_candidate_changes(candidate_config, change, &seq); + } + + if (error && *error) { + char buf[BUFSIZ]; + + /* + * Failure to edit the candidate configuration should never + * happen in practice, unless there's a bug in the code. When + * that happens, log the error but otherwise ignore it. + */ + snprintf(err_buf, err_bufsize, + "%% Failed to edit configuration.\n\n%s", + yang_print_errors(ly_native_ctx, buf, sizeof(buf))); + } +} + bool nb_candidate_needs_update(const struct nb_config *candidate) { if (candidate->version < running_config->version) @@ -761,8 +943,8 @@ int nb_candidate_update(struct nb_config *candidate) * WARNING: lyd_validate() can change the configuration as part of the * validation process. */ -static int nb_candidate_validate_yang(struct nb_config *candidate, char *errmsg, - size_t errmsg_len) +int nb_candidate_validate_yang(struct nb_config *candidate, char *errmsg, + size_t errmsg_len) { if (lyd_validate_all(&candidate->dnode, ly_native_ctx, LYD_VALIDATE_NO_STATE, NULL) @@ -775,10 +957,10 @@ static int nb_candidate_validate_yang(struct nb_config *candidate, char *errmsg, } /* Perform code-level validation using the northbound callbacks. */ -static int nb_candidate_validate_code(struct nb_context *context, - struct nb_config *candidate, - struct nb_config_cbs *changes, - char *errmsg, size_t errmsg_len) +int nb_candidate_validate_code(struct nb_context *context, + struct nb_config *candidate, + struct nb_config_cbs *changes, char *errmsg, + size_t errmsg_len) { struct nb_config_cb *cb; struct lyd_node *root, *child; @@ -816,6 +998,21 @@ static int nb_candidate_validate_code(struct nb_context *context, return NB_OK; } +int nb_candidate_diff_and_validate_yang(struct nb_context *context, + struct nb_config *candidate, + struct nb_config_cbs *changes, + char *errmsg, size_t errmsg_len) +{ + if (nb_candidate_validate_yang(candidate, errmsg, sizeof(errmsg_len)) + != NB_OK) + return NB_ERR_VALIDATION; + + RB_INIT(nb_config_cbs, changes); + nb_config_diff(running_config, candidate, changes); + + return NB_OK; +} + int nb_candidate_validate(struct nb_context *context, struct nb_config *candidate, char *errmsg, size_t errmsg_len) @@ -823,11 +1020,11 @@ int nb_candidate_validate(struct nb_context *context, struct nb_config_cbs changes; int ret; - if (nb_candidate_validate_yang(candidate, errmsg, errmsg_len) != NB_OK) - return NB_ERR_VALIDATION; + ret = nb_candidate_diff_and_validate_yang(context, candidate, &changes, + errmsg, errmsg_len); + if (ret != NB_OK) + return ret; - RB_INIT(nb_config_cbs, &changes); - nb_config_diff(running_config, candidate, &changes); ret = nb_candidate_validate_code(context, candidate, &changes, errmsg, errmsg_len); nb_config_diff_del_changes(&changes); @@ -839,12 +1036,14 @@ int nb_candidate_commit_prepare(struct nb_context context, struct nb_config *candidate, const char *comment, struct nb_transaction **transaction, + bool skip_validate, bool ignore_zero_change, char *errmsg, size_t errmsg_len) { struct nb_config_cbs changes; - if (nb_candidate_validate_yang(candidate, errmsg, errmsg_len) - != NB_OK) { + if (!skip_validate + && nb_candidate_validate_yang(candidate, errmsg, errmsg_len) + != NB_OK) { flog_warn(EC_LIB_NB_CANDIDATE_INVALID, "%s: failed to validate candidate configuration", __func__); @@ -853,15 +1052,17 @@ int nb_candidate_commit_prepare(struct nb_context context, RB_INIT(nb_config_cbs, &changes); nb_config_diff(running_config, candidate, &changes); - if (RB_EMPTY(nb_config_cbs, &changes)) { + if (!ignore_zero_change && RB_EMPTY(nb_config_cbs, &changes)) { snprintf( errmsg, errmsg_len, "No changes to apply were found during preparation phase"); return NB_ERR_NO_CHANGES; } - if (nb_candidate_validate_code(&context, candidate, &changes, errmsg, - errmsg_len) != NB_OK) { + if (!skip_validate + && nb_candidate_validate_code(&context, candidate, &changes, errmsg, + errmsg_len) + != NB_OK) { flog_warn(EC_LIB_NB_CANDIDATE_INVALID, "%s: failed to validate candidate configuration", __func__); @@ -869,8 +1070,12 @@ int nb_candidate_commit_prepare(struct nb_context context, return NB_ERR_VALIDATION; } - *transaction = nb_transaction_new(context, candidate, &changes, comment, - errmsg, errmsg_len); + /* + * Re-use an existing transaction if provided. Else allocate a new one. + */ + if (!*transaction) + *transaction = nb_transaction_new(context, candidate, &changes, + comment, errmsg, errmsg_len); if (*transaction == NULL) { flog_warn(EC_LIB_NB_TRANSACTION_CREATION_FAILED, "%s: failed to create transaction: %s", __func__, @@ -921,7 +1126,8 @@ int nb_candidate_commit(struct nb_context context, struct nb_config *candidate, int ret; ret = nb_candidate_commit_prepare(context, candidate, comment, - &transaction, errmsg, errmsg_len); + &transaction, false, false, errmsg, + errmsg_len); /* * Apply the changes if the preparation phase succeeded. Otherwise abort * the transaction. @@ -1015,6 +1221,8 @@ static int nb_callback_create(struct nb_context *context, bool unexpected_error = false; int ret; + assert(!CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)); + nb_log_config_callback(event, NB_OP_CREATE, dnode); args.context = context; @@ -1064,6 +1272,8 @@ static int nb_callback_modify(struct nb_context *context, bool unexpected_error = false; int ret; + assert(!CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)); + nb_log_config_callback(event, NB_OP_MODIFY, dnode); args.context = context; @@ -1113,6 +1323,8 @@ static int nb_callback_destroy(struct nb_context *context, bool unexpected_error = false; int ret; + assert(!CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)); + nb_log_config_callback(event, NB_OP_DESTROY, dnode); args.context = context; @@ -1156,6 +1368,8 @@ static int nb_callback_move(struct nb_context *context, bool unexpected_error = false; int ret; + assert(!CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)); + nb_log_config_callback(event, NB_OP_MOVE, dnode); args.context = context; @@ -1199,6 +1413,9 @@ static int nb_callback_pre_validate(struct nb_context *context, bool unexpected_error = false; int ret; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return 0; + nb_log_config_callback(NB_EV_VALIDATE, NB_OP_PRE_VALIDATE, dnode); args.dnode = dnode; @@ -1230,6 +1447,9 @@ static void nb_callback_apply_finish(struct nb_context *context, { struct nb_cb_apply_finish_args args = {}; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return; + nb_log_config_callback(NB_EV_APPLY, NB_OP_APPLY_FINISH, dnode); args.context = context; @@ -1245,6 +1465,9 @@ struct yang_data *nb_callback_get_elem(const struct nb_node *nb_node, { struct nb_cb_get_elem_args args = {}; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return NULL; + DEBUGD(&nb_dbg_cbs_state, "northbound callback (get_elem): xpath [%s] list_entry [%p]", xpath, list_entry); @@ -1260,6 +1483,9 @@ const void *nb_callback_get_next(const struct nb_node *nb_node, { struct nb_cb_get_next_args args = {}; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return NULL; + DEBUGD(&nb_dbg_cbs_state, "northbound callback (get_next): node [%s] parent_list_entry [%p] list_entry [%p]", nb_node->xpath, parent_list_entry, list_entry); @@ -1274,6 +1500,9 @@ int nb_callback_get_keys(const struct nb_node *nb_node, const void *list_entry, { struct nb_cb_get_keys_args args = {}; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return 0; + DEBUGD(&nb_dbg_cbs_state, "northbound callback (get_keys): node [%s] list_entry [%p]", nb_node->xpath, list_entry); @@ -1289,6 +1518,9 @@ const void *nb_callback_lookup_entry(const struct nb_node *nb_node, { struct nb_cb_lookup_entry_args args = {}; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return NULL; + DEBUGD(&nb_dbg_cbs_state, "northbound callback (lookup_entry): node [%s] parent_list_entry [%p]", nb_node->xpath, parent_list_entry); @@ -1304,6 +1536,9 @@ int nb_callback_rpc(const struct nb_node *nb_node, const char *xpath, { struct nb_cb_rpc_args args = {}; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return 0; + DEBUGD(&nb_dbg_cbs_rpc, "northbound RPC: %s", xpath); args.xpath = xpath; @@ -1330,6 +1565,9 @@ static int nb_callback_configuration(struct nb_context *context, union nb_resource *resource; int ret = NB_ERR; + if (CHECK_FLAG(nb_node->flags, F_NB_NODE_IGNORE_CBS)) + return NB_OK; + if (event == NB_EV_VALIDATE) resource = NULL; else @@ -1733,7 +1971,7 @@ static int nb_oper_data_iter_list(const struct nb_node *nb_node, /* Iterate over all list entries. */ do { const struct lysc_node_leaf *skey; - struct yang_list_keys list_keys; + struct yang_list_keys list_keys = {}; char xpath[XPATH_MAXLEN * 2]; int ret; @@ -2391,6 +2629,8 @@ const char *nb_client_name(enum nb_client client) return "Pcep"; case NB_CLIENT_MGMTD_SERVER: return "MGMTD Server"; + case NB_CLIENT_MGMTD_BE: + return "MGMT Backend"; case NB_CLIENT_NONE: return "None"; } @@ -2400,6 +2640,10 @@ const char *nb_client_name(enum nb_client client) static void nb_load_callbacks(const struct frr_yang_module_info *module) { + + if (module->ignore_cbs) + return; + for (size_t i = 0; module->nodes[i].xpath; i++) { struct nb_node *nb_node; uint32_t priority; @@ -2473,7 +2717,8 @@ void nb_init(struct thread_master *tm, /* Initialize the compiled nodes with northbound data */ for (size_t i = 0; i < nmodules; i++) { - yang_snodes_iterate(loaded[i]->info, nb_node_new_cb, 0, NULL); + yang_snodes_iterate(loaded[i]->info, nb_node_new_cb, 0, + (void *)modules[i]); nb_load_callbacks(modules[i]); } diff --git a/lib/northbound.h b/lib/northbound.h index 572d669dc..b63b216b0 100644 --- a/lib/northbound.h +++ b/lib/northbound.h @@ -22,6 +22,39 @@ extern "C" { struct vty; struct debug; +struct nb_yang_xpath_tag { + uint32_t ns; + uint32_t id; +}; + +struct nb_yang_value { + struct lyd_value value; + LY_DATA_TYPE value_type; + uint8_t value_flags; +}; + +struct nb_yang_xpath_elem { + struct nb_yang_xpath_tag tag; + struct nb_yang_value val; +}; + +#define NB_MAX_NUM_KEYS UINT8_MAX +#define NB_MAX_NUM_XPATH_TAGS UINT8_MAX + +struct nb_yang_xpath { + uint8_t length; + struct { + uint8_t num_keys; + struct nb_yang_xpath_elem keys[NB_MAX_NUM_KEYS]; + } tags[NB_MAX_NUM_XPATH_TAGS]; +}; + +#define NB_YANG_XPATH_KEY(__xpath, __indx1, __indx2) \ + ((__xpath->num_tags > __indx1) \ + && (__xpath->tags[__indx1].num_keys > __indx2) \ + ? &__xpath->tags[__indx1].keys[__indx2] \ + : NULL) + /* Northbound events. */ enum nb_event { /* @@ -564,6 +597,8 @@ struct nb_node { #define F_NB_NODE_CONFIG_ONLY 0x01 /* The YANG list doesn't contain key leafs. */ #define F_NB_NODE_KEYLESS_LIST 0x02 +/* Ignore callbacks for this node */ +#define F_NB_NODE_IGNORE_CBS 0x04 /* * HACK: old gcc versions (< 5.x) have a bug that prevents C99 flexible arrays @@ -576,6 +611,12 @@ struct frr_yang_module_info { /* YANG module name. */ const char *name; + /* + * Ignore callbacks for this module. Set this to true to + * load module without any callbacks. + */ + bool ignore_cbs; + /* Northbound callbacks. */ const struct { /* Data path of this YANG node. */ @@ -620,6 +661,7 @@ enum nb_client { NB_CLIENT_GRPC, NB_CLIENT_PCEP, NB_CLIENT_MGMTD_SERVER, + NB_CLIENT_MGMTD_BE, }; /* Northbound context. */ @@ -631,12 +673,6 @@ struct nb_context { const void *user; }; -/* Northbound configuration. */ -struct nb_config { - struct lyd_node *dnode; - uint32_t version; -}; - /* Northbound configuration callback. */ struct nb_config_cb { RB_ENTRY(nb_config_cb) entry; @@ -663,6 +699,13 @@ struct nb_transaction { struct nb_config_cbs changes; }; +/* Northbound configuration. */ +struct nb_config { + struct lyd_node *dnode; + uint32_t version; + struct nb_config_cbs cfg_chgs; +}; + /* Callback function used by nb_oper_data_iterate(). */ typedef int (*nb_oper_data_cb)(const struct lysc_node *snode, struct yang_translator *translator, @@ -832,6 +875,9 @@ extern int nb_candidate_edit(struct nb_config *candidate, const struct yang_data *previous, const struct yang_data *data); +extern void nb_config_diff_created(const struct lyd_node *dnode, uint32_t *seq, + struct nb_config_cbs *changes); + /* * Check if a candidate configuration is outdated and needs to be updated. * @@ -843,6 +889,30 @@ extern int nb_candidate_edit(struct nb_config *candidate, */ extern bool nb_candidate_needs_update(const struct nb_config *candidate); +extern void nb_candidate_edit_config_changes( + struct nb_config *candidate_config, struct nb_cfg_change cfg_changes[], + size_t num_cfg_changes, const char *xpath_base, const char *curr_xpath, + int xpath_index, char *err_buf, int err_bufsize, bool *error); + +extern void nb_config_diff_del_changes(struct nb_config_cbs *changes); + +extern int nb_candidate_diff_and_validate_yang(struct nb_context *context, + struct nb_config *candidate, + struct nb_config_cbs *changes, + char *errmsg, size_t errmsg_len); + +extern void nb_config_diff(const struct nb_config *reference, + const struct nb_config *incremental, + struct nb_config_cbs *changes); + +extern int nb_candidate_validate_yang(struct nb_config *candidate, char *errmsg, + size_t errmsg_len); + +extern int nb_candidate_validate_code(struct nb_context *context, + struct nb_config *candidate, + struct nb_config_cbs *changes, + char *errmsg, size_t errmsg_len); + /* * Update a candidate configuration by rebasing the changes on top of the latest * running configuration. Resolve conflicts automatically by giving preference @@ -922,7 +992,9 @@ extern int nb_candidate_commit_prepare(struct nb_context context, struct nb_config *candidate, const char *comment, struct nb_transaction **transaction, - char *errmsg, size_t errmsg_len); + bool skip_validate, + bool ignore_zero_change, char *errmsg, + size_t errmsg_len); /* * Abort a previously created configuration transaction, releasing all resources diff --git a/lib/northbound_cli.c b/lib/northbound_cli.c index 27db2059b..2ead14cc3 100644 --- a/lib/northbound_cli.c +++ b/lib/northbound_cli.c @@ -141,79 +141,21 @@ static int nb_cli_apply_changes_internal(struct vty *vty, bool clear_pending) { bool error = false; - - if (xpath_base == NULL) - xpath_base = ""; + char buf[BUFSIZ]; VTY_CHECK_XPATH; - /* Edit candidate configuration. */ - for (size_t i = 0; i < vty->num_cfg_changes; i++) { - struct nb_cfg_change *change = &vty->cfg_changes[i]; - struct nb_node *nb_node; - char xpath[XPATH_MAXLEN]; - struct yang_data *data; - int ret; - - /* Handle relative XPaths. */ - memset(xpath, 0, sizeof(xpath)); - if (vty->xpath_index > 0 - && (xpath_base[0] == '.' || change->xpath[0] == '.')) - strlcpy(xpath, VTY_CURR_XPATH, sizeof(xpath)); - if (xpath_base[0]) { - if (xpath_base[0] == '.') - strlcat(xpath, xpath_base + 1, sizeof(xpath)); - else - strlcat(xpath, xpath_base, sizeof(xpath)); - } - if (change->xpath[0] == '.') - strlcat(xpath, change->xpath + 1, sizeof(xpath)); - else - strlcpy(xpath, change->xpath, sizeof(xpath)); - - /* Find the northbound node associated to the data path. */ - nb_node = nb_node_find(xpath); - if (!nb_node) { - flog_warn(EC_LIB_YANG_UNKNOWN_DATA_PATH, - "%s: unknown data path: %s", __func__, xpath); - error = true; - continue; - } - - /* If the value is not set, get the default if it exists. */ - if (change->value == NULL) - change->value = yang_snode_get_default(nb_node->snode); - data = yang_data_new(xpath, change->value); - - /* - * Ignore "not found" errors when editing the candidate - * configuration. - */ - ret = nb_candidate_edit(vty->candidate_config, nb_node, - change->operation, xpath, NULL, data); - yang_data_free(data); - if (ret != NB_OK && ret != NB_ERR_NOT_FOUND) { - flog_warn( - EC_LIB_NB_CANDIDATE_EDIT_ERROR, - "%s: failed to edit candidate configuration: operation [%s] xpath [%s]", - __func__, nb_operation_name(change->operation), - xpath); - error = true; - continue; - } - } - + nb_candidate_edit_config_changes( + vty->candidate_config, vty->cfg_changes, vty->num_cfg_changes, + xpath_base, VTY_CURR_XPATH, vty->xpath_index, buf, sizeof(buf), + &error); if (error) { - char buf[BUFSIZ]; - /* * Failure to edit the candidate configuration should never * happen in practice, unless there's a bug in the code. When * that happens, log the error but otherwise ignore it. */ - vty_out(vty, "%% Failed to edit configuration.\n\n"); - vty_out(vty, "%s", - yang_print_errors(ly_native_ctx, buf, sizeof(buf))); + vty_out(vty, "%s", buf); } /* diff --git a/lib/northbound_confd.c b/lib/northbound_confd.c index 2b57ff270..ee1956851 100644 --- a/lib/northbound_confd.c +++ b/lib/northbound_confd.c @@ -312,7 +312,8 @@ static void frr_confd_cdb_read_cb_prepare(int fd, int *subp, int reslen) transaction = NULL; context.client = NB_CLIENT_CONFD; ret = nb_candidate_commit_prepare(context, candidate, NULL, - &transaction, errmsg, sizeof(errmsg)); + &transaction, false, false, errmsg, + sizeof(errmsg)); if (ret != NB_OK && ret != NB_ERR_NO_CHANGES) { enum confd_errcode errcode; diff --git a/lib/northbound_grpc.cpp b/lib/northbound_grpc.cpp index 1459146ea..274a0ca45 100644 --- a/lib/northbound_grpc.cpp +++ b/lib/northbound_grpc.cpp @@ -825,7 +825,8 @@ HandleUnaryCommit(UnaryRpcState<frr::CommitRequest, frr::CommitResponse> *tag) grpc_debug("`-> Performing PREPARE"); ret = nb_candidate_commit_prepare( context, candidate->config, comment.c_str(), - &candidate->transaction, errmsg, sizeof(errmsg)); + &candidate->transaction, false, false, errmsg, + sizeof(errmsg)); break; case frr::CommitRequest::ABORT: grpc_debug("`-> Performing ABORT"); diff --git a/lib/northbound_sysrepo.c b/lib/northbound_sysrepo.c index 096414ff2..337fb690d 100644 --- a/lib/northbound_sysrepo.c +++ b/lib/northbound_sysrepo.c @@ -269,7 +269,8 @@ static int frr_sr_config_change_cb_prepare(sr_session_ctx_t *session, * required to apply them. */ ret = nb_candidate_commit_prepare(context, candidate, NULL, - &transaction, errmsg, sizeof(errmsg)); + &transaction, false, false, errmsg, + sizeof(errmsg)); if (ret != NB_OK && ret != NB_ERR_NO_CHANGES) flog_warn( EC_LIB_LIBSYSREPO, diff --git a/lib/subdir.am b/lib/subdir.am index eb7cbc4f4..84a35417e 100644 --- a/lib/subdir.am +++ b/lib/subdir.am @@ -64,6 +64,7 @@ lib_libfrr_la_SOURCES = \ lib/log_vty.c \ lib/md5.c \ lib/memory.c \ + lib/mgmt_be_client.c \ lib/mgmt_fe_client.c \ lib/mlag.c \ lib/module.c \ @@ -241,6 +242,7 @@ pkginclude_HEADERS += \ lib/md5.h \ lib/memory.h \ lib/mgmt.pb-c.h \ + lib/mgmt_be_client.h \ lib/mgmt_fe_client.h \ lib/mgmt_pb.h \ lib/module.h \ diff --git a/mgmtd/mgmt.c b/mgmtd/mgmt.c index dd0929e0c..34db900af 100644 --- a/mgmtd/mgmt.c +++ b/mgmtd/mgmt.c @@ -8,6 +8,8 @@ #include <zebra.h> #include "mgmtd/mgmt.h" +#include "mgmtd/mgmt_be_server.h" +#include "mgmtd/mgmt_be_adapter.h" #include "mgmtd/mgmt_fe_server.h" #include "mgmtd/mgmt_fe_adapter.h" #include "mgmtd/mgmt_ds.h" @@ -47,9 +49,15 @@ void mgmt_init(void) /* Initialize datastores */ mgmt_ds_init(mm); + /* Initialize the MGMTD Backend Adapter Module */ + mgmt_be_adapter_init(mm->master); + /* Initialize the MGMTD Frontend Adapter Module */ mgmt_fe_adapter_init(mm->master, mm); + /* Start the MGMTD Backend Server for clients to connect */ + mgmt_be_server_init(mm->master); + /* Start the MGMTD Frontend Server for clients to connect */ mgmt_fe_server_init(mm->master); @@ -61,5 +69,7 @@ void mgmt_terminate(void) { mgmt_fe_server_destroy(); mgmt_fe_adapter_destroy(); + mgmt_be_server_destroy(); + mgmt_be_adapter_destroy(); mgmt_ds_destroy(); } diff --git a/mgmtd/mgmt_be_adapter.c b/mgmtd/mgmt_be_adapter.c new file mode 100644 index 000000000..f5385d218 --- /dev/null +++ b/mgmtd/mgmt_be_adapter.c @@ -0,0 +1,1288 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * MGMTD Backend Client Connection Adapter + * + * Copyright (C) 2021 Vmware, Inc. + * Pushpasis Sarkar <spushpasis@vmware.com> + */ + +#include <zebra.h> +#include "thread.h" +#include "sockopt.h" +#include "network.h" +#include "libfrr.h" +#include "mgmt_pb.h" +#include "mgmtd/mgmt.h" +#include "mgmtd/mgmt_memory.h" +#include "mgmt_be_client.h" +#include "mgmtd/mgmt_be_adapter.h" + +#ifdef REDIRECT_DEBUG_TO_STDERR +#define MGMTD_BE_ADAPTER_DBG(fmt, ...) \ + fprintf(stderr, "%s: " fmt "\n", __func__, ##__VA_ARGS__) +#define MGMTD_BE_ADAPTER_ERR(fmt, ...) \ + fprintf(stderr, "%s: ERROR, " fmt "\n", __func__, ##__VA_ARGS__) +#else /* REDIRECT_DEBUG_TO_STDERR */ +#define MGMTD_BE_ADAPTER_DBG(fmt, ...) \ + do { \ + if (mgmt_debug_be) \ + zlog_debug("%s: " fmt, __func__, ##__VA_ARGS__); \ + } while (0) +#define MGMTD_BE_ADAPTER_ERR(fmt, ...) \ + zlog_err("%s: ERROR: " fmt, __func__, ##__VA_ARGS__) +#endif /* REDIRECT_DEBUG_TO_STDERR */ + +#define FOREACH_ADAPTER_IN_LIST(adapter) \ + frr_each_safe (mgmt_be_adapters, &mgmt_be_adapters, (adapter)) + +/* + * Static mapping of YANG XPath regular expressions and + * the corresponding interested backend clients. + * NOTE: Thiis is a static mapping defined by all MGMTD + * backend client modules (for now, till we develop a + * more dynamic way of creating and updating this map). + * A running map is created by MGMTD in run-time to + * handle real-time mapping of YANG xpaths to one or + * more interested backend client adapters. + * + * Please see xpath_map_reg[] in lib/mgmt_be_client.c + * for the actual map + */ +struct mgmt_be_xpath_map_reg { + const char *xpath_regexp; /* Longest matching regular expression */ + enum mgmt_be_client_id *be_clients; /* clients to notify */ +}; + +struct mgmt_be_xpath_regexp_map { + const char *xpath_regexp; + struct mgmt_be_client_subscr_info be_subscrs; +}; + +struct mgmt_be_get_adapter_config_params { + struct mgmt_be_client_adapter *adapter; + struct nb_config_cbs *cfg_chgs; + uint32_t seq; +}; + +/* + * Static mapping of YANG XPath regular expressions and + * the corresponding interested backend clients. + * NOTE: Thiis is a static mapping defined by all MGMTD + * backend client modules (for now, till we develop a + * more dynamic way of creating and updating this map). + * A running map is created by MGMTD in run-time to + * handle real-time mapping of YANG xpaths to one or + * more interested backend client adapters. + */ +static const struct mgmt_be_xpath_map_reg xpath_static_map_reg[] = { + {.xpath_regexp = "/frr-vrf:lib/*", + .be_clients = + (enum mgmt_be_client_id[]){ +#if 0 +#if HAVE_STATICD + MGMTD_BE_CLIENT_ID_STATICD, +#endif +#endif + MGMTD_BE_CLIENT_ID_MAX}}, + {.xpath_regexp = "/frr-interface:lib/*", + .be_clients = + (enum mgmt_be_client_id[]){ +#if 0 +#if HAVE_STATICD + MGMTD_BE_CLIENT_ID_STATICD, +#endif +#endif + MGMTD_BE_CLIENT_ID_MAX}}, + {.xpath_regexp = + "/frr-routing:routing/control-plane-protocols/control-plane-protocol[type='frr-staticd:staticd'][name='staticd'][vrf='default']/frr-staticd:staticd/*", + + .be_clients = + (enum mgmt_be_client_id[]){ +#if 0 +#if HAVE_STATICD + MGMTD_BE_CLIENT_ID_STATICD, +#endif +#endif + MGMTD_BE_CLIENT_ID_MAX}}, +}; + +#define MGMTD_BE_MAX_NUM_XPATH_MAP 256 +static struct mgmt_be_xpath_regexp_map + mgmt_xpath_map[MGMTD_BE_MAX_NUM_XPATH_MAP]; +static int mgmt_num_xpath_maps; + +static struct thread_master *mgmt_be_adapter_tm; + +static struct mgmt_be_adapters_head mgmt_be_adapters; + +static struct mgmt_be_client_adapter + *mgmt_be_adapters_by_id[MGMTD_BE_CLIENT_ID_MAX]; + +/* Forward declarations */ +static void +mgmt_be_adapter_register_event(struct mgmt_be_client_adapter *adapter, + enum mgmt_be_event event); + +static struct mgmt_be_client_adapter * +mgmt_be_find_adapter_by_fd(int conn_fd) +{ + struct mgmt_be_client_adapter *adapter; + + FOREACH_ADAPTER_IN_LIST (adapter) { + if (adapter->conn_fd == conn_fd) + return adapter; + } + + return NULL; +} + +static struct mgmt_be_client_adapter * +mgmt_be_find_adapter_by_name(const char *name) +{ + struct mgmt_be_client_adapter *adapter; + + FOREACH_ADAPTER_IN_LIST (adapter) { + if (!strncmp(adapter->name, name, sizeof(adapter->name))) + return adapter; + } + + return NULL; +} + +static void +mgmt_be_cleanup_adapters(void) +{ + struct mgmt_be_client_adapter *adapter; + + FOREACH_ADAPTER_IN_LIST (adapter) + mgmt_be_adapter_unlock(&adapter); +} + +static void mgmt_be_xpath_map_init(void) +{ + int indx, num_xpath_maps; + uint16_t indx1; + enum mgmt_be_client_id id; + + MGMTD_BE_ADAPTER_DBG("Init XPath Maps"); + + num_xpath_maps = (int)array_size(xpath_static_map_reg); + for (indx = 0; indx < num_xpath_maps; indx++) { + MGMTD_BE_ADAPTER_DBG(" - XPATH: '%s'", + xpath_static_map_reg[indx].xpath_regexp); + mgmt_xpath_map[indx].xpath_regexp = + xpath_static_map_reg[indx].xpath_regexp; + for (indx1 = 0;; indx1++) { + id = xpath_static_map_reg[indx].be_clients[indx1]; + if (id == MGMTD_BE_CLIENT_ID_MAX) + break; + MGMTD_BE_ADAPTER_DBG(" -- Client: %s Id: %u", + mgmt_be_client_id2name(id), + id); + if (id < MGMTD_BE_CLIENT_ID_MAX) { + mgmt_xpath_map[indx] + .be_subscrs.xpath_subscr[id] + .validate_config = 1; + mgmt_xpath_map[indx] + .be_subscrs.xpath_subscr[id] + .notify_config = 1; + mgmt_xpath_map[indx] + .be_subscrs.xpath_subscr[id] + .own_oper_data = 1; + } + } + } + + mgmt_num_xpath_maps = indx; + MGMTD_BE_ADAPTER_DBG("Total XPath Maps: %u", mgmt_num_xpath_maps); +} + +static int mgmt_be_eval_regexp_match(const char *xpath_regexp, + const char *xpath) +{ + int match_len = 0, re_indx = 0, xp_indx = 0; + int rexp_len, xpath_len; + bool match = true, re_wild = false, xp_wild = false; + bool delim = false, enter_wild_match = false; + char wild_delim = 0; + + rexp_len = strlen(xpath_regexp); + xpath_len = strlen(xpath); + + /* + * Remove the trailing wildcard from the regexp and Xpath. + */ + if (rexp_len && xpath_regexp[rexp_len-1] == '*') + rexp_len--; + if (xpath_len && xpath[xpath_len-1] == '*') + xpath_len--; + + if (!rexp_len || !xpath_len) + return 0; + + for (re_indx = 0, xp_indx = 0; + match && re_indx < rexp_len && xp_indx < xpath_len;) { + match = (xpath_regexp[re_indx] == xpath[xp_indx]); + + /* + * Check if we need to enter wildcard matching. + */ + if (!enter_wild_match && !match && + (xpath_regexp[re_indx] == '*' + || xpath[xp_indx] == '*')) { + /* + * Found wildcard + */ + enter_wild_match = + (xpath_regexp[re_indx-1] == '/' + || xpath_regexp[re_indx-1] == '\'' + || xpath[xp_indx-1] == '/' + || xpath[xp_indx-1] == '\''); + if (enter_wild_match) { + if (xpath_regexp[re_indx] == '*') { + /* + * Begin RE wildcard match. + */ + re_wild = true; + wild_delim = xpath_regexp[re_indx-1]; + } else if (xpath[xp_indx] == '*') { + /* + * Begin XP wildcard match. + */ + xp_wild = true; + wild_delim = xpath[xp_indx-1]; + } + } + } + + /* + * Check if we need to exit wildcard matching. + */ + if (enter_wild_match) { + if (re_wild && xpath[xp_indx] == wild_delim) { + /* + * End RE wildcard matching. + */ + re_wild = false; + if (re_indx < rexp_len-1) + re_indx++; + enter_wild_match = false; + } else if (xp_wild + && xpath_regexp[re_indx] == wild_delim) { + /* + * End XP wildcard matching. + */ + xp_wild = false; + if (xp_indx < xpath_len-1) + xp_indx++; + enter_wild_match = false; + } + } + + match = (xp_wild || re_wild + || xpath_regexp[re_indx] == xpath[xp_indx]); + + /* + * Check if we found a delimiter in both the Xpaths + */ + if ((xpath_regexp[re_indx] == '/' + && xpath[xp_indx] == '/') + || (xpath_regexp[re_indx] == ']' + && xpath[xp_indx] == ']') + || (xpath_regexp[re_indx] == '[' + && xpath[xp_indx] == '[')) { + /* + * Increment the match count if we have a + * new delimiter. + */ + if (match && re_indx && xp_indx && !delim) + match_len++; + delim = true; + } else { + delim = false; + } + + /* + * Proceed to the next character in the RE/XP string as + * necessary. + */ + if (!re_wild) + re_indx++; + if (!xp_wild) + xp_indx++; + } + + /* + * If we finished matching and the last token was a full match + * increment the match count appropriately. + */ + if (match && !delim && + (xpath_regexp[re_indx] == '/' + || xpath_regexp[re_indx] == ']')) + match_len++; + + return match_len; +} + +static void mgmt_be_adapter_disconnect(struct mgmt_be_client_adapter *adapter) +{ + if (adapter->conn_fd >= 0) { + close(adapter->conn_fd); + adapter->conn_fd = -1; + } + + /* + * TODO: Notify about client disconnect for appropriate cleanup + * mgmt_txn_notify_be_adapter_conn(adapter, false); + */ + + if (adapter->id < MGMTD_BE_CLIENT_ID_MAX) { + mgmt_be_adapters_by_id[adapter->id] = NULL; + adapter->id = MGMTD_BE_CLIENT_ID_MAX; + } + + mgmt_be_adapters_del(&mgmt_be_adapters, adapter); + + mgmt_be_adapter_unlock(&adapter); +} + +static void +mgmt_be_adapter_cleanup_old_conn(struct mgmt_be_client_adapter *adapter) +{ + struct mgmt_be_client_adapter *old; + + FOREACH_ADAPTER_IN_LIST (old) { + if (old != adapter + && !strncmp(adapter->name, old->name, sizeof(adapter->name))) { + /* + * We have a Zombie lingering around + */ + MGMTD_BE_ADAPTER_DBG( + "Client '%s' (FD:%d) seems to have reconnected. Removing old connection (FD:%d)!", + adapter->name, adapter->conn_fd, old->conn_fd); + mgmt_be_adapter_disconnect(old); + } + } +} + +static int +mgmt_be_adapter_handle_msg(struct mgmt_be_client_adapter *adapter, + Mgmtd__BeMessage *be_msg) +{ + switch (be_msg->message_case) { + case MGMTD__BE_MESSAGE__MESSAGE_SUBSCR_REQ: + MGMTD_BE_ADAPTER_DBG( + "Got Subscribe Req Msg from '%s' to %sregister %u xpaths", + be_msg->subscr_req->client_name, + !be_msg->subscr_req->subscribe_xpaths + && be_msg->subscr_req->n_xpath_reg + ? "de" + : "", + (uint32_t)be_msg->subscr_req->n_xpath_reg); + + if (strlen(be_msg->subscr_req->client_name)) { + strlcpy(adapter->name, be_msg->subscr_req->client_name, + sizeof(adapter->name)); + adapter->id = mgmt_be_client_name2id(adapter->name); + if (adapter->id >= MGMTD_BE_CLIENT_ID_MAX) { + MGMTD_BE_ADAPTER_ERR( + "Unable to resolve adapter '%s' to a valid ID. Disconnecting!", + adapter->name); + mgmt_be_adapter_disconnect(adapter); + } + mgmt_be_adapters_by_id[adapter->id] = adapter; + mgmt_be_adapter_cleanup_old_conn(adapter); + } + break; + case MGMTD__BE_MESSAGE__MESSAGE_TXN_REPLY: + MGMTD_BE_ADAPTER_DBG( + "Got %s TXN_REPLY Msg for Txn-Id 0x%llx from '%s' with '%s'", + be_msg->txn_reply->create ? "Create" : "Delete", + (unsigned long long)be_msg->txn_reply->txn_id, + adapter->name, + be_msg->txn_reply->success ? "success" : "failure"); + /* + * TODO: Forward the TXN_REPLY to txn module. + * mgmt_txn_notify_be_txn_reply( + * be_msg->txn_reply->txn_id, + * be_msg->txn_reply->create, + * be_msg->txn_reply->success, adapter); + */ + break; + case MGMTD__BE_MESSAGE__MESSAGE_CFG_DATA_REPLY: + MGMTD_BE_ADAPTER_DBG( + "Got CFGDATA_REPLY Msg from '%s' for Txn-Id 0x%llx Batch-Id 0x%llx with Err:'%s'", + adapter->name, + (unsigned long long)be_msg->cfg_data_reply->txn_id, + (unsigned long long)be_msg->cfg_data_reply->batch_id, + be_msg->cfg_data_reply->error_if_any + ? be_msg->cfg_data_reply->error_if_any + : "None"); + /* + * TODO: Forward the CGFData-create reply to txn module. + * mgmt_txn_notify_be_cfgdata_reply( + * be_msg->cfg_data_reply->txn_id, + * be_msg->cfg_data_reply->batch_id, + * be_msg->cfg_data_reply->success, + * be_msg->cfg_data_reply->error_if_any, adapter); + */ + break; + case MGMTD__BE_MESSAGE__MESSAGE_CFG_APPLY_REPLY: + MGMTD_BE_ADAPTER_DBG( + "Got %s CFG_APPLY_REPLY Msg from '%s' for Txn-Id 0x%llx for %d batches (Id 0x%llx-0x%llx), Err:'%s'", + be_msg->cfg_apply_reply->success ? "successful" + : "failed", + adapter->name, + (unsigned long long) + be_msg->cfg_apply_reply->txn_id, + (int)be_msg->cfg_apply_reply->n_batch_ids, + (unsigned long long) + be_msg->cfg_apply_reply->batch_ids[0], + (unsigned long long)be_msg->cfg_apply_reply + ->batch_ids[be_msg->cfg_apply_reply + ->n_batch_ids + - 1], + be_msg->cfg_apply_reply->error_if_any + ? be_msg->cfg_apply_reply->error_if_any + : "None"); + /* TODO: Forward the CGFData-apply reply to txn module. + * mgmt_txn_notify_be_cfg_apply_reply( + * be_msg->cfg_apply_reply->txn_id, + * be_msg->cfg_apply_reply->success, + * (uint64_t *)be_msg->cfg_apply_reply->batch_ids, + * be_msg->cfg_apply_reply->n_batch_ids, + * be_msg->cfg_apply_reply->error_if_any, adapter); + */ + break; + case MGMTD__BE_MESSAGE__MESSAGE_GET_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_CFG_CMD_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_SHOW_CMD_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_NOTIFY_DATA: + /* + * TODO: Add handling code in future. + */ + break; + /* + * NOTE: The following messages are always sent from MGMTD to + * Backend clients only and/or need not be handled on MGMTd. + */ + case MGMTD__BE_MESSAGE__MESSAGE_SUBSCR_REPLY: + case MGMTD__BE_MESSAGE__MESSAGE_GET_REQ: + case MGMTD__BE_MESSAGE__MESSAGE_TXN_REQ: + case MGMTD__BE_MESSAGE__MESSAGE_CFG_DATA_REQ: + case MGMTD__BE_MESSAGE__MESSAGE_CFG_APPLY_REQ: + case MGMTD__BE_MESSAGE__MESSAGE_CFG_CMD_REQ: + case MGMTD__BE_MESSAGE__MESSAGE_SHOW_CMD_REQ: + case MGMTD__BE_MESSAGE__MESSAGE__NOT_SET: +#if PROTOBUF_C_VERSION_NUMBER >= 1003000 + case _MGMTD__BE_MESSAGE__MESSAGE_IS_INT_SIZE: +#endif + default: + /* + * A 'default' case is being added contrary to the + * FRR code guidelines to take care of build + * failures on certain build systems (courtesy of + * the proto-c package). + */ + break; + } + + return 0; +} + +static inline void +mgmt_be_adapter_sched_msg_write(struct mgmt_be_client_adapter *adapter) +{ + if (!CHECK_FLAG(adapter->flags, MGMTD_BE_ADAPTER_FLAGS_WRITES_OFF)) + mgmt_be_adapter_register_event(adapter, MGMTD_BE_CONN_WRITE); +} + +static inline void +mgmt_be_adapter_writes_on(struct mgmt_be_client_adapter *adapter) +{ + MGMTD_BE_ADAPTER_DBG("Resume writing msgs for '%s'", adapter->name); + UNSET_FLAG(adapter->flags, MGMTD_BE_ADAPTER_FLAGS_WRITES_OFF); + if (adapter->obuf_work || stream_fifo_count_safe(adapter->obuf_fifo)) + mgmt_be_adapter_sched_msg_write(adapter); +} + +static inline void +mgmt_be_adapter_writes_off(struct mgmt_be_client_adapter *adapter) +{ + SET_FLAG(adapter->flags, MGMTD_BE_ADAPTER_FLAGS_WRITES_OFF); + MGMTD_BE_ADAPTER_DBG("Pause writing msgs for '%s'", adapter->name); +} + +static int mgmt_be_adapter_send_msg(struct mgmt_be_client_adapter *adapter, + Mgmtd__BeMessage *be_msg) +{ + size_t msg_size; + uint8_t *msg_buf = adapter->msg_buf; + struct mgmt_be_msg *msg; + + if (adapter->conn_fd < 0) + return -1; + + msg_size = mgmtd__be_message__get_packed_size(be_msg); + msg_size += MGMTD_BE_MSG_HDR_LEN; + if (msg_size > MGMTD_BE_MSG_MAX_LEN) { + MGMTD_BE_ADAPTER_ERR( + "Message size %d more than max size'%d. Not sending!'", + (int)msg_size, (int)MGMTD_BE_MSG_MAX_LEN); + return -1; + } + + msg = (struct mgmt_be_msg *)msg_buf; + msg->hdr.marker = MGMTD_BE_MSG_MARKER; + msg->hdr.len = (uint16_t)msg_size; + mgmtd__be_message__pack(be_msg, msg->payload); + + if (!adapter->obuf_work) + adapter->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); + if (STREAM_WRITEABLE(adapter->obuf_work) < msg_size) { + stream_fifo_push(adapter->obuf_fifo, adapter->obuf_work); + adapter->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); + } + stream_write(adapter->obuf_work, (void *)msg_buf, msg_size); + + mgmt_be_adapter_sched_msg_write(adapter); + adapter->num_msg_tx++; + return 0; +} + +static int mgmt_be_send_txn_req(struct mgmt_be_client_adapter *adapter, + uint64_t txn_id, bool create) +{ + Mgmtd__BeMessage be_msg; + Mgmtd__BeTxnReq txn_req; + + mgmtd__be_txn_req__init(&txn_req); + txn_req.create = create; + txn_req.txn_id = txn_id; + + mgmtd__be_message__init(&be_msg); + be_msg.message_case = MGMTD__BE_MESSAGE__MESSAGE_TXN_REQ; + be_msg.txn_req = &txn_req; + + MGMTD_BE_ADAPTER_DBG( + "Sending TXN_REQ message to Backend client '%s' for Txn-Id %llx", + adapter->name, (unsigned long long)txn_id); + + return mgmt_be_adapter_send_msg(adapter, &be_msg); +} + +static int +mgmt_be_send_cfgdata_create_req(struct mgmt_be_client_adapter *adapter, + uint64_t txn_id, uint64_t batch_id, + Mgmtd__YangCfgDataReq **cfgdata_reqs, + size_t num_reqs, bool end_of_data) +{ + Mgmtd__BeMessage be_msg; + Mgmtd__BeCfgDataCreateReq cfgdata_req; + + mgmtd__be_cfg_data_create_req__init(&cfgdata_req); + cfgdata_req.batch_id = batch_id; + cfgdata_req.txn_id = txn_id; + cfgdata_req.data_req = cfgdata_reqs; + cfgdata_req.n_data_req = num_reqs; + cfgdata_req.end_of_data = end_of_data; + + mgmtd__be_message__init(&be_msg); + be_msg.message_case = MGMTD__BE_MESSAGE__MESSAGE_CFG_DATA_REQ; + be_msg.cfg_data_req = &cfgdata_req; + + MGMTD_BE_ADAPTER_DBG( + "Sending CFGDATA_CREATE_REQ message to Backend client '%s' for Txn-Id %llx, Batch-Id: %llx", + adapter->name, (unsigned long long)txn_id, + (unsigned long long)batch_id); + + return mgmt_be_adapter_send_msg(adapter, &be_msg); +} + +static int mgmt_be_send_cfgapply_req(struct mgmt_be_client_adapter *adapter, + uint64_t txn_id) +{ + Mgmtd__BeMessage be_msg; + Mgmtd__BeCfgDataApplyReq apply_req; + + mgmtd__be_cfg_data_apply_req__init(&apply_req); + apply_req.txn_id = txn_id; + + mgmtd__be_message__init(&be_msg); + be_msg.message_case = MGMTD__BE_MESSAGE__MESSAGE_CFG_APPLY_REQ; + be_msg.cfg_apply_req = &apply_req; + + MGMTD_BE_ADAPTER_DBG( + "Sending CFG_APPLY_REQ message to Backend client '%s' for Txn-Id 0x%llx", + adapter->name, (unsigned long long)txn_id); + + return mgmt_be_adapter_send_msg(adapter, &be_msg); +} + +static uint16_t +mgmt_be_adapter_process_msg(struct mgmt_be_client_adapter *adapter, + uint8_t *msg_buf, uint16_t bytes_read) +{ + Mgmtd__BeMessage *be_msg; + struct mgmt_be_msg *msg; + uint16_t bytes_left; + uint16_t processed = 0; + + bytes_left = bytes_read; + for (; bytes_left > MGMTD_BE_MSG_HDR_LEN; + bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) { + msg = (struct mgmt_be_msg *)msg_buf; + if (msg->hdr.marker != MGMTD_BE_MSG_MARKER) { + MGMTD_BE_ADAPTER_DBG( + "Marker not found in message from MGMTD Backend adapter '%s'", + adapter->name); + break; + } + + if (bytes_left < msg->hdr.len) { + MGMTD_BE_ADAPTER_DBG( + "Incomplete message of %d bytes (epxected: %u) from MGMTD Backend adapter '%s'", + bytes_left, msg->hdr.len, adapter->name); + break; + } + + be_msg = mgmtd__be_message__unpack( + NULL, (size_t)(msg->hdr.len - MGMTD_BE_MSG_HDR_LEN), + msg->payload); + if (!be_msg) { + MGMTD_BE_ADAPTER_DBG( + "Failed to decode %d bytes from MGMTD Backend adapter '%s'", + msg->hdr.len, adapter->name); + continue; + } + + (void)mgmt_be_adapter_handle_msg(adapter, be_msg); + mgmtd__be_message__free_unpacked(be_msg, NULL); + processed++; + adapter->num_msg_rx++; + } + + return processed; +} + +static void mgmt_be_adapter_proc_msgbufs(struct thread *thread) +{ + struct mgmt_be_client_adapter *adapter; + struct stream *work; + int processed = 0; + + adapter = (struct mgmt_be_client_adapter *)THREAD_ARG(thread); + assert(adapter); + + if (adapter->conn_fd < 0) + return; + + for (; processed < MGMTD_BE_MAX_NUM_MSG_PROC;) { + work = stream_fifo_pop_safe(adapter->ibuf_fifo); + if (!work) + break; + + processed += mgmt_be_adapter_process_msg( + adapter, STREAM_DATA(work), stream_get_endp(work)); + + if (work != adapter->ibuf_work) { + /* Free it up */ + stream_free(work); + } else { + /* Reset stream buffer for next read */ + stream_reset(work); + } + } + + /* + * If we have more to process, reschedule for processing it. + */ + if (stream_fifo_head(adapter->ibuf_fifo)) + mgmt_be_adapter_register_event(adapter, MGMTD_BE_PROC_MSG); +} + +static void mgmt_be_adapter_read(struct thread *thread) +{ + struct mgmt_be_client_adapter *adapter; + int bytes_read, msg_cnt; + size_t total_bytes, bytes_left; + struct mgmt_be_msg_hdr *msg_hdr; + bool incomplete = false; + + adapter = (struct mgmt_be_client_adapter *)THREAD_ARG(thread); + assert(adapter && adapter->conn_fd >= 0); + + total_bytes = 0; + bytes_left = STREAM_SIZE(adapter->ibuf_work) - + stream_get_endp(adapter->ibuf_work); + for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) { + bytes_read = stream_read_try(adapter->ibuf_work, + adapter->conn_fd, bytes_left); + MGMTD_BE_ADAPTER_DBG( + "Got %d bytes of message from MGMTD Backend adapter '%s'", + bytes_read, adapter->name); + if (bytes_read <= 0) { + if (bytes_read == -1 + && (errno == EAGAIN || errno == EWOULDBLOCK)) { + mgmt_be_adapter_register_event( + adapter, MGMTD_BE_CONN_READ); + return; + } + + if (!bytes_read) { + /* Looks like connection closed */ + MGMTD_BE_ADAPTER_ERR( + "Got error (%d) while reading from MGMTD Backend adapter '%s'. Err: '%s'", + bytes_read, adapter->name, + safe_strerror(errno)); + mgmt_be_adapter_disconnect(adapter); + return; + } + break; + } + + total_bytes += bytes_read; + bytes_left -= bytes_read; + } + + /* + * Check if we would have read incomplete messages or not. + */ + stream_set_getp(adapter->ibuf_work, 0); + total_bytes = 0; + msg_cnt = 0; + bytes_left = stream_get_endp(adapter->ibuf_work); + for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) { + msg_hdr = + (struct mgmt_be_msg_hdr *)(STREAM_DATA( + adapter->ibuf_work) + + total_bytes); + if (msg_hdr->marker != MGMTD_BE_MSG_MARKER) { + /* Corrupted buffer. Force disconnect?? */ + MGMTD_BE_ADAPTER_ERR( + "Received corrupted buffer from MGMTD Backend client."); + mgmt_be_adapter_disconnect(adapter); + return; + } + if (msg_hdr->len > bytes_left) + break; + + total_bytes += msg_hdr->len; + bytes_left -= msg_hdr->len; + msg_cnt++; + } + + if (bytes_left > 0) + incomplete = true; + + /* + * We would have read one or several messages. + * Schedule processing them now. + */ + msg_hdr = (struct mgmt_be_msg_hdr *)(STREAM_DATA(adapter->ibuf_work) + + total_bytes); + stream_set_endp(adapter->ibuf_work, total_bytes); + stream_fifo_push(adapter->ibuf_fifo, adapter->ibuf_work); + adapter->ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); + if (incomplete) { + stream_put(adapter->ibuf_work, msg_hdr, bytes_left); + stream_set_endp(adapter->ibuf_work, bytes_left); + } + + if (msg_cnt) + mgmt_be_adapter_register_event(adapter, MGMTD_BE_PROC_MSG); + + mgmt_be_adapter_register_event(adapter, MGMTD_BE_CONN_READ); +} + +static void mgmt_be_adapter_write(struct thread *thread) +{ + int bytes_written = 0; + int processed = 0; + int msg_size = 0; + struct stream *s = NULL; + struct stream *free = NULL; + struct mgmt_be_client_adapter *adapter; + + adapter = (struct mgmt_be_client_adapter *)THREAD_ARG(thread); + assert(adapter && adapter->conn_fd >= 0); + + /* Ensure pushing any pending write buffer to FIFO */ + if (adapter->obuf_work) { + stream_fifo_push(adapter->obuf_fifo, adapter->obuf_work); + adapter->obuf_work = NULL; + } + + for (s = stream_fifo_head(adapter->obuf_fifo); + s && processed < MGMTD_BE_MAX_NUM_MSG_WRITE; + s = stream_fifo_head(adapter->obuf_fifo)) { + /* msg_size = (int)stream_get_size(s); */ + msg_size = (int)STREAM_READABLE(s); + bytes_written = stream_flush(s, adapter->conn_fd); + if (bytes_written == -1 + && (errno == EAGAIN || errno == EWOULDBLOCK)) { + mgmt_be_adapter_register_event(adapter, + MGMTD_BE_CONN_WRITE); + return; + } else if (bytes_written != msg_size) { + MGMTD_BE_ADAPTER_ERR( + "Could not write all %d bytes (wrote: %d) to MGMTD Backend client socket. Err: '%s'", + msg_size, bytes_written, safe_strerror(errno)); + if (bytes_written > 0) { + stream_forward_getp(s, (size_t)bytes_written); + stream_pulldown(s); + mgmt_be_adapter_register_event( + adapter, MGMTD_BE_CONN_WRITE); + return; + } + mgmt_be_adapter_disconnect(adapter); + return; + } + + free = stream_fifo_pop(adapter->obuf_fifo); + stream_free(free); + MGMTD_BE_ADAPTER_DBG( + "Wrote %d bytes of message to MGMTD Backend client socket.'", + bytes_written); + processed++; + } + + if (s) { + mgmt_be_adapter_writes_off(adapter); + mgmt_be_adapter_register_event(adapter, + MGMTD_BE_CONN_WRITES_ON); + } +} + +static void mgmt_be_adapter_resume_writes(struct thread *thread) +{ + struct mgmt_be_client_adapter *adapter; + + adapter = (struct mgmt_be_client_adapter *)THREAD_ARG(thread); + assert(adapter && adapter->conn_fd >= 0); + + mgmt_be_adapter_writes_on(adapter); +} + +static void mgmt_be_iter_and_get_cfg(struct mgmt_ds_ctx *ds_ctx, + char *xpath, struct lyd_node *node, + struct nb_node *nb_node, void *ctx) +{ + struct mgmt_be_client_subscr_info subscr_info; + struct mgmt_be_get_adapter_config_params *parms; + struct mgmt_be_client_adapter *adapter; + struct nb_config_cbs *root; + uint32_t *seq; + + if (mgmt_be_get_subscr_info_for_xpath(xpath, &subscr_info) != 0) { + MGMTD_BE_ADAPTER_ERR( + "ERROR: Failed to get subscriber for '%s'", xpath); + return; + } + + parms = (struct mgmt_be_get_adapter_config_params *)ctx; + + adapter = parms->adapter; + if (!subscr_info.xpath_subscr[adapter->id].subscribed) + return; + + root = parms->cfg_chgs; + seq = &parms->seq; + nb_config_diff_created(node, seq, root); +} + +static void mgmt_be_adapter_conn_init(struct thread *thread) +{ + struct mgmt_be_client_adapter *adapter; + + adapter = (struct mgmt_be_client_adapter *)THREAD_ARG(thread); + assert(adapter && adapter->conn_fd >= 0); + + /* + * TODO: Check first if the current session can run a CONFIG + * transaction or not. Reschedule if a CONFIG transaction + * from another session is already in progress. + if (mgmt_config_txn_in_progress() != MGMTD_SESSION_ID_NONE) { + mgmt_be_adapter_register_event(adapter, MGMTD_BE_CONN_INIT); + return 0; + } + */ + + /* + * TODO: Notify TXN module to create a CONFIG transaction and + * download the CONFIGs identified for this new client. + * If the TXN module fails to initiate the CONFIG transaction + * disconnect from the client forcing a reconnect later. + * That should also take care of destroying the adapter. + * + if (mgmt_txn_notify_be_adapter_conn(adapter, true) != 0) { + mgmt_be_adapter_disconnect(adapter); + adapter = NULL; + } + */ +} + +static void +mgmt_be_adapter_register_event(struct mgmt_be_client_adapter *adapter, + enum mgmt_be_event event) +{ + struct timeval tv = {0}; + + switch (event) { + case MGMTD_BE_CONN_INIT: + thread_add_timer_msec(mgmt_be_adapter_tm, + mgmt_be_adapter_conn_init, adapter, + MGMTD_BE_CONN_INIT_DELAY_MSEC, + &adapter->conn_init_ev); + assert(adapter->conn_init_ev); + break; + case MGMTD_BE_CONN_READ: + thread_add_read(mgmt_be_adapter_tm, mgmt_be_adapter_read, + adapter, adapter->conn_fd, &adapter->conn_read_ev); + assert(adapter->conn_read_ev); + break; + case MGMTD_BE_CONN_WRITE: + thread_add_write(mgmt_be_adapter_tm, mgmt_be_adapter_write, + adapter, adapter->conn_fd, &adapter->conn_write_ev); + assert(adapter->conn_write_ev); + break; + case MGMTD_BE_PROC_MSG: + tv.tv_usec = MGMTD_BE_MSG_PROC_DELAY_USEC; + thread_add_timer_tv(mgmt_be_adapter_tm, + mgmt_be_adapter_proc_msgbufs, adapter, &tv, + &adapter->proc_msg_ev); + assert(adapter->proc_msg_ev); + break; + case MGMTD_BE_CONN_WRITES_ON: + thread_add_timer_msec(mgmt_be_adapter_tm, + mgmt_be_adapter_resume_writes, adapter, + MGMTD_BE_MSG_WRITE_DELAY_MSEC, + &adapter->conn_writes_on); + assert(adapter->conn_writes_on); + break; + case MGMTD_BE_SERVER: + case MGMTD_BE_SCHED_CFG_PREPARE: + case MGMTD_BE_RESCHED_CFG_PREPARE: + case MGMTD_BE_SCHED_CFG_APPLY: + case MGMTD_BE_RESCHED_CFG_APPLY: + assert(!"mgmt_be_adapter_post_event() called incorrectly"); + break; + } +} + +void mgmt_be_adapter_lock(struct mgmt_be_client_adapter *adapter) +{ + adapter->refcount++; +} + +extern void mgmt_be_adapter_unlock(struct mgmt_be_client_adapter **adapter) +{ + assert(*adapter && (*adapter)->refcount); + + (*adapter)->refcount--; + if (!(*adapter)->refcount) { + mgmt_be_adapters_del(&mgmt_be_adapters, *adapter); + + stream_fifo_free((*adapter)->ibuf_fifo); + stream_free((*adapter)->ibuf_work); + stream_fifo_free((*adapter)->obuf_fifo); + stream_free((*adapter)->obuf_work); + + THREAD_OFF((*adapter)->conn_init_ev); + THREAD_OFF((*adapter)->conn_read_ev); + THREAD_OFF((*adapter)->conn_write_ev); + THREAD_OFF((*adapter)->conn_writes_on); + THREAD_OFF((*adapter)->proc_msg_ev); + XFREE(MTYPE_MGMTD_BE_ADPATER, *adapter); + } + + *adapter = NULL; +} + +int mgmt_be_adapter_init(struct thread_master *tm) +{ + if (!mgmt_be_adapter_tm) { + mgmt_be_adapter_tm = tm; + memset(mgmt_xpath_map, 0, sizeof(mgmt_xpath_map)); + mgmt_num_xpath_maps = 0; + memset(mgmt_be_adapters_by_id, 0, + sizeof(mgmt_be_adapters_by_id)); + mgmt_be_adapters_init(&mgmt_be_adapters); + mgmt_be_xpath_map_init(); + } + + return 0; +} + +void mgmt_be_adapter_destroy(void) +{ + mgmt_be_cleanup_adapters(); +} + +struct mgmt_be_client_adapter * +mgmt_be_create_adapter(int conn_fd, union sockunion *from) +{ + struct mgmt_be_client_adapter *adapter = NULL; + + adapter = mgmt_be_find_adapter_by_fd(conn_fd); + if (!adapter) { + adapter = XCALLOC(MTYPE_MGMTD_BE_ADPATER, + sizeof(struct mgmt_be_client_adapter)); + assert(adapter); + + adapter->conn_fd = conn_fd; + adapter->id = MGMTD_BE_CLIENT_ID_MAX; + memcpy(&adapter->conn_su, from, sizeof(adapter->conn_su)); + snprintf(adapter->name, sizeof(adapter->name), "Unknown-FD-%d", + adapter->conn_fd); + adapter->ibuf_fifo = stream_fifo_new(); + adapter->ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); + adapter->obuf_fifo = stream_fifo_new(); + /* adapter->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); */ + adapter->obuf_work = NULL; + mgmt_be_adapter_lock(adapter); + + mgmt_be_adapter_register_event(adapter, MGMTD_BE_CONN_READ); + mgmt_be_adapters_add_tail(&mgmt_be_adapters, adapter); + + RB_INIT(nb_config_cbs, &adapter->cfg_chgs); + + MGMTD_BE_ADAPTER_DBG("Added new MGMTD Backend adapter '%s'", + adapter->name); + } + + /* Make client socket non-blocking. */ + set_nonblocking(adapter->conn_fd); + setsockopt_so_sendbuf(adapter->conn_fd, MGMTD_SOCKET_BE_SEND_BUF_SIZE); + setsockopt_so_recvbuf(adapter->conn_fd, MGMTD_SOCKET_BE_RECV_BUF_SIZE); + + /* Trigger resync of config with the new adapter */ + mgmt_be_adapter_register_event(adapter, MGMTD_BE_CONN_INIT); + + return adapter; +} + +struct mgmt_be_client_adapter * +mgmt_be_get_adapter_by_id(enum mgmt_be_client_id id) +{ + return (id < MGMTD_BE_CLIENT_ID_MAX ? mgmt_be_adapters_by_id[id] + : NULL); +} + +struct mgmt_be_client_adapter * +mgmt_be_get_adapter_by_name(const char *name) +{ + return mgmt_be_find_adapter_by_name(name); +} + +int mgmt_be_get_adapter_config(struct mgmt_be_client_adapter *adapter, + struct mgmt_ds_ctx *ds_ctx, + struct nb_config_cbs **cfg_chgs) +{ + char base_xpath[] = "/"; + struct mgmt_be_get_adapter_config_params parms; + + assert(cfg_chgs); + + if (RB_EMPTY(nb_config_cbs, &adapter->cfg_chgs)) { + parms.adapter = adapter; + parms.cfg_chgs = &adapter->cfg_chgs; + parms.seq = 0; + + mgmt_ds_iter_data(ds_ctx, base_xpath, + mgmt_be_iter_and_get_cfg, (void *)&parms, + false); + } + + *cfg_chgs = &adapter->cfg_chgs; + return 0; +} + +int mgmt_be_create_txn(struct mgmt_be_client_adapter *adapter, + uint64_t txn_id) +{ + return mgmt_be_send_txn_req(adapter, txn_id, true); +} + +int mgmt_be_destroy_txn(struct mgmt_be_client_adapter *adapter, + uint64_t txn_id) +{ + return mgmt_be_send_txn_req(adapter, txn_id, false); +} + +int mgmt_be_send_cfg_data_create_req(struct mgmt_be_client_adapter *adapter, + uint64_t txn_id, uint64_t batch_id, + struct mgmt_be_cfgreq *cfg_req, + bool end_of_data) +{ + return mgmt_be_send_cfgdata_create_req( + adapter, txn_id, batch_id, cfg_req->cfgdata_reqs, + cfg_req->num_reqs, end_of_data); +} + +extern int +mgmt_be_send_cfg_apply_req(struct mgmt_be_client_adapter *adapter, + uint64_t txn_id) +{ + return mgmt_be_send_cfgapply_req(adapter, txn_id); +} + +/* + * This function maps a YANG dtata Xpath to one or more + * Backend Clients that should be contacted for various purposes. + */ +int mgmt_be_get_subscr_info_for_xpath( + const char *xpath, struct mgmt_be_client_subscr_info *subscr_info) +{ + int indx, match, max_match = 0, num_reg; + enum mgmt_be_client_id id; + struct mgmt_be_client_subscr_info + *reg_maps[array_size(mgmt_xpath_map)] = {0}; + bool root_xp = false; + + if (!subscr_info) + return -1; + + num_reg = 0; + memset(subscr_info, 0, sizeof(*subscr_info)); + + if (strlen(xpath) <= 2 && xpath[0] == '/' + && (!xpath[1] || xpath[1] == '*')) { + root_xp = true; + } + + MGMTD_BE_ADAPTER_DBG("XPATH: %s", xpath); + for (indx = 0; indx < mgmt_num_xpath_maps; indx++) { + /* + * For Xpaths: '/' and '/ *' all xpath maps should match + * the given xpath. + */ + if (!root_xp) { + match = mgmt_be_eval_regexp_match( + mgmt_xpath_map[indx].xpath_regexp, xpath); + + if (!match || match < max_match) + continue; + + if (match > max_match) { + num_reg = 0; + max_match = match; + } + } + + reg_maps[num_reg] = &mgmt_xpath_map[indx].be_subscrs; + num_reg++; + } + + for (indx = 0; indx < num_reg; indx++) { + FOREACH_MGMTD_BE_CLIENT_ID (id) { + if (reg_maps[indx]->xpath_subscr[id].subscribed) { + MGMTD_BE_ADAPTER_DBG( + "Cient: %s", + mgmt_be_client_id2name(id)); + memcpy(&subscr_info->xpath_subscr[id], + ®_maps[indx]->xpath_subscr[id], + sizeof(subscr_info->xpath_subscr[id])); + } + } + } + + return 0; +} + +void mgmt_be_adapter_status_write(struct vty *vty) +{ + struct mgmt_be_client_adapter *adapter; + + vty_out(vty, "MGMTD Backend Adapters\n"); + + FOREACH_ADAPTER_IN_LIST (adapter) { + vty_out(vty, " Client: \t\t\t%s\n", adapter->name); + vty_out(vty, " Conn-FD: \t\t\t%d\n", adapter->conn_fd); + vty_out(vty, " Client-Id: \t\t\t%d\n", adapter->id); + vty_out(vty, " Ref-Count: \t\t\t%u\n", adapter->refcount); + vty_out(vty, " Msg-Sent: \t\t\t%u\n", adapter->num_msg_tx); + vty_out(vty, " Msg-Recvd: \t\t\t%u\n", adapter->num_msg_rx); + } + vty_out(vty, " Total: %d\n", + (int)mgmt_be_adapters_count(&mgmt_be_adapters)); +} + +void mgmt_be_xpath_register_write(struct vty *vty) +{ + int indx; + enum mgmt_be_client_id id; + struct mgmt_be_client_adapter *adapter; + + vty_out(vty, "MGMTD Backend XPath Registry\n"); + + for (indx = 0; indx < mgmt_num_xpath_maps; indx++) { + vty_out(vty, " - XPATH: '%s'\n", + mgmt_xpath_map[indx].xpath_regexp); + FOREACH_MGMTD_BE_CLIENT_ID (id) { + if (mgmt_xpath_map[indx] + .be_subscrs.xpath_subscr[id] + .subscribed) { + vty_out(vty, + " -- Client: '%s' \t Validate:%s, Notify:%s, Own:%s\n", + mgmt_be_client_id2name(id), + mgmt_xpath_map[indx] + .be_subscrs + .xpath_subscr[id] + .validate_config + ? "T" + : "F", + mgmt_xpath_map[indx] + .be_subscrs + .xpath_subscr[id] + .notify_config + ? "T" + : "F", + mgmt_xpath_map[indx] + .be_subscrs + .xpath_subscr[id] + .own_oper_data + ? "T" + : "F"); + adapter = mgmt_be_get_adapter_by_id(id); + if (adapter) { + vty_out(vty, " -- Adapter: %p\n", + adapter); + } + } + } + } + + vty_out(vty, "Total XPath Registries: %u\n", mgmt_num_xpath_maps); +} + +void mgmt_be_xpath_subscr_info_write(struct vty *vty, const char *xpath) +{ + struct mgmt_be_client_subscr_info subscr; + enum mgmt_be_client_id id; + struct mgmt_be_client_adapter *adapter; + + if (mgmt_be_get_subscr_info_for_xpath(xpath, &subscr) != 0) { + vty_out(vty, "ERROR: Failed to get subscriber for '%s'\n", + xpath); + return; + } + + vty_out(vty, "XPath: '%s'\n", xpath); + FOREACH_MGMTD_BE_CLIENT_ID (id) { + if (subscr.xpath_subscr[id].subscribed) { + vty_out(vty, + " -- Client: '%s' \t Validate:%s, Notify:%s, Own:%s\n", + mgmt_be_client_id2name(id), + subscr.xpath_subscr[id].validate_config ? "T" + : "F", + subscr.xpath_subscr[id].notify_config ? "T" + : "F", + subscr.xpath_subscr[id].own_oper_data ? "T" + : "F"); + adapter = mgmt_be_get_adapter_by_id(id); + if (adapter) + vty_out(vty, " -- Adapter: %p\n", adapter); + } + } +} diff --git a/mgmtd/mgmt_be_adapter.h b/mgmtd/mgmt_be_adapter.h new file mode 100644 index 000000000..5dfc2386d --- /dev/null +++ b/mgmtd/mgmt_be_adapter.h @@ -0,0 +1,231 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * MGMTD Backend Client Connection Adapter + * + * Copyright (C) 2021 Vmware, Inc. + * Pushpasis Sarkar <spushpasis@vmware.com> + */ + +#ifndef _FRR_MGMTD_BE_ADAPTER_H_ +#define _FRR_MGMTD_BE_ADAPTER_H_ + +#include "mgmtd/mgmt_defines.h" +#include "mgmt_be_client.h" +#include "mgmtd/mgmt_ds.h" + +#define MGMTD_BE_CONN_INIT_DELAY_MSEC 50 + +#define MGMTD_FIND_ADAPTER_BY_INDEX(adapter_index) \ + mgmt_adaptr_ref[adapter_index] + +enum mgmt_be_req_type { + MGMTD_BE_REQ_NONE = 0, + MGMTD_BE_REQ_CFG_VALIDATE, + MGMTD_BE_REQ_CFG_APPLY, + MGMTD_BE_REQ_DATA_GET_ELEM, + MGMTD_BE_REQ_DATA_GET_NEXT +}; + +struct mgmt_be_cfgreq { + Mgmtd__YangCfgDataReq **cfgdata_reqs; + size_t num_reqs; +}; + +struct mgmt_be_datareq { + Mgmtd__YangGetDataReq **getdata_reqs; + size_t num_reqs; +}; + +PREDECL_LIST(mgmt_be_adapters); +PREDECL_LIST(mgmt_txn_badapters); + +struct mgmt_be_client_adapter { + enum mgmt_be_client_id id; + int conn_fd; + union sockunion conn_su; + struct thread *conn_init_ev; + struct thread *conn_read_ev; + struct thread *conn_write_ev; + struct thread *conn_writes_on; + struct thread *proc_msg_ev; + uint32_t flags; + char name[MGMTD_CLIENT_NAME_MAX_LEN]; + uint8_t num_xpath_reg; + char xpath_reg[MGMTD_MAX_NUM_XPATH_REG][MGMTD_MAX_XPATH_LEN]; + + /* IO streams for read and write */ + /* pthread_mutex_t ibuf_mtx; */ + struct stream_fifo *ibuf_fifo; + /* pthread_mutex_t obuf_mtx; */ + struct stream_fifo *obuf_fifo; + + /* Private I/O buffers */ + struct stream *ibuf_work; + struct stream *obuf_work; + uint8_t msg_buf[MGMTD_BE_MSG_MAX_LEN]; + + /* Buffer of data waiting to be written to client. */ + /* struct buffer *wb; */ + + int refcount; + uint32_t num_msg_tx; + uint32_t num_msg_rx; + + /* + * List of config items that should be sent to the + * backend during re/connect. This is temporarily + * created and then freed-up as soon as the initial + * config items has been applied onto the backend. + */ + struct nb_config_cbs cfg_chgs; + + struct mgmt_be_adapters_item list_linkage; + struct mgmt_txn_badapters_item txn_list_linkage; +}; + +#define MGMTD_BE_ADAPTER_FLAGS_WRITES_OFF (1U << 0) +#define MGMTD_BE_ADAPTER_FLAGS_CFG_SYNCED (1U << 1) + +DECLARE_LIST(mgmt_be_adapters, struct mgmt_be_client_adapter, list_linkage); +DECLARE_LIST(mgmt_txn_badapters, struct mgmt_be_client_adapter, + txn_list_linkage); + +union mgmt_be_xpath_subscr_info { + uint8_t subscribed; + struct { + uint8_t validate_config : 1; + uint8_t notify_config : 1; + uint8_t own_oper_data : 1; + }; +}; + +struct mgmt_be_client_subscr_info { + union mgmt_be_xpath_subscr_info xpath_subscr[MGMTD_BE_CLIENT_ID_MAX]; +}; + +/* Initialise backend adapter module. */ +extern int mgmt_be_adapter_init(struct thread_master *tm); + +/* Destroy the backend adapter module. */ +extern void mgmt_be_adapter_destroy(void); + +/* Acquire lock for backend adapter. */ +extern void mgmt_be_adapter_lock(struct mgmt_be_client_adapter *adapter); + +/* Remove lock from backend adapter. */ +extern void mgmt_be_adapter_unlock(struct mgmt_be_client_adapter **adapter); + +/* Create backend adapter. */ +extern struct mgmt_be_client_adapter * +mgmt_be_create_adapter(int conn_fd, union sockunion *su); + +/* Fetch backend adapter given an adapter name. */ +extern struct mgmt_be_client_adapter * +mgmt_be_get_adapter_by_name(const char *name); + +/* Fetch backend adapter given an client ID. */ +extern struct mgmt_be_client_adapter * +mgmt_be_get_adapter_by_id(enum mgmt_be_client_id id); + +/* Fetch backend adapter config. */ +extern int +mgmt_be_get_adapter_config(struct mgmt_be_client_adapter *adapter, + struct mgmt_ds_ctx *ds_ctx, + struct nb_config_cbs **cfg_chgs); + +/* Create a transaction. */ +extern int mgmt_be_create_txn(struct mgmt_be_client_adapter *adapter, + uint64_t txn_id); + +/* Destroy a transaction. */ +extern int mgmt_be_destroy_txn(struct mgmt_be_client_adapter *adapter, + uint64_t txn_id); + +/* + * Send config data create request to backend client. + * + * adaptr + * Backend adapter information. + * + * txn_id + * Unique transaction identifier. + * + * batch_id + * Request batch ID. + * + * cfg_req + * Config data request. + * + * end_of_data + * TRUE if the data from last batch, FALSE otherwise. + * + * Returns: + * 0 on success, -1 on failure. + */ +extern int mgmt_be_send_cfg_data_create_req( + struct mgmt_be_client_adapter *adapter, uint64_t txn_id, + uint64_t batch_id, struct mgmt_be_cfgreq *cfg_req, bool end_of_data); + +/* + * Send config validate request to backend client. + * + * adaptr + * Backend adapter information. + * + * txn_id + * Unique transaction identifier. + * + * batch_ids + * List of request batch IDs. + * + * num_batch_ids + * Number of batch ids. + * + * Returns: + * 0 on success, -1 on failure. + */ +extern int +mgmt_be_send_cfg_validate_req(struct mgmt_be_client_adapter *adapter, + uint64_t txn_id, uint64_t batch_ids[], + size_t num_batch_ids); + +/* + * Send config apply request to backend client. + * + * adaptr + * Backend adapter information. + * + * txn_id + * Unique transaction identifier. + * + * Returns: + * 0 on success, -1 on failure. + */ +extern int +mgmt_be_send_cfg_apply_req(struct mgmt_be_client_adapter *adapter, + uint64_t txn_id); + +/* + * Dump backend adapter status to vty. + */ +extern void mgmt_be_adapter_status_write(struct vty *vty); + +/* + * Dump xpath registry for each backend client to vty. + */ +extern void mgmt_be_xpath_register_write(struct vty *vty); + +/* + * Maps a YANG dtata Xpath to one or more + * backend clients that should be contacted for various purposes. + */ +extern int mgmt_be_get_subscr_info_for_xpath( + const char *xpath, struct mgmt_be_client_subscr_info *subscr_info); + +/* + * Dump backend client information for a given xpath to vty. + */ +extern void mgmt_be_xpath_subscr_info_write(struct vty *vty, + const char *xpath); + +#endif /* _FRR_MGMTD_BE_ADAPTER_H_ */ diff --git a/mgmtd/mgmt_be_server.c b/mgmtd/mgmt_be_server.c new file mode 100644 index 000000000..6464b12ae --- /dev/null +++ b/mgmtd/mgmt_be_server.c @@ -0,0 +1,160 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * MGMTD Backend Server + * + * Copyright (C) 2021 Vmware, Inc. + * Pushpasis Sarkar <spushpasis@vmware.com> + */ + +#include <zebra.h> +#include "network.h" +#include "libfrr.h" +#include "mgmtd/mgmt.h" +#include "mgmtd/mgmt_be_server.h" +#include "mgmtd/mgmt_be_adapter.h" + +#ifdef REDIRECT_DEBUG_TO_STDERR +#define MGMTD_BE_SRVR_DBG(fmt, ...) \ + fprintf(stderr, "%s: " fmt "\n", __func__, ##__VA_ARGS__) +#define MGMTD_BE_SRVR_ERR(fmt, ...) \ + fprintf(stderr, "%s: ERROR, " fmt "\n", __func__, ##__VA_ARGS__) +#else /* REDIRECT_DEBUG_TO_STDERR */ +#define MGMTD_BE_SRVR_DBG(fmt, ...) \ + do { \ + if (mgmt_debug_be) \ + zlog_debug("%s: " fmt, __func__, ##__VA_ARGS__); \ + } while (0) +#define MGMTD_BE_SRVR_ERR(fmt, ...) \ + zlog_err("%s: ERROR: " fmt, __func__, ##__VA_ARGS__) +#endif /* REDIRECT_DEBUG_TO_STDERR */ + +static int mgmt_be_listen_fd; +static struct thread_master *mgmt_be_listen_tm; +static struct thread *mgmt_be_listen_ev; +static void mgmt_be_server_register_event(enum mgmt_be_event event); + +static void mgmt_be_conn_accept(struct thread *thread) +{ + int client_conn_fd; + union sockunion su; + + if (mgmt_be_listen_fd < 0) + return; + + /* We continue hearing server listen socket. */ + mgmt_be_server_register_event(MGMTD_BE_SERVER); + + memset(&su, 0, sizeof(union sockunion)); + + /* We can handle IPv4 or IPv6 socket. */ + client_conn_fd = sockunion_accept(mgmt_be_listen_fd, &su); + if (client_conn_fd < 0) { + MGMTD_BE_SRVR_ERR( + "Failed to accept MGMTD Backend client connection : %s", + safe_strerror(errno)); + return; + } + set_nonblocking(client_conn_fd); + set_cloexec(client_conn_fd); + + MGMTD_BE_SRVR_DBG("Got a new MGMTD Backend connection"); + + mgmt_be_create_adapter(client_conn_fd, &su); +} + +static void mgmt_be_server_register_event(enum mgmt_be_event event) +{ + if (event == MGMTD_BE_SERVER) { + thread_add_read(mgmt_be_listen_tm, mgmt_be_conn_accept, + NULL, mgmt_be_listen_fd, + &mgmt_be_listen_ev); + assert(mgmt_be_listen_ev); + } else { + assert(!"mgmt_be_server_post_event() called incorrectly"); + } +} + +static void mgmt_be_server_start(const char *hostname) +{ + int ret; + int sock; + struct sockaddr_un addr; + mode_t old_mask; + + /* Set umask */ + old_mask = umask(0077); + + sock = socket(AF_UNIX, SOCK_STREAM, PF_UNSPEC); + if (sock < 0) { + MGMTD_BE_SRVR_ERR("Failed to create server socket: %s", + safe_strerror(errno)); + goto mgmt_be_server_start_failed; + } + + addr.sun_family = AF_UNIX, + strlcpy(addr.sun_path, MGMTD_BE_SERVER_PATH, sizeof(addr.sun_path)); + unlink(addr.sun_path); + ret = bind(sock, (struct sockaddr *)&addr, sizeof(addr)); + if (ret < 0) { + MGMTD_BE_SRVR_ERR( + "Failed to bind server socket to '%s'. Err: %s", + addr.sun_path, safe_strerror(errno)); + goto mgmt_be_server_start_failed; + } + + ret = listen(sock, MGMTD_BE_MAX_CONN); + if (ret < 0) { + MGMTD_BE_SRVR_ERR("Failed to listen on server socket: %s", + safe_strerror(errno)); + goto mgmt_be_server_start_failed; + } + + /* Restore umask */ + umask(old_mask); + + mgmt_be_listen_fd = sock; + mgmt_be_server_register_event(MGMTD_BE_SERVER); + + MGMTD_BE_SRVR_DBG("Started MGMTD Backend Server!"); + return; + +mgmt_be_server_start_failed: + if (sock) + close(sock); + + mgmt_be_listen_fd = -1; + exit(-1); +} + +int mgmt_be_server_init(struct thread_master *master) +{ + if (mgmt_be_listen_tm) { + MGMTD_BE_SRVR_DBG("MGMTD Backend Server already running!"); + return 0; + } + + mgmt_be_listen_tm = master; + + mgmt_be_server_start("localhost"); + + return 0; +} + +void mgmt_be_server_destroy(void) +{ + if (mgmt_be_listen_tm) { + MGMTD_BE_SRVR_DBG("Closing MGMTD Backend Server!"); + + if (mgmt_be_listen_ev) { + THREAD_OFF(mgmt_be_listen_ev); + mgmt_be_listen_ev = NULL; + } + + if (mgmt_be_listen_fd >= 0) { + close(mgmt_be_listen_fd); + mgmt_be_listen_fd = -1; + } + + mgmt_be_listen_tm = NULL; + } +} diff --git a/mgmtd/mgmt_be_server.h b/mgmtd/mgmt_be_server.h new file mode 100644 index 000000000..5ee57fdf1 --- /dev/null +++ b/mgmtd/mgmt_be_server.h @@ -0,0 +1,20 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * MGMTD Backend Server + * + * Copyright (C) 2021 Vmware, Inc. + * Pushpasis Sarkar + */ + +#ifndef _FRR_MGMTD_BE_SERVER_H_ +#define _FRR_MGMTD_BE_SERVER_H_ + +#define MGMTD_BE_MAX_CONN 32 + +/* Initialise backend server */ +extern int mgmt_be_server_init(struct thread_master *master); + +/* Destroy backend server */ +extern void mgmt_be_server_destroy(void); + +#endif /* _FRR_MGMTD_BE_SERVER_H_ */ diff --git a/mgmtd/mgmt_defines.h b/mgmtd/mgmt_defines.h index 6f593320b..2af8f3f55 100644 --- a/mgmtd/mgmt_defines.h +++ b/mgmtd/mgmt_defines.h @@ -17,6 +17,10 @@ #define MGMTD_MAX_YANG_VALUE_LEN YANG_VALUE_MAXLEN +#define MGMTD_MAX_NUM_XPATH_REG 128 + +#define MGMTD_MAX_NUM_DATA_REQ_IN_BATCH 32 + enum mgmt_result { MGMTD_SUCCESS = 0, MGMTD_INVALID_PARAM, @@ -35,6 +39,19 @@ enum mgmt_fe_event { MGMTD_FE_PROC_MSG }; +enum mgmt_be_event { + MGMTD_BE_SERVER = 1, + MGMTD_BE_CONN_INIT, + MGMTD_BE_CONN_READ, + MGMTD_BE_CONN_WRITE, + MGMTD_BE_CONN_WRITES_ON, + MGMTD_BE_PROC_MSG, + MGMTD_BE_SCHED_CFG_PREPARE, + MGMTD_BE_RESCHED_CFG_PREPARE, + MGMTD_BE_SCHED_CFG_APPLY, + MGMTD_BE_RESCHED_CFG_APPLY, +}; + #define MGMTD_TXN_ID_NONE 0 #endif /* _FRR_MGMTD_DEFINES_H */ diff --git a/mgmtd/mgmt_main.c b/mgmtd/mgmt_main.c index f47b91df6..017263ca8 100644 --- a/mgmtd/mgmt_main.c +++ b/mgmtd/mgmt_main.c @@ -16,6 +16,7 @@ #include "mgmtd/mgmt_ds.h" #include "routing_nb.h" + /* mgmt options, we use GNU getopt library. */ static const struct option longopts[] = { {"skip_runas", no_argument, NULL, 'S'}, @@ -195,6 +196,17 @@ static void mgmt_vrf_terminate(void) static const struct frr_yang_module_info *const mgmt_yang_modules[] = { &frr_filter_info, &frr_interface_info, &frr_route_map_info, &frr_routing_info, &frr_vrf_info, +/* + * YANG module info supported by backend clients get added here. + * NOTE: Always set .ignore_cbs true for to avoid validating + * backend northbound callbacks during loading. + */ +#if 0 +#ifdef HAVE_STATICD + &(struct frr_yang_module_info){.name = "frr-staticd", + .ignore_cbs = true}, +#endif +#endif }; FRR_DAEMON_INFO(mgmtd, MGMTD, .vty_port = MGMTD_VTY_PORT, diff --git a/mgmtd/mgmt_memory.c b/mgmtd/mgmt_memory.c index 1ec1df7ec..39e036c30 100644 --- a/mgmtd/mgmt_memory.c +++ b/mgmtd/mgmt_memory.c @@ -19,5 +19,6 @@ DEFINE_MGROUP(MGMTD, "mgmt"); DEFINE_MTYPE(MGMTD, MGMTD, "MGMTD instance"); +DEFINE_MTYPE(MGMTD, MGMTD_BE_ADPATER, "MGMTD backend adapter"); DEFINE_MTYPE(MGMTD, MGMTD_FE_ADPATER, "MGMTD Frontend adapter"); DEFINE_MTYPE(MGMTD, MGMTD_FE_SESSION, "MGMTD Frontend Client Session"); diff --git a/mgmtd/mgmt_memory.h b/mgmtd/mgmt_memory.h index e6acfc8d6..d8b3ac7e0 100644 --- a/mgmtd/mgmt_memory.h +++ b/mgmtd/mgmt_memory.h @@ -13,6 +13,7 @@ DECLARE_MGROUP(MGMTD); DECLARE_MTYPE(MGMTD); +DECLARE_MTYPE(MGMTD_BE_ADPATER); DECLARE_MTYPE(MGMTD_FE_ADPATER); DECLARE_MTYPE(MGMTD_FE_SESSION); #endif /* _FRR_MGMTD_MEMORY_H */ diff --git a/mgmtd/mgmt_vty.c b/mgmtd/mgmt_vty.c index 4efd38e97..8ee5921db 100644 --- a/mgmtd/mgmt_vty.c +++ b/mgmtd/mgmt_vty.c @@ -11,12 +11,39 @@ #include "command.h" #include "json.h" #include "mgmtd/mgmt.h" +#include "mgmtd/mgmt_be_server.h" +#include "mgmtd/mgmt_be_adapter.h" #include "mgmtd/mgmt_fe_server.h" #include "mgmtd/mgmt_fe_adapter.h" #include "mgmtd/mgmt_ds.h" #include "mgmtd/mgmt_vty_clippy.c" +DEFPY(show_mgmt_be_adapter, + show_mgmt_be_adapter_cmd, + "show mgmt backend-adapter all", + SHOW_STR + MGMTD_STR + MGMTD_BE_ADAPTER_STR + "Display all Backend Adapters\n") +{ + mgmt_be_adapter_status_write(vty); + + return CMD_SUCCESS; +} + +DEFPY(show_mgmt_be_xpath_reg, + show_mgmt_be_xpath_reg_cmd, + "show mgmt backend-yang-xpath-registry", + SHOW_STR + MGMTD_STR + "Backend Adapter YANG Xpath Registry\n") +{ + mgmt_be_xpath_register_write(vty); + + return CMD_SUCCESS; +} + DEFPY(show_mgmt_fe_adapter, show_mgmt_fe_adapter_cmd, "show mgmt frontend-adapter all [detail$detail]", SHOW_STR @@ -199,6 +226,18 @@ DEFPY(show_mgmt_dump_data, return CMD_SUCCESS; } +DEFPY(show_mgmt_map_xpath, + show_mgmt_map_xpath_cmd, + "show mgmt yang-xpath-subscription WORD$path", + SHOW_STR + MGMTD_STR + "Get YANG Backend Subscription\n" + "XPath expression specifying the YANG data path\n") +{ + mgmt_be_xpath_subscr_info_write(vty, path); + return CMD_SUCCESS; +} + DEFPY(mgmt_load_config, mgmt_load_config_cmd, "mgmt load-config WORD$filepath <merge|replace>$type", @@ -331,13 +370,29 @@ DEFPY(debug_mgmt, void mgmt_vty_init(void) { + /* + * Initialize command handling from VTYSH connection. + * Call command initialization routines defined by + * backend components that are moved to new MGMTD infra + * here one by one. + */ +#if 0 +#if HAVE_STATICD + extern void static_vty_init(void); + static_vty_init(); +#endif +#endif + install_node(&debug_node); + install_element(VIEW_NODE, &show_mgmt_be_adapter_cmd); + install_element(VIEW_NODE, &show_mgmt_be_xpath_reg_cmd); install_element(VIEW_NODE, &show_mgmt_fe_adapter_cmd); install_element(VIEW_NODE, &show_mgmt_ds_cmd); install_element(VIEW_NODE, &show_mgmt_get_config_cmd); install_element(VIEW_NODE, &show_mgmt_get_data_cmd); install_element(VIEW_NODE, &show_mgmt_dump_data_cmd); + install_element(VIEW_NODE, &show_mgmt_map_xpath_cmd); install_element(CONFIG_NODE, &mgmt_commit_cmd); install_element(CONFIG_NODE, &mgmt_set_config_data_cmd); diff --git a/mgmtd/subdir.am b/mgmtd/subdir.am index 732d5560b..64228f968 100644 --- a/mgmtd/subdir.am +++ b/mgmtd/subdir.am @@ -18,6 +18,8 @@ noinst_LIBRARIES += mgmtd/libmgmtd.a mgmtd_libmgmtd_a_SOURCES = \ mgmtd/mgmt.c \ mgmtd/mgmt_ds.c \ + mgmtd/mgmt_be_server.c \ + mgmtd/mgmt_be_adapter.c \ mgmtd/mgmt_fe_server.c \ mgmtd/mgmt_fe_adapter.c \ mgmtd/mgmt_memory.c \ @@ -31,6 +33,8 @@ mgmtdheader_HEADERS = \ noinst_HEADERS += \ mgmtd/mgmt.h \ + mgmtd/mgmt_be_server.h \ + mgmtd/mgmt_be_adapter.h \ mgmtd/mgmt_ds.h \ mgmtd/mgmt_fe_server.h \ mgmtd/mgmt_fe_adapter.h \ |