diff options
Diffstat (limited to 'net/tipc/bcast.c')
-rw-r--r-- | net/tipc/bcast.c | 194 |
1 files changed, 119 insertions, 75 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 95ab5ef92920..26631679a1fa 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -71,7 +71,7 @@ struct tipc_bcbearer_pair { * Note: The fields labelled "temporary" are incorporated into the bearer * to avoid consuming potentially limited stack space through the use of * large local variables within multicast routines. Concurrent access is - * prevented through use of the spinlock "bc_lock". + * prevented through use of the spinlock "bclink_lock". */ struct tipc_bcbearer { struct tipc_bearer bearer; @@ -84,34 +84,64 @@ struct tipc_bcbearer { /** * struct tipc_bclink - link used for broadcast messages + * @lock: spinlock governing access to structure * @link: (non-standard) broadcast link structure * @node: (non-standard) node structure representing b'cast link's peer node + * @flags: represent bclink states * @bcast_nodes: map of broadcast-capable nodes * @retransmit_to: node that most recently requested a retransmit * * Handles sequence numbering, fragmentation, bundling, etc. */ struct tipc_bclink { + spinlock_t lock; struct tipc_link link; struct tipc_node node; + unsigned int flags; struct tipc_node_map bcast_nodes; struct tipc_node *retransmit_to; }; -static struct tipc_bcbearer bcast_bearer; -static struct tipc_bclink bcast_link; - -static struct tipc_bcbearer *bcbearer = &bcast_bearer; -static struct tipc_bclink *bclink = &bcast_link; -static struct tipc_link *bcl = &bcast_link.link; - -static DEFINE_SPINLOCK(bc_lock); +static struct tipc_bcbearer *bcbearer; +static struct tipc_bclink *bclink; +static struct tipc_link *bcl; const char tipc_bclink_name[] = "broadcast-link"; static void tipc_nmap_diff(struct tipc_node_map *nm_a, struct tipc_node_map *nm_b, struct tipc_node_map *nm_diff); +static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node); +static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node); + +static void tipc_bclink_lock(void) +{ + spin_lock_bh(&bclink->lock); +} + +static void tipc_bclink_unlock(void) +{ + struct tipc_node *node = NULL; + + if (likely(!bclink->flags)) { + spin_unlock_bh(&bclink->lock); + return; + } + + if (bclink->flags & TIPC_BCLINK_RESET) { + bclink->flags &= ~TIPC_BCLINK_RESET; + node = tipc_bclink_retransmit_to(); + } + spin_unlock_bh(&bclink->lock); + + if (node) + tipc_link_reset_all(node); +} + +void tipc_bclink_set_flags(unsigned int flags) +{ + bclink->flags |= flags; +} static u32 bcbuf_acks(struct sk_buff *buf) { @@ -130,16 +160,16 @@ static void bcbuf_decr_acks(struct sk_buff *buf) void tipc_bclink_add_node(u32 addr) { - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); tipc_nmap_add(&bclink->bcast_nodes, addr); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); } void tipc_bclink_remove_node(u32 addr) { - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); tipc_nmap_remove(&bclink->bcast_nodes, addr); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); } static void bclink_set_last_sent(void) @@ -165,7 +195,7 @@ static void bclink_update_last_sent(struct tipc_node *node, u32 seqno) /** * tipc_bclink_retransmit_to - get most recent node to request retransmission * - * Called with bc_lock locked + * Called with bclink_lock locked */ struct tipc_node *tipc_bclink_retransmit_to(void) { @@ -177,7 +207,7 @@ struct tipc_node *tipc_bclink_retransmit_to(void) * @after: sequence number of last packet to *not* retransmit * @to: sequence number of last packet to retransmit * - * Called with bc_lock locked + * Called with bclink_lock locked */ static void bclink_retransmit_pkt(u32 after, u32 to) { @@ -194,7 +224,7 @@ static void bclink_retransmit_pkt(u32 after, u32 to) * @n_ptr: node that sent acknowledgement info * @acked: broadcast sequence # that has been acknowledged * - * Node is locked, bc_lock unlocked. + * Node is locked, bclink_lock unlocked. */ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) { @@ -202,8 +232,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) struct sk_buff *next; unsigned int released = 0; - spin_lock_bh(&bc_lock); - + tipc_bclink_lock(); /* Bail out if tx queue is empty (no clean up is required) */ crs = bcl->first_out; if (!crs) @@ -267,13 +296,13 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) if (unlikely(released && !list_empty(&bcl->waiting_ports))) tipc_link_wakeup_ports(bcl, 0); exit: - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); } /** * tipc_bclink_update_link_state - update broadcast link state * - * tipc_net_lock and node lock set + * RCU and node lock set */ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent) { @@ -320,10 +349,10 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent) ? buf_seqno(n_ptr->bclink.deferred_head) - 1 : n_ptr->bclink.last_sent); - spin_lock_bh(&bc_lock); - tipc_bearer_send(&bcbearer->bearer, buf, NULL); + tipc_bclink_lock(); + tipc_bearer_send(MAX_BEARERS, buf, NULL); bcl->stats.sent_nacks++; - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); kfree_skb(buf); n_ptr->bclink.oos_state++; @@ -335,8 +364,6 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent) * * Delay any upcoming NACK by this node if another node has already * requested the first message this node is going to ask for. - * - * Only tipc_net_lock set. */ static void bclink_peek_nack(struct tipc_msg *msg) { @@ -362,7 +389,7 @@ int tipc_bclink_xmit(struct sk_buff *buf) { int res; - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); if (!bclink->bcast_nodes.count) { res = msg_data_sz(buf_msg(buf)); @@ -377,14 +404,14 @@ int tipc_bclink_xmit(struct sk_buff *buf) bcl->stats.accu_queue_sz += bcl->out_queue_size; } exit: - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); return res; } /** * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet * - * Called with both sending node's lock and bc_lock taken. + * Called with both sending node's lock and bclink_lock taken. */ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno) { @@ -408,7 +435,7 @@ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno) /** * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards * - * tipc_net_lock is read_locked, no other locks set + * RCU is locked, no other locks set */ void tipc_bclink_rcv(struct sk_buff *buf) { @@ -439,12 +466,12 @@ void tipc_bclink_rcv(struct sk_buff *buf) if (msg_destnode(msg) == tipc_own_addr) { tipc_bclink_acknowledge(node, msg_bcast_ack(msg)); tipc_node_unlock(node); - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); bcl->stats.recv_nacks++; bclink->retransmit_to = node; bclink_retransmit_pkt(msg_bcgap_after(msg), msg_bcgap_to(msg)); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); } else { tipc_node_unlock(node); bclink_peek_nack(msg); @@ -462,51 +489,47 @@ receive: /* Deliver message to destination */ if (likely(msg_isdata(msg))) { - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); bclink_accept_pkt(node, seqno); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); tipc_node_unlock(node); if (likely(msg_mcast(msg))) tipc_port_mcast_rcv(buf, NULL); else kfree_skb(buf); } else if (msg_user(msg) == MSG_BUNDLER) { - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); bclink_accept_pkt(node, seqno); bcl->stats.recv_bundles++; bcl->stats.recv_bundled += msg_msgcnt(msg); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); tipc_node_unlock(node); tipc_link_bundle_rcv(buf); } else if (msg_user(msg) == MSG_FRAGMENTER) { - int ret; - ret = tipc_link_frag_rcv(&node->bclink.reasm_head, - &node->bclink.reasm_tail, - &buf); - if (ret == LINK_REASM_ERROR) + tipc_buf_append(&node->bclink.reasm_buf, &buf); + if (unlikely(!buf && !node->bclink.reasm_buf)) goto unlock; - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); bclink_accept_pkt(node, seqno); bcl->stats.recv_fragments++; - if (ret == LINK_REASM_COMPLETE) { + if (buf) { bcl->stats.recv_fragmented++; - /* Point msg to inner header */ msg = buf_msg(buf); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); goto receive; } - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); tipc_node_unlock(node); } else if (msg_user(msg) == NAME_DISTRIBUTOR) { - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); bclink_accept_pkt(node, seqno); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); tipc_node_unlock(node); tipc_named_rcv(buf); } else { - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); bclink_accept_pkt(node, seqno); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); tipc_node_unlock(node); kfree_skb(buf); } @@ -552,14 +575,14 @@ receive: } else deferred = 0; - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); if (deferred) bcl->stats.deferred_recv++; else bcl->stats.duplicates++; - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); unlock: tipc_node_unlock(node); @@ -627,13 +650,13 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1, if (bp_index == 0) { /* Use original buffer for first bearer */ - tipc_bearer_send(b, buf, &b->bcast_addr); + tipc_bearer_send(b->identity, buf, &b->bcast_addr); } else { /* Avoid concurrent buffer access */ - tbuf = pskb_copy(buf, GFP_ATOMIC); + tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC); if (!tbuf) break; - tipc_bearer_send(b, tbuf, &b->bcast_addr); + tipc_bearer_send(b->identity, tbuf, &b->bcast_addr); kfree_skb(tbuf); /* Bearer keeps a clone */ } @@ -655,20 +678,27 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1, /** * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer */ -void tipc_bcbearer_sort(void) +void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action) { struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp; struct tipc_bcbearer_pair *bp_curr; + struct tipc_bearer *b; int b_index; int pri; - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); + + if (action) + tipc_nmap_add(nm_ptr, node); + else + tipc_nmap_remove(nm_ptr, node); /* Group bearers by priority (can assume max of two per priority) */ memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp)); + rcu_read_lock(); for (b_index = 0; b_index < MAX_BEARERS; b_index++) { - struct tipc_bearer *b = bearer_list[b_index]; + b = rcu_dereference_rtnl(bearer_list[b_index]); if (!b || !b->nodes.count) continue; @@ -677,6 +707,7 @@ void tipc_bcbearer_sort(void) else bp_temp[b->priority].secondary = b; } + rcu_read_unlock(); /* Create array of bearer pairs for broadcasting */ bp_curr = bcbearer->bpairs; @@ -702,7 +733,7 @@ void tipc_bcbearer_sort(void) bp_curr++; } - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); } @@ -714,7 +745,7 @@ int tipc_bclink_stats(char *buf, const u32 buf_size) if (!bcl) return 0; - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); s = &bcl->stats; @@ -743,7 +774,7 @@ int tipc_bclink_stats(char *buf, const u32 buf_size) s->queue_sz_counts ? (s->accu_queue_sz / s->queue_sz_counts) : 0); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); return ret; } @@ -752,9 +783,9 @@ int tipc_bclink_reset_stats(void) if (!bcl) return -ENOPROTOOPT; - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); memset(&bcl->stats, 0, sizeof(bcl->stats)); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); return 0; } @@ -765,46 +796,59 @@ int tipc_bclink_set_queue_limits(u32 limit) if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN)) return -EINVAL; - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); tipc_link_set_queue_limits(bcl, limit); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); return 0; } -void tipc_bclink_init(void) +int tipc_bclink_init(void) { + bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC); + if (!bcbearer) + return -ENOMEM; + + bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC); + if (!bclink) { + kfree(bcbearer); + return -ENOMEM; + } + + bcl = &bclink->link; bcbearer->bearer.media = &bcbearer->media; bcbearer->media.send_msg = tipc_bcbearer_send; sprintf(bcbearer->media.name, "tipc-broadcast"); + spin_lock_init(&bclink->lock); INIT_LIST_HEAD(&bcl->waiting_ports); bcl->next_out_no = 1; spin_lock_init(&bclink->node.lock); bcl->owner = &bclink->node; bcl->max_pkt = MAX_PKT_DEFAULT_MCAST; tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT); - bcl->b_ptr = &bcbearer->bearer; - bearer_list[BCBEARER] = &bcbearer->bearer; + bcl->bearer_id = MAX_BEARERS; + rcu_assign_pointer(bearer_list[MAX_BEARERS], &bcbearer->bearer); bcl->state = WORKING_WORKING; strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME); + return 0; } void tipc_bclink_stop(void) { - spin_lock_bh(&bc_lock); + tipc_bclink_lock(); tipc_link_purge_queues(bcl); - spin_unlock_bh(&bc_lock); + tipc_bclink_unlock(); - bearer_list[BCBEARER] = NULL; - memset(bclink, 0, sizeof(*bclink)); - memset(bcbearer, 0, sizeof(*bcbearer)); + RCU_INIT_POINTER(bearer_list[BCBEARER], NULL); + synchronize_net(); + kfree(bcbearer); + kfree(bclink); } - /** * tipc_nmap_add - add a node to a node map */ -void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node) +static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node) { int n = tipc_node(node); int w = n / WSIZE; @@ -819,7 +863,7 @@ void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node) /** * tipc_nmap_remove - remove a node from a node map */ -void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node) +static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node) { int n = tipc_node(node); int w = n / WSIZE; |