summaryrefslogtreecommitdiffstats
path: root/modules/http2/h2_mplx.c
diff options
context:
space:
mode:
authorStefan Eissing <icing@apache.org>2022-03-18 10:52:52 +0100
committerStefan Eissing <icing@apache.org>2022-03-18 10:52:52 +0100
commit6bd9d17e081692d3555d1c9803962c883acd136b (patch)
tree61bed2fc5a3746e58932712e5c5eb477a11fa5f0 /modules/http2/h2_mplx.c
parentUpdate to test against OpenSSL 3.0.2. (diff)
downloadapache2-6bd9d17e081692d3555d1c9803962c883acd136b.tar.xz
apache2-6bd9d17e081692d3555d1c9803962c883acd136b.zip
*) core: adding a new hook and method to the API:
create_secondary_connection and ap_create_secondary_connection() to setup connections related to a "master" one, as used in the HTTP/2 protocol implementation. *) mod_http2: using the new API calls to get rid of knowledge about how the core handles conn_rec specifics. Improvements in pollset stream handling to use less sets. Using atomic read/writes instead of volatiles now. Keeping a reserve of "transit" pools and bucket_allocs for use on secondary connections to avoid repeated setup/teardowns. git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1899032 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'modules/http2/h2_mplx.c')
-rw-r--r--modules/http2/h2_mplx.c241
1 files changed, 127 insertions, 114 deletions
diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c
index c083f13c12..c1da860b67 100644
--- a/modules/http2/h2_mplx.c
+++ b/modules/http2/h2_mplx.c
@@ -26,6 +26,7 @@
#include <httpd.h>
#include <http_core.h>
+#include <http_connection.h>
#include <http_log.h>
#include <mpm_common.h>
@@ -69,6 +70,13 @@ static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout,
static apr_pool_t *pchild;
+/* APR callback invoked if allocation fails. */
+static int abort_on_oom(int retcode)
+{
+ ap_abort_on_oom();
+ return retcode; /* unreachable, hopefully. */
+}
+
apr_status_t h2_mplx_c1_child_init(apr_pool_t *pool, server_rec *s)
{
pchild = pool;
@@ -100,7 +108,8 @@ static void c1_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length)
static int stream_is_running(h2_stream *stream)
{
h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(stream->c2);
- return conn_ctx && conn_ctx->started_at != 0 && !conn_ctx->done;
+ return conn_ctx && apr_atomic_read32(&conn_ctx->started) != 0
+ && apr_atomic_read32(&conn_ctx->done) == 0;
}
int h2_mplx_c1_stream_is_running(h2_mplx *m, h2_stream *stream)
@@ -153,13 +162,7 @@ static void m_stream_cleanup(h2_mplx *m, h2_stream *stream)
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
H2_STRM_MSG(stream, "cleanup, c2 is running, abort"));
/* c2 is still running */
- stream->c2->aborted = 1;
- if (stream->input) {
- h2_beam_abort(stream->input, m->c1);
- }
- if (stream->output) {
- h2_beam_abort(stream->output, m->c1);
- }
+ h2_c2_abort(stream->c2, m->c1);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
H2_STRM_MSG(stream, "cleanup, c2 is done, move to shold"));
h2_ihash_add(m->shold, stream);
@@ -173,6 +176,66 @@ static void m_stream_cleanup(h2_mplx *m, h2_stream *stream)
}
}
+static h2_c2_transit *c2_transit_create(h2_mplx *m)
+{
+ apr_allocator_t *allocator;
+ apr_pool_t *ptrans;
+ h2_c2_transit *transit;
+ apr_status_t rv;
+
+ /* We create a pool with its own allocator to be used for
+ * processing a request. This is the only way to have the processing
+ * independent of its parent pool in the sense that it can work in
+ * another thread.
+ */
+
+ rv = apr_allocator_create(&allocator);
+ if (rv == APR_SUCCESS) {
+ apr_allocator_max_free_set(allocator, ap_max_mem_free);
+ rv = apr_pool_create_ex(&ptrans, m->pool, NULL, allocator);
+ }
+ if (rv != APR_SUCCESS) {
+ /* maybe the log goes through, maybe not. */
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1,
+ APLOGNO(10004) "h2_mplx: create transit pool");
+ ap_abort_on_oom();
+ return NULL; /* should never be reached. */
+ }
+
+ apr_allocator_owner_set(allocator, ptrans);
+ apr_pool_abort_set(abort_on_oom, ptrans);
+ apr_pool_tag(ptrans, "h2_c2_transit");
+
+ transit = apr_pcalloc(ptrans, sizeof(*transit));
+ transit->pool = ptrans;
+ transit->bucket_alloc = apr_bucket_alloc_create(ptrans);
+ return transit;
+}
+
+static void c2_transit_destroy(h2_c2_transit *transit)
+{
+ apr_pool_destroy(transit->pool);
+}
+
+static h2_c2_transit *c2_transit_get(h2_mplx *m)
+{
+ h2_c2_transit **ptransit = apr_array_pop(m->c2_transits);
+ if (ptransit) {
+ return *ptransit;
+ }
+ return c2_transit_create(m);
+}
+
+static void c2_transit_recycle(h2_mplx *m, h2_c2_transit *transit)
+{
+ if (m->c2_transits->nelts >= m->max_spare_transits) {
+ c2_transit_destroy(transit);
+ }
+ else {
+ APR_ARRAY_PUSH(m->c2_transits, h2_c2_transit*) = transit;
+ }
+}
+
/**
* A h2_mplx needs to be thread-safe *and* if will be called by
* the h2_session thread *and* the h2_worker threads. Therefore:
@@ -254,11 +317,11 @@ h2_mplx *h2_mplx_c1_create(h2_stream *stream0, server_rec *s, apr_pool_t *parent
m->streams_ev_in = apr_array_make(m->pool, 10, sizeof(h2_stream*));
m->streams_ev_out = apr_array_make(m->pool, 10, sizeof(h2_stream*));
-#if !H2_POLL_STREAMS
+ m->streams_input_read = h2_iq_create(m->pool, 10);
status = apr_thread_mutex_create(&m->poll_lock, APR_THREAD_MUTEX_DEFAULT,
m->pool);
if (APR_SUCCESS != status) goto failure;
- m->streams_input_read = h2_iq_create(m->pool, 10);
+#if !H2_POLL_STREAMS
m->streams_output_written = h2_iq_create(m->pool, 10);
#endif
@@ -266,6 +329,8 @@ h2_mplx *h2_mplx_c1_create(h2_stream *stream0, server_rec *s, apr_pool_t *parent
mplx_pollset_add(m, conn_ctx);
m->scratch_r = apr_pcalloc(m->pool, sizeof(*m->scratch_r));
+ m->max_spare_transits = 3;
+ m->c2_transits = apr_array_make(m->pool, m->max_spare_transits, sizeof(h2_c2_transit*));
return m;
@@ -331,8 +396,9 @@ static int m_report_stream_iter(void *ctx, void *val) {
H2_STRM_MSG(stream, "->03198: %s %s %s"
"[started=%d/done=%d]"),
conn_ctx->request->method, conn_ctx->request->authority,
- conn_ctx->request->path, conn_ctx->started_at != 0,
- conn_ctx->done);
+ conn_ctx->request->path,
+ (int)apr_atomic_read32(&conn_ctx->started),
+ (int)apr_atomic_read32(&conn_ctx->done));
}
else {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, /* NO APLOGNO */
@@ -354,10 +420,6 @@ static int m_stream_cancel_iter(void *ctx, void *val) {
h2_mplx *m = ctx;
h2_stream *stream = val;
- /* disable input consumed reporting */
- if (stream->input) {
- h2_beam_abort(stream->input, m->c1);
- }
/* take over event monitoring */
h2_stream_set_monitor(stream, NULL);
/* Reset, should transit to CLOSED state */
@@ -499,8 +561,11 @@ static void c1_purge_streams(h2_mplx *m)
m->id, stream->id, c2_ctx->stream_id);
}
- h2_conn_ctx_destroy(c2);
h2_c2_destroy(c2);
+ if (c2_ctx->transit) {
+ c2_transit_recycle(m, c2_ctx->transit);
+ c2_ctx->transit = NULL;
+ }
}
h2_stream_destroy(stream);
}
@@ -699,17 +764,10 @@ static void c2_beam_input_read_notify(void *ctx, h2_bucket_beam *beam)
h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c);
if (conn_ctx && conn_ctx->stream_id) {
- if (conn_ctx->pipe_in_drain[H2_PIPE_IN]) {
- apr_file_putc(1, conn_ctx->pipe_in_drain[H2_PIPE_IN]);
- }
-#if !H2_POLL_STREAMS
- else {
- apr_thread_mutex_lock(conn_ctx->mplx->poll_lock);
- h2_iq_append(conn_ctx->mplx->streams_input_read, conn_ctx->stream_id);
- apr_pollset_wakeup(conn_ctx->mplx->pollset);
- apr_thread_mutex_unlock(conn_ctx->mplx->poll_lock);
- }
-#endif
+ apr_thread_mutex_lock(conn_ctx->mplx->poll_lock);
+ h2_iq_append(conn_ctx->mplx->streams_input_read, conn_ctx->stream_id);
+ apr_pollset_wakeup(conn_ctx->mplx->pollset);
+ apr_thread_mutex_unlock(conn_ctx->mplx->poll_lock);
}
}
@@ -733,13 +791,13 @@ static void c2_beam_output_write_notify(void *ctx, h2_bucket_beam *beam)
}
}
-static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream)
+static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream, h2_c2_transit *transit)
{
h2_conn_ctx_t *conn_ctx;
apr_status_t rv = APR_SUCCESS;
const char *action = "init";
- rv = h2_conn_ctx_init_for_c2(&conn_ctx, c2, m, stream);
+ rv = h2_conn_ctx_init_for_c2(&conn_ctx, c2, m, stream, transit);
if (APR_SUCCESS != rv) goto cleanup;
if (!conn_ctx->beam_out) {
@@ -758,22 +816,14 @@ static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream)
h2_beam_on_received(stream->input, c2_beam_input_read_notify, c2);
h2_beam_on_consumed(stream->input, c1_input_consumed, stream);
}
- else {
- memset(&conn_ctx->pfd_in_drain, 0, sizeof(conn_ctx->pfd_in_drain));
- }
#if H2_POLL_STREAMS
- if (!conn_ctx->mplx_pool) {
- apr_pool_create(&conn_ctx->mplx_pool, m->pool);
- apr_pool_tag(conn_ctx->mplx_pool, "H2_MPLX_C2");
- }
-
if (!conn_ctx->pipe_out_prod[H2_PIPE_OUT]) {
action = "create output pipe";
rv = apr_file_pipe_create_pools(&conn_ctx->pipe_out_prod[H2_PIPE_OUT],
&conn_ctx->pipe_out_prod[H2_PIPE_IN],
APR_FULL_NONBLOCK,
- conn_ctx->mplx_pool, c2->pool);
+ c2->pool, c2->pool);
if (APR_SUCCESS != rv) goto cleanup;
}
conn_ctx->pfd_out_prod.desc_type = APR_POLL_FILE;
@@ -787,26 +837,13 @@ static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream)
rv = apr_file_pipe_create_pools(&conn_ctx->pipe_in_prod[H2_PIPE_OUT],
&conn_ctx->pipe_in_prod[H2_PIPE_IN],
APR_READ_BLOCK,
- c2->pool, conn_ctx->mplx_pool);
- if (APR_SUCCESS != rv) goto cleanup;
- }
- if (!conn_ctx->pipe_in_drain[H2_PIPE_OUT]) {
- action = "create input read pipe";
- rv = apr_file_pipe_create_pools(&conn_ctx->pipe_in_drain[H2_PIPE_OUT],
- &conn_ctx->pipe_in_drain[H2_PIPE_IN],
- APR_FULL_NONBLOCK,
- c2->pool, conn_ctx->mplx_pool);
+ c2->pool, c2->pool);
if (APR_SUCCESS != rv) goto cleanup;
}
- conn_ctx->pfd_in_drain.desc_type = APR_POLL_FILE;
- conn_ctx->pfd_in_drain.desc.f = conn_ctx->pipe_in_drain[H2_PIPE_OUT];
- conn_ctx->pfd_in_drain.reqevents = APR_POLLIN | APR_POLLERR | APR_POLLHUP;
- conn_ctx->pfd_in_drain.client_data = conn_ctx;
}
#else
memset(&conn_ctx->pfd_out_prod, 0, sizeof(conn_ctx->pfd_out_prod));
memset(&conn_ctx->pipe_in_prod, 0, sizeof(conn_ctx->pipe_in_prod));
- memset(&conn_ctx->pipe_in_drain, 0, sizeof(conn_ctx->pipe_in_drain));
#endif
cleanup:
@@ -822,9 +859,10 @@ cleanup:
static conn_rec *s_next_c2(h2_mplx *m)
{
h2_stream *stream = NULL;
- apr_status_t rv;
+ apr_status_t rv = APR_SUCCESS;
int sid;
- conn_rec *c2;
+ conn_rec *c2 = NULL;
+ h2_c2_transit *transit = NULL;
while (!m->aborted && !stream && (m->processing_count < m->processing_limit)
&& (sid = h2_iq_shift(m->q)) > 0) {
@@ -838,27 +876,35 @@ static conn_rec *s_next_c2(h2_mplx *m)
"Current limit is %d and %d workers are in use.",
m->id, m->processing_limit, m->processing_count);
}
- return NULL;
+ goto cleanup;
}
if (sid > m->max_stream_id_started) {
m->max_stream_id_started = sid;
}
- c2 = h2_c2_create(m->c1, m->pool);
+ transit = c2_transit_get(m);
+ c2 = ap_create_secondary_connection(transit->pool, m->c1, transit->bucket_alloc);
+ if (!c2) goto cleanup;
ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c1,
H2_STRM_MSG(stream, "created new c2"));
- rv = c2_setup_io(m, c2, stream);
- if (APR_SUCCESS != rv) {
- return NULL;
- }
+ rv = c2_setup_io(m, c2, stream, transit);
+ if (APR_SUCCESS != rv) goto cleanup;
stream->c2 = c2;
++m->processing_count;
APR_ARRAY_PUSH(m->streams_to_poll, h2_stream *) = stream;
apr_pollset_wakeup(m->pollset);
+cleanup:
+ if (APR_SUCCESS != rv && c2) {
+ h2_c2_destroy(c2);
+ c2 = NULL;
+ }
+ if (transit && !c2) {
+ c2_transit_recycle(m, transit);
+ }
return c2;
}
@@ -896,8 +942,8 @@ static void s_c2_done(h2_mplx *m, conn_rec *c2, h2_conn_ctx_t *conn_ctx)
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c2,
"h2_mplx(%s-%d): c2 done", conn_ctx->id, conn_ctx->stream_id);
- ap_assert(conn_ctx->done == 0);
- conn_ctx->done = 1;
+ AP_DEBUG_ASSERT(apr_atomic_read32(&conn_ctx->done) == 0);
+ apr_atomic_set32(&conn_ctx->done, 1);
conn_ctx->done_at = apr_time_now();
++c2->keepalives;
/* From here on, the final handling of c2 is done by c1 processing.
@@ -955,16 +1001,18 @@ static void s_c2_done(h2_mplx *m, conn_rec *c2, h2_conn_ctx_t *conn_ctx)
void h2_mplx_worker_c2_done(conn_rec *c2, conn_rec **out_c2)
{
h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c2);
- h2_mplx *m;
+ h2_mplx *m = conn_ctx? conn_ctx->mplx : NULL;
- if (!conn_ctx || !conn_ctx->mplx) return;
- m = conn_ctx->mplx;
+ if (!m) {
+ if (out_c2) *out_c2 = NULL;
+ return;
+ }
H2_MPLX_ENTER_ALWAYS(m);
--m->processing_count;
s_c2_done(m, c2, conn_ctx);
-
+
if (m->join_wait) {
apr_thread_cond_signal(m->join_wait);
}
@@ -1084,52 +1132,19 @@ static apr_status_t mplx_pollset_create(h2_mplx *m)
static apr_status_t mplx_pollset_add(h2_mplx *m, h2_conn_ctx_t *conn_ctx)
{
- apr_status_t rv = APR_SUCCESS;
- const char *name = "";
-
if (conn_ctx->pfd_out_prod.reqevents) {
- name = "adding out";
- rv = apr_pollset_add(m->pollset, &conn_ctx->pfd_out_prod);
- if (APR_SUCCESS != rv) goto cleanup;
- }
-
- if (conn_ctx->pfd_in_drain.reqevents) {
- name = "adding in_read";
- rv = apr_pollset_add(m->pollset, &conn_ctx->pfd_in_drain);
- }
-
-cleanup:
- if (APR_SUCCESS != rv) {
- ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1,
- "h2_mplx(%ld-%d): error while adding to pollset %s",
- m->id, conn_ctx->stream_id, name);
+ return apr_pollset_add(m->pollset, &conn_ctx->pfd_out_prod);
}
- return rv;
+ return APR_SUCCESS;
}
static apr_status_t mplx_pollset_remove(h2_mplx *m, h2_conn_ctx_t *conn_ctx)
{
apr_status_t rv = APR_SUCCESS;
- const char *name = "";
if (conn_ctx->pfd_out_prod.reqevents) {
rv = apr_pollset_remove(m->pollset, &conn_ctx->pfd_out_prod);
conn_ctx->pfd_out_prod.reqevents = 0;
- if (APR_SUCCESS != rv) goto cleanup;
- }
-
- if (conn_ctx->pfd_in_drain.reqevents) {
- name = "in_read";
- rv = apr_pollset_remove(m->pollset, &conn_ctx->pfd_in_drain);
- conn_ctx->pfd_in_drain.reqevents = 0;
- if (APR_SUCCESS != rv) goto cleanup;
- }
-
-cleanup:
- if (APR_SUCCESS != rv) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, rv, m->c1,
- "h2_mplx(%ld-%d): error removing from pollset %s",
- m->id, conn_ctx->stream_id, name);
}
return rv;
}
@@ -1168,16 +1183,21 @@ static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout,
apr_array_clear(m->streams_to_poll);
}
-#if !H2_POLL_STREAMS
apr_thread_mutex_lock(m->poll_lock);
- if (!h2_iq_empty(m->streams_input_read)
- || !h2_iq_empty(m->streams_output_written)) {
+ if (!h2_iq_empty(m->streams_input_read)) {
while ((i = h2_iq_shift(m->streams_input_read))) {
stream = h2_ihash_get(m->streams, i);
if (stream) {
APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = stream;
}
}
+ nresults = 0;
+ rv = APR_SUCCESS;
+ apr_thread_mutex_unlock(m->poll_lock);
+ break;
+ }
+#if !H2_POLL_STREAMS
+ if (!h2_iq_empty(m->streams_output_written)) {
while ((i = h2_iq_shift(m->streams_output_written))) {
stream = h2_ihash_get(m->streams, i);
if (stream) {
@@ -1189,8 +1209,9 @@ static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout,
apr_thread_mutex_unlock(m->poll_lock);
break;
}
- apr_thread_mutex_unlock(m->poll_lock);
#endif
+ apr_thread_mutex_unlock(m->poll_lock);
+
H2_MPLX_LEAVE(m);
rv = apr_pollset_poll(m->pollset, timeout >= 0? timeout : -1, &nresults, &results);
H2_MPLX_ENTER_ALWAYS(m);
@@ -1276,14 +1297,6 @@ static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout,
pfd->rtnevents);
APR_ARRAY_PUSH(m->streams_ev_out, h2_stream*) = stream;
}
- else if (conn_ctx->pfd_in_drain.desc.f == pfd->desc.f) {
- /* input has been consumed */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
- "[%s-%d] poll input event %hx",
- conn_ctx->id, conn_ctx->stream_id,
- pfd->rtnevents);
- APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = stream;
- }
}
if (on_stream_input && m->streams_ev_in->nelts) {