diff options
author | Tom Herbert <tom@quantonium.net> | 2017-07-29 01:22:43 +0200 |
---|---|---|
committer | David S. Miller <davem@davemloft.net> | 2017-08-02 00:26:19 +0200 |
commit | bbb03029a899679d73e62d7e6ae80348cc5d0054 (patch) | |
tree | 33ac457d4f2fe6464309310015674afb6d553724 /net/strparser | |
parent | skbuff: Function to send an skbuf on a socket (diff) | |
download | linux-bbb03029a899679d73e62d7e6ae80348cc5d0054.tar.xz linux-bbb03029a899679d73e62d7e6ae80348cc5d0054.zip |
strparser: Generalize strparser
Generalize strparser from more than just being used in conjunction
with read_sock. strparser will also be used in the send path with
zero proxy. The primary change is to create strp_process function
that performs the critical processing on skbs. The documentation
is also updated to reflect the new uses.
Signed-off-by: Tom Herbert <tom@quantonium.net>
Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/strparser')
-rw-r--r-- | net/strparser/strparser.c | 313 |
1 files changed, 187 insertions, 126 deletions
diff --git a/net/strparser/strparser.c b/net/strparser/strparser.c index b5c279b22680..0d18fbc6f870 100644 --- a/net/strparser/strparser.c +++ b/net/strparser/strparser.c @@ -29,44 +29,46 @@ static struct workqueue_struct *strp_wq; -struct _strp_rx_msg { - /* Internal cb structure. struct strp_rx_msg must be first for passing +struct _strp_msg { + /* Internal cb structure. struct strp_msg must be first for passing * to upper layer. */ - struct strp_rx_msg strp; + struct strp_msg strp; int accum_len; int early_eaten; }; -static inline struct _strp_rx_msg *_strp_rx_msg(struct sk_buff *skb) +static inline struct _strp_msg *_strp_msg(struct sk_buff *skb) { - return (struct _strp_rx_msg *)((void *)skb->cb + + return (struct _strp_msg *)((void *)skb->cb + offsetof(struct qdisc_skb_cb, data)); } /* Lower lock held */ -static void strp_abort_rx_strp(struct strparser *strp, int err) +static void strp_abort_strp(struct strparser *strp, int err) { - struct sock *csk = strp->sk; - /* Unrecoverable error in receive */ - del_timer(&strp->rx_msg_timer); + del_timer(&strp->msg_timer); - if (strp->rx_stopped) + if (strp->stopped) return; - strp->rx_stopped = 1; + strp->stopped = 1; + + if (strp->sk) { + struct sock *sk = strp->sk; - /* Report an error on the lower socket */ - csk->sk_err = err; - csk->sk_error_report(csk); + /* Report an error on the lower socket */ + sk->sk_err = err; + sk->sk_error_report(sk); + } } -static void strp_start_rx_timer(struct strparser *strp) +static void strp_start_timer(struct strparser *strp, long timeo) { - if (strp->sk->sk_rcvtimeo) - mod_timer(&strp->rx_msg_timer, strp->sk->sk_rcvtimeo); + if (timeo) + mod_timer(&strp->msg_timer, timeo); } /* Lower lock held */ @@ -74,46 +76,55 @@ static void strp_parser_err(struct strparser *strp, int err, read_descriptor_t *desc) { desc->error = err; - kfree_skb(strp->rx_skb_head); - strp->rx_skb_head = NULL; + kfree_skb(strp->skb_head); + strp->skb_head = NULL; strp->cb.abort_parser(strp, err); } static inline int strp_peek_len(struct strparser *strp) { - struct socket *sock = strp->sk->sk_socket; + if (strp->sk) { + struct socket *sock = strp->sk->sk_socket; + + return sock->ops->peek_len(sock); + } + + /* If we don't have an associated socket there's nothing to peek. + * Return int max to avoid stopping the strparser. + */ - return sock->ops->peek_len(sock); + return INT_MAX; } /* Lower socket lock held */ -static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb, - unsigned int orig_offset, size_t orig_len) +static int __strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb, + unsigned int orig_offset, size_t orig_len, + size_t max_msg_size, long timeo) { struct strparser *strp = (struct strparser *)desc->arg.data; - struct _strp_rx_msg *rxm; + struct _strp_msg *stm; struct sk_buff *head, *skb; size_t eaten = 0, cand_len; ssize_t extra; int err; bool cloned_orig = false; - if (strp->rx_paused) + if (strp->paused) return 0; - head = strp->rx_skb_head; + head = strp->skb_head; if (head) { /* Message already in progress */ - rxm = _strp_rx_msg(head); - if (unlikely(rxm->early_eaten)) { + stm = _strp_msg(head); + if (unlikely(stm->early_eaten)) { /* Already some number of bytes on the receive sock - * data saved in rx_skb_head, just indicate they + * data saved in skb_head, just indicate they * are consumed. */ - eaten = orig_len <= rxm->early_eaten ? - orig_len : rxm->early_eaten; - rxm->early_eaten -= eaten; + eaten = orig_len <= stm->early_eaten ? + orig_len : stm->early_eaten; + stm->early_eaten -= eaten; return eaten; } @@ -126,12 +137,12 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb, */ orig_skb = skb_clone(orig_skb, GFP_ATOMIC); if (!orig_skb) { - STRP_STATS_INCR(strp->stats.rx_mem_fail); + STRP_STATS_INCR(strp->stats.mem_fail); desc->error = -ENOMEM; return 0; } if (!pskb_pull(orig_skb, orig_offset)) { - STRP_STATS_INCR(strp->stats.rx_mem_fail); + STRP_STATS_INCR(strp->stats.mem_fail); kfree_skb(orig_skb); desc->error = -ENOMEM; return 0; @@ -140,13 +151,13 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb, orig_offset = 0; } - if (!strp->rx_skb_nextp) { + if (!strp->skb_nextp) { /* We are going to append to the frags_list of head. * Need to unshare the frag_list. */ err = skb_unclone(head, GFP_ATOMIC); if (err) { - STRP_STATS_INCR(strp->stats.rx_mem_fail); + STRP_STATS_INCR(strp->stats.mem_fail); desc->error = err; return 0; } @@ -165,20 +176,20 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb, skb = alloc_skb(0, GFP_ATOMIC); if (!skb) { - STRP_STATS_INCR(strp->stats.rx_mem_fail); + STRP_STATS_INCR(strp->stats.mem_fail); desc->error = -ENOMEM; return 0; } skb->len = head->len; skb->data_len = head->len; skb->truesize = head->truesize; - *_strp_rx_msg(skb) = *_strp_rx_msg(head); - strp->rx_skb_nextp = &head->next; + *_strp_msg(skb) = *_strp_msg(head); + strp->skb_nextp = &head->next; skb_shinfo(skb)->frag_list = head; - strp->rx_skb_head = skb; + strp->skb_head = skb; head = skb; } else { - strp->rx_skb_nextp = + strp->skb_nextp = &skb_shinfo(head)->frag_list; } } @@ -188,112 +199,112 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb, /* Always clone since we will consume something */ skb = skb_clone(orig_skb, GFP_ATOMIC); if (!skb) { - STRP_STATS_INCR(strp->stats.rx_mem_fail); + STRP_STATS_INCR(strp->stats.mem_fail); desc->error = -ENOMEM; break; } cand_len = orig_len - eaten; - head = strp->rx_skb_head; + head = strp->skb_head; if (!head) { head = skb; - strp->rx_skb_head = head; - /* Will set rx_skb_nextp on next packet if needed */ - strp->rx_skb_nextp = NULL; - rxm = _strp_rx_msg(head); - memset(rxm, 0, sizeof(*rxm)); - rxm->strp.offset = orig_offset + eaten; + strp->skb_head = head; + /* Will set skb_nextp on next packet if needed */ + strp->skb_nextp = NULL; + stm = _strp_msg(head); + memset(stm, 0, sizeof(*stm)); + stm->strp.offset = orig_offset + eaten; } else { /* Unclone since we may be appending to an skb that we * already share a frag_list with. */ err = skb_unclone(skb, GFP_ATOMIC); if (err) { - STRP_STATS_INCR(strp->stats.rx_mem_fail); + STRP_STATS_INCR(strp->stats.mem_fail); desc->error = err; break; } - rxm = _strp_rx_msg(head); - *strp->rx_skb_nextp = skb; - strp->rx_skb_nextp = &skb->next; + stm = _strp_msg(head); + *strp->skb_nextp = skb; + strp->skb_nextp = &skb->next; head->data_len += skb->len; head->len += skb->len; head->truesize += skb->truesize; } - if (!rxm->strp.full_len) { + if (!stm->strp.full_len) { ssize_t len; len = (*strp->cb.parse_msg)(strp, head); if (!len) { /* Need more header to determine length */ - if (!rxm->accum_len) { + if (!stm->accum_len) { /* Start RX timer for new message */ - strp_start_rx_timer(strp); + strp_start_timer(strp, timeo); } - rxm->accum_len += cand_len; + stm->accum_len += cand_len; eaten += cand_len; - STRP_STATS_INCR(strp->stats.rx_need_more_hdr); + STRP_STATS_INCR(strp->stats.need_more_hdr); WARN_ON(eaten != orig_len); break; } else if (len < 0) { - if (len == -ESTRPIPE && rxm->accum_len) { + if (len == -ESTRPIPE && stm->accum_len) { len = -ENODATA; - strp->rx_unrecov_intr = 1; + strp->unrecov_intr = 1; } else { - strp->rx_interrupted = 1; + strp->interrupted = 1; } strp_parser_err(strp, len, desc); break; - } else if (len > strp->sk->sk_rcvbuf) { + } else if (len > max_msg_size) { /* Message length exceeds maximum allowed */ - STRP_STATS_INCR(strp->stats.rx_msg_too_big); + STRP_STATS_INCR(strp->stats.msg_too_big); strp_parser_err(strp, -EMSGSIZE, desc); break; } else if (len <= (ssize_t)head->len - - skb->len - rxm->strp.offset) { + skb->len - stm->strp.offset) { /* Length must be into new skb (and also * greater than zero) */ - STRP_STATS_INCR(strp->stats.rx_bad_hdr_len); + STRP_STATS_INCR(strp->stats.bad_hdr_len); strp_parser_err(strp, -EPROTO, desc); break; } - rxm->strp.full_len = len; + stm->strp.full_len = len; } - extra = (ssize_t)(rxm->accum_len + cand_len) - - rxm->strp.full_len; + extra = (ssize_t)(stm->accum_len + cand_len) - + stm->strp.full_len; if (extra < 0) { /* Message not complete yet. */ - if (rxm->strp.full_len - rxm->accum_len > + if (stm->strp.full_len - stm->accum_len > strp_peek_len(strp)) { - /* Don't have the whole messages in the socket - * buffer. Set strp->rx_need_bytes to wait for + /* Don't have the whole message in the socket + * buffer. Set strp->need_bytes to wait for * the rest of the message. Also, set "early * eaten" since we've already buffered the skb * but don't consume yet per strp_read_sock. */ - if (!rxm->accum_len) { + if (!stm->accum_len) { /* Start RX timer for new message */ - strp_start_rx_timer(strp); + strp_start_timer(strp, timeo); } - strp->rx_need_bytes = rxm->strp.full_len - - rxm->accum_len; - rxm->accum_len += cand_len; - rxm->early_eaten = cand_len; - STRP_STATS_ADD(strp->stats.rx_bytes, cand_len); + strp->need_bytes = stm->strp.full_len - + stm->accum_len; + stm->accum_len += cand_len; + stm->early_eaten = cand_len; + STRP_STATS_ADD(strp->stats.bytes, cand_len); desc->count = 0; /* Stop reading socket */ break; } - rxm->accum_len += cand_len; + stm->accum_len += cand_len; eaten += cand_len; WARN_ON(eaten != orig_len); break; @@ -308,14 +319,14 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb, eaten += (cand_len - extra); /* Hurray, we have a new message! */ - del_timer(&strp->rx_msg_timer); - strp->rx_skb_head = NULL; - STRP_STATS_INCR(strp->stats.rx_msgs); + del_timer(&strp->msg_timer); + strp->skb_head = NULL; + STRP_STATS_INCR(strp->stats.msgs); /* Give skb to upper layer */ strp->cb.rcv_msg(strp, head); - if (unlikely(strp->rx_paused)) { + if (unlikely(strp->paused)) { /* Upper layer paused strp */ break; } @@ -324,11 +335,33 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb, if (cloned_orig) kfree_skb(orig_skb); - STRP_STATS_ADD(strp->stats.rx_bytes, eaten); + STRP_STATS_ADD(strp->stats.bytes, eaten); return eaten; } +int strp_process(struct strparser *strp, struct sk_buff *orig_skb, + unsigned int orig_offset, size_t orig_len, + size_t max_msg_size, long timeo) +{ + read_descriptor_t desc; /* Dummy arg to strp_recv */ + + desc.arg.data = strp; + + return __strp_recv(&desc, orig_skb, orig_offset, orig_len, + max_msg_size, timeo); +} +EXPORT_SYMBOL_GPL(strp_process); + +static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb, + unsigned int orig_offset, size_t orig_len) +{ + struct strparser *strp = (struct strparser *)desc->arg.data; + + return __strp_recv(desc, orig_skb, orig_offset, orig_len, + strp->sk->sk_rcvbuf, strp->sk->sk_rcvtimeo); +} + static int default_read_sock_done(struct strparser *strp, int err) { return err; @@ -355,101 +388,129 @@ static int strp_read_sock(struct strparser *strp) /* Lower sock lock held */ void strp_data_ready(struct strparser *strp) { - if (unlikely(strp->rx_stopped)) + if (unlikely(strp->stopped)) return; - /* This check is needed to synchronize with do_strp_rx_work. - * do_strp_rx_work acquires a process lock (lock_sock) whereas + /* This check is needed to synchronize with do_strp_work. + * do_strp_work acquires a process lock (lock_sock) whereas * the lock held here is bh_lock_sock. The two locks can be * held by different threads at the same time, but bh_lock_sock * allows a thread in BH context to safely check if the process * lock is held. In this case, if the lock is held, queue work. */ if (sock_owned_by_user(strp->sk)) { - queue_work(strp_wq, &strp->rx_work); + queue_work(strp_wq, &strp->work); return; } - if (strp->rx_paused) + if (strp->paused) return; - if (strp->rx_need_bytes) { - if (strp_peek_len(strp) >= strp->rx_need_bytes) - strp->rx_need_bytes = 0; + if (strp->need_bytes) { + if (strp_peek_len(strp) >= strp->need_bytes) + strp->need_bytes = 0; else return; } if (strp_read_sock(strp) == -ENOMEM) - queue_work(strp_wq, &strp->rx_work); + queue_work(strp_wq, &strp->work); } EXPORT_SYMBOL_GPL(strp_data_ready); -static void do_strp_rx_work(struct strparser *strp) +static void do_strp_work(struct strparser *strp) { read_descriptor_t rd_desc; - struct sock *csk = strp->sk; /* We need the read lock to synchronize with strp_data_ready. We * need the socket lock for calling strp_read_sock. */ - lock_sock(csk); + strp->cb.lock(strp); - if (unlikely(strp->rx_stopped)) + if (unlikely(strp->stopped)) goto out; - if (strp->rx_paused) + if (strp->paused) goto out; rd_desc.arg.data = strp; if (strp_read_sock(strp) == -ENOMEM) - queue_work(strp_wq, &strp->rx_work); + queue_work(strp_wq, &strp->work); out: - release_sock(csk); + strp->cb.unlock(strp); } -static void strp_rx_work(struct work_struct *w) +static void strp_work(struct work_struct *w) { - do_strp_rx_work(container_of(w, struct strparser, rx_work)); + do_strp_work(container_of(w, struct strparser, work)); } -static void strp_rx_msg_timeout(unsigned long arg) +static void strp_msg_timeout(unsigned long arg) { struct strparser *strp = (struct strparser *)arg; /* Message assembly timed out */ - STRP_STATS_INCR(strp->stats.rx_msg_timeouts); - lock_sock(strp->sk); + STRP_STATS_INCR(strp->stats.msg_timeouts); + strp->cb.lock(strp); strp->cb.abort_parser(strp, ETIMEDOUT); + strp->cb.unlock(strp); +} + +static void strp_sock_lock(struct strparser *strp) +{ + lock_sock(strp->sk); +} + +static void strp_sock_unlock(struct strparser *strp) +{ release_sock(strp->sk); } -int strp_init(struct strparser *strp, struct sock *csk, +int strp_init(struct strparser *strp, struct sock *sk, struct strp_callbacks *cb) { - struct socket *sock = csk->sk_socket; if (!cb || !cb->rcv_msg || !cb->parse_msg) return -EINVAL; - if (!sock->ops->read_sock || !sock->ops->peek_len) - return -EAFNOSUPPORT; + /* The sk (sock) arg determines the mode of the stream parser. + * + * If the sock is set then the strparser is in receive callback mode. + * The upper layer calls strp_data_ready to kick receive processing + * and strparser calls the read_sock function on the socket to + * get packets. + * + * If the sock is not set then the strparser is in general mode. + * The upper layer calls strp_process for each skb to be parsed. + */ - memset(strp, 0, sizeof(*strp)); + if (sk) { + struct socket *sock = sk->sk_socket; - strp->sk = csk; + if (!sock->ops->read_sock || !sock->ops->peek_len) + return -EAFNOSUPPORT; + } else { + if (!cb->lock || !cb->unlock) + return -EINVAL; + } - setup_timer(&strp->rx_msg_timer, strp_rx_msg_timeout, - (unsigned long)strp); + memset(strp, 0, sizeof(*strp)); - INIT_WORK(&strp->rx_work, strp_rx_work); + strp->sk = sk; + strp->cb.lock = cb->lock ? : strp_sock_lock; + strp->cb.unlock = cb->unlock ? : strp_sock_unlock; strp->cb.rcv_msg = cb->rcv_msg; strp->cb.parse_msg = cb->parse_msg; strp->cb.read_sock_done = cb->read_sock_done ? : default_read_sock_done; - strp->cb.abort_parser = cb->abort_parser ? : strp_abort_rx_strp; + strp->cb.abort_parser = cb->abort_parser ? : strp_abort_strp; + + setup_timer(&strp->msg_timer, strp_msg_timeout, + (unsigned long)strp); + + INIT_WORK(&strp->work, strp_work); return 0; } @@ -457,12 +518,12 @@ EXPORT_SYMBOL_GPL(strp_init); void strp_unpause(struct strparser *strp) { - strp->rx_paused = 0; + strp->paused = 0; - /* Sync setting rx_paused with RX work */ + /* Sync setting paused with RX work */ smp_mb(); - queue_work(strp_wq, &strp->rx_work); + queue_work(strp_wq, &strp->work); } EXPORT_SYMBOL_GPL(strp_unpause); @@ -471,27 +532,27 @@ EXPORT_SYMBOL_GPL(strp_unpause); */ void strp_done(struct strparser *strp) { - WARN_ON(!strp->rx_stopped); + WARN_ON(!strp->stopped); - del_timer_sync(&strp->rx_msg_timer); - cancel_work_sync(&strp->rx_work); + del_timer_sync(&strp->msg_timer); + cancel_work_sync(&strp->work); - if (strp->rx_skb_head) { - kfree_skb(strp->rx_skb_head); - strp->rx_skb_head = NULL; + if (strp->skb_head) { + kfree_skb(strp->skb_head); + strp->skb_head = NULL; } } EXPORT_SYMBOL_GPL(strp_done); void strp_stop(struct strparser *strp) { - strp->rx_stopped = 1; + strp->stopped = 1; } EXPORT_SYMBOL_GPL(strp_stop); void strp_check_rcv(struct strparser *strp) { - queue_work(strp_wq, &strp->rx_work); + queue_work(strp_wq, &strp->work); } EXPORT_SYMBOL_GPL(strp_check_rcv); |