summaryrefslogtreecommitdiffstats
path: root/modules
diff options
context:
space:
mode:
authorStefan Eissing <icing@apache.org>2016-05-20 16:37:39 +0200
committerStefan Eissing <icing@apache.org>2016-05-20 16:37:39 +0200
commit134f867cea9f256a49cdab8a1a94467b9ef3bfe5 (patch)
tree47860851d1928b5ae8c5dffc3120df4904ca3d12 /modules
parentmod_http2: improved resume/response/window update handling on master connection (diff)
downloadapache2-134f867cea9f256a49cdab8a1a94467b9ef3bfe5.tar.xz
apache2-134f867cea9f256a49cdab8a1a94467b9ef3bfe5.zip
mod_http2: fixing re-entrancy problems with new master event dispatching
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1744751 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'modules')
-rw-r--r--modules/http2/h2_bucket_beam.c34
-rw-r--r--modules/http2/h2_mplx.c185
-rw-r--r--modules/http2/h2_util.c73
-rw-r--r--modules/http2/h2_util.h4
4 files changed, 187 insertions, 109 deletions
diff --git a/modules/http2/h2_bucket_beam.c b/modules/http2/h2_bucket_beam.c
index 8d0892db72..d648b1d159 100644
--- a/modules/http2/h2_bucket_beam.c
+++ b/modules/http2/h2_bucket_beam.c
@@ -706,13 +706,13 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam,
status = APR_ECONNABORTED;
}
else if (red_brigade) {
- int not_emtpy = APR_BRIGADE_EMPTY(red_brigade);
+ int force_report = !APR_BRIGADE_EMPTY(red_brigade);
while (!APR_BRIGADE_EMPTY(red_brigade)
&& status == APR_SUCCESS) {
bred = APR_BRIGADE_FIRST(red_brigade);
status = append_bucket(beam, bred, block, beam->red_pool, &bl);
}
- report_production(beam, not_emtpy);
+ report_production(beam, force_report);
if (beam->m_cond) {
apr_thread_cond_broadcast(beam->m_cond);
}
@@ -771,8 +771,8 @@ transfer:
if (APR_BUCKET_IS_METADATA(bred)) {
if (APR_BUCKET_IS_EOS(bred)) {
- beam->close_sent = 1;
bgreen = apr_bucket_eos_create(bb->bucket_alloc);
+ beam->close_sent = 1;
}
else if (APR_BUCKET_IS_FLUSH(bred)) {
bgreen = apr_bucket_flush_create(bb->bucket_alloc);
@@ -850,28 +850,24 @@ transfer:
}
}
- if ((!beam->green || APR_BRIGADE_EMPTY(beam->green))
- && H2_BLIST_EMPTY(&beam->red)
- && beam->closed && !beam->close_sent) {
- apr_bucket *b = apr_bucket_eos_create(bb->bucket_alloc);
- APR_BRIGADE_INSERT_TAIL(bb, b);
- beam->close_sent = 1;
- ++transferred;
- status = APR_SUCCESS;
- }
- else if (transferred) {
- status = APR_SUCCESS;
- }
- else if (beam->closed) {
+ if (beam->closed
+ && (!beam->green || APR_BRIGADE_EMPTY(beam->green))
+ && H2_BLIST_EMPTY(&beam->red)) {
+ /* beam is closed and we have nothing more to receive */
if (!beam->close_sent) {
apr_bucket *b = apr_bucket_eos_create(bb->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(bb, b);
beam->close_sent = 1;
+ ++transferred;
status = APR_SUCCESS;
}
- else {
- status = APR_EOF;
- }
+ }
+
+ if (transferred) {
+ status = APR_SUCCESS;
+ }
+ else if (beam->closed) {
+ status = APR_EOF;
}
else if (block == APR_BLOCK_READ && bl.mutex && beam->m_cond) {
status = wait_cond(beam, bl.mutex);
diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c
index 4861c06550..f7b30fffad 100644
--- a/modules/http2/h2_mplx.c
+++ b/modules/http2/h2_mplx.c
@@ -369,6 +369,7 @@ static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master)
if (task->output.beam) {
m->tx_handles_reserved +=
h2_beam_get_files_beamed(task->output.beam);
+ h2_beam_on_produced(task->output.beam, NULL, NULL);
}
slave = task->c;
@@ -502,6 +503,17 @@ static int task_abort_connection(void *ctx, void *val)
return 1;
}
+static int report_stream_iter(void *ctx, void *val) {
+ h2_mplx *m = ctx;
+ h2_stream *stream = val;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, "
+ "submitted=%d, suspended=%d",
+ m->id, stream->id, stream->started, stream->scheduled,
+ stream->submitted, stream->suspended);
+ return 1;
+}
+
apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
{
apr_status_t status;
@@ -511,6 +523,17 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
int i, wait_secs = 5;
+
+ if (!h2_ihash_empty(m->streams) && APLOGctrace1(m->c)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): release_join with %d streams open, "
+ "%d streams resume, %d streams ready, %d tasks",
+ m->id, (int)h2_ihash_count(m->streams),
+ (int)h2_ihash_count(m->sresume),
+ (int)h2_ihash_count(m->sready),
+ (int)h2_ihash_count(m->tasks));
+ h2_ihash_iter(m->streams, report_stream_iter, m);
+ }
/* disable WINDOW_UPDATE callbacks */
h2_mplx_set_consumed_cb(m, NULL, NULL);
@@ -585,10 +608,9 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
purge_streams(m);
}
AP_DEBUG_ASSERT(h2_ihash_empty(m->spurge));
- AP_DEBUG_ASSERT(h2_ihash_empty(m->tasks));
if (!h2_ihash_empty(m->tasks)) {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03056)
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
"h2_mplx(%ld): release_join -> destroy, "
"%d tasks still present",
m->id, (int)h2_ihash_count(m->tasks));
@@ -793,26 +815,27 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
if (m->aborted) {
status = APR_ECONNABORTED;
}
- else if (stream->response) {
- /* already have a respone, schedule for submit */
- h2_ihash_add(m->sready, stream);
- }
else {
- h2_beam_create(&stream->input, stream->pool, stream->id,
- "input", 0);
h2_ihash_add(m->streams, stream);
-
- if (!m->need_registration) {
- m->need_registration = h2_iq_empty(m->q);
+ if (stream->response) {
+ /* already have a respone, schedule for submit */
+ h2_ihash_add(m->sready, stream);
}
- if (m->workers_busy < m->workers_max) {
- do_registration = m->need_registration;
+ else {
+ h2_beam_create(&stream->input, stream->pool, stream->id,
+ "input", 0);
+ if (!m->need_registration) {
+ m->need_registration = h2_iq_empty(m->q);
+ }
+ if (m->workers_busy < m->workers_max) {
+ do_registration = m->need_registration;
+ }
+ h2_iq_add(m->q, stream->id, cmp, ctx);
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+ "h2_mplx(%ld-%d): process, body=%d",
+ m->c->id, stream->id, stream->request->body);
}
- h2_iq_add(m->q, stream->id, cmp, ctx);
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
- "h2_mplx(%ld-%d): process, body=%d",
- m->c->id, stream->id, stream->request->body);
}
leave_mutex(m, acquired);
}
@@ -958,7 +981,6 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
task->done_at = apr_time_now();
if (task->output.beam) {
h2_beam_on_consumed(task->output.beam, NULL, NULL);
- h2_beam_on_produced(task->output.beam, NULL, NULL);
h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
@@ -1304,57 +1326,12 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn)
* mplx master events dispatching
******************************************************************************/
-typedef struct {
- h2_mplx *m;
- stream_ev_callback *on_resume;
- stream_ev_callback *on_response;
- void *on_ctx;
- apr_status_t status;
-} dispatch_ctx;
-
static int update_window(void *ctx, void *val)
{
input_consumed_signal(ctx, val);
return 1;
}
-static int stream_ready_iter(void *data, void *val)
-{
- dispatch_ctx *ctx = data;
- h2_stream *stream = val;
- h2_task *task = h2_ihash_get(ctx->m->tasks, stream->id);
-
- if (task) {
- task->submitted = 1;
- if (task->rst_error) {
- h2_stream_rst(stream, task->rst_error);
- }
- else {
- AP_DEBUG_ASSERT(task->response);
- h2_stream_set_response(stream, task->response, task->output.beam);
- }
- }
- else {
- /* We have the stream ready without a task. This happens
- * when we fail streams early. A response should already
- * be present. */
- AP_DEBUG_ASSERT(stream->response || stream->rst_error);
- }
-
- ctx->status = ctx->on_response(ctx->on_ctx, stream->id);
- return 1;
-}
-
-static int stream_resume_iter(void *data, void *val)
-{
- dispatch_ctx *ctx = data;
- h2_stream *stream = val;
-
- h2_stream_set_suspended(stream, 0);
- ctx->status = ctx->on_resume(ctx->on_ctx, stream->id);
- return 1;
-}
-
apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
stream_ev_callback *on_resume,
stream_ev_callback *on_response,
@@ -1362,32 +1339,66 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
{
apr_status_t status;
int acquired;
+ int streams[32];
+ h2_stream *stream;
+ h2_task *task;
+ size_t i, n;
AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- dispatch_ctx ctx;
- ctx.m = m;
- ctx.on_resume = on_resume;
- ctx.on_response = on_response;
- ctx.on_ctx = on_ctx;
- ctx.status = APR_SUCCESS;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
"h2_mplx(%ld): dispatch events", m->id);
+
/* update input windows for streams */
h2_ihash_iter(m->streams, update_window, m);
- if (ctx.on_response) {
- h2_ihash_iter(m->sready, stream_ready_iter, &ctx);
- h2_ihash_clear(m->sready);
+ if (on_response && !h2_ihash_empty(m->sready)) {
+ n = h2_ihash_ishift(m->sready, streams, H2_ALEN(streams));
+ for (i = 0; i < n; ++i) {
+ stream = h2_ihash_get(m->streams, streams[i]);
+ if (!stream) {
+ continue;
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ "h2_mplx(%ld-%d): on_response",
+ m->id, stream->id);
+ task = h2_ihash_get(m->tasks, stream->id);
+ if (task) {
+ task->submitted = 1;
+ if (task->rst_error) {
+ h2_stream_rst(stream, task->rst_error);
+ }
+ else {
+ AP_DEBUG_ASSERT(task->response);
+ h2_stream_set_response(stream, task->response, task->output.beam);
+ }
+ }
+ else {
+ /* We have the stream ready without a task. This happens
+ * when we fail streams early. A response should already
+ * be present. */
+ AP_DEBUG_ASSERT(stream->response || stream->rst_error);
+ }
+ status = on_response(on_ctx, stream->id);
+ }
}
- if (ctx.on_resume) {
- h2_ihash_iter(m->sresume, stream_resume_iter, &ctx);
- h2_ihash_clear(m->sresume);
+ if (on_resume && !h2_ihash_empty(m->sresume)) {
+ n = h2_ihash_ishift(m->sresume, streams, H2_ALEN(streams));
+ for (i = 0; i < n; ++i) {
+ stream = h2_ihash_get(m->streams, streams[i]);
+ if (!stream) {
+ continue;
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ "h2_mplx(%ld-%d): on_resume",
+ m->id, stream->id);
+ h2_stream_set_suspended(stream, 0);
+ status = on_resume(on_ctx, stream->id);
+ }
}
leave_mutex(m, acquired);
- return ctx.status;
}
return status;
}
@@ -1397,7 +1408,6 @@ static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
h2_mplx *m = ctx;
apr_status_t status;
h2_stream *stream;
- h2_task *task;
int acquired;
AP_DEBUG_ASSERT(m);
@@ -1405,10 +1415,7 @@ static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
stream = h2_ihash_get(m->streams, beam->id);
if (stream && h2_stream_is_suspended(stream)) {
h2_ihash_add(m->sresume, stream);
- task = h2_ihash_get(m->tasks, stream->id);
- if (task && task->output.beam) {
- h2_beam_on_produced(task->output.beam, NULL, NULL);
- }
+ h2_beam_on_produced(beam, NULL, NULL);
have_out_data_for(m, beam->id);
}
leave_mutex(m, acquired);
@@ -1425,17 +1432,15 @@ apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id)
AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
stream = h2_ihash_get(m->streams, stream_id);
- if (stream && !h2_stream_is_suspended(stream)) {
+ if (stream) {
h2_stream_set_suspended(stream, 1);
task = h2_ihash_get(m->tasks, stream->id);
- if (task && task->output.beam && h2_beam_empty(task->output.beam)) {
- /* register callback so that we can resume on new output */
- h2_beam_on_produced(task->output.beam, output_produced, m);
+ if (stream->started && (!task || task->worker_done)) {
+ h2_ihash_add(m->sresume, stream);
}
else {
- /* if the beam got data in the meantime, add this to the to-be
- * resumed streams right away. */
- h2_ihash_add(m->sresume, stream);
+ /* register callback so that we can resume on new output */
+ h2_beam_on_produced(task->output.beam, output_produced, m);
}
}
leave_mutex(m, acquired);
diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c
index a0f82ac057..f8575fa7e1 100644
--- a/modules/http2/h2_util.c
+++ b/modules/http2/h2_util.c
@@ -325,11 +325,84 @@ void h2_ihash_remove(h2_ihash_t *ih, int id)
apr_hash_set(ih->hash, &id, sizeof(id), NULL);
}
+void h2_ihash_remove_val(h2_ihash_t *ih, void *val)
+{
+ int id = *((int*)((char *)val + ih->ioff));
+ apr_hash_set(ih->hash, &id, sizeof(id), NULL);
+}
+
+
void h2_ihash_clear(h2_ihash_t *ih)
{
apr_hash_clear(ih->hash);
}
+typedef struct {
+ h2_ihash_t *ih;
+ void **buffer;
+ size_t max;
+ size_t len;
+} collect_ctx;
+
+static int collect_iter(void *x, void *val)
+{
+ collect_ctx *ctx = x;
+ if (ctx->len < ctx->max) {
+ ctx->buffer[ctx->len++] = val;
+ return 1;
+ }
+ return 0;
+}
+
+size_t h2_ihash_shift(h2_ihash_t *ih, void **buffer, size_t max)
+{
+ collect_ctx ctx;
+ size_t i;
+
+ ctx.ih = ih;
+ ctx.buffer = buffer;
+ ctx.max = max;
+ ctx.len = 0;
+ h2_ihash_iter(ih, collect_iter, &ctx);
+ for (i = 0; i < ctx.len; ++i) {
+ h2_ihash_remove_val(ih, buffer[i]);
+ }
+ return ctx.len;
+}
+
+typedef struct {
+ h2_ihash_t *ih;
+ int *buffer;
+ size_t max;
+ size_t len;
+} icollect_ctx;
+
+static int icollect_iter(void *x, void *val)
+{
+ icollect_ctx *ctx = x;
+ if (ctx->len < ctx->max) {
+ ctx->buffer[ctx->len++] = *((int*)((char *)val + ctx->ih->ioff));
+ return 1;
+ }
+ return 0;
+}
+
+size_t h2_ihash_ishift(h2_ihash_t *ih, int *buffer, size_t max)
+{
+ icollect_ctx ctx;
+ size_t i;
+
+ ctx.ih = ih;
+ ctx.buffer = buffer;
+ ctx.max = max;
+ ctx.len = 0;
+ h2_ihash_iter(ih, icollect_iter, &ctx);
+ for (i = 0; i < ctx.len; ++i) {
+ h2_ihash_remove(ih, buffer[i]);
+ }
+ return ctx.len;
+}
+
/*******************************************************************************
* ilist - sorted list for structs with int identifier
******************************************************************************/
diff --git a/modules/http2/h2_util.h b/modules/http2/h2_util.h
index c200729c20..99724d7a5d 100644
--- a/modules/http2/h2_util.h
+++ b/modules/http2/h2_util.h
@@ -64,8 +64,12 @@ int h2_ihash_iter(h2_ihash_t *ih, h2_ihash_iter_t *fn, void *ctx);
void h2_ihash_add(h2_ihash_t *ih, void *val);
void h2_ihash_remove(h2_ihash_t *ih, int id);
+void h2_ihash_remove_val(h2_ihash_t *ih, void *val);
void h2_ihash_clear(h2_ihash_t *ih);
+size_t h2_ihash_shift(h2_ihash_t *ih, void **buffer, size_t max);
+size_t h2_ihash_ishift(h2_ihash_t *ih, int *buffer, size_t max);
+
/*******************************************************************************
* ilist - sorted list for structs with int identifier as first member
******************************************************************************/