diff options
author | Stefan Eissing <icing@apache.org> | 2019-05-28 14:42:17 +0200 |
---|---|---|
committer | Stefan Eissing <icing@apache.org> | 2019-05-28 14:42:17 +0200 |
commit | 5b32f27b77efc4688a3c81f26abb1246c2e97867 (patch) | |
tree | c21ed4c157ab0395b3e65264917bf0bb5a3e6d45 | |
parent | Addendum to r1856493: check NULLness of new arg parameter. (diff) | |
download | apache2-5b32f27b77efc4688a3c81f26abb1246c2e97867.tar.xz apache2-5b32f27b77efc4688a3c81f26abb1246c2e97867.zip |
* modules/http2: reverting r1859724, as no good.
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1860257 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to '')
-rw-r--r-- | CHANGES | 3 | ||||
-rw-r--r-- | modules/http2/h2.h | 12 | ||||
-rw-r--r-- | modules/http2/h2_bucket_beam.c | 223 | ||||
-rw-r--r-- | modules/http2/h2_bucket_beam.h | 17 | ||||
-rw-r--r-- | modules/http2/h2_bucket_eos.c | 43 | ||||
-rw-r--r-- | modules/http2/h2_bucket_eos.h | 7 | ||||
-rw-r--r-- | modules/http2/h2_conn.c | 4 | ||||
-rw-r--r-- | modules/http2/h2_conn_io.c | 76 | ||||
-rw-r--r-- | modules/http2/h2_conn_io.h | 1 | ||||
-rw-r--r-- | modules/http2/h2_filter.h | 1 | ||||
-rw-r--r-- | modules/http2/h2_h2.c | 5 | ||||
-rw-r--r-- | modules/http2/h2_mplx.c | 99 | ||||
-rw-r--r-- | modules/http2/h2_mplx.h | 24 | ||||
-rw-r--r-- | modules/http2/h2_push.c | 20 | ||||
-rw-r--r-- | modules/http2/h2_session.c | 259 | ||||
-rw-r--r-- | modules/http2/h2_session.h | 16 | ||||
-rw-r--r-- | modules/http2/h2_stream.c | 73 | ||||
-rw-r--r-- | modules/http2/h2_stream.h | 5 | ||||
-rw-r--r-- | modules/http2/h2_task.c | 17 | ||||
-rw-r--r-- | modules/http2/h2_task.h | 3 | ||||
-rw-r--r-- | modules/http2/h2_version.h | 4 | ||||
-rw-r--r-- | modules/http2/h2_workers.c | 4 | ||||
-rw-r--r-- | modules/http2/mod_proxy_http2.c | 8 |
23 files changed, 498 insertions, 426 deletions
@@ -1,9 +1,6 @@ -*- coding: utf-8 -*- Changes with Apache 2.5.1 - *) mod_http2: internal code cleanups and simplifications. Common output code for - h2 and h2c protocols, using nested mutex locks for simplified calls. [Stefan Eissing] - *) mod_proxy/ssl: Proxy SSL client certificate configuration and other proxy SSL configurations broken inside <Proxy> context. PR 63430. [Ruediger Pluem, Yann Ylavic] diff --git a/modules/http2/h2.h b/modules/http2/h2.h index 798f4b5b7f..e057d66e0c 100644 --- a/modules/http2/h2.h +++ b/modules/http2/h2.h @@ -112,7 +112,6 @@ typedef enum h2_stream_state_t { H2_SS_CLOSED_L, H2_SS_CLOSED, H2_SS_CLEANUP, - H2_SS_DESTROYED, H2_SS_MAX } h2_stream_state_t; @@ -124,17 +123,6 @@ typedef enum { H2_SEV_IN_DATA_PENDING, } h2_stream_event_t; -typedef enum { - H2_PS_NONE, - H2_PS_QUEUED, - H2_PS_RUNNING, - H2_PS_FINISHED, -} h2_processing_state_t; - -#define H2_PS_IS_RUNNING(s) ((s) == H2_PS_RUNNING) -#define H2_PS_IS_NOT_RUNNING(s) ((s) != H2_PS_RUNNING) -#define H2_PS_IS_WAS_STARTED(s) ((s) >= H2_PS_RUNNING) -#define H2_PS_IS_HAS_FINISHED(s) ((s) == H2_PS_FINISHED) /* h2_request is the transformer of HTTP2 streams into HTTP/1.1 internal * format that will be fed to various httpd input filters to finally diff --git a/modules/http2/h2_bucket_beam.c b/modules/http2/h2_bucket_beam.c index cf97797b9c..a7f5edf5cc 100644 --- a/modules/http2/h2_bucket_beam.c +++ b/modules/http2/h2_bucket_beam.c @@ -24,7 +24,6 @@ #include <httpd.h> #include <http_protocol.h> -#include <http_request.h> #include <http_log.h> #include "h2_private.h" @@ -155,30 +154,6 @@ const apr_bucket_type_t h2_bucket_type_beam = { * h2_blist, a brigade without allocations ******************************************************************************/ -static void h2_blist_cleanup(h2_blist *bl) -{ - apr_bucket *e; - - while (!H2_BLIST_EMPTY(bl)) { - e = H2_BLIST_FIRST(bl); - apr_bucket_delete(e); - } -} - -static void brigade_move_to_blist(apr_bucket_brigade *bb, h2_blist *list) -{ - apr_bucket *b; - while (bb && !APR_BRIGADE_EMPTY(bb)) { - b = APR_BRIGADE_FIRST(bb); - APR_BUCKET_REMOVE(b); - H2_BLIST_INSERT_TAIL(list, b); - } -} - -/******************************************************************************* - * bucket beamer registration - ******************************************************************************/ - static apr_array_header_t *beamers; static apr_status_t cleanup_beamers(void *dummy) @@ -315,6 +290,17 @@ static apr_size_t calc_buffered(h2_bucket_beam *beam) return len; } +static void r_purge_sent(h2_bucket_beam *beam) +{ + apr_bucket *b; + /* delete all sender buckets in purge brigade, needs to be called + * from sender thread only */ + while (!H2_BLIST_EMPTY(&beam->purge_list)) { + b = H2_BLIST_FIRST(&beam->purge_list); + apr_bucket_delete(b); + } +} + static apr_size_t calc_space_left(h2_bucket_beam *beam) { if (beam->max_buf_size > 0) { @@ -449,7 +435,7 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy) } else { /* it should be there unless we screwed up */ - ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->pool, + ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->send_pool, APLOGNO(03384) "h2_beam(%d-%s): emitted bucket not " "in hold, n=%d", beam->id, beam->tag, (int)proxy->n); @@ -458,7 +444,7 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy) } /* notify anyone waiting on space to become available */ if (!bl.mutex) { - h2_blist_cleanup(&beam->purge_list); + r_purge_sent(beam); } else { apr_thread_cond_broadcast(beam->change); @@ -467,6 +453,16 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy) } } +static void h2_blist_cleanup(h2_blist *bl) +{ + apr_bucket *e; + + while (!H2_BLIST_EMPTY(bl)) { + e = H2_BLIST_FIRST(bl); + apr_bucket_delete(e); + } +} + static apr_status_t beam_close(h2_bucket_beam *beam) { if (!beam->closed) { @@ -481,10 +477,40 @@ int h2_beam_is_closed(h2_bucket_beam *beam) return beam->closed; } +static int pool_register(h2_bucket_beam *beam, apr_pool_t *pool, + apr_status_t (*cleanup)(void *)) +{ + if (pool && pool != beam->pool) { + apr_pool_pre_cleanup_register(pool, beam, cleanup); + return 1; + } + return 0; +} + +static int pool_kill(h2_bucket_beam *beam, apr_pool_t *pool, + apr_status_t (*cleanup)(void *)) { + if (pool && pool != beam->pool) { + apr_pool_cleanup_kill(pool, beam, cleanup); + return 1; + } + return 0; +} + +static apr_status_t beam_recv_cleanup(void *data) +{ + h2_bucket_beam *beam = data; + /* receiver pool has gone away, clear references */ + beam->recv_buffer = NULL; + beam->recv_pool = NULL; + return APR_SUCCESS; +} + static apr_status_t beam_send_cleanup(void *data) { h2_bucket_beam *beam = data; /* sender is going away, clear up all references to its memory */ + r_purge_sent(beam); + h2_blist_cleanup(&beam->send_list); report_consumption(beam, NULL); while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) { h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies); @@ -494,10 +520,22 @@ static apr_status_t beam_send_cleanup(void *data) } h2_blist_cleanup(&beam->purge_list); h2_blist_cleanup(&beam->hold_list); - h2_blist_cleanup(&beam->send_list); + beam->send_pool = NULL; return APR_SUCCESS; } +static void beam_set_send_pool(h2_bucket_beam *beam, apr_pool_t *pool) +{ + if (beam->send_pool != pool) { + if (beam->send_pool && beam->send_pool != beam->pool) { + pool_kill(beam, beam->send_pool, beam_send_cleanup); + beam_send_cleanup(beam); + } + beam->send_pool = pool; + pool_register(beam, beam->send_pool, beam_send_cleanup); + } +} + static void recv_buffer_cleanup(h2_bucket_beam *beam, h2_beam_lock *bl) { if (beam->recv_buffer && !APR_BRIGADE_EMPTY(beam->recv_buffer)) { @@ -521,18 +559,74 @@ static void recv_buffer_cleanup(h2_bucket_beam *beam, h2_beam_lock *bl) } } -apr_status_t h2_beam_destroy(h2_bucket_beam *beam) +static apr_status_t beam_cleanup(h2_bucket_beam *beam, int from_pool) { - /* no more io callbacks */ - beam->cons_io_cb = NULL; - beam->recv_buffer = NULL; - beam->recv_pool = NULL; + apr_status_t status = APR_SUCCESS; + int safe_send = (beam->owner == H2_BEAM_OWNER_SEND); + int safe_recv = (beam->owner == H2_BEAM_OWNER_RECV); - return beam_send_cleanup(beam); + /* + * Owner of the beam is going away, depending on which side it owns, + * cleanup strategies will differ. + * + * In general, receiver holds references to memory from sender. + * Clean up receiver first, if safe, then cleanup sender, if safe. + */ + + /* When called from pool destroy, io callbacks are disabled */ + if (from_pool) { + beam->cons_io_cb = NULL; + } + + /* When modify send is not safe, this means we still have multi-thread + * protection and the owner is receiving the buckets. If the sending + * side has not gone away, this means we could have dangling buckets + * in our lists that never get destroyed. This should not happen. */ + ap_assert(safe_send || !beam->send_pool); + if (!H2_BLIST_EMPTY(&beam->send_list)) { + ap_assert(beam->send_pool); + } + + if (safe_recv) { + if (beam->recv_pool) { + pool_kill(beam, beam->recv_pool, beam_recv_cleanup); + beam->recv_pool = NULL; + } + recv_buffer_cleanup(beam, NULL); + } + else { + beam->recv_buffer = NULL; + beam->recv_pool = NULL; + } + + if (safe_send && beam->send_pool) { + pool_kill(beam, beam->send_pool, beam_send_cleanup); + status = beam_send_cleanup(beam); + } + + if (safe_recv) { + ap_assert(H2_BPROXY_LIST_EMPTY(&beam->proxies)); + ap_assert(H2_BLIST_EMPTY(&beam->send_list)); + ap_assert(H2_BLIST_EMPTY(&beam->hold_list)); + ap_assert(H2_BLIST_EMPTY(&beam->purge_list)); + } + return status; +} + +static apr_status_t beam_pool_cleanup(void *data) +{ + return beam_cleanup(data, 1); +} + +apr_status_t h2_beam_destroy(h2_bucket_beam *beam) +{ + apr_pool_cleanup_kill(beam->pool, beam, beam_pool_cleanup); + return beam_cleanup(beam, 0); } apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool, int id, const char *tag, + h2_beam_owner_t owner, apr_size_t max_buf_size, apr_interval_time_t timeout) { @@ -547,6 +641,7 @@ apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool, beam->id = id; beam->tag = tag; beam->pool = pool; + beam->owner = owner; H2_BLIST_INIT(&beam->send_list); H2_BLIST_INIT(&beam->hold_list); H2_BLIST_INIT(&beam->purge_list); @@ -555,11 +650,14 @@ apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool, beam->max_buf_size = max_buf_size; beam->timeout = timeout; - rv = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_NESTED, pool); + rv = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT, pool); if (APR_SUCCESS == rv) { rv = apr_thread_cond_create(&beam->change, pool); + if (APR_SUCCESS == rv) { + apr_pool_pre_cleanup_register(pool, beam, beam_pool_cleanup); + *pbeam = beam; + } } - *pbeam = (APR_SUCCESS == rv)? beam : NULL; return rv; } @@ -613,7 +711,7 @@ void h2_beam_abort(h2_bucket_beam *beam) if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) { beam->aborted = 1; - h2_blist_cleanup(&beam->purge_list); + r_purge_sent(beam); h2_blist_cleanup(&beam->send_list); report_consumption(beam, &bl); apr_thread_cond_broadcast(beam->change); @@ -626,7 +724,7 @@ apr_status_t h2_beam_close(h2_bucket_beam *beam) h2_beam_lock bl; if (beam && enter_yellow(beam, &bl) == APR_SUCCESS) { - h2_blist_cleanup(&beam->purge_list); + r_purge_sent(beam); beam_close(beam); report_consumption(beam, &bl); leave_yellow(beam, &bl); @@ -659,6 +757,17 @@ apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block) return status; } +static void move_to_hold(h2_bucket_beam *beam, + apr_bucket_brigade *sender_bb) +{ + apr_bucket *b; + while (sender_bb && !APR_BRIGADE_EMPTY(sender_bb)) { + b = APR_BRIGADE_FIRST(sender_bb); + APR_BUCKET_REMOVE(b); + H2_BLIST_INSERT_TAIL(&beam->send_list, b); + } +} + static apr_status_t append_bucket(h2_bucket_beam *beam, apr_bucket *b, apr_read_type_e block, @@ -680,19 +789,6 @@ static apr_status_t append_bucket(h2_bucket_beam *beam, if (APR_BUCKET_IS_EOS(b)) { beam->closed = 1; } - if (AP_BUCKET_IS_EOR(b)) { - /* The problem with EOR buckets: - * - we cannot delete it now, as it will destroy the request pool - * and free data that we are still holding in the beam. - * - if we add it to the send_list, as all other buckets, - * it will most likely not be read, as an EOS came before. - * This means we still juggle it when the beam is destroyed, - * and rarely this seems to cause the pool to be freed twice... - * if asan stack traces are to be believed... - * - since we - */ - beam->closed = 1; - } APR_BUCKET_REMOVE(b); H2_BLIST_INSERT_TAIL(&beam->send_list, b); return APR_SUCCESS; @@ -748,7 +844,7 @@ static apr_status_t append_bucket(h2_bucket_beam *beam, /* this takes care of transient buckets and converts them * into heap ones. Other bucket types might or might not be * affected by this. */ - status = apr_bucket_setaside(b, beam->pool); + status = apr_bucket_setaside(b, beam->send_pool); } else if (APR_BUCKET_IS_HEAP(b)) { /* For heap buckets read from a receiver thread is fine. The @@ -768,7 +864,7 @@ static apr_status_t append_bucket(h2_bucket_beam *beam, } } else if (APR_BUCKET_IS_FILE(b) && can_beam) { - status = apr_bucket_setaside(b, beam->pool); + status = apr_bucket_setaside(b, beam->send_pool); } if (status == APR_ENOTIMPL) { @@ -780,7 +876,7 @@ static apr_status_t append_bucket(h2_bucket_beam *beam, * use pools/allocators safely. */ status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ); if (status == APR_SUCCESS) { - status = apr_bucket_setaside(b, beam->pool); + status = apr_bucket_setaside(b, beam->send_pool); } } @@ -795,6 +891,17 @@ static apr_status_t append_bucket(h2_bucket_beam *beam, return APR_SUCCESS; } +void h2_beam_send_from(h2_bucket_beam *beam, apr_pool_t *p) +{ + h2_beam_lock bl; + /* Called from the sender thread to add buckets to the beam */ + if (enter_yellow(beam, &bl) == APR_SUCCESS) { + r_purge_sent(beam); + beam_set_send_pool(beam, p); + leave_yellow(beam, &bl); + } +} + apr_status_t h2_beam_send(h2_bucket_beam *beam, apr_bucket_brigade *sender_bb, apr_read_type_e block) @@ -806,11 +913,11 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam, /* Called from the sender thread to add buckets to the beam */ if (enter_yellow(beam, &bl) == APR_SUCCESS) { - ap_assert(beam->pool); - h2_blist_cleanup(&beam->purge_list); + ap_assert(beam->send_pool); + r_purge_sent(beam); if (beam->aborted) { - brigade_move_to_blist(sender_bb, &beam->send_list); + move_to_hold(beam, sender_bb); rv = APR_ECONNABORTED; } else if (sender_bb) { @@ -820,7 +927,7 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam, while (!APR_BRIGADE_EMPTY(sender_bb) && APR_SUCCESS == rv) { if (space_left <= 0) { report_prod_io(beam, force_report, &bl); - h2_blist_cleanup(&beam->purge_list); + r_purge_sent(beam); rv = wait_not_full(beam, block, &space_left, &bl); if (APR_SUCCESS != rv) { break; diff --git a/modules/http2/h2_bucket_beam.h b/modules/http2/h2_bucket_beam.h index be5a2fda6e..f260762366 100644 --- a/modules/http2/h2_bucket_beam.h +++ b/modules/http2/h2_bucket_beam.h @@ -150,6 +150,11 @@ typedef struct { typedef int h2_beam_can_beam_callback(void *ctx, h2_bucket_beam *beam, apr_file_t *file); +typedef enum { + H2_BEAM_OWNER_SEND, + H2_BEAM_OWNER_RECV +} h2_beam_owner_t; + /** * Will deny all transfer of apr_file_t across the beam and force * a data copy instead. @@ -160,11 +165,13 @@ struct h2_bucket_beam { int id; const char *tag; apr_pool_t *pool; + h2_beam_owner_t owner; h2_blist send_list; h2_blist hold_list; h2_blist purge_list; apr_bucket_brigade *recv_buffer; h2_bproxy_list proxies; + apr_pool_t *send_pool; apr_pool_t *recv_pool; apr_size_t max_buf_size; @@ -208,6 +215,8 @@ struct h2_bucket_beam { * @param pool pool owning the beam, beam will cleanup when pool released * @param id identifier of the beam * @param tag tag identifying beam for logging + * @param owner if the beam is owned by the sender or receiver, e.g. if + * the pool owner is using this beam for sending or receiving * @param buffer_size maximum memory footprint of buckets buffered in beam, or * 0 for no limitation * @param timeout timeout for blocking operations @@ -215,6 +224,7 @@ struct h2_bucket_beam { apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool, int id, const char *tag, + h2_beam_owner_t owner, apr_size_t buffer_size, apr_interval_time_t timeout); @@ -236,6 +246,13 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam, apr_read_type_e block); /** + * Register the pool from which future buckets are send. This defines + * the lifetime of the buckets, e.g. the pool should not be cleared/destroyed + * until the data is no longer needed (or has been received). + */ +void h2_beam_send_from(h2_bucket_beam *beam, apr_pool_t *p); + +/** * Receive buckets from the beam into the given brigade. Will return APR_EOF * when reading past an EOS bucket. Reads can be blocking until data is * available or the beam has been closed. Non-blocking calls return APR_EAGAIN diff --git a/modules/http2/h2_bucket_eos.c b/modules/http2/h2_bucket_eos.c index 17f5f06f4e..4fe7ea725f 100644 --- a/modules/http2/h2_bucket_eos.c +++ b/modules/http2/h2_bucket_eos.c @@ -24,17 +24,27 @@ #include "h2_private.h" #include "h2.h" -#include "h2_ctx.h" #include "h2_mplx.h" -#include "h2_session.h" +#include "h2_stream.h" #include "h2_bucket_eos.h" typedef struct { apr_bucket_refcount refcount; - conn_rec *c; - int stream_id; + h2_stream *stream; } h2_bucket_eos; +static apr_status_t bucket_cleanup(void *data) +{ + h2_stream **pstream = data; + + if (*pstream) { + /* If bucket_destroy is called after us, this prevents + * bucket_destroy from trying to destroy the stream again. */ + *pstream = NULL; + } + return APR_SUCCESS; +} + static apr_status_t bucket_read(apr_bucket *b, const char **str, apr_size_t *len, apr_read_type_e block) { @@ -45,13 +55,12 @@ static apr_status_t bucket_read(apr_bucket *b, const char **str, return APR_SUCCESS; } -apr_bucket *h2_bucket_eos_make(apr_bucket *b, conn_rec *c, int stream_id) +apr_bucket *h2_bucket_eos_make(apr_bucket *b, h2_stream *stream) { h2_bucket_eos *h; h = apr_bucket_alloc(sizeof(*h), b->list); - h->c = c; - h->stream_id = stream_id; + h->stream = stream; b = apr_bucket_shared_make(b, h, 0, 0); b->type = &h2_bucket_type_eos; @@ -59,27 +68,35 @@ apr_bucket *h2_bucket_eos_make(apr_bucket *b, conn_rec *c, int stream_id) return b; } -apr_bucket *h2_bucket_eos_create(apr_bucket_alloc_t *list, conn_rec *c, int stream_id) +apr_bucket *h2_bucket_eos_create(apr_bucket_alloc_t *list, + h2_stream *stream) { apr_bucket *b = apr_bucket_alloc(sizeof(*b), list); APR_BUCKET_INIT(b); b->free = apr_bucket_free; b->list = list; - b = h2_bucket_eos_make(b, c, stream_id); + b = h2_bucket_eos_make(b, stream); + if (stream) { + h2_bucket_eos *h = b->data; + apr_pool_pre_cleanup_register(stream->pool, &h->stream, bucket_cleanup); + } return b; } static void bucket_destroy(void *data) { h2_bucket_eos *h = data; - h2_session *session; - + if (apr_bucket_shared_destroy(h)) { - if ((session = h2_ctx_get_session(h->c))) { - h2_session_eos_sent(session, h->stream_id); + h2_stream *stream = h->stream; + if (stream && stream->pool) { + apr_pool_cleanup_kill(stream->pool, &h->stream, bucket_cleanup); } apr_bucket_free(h); + if (stream) { + h2_stream_dispatch(stream, H2_SEV_EOS_SENT); + } } } diff --git a/modules/http2/h2_bucket_eos.h b/modules/http2/h2_bucket_eos.h index 3f9b800717..04e32e37f1 100644 --- a/modules/http2/h2_bucket_eos.h +++ b/modules/http2/h2_bucket_eos.h @@ -17,13 +17,16 @@ #ifndef mod_http2_h2_bucket_stream_eos_h #define mod_http2_h2_bucket_stream_eos_h +struct h2_stream; + /** End Of HTTP/2 STREAM (H2EOS) bucket */ extern const apr_bucket_type_t h2_bucket_type_eos; #define H2_BUCKET_IS_H2EOS(e) (e->type == &h2_bucket_type_eos) -apr_bucket *h2_bucket_eos_make(apr_bucket *b, conn_rec *c, int stream_id); +apr_bucket *h2_bucket_eos_make(apr_bucket *b, struct h2_stream *stream); -apr_bucket *h2_bucket_eos_create(apr_bucket_alloc_t *list, conn_rec *c, int stream_id); +apr_bucket *h2_bucket_eos_create(apr_bucket_alloc_t *list, + struct h2_stream *stream); #endif /* mod_http2_h2_bucket_stream_eos_h */ diff --git a/modules/http2/h2_conn.c b/modules/http2/h2_conn.c index 7abbed9898..d29cd7e996 100644 --- a/modules/http2/h2_conn.c +++ b/modules/http2/h2_conn.c @@ -37,6 +37,7 @@ #include "h2_filter.h" #include "h2_mplx.h" #include "h2_session.h" +#include "h2_stream.h" #include "h2_h2.h" #include "h2_task.h" #include "h2_workers.h" @@ -350,7 +351,8 @@ conn_rec *h2_slave_create(conn_rec *master, int slave_id, apr_pool_t *parent) void h2_slave_destroy(conn_rec *slave) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, slave, "h2_slave(%s): destroy", slave->log_id); + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, slave, + "h2_slave(%s): destroy", slave->log_id); slave->sbh = NULL; apr_pool_destroy(slave->pool); } diff --git a/modules/http2/h2_conn_io.c b/modules/http2/h2_conn_io.c index 25f96ed98c..68c15d13e4 100644 --- a/modules/http2/h2_conn_io.c +++ b/modules/http2/h2_conn_io.c @@ -134,8 +134,7 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, server_rec *s) io->c = c; io->output = apr_brigade_create(c->pool, c->bucket_alloc); io->is_tls = h2_h2_is_tls(c); - /* we used to buffer only on TLS connections, but to eliminate code paths - * and force more predictable behaviour, we do it on all now. Less test cases. */ + io->buffer_output = io->is_tls; io->flush_threshold = (apr_size_t)h2_config_sgeti64(s, H2_CONF_STREAM_MAX_MEM); if (io->is_tls) { @@ -151,13 +150,14 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, server_rec *s) else { io->warmup_size = 0; io->cooldown_usecs = 0; - io->write_size = WRITE_SIZE_MAX; + io->write_size = 0; } if (APLOGctrace1(c)) { ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, - "h2_conn_io(%ld): init, warmup_size=%ld, cd_secs=%f", - io->c->id, (long)io->warmup_size, + "h2_conn_io(%ld): init, buffering=%d, warmup_size=%ld, " + "cd_secs=%f", io->c->id, io->buffer_output, + (long)io->warmup_size, ((double)io->cooldown_usecs/APR_USEC_PER_SEC)); } @@ -321,20 +321,25 @@ apr_status_t h2_conn_io_write(h2_conn_io *io, const char *data, size_t length) io->is_flushed = 0; } - while (length > 0) { - remain = assure_scratch_space(io); - if (remain >= length) { - memcpy(io->scratch + io->slen, data, length); - io->slen += length; - length = 0; - } - else { - memcpy(io->scratch + io->slen, data, remain); - io->slen += remain; - data += remain; - length -= remain; + if (io->buffer_output) { + while (length > 0) { + remain = assure_scratch_space(io); + if (remain >= length) { + memcpy(io->scratch + io->slen, data, length); + io->slen += length; + length = 0; + } + else { + memcpy(io->scratch + io->slen, data, remain); + io->slen += remain; + data += remain; + length -= remain; + } } } + else { + status = apr_brigade_write(io->output, NULL, NULL, data, length); + } return status; } @@ -351,26 +356,37 @@ apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb) b = APR_BRIGADE_FIRST(bb); if (APR_BUCKET_IS_METADATA(b)) { - if (APR_BUCKET_IS_FLUSH(b)) { - /* need to finish any open scratch bucket, as meta data - * needs to be forward "in order". */ - append_scratch(io); - APR_BUCKET_REMOVE(b); - APR_BRIGADE_INSERT_TAIL(io->output, b); + /* need to finish any open scratch bucket, as meta data + * needs to be forward "in order". */ + append_scratch(io); + APR_BUCKET_REMOVE(b); + APR_BRIGADE_INSERT_TAIL(io->output, b); + } + else if (io->buffer_output) { + apr_size_t remain = assure_scratch_space(io); + if (b->length > remain) { + apr_bucket_split(b, remain); + if (io->slen == 0) { + /* complete write_size bucket, append unchanged */ + APR_BUCKET_REMOVE(b); + APR_BRIGADE_INSERT_TAIL(io->output, b); + continue; + } } else { + /* bucket fits in remain, copy to scratch */ + status = read_to_scratch(io, b); apr_bucket_delete(b); + continue; } } else { - apr_size_t remain = assure_scratch_space(io); - if (b->length > remain) { - apr_bucket_split(b, remain); + /* no buffering, forward buckets setaside on flush */ + if (APR_BUCKET_IS_TRANSIENT(b)) { + apr_bucket_setaside(b, io->c->pool); } - /* bucket now fits in remain, copy to scratch */ - status = read_to_scratch(io, b); - apr_bucket_delete(b); - continue; + APR_BUCKET_REMOVE(b); + APR_BRIGADE_INSERT_TAIL(io->output, b); } } return status; diff --git a/modules/http2/h2_conn_io.h b/modules/http2/h2_conn_io.h index a8821aa095..e96203cac2 100644 --- a/modules/http2/h2_conn_io.h +++ b/modules/http2/h2_conn_io.h @@ -39,6 +39,7 @@ typedef struct { apr_int64_t bytes_read; apr_int64_t bytes_written; + int buffer_output; apr_size_t flush_threshold; unsigned int is_flushed : 1; diff --git a/modules/http2/h2_filter.h b/modules/http2/h2_filter.h index bb32546aab..12810d81b7 100644 --- a/modules/http2/h2_filter.h +++ b/modules/http2/h2_filter.h @@ -19,6 +19,7 @@ struct h2_bucket_beam; struct h2_headers; +struct h2_stream; struct h2_session; typedef struct h2_filter_cin { diff --git a/modules/http2/h2_h2.c b/modules/http2/h2_h2.c index ded9bc41c4..4ff1d51d84 100644 --- a/modules/http2/h2_h2.c +++ b/modules/http2/h2_h2.c @@ -34,6 +34,7 @@ #include "h2_private.h" #include "h2_bucket_beam.h" +#include "h2_stream.h" #include "h2_task.h" #include "h2_config.h" #include "h2_ctx.h" @@ -755,10 +756,6 @@ static int h2_h2_late_fixups(request_rec *r) } check_push(r, "late_fixup"); } - /* enforce that we will close this slave connection after - * the task is done. This will keep request processing from - * trying to clean up dangling input data, for example. */ - r->connection->keepalive = AP_CONN_CLOSE; } return DECLINED; } diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index ba47edec33..81b063ad44 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -61,8 +61,8 @@ apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s) } #define H2_MPLX_ENTER(m) \ - do { apr_status_t lrv; if ((lrv = apr_thread_mutex_lock(m->lock)) != APR_SUCCESS) {\ - return lrv;\ + do { apr_status_t rv; if ((rv = apr_thread_mutex_lock(m->lock)) != APR_SUCCESS) {\ + return rv;\ } } while(0) #define H2_MPLX_LEAVE(m) \ @@ -104,7 +104,7 @@ static void stream_joined(h2_mplx *m, h2_stream *stream) h2_ihash_add(m->spurge, stream); } -static void stream_discard(h2_mplx *m, h2_stream *stream) +static void stream_cleanup(h2_mplx *m, h2_stream *stream) { ap_assert(stream->state == H2_SS_CLEANUP); @@ -175,7 +175,7 @@ h2_mplx *h2_mplx_create(conn_rec *c, server_rec *s, apr_pool_t *parent, } apr_pool_tag(m->pool, "h2_mplx"); apr_allocator_owner_set(allocator, m->pool); - status = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, + status = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_DEFAULT, m->pool); if (status != APR_SUCCESS) { apr_pool_destroy(m->pool); @@ -183,7 +183,7 @@ h2_mplx *h2_mplx_create(conn_rec *c, server_rec *s, apr_pool_t *parent, } apr_allocator_mutex_set(allocator, mutex); - status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_NESTED, + status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT, m->pool); if (status != APR_SUCCESS) { apr_pool_destroy(m->pool); @@ -267,13 +267,8 @@ static int stream_destroy_iter(void *ctx, void *val) h2_mplx *m = ctx; h2_stream *stream = val; - /* Make dead certain we are called for a stream - to purge and that we have not already done so */ - ap_assert(h2_ihash_get(m->spurge, stream->id) == stream); - h2_ihash_remove(m->spurge, stream->id); ap_assert(stream->state == H2_SS_CLEANUP); - stream->state = H2_SS_DESTROYED; if (stream->input) { /* Process outstanding events before destruction */ @@ -308,15 +303,15 @@ static int stream_destroy_iter(void *ctx, void *val) && !task->rst_error); } - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, - APLOGNO(03385) "h2_task_destroy, reuse slave=%d", reuse_slave); - task->c = NULL; - h2_task_destroy(task); - if (reuse_slave) { + h2_beam_log(task->output.beam, m->c, APLOG_DEBUG, + APLOGNO(03385) "h2_task_destroy, reuse slave"); + h2_task_destroy(task); APR_ARRAY_PUSH(m->spare_slaves, conn_rec*) = slave; } else { + h2_beam_log(task->output.beam, m->c, APLOG_TRACE1, + "h2_task_destroy, destroy slave"); h2_slave_destroy(slave); } } @@ -325,15 +320,15 @@ static int stream_destroy_iter(void *ctx, void *val) return 0; } -static void purge_streams(h2_mplx *m) +static void purge_streams(h2_mplx *m, int lock) { - H2_MPLX_ENTER_ALWAYS(m); if (!h2_ihash_empty(m->spurge)) { + H2_MPLX_ENTER_MAYBE(m, lock); while (!h2_ihash_iter(m->spurge, stream_destroy_iter, m)) { /* repeat until empty */ } + H2_MPLX_LEAVE_MAYBE(m, lock); } - H2_MPLX_LEAVE(m); } typedef struct { @@ -395,7 +390,7 @@ static int unexpected_stream_iter(void *ctx, void *val) { return 1; } -static int stream_cancel_and_discard_iter(void *ctx, void *val) { +static int stream_cancel_iter(void *ctx, void *val) { h2_mplx *m = ctx; h2_stream *stream = val; @@ -409,7 +404,7 @@ static int stream_cancel_and_discard_iter(void *ctx, void *val) { h2_stream_rst(stream, H2_ERR_NO_ERROR); /* All connection data has been sent, simulate cleanup */ h2_stream_dispatch(stream, H2_SEV_EOS_SENT); - stream_discard(m, stream); + stream_cleanup(m, stream); return 0; } @@ -435,7 +430,7 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) /* How to shut down a h2 connection: * 1. cancel all streams still active */ - while (!h2_ihash_iter(m->streams, stream_cancel_and_discard_iter, m)) { + while (!h2_ihash_iter(m->streams, stream_cancel_iter, m)) { /* until empty */ } @@ -471,7 +466,6 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) h2_ihash_iter(m->shold, unexpected_stream_iter, m); } - purge_streams(m); m->c->aborted = old_aborted; H2_MPLX_LEAVE(m); @@ -479,9 +473,16 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) "h2_mplx(%ld): released", m->id); } -static h2_stream *mplx_stream_get(h2_mplx *m, int id) +apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, h2_stream *stream) { - return h2_ihash_get(m->streams, id); + H2_MPLX_ENTER(m); + + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + H2_STRM_MSG(stream, "cleanup")); + stream_cleanup(m, stream); + + H2_MPLX_LEAVE(m); + return APR_SUCCESS; } h2_stream *h2_mplx_stream_get(h2_mplx *m, int id) @@ -489,24 +490,11 @@ h2_stream *h2_mplx_stream_get(h2_mplx *m, int id) h2_stream *s = NULL; H2_MPLX_ENTER_ALWAYS(m); + s = h2_ihash_get(m->streams, id); - H2_MPLX_LEAVE(m); - return s; -} -apr_status_t h2_mplx_stream_discard(h2_mplx *m, int stream_id) -{ - h2_stream *stream; - - H2_MPLX_ENTER(m); - stream = mplx_stream_get(m, stream_id); - if (stream) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - H2_STRM_MSG(stream, "cleanup")); - stream_discard(m, stream); - } H2_MPLX_LEAVE(m); - return APR_SUCCESS; + return s; } static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes) @@ -606,7 +594,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, status = APR_SUCCESS; } else { - purge_streams(m); + purge_streams(m, 0); h2_ihash_iter(m->streams, report_consumption_iter, m); m->added_output = iowait; status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout); @@ -668,31 +656,19 @@ static void register_if_needed(h2_mplx *m) } } -void h2_mplx_stream_register(h2_mplx *m, h2_stream *stream) +apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, + h2_stream_pri_cmp *cmp, void *ctx) { - H2_MPLX_ENTER_ALWAYS(m); - AP_DEBUG_ASSERT(h2_ihash_get(m->streams, stream->id) == NULL); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, H2_STRM_MSG(stream, "registered")); - h2_ihash_add(m->streams, stream); - H2_MPLX_LEAVE(m); -} - -apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, h2_stream_pri_cmp *cmp, void *ctx) -{ - h2_stream *stream; - apr_status_t rv = APR_SUCCESS; + apr_status_t status; H2_MPLX_ENTER(m); if (m->aborted) { - rv = APR_ECONNABORTED; + status = APR_ECONNABORTED; } else { - stream = mplx_stream_get(m, stream_id); - if (!stream) goto leave; - ap_assert(!stream->scheduled); - stream->scheduled = 1; - + status = APR_SUCCESS; + h2_ihash_add(m->streams, stream); if (h2_stream_is_ready(stream)) { /* already have a response */ check_data_for(m, stream, 0); @@ -706,9 +682,9 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, h2_stream_pri_cmp *cmp, H2_STRM_MSG(stream, "process, added to q")); } } -leave: + H2_MPLX_LEAVE(m); - return rv; + return status; } static h2_task *next_stream_task(h2_mplx *m) @@ -1050,6 +1026,7 @@ apr_status_t h2_mplx_idle(h2_mplx *m) ", out has %ld bytes buffered"), h2_beam_is_closed(stream->output), (long)h2_beam_get_buffered(stream->output)); + h2_ihash_add(m->streams, stream); check_data_for(m, stream, 0); stream->out_checked = 1; status = APR_EAGAIN; @@ -1085,7 +1062,7 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, /* update input windows for streams */ h2_ihash_iter(m->streams, report_consumption_iter, m); - purge_streams(m); + purge_streams(m, 1); n = h2_ififo_count(m->readyq); while (n > 0 diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index af8462a22a..575ccaf430 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -136,22 +136,7 @@ int h2_mplx_is_busy(h2_mplx *m); * IO lifetime of streams. ******************************************************************************/ -/** - * Register a stream with the multiplexer. This transfers responisibility - * for lifetime and final destruction to mplx. - - * @param mplx the multiplexer - * @param stream the h2 stream instance - */ -void h2_mplx_stream_register(h2_mplx *mplx, struct h2_stream *stream); - -/** - * Lookup a stream by its id. Will only return active streams, not discarded ones. - * @param mplx the multiplexer - * @param id the stream identifier - * @return the stream or NULL - */ -struct h2_stream *h2_mplx_stream_get(h2_mplx *mplx, int id); +struct h2_stream *h2_mplx_stream_get(h2_mplx *m, int id); /** * Notifies mplx that a stream has been completely handled on the main @@ -160,7 +145,7 @@ struct h2_stream *h2_mplx_stream_get(h2_mplx *mplx, int id); * @param m the mplx itself * @param stream the stream ready for cleanup */ -apr_status_t h2_mplx_stream_discard(h2_mplx *m, int stream_id); +apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, struct h2_stream *stream); /** * Waits on output data from any stream in this session to become available. @@ -179,12 +164,13 @@ apr_status_t h2_mplx_keep_active(h2_mplx *m, struct h2_stream *stream); * Process a stream request. * * @param m the multiplexer - * @param stream_id the identifier of the stream + * @param stream the identifier of the stream * @param r the request to be processed * @param cmp the stream priority compare function * @param ctx context data for the compare function */ -apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, h2_stream_pri_cmp *cmp, void *ctx); +apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, + h2_stream_pri_cmp *cmp, void *ctx); /** * Stream priorities have changed, reschedule pending requests. diff --git a/modules/http2/h2_push.c b/modules/http2/h2_push.c index 4740026e2c..750088af2d 100644 --- a/modules/http2/h2_push.c +++ b/modules/http2/h2_push.c @@ -622,19 +622,15 @@ static h2_push_diary_entry *move_to_last(h2_push_diary *diary, apr_size_t idx) { h2_push_diary_entry *entries = (h2_push_diary_entry*)diary->entries->elts; h2_push_diary_entry e; + apr_size_t lastidx = (apr_size_t)diary->entries->nelts; - if (diary->entries->nelts > 0) { - int lastidx = diary->entries->nelts - 1; - - /* move entry[idx] to the end */ - if (idx < lastidx) { - e = entries[idx]; - memmove(entries+idx, entries+idx+1, sizeof(e) * (lastidx - idx)); - entries[lastidx] = e; - return &entries[lastidx]; - } + /* move entry[idx] to the end */ + if (idx+1 < lastidx) { + e = entries[idx]; + memmove(entries+idx, entries+idx+1, sizeof(e) * (lastidx - idx)); + entries[lastidx] = e; } - return &entries[idx]; + return &entries[lastidx]; } static void h2_push_diary_append(h2_push_diary *diary, h2_push_diary_entry *e) @@ -711,7 +707,7 @@ apr_array_header_t *h2_push_collect_update(h2_stream *stream, } } pushes = h2_push_collect(stream->pool, req, stream->push_policy, res); - return h2_push_diary_update(session, pushes); + return h2_push_diary_update(stream->session, pushes); } static apr_int32_t h2_log2inv(unsigned char log2) diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index 8a10e40973..1fceabc112 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -75,15 +75,7 @@ static int h2_session_status_from_apr_status(apr_status_t rv) static h2_stream *get_stream(h2_session *session, int stream_id) { - h2_stream *stream; - - if (stream_id <= 0) return NULL; - stream = h2_mplx_stream_get(session->mplx, stream_id); - if (!stream) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "session_stream_get(%d) == NULL", stream_id); - } - return stream; + return nghttp2_session_get_stream_user_data(session->ngh2, stream_id); } static void dispatch_event(h2_session *session, h2_session_event_t ev, @@ -117,24 +109,21 @@ static void cleanup_unprocessed_streams(h2_session *session) h2_mplx_stream_do(session->mplx, rst_unprocessed_stream, session); } -static apr_pool_t *session_stream_pool_create(h2_session *session) +static h2_stream *h2_session_open_stream(h2_session *session, int stream_id, + int initiated_on) { - apr_pool_t *pool; + h2_stream * stream; + apr_pool_t *stream_pool; - apr_pool_create(&pool, session->pool); - apr_pool_tag(pool, "h2_stream"); - return pool; -} - -static h2_stream *session_stream_pcreate(h2_session *session, int stream_id, - apr_pool_t *pool, int initiated_on) -{ - return h2_stream_create(stream_id, pool, session, session->monitor, initiated_on); -} - -static h2_stream *session_stream_create(h2_session *session, int stream_id) -{ - return session_stream_pcreate(session, stream_id, session_stream_pool_create(session), 0); + apr_pool_create(&stream_pool, session->pool); + apr_pool_tag(stream_pool, "h2_stream"); + + stream = h2_stream_create(stream_id, stream_pool, session, + session->monitor, initiated_on); + if (stream) { + nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream); + } + return stream; } /** @@ -286,18 +275,19 @@ static int on_begin_headers_cb(nghttp2_session *ngh2, const nghttp2_frame *frame, void *userp) { h2_session *session = (h2_session *)userp; - h2_stream *stream; + h2_stream *s; /* We may see HEADERs at the start of a stream or after all DATA * streams to carry trailers. */ (void)ngh2; - stream = get_stream(session, frame->hd.stream_id); - if (!stream) { - stream = session_stream_create(session, frame->hd.stream_id); - if (!stream) return NGHTTP2_ERR_START_STREAM_NOT_ALLOWED; - h2_mplx_stream_register(session->mplx, stream); + s = get_stream(session, frame->hd.stream_id); + if (s) { + /* nop */ } - return 0; + else { + s = h2_session_open_stream(userp, frame->hd.stream_id, 0); + } + return s? 0 : NGHTTP2_ERR_START_STREAM_NOT_ALLOWED; } static int on_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame, @@ -376,15 +366,13 @@ static int on_frame_recv_cb(nghttp2_session *ng2s, break; case NGHTTP2_PRIORITY: session->reprioritize = 1; - if (APLOGctrace2(session->c)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "h2_stream(%ld-%d): PRIORITY frame " - " weight=%d, dependsOn=%d, exclusive=%d", - session->id, (int)frame->hd.stream_id, - frame->priority.pri_spec.weight, - frame->priority.pri_spec.stream_id, - frame->priority.pri_spec.exclusive); - } + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, + "h2_stream(%ld-%d): PRIORITY frame " + " weight=%d, dependsOn=%d, exclusive=%d", + session->id, (int)frame->hd.stream_id, + frame->priority.pri_spec.weight, + frame->priority.pri_spec.stream_id, + frame->priority.pri_spec.exclusive); break; case NGHTTP2_WINDOW_UPDATE: ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, @@ -396,15 +384,16 @@ static int on_frame_recv_cb(nghttp2_session *ng2s, } break; case NGHTTP2_RST_STREAM: - if (APLOGcdebug(session->c)) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03067) - "h2_stream(%ld-%d): RST_STREAM by client, errror=%d", - session->id, (int)frame->hd.stream_id, - (int)frame->rst_stream.error_code); - } + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03067) + "h2_stream(%ld-%d): RST_STREAM by client, errror=%d", + session->id, (int)frame->hd.stream_id, + (int)frame->rst_stream.error_code); stream = get_stream(session, frame->hd.stream_id); - if (stream) { - stream->initiated_on? ++session->pushes_reset : ++session->streams_reset; + if (stream && stream->initiated_on) { + ++session->pushes_reset; + } + else { + ++session->streams_reset; } break; case NGHTTP2_GOAWAY: @@ -468,7 +457,18 @@ static int on_frame_recv_cb(nghttp2_session *ng2s, } } - return (APR_SUCCESS != rv)? NGHTTP2_ERR_PROTO : 0; + if (APR_SUCCESS != rv) return NGHTTP2_ERR_PROTO; + return 0; +} + +static int h2_session_continue_data(h2_session *session) { + if (h2_mplx_has_master_events(session->mplx)) { + return 0; + } + if (h2_conn_io_needs_flush(&session->io)) { + return 0; + } + return 1; } static char immortal_zeros[H2_MAX_PADLEN]; @@ -491,8 +491,7 @@ static int on_send_data_cb(nghttp2_session *ngh2, (void)ngh2; (void)source; - /* Be nimble, react to events from your tasks and do not buffer more than we need */ - if (h2_mplx_has_master_events(session->mplx) ||h2_conn_io_needs_flush(&session->io)) { + if (!h2_session_continue_data(session)) { return NGHTTP2_ERR_WOULDBLOCK; } @@ -834,9 +833,10 @@ apr_status_t h2_session_create(h2_session **psession, conn_rec *c, request_rec * } apr_pool_tag(pool, "h2_session"); apr_allocator_owner_set(allocator, pool); - status = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool); + status = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_DEFAULT, pool); if (status != APR_SUCCESS) { - goto fail; + apr_pool_destroy(pool); + return APR_ENOMEM; } apr_allocator_mutex_set(allocator, mutex); @@ -862,25 +862,26 @@ apr_status_t h2_session_create(h2_session **psession, conn_rec *c, request_rec * status = apr_thread_cond_create(&session->iowait, session->pool); if (status != APR_SUCCESS) { - goto fail; + apr_pool_destroy(pool); + return status; } session->in_pending = h2_iq_create(session->pool, (int)session->max_stream_count); if (session->in_pending == NULL) { - status = APR_ENOMEM; - goto fail; + apr_pool_destroy(pool); + return APR_ENOMEM; } session->in_process = h2_iq_create(session->pool, (int)session->max_stream_count); if (session->in_process == NULL) { - status = APR_ENOMEM; - goto fail; + apr_pool_destroy(pool); + return APR_ENOMEM; } session->monitor = apr_pcalloc(pool, sizeof(h2_stream_monitor)); if (session->monitor == NULL) { - status = APR_ENOMEM; - goto fail; + apr_pool_destroy(pool); + return APR_ENOMEM; } session->monitor->ctx = session; session->monitor->on_state_enter = on_stream_state_enter; @@ -905,8 +906,8 @@ apr_status_t h2_session_create(h2_session **psession, conn_rec *c, request_rec * if (status != APR_SUCCESS) { ap_log_cerror(APLOG_MARK, APLOG_ERR, status, c, APLOGNO(02927) "nghttp2: error in init_callbacks"); - status = APR_ENOMEM; - goto fail; + apr_pool_destroy(pool); + return status; } rv = nghttp2_option_new(&options); @@ -914,8 +915,8 @@ apr_status_t h2_session_create(h2_session **psession, conn_rec *c, request_rec * ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, c, APLOGNO(02928) "nghttp2_option_new: %s", nghttp2_strerror(rv)); - status = APR_ENOMEM; - goto fail; + apr_pool_destroy(pool); + return status; } nghttp2_option_set_peer_max_concurrent_streams(options, (uint32_t)session->max_stream_count); /* We need to handle window updates ourself, otherwise we @@ -931,8 +932,8 @@ apr_status_t h2_session_create(h2_session **psession, conn_rec *c, request_rec * ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, c, APLOGNO(02929) "nghttp2_session_server_new: %s", nghttp2_strerror(rv)); - status = APR_ENOMEM; - goto fail; + apr_pool_destroy(pool); + return APR_ENOMEM; } n = h2_config_sgeti(s, H2_CONF_PUSH_DIARY_SIZE); @@ -955,9 +956,6 @@ apr_status_t h2_session_create(h2_session **psession, conn_rec *c, request_rec * apr_pool_pre_cleanup_register(pool, c, session_pool_cleanup); return APR_SUCCESS; -fail: - apr_pool_destroy(pool); - return status; } static apr_status_t h2_session_start(h2_session *session, int *rv) @@ -1005,7 +1003,7 @@ static apr_status_t h2_session_start(h2_session *session, int *rv) } /* Now we need to auto-open stream 1 for the request we got. */ - stream = session_stream_create(session, 1); + stream = h2_session_open_stream(session, 1, 0); if (!stream) { status = APR_EGENERAL; ap_log_rerror(APLOG_MARK, APLOG_ERR, status, session->r, @@ -1013,11 +1011,11 @@ static apr_status_t h2_session_start(h2_session *session, int *rv) nghttp2_strerror(*rv)); return status; } + status = h2_stream_set_request_rec(stream, session->r, 1); if (status != APR_SUCCESS) { return status; } - h2_mplx_stream_register(session->mplx, stream); } slen = 0; @@ -1148,6 +1146,48 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s, return (ssize_t)nread; } +struct h2_stream *h2_session_push(h2_session *session, h2_stream *is, + h2_push *push) +{ + h2_stream *stream; + h2_ngheader *ngh; + apr_status_t status; + int nid = 0; + + status = h2_req_create_ngheader(&ngh, is->pool, push->req); + if (status == APR_SUCCESS) { + nid = nghttp2_submit_push_promise(session->ngh2, 0, is->id, + ngh->nv, ngh->nvlen, NULL); + } + if (status != APR_SUCCESS || nid <= 0) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, + H2_STRM_LOG(APLOGNO(03075), is, + "submitting push promise fail: %s"), nghttp2_strerror(nid)); + return NULL; + } + ++session->pushes_promised; + + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + H2_STRM_LOG(APLOGNO(03076), is, "SERVER_PUSH %d for %s %s on %d"), + nid, push->req->method, push->req->path, is->id); + + stream = h2_session_open_stream(session, nid, is->id); + if (!stream) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + H2_STRM_LOG(APLOGNO(03077), stream, + "failed to create stream obj %d"), nid); + /* kill the push_promise */ + nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE, nid, + NGHTTP2_INTERNAL_ERROR); + return NULL; + } + + h2_session_set_prio(session, stream, push->priority); + h2_stream_set_request(stream, push->req); + ++session->unsent_promises; + return stream; +} + static int valid_weight(float f) { int w = (int)f; @@ -1155,8 +1195,8 @@ static int valid_weight(float f) (w > NGHTTP2_MAX_WEIGHT)? NGHTTP2_MAX_WEIGHT : w); } -static apr_status_t session_stream_priority_set(h2_session *session, h2_stream *stream, - const h2_priority *prio) +apr_status_t h2_session_set_prio(h2_session *session, h2_stream *stream, + const h2_priority *prio) { apr_status_t status = APR_SUCCESS; #ifdef H2_NG2_CHANGE_PRIO @@ -1254,51 +1294,6 @@ static apr_status_t session_stream_priority_set(h2_session *session, h2_stream * return status; } -apr_status_t h2_session_push(h2_session *session, int initiating_stream_id, h2_push *push) -{ - h2_stream *stream; - apr_pool_t *pool; - h2_ngheader *ngh; - int nid = 0; - - pool = session_stream_pool_create(session); - if (APR_SUCCESS != h2_req_create_ngheader(&ngh, pool, push->req)) goto fail; - - nid = nghttp2_submit_push_promise(session->ngh2, 0, initiating_stream_id, - ngh->nv, ngh->nvlen, NULL); - if (nid <= 0) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, - APLOGNO(03075) "submitting push promise fail: %s", nghttp2_strerror(nid)); - goto fail; - } - - ++session->pushes_promised; - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, - APLOGNO(03076) "SERVER_PUSH %d for %s %s on %d", - nid, push->req->method, push->req->path, initiating_stream_id); - - stream = session_stream_pcreate(session, nid, pool, initiating_stream_id); - if (!stream) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, - H2_STRM_LOG(APLOGNO(03077), stream, - "failed to create stream obj %d"), nid); - goto fail; - } - - session_stream_priority_set(session, stream, push->priority); - h2_stream_request_set(stream, push->req); - ++session->unsent_promises; - h2_mplx_stream_register(session->mplx, stream); - return APR_SUCCESS; - -fail: - if (nid > 0) { - nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE, nid, NGHTTP2_INTERNAL_ERROR); - } - if (pool) apr_pool_destroy(pool); - return APR_EINVAL; -} - int h2_session_push_enabled(h2_session *session) { /* iff we can and they can and want */ @@ -1426,7 +1421,7 @@ static apr_status_t on_stream_headers(h2_session *session, h2_stream *stream, if (!stream->pref_priority) { stream->pref_priority = h2_stream_get_priority(stream, headers); } - session_stream_priority_set(session, stream, stream->pref_priority); + h2_session_set_prio(session, stream, stream->pref_priority); note = apr_table_get(headers->notes, H2_FILTER_DEBUG_NOTE); if (note && !strcmp("on", note)) { @@ -1550,8 +1545,9 @@ static void h2_session_in_flush(h2_session *session) while ((id = h2_iq_shift(session->in_process)) > 0) { h2_stream *stream = get_stream(session, id); if (stream) { + ap_assert(!stream->scheduled); if (h2_stream_prep_processing(stream) == APR_SUCCESS) { - h2_mplx_process(session->mplx, id, stream_pri_cmp, session); + h2_mplx_process(session->mplx, stream, stream_pri_cmp, session); } else { h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR); @@ -1919,21 +1915,6 @@ static void ev_stream_open(h2_session *session, h2_stream *stream) h2_iq_append(session->in_process, stream->id); } -void h2_session_eos_sent(h2_session *session, int stream_id) -{ - /* stream may no longer be known by nghttp2, but still kept in mplx */ - h2_stream *stream = h2_mplx_stream_get(session->mplx, stream_id); - if (stream) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - H2_STRM_MSG(stream, "eos sent")); - h2_stream_dispatch(stream, H2_SEV_EOS_SENT); - } - else { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "eos sent for unknown stream %d", stream_id); - } -} - static void ev_stream_closed(h2_session *session, h2_stream *stream) { apr_bucket *b; @@ -1949,14 +1930,12 @@ static void ev_stream_closed(h2_session *session, h2_stream *stream) break; } - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - H2_STRM_MSG(stream, "sending eos")); /* The stream might have data in the buffers of the main connection. * We can only free the allocated resources once all had been written. * Send a special buckets on the connection that gets destroyed when * all preceding data has been handled. On its destruction, it is safe * to purge all resources of the stream. */ - b = h2_bucket_eos_create(session->c->bucket_alloc, session->c, stream->id); + b = h2_bucket_eos_create(session->c->bucket_alloc, stream); APR_BRIGADE_INSERT_TAIL(session->bbtmp, b); h2_conn_io_pass(&session->io, session->bbtmp); apr_brigade_cleanup(session->bbtmp); @@ -1998,7 +1977,7 @@ static void on_stream_state_enter(void *ctx, h2_stream *stream) ev_stream_closed(session, stream); break; case H2_SS_CLEANUP: - h2_mplx_stream_discard(session->mplx, stream->id); + h2_mplx_stream_cleanup(session->mplx, stream); break; default: break; diff --git a/modules/http2/h2_session.h b/modules/http2/h2_session.h index ae2c26b769..cd08fc2429 100644 --- a/modules/http2/h2_session.h +++ b/modules/http2/h2_session.h @@ -51,6 +51,7 @@ struct h2_priority; struct h2_push; struct h2_push_diary; struct h2_session; +struct h2_stream; struct h2_stream_monitor; struct h2_task; struct h2_workers; @@ -191,17 +192,16 @@ int h2_session_push_enabled(h2_session *session); * processing.. * * @param session the session to work in - * @param initiating_stream_id id of the stream initiating this push + * @param is the stream initiating the push * @param push the push to promise + * @return the new promised stream or NULL */ -apr_status_t h2_session_push(h2_session *session, - int initiating_stream_id, struct h2_push *push); +struct h2_stream *h2_session_push(h2_session *session, + struct h2_stream *is, struct h2_push *push); -/** - * Notifies the session that the EOS for a stream has been sent. - * See h2_bucket_eos for usage. - */ -void h2_session_eos_sent(h2_session *session, int stream_id); +apr_status_t h2_session_set_prio(h2_session *session, + struct h2_stream *stream, + const struct h2_priority *prio); #define H2_SSSN_MSG(s, msg) \ "h2_session(%ld,%s,%d): "msg, s->id, h2_session_state_str(s->state), \ diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index 18537fff26..9b7d2c5655 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -88,39 +88,39 @@ const char *h2_stream_state_str(h2_stream *stream) /* state transisitions when certain frame types are sent */ static int trans_on_send[][H2_SS_MAX] = { -/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, S_DSTR */ -{ S_ERR, S_ERR, S_ERR, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },/* DATA */ -{ S_ERR, S_ERR, S_CL_R, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },/* HEADERS */ -{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* PRIORITY */ -{ S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, S_NOP, },/* RST_STREAM */ -{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_NOP, },/* SETTINGS */ -{ S_RS_L,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_NOP, },/* PUSH_PROMISE */ -{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_NOP, },/* PING */ -{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_NOP, },/* GOAWAY */ -{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* WINDOW_UPDATE */ -{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* CONT */ +/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */ +{ S_ERR, S_ERR, S_ERR, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, },/* DATA */ +{ S_ERR, S_ERR, S_CL_R, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, },/* HEADERS */ +{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* PRIORITY */ +{ S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },/* RST_STREAM */ +{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* SETTINGS */ +{ S_RS_L,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PUSH_PROMISE */ +{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PING */ +{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* GOAWAY */ +{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* WINDOW_UPDATE */ +{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* CONT */ }; /* state transisitions when certain frame types are received */ static int trans_on_recv[][H2_SS_MAX] = { -/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, S_DSTR */ -{ S_ERR, S_ERR, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, S_NOP, },/* DATA */ -{ S_OPEN,S_CL_L, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, S_NOP, },/* HEADERS */ -{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* PRIORITY */ -{ S_ERR, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, S_NOP, },/* RST_STREAM */ -{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_NOP, },/* SETTINGS */ -{ S_RS_R,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_NOP, },/* PUSH_PROMISE */ -{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_NOP, },/* PING */ -{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_NOP, },/* GOAWAY */ -{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* WINDOW_UPDATE */ -{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* CONT */ +/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */ +{ S_ERR, S_ERR, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },/* DATA */ +{ S_OPEN,S_CL_L, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },/* HEADERS */ +{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* PRIORITY */ +{ S_ERR, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },/* RST_STREAM */ +{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* SETTINGS */ +{ S_RS_R,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PUSH_PROMISE */ +{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PING */ +{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* GOAWAY */ +{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* WINDOW_UPDATE */ +{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* CONT */ }; /* state transisitions when certain events happen */ static int trans_on_event[][H2_SS_MAX] = { -/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, S_DSTR */ -{ S_XXX, S_ERR, S_ERR, S_CL_L, S_CLS, S_XXX, S_XXX, S_XXX, S_NOP, },/* EV_CLOSED_L*/ -{ S_ERR, S_ERR, S_ERR, S_CL_R, S_ERR, S_CLS, S_NOP, S_NOP, S_NOP, },/* EV_CLOSED_R*/ -{ S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, S_NOP, },/* EV_CANCELLED*/ -{ S_NOP, S_XXX, S_XXX, S_XXX, S_XXX, S_CLS, S_CLN, S_XXX, S_NOP, },/* EV_EOS_SENT*/ +/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */ +{ S_XXX, S_ERR, S_ERR, S_CL_L, S_CLS, S_XXX, S_XXX, S_XXX, },/* EV_CLOSED_L*/ +{ S_ERR, S_ERR, S_ERR, S_CL_R, S_ERR, S_CLS, S_NOP, S_NOP, },/* EV_CLOSED_R*/ +{ S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },/* EV_CANCELLED*/ +{ S_NOP, S_XXX, S_XXX, S_XXX, S_XXX, S_CLS, S_CLN, S_XXX, },/* EV_EOS_SENT*/ }; static int on_map(h2_stream_state_t state, int map[H2_SS_MAX]) @@ -189,7 +189,9 @@ static apr_status_t setup_input(h2_stream *stream) { || APR_BRIGADE_EMPTY(stream->in_buffer))); if (!empty) { h2_beam_create(&stream->input, stream->pool, stream->id, - "input", 0, stream->session->s->timeout); + "input", H2_BEAM_OWNER_SEND, 0, + stream->session->s->timeout); + h2_beam_send_from(stream->input, stream->pool); } } return APR_SUCCESS; @@ -590,6 +592,7 @@ apr_status_t h2_stream_prep_processing(h2_stream *stream) H2_STRM_MSG(stream, "schedule %s %s://%s%s chunked=%d"), r->method, r->scheme, r->authority, r->path, r->chunked); setup_input(stream); + stream->scheduled = 1; return APR_SUCCESS; } return APR_EINVAL; @@ -634,7 +637,7 @@ apr_status_t h2_stream_set_request_rec(h2_stream *stream, return status; } -void h2_stream_request_set(h2_stream *stream, const h2_request *r) +void h2_stream_set_request(h2_stream *stream, const h2_request *r) { ap_assert(stream->request == NULL); ap_assert(stream->rtmp == NULL); @@ -950,7 +953,7 @@ apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response) { - apr_status_t rv = APR_SUCCESS; + apr_status_t status = APR_SUCCESS; apr_array_header_t *pushes; int i; @@ -959,12 +962,16 @@ apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response) ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, H2_STRM_MSG(stream, "found %d push candidates"), pushes->nelts); - for (i = 0; i < pushes->nelts && (APR_SUCCESS == rv); ++i) { + for (i = 0; i < pushes->nelts; ++i) { h2_push *push = APR_ARRAY_IDX(pushes, i, h2_push*); - rv = h2_session_push(stream->session, stream->id, push); + h2_stream *s = h2_session_push(stream->session, stream, push); + if (!s) { + status = APR_ECONNRESET; + break; + } } } - return rv; + return status; } apr_table_t *h2_stream_get_trailers(h2_stream *stream) diff --git a/modules/http2/h2_stream.h b/modules/http2/h2_stream.h index 8d6146fa4c..7ecc0ad6bc 100644 --- a/modules/http2/h2_stream.h +++ b/modules/http2/h2_stream.h @@ -167,14 +167,13 @@ void h2_stream_cleanup(h2_stream *stream); apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount); /** - * Set complete stream headers from given h2_request, creates a deep copy. - * Only to be called once to initialize. + * Set complete stream headers from given h2_request. * * @param stream stream to write request to * @param r the request with all the meta data * @param eos != 0 iff stream input is closed */ -void h2_stream_request_set(h2_stream *stream, const h2_request *r); +void h2_stream_set_request(h2_stream *stream, const h2_request *r); /** * Set complete stream header from given request_rec. diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index 949c0b97ef..8d3dc6fde8 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -44,6 +44,7 @@ #include "h2_request.h" #include "h2_headers.h" #include "h2_session.h" +#include "h2_stream.h" #include "h2_task.h" #include "h2_util.h" @@ -492,14 +493,6 @@ static int h2_task_pre_conn(conn_rec* c, void *arg) return OK; } -static apr_status_t task_pool_cleanup(void *data) -{ - h2_task *task = data; - - ap_assert(task->destroyed); - return APR_SUCCESS; -} - h2_task *h2_task_create(conn_rec *slave, int stream_id, const h2_request *req, h2_mplx *m, h2_bucket_beam *input, @@ -528,15 +521,13 @@ h2_task *h2_task_create(conn_rec *slave, int stream_id, task->input.beam = input; task->output.max_buffer = output_max_mem; - apr_pool_cleanup_register(pool, task, task_pool_cleanup, apr_pool_cleanup_null); - return task; } void h2_task_destroy(h2_task *task) { - task->destroyed = 1; if (task->output.beam) { + h2_beam_log(task->output.beam, task->c, APLOG_TRACE2, "task_destroy"); h2_beam_destroy(task->output.beam); task->output.beam = NULL; } @@ -593,12 +584,13 @@ apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread, int worker_id) } h2_beam_create(&task->output.beam, c->pool, task->stream_id, "output", - 0, task->timeout); + H2_BEAM_OWNER_SEND, 0, task->timeout); if (!task->output.beam) { return APR_ENOMEM; } h2_beam_buffer_size_set(task->output.beam, task->output.max_buffer); + h2_beam_send_from(task->output.beam, task->pool); h2_ctx_create_for(c, task); apr_table_setn(c->notes, H2_TASK_ID_NOTE, task->id); @@ -716,4 +708,3 @@ static int h2_task_process_conn(conn_rec* c) return DECLINED; } - diff --git a/modules/http2/h2_task.h b/modules/http2/h2_task.h index 80c6640804..4121d0fd69 100644 --- a/modules/http2/h2_task.h +++ b/modules/http2/h2_task.h @@ -87,11 +87,10 @@ struct h2_task { apr_time_t started_at; /* when processing started */ apr_time_t done_at; /* when processing was done */ apr_bucket *eor; - int destroyed; }; h2_task *h2_task_create(conn_rec *slave, int stream_id, - const struct h2_request *req, struct h2_mplx *m, + const h2_request *req, struct h2_mplx *m, struct h2_bucket_beam *input, apr_interval_time_t timeout, apr_size_t output_max_mem); diff --git a/modules/http2/h2_version.h b/modules/http2/h2_version.h index 498ad13950..286e98f1ce 100644 --- a/modules/http2/h2_version.h +++ b/modules/http2/h2_version.h @@ -27,7 +27,7 @@ * @macro * Version number of the http2 module as c string */ -#define MOD_HTTP2_VERSION "1.15.0-git" +#define MOD_HTTP2_VERSION "1.14.1-git" /** * @macro @@ -35,7 +35,7 @@ * release. This is a 24 bit number with 8 bits for major number, 8 bits * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203. */ -#define MOD_HTTP2_VERSION_NUM 0x010f00 +#define MOD_HTTP2_VERSION_NUM 0x010e01 #endif /* mod_h2_h2_version_h */ diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c index 5ee934abf0..699f533f80 100644 --- a/modules/http2/h2_workers.c +++ b/modules/http2/h2_workers.c @@ -83,7 +83,7 @@ static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot) if (!slot->lock) { status = apr_thread_mutex_create(&slot->lock, - APR_THREAD_MUTEX_NESTED, + APR_THREAD_MUTEX_DEFAULT, workers->pool); if (status != APR_SUCCESS) { push_slot(&workers->free, slot); @@ -336,7 +336,7 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool, } status = apr_thread_mutex_create(&workers->lock, - APR_THREAD_MUTEX_NESTED, + APR_THREAD_MUTEX_DEFAULT, workers->pool); if (status == APR_SUCCESS) { n = workers->nslots = workers->max_workers; diff --git a/modules/http2/mod_proxy_http2.c b/modules/http2/mod_proxy_http2.c index 83ae431c87..2208707990 100644 --- a/modules/http2/mod_proxy_http2.c +++ b/modules/http2/mod_proxy_http2.c @@ -403,14 +403,6 @@ run_connect: */ apr_table_setn(ctx->p_conn->connection->notes, "proxy-request-alpn-protos", "h2"); - if (ctx->p_conn->ssl_hostname) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->owner, - "set SNI to %s for (%s)", - ctx->p_conn->ssl_hostname, - ctx->p_conn->hostname); - apr_table_setn(ctx->p_conn->connection->notes, - "proxy-request-hostname", ctx->p_conn->ssl_hostname); - } } if (ctx->master->aborted) goto cleanup; |