diff options
Diffstat (limited to 'zebra')
-rw-r--r-- | zebra/subdir.am | 4 | ||||
-rw-r--r-- | zebra/zebra_mlag.c | 565 | ||||
-rw-r--r-- | zebra/zebra_mlag.h | 12 | ||||
-rw-r--r-- | zebra/zebra_mlag_private.c | 11 | ||||
-rw-r--r-- | zebra/zebra_router.h | 1 |
5 files changed, 553 insertions, 40 deletions
diff --git a/zebra/subdir.am b/zebra/subdir.am index 78d374b7b..d0f32d6a1 100644 --- a/zebra/subdir.am +++ b/zebra/subdir.am @@ -38,6 +38,9 @@ man8 += $(MANBUILD)/zebra.8 endif zebra_zebra_LDADD = lib/libfrr.la $(LIBCAP) +if HAVE_PROTOBUF +zebra_zebra_LDADD += mlag/libmlag_pb.la $(PROTOBUF_C_LIBS) +endif zebra_zebra_SOURCES = \ zebra/connected.c \ zebra/debug.c \ @@ -131,6 +134,7 @@ noinst_HEADERS += \ zebra/rtadv.h \ zebra/rule_netlink.h \ zebra/zebra_mlag.h \ + zebra/zebra_mlag_private.h \ zebra/zebra_fpm_private.h \ zebra/zebra_l2.h \ zebra/zebra_dplane.h \ diff --git a/zebra/zebra_mlag.c b/zebra/zebra_mlag.c index 4d1b7712c..1cbe54b17 100644 --- a/zebra/zebra_mlag.c +++ b/zebra/zebra_mlag.c @@ -108,7 +108,7 @@ void zebra_mlag_process_mlag_data(uint8_t *data, uint32_t len) int msg_type = 0; s = stream_new(ZEBRA_MAX_PACKET_SIZ); - msg_type = zebra_mlag_protobuf_decode_message(&s, data, len); + msg_type = zebra_mlag_protobuf_decode_message(s, data, len); if (msg_type <= 0) { /* Something went wrong in decoding */ @@ -148,6 +148,7 @@ static int zebra_mlag_client_msg_handler(struct thread *event) struct stream *s; uint32_t wr_count = 0; uint32_t msg_type = 0; + uint32_t max_count = 0; int len = 0; wr_count = stream_fifo_count_safe(zrouter.mlag_info.mlag_fifo); @@ -155,12 +156,9 @@ static int zebra_mlag_client_msg_handler(struct thread *event) zlog_debug(":%s: Processing MLAG write, %d messages in queue", __func__, wr_count); - zrouter.mlag_info.t_write = NULL; - for (wr_count = 0; wr_count < ZEBRA_MLAG_POST_LIMIT; wr_count++) { - /* FIFO is empty,wait for teh message to be add */ - if (stream_fifo_count_safe(zrouter.mlag_info.mlag_fifo) == 0) - break; + max_count = MIN(wr_count, ZEBRA_MLAG_POST_LIMIT); + for (wr_count = 0; wr_count < max_count; wr_count++) { s = stream_fifo_pop_safe(zrouter.mlag_info.mlag_fifo); if (!s) { zlog_debug(":%s: Got a NULL Messages, some thing wrong", @@ -168,7 +166,6 @@ static int zebra_mlag_client_msg_handler(struct thread *event) break; } - zebra_mlag_reset_write_buffer(); /* * Encode the data now */ @@ -177,17 +174,19 @@ static int zebra_mlag_client_msg_handler(struct thread *event) /* * write to MCLAGD */ - if (len > 0) + if (len > 0) { zebra_mlag_private_write_data(mlag_wr_buffer, len); - /* - * If message type is De-register, send a signal to main thread, - * so that necessary cleanup will be done by main thread. - */ - if (msg_type == MLAG_DEREGISTER) { - thread_add_event(zrouter.master, - zebra_mlag_terminate_pthread, NULL, 0, - NULL); + /* + * If message type is De-register, send a signal to main + * thread, so that necessary cleanup will be done by + * main thread. + */ + if (msg_type == MLAG_DEREGISTER) { + thread_add_event(zrouter.master, + zebra_mlag_terminate_pthread, + NULL, 0, NULL); + } } stream_free(s); @@ -241,12 +240,15 @@ void zebra_mlag_handle_process_state(enum zebra_mlag_state state) */ static int zebra_mlag_signal_write_thread(void) { - if (zrouter.mlag_info.zebra_pth_mlag) { - if (IS_ZEBRA_DEBUG_MLAG) - zlog_debug(":%s: Scheduling MLAG write", __func__); - thread_add_event(zrouter.mlag_info.th_master, - zebra_mlag_client_msg_handler, NULL, 0, - &zrouter.mlag_info.t_write); + frr_with_mutex (&zrouter.mlag_info.mlag_th_mtx) { + if (zrouter.mlag_info.zebra_pth_mlag) { + if (IS_ZEBRA_DEBUG_MLAG) + zlog_debug(":%s: Scheduling MLAG write", + __func__); + thread_add_event(zrouter.mlag_info.th_master, + zebra_mlag_client_msg_handler, NULL, 0, + &zrouter.mlag_info.t_write); + } } return 0; } @@ -375,7 +377,7 @@ static void zebra_mlag_spawn_pthread(void) zrouter.mlag_info.th_master = zrouter.mlag_info.zebra_pth_mlag->master; - /* Enqueue an initial event for the dataplane pthread */ + /* Enqueue an initial event to the Newly spawn MLAG pthread */ zebra_mlag_signal_write_thread(); frr_pthread_run(zrouter.mlag_info.zebra_pth_mlag, NULL); @@ -583,7 +585,7 @@ DEFUN_HIDDEN (show_mlag, ZEBRA_STR "The mlag role on this machine\n") { - char buf[80]; + char buf[MLAG_ROLE_STRSIZE]; vty_out(vty, "MLag is configured to: %s\n", mlag_role2str(zrouter.mlag_info.role, buf, sizeof(buf))); @@ -600,7 +602,7 @@ DEFPY(test_mlag, test_mlag_cmd, "Mlag is setup to be the secondary\n") { enum mlag_role orig = zrouter.mlag_info.role; - char buf1[80], buf2[80]; + char buf1[MLAG_ROLE_STRSIZE], buf2[MLAG_ROLE_STRSIZE]; if (none) zrouter.mlag_info.role = MLAG_ROLE_NONE; @@ -619,8 +621,12 @@ DEFPY(test_mlag, test_mlag_cmd, if (zrouter.mlag_info.role != MLAG_ROLE_NONE) { if (zrouter.mlag_info.clients_interested_cnt == 0 && test_mlag_in_progress == false) { - if (zrouter.mlag_info.zebra_pth_mlag == NULL) - zebra_mlag_spawn_pthread(); + frr_with_mutex ( + &zrouter.mlag_info.mlag_th_mtx) { + if (zrouter.mlag_info.zebra_pth_mlag + == NULL) + zebra_mlag_spawn_pthread(); + } zrouter.mlag_info.clients_interested_cnt++; test_mlag_in_progress = true; zebra_mlag_private_open_channel(); @@ -655,8 +661,8 @@ void zebra_mlag_init(void) zrouter.mlag_info.t_read = NULL; zrouter.mlag_info.t_write = NULL; test_mlag_in_progress = false; - zebra_mlag_reset_write_buffer(); zebra_mlag_reset_read_buffer(); + pthread_mutex_init(&zrouter.mlag_info.mlag_th_mtx, NULL); } void zebra_mlag_terminate(void) @@ -669,13 +675,514 @@ void zebra_mlag_terminate(void) * ProtoBuf Encoding APIs */ +#ifdef HAVE_PROTOBUF + +DEFINE_MTYPE_STATIC(ZEBRA, MLAG_PBUF, "ZEBRA MLAG PROTOBUF") + +int zebra_mlag_protobuf_encode_client_data(struct stream *s, uint32_t *msg_type) +{ + ZebraMlagHeader hdr = ZEBRA_MLAG__HEADER__INIT; + struct mlag_msg mlag_msg; + uint8_t tmp_buf[ZEBRA_MLAG_BUF_LIMIT]; + int len = 0; + int n_len = 0; + int rc = 0; + char buf[ZLOG_FILTER_LENGTH_MAX]; + + if (IS_ZEBRA_DEBUG_MLAG) + zlog_debug("%s: Entering..", __func__); + + rc = zebra_mlag_lib_decode_mlag_hdr(s, &mlag_msg); + if (rc) + return rc; + + if (IS_ZEBRA_DEBUG_MLAG) + zlog_debug("%s: Decoded msg length:%d..", __func__, + mlag_msg.data_len); + + if (IS_ZEBRA_DEBUG_MLAG) + zlog_debug("%s: Mlag ProtoBuf encoding of message:%s", __func__, + zebra_mlag_lib_msgid_to_str(mlag_msg.msg_type, buf, + sizeof(buf))); + *msg_type = mlag_msg.msg_type; + switch (mlag_msg.msg_type) { + case MLAG_MROUTE_ADD: { + struct mlag_mroute_add msg; + ZebraMlagMrouteAdd pay_load = ZEBRA_MLAG_MROUTE_ADD__INIT; + uint32_t vrf_name_len = 0; + + rc = zebra_mlag_lib_decode_mroute_add(s, &msg); + if (rc) + return rc; + vrf_name_len = strlen(msg.vrf_name) + 1; + pay_load.vrf_name = XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len); + strlcpy(pay_load.vrf_name, msg.vrf_name, vrf_name_len); + pay_load.source_ip = msg.source_ip; + pay_load.group_ip = msg.group_ip; + pay_load.cost_to_rp = msg.cost_to_rp; + pay_load.owner_id = msg.owner_id; + pay_load.am_i_dr = msg.am_i_dr; + pay_load.am_i_dual_active = msg.am_i_dual_active; + pay_load.vrf_id = msg.vrf_id; + + if (msg.owner_id == MLAG_OWNER_INTERFACE) { + vrf_name_len = strlen(msg.intf_name) + 1; + pay_load.intf_name = + XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len); + strlcpy(pay_load.intf_name, msg.intf_name, + vrf_name_len); + } + + len = zebra_mlag_mroute_add__pack(&pay_load, tmp_buf); + XFREE(MTYPE_MLAG_PBUF, pay_load.vrf_name); + if (msg.owner_id == MLAG_OWNER_INTERFACE) + XFREE(MTYPE_MLAG_PBUF, pay_load.intf_name); + } break; + case MLAG_MROUTE_DEL: { + struct mlag_mroute_del msg; + ZebraMlagMrouteDel pay_load = ZEBRA_MLAG_MROUTE_DEL__INIT; + uint32_t vrf_name_len = 0; + + rc = zebra_mlag_lib_decode_mroute_del(s, &msg); + if (rc) + return rc; + vrf_name_len = strlen(msg.vrf_name) + 1; + pay_load.vrf_name = XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len); + strlcpy(pay_load.vrf_name, msg.vrf_name, vrf_name_len); + pay_load.source_ip = msg.source_ip; + pay_load.group_ip = msg.group_ip; + pay_load.owner_id = msg.owner_id; + pay_load.vrf_id = msg.vrf_id; + + if (msg.owner_id == MLAG_OWNER_INTERFACE) { + vrf_name_len = strlen(msg.intf_name) + 1; + pay_load.intf_name = + XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len); + strlcpy(pay_load.intf_name, msg.intf_name, + vrf_name_len); + } + + len = zebra_mlag_mroute_del__pack(&pay_load, tmp_buf); + XFREE(MTYPE_MLAG_PBUF, pay_load.vrf_name); + if (msg.owner_id == MLAG_OWNER_INTERFACE) + XFREE(MTYPE_MLAG_PBUF, pay_load.intf_name); + } break; + case MLAG_MROUTE_ADD_BULK: { + struct mlag_mroute_add msg; + ZebraMlagMrouteAddBulk Bulk_msg = + ZEBRA_MLAG_MROUTE_ADD_BULK__INIT; + ZebraMlagMrouteAdd **pay_load = NULL; + int i; + bool cleanup = false; + + Bulk_msg.n_mroute_add = mlag_msg.msg_cnt; + pay_load = XMALLOC(MTYPE_MLAG_PBUF, sizeof(ZebraMlagMrouteAdd *) + * mlag_msg.msg_cnt); + + for (i = 0; i < mlag_msg.msg_cnt; i++) { + + uint32_t vrf_name_len = 0; + + rc = zebra_mlag_lib_decode_mroute_add(s, &msg); + if (rc) { + cleanup = true; + break; + } + pay_load[i] = XMALLOC(MTYPE_MLAG_PBUF, + sizeof(ZebraMlagMrouteAdd)); + zebra_mlag_mroute_add__init(pay_load[i]); + + vrf_name_len = strlen(msg.vrf_name) + 1; + pay_load[i]->vrf_name = + XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len); + strlcpy(pay_load[i]->vrf_name, msg.vrf_name, + vrf_name_len); + pay_load[i]->source_ip = msg.source_ip; + pay_load[i]->group_ip = msg.group_ip; + pay_load[i]->cost_to_rp = msg.cost_to_rp; + pay_load[i]->owner_id = msg.owner_id; + pay_load[i]->am_i_dr = msg.am_i_dr; + pay_load[i]->am_i_dual_active = msg.am_i_dual_active; + pay_load[i]->vrf_id = msg.vrf_id; + if (msg.owner_id == MLAG_OWNER_INTERFACE) { + vrf_name_len = strlen(msg.intf_name) + 1; + pay_load[i]->intf_name = + XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len); + + strlcpy(pay_load[i]->intf_name, msg.intf_name, + vrf_name_len); + } + } + if (cleanup == false) { + Bulk_msg.mroute_add = pay_load; + len = zebra_mlag_mroute_add_bulk__pack(&Bulk_msg, + tmp_buf); + } + + for (i = 0; i < mlag_msg.msg_cnt; i++) { + if (pay_load[i]->vrf_name) + XFREE(MTYPE_MLAG_PBUF, pay_load[i]->vrf_name); + if (pay_load[i]->owner_id == MLAG_OWNER_INTERFACE + && pay_load[i]->intf_name) + XFREE(MTYPE_MLAG_PBUF, pay_load[i]->intf_name); + if (pay_load[i]) + XFREE(MTYPE_MLAG_PBUF, pay_load[i]); + } + XFREE(MTYPE_MLAG_PBUF, pay_load); + if (cleanup == true) + return -1; + } break; + case MLAG_MROUTE_DEL_BULK: { + struct mlag_mroute_del msg; + ZebraMlagMrouteDelBulk Bulk_msg = + ZEBRA_MLAG_MROUTE_DEL_BULK__INIT; + ZebraMlagMrouteDel **pay_load = NULL; + int i; + bool cleanup = false; + + Bulk_msg.n_mroute_del = mlag_msg.msg_cnt; + pay_load = XMALLOC(MTYPE_MLAG_PBUF, sizeof(ZebraMlagMrouteDel *) + * mlag_msg.msg_cnt); + + for (i = 0; i < mlag_msg.msg_cnt; i++) { + + uint32_t vrf_name_len = 0; + + rc = zebra_mlag_lib_decode_mroute_del(s, &msg); + if (rc) { + cleanup = true; + break; + } + + pay_load[i] = XMALLOC(MTYPE_MLAG_PBUF, + sizeof(ZebraMlagMrouteDel)); + zebra_mlag_mroute_del__init(pay_load[i]); + + vrf_name_len = strlen(msg.vrf_name) + 1; + pay_load[i]->vrf_name = + XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len); + + strlcpy(pay_load[i]->vrf_name, msg.vrf_name, + vrf_name_len); + pay_load[i]->source_ip = msg.source_ip; + pay_load[i]->group_ip = msg.group_ip; + pay_load[i]->owner_id = msg.owner_id; + pay_load[i]->vrf_id = msg.vrf_id; + if (msg.owner_id == MLAG_OWNER_INTERFACE) { + vrf_name_len = strlen(msg.intf_name) + 1; + pay_load[i]->intf_name = + XMALLOC(MTYPE_MLAG_PBUF, vrf_name_len); + + strlcpy(pay_load[i]->intf_name, msg.intf_name, + vrf_name_len); + } + } + if (!cleanup) { + Bulk_msg.mroute_del = pay_load; + len = zebra_mlag_mroute_del_bulk__pack(&Bulk_msg, + tmp_buf); + } + + for (i = 0; i < mlag_msg.msg_cnt; i++) { + if (pay_load[i]->vrf_name) + XFREE(MTYPE_MLAG_PBUF, pay_load[i]->vrf_name); + if (pay_load[i]->owner_id == MLAG_OWNER_INTERFACE + && pay_load[i]->intf_name) + XFREE(MTYPE_MLAG_PBUF, pay_load[i]->intf_name); + if (pay_load[i]) + XFREE(MTYPE_MLAG_PBUF, pay_load[i]); + } + XFREE(MTYPE_MLAG_PBUF, pay_load); + if (cleanup) + return -1; + } break; + default: + break; + } + + if (IS_ZEBRA_DEBUG_MLAG) + zlog_debug("%s: length of Mlag ProtoBuf encoded message:%s, %d", + __func__, + zebra_mlag_lib_msgid_to_str(mlag_msg.msg_type, buf, + sizeof(buf)), + len); + hdr.type = (ZebraMlagHeader__MessageType)mlag_msg.msg_type; + if (len != 0) { + hdr.data.len = len; + hdr.data.data = XMALLOC(MTYPE_MLAG_PBUF, len); + memcpy(hdr.data.data, tmp_buf, len); + } + + /* + * ProtoBuf Infra will not support to demarc the pointers whem multiple + * messages are posted inside a single Buffer. + * 2 -solutions exist to solve this + * 1. add Unenoced length at the beginning of every message, this will + * be used to point to next message in the buffer + * 2. another solution is defining all messages insides another message + * But this will permit only 32 messages. this can be extended with + * multiple levels. + * for simplicity we are going with solution-1. + */ + len = zebra_mlag__header__pack(&hdr, + (mlag_wr_buffer + ZEBRA_MLAG_LEN_SIZE)); + n_len = htonl(len); + memcpy(mlag_wr_buffer, &n_len, ZEBRA_MLAG_LEN_SIZE); + len += ZEBRA_MLAG_LEN_SIZE; + + if (IS_ZEBRA_DEBUG_MLAG) + zlog_debug( + "%s: length of Mlag ProtoBuf message:%s with Header %d", + __func__, + zebra_mlag_lib_msgid_to_str(mlag_msg.msg_type, buf, + sizeof(buf)), + len); + if (hdr.data.data) + XFREE(MTYPE_MLAG_PBUF, hdr.data.data); + + return len; +} + +int zebra_mlag_protobuf_decode_message(struct stream *s, uint8_t *data, + uint32_t len) +{ + uint32_t msg_type; + ZebraMlagHeader *hdr; + char buf[80]; + + hdr = zebra_mlag__header__unpack(NULL, len, data); + if (hdr == NULL) + return -1; + + /* + * ADD The MLAG Header + */ + zclient_create_header(s, ZEBRA_MLAG_FORWARD_MSG, VRF_DEFAULT); + + msg_type = hdr->type; + + if (IS_ZEBRA_DEBUG_MLAG) + zlog_debug("%s: Mlag ProtoBuf decoding of message:%s", __func__, + zebra_mlag_lib_msgid_to_str(msg_type, buf, 80)); + + /* + * Internal MLAG Message-types & MLAG.proto message types should + * always match, otherwise there can be decoding errors + * To avoid exposing clients with Protobuf flags, using internal + * message-types + */ + stream_putl(s, hdr->type); + + if (hdr->data.len == 0) { + /* NULL Payload */ + stream_putw(s, MLAG_MSG_NULL_PAYLOAD); + /* No Batching */ + stream_putw(s, MLAG_MSG_NO_BATCH); + } else { + switch (msg_type) { + case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_STATUS_UPDATE: { + ZebraMlagStatusUpdate *msg = NULL; + + msg = zebra_mlag_status_update__unpack( + NULL, hdr->data.len, hdr->data.data); + if (msg == NULL) { + zebra_mlag__header__free_unpacked(hdr, NULL); + return -1; + } + /* Payload len */ + stream_putw(s, sizeof(struct mlag_status)); + /* No Batching */ + stream_putw(s, MLAG_MSG_NO_BATCH); + /* Actual Data */ + stream_put(s, msg->peerlink, INTERFACE_NAMSIZ); + stream_putl(s, msg->my_role); + stream_putl(s, msg->peer_state); + zebra_mlag_status_update__free_unpacked(msg, NULL); + } break; + case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_VXLAN_UPDATE: { + ZebraMlagVxlanUpdate *msg = NULL; + + msg = zebra_mlag_vxlan_update__unpack( + NULL, hdr->data.len, hdr->data.data); + if (msg == NULL) { + zebra_mlag__header__free_unpacked(hdr, NULL); + return -1; + } + /* Payload len */ + stream_putw(s, sizeof(struct mlag_vxlan)); + /* No Batching */ + stream_putw(s, MLAG_MSG_NO_BATCH); + /* Actual Data */ + stream_putl(s, msg->anycast_ip); + stream_putl(s, msg->local_ip); + zebra_mlag_vxlan_update__free_unpacked(msg, NULL); + } break; + case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_MROUTE_ADD: { + ZebraMlagMrouteAdd *msg = NULL; + + msg = zebra_mlag_mroute_add__unpack(NULL, hdr->data.len, + hdr->data.data); + if (msg == NULL) { + zebra_mlag__header__free_unpacked(hdr, NULL); + return -1; + } + /* Payload len */ + stream_putw(s, sizeof(struct mlag_mroute_add)); + /* No Batching */ + stream_putw(s, MLAG_MSG_NO_BATCH); + /* Actual Data */ + stream_put(s, msg->vrf_name, VRF_NAMSIZ); + + stream_putl(s, msg->source_ip); + stream_putl(s, msg->group_ip); + stream_putl(s, msg->cost_to_rp); + stream_putl(s, msg->owner_id); + stream_putc(s, msg->am_i_dr); + stream_putc(s, msg->am_i_dual_active); + stream_putl(s, msg->vrf_id); + if (msg->owner_id == MLAG_OWNER_INTERFACE) + stream_put(s, msg->intf_name, INTERFACE_NAMSIZ); + else + stream_put(s, NULL, INTERFACE_NAMSIZ); + zebra_mlag_mroute_add__free_unpacked(msg, NULL); + } break; + case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_MROUTE_DEL: { + ZebraMlagMrouteDel *msg = NULL; + + msg = zebra_mlag_mroute_del__unpack(NULL, hdr->data.len, + hdr->data.data); + if (msg == NULL) { + zebra_mlag__header__free_unpacked(hdr, NULL); + return -1; + } + /* Payload len */ + stream_putw(s, sizeof(struct mlag_mroute_del)); + /* No Batching */ + stream_putw(s, MLAG_MSG_NO_BATCH); + /* Actual Data */ + stream_put(s, msg->vrf_name, VRF_NAMSIZ); + + stream_putl(s, msg->source_ip); + stream_putl(s, msg->group_ip); + stream_putl(s, msg->group_ip); + stream_putl(s, msg->owner_id); + stream_putl(s, msg->vrf_id); + if (msg->owner_id == MLAG_OWNER_INTERFACE) + stream_put(s, msg->intf_name, INTERFACE_NAMSIZ); + else + stream_put(s, NULL, INTERFACE_NAMSIZ); + zebra_mlag_mroute_del__free_unpacked(msg, NULL); + } break; + case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_MROUTE_ADD_BULK: { + ZebraMlagMrouteAddBulk *Bulk_msg = NULL; + ZebraMlagMrouteAdd *msg = NULL; + size_t i; + + Bulk_msg = zebra_mlag_mroute_add_bulk__unpack( + NULL, hdr->data.len, hdr->data.data); + if (Bulk_msg == NULL) { + zebra_mlag__header__free_unpacked(hdr, NULL); + return -1; + } + /* Payload len */ + stream_putw(s, (Bulk_msg->n_mroute_add + * sizeof(struct mlag_mroute_add))); + /* No. of msgs in Batch */ + stream_putw(s, Bulk_msg->n_mroute_add); + + /* Actual Data */ + for (i = 0; i < Bulk_msg->n_mroute_add; i++) { + + msg = Bulk_msg->mroute_add[i]; + + stream_put(s, msg->vrf_name, VRF_NAMSIZ); + stream_putl(s, msg->source_ip); + stream_putl(s, msg->group_ip); + stream_putl(s, msg->cost_to_rp); + stream_putl(s, msg->owner_id); + stream_putc(s, msg->am_i_dr); + stream_putc(s, msg->am_i_dual_active); + stream_putl(s, msg->vrf_id); + if (msg->owner_id == MLAG_OWNER_INTERFACE) + stream_put(s, msg->intf_name, + INTERFACE_NAMSIZ); + else + stream_put(s, NULL, INTERFACE_NAMSIZ); + } + zebra_mlag_mroute_add_bulk__free_unpacked(Bulk_msg, + NULL); + } break; + case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_MROUTE_DEL_BULK: { + ZebraMlagMrouteDelBulk *Bulk_msg = NULL; + ZebraMlagMrouteDel *msg = NULL; + size_t i; + + Bulk_msg = zebra_mlag_mroute_del_bulk__unpack( + NULL, hdr->data.len, hdr->data.data); + if (Bulk_msg == NULL) { + zebra_mlag__header__free_unpacked(hdr, NULL); + return -1; + } + /* Payload len */ + stream_putw(s, (Bulk_msg->n_mroute_del + * sizeof(struct mlag_mroute_del))); + /* No. of msgs in Batch */ + stream_putw(s, Bulk_msg->n_mroute_del); + + /* Actual Data */ + for (i = 0; i < Bulk_msg->n_mroute_del; i++) { + + msg = Bulk_msg->mroute_del[i]; + + stream_put(s, msg->vrf_name, VRF_NAMSIZ); + stream_putl(s, msg->source_ip); + stream_putl(s, msg->group_ip); + stream_putl(s, msg->owner_id); + stream_putl(s, msg->vrf_id); + if (msg->owner_id == MLAG_OWNER_INTERFACE) + stream_put(s, msg->intf_name, + INTERFACE_NAMSIZ); + else + stream_put(s, NULL, INTERFACE_NAMSIZ); + } + zebra_mlag_mroute_del_bulk__free_unpacked(Bulk_msg, + NULL); + } break; + case ZEBRA_MLAG__HEADER__MESSAGE_TYPE__ZEBRA_MLAG_ZEBRA_STATUS_UPDATE: { + ZebraMlagZebraStatusUpdate *msg = NULL; + + msg = zebra_mlag_zebra_status_update__unpack( + NULL, hdr->data.len, hdr->data.data); + if (msg == NULL) { + zebra_mlag__header__free_unpacked(hdr, NULL); + return -1; + } + /* Payload len */ + stream_putw(s, sizeof(struct mlag_frr_status)); + /* No Batching */ + stream_putw(s, MLAG_MSG_NO_BATCH); + /* Actual Data */ + stream_putl(s, msg->peer_frrstate); + zebra_mlag_zebra_status_update__free_unpacked(msg, + NULL); + } break; + default: + break; + } + } + zebra_mlag__header__free_unpacked(hdr, NULL); + return msg_type; +} + +#else int zebra_mlag_protobuf_encode_client_data(struct stream *s, uint32_t *msg_type) { return 0; } -int zebra_mlag_protobuf_decode_message(struct stream **s, uint8_t *data, +int zebra_mlag_protobuf_decode_message(struct stream *s, uint8_t *data, uint32_t len) { return 0; } +#endif diff --git a/zebra/zebra_mlag.h b/zebra/zebra_mlag.h index 6506089af..6f7ef8319 100644 --- a/zebra/zebra_mlag.h +++ b/zebra/zebra_mlag.h @@ -26,6 +26,10 @@ #include "zclient.h" #include "zebra/zserv.h" +#ifdef HAVE_PROTOBUF +#include "mlag/mlag.pb-c.h" +#endif + #define ZEBRA_MLAG_BUF_LIMIT 2048 #define ZEBRA_MLAG_LEN_SIZE 4 @@ -33,14 +37,8 @@ extern uint8_t mlag_wr_buffer[ZEBRA_MLAG_BUF_LIMIT]; extern uint8_t mlag_rd_buffer[ZEBRA_MLAG_BUF_LIMIT]; extern uint32_t mlag_rd_buf_offset; -static inline void zebra_mlag_reset_write_buffer(void) -{ - memset(mlag_wr_buffer, 0, ZEBRA_MLAG_BUF_LIMIT); -} - static inline void zebra_mlag_reset_read_buffer(void) { - memset(mlag_rd_buffer, 0, ZEBRA_MLAG_BUF_LIMIT); mlag_rd_buf_offset = 0; } @@ -64,6 +62,6 @@ void zebra_mlag_process_mlag_data(uint8_t *data, uint32_t len); */ int zebra_mlag_protobuf_encode_client_data(struct stream *s, uint32_t *msg_type); -int zebra_mlag_protobuf_decode_message(struct stream **s, uint8_t *data, +int zebra_mlag_protobuf_decode_message(struct stream *s, uint8_t *data, uint32_t len); #endif diff --git a/zebra/zebra_mlag_private.c b/zebra/zebra_mlag_private.c index efaaa73c4..6cb40a9c1 100644 --- a/zebra/zebra_mlag_private.c +++ b/zebra/zebra_mlag_private.c @@ -26,6 +26,7 @@ #include "hook.h" #include "module.h" #include "thread.h" +#include "frr_pthread.h" #include "libfrr.h" #include "version.h" #include "network.h" @@ -70,8 +71,10 @@ int zebra_mlag_private_write_data(uint8_t *data, uint32_t len) static void zebra_mlag_sched_read(void) { - thread_add_read(zmlag_master, zebra_mlag_read, NULL, mlag_socket, - &zrouter.mlag_info.t_read); + frr_with_mutex (&zrouter.mlag_info.mlag_th_mtx) { + thread_add_read(zmlag_master, zebra_mlag_read, NULL, + mlag_socket, &zrouter.mlag_info.t_read); + } } static int zebra_mlag_read(struct thread *thread) @@ -80,8 +83,6 @@ static int zebra_mlag_read(struct thread *thread) uint32_t h_msglen; uint32_t tot_len, curr_len = mlag_rd_buf_offset; - zrouter.mlag_info.t_read = NULL; - /* * Received message in sock_stream looks like below * | len-1 (4 Bytes) | payload-1 (len-1) | @@ -103,6 +104,7 @@ static int zebra_mlag_read(struct thread *thread) zebra_mlag_handle_process_state(MLAG_DOWN); return -1; } + mlag_rd_buf_offset += data_len; if (data_len != (ssize_t)ZEBRA_MLAG_LEN_SIZE - curr_len) { /* Try again later */ zebra_mlag_sched_read(); @@ -131,6 +133,7 @@ static int zebra_mlag_read(struct thread *thread) zebra_mlag_handle_process_state(MLAG_DOWN); return -1; } + mlag_rd_buf_offset += data_len; if (data_len != (ssize_t)tot_len - curr_len) { /* Try again later */ zebra_mlag_sched_read(); diff --git a/zebra/zebra_router.h b/zebra/zebra_router.h index 437cab5bb..6fe3e4840 100644 --- a/zebra/zebra_router.h +++ b/zebra/zebra_router.h @@ -100,6 +100,7 @@ struct zebra_mlag_info { /* Threads for read/write. */ struct thread *t_read; struct thread *t_write; + pthread_mutex_t mlag_th_mtx; }; struct zebra_router { |