diff options
Diffstat (limited to 'modules/http2')
-rw-r--r-- | modules/http2/h2_c2.c | 108 | ||||
-rw-r--r-- | modules/http2/h2_c2.h | 9 | ||||
-rw-r--r-- | modules/http2/h2_conn_ctx.c | 39 | ||||
-rw-r--r-- | modules/http2/h2_conn_ctx.h | 16 | ||||
-rw-r--r-- | modules/http2/h2_mplx.c | 241 | ||||
-rw-r--r-- | modules/http2/h2_mplx.h | 15 | ||||
-rw-r--r-- | modules/http2/h2_stream.c | 7 | ||||
-rw-r--r-- | modules/http2/h2_workers.c | 66 | ||||
-rw-r--r-- | modules/http2/h2_workers.h | 14 |
9 files changed, 218 insertions, 297 deletions
diff --git a/modules/http2/h2_c2.c b/modules/http2/h2_c2.c index 00a783d28e..9dd90f2b05 100644 --- a/modules/http2/h2_c2.c +++ b/modules/http2/h2_c2.c @@ -131,12 +131,6 @@ int h2_mpm_supported(void) return mpm_supported; } -static module *h2_conn_mpm_module(void) -{ - check_modules(0); - return mpm_module; -} - apr_status_t h2_c2_child_init(apr_pool_t *pool, server_rec *s) { check_modules(1); @@ -144,88 +138,6 @@ apr_status_t h2_c2_child_init(apr_pool_t *pool, server_rec *s) APR_PROTO_TCP, pool); } -/* APR callback invoked if allocation fails. */ -static int abort_on_oom(int retcode) -{ - ap_abort_on_oom(); - return retcode; /* unreachable, hopefully. */ -} - -conn_rec *h2_c2_create(conn_rec *c1, apr_pool_t *parent) -{ - apr_allocator_t *allocator; - apr_status_t status; - apr_pool_t *pool; - conn_rec *c2; - void *cfg; - module *mpm; - - ap_assert(c1); - ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, c1, - "h2_c2: create for c1(%ld)", c1->id); - - /* We create a pool with its own allocator to be used for - * processing a request. This is the only way to have the processing - * independent of its parent pool in the sense that it can work in - * another thread. - */ - apr_allocator_create(&allocator); - apr_allocator_max_free_set(allocator, ap_max_mem_free); - status = apr_pool_create_ex(&pool, parent, NULL, allocator); - if (status != APR_SUCCESS) { - ap_log_cerror(APLOG_MARK, APLOG_ERR, status, c1, - APLOGNO(10004) "h2_c2: create pool"); - return NULL; - } - apr_allocator_owner_set(allocator, pool); - apr_pool_abort_set(abort_on_oom, pool); - apr_pool_tag(pool, "h2_c2_conn"); - - c2 = (conn_rec *) apr_palloc(pool, sizeof(conn_rec)); - memcpy(c2, c1, sizeof(conn_rec)); - - c2->master = c1; - c2->pool = pool; - c2->conn_config = ap_create_conn_config(pool); - c2->notes = apr_table_make(pool, 5); - c2->input_filters = NULL; - c2->output_filters = NULL; - c2->keepalives = 0; -#if AP_MODULE_MAGIC_AT_LEAST(20180903, 1) - c2->filter_conn_ctx = NULL; -#endif - c2->bucket_alloc = apr_bucket_alloc_create(pool); -#if !AP_MODULE_MAGIC_AT_LEAST(20180720, 1) - c2->data_in_input_filters = 0; - c2->data_in_output_filters = 0; -#endif - /* prevent mpm_event from making wrong assumptions about this connection, - * like e.g. using its socket for an async read check. */ - c2->clogging_input_filters = 1; - c2->log = NULL; - c2->aborted = 0; - /* We cannot install the master connection socket on the secondary, as - * modules mess with timeouts/blocking of the socket, with - * unwanted side effects to the master connection processing. - * Fortunately, since we never use the secondary socket, we can just install - * a single, process-wide dummy and everyone is happy. - */ - ap_set_module_config(c2->conn_config, &core_module, dummy_socket); - /* TODO: these should be unique to this thread */ - c2->sbh = NULL; /*c1->sbh;*/ - /* TODO: not all mpm modules have learned about secondary connections yet. - * copy their config from master to secondary. - */ - if ((mpm = h2_conn_mpm_module()) != NULL) { - cfg = ap_get_module_config(c1->conn_config, mpm); - ap_set_module_config(c2->conn_config, mpm, cfg); - } - - ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, c2, - "h2_c2(%s): created", c2->log_id); - return c2; -} - void h2_c2_destroy(conn_rec *c2) { ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, c2, @@ -233,6 +145,21 @@ void h2_c2_destroy(conn_rec *c2) apr_pool_destroy(c2->pool); } +void h2_c2_abort(conn_rec *c2, conn_rec *from) +{ + h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c2); + + AP_DEBUG_ASSERT(conn_ctx); + AP_DEBUG_ASSERT(conn_ctx->stream_id); + if (conn_ctx->beam_in) { + h2_beam_abort(conn_ctx->beam_in, from); + } + if (conn_ctx->beam_out) { + h2_beam_abort(conn_ctx->beam_out, from); + } + c2->aborted = 1; +} + typedef struct { apr_bucket_brigade *bb; /* c2: data in holding area */ } h2_c2_fctx_in_t; @@ -458,10 +385,7 @@ static apr_status_t h2_c2_filter_out(ap_filter_t* f, apr_bucket_brigade* bb) "h2_c2(%s-%d): output leave", conn_ctx->id, conn_ctx->stream_id); if (APR_SUCCESS != rv) { - if (!conn_ctx->done) { - h2_beam_abort(conn_ctx->beam_out, f->c); - } - f->c->aborted = 1; + h2_c2_abort(f->c, f->c); } return rv; } diff --git a/modules/http2/h2_c2.h b/modules/http2/h2_c2.h index 58964a2ac4..05454459fb 100644 --- a/modules/http2/h2_c2.h +++ b/modules/http2/h2_c2.h @@ -41,10 +41,17 @@ int h2_mpm_supported(void); */ apr_status_t h2_c2_child_init(apr_pool_t *pool, server_rec *s); -conn_rec *h2_c2_create(conn_rec *c1, apr_pool_t *parent); void h2_c2_destroy(conn_rec *c2); /** + * Abort the I/O processing of a secondary connection. And + * in-/output beams will return errors and c2->aborted is set. + * @param c2 the secondary connection to abort + * @param from the connection this is invoked from + */ +void h2_c2_abort(conn_rec *c2, conn_rec *from); + +/** * Process a secondary connection for a HTTP/2 stream request. */ apr_status_t h2_c2_process(conn_rec *c, apr_thread_t *thread, int worker_id); diff --git a/modules/http2/h2_conn_ctx.c b/modules/http2/h2_conn_ctx.c index ce5e9c1234..c5596e9b04 100644 --- a/modules/http2/h2_conn_ctx.c +++ b/modules/http2/h2_conn_ctx.c @@ -16,6 +16,7 @@ #include <assert.h> #include <apr_strings.h> +#include <apr_atomic.h> #include <httpd.h> #include <http_core.h> @@ -42,6 +43,7 @@ static h2_conn_ctx_t *ctx_create(conn_rec *c, const char *id) h2_conn_ctx_t *conn_ctx = apr_pcalloc(c->pool, sizeof(*conn_ctx)); conn_ctx->id = id; conn_ctx->server = c->base_server; + apr_atomic_set32(&conn_ctx->started, 1); conn_ctx->started_at = apr_time_now(); ap_set_module_config(c->conn_config, &http2_module, conn_ctx); @@ -66,7 +68,8 @@ h2_conn_ctx_t *h2_conn_ctx_create_for_c1(conn_rec *c1, server_rec *s, const char } apr_status_t h2_conn_ctx_init_for_c2(h2_conn_ctx_t **pctx, conn_rec *c2, - struct h2_mplx *mplx, struct h2_stream *stream) + struct h2_mplx *mplx, struct h2_stream *stream, + struct h2_c2_transit *transit) { h2_conn_ctx_t *conn_ctx; apr_status_t rv = APR_SUCCESS; @@ -85,10 +88,12 @@ apr_status_t h2_conn_ctx_init_for_c2(h2_conn_ctx_t **pctx, conn_rec *c2, } conn_ctx->mplx = mplx; + conn_ctx->transit = transit; conn_ctx->stream_id = stream->id; apr_pool_create(&conn_ctx->req_pool, c2->pool); apr_pool_tag(conn_ctx->req_pool, "H2_C2_REQ"); conn_ctx->request = stream->request; + apr_atomic_set32(&conn_ctx->started, 1); conn_ctx->started_at = apr_time_now(); conn_ctx->done = 0; conn_ctx->done_at = 0; @@ -97,38 +102,6 @@ apr_status_t h2_conn_ctx_init_for_c2(h2_conn_ctx_t **pctx, conn_rec *c2, return rv; } -void h2_conn_ctx_clear_for_c2(conn_rec *c2) -{ - h2_conn_ctx_t *conn_ctx; - - ap_assert(c2->master); - conn_ctx = h2_conn_ctx_get(c2); - conn_ctx->stream_id = -1; - conn_ctx->request = NULL; - - if (conn_ctx->req_pool) { - apr_pool_destroy(conn_ctx->req_pool); - conn_ctx->req_pool = NULL; - conn_ctx->beam_out = NULL; - } - memset(&conn_ctx->pfd_in_drain, 0, sizeof(conn_ctx->pfd_in_drain)); - memset(&conn_ctx->pfd_out_prod, 0, sizeof(conn_ctx->pfd_out_prod)); - conn_ctx->beam_in = NULL; -} - -void h2_conn_ctx_destroy(conn_rec *c) -{ - h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c); - - if (conn_ctx) { - if (conn_ctx->mplx_pool) { - apr_pool_destroy(conn_ctx->mplx_pool); - conn_ctx->mplx_pool = NULL; - } - ap_set_module_config(c->conn_config, &http2_module, NULL); - } -} - void h2_conn_ctx_set_timeout(h2_conn_ctx_t *conn_ctx, apr_interval_time_t timeout) { if (conn_ctx->beam_out) { diff --git a/modules/http2/h2_conn_ctx.h b/modules/http2/h2_conn_ctx.h index b744e8dd29..110301620c 100644 --- a/modules/http2/h2_conn_ctx.h +++ b/modules/http2/h2_conn_ctx.h @@ -22,6 +22,7 @@ struct h2_stream; struct h2_mplx; struct h2_bucket_beam; struct h2_response_parser; +struct h2_c2_transit; #define H2_PIPE_OUT 0 #define H2_PIPE_IN 1 @@ -40,6 +41,7 @@ struct h2_conn_ctx_t { const char *protocol; /* c1: the protocol negotiated */ struct h2_session *session; /* c1: the h2 session established */ struct h2_mplx *mplx; /* c2: the multiplexer */ + struct h2_c2_transit *transit; /* c2: transit pool and bucket_alloc */ int pre_conn_done; /* has pre_connection setup run? */ int stream_id; /* c1: 0, c2: stream id processed */ @@ -48,20 +50,17 @@ struct h2_conn_ctx_t { struct h2_bucket_beam *beam_out; /* c2: data out, created from req_pool */ struct h2_bucket_beam *beam_in; /* c2: data in or NULL, borrowed from request stream */ - apr_pool_t *mplx_pool; /* c2: an mplx child pool for safe use inside mplx lock */ apr_file_t *pipe_in_prod[2]; /* c2: input produced notification pipe */ - apr_file_t *pipe_in_drain[2]; /* c2: input drained notification pipe */ apr_file_t *pipe_out_prod[2]; /* c2: output produced notification pipe */ - apr_pollfd_t pfd_in_drain; /* c2: poll pipe_in_drain output */ apr_pollfd_t pfd_out_prod; /* c2: poll pipe_out_prod output */ int has_final_response; /* final HTTP response passed on out */ apr_status_t last_err; /* APR_SUCCES or last error encountered in filters */ - struct h2_response_parser *parser; /* optional parser to catch H1 responses */ - volatile int done; /* c2: processing has finished */ + /* atomic */ apr_uint32_t started; /* c2: processing was started */ apr_time_t started_at; /* c2: when processing started */ + /* atomic */ apr_uint32_t done; /* c2: processing has finished */ apr_time_t done_at; /* c2: when processing was done */ }; typedef struct h2_conn_ctx_t h2_conn_ctx_t; @@ -84,14 +83,11 @@ typedef struct h2_conn_ctx_t h2_conn_ctx_t; h2_conn_ctx_t *h2_conn_ctx_create_for_c1(conn_rec *c, server_rec *s, const char *protocol); apr_status_t h2_conn_ctx_init_for_c2(h2_conn_ctx_t **pctx, conn_rec *c, - struct h2_mplx *mplx, struct h2_stream *stream); - -void h2_conn_ctx_clear_for_c2(conn_rec *c2); + struct h2_mplx *mplx, struct h2_stream *stream, + struct h2_c2_transit *transit); void h2_conn_ctx_detach(conn_rec *c); -void h2_conn_ctx_destroy(conn_rec *c); - void h2_conn_ctx_set_timeout(h2_conn_ctx_t *conn_ctx, apr_interval_time_t timeout); #endif /* defined(__mod_h2__h2_conn_ctx__) */ diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index c083f13c12..c1da860b67 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -26,6 +26,7 @@ #include <httpd.h> #include <http_core.h> +#include <http_connection.h> #include <http_log.h> #include <mpm_common.h> @@ -69,6 +70,13 @@ static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout, static apr_pool_t *pchild; +/* APR callback invoked if allocation fails. */ +static int abort_on_oom(int retcode) +{ + ap_abort_on_oom(); + return retcode; /* unreachable, hopefully. */ +} + apr_status_t h2_mplx_c1_child_init(apr_pool_t *pool, server_rec *s) { pchild = pool; @@ -100,7 +108,8 @@ static void c1_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length) static int stream_is_running(h2_stream *stream) { h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(stream->c2); - return conn_ctx && conn_ctx->started_at != 0 && !conn_ctx->done; + return conn_ctx && apr_atomic_read32(&conn_ctx->started) != 0 + && apr_atomic_read32(&conn_ctx->done) == 0; } int h2_mplx_c1_stream_is_running(h2_mplx *m, h2_stream *stream) @@ -153,13 +162,7 @@ static void m_stream_cleanup(h2_mplx *m, h2_stream *stream) ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1, H2_STRM_MSG(stream, "cleanup, c2 is running, abort")); /* c2 is still running */ - stream->c2->aborted = 1; - if (stream->input) { - h2_beam_abort(stream->input, m->c1); - } - if (stream->output) { - h2_beam_abort(stream->output, m->c1); - } + h2_c2_abort(stream->c2, m->c1); ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1, H2_STRM_MSG(stream, "cleanup, c2 is done, move to shold")); h2_ihash_add(m->shold, stream); @@ -173,6 +176,66 @@ static void m_stream_cleanup(h2_mplx *m, h2_stream *stream) } } +static h2_c2_transit *c2_transit_create(h2_mplx *m) +{ + apr_allocator_t *allocator; + apr_pool_t *ptrans; + h2_c2_transit *transit; + apr_status_t rv; + + /* We create a pool with its own allocator to be used for + * processing a request. This is the only way to have the processing + * independent of its parent pool in the sense that it can work in + * another thread. + */ + + rv = apr_allocator_create(&allocator); + if (rv == APR_SUCCESS) { + apr_allocator_max_free_set(allocator, ap_max_mem_free); + rv = apr_pool_create_ex(&ptrans, m->pool, NULL, allocator); + } + if (rv != APR_SUCCESS) { + /* maybe the log goes through, maybe not. */ + ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1, + APLOGNO(10004) "h2_mplx: create transit pool"); + ap_abort_on_oom(); + return NULL; /* should never be reached. */ + } + + apr_allocator_owner_set(allocator, ptrans); + apr_pool_abort_set(abort_on_oom, ptrans); + apr_pool_tag(ptrans, "h2_c2_transit"); + + transit = apr_pcalloc(ptrans, sizeof(*transit)); + transit->pool = ptrans; + transit->bucket_alloc = apr_bucket_alloc_create(ptrans); + return transit; +} + +static void c2_transit_destroy(h2_c2_transit *transit) +{ + apr_pool_destroy(transit->pool); +} + +static h2_c2_transit *c2_transit_get(h2_mplx *m) +{ + h2_c2_transit **ptransit = apr_array_pop(m->c2_transits); + if (ptransit) { + return *ptransit; + } + return c2_transit_create(m); +} + +static void c2_transit_recycle(h2_mplx *m, h2_c2_transit *transit) +{ + if (m->c2_transits->nelts >= m->max_spare_transits) { + c2_transit_destroy(transit); + } + else { + APR_ARRAY_PUSH(m->c2_transits, h2_c2_transit*) = transit; + } +} + /** * A h2_mplx needs to be thread-safe *and* if will be called by * the h2_session thread *and* the h2_worker threads. Therefore: @@ -254,11 +317,11 @@ h2_mplx *h2_mplx_c1_create(h2_stream *stream0, server_rec *s, apr_pool_t *parent m->streams_ev_in = apr_array_make(m->pool, 10, sizeof(h2_stream*)); m->streams_ev_out = apr_array_make(m->pool, 10, sizeof(h2_stream*)); -#if !H2_POLL_STREAMS + m->streams_input_read = h2_iq_create(m->pool, 10); status = apr_thread_mutex_create(&m->poll_lock, APR_THREAD_MUTEX_DEFAULT, m->pool); if (APR_SUCCESS != status) goto failure; - m->streams_input_read = h2_iq_create(m->pool, 10); +#if !H2_POLL_STREAMS m->streams_output_written = h2_iq_create(m->pool, 10); #endif @@ -266,6 +329,8 @@ h2_mplx *h2_mplx_c1_create(h2_stream *stream0, server_rec *s, apr_pool_t *parent mplx_pollset_add(m, conn_ctx); m->scratch_r = apr_pcalloc(m->pool, sizeof(*m->scratch_r)); + m->max_spare_transits = 3; + m->c2_transits = apr_array_make(m->pool, m->max_spare_transits, sizeof(h2_c2_transit*)); return m; @@ -331,8 +396,9 @@ static int m_report_stream_iter(void *ctx, void *val) { H2_STRM_MSG(stream, "->03198: %s %s %s" "[started=%d/done=%d]"), conn_ctx->request->method, conn_ctx->request->authority, - conn_ctx->request->path, conn_ctx->started_at != 0, - conn_ctx->done); + conn_ctx->request->path, + (int)apr_atomic_read32(&conn_ctx->started), + (int)apr_atomic_read32(&conn_ctx->done)); } else { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, /* NO APLOGNO */ @@ -354,10 +420,6 @@ static int m_stream_cancel_iter(void *ctx, void *val) { h2_mplx *m = ctx; h2_stream *stream = val; - /* disable input consumed reporting */ - if (stream->input) { - h2_beam_abort(stream->input, m->c1); - } /* take over event monitoring */ h2_stream_set_monitor(stream, NULL); /* Reset, should transit to CLOSED state */ @@ -499,8 +561,11 @@ static void c1_purge_streams(h2_mplx *m) m->id, stream->id, c2_ctx->stream_id); } - h2_conn_ctx_destroy(c2); h2_c2_destroy(c2); + if (c2_ctx->transit) { + c2_transit_recycle(m, c2_ctx->transit); + c2_ctx->transit = NULL; + } } h2_stream_destroy(stream); } @@ -699,17 +764,10 @@ static void c2_beam_input_read_notify(void *ctx, h2_bucket_beam *beam) h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c); if (conn_ctx && conn_ctx->stream_id) { - if (conn_ctx->pipe_in_drain[H2_PIPE_IN]) { - apr_file_putc(1, conn_ctx->pipe_in_drain[H2_PIPE_IN]); - } -#if !H2_POLL_STREAMS - else { - apr_thread_mutex_lock(conn_ctx->mplx->poll_lock); - h2_iq_append(conn_ctx->mplx->streams_input_read, conn_ctx->stream_id); - apr_pollset_wakeup(conn_ctx->mplx->pollset); - apr_thread_mutex_unlock(conn_ctx->mplx->poll_lock); - } -#endif + apr_thread_mutex_lock(conn_ctx->mplx->poll_lock); + h2_iq_append(conn_ctx->mplx->streams_input_read, conn_ctx->stream_id); + apr_pollset_wakeup(conn_ctx->mplx->pollset); + apr_thread_mutex_unlock(conn_ctx->mplx->poll_lock); } } @@ -733,13 +791,13 @@ static void c2_beam_output_write_notify(void *ctx, h2_bucket_beam *beam) } } -static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream) +static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream, h2_c2_transit *transit) { h2_conn_ctx_t *conn_ctx; apr_status_t rv = APR_SUCCESS; const char *action = "init"; - rv = h2_conn_ctx_init_for_c2(&conn_ctx, c2, m, stream); + rv = h2_conn_ctx_init_for_c2(&conn_ctx, c2, m, stream, transit); if (APR_SUCCESS != rv) goto cleanup; if (!conn_ctx->beam_out) { @@ -758,22 +816,14 @@ static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream) h2_beam_on_received(stream->input, c2_beam_input_read_notify, c2); h2_beam_on_consumed(stream->input, c1_input_consumed, stream); } - else { - memset(&conn_ctx->pfd_in_drain, 0, sizeof(conn_ctx->pfd_in_drain)); - } #if H2_POLL_STREAMS - if (!conn_ctx->mplx_pool) { - apr_pool_create(&conn_ctx->mplx_pool, m->pool); - apr_pool_tag(conn_ctx->mplx_pool, "H2_MPLX_C2"); - } - if (!conn_ctx->pipe_out_prod[H2_PIPE_OUT]) { action = "create output pipe"; rv = apr_file_pipe_create_pools(&conn_ctx->pipe_out_prod[H2_PIPE_OUT], &conn_ctx->pipe_out_prod[H2_PIPE_IN], APR_FULL_NONBLOCK, - conn_ctx->mplx_pool, c2->pool); + c2->pool, c2->pool); if (APR_SUCCESS != rv) goto cleanup; } conn_ctx->pfd_out_prod.desc_type = APR_POLL_FILE; @@ -787,26 +837,13 @@ static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream) rv = apr_file_pipe_create_pools(&conn_ctx->pipe_in_prod[H2_PIPE_OUT], &conn_ctx->pipe_in_prod[H2_PIPE_IN], APR_READ_BLOCK, - c2->pool, conn_ctx->mplx_pool); - if (APR_SUCCESS != rv) goto cleanup; - } - if (!conn_ctx->pipe_in_drain[H2_PIPE_OUT]) { - action = "create input read pipe"; - rv = apr_file_pipe_create_pools(&conn_ctx->pipe_in_drain[H2_PIPE_OUT], - &conn_ctx->pipe_in_drain[H2_PIPE_IN], - APR_FULL_NONBLOCK, - c2->pool, conn_ctx->mplx_pool); + c2->pool, c2->pool); if (APR_SUCCESS != rv) goto cleanup; } - conn_ctx->pfd_in_drain.desc_type = APR_POLL_FILE; - conn_ctx->pfd_in_drain.desc.f = conn_ctx->pipe_in_drain[H2_PIPE_OUT]; - conn_ctx->pfd_in_drain.reqevents = APR_POLLIN | APR_POLLERR | APR_POLLHUP; - conn_ctx->pfd_in_drain.client_data = conn_ctx; } #else memset(&conn_ctx->pfd_out_prod, 0, sizeof(conn_ctx->pfd_out_prod)); memset(&conn_ctx->pipe_in_prod, 0, sizeof(conn_ctx->pipe_in_prod)); - memset(&conn_ctx->pipe_in_drain, 0, sizeof(conn_ctx->pipe_in_drain)); #endif cleanup: @@ -822,9 +859,10 @@ cleanup: static conn_rec *s_next_c2(h2_mplx *m) { h2_stream *stream = NULL; - apr_status_t rv; + apr_status_t rv = APR_SUCCESS; int sid; - conn_rec *c2; + conn_rec *c2 = NULL; + h2_c2_transit *transit = NULL; while (!m->aborted && !stream && (m->processing_count < m->processing_limit) && (sid = h2_iq_shift(m->q)) > 0) { @@ -838,27 +876,35 @@ static conn_rec *s_next_c2(h2_mplx *m) "Current limit is %d and %d workers are in use.", m->id, m->processing_limit, m->processing_count); } - return NULL; + goto cleanup; } if (sid > m->max_stream_id_started) { m->max_stream_id_started = sid; } - c2 = h2_c2_create(m->c1, m->pool); + transit = c2_transit_get(m); + c2 = ap_create_secondary_connection(transit->pool, m->c1, transit->bucket_alloc); + if (!c2) goto cleanup; ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c1, H2_STRM_MSG(stream, "created new c2")); - rv = c2_setup_io(m, c2, stream); - if (APR_SUCCESS != rv) { - return NULL; - } + rv = c2_setup_io(m, c2, stream, transit); + if (APR_SUCCESS != rv) goto cleanup; stream->c2 = c2; ++m->processing_count; APR_ARRAY_PUSH(m->streams_to_poll, h2_stream *) = stream; apr_pollset_wakeup(m->pollset); +cleanup: + if (APR_SUCCESS != rv && c2) { + h2_c2_destroy(c2); + c2 = NULL; + } + if (transit && !c2) { + c2_transit_recycle(m, transit); + } return c2; } @@ -896,8 +942,8 @@ static void s_c2_done(h2_mplx *m, conn_rec *c2, h2_conn_ctx_t *conn_ctx) ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c2, "h2_mplx(%s-%d): c2 done", conn_ctx->id, conn_ctx->stream_id); - ap_assert(conn_ctx->done == 0); - conn_ctx->done = 1; + AP_DEBUG_ASSERT(apr_atomic_read32(&conn_ctx->done) == 0); + apr_atomic_set32(&conn_ctx->done, 1); conn_ctx->done_at = apr_time_now(); ++c2->keepalives; /* From here on, the final handling of c2 is done by c1 processing. @@ -955,16 +1001,18 @@ static void s_c2_done(h2_mplx *m, conn_rec *c2, h2_conn_ctx_t *conn_ctx) void h2_mplx_worker_c2_done(conn_rec *c2, conn_rec **out_c2) { h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c2); - h2_mplx *m; + h2_mplx *m = conn_ctx? conn_ctx->mplx : NULL; - if (!conn_ctx || !conn_ctx->mplx) return; - m = conn_ctx->mplx; + if (!m) { + if (out_c2) *out_c2 = NULL; + return; + } H2_MPLX_ENTER_ALWAYS(m); --m->processing_count; s_c2_done(m, c2, conn_ctx); - + if (m->join_wait) { apr_thread_cond_signal(m->join_wait); } @@ -1084,52 +1132,19 @@ static apr_status_t mplx_pollset_create(h2_mplx *m) static apr_status_t mplx_pollset_add(h2_mplx *m, h2_conn_ctx_t *conn_ctx) { - apr_status_t rv = APR_SUCCESS; - const char *name = ""; - if (conn_ctx->pfd_out_prod.reqevents) { - name = "adding out"; - rv = apr_pollset_add(m->pollset, &conn_ctx->pfd_out_prod); - if (APR_SUCCESS != rv) goto cleanup; - } - - if (conn_ctx->pfd_in_drain.reqevents) { - name = "adding in_read"; - rv = apr_pollset_add(m->pollset, &conn_ctx->pfd_in_drain); - } - -cleanup: - if (APR_SUCCESS != rv) { - ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1, - "h2_mplx(%ld-%d): error while adding to pollset %s", - m->id, conn_ctx->stream_id, name); + return apr_pollset_add(m->pollset, &conn_ctx->pfd_out_prod); } - return rv; + return APR_SUCCESS; } static apr_status_t mplx_pollset_remove(h2_mplx *m, h2_conn_ctx_t *conn_ctx) { apr_status_t rv = APR_SUCCESS; - const char *name = ""; if (conn_ctx->pfd_out_prod.reqevents) { rv = apr_pollset_remove(m->pollset, &conn_ctx->pfd_out_prod); conn_ctx->pfd_out_prod.reqevents = 0; - if (APR_SUCCESS != rv) goto cleanup; - } - - if (conn_ctx->pfd_in_drain.reqevents) { - name = "in_read"; - rv = apr_pollset_remove(m->pollset, &conn_ctx->pfd_in_drain); - conn_ctx->pfd_in_drain.reqevents = 0; - if (APR_SUCCESS != rv) goto cleanup; - } - -cleanup: - if (APR_SUCCESS != rv) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, rv, m->c1, - "h2_mplx(%ld-%d): error removing from pollset %s", - m->id, conn_ctx->stream_id, name); } return rv; } @@ -1168,16 +1183,21 @@ static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout, apr_array_clear(m->streams_to_poll); } -#if !H2_POLL_STREAMS apr_thread_mutex_lock(m->poll_lock); - if (!h2_iq_empty(m->streams_input_read) - || !h2_iq_empty(m->streams_output_written)) { + if (!h2_iq_empty(m->streams_input_read)) { while ((i = h2_iq_shift(m->streams_input_read))) { stream = h2_ihash_get(m->streams, i); if (stream) { APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = stream; } } + nresults = 0; + rv = APR_SUCCESS; + apr_thread_mutex_unlock(m->poll_lock); + break; + } +#if !H2_POLL_STREAMS + if (!h2_iq_empty(m->streams_output_written)) { while ((i = h2_iq_shift(m->streams_output_written))) { stream = h2_ihash_get(m->streams, i); if (stream) { @@ -1189,8 +1209,9 @@ static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout, apr_thread_mutex_unlock(m->poll_lock); break; } - apr_thread_mutex_unlock(m->poll_lock); #endif + apr_thread_mutex_unlock(m->poll_lock); + H2_MPLX_LEAVE(m); rv = apr_pollset_poll(m->pollset, timeout >= 0? timeout : -1, &nresults, &results); H2_MPLX_ENTER_ALWAYS(m); @@ -1276,14 +1297,6 @@ static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout, pfd->rtnevents); APR_ARRAY_PUSH(m->streams_ev_out, h2_stream*) = stream; } - else if (conn_ctx->pfd_in_drain.desc.f == pfd->desc.f) { - /* input has been consumed */ - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1, - "[%s-%d] poll input event %hx", - conn_ctx->id, conn_ctx->stream_id, - pfd->rtnevents); - APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = stream; - } } if (on_stream_input && m->streams_ev_in->nelts) { diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index d5c4f9916f..662682a05b 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -44,6 +44,13 @@ struct h2_iqueue; #include <apr_queue.h> +typedef struct h2_c2_transit h2_c2_transit; + +struct h2_c2_transit { + apr_pool_t *pool; + apr_bucket_alloc_t *bucket_alloc; +}; + typedef struct h2_mplx h2_mplx; struct h2_mplx { @@ -83,14 +90,16 @@ struct h2_mplx { apr_array_header_t *streams_ev_in; apr_array_header_t *streams_ev_out; -#if !H2_POLL_STREAMS - apr_thread_mutex_t *poll_lock; /* not the painter */ + apr_thread_mutex_t *poll_lock; /* protect modifications of queues below */ struct h2_iqueue *streams_input_read; /* streams whose input has been read from */ struct h2_iqueue *streams_output_written; /* streams whose output has been written to */ -#endif + struct h2_workers *workers; /* h2 workers process wide instance */ request_rec *scratch_r; /* pseudo request_rec for scoreboard reporting */ + + apr_size_t max_spare_transits; /* max number of transit pools idling */ + apr_array_header_t *c2_transits; /* base pools for running c2 connections */ }; apr_status_t h2_mplx_c1_child_init(apr_pool_t *pool, server_rec *s); diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index a79c7cdd0c..da8443fe7c 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -573,11 +573,8 @@ void h2_stream_destroy(h2_stream *stream) void h2_stream_rst(h2_stream *stream, int error_code) { stream->rst_error = error_code; - if (stream->input) { - h2_beam_abort(stream->input, stream->session->c1); - } - if (stream->output) { - h2_beam_abort(stream->output, stream->session->c1); + if (stream->c2) { + h2_c2_abort(stream->c2, stream->session->c1); } ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c1, H2_STRM_MSG(stream, "reset, error=%d"), error_code); diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c index 1e7aebdca8..7299a2b0d6 100644 --- a/modules/http2/h2_workers.c +++ b/modules/http2/h2_workers.c @@ -40,7 +40,7 @@ struct h2_slot { apr_thread_t *thread; apr_thread_mutex_t *lock; apr_thread_cond_t *not_idle; - volatile apr_uint32_t timed_out; + /* atomic */ apr_uint32_t timed_out; }; static h2_slot *pop_slot(h2_slot *volatile *phead) @@ -99,7 +99,7 @@ static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot) /* thread will either immediately start work or add itself * to the idle queue */ apr_atomic_inc32(&workers->worker_count); - slot->timed_out = 0; + apr_atomic_set32(&slot->timed_out, 0); rv = ap_thread_create(&slot->thread, workers->thread_attr, slot_run, slot, workers->pool); if (rv != APR_SUCCESS) { @@ -125,22 +125,22 @@ static apr_status_t add_worker(h2_workers *workers) static void wake_idle_worker(h2_workers *workers) { - h2_slot *slot = pop_slot(&workers->idle); - if (slot) { - int timed_out = 0; - apr_thread_mutex_lock(slot->lock); - timed_out = slot->timed_out; - if (!timed_out) { - apr_thread_cond_signal(slot->not_idle); + h2_slot *slot;; + for (;;) { + slot = pop_slot(&workers->idle); + if (!slot) { + if (workers->dynamic && apr_atomic_read32(&workers->shutdown) == 0) { + add_worker(workers); + } + return; } - apr_thread_mutex_unlock(slot->lock); - if (timed_out) { - slot_done(slot); - wake_idle_worker(workers); + if (!apr_atomic_read32(&slot->timed_out)) { + apr_thread_mutex_lock(slot->lock); + apr_thread_cond_signal(slot->not_idle); + apr_thread_mutex_unlock(slot->lock); + return; } - } - else if (workers->dynamic && !workers->shutdown) { - add_worker(workers); + slot_done(slot); } } @@ -190,9 +190,10 @@ static int get_next(h2_slot *slot) int non_essential = slot->id >= workers->min_workers; apr_status_t rv; - while (!workers->aborted && !slot->timed_out) { + while (apr_atomic_read32(&workers->aborted) == 0 + && apr_atomic_read32(&slot->timed_out) == 0) { ap_assert(slot->connection == NULL); - if (non_essential && workers->shutdown) { + if (non_essential && apr_atomic_read32(&workers->shutdown)) { /* Terminate non-essential worker on shutdown */ break; } @@ -208,14 +209,16 @@ static int get_next(h2_slot *slot) join_zombies(workers); apr_thread_mutex_lock(slot->lock); - if (!workers->aborted) { + if (apr_atomic_read32(&workers->aborted) == 0) { + apr_uint32_t idle_secs; push_slot(&workers->idle, slot); - if (non_essential && workers->max_idle_duration) { + if (non_essential + && (idle_secs = apr_atomic_read32(&workers->max_idle_secs))) { rv = apr_thread_cond_timedwait(slot->not_idle, slot->lock, - workers->max_idle_duration); + apr_time_from_sec(idle_secs)); if (APR_TIMEUP == rv) { - slot->timed_out = 1; + apr_atomic_set32(&slot->timed_out, 1); } } else { @@ -237,7 +240,8 @@ static void slot_done(h2_slot *slot) /* If this worker is the last one exiting and the MPM child is stopping, * unblock workers_pool_cleanup(). */ - if (!apr_atomic_dec32(&workers->worker_count) && workers->aborted) { + if (!apr_atomic_dec32(&workers->worker_count) + && apr_atomic_read32(&workers->aborted)) { apr_thread_mutex_lock(workers->lock); apr_thread_cond_signal(workers->all_done); apr_thread_mutex_unlock(workers->lock); @@ -254,7 +258,7 @@ static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx) do { ap_assert(slot->connection != NULL); h2_c2_process(slot->connection, thread, slot->id); - if (!slot->workers->aborted && + if (apr_atomic_read32(&slot->workers->aborted) == 0 && apr_atomic_read32(&slot->workers->worker_count) < slot->workers->max_workers) { h2_mplx_worker_c2_done(slot->connection, &slot->connection); } @@ -265,7 +269,7 @@ static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx) } while (slot->connection); } - if (!slot->timed_out) { + if (apr_atomic_read32(&slot->timed_out) == 0) { slot_done(slot); } @@ -294,8 +298,8 @@ static void workers_abort_idle(h2_workers *workers) { h2_slot *slot; - workers->shutdown = 1; - workers->aborted = 1; + apr_atomic_set32(&workers->shutdown, 1); + apr_atomic_set32(&workers->aborted, 1); h2_fifo_term(workers->mplxs); /* abort all idle slots */ @@ -379,12 +383,12 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pchild, workers->pool = pool; workers->min_workers = min_workers; workers->max_workers = max_workers; - workers->max_idle_duration = apr_time_from_sec((idle_secs > 0)? idle_secs : 10); + workers->max_idle_secs = (idle_secs > 0)? idle_secs : 10; ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s, "h2_workers: created with min=%d max=%d idle_timeout=%d sec", workers->min_workers, workers->max_workers, - (int)apr_time_sec(workers->max_idle_duration)); + (int)workers->max_idle_secs); /* FIXME: the fifo set we use here has limited capacity. Once the * set is full, connections with new requests do a wait. */ @@ -460,7 +464,7 @@ apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m) void h2_workers_graceful_shutdown(h2_workers *workers) { - workers->shutdown = 1; - workers->max_idle_duration = apr_time_from_sec(1); + apr_atomic_set32(&workers->shutdown, 1); + apr_atomic_set32(&workers->max_idle_secs, 1); wake_non_essential_workers(workers); } diff --git a/modules/http2/h2_workers.h b/modules/http2/h2_workers.h index c77cf1a6bb..0de3040676 100644 --- a/modules/http2/h2_workers.h +++ b/modules/http2/h2_workers.h @@ -38,19 +38,17 @@ struct h2_workers { int next_worker_id; apr_uint32_t max_workers; - volatile apr_uint32_t min_workers; /* is changed during graceful shutdown */ - volatile apr_interval_time_t max_idle_duration; /* is changed during graceful shutdown */ - - volatile int aborted; - volatile int shutdown; + apr_uint32_t min_workers; + /* atomic */ apr_uint32_t worker_count; + /* atomic */ apr_uint32_t max_idle_secs; + /* atomic */ apr_uint32_t aborted; + /* atomic */ apr_uint32_t shutdown; int dynamic; apr_threadattr_t *thread_attr; int nslots; struct h2_slot *slots; - - volatile apr_uint32_t worker_count; - + struct h2_slot *free; struct h2_slot *idle; struct h2_slot *zombies; |