summaryrefslogtreecommitdiffstats
path: root/net/tipc/link.c
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2014-12-11 23:27:06 +0100
committerLinus Torvalds <torvalds@linux-foundation.org>2014-12-11 23:27:06 +0100
commit70e71ca0af244f48a5dcf56dc435243792e3a495 (patch)
treef7d9c4c4d9a857a00043e9bf6aa2d6f533a34778 /net/tipc/link.c
parentMerge tag 'sound-3.19-rc1' of git://git.kernel.org/pub/scm/linux/kernel/git/t... (diff)
parentFix race condition between vxlan_sock_add and vxlan_sock_release (diff)
downloadlinux-70e71ca0af244f48a5dcf56dc435243792e3a495.tar.xz
linux-70e71ca0af244f48a5dcf56dc435243792e3a495.zip
Merge git://git.kernel.org/pub/scm/linux/kernel/git/davem/net-next
Pull networking updates from David Miller: 1) New offloading infrastructure and example 'rocker' driver for offloading of switching and routing to hardware. This work was done by a large group of dedicated individuals, not limited to: Scott Feldman, Jiri Pirko, Thomas Graf, John Fastabend, Jamal Hadi Salim, Andy Gospodarek, Florian Fainelli, Roopa Prabhu 2) Start making the networking operate on IOV iterators instead of modifying iov objects in-situ during transfers. Thanks to Al Viro and Herbert Xu. 3) A set of new netlink interfaces for the TIPC stack, from Richard Alpe. 4) Remove unnecessary looping during ipv6 routing lookups, from Martin KaFai Lau. 5) Add PAUSE frame generation support to gianfar driver, from Matei Pavaluca. 6) Allow for larger reordering levels in TCP, which are easily achievable in the real world right now, from Eric Dumazet. 7) Add a variable of napi_schedule that doesn't need to disable cpu interrupts, from Eric Dumazet. 8) Use a doubly linked list to optimize neigh_parms_release(), from Nicolas Dichtel. 9) Various enhancements to the kernel BPF verifier, and allow eBPF programs to actually be attached to sockets. From Alexei Starovoitov. 10) Support TSO/LSO in sunvnet driver, from David L Stevens. 11) Allow controlling ECN usage via routing metrics, from Florian Westphal. 12) Remote checksum offload, from Tom Herbert. 13) Add split-header receive, BQL, and xmit_more support to amd-xgbe driver, from Thomas Lendacky. 14) Add MPLS support to openvswitch, from Simon Horman. 15) Support wildcard tunnel endpoints in ipv6 tunnels, from Steffen Klassert. 16) Do gro flushes on a per-device basis using a timer, from Eric Dumazet. This tries to resolve the conflicting goals between the desired handling of bulk vs. RPC-like traffic. 17) Allow userspace to ask for the CPU upon what a packet was received/steered, via SO_INCOMING_CPU. From Eric Dumazet. 18) Limit GSO packets to half the current congestion window, from Eric Dumazet. 19) Add a generic helper so that all drivers set their RSS keys in a consistent way, from Eric Dumazet. 20) Add xmit_more support to enic driver, from Govindarajulu Varadarajan. 21) Add VLAN packet scheduler action, from Jiri Pirko. 22) Support configurable RSS hash functions via ethtool, from Eyal Perry. * git://git.kernel.org/pub/scm/linux/kernel/git/davem/net-next: (1820 commits) Fix race condition between vxlan_sock_add and vxlan_sock_release net/macb: fix compilation warning for print_hex_dump() called with skb->mac_header net/mlx4: Add support for A0 steering net/mlx4: Refactor QUERY_PORT net/mlx4_core: Add explicit error message when rule doesn't meet configuration net/mlx4: Add A0 hybrid steering net/mlx4: Add mlx4_bitmap zone allocator net/mlx4: Add a check if there are too many reserved QPs net/mlx4: Change QP allocation scheme net/mlx4_core: Use tasklet for user-space CQ completion events net/mlx4_core: Mask out host side virtualization features for guests net/mlx4_en: Set csum level for encapsulated packets be2net: Export tunnel offloads only when a VxLAN tunnel is created gianfar: Fix dma check map error when DMA_API_DEBUG is enabled cxgb4/csiostor: Don't use MASTER_MUST for fw_hello call net: fec: only enable mdio interrupt before phy device link up net: fec: clear all interrupt events to support i.MX6SX net: fec: reset fep link status in suspend function net: sock: fix access via invalid file descriptor net: introduce helper macro for_each_cmsghdr ...
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r--net/tipc/link.c981
1 files changed, 676 insertions, 305 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 1db162aa64a5..23bcc1132365 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -36,10 +36,12 @@
#include "core.h"
#include "link.h"
+#include "bcast.h"
#include "socket.h"
#include "name_distr.h"
#include "discover.h"
#include "config.h"
+#include "netlink.h"
#include <linux/pkt_sched.h>
@@ -50,6 +52,30 @@ static const char *link_co_err = "Link changeover error, ";
static const char *link_rst_msg = "Resetting link ";
static const char *link_unk_evt = "Unknown link event ";
+static const struct nla_policy tipc_nl_link_policy[TIPC_NLA_LINK_MAX + 1] = {
+ [TIPC_NLA_LINK_UNSPEC] = { .type = NLA_UNSPEC },
+ [TIPC_NLA_LINK_NAME] = {
+ .type = NLA_STRING,
+ .len = TIPC_MAX_LINK_NAME
+ },
+ [TIPC_NLA_LINK_MTU] = { .type = NLA_U32 },
+ [TIPC_NLA_LINK_BROADCAST] = { .type = NLA_FLAG },
+ [TIPC_NLA_LINK_UP] = { .type = NLA_FLAG },
+ [TIPC_NLA_LINK_ACTIVE] = { .type = NLA_FLAG },
+ [TIPC_NLA_LINK_PROP] = { .type = NLA_NESTED },
+ [TIPC_NLA_LINK_STATS] = { .type = NLA_NESTED },
+ [TIPC_NLA_LINK_RX] = { .type = NLA_U32 },
+ [TIPC_NLA_LINK_TX] = { .type = NLA_U32 }
+};
+
+/* Properties valid for media, bearar and link */
+static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
+ [TIPC_NLA_PROP_UNSPEC] = { .type = NLA_UNSPEC },
+ [TIPC_NLA_PROP_PRIO] = { .type = NLA_U32 },
+ [TIPC_NLA_PROP_TOL] = { .type = NLA_U32 },
+ [TIPC_NLA_PROP_WIN] = { .type = NLA_U32 }
+};
+
/*
* Out-of-range value for link session numbers
*/
@@ -123,18 +149,6 @@ static void link_init_max_pkt(struct tipc_link *l_ptr)
l_ptr->max_pkt_probes = 0;
}
-static u32 link_next_sent(struct tipc_link *l_ptr)
-{
- if (l_ptr->next_out)
- return buf_seqno(l_ptr->next_out);
- return mod(l_ptr->next_out_no);
-}
-
-static u32 link_last_sent(struct tipc_link *l_ptr)
-{
- return mod(link_next_sent(l_ptr) - 1);
-}
-
/*
* Simple non-static link routines (i.e. referenced outside this file)
*/
@@ -157,14 +171,17 @@ int tipc_link_is_active(struct tipc_link *l_ptr)
*/
static void link_timeout(struct tipc_link *l_ptr)
{
+ struct sk_buff *skb;
+
tipc_node_lock(l_ptr->owner);
/* update counters used in statistical profiling of send traffic */
- l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
+ l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->outqueue);
l_ptr->stats.queue_sz_counts++;
- if (l_ptr->first_out) {
- struct tipc_msg *msg = buf_msg(l_ptr->first_out);
+ skb = skb_peek(&l_ptr->outqueue);
+ if (skb) {
+ struct tipc_msg *msg = buf_msg(skb);
u32 length = msg_size(msg);
if ((msg_user(msg) == MSG_FRAGMENTER) &&
@@ -192,11 +209,10 @@ static void link_timeout(struct tipc_link *l_ptr)
}
/* do all other link processing performed on a periodic basis */
-
link_state_event(l_ptr, TIMEOUT_EVT);
if (l_ptr->next_out)
- tipc_link_push_queue(l_ptr);
+ tipc_link_push_packets(l_ptr);
tipc_node_unlock(l_ptr->owner);
}
@@ -224,9 +240,10 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
char addr_string[16];
u32 peer = n_ptr->addr;
- if (n_ptr->link_cnt >= 2) {
+ if (n_ptr->link_cnt >= MAX_BEARERS) {
tipc_addr_string_fill(addr_string, n_ptr->addr);
- pr_err("Attempt to establish third link to %s\n", addr_string);
+ pr_err("Attempt to establish %uth link to %s. Max %u allowed.\n",
+ n_ptr->link_cnt, addr_string, MAX_BEARERS);
return NULL;
}
@@ -274,7 +291,9 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
link_init_max_pkt(l_ptr);
l_ptr->next_out_no = 1;
- __skb_queue_head_init(&l_ptr->waiting_sks);
+ __skb_queue_head_init(&l_ptr->outqueue);
+ __skb_queue_head_init(&l_ptr->deferred_queue);
+ skb_queue_head_init(&l_ptr->waiting_sks);
link_reset_statistics(l_ptr);
@@ -339,7 +358,7 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,
return false;
TIPC_SKB_CB(buf)->chain_sz = chain_sz;
TIPC_SKB_CB(buf)->chain_imp = imp;
- __skb_queue_tail(&link->waiting_sks, buf);
+ skb_queue_tail(&link->waiting_sks, buf);
link->stats.link_congs++;
return true;
}
@@ -352,30 +371,19 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,
*/
static void link_prepare_wakeup(struct tipc_link *link)
{
- struct sk_buff_head *wq = &link->waiting_sks;
- struct sk_buff *buf;
- uint pend_qsz = link->out_queue_size;
+ uint pend_qsz = skb_queue_len(&link->outqueue);
+ struct sk_buff *skb, *tmp;
- for (buf = skb_peek(wq); buf; buf = skb_peek(wq)) {
- if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(buf)->chain_imp])
+ skb_queue_walk_safe(&link->waiting_sks, skb, tmp) {
+ if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(skb)->chain_imp])
break;
- pend_qsz += TIPC_SKB_CB(buf)->chain_sz;
- __skb_queue_tail(&link->owner->waiting_sks, __skb_dequeue(wq));
+ pend_qsz += TIPC_SKB_CB(skb)->chain_sz;
+ skb_unlink(skb, &link->waiting_sks);
+ skb_queue_tail(&link->owner->waiting_sks, skb);
}
}
/**
- * link_release_outqueue - purge link's outbound message queue
- * @l_ptr: pointer to link
- */
-static void link_release_outqueue(struct tipc_link *l_ptr)
-{
- kfree_skb_list(l_ptr->first_out);
- l_ptr->first_out = NULL;
- l_ptr->out_queue_size = 0;
-}
-
-/**
* tipc_link_reset_fragments - purge link's inbound message fragments queue
* @l_ptr: pointer to link
*/
@@ -391,11 +399,9 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr)
*/
void tipc_link_purge_queues(struct tipc_link *l_ptr)
{
- kfree_skb_list(l_ptr->oldest_deferred_in);
- kfree_skb_list(l_ptr->first_out);
+ __skb_queue_purge(&l_ptr->deferred_queue);
+ __skb_queue_purge(&l_ptr->outqueue);
tipc_link_reset_fragments(l_ptr);
- kfree_skb(l_ptr->proto_msg_queue);
- l_ptr->proto_msg_queue = NULL;
}
void tipc_link_reset(struct tipc_link *l_ptr)
@@ -427,25 +433,16 @@ void tipc_link_reset(struct tipc_link *l_ptr)
}
/* Clean up all queues: */
- link_release_outqueue(l_ptr);
- kfree_skb(l_ptr->proto_msg_queue);
- l_ptr->proto_msg_queue = NULL;
- kfree_skb_list(l_ptr->oldest_deferred_in);
+ __skb_queue_purge(&l_ptr->outqueue);
+ __skb_queue_purge(&l_ptr->deferred_queue);
if (!skb_queue_empty(&l_ptr->waiting_sks)) {
skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks);
owner->action_flags |= TIPC_WAKEUP_USERS;
}
- l_ptr->retransm_queue_head = 0;
- l_ptr->retransm_queue_size = 0;
- l_ptr->last_out = NULL;
- l_ptr->first_out = NULL;
l_ptr->next_out = NULL;
l_ptr->unacked_window = 0;
l_ptr->checkpoint = 1;
l_ptr->next_out_no = 1;
- l_ptr->deferred_inqueue_sz = 0;
- l_ptr->oldest_deferred_in = NULL;
- l_ptr->newest_deferred_in = NULL;
l_ptr->fsm_msg_cnt = 0;
l_ptr->stale_count = 0;
link_reset_statistics(l_ptr);
@@ -667,9 +664,10 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
* - For all other messages we discard the buffer and return -EHOSTUNREACH
* - For TIPC internal messages we also reset the link
*/
-static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
+static int tipc_link_cong(struct tipc_link *link, struct sk_buff_head *list)
{
- struct tipc_msg *msg = buf_msg(buf);
+ struct sk_buff *skb = skb_peek(list);
+ struct tipc_msg *msg = buf_msg(skb);
uint imp = tipc_msg_tot_importance(msg);
u32 oport = msg_tot_origport(msg);
@@ -682,30 +680,30 @@ static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
goto drop;
if (unlikely(msg_reroute_cnt(msg)))
goto drop;
- if (TIPC_SKB_CB(buf)->wakeup_pending)
+ if (TIPC_SKB_CB(skb)->wakeup_pending)
return -ELINKCONG;
- if (link_schedule_user(link, oport, TIPC_SKB_CB(buf)->chain_sz, imp))
+ if (link_schedule_user(link, oport, skb_queue_len(list), imp))
return -ELINKCONG;
drop:
- kfree_skb_list(buf);
+ __skb_queue_purge(list);
return -EHOSTUNREACH;
}
/**
* __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
* @link: link to use
- * @buf: chain of buffers containing message
+ * @list: chain of buffers containing message
+ *
* Consumes the buffer chain, except when returning -ELINKCONG
* Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
* user data messages) or -EHOSTUNREACH (all other messages/senders)
* Only the socket functions tipc_send_stream() and tipc_send_packet() need
* to act on the return value, since they may need to do more send attempts.
*/
-int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf)
+int __tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list)
{
- struct tipc_msg *msg = buf_msg(buf);
+ struct tipc_msg *msg = buf_msg(skb_peek(list));
uint psz = msg_size(msg);
- uint qsz = link->out_queue_size;
uint sndlim = link->queue_limit[0];
uint imp = tipc_msg_tot_importance(msg);
uint mtu = link->max_pkt;
@@ -713,71 +711,83 @@ int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf)
uint seqno = link->next_out_no;
uint bc_last_in = link->owner->bclink.last_in;
struct tipc_media_addr *addr = &link->media_addr;
- struct sk_buff *next = buf->next;
+ struct sk_buff_head *outqueue = &link->outqueue;
+ struct sk_buff *skb, *tmp;
/* Match queue limits against msg importance: */
- if (unlikely(qsz >= link->queue_limit[imp]))
- return tipc_link_cong(link, buf);
+ if (unlikely(skb_queue_len(outqueue) >= link->queue_limit[imp]))
+ return tipc_link_cong(link, list);
/* Has valid packet limit been used ? */
if (unlikely(psz > mtu)) {
- kfree_skb_list(buf);
+ __skb_queue_purge(list);
return -EMSGSIZE;
}
/* Prepare each packet for sending, and add to outqueue: */
- while (buf) {
- next = buf->next;
- msg = buf_msg(buf);
+ skb_queue_walk_safe(list, skb, tmp) {
+ __skb_unlink(skb, list);
+ msg = buf_msg(skb);
msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
msg_set_bcast_ack(msg, bc_last_in);
- if (!link->first_out) {
- link->first_out = buf;
- } else if (qsz < sndlim) {
- link->last_out->next = buf;
- } else if (tipc_msg_bundle(link->last_out, buf, mtu)) {
+ if (skb_queue_len(outqueue) < sndlim) {
+ __skb_queue_tail(outqueue, skb);
+ tipc_bearer_send(link->bearer_id, skb, addr);
+ link->next_out = NULL;
+ link->unacked_window = 0;
+ } else if (tipc_msg_bundle(outqueue, skb, mtu)) {
link->stats.sent_bundled++;
- buf = next;
- next = buf->next;
continue;
- } else if (tipc_msg_make_bundle(&buf, mtu, link->addr)) {
+ } else if (tipc_msg_make_bundle(outqueue, skb, mtu,
+ link->addr)) {
link->stats.sent_bundled++;
link->stats.sent_bundles++;
- link->last_out->next = buf;
if (!link->next_out)
- link->next_out = buf;
+ link->next_out = skb_peek_tail(outqueue);
} else {
- link->last_out->next = buf;
+ __skb_queue_tail(outqueue, skb);
if (!link->next_out)
- link->next_out = buf;
- }
-
- /* Send packet if possible: */
- if (likely(++qsz <= sndlim)) {
- tipc_bearer_send(link->bearer_id, buf, addr);
- link->next_out = next;
- link->unacked_window = 0;
+ link->next_out = skb;
}
seqno++;
- link->last_out = buf;
- buf = next;
}
link->next_out_no = seqno;
- link->out_queue_size = qsz;
return 0;
}
+static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
+{
+ __skb_queue_head_init(list);
+ __skb_queue_tail(list, skb);
+}
+
+static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
+{
+ struct sk_buff_head head;
+
+ skb2list(skb, &head);
+ return __tipc_link_xmit(link, &head);
+}
+
+int tipc_link_xmit_skb(struct sk_buff *skb, u32 dnode, u32 selector)
+{
+ struct sk_buff_head head;
+
+ skb2list(skb, &head);
+ return tipc_link_xmit(&head, dnode, selector);
+}
+
/**
* tipc_link_xmit() is the general link level function for message sending
- * @buf: chain of buffers containing message
+ * @list: chain of buffers containing message
* @dsz: amount of user data to be sent
* @dnode: address of destination node
* @selector: a number used for deterministic link selection
* Consumes the buffer chain, except when returning -ELINKCONG
* Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
*/
-int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
+int tipc_link_xmit(struct sk_buff_head *list, u32 dnode, u32 selector)
{
struct tipc_link *link = NULL;
struct tipc_node *node;
@@ -788,17 +798,22 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
tipc_node_lock(node);
link = node->active_links[selector & 1];
if (link)
- rc = __tipc_link_xmit(link, buf);
+ rc = __tipc_link_xmit(link, list);
tipc_node_unlock(node);
}
if (link)
return rc;
- if (likely(in_own_node(dnode)))
- return tipc_sk_rcv(buf);
+ if (likely(in_own_node(dnode))) {
+ /* As a node local message chain never contains more than one
+ * buffer, we just need to dequeue one SKB buffer from the
+ * head list.
+ */
+ return tipc_sk_rcv(__skb_dequeue(list));
+ }
+ __skb_queue_purge(list);
- kfree_skb_list(buf);
return rc;
}
@@ -812,17 +827,17 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
*/
static void tipc_link_sync_xmit(struct tipc_link *link)
{
- struct sk_buff *buf;
+ struct sk_buff *skb;
struct tipc_msg *msg;
- buf = tipc_buf_acquire(INT_H_SIZE);
- if (!buf)
+ skb = tipc_buf_acquire(INT_H_SIZE);
+ if (!skb)
return;
- msg = buf_msg(buf);
+ msg = buf_msg(skb);
tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr);
msg_set_last_bcast(msg, link->owner->bclink.acked);
- __tipc_link_xmit(link, buf);
+ __tipc_link_xmit_skb(link, skb);
}
/*
@@ -842,85 +857,46 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
kfree_skb(buf);
}
+struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
+ const struct sk_buff *skb)
+{
+ if (skb_queue_is_last(list, skb))
+ return NULL;
+ return skb->next;
+}
+
/*
- * tipc_link_push_packet: Push one unsent packet to the media
+ * tipc_link_push_packets - push unsent packets to bearer
+ *
+ * Push out the unsent messages of a link where congestion
+ * has abated. Node is locked.
+ *
+ * Called with node locked
*/
-static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
-{
- struct sk_buff *buf = l_ptr->first_out;
- u32 r_q_size = l_ptr->retransm_queue_size;
- u32 r_q_head = l_ptr->retransm_queue_head;
-
- /* Step to position where retransmission failed, if any, */
- /* consider that buffers may have been released in meantime */
- if (r_q_size && buf) {
- u32 last = lesser(mod(r_q_head + r_q_size),
- link_last_sent(l_ptr));
- u32 first = buf_seqno(buf);
-
- while (buf && less(first, r_q_head)) {
- first = mod(first + 1);
- buf = buf->next;
- }
- l_ptr->retransm_queue_head = r_q_head = first;
- l_ptr->retransm_queue_size = r_q_size = mod(last - first);
- }
-
- /* Continue retransmission now, if there is anything: */
- if (r_q_size && buf) {
- msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
- msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
- tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
- l_ptr->retransm_queue_head = mod(++r_q_head);
- l_ptr->retransm_queue_size = --r_q_size;
- l_ptr->stats.retransmitted++;
- return 0;
- }
-
- /* Send deferred protocol message, if any: */
- buf = l_ptr->proto_msg_queue;
- if (buf) {
- msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
- msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
- tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
- l_ptr->unacked_window = 0;
- kfree_skb(buf);
- l_ptr->proto_msg_queue = NULL;
- return 0;
- }
+void tipc_link_push_packets(struct tipc_link *l_ptr)
+{
+ struct sk_buff_head *outqueue = &l_ptr->outqueue;
+ struct sk_buff *skb = l_ptr->next_out;
+ struct tipc_msg *msg;
+ u32 next, first;
- /* Send one deferred data message, if send window not full: */
- buf = l_ptr->next_out;
- if (buf) {
- struct tipc_msg *msg = buf_msg(buf);
- u32 next = msg_seqno(msg);
- u32 first = buf_seqno(l_ptr->first_out);
+ skb_queue_walk_from(outqueue, skb) {
+ msg = buf_msg(skb);
+ next = msg_seqno(msg);
+ first = buf_seqno(skb_peek(outqueue));
if (mod(next - first) < l_ptr->queue_limit[0]) {
msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
- tipc_bearer_send(l_ptr->bearer_id, buf,
- &l_ptr->media_addr);
if (msg_user(msg) == MSG_BUNDLER)
- msg_set_type(msg, BUNDLE_CLOSED);
- l_ptr->next_out = buf->next;
- return 0;
+ TIPC_SKB_CB(skb)->bundling = false;
+ tipc_bearer_send(l_ptr->bearer_id, skb,
+ &l_ptr->media_addr);
+ l_ptr->next_out = tipc_skb_queue_next(outqueue, skb);
+ } else {
+ break;
}
}
- return 1;
-}
-
-/*
- * push_queue(): push out the unsent messages of a link where
- * congestion has abated. Node is locked
- */
-void tipc_link_push_queue(struct tipc_link *l_ptr)
-{
- u32 res;
-
- do {
- res = tipc_link_push_packet(l_ptr);
- } while (!res);
}
void tipc_link_reset_all(struct tipc_node *node)
@@ -984,20 +960,20 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
}
}
-void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
+void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
u32 retransmits)
{
struct tipc_msg *msg;
- if (!buf)
+ if (!skb)
return;
- msg = buf_msg(buf);
+ msg = buf_msg(skb);
/* Detect repeated retransmit failures */
if (l_ptr->last_retransmitted == msg_seqno(msg)) {
if (++l_ptr->stale_count > 100) {
- link_retransmit_failure(l_ptr, buf);
+ link_retransmit_failure(l_ptr, skb);
return;
}
} else {
@@ -1005,38 +981,29 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
l_ptr->stale_count = 1;
}
- while (retransmits && (buf != l_ptr->next_out) && buf) {
- msg = buf_msg(buf);
+ skb_queue_walk_from(&l_ptr->outqueue, skb) {
+ if (!retransmits || skb == l_ptr->next_out)
+ break;
+ msg = buf_msg(skb);
msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
- tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
- buf = buf->next;
+ tipc_bearer_send(l_ptr->bearer_id, skb, &l_ptr->media_addr);
retransmits--;
l_ptr->stats.retransmitted++;
}
-
- l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
}
-/**
- * link_insert_deferred_queue - insert deferred messages back into receive chain
- */
-static struct sk_buff *link_insert_deferred_queue(struct tipc_link *l_ptr,
- struct sk_buff *buf)
+static void link_retrieve_defq(struct tipc_link *link,
+ struct sk_buff_head *list)
{
u32 seq_no;
- if (l_ptr->oldest_deferred_in == NULL)
- return buf;
+ if (skb_queue_empty(&link->deferred_queue))
+ return;
- seq_no = buf_seqno(l_ptr->oldest_deferred_in);
- if (seq_no == mod(l_ptr->next_in_no)) {
- l_ptr->newest_deferred_in->next = buf;
- buf = l_ptr->oldest_deferred_in;
- l_ptr->oldest_deferred_in = NULL;
- l_ptr->deferred_inqueue_sz = 0;
- }
- return buf;
+ seq_no = buf_seqno(skb_peek(&link->deferred_queue));
+ if (seq_no == mod(link->next_in_no))
+ skb_queue_splice_tail_init(&link->deferred_queue, list);
}
/**
@@ -1096,43 +1063,42 @@ static int link_recv_buf_validate(struct sk_buff *buf)
/**
* tipc_rcv - process TIPC packets/messages arriving from off-node
- * @head: pointer to message buffer chain
+ * @skb: TIPC packet
* @b_ptr: pointer to bearer message arrived on
*
* Invoked with no locks held. Bearer pointer must point to a valid bearer
* structure (i.e. cannot be NULL), but bearer can be inactive.
*/
-void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
+void tipc_rcv(struct sk_buff *skb, struct tipc_bearer *b_ptr)
{
- while (head) {
- struct tipc_node *n_ptr;
- struct tipc_link *l_ptr;
- struct sk_buff *crs;
- struct sk_buff *buf = head;
- struct tipc_msg *msg;
- u32 seq_no;
- u32 ackd;
- u32 released = 0;
+ struct sk_buff_head head;
+ struct tipc_node *n_ptr;
+ struct tipc_link *l_ptr;
+ struct sk_buff *skb1, *tmp;
+ struct tipc_msg *msg;
+ u32 seq_no;
+ u32 ackd;
+ u32 released;
- head = head->next;
- buf->next = NULL;
+ skb2list(skb, &head);
+ while ((skb = __skb_dequeue(&head))) {
/* Ensure message is well-formed */
- if (unlikely(!link_recv_buf_validate(buf)))
+ if (unlikely(!link_recv_buf_validate(skb)))
goto discard;
/* Ensure message data is a single contiguous unit */
- if (unlikely(skb_linearize(buf)))
+ if (unlikely(skb_linearize(skb)))
goto discard;
/* Handle arrival of a non-unicast link message */
- msg = buf_msg(buf);
+ msg = buf_msg(skb);
if (unlikely(msg_non_seq(msg))) {
if (msg_user(msg) == LINK_CONFIG)
- tipc_disc_rcv(buf, b_ptr);
+ tipc_disc_rcv(skb, b_ptr);
else
- tipc_bclink_rcv(buf);
+ tipc_bclink_rcv(skb);
continue;
}
@@ -1171,22 +1137,19 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
if (n_ptr->bclink.recv_permitted)
tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
- crs = l_ptr->first_out;
- while ((crs != l_ptr->next_out) &&
- less_eq(buf_seqno(crs), ackd)) {
- struct sk_buff *next = crs->next;
- kfree_skb(crs);
- crs = next;
- released++;
- }
- if (released) {
- l_ptr->first_out = crs;
- l_ptr->out_queue_size -= released;
+ released = 0;
+ skb_queue_walk_safe(&l_ptr->outqueue, skb1, tmp) {
+ if (skb1 == l_ptr->next_out ||
+ more(buf_seqno(skb1), ackd))
+ break;
+ __skb_unlink(skb1, &l_ptr->outqueue);
+ kfree_skb(skb1);
+ released = 1;
}
/* Try sending any messages link endpoint has pending */
if (unlikely(l_ptr->next_out))
- tipc_link_push_queue(l_ptr);
+ tipc_link_push_packets(l_ptr);
if (released && !skb_queue_empty(&l_ptr->waiting_sks)) {
link_prepare_wakeup(l_ptr);
@@ -1196,8 +1159,8 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
/* Process the incoming packet */
if (unlikely(!link_working_working(l_ptr))) {
if (msg_user(msg) == LINK_PROTOCOL) {
- tipc_link_proto_rcv(l_ptr, buf);
- head = link_insert_deferred_queue(l_ptr, head);
+ tipc_link_proto_rcv(l_ptr, skb);
+ link_retrieve_defq(l_ptr, &head);
tipc_node_unlock(n_ptr);
continue;
}
@@ -1207,8 +1170,7 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
if (link_working_working(l_ptr)) {
/* Re-insert buffer in front of queue */
- buf->next = head;
- head = buf;
+ __skb_queue_head(&head, skb);
tipc_node_unlock(n_ptr);
continue;
}
@@ -1217,33 +1179,33 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
/* Link is now in state WORKING_WORKING */
if (unlikely(seq_no != mod(l_ptr->next_in_no))) {
- link_handle_out_of_seq_msg(l_ptr, buf);
- head = link_insert_deferred_queue(l_ptr, head);
+ link_handle_out_of_seq_msg(l_ptr, skb);
+ link_retrieve_defq(l_ptr, &head);
tipc_node_unlock(n_ptr);
continue;
}
l_ptr->next_in_no++;
- if (unlikely(l_ptr->oldest_deferred_in))
- head = link_insert_deferred_queue(l_ptr, head);
+ if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue)))
+ link_retrieve_defq(l_ptr, &head);
if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
l_ptr->stats.sent_acks++;
tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
}
- if (tipc_link_prepare_input(l_ptr, &buf)) {
+ if (tipc_link_prepare_input(l_ptr, &skb)) {
tipc_node_unlock(n_ptr);
continue;
}
tipc_node_unlock(n_ptr);
- msg = buf_msg(buf);
- if (tipc_link_input(l_ptr, buf) != 0)
+
+ if (tipc_link_input(l_ptr, skb) != 0)
goto discard;
continue;
unlock_discard:
tipc_node_unlock(n_ptr);
discard:
- kfree_skb(buf);
+ kfree_skb(skb);
}
}
@@ -1326,48 +1288,37 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf)
*
* Returns increase in queue length (i.e. 0 or 1)
*/
-u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail,
- struct sk_buff *buf)
+u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb)
{
- struct sk_buff *queue_buf;
- struct sk_buff **prev;
- u32 seq_no = buf_seqno(buf);
-
- buf->next = NULL;
+ struct sk_buff *skb1;
+ u32 seq_no = buf_seqno(skb);
/* Empty queue ? */
- if (*head == NULL) {
- *head = *tail = buf;
+ if (skb_queue_empty(list)) {
+ __skb_queue_tail(list, skb);
return 1;
}
/* Last ? */
- if (less(buf_seqno(*tail), seq_no)) {
- (*tail)->next = buf;
- *tail = buf;
+ if (less(buf_seqno(skb_peek_tail(list)), seq_no)) {
+ __skb_queue_tail(list, skb);
return 1;
}
/* Locate insertion point in queue, then insert; discard if duplicate */
- prev = head;
- queue_buf = *head;
- for (;;) {
- u32 curr_seqno = buf_seqno(queue_buf);
+ skb_queue_walk(list, skb1) {
+ u32 curr_seqno = buf_seqno(skb1);
if (seq_no == curr_seqno) {
- kfree_skb(buf);
+ kfree_skb(skb);
return 0;
}
if (less(seq_no, curr_seqno))
break;
-
- prev = &queue_buf->next;
- queue_buf = queue_buf->next;
}
- buf->next = queue_buf;
- *prev = buf;
+ __skb_queue_before(list, skb1, skb);
return 1;
}
@@ -1397,15 +1348,14 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
return;
}
- if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in,
- &l_ptr->newest_deferred_in, buf)) {
- l_ptr->deferred_inqueue_sz++;
+ if (tipc_link_defer_pkt(&l_ptr->deferred_queue, buf)) {
l_ptr->stats.deferred_recv++;
TIPC_SKB_CB(buf)->deferred = true;
- if ((l_ptr->deferred_inqueue_sz % 16) == 1)
+ if ((skb_queue_len(&l_ptr->deferred_queue) % 16) == 1)
tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
- } else
+ } else {
l_ptr->stats.duplicates++;
+ }
}
/*
@@ -1419,12 +1369,6 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
u32 msg_size = sizeof(l_ptr->proto_msg);
int r_flag;
- /* Discard any previous message that was deferred due to congestion */
- if (l_ptr->proto_msg_queue) {
- kfree_skb(l_ptr->proto_msg_queue);
- l_ptr->proto_msg_queue = NULL;
- }
-
/* Don't send protocol message during link changeover */
if (l_ptr->exp_msg_count)
return;
@@ -1447,8 +1391,8 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
if (l_ptr->next_out)
next_sent = buf_seqno(l_ptr->next_out);
msg_set_next_sent(msg, next_sent);
- if (l_ptr->oldest_deferred_in) {
- u32 rec = buf_seqno(l_ptr->oldest_deferred_in);
+ if (!skb_queue_empty(&l_ptr->deferred_queue)) {
+ u32 rec = buf_seqno(skb_peek(&l_ptr->deferred_queue));
gap = mod(rec - mod(l_ptr->next_in_no));
}
msg_set_seq_gap(msg, gap);
@@ -1636,7 +1580,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf)
}
if (msg_seq_gap(msg)) {
l_ptr->stats.recv_nacks++;
- tipc_link_retransmit(l_ptr, l_ptr->first_out,
+ tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->outqueue),
msg_seq_gap(msg));
}
break;
@@ -1655,7 +1599,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
u32 selector)
{
struct tipc_link *tunnel;
- struct sk_buff *buf;
+ struct sk_buff *skb;
u32 length = msg_size(msg);
tunnel = l_ptr->owner->active_links[selector & 1];
@@ -1664,14 +1608,14 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
return;
}
msg_set_size(tunnel_hdr, length + INT_H_SIZE);
- buf = tipc_buf_acquire(length + INT_H_SIZE);
- if (!buf) {
+ skb = tipc_buf_acquire(length + INT_H_SIZE);
+ if (!skb) {
pr_warn("%sunable to send tunnel msg\n", link_co_err);
return;
}
- skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
- skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
- __tipc_link_xmit(tunnel, buf);
+ skb_copy_to_linear_data(skb, tunnel_hdr, INT_H_SIZE);
+ skb_copy_to_linear_data_offset(skb, INT_H_SIZE, msg, length);
+ __tipc_link_xmit_skb(tunnel, skb);
}
@@ -1683,10 +1627,10 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
*/
void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
{
- u32 msgcount = l_ptr->out_queue_size;
- struct sk_buff *crs = l_ptr->first_out;
+ u32 msgcount = skb_queue_len(&l_ptr->outqueue);
struct tipc_link *tunnel = l_ptr->owner->active_links[0];
struct tipc_msg tunnel_hdr;
+ struct sk_buff *skb;
int split_bundles;
if (!tunnel)
@@ -1697,14 +1641,12 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
msg_set_msgcnt(&tunnel_hdr, msgcount);
- if (!l_ptr->first_out) {
- struct sk_buff *buf;
-
- buf = tipc_buf_acquire(INT_H_SIZE);
- if (buf) {
- skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
+ if (skb_queue_empty(&l_ptr->outqueue)) {
+ skb = tipc_buf_acquire(INT_H_SIZE);
+ if (skb) {
+ skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE);
msg_set_size(&tunnel_hdr, INT_H_SIZE);
- __tipc_link_xmit(tunnel, buf);
+ __tipc_link_xmit_skb(tunnel, skb);
} else {
pr_warn("%sunable to send changeover msg\n",
link_co_err);
@@ -1715,8 +1657,8 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
split_bundles = (l_ptr->owner->active_links[0] !=
l_ptr->owner->active_links[1]);
- while (crs) {
- struct tipc_msg *msg = buf_msg(crs);
+ skb_queue_walk(&l_ptr->outqueue, skb) {
+ struct tipc_msg *msg = buf_msg(skb);
if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
struct tipc_msg *m = msg_get_wrapped(msg);
@@ -1734,7 +1676,6 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg,
msg_link_selector(msg));
}
- crs = crs->next;
}
}
@@ -1750,17 +1691,16 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
struct tipc_link *tunnel)
{
- struct sk_buff *iter;
+ struct sk_buff *skb;
struct tipc_msg tunnel_hdr;
tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
- msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size);
+ msg_set_msgcnt(&tunnel_hdr, skb_queue_len(&l_ptr->outqueue));
msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
- iter = l_ptr->first_out;
- while (iter) {
- struct sk_buff *outbuf;
- struct tipc_msg *msg = buf_msg(iter);
+ skb_queue_walk(&l_ptr->outqueue, skb) {
+ struct sk_buff *outskb;
+ struct tipc_msg *msg = buf_msg(skb);
u32 length = msg_size(msg);
if (msg_user(msg) == MSG_BUNDLER)
@@ -1768,19 +1708,18 @@ void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); /* Update */
msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
- outbuf = tipc_buf_acquire(length + INT_H_SIZE);
- if (outbuf == NULL) {
+ outskb = tipc_buf_acquire(length + INT_H_SIZE);
+ if (outskb == NULL) {
pr_warn("%sunable to send duplicate msg\n",
link_co_err);
return;
}
- skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);
- skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,
+ skb_copy_to_linear_data(outskb, &tunnel_hdr, INT_H_SIZE);
+ skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, skb->data,
length);
- __tipc_link_xmit(tunnel, outbuf);
+ __tipc_link_xmit_skb(tunnel, outskb);
if (!tipc_link_is_up(l_ptr))
return;
- iter = iter->next;
}
}
@@ -2375,3 +2314,435 @@ static void link_print(struct tipc_link *l_ptr, const char *str)
else
pr_cont("\n");
}
+
+/* Parse and validate nested (link) properties valid for media, bearer and link
+ */
+int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[])
+{
+ int err;
+
+ err = nla_parse_nested(props, TIPC_NLA_PROP_MAX, prop,
+ tipc_nl_prop_policy);
+ if (err)
+ return err;
+
+ if (props[TIPC_NLA_PROP_PRIO]) {
+ u32 prio;
+
+ prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
+ if (prio > TIPC_MAX_LINK_PRI)
+ return -EINVAL;
+ }
+
+ if (props[TIPC_NLA_PROP_TOL]) {
+ u32 tol;
+
+ tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
+ if ((tol < TIPC_MIN_LINK_TOL) || (tol > TIPC_MAX_LINK_TOL))
+ return -EINVAL;
+ }
+
+ if (props[TIPC_NLA_PROP_WIN]) {
+ u32 win;
+
+ win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
+ if ((win < TIPC_MIN_LINK_WIN) || (win > TIPC_MAX_LINK_WIN))
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info)
+{
+ int err;
+ int res = 0;
+ int bearer_id;
+ char *name;
+ struct tipc_link *link;
+ struct tipc_node *node;
+ struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
+
+ if (!info->attrs[TIPC_NLA_LINK])
+ return -EINVAL;
+
+ err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
+ info->attrs[TIPC_NLA_LINK],
+ tipc_nl_link_policy);
+ if (err)
+ return err;
+
+ if (!attrs[TIPC_NLA_LINK_NAME])
+ return -EINVAL;
+
+ name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
+
+ node = tipc_link_find_owner(name, &bearer_id);
+ if (!node)
+ return -EINVAL;
+
+ tipc_node_lock(node);
+
+ link = node->links[bearer_id];
+ if (!link) {
+ res = -EINVAL;
+ goto out;
+ }
+
+ if (attrs[TIPC_NLA_LINK_PROP]) {
+ struct nlattr *props[TIPC_NLA_PROP_MAX + 1];
+
+ err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_LINK_PROP],
+ props);
+ if (err) {
+ res = err;
+ goto out;
+ }
+
+ if (props[TIPC_NLA_PROP_TOL]) {
+ u32 tol;
+
+ tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
+ link_set_supervision_props(link, tol);
+ tipc_link_proto_xmit(link, STATE_MSG, 0, 0, tol, 0, 0);
+ }
+ if (props[TIPC_NLA_PROP_PRIO]) {
+ u32 prio;
+
+ prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
+ link->priority = prio;
+ tipc_link_proto_xmit(link, STATE_MSG, 0, 0, 0, prio, 0);
+ }
+ if (props[TIPC_NLA_PROP_WIN]) {
+ u32 win;
+
+ win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
+ tipc_link_set_queue_limits(link, win);
+ }
+ }
+
+out:
+ tipc_node_unlock(node);
+
+ return res;
+}
+
+static int __tipc_nl_add_stats(struct sk_buff *skb, struct tipc_stats *s)
+{
+ int i;
+ struct nlattr *stats;
+
+ struct nla_map {
+ u32 key;
+ u32 val;
+ };
+
+ struct nla_map map[] = {
+ {TIPC_NLA_STATS_RX_INFO, s->recv_info},
+ {TIPC_NLA_STATS_RX_FRAGMENTS, s->recv_fragments},
+ {TIPC_NLA_STATS_RX_FRAGMENTED, s->recv_fragmented},
+ {TIPC_NLA_STATS_RX_BUNDLES, s->recv_bundles},
+ {TIPC_NLA_STATS_RX_BUNDLED, s->recv_bundled},
+ {TIPC_NLA_STATS_TX_INFO, s->sent_info},
+ {TIPC_NLA_STATS_TX_FRAGMENTS, s->sent_fragments},
+ {TIPC_NLA_STATS_TX_FRAGMENTED, s->sent_fragmented},
+ {TIPC_NLA_STATS_TX_BUNDLES, s->sent_bundles},
+ {TIPC_NLA_STATS_TX_BUNDLED, s->sent_bundled},
+ {TIPC_NLA_STATS_MSG_PROF_TOT, (s->msg_length_counts) ?
+ s->msg_length_counts : 1},
+ {TIPC_NLA_STATS_MSG_LEN_CNT, s->msg_length_counts},
+ {TIPC_NLA_STATS_MSG_LEN_TOT, s->msg_lengths_total},
+ {TIPC_NLA_STATS_MSG_LEN_P0, s->msg_length_profile[0]},
+ {TIPC_NLA_STATS_MSG_LEN_P1, s->msg_length_profile[1]},
+ {TIPC_NLA_STATS_MSG_LEN_P2, s->msg_length_profile[2]},
+ {TIPC_NLA_STATS_MSG_LEN_P3, s->msg_length_profile[3]},
+ {TIPC_NLA_STATS_MSG_LEN_P4, s->msg_length_profile[4]},
+ {TIPC_NLA_STATS_MSG_LEN_P5, s->msg_length_profile[5]},
+ {TIPC_NLA_STATS_MSG_LEN_P6, s->msg_length_profile[6]},
+ {TIPC_NLA_STATS_RX_STATES, s->recv_states},
+ {TIPC_NLA_STATS_RX_PROBES, s->recv_probes},
+ {TIPC_NLA_STATS_RX_NACKS, s->recv_nacks},
+ {TIPC_NLA_STATS_RX_DEFERRED, s->deferred_recv},
+ {TIPC_NLA_STATS_TX_STATES, s->sent_states},
+ {TIPC_NLA_STATS_TX_PROBES, s->sent_probes},
+ {TIPC_NLA_STATS_TX_NACKS, s->sent_nacks},
+ {TIPC_NLA_STATS_TX_ACKS, s->sent_acks},
+ {TIPC_NLA_STATS_RETRANSMITTED, s->retransmitted},
+ {TIPC_NLA_STATS_DUPLICATES, s->duplicates},
+ {TIPC_NLA_STATS_LINK_CONGS, s->link_congs},
+ {TIPC_NLA_STATS_MAX_QUEUE, s->max_queue_sz},
+ {TIPC_NLA_STATS_AVG_QUEUE, s->queue_sz_counts ?
+ (s->accu_queue_sz / s->queue_sz_counts) : 0}
+ };
+
+ stats = nla_nest_start(skb, TIPC_NLA_LINK_STATS);
+ if (!stats)
+ return -EMSGSIZE;
+
+ for (i = 0; i < ARRAY_SIZE(map); i++)
+ if (nla_put_u32(skb, map[i].key, map[i].val))
+ goto msg_full;
+
+ nla_nest_end(skb, stats);
+
+ return 0;
+msg_full:
+ nla_nest_cancel(skb, stats);
+
+ return -EMSGSIZE;
+}
+
+/* Caller should hold appropriate locks to protect the link */
+static int __tipc_nl_add_link(struct tipc_nl_msg *msg, struct tipc_link *link)
+{
+ int err;
+ void *hdr;
+ struct nlattr *attrs;
+ struct nlattr *prop;
+
+ hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_v2_family,
+ NLM_F_MULTI, TIPC_NL_LINK_GET);
+ if (!hdr)
+ return -EMSGSIZE;
+
+ attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK);
+ if (!attrs)
+ goto msg_full;
+
+ if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, link->name))
+ goto attr_msg_full;
+ if (nla_put_u32(msg->skb, TIPC_NLA_LINK_DEST,
+ tipc_cluster_mask(tipc_own_addr)))
+ goto attr_msg_full;
+ if (nla_put_u32(msg->skb, TIPC_NLA_LINK_MTU, link->max_pkt))
+ goto attr_msg_full;
+ if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, link->next_in_no))
+ goto attr_msg_full;
+ if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, link->next_out_no))
+ goto attr_msg_full;
+
+ if (tipc_link_is_up(link))
+ if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
+ goto attr_msg_full;
+ if (tipc_link_is_active(link))
+ if (nla_put_flag(msg->skb, TIPC_NLA_LINK_ACTIVE))
+ goto attr_msg_full;
+
+ prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
+ if (!prop)
+ goto attr_msg_full;
+ if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority))
+ goto prop_msg_full;
+ if (nla_put_u32(msg->skb, TIPC_NLA_PROP_TOL, link->tolerance))
+ goto prop_msg_full;
+ if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN,
+ link->queue_limit[TIPC_LOW_IMPORTANCE]))
+ goto prop_msg_full;
+ if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority))
+ goto prop_msg_full;
+ nla_nest_end(msg->skb, prop);
+
+ err = __tipc_nl_add_stats(msg->skb, &link->stats);
+ if (err)
+ goto attr_msg_full;
+
+ nla_nest_end(msg->skb, attrs);
+ genlmsg_end(msg->skb, hdr);
+
+ return 0;
+
+prop_msg_full:
+ nla_nest_cancel(msg->skb, prop);
+attr_msg_full:
+ nla_nest_cancel(msg->skb, attrs);
+msg_full:
+ genlmsg_cancel(msg->skb, hdr);
+
+ return -EMSGSIZE;
+}
+
+/* Caller should hold node lock */
+static int __tipc_nl_add_node_links(struct tipc_nl_msg *msg,
+ struct tipc_node *node,
+ u32 *prev_link)
+{
+ u32 i;
+ int err;
+
+ for (i = *prev_link; i < MAX_BEARERS; i++) {
+ *prev_link = i;
+
+ if (!node->links[i])
+ continue;
+
+ err = __tipc_nl_add_link(msg, node->links[i]);
+ if (err)
+ return err;
+ }
+ *prev_link = 0;
+
+ return 0;
+}
+
+int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb)
+{
+ struct tipc_node *node;
+ struct tipc_nl_msg msg;
+ u32 prev_node = cb->args[0];
+ u32 prev_link = cb->args[1];
+ int done = cb->args[2];
+ int err;
+
+ if (done)
+ return 0;
+
+ msg.skb = skb;
+ msg.portid = NETLINK_CB(cb->skb).portid;
+ msg.seq = cb->nlh->nlmsg_seq;
+
+ rcu_read_lock();
+
+ if (prev_node) {
+ node = tipc_node_find(prev_node);
+ if (!node) {
+ /* We never set seq or call nl_dump_check_consistent()
+ * this means that setting prev_seq here will cause the
+ * consistence check to fail in the netlink callback
+ * handler. Resulting in the last NLMSG_DONE message
+ * having the NLM_F_DUMP_INTR flag set.
+ */
+ cb->prev_seq = 1;
+ goto out;
+ }
+
+ list_for_each_entry_continue_rcu(node, &tipc_node_list, list) {
+ tipc_node_lock(node);
+ err = __tipc_nl_add_node_links(&msg, node, &prev_link);
+ tipc_node_unlock(node);
+ if (err)
+ goto out;
+
+ prev_node = node->addr;
+ }
+ } else {
+ err = tipc_nl_add_bc_link(&msg);
+ if (err)
+ goto out;
+
+ list_for_each_entry_rcu(node, &tipc_node_list, list) {
+ tipc_node_lock(node);
+ err = __tipc_nl_add_node_links(&msg, node, &prev_link);
+ tipc_node_unlock(node);
+ if (err)
+ goto out;
+
+ prev_node = node->addr;
+ }
+ }
+ done = 1;
+out:
+ rcu_read_unlock();
+
+ cb->args[0] = prev_node;
+ cb->args[1] = prev_link;
+ cb->args[2] = done;
+
+ return skb->len;
+}
+
+int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info)
+{
+ struct sk_buff *ans_skb;
+ struct tipc_nl_msg msg;
+ struct tipc_link *link;
+ struct tipc_node *node;
+ char *name;
+ int bearer_id;
+ int err;
+
+ if (!info->attrs[TIPC_NLA_LINK_NAME])
+ return -EINVAL;
+
+ name = nla_data(info->attrs[TIPC_NLA_LINK_NAME]);
+ node = tipc_link_find_owner(name, &bearer_id);
+ if (!node)
+ return -EINVAL;
+
+ ans_skb = nlmsg_new(NLMSG_GOODSIZE, GFP_KERNEL);
+ if (!ans_skb)
+ return -ENOMEM;
+
+ msg.skb = ans_skb;
+ msg.portid = info->snd_portid;
+ msg.seq = info->snd_seq;
+
+ tipc_node_lock(node);
+ link = node->links[bearer_id];
+ if (!link) {
+ err = -EINVAL;
+ goto err_out;
+ }
+
+ err = __tipc_nl_add_link(&msg, link);
+ if (err)
+ goto err_out;
+
+ tipc_node_unlock(node);
+
+ return genlmsg_reply(ans_skb, info);
+
+err_out:
+ tipc_node_unlock(node);
+ nlmsg_free(ans_skb);
+
+ return err;
+}
+
+int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info)
+{
+ int err;
+ char *link_name;
+ unsigned int bearer_id;
+ struct tipc_link *link;
+ struct tipc_node *node;
+ struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
+
+ if (!info->attrs[TIPC_NLA_LINK])
+ return -EINVAL;
+
+ err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
+ info->attrs[TIPC_NLA_LINK],
+ tipc_nl_link_policy);
+ if (err)
+ return err;
+
+ if (!attrs[TIPC_NLA_LINK_NAME])
+ return -EINVAL;
+
+ link_name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
+
+ if (strcmp(link_name, tipc_bclink_name) == 0) {
+ err = tipc_bclink_reset_stats();
+ if (err)
+ return err;
+ return 0;
+ }
+
+ node = tipc_link_find_owner(link_name, &bearer_id);
+ if (!node)
+ return -EINVAL;
+
+ tipc_node_lock(node);
+
+ link = node->links[bearer_id];
+ if (!link) {
+ tipc_node_unlock(node);
+ return -EINVAL;
+ }
+
+ link_reset_statistics(link);
+
+ tipc_node_unlock(node);
+
+ return 0;
+}