diff options
author | Christian Hopps <chopps@labn.net> | 2023-04-29 12:22:37 +0200 |
---|---|---|
committer | Christian Hopps <chopps@labn.net> | 2023-05-28 11:13:22 +0200 |
commit | 070c5e7a91c413e08c1b8f78cc885f082a636b01 (patch) | |
tree | 5c55d456cf6459f89f723c12fc8fd5d645bec0f3 /lib | |
parent | lib: mgmt msg: add version to messages (diff) | |
download | frr-070c5e7a91c413e08c1b8f78cc885f082a636b01.tar.xz frr-070c5e7a91c413e08c1b8f78cc885f082a636b01.zip |
lib: msg: refactor common connection code from mgmtd
Signed-off-by: Christian Hopps <chopps@labn.net>
Diffstat (limited to 'lib')
-rw-r--r-- | lib/mgmt_be_client.c | 244 | ||||
-rw-r--r-- | lib/mgmt_be_client.h | 2 | ||||
-rw-r--r-- | lib/mgmt_fe_client.c | 223 | ||||
-rw-r--r-- | lib/mgmt_fe_client.h | 2 | ||||
-rw-r--r-- | lib/mgmt_msg.c | 257 | ||||
-rw-r--r-- | lib/mgmt_msg.h | 92 | ||||
-rw-r--r-- | lib/vty.c | 2 |
7 files changed, 432 insertions, 390 deletions
diff --git a/lib/mgmt_be_client.c b/lib/mgmt_be_client.c index ba8997c5a..9a50c06c1 100644 --- a/lib/mgmt_be_client.c +++ b/lib/mgmt_be_client.c @@ -7,6 +7,7 @@ #include <zebra.h> #include "debug.h" +#include "compiler.h" #include "libfrr.h" #include "mgmtd/mgmt.h" #include "mgmt_be_client.h" @@ -99,14 +100,7 @@ DECLARE_LIST(mgmt_be_txns, struct mgmt_be_txn_ctx, list_linkage); frr_each_safe (mgmt_be_batches, &(txn)->apply_cfgs, (batch)) struct mgmt_be_client_ctx { - int conn_fd; - struct event_loop *tm; - struct event *conn_retry_tmr; - struct event *conn_read_ev; - struct event *conn_write_ev; - struct event *msg_proc_ev; - - struct mgmt_msg_state mstate; + struct msg_client client; struct nb_config *candidate_config; struct nb_config *running_config; @@ -128,8 +122,7 @@ struct mgmt_be_client_ctx { struct debug mgmt_dbg_be_client = {0, "Management backend client operations"}; static struct mgmt_be_client_ctx mgmt_be_client_ctx = { - .conn_fd = -1, -}; + .client = {.conn = {.fd = -1}}}; const char *mgmt_be_client_names[MGMTD_BE_CLIENT_ID_MAX + 1] = { #ifdef HAVE_STATICD @@ -138,35 +131,13 @@ const char *mgmt_be_client_names[MGMTD_BE_CLIENT_ID_MAX + 1] = { [MGMTD_BE_CLIENT_ID_MAX] = "Unknown/Invalid", }; -/* Forward declarations */ -static void -mgmt_be_client_register_event(struct mgmt_be_client_ctx *client_ctx, - enum mgmt_be_event event); -static void -mgmt_be_client_schedule_conn_retry(struct mgmt_be_client_ctx *client_ctx, - unsigned long intvl_secs); static int mgmt_be_client_send_msg(struct mgmt_be_client_ctx *client_ctx, - Mgmtd__BeMessage *be_msg); - -static void -mgmt_be_server_disconnect(struct mgmt_be_client_ctx *client_ctx, - bool reconnect) + Mgmtd__BeMessage *be_msg) { - /* Notify client through registered callback (if any) */ - if (client_ctx->client_params.client_connect_notify) - (void)(*client_ctx->client_params.client_connect_notify)( - (uintptr_t)client_ctx, - client_ctx->client_params.user_data, false); - - if (client_ctx->conn_fd != -1) { - close(client_ctx->conn_fd); - client_ctx->conn_fd = -1; - } - - if (reconnect) - mgmt_be_client_schedule_conn_retry( - client_ctx, - client_ctx->client_params.conn_retry_intvl_sec); + return msg_conn_send_msg( + &client_ctx->client.conn, MGMT_MSG_VERSION_PROTOBUF, be_msg, + mgmtd__be_message__get_packed_size(be_msg), + (size_t(*)(void *, void *))mgmtd__be_message__pack); } static struct mgmt_be_batch_ctx * @@ -853,12 +824,16 @@ mgmt_be_client_handle_msg(struct mgmt_be_client_ctx *client_ctx, return 0; } -static void mgmt_be_client_process_msg(uint8_t version, void *user_ctx, - uint8_t *data, size_t len) +static void mgmt_be_client_process_msg(uint8_t version, uint8_t *data, + size_t len, struct msg_conn *conn) { - struct mgmt_be_client_ctx *client_ctx = user_ctx; + struct mgmt_be_client_ctx *client_ctx; + struct msg_client *client; Mgmtd__BeMessage *be_msg; + client = container_of(conn, struct msg_client, conn); + client_ctx = container_of(client, struct mgmt_be_client_ctx, client); + be_msg = mgmtd__be_message__unpack(NULL, len, data); if (!be_msg) { MGMTD_BE_CLIENT_DBG("Failed to decode %zu bytes from server", @@ -872,69 +847,6 @@ static void mgmt_be_client_process_msg(uint8_t version, void *user_ctx, mgmtd__be_message__free_unpacked(be_msg, NULL); } -static void mgmt_be_client_proc_msgbufs(struct event *thread) -{ - struct mgmt_be_client_ctx *client_ctx = EVENT_ARG(thread); - - if (mgmt_msg_procbufs(&client_ctx->mstate, mgmt_be_client_process_msg, - client_ctx, MGMTD_DBG_BE_CLIENT_CHECK())) - mgmt_be_client_register_event(client_ctx, MGMTD_BE_PROC_MSG); -} - -static void mgmt_be_client_read(struct event *thread) -{ - struct mgmt_be_client_ctx *client_ctx = EVENT_ARG(thread); - enum mgmt_msg_rsched rv; - - rv = mgmt_msg_read(&client_ctx->mstate, client_ctx->conn_fd, - MGMTD_DBG_BE_CLIENT_CHECK()); - if (rv == MSR_DISCONNECT) { - mgmt_be_server_disconnect(client_ctx, true); - return; - } - if (rv == MSR_SCHED_BOTH) - mgmt_be_client_register_event(client_ctx, MGMTD_BE_PROC_MSG); - mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_READ); -} - -static inline void -mgmt_be_client_sched_msg_write(struct mgmt_be_client_ctx *client_ctx) -{ - mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_WRITE); -} - -static int mgmt_be_client_send_msg(struct mgmt_be_client_ctx *client_ctx, - Mgmtd__BeMessage *be_msg) -{ - if (client_ctx->conn_fd == -1) { - MGMTD_BE_CLIENT_DBG("can't send message on closed connection"); - return -1; - } - - int rv = mgmt_msg_send_msg( - &client_ctx->mstate, MGMT_MSG_VERSION_PROTOBUF, be_msg, - mgmtd__be_message__get_packed_size(be_msg), - (size_t(*)(void *, void *))mgmtd__be_message__pack, - MGMTD_DBG_BE_CLIENT_CHECK()); - mgmt_be_client_sched_msg_write(client_ctx); - return rv; -} - -static void mgmt_be_client_write(struct event *thread) -{ - struct mgmt_be_client_ctx *client_ctx = EVENT_ARG(thread); - enum mgmt_msg_wsched rv; - - rv = mgmt_msg_write(&client_ctx->mstate, client_ctx->conn_fd, - MGMTD_DBG_BE_CLIENT_CHECK()); - if (rv == MSW_SCHED_STREAM) - mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_WRITE); - else if (rv == MSW_DISCONNECT) - mgmt_be_server_disconnect(client_ctx, true); - else - assert(rv == MSW_SCHED_NONE); -} - static int mgmt_be_send_subscr_req(struct mgmt_be_client_ctx *client_ctx, bool subscr_xpaths, uint16_t num_reg_xpaths, char **reg_xpaths) @@ -958,80 +870,37 @@ static int mgmt_be_send_subscr_req(struct mgmt_be_client_ctx *client_ctx, return mgmt_be_client_send_msg(client_ctx, &be_msg); } -static void mgmt_be_server_connect(struct mgmt_be_client_ctx *client_ctx) +static int _notify_conenct_disconnect(struct msg_client *client, bool connected) { - const char *dbgtag = MGMTD_DBG_BE_CLIENT_CHECK() ? "BE-client" : NULL; - - assert(client_ctx->conn_fd == -1); - client_ctx->conn_fd = mgmt_msg_connect( - MGMTD_BE_SERVER_PATH, MGMTD_SOCKET_BE_SEND_BUF_SIZE, - MGMTD_SOCKET_BE_RECV_BUF_SIZE, dbgtag); - - /* Send SUBSCRIBE_REQ message */ - if (client_ctx->conn_fd == -1 || - mgmt_be_send_subscr_req(client_ctx, false, 0, NULL) != 0) { - mgmt_be_server_disconnect(client_ctx, true); - return; + struct mgmt_be_client_ctx *client_ctx = + container_of(client, struct mgmt_be_client_ctx, client); + int ret; + + if (connected) { + assert(client->conn.fd != -1); + ret = mgmt_be_send_subscr_req(client_ctx, false, 0, NULL); + if (ret) + return ret; } - /* Start reading from the socket */ - mgmt_be_client_register_event(client_ctx, MGMTD_BE_CONN_READ); - - /* Notify client through registered callback (if any) */ + /* Notify BE client through registered callback (if any) */ if (client_ctx->client_params.client_connect_notify) (void)(*client_ctx->client_params.client_connect_notify)( (uintptr_t)client_ctx, - client_ctx->client_params.user_data, true); + client_ctx->client_params.user_data, connected); + return 0; } -static void mgmt_be_client_conn_timeout(struct event *thread) +static int mgmt_be_client_notify_conenct(struct msg_client *client) { - mgmt_be_server_connect(EVENT_ARG(thread)); + return _notify_conenct_disconnect(client, true); } -static void -mgmt_be_client_register_event(struct mgmt_be_client_ctx *client_ctx, - enum mgmt_be_event event) +static int mgmt_be_client_notify_disconenct(struct msg_conn *conn) { - struct timeval tv = {0}; + struct msg_client *client = container_of(conn, struct msg_client, conn); - switch (event) { - case MGMTD_BE_CONN_READ: - event_add_read(client_ctx->tm, mgmt_be_client_read, - client_ctx, client_ctx->conn_fd, - &client_ctx->conn_read_ev); - break; - case MGMTD_BE_CONN_WRITE: - event_add_write(client_ctx->tm, mgmt_be_client_write, - client_ctx, client_ctx->conn_fd, - &client_ctx->conn_write_ev); - break; - case MGMTD_BE_PROC_MSG: - tv.tv_usec = MGMTD_BE_MSG_PROC_DELAY_USEC; - event_add_timer_tv(client_ctx->tm, mgmt_be_client_proc_msgbufs, - client_ctx, &tv, &client_ctx->msg_proc_ev); - break; - case MGMTD_BE_SERVER: - case MGMTD_BE_CONN_INIT: - case MGMTD_BE_SCHED_CFG_PREPARE: - case MGMTD_BE_RESCHED_CFG_PREPARE: - case MGMTD_BE_SCHED_CFG_APPLY: - case MGMTD_BE_RESCHED_CFG_APPLY: - assert(!"mgmt_be_client_post_event() called incorrectly"); - break; - } -} - -static void -mgmt_be_client_schedule_conn_retry(struct mgmt_be_client_ctx *client_ctx, - unsigned long intvl_secs) -{ - MGMTD_BE_CLIENT_DBG( - "Scheduling MGMTD Backend server connection retry after %lu seconds", - intvl_secs); - event_add_timer(client_ctx->tm, mgmt_be_client_conn_timeout, - (void *)client_ctx, intvl_secs, - &client_ctx->conn_retry_tmr); + return _notify_conenct_disconnect(client, false); } DEFPY(debug_mgmt_client_be, debug_mgmt_client_be_cmd, @@ -1082,29 +951,22 @@ static struct cmd_node mgmt_dbg_node = { uintptr_t mgmt_be_client_lib_init(struct mgmt_be_client_params *params, struct event_loop *master_thread) { - assert(master_thread && params && strlen(params->name) - && !mgmt_be_client_ctx.tm); + /* Don't call twice */ + assert(!mgmt_be_client_ctx.client.conn.loop); - mgmt_be_client_ctx.tm = master_thread; + /* Only call after frr_init() */ + assert(running_config); - if (!running_config) - assert(!"MGMTD Be Client lib_init() after frr_init() only!"); mgmt_be_client_ctx.running_config = running_config; mgmt_be_client_ctx.candidate_config = nb_config_new(NULL); - - memcpy(&mgmt_be_client_ctx.client_params, params, - sizeof(mgmt_be_client_ctx.client_params)); - if (!mgmt_be_client_ctx.client_params.conn_retry_intvl_sec) - mgmt_be_client_ctx.client_params.conn_retry_intvl_sec = - MGMTD_BE_DEFAULT_CONN_RETRY_INTVL_SEC; - + mgmt_be_client_ctx.client_params = *params; mgmt_be_txns_init(&mgmt_be_client_ctx.txn_head); - mgmt_msg_init(&mgmt_be_client_ctx.mstate, MGMTD_BE_MAX_NUM_MSG_PROC, - MGMTD_BE_MAX_NUM_MSG_WRITE, MGMTD_BE_MSG_MAX_LEN, - "BE-client"); - - /* Start trying to connect to MGMTD backend server immediately */ - mgmt_be_client_schedule_conn_retry(&mgmt_be_client_ctx, 1); + msg_client_init(&mgmt_be_client_ctx.client, master_thread, + MGMTD_BE_SERVER_PATH, mgmt_be_client_notify_conenct, + mgmt_be_client_notify_disconenct, + mgmt_be_client_process_msg, MGMTD_BE_MAX_NUM_MSG_PROC, + MGMTD_BE_MAX_NUM_MSG_WRITE, MGMTD_BE_MSG_MAX_LEN, + "BE-client", MGMTD_DBG_BE_CLIENT_CHECK()); MGMTD_BE_CLIENT_DBG("Initialized client '%s'", params->name); @@ -1183,24 +1045,16 @@ enum mgmt_result mgmt_be_send_yang_notify(uintptr_t lib_hndl, /* * Destroy library and cleanup everything. */ -void mgmt_be_client_lib_destroy(uintptr_t lib_hndl) +void mgmt_be_client_lib_destroy(void) { - struct mgmt_be_client_ctx *client_ctx; - - client_ctx = (struct mgmt_be_client_ctx *)lib_hndl; - assert(client_ctx); + struct mgmt_be_client_ctx *client_ctx = &mgmt_be_client_ctx; MGMTD_BE_CLIENT_DBG("Destroying MGMTD Backend Client '%s'", client_ctx->client_params.name); - mgmt_be_server_disconnect(client_ctx, false); - - mgmt_msg_destroy(&client_ctx->mstate); - - EVENT_OFF(client_ctx->conn_retry_tmr); - EVENT_OFF(client_ctx->conn_read_ev); - EVENT_OFF(client_ctx->conn_write_ev); - EVENT_OFF(client_ctx->msg_proc_ev); + msg_client_cleanup(&client_ctx->client); mgmt_be_cleanup_all_txns(client_ctx); mgmt_be_txns_fini(&client_ctx->txn_head); + + memset(client_ctx, 0, sizeof(*client_ctx)); } diff --git a/lib/mgmt_be_client.h b/lib/mgmt_be_client.h index d4f2d86fd..bbe938b5b 100644 --- a/lib/mgmt_be_client.h +++ b/lib/mgmt_be_client.h @@ -265,7 +265,7 @@ enum mgmt_result mgmt_be_unsubscribe_yang_data(uintptr_t lib_hndl, /* * Destroy library and cleanup everything. */ -extern void mgmt_be_client_lib_destroy(uintptr_t lib_hndl); +extern void mgmt_be_client_lib_destroy(void); #ifdef __cplusplus } diff --git a/lib/mgmt_fe_client.c b/lib/mgmt_fe_client.c index 0458bc1c7..ef19181d6 100644 --- a/lib/mgmt_fe_client.c +++ b/lib/mgmt_fe_client.c @@ -6,6 +6,7 @@ */ #include <zebra.h> +#include "compiler.h" #include "debug.h" #include "memory.h" #include "libfrr.h" @@ -43,17 +44,8 @@ DECLARE_LIST(mgmt_sessions, struct mgmt_fe_client_session, list_linkage); DEFINE_MTYPE_STATIC(LIB, MGMTD_FE_SESSION, "MGMTD Frontend session"); struct mgmt_fe_client_ctx { - int conn_fd; - struct event_loop *tm; - struct event *conn_retry_tmr; - struct event *conn_read_ev; - struct event *conn_write_ev; - struct event *msg_proc_ev; - - struct mgmt_msg_state mstate; - + struct msg_client client; struct mgmt_fe_client_params client_params; - struct mgmt_sessions_head client_sessions; }; @@ -63,15 +55,7 @@ struct mgmt_fe_client_ctx { struct debug mgmt_dbg_fe_client = {0, "Management frontend client operations"}; static struct mgmt_fe_client_ctx mgmt_fe_client_ctx = { - .conn_fd = -1, -}; - -/* Forward declarations */ -static void -mgmt_fe_client_register_event(struct mgmt_fe_client_ctx *client_ctx, - enum mgmt_fe_event event); -static void mgmt_fe_client_schedule_conn_retry( - struct mgmt_fe_client_ctx *client_ctx, unsigned long intvl_secs); + .client = {.conn = {.fd = -1}}}; static struct mgmt_fe_client_session * mgmt_fe_find_session_by_client_id(struct mgmt_fe_client_ctx *client_ctx, @@ -109,59 +93,13 @@ mgmt_fe_find_session_by_session_id(struct mgmt_fe_client_ctx *client_ctx, return NULL; } -static void -mgmt_fe_server_disconnect(struct mgmt_fe_client_ctx *client_ctx, - bool reconnect) -{ - if (client_ctx->conn_fd != -1) { - close(client_ctx->conn_fd); - client_ctx->conn_fd = -1; - } - - if (reconnect) - mgmt_fe_client_schedule_conn_retry( - client_ctx, - client_ctx->client_params.conn_retry_intvl_sec); -} - -static inline void -mgmt_fe_client_sched_msg_write(struct mgmt_fe_client_ctx *client_ctx) -{ - mgmt_fe_client_register_event(client_ctx, MGMTD_FE_CONN_WRITE); -} - static int mgmt_fe_client_send_msg(struct mgmt_fe_client_ctx *client_ctx, Mgmtd__FeMessage *fe_msg) { - /* users current expect this to fail here */ - if (client_ctx->conn_fd == -1) { - MGMTD_FE_CLIENT_DBG("can't send message on closed connection"); - return -1; - } - - int rv = mgmt_msg_send_msg( - &client_ctx->mstate, MGMT_MSG_VERSION_PROTOBUF, fe_msg, + return msg_conn_send_msg( + &client_ctx->client.conn, MGMT_MSG_VERSION_PROTOBUF, fe_msg, mgmtd__fe_message__get_packed_size(fe_msg), - (size_t(*)(void *, void *))mgmtd__fe_message__pack, - MGMTD_DBG_FE_CLIENT_CHECK()); - mgmt_fe_client_sched_msg_write(client_ctx); - return rv; -} - -static void mgmt_fe_client_write(struct event *thread) -{ - struct mgmt_fe_client_ctx *client_ctx; - enum mgmt_msg_wsched rv; - - client_ctx = (struct mgmt_fe_client_ctx *)EVENT_ARG(thread); - rv = mgmt_msg_write(&client_ctx->mstate, client_ctx->conn_fd, - MGMTD_DBG_FE_CLIENT_CHECK()); - if (rv == MSW_SCHED_STREAM) - mgmt_fe_client_register_event(client_ctx, MGMTD_FE_CONN_WRITE); - else if (rv == MSW_DISCONNECT) - mgmt_fe_server_disconnect(client_ctx, true); - else - assert(rv == MSW_SCHED_NONE); + (size_t(*)(void *, void *))mgmtd__fe_message__pack); } static int @@ -614,12 +552,16 @@ mgmt_fe_client_handle_msg(struct mgmt_fe_client_ctx *client_ctx, return 0; } -static void mgmt_fe_client_process_msg(uint8_t version, void *user_ctx, - uint8_t *data, size_t len) +static void mgmt_fe_client_process_msg(uint8_t version, uint8_t *data, + size_t len, struct msg_conn *conn) { - struct mgmt_fe_client_ctx *client_ctx = user_ctx; + struct mgmt_fe_client_ctx *client_ctx; + struct msg_client *client; Mgmtd__FeMessage *fe_msg; + client = container_of(conn, struct msg_client, conn); + client_ctx = container_of(client, struct mgmt_fe_client_ctx, client); + fe_msg = mgmtd__fe_message__unpack(NULL, len, data); if (!fe_msg) { MGMTD_FE_CLIENT_DBG("Failed to decode %zu bytes from server.", @@ -633,105 +575,38 @@ static void mgmt_fe_client_process_msg(uint8_t version, void *user_ctx, mgmtd__fe_message__free_unpacked(fe_msg, NULL); } -static void mgmt_fe_client_proc_msgbufs(struct event *thread) +static int _notify_connect_disconnect(struct msg_client *client, bool connected) { - struct mgmt_fe_client_ctx *client_ctx; - - client_ctx = (struct mgmt_fe_client_ctx *)EVENT_ARG(thread); - if (mgmt_msg_procbufs(&client_ctx->mstate, mgmt_fe_client_process_msg, - client_ctx, MGMTD_DBG_FE_CLIENT_CHECK())) - mgmt_fe_client_register_event(client_ctx, MGMTD_FE_PROC_MSG); -} - -static void mgmt_fe_client_read(struct event *thread) -{ - struct mgmt_fe_client_ctx *client_ctx; - enum mgmt_msg_rsched rv; - - client_ctx = (struct mgmt_fe_client_ctx *)EVENT_ARG(thread); - - rv = mgmt_msg_read(&client_ctx->mstate, client_ctx->conn_fd, - MGMTD_DBG_FE_CLIENT_CHECK()); - if (rv == MSR_DISCONNECT) { - mgmt_fe_server_disconnect(client_ctx, true); - return; - } - if (rv == MSR_SCHED_BOTH) - mgmt_fe_client_register_event(client_ctx, MGMTD_FE_PROC_MSG); - mgmt_fe_client_register_event(client_ctx, MGMTD_FE_CONN_READ); -} - -static void mgmt_fe_server_connect(struct mgmt_fe_client_ctx *client_ctx) -{ - const char *dbgtag = MGMTD_DBG_FE_CLIENT_CHECK() ? "FE-client" : NULL; - - assert(client_ctx->conn_fd == -1); - client_ctx->conn_fd = mgmt_msg_connect( - MGMTD_FE_SERVER_PATH, MGMTD_SOCKET_FE_SEND_BUF_SIZE, - MGMTD_SOCKET_FE_RECV_BUF_SIZE, dbgtag); + struct mgmt_fe_client_ctx *client_ctx = + container_of(client, struct mgmt_fe_client_ctx, client); + int ret; /* Send REGISTER_REQ message */ - if (client_ctx->conn_fd == -1 || - mgmt_fe_send_register_req(client_ctx) != 0) { - mgmt_fe_server_disconnect(client_ctx, true); - return; + if (connected) { + if ((ret = mgmt_fe_send_register_req(client_ctx)) != 0) + return ret; } - /* Start reading from the socket */ - mgmt_fe_client_register_event(client_ctx, MGMTD_FE_CONN_READ); - - /* Notify client through registered callback (if any) */ + /* Notify FE client through registered callback (if any). */ if (client_ctx->client_params.client_connect_notify) (void)(*client_ctx->client_params.client_connect_notify)( (uintptr_t)client_ctx, - client_ctx->client_params.user_data, true); + client_ctx->client_params.user_data, connected); + return 0; } - -static void mgmt_fe_client_conn_timeout(struct event *thread) +static int mgmt_fe_client_notify_connect(struct msg_client *client) { - mgmt_fe_server_connect(EVENT_ARG(thread)); + return _notify_connect_disconnect(client, true); } -static void -mgmt_fe_client_register_event(struct mgmt_fe_client_ctx *client_ctx, - enum mgmt_fe_event event) +static int mgmt_fe_client_notify_disconnect(struct msg_conn *conn) { - struct timeval tv = {0}; + struct msg_client *client = container_of(conn, struct msg_client, conn); - switch (event) { - case MGMTD_FE_CONN_READ: - event_add_read(client_ctx->tm, mgmt_fe_client_read, - client_ctx, client_ctx->conn_fd, - &client_ctx->conn_read_ev); - break; - case MGMTD_FE_CONN_WRITE: - event_add_write(client_ctx->tm, mgmt_fe_client_write, - client_ctx, client_ctx->conn_fd, - &client_ctx->conn_write_ev); - break; - case MGMTD_FE_PROC_MSG: - tv.tv_usec = MGMTD_FE_MSG_PROC_DELAY_USEC; - event_add_timer_tv(client_ctx->tm, - mgmt_fe_client_proc_msgbufs, client_ctx, - &tv, &client_ctx->msg_proc_ev); - break; - case MGMTD_FE_SERVER: - assert(!"mgmt_fe_client_ctx_post_event called incorrectly"); - break; - } + return _notify_connect_disconnect(client, false); } -static void mgmt_fe_client_schedule_conn_retry( - struct mgmt_fe_client_ctx *client_ctx, unsigned long intvl_secs) -{ - MGMTD_FE_CLIENT_DBG( - "Scheduling MGMTD Frontend server connection retry after %lu seconds", - intvl_secs); - event_add_timer(client_ctx->tm, mgmt_fe_client_conn_timeout, - (void *)client_ctx, intvl_secs, - &client_ctx->conn_retry_tmr); -} DEFPY(debug_mgmt_client_fe, debug_mgmt_client_fe_cmd, "[no] debug mgmt client frontend", @@ -781,24 +656,19 @@ static struct cmd_node mgmt_dbg_node = { uintptr_t mgmt_fe_client_lib_init(struct mgmt_fe_client_params *params, struct event_loop *master_thread) { - assert(master_thread && params && strlen(params->name) - && !mgmt_fe_client_ctx.tm); + /* Don't call twice */ + assert(!mgmt_fe_client_ctx.client.conn.loop); - mgmt_fe_client_ctx.tm = master_thread; - memcpy(&mgmt_fe_client_ctx.client_params, params, - sizeof(mgmt_fe_client_ctx.client_params)); - if (!mgmt_fe_client_ctx.client_params.conn_retry_intvl_sec) - mgmt_fe_client_ctx.client_params.conn_retry_intvl_sec = - MGMTD_FE_DEFAULT_CONN_RETRY_INTVL_SEC; - - mgmt_msg_init(&mgmt_fe_client_ctx.mstate, MGMTD_FE_MAX_NUM_MSG_PROC, - MGMTD_FE_MAX_NUM_MSG_WRITE, MGMTD_FE_MSG_MAX_LEN, - "FE-client"); + mgmt_fe_client_ctx.client_params = *params; mgmt_sessions_init(&mgmt_fe_client_ctx.client_sessions); - /* Start trying to connect to MGMTD frontend server immediately */ - mgmt_fe_client_schedule_conn_retry(&mgmt_fe_client_ctx, 1); + msg_client_init(&mgmt_fe_client_ctx.client, master_thread, + MGMTD_FE_SERVER_PATH, mgmt_fe_client_notify_connect, + mgmt_fe_client_notify_disconnect, + mgmt_fe_client_process_msg, MGMTD_FE_MAX_NUM_MSG_PROC, + MGMTD_FE_MAX_NUM_MSG_WRITE, MGMTD_FE_MSG_MAX_LEN, + "FE-client", MGMTD_DBG_FE_CLIENT_CHECK()); MGMTD_FE_CLIENT_DBG("Initialized client '%s'", params->name); @@ -1056,23 +926,14 @@ mgmt_fe_register_yang_notify(uintptr_t lib_hndl, uintptr_t session_id, /* * Destroy library and cleanup everything. */ -void mgmt_fe_client_lib_destroy(uintptr_t lib_hndl) +void mgmt_fe_client_lib_destroy(void) { - struct mgmt_fe_client_ctx *client_ctx; - - client_ctx = (struct mgmt_fe_client_ctx *)lib_hndl; - assert(client_ctx); + struct mgmt_fe_client_ctx *client_ctx = &mgmt_fe_client_ctx; MGMTD_FE_CLIENT_DBG("Destroying MGMTD Frontend Client '%s'", client_ctx->client_params.name); - mgmt_fe_server_disconnect(client_ctx, false); - - mgmt_fe_destroy_client_sessions(lib_hndl); - - EVENT_OFF(client_ctx->conn_retry_tmr); - EVENT_OFF(client_ctx->conn_read_ev); - EVENT_OFF(client_ctx->conn_write_ev); - EVENT_OFF(client_ctx->msg_proc_ev); - mgmt_msg_destroy(&client_ctx->mstate); + mgmt_fe_destroy_client_sessions((uintptr_t)client_ctx); + msg_client_cleanup(&client_ctx->client); + memset(client_ctx, 0, sizeof(*client_ctx)); } diff --git a/lib/mgmt_fe_client.h b/lib/mgmt_fe_client.h index 94867787d..2b382fd9d 100644 --- a/lib/mgmt_fe_client.h +++ b/lib/mgmt_fe_client.h @@ -351,7 +351,7 @@ mgmt_fe_register_yang_notify(uintptr_t lib_hndl, uintptr_t session_id, /* * Destroy library and cleanup everything. */ -extern void mgmt_fe_client_lib_destroy(uintptr_t lib_hndl); +extern void mgmt_fe_client_lib_destroy(void); #ifdef __cplusplus } diff --git a/lib/mgmt_msg.c b/lib/mgmt_msg.c index e682face9..967606d20 100644 --- a/lib/mgmt_msg.c +++ b/lib/mgmt_msg.c @@ -22,7 +22,7 @@ } while (0) #define MGMT_MSG_ERR(ms, fmt, ...) \ - zlog_err("%s: %s: " fmt, ms->idtag, __func__, ##__VA_ARGS__) + zlog_err("%s: %s: " fmt, (ms)->idtag, __func__, ##__VA_ARGS__) /** * Read data from a socket into streams containing 1 or more full msgs headed by @@ -127,8 +127,8 @@ enum mgmt_msg_rsched mgmt_msg_read(struct mgmt_msg_state *ms, int fd, * true if more to process (so reschedule) else false */ bool mgmt_msg_procbufs(struct mgmt_msg_state *ms, - void (*handle_msg)(uint8_t version, void *user, - uint8_t *msg, size_t msglen), + void (*handle_msg)(uint8_t version, uint8_t *msg, + size_t msglen, void *user), void *user, bool debug) { const char *dbgtag = debug ? ms->idtag : NULL; @@ -156,9 +156,10 @@ bool mgmt_msg_procbufs(struct mgmt_msg_state *ms, assert(MGMT_MSG_IS_MARKER(mhdr->marker)); assert(left >= mhdr->len); - handle_msg(MGMT_MSG_MARKER_VERSION(mhdr->marker), user, + handle_msg(MGMT_MSG_MARKER_VERSION(mhdr->marker), (uint8_t *)(mhdr + 1), - mhdr->len - sizeof(struct mgmt_msg_hdr)); + mhdr->len - sizeof(struct mgmt_msg_hdr), + user); ms->nrxm++; nproc++; } @@ -402,6 +403,7 @@ size_t mgmt_msg_reset_writes(struct mgmt_msg_state *ms) return nproc; } + void mgmt_msg_init(struct mgmt_msg_state *ms, size_t max_read_buf, size_t max_write_buf, size_t max_msg_sz, const char *idtag) { @@ -422,3 +424,248 @@ void mgmt_msg_destroy(struct mgmt_msg_state *ms) stream_free(ms->ins); free(ms->idtag); } + +/* + * Connections + */ + +#define MSG_CONN_DEFAULT_CONN_RETRY_MSEC 250 +#define MSG_CONN_SEND_BUF_SIZE (1u << 16) +#define MSG_CONN_RECV_BUF_SIZE (1u << 16) + +static void msg_client_sched_connect(struct msg_client *client, + unsigned long msec); + +static void msg_conn_sched_proc_msgs(struct msg_conn *conn); +static void msg_conn_sched_read(struct msg_conn *conn); +static void msg_conn_sched_write(struct msg_conn *conn); + +static void msg_conn_write(struct event *thread) +{ + struct msg_conn *conn = EVENT_ARG(thread); + enum mgmt_msg_wsched rv; + + rv = mgmt_msg_write(&conn->mstate, conn->fd, conn->debug); + if (rv == MSW_SCHED_STREAM) + msg_conn_sched_write(conn); + else if (rv == MSW_DISCONNECT) + msg_conn_disconnect(conn, conn->is_client); + else + assert(rv == MSW_SCHED_NONE); +} + +static void msg_conn_read(struct event *thread) +{ + struct msg_conn *conn = EVENT_ARG(thread); + enum mgmt_msg_rsched rv; + + rv = mgmt_msg_read(&conn->mstate, conn->fd, conn->debug); + if (rv == MSR_DISCONNECT) { + msg_conn_disconnect(conn, conn->is_client); + return; + } + if (rv == MSR_SCHED_BOTH) + msg_conn_sched_proc_msgs(conn); + msg_conn_sched_read(conn); +} + +/* collapse this into mgmt_msg_procbufs */ +static void msg_conn_proc_msgs(struct event *thread) +{ + struct msg_conn *conn = EVENT_ARG(thread); + + if (mgmt_msg_procbufs(&conn->mstate, + (void (*)(uint8_t, uint8_t *, size_t, + void *))conn->handle_msg, + conn, conn->debug)) + /* there's more, schedule handling more */ + msg_conn_sched_proc_msgs(conn); +} + +static void msg_conn_sched_read(struct msg_conn *conn) +{ + event_add_read(conn->loop, msg_conn_read, conn, conn->fd, + &conn->read_ev); +} + +static void msg_conn_sched_write(struct msg_conn *conn) +{ + event_add_write(conn->loop, msg_conn_write, conn, conn->fd, + &conn->write_ev); +} + +static void msg_conn_sched_proc_msgs(struct msg_conn *conn) +{ + event_add_event(conn->loop, msg_conn_proc_msgs, conn, 0, + &conn->proc_msg_ev); +} + + +void msg_conn_disconnect(struct msg_conn *conn, bool reconnect) +{ + + if (conn->fd != -1) { + close(conn->fd); + conn->fd = -1; + + /* Notify client through registered callback (if any) */ + if (conn->notify_disconnect) + (void)(*conn->notify_disconnect)(conn); + } + + if (reconnect) { + assert(conn->is_client); + msg_client_sched_connect( + container_of(conn, struct msg_client, conn), + MSG_CONN_DEFAULT_CONN_RETRY_MSEC); + } +} + +int msg_conn_send_msg(struct msg_conn *conn, uint8_t version, void *msg, + size_t mlen, size_t (*packf)(void *, void *)) +{ + if (conn->fd == -1) { + MGMT_MSG_ERR(&conn->mstate, + "can't send message on closed connection"); + return -1; + } + + int rv = mgmt_msg_send_msg(&conn->mstate, version, msg, mlen, packf, + conn->debug); + + msg_conn_sched_write(conn); + + return rv; +} + +void msg_conn_cleanup(struct msg_conn *conn) +{ + struct mgmt_msg_state *ms = &conn->mstate; + + if (conn->fd != -1) { + close(conn->fd); + conn->fd = -1; + } + + EVENT_OFF(conn->read_ev); + EVENT_OFF(conn->write_ev); + EVENT_OFF(conn->proc_msg_ev); + + mgmt_msg_destroy(ms); +} + +/* + * Client Connections + */ + +static void msg_client_connect(struct msg_client *conn); + +static void msg_client_connect_timer(struct event *thread) +{ + msg_client_connect(EVENT_ARG(thread)); +} + +static void msg_client_sched_connect(struct msg_client *client, + unsigned long msec) +{ + struct msg_conn *conn = &client->conn; + const char *dbgtag = conn->debug ? conn->mstate.idtag : NULL; + + MGMT_MSG_DBG(dbgtag, "connection retry in %lu msec", msec); + if (msec) + event_add_timer_msec(conn->loop, msg_client_connect_timer, + client, msec, &client->conn_retry_tmr); + else + event_add_event(conn->loop, msg_client_connect_timer, client, 0, + &client->conn_retry_tmr); +} + + +/* Connect and start reading from the socket */ +static void msg_client_connect(struct msg_client *client) +{ + struct msg_conn *conn = &client->conn; + const char *dbgtag = conn->debug ? conn->mstate.idtag : NULL; + + conn->fd = mgmt_msg_connect(client->sopath, MSG_CONN_SEND_BUF_SIZE, + MSG_CONN_RECV_BUF_SIZE, dbgtag); + + if (conn->fd == -1) + /* retry the connection */ + msg_client_sched_connect(client, + MSG_CONN_DEFAULT_CONN_RETRY_MSEC); + else if (client->notify_connect && client->notify_connect(client)) + /* client connect notify failed */ + msg_conn_disconnect(conn, true); + else + /* start reading */ + msg_conn_sched_read(conn); +} + +void msg_client_init(struct msg_client *client, struct event_loop *tm, + const char *sopath, + int (*notify_connect)(struct msg_client *client), + int (*notify_disconnect)(struct msg_conn *client), + void (*handle_msg)(uint8_t version, uint8_t *data, + size_t len, struct msg_conn *client), + size_t max_read_buf, size_t max_write_buf, + size_t max_msg_sz, const char *idtag, bool debug) +{ + struct msg_conn *conn = &client->conn; + memset(client, 0, sizeof(*client)); + + conn->loop = tm; + conn->fd = -1; + conn->handle_msg = handle_msg; + conn->notify_disconnect = notify_disconnect; + conn->is_client = true; + conn->debug = debug; + client->sopath = strdup(sopath); + client->notify_connect = notify_connect; + + mgmt_msg_init(&conn->mstate, max_read_buf, max_write_buf, max_msg_sz, + idtag); + + /* XXX maybe just have client kick this off */ + /* Start trying to connect to server */ + msg_client_sched_connect(client, 0); +} + +void msg_client_cleanup(struct msg_client *client) +{ + assert(client->conn.is_client); + + EVENT_OFF(client->conn_retry_tmr); + free(client->sopath); + + msg_conn_cleanup(&client->conn); +} + +/* + * Initialize and start reading from the accepted socket + * + * notify_connect - only called for disconnect i.e., connected == false + */ +void mgmt_msg_server_accept_init( + struct msg_conn *conn, struct event_loop *tm, int fd, + int (*notify_disconnect)(struct msg_conn *conn), + void (*handle_msg)(uint8_t version, uint8_t *data, size_t len, + struct msg_conn *conn), + size_t max_read, size_t max_write, size_t max_size, const char *idtag) +{ + conn->loop = tm; + conn->fd = fd; + conn->notify_disconnect = notify_disconnect; + conn->handle_msg = handle_msg; + conn->is_client = false; + + mgmt_msg_init(&conn->mstate, max_read, max_write, max_size, idtag); + + /* start reading */ + msg_conn_sched_read(conn); + + /* Make socket non-blocking. */ + set_nonblocking(conn->fd); + setsockopt_so_sendbuf(conn->fd, MSG_CONN_SEND_BUF_SIZE); + setsockopt_so_recvbuf(conn->fd, MSG_CONN_RECV_BUF_SIZE); +} diff --git a/lib/mgmt_msg.h b/lib/mgmt_msg.h index 51046e27e..71ff03caf 100644 --- a/lib/mgmt_msg.h +++ b/lib/mgmt_msg.h @@ -21,6 +21,7 @@ #define MGMT_MSG_VERSION_PROTOBUF 0 #define MGMT_MSG_VERSION_NATIVE 1 + struct mgmt_msg_state { struct stream *ins; struct stream *outs; @@ -53,15 +54,14 @@ enum mgmt_msg_wsched { MSW_DISCONNECT, /* disconnect and start reconnecting */ }; +struct msg_conn; + + extern int mgmt_msg_connect(const char *path, size_t sendbuf, size_t recvbuf, const char *dbgtag); -extern void mgmt_msg_destroy(struct mgmt_msg_state *ms); -extern void mgmt_msg_init(struct mgmt_msg_state *ms, size_t max_read_buf, - size_t max_write_buf, size_t max_msg_sz, - const char *idtag); extern bool mgmt_msg_procbufs(struct mgmt_msg_state *ms, - void (*handle_msg)(uint8_t version, void *user, - uint8_t *msg, size_t msglen), + void (*handle_msg)(uint8_t version, uint8_t *msg, + size_t msglen, void *user), void *user, bool debug); extern enum mgmt_msg_rsched mgmt_msg_read(struct mgmt_msg_state *ms, int fd, bool debug); @@ -72,4 +72,84 @@ extern int mgmt_msg_send_msg(struct mgmt_msg_state *ms, uint8_t version, extern enum mgmt_msg_wsched mgmt_msg_write(struct mgmt_msg_state *ms, int fd, bool debug); +extern void mgmt_msg_destroy(struct mgmt_msg_state *state); + +extern void mgmt_msg_init(struct mgmt_msg_state *ms, size_t max_read_buf, + size_t max_write_buf, size_t max_msg_sz, + const char *idtag); + +/* + * Connections + */ + +struct msg_conn { + int fd; + struct mgmt_msg_state mstate; + struct event_loop *loop; + struct event *read_ev; + struct event *write_ev; + struct event *proc_msg_ev; + int (*notify_disconnect)(struct msg_conn *conn); + void (*handle_msg)(uint8_t version, uint8_t *data, size_t len, + struct msg_conn *conn); + bool is_client; + bool debug; +}; + + +/* + * `notify_disconnect` is not called when `msg_conn_cleanup` is called for a + * msg_conn which is currently connected. The socket is closed but there is no + * notification. + */ +extern void msg_conn_cleanup(struct msg_conn *conn); +extern void msg_conn_disconnect(struct msg_conn *conn, bool reconnect); +extern int msg_conn_send_msg(struct msg_conn *client, uint8_t version, + void *msg, size_t mlen, + size_t (*packf)(void *, void *)); + +/* + * Client-side Connections + */ + +struct msg_client { + struct msg_conn conn; + struct event *conn_retry_tmr; + char *sopath; + int (*notify_connect)(struct msg_client *client); +}; + +/* + * `notify_disconnect` is not called when `msg_client_cleanup` is called for a + * msg_client which is currently connected. The socket is closed but there is no + * notification. + */ +extern void msg_client_cleanup(struct msg_client *client); + +/* + * `notify_disconnect` is not called when the user `msg_client_cleanup` is + * called for a client which is currently connected. The socket is closed + * but there is no notification. + */ +extern void msg_client_init(struct msg_client *client, struct event_loop *tm, + const char *sopath, + int (*notify_connect)(struct msg_client *client), + int (*notify_disconnect)(struct msg_conn *client), + void (*handle_msg)(uint8_t version, uint8_t *data, + size_t len, + struct msg_conn *client), + size_t max_read_buf, size_t max_write_buf, + size_t max_msg_sz, const char *idtag, bool debug); + +/* + * Server-side Connections + */ + +extern void mgmt_msg_server_accept_init( + struct msg_conn *client, struct event_loop *tm, int fd, + int (*notify_disconnect)(struct msg_conn *conn), + void (*handle_msg)(uint8_t version, uint8_t *data, size_t len, + struct msg_conn *conn), + size_t max_read, size_t max_write, size_t max_size, const char *idtag); + #endif /* _MGMT_MSG_H */ @@ -3703,7 +3703,7 @@ void vty_terminate(void) struct vty_serv *vtyserv; if (mgmt_lib_hndl) { - mgmt_fe_client_lib_destroy(mgmt_lib_hndl); + mgmt_fe_client_lib_destroy(); mgmt_lib_hndl = 0; } |