diff options
-rw-r--r-- | net/tipc/group.c | 45 |
1 files changed, 32 insertions, 13 deletions
diff --git a/net/tipc/group.c b/net/tipc/group.c index eab862e047dc..8f0eb5d22e8f 100644 --- a/net/tipc/group.c +++ b/net/tipc/group.c @@ -71,6 +71,7 @@ struct tipc_member { u16 advertised; u16 window; u16 bc_rcv_nxt; + u16 bc_syncpt; u16 bc_acked; bool usr_pending; }; @@ -410,7 +411,7 @@ static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq) struct sk_buff *_skb, *tmp; int mtyp = msg_type(hdr); - /* Bcast may be bypassed by unicast or other bcast, - sort it in */ + /* Bcast/mcast may be bypassed by ucast or other bcast, - sort it in */ if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) { skb_queue_walk_safe(defq, _skb, tmp) { _hdr = buf_msg(_skb); @@ -431,7 +432,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, struct sk_buff_head *xmitq) { struct sk_buff *skb = __skb_dequeue(inputq); - bool ack, deliver, update; + bool ack, deliver, update, leave = false; struct sk_buff_head *defq; struct tipc_member *m; struct tipc_msg *hdr; @@ -448,13 +449,6 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, if (!msg_in_group(hdr)) goto drop; - if (msg_is_grp_evt(hdr)) { - if (!grp->events) - goto drop; - __skb_queue_tail(inputq, skb); - return; - } - m = tipc_group_find_member(grp, node, port); if (!tipc_group_is_receiver(m)) goto drop; @@ -490,6 +484,12 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, break; case TIPC_GRP_UCAST_MSG: break; + case TIPC_GRP_MEMBER_EVT: + if (m->state == MBR_LEAVING) + leave = true; + if (!grp->events) + deliver = false; + break; default: break; } @@ -504,6 +504,11 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, if (ack) tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq); + if (leave) { + tipc_group_delete_member(grp, m); + __skb_queue_purge(defq); + break; + } if (!update) continue; @@ -561,6 +566,8 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); msg_set_adv_win(hdr, adv); m->advertised += adv; + } else if (mtyp == GRP_LEAVE_MSG) { + msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); } else if (mtyp == GRP_ADV_MSG) { msg_set_adv_win(hdr, adv); m->advertised += adv; @@ -577,6 +584,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, u32 node = msg_orignode(hdr); u32 port = msg_origport(hdr); struct tipc_member *m; + struct tipc_msg *ehdr; if (!grp) return; @@ -590,7 +598,8 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, MBR_QUARANTINED); if (!m) return; - m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr); + m->bc_syncpt = msg_grp_bc_syncpt(hdr); + m->bc_rcv_nxt = m->bc_syncpt; m->window += msg_adv_win(hdr); /* Wait until PUBLISH event is received */ @@ -601,6 +610,8 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, *usr_wakeup = true; m->usr_pending = false; tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); + ehdr = buf_msg(m->event_msg); + msg_set_grp_bc_seqno(ehdr, m->bc_syncpt); __skb_queue_tail(inputq, m->event_msg); } if (m->window < ADV_IDLE) @@ -611,6 +622,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, case GRP_LEAVE_MSG: if (!m) return; + m->bc_syncpt = msg_grp_bc_syncpt(hdr); /* Wait until WITHDRAW event is received */ if (m->state != MBR_LEAVING) { @@ -618,9 +630,10 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, return; } /* Otherwise deliver already received WITHDRAW event */ + ehdr = buf_msg(m->event_msg); + msg_set_grp_bc_seqno(ehdr, m->bc_syncpt); __skb_queue_tail(inputq, m->event_msg); *usr_wakeup = true; - tipc_group_delete_member(grp, m); list_del_init(&m->congested); return; case GRP_ADV_MSG: @@ -662,6 +675,7 @@ void tipc_group_member_evt(struct tipc_group *grp, int event = evt->event; struct tipc_member *m; struct net *net; + bool node_up; u32 self; if (!grp) @@ -695,6 +709,7 @@ void tipc_group_member_evt(struct tipc_group *grp, m->event_msg = skb; m->state = MBR_PUBLISHED; } else { + msg_set_grp_bc_seqno(hdr, m->bc_syncpt); __skb_queue_tail(inputq, skb); m->state = MBR_JOINED; *usr_wakeup = true; @@ -715,14 +730,18 @@ void tipc_group_member_evt(struct tipc_group *grp, *usr_wakeup = true; m->usr_pending = false; + node_up = tipc_node_is_up(net, node); /* Hold back event if more messages might be expected */ - if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) { + if (m->state != MBR_LEAVING && node_up) { m->event_msg = skb; m->state = MBR_LEAVING; } else { + if (node_up) + msg_set_grp_bc_seqno(hdr, m->bc_syncpt); + else + msg_set_grp_bc_seqno(hdr, m->bc_rcv_nxt); __skb_queue_tail(inputq, skb); - tipc_group_delete_member(grp, m); } list_del_init(&m->congested); } |