summaryrefslogtreecommitdiffstats
path: root/modules/http2/h2_mplx.c
diff options
context:
space:
mode:
authorStefan Eissing <icing@apache.org>2022-03-21 09:19:28 +0100
committerStefan Eissing <icing@apache.org>2022-03-21 09:19:28 +0100
commit32efaf07fd89a4c3fe86ddbaf0ab439463da5604 (patch)
treea95cbfc5d94cb58b4b1c15f30278375a91f9bb27 /modules/http2/h2_mplx.c
parent *) core: adding a new hook and method to the API: (diff)
downloadapache2-32efaf07fd89a4c3fe86ddbaf0ab439463da5604.tar.xz
apache2-32efaf07fd89a4c3fe86ddbaf0ab439463da5604.zip
*) mod_http2: use pollset only for main connection and wakeups
for events on streams. Provide streams in INPUT pipe when needed and supported on the platform. git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1899102 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'modules/http2/h2_mplx.c')
-rw-r--r--modules/http2/h2_mplx.c168
1 files changed, 20 insertions, 148 deletions
diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c
index c1da860b67..dc930f7eb4 100644
--- a/modules/http2/h2_mplx.c
+++ b/modules/http2/h2_mplx.c
@@ -61,8 +61,6 @@ static apr_status_t s_mplx_be_happy(h2_mplx *m, conn_rec *c, h2_conn_ctx_t *conn
static apr_status_t m_be_annoyed(h2_mplx *m);
static apr_status_t mplx_pollset_create(h2_mplx *m);
-static apr_status_t mplx_pollset_add(h2_mplx *m, h2_conn_ctx_t *conn_ctx);
-static apr_status_t mplx_pollset_remove(h2_mplx *m, h2_conn_ctx_t *conn_ctx);
static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout,
stream_ev_callback *on_stream_input,
stream_ev_callback *on_stream_output,
@@ -313,20 +311,19 @@ h2_mplx *h2_mplx_c1_create(h2_stream *stream0, server_rec *s, apr_pool_t *parent
"nghttp2: could not create pollset");
goto failure;
}
- m->streams_to_poll = apr_array_make(m->pool, 10, sizeof(h2_stream*));
m->streams_ev_in = apr_array_make(m->pool, 10, sizeof(h2_stream*));
m->streams_ev_out = apr_array_make(m->pool, 10, sizeof(h2_stream*));
m->streams_input_read = h2_iq_create(m->pool, 10);
+ m->streams_output_written = h2_iq_create(m->pool, 10);
status = apr_thread_mutex_create(&m->poll_lock, APR_THREAD_MUTEX_DEFAULT,
m->pool);
if (APR_SUCCESS != status) goto failure;
-#if !H2_POLL_STREAMS
- m->streams_output_written = h2_iq_create(m->pool, 10);
-#endif
conn_ctx = h2_conn_ctx_get(m->c1);
- mplx_pollset_add(m, conn_ctx);
+ if (conn_ctx->pfd.reqevents) {
+ apr_pollset_add(m->pollset, &conn_ctx->pfd);
+ }
m->scratch_r = apr_pcalloc(m->pool, sizeof(*m->scratch_r));
m->max_spare_transits = 3;
@@ -550,17 +547,9 @@ static void c1_purge_streams(h2_mplx *m)
if (stream->c2) {
conn_rec *c2 = stream->c2;
h2_conn_ctx_t *c2_ctx = h2_conn_ctx_get(c2);
- apr_status_t rv;
stream->c2 = NULL;
ap_assert(c2_ctx);
- rv = mplx_pollset_remove(m, c2_ctx);
- if (APR_SUCCESS != rv) {
- ap_log_cerror(APLOG_MARK, APLOG_INFO, rv, m->c1,
- "h2_mplx(%ld-%d): pollset_remove %d on purge",
- m->id, stream->id, c2_ctx->stream_id);
- }
-
h2_c2_destroy(c2);
if (c2_ctx->transit) {
c2_transit_recycle(m, c2_ctx->transit);
@@ -753,8 +742,8 @@ static void c2_beam_input_write_notify(void *ctx, h2_bucket_beam *beam)
h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c);
(void)beam;
- if (conn_ctx && conn_ctx->stream_id && conn_ctx->pipe_in_prod[H2_PIPE_IN]) {
- apr_file_putc(1, conn_ctx->pipe_in_prod[H2_PIPE_IN]);
+ if (conn_ctx && conn_ctx->stream_id && conn_ctx->pipe_in[H2_PIPE_IN]) {
+ apr_file_putc(1, conn_ctx->pipe_in[H2_PIPE_IN]);
}
}
@@ -777,17 +766,10 @@ static void c2_beam_output_write_notify(void *ctx, h2_bucket_beam *beam)
h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c);
if (conn_ctx && conn_ctx->stream_id) {
- if (conn_ctx->pipe_out_prod[H2_PIPE_IN]) {
- apr_file_putc(1, conn_ctx->pipe_out_prod[H2_PIPE_IN]);
- }
-#if !H2_POLL_STREAMS
- else {
- apr_thread_mutex_lock(conn_ctx->mplx->poll_lock);
- h2_iq_append(conn_ctx->mplx->streams_output_written, conn_ctx->stream_id);
- apr_pollset_wakeup(conn_ctx->mplx->pollset);
- apr_thread_mutex_unlock(conn_ctx->mplx->poll_lock);
- }
-#endif
+ apr_thread_mutex_lock(conn_ctx->mplx->poll_lock);
+ h2_iq_append(conn_ctx->mplx->streams_output_written, conn_ctx->stream_id);
+ apr_pollset_wakeup(conn_ctx->mplx->pollset);
+ apr_thread_mutex_unlock(conn_ctx->mplx->poll_lock);
}
}
@@ -817,33 +799,19 @@ static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream, h2_
h2_beam_on_consumed(stream->input, c1_input_consumed, stream);
}
-#if H2_POLL_STREAMS
- if (!conn_ctx->pipe_out_prod[H2_PIPE_OUT]) {
- action = "create output pipe";
- rv = apr_file_pipe_create_pools(&conn_ctx->pipe_out_prod[H2_PIPE_OUT],
- &conn_ctx->pipe_out_prod[H2_PIPE_IN],
- APR_FULL_NONBLOCK,
- c2->pool, c2->pool);
- if (APR_SUCCESS != rv) goto cleanup;
- }
- conn_ctx->pfd_out_prod.desc_type = APR_POLL_FILE;
- conn_ctx->pfd_out_prod.desc.f = conn_ctx->pipe_out_prod[H2_PIPE_OUT];
- conn_ctx->pfd_out_prod.reqevents = APR_POLLIN | APR_POLLERR | APR_POLLHUP;
- conn_ctx->pfd_out_prod.client_data = conn_ctx;
-
+#if H2_USE_PIPES
if (stream->input) {
- if (!conn_ctx->pipe_in_prod[H2_PIPE_OUT]) {
+ if (!conn_ctx->pipe_in[H2_PIPE_OUT]) {
action = "create input write pipe";
- rv = apr_file_pipe_create_pools(&conn_ctx->pipe_in_prod[H2_PIPE_OUT],
- &conn_ctx->pipe_in_prod[H2_PIPE_IN],
+ rv = apr_file_pipe_create_pools(&conn_ctx->pipe_in[H2_PIPE_OUT],
+ &conn_ctx->pipe_in[H2_PIPE_IN],
APR_READ_BLOCK,
c2->pool, c2->pool);
if (APR_SUCCESS != rv) goto cleanup;
}
}
#else
- memset(&conn_ctx->pfd_out_prod, 0, sizeof(conn_ctx->pfd_out_prod));
- memset(&conn_ctx->pipe_in_prod, 0, sizeof(conn_ctx->pipe_in_prod));
+ memset(&conn_ctx->pipe_in, 0, sizeof(conn_ctx->pipe_in));
#endif
cleanup:
@@ -894,8 +862,6 @@ static conn_rec *s_next_c2(h2_mplx *m)
stream->c2 = c2;
++m->processing_count;
- APR_ARRAY_PUSH(m->streams_to_poll, h2_stream *) = stream;
- apr_pollset_wakeup(m->pollset);
cleanup:
if (APR_SUCCESS != rv && c2) {
@@ -1122,33 +1088,11 @@ apr_status_t h2_mplx_c1_client_rst(h2_mplx *m, int stream_id)
static apr_status_t mplx_pollset_create(h2_mplx *m)
{
- int max_pdfs;
-
- /* stream0 output, pdf_out+pfd_in_consume per active streams */
- max_pdfs = 1 + 2 * H2MIN(m->processing_max, m->max_streams);
- return apr_pollset_create(&m->pollset, max_pdfs, m->pool,
+ /* stream0 output only */
+ return apr_pollset_create(&m->pollset, 1, m->pool,
APR_POLLSET_WAKEABLE);
}
-static apr_status_t mplx_pollset_add(h2_mplx *m, h2_conn_ctx_t *conn_ctx)
-{
- if (conn_ctx->pfd_out_prod.reqevents) {
- return apr_pollset_add(m->pollset, &conn_ctx->pfd_out_prod);
- }
- return APR_SUCCESS;
-}
-
-static apr_status_t mplx_pollset_remove(h2_mplx *m, h2_conn_ctx_t *conn_ctx)
-{
- apr_status_t rv = APR_SUCCESS;
-
- if (conn_ctx->pfd_out_prod.reqevents) {
- rv = apr_pollset_remove(m->pollset, &conn_ctx->pfd_out_prod);
- conn_ctx->pfd_out_prod.reqevents = 0;
- }
- return rv;
-}
-
static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout,
stream_ev_callback *on_stream_input,
stream_ev_callback *on_stream_output,
@@ -1173,31 +1117,15 @@ static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout,
do {
/* add streams we started processing in the meantime */
- if (m->streams_to_poll->nelts) {
- for (i = 0; i < m->streams_to_poll->nelts; ++i) {
- stream = APR_ARRAY_IDX(m->streams_to_poll, i, h2_stream*);
- if (stream && stream->c2 && (conn_ctx = h2_conn_ctx_get(stream->c2))) {
- mplx_pollset_add(m, conn_ctx);
- }
- }
- apr_array_clear(m->streams_to_poll);
- }
-
apr_thread_mutex_lock(m->poll_lock);
- if (!h2_iq_empty(m->streams_input_read)) {
+ if (!h2_iq_empty(m->streams_input_read)
+ || !h2_iq_empty(m->streams_output_written)) {
while ((i = h2_iq_shift(m->streams_input_read))) {
stream = h2_ihash_get(m->streams, i);
if (stream) {
APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = stream;
}
}
- nresults = 0;
- rv = APR_SUCCESS;
- apr_thread_mutex_unlock(m->poll_lock);
- break;
- }
-#if !H2_POLL_STREAMS
- if (!h2_iq_empty(m->streams_output_written)) {
while ((i = h2_iq_shift(m->streams_output_written))) {
stream = h2_ihash_get(m->streams, i);
if (stream) {
@@ -1209,7 +1137,6 @@ static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout,
apr_thread_mutex_unlock(m->poll_lock);
break;
}
-#endif
apr_thread_mutex_unlock(m->poll_lock);
H2_MPLX_LEAVE(m);
@@ -1235,68 +1162,13 @@ static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout,
pfd = &results[i];
conn_ctx = pfd->client_data;
- ap_assert(conn_ctx);
+ AP_DEBUG_ASSERT(conn_ctx);
if (conn_ctx->stream_id == 0) {
if (on_stream_input) {
APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = m->stream0;
}
continue;
}
-
- h2_util_drain_pipe(pfd->desc.f);
- stream = h2_ihash_get(m->streams, conn_ctx->stream_id);
- if (!stream) {
- stream = h2_ihash_get(m->shold, conn_ctx->stream_id);
- if (stream) {
- /* This is normal and means that stream processing on c1 has
- * already finished to CLEANUP and c2 is not done yet */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, m->c1,
- "h2_mplx(%ld-%d): stream already in hold for poll event %hx",
- m->id, conn_ctx->stream_id, pfd->rtnevents);
- }
- else {
- h2_stream *sp = NULL;
- int j;
-
- for (j = 0; j < m->spurge->nelts; ++j) {
- sp = APR_ARRAY_IDX(m->spurge, j, h2_stream*);
- if (sp->id == conn_ctx->stream_id) {
- stream = sp;
- break;
- }
- }
-
- if (stream) {
- /* This is normal and means that stream processing on c1 has
- * already finished to CLEANUP and c2 is not done yet */
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, m->c1, APLOGNO(10311)
- "h2_mplx(%ld-%d): stream already in purge for poll event %hx",
- m->id, conn_ctx->stream_id, pfd->rtnevents);
- }
- else {
- /* This should not happen. When a stream has been purged,
- * it MUST no longer appear in the pollset. Puring is done
- * outside the poll result processing. */
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, rv, m->c1, APLOGNO(10312)
- "h2_mplx(%ld-%d): stream no longer known for poll event %hx"
- ", m->streams=%d, conn_ctx=%lx, fd=%lx",
- m->id, conn_ctx->stream_id, pfd->rtnevents,
- (int)h2_ihash_count(m->streams),
- (long)conn_ctx, (long)pfd->desc.f);
- h2_ihash_iter(m->streams, m_report_stream_iter, m);
- }
- }
- continue;
- }
-
- if (conn_ctx->pfd_out_prod.desc.f == pfd->desc.f) {
- /* output is available */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
- "[%s-%d] poll output event %hx",
- conn_ctx->id, conn_ctx->stream_id,
- pfd->rtnevents);
- APR_ARRAY_PUSH(m->streams_ev_out, h2_stream*) = stream;
- }
}
if (on_stream_input && m->streams_ev_in->nelts) {