diff options
author | Christian Hopps <chopps@labn.net> | 2023-05-06 06:09:46 +0200 |
---|---|---|
committer | Christian Hopps <chopps@labn.net> | 2023-05-28 11:13:22 +0200 |
commit | 5f05ff582117c6c5f658ed31ff8f78a029c3fab1 (patch) | |
tree | aca94a3569ddc4cb73affb01b465f18069ff1421 /lib/mgmt_msg.c | |
parent | lib: mgmtd: add a identifying tag to the debug message (diff) | |
download | frr-5f05ff582117c6c5f658ed31ff8f78a029c3fab1.tar.xz frr-5f05ff582117c6c5f658ed31ff8f78a029c3fab1.zip |
lib: add short-circuit operation between same process
- Use a socketpair for connection, and direct (no event loop)
message sending and handling.
Signed-off-by: Christian Hopps <chopps@labn.net>
Diffstat (limited to 'lib/mgmt_msg.c')
-rw-r--r-- | lib/mgmt_msg.c | 119 |
1 files changed, 115 insertions, 4 deletions
diff --git a/lib/mgmt_msg.c b/lib/mgmt_msg.c index 03e896a08..0d9802a2b 100644 --- a/lib/mgmt_msg.c +++ b/lib/mgmt_msg.c @@ -507,6 +507,12 @@ static void msg_conn_sched_proc_msgs(struct msg_conn *conn) void msg_conn_disconnect(struct msg_conn *conn, bool reconnect) { + /* disconnect short-circuit if present */ + if (conn->remote_conn) { + conn->remote_conn->remote_conn = NULL; + conn->remote_conn = NULL; + } + if (conn->fd != -1) { close(conn->fd); conn->fd = -1; @@ -525,14 +531,41 @@ void msg_conn_disconnect(struct msg_conn *conn, bool reconnect) } int msg_conn_send_msg(struct msg_conn *conn, uint8_t version, void *msg, - size_t mlen, size_t (*packf)(void *, void *)) + size_t mlen, size_t (*packf)(void *, void *), + bool short_circuit_ok) { + const char *dbgtag = conn->debug ? conn->mstate.idtag : NULL; + if (conn->fd == -1) { MGMT_MSG_ERR(&conn->mstate, "can't send message on closed connection"); return -1; } + /* immediately handle the message if short-circuit is present */ + if (conn->remote_conn && short_circuit_ok) { + uint8_t *buf = msg; + size_t n = mlen; + + if (packf) { + buf = XMALLOC(MTYPE_TMP, mlen); + n = packf(msg, buf); + } + + MGMT_MSG_DBG(dbgtag, "SC send: depth %u msg: %p", + ++conn->short_circuit_depth, msg); + + conn->remote_conn->handle_msg(version, buf, n, + conn->remote_conn); + + MGMT_MSG_DBG(dbgtag, "SC return from depth: %u msg: %p", + conn->short_circuit_depth--, msg); + + if (packf) + XFREE(MTYPE_TMP, buf); + return 0; + } + int rv = mgmt_msg_send_msg(&conn->mstate, version, msg, mlen, packf, conn->debug); @@ -545,6 +578,12 @@ void msg_conn_cleanup(struct msg_conn *conn) { struct mgmt_msg_state *ms = &conn->mstate; + /* disconnect short-circuit if present */ + if (conn->remote_conn) { + conn->remote_conn->remote_conn = NULL; + conn->remote_conn = NULL; + } + if (conn->fd != -1) { close(conn->fd); conn->fd = -1; @@ -561,6 +600,10 @@ void msg_conn_cleanup(struct msg_conn *conn) * Client Connections */ +DECLARE_LIST(msg_server_list, struct msg_server, link); + +static struct msg_server_list_head msg_servers; + static void msg_client_connect(struct msg_client *conn); static void msg_client_connect_timer(struct event *thread) @@ -583,6 +626,64 @@ static void msg_client_sched_connect(struct msg_client *client, &client->conn_retry_tmr); } +static bool msg_client_connect_short_circuit(struct msg_client *client) +{ + struct msg_conn *server_conn; + struct msg_server *server; + const char *dbgtag = + client->conn.debug ? client->conn.mstate.idtag : NULL; + union sockunion su = {0}; + int sockets[2]; + + frr_each (msg_server_list, &msg_servers, server) + if (!strcmp(server->sopath, client->sopath)) + break; + if (!server) { + MGMT_MSG_DBG(dbgtag, + "no short-circuit connection available for %s", + client->sopath); + + return false; + } + + if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockets)) { + MGMT_MSG_ERR( + &client->conn.mstate, + "socketpair failed trying to short-circuit connection on %s: %s", + client->sopath, safe_strerror(errno)); + return false; + } + + /* client side */ + client->conn.fd = sockets[0]; + set_nonblocking(sockets[0]); + setsockopt_so_sendbuf(sockets[0], client->conn.mstate.max_write_buf); + setsockopt_so_recvbuf(sockets[0], client->conn.mstate.max_read_buf); + client->conn.is_short_circuit = true; + + /* server side */ + memset(&su, 0, sizeof(union sockunion)); + server_conn = server->create(sockets[1], &su); + server_conn->is_short_circuit = true; + + client->conn.remote_conn = server_conn; + server_conn->remote_conn = &client->conn; + + MGMT_MSG_DBG( + dbgtag, + "short-circuit connection on %s server %s:%d to client %s:%d", + client->sopath, server_conn->mstate.idtag, server_conn->fd, + client->conn.mstate.idtag, client->conn.fd); + + MGMT_MSG_DBG( + server_conn->debug ? server_conn->mstate.idtag : NULL, + "short-circuit connection on %s client %s:%d to server %s:%d", + client->sopath, client->conn.mstate.idtag, client->conn.fd, + server_conn->mstate.idtag, server_conn->fd); + + return true; +} + /* Connect and start reading from the socket */ static void msg_client_connect(struct msg_client *client) @@ -590,8 +691,11 @@ static void msg_client_connect(struct msg_client *client) struct msg_conn *conn = &client->conn; const char *dbgtag = conn->debug ? conn->mstate.idtag : NULL; - conn->fd = mgmt_msg_connect(client->sopath, MSG_CONN_SEND_BUF_SIZE, - MSG_CONN_RECV_BUF_SIZE, dbgtag); + if (!client->short_circuit_ok || + !msg_client_connect_short_circuit(client)) + conn->fd = + mgmt_msg_connect(client->sopath, MSG_CONN_SEND_BUF_SIZE, + MSG_CONN_RECV_BUF_SIZE, dbgtag); if (conn->fd == -1) /* retry the connection */ @@ -612,7 +716,8 @@ void msg_client_init(struct msg_client *client, struct event_loop *tm, void (*handle_msg)(uint8_t version, uint8_t *data, size_t len, struct msg_conn *client), size_t max_read_buf, size_t max_write_buf, - size_t max_msg_sz, const char *idtag, bool debug) + size_t max_msg_sz, bool short_circuit_ok, + const char *idtag, bool debug) { struct msg_conn *conn = &client->conn; memset(client, 0, sizeof(*client)); @@ -623,6 +728,7 @@ void msg_client_init(struct msg_client *client, struct event_loop *tm, conn->notify_disconnect = notify_disconnect; conn->is_client = true; conn->debug = debug; + client->short_circuit_ok = short_circuit_ok; client->sopath = strdup(sopath); client->notify_connect = notify_connect; @@ -726,6 +832,8 @@ int msg_server_init(struct msg_server *server, const char *sopath, server->create = create; server->debug = debug; + msg_server_list_add_head(&msg_servers, server); + event_add_read(server->loop, msg_server_accept, server, server->fd, &server->listen_ev); @@ -746,6 +854,9 @@ void msg_server_cleanup(struct msg_server *server) if (server->listen_ev) EVENT_OFF(server->listen_ev); + + msg_server_list_del(&msg_servers, server); + if (server->fd >= 0) close(server->fd); free((char *)server->sopath); |