diff options
author | Christian Hopps <chopps@labn.net> | 2023-03-08 23:11:43 +0100 |
---|---|---|
committer | Christian Hopps <chopps@gmail.com> | 2023-03-22 06:22:56 +0100 |
commit | f82370b47bddb214d53ffb94775805d637300e9b (patch) | |
tree | eaec8f2525caf8071f724d6076fcc2b68759f842 /mgmtd | |
parent | lib: new message library for mgmtd client and adapters (diff) | |
download | frr-f82370b47bddb214d53ffb94775805d637300e9b.tar.xz frr-f82370b47bddb214d53ffb94775805d637300e9b.zip |
mgmtd: lib: utilize msglib constructed from the removed code
Signed-off-by: Christian Hopps <chopps@labn.net>
Diffstat (limited to 'mgmtd')
-rw-r--r-- | mgmtd/mgmt_be_adapter.c | 313 | ||||
-rw-r--r-- | mgmtd/mgmt_be_adapter.h | 18 | ||||
-rw-r--r-- | mgmtd/mgmt_be_server.c | 2 | ||||
-rw-r--r-- | mgmtd/mgmt_fe_adapter.c | 328 | ||||
-rw-r--r-- | mgmtd/mgmt_fe_adapter.h | 15 | ||||
-rw-r--r-- | mgmtd/mgmt_fe_server.c | 2 | ||||
-rw-r--r-- | mgmtd/mgmt_txn.c | 52 |
7 files changed, 140 insertions, 590 deletions
diff --git a/mgmtd/mgmt_be_adapter.c b/mgmtd/mgmt_be_adapter.c index 8ad406429..c57fa081a 100644 --- a/mgmtd/mgmt_be_adapter.c +++ b/mgmtd/mgmt_be_adapter.c @@ -11,6 +11,7 @@ #include "sockopt.h" #include "network.h" #include "libfrr.h" +#include "mgmt_msg.h" #include "mgmt_pb.h" #include "mgmtd/mgmt.h" #include "mgmtd/mgmt_memory.h" @@ -497,8 +498,7 @@ mgmt_be_adapter_writes_on(struct mgmt_be_client_adapter *adapter) { MGMTD_BE_ADAPTER_DBG("Resume writing msgs for '%s'", adapter->name); UNSET_FLAG(adapter->flags, MGMTD_BE_ADAPTER_FLAGS_WRITES_OFF); - if (adapter->obuf_work || stream_fifo_count_safe(adapter->obuf_fifo)) - mgmt_be_adapter_sched_msg_write(adapter); + mgmt_be_adapter_sched_msg_write(adapter); } static inline void @@ -509,40 +509,20 @@ mgmt_be_adapter_writes_off(struct mgmt_be_client_adapter *adapter) } static int mgmt_be_adapter_send_msg(struct mgmt_be_client_adapter *adapter, - Mgmtd__BeMessage *be_msg) + Mgmtd__BeMessage *be_msg) { - size_t msg_size; - uint8_t *msg_buf = adapter->msg_buf; - struct mgmt_be_msg *msg; - - if (adapter->conn_fd < 0) - return -1; - - msg_size = mgmtd__be_message__get_packed_size(be_msg); - msg_size += MGMTD_BE_MSG_HDR_LEN; - if (msg_size > MGMTD_BE_MSG_MAX_LEN) { - MGMTD_BE_ADAPTER_ERR( - "Message size %d more than max size'%d. Not sending!'", - (int)msg_size, (int)MGMTD_BE_MSG_MAX_LEN); + if (adapter->conn_fd == -1) { + MGMTD_BE_ADAPTER_DBG("can't send message on closed connection"); return -1; } - msg = (struct mgmt_be_msg *)msg_buf; - msg->hdr.marker = MGMTD_BE_MSG_MARKER; - msg->hdr.len = (uint16_t)msg_size; - mgmtd__be_message__pack(be_msg, msg->payload); - - if (!adapter->obuf_work) - adapter->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); - if (STREAM_WRITEABLE(adapter->obuf_work) < msg_size) { - stream_fifo_push(adapter->obuf_fifo, adapter->obuf_work); - adapter->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); - } - stream_write(adapter->obuf_work, (void *)msg_buf, msg_size); - + int rv = mgmt_msg_send_msg( + &adapter->mstate, be_msg, + mgmtd__be_message__get_packed_size(be_msg), + (size_t(*)(void *, void *))mgmtd__be_message__pack, + mgmt_debug_be); mgmt_be_adapter_sched_msg_write(adapter); - adapter->num_msg_tx++; - return 0; + return rv; } static int mgmt_be_send_txn_req(struct mgmt_be_client_adapter *adapter, @@ -614,239 +594,67 @@ static int mgmt_be_send_cfgapply_req(struct mgmt_be_client_adapter *adapter, return mgmt_be_adapter_send_msg(adapter, &be_msg); } -static uint16_t -mgmt_be_adapter_process_msg(struct mgmt_be_client_adapter *adapter, - uint8_t *msg_buf, uint16_t bytes_read) +static void mgmt_be_adapter_process_msg(void *user_ctx, uint8_t *data, + size_t len) { + struct mgmt_be_client_adapter *adapter = user_ctx; Mgmtd__BeMessage *be_msg; - struct mgmt_be_msg *msg; - uint16_t bytes_left; - uint16_t processed = 0; - - bytes_left = bytes_read; - for (; bytes_left > MGMTD_BE_MSG_HDR_LEN; - bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) { - msg = (struct mgmt_be_msg *)msg_buf; - if (msg->hdr.marker != MGMTD_BE_MSG_MARKER) { - MGMTD_BE_ADAPTER_DBG( - "Marker not found in message from MGMTD Backend adapter '%s'", - adapter->name); - break; - } - - if (bytes_left < msg->hdr.len) { - MGMTD_BE_ADAPTER_DBG( - "Incomplete message of %d bytes (epxected: %u) from MGMTD Backend adapter '%s'", - bytes_left, msg->hdr.len, adapter->name); - break; - } - be_msg = mgmtd__be_message__unpack( - NULL, (size_t)(msg->hdr.len - MGMTD_BE_MSG_HDR_LEN), - msg->payload); - if (!be_msg) { - MGMTD_BE_ADAPTER_DBG( - "Failed to decode %d bytes from MGMTD Backend adapter '%s'", - msg->hdr.len, adapter->name); - continue; - } - - (void)mgmt_be_adapter_handle_msg(adapter, be_msg); - mgmtd__be_message__free_unpacked(be_msg, NULL); - processed++; - adapter->num_msg_rx++; + be_msg = mgmtd__be_message__unpack(NULL, len, data); + if (!be_msg) { + MGMTD_BE_ADAPTER_DBG( + "Failed to decode %zu bytes for adapter: %s", len, + adapter->name); + return; } - - return processed; + MGMTD_BE_ADAPTER_DBG("Decoded %zu bytes of message: %u for adapter: %s", + len, be_msg->message_case, adapter->name); + (void)mgmt_be_adapter_handle_msg(adapter, be_msg); + mgmtd__be_message__free_unpacked(be_msg, NULL); } static void mgmt_be_adapter_proc_msgbufs(struct thread *thread) { - struct mgmt_be_client_adapter *adapter; - struct stream *work; - int processed = 0; - - adapter = (struct mgmt_be_client_adapter *)THREAD_ARG(thread); - assert(adapter); - - if (adapter->conn_fd < 0) - return; - - for (; processed < MGMTD_BE_MAX_NUM_MSG_PROC;) { - work = stream_fifo_pop_safe(adapter->ibuf_fifo); - if (!work) - break; - - processed += mgmt_be_adapter_process_msg( - adapter, STREAM_DATA(work), stream_get_endp(work)); - - if (work != adapter->ibuf_work) { - /* Free it up */ - stream_free(work); - } else { - /* Reset stream buffer for next read */ - stream_reset(work); - } - } + struct mgmt_be_client_adapter *adapter = THREAD_ARG(thread); - /* - * If we have more to process, reschedule for processing it. - */ - if (stream_fifo_head(adapter->ibuf_fifo)) + if (mgmt_msg_procbufs(&adapter->mstate, mgmt_be_adapter_process_msg, + adapter, mgmt_debug_be)) mgmt_be_adapter_register_event(adapter, MGMTD_BE_PROC_MSG); } static void mgmt_be_adapter_read(struct thread *thread) { struct mgmt_be_client_adapter *adapter; - int bytes_read, msg_cnt; - size_t total_bytes, bytes_left; - struct mgmt_be_msg_hdr *msg_hdr; - bool incomplete = false; + enum mgmt_msg_rsched rv; adapter = (struct mgmt_be_client_adapter *)THREAD_ARG(thread); - assert(adapter && adapter->conn_fd >= 0); - total_bytes = 0; - bytes_left = STREAM_SIZE(adapter->ibuf_work) - - stream_get_endp(adapter->ibuf_work); - for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) { - bytes_read = stream_read_try(adapter->ibuf_work, - adapter->conn_fd, bytes_left); - MGMTD_BE_ADAPTER_DBG( - "Got %d bytes of message from MGMTD Backend adapter '%s'", - bytes_read, adapter->name); - if (bytes_read <= 0) { - if (bytes_read == -1 - && (errno == EAGAIN || errno == EWOULDBLOCK)) { - mgmt_be_adapter_register_event( - adapter, MGMTD_BE_CONN_READ); - return; - } - - if (!bytes_read) { - /* Looks like connection closed */ - MGMTD_BE_ADAPTER_ERR( - "Got error (%d) while reading from MGMTD Backend adapter '%s'. Err: '%s'", - bytes_read, adapter->name, - safe_strerror(errno)); - mgmt_be_adapter_disconnect(adapter); - return; - } - break; - } - - total_bytes += bytes_read; - bytes_left -= bytes_read; - } - - /* - * Check if we would have read incomplete messages or not. - */ - stream_set_getp(adapter->ibuf_work, 0); - total_bytes = 0; - msg_cnt = 0; - bytes_left = stream_get_endp(adapter->ibuf_work); - for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) { - msg_hdr = - (struct mgmt_be_msg_hdr *)(STREAM_DATA( - adapter->ibuf_work) - + total_bytes); - if (msg_hdr->marker != MGMTD_BE_MSG_MARKER) { - /* Corrupted buffer. Force disconnect?? */ - MGMTD_BE_ADAPTER_ERR( - "Received corrupted buffer from MGMTD Backend client."); - mgmt_be_adapter_disconnect(adapter); - return; - } - if (msg_hdr->len > bytes_left) - break; - - total_bytes += msg_hdr->len; - bytes_left -= msg_hdr->len; - msg_cnt++; - } - - if (bytes_left > 0) - incomplete = true; - - /* - * We would have read one or several messages. - * Schedule processing them now. - */ - msg_hdr = (struct mgmt_be_msg_hdr *)(STREAM_DATA(adapter->ibuf_work) - + total_bytes); - stream_set_endp(adapter->ibuf_work, total_bytes); - stream_fifo_push(adapter->ibuf_fifo, adapter->ibuf_work); - adapter->ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); - if (incomplete) { - stream_put(adapter->ibuf_work, msg_hdr, bytes_left); - stream_set_endp(adapter->ibuf_work, bytes_left); + rv = mgmt_msg_read(&adapter->mstate, adapter->conn_fd, mgmt_debug_be); + if (rv == MSR_DISCONNECT) { + mgmt_be_adapter_disconnect(adapter); + return; } - - if (msg_cnt) + if (rv == MSR_SCHED_BOTH) mgmt_be_adapter_register_event(adapter, MGMTD_BE_PROC_MSG); - mgmt_be_adapter_register_event(adapter, MGMTD_BE_CONN_READ); } static void mgmt_be_adapter_write(struct thread *thread) { - int bytes_written = 0; - int processed = 0; - int msg_size = 0; - struct stream *s = NULL; - struct stream *free = NULL; - struct mgmt_be_client_adapter *adapter; + struct mgmt_be_client_adapter *adapter = THREAD_ARG(thread); + enum mgmt_msg_wsched rv; - adapter = (struct mgmt_be_client_adapter *)THREAD_ARG(thread); - assert(adapter && adapter->conn_fd >= 0); - - /* Ensure pushing any pending write buffer to FIFO */ - if (adapter->obuf_work) { - stream_fifo_push(adapter->obuf_fifo, adapter->obuf_work); - adapter->obuf_work = NULL; - } - - for (s = stream_fifo_head(adapter->obuf_fifo); - s && processed < MGMTD_BE_MAX_NUM_MSG_WRITE; - s = stream_fifo_head(adapter->obuf_fifo)) { - /* msg_size = (int)stream_get_size(s); */ - msg_size = (int)STREAM_READABLE(s); - bytes_written = stream_flush(s, adapter->conn_fd); - if (bytes_written == -1 - && (errno == EAGAIN || errno == EWOULDBLOCK)) { - mgmt_be_adapter_register_event(adapter, - MGMTD_BE_CONN_WRITE); - return; - } else if (bytes_written != msg_size) { - MGMTD_BE_ADAPTER_ERR( - "Could not write all %d bytes (wrote: %d) to MGMTD Backend client socket. Err: '%s'", - msg_size, bytes_written, safe_strerror(errno)); - if (bytes_written > 0) { - stream_forward_getp(s, (size_t)bytes_written); - stream_pulldown(s); - mgmt_be_adapter_register_event( - adapter, MGMTD_BE_CONN_WRITE); - return; - } - mgmt_be_adapter_disconnect(adapter); - return; - } - - free = stream_fifo_pop(adapter->obuf_fifo); - stream_free(free); - MGMTD_BE_ADAPTER_DBG( - "Wrote %d bytes of message to MGMTD Backend client socket.'", - bytes_written); - processed++; - } - - if (s) { + rv = mgmt_msg_write(&adapter->mstate, adapter->conn_fd, mgmt_debug_be); + if (rv == MSW_SCHED_STREAM) + mgmt_be_adapter_register_event(adapter, MGMTD_BE_CONN_WRITE); + else if (rv == MSW_DISCONNECT) + mgmt_be_adapter_disconnect(adapter); + else if (rv == MSW_SCHED_WRITES_OFF) { mgmt_be_adapter_writes_off(adapter); mgmt_be_adapter_register_event(adapter, - MGMTD_BE_CONN_WRITES_ON); - } + MGMTD_BE_CONN_WRITES_ON); + } else + assert(rv == MSW_SCHED_NONE); } static void mgmt_be_adapter_resume_writes(struct thread *thread) @@ -936,6 +744,14 @@ mgmt_be_adapter_register_event(struct mgmt_be_client_adapter *adapter, assert(adapter->conn_read_ev); break; case MGMTD_BE_CONN_WRITE: + if (adapter->conn_write_ev) + MGMTD_BE_ADAPTER_DBG( + "write ready notify already set for client %s", + adapter->name); + else + MGMTD_BE_ADAPTER_DBG( + "scheduling write ready notify for client %s", + adapter->name); thread_add_write(mgmt_be_adapter_tm, mgmt_be_adapter_write, adapter, adapter->conn_fd, &adapter->conn_write_ev); assert(adapter->conn_write_ev); @@ -976,17 +792,12 @@ extern void mgmt_be_adapter_unlock(struct mgmt_be_client_adapter **adapter) (*adapter)->refcount--; if (!(*adapter)->refcount) { mgmt_be_adapters_del(&mgmt_be_adapters, *adapter); - - stream_fifo_free((*adapter)->ibuf_fifo); - stream_free((*adapter)->ibuf_work); - stream_fifo_free((*adapter)->obuf_fifo); - stream_free((*adapter)->obuf_work); - THREAD_OFF((*adapter)->conn_init_ev); THREAD_OFF((*adapter)->conn_read_ev); THREAD_OFF((*adapter)->conn_write_ev); THREAD_OFF((*adapter)->conn_writes_on); THREAD_OFF((*adapter)->proc_msg_ev); + mgmt_msg_destroy(&(*adapter)->mstate); XFREE(MTYPE_MGMTD_BE_ADPATER, *adapter); } @@ -1029,11 +840,9 @@ mgmt_be_create_adapter(int conn_fd, union sockunion *from) memcpy(&adapter->conn_su, from, sizeof(adapter->conn_su)); snprintf(adapter->name, sizeof(adapter->name), "Unknown-FD-%d", adapter->conn_fd); - adapter->ibuf_fifo = stream_fifo_new(); - adapter->ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); - adapter->obuf_fifo = stream_fifo_new(); - /* adapter->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); */ - adapter->obuf_work = NULL; + mgmt_msg_init(&adapter->mstate, MGMTD_BE_MAX_NUM_MSG_PROC, + MGMTD_BE_MAX_NUM_MSG_WRITE, MGMTD_BE_MSG_MAX_LEN, + "BE-adapter"); mgmt_be_adapter_lock(adapter); mgmt_be_adapter_register_event(adapter, MGMTD_BE_CONN_READ); @@ -1195,8 +1004,14 @@ void mgmt_be_adapter_status_write(struct vty *vty) vty_out(vty, " Conn-FD: \t\t\t%d\n", adapter->conn_fd); vty_out(vty, " Client-Id: \t\t\t%d\n", adapter->id); vty_out(vty, " Ref-Count: \t\t\t%u\n", adapter->refcount); - vty_out(vty, " Msg-Sent: \t\t\t%u\n", adapter->num_msg_tx); - vty_out(vty, " Msg-Recvd: \t\t\t%u\n", adapter->num_msg_rx); + vty_out(vty, " Msg-Recvd: \t\t\t%" PRIu64 "\n", + adapter->mstate.nrxm); + vty_out(vty, " Bytes-Recvd: \t\t%" PRIu64 "\n", + adapter->mstate.nrxb); + vty_out(vty, " Msg-Sent: \t\t\t%" PRIu64 "\n", + adapter->mstate.ntxm); + vty_out(vty, " Bytes-Sent: \t\t%" PRIu64 "\n", + adapter->mstate.ntxb); } vty_out(vty, " Total: %d\n", (int)mgmt_be_adapters_count(&mgmt_be_adapters)); diff --git a/mgmtd/mgmt_be_adapter.h b/mgmtd/mgmt_be_adapter.h index 5dfc2386d..7f57233d3 100644 --- a/mgmtd/mgmt_be_adapter.h +++ b/mgmtd/mgmt_be_adapter.h @@ -9,8 +9,9 @@ #ifndef _FRR_MGMTD_BE_ADAPTER_H_ #define _FRR_MGMTD_BE_ADAPTER_H_ -#include "mgmtd/mgmt_defines.h" #include "mgmt_be_client.h" +#include "mgmt_msg.h" +#include "mgmtd/mgmt_defines.h" #include "mgmtd/mgmt_ds.h" #define MGMTD_BE_CONN_INIT_DELAY_MSEC 50 @@ -54,22 +55,9 @@ struct mgmt_be_client_adapter { char xpath_reg[MGMTD_MAX_NUM_XPATH_REG][MGMTD_MAX_XPATH_LEN]; /* IO streams for read and write */ - /* pthread_mutex_t ibuf_mtx; */ - struct stream_fifo *ibuf_fifo; - /* pthread_mutex_t obuf_mtx; */ - struct stream_fifo *obuf_fifo; - - /* Private I/O buffers */ - struct stream *ibuf_work; - struct stream *obuf_work; - uint8_t msg_buf[MGMTD_BE_MSG_MAX_LEN]; - - /* Buffer of data waiting to be written to client. */ - /* struct buffer *wb; */ + struct mgmt_msg_state mstate; int refcount; - uint32_t num_msg_tx; - uint32_t num_msg_rx; /* * List of config items that should be sent to the diff --git a/mgmtd/mgmt_be_server.c b/mgmtd/mgmt_be_server.c index 6464b12ae..6997fdcf8 100644 --- a/mgmtd/mgmt_be_server.c +++ b/mgmtd/mgmt_be_server.c @@ -28,7 +28,7 @@ zlog_err("%s: ERROR: " fmt, __func__, ##__VA_ARGS__) #endif /* REDIRECT_DEBUG_TO_STDERR */ -static int mgmt_be_listen_fd; +static int mgmt_be_listen_fd = -1; static struct thread_master *mgmt_be_listen_tm; static struct thread *mgmt_be_listen_ev; static void mgmt_be_server_register_event(enum mgmt_be_event event); diff --git a/mgmtd/mgmt_fe_adapter.c b/mgmtd/mgmt_fe_adapter.c index 1ea812c1a..cc812ab15 100644 --- a/mgmtd/mgmt_fe_adapter.c +++ b/mgmtd/mgmt_fe_adapter.c @@ -11,6 +11,7 @@ #include "network.h" #include "libfrr.h" #include "mgmt_fe_client.h" +#include "mgmt_msg.h" #include "mgmt_pb.h" #include "hash.h" #include "jhash.h" @@ -375,8 +376,7 @@ mgmt_fe_adapter_writes_on(struct mgmt_fe_client_adapter *adapter) { MGMTD_FE_ADAPTER_DBG("Resume writing msgs for '%s'", adapter->name); UNSET_FLAG(adapter->flags, MGMTD_FE_ADAPTER_FLAGS_WRITES_OFF); - if (adapter->obuf_work || stream_fifo_count_safe(adapter->obuf_fifo)) - mgmt_fe_adapter_sched_msg_write(adapter); + mgmt_fe_adapter_sched_msg_write(adapter); } static inline void @@ -390,40 +390,18 @@ static int mgmt_fe_adapter_send_msg(struct mgmt_fe_client_adapter *adapter, Mgmtd__FeMessage *fe_msg) { - size_t msg_size; - uint8_t msg_buf[MGMTD_FE_MSG_MAX_LEN]; - struct mgmt_fe_msg *msg; - - if (adapter->conn_fd < 0) { - MGMTD_FE_ADAPTER_ERR("Connection already reset"); - return -1; - } - - msg_size = mgmtd__fe_message__get_packed_size(fe_msg); - msg_size += MGMTD_FE_MSG_HDR_LEN; - if (msg_size > sizeof(msg_buf)) { - MGMTD_FE_ADAPTER_ERR( - "Message size %d more than max size'%d. Not sending!'", - (int)msg_size, (int)sizeof(msg_buf)); + if (adapter->conn_fd == -1) { + MGMTD_FE_ADAPTER_DBG("can't send message on closed connection"); return -1; } - msg = (struct mgmt_fe_msg *)msg_buf; - msg->hdr.marker = MGMTD_FE_MSG_MARKER; - msg->hdr.len = (uint16_t)msg_size; - mgmtd__fe_message__pack(fe_msg, msg->payload); - - if (!adapter->obuf_work) - adapter->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN); - if (STREAM_WRITEABLE(adapter->obuf_work) < msg_size) { - stream_fifo_push(adapter->obuf_fifo, adapter->obuf_work); - adapter->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN); - } - stream_write(adapter->obuf_work, (void *)msg_buf, msg_size); - + int rv = mgmt_msg_send_msg( + &adapter->mstate, fe_msg, + mgmtd__fe_message__get_packed_size(fe_msg), + (size_t(*)(void *, void *))mgmtd__fe_message__pack, + mgmt_debug_fe); mgmt_fe_adapter_sched_msg_write(adapter); - adapter->num_msg_tx++; - return 0; + return rv; } static int @@ -1244,6 +1222,9 @@ static int mgmt_fe_session_handle_commit_config_req_msg( "Failed to create a Configuration session!"); return 0; } + MGMTD_FE_ADAPTER_DBG( + "Created txn %llu for session %llu for COMMIT-CFG-REQ", + session->cfg_txn_id, session->session_id); } @@ -1430,252 +1411,66 @@ mgmt_fe_adapter_handle_msg(struct mgmt_fe_client_adapter *adapter, return 0; } -static uint16_t -mgmt_fe_adapter_process_msg(struct mgmt_fe_client_adapter *adapter, - uint8_t *msg_buf, uint16_t bytes_read) +static void mgmt_fe_adapter_process_msg(void *user_ctx, uint8_t *data, + size_t len) { + struct mgmt_fe_client_adapter *adapter = user_ctx; Mgmtd__FeMessage *fe_msg; - struct mgmt_fe_msg *msg; - uint16_t bytes_left; - uint16_t processed = 0; - - MGMTD_FE_ADAPTER_DBG( - "Have %u bytes of messages from client '%s' to process", - bytes_read, adapter->name); - - bytes_left = bytes_read; - for (; bytes_left > MGMTD_FE_MSG_HDR_LEN; - bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) { - msg = (struct mgmt_fe_msg *)msg_buf; - if (msg->hdr.marker != MGMTD_FE_MSG_MARKER) { - MGMTD_FE_ADAPTER_DBG( - "Marker not found in message from MGMTD Frontend adapter '%s'", - adapter->name); - break; - } - - if (bytes_left < msg->hdr.len) { - MGMTD_FE_ADAPTER_DBG( - "Incomplete message of %d bytes (epxected: %u) from MGMTD Frontend adapter '%s'", - bytes_left, msg->hdr.len, adapter->name); - break; - } - - fe_msg = mgmtd__fe_message__unpack( - NULL, (size_t)(msg->hdr.len - MGMTD_FE_MSG_HDR_LEN), - msg->payload); - if (!fe_msg) { - MGMTD_FE_ADAPTER_DBG( - "Failed to decode %d bytes from MGMTD Frontend adapter '%s'", - msg->hdr.len, adapter->name); - continue; - } + fe_msg = mgmtd__fe_message__unpack(NULL, len, data); + if (!fe_msg) { MGMTD_FE_ADAPTER_DBG( - "Decoded %d bytes of message(msg: %u/%u) from MGMTD Frontend adapter '%s'", - msg->hdr.len, fe_msg->message_case, - fe_msg->message_case, adapter->name); - - (void)mgmt_fe_adapter_handle_msg(adapter, fe_msg); - - mgmtd__fe_message__free_unpacked(fe_msg, NULL); - processed++; - adapter->num_msg_rx++; + "Failed to decode %zu bytes for adapter: %s", len, + adapter->name); + return; } - - return processed; + MGMTD_FE_ADAPTER_DBG( + "Decoded %zu bytes of message: %u from adapter: %s", len, + fe_msg->message_case, adapter->name); + (void)mgmt_fe_adapter_handle_msg(adapter, fe_msg); + mgmtd__fe_message__free_unpacked(fe_msg, NULL); } static void mgmt_fe_adapter_proc_msgbufs(struct thread *thread) { - struct mgmt_fe_client_adapter *adapter; - struct stream *work; - int processed = 0; + struct mgmt_fe_client_adapter *adapter = THREAD_ARG(thread); - adapter = (struct mgmt_fe_client_adapter *)THREAD_ARG(thread); - assert(adapter && adapter->conn_fd >= 0); - - MGMTD_FE_ADAPTER_DBG("Have %d ibufs for client '%s' to process", - (int)stream_fifo_count_safe(adapter->ibuf_fifo), - adapter->name); - - for (; processed < MGMTD_FE_MAX_NUM_MSG_PROC;) { - work = stream_fifo_pop_safe(adapter->ibuf_fifo); - if (!work) - break; - - processed += mgmt_fe_adapter_process_msg( - adapter, STREAM_DATA(work), stream_get_endp(work)); - - if (work != adapter->ibuf_work) { - /* Free it up */ - stream_free(work); - } else { - /* Reset stream buffer for next read */ - stream_reset(work); - } - } - - /* - * If we have more to process, reschedule for processing it. - */ - if (stream_fifo_head(adapter->ibuf_fifo)) + if (mgmt_msg_procbufs(&adapter->mstate, mgmt_fe_adapter_process_msg, + adapter, mgmt_debug_fe)) mgmt_fe_adapter_register_event(adapter, MGMTD_FE_PROC_MSG); } static void mgmt_fe_adapter_read(struct thread *thread) { - struct mgmt_fe_client_adapter *adapter; - int bytes_read, msg_cnt; - size_t total_bytes, bytes_left; - struct mgmt_fe_msg_hdr *msg_hdr; - bool incomplete = false; + struct mgmt_fe_client_adapter *adapter = THREAD_ARG(thread); + enum mgmt_msg_rsched rv; - adapter = (struct mgmt_fe_client_adapter *)THREAD_ARG(thread); - assert(adapter && adapter->conn_fd); - - total_bytes = 0; - bytes_left = STREAM_SIZE(adapter->ibuf_work) - - stream_get_endp(adapter->ibuf_work); - for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;) { - bytes_read = stream_read_try(adapter->ibuf_work, adapter->conn_fd, - bytes_left); - MGMTD_FE_ADAPTER_DBG( - "Got %d bytes of message from MGMTD Frontend adapter '%s'", - bytes_read, adapter->name); - if (bytes_read <= 0) { - if (bytes_read == -1 - && (errno == EAGAIN || errno == EWOULDBLOCK)) { - mgmt_fe_adapter_register_event( - adapter, MGMTD_FE_CONN_READ); - return; - } - - if (!bytes_read) { - /* Looks like connection closed */ - MGMTD_FE_ADAPTER_ERR( - "Got error (%d) while reading from MGMTD Frontend adapter '%s'. Err: '%s'", - bytes_read, adapter->name, - safe_strerror(errno)); - mgmt_fe_adapter_disconnect(adapter); - return; - } - break; - } - - total_bytes += bytes_read; - bytes_left -= bytes_read; - } - - /* - * Check if we would have read incomplete messages or not. - */ - stream_set_getp(adapter->ibuf_work, 0); - total_bytes = 0; - msg_cnt = 0; - bytes_left = stream_get_endp(adapter->ibuf_work); - for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;) { - msg_hdr = - (struct mgmt_fe_msg_hdr *)(STREAM_DATA( - adapter->ibuf_work) - + total_bytes); - if (msg_hdr->marker != MGMTD_FE_MSG_MARKER) { - /* Corrupted buffer. Force disconnect?? */ - MGMTD_FE_ADAPTER_ERR( - "Received corrupted buffer from MGMTD frontend client."); - mgmt_fe_adapter_disconnect(adapter); - return; - } - if (msg_hdr->len > bytes_left) - break; - - MGMTD_FE_ADAPTER_DBG("Got message (len: %u) from client '%s'", - msg_hdr->len, adapter->name); - - total_bytes += msg_hdr->len; - bytes_left -= msg_hdr->len; - msg_cnt++; - } - - if (bytes_left > 0) - incomplete = true; - /* - * We would have read one or several messages. - * Schedule processing them now. - */ - msg_hdr = (struct mgmt_fe_msg_hdr *)(STREAM_DATA(adapter->ibuf_work) - + total_bytes); - stream_set_endp(adapter->ibuf_work, total_bytes); - stream_fifo_push(adapter->ibuf_fifo, adapter->ibuf_work); - adapter->ibuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN); - if (incomplete) { - stream_put(adapter->ibuf_work, msg_hdr, bytes_left); - stream_set_endp(adapter->ibuf_work, bytes_left); + rv = mgmt_msg_read(&adapter->mstate, adapter->conn_fd, mgmt_debug_fe); + if (rv == MSR_DISCONNECT) { + mgmt_fe_adapter_disconnect(adapter); + return; } - - if (msg_cnt) + if (rv == MSR_SCHED_BOTH) mgmt_fe_adapter_register_event(adapter, MGMTD_FE_PROC_MSG); - mgmt_fe_adapter_register_event(adapter, MGMTD_FE_CONN_READ); } static void mgmt_fe_adapter_write(struct thread *thread) { - int bytes_written = 0; - int processed = 0; - int msg_size = 0; - struct stream *s = NULL; - struct stream *free = NULL; - struct mgmt_fe_client_adapter *adapter; - - adapter = (struct mgmt_fe_client_adapter *)THREAD_ARG(thread); - assert(adapter && adapter->conn_fd); - - /* Ensure pushing any pending write buffer to FIFO */ - if (adapter->obuf_work) { - stream_fifo_push(adapter->obuf_fifo, adapter->obuf_work); - adapter->obuf_work = NULL; - } - - for (s = stream_fifo_head(adapter->obuf_fifo); - s && processed < MGMTD_FE_MAX_NUM_MSG_WRITE; - s = stream_fifo_head(adapter->obuf_fifo)) { - /* msg_size = (int)stream_get_size(s); */ - msg_size = (int)STREAM_READABLE(s); - bytes_written = stream_flush(s, adapter->conn_fd); - if (bytes_written == -1 - && (errno == EAGAIN || errno == EWOULDBLOCK)) { - mgmt_fe_adapter_register_event( - adapter, MGMTD_FE_CONN_WRITE); - return; - } else if (bytes_written != msg_size) { - MGMTD_FE_ADAPTER_ERR( - "Could not write all %d bytes (wrote: %d) to MGMTD Frontend client socket. Err: '%s'", - msg_size, bytes_written, safe_strerror(errno)); - if (bytes_written > 0) { - stream_forward_getp(s, (size_t)bytes_written); - stream_pulldown(s); - mgmt_fe_adapter_register_event( - adapter, MGMTD_FE_CONN_WRITE); - return; - } - mgmt_fe_adapter_disconnect(adapter); - return; - } - - free = stream_fifo_pop(adapter->obuf_fifo); - stream_free(free); - MGMTD_FE_ADAPTER_DBG( - "Wrote %d bytes of message to MGMTD Frontend client socket.'", - bytes_written); - processed++; - } + struct mgmt_fe_client_adapter *adapter = THREAD_ARG(thread); + enum mgmt_msg_wsched rv; - if (s) { + rv = mgmt_msg_write(&adapter->mstate, adapter->conn_fd, mgmt_debug_fe); + if (rv == MSW_SCHED_STREAM) + mgmt_fe_adapter_register_event(adapter, MGMTD_FE_CONN_WRITE); + else if (rv == MSW_DISCONNECT) + mgmt_fe_adapter_disconnect(adapter); + else if (rv == MSW_SCHED_WRITES_OFF) { mgmt_fe_adapter_writes_off(adapter); mgmt_fe_adapter_register_event(adapter, - MGMTD_FE_CONN_WRITES_ON); - } + MGMTD_FE_CONN_WRITES_ON); + } else + assert(rv == MSW_SCHED_NONE); } static void mgmt_fe_adapter_resume_writes(struct thread *thread) @@ -1683,7 +1478,7 @@ static void mgmt_fe_adapter_resume_writes(struct thread *thread) struct mgmt_fe_client_adapter *adapter; adapter = (struct mgmt_fe_client_adapter *)THREAD_ARG(thread); - assert(adapter && adapter->conn_fd); + assert(adapter && adapter->conn_fd != -1); mgmt_fe_adapter_writes_on(adapter); } @@ -1739,16 +1534,11 @@ mgmt_fe_adapter_unlock(struct mgmt_fe_client_adapter **adapter) (*adapter)->refcount--; if (!(*adapter)->refcount) { mgmt_fe_adapters_del(&mgmt_fe_adapters, *adapter); - - stream_fifo_free((*adapter)->ibuf_fifo); - stream_free((*adapter)->ibuf_work); - stream_fifo_free((*adapter)->obuf_fifo); - stream_free((*adapter)->obuf_work); - THREAD_OFF((*adapter)->conn_read_ev); THREAD_OFF((*adapter)->conn_write_ev); THREAD_OFF((*adapter)->proc_msg_ev); THREAD_OFF((*adapter)->conn_writes_on); + mgmt_msg_destroy(&(*adapter)->mstate); XFREE(MTYPE_MGMTD_FE_ADPATER, *adapter); } @@ -1793,11 +1583,10 @@ mgmt_fe_create_adapter(int conn_fd, union sockunion *from) snprintf(adapter->name, sizeof(adapter->name), "Unknown-FD-%d", adapter->conn_fd); mgmt_fe_sessions_init(&adapter->fe_sessions); - adapter->ibuf_fifo = stream_fifo_new(); - adapter->ibuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN); - adapter->obuf_fifo = stream_fifo_new(); - /* adapter->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN); */ - adapter->obuf_work = NULL; + + mgmt_msg_init(&adapter->mstate, MGMTD_FE_MAX_NUM_MSG_PROC, + MGMTD_FE_MAX_NUM_MSG_WRITE, MGMTD_FE_MSG_MAX_LEN, + "FE-adapter"); mgmt_fe_adapter_lock(adapter); mgmt_fe_adapter_register_event(adapter, MGMTD_FE_CONN_READ); @@ -2083,9 +1872,14 @@ void mgmt_fe_adapter_status_write(struct vty *vty, bool detail) } vty_out(vty, " Total-Sessions: \t\t\t%d\n", (int)mgmt_fe_sessions_count(&adapter->fe_sessions)); - vty_out(vty, " Msg-Sent: \t\t\t\t%u\n", adapter->num_msg_tx); - vty_out(vty, " Msg-Recvd: \t\t\t\t%u\n", - adapter->num_msg_rx); + vty_out(vty, " Msg-Recvd: \t\t\t\t%" PRIu64 "\n", + adapter->mstate.nrxm); + vty_out(vty, " Bytes-Recvd: \t\t\t%" PRIu64 "\n", + adapter->mstate.nrxb); + vty_out(vty, " Msg-Sent: \t\t\t\t%" PRIu64 "\n", + adapter->mstate.ntxm); + vty_out(vty, " Bytes-Sent: \t\t\t%" PRIu64 "\n", + adapter->mstate.ntxb); } vty_out(vty, " Total: %d\n", (int)mgmt_fe_adapters_count(&mgmt_fe_adapters)); diff --git a/mgmtd/mgmt_fe_adapter.h b/mgmtd/mgmt_fe_adapter.h index 05d37d3f3..3389234a3 100644 --- a/mgmtd/mgmt_fe_adapter.h +++ b/mgmtd/mgmt_fe_adapter.h @@ -9,6 +9,10 @@ #ifndef _FRR_MGMTD_FE_ADAPTER_H_ #define _FRR_MGMTD_FE_ADAPTER_H_ +#include "mgmt_fe_client.h" +#include "mgmt_msg.h" +#include "mgmtd/mgmt_defines.h" + struct mgmt_fe_client_adapter; struct mgmt_master; @@ -64,18 +68,9 @@ struct mgmt_fe_client_adapter { struct mgmt_fe_sessions_head fe_sessions; /* IO streams for read and write */ - /* pthread_mutex_t ibuf_mtx; */ - struct stream_fifo *ibuf_fifo; - /* pthread_mutex_t obuf_mtx; */ - struct stream_fifo *obuf_fifo; - - /* Private I/O buffers */ - struct stream *ibuf_work; - struct stream *obuf_work; + struct mgmt_msg_state mstate; int refcount; - uint32_t num_msg_tx; - uint32_t num_msg_rx; struct mgmt_commit_stats cmt_stats; struct mgmt_setcfg_stats setcfg_stats; diff --git a/mgmtd/mgmt_fe_server.c b/mgmtd/mgmt_fe_server.c index 2db4397cc..0b0a56ea6 100644 --- a/mgmtd/mgmt_fe_server.c +++ b/mgmtd/mgmt_fe_server.c @@ -28,7 +28,7 @@ zlog_err("%s: ERROR: " fmt, __func__, ##__VA_ARGS__) #endif /* REDIRECT_DEBUG_TO_STDERR */ -static int mgmt_fe_listen_fd; +static int mgmt_fe_listen_fd = -1; static struct thread_master *mgmt_fe_listen_tm; static struct thread *mgmt_fe_listen_ev; static void mgmt_fe_server_register_event(enum mgmt_fe_event event); diff --git a/mgmtd/mgmt_txn.c b/mgmtd/mgmt_txn.c index 115aa532c..05b593798 100644 --- a/mgmtd/mgmt_txn.c +++ b/mgmtd/mgmt_txn.c @@ -2471,6 +2471,8 @@ int mgmt_txn_notify_be_adapter_conn(struct mgmt_be_client_adapter *adapter, return -1; } + MGMTD_TXN_DBG("Created initial txn %llu for BE connection %s", + txn->txn_id, adapter->name); /* * Set the changeset for transaction to commit and trigger the * commit request. @@ -2608,53 +2610,7 @@ int mgmt_txn_notify_be_cfgdata_reply( return 0; } -int mgmt_txn_notify_be_cfg_validate_reply( - uint64_t txn_id, bool success, uint64_t batch_ids[], - size_t num_batch_ids, char *error_if_any, - struct mgmt_be_client_adapter *adapter) -{ - struct mgmt_txn_ctx *txn; - struct mgmt_txn_be_cfg_batch *cfg_btch; - struct mgmt_commit_cfg_req *cmtcfg_req = NULL; - size_t indx; - - txn = mgmt_txn_id2ctx(txn_id); - if (!txn || txn->type != MGMTD_TXN_TYPE_CONFIG) - return -1; - - assert(txn->commit_cfg_req); - cmtcfg_req = &txn->commit_cfg_req->req.commit_cfg; - - if (!success) { - MGMTD_TXN_ERR( - "CFGDATA_VALIDATE_REQ sent to '%s' failed for Txn %p, Batches [0x%llx - 0x%llx], Err: %s", - adapter->name, txn, (unsigned long long)batch_ids[0], - (unsigned long long)batch_ids[num_batch_ids - 1], - error_if_any ? error_if_any : "None"); - mgmt_txn_send_commit_cfg_reply( - txn, MGMTD_INTERNAL_ERROR, - "Internal error! Failed to validate config data on backend!"); - return 0; - } - - for (indx = 0; indx < num_batch_ids; indx++) { - cfg_btch = mgmt_txn_cfgbatch_id2ctx(txn, batch_ids[indx]); - if (cfg_btch->txn != txn) - return -1; - mgmt_move_txn_cfg_batch_to_next( - cmtcfg_req, cfg_btch, - &cmtcfg_req->curr_batches[adapter->id], - &cmtcfg_req->next_batches[adapter->id], true, - MGMTD_COMMIT_PHASE_APPLY_CFG); - } - - mgmt_try_move_commit_to_next_phase(txn, cmtcfg_req); - - return 0; -} - -extern int -mgmt_txn_notify_be_cfg_apply_reply(uint64_t txn_id, bool success, +int mgmt_txn_notify_be_cfg_apply_reply(uint64_t txn_id, bool success, uint64_t batch_ids[], size_t num_batch_ids, char *error_if_any, struct mgmt_be_client_adapter *adapter) @@ -2852,6 +2808,8 @@ int mgmt_txn_rollback_trigger_cfg_apply(struct mgmt_ds_ctx *src_ds_ctx, return -1; } + MGMTD_TXN_DBG("Created rollback txn %llu", txn->txn_id); + /* * Set the changeset for transaction to commit and trigger the commit * request. |