diff options
-rw-r--r-- | net/tipc/server.c | 161 | ||||
-rw-r--r-- | net/tipc/server.h | 2 | ||||
-rw-r--r-- | net/tipc/subscr.c | 173 | ||||
-rw-r--r-- | net/tipc/subscr.h | 17 |
4 files changed, 146 insertions, 207 deletions
diff --git a/net/tipc/server.c b/net/tipc/server.c index 8aa2a33b1e48..b8268c0882a7 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -2,6 +2,7 @@ * net/tipc/server.c: TIPC server infrastructure * * Copyright (c) 2012-2013, Wind River Systems + * Copyright (c) 2017, Ericsson AB * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -57,12 +58,13 @@ * @sock: socket handler associated with connection * @flags: indicates connection state * @server: pointer to connected server + * @sub_list: lsit to all pertaing subscriptions + * @sub_lock: lock protecting the subscription list + * @outqueue_lock: control access to the outqueue * @rwork: receive work item - * @usr_data: user-specified field * @rx_action: what to do when connection socket is active * @outqueue: pointer to first outbound message in queue * @outqueue_lock: control access to the outqueue - * @outqueue: list of connection objects for its server * @swork: send work item */ struct tipc_conn { @@ -71,9 +73,10 @@ struct tipc_conn { struct socket *sock; unsigned long flags; struct tipc_server *server; + struct list_head sub_list; + spinlock_t sub_lock; /* for subscription list */ struct work_struct rwork; int (*rx_action) (struct tipc_conn *con); - void *usr_data; struct list_head outqueue; spinlock_t outqueue_lock; struct work_struct swork; @@ -81,6 +84,7 @@ struct tipc_conn { /* An entry waiting to be sent */ struct outqueue_entry { + u32 evt; struct list_head list; struct kvec iov; }; @@ -89,18 +93,33 @@ static void tipc_recv_work(struct work_struct *work); static void tipc_send_work(struct work_struct *work); static void tipc_clean_outqueues(struct tipc_conn *con); +static bool connected(struct tipc_conn *con) +{ + return con && test_bit(CF_CONNECTED, &con->flags); +} + +/** + * htohl - convert value to endianness used by destination + * @in: value to convert + * @swap: non-zero if endianness must be reversed + * + * Returns converted value + */ +static u32 htohl(u32 in, int swap) +{ + return swap ? swab32(in) : in; +} + static void tipc_conn_kref_release(struct kref *kref) { struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); struct tipc_server *s = con->server; struct socket *sock = con->sock; - struct sock *sk; if (sock) { - sk = sock->sk; if (test_bit(CF_SERVER, &con->flags)) { __module_get(sock->ops->owner); - __module_get(sk->sk_prot_creator->owner); + __module_get(sock->sk->sk_prot_creator->owner); } sock_release(sock); con->sock = NULL; @@ -129,11 +148,8 @@ static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid) spin_lock_bh(&s->idr_lock); con = idr_find(&s->conn_idr, conid); - if (con) { - if (!test_bit(CF_CONNECTED, &con->flags) || - !kref_get_unless_zero(&con->kref)) - con = NULL; - } + if (!connected(con) || !kref_get_unless_zero(&con->kref)) + con = NULL; spin_unlock_bh(&s->idr_lock); return con; } @@ -144,7 +160,7 @@ static void sock_data_ready(struct sock *sk) read_lock_bh(&sk->sk_callback_lock); con = sock2con(sk); - if (con && test_bit(CF_CONNECTED, &con->flags)) { + if (connected(con)) { conn_get(con); if (!queue_work(con->server->rcv_wq, &con->rwork)) conn_put(con); @@ -158,7 +174,7 @@ static void sock_write_space(struct sock *sk) read_lock_bh(&sk->sk_callback_lock); con = sock2con(sk); - if (con && test_bit(CF_CONNECTED, &con->flags)) { + if (connected(con)) { conn_get(con); if (!queue_work(con->server->send_wq, &con->swork)) conn_put(con); @@ -181,6 +197,24 @@ static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con) write_unlock_bh(&sk->sk_callback_lock); } +/* tipc_con_delete_sub - delete a specific or all subscriptions + * for a given subscriber + */ +static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s) +{ + struct list_head *sub_list = &con->sub_list; + struct tipc_subscription *sub, *tmp; + + spin_lock_bh(&con->sub_lock); + list_for_each_entry_safe(sub, tmp, sub_list, subscrp_list) { + if (!s || !memcmp(s, &sub->evt.s, sizeof(*s))) + tipc_sub_delete(sub); + else if (s) + break; + } + spin_unlock_bh(&con->sub_lock); +} + static void tipc_close_conn(struct tipc_conn *con) { struct sock *sk = con->sock->sk; @@ -188,10 +222,11 @@ static void tipc_close_conn(struct tipc_conn *con) write_lock_bh(&sk->sk_callback_lock); disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags); + if (disconnect) { sk->sk_user_data = NULL; if (con->conid) - tipc_subscrb_delete(con->usr_data); + tipc_con_delete_sub(con, NULL); } write_unlock_bh(&sk->sk_callback_lock); @@ -215,7 +250,9 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s) kref_init(&con->kref); INIT_LIST_HEAD(&con->outqueue); + INIT_LIST_HEAD(&con->sub_list); spin_lock_init(&con->outqueue_lock); + spin_lock_init(&con->sub_lock); INIT_WORK(&con->swork, tipc_send_work); INIT_WORK(&con->rwork, tipc_recv_work); @@ -236,6 +273,35 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s) return con; } +int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con, + void *buf, size_t len) +{ + struct tipc_subscr *s = (struct tipc_subscr *)buf; + struct tipc_subscription *sub; + bool status; + int swap; + + /* Determine subscriber's endianness */ + swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE | + TIPC_SUB_CANCEL)); + + /* Detect & process a subscription cancellation request */ + if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) { + s->filter &= ~htohl(TIPC_SUB_CANCEL, swap); + tipc_con_delete_sub(con, s); + return 0; + } + status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap)); + sub = tipc_subscrp_subscribe(net, s, conid, swap, status); + if (!sub) + return -1; + + spin_lock_bh(&con->sub_lock); + list_add(&sub->subscrp_list, &con->sub_list); + spin_unlock_bh(&con->sub_lock); + return 0; +} + static int tipc_receive_from_sock(struct tipc_conn *con) { struct tipc_server *s = con->server; @@ -262,9 +328,7 @@ static int tipc_receive_from_sock(struct tipc_conn *con) } read_lock_bh(&sk->sk_callback_lock); - if (test_bit(CF_CONNECTED, &con->flags)) - ret = tipc_subscrb_rcv(sock_net(con->sock->sk), con->conid, - con->usr_data, buf, ret); + ret = tipc_con_rcv_sub(s->net, con->conid, con, buf, ret); read_unlock_bh(&sk->sk_callback_lock); kmem_cache_free(s->rcvbuf_cache, buf); if (ret < 0) @@ -302,15 +366,6 @@ static int tipc_accept_from_sock(struct tipc_conn *con) newcon->rx_action = tipc_receive_from_sock; tipc_register_callbacks(newsock, newcon); - /* Notify that new connection is incoming */ - newcon->usr_data = tipc_subscrb_create(newcon->conid); - - if (!newcon->usr_data) { - sock_release(newsock); - conn_put(newcon); - return -ENOMEM; - } - /* Wake up receive process in case of 'SYN+' message */ newsock->sk->sk_data_ready(newsock->sk); return ret; @@ -427,7 +482,7 @@ static void tipc_clean_outqueues(struct tipc_conn *con) } int tipc_conn_sendmsg(struct tipc_server *s, int conid, - void *data, size_t len) + u32 evt, void *data, size_t len) { struct outqueue_entry *e; struct tipc_conn *con; @@ -436,7 +491,7 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid, if (!con) return -EINVAL; - if (!test_bit(CF_CONNECTED, &con->flags)) { + if (!connected(con)) { conn_put(con); return 0; } @@ -446,7 +501,7 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid, conn_put(con); return -ENOMEM; } - + e->evt = evt; spin_lock_bh(&con->outqueue_lock); list_add_tail(&e->list, &con->outqueue); spin_unlock_bh(&con->outqueue_lock); @@ -470,10 +525,9 @@ void tipc_conn_terminate(struct tipc_server *s, int conid) bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, u32 upper, u32 filter, int *conid) { - struct tipc_subscriber *scbr; struct tipc_subscr sub; - struct tipc_server *s; struct tipc_conn *con; + int rc; sub.seq.type = type; sub.seq.lower = lower; @@ -487,32 +541,23 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, return false; *conid = con->conid; - s = con->server; - scbr = tipc_subscrb_create(*conid); - if (!scbr) { - conn_put(con); - return false; - } - - con->usr_data = scbr; con->sock = NULL; - tipc_subscrb_rcv(net, *conid, scbr, &sub, sizeof(sub)); - return true; + rc = tipc_con_rcv_sub(net, *conid, con, &sub, sizeof(sub)); + if (rc < 0) + tipc_close_conn(con); + return !rc; } void tipc_topsrv_kern_unsubscr(struct net *net, int conid) { struct tipc_conn *con; - struct tipc_server *srv; con = tipc_conn_lookup(tipc_topsrv(net), conid); if (!con) return; test_and_clear_bit(CF_CONNECTED, &con->flags); - srv = con->server; - if (con->conid) - tipc_subscrb_delete(con->usr_data); + tipc_con_delete_sub(con, NULL); conn_put(con); conn_put(con); } @@ -537,7 +582,8 @@ static void tipc_send_kern_top_evt(struct net *net, struct tipc_event *evt) static void tipc_send_to_sock(struct tipc_conn *con) { - struct tipc_server *s = con->server; + struct list_head *queue = &con->outqueue; + struct tipc_server *srv = con->server; struct outqueue_entry *e; struct tipc_event *evt; struct msghdr msg; @@ -545,16 +591,20 @@ static void tipc_send_to_sock(struct tipc_conn *con) int ret; spin_lock_bh(&con->outqueue_lock); - while (test_bit(CF_CONNECTED, &con->flags)) { - e = list_entry(con->outqueue.next, struct outqueue_entry, list); - if ((struct list_head *) e == &con->outqueue) - break; + + while (!list_empty(queue)) { + e = list_first_entry(queue, struct outqueue_entry, list); spin_unlock_bh(&con->outqueue_lock); + if (e->evt == TIPC_SUBSCR_TIMEOUT) { + evt = (struct tipc_event *)e->iov.iov_base; + tipc_con_delete_sub(con, &evt->s); + } + memset(&msg, 0, sizeof(msg)); + msg.msg_flags = MSG_DONTWAIT; + if (con->sock) { - memset(&msg, 0, sizeof(msg)); - msg.msg_flags = MSG_DONTWAIT; ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, e->iov.iov_len); if (ret == -EWOULDBLOCK || ret == 0) { @@ -565,7 +615,7 @@ static void tipc_send_to_sock(struct tipc_conn *con) } } else { evt = e->iov.iov_base; - tipc_send_kern_top_evt(s->net, evt); + tipc_send_kern_top_evt(srv->net, evt); } /* Don't starve users filling buffers */ @@ -573,7 +623,6 @@ static void tipc_send_to_sock(struct tipc_conn *con) cond_resched(); count = 0; } - spin_lock_bh(&con->outqueue_lock); list_del(&e->list); tipc_free_entry(e); @@ -591,7 +640,7 @@ static void tipc_recv_work(struct work_struct *work) struct tipc_conn *con = container_of(work, struct tipc_conn, rwork); int count = 0; - while (test_bit(CF_CONNECTED, &con->flags)) { + while (connected(con)) { if (con->rx_action(con)) break; @@ -608,7 +657,7 @@ static void tipc_send_work(struct work_struct *work) { struct tipc_conn *con = container_of(work, struct tipc_conn, swork); - if (test_bit(CF_CONNECTED, &con->flags)) + if (connected(con)) tipc_send_to_sock(con); conn_put(con); diff --git a/net/tipc/server.h b/net/tipc/server.h index b4b83bdc8b20..fcc72321362d 100644 --- a/net/tipc/server.h +++ b/net/tipc/server.h @@ -77,7 +77,7 @@ struct tipc_server { }; int tipc_conn_sendmsg(struct tipc_server *s, int conid, - void *data, size_t len); + u32 evt, void *data, size_t len); bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, u32 upper, u32 filter, int *conid); diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index b86fbbf7a0b9..c6de1452db69 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -1,7 +1,7 @@ /* * net/tipc/subscr.c: TIPC network topology service * - * Copyright (c) 2000-2006, Ericsson AB + * Copyright (c) 2000-2017, Ericsson AB * Copyright (c) 2005-2007, 2010-2013, Wind River Systems * All rights reserved. * @@ -39,22 +39,6 @@ #include "subscr.h" /** - * struct tipc_subscriber - TIPC network topology subscriber - * @kref: reference counter to tipc_subscription object - * @conid: connection identifier to server connecting to subscriber - * @lock: control access to subscriber - * @subscrp_list: list of subscription objects for this subscriber - */ -struct tipc_subscriber { - struct kref kref; - int conid; - spinlock_t lock; - struct list_head subscrp_list; -}; - -static void tipc_subscrb_put(struct tipc_subscriber *subscriber); - -/** * htohl - convert value to endianness used by destination * @in: value to convert * @swap: non-zero if endianness must be reversed @@ -71,9 +55,10 @@ static void tipc_subscrp_send_event(struct tipc_subscription *sub, u32 event, u32 port_ref, u32 node) { struct tipc_net *tn = net_generic(sub->net, tipc_net_id); - struct tipc_subscriber *subscriber = sub->subscriber; struct kvec msg_sect; + if (sub->inactive) + return; msg_sect.iov_base = (void *)&sub->evt; msg_sect.iov_len = sizeof(struct tipc_event); sub->evt.event = htohl(event, sub->swap); @@ -81,7 +66,7 @@ static void tipc_subscrp_send_event(struct tipc_subscription *sub, sub->evt.found_upper = htohl(found_upper, sub->swap); sub->evt.port.ref = htohl(port_ref, sub->swap); sub->evt.port.node = htohl(node, sub->swap); - tipc_conn_sendmsg(tn->topsrv, subscriber->conid, + tipc_conn_sendmsg(tn->topsrv, sub->conid, event, msg_sect.iov_base, msg_sect.iov_len); } @@ -132,41 +117,22 @@ void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower, return; if (filter & TIPC_SUB_NODE_SCOPE && scope != TIPC_NODE_SCOPE) return; - - tipc_subscrp_send_event(sub, found_lower, found_upper, event, port_ref, - node); + spin_lock(&sub->lock); + tipc_subscrp_send_event(sub, found_lower, found_upper, + event, port_ref, node); + spin_unlock(&sub->lock); } static void tipc_subscrp_timeout(struct timer_list *t) { struct tipc_subscription *sub = from_timer(sub, t, timer); - struct tipc_subscriber *subscriber = sub->subscriber; - - spin_lock_bh(&subscriber->lock); - tipc_nametbl_unsubscribe(sub); - list_del(&sub->subscrp_list); - spin_unlock_bh(&subscriber->lock); + struct tipc_subscr *s = &sub->evt.s; - /* Notify subscriber of timeout */ - tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper, + spin_lock(&sub->lock); + tipc_subscrp_send_event(sub, s->seq.lower, s->seq.upper, TIPC_SUBSCR_TIMEOUT, 0, 0); - - tipc_subscrp_put(sub); -} - -static void tipc_subscrb_kref_release(struct kref *kref) -{ - kfree(container_of(kref,struct tipc_subscriber, kref)); -} - -static void tipc_subscrb_put(struct tipc_subscriber *subscriber) -{ - kref_put(&subscriber->kref, tipc_subscrb_kref_release); -} - -static void tipc_subscrb_get(struct tipc_subscriber *subscriber) -{ - kref_get(&subscriber->kref); + sub->inactive = true; + spin_unlock(&sub->lock); } static void tipc_subscrp_kref_release(struct kref *kref) @@ -175,11 +141,9 @@ static void tipc_subscrp_kref_release(struct kref *kref) struct tipc_subscription, kref); struct tipc_net *tn = net_generic(sub->net, tipc_net_id); - struct tipc_subscriber *subscriber = sub->subscriber; atomic_dec(&tn->subscription_count); kfree(sub); - tipc_subscrb_put(subscriber); } void tipc_subscrp_put(struct tipc_subscription *subscription) @@ -192,68 +156,9 @@ void tipc_subscrp_get(struct tipc_subscription *subscription) kref_get(&subscription->kref); } -/* tipc_subscrb_subscrp_delete - delete a specific subscription or all - * subscriptions for a given subscriber. - */ -static void tipc_subscrb_subscrp_delete(struct tipc_subscriber *subscriber, - struct tipc_subscr *s) -{ - struct list_head *subscription_list = &subscriber->subscrp_list; - struct tipc_subscription *sub, *temp; - u32 timeout; - - spin_lock_bh(&subscriber->lock); - list_for_each_entry_safe(sub, temp, subscription_list, subscrp_list) { - if (s && memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) - continue; - - timeout = htohl(sub->evt.s.timeout, sub->swap); - if (timeout == TIPC_WAIT_FOREVER || del_timer(&sub->timer)) { - tipc_nametbl_unsubscribe(sub); - list_del(&sub->subscrp_list); - tipc_subscrp_put(sub); - } - - if (s) - break; - } - spin_unlock_bh(&subscriber->lock); -} - -struct tipc_subscriber *tipc_subscrb_create(int conid) -{ - struct tipc_subscriber *subscriber; - - subscriber = kzalloc(sizeof(*subscriber), GFP_ATOMIC); - if (!subscriber) { - pr_warn("Subscriber rejected, no memory\n"); - return NULL; - } - INIT_LIST_HEAD(&subscriber->subscrp_list); - kref_init(&subscriber->kref); - subscriber->conid = conid; - spin_lock_init(&subscriber->lock); - - return subscriber; -} - -void tipc_subscrb_delete(struct tipc_subscriber *subscriber) -{ - tipc_subscrb_subscrp_delete(subscriber, NULL); - tipc_subscrb_put(subscriber); -} - -static void tipc_subscrp_cancel(struct tipc_subscr *s, - struct tipc_subscriber *subscriber) -{ - tipc_subscrb_get(subscriber); - tipc_subscrb_subscrp_delete(subscriber, s); - tipc_subscrb_put(subscriber); -} - static struct tipc_subscription *tipc_subscrp_create(struct net *net, struct tipc_subscr *s, - int swap) + int conid, bool swap) { struct tipc_net *tn = net_generic(net, tipc_net_id); struct tipc_subscription *sub; @@ -275,6 +180,8 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net, /* Initialize subscription object */ sub->net = net; + sub->conid = conid; + sub->inactive = false; if (((filter & TIPC_SUB_PORTS) && (filter & TIPC_SUB_SERVICE)) || (htohl(s->seq.lower, swap) > htohl(s->seq.upper, swap))) { pr_warn("Subscription rejected, illegal request\n"); @@ -284,59 +191,39 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net, sub->swap = swap; memcpy(&sub->evt.s, s, sizeof(*s)); + spin_lock_init(&sub->lock); atomic_inc(&tn->subscription_count); kref_init(&sub->kref); return sub; } -static int tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s, - struct tipc_subscriber *subscriber, int swap, - bool status) +struct tipc_subscription *tipc_subscrp_subscribe(struct net *net, + struct tipc_subscr *s, + int conid, bool swap, + bool status) { struct tipc_subscription *sub = NULL; u32 timeout; - sub = tipc_subscrp_create(net, s, swap); + sub = tipc_subscrp_create(net, s, conid, swap); if (!sub) - return -1; + return NULL; - spin_lock_bh(&subscriber->lock); - list_add(&sub->subscrp_list, &subscriber->subscrp_list); - sub->subscriber = subscriber; tipc_nametbl_subscribe(sub, status); - tipc_subscrb_get(subscriber); - spin_unlock_bh(&subscriber->lock); - timer_setup(&sub->timer, tipc_subscrp_timeout, 0); timeout = htohl(sub->evt.s.timeout, swap); - if (timeout != TIPC_WAIT_FOREVER) mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout)); - return 0; + return sub; } -/* Handle one request to create a new subscription for the subscriber - */ -int tipc_subscrb_rcv(struct net *net, int conid, void *usr_data, - void *buf, size_t len) +void tipc_sub_delete(struct tipc_subscription *sub) { - struct tipc_subscriber *subscriber = usr_data; - struct tipc_subscr *s = (struct tipc_subscr *)buf; - bool status; - int swap; - - /* Determine subscriber's endianness */ - swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE | - TIPC_SUB_CANCEL)); - - /* Detect & process a subscription cancellation request */ - if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) { - s->filter &= ~htohl(TIPC_SUB_CANCEL, swap); - tipc_subscrp_cancel(s, subscriber); - return 0; - } - status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap)); - return tipc_subscrp_subscribe(net, s, subscriber, swap, status); + tipc_nametbl_unsubscribe(sub); + if (sub->evt.s.timeout != TIPC_WAIT_FOREVER) + del_timer_sync(&sub->timer); + list_del(&sub->subscrp_list); + tipc_subscrp_put(sub); } int tipc_topsrv_start(struct net *net) diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index a736f29ba9ab..cfd0cb3a1da8 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -43,7 +43,7 @@ #define TIPC_MAX_PUBLICATIONS 65535 struct tipc_subscription; -struct tipc_subscriber; +struct tipc_conn; /** * struct tipc_subscription - TIPC network topology subscription object @@ -58,19 +58,22 @@ struct tipc_subscriber; */ struct tipc_subscription { struct kref kref; - struct tipc_subscriber *subscriber; struct net *net; struct timer_list timer; struct list_head nameseq_list; struct list_head subscrp_list; - int swap; struct tipc_event evt; + int conid; + bool swap; + bool inactive; + spinlock_t lock; /* serialize up/down and timer events */ }; -struct tipc_subscriber *tipc_subscrb_create(int conid); -void tipc_subscrb_delete(struct tipc_subscriber *subscriber); -int tipc_subscrb_rcv(struct net *net, int conid, void *usr_data, - void *buf, size_t len); +struct tipc_subscription *tipc_subscrp_subscribe(struct net *net, + struct tipc_subscr *s, + int conid, bool swap, + bool status); +void tipc_sub_delete(struct tipc_subscription *sub); int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower, u32 found_upper); void tipc_subscrp_report_overlap(struct tipc_subscription *sub, |