diff options
author | Daniel Borkmann <daniel@iogearbox.net> | 2018-06-05 15:52:30 +0200 |
---|---|---|
committer | Daniel Borkmann <daniel@iogearbox.net> | 2018-06-05 15:58:07 +0200 |
commit | 9fa06104a235f64d6a2bf3012cc9966e8e4be5eb (patch) | |
tree | 2b2c63350de9c72b786004ec5c01ce2bc0dea97b | |
parent | Merge branch 'bpf-xdp-remove-xdp-flush' (diff) | |
parent | samples/bpf: xdpsock: use skb Tx path for XDP_SKB (diff) | |
download | linux-9fa06104a235f64d6a2bf3012cc9966e8e4be5eb.tar.xz linux-9fa06104a235f64d6a2bf3012cc9966e8e4be5eb.zip |
Merge branch 'bpf-af-xdp-zc-api'
Björn Töpel says:
====================
This patch serie introduces zerocopy (ZC) support for
AF_XDP. Programs using AF_XDP sockets will now receive RX packets
without any copies and can also transmit packets without incurring any
copies. No modifications to the application are needed, but the NIC
driver needs to be modified to support ZC. If ZC is not supported by
the driver, the modes introduced in the AF_XDP patch will be
used. Using ZC in our micro benchmarks results in significantly
improved performance as can be seen in the performance section later
in this cover letter.
Note that for an untrusted application, HW packet steering to a
specific queue pair (the one associated with the application) is a
requirement when using ZC, as the application would otherwise be able
to see other user space processes' packets. If the HW cannot support
the required packet steering you need to use the XDP_SKB mode or the
XDP_DRV mode without ZC turned on. The XSKMAP introduced in the AF_XDP
patch set can be used to do load balancing in that case.
For benchmarking, you can use the xdpsock application from the AF_XDP
patch set without any modifications. Say that you would like your UDP
traffic from port 4242 to end up in queue 16, that we will enable
AF_XDP on. Here, we use ethtool for this:
ethtool -N p3p2 rx-flow-hash udp4 fn
ethtool -N p3p2 flow-type udp4 src-port 4242 dst-port 4242 \
action 16
Running the rxdrop benchmark in XDP_DRV mode with zerocopy can then be
done using:
samples/bpf/xdpsock -i p3p2 -q 16 -r -N
We have run some benchmarks on a dual socket system with two Broadwell
E5 2660 @ 2.0 GHz with hyperthreading turned off. Each socket has 14
cores which gives a total of 28, but only two cores are used in these
experiments. One for TR/RX and one for the user space application. The
memory is DDR4 @ 2133 MT/s (1067 MHz) and the size of each DIMM is
8192MB and with 8 of those DIMMs in the system we have 64 GB of total
memory. The compiler used is gcc (Ubuntu 7.3.0-16ubuntu3) 7.3.0. The
NIC is Intel I40E 40Gbit/s using the i40e driver.
Below are the results in Mpps of the I40E NIC benchmark runs for 64
and 1500 byte packets, generated by a commercial packet generator HW
outputing packets at full 40 Gbit/s line rate. The results are without
retpoline so that we can compare against previous numbers.
AF_XDP performance 64 byte packets. Results from the AF_XDP V3 patch
set are also reported for ease of reference. The numbers within
parantheses are from the RFC V1 ZC patch set.
Benchmark XDP_SKB XDP_DRV XDP_DRV with zerocopy
rxdrop 2.9* 9.6* 21.1(21.5)
txpush 2.6* - 22.0(21.6)
l2fwd 1.9* 2.5* 15.3(15.0)
AF_XDP performance 1500 byte packets:
Benchmark XDP_SKB XDP_DRV XDP_DRV with zerocopy
rxdrop 2.1* 3.3* 3.3(3.3)
l2fwd 1.4* 1.8* 3.1(3.1)
* From AF_XDP V3 patch set and cover letter.
So why do we not get higher values for RX similar to the 34 Mpps we
had in AF_PACKET V4? We made an experiment running the rxdrop
benchmark without using the xdp_do_redirect/flush infrastructure nor
using an XDP program (all traffic on a queue goes to one
socket). Instead the driver acts directly on the AF_XDP socket. With
this we got 36.9 Mpps, a significant improvement without any change to
the uapi. So not forcing users to have an XDP program if they do not
need it, might be a good idea. This measurement is actually higher
than what we got with AF_PACKET V4.
XDP performance on our system as a base line:
64 byte packets:
XDP stats CPU pps issue-pps
XDP-RX CPU 16 32.3M 0
1500 byte packets:
XDP stats CPU pps issue-pps
XDP-RX CPU 16 3.3M 0
The structure of the patch set is as follows:
Patches 1-3: Plumbing for AF_XDP ZC support
Patches 4-5: AF_XDP ZC for RX
Patches 6-7: AF_XDP ZC for TX
Patch 8-10: ZC support for i40e.
Patch 11: Use the bind flags in sample application to force TX skb
path when -S is providedd on the command line.
This patch set is based on the new uapi introduced in "AF_XDP: bug
fixes and descriptor changes". You need to apply that patch set
first, before applying this one.
We based this patch set on bpf-next commit bd3a08aaa9a3 ("bpf:
flowlabel in bpf_fib_lookup should be flowinfo")
Comments:
* Implementing dynamic creation and deletion of queues in the i40e
driver would facilitate the coexistence of xdp_redirect and af_xdp.
Thanks: Björn and Magnus
====================
Note: as agreed upon, i40e/zc bits will be routed via Jeff's tree.
Acked-by: Alexei Starovoitov <ast@kernel.org>
Signed-off-by: Daniel Borkmann <daniel@iogearbox.net>
-rw-r--r-- | include/linux/netdevice.h | 10 | ||||
-rw-r--r-- | include/net/xdp.h | 10 | ||||
-rw-r--r-- | include/net/xdp_sock.h | 44 | ||||
-rw-r--r-- | include/uapi/linux/if_xdp.h | 4 | ||||
-rw-r--r-- | net/core/xdp.c | 19 | ||||
-rw-r--r-- | net/xdp/xdp_umem.c | 118 | ||||
-rw-r--r-- | net/xdp/xdp_umem.h | 34 | ||||
-rw-r--r-- | net/xdp/xsk.c | 166 | ||||
-rw-r--r-- | net/xdp/xsk_queue.h | 35 | ||||
-rw-r--r-- | samples/bpf/xdpsock_user.c | 5 |
10 files changed, 384 insertions, 61 deletions
diff --git a/include/linux/netdevice.h b/include/linux/netdevice.h index 42c6ea35a6f2..03ffeadf8a41 100644 --- a/include/linux/netdevice.h +++ b/include/linux/netdevice.h @@ -817,10 +817,13 @@ enum bpf_netdev_command { BPF_OFFLOAD_DESTROY, BPF_OFFLOAD_MAP_ALLOC, BPF_OFFLOAD_MAP_FREE, + XDP_QUERY_XSK_UMEM, + XDP_SETUP_XSK_UMEM, }; struct bpf_prog_offload_ops; struct netlink_ext_ack; +struct xdp_umem; struct netdev_bpf { enum bpf_netdev_command command; @@ -851,6 +854,11 @@ struct netdev_bpf { struct { struct bpf_offloaded_map *offmap; }; + /* XDP_SETUP_XSK_UMEM */ + struct { + struct xdp_umem *umem; + u16 queue_id; + } xsk; }; }; @@ -1379,6 +1387,8 @@ struct net_device_ops { int (*ndo_xdp_xmit)(struct net_device *dev, int n, struct xdp_frame **xdp, u32 flags); + int (*ndo_xsk_async_xmit)(struct net_device *dev, + u32 queue_id); }; /** diff --git a/include/net/xdp.h b/include/net/xdp.h index a3b71a4dd71d..2deea7166a34 100644 --- a/include/net/xdp.h +++ b/include/net/xdp.h @@ -37,6 +37,7 @@ enum xdp_mem_type { MEM_TYPE_PAGE_SHARED = 0, /* Split-page refcnt based model */ MEM_TYPE_PAGE_ORDER0, /* Orig XDP full page model */ MEM_TYPE_PAGE_POOL, + MEM_TYPE_ZERO_COPY, MEM_TYPE_MAX, }; @@ -51,6 +52,10 @@ struct xdp_mem_info { struct page_pool; +struct zero_copy_allocator { + void (*free)(struct zero_copy_allocator *zca, unsigned long handle); +}; + struct xdp_rxq_info { struct net_device *dev; u32 queue_index; @@ -63,6 +68,7 @@ struct xdp_buff { void *data_end; void *data_meta; void *data_hard_start; + unsigned long handle; struct xdp_rxq_info *rxq; }; @@ -86,6 +92,10 @@ struct xdp_frame *convert_to_xdp_frame(struct xdp_buff *xdp) int metasize; int headroom; + /* TODO: implement clone, copy, use "native" MEM_TYPE */ + if (xdp->rxq->mem.type == MEM_TYPE_ZERO_COPY) + return NULL; + /* Assure headroom is available for storing info */ headroom = xdp->data - xdp->data_hard_start; metasize = xdp->data - xdp->data_meta; diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h index 7a647c56ec15..9fe472f2ac95 100644 --- a/include/net/xdp_sock.h +++ b/include/net/xdp_sock.h @@ -6,12 +6,46 @@ #ifndef _LINUX_XDP_SOCK_H #define _LINUX_XDP_SOCK_H +#include <linux/workqueue.h> +#include <linux/if_xdp.h> #include <linux/mutex.h> +#include <linux/spinlock.h> +#include <linux/mm.h> #include <net/sock.h> struct net_device; struct xsk_queue; -struct xdp_umem; + +struct xdp_umem_props { + u64 chunk_mask; + u64 size; +}; + +struct xdp_umem_page { + void *addr; + dma_addr_t dma; +}; + +struct xdp_umem { + struct xsk_queue *fq; + struct xsk_queue *cq; + struct xdp_umem_page *pages; + struct xdp_umem_props props; + u32 headroom; + u32 chunk_size_nohr; + struct user_struct *user; + struct pid *pid; + unsigned long address; + refcount_t users; + struct work_struct work; + struct page **pgs; + u32 npgs; + struct net_device *dev; + u16 queue_id; + bool zc; + spinlock_t xsk_list_lock; + struct list_head xsk_list; +}; struct xdp_sock { /* struct sock must be the first member of struct xdp_sock */ @@ -22,6 +56,8 @@ struct xdp_sock { struct list_head flush_node; u16 queue_id; struct xsk_queue *tx ____cacheline_aligned_in_smp; + struct list_head list; + bool zc; /* Protects multiple processes in the control path */ struct mutex mutex; u64 rx_dropped; @@ -33,6 +69,12 @@ int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp); int xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp); void xsk_flush(struct xdp_sock *xs); bool xsk_is_setup_for_bpf_map(struct xdp_sock *xs); +/* Used from netdev driver */ +u64 *xsk_umem_peek_addr(struct xdp_umem *umem, u64 *addr); +void xsk_umem_discard_addr(struct xdp_umem *umem); +void xsk_umem_complete_tx(struct xdp_umem *umem, u32 nb_entries); +bool xsk_umem_consume_tx(struct xdp_umem *umem, dma_addr_t *dma, u32 *len); +void xsk_umem_consume_tx_done(struct xdp_umem *umem); #else static inline int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp) { diff --git a/include/uapi/linux/if_xdp.h b/include/uapi/linux/if_xdp.h index e411d6f9ac65..1fa0e977ea8d 100644 --- a/include/uapi/linux/if_xdp.h +++ b/include/uapi/linux/if_xdp.h @@ -13,7 +13,9 @@ #include <linux/types.h> /* Options for the sxdp_flags field */ -#define XDP_SHARED_UMEM 1 +#define XDP_SHARED_UMEM (1 << 0) +#define XDP_COPY (1 << 1) /* Force copy-mode */ +#define XDP_ZEROCOPY (1 << 2) /* Force zero-copy mode */ struct sockaddr_xdp { __u16 sxdp_family; diff --git a/net/core/xdp.c b/net/core/xdp.c index cb8c4e061a5a..9d1f22072d5d 100644 --- a/net/core/xdp.c +++ b/net/core/xdp.c @@ -31,6 +31,7 @@ struct xdp_mem_allocator { union { void *allocator; struct page_pool *page_pool; + struct zero_copy_allocator *zc_alloc; }; struct rhash_head node; struct rcu_head rcu; @@ -261,7 +262,7 @@ int xdp_rxq_info_reg_mem_model(struct xdp_rxq_info *xdp_rxq, xdp_rxq->mem.type = type; if (!allocator) { - if (type == MEM_TYPE_PAGE_POOL) + if (type == MEM_TYPE_PAGE_POOL || type == MEM_TYPE_ZERO_COPY) return -EINVAL; /* Setup time check page_pool req */ return 0; } @@ -314,7 +315,8 @@ EXPORT_SYMBOL_GPL(xdp_rxq_info_reg_mem_model); * is used for those calls sites. Thus, allowing for faster recycling * of xdp_frames/pages in those cases. */ -static void __xdp_return(void *data, struct xdp_mem_info *mem, bool napi_direct) +static void __xdp_return(void *data, struct xdp_mem_info *mem, bool napi_direct, + unsigned long handle) { struct xdp_mem_allocator *xa; struct page *page; @@ -338,6 +340,13 @@ static void __xdp_return(void *data, struct xdp_mem_info *mem, bool napi_direct) page = virt_to_page(data); /* Assumes order0 page*/ put_page(page); break; + case MEM_TYPE_ZERO_COPY: + /* NB! Only valid from an xdp_buff! */ + rcu_read_lock(); + /* mem->id is valid, checked in xdp_rxq_info_reg_mem_model() */ + xa = rhashtable_lookup(mem_id_ht, &mem->id, mem_id_rht_params); + xa->zc_alloc->free(xa->zc_alloc, handle); + rcu_read_unlock(); default: /* Not possible, checked in xdp_rxq_info_reg_mem_model() */ break; @@ -346,18 +355,18 @@ static void __xdp_return(void *data, struct xdp_mem_info *mem, bool napi_direct) void xdp_return_frame(struct xdp_frame *xdpf) { - __xdp_return(xdpf->data, &xdpf->mem, false); + __xdp_return(xdpf->data, &xdpf->mem, false, 0); } EXPORT_SYMBOL_GPL(xdp_return_frame); void xdp_return_frame_rx_napi(struct xdp_frame *xdpf) { - __xdp_return(xdpf->data, &xdpf->mem, true); + __xdp_return(xdpf->data, &xdpf->mem, true, 0); } EXPORT_SYMBOL_GPL(xdp_return_frame_rx_napi); void xdp_return_buff(struct xdp_buff *xdp) { - __xdp_return(xdp->data, &xdp->rxq->mem, true); + __xdp_return(xdp->data, &xdp->rxq->mem, true, xdp->handle); } EXPORT_SYMBOL_GPL(xdp_return_buff); diff --git a/net/xdp/xdp_umem.c b/net/xdp/xdp_umem.c index 9ad791ff4739..7eb4948a38d2 100644 --- a/net/xdp/xdp_umem.c +++ b/net/xdp/xdp_umem.c @@ -13,9 +13,108 @@ #include <linux/mm.h> #include "xdp_umem.h" +#include "xsk_queue.h" #define XDP_UMEM_MIN_CHUNK_SIZE 2048 +void xdp_add_sk_umem(struct xdp_umem *umem, struct xdp_sock *xs) +{ + unsigned long flags; + + spin_lock_irqsave(&umem->xsk_list_lock, flags); + list_add_rcu(&xs->list, &umem->xsk_list); + spin_unlock_irqrestore(&umem->xsk_list_lock, flags); +} + +void xdp_del_sk_umem(struct xdp_umem *umem, struct xdp_sock *xs) +{ + unsigned long flags; + + if (xs->dev) { + spin_lock_irqsave(&umem->xsk_list_lock, flags); + list_del_rcu(&xs->list); + spin_unlock_irqrestore(&umem->xsk_list_lock, flags); + + if (umem->zc) + synchronize_net(); + } +} + +int xdp_umem_assign_dev(struct xdp_umem *umem, struct net_device *dev, + u32 queue_id, u16 flags) +{ + bool force_zc, force_copy; + struct netdev_bpf bpf; + int err; + + force_zc = flags & XDP_ZEROCOPY; + force_copy = flags & XDP_COPY; + + if (force_zc && force_copy) + return -EINVAL; + + if (force_copy) + return 0; + + dev_hold(dev); + + if (dev->netdev_ops->ndo_bpf && dev->netdev_ops->ndo_xsk_async_xmit) { + bpf.command = XDP_QUERY_XSK_UMEM; + + rtnl_lock(); + err = dev->netdev_ops->ndo_bpf(dev, &bpf); + rtnl_unlock(); + + if (err) { + dev_put(dev); + return force_zc ? -ENOTSUPP : 0; + } + + bpf.command = XDP_SETUP_XSK_UMEM; + bpf.xsk.umem = umem; + bpf.xsk.queue_id = queue_id; + + rtnl_lock(); + err = dev->netdev_ops->ndo_bpf(dev, &bpf); + rtnl_unlock(); + + if (err) { + dev_put(dev); + return force_zc ? err : 0; /* fail or fallback */ + } + + umem->dev = dev; + umem->queue_id = queue_id; + umem->zc = true; + return 0; + } + + dev_put(dev); + return force_zc ? -ENOTSUPP : 0; /* fail or fallback */ +} + +static void xdp_umem_clear_dev(struct xdp_umem *umem) +{ + struct netdev_bpf bpf; + int err; + + if (umem->dev) { + bpf.command = XDP_SETUP_XSK_UMEM; + bpf.xsk.umem = NULL; + bpf.xsk.queue_id = umem->queue_id; + + rtnl_lock(); + err = umem->dev->netdev_ops->ndo_bpf(umem->dev, &bpf); + rtnl_unlock(); + + if (err) + WARN(1, "failed to disable umem!\n"); + + dev_put(umem->dev); + umem->dev = NULL; + } +} + static void xdp_umem_unpin_pages(struct xdp_umem *umem) { unsigned int i; @@ -42,6 +141,8 @@ static void xdp_umem_release(struct xdp_umem *umem) struct task_struct *task; struct mm_struct *mm; + xdp_umem_clear_dev(umem); + if (umem->fq) { xskq_destroy(umem->fq); umem->fq = NULL; @@ -64,6 +165,9 @@ static void xdp_umem_release(struct xdp_umem *umem) goto out; mmput(mm); + kfree(umem->pages); + umem->pages = NULL; + xdp_umem_unaccount_pages(umem); out: kfree(umem); @@ -154,7 +258,7 @@ static int xdp_umem_reg(struct xdp_umem *umem, struct xdp_umem_reg *mr) u32 chunk_size = mr->chunk_size, headroom = mr->headroom; unsigned int chunks, chunks_per_page; u64 addr = mr->addr, size = mr->len; - int size_chk, err; + int size_chk, err, i; if (chunk_size < XDP_UMEM_MIN_CHUNK_SIZE || chunk_size > PAGE_SIZE) { /* Strictly speaking we could support this, if: @@ -202,6 +306,8 @@ static int xdp_umem_reg(struct xdp_umem *umem, struct xdp_umem_reg *mr) umem->npgs = size / PAGE_SIZE; umem->pgs = NULL; umem->user = NULL; + INIT_LIST_HEAD(&umem->xsk_list); + spin_lock_init(&umem->xsk_list_lock); refcount_set(&umem->users, 1); @@ -212,6 +318,16 @@ static int xdp_umem_reg(struct xdp_umem *umem, struct xdp_umem_reg *mr) err = xdp_umem_pin_pages(umem); if (err) goto out_account; + + umem->pages = kcalloc(umem->npgs, sizeof(*umem->pages), GFP_KERNEL); + if (!umem->pages) { + err = -ENOMEM; + goto out_account; + } + + for (i = 0; i < umem->npgs; i++) + umem->pages[i].addr = page_address(umem->pgs[i]); + return 0; out_account: diff --git a/net/xdp/xdp_umem.h b/net/xdp/xdp_umem.h index aeadd1bcb72d..f11560334f88 100644 --- a/net/xdp/xdp_umem.h +++ b/net/xdp/xdp_umem.h @@ -6,37 +6,25 @@ #ifndef XDP_UMEM_H_ #define XDP_UMEM_H_ -#include <linux/mm.h> -#include <linux/if_xdp.h> -#include <linux/workqueue.h> - -#include "xsk_queue.h" -#include "xdp_umem_props.h" - -struct xdp_umem { - struct xsk_queue *fq; - struct xsk_queue *cq; - struct page **pgs; - struct xdp_umem_props props; - u32 headroom; - u32 chunk_size_nohr; - struct user_struct *user; - struct pid *pid; - unsigned long address; - refcount_t users; - struct work_struct work; - u32 npgs; -}; +#include <net/xdp_sock.h> static inline char *xdp_umem_get_data(struct xdp_umem *umem, u64 addr) { - return page_address(umem->pgs[addr >> PAGE_SHIFT]) + - (addr & (PAGE_SIZE - 1)); + return umem->pages[addr >> PAGE_SHIFT].addr + (addr & (PAGE_SIZE - 1)); +} + +static inline dma_addr_t xdp_umem_get_dma(struct xdp_umem *umem, u64 addr) +{ + return umem->pages[addr >> PAGE_SHIFT].dma + (addr & (PAGE_SIZE - 1)); } +int xdp_umem_assign_dev(struct xdp_umem *umem, struct net_device *dev, + u32 queue_id, u16 flags); bool xdp_umem_validate_queues(struct xdp_umem *umem); void xdp_get_umem(struct xdp_umem *umem); void xdp_put_umem(struct xdp_umem *umem); +void xdp_add_sk_umem(struct xdp_umem *umem, struct xdp_sock *xs); +void xdp_del_sk_umem(struct xdp_umem *umem, struct xdp_sock *xs); struct xdp_umem *xdp_umem_create(struct xdp_umem_reg *mr); #endif /* XDP_UMEM_H_ */ diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index 4688c750df1d..ddca4bf1cfc8 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -21,6 +21,7 @@ #include <linux/uaccess.h> #include <linux/net.h> #include <linux/netdevice.h> +#include <linux/rculist.h> #include <net/xdp_sock.h> #include <net/xdp.h> @@ -36,19 +37,28 @@ static struct xdp_sock *xdp_sk(struct sock *sk) bool xsk_is_setup_for_bpf_map(struct xdp_sock *xs) { - return !!xs->rx; + return READ_ONCE(xs->rx) && READ_ONCE(xs->umem) && + READ_ONCE(xs->umem->fq); } -static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp) +u64 *xsk_umem_peek_addr(struct xdp_umem *umem, u64 *addr) +{ + return xskq_peek_addr(umem->fq, addr); +} +EXPORT_SYMBOL(xsk_umem_peek_addr); + +void xsk_umem_discard_addr(struct xdp_umem *umem) +{ + xskq_discard_addr(umem->fq); +} +EXPORT_SYMBOL(xsk_umem_discard_addr); + +static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len) { - u32 len = xdp->data_end - xdp->data; void *buffer; u64 addr; int err; - if (xs->dev != xdp->rxq->dev || xs->queue_id != xdp->rxq->queue_index) - return -EINVAL; - if (!xskq_peek_addr(xs->umem->fq, &addr) || len > xs->umem->chunk_size_nohr) { xs->rx_dropped++; @@ -60,25 +70,41 @@ static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp) buffer = xdp_umem_get_data(xs->umem, addr); memcpy(buffer, xdp->data, len); err = xskq_produce_batch_desc(xs->rx, addr, len); - if (!err) + if (!err) { xskq_discard_addr(xs->umem->fq); - else - xs->rx_dropped++; + xdp_return_buff(xdp); + return 0; + } + xs->rx_dropped++; return err; } -int xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp) +static int __xsk_rcv_zc(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len) { - int err; + int err = xskq_produce_batch_desc(xs->rx, (u64)xdp->handle, len); - err = __xsk_rcv(xs, xdp); - if (likely(!err)) + if (err) { xdp_return_buff(xdp); + xs->rx_dropped++; + } return err; } +int xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp) +{ + u32 len; + + if (xs->dev != xdp->rxq->dev || xs->queue_id != xdp->rxq->queue_index) + return -EINVAL; + + len = xdp->data_end - xdp->data; + + return (xdp->rxq->mem.type == MEM_TYPE_ZERO_COPY) ? + __xsk_rcv_zc(xs, xdp, len) : __xsk_rcv(xs, xdp, len); +} + void xsk_flush(struct xdp_sock *xs) { xskq_produce_flush_desc(xs->rx); @@ -87,15 +113,85 @@ void xsk_flush(struct xdp_sock *xs) int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp) { + u32 len = xdp->data_end - xdp->data; + void *buffer; + u64 addr; int err; - err = __xsk_rcv(xs, xdp); - if (!err) + if (!xskq_peek_addr(xs->umem->fq, &addr) || + len > xs->umem->chunk_size_nohr) { + xs->rx_dropped++; + return -ENOSPC; + } + + addr += xs->umem->headroom; + + buffer = xdp_umem_get_data(xs->umem, addr); + memcpy(buffer, xdp->data, len); + err = xskq_produce_batch_desc(xs->rx, addr, len); + if (!err) { + xskq_discard_addr(xs->umem->fq); xsk_flush(xs); + return 0; + } + xs->rx_dropped++; return err; } +void xsk_umem_complete_tx(struct xdp_umem *umem, u32 nb_entries) +{ + xskq_produce_flush_addr_n(umem->cq, nb_entries); +} +EXPORT_SYMBOL(xsk_umem_complete_tx); + +void xsk_umem_consume_tx_done(struct xdp_umem *umem) +{ + struct xdp_sock *xs; + + rcu_read_lock(); + list_for_each_entry_rcu(xs, &umem->xsk_list, list) { + xs->sk.sk_write_space(&xs->sk); + } + rcu_read_unlock(); +} +EXPORT_SYMBOL(xsk_umem_consume_tx_done); + +bool xsk_umem_consume_tx(struct xdp_umem *umem, dma_addr_t *dma, u32 *len) +{ + struct xdp_desc desc; + struct xdp_sock *xs; + + rcu_read_lock(); + list_for_each_entry_rcu(xs, &umem->xsk_list, list) { + if (!xskq_peek_desc(xs->tx, &desc)) + continue; + + if (xskq_produce_addr_lazy(umem->cq, desc.addr)) + goto out; + + *dma = xdp_umem_get_dma(umem, desc.addr); + *len = desc.len; + + xskq_discard_desc(xs->tx); + rcu_read_unlock(); + return true; + } + +out: + rcu_read_unlock(); + return false; +} +EXPORT_SYMBOL(xsk_umem_consume_tx); + +static int xsk_zc_xmit(struct sock *sk) +{ + struct xdp_sock *xs = xdp_sk(sk); + struct net_device *dev = xs->dev; + + return dev->netdev_ops->ndo_xsk_async_xmit(dev, xs->queue_id); +} + static void xsk_destruct_skb(struct sk_buff *skb) { u64 addr = (u64)(long)skb_shinfo(skb)->destructor_arg; @@ -109,7 +205,6 @@ static void xsk_destruct_skb(struct sk_buff *skb) static int xsk_generic_xmit(struct sock *sk, struct msghdr *m, size_t total_len) { - bool need_wait = !(m->msg_flags & MSG_DONTWAIT); u32 max_batch = TX_BATCH_SIZE; struct xdp_sock *xs = xdp_sk(sk); bool sent_frame = false; @@ -119,8 +214,6 @@ static int xsk_generic_xmit(struct sock *sk, struct msghdr *m, if (unlikely(!xs->tx)) return -ENOBUFS; - if (need_wait) - return -EOPNOTSUPP; mutex_lock(&xs->mutex); @@ -150,7 +243,7 @@ static int xsk_generic_xmit(struct sock *sk, struct msghdr *m, goto out; } - skb = sock_alloc_send_skb(sk, len, !need_wait, &err); + skb = sock_alloc_send_skb(sk, len, 1, &err); if (unlikely(!skb)) { err = -EAGAIN; goto out; @@ -193,6 +286,7 @@ out: static int xsk_sendmsg(struct socket *sock, struct msghdr *m, size_t total_len) { + bool need_wait = !(m->msg_flags & MSG_DONTWAIT); struct sock *sk = sock->sk; struct xdp_sock *xs = xdp_sk(sk); @@ -200,8 +294,10 @@ static int xsk_sendmsg(struct socket *sock, struct msghdr *m, size_t total_len) return -ENXIO; if (unlikely(!(xs->dev->flags & IFF_UP))) return -ENETDOWN; + if (need_wait) + return -EOPNOTSUPP; - return xsk_generic_xmit(sk, m, total_len); + return (xs->zc) ? xsk_zc_xmit(sk) : xsk_generic_xmit(sk, m, total_len); } static unsigned int xsk_poll(struct file *file, struct socket *sock, @@ -291,6 +387,7 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len) struct sock *sk = sock->sk; struct xdp_sock *xs = xdp_sk(sk); struct net_device *dev; + u32 flags, qid; int err = 0; if (addr_len < sizeof(struct sockaddr_xdp)) @@ -315,16 +412,26 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len) goto out_unlock; } - if ((xs->rx && sxdp->sxdp_queue_id >= dev->real_num_rx_queues) || - (xs->tx && sxdp->sxdp_queue_id >= dev->real_num_tx_queues)) { + qid = sxdp->sxdp_queue_id; + + if ((xs->rx && qid >= dev->real_num_rx_queues) || + (xs->tx && qid >= dev->real_num_tx_queues)) { err = -EINVAL; goto out_unlock; } - if (sxdp->sxdp_flags & XDP_SHARED_UMEM) { + flags = sxdp->sxdp_flags; + + if (flags & XDP_SHARED_UMEM) { struct xdp_sock *umem_xs; struct socket *sock; + if ((flags & XDP_COPY) || (flags & XDP_ZEROCOPY)) { + /* Cannot specify flags for shared sockets. */ + err = -EINVAL; + goto out_unlock; + } + if (xs->umem) { /* We have already our own. */ err = -EINVAL; @@ -343,8 +450,7 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len) err = -EBADF; sockfd_put(sock); goto out_unlock; - } else if (umem_xs->dev != dev || - umem_xs->queue_id != sxdp->sxdp_queue_id) { + } else if (umem_xs->dev != dev || umem_xs->queue_id != qid) { err = -EINVAL; sockfd_put(sock); goto out_unlock; @@ -360,13 +466,18 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len) /* This xsk has its own umem. */ xskq_set_umem(xs->umem->fq, &xs->umem->props); xskq_set_umem(xs->umem->cq, &xs->umem->props); + + err = xdp_umem_assign_dev(xs->umem, dev, qid, flags); + if (err) + goto out_unlock; } xs->dev = dev; - xs->queue_id = sxdp->sxdp_queue_id; - + xs->zc = xs->umem->zc; + xs->queue_id = qid; xskq_set_umem(xs->rx, &xs->umem->props); xskq_set_umem(xs->tx, &xs->umem->props); + xdp_add_sk_umem(xs->umem, xs); out_unlock: if (err) @@ -604,6 +715,7 @@ static void xsk_destruct(struct sock *sk) xskq_destroy(xs->rx); xskq_destroy(xs->tx); + xdp_del_sk_umem(xs->umem, xs); xdp_put_umem(xs->umem); sk_refcnt_debug_dec(sk); diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h index 337e5ad3b10e..ef6a6f0ec949 100644 --- a/net/xdp/xsk_queue.h +++ b/net/xdp/xsk_queue.h @@ -8,10 +8,10 @@ #include <linux/types.h> #include <linux/if_xdp.h> - -#include "xdp_umem_props.h" +#include <net/xdp_sock.h> #define RX_BATCH_SIZE 16 +#define LAZY_UPDATE_THRESHOLD 128 struct xdp_ring { u32 producer ____cacheline_aligned_in_smp; @@ -62,9 +62,14 @@ static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt) return (entries > dcnt) ? dcnt : entries; } +static inline u32 xskq_nb_free_lazy(struct xsk_queue *q, u32 producer) +{ + return q->nentries - (producer - q->cons_tail); +} + static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt) { - u32 free_entries = q->nentries - (producer - q->cons_tail); + u32 free_entries = xskq_nb_free_lazy(q, producer); if (free_entries >= dcnt) return free_entries; @@ -124,6 +129,9 @@ static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr) { struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; + if (xskq_nb_free(q, q->prod_tail, LAZY_UPDATE_THRESHOLD) == 0) + return -ENOSPC; + ring->desc[q->prod_tail++ & q->ring_mask] = addr; /* Order producer and data */ @@ -133,6 +141,27 @@ static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr) return 0; } +static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr) +{ + struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; + + if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0) + return -ENOSPC; + + ring->desc[q->prod_head++ & q->ring_mask] = addr; + return 0; +} + +static inline void xskq_produce_flush_addr_n(struct xsk_queue *q, + u32 nb_entries) +{ + /* Order producer and data */ + smp_wmb(); + + q->prod_tail += nb_entries; + WRITE_ONCE(q->ring->producer, q->prod_tail); +} + static inline int xskq_reserve_addr(struct xsk_queue *q) { if (xskq_nb_free(q, q->prod_head, 1) == 0) diff --git a/samples/bpf/xdpsock_user.c b/samples/bpf/xdpsock_user.c index 7494f60fbff8..d69c8d78d3fd 100644 --- a/samples/bpf/xdpsock_user.c +++ b/samples/bpf/xdpsock_user.c @@ -75,6 +75,7 @@ static int opt_queue; static int opt_poll; static int opt_shared_packet_buffer; static int opt_interval = 1; +static u32 opt_xdp_bind_flags; struct xdp_umem_uqueue { u32 cached_prod; @@ -541,9 +542,12 @@ static struct xdpsock *xsk_configure(struct xdp_umem *umem) sxdp.sxdp_family = PF_XDP; sxdp.sxdp_ifindex = opt_ifindex; sxdp.sxdp_queue_id = opt_queue; + if (shared) { sxdp.sxdp_flags = XDP_SHARED_UMEM; sxdp.sxdp_shared_umem_fd = umem->fd; + } else { + sxdp.sxdp_flags = opt_xdp_bind_flags; } lassert(bind(sfd, (struct sockaddr *)&sxdp, sizeof(sxdp)) == 0); @@ -699,6 +703,7 @@ static void parse_command_line(int argc, char **argv) break; case 'S': opt_xdp_flags |= XDP_FLAGS_SKB_MODE; + opt_xdp_bind_flags |= XDP_COPY; break; case 'N': opt_xdp_flags |= XDP_FLAGS_DRV_MODE; |