summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--net/tipc/group.c45
1 files changed, 32 insertions, 13 deletions
diff --git a/net/tipc/group.c b/net/tipc/group.c
index eab862e047dc..8f0eb5d22e8f 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -71,6 +71,7 @@ struct tipc_member {
u16 advertised;
u16 window;
u16 bc_rcv_nxt;
+ u16 bc_syncpt;
u16 bc_acked;
bool usr_pending;
};
@@ -410,7 +411,7 @@ static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq)
struct sk_buff *_skb, *tmp;
int mtyp = msg_type(hdr);
- /* Bcast may be bypassed by unicast or other bcast, - sort it in */
+ /* Bcast/mcast may be bypassed by ucast or other bcast, - sort it in */
if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) {
skb_queue_walk_safe(defq, _skb, tmp) {
_hdr = buf_msg(_skb);
@@ -431,7 +432,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
struct sk_buff_head *xmitq)
{
struct sk_buff *skb = __skb_dequeue(inputq);
- bool ack, deliver, update;
+ bool ack, deliver, update, leave = false;
struct sk_buff_head *defq;
struct tipc_member *m;
struct tipc_msg *hdr;
@@ -448,13 +449,6 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
if (!msg_in_group(hdr))
goto drop;
- if (msg_is_grp_evt(hdr)) {
- if (!grp->events)
- goto drop;
- __skb_queue_tail(inputq, skb);
- return;
- }
-
m = tipc_group_find_member(grp, node, port);
if (!tipc_group_is_receiver(m))
goto drop;
@@ -490,6 +484,12 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
break;
case TIPC_GRP_UCAST_MSG:
break;
+ case TIPC_GRP_MEMBER_EVT:
+ if (m->state == MBR_LEAVING)
+ leave = true;
+ if (!grp->events)
+ deliver = false;
+ break;
default:
break;
}
@@ -504,6 +504,11 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
if (ack)
tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq);
+ if (leave) {
+ tipc_group_delete_member(grp, m);
+ __skb_queue_purge(defq);
+ break;
+ }
if (!update)
continue;
@@ -561,6 +566,8 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
msg_set_adv_win(hdr, adv);
m->advertised += adv;
+ } else if (mtyp == GRP_LEAVE_MSG) {
+ msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
} else if (mtyp == GRP_ADV_MSG) {
msg_set_adv_win(hdr, adv);
m->advertised += adv;
@@ -577,6 +584,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
u32 node = msg_orignode(hdr);
u32 port = msg_origport(hdr);
struct tipc_member *m;
+ struct tipc_msg *ehdr;
if (!grp)
return;
@@ -590,7 +598,8 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
MBR_QUARANTINED);
if (!m)
return;
- m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr);
+ m->bc_syncpt = msg_grp_bc_syncpt(hdr);
+ m->bc_rcv_nxt = m->bc_syncpt;
m->window += msg_adv_win(hdr);
/* Wait until PUBLISH event is received */
@@ -601,6 +610,8 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
*usr_wakeup = true;
m->usr_pending = false;
tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
+ ehdr = buf_msg(m->event_msg);
+ msg_set_grp_bc_seqno(ehdr, m->bc_syncpt);
__skb_queue_tail(inputq, m->event_msg);
}
if (m->window < ADV_IDLE)
@@ -611,6 +622,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
case GRP_LEAVE_MSG:
if (!m)
return;
+ m->bc_syncpt = msg_grp_bc_syncpt(hdr);
/* Wait until WITHDRAW event is received */
if (m->state != MBR_LEAVING) {
@@ -618,9 +630,10 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
return;
}
/* Otherwise deliver already received WITHDRAW event */
+ ehdr = buf_msg(m->event_msg);
+ msg_set_grp_bc_seqno(ehdr, m->bc_syncpt);
__skb_queue_tail(inputq, m->event_msg);
*usr_wakeup = true;
- tipc_group_delete_member(grp, m);
list_del_init(&m->congested);
return;
case GRP_ADV_MSG:
@@ -662,6 +675,7 @@ void tipc_group_member_evt(struct tipc_group *grp,
int event = evt->event;
struct tipc_member *m;
struct net *net;
+ bool node_up;
u32 self;
if (!grp)
@@ -695,6 +709,7 @@ void tipc_group_member_evt(struct tipc_group *grp,
m->event_msg = skb;
m->state = MBR_PUBLISHED;
} else {
+ msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
__skb_queue_tail(inputq, skb);
m->state = MBR_JOINED;
*usr_wakeup = true;
@@ -715,14 +730,18 @@ void tipc_group_member_evt(struct tipc_group *grp,
*usr_wakeup = true;
m->usr_pending = false;
+ node_up = tipc_node_is_up(net, node);
/* Hold back event if more messages might be expected */
- if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) {
+ if (m->state != MBR_LEAVING && node_up) {
m->event_msg = skb;
m->state = MBR_LEAVING;
} else {
+ if (node_up)
+ msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
+ else
+ msg_set_grp_bc_seqno(hdr, m->bc_rcv_nxt);
__skb_queue_tail(inputq, skb);
- tipc_group_delete_member(grp, m);
}
list_del_init(&m->congested);
}