summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Hopps <chopps@labn.net>2023-03-08 23:11:00 +0100
committerChristian Hopps <chopps@gmail.com>2023-03-22 06:22:56 +0100
commitc9f0e90b60768d74bc3e7904551bf45bd62ba6cd (patch)
tree78a63077ec57d71141e2e4fd9af8364db937ccc0
parentstaticd: handle `distance` value (diff)
downloadfrr-c9f0e90b60768d74bc3e7904551bf45bd62ba6cd.tar.xz
frr-c9f0e90b60768d74bc3e7904551bf45bd62ba6cd.zip
lib: new message library for mgmtd client and adapters
Signed-off-by: Christian Hopps <chopps@labn.net>
Diffstat (limited to '')
-rw-r--r--lib/mgmt_msg.c414
-rw-r--r--lib/mgmt_msg.h73
-rw-r--r--lib/subdir.am2
3 files changed, 489 insertions, 0 deletions
diff --git a/lib/mgmt_msg.c b/lib/mgmt_msg.c
new file mode 100644
index 000000000..d212fef92
--- /dev/null
+++ b/lib/mgmt_msg.c
@@ -0,0 +1,414 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * March 6 2023, Christian Hopps <chopps@labn.net>
+ *
+ * Copyright (C) 2021 Vmware, Inc.
+ * Pushpasis Sarkar <spushpasis@vmware.com>
+ * Copyright (c) 2023, LabN Consulting, L.L.C.
+ */
+#include <zebra.h>
+#include "network.h"
+#include "sockopt.h"
+#include "stream.h"
+#include "thread.h"
+#include "mgmt_msg.h"
+
+
+#define MGMT_MSG_DBG(dbgtag, fmt, ...) \
+ do { \
+ if (dbgtag) \
+ zlog_debug("%s: %s: " fmt, dbgtag, __func__, \
+ ##__VA_ARGS__); \
+ } while (0)
+
+#define MGMT_MSG_ERR(ms, fmt, ...) \
+ zlog_err("%s: %s: " fmt, ms->idtag, __func__, ##__VA_ARGS__)
+
+/**
+ * Read data from a socket into streams containing 1 or more full msgs headed by
+ * mgmt_msg_hdr which contain API messages (currently protobuf).
+ *
+ * Args:
+ * ms: mgmt_msg_state for this process.
+ * fd: socket/file to read data from.
+ * debug: true to enable debug logging.
+ *
+ * Returns:
+ * MPP_DISCONNECT - socket should be closed and connect retried.
+ * MSV_SCHED_STREAM - this call should be rescheduled to run.
+ * MPP_SCHED_BOTH - this call and the procmsg buf should be scheduled to
+ *run.
+ */
+enum mgmt_msg_rsched mgmt_msg_read(struct mgmt_msg_state *ms, int fd,
+ bool debug)
+{
+ const char *dbgtag = debug ? ms->idtag : NULL;
+ size_t avail = STREAM_WRITEABLE(ms->ins);
+ struct mgmt_msg_hdr *mhdr = NULL;
+ size_t total = 0;
+ size_t mcount = 0;
+ ssize_t n, left;
+
+ assert(ms && fd != -1);
+
+ /*
+ * Read as much as we can into the stream.
+ */
+ while (avail > sizeof(struct mgmt_msg_hdr)) {
+ n = stream_read_try(ms->ins, fd, avail);
+ MGMT_MSG_DBG(dbgtag, "got %ld bytes", n);
+
+ /* -2 is normal nothing read, and to retry */
+ if (n == -2)
+ break;
+ if (n <= 0) {
+ if (n == 0)
+ MGMT_MSG_ERR(ms, "got EOF/disconnect");
+ else
+ MGMT_MSG_ERR(ms,
+ "got error while reading: '%s'",
+ safe_strerror(errno));
+ return MSR_DISCONNECT;
+ }
+ ms->nrxb += n;
+ avail -= n;
+ }
+
+ /*
+ * Check if we have read a complete messages or not.
+ */
+ assert(stream_get_getp(ms->ins) == 0);
+ left = stream_get_endp(ms->ins);
+ while (left > (long)sizeof(struct mgmt_msg_hdr)) {
+ mhdr = (struct mgmt_msg_hdr *)(STREAM_DATA(ms->ins) + total);
+ if (mhdr->marker != MGMT_MSG_MARKER) {
+ MGMT_MSG_DBG(dbgtag, "recv corrupt buffer, disconnect");
+ return MSR_DISCONNECT;
+ }
+ if (mhdr->len > left)
+ break;
+
+ MGMT_MSG_DBG(dbgtag, "read full message len %u", mhdr->len);
+ total += mhdr->len;
+ left -= mhdr->len;
+ mcount++;
+ }
+
+ if (!mcount)
+ return MSR_SCHED_STREAM;
+
+ /*
+ * We have read at least one message into the stream, queue it up.
+ */
+ mhdr = (struct mgmt_msg_hdr *)(STREAM_DATA(ms->ins) + total);
+ stream_set_endp(ms->ins, total);
+ stream_fifo_push(&ms->inq, ms->ins);
+ ms->ins = stream_new(ms->max_msg_sz);
+ if (left) {
+ stream_put(ms->ins, mhdr, left);
+ stream_set_endp(ms->ins, left);
+ }
+
+ return MSR_SCHED_BOTH;
+}
+
+/**
+ * Process streams containing whole messages that have been pushed onto the
+ * FIFO. This should be called from an event/timer handler and should be
+ * reschedulable.
+ *
+ * Args:
+ * ms: mgmt_msg_state for this process.
+ * handle_mgs: function to call for each received message.
+ * user: opaque value passed through to handle_msg.
+ * debug: true to enable debug logging.
+ *
+ * Returns:
+ * true if more to process (so reschedule) else false
+ */
+bool mgmt_msg_procbufs(struct mgmt_msg_state *ms,
+ void (*handle_msg)(void *user, uint8_t *msg,
+ size_t msglen),
+ void *user, bool debug)
+{
+ const char *dbgtag = debug ? ms->idtag : NULL;
+ struct mgmt_msg_hdr *mhdr;
+ struct stream *work;
+ uint8_t *data;
+ size_t left, nproc;
+
+ MGMT_MSG_DBG(dbgtag, "Have %zu streams to process", ms->inq.count);
+
+ nproc = 0;
+ while (nproc < ms->max_read_buf) {
+ work = stream_fifo_pop(&ms->inq);
+ if (!work)
+ break;
+
+ data = STREAM_DATA(work);
+ left = stream_get_endp(work);
+ MGMT_MSG_DBG(dbgtag, "Processing stream of len %zu", left);
+
+ for (; left > sizeof(struct mgmt_msg_hdr);
+ left -= mhdr->len, data += mhdr->len) {
+ mhdr = (struct mgmt_msg_hdr *)data;
+
+ assert(mhdr->marker == MGMT_MSG_MARKER);
+ assert(left >= mhdr->len);
+
+ handle_msg(user, (uint8_t *)(mhdr + 1),
+ mhdr->len - sizeof(struct mgmt_msg_hdr));
+ ms->nrxm++;
+ nproc++;
+ }
+
+ if (work != ms->ins)
+ stream_free(work); /* Free it up */
+ else
+ stream_reset(work); /* Reset stream for next read */
+ }
+
+ /* return true if should reschedule b/c more to process. */
+ return stream_fifo_head(&ms->inq) != NULL;
+}
+
+/**
+ * Write data from a onto the socket, using streams that have been queued for
+ * sending by mgmt_msg_send_msg. This function should be reschedulable.
+ *
+ * Args:
+ * ms: mgmt_msg_state for this process.
+ * fd: socket/file to read data from.
+ * debug: true to enable debug logging.
+ *
+ * Returns:
+ * MSW_SCHED_NONE - do not reschedule anything.
+ * MSW_SCHED_STREAM - this call should be rescheduled to run again.
+ * MSW_SCHED_WRITES_OFF - writes should be disabled with a timer to
+ * re-enable them a short time later
+ * MSW_DISCONNECT - socket should be closed and reconnect retried.
+ *run.
+ */
+enum mgmt_msg_wsched mgmt_msg_write(struct mgmt_msg_state *ms, int fd,
+ bool debug)
+{
+ const char *dbgtag = debug ? ms->idtag : NULL;
+ struct stream *s;
+ size_t nproc = 0;
+ ssize_t left;
+ ssize_t n;
+
+ if (ms->outs) {
+ MGMT_MSG_DBG(dbgtag,
+ "found unqueued stream with %zu bytes, queueing",
+ stream_get_endp(ms->outs));
+ stream_fifo_push(&ms->outq, ms->outs);
+ ms->outs = NULL;
+ }
+
+ for (s = stream_fifo_head(&ms->outq); s && nproc < ms->max_write_buf;
+ s = stream_fifo_head(&ms->outq)) {
+ left = STREAM_READABLE(s);
+ assert(left);
+
+ n = stream_flush(s, fd);
+ if (n <= 0) {
+ if (n == 0)
+ MGMT_MSG_ERR(ms,
+ "connection closed while writing");
+ else if (ERRNO_IO_RETRY(errno)) {
+ MGMT_MSG_DBG(
+ dbgtag,
+ "retry error while writing %zd bytes: %s (%d)",
+ left, safe_strerror(errno), errno);
+ return MSW_SCHED_STREAM;
+ } else
+ MGMT_MSG_ERR(
+ ms,
+ "error while writing %zd bytes: %s (%d)",
+ left, safe_strerror(errno), errno);
+
+ n = mgmt_msg_reset_writes(ms);
+ MGMT_MSG_DBG(dbgtag, "drop and freed %zd streams", n);
+
+ return MSW_DISCONNECT;
+ }
+
+ ms->ntxb += n;
+ if (n != left) {
+ MGMT_MSG_DBG(dbgtag, "short stream write %zd of %zd", n,
+ left);
+ stream_forward_getp(s, n);
+ return MSW_SCHED_STREAM;
+ }
+
+ stream_free(stream_fifo_pop(&ms->outq));
+ MGMT_MSG_DBG(dbgtag, "wrote stream of %zd bytes", n);
+ nproc++;
+ }
+ if (s) {
+ MGMT_MSG_DBG(
+ dbgtag,
+ "reached %zu buffer writes, pausing with %zu streams left",
+ ms->max_write_buf, ms->outq.count);
+ return MSW_SCHED_WRITES_OFF;
+ }
+ MGMT_MSG_DBG(dbgtag, "flushed all streams from output q");
+ return MSW_SCHED_NONE;
+}
+
+
+/**
+ * Send a message by enqueueing it to be written over the socket by
+ * mgmt_msg_write.
+ *
+ * Args:
+ * ms: mgmt_msg_state for this process.
+ * fd: socket/file to read data from.
+ * debug: true to enable debug logging.
+ *
+ * Returns:
+ * 0 on success, otherwise -1 on failure. The only failure mode is if a
+ * the message exceeds the maximum message size configured on init.
+ */
+int mgmt_msg_send_msg(struct mgmt_msg_state *ms, void *msg, size_t len,
+ mgmt_msg_packf packf, bool debug)
+{
+ const char *dbgtag = debug ? ms->idtag : NULL;
+ struct mgmt_msg_hdr *mhdr;
+ struct stream *s;
+ uint8_t *dstbuf;
+ size_t endp, n;
+ size_t mlen = len + sizeof(*mhdr);
+
+ if (mlen > ms->max_msg_sz) {
+ MGMT_MSG_ERR(ms, "Message %zu > max size %zu, dropping", mlen,
+ ms->max_msg_sz);
+ return -1;
+ }
+
+ if (!ms->outs) {
+ MGMT_MSG_DBG(dbgtag, "creating new stream for msg len %zu",
+ len);
+ ms->outs = stream_new(ms->max_msg_sz);
+ } else if (STREAM_WRITEABLE(ms->outs) < mlen) {
+ MGMT_MSG_DBG(
+ dbgtag,
+ "enq existing stream len %zu and creating new stream for msg len %zu",
+ STREAM_WRITEABLE(ms->outs), mlen);
+ stream_fifo_push(&ms->outq, ms->outs);
+ ms->outs = stream_new(ms->max_msg_sz);
+ } else {
+ MGMT_MSG_DBG(
+ dbgtag,
+ "using existing stream with avail %zu for msg len %zu",
+ STREAM_WRITEABLE(ms->outs), mlen);
+ }
+ s = ms->outs;
+
+ /* We have a stream with space, pack the message into it. */
+ mhdr = (struct mgmt_msg_hdr *)(STREAM_DATA(s) + s->endp);
+ mhdr->marker = MGMT_MSG_MARKER;
+ mhdr->len = mlen;
+ stream_forward_endp(s, sizeof(*mhdr));
+ endp = stream_get_endp(s);
+ dstbuf = STREAM_DATA(s) + endp;
+ n = packf(msg, dstbuf);
+ stream_set_endp(s, endp + n);
+ ms->ntxm++;
+
+ return 0;
+}
+
+/**
+ * Create and open a unix domain stream socket on the given path
+ * setting non-blocking and send and receive buffer sizes.
+ *
+ * Args:
+ * path: path of unix domain socket to connect to.
+ * sendbuf: size of socket send buffer.
+ * recvbuf: size of socket receive buffer.
+ * dbgtag: if non-NULL enable log debug, and use this tag.
+ *
+ * Returns:
+ * socket fd or -1 on error.
+ */
+int mgmt_msg_connect(const char *path, size_t sendbuf, size_t recvbuf,
+ const char *dbgtag)
+{
+ int ret, sock, len;
+ struct sockaddr_un addr;
+
+ MGMT_MSG_DBG(dbgtag, "connecting to server on %s", path);
+ sock = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (sock < 0) {
+ MGMT_MSG_DBG(dbgtag, "socket failed: %s", safe_strerror(errno));
+ return -1;
+ }
+
+ memset(&addr, 0, sizeof(struct sockaddr_un));
+ addr.sun_family = AF_UNIX;
+ strlcpy(addr.sun_path, path, sizeof(addr.sun_path));
+#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
+ len = addr.sun_len = SUN_LEN(&addr);
+#else
+ len = sizeof(addr.sun_family) + strlen(addr.sun_path);
+#endif /* HAVE_STRUCT_SOCKADDR_UN_SUN_LEN */
+ ret = connect(sock, (struct sockaddr *)&addr, len);
+ if (ret < 0) {
+ MGMT_MSG_DBG(dbgtag, "failed to connect on %s: %s", path,
+ safe_strerror(errno));
+ close(sock);
+ return -1;
+ }
+
+ MGMT_MSG_DBG(dbgtag, "connected to server on %s", path);
+ set_nonblocking(sock);
+ setsockopt_so_sendbuf(sock, sendbuf);
+ setsockopt_so_recvbuf(sock, recvbuf);
+ return sock;
+}
+
+/**
+ * Reset the sending queue, by dequeueing all streams and freeing them. Return
+ * the number of streams freed.
+ *
+ * Args:
+ * ms: mgmt_msg_state for this process.
+ *
+ * Returns:
+ * Number of streams that were freed.
+ *
+ */
+size_t mgmt_msg_reset_writes(struct mgmt_msg_state *ms)
+{
+ struct stream *s;
+ size_t nproc = 0;
+
+ for (s = stream_fifo_pop(&ms->outq); s;
+ s = stream_fifo_pop(&ms->outq), nproc++)
+ stream_free(s);
+
+ return nproc;
+}
+
+void mgmt_msg_init(struct mgmt_msg_state *ms, size_t max_read_buf,
+ size_t max_write_buf, size_t max_msg_sz, const char *idtag)
+{
+ memset(ms, 0, sizeof(*ms));
+ ms->ins = stream_new(max_msg_sz);
+ stream_fifo_init(&ms->inq);
+ stream_fifo_init(&ms->outq);
+ ms->max_read_buf = max_write_buf;
+ ms->max_write_buf = max_read_buf;
+ ms->max_msg_sz = max_msg_sz;
+ ms->idtag = strdup(idtag);
+}
+
+void mgmt_msg_destroy(struct mgmt_msg_state *ms)
+{
+ mgmt_msg_reset_writes(ms);
+ if (ms->ins)
+ stream_free(ms->ins);
+ free(ms->idtag);
+}
diff --git a/lib/mgmt_msg.h b/lib/mgmt_msg.h
new file mode 100644
index 000000000..854875170
--- /dev/null
+++ b/lib/mgmt_msg.h
@@ -0,0 +1,73 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * March 6 2023, Christian Hopps <chopps@labn.net>
+ *
+ * Copyright (c) 2023, LabN Consulting, L.L.C.
+ */
+#ifndef _MGMT_MSG_H
+#define _MGMT_MSG_H
+
+#include "stream.h"
+#include "thread.h"
+
+#define MGMT_MSG_MARKER (0x4D724B21u) /* ASCII - "MrK!"*/
+
+struct mgmt_msg_state {
+ struct stream *ins;
+ struct stream *outs;
+ struct stream_fifo inq;
+ struct stream_fifo outq;
+ uint64_t nrxm; /* number of received messages */
+ uint64_t nrxb; /* number of received bytes */
+ uint64_t ntxm; /* number of sent messages */
+ uint64_t ntxb; /* number of sent bytes */
+ size_t max_read_buf; /* should replace with max time value */
+ size_t max_write_buf; /* should replace with max time value */
+ size_t max_msg_sz;
+ char *idtag; /* identifying tag for messages */
+};
+
+struct mgmt_msg_hdr {
+ uint32_t marker;
+ uint32_t len;
+};
+
+enum mgmt_msg_rsched {
+ MSR_SCHED_BOTH, /* schedule both queue and read */
+ MSR_SCHED_STREAM, /* schedule read */
+ MSR_DISCONNECT, /* disconnect and start reconnecting */
+};
+
+enum mgmt_msg_wsched {
+ MSW_SCHED_NONE, /* no scheduling required */
+ MSW_SCHED_STREAM, /* schedule writing */
+ MSW_SCHED_WRITES_OFF, /* toggle writes off */
+ MSW_DISCONNECT, /* disconnect and start reconnecting */
+};
+
+static inline uint8_t *msg_payload(struct mgmt_msg_hdr *mhdr)
+{
+ return (uint8_t *)(mhdr + 1);
+}
+
+typedef size_t (*mgmt_msg_packf)(void *msg, void *data);
+
+extern int mgmt_msg_connect(const char *path, size_t sendbuf, size_t recvbuf,
+ const char *dbgtag);
+extern void mgmt_msg_destroy(struct mgmt_msg_state *ms);
+extern void mgmt_msg_init(struct mgmt_msg_state *ms, size_t max_read_buf,
+ size_t max_write_buf, size_t max_msg_sz,
+ const char *idtag);
+extern bool mgmt_msg_procbufs(struct mgmt_msg_state *ms,
+ void (*handle_msg)(void *user, uint8_t *msg,
+ size_t msglen),
+ void *user, bool debug);
+extern enum mgmt_msg_rsched mgmt_msg_read(struct mgmt_msg_state *ms, int fd,
+ bool debug);
+extern size_t mgmt_msg_reset_writes(struct mgmt_msg_state *ms);
+extern int mgmt_msg_send_msg(struct mgmt_msg_state *ms, void *msg, size_t len,
+ size_t (*packf)(void *msg, void *buf), bool debug);
+extern enum mgmt_msg_wsched mgmt_msg_write(struct mgmt_msg_state *ms, int fd,
+ bool debug);
+
+#endif /* _MGMT_MSG_H */
diff --git a/lib/subdir.am b/lib/subdir.am
index 84a35417e..d456629bb 100644
--- a/lib/subdir.am
+++ b/lib/subdir.am
@@ -66,6 +66,7 @@ lib_libfrr_la_SOURCES = \
lib/memory.c \
lib/mgmt_be_client.c \
lib/mgmt_fe_client.c \
+ lib/mgmt_msg.c \
lib/mlag.c \
lib/module.c \
lib/mpls.c \
@@ -244,6 +245,7 @@ pkginclude_HEADERS += \
lib/mgmt.pb-c.h \
lib/mgmt_be_client.h \
lib/mgmt_fe_client.h \
+ lib/mgmt_msg.h \
lib/mgmt_pb.h \
lib/module.h \
lib/monotime.h \