summaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorJon Maloy <jon.maloy@ericsson.com>2018-01-08 21:03:30 +0100
committerDavid S. Miller <davem@davemloft.net>2018-01-09 18:35:58 +0100
commit232d07b74a33b9f5d48516dc1d8ce41723ada593 (patch)
tree04a2df7d400b7aa4fbb0028d033bbace6e0ebdcc /net
parenttipc: add option to suppress PUBLISH events for pre-existing publications (diff)
downloadlinux-232d07b74a33b9f5d48516dc1d8ce41723ada593.tar.xz
linux-232d07b74a33b9f5d48516dc1d8ce41723ada593.zip
tipc: improve groupcast scope handling
When a member joins a group, it also indicates a binding scope. This makes it possible to create both node local groups, invisible to other nodes, as well as cluster global groups, visible everywhere. In order to avoid that different members end up having permanently differing views of group size and memberhip, we must inhibit locally and globally bound members from joining the same group. We do this by using the binding scope as an additional separator between groups. I.e., a member must ignore all membership events from sockets using a different scope than itself, and all lookups for message destinations must require an exact match between the message's lookup scope and the potential target's binding scope. Apart from making it possible to create local groups using the same identity on different nodes, a side effect of this is that it now also becomes possible to create a cluster global group with the same identity across the same nodes, without interfering with the local groups. Acked-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net')
-rw-r--r--net/tipc/group.c13
-rw-r--r--net/tipc/name_table.c40
-rw-r--r--net/tipc/name_table.h4
-rw-r--r--net/tipc/server.c4
-rw-r--r--net/tipc/server.h6
-rw-r--r--net/tipc/socket.c88
-rw-r--r--net/tipc/subscr.c10
-rw-r--r--net/tipc/subscr.h2
8 files changed, 96 insertions, 71 deletions
diff --git a/net/tipc/group.c b/net/tipc/group.c
index cf996bd6ec98..1908773c9fca 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -87,7 +87,6 @@ struct tipc_group {
int subid;
u32 type;
u32 instance;
- u32 domain;
u32 scope;
u32 portid;
u16 member_cnt;
@@ -158,6 +157,8 @@ int tipc_group_size(struct tipc_group *grp)
struct tipc_group *tipc_group_create(struct net *net, u32 portid,
struct tipc_group_req *mreq)
{
+ u32 filter = TIPC_SUB_PORTS | TIPC_SUB_NO_STATUS;
+ bool global = mreq->scope != TIPC_NODE_SCOPE;
struct tipc_group *grp;
u32 type = mreq->type;
@@ -171,15 +172,14 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid,
grp->members = RB_ROOT;
grp->net = net;
grp->portid = portid;
- grp->domain = addr_domain(net, mreq->scope);
grp->type = type;
grp->instance = mreq->instance;
grp->scope = mreq->scope;
grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS;
- if (tipc_topsrv_kern_subscr(net, portid, type,
- TIPC_SUB_PORTS | TIPC_SUB_NO_STATUS,
- 0, ~0, &grp->subid))
+ filter |= global ? TIPC_SUB_CLUSTER_SCOPE : TIPC_SUB_NODE_SCOPE;
+ if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0,
+ filter, &grp->subid))
return grp;
kfree(grp);
return NULL;
@@ -732,6 +732,9 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
if (!grp)
return;
+ if (grp->scope == TIPC_NODE_SCOPE && node != tipc_own_addr(grp->net))
+ return;
+
m = tipc_group_find_member(grp, node, port);
switch (msg_type(hdr)) {
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index 60af9885f160..64cdd3c302b0 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -328,7 +328,8 @@ static struct publication *tipc_nameseq_insert_publ(struct net *net,
list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {
tipc_subscrp_report_overlap(s, publ->lower, publ->upper,
TIPC_PUBLISHED, publ->ref,
- publ->node, created_subseq);
+ publ->node, publ->scope,
+ created_subseq);
}
return publ;
}
@@ -398,7 +399,8 @@ found:
list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {
tipc_subscrp_report_overlap(s, publ->lower, publ->upper,
TIPC_WITHDRAWN, publ->ref,
- publ->node, removed_subseq);
+ publ->node, publ->scope,
+ removed_subseq);
}
return publ;
@@ -435,6 +437,7 @@ static void tipc_nameseq_subscribe(struct name_seq *nseq,
sseq->upper,
TIPC_PUBLISHED,
crs->ref, crs->node,
+ crs->scope,
must_report);
must_report = 0;
}
@@ -598,7 +601,7 @@ not_found:
return ref;
}
-bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain,
+bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 scope,
struct list_head *dsts, int *dstcnt, u32 exclude,
bool all)
{
@@ -608,9 +611,6 @@ bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain,
struct name_seq *seq;
struct sub_seq *sseq;
- if (!tipc_in_scope(domain, self))
- return false;
-
*dstcnt = 0;
rcu_read_lock();
seq = nametbl_find_seq(net, type);
@@ -621,7 +621,7 @@ bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain,
if (likely(sseq)) {
info = sseq->info;
list_for_each_entry(publ, &info->zone_list, zone_list) {
- if (!tipc_in_scope(domain, publ->node))
+ if (publ->scope != scope)
continue;
if (publ->ref == exclude && publ->node == self)
continue;
@@ -639,13 +639,14 @@ exit:
return !list_empty(dsts);
}
-int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
- u32 limit, struct list_head *dports)
+int tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper,
+ u32 scope, bool exact, struct list_head *dports)
{
- struct name_seq *seq;
- struct sub_seq *sseq;
struct sub_seq *sseq_stop;
struct name_info *info;
+ struct publication *p;
+ struct name_seq *seq;
+ struct sub_seq *sseq;
int res = 0;
rcu_read_lock();
@@ -657,15 +658,12 @@ int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
sseq = seq->sseqs + nameseq_locate_subseq(seq, lower);
sseq_stop = seq->sseqs + seq->first_free;
for (; sseq != sseq_stop; sseq++) {
- struct publication *publ;
-
if (sseq->lower > upper)
break;
-
info = sseq->info;
- list_for_each_entry(publ, &info->node_list, node_list) {
- if (publ->scope <= limit)
- tipc_dest_push(dports, 0, publ->ref);
+ list_for_each_entry(p, &info->node_list, node_list) {
+ if (p->scope == scope || (!exact && p->scope < scope))
+ tipc_dest_push(dports, 0, p->ref);
}
if (info->cluster_list_size != info->node_list_size)
@@ -682,7 +680,7 @@ exit:
* - Determines if any node local ports overlap
*/
void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
- u32 upper, u32 domain,
+ u32 upper, u32 scope,
struct tipc_nlist *nodes)
{
struct sub_seq *sseq, *stop;
@@ -701,7 +699,7 @@ void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
for (; sseq != stop && sseq->lower <= upper; sseq++) {
info = sseq->info;
list_for_each_entry(publ, &info->zone_list, zone_list) {
- if (tipc_in_scope(domain, publ->node))
+ if (publ->scope == scope)
tipc_nlist_add(nodes, publ->node);
}
}
@@ -713,7 +711,7 @@ exit:
/* tipc_nametbl_build_group - build list of communication group members
*/
void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
- u32 type, u32 domain)
+ u32 type, u32 scope)
{
struct sub_seq *sseq, *stop;
struct name_info *info;
@@ -731,7 +729,7 @@ void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
for (; sseq != stop; sseq++) {
info = sseq->info;
list_for_each_entry(p, &info->zone_list, zone_list) {
- if (!tipc_in_scope(domain, p->node))
+ if (p->scope != scope)
continue;
tipc_group_add_member(grp, p->node, p->ref, p->lower);
}
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index 73a148c85c15..b595d8aa00f0 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -100,8 +100,8 @@ struct name_table {
int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node);
-int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
- u32 limit, struct list_head *dports);
+int tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper,
+ u32 scope, bool exact, struct list_head *dports);
void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
u32 type, u32 domain);
void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
diff --git a/net/tipc/server.c b/net/tipc/server.c
index 950c54cbcf3a..8ee5e86b7870 100644
--- a/net/tipc/server.c
+++ b/net/tipc/server.c
@@ -489,8 +489,8 @@ void tipc_conn_terminate(struct tipc_server *s, int conid)
}
}
-bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,
- u32 filter, u32 lower, u32 upper, int *conid)
+bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
+ u32 upper, u32 filter, int *conid)
{
struct tipc_subscriber *scbr;
struct tipc_subscr sub;
diff --git a/net/tipc/server.h b/net/tipc/server.h
index ea1effbff23e..17f49ee44cfd 100644
--- a/net/tipc/server.h
+++ b/net/tipc/server.h
@@ -41,6 +41,8 @@
#include <net/net_namespace.h>
#define TIPC_SERVER_NAME_LEN 32
+#define TIPC_SUB_CLUSTER_SCOPE 0x20
+#define TIPC_SUB_NODE_SCOPE 0x40
#define TIPC_SUB_NO_STATUS 0x80
/**
@@ -84,8 +86,8 @@ struct tipc_server {
int tipc_conn_sendmsg(struct tipc_server *s, int conid,
struct sockaddr_tipc *addr, void *data, size_t len);
-bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,
- u32 filter, u32 lower, u32 upper, int *conid);
+bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
+ u32 upper, u32 filter, int *conid);
void tipc_topsrv_kern_unsubscr(struct net *net, int conid);
/**
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index e3a02f1fcab5..b24dab3996c9 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -928,21 +928,22 @@ static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m,
struct list_head *cong_links = &tsk->cong_links;
int blks = tsk_blocks(GROUP_H_SIZE + dlen);
struct tipc_group *grp = tsk->group;
+ struct tipc_msg *hdr = &tsk->phdr;
struct tipc_member *first = NULL;
struct tipc_member *mbr = NULL;
struct net *net = sock_net(sk);
u32 node, port, exclude;
- u32 type, inst, domain;
struct list_head dsts;
+ u32 type, inst, scope;
int lookups = 0;
int dstcnt, rc;
bool cong;
INIT_LIST_HEAD(&dsts);
- type = dest->addr.name.name.type;
+ type = msg_nametype(hdr);
inst = dest->addr.name.name.instance;
- domain = addr_domain(net, dest->scope);
+ scope = msg_lookup_scope(hdr);
exclude = tipc_group_exclude(grp);
while (++lookups < 4) {
@@ -950,7 +951,7 @@ static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m,
/* Look for a non-congested destination member, if any */
while (1) {
- if (!tipc_nametbl_lookup(net, type, inst, domain, &dsts,
+ if (!tipc_nametbl_lookup(net, type, inst, scope, &dsts,
&dstcnt, exclude, false))
return -EHOSTUNREACH;
tipc_dest_pop(&dsts, &node, &port);
@@ -1079,22 +1080,23 @@ static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m,
{
struct sock *sk = sock->sk;
DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
- struct tipc_name_seq *seq = &dest->addr.nameseq;
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_group *grp = tsk->group;
+ struct tipc_msg *hdr = &tsk->phdr;
struct net *net = sock_net(sk);
- u32 domain, exclude, dstcnt;
+ u32 type, inst, scope, exclude;
struct list_head dsts;
+ u32 dstcnt;
INIT_LIST_HEAD(&dsts);
- if (seq->lower != seq->upper)
- return -ENOTSUPP;
-
- domain = addr_domain(net, dest->scope);
+ type = msg_nametype(hdr);
+ inst = dest->addr.name.name.instance;
+ scope = msg_lookup_scope(hdr);
exclude = tipc_group_exclude(grp);
- if (!tipc_nametbl_lookup(net, seq->type, seq->lower, domain,
- &dsts, &dstcnt, exclude, true))
+
+ if (!tipc_nametbl_lookup(net, type, inst, scope, &dsts,
+ &dstcnt, exclude, true))
return -EHOSTUNREACH;
if (dstcnt == 1) {
@@ -1116,24 +1118,29 @@ static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m,
void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
struct sk_buff_head *inputq)
{
- u32 scope = TIPC_CLUSTER_SCOPE;
u32 self = tipc_own_addr(net);
+ u32 type, lower, upper, scope;
struct sk_buff *skb, *_skb;
- u32 lower = 0, upper = ~0;
- struct sk_buff_head tmpq;
u32 portid, oport, onode;
+ struct sk_buff_head tmpq;
struct list_head dports;
- struct tipc_msg *msg;
- int user, mtyp, hsz;
+ struct tipc_msg *hdr;
+ int user, mtyp, hlen;
+ bool exact;
__skb_queue_head_init(&tmpq);
INIT_LIST_HEAD(&dports);
skb = tipc_skb_peek(arrvq, &inputq->lock);
for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
- msg = buf_msg(skb);
- user = msg_user(msg);
- mtyp = msg_type(msg);
+ hdr = buf_msg(skb);
+ user = msg_user(hdr);
+ mtyp = msg_type(hdr);
+ hlen = skb_headroom(skb) + msg_hdr_sz(hdr);
+ oport = msg_origport(hdr);
+ onode = msg_orignode(hdr);
+ type = msg_nametype(hdr);
+
if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) {
spin_lock_bh(&inputq->lock);
if (skb_peek(arrvq) == skb) {
@@ -1144,21 +1151,31 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
spin_unlock_bh(&inputq->lock);
continue;
}
- hsz = skb_headroom(skb) + msg_hdr_sz(msg);
- oport = msg_origport(msg);
- onode = msg_orignode(msg);
- if (onode == self)
- scope = TIPC_NODE_SCOPE;
-
- /* Create destination port list and message clones: */
- if (!msg_in_group(msg)) {
- lower = msg_namelower(msg);
- upper = msg_nameupper(msg);
+
+ /* Group messages require exact scope match */
+ if (msg_in_group(hdr)) {
+ lower = 0;
+ upper = ~0;
+ scope = msg_lookup_scope(hdr);
+ exact = true;
+ } else {
+ /* TIPC_NODE_SCOPE means "any scope" in this context */
+ if (onode == self)
+ scope = TIPC_NODE_SCOPE;
+ else
+ scope = TIPC_CLUSTER_SCOPE;
+ exact = false;
+ lower = msg_namelower(hdr);
+ upper = msg_nameupper(hdr);
}
- tipc_nametbl_mc_translate(net, msg_nametype(msg), lower, upper,
- scope, &dports);
+
+ /* Create destination port list: */
+ tipc_nametbl_mc_lookup(net, type, lower, upper,
+ scope, exact, &dports);
+
+ /* Clone message per destination */
while (tipc_dest_pop(&dports, NULL, &portid)) {
- _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
+ _skb = __pskb_copy(skb, hlen, GFP_ATOMIC);
if (_skb) {
msg_set_destport(buf_msg(_skb), portid);
__skb_queue_tail(&tmpq, _skb);
@@ -2731,7 +2748,6 @@ void tipc_sk_rht_destroy(struct net *net)
static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
{
struct net *net = sock_net(&tsk->sk);
- u32 domain = addr_domain(net, mreq->scope);
struct tipc_group *grp = tsk->group;
struct tipc_msg *hdr = &tsk->phdr;
struct tipc_name_seq seq;
@@ -2739,6 +2755,8 @@ static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
if (mreq->type < TIPC_RESERVED_TYPES)
return -EACCES;
+ if (mreq->scope > TIPC_NODE_SCOPE)
+ return -EINVAL;
if (grp)
return -EACCES;
grp = tipc_group_create(net, tsk->portid, mreq);
@@ -2751,7 +2769,7 @@ static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
seq.type = mreq->type;
seq.lower = mreq->instance;
seq.upper = seq.lower;
- tipc_nametbl_build_group(net, grp, mreq->type, domain);
+ tipc_nametbl_build_group(net, grp, mreq->type, mreq->scope);
rc = tipc_sk_publish(tsk, mreq->scope, &seq);
if (rc) {
tipc_group_delete(net, grp);
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index 1052341a0ea9..44df528ed6ab 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -118,15 +118,19 @@ void tipc_subscrp_convert_seq(struct tipc_name_seq *in, int swap,
void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower,
u32 found_upper, u32 event, u32 port_ref,
- u32 node, int must)
+ u32 node, u32 scope, int must)
{
+ u32 filter = htohl(sub->evt.s.filter, sub->swap);
struct tipc_name_seq seq;
tipc_subscrp_convert_seq(&sub->evt.s.seq, sub->swap, &seq);
if (!tipc_subscrp_check_overlap(&seq, found_lower, found_upper))
return;
- if (!must &&
- !(htohl(sub->evt.s.filter, sub->swap) & TIPC_SUB_PORTS))
+ if (!must && !(filter & TIPC_SUB_PORTS))
+ return;
+ if (filter & TIPC_SUB_CLUSTER_SCOPE && scope == TIPC_NODE_SCOPE)
+ return;
+ if (filter & TIPC_SUB_NODE_SCOPE && scope != TIPC_NODE_SCOPE)
return;
tipc_subscrp_send_event(sub, found_lower, found_upper, event, port_ref,
diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
index ee52957dc952..f3edca775d9f 100644
--- a/net/tipc/subscr.h
+++ b/net/tipc/subscr.h
@@ -71,7 +71,7 @@ int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower,
u32 found_upper);
void tipc_subscrp_report_overlap(struct tipc_subscription *sub,
u32 found_lower, u32 found_upper, u32 event,
- u32 port_ref, u32 node, int must);
+ u32 port_ref, u32 node, u32 scope, int must);
void tipc_subscrp_convert_seq(struct tipc_name_seq *in, int swap,
struct tipc_name_seq *out);
u32 tipc_subscrp_convert_seq_type(u32 type, int swap);