summaryrefslogtreecommitdiffstats
path: root/modules/http2
diff options
context:
space:
mode:
Diffstat (limited to 'modules/http2')
-rw-r--r--modules/http2/h2_c2.c108
-rw-r--r--modules/http2/h2_c2.h9
-rw-r--r--modules/http2/h2_conn_ctx.c39
-rw-r--r--modules/http2/h2_conn_ctx.h16
-rw-r--r--modules/http2/h2_mplx.c241
-rw-r--r--modules/http2/h2_mplx.h15
-rw-r--r--modules/http2/h2_stream.c7
-rw-r--r--modules/http2/h2_workers.c66
-rw-r--r--modules/http2/h2_workers.h14
9 files changed, 218 insertions, 297 deletions
diff --git a/modules/http2/h2_c2.c b/modules/http2/h2_c2.c
index 00a783d28e..9dd90f2b05 100644
--- a/modules/http2/h2_c2.c
+++ b/modules/http2/h2_c2.c
@@ -131,12 +131,6 @@ int h2_mpm_supported(void)
return mpm_supported;
}
-static module *h2_conn_mpm_module(void)
-{
- check_modules(0);
- return mpm_module;
-}
-
apr_status_t h2_c2_child_init(apr_pool_t *pool, server_rec *s)
{
check_modules(1);
@@ -144,88 +138,6 @@ apr_status_t h2_c2_child_init(apr_pool_t *pool, server_rec *s)
APR_PROTO_TCP, pool);
}
-/* APR callback invoked if allocation fails. */
-static int abort_on_oom(int retcode)
-{
- ap_abort_on_oom();
- return retcode; /* unreachable, hopefully. */
-}
-
-conn_rec *h2_c2_create(conn_rec *c1, apr_pool_t *parent)
-{
- apr_allocator_t *allocator;
- apr_status_t status;
- apr_pool_t *pool;
- conn_rec *c2;
- void *cfg;
- module *mpm;
-
- ap_assert(c1);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, c1,
- "h2_c2: create for c1(%ld)", c1->id);
-
- /* We create a pool with its own allocator to be used for
- * processing a request. This is the only way to have the processing
- * independent of its parent pool in the sense that it can work in
- * another thread.
- */
- apr_allocator_create(&allocator);
- apr_allocator_max_free_set(allocator, ap_max_mem_free);
- status = apr_pool_create_ex(&pool, parent, NULL, allocator);
- if (status != APR_SUCCESS) {
- ap_log_cerror(APLOG_MARK, APLOG_ERR, status, c1,
- APLOGNO(10004) "h2_c2: create pool");
- return NULL;
- }
- apr_allocator_owner_set(allocator, pool);
- apr_pool_abort_set(abort_on_oom, pool);
- apr_pool_tag(pool, "h2_c2_conn");
-
- c2 = (conn_rec *) apr_palloc(pool, sizeof(conn_rec));
- memcpy(c2, c1, sizeof(conn_rec));
-
- c2->master = c1;
- c2->pool = pool;
- c2->conn_config = ap_create_conn_config(pool);
- c2->notes = apr_table_make(pool, 5);
- c2->input_filters = NULL;
- c2->output_filters = NULL;
- c2->keepalives = 0;
-#if AP_MODULE_MAGIC_AT_LEAST(20180903, 1)
- c2->filter_conn_ctx = NULL;
-#endif
- c2->bucket_alloc = apr_bucket_alloc_create(pool);
-#if !AP_MODULE_MAGIC_AT_LEAST(20180720, 1)
- c2->data_in_input_filters = 0;
- c2->data_in_output_filters = 0;
-#endif
- /* prevent mpm_event from making wrong assumptions about this connection,
- * like e.g. using its socket for an async read check. */
- c2->clogging_input_filters = 1;
- c2->log = NULL;
- c2->aborted = 0;
- /* We cannot install the master connection socket on the secondary, as
- * modules mess with timeouts/blocking of the socket, with
- * unwanted side effects to the master connection processing.
- * Fortunately, since we never use the secondary socket, we can just install
- * a single, process-wide dummy and everyone is happy.
- */
- ap_set_module_config(c2->conn_config, &core_module, dummy_socket);
- /* TODO: these should be unique to this thread */
- c2->sbh = NULL; /*c1->sbh;*/
- /* TODO: not all mpm modules have learned about secondary connections yet.
- * copy their config from master to secondary.
- */
- if ((mpm = h2_conn_mpm_module()) != NULL) {
- cfg = ap_get_module_config(c1->conn_config, mpm);
- ap_set_module_config(c2->conn_config, mpm, cfg);
- }
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, c2,
- "h2_c2(%s): created", c2->log_id);
- return c2;
-}
-
void h2_c2_destroy(conn_rec *c2)
{
ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, c2,
@@ -233,6 +145,21 @@ void h2_c2_destroy(conn_rec *c2)
apr_pool_destroy(c2->pool);
}
+void h2_c2_abort(conn_rec *c2, conn_rec *from)
+{
+ h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c2);
+
+ AP_DEBUG_ASSERT(conn_ctx);
+ AP_DEBUG_ASSERT(conn_ctx->stream_id);
+ if (conn_ctx->beam_in) {
+ h2_beam_abort(conn_ctx->beam_in, from);
+ }
+ if (conn_ctx->beam_out) {
+ h2_beam_abort(conn_ctx->beam_out, from);
+ }
+ c2->aborted = 1;
+}
+
typedef struct {
apr_bucket_brigade *bb; /* c2: data in holding area */
} h2_c2_fctx_in_t;
@@ -458,10 +385,7 @@ static apr_status_t h2_c2_filter_out(ap_filter_t* f, apr_bucket_brigade* bb)
"h2_c2(%s-%d): output leave",
conn_ctx->id, conn_ctx->stream_id);
if (APR_SUCCESS != rv) {
- if (!conn_ctx->done) {
- h2_beam_abort(conn_ctx->beam_out, f->c);
- }
- f->c->aborted = 1;
+ h2_c2_abort(f->c, f->c);
}
return rv;
}
diff --git a/modules/http2/h2_c2.h b/modules/http2/h2_c2.h
index 58964a2ac4..05454459fb 100644
--- a/modules/http2/h2_c2.h
+++ b/modules/http2/h2_c2.h
@@ -41,10 +41,17 @@ int h2_mpm_supported(void);
*/
apr_status_t h2_c2_child_init(apr_pool_t *pool, server_rec *s);
-conn_rec *h2_c2_create(conn_rec *c1, apr_pool_t *parent);
void h2_c2_destroy(conn_rec *c2);
/**
+ * Abort the I/O processing of a secondary connection. And
+ * in-/output beams will return errors and c2->aborted is set.
+ * @param c2 the secondary connection to abort
+ * @param from the connection this is invoked from
+ */
+void h2_c2_abort(conn_rec *c2, conn_rec *from);
+
+/**
* Process a secondary connection for a HTTP/2 stream request.
*/
apr_status_t h2_c2_process(conn_rec *c, apr_thread_t *thread, int worker_id);
diff --git a/modules/http2/h2_conn_ctx.c b/modules/http2/h2_conn_ctx.c
index ce5e9c1234..c5596e9b04 100644
--- a/modules/http2/h2_conn_ctx.c
+++ b/modules/http2/h2_conn_ctx.c
@@ -16,6 +16,7 @@
#include <assert.h>
#include <apr_strings.h>
+#include <apr_atomic.h>
#include <httpd.h>
#include <http_core.h>
@@ -42,6 +43,7 @@ static h2_conn_ctx_t *ctx_create(conn_rec *c, const char *id)
h2_conn_ctx_t *conn_ctx = apr_pcalloc(c->pool, sizeof(*conn_ctx));
conn_ctx->id = id;
conn_ctx->server = c->base_server;
+ apr_atomic_set32(&conn_ctx->started, 1);
conn_ctx->started_at = apr_time_now();
ap_set_module_config(c->conn_config, &http2_module, conn_ctx);
@@ -66,7 +68,8 @@ h2_conn_ctx_t *h2_conn_ctx_create_for_c1(conn_rec *c1, server_rec *s, const char
}
apr_status_t h2_conn_ctx_init_for_c2(h2_conn_ctx_t **pctx, conn_rec *c2,
- struct h2_mplx *mplx, struct h2_stream *stream)
+ struct h2_mplx *mplx, struct h2_stream *stream,
+ struct h2_c2_transit *transit)
{
h2_conn_ctx_t *conn_ctx;
apr_status_t rv = APR_SUCCESS;
@@ -85,10 +88,12 @@ apr_status_t h2_conn_ctx_init_for_c2(h2_conn_ctx_t **pctx, conn_rec *c2,
}
conn_ctx->mplx = mplx;
+ conn_ctx->transit = transit;
conn_ctx->stream_id = stream->id;
apr_pool_create(&conn_ctx->req_pool, c2->pool);
apr_pool_tag(conn_ctx->req_pool, "H2_C2_REQ");
conn_ctx->request = stream->request;
+ apr_atomic_set32(&conn_ctx->started, 1);
conn_ctx->started_at = apr_time_now();
conn_ctx->done = 0;
conn_ctx->done_at = 0;
@@ -97,38 +102,6 @@ apr_status_t h2_conn_ctx_init_for_c2(h2_conn_ctx_t **pctx, conn_rec *c2,
return rv;
}
-void h2_conn_ctx_clear_for_c2(conn_rec *c2)
-{
- h2_conn_ctx_t *conn_ctx;
-
- ap_assert(c2->master);
- conn_ctx = h2_conn_ctx_get(c2);
- conn_ctx->stream_id = -1;
- conn_ctx->request = NULL;
-
- if (conn_ctx->req_pool) {
- apr_pool_destroy(conn_ctx->req_pool);
- conn_ctx->req_pool = NULL;
- conn_ctx->beam_out = NULL;
- }
- memset(&conn_ctx->pfd_in_drain, 0, sizeof(conn_ctx->pfd_in_drain));
- memset(&conn_ctx->pfd_out_prod, 0, sizeof(conn_ctx->pfd_out_prod));
- conn_ctx->beam_in = NULL;
-}
-
-void h2_conn_ctx_destroy(conn_rec *c)
-{
- h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c);
-
- if (conn_ctx) {
- if (conn_ctx->mplx_pool) {
- apr_pool_destroy(conn_ctx->mplx_pool);
- conn_ctx->mplx_pool = NULL;
- }
- ap_set_module_config(c->conn_config, &http2_module, NULL);
- }
-}
-
void h2_conn_ctx_set_timeout(h2_conn_ctx_t *conn_ctx, apr_interval_time_t timeout)
{
if (conn_ctx->beam_out) {
diff --git a/modules/http2/h2_conn_ctx.h b/modules/http2/h2_conn_ctx.h
index b744e8dd29..110301620c 100644
--- a/modules/http2/h2_conn_ctx.h
+++ b/modules/http2/h2_conn_ctx.h
@@ -22,6 +22,7 @@ struct h2_stream;
struct h2_mplx;
struct h2_bucket_beam;
struct h2_response_parser;
+struct h2_c2_transit;
#define H2_PIPE_OUT 0
#define H2_PIPE_IN 1
@@ -40,6 +41,7 @@ struct h2_conn_ctx_t {
const char *protocol; /* c1: the protocol negotiated */
struct h2_session *session; /* c1: the h2 session established */
struct h2_mplx *mplx; /* c2: the multiplexer */
+ struct h2_c2_transit *transit; /* c2: transit pool and bucket_alloc */
int pre_conn_done; /* has pre_connection setup run? */
int stream_id; /* c1: 0, c2: stream id processed */
@@ -48,20 +50,17 @@ struct h2_conn_ctx_t {
struct h2_bucket_beam *beam_out; /* c2: data out, created from req_pool */
struct h2_bucket_beam *beam_in; /* c2: data in or NULL, borrowed from request stream */
- apr_pool_t *mplx_pool; /* c2: an mplx child pool for safe use inside mplx lock */
apr_file_t *pipe_in_prod[2]; /* c2: input produced notification pipe */
- apr_file_t *pipe_in_drain[2]; /* c2: input drained notification pipe */
apr_file_t *pipe_out_prod[2]; /* c2: output produced notification pipe */
- apr_pollfd_t pfd_in_drain; /* c2: poll pipe_in_drain output */
apr_pollfd_t pfd_out_prod; /* c2: poll pipe_out_prod output */
int has_final_response; /* final HTTP response passed on out */
apr_status_t last_err; /* APR_SUCCES or last error encountered in filters */
- struct h2_response_parser *parser; /* optional parser to catch H1 responses */
- volatile int done; /* c2: processing has finished */
+ /* atomic */ apr_uint32_t started; /* c2: processing was started */
apr_time_t started_at; /* c2: when processing started */
+ /* atomic */ apr_uint32_t done; /* c2: processing has finished */
apr_time_t done_at; /* c2: when processing was done */
};
typedef struct h2_conn_ctx_t h2_conn_ctx_t;
@@ -84,14 +83,11 @@ typedef struct h2_conn_ctx_t h2_conn_ctx_t;
h2_conn_ctx_t *h2_conn_ctx_create_for_c1(conn_rec *c, server_rec *s, const char *protocol);
apr_status_t h2_conn_ctx_init_for_c2(h2_conn_ctx_t **pctx, conn_rec *c,
- struct h2_mplx *mplx, struct h2_stream *stream);
-
-void h2_conn_ctx_clear_for_c2(conn_rec *c2);
+ struct h2_mplx *mplx, struct h2_stream *stream,
+ struct h2_c2_transit *transit);
void h2_conn_ctx_detach(conn_rec *c);
-void h2_conn_ctx_destroy(conn_rec *c);
-
void h2_conn_ctx_set_timeout(h2_conn_ctx_t *conn_ctx, apr_interval_time_t timeout);
#endif /* defined(__mod_h2__h2_conn_ctx__) */
diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c
index c083f13c12..c1da860b67 100644
--- a/modules/http2/h2_mplx.c
+++ b/modules/http2/h2_mplx.c
@@ -26,6 +26,7 @@
#include <httpd.h>
#include <http_core.h>
+#include <http_connection.h>
#include <http_log.h>
#include <mpm_common.h>
@@ -69,6 +70,13 @@ static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout,
static apr_pool_t *pchild;
+/* APR callback invoked if allocation fails. */
+static int abort_on_oom(int retcode)
+{
+ ap_abort_on_oom();
+ return retcode; /* unreachable, hopefully. */
+}
+
apr_status_t h2_mplx_c1_child_init(apr_pool_t *pool, server_rec *s)
{
pchild = pool;
@@ -100,7 +108,8 @@ static void c1_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length)
static int stream_is_running(h2_stream *stream)
{
h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(stream->c2);
- return conn_ctx && conn_ctx->started_at != 0 && !conn_ctx->done;
+ return conn_ctx && apr_atomic_read32(&conn_ctx->started) != 0
+ && apr_atomic_read32(&conn_ctx->done) == 0;
}
int h2_mplx_c1_stream_is_running(h2_mplx *m, h2_stream *stream)
@@ -153,13 +162,7 @@ static void m_stream_cleanup(h2_mplx *m, h2_stream *stream)
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
H2_STRM_MSG(stream, "cleanup, c2 is running, abort"));
/* c2 is still running */
- stream->c2->aborted = 1;
- if (stream->input) {
- h2_beam_abort(stream->input, m->c1);
- }
- if (stream->output) {
- h2_beam_abort(stream->output, m->c1);
- }
+ h2_c2_abort(stream->c2, m->c1);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
H2_STRM_MSG(stream, "cleanup, c2 is done, move to shold"));
h2_ihash_add(m->shold, stream);
@@ -173,6 +176,66 @@ static void m_stream_cleanup(h2_mplx *m, h2_stream *stream)
}
}
+static h2_c2_transit *c2_transit_create(h2_mplx *m)
+{
+ apr_allocator_t *allocator;
+ apr_pool_t *ptrans;
+ h2_c2_transit *transit;
+ apr_status_t rv;
+
+ /* We create a pool with its own allocator to be used for
+ * processing a request. This is the only way to have the processing
+ * independent of its parent pool in the sense that it can work in
+ * another thread.
+ */
+
+ rv = apr_allocator_create(&allocator);
+ if (rv == APR_SUCCESS) {
+ apr_allocator_max_free_set(allocator, ap_max_mem_free);
+ rv = apr_pool_create_ex(&ptrans, m->pool, NULL, allocator);
+ }
+ if (rv != APR_SUCCESS) {
+ /* maybe the log goes through, maybe not. */
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1,
+ APLOGNO(10004) "h2_mplx: create transit pool");
+ ap_abort_on_oom();
+ return NULL; /* should never be reached. */
+ }
+
+ apr_allocator_owner_set(allocator, ptrans);
+ apr_pool_abort_set(abort_on_oom, ptrans);
+ apr_pool_tag(ptrans, "h2_c2_transit");
+
+ transit = apr_pcalloc(ptrans, sizeof(*transit));
+ transit->pool = ptrans;
+ transit->bucket_alloc = apr_bucket_alloc_create(ptrans);
+ return transit;
+}
+
+static void c2_transit_destroy(h2_c2_transit *transit)
+{
+ apr_pool_destroy(transit->pool);
+}
+
+static h2_c2_transit *c2_transit_get(h2_mplx *m)
+{
+ h2_c2_transit **ptransit = apr_array_pop(m->c2_transits);
+ if (ptransit) {
+ return *ptransit;
+ }
+ return c2_transit_create(m);
+}
+
+static void c2_transit_recycle(h2_mplx *m, h2_c2_transit *transit)
+{
+ if (m->c2_transits->nelts >= m->max_spare_transits) {
+ c2_transit_destroy(transit);
+ }
+ else {
+ APR_ARRAY_PUSH(m->c2_transits, h2_c2_transit*) = transit;
+ }
+}
+
/**
* A h2_mplx needs to be thread-safe *and* if will be called by
* the h2_session thread *and* the h2_worker threads. Therefore:
@@ -254,11 +317,11 @@ h2_mplx *h2_mplx_c1_create(h2_stream *stream0, server_rec *s, apr_pool_t *parent
m->streams_ev_in = apr_array_make(m->pool, 10, sizeof(h2_stream*));
m->streams_ev_out = apr_array_make(m->pool, 10, sizeof(h2_stream*));
-#if !H2_POLL_STREAMS
+ m->streams_input_read = h2_iq_create(m->pool, 10);
status = apr_thread_mutex_create(&m->poll_lock, APR_THREAD_MUTEX_DEFAULT,
m->pool);
if (APR_SUCCESS != status) goto failure;
- m->streams_input_read = h2_iq_create(m->pool, 10);
+#if !H2_POLL_STREAMS
m->streams_output_written = h2_iq_create(m->pool, 10);
#endif
@@ -266,6 +329,8 @@ h2_mplx *h2_mplx_c1_create(h2_stream *stream0, server_rec *s, apr_pool_t *parent
mplx_pollset_add(m, conn_ctx);
m->scratch_r = apr_pcalloc(m->pool, sizeof(*m->scratch_r));
+ m->max_spare_transits = 3;
+ m->c2_transits = apr_array_make(m->pool, m->max_spare_transits, sizeof(h2_c2_transit*));
return m;
@@ -331,8 +396,9 @@ static int m_report_stream_iter(void *ctx, void *val) {
H2_STRM_MSG(stream, "->03198: %s %s %s"
"[started=%d/done=%d]"),
conn_ctx->request->method, conn_ctx->request->authority,
- conn_ctx->request->path, conn_ctx->started_at != 0,
- conn_ctx->done);
+ conn_ctx->request->path,
+ (int)apr_atomic_read32(&conn_ctx->started),
+ (int)apr_atomic_read32(&conn_ctx->done));
}
else {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, /* NO APLOGNO */
@@ -354,10 +420,6 @@ static int m_stream_cancel_iter(void *ctx, void *val) {
h2_mplx *m = ctx;
h2_stream *stream = val;
- /* disable input consumed reporting */
- if (stream->input) {
- h2_beam_abort(stream->input, m->c1);
- }
/* take over event monitoring */
h2_stream_set_monitor(stream, NULL);
/* Reset, should transit to CLOSED state */
@@ -499,8 +561,11 @@ static void c1_purge_streams(h2_mplx *m)
m->id, stream->id, c2_ctx->stream_id);
}
- h2_conn_ctx_destroy(c2);
h2_c2_destroy(c2);
+ if (c2_ctx->transit) {
+ c2_transit_recycle(m, c2_ctx->transit);
+ c2_ctx->transit = NULL;
+ }
}
h2_stream_destroy(stream);
}
@@ -699,17 +764,10 @@ static void c2_beam_input_read_notify(void *ctx, h2_bucket_beam *beam)
h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c);
if (conn_ctx && conn_ctx->stream_id) {
- if (conn_ctx->pipe_in_drain[H2_PIPE_IN]) {
- apr_file_putc(1, conn_ctx->pipe_in_drain[H2_PIPE_IN]);
- }
-#if !H2_POLL_STREAMS
- else {
- apr_thread_mutex_lock(conn_ctx->mplx->poll_lock);
- h2_iq_append(conn_ctx->mplx->streams_input_read, conn_ctx->stream_id);
- apr_pollset_wakeup(conn_ctx->mplx->pollset);
- apr_thread_mutex_unlock(conn_ctx->mplx->poll_lock);
- }
-#endif
+ apr_thread_mutex_lock(conn_ctx->mplx->poll_lock);
+ h2_iq_append(conn_ctx->mplx->streams_input_read, conn_ctx->stream_id);
+ apr_pollset_wakeup(conn_ctx->mplx->pollset);
+ apr_thread_mutex_unlock(conn_ctx->mplx->poll_lock);
}
}
@@ -733,13 +791,13 @@ static void c2_beam_output_write_notify(void *ctx, h2_bucket_beam *beam)
}
}
-static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream)
+static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream, h2_c2_transit *transit)
{
h2_conn_ctx_t *conn_ctx;
apr_status_t rv = APR_SUCCESS;
const char *action = "init";
- rv = h2_conn_ctx_init_for_c2(&conn_ctx, c2, m, stream);
+ rv = h2_conn_ctx_init_for_c2(&conn_ctx, c2, m, stream, transit);
if (APR_SUCCESS != rv) goto cleanup;
if (!conn_ctx->beam_out) {
@@ -758,22 +816,14 @@ static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream)
h2_beam_on_received(stream->input, c2_beam_input_read_notify, c2);
h2_beam_on_consumed(stream->input, c1_input_consumed, stream);
}
- else {
- memset(&conn_ctx->pfd_in_drain, 0, sizeof(conn_ctx->pfd_in_drain));
- }
#if H2_POLL_STREAMS
- if (!conn_ctx->mplx_pool) {
- apr_pool_create(&conn_ctx->mplx_pool, m->pool);
- apr_pool_tag(conn_ctx->mplx_pool, "H2_MPLX_C2");
- }
-
if (!conn_ctx->pipe_out_prod[H2_PIPE_OUT]) {
action = "create output pipe";
rv = apr_file_pipe_create_pools(&conn_ctx->pipe_out_prod[H2_PIPE_OUT],
&conn_ctx->pipe_out_prod[H2_PIPE_IN],
APR_FULL_NONBLOCK,
- conn_ctx->mplx_pool, c2->pool);
+ c2->pool, c2->pool);
if (APR_SUCCESS != rv) goto cleanup;
}
conn_ctx->pfd_out_prod.desc_type = APR_POLL_FILE;
@@ -787,26 +837,13 @@ static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream)
rv = apr_file_pipe_create_pools(&conn_ctx->pipe_in_prod[H2_PIPE_OUT],
&conn_ctx->pipe_in_prod[H2_PIPE_IN],
APR_READ_BLOCK,
- c2->pool, conn_ctx->mplx_pool);
- if (APR_SUCCESS != rv) goto cleanup;
- }
- if (!conn_ctx->pipe_in_drain[H2_PIPE_OUT]) {
- action = "create input read pipe";
- rv = apr_file_pipe_create_pools(&conn_ctx->pipe_in_drain[H2_PIPE_OUT],
- &conn_ctx->pipe_in_drain[H2_PIPE_IN],
- APR_FULL_NONBLOCK,
- c2->pool, conn_ctx->mplx_pool);
+ c2->pool, c2->pool);
if (APR_SUCCESS != rv) goto cleanup;
}
- conn_ctx->pfd_in_drain.desc_type = APR_POLL_FILE;
- conn_ctx->pfd_in_drain.desc.f = conn_ctx->pipe_in_drain[H2_PIPE_OUT];
- conn_ctx->pfd_in_drain.reqevents = APR_POLLIN | APR_POLLERR | APR_POLLHUP;
- conn_ctx->pfd_in_drain.client_data = conn_ctx;
}
#else
memset(&conn_ctx->pfd_out_prod, 0, sizeof(conn_ctx->pfd_out_prod));
memset(&conn_ctx->pipe_in_prod, 0, sizeof(conn_ctx->pipe_in_prod));
- memset(&conn_ctx->pipe_in_drain, 0, sizeof(conn_ctx->pipe_in_drain));
#endif
cleanup:
@@ -822,9 +859,10 @@ cleanup:
static conn_rec *s_next_c2(h2_mplx *m)
{
h2_stream *stream = NULL;
- apr_status_t rv;
+ apr_status_t rv = APR_SUCCESS;
int sid;
- conn_rec *c2;
+ conn_rec *c2 = NULL;
+ h2_c2_transit *transit = NULL;
while (!m->aborted && !stream && (m->processing_count < m->processing_limit)
&& (sid = h2_iq_shift(m->q)) > 0) {
@@ -838,27 +876,35 @@ static conn_rec *s_next_c2(h2_mplx *m)
"Current limit is %d and %d workers are in use.",
m->id, m->processing_limit, m->processing_count);
}
- return NULL;
+ goto cleanup;
}
if (sid > m->max_stream_id_started) {
m->max_stream_id_started = sid;
}
- c2 = h2_c2_create(m->c1, m->pool);
+ transit = c2_transit_get(m);
+ c2 = ap_create_secondary_connection(transit->pool, m->c1, transit->bucket_alloc);
+ if (!c2) goto cleanup;
ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c1,
H2_STRM_MSG(stream, "created new c2"));
- rv = c2_setup_io(m, c2, stream);
- if (APR_SUCCESS != rv) {
- return NULL;
- }
+ rv = c2_setup_io(m, c2, stream, transit);
+ if (APR_SUCCESS != rv) goto cleanup;
stream->c2 = c2;
++m->processing_count;
APR_ARRAY_PUSH(m->streams_to_poll, h2_stream *) = stream;
apr_pollset_wakeup(m->pollset);
+cleanup:
+ if (APR_SUCCESS != rv && c2) {
+ h2_c2_destroy(c2);
+ c2 = NULL;
+ }
+ if (transit && !c2) {
+ c2_transit_recycle(m, transit);
+ }
return c2;
}
@@ -896,8 +942,8 @@ static void s_c2_done(h2_mplx *m, conn_rec *c2, h2_conn_ctx_t *conn_ctx)
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c2,
"h2_mplx(%s-%d): c2 done", conn_ctx->id, conn_ctx->stream_id);
- ap_assert(conn_ctx->done == 0);
- conn_ctx->done = 1;
+ AP_DEBUG_ASSERT(apr_atomic_read32(&conn_ctx->done) == 0);
+ apr_atomic_set32(&conn_ctx->done, 1);
conn_ctx->done_at = apr_time_now();
++c2->keepalives;
/* From here on, the final handling of c2 is done by c1 processing.
@@ -955,16 +1001,18 @@ static void s_c2_done(h2_mplx *m, conn_rec *c2, h2_conn_ctx_t *conn_ctx)
void h2_mplx_worker_c2_done(conn_rec *c2, conn_rec **out_c2)
{
h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c2);
- h2_mplx *m;
+ h2_mplx *m = conn_ctx? conn_ctx->mplx : NULL;
- if (!conn_ctx || !conn_ctx->mplx) return;
- m = conn_ctx->mplx;
+ if (!m) {
+ if (out_c2) *out_c2 = NULL;
+ return;
+ }
H2_MPLX_ENTER_ALWAYS(m);
--m->processing_count;
s_c2_done(m, c2, conn_ctx);
-
+
if (m->join_wait) {
apr_thread_cond_signal(m->join_wait);
}
@@ -1084,52 +1132,19 @@ static apr_status_t mplx_pollset_create(h2_mplx *m)
static apr_status_t mplx_pollset_add(h2_mplx *m, h2_conn_ctx_t *conn_ctx)
{
- apr_status_t rv = APR_SUCCESS;
- const char *name = "";
-
if (conn_ctx->pfd_out_prod.reqevents) {
- name = "adding out";
- rv = apr_pollset_add(m->pollset, &conn_ctx->pfd_out_prod);
- if (APR_SUCCESS != rv) goto cleanup;
- }
-
- if (conn_ctx->pfd_in_drain.reqevents) {
- name = "adding in_read";
- rv = apr_pollset_add(m->pollset, &conn_ctx->pfd_in_drain);
- }
-
-cleanup:
- if (APR_SUCCESS != rv) {
- ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1,
- "h2_mplx(%ld-%d): error while adding to pollset %s",
- m->id, conn_ctx->stream_id, name);
+ return apr_pollset_add(m->pollset, &conn_ctx->pfd_out_prod);
}
- return rv;
+ return APR_SUCCESS;
}
static apr_status_t mplx_pollset_remove(h2_mplx *m, h2_conn_ctx_t *conn_ctx)
{
apr_status_t rv = APR_SUCCESS;
- const char *name = "";
if (conn_ctx->pfd_out_prod.reqevents) {
rv = apr_pollset_remove(m->pollset, &conn_ctx->pfd_out_prod);
conn_ctx->pfd_out_prod.reqevents = 0;
- if (APR_SUCCESS != rv) goto cleanup;
- }
-
- if (conn_ctx->pfd_in_drain.reqevents) {
- name = "in_read";
- rv = apr_pollset_remove(m->pollset, &conn_ctx->pfd_in_drain);
- conn_ctx->pfd_in_drain.reqevents = 0;
- if (APR_SUCCESS != rv) goto cleanup;
- }
-
-cleanup:
- if (APR_SUCCESS != rv) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, rv, m->c1,
- "h2_mplx(%ld-%d): error removing from pollset %s",
- m->id, conn_ctx->stream_id, name);
}
return rv;
}
@@ -1168,16 +1183,21 @@ static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout,
apr_array_clear(m->streams_to_poll);
}
-#if !H2_POLL_STREAMS
apr_thread_mutex_lock(m->poll_lock);
- if (!h2_iq_empty(m->streams_input_read)
- || !h2_iq_empty(m->streams_output_written)) {
+ if (!h2_iq_empty(m->streams_input_read)) {
while ((i = h2_iq_shift(m->streams_input_read))) {
stream = h2_ihash_get(m->streams, i);
if (stream) {
APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = stream;
}
}
+ nresults = 0;
+ rv = APR_SUCCESS;
+ apr_thread_mutex_unlock(m->poll_lock);
+ break;
+ }
+#if !H2_POLL_STREAMS
+ if (!h2_iq_empty(m->streams_output_written)) {
while ((i = h2_iq_shift(m->streams_output_written))) {
stream = h2_ihash_get(m->streams, i);
if (stream) {
@@ -1189,8 +1209,9 @@ static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout,
apr_thread_mutex_unlock(m->poll_lock);
break;
}
- apr_thread_mutex_unlock(m->poll_lock);
#endif
+ apr_thread_mutex_unlock(m->poll_lock);
+
H2_MPLX_LEAVE(m);
rv = apr_pollset_poll(m->pollset, timeout >= 0? timeout : -1, &nresults, &results);
H2_MPLX_ENTER_ALWAYS(m);
@@ -1276,14 +1297,6 @@ static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout,
pfd->rtnevents);
APR_ARRAY_PUSH(m->streams_ev_out, h2_stream*) = stream;
}
- else if (conn_ctx->pfd_in_drain.desc.f == pfd->desc.f) {
- /* input has been consumed */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
- "[%s-%d] poll input event %hx",
- conn_ctx->id, conn_ctx->stream_id,
- pfd->rtnevents);
- APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = stream;
- }
}
if (on_stream_input && m->streams_ev_in->nelts) {
diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h
index d5c4f9916f..662682a05b 100644
--- a/modules/http2/h2_mplx.h
+++ b/modules/http2/h2_mplx.h
@@ -44,6 +44,13 @@ struct h2_iqueue;
#include <apr_queue.h>
+typedef struct h2_c2_transit h2_c2_transit;
+
+struct h2_c2_transit {
+ apr_pool_t *pool;
+ apr_bucket_alloc_t *bucket_alloc;
+};
+
typedef struct h2_mplx h2_mplx;
struct h2_mplx {
@@ -83,14 +90,16 @@ struct h2_mplx {
apr_array_header_t *streams_ev_in;
apr_array_header_t *streams_ev_out;
-#if !H2_POLL_STREAMS
- apr_thread_mutex_t *poll_lock; /* not the painter */
+ apr_thread_mutex_t *poll_lock; /* protect modifications of queues below */
struct h2_iqueue *streams_input_read; /* streams whose input has been read from */
struct h2_iqueue *streams_output_written; /* streams whose output has been written to */
-#endif
+
struct h2_workers *workers; /* h2 workers process wide instance */
request_rec *scratch_r; /* pseudo request_rec for scoreboard reporting */
+
+ apr_size_t max_spare_transits; /* max number of transit pools idling */
+ apr_array_header_t *c2_transits; /* base pools for running c2 connections */
};
apr_status_t h2_mplx_c1_child_init(apr_pool_t *pool, server_rec *s);
diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c
index a79c7cdd0c..da8443fe7c 100644
--- a/modules/http2/h2_stream.c
+++ b/modules/http2/h2_stream.c
@@ -573,11 +573,8 @@ void h2_stream_destroy(h2_stream *stream)
void h2_stream_rst(h2_stream *stream, int error_code)
{
stream->rst_error = error_code;
- if (stream->input) {
- h2_beam_abort(stream->input, stream->session->c1);
- }
- if (stream->output) {
- h2_beam_abort(stream->output, stream->session->c1);
+ if (stream->c2) {
+ h2_c2_abort(stream->c2, stream->session->c1);
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c1,
H2_STRM_MSG(stream, "reset, error=%d"), error_code);
diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c
index 1e7aebdca8..7299a2b0d6 100644
--- a/modules/http2/h2_workers.c
+++ b/modules/http2/h2_workers.c
@@ -40,7 +40,7 @@ struct h2_slot {
apr_thread_t *thread;
apr_thread_mutex_t *lock;
apr_thread_cond_t *not_idle;
- volatile apr_uint32_t timed_out;
+ /* atomic */ apr_uint32_t timed_out;
};
static h2_slot *pop_slot(h2_slot *volatile *phead)
@@ -99,7 +99,7 @@ static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot)
/* thread will either immediately start work or add itself
* to the idle queue */
apr_atomic_inc32(&workers->worker_count);
- slot->timed_out = 0;
+ apr_atomic_set32(&slot->timed_out, 0);
rv = ap_thread_create(&slot->thread, workers->thread_attr,
slot_run, slot, workers->pool);
if (rv != APR_SUCCESS) {
@@ -125,22 +125,22 @@ static apr_status_t add_worker(h2_workers *workers)
static void wake_idle_worker(h2_workers *workers)
{
- h2_slot *slot = pop_slot(&workers->idle);
- if (slot) {
- int timed_out = 0;
- apr_thread_mutex_lock(slot->lock);
- timed_out = slot->timed_out;
- if (!timed_out) {
- apr_thread_cond_signal(slot->not_idle);
+ h2_slot *slot;;
+ for (;;) {
+ slot = pop_slot(&workers->idle);
+ if (!slot) {
+ if (workers->dynamic && apr_atomic_read32(&workers->shutdown) == 0) {
+ add_worker(workers);
+ }
+ return;
}
- apr_thread_mutex_unlock(slot->lock);
- if (timed_out) {
- slot_done(slot);
- wake_idle_worker(workers);
+ if (!apr_atomic_read32(&slot->timed_out)) {
+ apr_thread_mutex_lock(slot->lock);
+ apr_thread_cond_signal(slot->not_idle);
+ apr_thread_mutex_unlock(slot->lock);
+ return;
}
- }
- else if (workers->dynamic && !workers->shutdown) {
- add_worker(workers);
+ slot_done(slot);
}
}
@@ -190,9 +190,10 @@ static int get_next(h2_slot *slot)
int non_essential = slot->id >= workers->min_workers;
apr_status_t rv;
- while (!workers->aborted && !slot->timed_out) {
+ while (apr_atomic_read32(&workers->aborted) == 0
+ && apr_atomic_read32(&slot->timed_out) == 0) {
ap_assert(slot->connection == NULL);
- if (non_essential && workers->shutdown) {
+ if (non_essential && apr_atomic_read32(&workers->shutdown)) {
/* Terminate non-essential worker on shutdown */
break;
}
@@ -208,14 +209,16 @@ static int get_next(h2_slot *slot)
join_zombies(workers);
apr_thread_mutex_lock(slot->lock);
- if (!workers->aborted) {
+ if (apr_atomic_read32(&workers->aborted) == 0) {
+ apr_uint32_t idle_secs;
push_slot(&workers->idle, slot);
- if (non_essential && workers->max_idle_duration) {
+ if (non_essential
+ && (idle_secs = apr_atomic_read32(&workers->max_idle_secs))) {
rv = apr_thread_cond_timedwait(slot->not_idle, slot->lock,
- workers->max_idle_duration);
+ apr_time_from_sec(idle_secs));
if (APR_TIMEUP == rv) {
- slot->timed_out = 1;
+ apr_atomic_set32(&slot->timed_out, 1);
}
}
else {
@@ -237,7 +240,8 @@ static void slot_done(h2_slot *slot)
/* If this worker is the last one exiting and the MPM child is stopping,
* unblock workers_pool_cleanup().
*/
- if (!apr_atomic_dec32(&workers->worker_count) && workers->aborted) {
+ if (!apr_atomic_dec32(&workers->worker_count)
+ && apr_atomic_read32(&workers->aborted)) {
apr_thread_mutex_lock(workers->lock);
apr_thread_cond_signal(workers->all_done);
apr_thread_mutex_unlock(workers->lock);
@@ -254,7 +258,7 @@ static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx)
do {
ap_assert(slot->connection != NULL);
h2_c2_process(slot->connection, thread, slot->id);
- if (!slot->workers->aborted &&
+ if (apr_atomic_read32(&slot->workers->aborted) == 0 &&
apr_atomic_read32(&slot->workers->worker_count) < slot->workers->max_workers) {
h2_mplx_worker_c2_done(slot->connection, &slot->connection);
}
@@ -265,7 +269,7 @@ static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx)
} while (slot->connection);
}
- if (!slot->timed_out) {
+ if (apr_atomic_read32(&slot->timed_out) == 0) {
slot_done(slot);
}
@@ -294,8 +298,8 @@ static void workers_abort_idle(h2_workers *workers)
{
h2_slot *slot;
- workers->shutdown = 1;
- workers->aborted = 1;
+ apr_atomic_set32(&workers->shutdown, 1);
+ apr_atomic_set32(&workers->aborted, 1);
h2_fifo_term(workers->mplxs);
/* abort all idle slots */
@@ -379,12 +383,12 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pchild,
workers->pool = pool;
workers->min_workers = min_workers;
workers->max_workers = max_workers;
- workers->max_idle_duration = apr_time_from_sec((idle_secs > 0)? idle_secs : 10);
+ workers->max_idle_secs = (idle_secs > 0)? idle_secs : 10;
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
"h2_workers: created with min=%d max=%d idle_timeout=%d sec",
workers->min_workers, workers->max_workers,
- (int)apr_time_sec(workers->max_idle_duration));
+ (int)workers->max_idle_secs);
/* FIXME: the fifo set we use here has limited capacity. Once the
* set is full, connections with new requests do a wait.
*/
@@ -460,7 +464,7 @@ apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m)
void h2_workers_graceful_shutdown(h2_workers *workers)
{
- workers->shutdown = 1;
- workers->max_idle_duration = apr_time_from_sec(1);
+ apr_atomic_set32(&workers->shutdown, 1);
+ apr_atomic_set32(&workers->max_idle_secs, 1);
wake_non_essential_workers(workers);
}
diff --git a/modules/http2/h2_workers.h b/modules/http2/h2_workers.h
index c77cf1a6bb..0de3040676 100644
--- a/modules/http2/h2_workers.h
+++ b/modules/http2/h2_workers.h
@@ -38,19 +38,17 @@ struct h2_workers {
int next_worker_id;
apr_uint32_t max_workers;
- volatile apr_uint32_t min_workers; /* is changed during graceful shutdown */
- volatile apr_interval_time_t max_idle_duration; /* is changed during graceful shutdown */
-
- volatile int aborted;
- volatile int shutdown;
+ apr_uint32_t min_workers;
+ /* atomic */ apr_uint32_t worker_count;
+ /* atomic */ apr_uint32_t max_idle_secs;
+ /* atomic */ apr_uint32_t aborted;
+ /* atomic */ apr_uint32_t shutdown;
int dynamic;
apr_threadattr_t *thread_attr;
int nslots;
struct h2_slot *slots;
-
- volatile apr_uint32_t worker_count;
-
+
struct h2_slot *free;
struct h2_slot *idle;
struct h2_slot *zombies;