diff options
author | Stefan Eissing <icing@apache.org> | 2016-05-20 16:37:39 +0200 |
---|---|---|
committer | Stefan Eissing <icing@apache.org> | 2016-05-20 16:37:39 +0200 |
commit | 134f867cea9f256a49cdab8a1a94467b9ef3bfe5 (patch) | |
tree | 47860851d1928b5ae8c5dffc3120df4904ca3d12 /modules | |
parent | mod_http2: improved resume/response/window update handling on master connection (diff) | |
download | apache2-134f867cea9f256a49cdab8a1a94467b9ef3bfe5.tar.xz apache2-134f867cea9f256a49cdab8a1a94467b9ef3bfe5.zip |
mod_http2: fixing re-entrancy problems with new master event dispatching
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1744751 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'modules')
-rw-r--r-- | modules/http2/h2_bucket_beam.c | 34 | ||||
-rw-r--r-- | modules/http2/h2_mplx.c | 185 | ||||
-rw-r--r-- | modules/http2/h2_util.c | 73 | ||||
-rw-r--r-- | modules/http2/h2_util.h | 4 |
4 files changed, 187 insertions, 109 deletions
diff --git a/modules/http2/h2_bucket_beam.c b/modules/http2/h2_bucket_beam.c index 8d0892db72..d648b1d159 100644 --- a/modules/http2/h2_bucket_beam.c +++ b/modules/http2/h2_bucket_beam.c @@ -706,13 +706,13 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam, status = APR_ECONNABORTED; } else if (red_brigade) { - int not_emtpy = APR_BRIGADE_EMPTY(red_brigade); + int force_report = !APR_BRIGADE_EMPTY(red_brigade); while (!APR_BRIGADE_EMPTY(red_brigade) && status == APR_SUCCESS) { bred = APR_BRIGADE_FIRST(red_brigade); status = append_bucket(beam, bred, block, beam->red_pool, &bl); } - report_production(beam, not_emtpy); + report_production(beam, force_report); if (beam->m_cond) { apr_thread_cond_broadcast(beam->m_cond); } @@ -771,8 +771,8 @@ transfer: if (APR_BUCKET_IS_METADATA(bred)) { if (APR_BUCKET_IS_EOS(bred)) { - beam->close_sent = 1; bgreen = apr_bucket_eos_create(bb->bucket_alloc); + beam->close_sent = 1; } else if (APR_BUCKET_IS_FLUSH(bred)) { bgreen = apr_bucket_flush_create(bb->bucket_alloc); @@ -850,28 +850,24 @@ transfer: } } - if ((!beam->green || APR_BRIGADE_EMPTY(beam->green)) - && H2_BLIST_EMPTY(&beam->red) - && beam->closed && !beam->close_sent) { - apr_bucket *b = apr_bucket_eos_create(bb->bucket_alloc); - APR_BRIGADE_INSERT_TAIL(bb, b); - beam->close_sent = 1; - ++transferred; - status = APR_SUCCESS; - } - else if (transferred) { - status = APR_SUCCESS; - } - else if (beam->closed) { + if (beam->closed + && (!beam->green || APR_BRIGADE_EMPTY(beam->green)) + && H2_BLIST_EMPTY(&beam->red)) { + /* beam is closed and we have nothing more to receive */ if (!beam->close_sent) { apr_bucket *b = apr_bucket_eos_create(bb->bucket_alloc); APR_BRIGADE_INSERT_TAIL(bb, b); beam->close_sent = 1; + ++transferred; status = APR_SUCCESS; } - else { - status = APR_EOF; - } + } + + if (transferred) { + status = APR_SUCCESS; + } + else if (beam->closed) { + status = APR_EOF; } else if (block == APR_BLOCK_READ && bl.mutex && beam->m_cond) { status = wait_cond(beam, bl.mutex); diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 4861c06550..f7b30fffad 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -369,6 +369,7 @@ static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master) if (task->output.beam) { m->tx_handles_reserved += h2_beam_get_files_beamed(task->output.beam); + h2_beam_on_produced(task->output.beam, NULL, NULL); } slave = task->c; @@ -502,6 +503,17 @@ static int task_abort_connection(void *ctx, void *val) return 1; } +static int report_stream_iter(void *ctx, void *val) { + h2_mplx *m = ctx; + h2_stream *stream = val; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, " + "submitted=%d, suspended=%d", + m->id, stream->id, stream->started, stream->scheduled, + stream->submitted, stream->suspended); + return 1; +} + apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) { apr_status_t status; @@ -511,6 +523,17 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { int i, wait_secs = 5; + + if (!h2_ihash_empty(m->streams) && APLOGctrace1(m->c)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld): release_join with %d streams open, " + "%d streams resume, %d streams ready, %d tasks", + m->id, (int)h2_ihash_count(m->streams), + (int)h2_ihash_count(m->sresume), + (int)h2_ihash_count(m->sready), + (int)h2_ihash_count(m->tasks)); + h2_ihash_iter(m->streams, report_stream_iter, m); + } /* disable WINDOW_UPDATE callbacks */ h2_mplx_set_consumed_cb(m, NULL, NULL); @@ -585,10 +608,9 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) purge_streams(m); } AP_DEBUG_ASSERT(h2_ihash_empty(m->spurge)); - AP_DEBUG_ASSERT(h2_ihash_empty(m->tasks)); if (!h2_ihash_empty(m->tasks)) { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03056) + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056) "h2_mplx(%ld): release_join -> destroy, " "%d tasks still present", m->id, (int)h2_ihash_count(m->tasks)); @@ -793,26 +815,27 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, if (m->aborted) { status = APR_ECONNABORTED; } - else if (stream->response) { - /* already have a respone, schedule for submit */ - h2_ihash_add(m->sready, stream); - } else { - h2_beam_create(&stream->input, stream->pool, stream->id, - "input", 0); h2_ihash_add(m->streams, stream); - - if (!m->need_registration) { - m->need_registration = h2_iq_empty(m->q); + if (stream->response) { + /* already have a respone, schedule for submit */ + h2_ihash_add(m->sready, stream); } - if (m->workers_busy < m->workers_max) { - do_registration = m->need_registration; + else { + h2_beam_create(&stream->input, stream->pool, stream->id, + "input", 0); + if (!m->need_registration) { + m->need_registration = h2_iq_empty(m->q); + } + if (m->workers_busy < m->workers_max) { + do_registration = m->need_registration; + } + h2_iq_add(m->q, stream->id, cmp, ctx); + + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, + "h2_mplx(%ld-%d): process, body=%d", + m->c->id, stream->id, stream->request->body); } - h2_iq_add(m->q, stream->id, cmp, ctx); - - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, - "h2_mplx(%ld-%d): process, body=%d", - m->c->id, stream->id, stream->request->body); } leave_mutex(m, acquired); } @@ -958,7 +981,6 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) task->done_at = apr_time_now(); if (task->output.beam) { h2_beam_on_consumed(task->output.beam, NULL, NULL); - h2_beam_on_produced(task->output.beam, NULL, NULL); h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL); } ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, @@ -1304,57 +1326,12 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn) * mplx master events dispatching ******************************************************************************/ -typedef struct { - h2_mplx *m; - stream_ev_callback *on_resume; - stream_ev_callback *on_response; - void *on_ctx; - apr_status_t status; -} dispatch_ctx; - static int update_window(void *ctx, void *val) { input_consumed_signal(ctx, val); return 1; } -static int stream_ready_iter(void *data, void *val) -{ - dispatch_ctx *ctx = data; - h2_stream *stream = val; - h2_task *task = h2_ihash_get(ctx->m->tasks, stream->id); - - if (task) { - task->submitted = 1; - if (task->rst_error) { - h2_stream_rst(stream, task->rst_error); - } - else { - AP_DEBUG_ASSERT(task->response); - h2_stream_set_response(stream, task->response, task->output.beam); - } - } - else { - /* We have the stream ready without a task. This happens - * when we fail streams early. A response should already - * be present. */ - AP_DEBUG_ASSERT(stream->response || stream->rst_error); - } - - ctx->status = ctx->on_response(ctx->on_ctx, stream->id); - return 1; -} - -static int stream_resume_iter(void *data, void *val) -{ - dispatch_ctx *ctx = data; - h2_stream *stream = val; - - h2_stream_set_suspended(stream, 0); - ctx->status = ctx->on_resume(ctx->on_ctx, stream->id); - return 1; -} - apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, stream_ev_callback *on_resume, stream_ev_callback *on_response, @@ -1362,32 +1339,66 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, { apr_status_t status; int acquired; + int streams[32]; + h2_stream *stream; + h2_task *task; + size_t i, n; AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - dispatch_ctx ctx; - ctx.m = m; - ctx.on_resume = on_resume; - ctx.on_response = on_response; - ctx.on_ctx = on_ctx; - ctx.status = APR_SUCCESS; - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, "h2_mplx(%ld): dispatch events", m->id); + /* update input windows for streams */ h2_ihash_iter(m->streams, update_window, m); - if (ctx.on_response) { - h2_ihash_iter(m->sready, stream_ready_iter, &ctx); - h2_ihash_clear(m->sready); + if (on_response && !h2_ihash_empty(m->sready)) { + n = h2_ihash_ishift(m->sready, streams, H2_ALEN(streams)); + for (i = 0; i < n; ++i) { + stream = h2_ihash_get(m->streams, streams[i]); + if (!stream) { + continue; + } + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, + "h2_mplx(%ld-%d): on_response", + m->id, stream->id); + task = h2_ihash_get(m->tasks, stream->id); + if (task) { + task->submitted = 1; + if (task->rst_error) { + h2_stream_rst(stream, task->rst_error); + } + else { + AP_DEBUG_ASSERT(task->response); + h2_stream_set_response(stream, task->response, task->output.beam); + } + } + else { + /* We have the stream ready without a task. This happens + * when we fail streams early. A response should already + * be present. */ + AP_DEBUG_ASSERT(stream->response || stream->rst_error); + } + status = on_response(on_ctx, stream->id); + } } - if (ctx.on_resume) { - h2_ihash_iter(m->sresume, stream_resume_iter, &ctx); - h2_ihash_clear(m->sresume); + if (on_resume && !h2_ihash_empty(m->sresume)) { + n = h2_ihash_ishift(m->sresume, streams, H2_ALEN(streams)); + for (i = 0; i < n; ++i) { + stream = h2_ihash_get(m->streams, streams[i]); + if (!stream) { + continue; + } + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, + "h2_mplx(%ld-%d): on_resume", + m->id, stream->id); + h2_stream_set_suspended(stream, 0); + status = on_resume(on_ctx, stream->id); + } } leave_mutex(m, acquired); - return ctx.status; } return status; } @@ -1397,7 +1408,6 @@ static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes) h2_mplx *m = ctx; apr_status_t status; h2_stream *stream; - h2_task *task; int acquired; AP_DEBUG_ASSERT(m); @@ -1405,10 +1415,7 @@ static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes) stream = h2_ihash_get(m->streams, beam->id); if (stream && h2_stream_is_suspended(stream)) { h2_ihash_add(m->sresume, stream); - task = h2_ihash_get(m->tasks, stream->id); - if (task && task->output.beam) { - h2_beam_on_produced(task->output.beam, NULL, NULL); - } + h2_beam_on_produced(beam, NULL, NULL); have_out_data_for(m, beam->id); } leave_mutex(m, acquired); @@ -1425,17 +1432,15 @@ apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id) AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { stream = h2_ihash_get(m->streams, stream_id); - if (stream && !h2_stream_is_suspended(stream)) { + if (stream) { h2_stream_set_suspended(stream, 1); task = h2_ihash_get(m->tasks, stream->id); - if (task && task->output.beam && h2_beam_empty(task->output.beam)) { - /* register callback so that we can resume on new output */ - h2_beam_on_produced(task->output.beam, output_produced, m); + if (stream->started && (!task || task->worker_done)) { + h2_ihash_add(m->sresume, stream); } else { - /* if the beam got data in the meantime, add this to the to-be - * resumed streams right away. */ - h2_ihash_add(m->sresume, stream); + /* register callback so that we can resume on new output */ + h2_beam_on_produced(task->output.beam, output_produced, m); } } leave_mutex(m, acquired); diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c index a0f82ac057..f8575fa7e1 100644 --- a/modules/http2/h2_util.c +++ b/modules/http2/h2_util.c @@ -325,11 +325,84 @@ void h2_ihash_remove(h2_ihash_t *ih, int id) apr_hash_set(ih->hash, &id, sizeof(id), NULL); } +void h2_ihash_remove_val(h2_ihash_t *ih, void *val) +{ + int id = *((int*)((char *)val + ih->ioff)); + apr_hash_set(ih->hash, &id, sizeof(id), NULL); +} + + void h2_ihash_clear(h2_ihash_t *ih) { apr_hash_clear(ih->hash); } +typedef struct { + h2_ihash_t *ih; + void **buffer; + size_t max; + size_t len; +} collect_ctx; + +static int collect_iter(void *x, void *val) +{ + collect_ctx *ctx = x; + if (ctx->len < ctx->max) { + ctx->buffer[ctx->len++] = val; + return 1; + } + return 0; +} + +size_t h2_ihash_shift(h2_ihash_t *ih, void **buffer, size_t max) +{ + collect_ctx ctx; + size_t i; + + ctx.ih = ih; + ctx.buffer = buffer; + ctx.max = max; + ctx.len = 0; + h2_ihash_iter(ih, collect_iter, &ctx); + for (i = 0; i < ctx.len; ++i) { + h2_ihash_remove_val(ih, buffer[i]); + } + return ctx.len; +} + +typedef struct { + h2_ihash_t *ih; + int *buffer; + size_t max; + size_t len; +} icollect_ctx; + +static int icollect_iter(void *x, void *val) +{ + icollect_ctx *ctx = x; + if (ctx->len < ctx->max) { + ctx->buffer[ctx->len++] = *((int*)((char *)val + ctx->ih->ioff)); + return 1; + } + return 0; +} + +size_t h2_ihash_ishift(h2_ihash_t *ih, int *buffer, size_t max) +{ + icollect_ctx ctx; + size_t i; + + ctx.ih = ih; + ctx.buffer = buffer; + ctx.max = max; + ctx.len = 0; + h2_ihash_iter(ih, icollect_iter, &ctx); + for (i = 0; i < ctx.len; ++i) { + h2_ihash_remove(ih, buffer[i]); + } + return ctx.len; +} + /******************************************************************************* * ilist - sorted list for structs with int identifier ******************************************************************************/ diff --git a/modules/http2/h2_util.h b/modules/http2/h2_util.h index c200729c20..99724d7a5d 100644 --- a/modules/http2/h2_util.h +++ b/modules/http2/h2_util.h @@ -64,8 +64,12 @@ int h2_ihash_iter(h2_ihash_t *ih, h2_ihash_iter_t *fn, void *ctx); void h2_ihash_add(h2_ihash_t *ih, void *val); void h2_ihash_remove(h2_ihash_t *ih, int id); +void h2_ihash_remove_val(h2_ihash_t *ih, void *val); void h2_ihash_clear(h2_ihash_t *ih); +size_t h2_ihash_shift(h2_ihash_t *ih, void **buffer, size_t max); +size_t h2_ihash_ishift(h2_ihash_t *ih, int *buffer, size_t max); + /******************************************************************************* * ilist - sorted list for structs with int identifier as first member ******************************************************************************/ |