diff options
author | Stefan Eissing <icing@apache.org> | 2022-03-21 09:19:28 +0100 |
---|---|---|
committer | Stefan Eissing <icing@apache.org> | 2022-03-21 09:19:28 +0100 |
commit | 32efaf07fd89a4c3fe86ddbaf0ab439463da5604 (patch) | |
tree | a95cbfc5d94cb58b4b1c15f30278375a91f9bb27 /modules/http2/h2_mplx.c | |
parent | *) core: adding a new hook and method to the API: (diff) | |
download | apache2-32efaf07fd89a4c3fe86ddbaf0ab439463da5604.tar.xz apache2-32efaf07fd89a4c3fe86ddbaf0ab439463da5604.zip |
*) mod_http2: use pollset only for main connection and wakeups
for events on streams. Provide streams in INPUT pipe when
needed and supported on the platform.
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1899102 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'modules/http2/h2_mplx.c')
-rw-r--r-- | modules/http2/h2_mplx.c | 168 |
1 files changed, 20 insertions, 148 deletions
diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index c1da860b67..dc930f7eb4 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -61,8 +61,6 @@ static apr_status_t s_mplx_be_happy(h2_mplx *m, conn_rec *c, h2_conn_ctx_t *conn static apr_status_t m_be_annoyed(h2_mplx *m); static apr_status_t mplx_pollset_create(h2_mplx *m); -static apr_status_t mplx_pollset_add(h2_mplx *m, h2_conn_ctx_t *conn_ctx); -static apr_status_t mplx_pollset_remove(h2_mplx *m, h2_conn_ctx_t *conn_ctx); static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout, stream_ev_callback *on_stream_input, stream_ev_callback *on_stream_output, @@ -313,20 +311,19 @@ h2_mplx *h2_mplx_c1_create(h2_stream *stream0, server_rec *s, apr_pool_t *parent "nghttp2: could not create pollset"); goto failure; } - m->streams_to_poll = apr_array_make(m->pool, 10, sizeof(h2_stream*)); m->streams_ev_in = apr_array_make(m->pool, 10, sizeof(h2_stream*)); m->streams_ev_out = apr_array_make(m->pool, 10, sizeof(h2_stream*)); m->streams_input_read = h2_iq_create(m->pool, 10); + m->streams_output_written = h2_iq_create(m->pool, 10); status = apr_thread_mutex_create(&m->poll_lock, APR_THREAD_MUTEX_DEFAULT, m->pool); if (APR_SUCCESS != status) goto failure; -#if !H2_POLL_STREAMS - m->streams_output_written = h2_iq_create(m->pool, 10); -#endif conn_ctx = h2_conn_ctx_get(m->c1); - mplx_pollset_add(m, conn_ctx); + if (conn_ctx->pfd.reqevents) { + apr_pollset_add(m->pollset, &conn_ctx->pfd); + } m->scratch_r = apr_pcalloc(m->pool, sizeof(*m->scratch_r)); m->max_spare_transits = 3; @@ -550,17 +547,9 @@ static void c1_purge_streams(h2_mplx *m) if (stream->c2) { conn_rec *c2 = stream->c2; h2_conn_ctx_t *c2_ctx = h2_conn_ctx_get(c2); - apr_status_t rv; stream->c2 = NULL; ap_assert(c2_ctx); - rv = mplx_pollset_remove(m, c2_ctx); - if (APR_SUCCESS != rv) { - ap_log_cerror(APLOG_MARK, APLOG_INFO, rv, m->c1, - "h2_mplx(%ld-%d): pollset_remove %d on purge", - m->id, stream->id, c2_ctx->stream_id); - } - h2_c2_destroy(c2); if (c2_ctx->transit) { c2_transit_recycle(m, c2_ctx->transit); @@ -753,8 +742,8 @@ static void c2_beam_input_write_notify(void *ctx, h2_bucket_beam *beam) h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c); (void)beam; - if (conn_ctx && conn_ctx->stream_id && conn_ctx->pipe_in_prod[H2_PIPE_IN]) { - apr_file_putc(1, conn_ctx->pipe_in_prod[H2_PIPE_IN]); + if (conn_ctx && conn_ctx->stream_id && conn_ctx->pipe_in[H2_PIPE_IN]) { + apr_file_putc(1, conn_ctx->pipe_in[H2_PIPE_IN]); } } @@ -777,17 +766,10 @@ static void c2_beam_output_write_notify(void *ctx, h2_bucket_beam *beam) h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c); if (conn_ctx && conn_ctx->stream_id) { - if (conn_ctx->pipe_out_prod[H2_PIPE_IN]) { - apr_file_putc(1, conn_ctx->pipe_out_prod[H2_PIPE_IN]); - } -#if !H2_POLL_STREAMS - else { - apr_thread_mutex_lock(conn_ctx->mplx->poll_lock); - h2_iq_append(conn_ctx->mplx->streams_output_written, conn_ctx->stream_id); - apr_pollset_wakeup(conn_ctx->mplx->pollset); - apr_thread_mutex_unlock(conn_ctx->mplx->poll_lock); - } -#endif + apr_thread_mutex_lock(conn_ctx->mplx->poll_lock); + h2_iq_append(conn_ctx->mplx->streams_output_written, conn_ctx->stream_id); + apr_pollset_wakeup(conn_ctx->mplx->pollset); + apr_thread_mutex_unlock(conn_ctx->mplx->poll_lock); } } @@ -817,33 +799,19 @@ static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream, h2_ h2_beam_on_consumed(stream->input, c1_input_consumed, stream); } -#if H2_POLL_STREAMS - if (!conn_ctx->pipe_out_prod[H2_PIPE_OUT]) { - action = "create output pipe"; - rv = apr_file_pipe_create_pools(&conn_ctx->pipe_out_prod[H2_PIPE_OUT], - &conn_ctx->pipe_out_prod[H2_PIPE_IN], - APR_FULL_NONBLOCK, - c2->pool, c2->pool); - if (APR_SUCCESS != rv) goto cleanup; - } - conn_ctx->pfd_out_prod.desc_type = APR_POLL_FILE; - conn_ctx->pfd_out_prod.desc.f = conn_ctx->pipe_out_prod[H2_PIPE_OUT]; - conn_ctx->pfd_out_prod.reqevents = APR_POLLIN | APR_POLLERR | APR_POLLHUP; - conn_ctx->pfd_out_prod.client_data = conn_ctx; - +#if H2_USE_PIPES if (stream->input) { - if (!conn_ctx->pipe_in_prod[H2_PIPE_OUT]) { + if (!conn_ctx->pipe_in[H2_PIPE_OUT]) { action = "create input write pipe"; - rv = apr_file_pipe_create_pools(&conn_ctx->pipe_in_prod[H2_PIPE_OUT], - &conn_ctx->pipe_in_prod[H2_PIPE_IN], + rv = apr_file_pipe_create_pools(&conn_ctx->pipe_in[H2_PIPE_OUT], + &conn_ctx->pipe_in[H2_PIPE_IN], APR_READ_BLOCK, c2->pool, c2->pool); if (APR_SUCCESS != rv) goto cleanup; } } #else - memset(&conn_ctx->pfd_out_prod, 0, sizeof(conn_ctx->pfd_out_prod)); - memset(&conn_ctx->pipe_in_prod, 0, sizeof(conn_ctx->pipe_in_prod)); + memset(&conn_ctx->pipe_in, 0, sizeof(conn_ctx->pipe_in)); #endif cleanup: @@ -894,8 +862,6 @@ static conn_rec *s_next_c2(h2_mplx *m) stream->c2 = c2; ++m->processing_count; - APR_ARRAY_PUSH(m->streams_to_poll, h2_stream *) = stream; - apr_pollset_wakeup(m->pollset); cleanup: if (APR_SUCCESS != rv && c2) { @@ -1122,33 +1088,11 @@ apr_status_t h2_mplx_c1_client_rst(h2_mplx *m, int stream_id) static apr_status_t mplx_pollset_create(h2_mplx *m) { - int max_pdfs; - - /* stream0 output, pdf_out+pfd_in_consume per active streams */ - max_pdfs = 1 + 2 * H2MIN(m->processing_max, m->max_streams); - return apr_pollset_create(&m->pollset, max_pdfs, m->pool, + /* stream0 output only */ + return apr_pollset_create(&m->pollset, 1, m->pool, APR_POLLSET_WAKEABLE); } -static apr_status_t mplx_pollset_add(h2_mplx *m, h2_conn_ctx_t *conn_ctx) -{ - if (conn_ctx->pfd_out_prod.reqevents) { - return apr_pollset_add(m->pollset, &conn_ctx->pfd_out_prod); - } - return APR_SUCCESS; -} - -static apr_status_t mplx_pollset_remove(h2_mplx *m, h2_conn_ctx_t *conn_ctx) -{ - apr_status_t rv = APR_SUCCESS; - - if (conn_ctx->pfd_out_prod.reqevents) { - rv = apr_pollset_remove(m->pollset, &conn_ctx->pfd_out_prod); - conn_ctx->pfd_out_prod.reqevents = 0; - } - return rv; -} - static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout, stream_ev_callback *on_stream_input, stream_ev_callback *on_stream_output, @@ -1173,31 +1117,15 @@ static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout, do { /* add streams we started processing in the meantime */ - if (m->streams_to_poll->nelts) { - for (i = 0; i < m->streams_to_poll->nelts; ++i) { - stream = APR_ARRAY_IDX(m->streams_to_poll, i, h2_stream*); - if (stream && stream->c2 && (conn_ctx = h2_conn_ctx_get(stream->c2))) { - mplx_pollset_add(m, conn_ctx); - } - } - apr_array_clear(m->streams_to_poll); - } - apr_thread_mutex_lock(m->poll_lock); - if (!h2_iq_empty(m->streams_input_read)) { + if (!h2_iq_empty(m->streams_input_read) + || !h2_iq_empty(m->streams_output_written)) { while ((i = h2_iq_shift(m->streams_input_read))) { stream = h2_ihash_get(m->streams, i); if (stream) { APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = stream; } } - nresults = 0; - rv = APR_SUCCESS; - apr_thread_mutex_unlock(m->poll_lock); - break; - } -#if !H2_POLL_STREAMS - if (!h2_iq_empty(m->streams_output_written)) { while ((i = h2_iq_shift(m->streams_output_written))) { stream = h2_ihash_get(m->streams, i); if (stream) { @@ -1209,7 +1137,6 @@ static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout, apr_thread_mutex_unlock(m->poll_lock); break; } -#endif apr_thread_mutex_unlock(m->poll_lock); H2_MPLX_LEAVE(m); @@ -1235,68 +1162,13 @@ static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout, pfd = &results[i]; conn_ctx = pfd->client_data; - ap_assert(conn_ctx); + AP_DEBUG_ASSERT(conn_ctx); if (conn_ctx->stream_id == 0) { if (on_stream_input) { APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = m->stream0; } continue; } - - h2_util_drain_pipe(pfd->desc.f); - stream = h2_ihash_get(m->streams, conn_ctx->stream_id); - if (!stream) { - stream = h2_ihash_get(m->shold, conn_ctx->stream_id); - if (stream) { - /* This is normal and means that stream processing on c1 has - * already finished to CLEANUP and c2 is not done yet */ - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, m->c1, - "h2_mplx(%ld-%d): stream already in hold for poll event %hx", - m->id, conn_ctx->stream_id, pfd->rtnevents); - } - else { - h2_stream *sp = NULL; - int j; - - for (j = 0; j < m->spurge->nelts; ++j) { - sp = APR_ARRAY_IDX(m->spurge, j, h2_stream*); - if (sp->id == conn_ctx->stream_id) { - stream = sp; - break; - } - } - - if (stream) { - /* This is normal and means that stream processing on c1 has - * already finished to CLEANUP and c2 is not done yet */ - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, m->c1, APLOGNO(10311) - "h2_mplx(%ld-%d): stream already in purge for poll event %hx", - m->id, conn_ctx->stream_id, pfd->rtnevents); - } - else { - /* This should not happen. When a stream has been purged, - * it MUST no longer appear in the pollset. Puring is done - * outside the poll result processing. */ - ap_log_cerror(APLOG_MARK, APLOG_WARNING, rv, m->c1, APLOGNO(10312) - "h2_mplx(%ld-%d): stream no longer known for poll event %hx" - ", m->streams=%d, conn_ctx=%lx, fd=%lx", - m->id, conn_ctx->stream_id, pfd->rtnevents, - (int)h2_ihash_count(m->streams), - (long)conn_ctx, (long)pfd->desc.f); - h2_ihash_iter(m->streams, m_report_stream_iter, m); - } - } - continue; - } - - if (conn_ctx->pfd_out_prod.desc.f == pfd->desc.f) { - /* output is available */ - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1, - "[%s-%d] poll output event %hx", - conn_ctx->id, conn_ctx->stream_id, - pfd->rtnevents); - APR_ARRAY_PUSH(m->streams_ev_out, h2_stream*) = stream; - } } if (on_stream_input && m->streams_ev_in->nelts) { |