summaryrefslogtreecommitdiffstats
path: root/mgmtd
diff options
context:
space:
mode:
authorChristian Hopps <chopps@labn.net>2023-03-08 23:11:43 +0100
committerChristian Hopps <chopps@gmail.com>2023-03-22 06:22:56 +0100
commitf82370b47bddb214d53ffb94775805d637300e9b (patch)
treeeaec8f2525caf8071f724d6076fcc2b68759f842 /mgmtd
parentlib: new message library for mgmtd client and adapters (diff)
downloadfrr-f82370b47bddb214d53ffb94775805d637300e9b.tar.xz
frr-f82370b47bddb214d53ffb94775805d637300e9b.zip
mgmtd: lib: utilize msglib constructed from the removed code
Signed-off-by: Christian Hopps <chopps@labn.net>
Diffstat (limited to 'mgmtd')
-rw-r--r--mgmtd/mgmt_be_adapter.c313
-rw-r--r--mgmtd/mgmt_be_adapter.h18
-rw-r--r--mgmtd/mgmt_be_server.c2
-rw-r--r--mgmtd/mgmt_fe_adapter.c328
-rw-r--r--mgmtd/mgmt_fe_adapter.h15
-rw-r--r--mgmtd/mgmt_fe_server.c2
-rw-r--r--mgmtd/mgmt_txn.c52
7 files changed, 140 insertions, 590 deletions
diff --git a/mgmtd/mgmt_be_adapter.c b/mgmtd/mgmt_be_adapter.c
index 8ad406429..c57fa081a 100644
--- a/mgmtd/mgmt_be_adapter.c
+++ b/mgmtd/mgmt_be_adapter.c
@@ -11,6 +11,7 @@
#include "sockopt.h"
#include "network.h"
#include "libfrr.h"
+#include "mgmt_msg.h"
#include "mgmt_pb.h"
#include "mgmtd/mgmt.h"
#include "mgmtd/mgmt_memory.h"
@@ -497,8 +498,7 @@ mgmt_be_adapter_writes_on(struct mgmt_be_client_adapter *adapter)
{
MGMTD_BE_ADAPTER_DBG("Resume writing msgs for '%s'", adapter->name);
UNSET_FLAG(adapter->flags, MGMTD_BE_ADAPTER_FLAGS_WRITES_OFF);
- if (adapter->obuf_work || stream_fifo_count_safe(adapter->obuf_fifo))
- mgmt_be_adapter_sched_msg_write(adapter);
+ mgmt_be_adapter_sched_msg_write(adapter);
}
static inline void
@@ -509,40 +509,20 @@ mgmt_be_adapter_writes_off(struct mgmt_be_client_adapter *adapter)
}
static int mgmt_be_adapter_send_msg(struct mgmt_be_client_adapter *adapter,
- Mgmtd__BeMessage *be_msg)
+ Mgmtd__BeMessage *be_msg)
{
- size_t msg_size;
- uint8_t *msg_buf = adapter->msg_buf;
- struct mgmt_be_msg *msg;
-
- if (adapter->conn_fd < 0)
- return -1;
-
- msg_size = mgmtd__be_message__get_packed_size(be_msg);
- msg_size += MGMTD_BE_MSG_HDR_LEN;
- if (msg_size > MGMTD_BE_MSG_MAX_LEN) {
- MGMTD_BE_ADAPTER_ERR(
- "Message size %d more than max size'%d. Not sending!'",
- (int)msg_size, (int)MGMTD_BE_MSG_MAX_LEN);
+ if (adapter->conn_fd == -1) {
+ MGMTD_BE_ADAPTER_DBG("can't send message on closed connection");
return -1;
}
- msg = (struct mgmt_be_msg *)msg_buf;
- msg->hdr.marker = MGMTD_BE_MSG_MARKER;
- msg->hdr.len = (uint16_t)msg_size;
- mgmtd__be_message__pack(be_msg, msg->payload);
-
- if (!adapter->obuf_work)
- adapter->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
- if (STREAM_WRITEABLE(adapter->obuf_work) < msg_size) {
- stream_fifo_push(adapter->obuf_fifo, adapter->obuf_work);
- adapter->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
- }
- stream_write(adapter->obuf_work, (void *)msg_buf, msg_size);
-
+ int rv = mgmt_msg_send_msg(
+ &adapter->mstate, be_msg,
+ mgmtd__be_message__get_packed_size(be_msg),
+ (size_t(*)(void *, void *))mgmtd__be_message__pack,
+ mgmt_debug_be);
mgmt_be_adapter_sched_msg_write(adapter);
- adapter->num_msg_tx++;
- return 0;
+ return rv;
}
static int mgmt_be_send_txn_req(struct mgmt_be_client_adapter *adapter,
@@ -614,239 +594,67 @@ static int mgmt_be_send_cfgapply_req(struct mgmt_be_client_adapter *adapter,
return mgmt_be_adapter_send_msg(adapter, &be_msg);
}
-static uint16_t
-mgmt_be_adapter_process_msg(struct mgmt_be_client_adapter *adapter,
- uint8_t *msg_buf, uint16_t bytes_read)
+static void mgmt_be_adapter_process_msg(void *user_ctx, uint8_t *data,
+ size_t len)
{
+ struct mgmt_be_client_adapter *adapter = user_ctx;
Mgmtd__BeMessage *be_msg;
- struct mgmt_be_msg *msg;
- uint16_t bytes_left;
- uint16_t processed = 0;
-
- bytes_left = bytes_read;
- for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;
- bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) {
- msg = (struct mgmt_be_msg *)msg_buf;
- if (msg->hdr.marker != MGMTD_BE_MSG_MARKER) {
- MGMTD_BE_ADAPTER_DBG(
- "Marker not found in message from MGMTD Backend adapter '%s'",
- adapter->name);
- break;
- }
-
- if (bytes_left < msg->hdr.len) {
- MGMTD_BE_ADAPTER_DBG(
- "Incomplete message of %d bytes (epxected: %u) from MGMTD Backend adapter '%s'",
- bytes_left, msg->hdr.len, adapter->name);
- break;
- }
- be_msg = mgmtd__be_message__unpack(
- NULL, (size_t)(msg->hdr.len - MGMTD_BE_MSG_HDR_LEN),
- msg->payload);
- if (!be_msg) {
- MGMTD_BE_ADAPTER_DBG(
- "Failed to decode %d bytes from MGMTD Backend adapter '%s'",
- msg->hdr.len, adapter->name);
- continue;
- }
-
- (void)mgmt_be_adapter_handle_msg(adapter, be_msg);
- mgmtd__be_message__free_unpacked(be_msg, NULL);
- processed++;
- adapter->num_msg_rx++;
+ be_msg = mgmtd__be_message__unpack(NULL, len, data);
+ if (!be_msg) {
+ MGMTD_BE_ADAPTER_DBG(
+ "Failed to decode %zu bytes for adapter: %s", len,
+ adapter->name);
+ return;
}
-
- return processed;
+ MGMTD_BE_ADAPTER_DBG("Decoded %zu bytes of message: %u for adapter: %s",
+ len, be_msg->message_case, adapter->name);
+ (void)mgmt_be_adapter_handle_msg(adapter, be_msg);
+ mgmtd__be_message__free_unpacked(be_msg, NULL);
}
static void mgmt_be_adapter_proc_msgbufs(struct thread *thread)
{
- struct mgmt_be_client_adapter *adapter;
- struct stream *work;
- int processed = 0;
-
- adapter = (struct mgmt_be_client_adapter *)THREAD_ARG(thread);
- assert(adapter);
-
- if (adapter->conn_fd < 0)
- return;
-
- for (; processed < MGMTD_BE_MAX_NUM_MSG_PROC;) {
- work = stream_fifo_pop_safe(adapter->ibuf_fifo);
- if (!work)
- break;
-
- processed += mgmt_be_adapter_process_msg(
- adapter, STREAM_DATA(work), stream_get_endp(work));
-
- if (work != adapter->ibuf_work) {
- /* Free it up */
- stream_free(work);
- } else {
- /* Reset stream buffer for next read */
- stream_reset(work);
- }
- }
+ struct mgmt_be_client_adapter *adapter = THREAD_ARG(thread);
- /*
- * If we have more to process, reschedule for processing it.
- */
- if (stream_fifo_head(adapter->ibuf_fifo))
+ if (mgmt_msg_procbufs(&adapter->mstate, mgmt_be_adapter_process_msg,
+ adapter, mgmt_debug_be))
mgmt_be_adapter_register_event(adapter, MGMTD_BE_PROC_MSG);
}
static void mgmt_be_adapter_read(struct thread *thread)
{
struct mgmt_be_client_adapter *adapter;
- int bytes_read, msg_cnt;
- size_t total_bytes, bytes_left;
- struct mgmt_be_msg_hdr *msg_hdr;
- bool incomplete = false;
+ enum mgmt_msg_rsched rv;
adapter = (struct mgmt_be_client_adapter *)THREAD_ARG(thread);
- assert(adapter && adapter->conn_fd >= 0);
- total_bytes = 0;
- bytes_left = STREAM_SIZE(adapter->ibuf_work) -
- stream_get_endp(adapter->ibuf_work);
- for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) {
- bytes_read = stream_read_try(adapter->ibuf_work,
- adapter->conn_fd, bytes_left);
- MGMTD_BE_ADAPTER_DBG(
- "Got %d bytes of message from MGMTD Backend adapter '%s'",
- bytes_read, adapter->name);
- if (bytes_read <= 0) {
- if (bytes_read == -1
- && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- mgmt_be_adapter_register_event(
- adapter, MGMTD_BE_CONN_READ);
- return;
- }
-
- if (!bytes_read) {
- /* Looks like connection closed */
- MGMTD_BE_ADAPTER_ERR(
- "Got error (%d) while reading from MGMTD Backend adapter '%s'. Err: '%s'",
- bytes_read, adapter->name,
- safe_strerror(errno));
- mgmt_be_adapter_disconnect(adapter);
- return;
- }
- break;
- }
-
- total_bytes += bytes_read;
- bytes_left -= bytes_read;
- }
-
- /*
- * Check if we would have read incomplete messages or not.
- */
- stream_set_getp(adapter->ibuf_work, 0);
- total_bytes = 0;
- msg_cnt = 0;
- bytes_left = stream_get_endp(adapter->ibuf_work);
- for (; bytes_left > MGMTD_BE_MSG_HDR_LEN;) {
- msg_hdr =
- (struct mgmt_be_msg_hdr *)(STREAM_DATA(
- adapter->ibuf_work)
- + total_bytes);
- if (msg_hdr->marker != MGMTD_BE_MSG_MARKER) {
- /* Corrupted buffer. Force disconnect?? */
- MGMTD_BE_ADAPTER_ERR(
- "Received corrupted buffer from MGMTD Backend client.");
- mgmt_be_adapter_disconnect(adapter);
- return;
- }
- if (msg_hdr->len > bytes_left)
- break;
-
- total_bytes += msg_hdr->len;
- bytes_left -= msg_hdr->len;
- msg_cnt++;
- }
-
- if (bytes_left > 0)
- incomplete = true;
-
- /*
- * We would have read one or several messages.
- * Schedule processing them now.
- */
- msg_hdr = (struct mgmt_be_msg_hdr *)(STREAM_DATA(adapter->ibuf_work)
- + total_bytes);
- stream_set_endp(adapter->ibuf_work, total_bytes);
- stream_fifo_push(adapter->ibuf_fifo, adapter->ibuf_work);
- adapter->ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
- if (incomplete) {
- stream_put(adapter->ibuf_work, msg_hdr, bytes_left);
- stream_set_endp(adapter->ibuf_work, bytes_left);
+ rv = mgmt_msg_read(&adapter->mstate, adapter->conn_fd, mgmt_debug_be);
+ if (rv == MSR_DISCONNECT) {
+ mgmt_be_adapter_disconnect(adapter);
+ return;
}
-
- if (msg_cnt)
+ if (rv == MSR_SCHED_BOTH)
mgmt_be_adapter_register_event(adapter, MGMTD_BE_PROC_MSG);
-
mgmt_be_adapter_register_event(adapter, MGMTD_BE_CONN_READ);
}
static void mgmt_be_adapter_write(struct thread *thread)
{
- int bytes_written = 0;
- int processed = 0;
- int msg_size = 0;
- struct stream *s = NULL;
- struct stream *free = NULL;
- struct mgmt_be_client_adapter *adapter;
+ struct mgmt_be_client_adapter *adapter = THREAD_ARG(thread);
+ enum mgmt_msg_wsched rv;
- adapter = (struct mgmt_be_client_adapter *)THREAD_ARG(thread);
- assert(adapter && adapter->conn_fd >= 0);
-
- /* Ensure pushing any pending write buffer to FIFO */
- if (adapter->obuf_work) {
- stream_fifo_push(adapter->obuf_fifo, adapter->obuf_work);
- adapter->obuf_work = NULL;
- }
-
- for (s = stream_fifo_head(adapter->obuf_fifo);
- s && processed < MGMTD_BE_MAX_NUM_MSG_WRITE;
- s = stream_fifo_head(adapter->obuf_fifo)) {
- /* msg_size = (int)stream_get_size(s); */
- msg_size = (int)STREAM_READABLE(s);
- bytes_written = stream_flush(s, adapter->conn_fd);
- if (bytes_written == -1
- && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- mgmt_be_adapter_register_event(adapter,
- MGMTD_BE_CONN_WRITE);
- return;
- } else if (bytes_written != msg_size) {
- MGMTD_BE_ADAPTER_ERR(
- "Could not write all %d bytes (wrote: %d) to MGMTD Backend client socket. Err: '%s'",
- msg_size, bytes_written, safe_strerror(errno));
- if (bytes_written > 0) {
- stream_forward_getp(s, (size_t)bytes_written);
- stream_pulldown(s);
- mgmt_be_adapter_register_event(
- adapter, MGMTD_BE_CONN_WRITE);
- return;
- }
- mgmt_be_adapter_disconnect(adapter);
- return;
- }
-
- free = stream_fifo_pop(adapter->obuf_fifo);
- stream_free(free);
- MGMTD_BE_ADAPTER_DBG(
- "Wrote %d bytes of message to MGMTD Backend client socket.'",
- bytes_written);
- processed++;
- }
-
- if (s) {
+ rv = mgmt_msg_write(&adapter->mstate, adapter->conn_fd, mgmt_debug_be);
+ if (rv == MSW_SCHED_STREAM)
+ mgmt_be_adapter_register_event(adapter, MGMTD_BE_CONN_WRITE);
+ else if (rv == MSW_DISCONNECT)
+ mgmt_be_adapter_disconnect(adapter);
+ else if (rv == MSW_SCHED_WRITES_OFF) {
mgmt_be_adapter_writes_off(adapter);
mgmt_be_adapter_register_event(adapter,
- MGMTD_BE_CONN_WRITES_ON);
- }
+ MGMTD_BE_CONN_WRITES_ON);
+ } else
+ assert(rv == MSW_SCHED_NONE);
}
static void mgmt_be_adapter_resume_writes(struct thread *thread)
@@ -936,6 +744,14 @@ mgmt_be_adapter_register_event(struct mgmt_be_client_adapter *adapter,
assert(adapter->conn_read_ev);
break;
case MGMTD_BE_CONN_WRITE:
+ if (adapter->conn_write_ev)
+ MGMTD_BE_ADAPTER_DBG(
+ "write ready notify already set for client %s",
+ adapter->name);
+ else
+ MGMTD_BE_ADAPTER_DBG(
+ "scheduling write ready notify for client %s",
+ adapter->name);
thread_add_write(mgmt_be_adapter_tm, mgmt_be_adapter_write,
adapter, adapter->conn_fd, &adapter->conn_write_ev);
assert(adapter->conn_write_ev);
@@ -976,17 +792,12 @@ extern void mgmt_be_adapter_unlock(struct mgmt_be_client_adapter **adapter)
(*adapter)->refcount--;
if (!(*adapter)->refcount) {
mgmt_be_adapters_del(&mgmt_be_adapters, *adapter);
-
- stream_fifo_free((*adapter)->ibuf_fifo);
- stream_free((*adapter)->ibuf_work);
- stream_fifo_free((*adapter)->obuf_fifo);
- stream_free((*adapter)->obuf_work);
-
THREAD_OFF((*adapter)->conn_init_ev);
THREAD_OFF((*adapter)->conn_read_ev);
THREAD_OFF((*adapter)->conn_write_ev);
THREAD_OFF((*adapter)->conn_writes_on);
THREAD_OFF((*adapter)->proc_msg_ev);
+ mgmt_msg_destroy(&(*adapter)->mstate);
XFREE(MTYPE_MGMTD_BE_ADPATER, *adapter);
}
@@ -1029,11 +840,9 @@ mgmt_be_create_adapter(int conn_fd, union sockunion *from)
memcpy(&adapter->conn_su, from, sizeof(adapter->conn_su));
snprintf(adapter->name, sizeof(adapter->name), "Unknown-FD-%d",
adapter->conn_fd);
- adapter->ibuf_fifo = stream_fifo_new();
- adapter->ibuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN);
- adapter->obuf_fifo = stream_fifo_new();
- /* adapter->obuf_work = stream_new(MGMTD_BE_MSG_MAX_LEN); */
- adapter->obuf_work = NULL;
+ mgmt_msg_init(&adapter->mstate, MGMTD_BE_MAX_NUM_MSG_PROC,
+ MGMTD_BE_MAX_NUM_MSG_WRITE, MGMTD_BE_MSG_MAX_LEN,
+ "BE-adapter");
mgmt_be_adapter_lock(adapter);
mgmt_be_adapter_register_event(adapter, MGMTD_BE_CONN_READ);
@@ -1195,8 +1004,14 @@ void mgmt_be_adapter_status_write(struct vty *vty)
vty_out(vty, " Conn-FD: \t\t\t%d\n", adapter->conn_fd);
vty_out(vty, " Client-Id: \t\t\t%d\n", adapter->id);
vty_out(vty, " Ref-Count: \t\t\t%u\n", adapter->refcount);
- vty_out(vty, " Msg-Sent: \t\t\t%u\n", adapter->num_msg_tx);
- vty_out(vty, " Msg-Recvd: \t\t\t%u\n", adapter->num_msg_rx);
+ vty_out(vty, " Msg-Recvd: \t\t\t%" PRIu64 "\n",
+ adapter->mstate.nrxm);
+ vty_out(vty, " Bytes-Recvd: \t\t%" PRIu64 "\n",
+ adapter->mstate.nrxb);
+ vty_out(vty, " Msg-Sent: \t\t\t%" PRIu64 "\n",
+ adapter->mstate.ntxm);
+ vty_out(vty, " Bytes-Sent: \t\t%" PRIu64 "\n",
+ adapter->mstate.ntxb);
}
vty_out(vty, " Total: %d\n",
(int)mgmt_be_adapters_count(&mgmt_be_adapters));
diff --git a/mgmtd/mgmt_be_adapter.h b/mgmtd/mgmt_be_adapter.h
index 5dfc2386d..7f57233d3 100644
--- a/mgmtd/mgmt_be_adapter.h
+++ b/mgmtd/mgmt_be_adapter.h
@@ -9,8 +9,9 @@
#ifndef _FRR_MGMTD_BE_ADAPTER_H_
#define _FRR_MGMTD_BE_ADAPTER_H_
-#include "mgmtd/mgmt_defines.h"
#include "mgmt_be_client.h"
+#include "mgmt_msg.h"
+#include "mgmtd/mgmt_defines.h"
#include "mgmtd/mgmt_ds.h"
#define MGMTD_BE_CONN_INIT_DELAY_MSEC 50
@@ -54,22 +55,9 @@ struct mgmt_be_client_adapter {
char xpath_reg[MGMTD_MAX_NUM_XPATH_REG][MGMTD_MAX_XPATH_LEN];
/* IO streams for read and write */
- /* pthread_mutex_t ibuf_mtx; */
- struct stream_fifo *ibuf_fifo;
- /* pthread_mutex_t obuf_mtx; */
- struct stream_fifo *obuf_fifo;
-
- /* Private I/O buffers */
- struct stream *ibuf_work;
- struct stream *obuf_work;
- uint8_t msg_buf[MGMTD_BE_MSG_MAX_LEN];
-
- /* Buffer of data waiting to be written to client. */
- /* struct buffer *wb; */
+ struct mgmt_msg_state mstate;
int refcount;
- uint32_t num_msg_tx;
- uint32_t num_msg_rx;
/*
* List of config items that should be sent to the
diff --git a/mgmtd/mgmt_be_server.c b/mgmtd/mgmt_be_server.c
index 6464b12ae..6997fdcf8 100644
--- a/mgmtd/mgmt_be_server.c
+++ b/mgmtd/mgmt_be_server.c
@@ -28,7 +28,7 @@
zlog_err("%s: ERROR: " fmt, __func__, ##__VA_ARGS__)
#endif /* REDIRECT_DEBUG_TO_STDERR */
-static int mgmt_be_listen_fd;
+static int mgmt_be_listen_fd = -1;
static struct thread_master *mgmt_be_listen_tm;
static struct thread *mgmt_be_listen_ev;
static void mgmt_be_server_register_event(enum mgmt_be_event event);
diff --git a/mgmtd/mgmt_fe_adapter.c b/mgmtd/mgmt_fe_adapter.c
index 1ea812c1a..cc812ab15 100644
--- a/mgmtd/mgmt_fe_adapter.c
+++ b/mgmtd/mgmt_fe_adapter.c
@@ -11,6 +11,7 @@
#include "network.h"
#include "libfrr.h"
#include "mgmt_fe_client.h"
+#include "mgmt_msg.h"
#include "mgmt_pb.h"
#include "hash.h"
#include "jhash.h"
@@ -375,8 +376,7 @@ mgmt_fe_adapter_writes_on(struct mgmt_fe_client_adapter *adapter)
{
MGMTD_FE_ADAPTER_DBG("Resume writing msgs for '%s'", adapter->name);
UNSET_FLAG(adapter->flags, MGMTD_FE_ADAPTER_FLAGS_WRITES_OFF);
- if (adapter->obuf_work || stream_fifo_count_safe(adapter->obuf_fifo))
- mgmt_fe_adapter_sched_msg_write(adapter);
+ mgmt_fe_adapter_sched_msg_write(adapter);
}
static inline void
@@ -390,40 +390,18 @@ static int
mgmt_fe_adapter_send_msg(struct mgmt_fe_client_adapter *adapter,
Mgmtd__FeMessage *fe_msg)
{
- size_t msg_size;
- uint8_t msg_buf[MGMTD_FE_MSG_MAX_LEN];
- struct mgmt_fe_msg *msg;
-
- if (adapter->conn_fd < 0) {
- MGMTD_FE_ADAPTER_ERR("Connection already reset");
- return -1;
- }
-
- msg_size = mgmtd__fe_message__get_packed_size(fe_msg);
- msg_size += MGMTD_FE_MSG_HDR_LEN;
- if (msg_size > sizeof(msg_buf)) {
- MGMTD_FE_ADAPTER_ERR(
- "Message size %d more than max size'%d. Not sending!'",
- (int)msg_size, (int)sizeof(msg_buf));
+ if (adapter->conn_fd == -1) {
+ MGMTD_FE_ADAPTER_DBG("can't send message on closed connection");
return -1;
}
- msg = (struct mgmt_fe_msg *)msg_buf;
- msg->hdr.marker = MGMTD_FE_MSG_MARKER;
- msg->hdr.len = (uint16_t)msg_size;
- mgmtd__fe_message__pack(fe_msg, msg->payload);
-
- if (!adapter->obuf_work)
- adapter->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN);
- if (STREAM_WRITEABLE(adapter->obuf_work) < msg_size) {
- stream_fifo_push(adapter->obuf_fifo, adapter->obuf_work);
- adapter->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN);
- }
- stream_write(adapter->obuf_work, (void *)msg_buf, msg_size);
-
+ int rv = mgmt_msg_send_msg(
+ &adapter->mstate, fe_msg,
+ mgmtd__fe_message__get_packed_size(fe_msg),
+ (size_t(*)(void *, void *))mgmtd__fe_message__pack,
+ mgmt_debug_fe);
mgmt_fe_adapter_sched_msg_write(adapter);
- adapter->num_msg_tx++;
- return 0;
+ return rv;
}
static int
@@ -1244,6 +1222,9 @@ static int mgmt_fe_session_handle_commit_config_req_msg(
"Failed to create a Configuration session!");
return 0;
}
+ MGMTD_FE_ADAPTER_DBG(
+ "Created txn %llu for session %llu for COMMIT-CFG-REQ",
+ session->cfg_txn_id, session->session_id);
}
@@ -1430,252 +1411,66 @@ mgmt_fe_adapter_handle_msg(struct mgmt_fe_client_adapter *adapter,
return 0;
}
-static uint16_t
-mgmt_fe_adapter_process_msg(struct mgmt_fe_client_adapter *adapter,
- uint8_t *msg_buf, uint16_t bytes_read)
+static void mgmt_fe_adapter_process_msg(void *user_ctx, uint8_t *data,
+ size_t len)
{
+ struct mgmt_fe_client_adapter *adapter = user_ctx;
Mgmtd__FeMessage *fe_msg;
- struct mgmt_fe_msg *msg;
- uint16_t bytes_left;
- uint16_t processed = 0;
-
- MGMTD_FE_ADAPTER_DBG(
- "Have %u bytes of messages from client '%s' to process",
- bytes_read, adapter->name);
-
- bytes_left = bytes_read;
- for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;
- bytes_left -= msg->hdr.len, msg_buf += msg->hdr.len) {
- msg = (struct mgmt_fe_msg *)msg_buf;
- if (msg->hdr.marker != MGMTD_FE_MSG_MARKER) {
- MGMTD_FE_ADAPTER_DBG(
- "Marker not found in message from MGMTD Frontend adapter '%s'",
- adapter->name);
- break;
- }
-
- if (bytes_left < msg->hdr.len) {
- MGMTD_FE_ADAPTER_DBG(
- "Incomplete message of %d bytes (epxected: %u) from MGMTD Frontend adapter '%s'",
- bytes_left, msg->hdr.len, adapter->name);
- break;
- }
-
- fe_msg = mgmtd__fe_message__unpack(
- NULL, (size_t)(msg->hdr.len - MGMTD_FE_MSG_HDR_LEN),
- msg->payload);
- if (!fe_msg) {
- MGMTD_FE_ADAPTER_DBG(
- "Failed to decode %d bytes from MGMTD Frontend adapter '%s'",
- msg->hdr.len, adapter->name);
- continue;
- }
+ fe_msg = mgmtd__fe_message__unpack(NULL, len, data);
+ if (!fe_msg) {
MGMTD_FE_ADAPTER_DBG(
- "Decoded %d bytes of message(msg: %u/%u) from MGMTD Frontend adapter '%s'",
- msg->hdr.len, fe_msg->message_case,
- fe_msg->message_case, adapter->name);
-
- (void)mgmt_fe_adapter_handle_msg(adapter, fe_msg);
-
- mgmtd__fe_message__free_unpacked(fe_msg, NULL);
- processed++;
- adapter->num_msg_rx++;
+ "Failed to decode %zu bytes for adapter: %s", len,
+ adapter->name);
+ return;
}
-
- return processed;
+ MGMTD_FE_ADAPTER_DBG(
+ "Decoded %zu bytes of message: %u from adapter: %s", len,
+ fe_msg->message_case, adapter->name);
+ (void)mgmt_fe_adapter_handle_msg(adapter, fe_msg);
+ mgmtd__fe_message__free_unpacked(fe_msg, NULL);
}
static void mgmt_fe_adapter_proc_msgbufs(struct thread *thread)
{
- struct mgmt_fe_client_adapter *adapter;
- struct stream *work;
- int processed = 0;
+ struct mgmt_fe_client_adapter *adapter = THREAD_ARG(thread);
- adapter = (struct mgmt_fe_client_adapter *)THREAD_ARG(thread);
- assert(adapter && adapter->conn_fd >= 0);
-
- MGMTD_FE_ADAPTER_DBG("Have %d ibufs for client '%s' to process",
- (int)stream_fifo_count_safe(adapter->ibuf_fifo),
- adapter->name);
-
- for (; processed < MGMTD_FE_MAX_NUM_MSG_PROC;) {
- work = stream_fifo_pop_safe(adapter->ibuf_fifo);
- if (!work)
- break;
-
- processed += mgmt_fe_adapter_process_msg(
- adapter, STREAM_DATA(work), stream_get_endp(work));
-
- if (work != adapter->ibuf_work) {
- /* Free it up */
- stream_free(work);
- } else {
- /* Reset stream buffer for next read */
- stream_reset(work);
- }
- }
-
- /*
- * If we have more to process, reschedule for processing it.
- */
- if (stream_fifo_head(adapter->ibuf_fifo))
+ if (mgmt_msg_procbufs(&adapter->mstate, mgmt_fe_adapter_process_msg,
+ adapter, mgmt_debug_fe))
mgmt_fe_adapter_register_event(adapter, MGMTD_FE_PROC_MSG);
}
static void mgmt_fe_adapter_read(struct thread *thread)
{
- struct mgmt_fe_client_adapter *adapter;
- int bytes_read, msg_cnt;
- size_t total_bytes, bytes_left;
- struct mgmt_fe_msg_hdr *msg_hdr;
- bool incomplete = false;
+ struct mgmt_fe_client_adapter *adapter = THREAD_ARG(thread);
+ enum mgmt_msg_rsched rv;
- adapter = (struct mgmt_fe_client_adapter *)THREAD_ARG(thread);
- assert(adapter && adapter->conn_fd);
-
- total_bytes = 0;
- bytes_left = STREAM_SIZE(adapter->ibuf_work)
- - stream_get_endp(adapter->ibuf_work);
- for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;) {
- bytes_read = stream_read_try(adapter->ibuf_work, adapter->conn_fd,
- bytes_left);
- MGMTD_FE_ADAPTER_DBG(
- "Got %d bytes of message from MGMTD Frontend adapter '%s'",
- bytes_read, adapter->name);
- if (bytes_read <= 0) {
- if (bytes_read == -1
- && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- mgmt_fe_adapter_register_event(
- adapter, MGMTD_FE_CONN_READ);
- return;
- }
-
- if (!bytes_read) {
- /* Looks like connection closed */
- MGMTD_FE_ADAPTER_ERR(
- "Got error (%d) while reading from MGMTD Frontend adapter '%s'. Err: '%s'",
- bytes_read, adapter->name,
- safe_strerror(errno));
- mgmt_fe_adapter_disconnect(adapter);
- return;
- }
- break;
- }
-
- total_bytes += bytes_read;
- bytes_left -= bytes_read;
- }
-
- /*
- * Check if we would have read incomplete messages or not.
- */
- stream_set_getp(adapter->ibuf_work, 0);
- total_bytes = 0;
- msg_cnt = 0;
- bytes_left = stream_get_endp(adapter->ibuf_work);
- for (; bytes_left > MGMTD_FE_MSG_HDR_LEN;) {
- msg_hdr =
- (struct mgmt_fe_msg_hdr *)(STREAM_DATA(
- adapter->ibuf_work)
- + total_bytes);
- if (msg_hdr->marker != MGMTD_FE_MSG_MARKER) {
- /* Corrupted buffer. Force disconnect?? */
- MGMTD_FE_ADAPTER_ERR(
- "Received corrupted buffer from MGMTD frontend client.");
- mgmt_fe_adapter_disconnect(adapter);
- return;
- }
- if (msg_hdr->len > bytes_left)
- break;
-
- MGMTD_FE_ADAPTER_DBG("Got message (len: %u) from client '%s'",
- msg_hdr->len, adapter->name);
-
- total_bytes += msg_hdr->len;
- bytes_left -= msg_hdr->len;
- msg_cnt++;
- }
-
- if (bytes_left > 0)
- incomplete = true;
- /*
- * We would have read one or several messages.
- * Schedule processing them now.
- */
- msg_hdr = (struct mgmt_fe_msg_hdr *)(STREAM_DATA(adapter->ibuf_work)
- + total_bytes);
- stream_set_endp(adapter->ibuf_work, total_bytes);
- stream_fifo_push(adapter->ibuf_fifo, adapter->ibuf_work);
- adapter->ibuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN);
- if (incomplete) {
- stream_put(adapter->ibuf_work, msg_hdr, bytes_left);
- stream_set_endp(adapter->ibuf_work, bytes_left);
+ rv = mgmt_msg_read(&adapter->mstate, adapter->conn_fd, mgmt_debug_fe);
+ if (rv == MSR_DISCONNECT) {
+ mgmt_fe_adapter_disconnect(adapter);
+ return;
}
-
- if (msg_cnt)
+ if (rv == MSR_SCHED_BOTH)
mgmt_fe_adapter_register_event(adapter, MGMTD_FE_PROC_MSG);
-
mgmt_fe_adapter_register_event(adapter, MGMTD_FE_CONN_READ);
}
static void mgmt_fe_adapter_write(struct thread *thread)
{
- int bytes_written = 0;
- int processed = 0;
- int msg_size = 0;
- struct stream *s = NULL;
- struct stream *free = NULL;
- struct mgmt_fe_client_adapter *adapter;
-
- adapter = (struct mgmt_fe_client_adapter *)THREAD_ARG(thread);
- assert(adapter && adapter->conn_fd);
-
- /* Ensure pushing any pending write buffer to FIFO */
- if (adapter->obuf_work) {
- stream_fifo_push(adapter->obuf_fifo, adapter->obuf_work);
- adapter->obuf_work = NULL;
- }
-
- for (s = stream_fifo_head(adapter->obuf_fifo);
- s && processed < MGMTD_FE_MAX_NUM_MSG_WRITE;
- s = stream_fifo_head(adapter->obuf_fifo)) {
- /* msg_size = (int)stream_get_size(s); */
- msg_size = (int)STREAM_READABLE(s);
- bytes_written = stream_flush(s, adapter->conn_fd);
- if (bytes_written == -1
- && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- mgmt_fe_adapter_register_event(
- adapter, MGMTD_FE_CONN_WRITE);
- return;
- } else if (bytes_written != msg_size) {
- MGMTD_FE_ADAPTER_ERR(
- "Could not write all %d bytes (wrote: %d) to MGMTD Frontend client socket. Err: '%s'",
- msg_size, bytes_written, safe_strerror(errno));
- if (bytes_written > 0) {
- stream_forward_getp(s, (size_t)bytes_written);
- stream_pulldown(s);
- mgmt_fe_adapter_register_event(
- adapter, MGMTD_FE_CONN_WRITE);
- return;
- }
- mgmt_fe_adapter_disconnect(adapter);
- return;
- }
-
- free = stream_fifo_pop(adapter->obuf_fifo);
- stream_free(free);
- MGMTD_FE_ADAPTER_DBG(
- "Wrote %d bytes of message to MGMTD Frontend client socket.'",
- bytes_written);
- processed++;
- }
+ struct mgmt_fe_client_adapter *adapter = THREAD_ARG(thread);
+ enum mgmt_msg_wsched rv;
- if (s) {
+ rv = mgmt_msg_write(&adapter->mstate, adapter->conn_fd, mgmt_debug_fe);
+ if (rv == MSW_SCHED_STREAM)
+ mgmt_fe_adapter_register_event(adapter, MGMTD_FE_CONN_WRITE);
+ else if (rv == MSW_DISCONNECT)
+ mgmt_fe_adapter_disconnect(adapter);
+ else if (rv == MSW_SCHED_WRITES_OFF) {
mgmt_fe_adapter_writes_off(adapter);
mgmt_fe_adapter_register_event(adapter,
- MGMTD_FE_CONN_WRITES_ON);
- }
+ MGMTD_FE_CONN_WRITES_ON);
+ } else
+ assert(rv == MSW_SCHED_NONE);
}
static void mgmt_fe_adapter_resume_writes(struct thread *thread)
@@ -1683,7 +1478,7 @@ static void mgmt_fe_adapter_resume_writes(struct thread *thread)
struct mgmt_fe_client_adapter *adapter;
adapter = (struct mgmt_fe_client_adapter *)THREAD_ARG(thread);
- assert(adapter && adapter->conn_fd);
+ assert(adapter && adapter->conn_fd != -1);
mgmt_fe_adapter_writes_on(adapter);
}
@@ -1739,16 +1534,11 @@ mgmt_fe_adapter_unlock(struct mgmt_fe_client_adapter **adapter)
(*adapter)->refcount--;
if (!(*adapter)->refcount) {
mgmt_fe_adapters_del(&mgmt_fe_adapters, *adapter);
-
- stream_fifo_free((*adapter)->ibuf_fifo);
- stream_free((*adapter)->ibuf_work);
- stream_fifo_free((*adapter)->obuf_fifo);
- stream_free((*adapter)->obuf_work);
-
THREAD_OFF((*adapter)->conn_read_ev);
THREAD_OFF((*adapter)->conn_write_ev);
THREAD_OFF((*adapter)->proc_msg_ev);
THREAD_OFF((*adapter)->conn_writes_on);
+ mgmt_msg_destroy(&(*adapter)->mstate);
XFREE(MTYPE_MGMTD_FE_ADPATER, *adapter);
}
@@ -1793,11 +1583,10 @@ mgmt_fe_create_adapter(int conn_fd, union sockunion *from)
snprintf(adapter->name, sizeof(adapter->name), "Unknown-FD-%d",
adapter->conn_fd);
mgmt_fe_sessions_init(&adapter->fe_sessions);
- adapter->ibuf_fifo = stream_fifo_new();
- adapter->ibuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN);
- adapter->obuf_fifo = stream_fifo_new();
- /* adapter->obuf_work = stream_new(MGMTD_FE_MSG_MAX_LEN); */
- adapter->obuf_work = NULL;
+
+ mgmt_msg_init(&adapter->mstate, MGMTD_FE_MAX_NUM_MSG_PROC,
+ MGMTD_FE_MAX_NUM_MSG_WRITE, MGMTD_FE_MSG_MAX_LEN,
+ "FE-adapter");
mgmt_fe_adapter_lock(adapter);
mgmt_fe_adapter_register_event(adapter, MGMTD_FE_CONN_READ);
@@ -2083,9 +1872,14 @@ void mgmt_fe_adapter_status_write(struct vty *vty, bool detail)
}
vty_out(vty, " Total-Sessions: \t\t\t%d\n",
(int)mgmt_fe_sessions_count(&adapter->fe_sessions));
- vty_out(vty, " Msg-Sent: \t\t\t\t%u\n", adapter->num_msg_tx);
- vty_out(vty, " Msg-Recvd: \t\t\t\t%u\n",
- adapter->num_msg_rx);
+ vty_out(vty, " Msg-Recvd: \t\t\t\t%" PRIu64 "\n",
+ adapter->mstate.nrxm);
+ vty_out(vty, " Bytes-Recvd: \t\t\t%" PRIu64 "\n",
+ adapter->mstate.nrxb);
+ vty_out(vty, " Msg-Sent: \t\t\t\t%" PRIu64 "\n",
+ adapter->mstate.ntxm);
+ vty_out(vty, " Bytes-Sent: \t\t\t%" PRIu64 "\n",
+ adapter->mstate.ntxb);
}
vty_out(vty, " Total: %d\n",
(int)mgmt_fe_adapters_count(&mgmt_fe_adapters));
diff --git a/mgmtd/mgmt_fe_adapter.h b/mgmtd/mgmt_fe_adapter.h
index 05d37d3f3..3389234a3 100644
--- a/mgmtd/mgmt_fe_adapter.h
+++ b/mgmtd/mgmt_fe_adapter.h
@@ -9,6 +9,10 @@
#ifndef _FRR_MGMTD_FE_ADAPTER_H_
#define _FRR_MGMTD_FE_ADAPTER_H_
+#include "mgmt_fe_client.h"
+#include "mgmt_msg.h"
+#include "mgmtd/mgmt_defines.h"
+
struct mgmt_fe_client_adapter;
struct mgmt_master;
@@ -64,18 +68,9 @@ struct mgmt_fe_client_adapter {
struct mgmt_fe_sessions_head fe_sessions;
/* IO streams for read and write */
- /* pthread_mutex_t ibuf_mtx; */
- struct stream_fifo *ibuf_fifo;
- /* pthread_mutex_t obuf_mtx; */
- struct stream_fifo *obuf_fifo;
-
- /* Private I/O buffers */
- struct stream *ibuf_work;
- struct stream *obuf_work;
+ struct mgmt_msg_state mstate;
int refcount;
- uint32_t num_msg_tx;
- uint32_t num_msg_rx;
struct mgmt_commit_stats cmt_stats;
struct mgmt_setcfg_stats setcfg_stats;
diff --git a/mgmtd/mgmt_fe_server.c b/mgmtd/mgmt_fe_server.c
index 2db4397cc..0b0a56ea6 100644
--- a/mgmtd/mgmt_fe_server.c
+++ b/mgmtd/mgmt_fe_server.c
@@ -28,7 +28,7 @@
zlog_err("%s: ERROR: " fmt, __func__, ##__VA_ARGS__)
#endif /* REDIRECT_DEBUG_TO_STDERR */
-static int mgmt_fe_listen_fd;
+static int mgmt_fe_listen_fd = -1;
static struct thread_master *mgmt_fe_listen_tm;
static struct thread *mgmt_fe_listen_ev;
static void mgmt_fe_server_register_event(enum mgmt_fe_event event);
diff --git a/mgmtd/mgmt_txn.c b/mgmtd/mgmt_txn.c
index 115aa532c..05b593798 100644
--- a/mgmtd/mgmt_txn.c
+++ b/mgmtd/mgmt_txn.c
@@ -2471,6 +2471,8 @@ int mgmt_txn_notify_be_adapter_conn(struct mgmt_be_client_adapter *adapter,
return -1;
}
+ MGMTD_TXN_DBG("Created initial txn %llu for BE connection %s",
+ txn->txn_id, adapter->name);
/*
* Set the changeset for transaction to commit and trigger the
* commit request.
@@ -2608,53 +2610,7 @@ int mgmt_txn_notify_be_cfgdata_reply(
return 0;
}
-int mgmt_txn_notify_be_cfg_validate_reply(
- uint64_t txn_id, bool success, uint64_t batch_ids[],
- size_t num_batch_ids, char *error_if_any,
- struct mgmt_be_client_adapter *adapter)
-{
- struct mgmt_txn_ctx *txn;
- struct mgmt_txn_be_cfg_batch *cfg_btch;
- struct mgmt_commit_cfg_req *cmtcfg_req = NULL;
- size_t indx;
-
- txn = mgmt_txn_id2ctx(txn_id);
- if (!txn || txn->type != MGMTD_TXN_TYPE_CONFIG)
- return -1;
-
- assert(txn->commit_cfg_req);
- cmtcfg_req = &txn->commit_cfg_req->req.commit_cfg;
-
- if (!success) {
- MGMTD_TXN_ERR(
- "CFGDATA_VALIDATE_REQ sent to '%s' failed for Txn %p, Batches [0x%llx - 0x%llx], Err: %s",
- adapter->name, txn, (unsigned long long)batch_ids[0],
- (unsigned long long)batch_ids[num_batch_ids - 1],
- error_if_any ? error_if_any : "None");
- mgmt_txn_send_commit_cfg_reply(
- txn, MGMTD_INTERNAL_ERROR,
- "Internal error! Failed to validate config data on backend!");
- return 0;
- }
-
- for (indx = 0; indx < num_batch_ids; indx++) {
- cfg_btch = mgmt_txn_cfgbatch_id2ctx(txn, batch_ids[indx]);
- if (cfg_btch->txn != txn)
- return -1;
- mgmt_move_txn_cfg_batch_to_next(
- cmtcfg_req, cfg_btch,
- &cmtcfg_req->curr_batches[adapter->id],
- &cmtcfg_req->next_batches[adapter->id], true,
- MGMTD_COMMIT_PHASE_APPLY_CFG);
- }
-
- mgmt_try_move_commit_to_next_phase(txn, cmtcfg_req);
-
- return 0;
-}
-
-extern int
-mgmt_txn_notify_be_cfg_apply_reply(uint64_t txn_id, bool success,
+int mgmt_txn_notify_be_cfg_apply_reply(uint64_t txn_id, bool success,
uint64_t batch_ids[],
size_t num_batch_ids, char *error_if_any,
struct mgmt_be_client_adapter *adapter)
@@ -2852,6 +2808,8 @@ int mgmt_txn_rollback_trigger_cfg_apply(struct mgmt_ds_ctx *src_ds_ctx,
return -1;
}
+ MGMTD_TXN_DBG("Created rollback txn %llu", txn->txn_id);
+
/*
* Set the changeset for transaction to commit and trigger the commit
* request.