diff options
Diffstat (limited to 'modules/http2/h2_mplx.c')
-rw-r--r-- | modules/http2/h2_mplx.c | 692 |
1 files changed, 245 insertions, 447 deletions
diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 6513a75415..d75188a78e 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -29,38 +29,40 @@ #include "mod_http2.h" #include "h2_private.h" +#include "h2_bucket_beam.h" #include "h2_config.h" #include "h2_conn.h" #include "h2_ctx.h" #include "h2_h2.h" -#include "h2_int_queue.h" #include "h2_io.h" -#include "h2_io_set.h" #include "h2_response.h" #include "h2_mplx.h" #include "h2_ngn_shed.h" #include "h2_request.h" #include "h2_stream.h" #include "h2_task.h" -#include "h2_task_input.h" -#include "h2_task_output.h" #include "h2_worker.h" #include "h2_workers.h" #include "h2_util.h" -#define H2_MPLX_IO_OUT(lvl,m,io,msg) \ - do { \ - if (APLOG_C_IS_LEVEL((m)->c,lvl)) \ - h2_util_bb_log((m)->c,(io)->id,lvl,msg,(io)->bbout); \ - } while(0) - -#define H2_MPLX_IO_IN(lvl,m,io,msg) \ - do { \ - if (APLOG_C_IS_LEVEL((m)->c,lvl)) \ - h2_util_bb_log((m)->c,(io)->id,lvl,msg,(io)->bbin); \ - } while(0) - +static void h2_beam_log(h2_bucket_beam *beam, int id, const char *msg, + conn_rec *c, int level) +{ + if (beam && APLOG_C_IS_LEVEL(c,level)) { + char buffer[2048]; + apr_size_t off = 0; + + off += apr_snprintf(buffer+off, H2_ALEN(buffer)-off, "cl=%d, ", beam->closed); + off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "red", ", ", &beam->red); + off += h2_util_bb_print(buffer+off, H2_ALEN(buffer)-off, "green", ", ", beam->green); + off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "hold", ", ", &beam->hold); + off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "purge", "", &beam->purge); + + ap_log_cerror(APLOG_MARK, level, 0, c, "beam(%ld-%d): %s %s", + c->id, id, msg, buffer); + } +} /* NULL or the mutex hold by this thread, used for recursive calls */ @@ -104,13 +106,51 @@ static void leave_mutex(h2_mplx *m, int acquired) } } -static int is_aborted(h2_mplx *m, apr_status_t *pstatus) +static apr_status_t io_mutex_enter(void *ctx, + apr_thread_mutex_t **plock, int *acquired) { - AP_DEBUG_ASSERT(m); - if (m->aborted) { - *pstatus = APR_ECONNABORTED; + h2_mplx *m = ctx; + *plock = m->lock; + return enter_mutex(m, acquired); +} + +static void io_mutex_leave(void *ctx, apr_thread_mutex_t *lock, int acquired) +{ + h2_mplx *m = ctx; + leave_mutex(m, acquired); +} + +static void stream_output_consumed(void *ctx, + h2_bucket_beam *beam, apr_off_t length) +{ + h2_io *io = ctx; + if (length > 0 && io->task && io->task->assigned) { + h2_req_engine_out_consumed(io->task->assigned, io->task->c, length); + } +} + +static void stream_input_consumed(void *ctx, + h2_bucket_beam *beam, apr_off_t length) +{ + h2_mplx *m = ctx; + if (m->input_consumed && length) { + m->input_consumed(m->input_consumed_ctx, beam->id, length); + } +} + +static int can_beam_file(void *ctx, h2_bucket_beam *beam, apr_file_t *file) +{ + h2_mplx *m = ctx; + if (m->tx_handles_reserved > 0) { + --m->tx_handles_reserved; + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, + "h2_mplx(%ld-%d): beaming file %s, tx_avail %d", + m->id, beam->id, beam->tag, m->tx_handles_reserved); return 1; } + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, + "h2_mplx(%ld-%d): can_beam_file denied on %s", + m->id, beam->id, beam->tag); return 0; } @@ -118,9 +158,9 @@ static void have_out_data_for(h2_mplx *m, int stream_id); static void check_tx_reservation(h2_mplx *m) { - if (m->tx_handles_reserved == 0) { + if (m->tx_handles_reserved <= 0) { m->tx_handles_reserved += h2_workers_tx_reserve(m->workers, - H2MIN(m->tx_chunk_size, h2_io_set_size(m->stream_ios))); + H2MIN(m->tx_chunk_size, h2_ilist_count(m->stream_ios))); } } @@ -132,7 +172,7 @@ static void check_tx_free(h2_mplx *m) h2_workers_tx_free(m->workers, count); } else if (m->tx_handles_reserved - && (!m->stream_ios || h2_io_set_is_empty(m->stream_ios))) { + && (!m->stream_ios || h2_ilist_empty(m->stream_ios))) { h2_workers_tx_free(m->workers, m->tx_handles_reserved); m->tx_handles_reserved = 0; } @@ -143,7 +183,7 @@ static void h2_mplx_destroy(h2_mplx *m) AP_DEBUG_ASSERT(m); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): destroy, ios=%d", - m->id, (int)h2_io_set_size(m->stream_ios)); + m->id, (int)h2_ilist_count(m->stream_ios)); check_tx_free(m); if (m->pool) { apr_pool_destroy(m->pool); @@ -205,8 +245,8 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS); m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM); m->q = h2_iq_create(m->pool, m->max_streams); - m->stream_ios = h2_io_set_create(m->pool); - m->ready_ios = h2_io_set_create(m->pool); + m->stream_ios = h2_ilist_create(m->pool); + m->ready_ios = h2_ilist_create(m->pool); m->stream_timeout = stream_timeout; m->workers = workers; m->workers_max = workers->max_workers; @@ -240,49 +280,29 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m) return max_stream_started; } -static void workers_register(h2_mplx *m) -{ - /* h2_workers is only a hub for all the h2_worker instances. - * At the end-of-life of this h2_mplx, we always unregister at - * the workers. The thing to manage are all the h2_worker instances - * out there. Those may hold a reference to this h2_mplx and we cannot - * call them to unregister. - * - * Therefore: ref counting for h2_workers in not needed, ref counting - * for h2_worker using this is critical. - */ - m->need_registration = 0; - h2_workers_register(m->workers, m); -} - -static int io_in_consumed_signal(h2_mplx *m, h2_io *io) +static void io_in_consumed_signal(h2_mplx *m, h2_io *io) { - if (io->input_consumed && m->input_consumed) { - m->input_consumed(m->input_consumed_ctx, - io->id, io->input_consumed); - io->input_consumed = 0; - return 1; + if (io->beam_in && io->worker_started) { + h2_beam_send(io->beam_in, NULL, 0); /* trigger updates */ } - return 0; } static int io_out_consumed_signal(h2_mplx *m, h2_io *io) { - if (io->output_consumed && io->task && io->task->assigned) { - h2_req_engine_out_consumed(io->task->assigned, io->task->c, - io->output_consumed); - io->output_consumed = 0; - return 1; + if (io->beam_out && io->worker_started && io->task && io->task->assigned) { + h2_beam_send(io->beam_out, NULL, 0); /* trigger updates */ } return 0; } + static void io_destroy(h2_mplx *m, h2_io *io, int events) { + conn_rec *slave = NULL; int reuse_slave; /* cleanup any buffered input */ - h2_io_in_shutdown(io); + h2_io_shutdown(io); if (events) { /* Process outstanding events before destruction */ io_in_consumed_signal(m, io); @@ -291,24 +311,37 @@ static void io_destroy(h2_mplx *m, h2_io *io, int events) /* The pool is cleared/destroyed which also closes all * allocated file handles. Give this count back to our * file handle pool. */ - m->tx_handles_reserved += io->files_handles_owned; + if (io->beam_in) { + m->tx_handles_reserved += h2_beam_get_files_beamed(io->beam_in); + } + if (io->beam_out) { + m->tx_handles_reserved += h2_beam_get_files_beamed(io->beam_out); + } - h2_io_set_remove(m->stream_ios, io); - h2_io_set_remove(m->ready_ios, io); + h2_ilist_remove(m->stream_ios, io->id); + h2_ilist_remove(m->ready_ios, io->id); if (m->redo_ios) { - h2_io_set_remove(m->redo_ios, io); + h2_ilist_remove(m->redo_ios, io->id); } reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc) - && !io->rst_error && io->eor); + && !io->rst_error); if (io->task) { - conn_rec *slave = io->task->c; + slave = io->task->c; h2_task_destroy(io->task); io->task = NULL; - + } + + if (io->pool) { + if (m->spare_io_pool) { + apr_pool_destroy(m->spare_io_pool); + } + apr_pool_clear(io->pool); + m->spare_io_pool = io->pool; + } + + if (slave) { if (reuse_slave && slave->keepalive == AP_CONN_KEEPALIVE) { - apr_bucket_delete(io->eor); - io->eor = NULL; APR_ARRAY_PUSH(m->spare_slaves, conn_rec*) = slave; } else { @@ -316,18 +349,14 @@ static void io_destroy(h2_mplx *m, h2_io *io, int events) h2_slave_destroy(slave, NULL); } } - - if (io->pool) { - apr_pool_destroy(io->pool); - } - + check_tx_free(m); } static int io_stream_done(h2_mplx *m, h2_io *io, int rst_error) { /* Remove io from ready set, we will never submit it */ - h2_io_set_remove(m->ready_ios, io); + h2_ilist_remove(m->ready_ios, io->id); if (!io->worker_started || io->worker_done) { /* already finished or not even started yet */ h2_iq_remove(m->q, io->id); @@ -336,39 +365,41 @@ static int io_stream_done(h2_mplx *m, h2_io *io, int rst_error) } else { /* cleanup once task is done */ - h2_io_make_orphaned(io, rst_error); + io->orphaned = 1; + if (rst_error) { + h2_io_rst(io, rst_error); + } return 1; } } -static int stream_done_iter(void *ctx, h2_io *io) +static int stream_done_iter(void *ctx, void *val) { - return io_stream_done((h2_mplx*)ctx, io, 0); + return io_stream_done((h2_mplx*)ctx, val, 0); } -static int stream_print(void *ctx, h2_io *io) +static int stream_print(void *ctx, void *val) { h2_mplx *m = ctx; + h2_io *io = val; if (io && io->request) { ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ "->03198: h2_stream(%ld-%d): %s %s %s -> %s %d" - "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]", + "[orph=%d/started=%d/done=%d]", m->id, io->id, io->request->method, io->request->authority, io->request->path, io->response? "http" : (io->rst_error? "reset" : "?"), io->response? io->response->http_status : io->rst_error, - io->orphaned, io->worker_started, io->worker_done, - io->eos_in, io->eos_out); + io->orphaned, io->worker_started, io->worker_done); } else if (io) { ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ "->03198: h2_stream(%ld-%d): NULL -> %s %d" - "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]", + "[orph=%d/started=%d/done=%d]", m->id, io->id, io->response? "http" : (io->rst_error? "reset" : "?"), io->response? io->response->http_status : io->rst_error, - io->orphaned, io->worker_started, io->worker_done, - io->eos_in, io->eos_out); + io->orphaned, io->worker_started, io->worker_done); } else { ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ @@ -392,7 +423,7 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) h2_iq_clear(m->q); apr_thread_cond_broadcast(m->task_thawed); - while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) { + while (!h2_ilist_iter(m->stream_ios, stream_done_iter, m)) { /* iterate until all ios have been orphaned or destroyed */ } @@ -407,9 +438,13 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) m->join_wait = wait; ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): release_join, waiting on %d worker to report back", - m->id, (int)h2_io_set_size(m->stream_ios)); + m->id, (int)h2_ilist_count(m->stream_ios)); status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs)); + + while (!h2_ilist_iter(m->stream_ios, stream_done_iter, m)) { + /* iterate until all ios have been orphaned or destroyed */ + } if (APR_STATUS_IS_TIMEUP(status)) { if (i > 0) { /* Oh, oh. Still we wait for assigned workers to report that @@ -421,9 +456,9 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) "h2_mplx(%ld): release, waiting for %d seconds now for " "%d h2_workers to return, have still %d requests outstanding", m->id, i*wait_secs, m->workers_busy, - (int)h2_io_set_size(m->stream_ios)); + (int)h2_ilist_count(m->stream_ios)); if (i == 1) { - h2_io_set_iter(m->stream_ios, stream_print, m); + h2_ilist_iter(m->stream_ios, stream_print, m); } } h2_mplx_abort(m); @@ -431,10 +466,10 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) } } - if (!h2_io_set_is_empty(m->stream_ios)) { + if (!h2_ilist_empty(m->stream_ios)) { ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c, "h2_mplx(%ld): release_join, %d streams still open", - m->id, (int)h2_io_set_size(m->stream_ios)); + m->id, (int)h2_ilist_count(m->stream_ios)); } ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056) "h2_mplx(%ld): release_join -> destroy", m->id); @@ -468,7 +503,7 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error) */ AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); + h2_io *io = h2_ilist_get(m->stream_ios, stream_id); /* there should be an h2_io, once the stream has been scheduled * for processing, e.g. when we received all HEADERs. But when @@ -484,107 +519,16 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error) return status; } -apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block, - int stream_id, apr_bucket_brigade *bb, - apr_table_t *trailers, - struct apr_thread_cond_t *iowait) -{ - apr_status_t status; - int acquired; - - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_read_pre"); - - h2_io_signal_init(io, H2_IO_READ, m->stream_timeout, iowait); - status = h2_io_in_read(io, bb, -1, trailers); - while (APR_STATUS_IS_EAGAIN(status) - && !is_aborted(m, &status) - && block == APR_BLOCK_READ) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, - "h2_mplx(%ld-%d): wait on in data (BLOCK_READ)", - m->id, stream_id); - status = h2_io_signal_wait(m, io); - if (status == APR_SUCCESS) { - status = h2_io_in_read(io, bb, -1, trailers); - } - } - H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_read_post"); - h2_io_signal_exit(io); - } - else { - status = APR_EOF; - } - leave_mutex(m, acquired); - } - return status; -} - -apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id, - const char *data, apr_size_t len, int eos) -{ - apr_status_t status; - int acquired; - - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_pre"); - status = h2_io_in_write(io, data, len, eos); - H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_post"); - h2_io_signal(io, H2_IO_READ); - io_in_consumed_signal(m, io); - } - else { - status = APR_ECONNABORTED; - } - leave_mutex(m, acquired); - } - return status; -} - -apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id) -{ - apr_status_t status; - int acquired; - - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - status = h2_io_in_close(io); - H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_close"); - h2_io_signal(io, H2_IO_READ); - io_in_consumed_signal(m, io); - } - else { - status = APR_ECONNABORTED; - } - leave_mutex(m, acquired); - } - return status; -} - void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx) { m->input_consumed = cb; m->input_consumed_ctx = ctx; } -typedef struct { - h2_mplx * m; - int streams_updated; -} update_ctx; - -static int update_window(void *ctx, h2_io *io) +static int update_window(void *ctx, void *val) { - update_ctx *uctx = (update_ctx*)ctx; - if (io_in_consumed_signal(uctx->m, io)) { - ++uctx->streams_updated; - } + h2_mplx *m = ctx; + io_in_consumed_signal(m, val); return 1; } @@ -598,46 +542,11 @@ apr_status_t h2_mplx_in_update_windows(h2_mplx *m) return APR_ECONNABORTED; } if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - update_ctx ctx; + h2_ilist_iter(m->stream_ios, update_window, m); - ctx.m = m; - ctx.streams_updated = 0; - - status = APR_EAGAIN; - h2_io_set_iter(m->stream_ios, update_window, &ctx); - - if (ctx.streams_updated) { - status = APR_SUCCESS; - } - leave_mutex(m, acquired); - } - return status; -} - -apr_status_t h2_mplx_out_get_brigade(h2_mplx *m, int stream_id, - apr_bucket_brigade *bb, - apr_off_t len, apr_table_t **ptrailers) -{ - apr_status_t status; - int acquired; - - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_get_brigade_pre"); - - status = h2_io_out_get_brigade(io, bb, len); - - H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_get_brigade_post"); - if (status == APR_SUCCESS) { - h2_io_signal(io, H2_IO_WRITE); - } - } - else { - status = APR_ECONNABORTED; - } - *ptrailers = io->response? io->response->trailers : NULL; + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_session(%ld): windows updated", m->id); + status = APR_SUCCESS; leave_mutex(m, acquired); } return status; @@ -651,7 +560,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams) AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_shift(m->ready_ios); + h2_io *io = h2_ilist_shift(m->ready_ios); if (io && !m->aborted) { stream = h2_ihash_get(streams, io->id); if (stream) { @@ -661,9 +570,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams) } else { AP_DEBUG_ASSERT(io->response); - H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_next_submit_pre"); - h2_stream_set_response(stream, io->response, io->bbout); - H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_next_submit_post"); + h2_stream_set_response(stream, io->response, io->beam_out); } } else { @@ -675,114 +582,62 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams) "h2_mplx(%ld): stream for response %d closed, " "resetting io to close request processing", m->id, io->id); - h2_io_make_orphaned(io, H2_ERR_STREAM_CLOSED); + io->orphaned = 1; + h2_io_rst(io, H2_ERR_STREAM_CLOSED); if (!io->worker_started || io->worker_done) { io_destroy(m, io, 1); } else { /* hang around until the h2_task is done, but - * shutdown input and send out any events (e.g. window - * updates) asap. */ - h2_io_in_shutdown(io); + * shutdown input/output and send out any events asap. */ + h2_io_shutdown(io); io_in_consumed_signal(m, io); } } - - h2_io_signal(io, H2_IO_WRITE); } leave_mutex(m, acquired); } return stream; } -static apr_status_t out_write(h2_mplx *m, h2_io *io, - ap_filter_t* f, int blocking, - apr_bucket_brigade *bb, - struct apr_thread_cond_t *iowait) +static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response, + h2_bucket_beam *output) { apr_status_t status = APR_SUCCESS; - /* We check the memory footprint queued for this stream_id - * and block if it exceeds our configured limit. - * We will not split buckets to enforce the limit to the last - * byte. After all, the bucket is already in memory. - */ - while (status == APR_SUCCESS - && !APR_BRIGADE_EMPTY(bb) - && !is_aborted(m, &status)) { - - status = h2_io_out_write(io, bb, blocking? m->stream_max_mem : INT_MAX, - &m->tx_handles_reserved); - io_out_consumed_signal(m, io); - - /* Wait for data to drain until there is room again or - * stream timeout expires */ - h2_io_signal_init(io, H2_IO_WRITE, m->stream_timeout, iowait); - while (status == APR_SUCCESS - && !APR_BRIGADE_EMPTY(bb) - && iowait - && (m->stream_max_mem <= h2_io_out_length(io)) - && !is_aborted(m, &status)) { - if (!blocking) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_mplx(%ld-%d): incomplete write", - m->id, io->id); - return APR_INCOMPLETE; - } - if (f) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, - "h2_mplx(%ld-%d): waiting for out drain", - m->id, io->id); - } - status = h2_io_signal_wait(m, io); - } - h2_io_signal_exit(io); + + h2_io *io = h2_ilist_get(m->stream_ios, stream_id); + if (!io || io->orphaned) { + return APR_ECONNABORTED; } - apr_brigade_cleanup(bb); - return status; -} - -static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response, - ap_filter_t* f, apr_bucket_brigade *bb, - struct apr_thread_cond_t *iowait) -{ - apr_status_t status = APR_SUCCESS; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld-%d): open response: %d, rst=%d", + m->id, stream_id, response->http_status, + response->rst_error); - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - if (f) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_mplx(%ld-%d): open response: %d, rst=%d", - m->id, stream_id, response->http_status, - response->rst_error); - } - - h2_io_set_response(io, response); - h2_io_set_add(m->ready_ios, io); - if (response && response->http_status < 300) { - /* we might see some file buckets in the output, see - * if we have enough handles reserved. */ - check_tx_reservation(m); - } - if (bb) { - status = out_write(m, io, f, 0, bb, iowait); - if (status == APR_INCOMPLETE) { - /* write will have transferred as much data as possible. - caller has to deal with non-empty brigade */ - status = APR_SUCCESS; - } - } - have_out_data_for(m, stream_id); + if (output) { + h2_beam_buffer_size_set(output, m->stream_max_mem); + h2_beam_timeout_set(output, m->stream_timeout); + h2_beam_on_consumed(output, stream_output_consumed, io); + m->tx_handles_reserved -= h2_beam_get_files_beamed(output); + h2_beam_on_file_beam(output, can_beam_file, m); + h2_beam_mutex_set(output, io_mutex_enter, io_mutex_leave, + io->task->cond, m); } - else { - status = APR_ECONNABORTED; + h2_io_set_response(io, response, output); + + h2_ilist_add(m->ready_ios, io); + if (response && response->http_status < 300) { + /* we might see some file buckets in the output, see + * if we have enough handles reserved. */ + check_tx_reservation(m); } + have_out_data_for(m, stream_id); return status; } apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response, - ap_filter_t* f, apr_bucket_brigade *bb, - struct apr_thread_cond_t *iowait) + h2_bucket_beam *output) { apr_status_t status; int acquired; @@ -793,37 +648,7 @@ apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response, status = APR_ECONNABORTED; } else { - status = out_open(m, stream_id, response, f, bb, iowait); - if (APLOGctrace1(m->c)) { - h2_util_bb_log(m->c, stream_id, APLOG_TRACE1, "h2_mplx_out_open", bb); - } - } - leave_mutex(m, acquired); - } - return status; -} - -apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id, - ap_filter_t* f, int blocking, - apr_bucket_brigade *bb, - struct apr_thread_cond_t *iowait) -{ - apr_status_t status; - int acquired; - - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - status = out_write(m, io, f, blocking, bb, iowait); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, - "h2_mplx(%ld-%d): write", m->id, io->id); - H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_write"); - - have_out_data_for(m, stream_id); - } - else { - status = APR_ECONNABORTED; + status = out_open(m, stream_id, response, output); } leave_mutex(m, acquired); } @@ -837,7 +662,7 @@ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id) AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); + h2_io *io = h2_ilist_get(m->stream_ios, stream_id); if (io && !io->orphaned) { if (!io->response && !io->rst_error) { /* In case a close comes before a response was created, @@ -846,45 +671,20 @@ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id) */ h2_response *r = h2_response_die(stream_id, APR_EGENERAL, io->request, m->pool); - status = out_open(m, stream_id, r, NULL, NULL, NULL); + status = out_open(m, stream_id, r, NULL); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c, "h2_mplx(%ld-%d): close, no response, no rst", m->id, io->id); } ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, - "h2_mplx(%ld-%d): close with eor=%s", - m->id, io->id, io->eor? "yes" : "no"); - status = h2_io_out_close(io); - H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close"); - io_out_consumed_signal(m, io); - - have_out_data_for(m, stream_id); - } - else { - status = APR_ECONNABORTED; - } - leave_mutex(m, acquired); - } - return status; -} - -apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error) -{ - apr_status_t status; - int acquired; - - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->rst_error && !io->orphaned) { - h2_io_rst(io, error); - if (!io->response) { - h2_io_set_add(m->ready_ios, io); + "h2_mplx(%ld-%d): close", m->id, io->id); + if (io->beam_out) { + status = h2_beam_close(io->beam_out); + h2_beam_log(io->beam_out, stream_id, "out_close", m->c, + APLOG_TRACE2); } - H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_rst"); - + io_out_consumed_signal(m, io); have_out_data_for(m, stream_id); - h2_io_signal(io, H2_IO_WRITE); } else { status = APR_ECONNABORTED; @@ -894,26 +694,6 @@ apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error) return status; } -int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id) -{ - apr_status_t status; - int has_data = 0; - int acquired; - - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - has_data = h2_io_out_has_data(io); - } - else { - has_data = 0; - } - leave_mutex(m, acquired); - } - return has_data; -} - apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, apr_thread_cond_t *iowait) { @@ -969,22 +749,7 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx) return status; } -static h2_io *open_io(h2_mplx *m, int stream_id, const h2_request *request) -{ - apr_pool_t *io_pool; - h2_io *io; - - apr_pool_create(&io_pool, m->pool); - apr_pool_tag(io_pool, "h2_io"); - io = h2_io_create(stream_id, io_pool, m->bucket_alloc, request); - h2_io_set_add(m->stream_ios, io); - - return io; -} - - -apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, - const h2_request *req, +apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, h2_stream_pri_cmp *cmp, void *ctx) { apr_status_t status; @@ -997,24 +762,38 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, status = APR_ECONNABORTED; } else { - h2_io *io = open_io(m, stream_id, req); + apr_pool_t *io_pool; + h2_io *io; - if (!io->request->body) { - status = h2_io_in_close(io); + if (!m->need_registration) { + m->need_registration = h2_iq_empty(m->q); } - - m->need_registration = m->need_registration || h2_iq_empty(m->q); - do_registration = (m->need_registration && m->workers_busy < m->workers_max); + if (m->workers_busy < m->workers_max) { + do_registration = m->need_registration; + } + + io_pool = m->spare_io_pool; + if (io_pool) { + m->spare_io_pool = NULL; + } + else { + apr_pool_create(&io_pool, m->pool); + apr_pool_tag(io_pool, "h2_io"); + } + io = h2_io_create(stream->id, io_pool, stream->request); + h2_ilist_add(m->stream_ios, io); h2_iq_add(m->q, io->id, cmp, ctx); + stream->input = io->beam_in; ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, - "h2_mplx(%ld-%d): process", m->c->id, stream_id); - H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_process"); + "h2_mplx(%ld-%d): process, body=%d", + m->c->id, stream->id, io->request->body); } leave_mutex(m, acquired); } - if (status == APR_SUCCESS && do_registration) { - workers_register(m); + if (do_registration) { + m->need_registration = 0; + h2_workers_register(m->workers, m); } return status; } @@ -1022,20 +801,24 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, static h2_task *pop_task(h2_mplx *m) { h2_task *task = NULL; + h2_io *io; int sid; - while (!m->aborted && !task - && (m->workers_busy < m->workers_limit) - && (sid = h2_iq_shift(m->q)) > 0) { - h2_io *io = h2_io_set_get(m->stream_ios, sid); - if (io && io->orphaned) { - io_destroy(m, io, 0); - if (m->join_wait) { - apr_thread_cond_signal(m->join_wait); - } - } - else if (io) { + while (!m->aborted && !task && (m->workers_busy < m->workers_limit) + && (sid = h2_iq_shift(m->q)) > 0) { + + io = h2_ilist_get(m->stream_ios, sid); + if (io) { conn_rec *slave, **pslave; + if (io->orphaned) { + /* TODO: add to purge list */ + io_destroy(m, io, 0); + if (m->join_wait) { + apr_thread_cond_signal(m->join_wait); + } + continue; + } + pslave = (conn_rec **)apr_array_pop(m->spare_slaves); if (pslave) { slave = *pslave; @@ -1046,12 +829,21 @@ static h2_task *pop_task(h2_mplx *m) } slave->sbh = m->c->sbh; - io->task = task = h2_task_create(m->id, io->request, slave, m); + io->task = task = h2_task_create(slave, io->request, + io->beam_in, m); m->c->keepalives++; apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id); io->worker_started = 1; io->started_at = apr_time_now(); + + if (io->beam_in) { + h2_beam_timeout_set(io->beam_in, m->stream_timeout); + h2_beam_on_consumed(io->beam_in, stream_input_consumed, m); + h2_beam_on_file_beam(io->beam_in, can_beam_file, m); + h2_beam_mutex_set(io->beam_in, io_mutex_enter, + io_mutex_leave, task->cond, m); + } if (sid > m->max_stream_started) { m->max_stream_started = sid; } @@ -1088,7 +880,7 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more) static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) { if (task) { - h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id); + h2_io *io = h2_ilist_get(m->stream_ios, task->stream_id); if (task->frozen) { /* this task was handed over to an engine for processing @@ -1112,14 +904,17 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) h2_mplx_out_close(m, task->stream_id); if (ngn && io) { - apr_off_t bytes = io->output_consumed + h2_io_out_length(io); + apr_off_t bytes = 0; + if (io->beam_out) { + h2_beam_send(io->beam_out, NULL, APR_NONBLOCK_READ); + bytes += h2_beam_get_buffered(io->beam_out); + } if (bytes > 0) { /* we need to report consumed and current buffered output * to the engine. The request will be streamed out or cancelled, * no more data is coming from it and the engine should update * its calculations before we destroy this information. */ h2_req_engine_out_consumed(ngn, task->c, bytes); - io->output_consumed = 0; } } @@ -1136,10 +931,10 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) if (io) { apr_time_t now = apr_time_now(); if (!io->orphaned && m->redo_ios - && h2_io_set_get(m->redo_ios, io->id)) { + && h2_ilist_get(m->redo_ios, io->id)) { /* reset and schedule again */ h2_io_redo(io); - h2_io_set_remove(m->redo_ios, io); + h2_ilist_remove(m->redo_ios, io->id); h2_iq_add(m->q, io->id, NULL, NULL); } else { @@ -1168,6 +963,7 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) } if (io->orphaned) { + /* TODO: add to purge list */ io_destroy(m, io, 0); if (m->join_wait) { apr_thread_cond_signal(m->join_wait); @@ -1211,12 +1007,12 @@ typedef struct { apr_time_t now; } io_iter_ctx; -static int latest_repeatable_busy_unsubmitted_iter(void *data, h2_io *io) +static int latest_repeatable_busy_unsubmitted_iter(void *data, void *val) { io_iter_ctx *ctx = data; + h2_io *io = val; if (io->worker_started && !io->worker_done - && h2_io_is_repeatable(io) - && !h2_io_set_get(ctx->m->redo_ios, io->id)) { + && h2_io_can_redo(io) && !h2_ilist_get(ctx->m->redo_ios, io->id)) { /* this io occupies a worker, the response has not been submitted yet, * not been cancelled and it is a repeatable request * -> it can be re-scheduled later */ @@ -1233,13 +1029,14 @@ static h2_io *get_latest_repeatable_busy_unsubmitted_io(h2_mplx *m) io_iter_ctx ctx; ctx.m = m; ctx.io = NULL; - h2_io_set_iter(m->stream_ios, latest_repeatable_busy_unsubmitted_iter, &ctx); + h2_ilist_iter(m->stream_ios, latest_repeatable_busy_unsubmitted_iter, &ctx); return ctx.io; } -static int timed_out_busy_iter(void *data, h2_io *io) +static int timed_out_busy_iter(void *data, void *val) { io_iter_ctx *ctx = data; + h2_io *io = val; if (io->worker_started && !io->worker_done && (ctx->now - io->started_at) > ctx->m->stream_timeout) { /* timed out stream occupying a worker, found */ @@ -1254,7 +1051,7 @@ static h2_io *get_timed_out_busy_stream(h2_mplx *m) ctx.m = m; ctx.io = NULL; ctx.now = apr_time_now(); - h2_io_set_iter(m->stream_ios, timed_out_busy_iter, &ctx); + h2_ilist_iter(m->stream_ios, timed_out_busy_iter, &ctx); return ctx.io; } @@ -1264,19 +1061,19 @@ static apr_status_t unschedule_slow_ios(h2_mplx *m) int n; if (!m->redo_ios) { - m->redo_ios = h2_io_set_create(m->pool); + m->redo_ios = h2_ilist_create(m->pool); } /* Try to get rid of streams that occupy workers. Look for safe requests * that are repeatable. If none found, fail the connection. */ - n = (m->workers_busy - m->workers_limit - h2_io_set_size(m->redo_ios)); + n = (m->workers_busy - m->workers_limit - h2_ilist_count(m->redo_ios)); while (n > 0 && (io = get_latest_repeatable_busy_unsubmitted_io(m))) { - h2_io_set_add(m->redo_ios, io); + h2_ilist_add(m->redo_ios, io); h2_io_rst(io, H2_ERR_CANCEL); --n; } - if ((m->workers_busy - h2_io_set_size(m->redo_ios)) > m->workers_limit) { + if ((m->workers_busy - h2_ilist_count(m->redo_ios)) > m->workers_limit) { io = get_timed_out_busy_stream(m); if (io) { /* Too many busy workers, unable to cancel enough streams @@ -1295,7 +1092,7 @@ apr_status_t h2_mplx_idle(h2_mplx *m) int acquired; if (enter_mutex(m, &acquired) == APR_SUCCESS) { - apr_size_t scount = h2_io_set_size(m->stream_ios); + apr_size_t scount = h2_ilist_count(m->stream_ios); if (scount > 0 && m->workers_busy) { /* If we have streams in connection state 'IDLE', meaning * all streams are ready to sent data out, but lack @@ -1350,9 +1147,10 @@ typedef struct { int streams_updated; } ngn_update_ctx; -static int ngn_update_window(void *ctx, h2_io *io) +static int ngn_update_window(void *ctx, void *val) { ngn_update_ctx *uctx = ctx; + h2_io *io = val; if (io && io->task && io->task->assigned == uctx->ngn && io_out_consumed_signal(uctx->m, io)) { ++uctx->streams_updated; @@ -1367,7 +1165,7 @@ static apr_status_t ngn_out_update_windows(h2_mplx *m, h2_req_engine *ngn) ctx.m = m; ctx.ngn = ngn; ctx.streams_updated = 0; - h2_io_set_iter(m->stream_ios, ngn_update_window, &ctx); + h2_ilist_iter(m->stream_ios, ngn_update_window, &ctx); return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN; } @@ -1389,7 +1187,7 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type, task->r = r; if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id); + h2_io *io = h2_ilist_get(m->stream_ios, task->stream_id); if (!io || io->orphaned) { status = APR_ECONNABORTED; } |