diff options
-rw-r--r-- | lib/mgmt_be_client.c | 4 | ||||
-rw-r--r-- | lib/mgmt_fe_client.c | 27 | ||||
-rw-r--r-- | lib/mgmt_msg.c | 119 | ||||
-rw-r--r-- | lib/mgmt_msg.h | 28 | ||||
-rw-r--r-- | mgmtd/mgmt_be_adapter.c | 2 | ||||
-rw-r--r-- | mgmtd/mgmt_fe_adapter.c | 18 |
6 files changed, 162 insertions, 36 deletions
diff --git a/lib/mgmt_be_client.c b/lib/mgmt_be_client.c index 0b98f979e..29b54690d 100644 --- a/lib/mgmt_be_client.c +++ b/lib/mgmt_be_client.c @@ -138,7 +138,7 @@ static int mgmt_be_client_send_msg(struct mgmt_be_client_ctx *client_ctx, return msg_conn_send_msg( &client_ctx->client.conn, MGMT_MSG_VERSION_PROTOBUF, be_msg, mgmtd__be_message__get_packed_size(be_msg), - (size_t(*)(void *, void *))mgmtd__be_message__pack); + (size_t(*)(void *, void *))mgmtd__be_message__pack, false); } static struct mgmt_be_batch_ctx * @@ -966,7 +966,7 @@ uintptr_t mgmt_be_client_lib_init(struct mgmt_be_client_params *params, MGMTD_BE_SERVER_PATH, mgmt_be_client_notify_conenct, mgmt_be_client_notify_disconenct, mgmt_be_client_process_msg, MGMTD_BE_MAX_NUM_MSG_PROC, - MGMTD_BE_MAX_NUM_MSG_WRITE, MGMTD_BE_MSG_MAX_LEN, + MGMTD_BE_MAX_NUM_MSG_WRITE, MGMTD_BE_MSG_MAX_LEN, false, "BE-client", MGMTD_DBG_BE_CLIENT_CHECK()); MGMTD_BE_CLIENT_DBG("Initialized client '%s'", params->name); diff --git a/lib/mgmt_fe_client.c b/lib/mgmt_fe_client.c index cf031dcac..9f87c3878 100644 --- a/lib/mgmt_fe_client.c +++ b/lib/mgmt_fe_client.c @@ -98,12 +98,14 @@ mgmt_fe_find_session_by_session_id(struct mgmt_fe_client_ctx *client_ctx, } static int mgmt_fe_client_send_msg(struct mgmt_fe_client_ctx *client_ctx, - Mgmtd__FeMessage *fe_msg) + Mgmtd__FeMessage *fe_msg, + bool short_circuit_ok) { return msg_conn_send_msg( &client_ctx->client.conn, MGMT_MSG_VERSION_PROTOBUF, fe_msg, mgmtd__fe_message__get_packed_size(fe_msg), - (size_t(*)(void *, void *))mgmtd__fe_message__pack); + (size_t(*)(void *, void *))mgmtd__fe_message__pack, + short_circuit_ok); } static int mgmt_fe_send_register_req(struct mgmt_fe_client_ctx *client_ctx) @@ -121,7 +123,7 @@ static int mgmt_fe_send_register_req(struct mgmt_fe_client_ctx *client_ctx) MGMTD_FE_CLIENT_DBG( "Sending REGISTER_REQ message to MGMTD Frontend server"); - return mgmt_fe_client_send_msg(client_ctx, &fe_msg); + return mgmt_fe_client_send_msg(client_ctx, &fe_msg, true); } static int mgmt_fe_send_session_req(struct mgmt_fe_client_ctx *client_ctx, @@ -130,15 +132,18 @@ static int mgmt_fe_send_session_req(struct mgmt_fe_client_ctx *client_ctx, { Mgmtd__FeMessage fe_msg; Mgmtd__FeSessionReq sess_req; + bool scok; mgmtd__fe_session_req__init(&sess_req); sess_req.create = create; if (create) { sess_req.id_case = MGMTD__FE_SESSION_REQ__ID_CLIENT_CONN_ID; sess_req.client_conn_id = session->client_id; + scok = true; } else { sess_req.id_case = MGMTD__FE_SESSION_REQ__ID_SESSION_ID; sess_req.session_id = session->session_id; + scok = false; } mgmtd__fe_message__init(&fe_msg); @@ -149,7 +154,7 @@ static int mgmt_fe_send_session_req(struct mgmt_fe_client_ctx *client_ctx, "Sending SESSION_REQ %s message for client-id %" PRIu64, create ? "create" : "destroy", session->client_id); - return mgmt_fe_client_send_msg(client_ctx, &fe_msg); + return mgmt_fe_client_send_msg(client_ctx, &fe_msg, scok); } static int mgmt_fe_send_lockds_req(struct mgmt_fe_client_ctx *client_ctx, @@ -174,7 +179,7 @@ static int mgmt_fe_send_lockds_req(struct mgmt_fe_client_ctx *client_ctx, "Sending %sLOCK_REQ message for Ds:%d session-id %" PRIu64, lock ? "" : "UN", ds_id, session_id); - return mgmt_fe_client_send_msg(client_ctx, &fe_msg); + return mgmt_fe_client_send_msg(client_ctx, &fe_msg, false); } static int mgmt_fe_send_setcfg_req(struct mgmt_fe_client_ctx *client_ctx, @@ -206,7 +211,7 @@ static int mgmt_fe_send_setcfg_req(struct mgmt_fe_client_ctx *client_ctx, " (#xpaths:%d)", ds_id, session_id, num_data_reqs); - return mgmt_fe_client_send_msg(client_ctx, &fe_msg); + return mgmt_fe_client_send_msg(client_ctx, &fe_msg, false); } static int mgmt_fe_send_commitcfg_req(struct mgmt_fe_client_ctx *client_ctx, @@ -235,7 +240,7 @@ static int mgmt_fe_send_commitcfg_req(struct mgmt_fe_client_ctx *client_ctx, "Sending COMMIT_CONFIG_REQ message for Src-Ds:%d, Dst-Ds:%d session-id %" PRIu64, src_ds_id, dest_ds_id, session_id); - return mgmt_fe_client_send_msg(client_ctx, &fe_msg); + return mgmt_fe_client_send_msg(client_ctx, &fe_msg, false); } static int mgmt_fe_send_getcfg_req(struct mgmt_fe_client_ctx *client_ctx, @@ -264,7 +269,7 @@ static int mgmt_fe_send_getcfg_req(struct mgmt_fe_client_ctx *client_ctx, " (#xpaths:%d)", ds_id, session_id, num_data_reqs); - return mgmt_fe_client_send_msg(client_ctx, &fe_msg); + return mgmt_fe_client_send_msg(client_ctx, &fe_msg, false); } static int mgmt_fe_send_getdata_req(struct mgmt_fe_client_ctx *client_ctx, @@ -293,7 +298,7 @@ static int mgmt_fe_send_getdata_req(struct mgmt_fe_client_ctx *client_ctx, " (#xpaths:%d)", ds_id, session_id, num_data_reqs); - return mgmt_fe_client_send_msg(client_ctx, &fe_msg); + return mgmt_fe_client_send_msg(client_ctx, &fe_msg, false); } static int mgmt_fe_send_regnotify_req(struct mgmt_fe_client_ctx *client_ctx, @@ -318,7 +323,7 @@ static int mgmt_fe_send_regnotify_req(struct mgmt_fe_client_ctx *client_ctx, fe_msg.message_case = MGMTD__FE_MESSAGE__MESSAGE_REGNOTIFY_REQ; fe_msg.regnotify_req = ®ntfy_req; - return mgmt_fe_client_send_msg(client_ctx, &fe_msg); + return mgmt_fe_client_send_msg(client_ctx, &fe_msg, false); } static int mgmt_fe_client_handle_msg(struct mgmt_fe_client_ctx *client_ctx, @@ -640,7 +645,7 @@ uintptr_t mgmt_fe_client_lib_init(struct mgmt_fe_client_params *params, MGMTD_FE_SERVER_PATH, mgmt_fe_client_notify_connect, mgmt_fe_client_notify_disconnect, mgmt_fe_client_process_msg, MGMTD_FE_MAX_NUM_MSG_PROC, - MGMTD_FE_MAX_NUM_MSG_WRITE, MGMTD_FE_MSG_MAX_LEN, + MGMTD_FE_MAX_NUM_MSG_WRITE, MGMTD_FE_MSG_MAX_LEN, false, "FE-client", MGMTD_DBG_FE_CLIENT_CHECK()); MGMTD_FE_CLIENT_DBG("Initialized client '%s'", params->name); diff --git a/lib/mgmt_msg.c b/lib/mgmt_msg.c index 03e896a08..0d9802a2b 100644 --- a/lib/mgmt_msg.c +++ b/lib/mgmt_msg.c @@ -507,6 +507,12 @@ static void msg_conn_sched_proc_msgs(struct msg_conn *conn) void msg_conn_disconnect(struct msg_conn *conn, bool reconnect) { + /* disconnect short-circuit if present */ + if (conn->remote_conn) { + conn->remote_conn->remote_conn = NULL; + conn->remote_conn = NULL; + } + if (conn->fd != -1) { close(conn->fd); conn->fd = -1; @@ -525,14 +531,41 @@ void msg_conn_disconnect(struct msg_conn *conn, bool reconnect) } int msg_conn_send_msg(struct msg_conn *conn, uint8_t version, void *msg, - size_t mlen, size_t (*packf)(void *, void *)) + size_t mlen, size_t (*packf)(void *, void *), + bool short_circuit_ok) { + const char *dbgtag = conn->debug ? conn->mstate.idtag : NULL; + if (conn->fd == -1) { MGMT_MSG_ERR(&conn->mstate, "can't send message on closed connection"); return -1; } + /* immediately handle the message if short-circuit is present */ + if (conn->remote_conn && short_circuit_ok) { + uint8_t *buf = msg; + size_t n = mlen; + + if (packf) { + buf = XMALLOC(MTYPE_TMP, mlen); + n = packf(msg, buf); + } + + MGMT_MSG_DBG(dbgtag, "SC send: depth %u msg: %p", + ++conn->short_circuit_depth, msg); + + conn->remote_conn->handle_msg(version, buf, n, + conn->remote_conn); + + MGMT_MSG_DBG(dbgtag, "SC return from depth: %u msg: %p", + conn->short_circuit_depth--, msg); + + if (packf) + XFREE(MTYPE_TMP, buf); + return 0; + } + int rv = mgmt_msg_send_msg(&conn->mstate, version, msg, mlen, packf, conn->debug); @@ -545,6 +578,12 @@ void msg_conn_cleanup(struct msg_conn *conn) { struct mgmt_msg_state *ms = &conn->mstate; + /* disconnect short-circuit if present */ + if (conn->remote_conn) { + conn->remote_conn->remote_conn = NULL; + conn->remote_conn = NULL; + } + if (conn->fd != -1) { close(conn->fd); conn->fd = -1; @@ -561,6 +600,10 @@ void msg_conn_cleanup(struct msg_conn *conn) * Client Connections */ +DECLARE_LIST(msg_server_list, struct msg_server, link); + +static struct msg_server_list_head msg_servers; + static void msg_client_connect(struct msg_client *conn); static void msg_client_connect_timer(struct event *thread) @@ -583,6 +626,64 @@ static void msg_client_sched_connect(struct msg_client *client, &client->conn_retry_tmr); } +static bool msg_client_connect_short_circuit(struct msg_client *client) +{ + struct msg_conn *server_conn; + struct msg_server *server; + const char *dbgtag = + client->conn.debug ? client->conn.mstate.idtag : NULL; + union sockunion su = {0}; + int sockets[2]; + + frr_each (msg_server_list, &msg_servers, server) + if (!strcmp(server->sopath, client->sopath)) + break; + if (!server) { + MGMT_MSG_DBG(dbgtag, + "no short-circuit connection available for %s", + client->sopath); + + return false; + } + + if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockets)) { + MGMT_MSG_ERR( + &client->conn.mstate, + "socketpair failed trying to short-circuit connection on %s: %s", + client->sopath, safe_strerror(errno)); + return false; + } + + /* client side */ + client->conn.fd = sockets[0]; + set_nonblocking(sockets[0]); + setsockopt_so_sendbuf(sockets[0], client->conn.mstate.max_write_buf); + setsockopt_so_recvbuf(sockets[0], client->conn.mstate.max_read_buf); + client->conn.is_short_circuit = true; + + /* server side */ + memset(&su, 0, sizeof(union sockunion)); + server_conn = server->create(sockets[1], &su); + server_conn->is_short_circuit = true; + + client->conn.remote_conn = server_conn; + server_conn->remote_conn = &client->conn; + + MGMT_MSG_DBG( + dbgtag, + "short-circuit connection on %s server %s:%d to client %s:%d", + client->sopath, server_conn->mstate.idtag, server_conn->fd, + client->conn.mstate.idtag, client->conn.fd); + + MGMT_MSG_DBG( + server_conn->debug ? server_conn->mstate.idtag : NULL, + "short-circuit connection on %s client %s:%d to server %s:%d", + client->sopath, client->conn.mstate.idtag, client->conn.fd, + server_conn->mstate.idtag, server_conn->fd); + + return true; +} + /* Connect and start reading from the socket */ static void msg_client_connect(struct msg_client *client) @@ -590,8 +691,11 @@ static void msg_client_connect(struct msg_client *client) struct msg_conn *conn = &client->conn; const char *dbgtag = conn->debug ? conn->mstate.idtag : NULL; - conn->fd = mgmt_msg_connect(client->sopath, MSG_CONN_SEND_BUF_SIZE, - MSG_CONN_RECV_BUF_SIZE, dbgtag); + if (!client->short_circuit_ok || + !msg_client_connect_short_circuit(client)) + conn->fd = + mgmt_msg_connect(client->sopath, MSG_CONN_SEND_BUF_SIZE, + MSG_CONN_RECV_BUF_SIZE, dbgtag); if (conn->fd == -1) /* retry the connection */ @@ -612,7 +716,8 @@ void msg_client_init(struct msg_client *client, struct event_loop *tm, void (*handle_msg)(uint8_t version, uint8_t *data, size_t len, struct msg_conn *client), size_t max_read_buf, size_t max_write_buf, - size_t max_msg_sz, const char *idtag, bool debug) + size_t max_msg_sz, bool short_circuit_ok, + const char *idtag, bool debug) { struct msg_conn *conn = &client->conn; memset(client, 0, sizeof(*client)); @@ -623,6 +728,7 @@ void msg_client_init(struct msg_client *client, struct event_loop *tm, conn->notify_disconnect = notify_disconnect; conn->is_client = true; conn->debug = debug; + client->short_circuit_ok = short_circuit_ok; client->sopath = strdup(sopath); client->notify_connect = notify_connect; @@ -726,6 +832,8 @@ int msg_server_init(struct msg_server *server, const char *sopath, server->create = create; server->debug = debug; + msg_server_list_add_head(&msg_servers, server); + event_add_read(server->loop, msg_server_accept, server, server->fd, &server->listen_ev); @@ -746,6 +854,9 @@ void msg_server_cleanup(struct msg_server *server) if (server->listen_ev) EVENT_OFF(server->listen_ev); + + msg_server_list_del(&msg_servers, server); + if (server->fd >= 0) close(server->fd); free((char *)server->sopath); diff --git a/lib/mgmt_msg.h b/lib/mgmt_msg.h index 79b1e44c1..9fdcb9ecd 100644 --- a/lib/mgmt_msg.h +++ b/lib/mgmt_msg.h @@ -92,11 +92,14 @@ struct msg_conn { struct event *read_ev; struct event *write_ev; struct event *proc_msg_ev; + struct msg_conn *remote_conn; int (*notify_disconnect)(struct msg_conn *conn); void (*handle_msg)(uint8_t version, uint8_t *data, size_t len, struct msg_conn *conn); void *user; + uint short_circuit_depth; bool is_client; + bool is_short_circuit; bool debug; }; @@ -110,7 +113,8 @@ extern void msg_conn_cleanup(struct msg_conn *conn); extern void msg_conn_disconnect(struct msg_conn *conn, bool reconnect); extern int msg_conn_send_msg(struct msg_conn *client, uint8_t version, void *msg, size_t mlen, - size_t (*packf)(void *, void *)); + size_t (*packf)(void *, void *), + bool short_circuit_ok); /* * Client-side Connections @@ -121,6 +125,7 @@ struct msg_client { struct event *conn_retry_tmr; char *sopath; int (*notify_connect)(struct msg_client *client); + bool short_circuit_ok; }; /* @@ -135,23 +140,26 @@ extern void msg_client_cleanup(struct msg_client *client); * called for a client which is currently connected. The socket is closed * but there is no notification. */ -extern void msg_client_init(struct msg_client *client, struct event_loop *tm, - const char *sopath, - int (*notify_connect)(struct msg_client *client), - int (*notify_disconnect)(struct msg_conn *client), - void (*handle_msg)(uint8_t version, uint8_t *data, - size_t len, - struct msg_conn *client), - size_t max_read_buf, size_t max_write_buf, - size_t max_msg_sz, const char *idtag, bool debug); +extern void +msg_client_init(struct msg_client *client, struct event_loop *tm, + const char *sopath, + int (*notify_connect)(struct msg_client *client), + int (*notify_disconnect)(struct msg_conn *client), + void (*handle_msg)(uint8_t version, uint8_t *data, size_t len, + struct msg_conn *client), + size_t max_read_buf, size_t max_write_buf, size_t max_msg_sz, + bool short_circuit_ok, const char *idtag, bool debug); /* * Server-side Connections */ #define MGMTD_MAX_CONN 32 +PREDECL_LIST(msg_server_list); + struct msg_server { int fd; + struct msg_server_list_item link; struct event_loop *loop; struct event *listen_ev; const char *sopath; diff --git a/mgmtd/mgmt_be_adapter.c b/mgmtd/mgmt_be_adapter.c index da67511e0..0eba9120e 100644 --- a/mgmtd/mgmt_be_adapter.c +++ b/mgmtd/mgmt_be_adapter.c @@ -489,7 +489,7 @@ static int mgmt_be_adapter_send_msg(struct mgmt_be_client_adapter *adapter, return msg_conn_send_msg( adapter->conn, MGMT_MSG_VERSION_PROTOBUF, be_msg, mgmtd__be_message__get_packed_size(be_msg), - (size_t(*)(void *, void *))mgmtd__be_message__pack); + (size_t(*)(void *, void *))mgmtd__be_message__pack, false); } static int mgmt_be_send_txn_req(struct mgmt_be_client_adapter *adapter, diff --git a/mgmtd/mgmt_fe_adapter.c b/mgmtd/mgmt_fe_adapter.c index b9753e21d..996d9a230 100644 --- a/mgmtd/mgmt_fe_adapter.c +++ b/mgmtd/mgmt_fe_adapter.c @@ -327,12 +327,14 @@ mgmt_fe_create_session(struct mgmt_fe_client_adapter *adapter, } static int mgmt_fe_adapter_send_msg(struct mgmt_fe_client_adapter *adapter, - Mgmtd__FeMessage *fe_msg) + Mgmtd__FeMessage *fe_msg, + bool short_circuit_ok) { return msg_conn_send_msg( adapter->conn, MGMT_MSG_VERSION_PROTOBUF, fe_msg, mgmtd__fe_message__get_packed_size(fe_msg), - (size_t(*)(void *, void *))mgmtd__fe_message__pack); + (size_t(*)(void *, void *))mgmtd__fe_message__pack, + short_circuit_ok); } static int @@ -360,7 +362,7 @@ mgmt_fe_send_session_reply(struct mgmt_fe_client_adapter *adapter, "Sending SESSION_REPLY message to MGMTD Frontend client '%s'", adapter->name); - return mgmt_fe_adapter_send_msg(adapter, &fe_msg); + return mgmt_fe_adapter_send_msg(adapter, &fe_msg, true); } static int mgmt_fe_send_lockds_reply(struct mgmt_fe_session_ctx *session, @@ -390,7 +392,7 @@ static int mgmt_fe_send_lockds_reply(struct mgmt_fe_session_ctx *session, "Sending LOCK_DS_REPLY message to MGMTD Frontend client '%s'", session->adapter->name); - return mgmt_fe_adapter_send_msg(session->adapter, &fe_msg); + return mgmt_fe_adapter_send_msg(session->adapter, &fe_msg, false); } static int mgmt_fe_send_setcfg_reply(struct mgmt_fe_session_ctx *session, @@ -436,7 +438,7 @@ static int mgmt_fe_send_setcfg_reply(struct mgmt_fe_session_ctx *session, gettimeofday(&session->adapter->setcfg_stats.last_end, NULL); mgmt_fe_adapter_compute_set_cfg_timers(&session->adapter->setcfg_stats); - return mgmt_fe_adapter_send_msg(session->adapter, &fe_msg); + return mgmt_fe_adapter_send_msg(session->adapter, &fe_msg, false); } static int mgmt_fe_send_commitcfg_reply( @@ -482,7 +484,7 @@ static int mgmt_fe_send_commitcfg_reply( if (mm->perf_stats_en) gettimeofday(&session->adapter->cmt_stats.last_end, NULL); mgmt_fe_session_compute_commit_timers(&session->adapter->cmt_stats); - return mgmt_fe_adapter_send_msg(session->adapter, &fe_msg); + return mgmt_fe_adapter_send_msg(session->adapter, &fe_msg, false); } static int mgmt_fe_send_getcfg_reply(struct mgmt_fe_session_ctx *session, @@ -520,7 +522,7 @@ static int mgmt_fe_send_getcfg_reply(struct mgmt_fe_session_ctx *session, mgmt_fe_session_register_event( session, MGMTD_FE_SESSION_SHOW_TXN_CLNUP); - return mgmt_fe_adapter_send_msg(session->adapter, &fe_msg); + return mgmt_fe_adapter_send_msg(session->adapter, &fe_msg, false); } static int mgmt_fe_send_getdata_reply(struct mgmt_fe_session_ctx *session, @@ -558,7 +560,7 @@ static int mgmt_fe_send_getdata_reply(struct mgmt_fe_session_ctx *session, mgmt_fe_session_register_event( session, MGMTD_FE_SESSION_SHOW_TXN_CLNUP); - return mgmt_fe_adapter_send_msg(session->adapter, &fe_msg); + return mgmt_fe_adapter_send_msg(session->adapter, &fe_msg, false); } static void mgmt_fe_session_cfg_txn_clnup(struct event *thread) |