diff options
Diffstat (limited to 'modules/http2/h2_mplx.c')
-rw-r--r-- | modules/http2/h2_mplx.c | 241 |
1 files changed, 127 insertions, 114 deletions
diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index c083f13c12..c1da860b67 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -26,6 +26,7 @@ #include <httpd.h> #include <http_core.h> +#include <http_connection.h> #include <http_log.h> #include <mpm_common.h> @@ -69,6 +70,13 @@ static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout, static apr_pool_t *pchild; +/* APR callback invoked if allocation fails. */ +static int abort_on_oom(int retcode) +{ + ap_abort_on_oom(); + return retcode; /* unreachable, hopefully. */ +} + apr_status_t h2_mplx_c1_child_init(apr_pool_t *pool, server_rec *s) { pchild = pool; @@ -100,7 +108,8 @@ static void c1_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length) static int stream_is_running(h2_stream *stream) { h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(stream->c2); - return conn_ctx && conn_ctx->started_at != 0 && !conn_ctx->done; + return conn_ctx && apr_atomic_read32(&conn_ctx->started) != 0 + && apr_atomic_read32(&conn_ctx->done) == 0; } int h2_mplx_c1_stream_is_running(h2_mplx *m, h2_stream *stream) @@ -153,13 +162,7 @@ static void m_stream_cleanup(h2_mplx *m, h2_stream *stream) ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1, H2_STRM_MSG(stream, "cleanup, c2 is running, abort")); /* c2 is still running */ - stream->c2->aborted = 1; - if (stream->input) { - h2_beam_abort(stream->input, m->c1); - } - if (stream->output) { - h2_beam_abort(stream->output, m->c1); - } + h2_c2_abort(stream->c2, m->c1); ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1, H2_STRM_MSG(stream, "cleanup, c2 is done, move to shold")); h2_ihash_add(m->shold, stream); @@ -173,6 +176,66 @@ static void m_stream_cleanup(h2_mplx *m, h2_stream *stream) } } +static h2_c2_transit *c2_transit_create(h2_mplx *m) +{ + apr_allocator_t *allocator; + apr_pool_t *ptrans; + h2_c2_transit *transit; + apr_status_t rv; + + /* We create a pool with its own allocator to be used for + * processing a request. This is the only way to have the processing + * independent of its parent pool in the sense that it can work in + * another thread. + */ + + rv = apr_allocator_create(&allocator); + if (rv == APR_SUCCESS) { + apr_allocator_max_free_set(allocator, ap_max_mem_free); + rv = apr_pool_create_ex(&ptrans, m->pool, NULL, allocator); + } + if (rv != APR_SUCCESS) { + /* maybe the log goes through, maybe not. */ + ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1, + APLOGNO(10004) "h2_mplx: create transit pool"); + ap_abort_on_oom(); + return NULL; /* should never be reached. */ + } + + apr_allocator_owner_set(allocator, ptrans); + apr_pool_abort_set(abort_on_oom, ptrans); + apr_pool_tag(ptrans, "h2_c2_transit"); + + transit = apr_pcalloc(ptrans, sizeof(*transit)); + transit->pool = ptrans; + transit->bucket_alloc = apr_bucket_alloc_create(ptrans); + return transit; +} + +static void c2_transit_destroy(h2_c2_transit *transit) +{ + apr_pool_destroy(transit->pool); +} + +static h2_c2_transit *c2_transit_get(h2_mplx *m) +{ + h2_c2_transit **ptransit = apr_array_pop(m->c2_transits); + if (ptransit) { + return *ptransit; + } + return c2_transit_create(m); +} + +static void c2_transit_recycle(h2_mplx *m, h2_c2_transit *transit) +{ + if (m->c2_transits->nelts >= m->max_spare_transits) { + c2_transit_destroy(transit); + } + else { + APR_ARRAY_PUSH(m->c2_transits, h2_c2_transit*) = transit; + } +} + /** * A h2_mplx needs to be thread-safe *and* if will be called by * the h2_session thread *and* the h2_worker threads. Therefore: @@ -254,11 +317,11 @@ h2_mplx *h2_mplx_c1_create(h2_stream *stream0, server_rec *s, apr_pool_t *parent m->streams_ev_in = apr_array_make(m->pool, 10, sizeof(h2_stream*)); m->streams_ev_out = apr_array_make(m->pool, 10, sizeof(h2_stream*)); -#if !H2_POLL_STREAMS + m->streams_input_read = h2_iq_create(m->pool, 10); status = apr_thread_mutex_create(&m->poll_lock, APR_THREAD_MUTEX_DEFAULT, m->pool); if (APR_SUCCESS != status) goto failure; - m->streams_input_read = h2_iq_create(m->pool, 10); +#if !H2_POLL_STREAMS m->streams_output_written = h2_iq_create(m->pool, 10); #endif @@ -266,6 +329,8 @@ h2_mplx *h2_mplx_c1_create(h2_stream *stream0, server_rec *s, apr_pool_t *parent mplx_pollset_add(m, conn_ctx); m->scratch_r = apr_pcalloc(m->pool, sizeof(*m->scratch_r)); + m->max_spare_transits = 3; + m->c2_transits = apr_array_make(m->pool, m->max_spare_transits, sizeof(h2_c2_transit*)); return m; @@ -331,8 +396,9 @@ static int m_report_stream_iter(void *ctx, void *val) { H2_STRM_MSG(stream, "->03198: %s %s %s" "[started=%d/done=%d]"), conn_ctx->request->method, conn_ctx->request->authority, - conn_ctx->request->path, conn_ctx->started_at != 0, - conn_ctx->done); + conn_ctx->request->path, + (int)apr_atomic_read32(&conn_ctx->started), + (int)apr_atomic_read32(&conn_ctx->done)); } else { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, /* NO APLOGNO */ @@ -354,10 +420,6 @@ static int m_stream_cancel_iter(void *ctx, void *val) { h2_mplx *m = ctx; h2_stream *stream = val; - /* disable input consumed reporting */ - if (stream->input) { - h2_beam_abort(stream->input, m->c1); - } /* take over event monitoring */ h2_stream_set_monitor(stream, NULL); /* Reset, should transit to CLOSED state */ @@ -499,8 +561,11 @@ static void c1_purge_streams(h2_mplx *m) m->id, stream->id, c2_ctx->stream_id); } - h2_conn_ctx_destroy(c2); h2_c2_destroy(c2); + if (c2_ctx->transit) { + c2_transit_recycle(m, c2_ctx->transit); + c2_ctx->transit = NULL; + } } h2_stream_destroy(stream); } @@ -699,17 +764,10 @@ static void c2_beam_input_read_notify(void *ctx, h2_bucket_beam *beam) h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c); if (conn_ctx && conn_ctx->stream_id) { - if (conn_ctx->pipe_in_drain[H2_PIPE_IN]) { - apr_file_putc(1, conn_ctx->pipe_in_drain[H2_PIPE_IN]); - } -#if !H2_POLL_STREAMS - else { - apr_thread_mutex_lock(conn_ctx->mplx->poll_lock); - h2_iq_append(conn_ctx->mplx->streams_input_read, conn_ctx->stream_id); - apr_pollset_wakeup(conn_ctx->mplx->pollset); - apr_thread_mutex_unlock(conn_ctx->mplx->poll_lock); - } -#endif + apr_thread_mutex_lock(conn_ctx->mplx->poll_lock); + h2_iq_append(conn_ctx->mplx->streams_input_read, conn_ctx->stream_id); + apr_pollset_wakeup(conn_ctx->mplx->pollset); + apr_thread_mutex_unlock(conn_ctx->mplx->poll_lock); } } @@ -733,13 +791,13 @@ static void c2_beam_output_write_notify(void *ctx, h2_bucket_beam *beam) } } -static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream) +static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream, h2_c2_transit *transit) { h2_conn_ctx_t *conn_ctx; apr_status_t rv = APR_SUCCESS; const char *action = "init"; - rv = h2_conn_ctx_init_for_c2(&conn_ctx, c2, m, stream); + rv = h2_conn_ctx_init_for_c2(&conn_ctx, c2, m, stream, transit); if (APR_SUCCESS != rv) goto cleanup; if (!conn_ctx->beam_out) { @@ -758,22 +816,14 @@ static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream) h2_beam_on_received(stream->input, c2_beam_input_read_notify, c2); h2_beam_on_consumed(stream->input, c1_input_consumed, stream); } - else { - memset(&conn_ctx->pfd_in_drain, 0, sizeof(conn_ctx->pfd_in_drain)); - } #if H2_POLL_STREAMS - if (!conn_ctx->mplx_pool) { - apr_pool_create(&conn_ctx->mplx_pool, m->pool); - apr_pool_tag(conn_ctx->mplx_pool, "H2_MPLX_C2"); - } - if (!conn_ctx->pipe_out_prod[H2_PIPE_OUT]) { action = "create output pipe"; rv = apr_file_pipe_create_pools(&conn_ctx->pipe_out_prod[H2_PIPE_OUT], &conn_ctx->pipe_out_prod[H2_PIPE_IN], APR_FULL_NONBLOCK, - conn_ctx->mplx_pool, c2->pool); + c2->pool, c2->pool); if (APR_SUCCESS != rv) goto cleanup; } conn_ctx->pfd_out_prod.desc_type = APR_POLL_FILE; @@ -787,26 +837,13 @@ static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream) rv = apr_file_pipe_create_pools(&conn_ctx->pipe_in_prod[H2_PIPE_OUT], &conn_ctx->pipe_in_prod[H2_PIPE_IN], APR_READ_BLOCK, - c2->pool, conn_ctx->mplx_pool); - if (APR_SUCCESS != rv) goto cleanup; - } - if (!conn_ctx->pipe_in_drain[H2_PIPE_OUT]) { - action = "create input read pipe"; - rv = apr_file_pipe_create_pools(&conn_ctx->pipe_in_drain[H2_PIPE_OUT], - &conn_ctx->pipe_in_drain[H2_PIPE_IN], - APR_FULL_NONBLOCK, - c2->pool, conn_ctx->mplx_pool); + c2->pool, c2->pool); if (APR_SUCCESS != rv) goto cleanup; } - conn_ctx->pfd_in_drain.desc_type = APR_POLL_FILE; - conn_ctx->pfd_in_drain.desc.f = conn_ctx->pipe_in_drain[H2_PIPE_OUT]; - conn_ctx->pfd_in_drain.reqevents = APR_POLLIN | APR_POLLERR | APR_POLLHUP; - conn_ctx->pfd_in_drain.client_data = conn_ctx; } #else memset(&conn_ctx->pfd_out_prod, 0, sizeof(conn_ctx->pfd_out_prod)); memset(&conn_ctx->pipe_in_prod, 0, sizeof(conn_ctx->pipe_in_prod)); - memset(&conn_ctx->pipe_in_drain, 0, sizeof(conn_ctx->pipe_in_drain)); #endif cleanup: @@ -822,9 +859,10 @@ cleanup: static conn_rec *s_next_c2(h2_mplx *m) { h2_stream *stream = NULL; - apr_status_t rv; + apr_status_t rv = APR_SUCCESS; int sid; - conn_rec *c2; + conn_rec *c2 = NULL; + h2_c2_transit *transit = NULL; while (!m->aborted && !stream && (m->processing_count < m->processing_limit) && (sid = h2_iq_shift(m->q)) > 0) { @@ -838,27 +876,35 @@ static conn_rec *s_next_c2(h2_mplx *m) "Current limit is %d and %d workers are in use.", m->id, m->processing_limit, m->processing_count); } - return NULL; + goto cleanup; } if (sid > m->max_stream_id_started) { m->max_stream_id_started = sid; } - c2 = h2_c2_create(m->c1, m->pool); + transit = c2_transit_get(m); + c2 = ap_create_secondary_connection(transit->pool, m->c1, transit->bucket_alloc); + if (!c2) goto cleanup; ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c1, H2_STRM_MSG(stream, "created new c2")); - rv = c2_setup_io(m, c2, stream); - if (APR_SUCCESS != rv) { - return NULL; - } + rv = c2_setup_io(m, c2, stream, transit); + if (APR_SUCCESS != rv) goto cleanup; stream->c2 = c2; ++m->processing_count; APR_ARRAY_PUSH(m->streams_to_poll, h2_stream *) = stream; apr_pollset_wakeup(m->pollset); +cleanup: + if (APR_SUCCESS != rv && c2) { + h2_c2_destroy(c2); + c2 = NULL; + } + if (transit && !c2) { + c2_transit_recycle(m, transit); + } return c2; } @@ -896,8 +942,8 @@ static void s_c2_done(h2_mplx *m, conn_rec *c2, h2_conn_ctx_t *conn_ctx) ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c2, "h2_mplx(%s-%d): c2 done", conn_ctx->id, conn_ctx->stream_id); - ap_assert(conn_ctx->done == 0); - conn_ctx->done = 1; + AP_DEBUG_ASSERT(apr_atomic_read32(&conn_ctx->done) == 0); + apr_atomic_set32(&conn_ctx->done, 1); conn_ctx->done_at = apr_time_now(); ++c2->keepalives; /* From here on, the final handling of c2 is done by c1 processing. @@ -955,16 +1001,18 @@ static void s_c2_done(h2_mplx *m, conn_rec *c2, h2_conn_ctx_t *conn_ctx) void h2_mplx_worker_c2_done(conn_rec *c2, conn_rec **out_c2) { h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c2); - h2_mplx *m; + h2_mplx *m = conn_ctx? conn_ctx->mplx : NULL; - if (!conn_ctx || !conn_ctx->mplx) return; - m = conn_ctx->mplx; + if (!m) { + if (out_c2) *out_c2 = NULL; + return; + } H2_MPLX_ENTER_ALWAYS(m); --m->processing_count; s_c2_done(m, c2, conn_ctx); - + if (m->join_wait) { apr_thread_cond_signal(m->join_wait); } @@ -1084,52 +1132,19 @@ static apr_status_t mplx_pollset_create(h2_mplx *m) static apr_status_t mplx_pollset_add(h2_mplx *m, h2_conn_ctx_t *conn_ctx) { - apr_status_t rv = APR_SUCCESS; - const char *name = ""; - if (conn_ctx->pfd_out_prod.reqevents) { - name = "adding out"; - rv = apr_pollset_add(m->pollset, &conn_ctx->pfd_out_prod); - if (APR_SUCCESS != rv) goto cleanup; - } - - if (conn_ctx->pfd_in_drain.reqevents) { - name = "adding in_read"; - rv = apr_pollset_add(m->pollset, &conn_ctx->pfd_in_drain); - } - -cleanup: - if (APR_SUCCESS != rv) { - ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1, - "h2_mplx(%ld-%d): error while adding to pollset %s", - m->id, conn_ctx->stream_id, name); + return apr_pollset_add(m->pollset, &conn_ctx->pfd_out_prod); } - return rv; + return APR_SUCCESS; } static apr_status_t mplx_pollset_remove(h2_mplx *m, h2_conn_ctx_t *conn_ctx) { apr_status_t rv = APR_SUCCESS; - const char *name = ""; if (conn_ctx->pfd_out_prod.reqevents) { rv = apr_pollset_remove(m->pollset, &conn_ctx->pfd_out_prod); conn_ctx->pfd_out_prod.reqevents = 0; - if (APR_SUCCESS != rv) goto cleanup; - } - - if (conn_ctx->pfd_in_drain.reqevents) { - name = "in_read"; - rv = apr_pollset_remove(m->pollset, &conn_ctx->pfd_in_drain); - conn_ctx->pfd_in_drain.reqevents = 0; - if (APR_SUCCESS != rv) goto cleanup; - } - -cleanup: - if (APR_SUCCESS != rv) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, rv, m->c1, - "h2_mplx(%ld-%d): error removing from pollset %s", - m->id, conn_ctx->stream_id, name); } return rv; } @@ -1168,16 +1183,21 @@ static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout, apr_array_clear(m->streams_to_poll); } -#if !H2_POLL_STREAMS apr_thread_mutex_lock(m->poll_lock); - if (!h2_iq_empty(m->streams_input_read) - || !h2_iq_empty(m->streams_output_written)) { + if (!h2_iq_empty(m->streams_input_read)) { while ((i = h2_iq_shift(m->streams_input_read))) { stream = h2_ihash_get(m->streams, i); if (stream) { APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = stream; } } + nresults = 0; + rv = APR_SUCCESS; + apr_thread_mutex_unlock(m->poll_lock); + break; + } +#if !H2_POLL_STREAMS + if (!h2_iq_empty(m->streams_output_written)) { while ((i = h2_iq_shift(m->streams_output_written))) { stream = h2_ihash_get(m->streams, i); if (stream) { @@ -1189,8 +1209,9 @@ static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout, apr_thread_mutex_unlock(m->poll_lock); break; } - apr_thread_mutex_unlock(m->poll_lock); #endif + apr_thread_mutex_unlock(m->poll_lock); + H2_MPLX_LEAVE(m); rv = apr_pollset_poll(m->pollset, timeout >= 0? timeout : -1, &nresults, &results); H2_MPLX_ENTER_ALWAYS(m); @@ -1276,14 +1297,6 @@ static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout, pfd->rtnevents); APR_ARRAY_PUSH(m->streams_ev_out, h2_stream*) = stream; } - else if (conn_ctx->pfd_in_drain.desc.f == pfd->desc.f) { - /* input has been consumed */ - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1, - "[%s-%d] poll input event %hx", - conn_ctx->id, conn_ctx->stream_id, - pfd->rtnevents); - APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = stream; - } } if (on_stream_input && m->streams_ev_in->nelts) { |