summaryrefslogtreecommitdiffstats
path: root/modules/http2/h2_mplx.c
diff options
context:
space:
mode:
Diffstat (limited to 'modules/http2/h2_mplx.c')
-rw-r--r--modules/http2/h2_mplx.c692
1 files changed, 245 insertions, 447 deletions
diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c
index 6513a75415..d75188a78e 100644
--- a/modules/http2/h2_mplx.c
+++ b/modules/http2/h2_mplx.c
@@ -29,38 +29,40 @@
#include "mod_http2.h"
#include "h2_private.h"
+#include "h2_bucket_beam.h"
#include "h2_config.h"
#include "h2_conn.h"
#include "h2_ctx.h"
#include "h2_h2.h"
-#include "h2_int_queue.h"
#include "h2_io.h"
-#include "h2_io_set.h"
#include "h2_response.h"
#include "h2_mplx.h"
#include "h2_ngn_shed.h"
#include "h2_request.h"
#include "h2_stream.h"
#include "h2_task.h"
-#include "h2_task_input.h"
-#include "h2_task_output.h"
#include "h2_worker.h"
#include "h2_workers.h"
#include "h2_util.h"
-#define H2_MPLX_IO_OUT(lvl,m,io,msg) \
- do { \
- if (APLOG_C_IS_LEVEL((m)->c,lvl)) \
- h2_util_bb_log((m)->c,(io)->id,lvl,msg,(io)->bbout); \
- } while(0)
-
-#define H2_MPLX_IO_IN(lvl,m,io,msg) \
- do { \
- if (APLOG_C_IS_LEVEL((m)->c,lvl)) \
- h2_util_bb_log((m)->c,(io)->id,lvl,msg,(io)->bbin); \
- } while(0)
-
+static void h2_beam_log(h2_bucket_beam *beam, int id, const char *msg,
+ conn_rec *c, int level)
+{
+ if (beam && APLOG_C_IS_LEVEL(c,level)) {
+ char buffer[2048];
+ apr_size_t off = 0;
+
+ off += apr_snprintf(buffer+off, H2_ALEN(buffer)-off, "cl=%d, ", beam->closed);
+ off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "red", ", ", &beam->red);
+ off += h2_util_bb_print(buffer+off, H2_ALEN(buffer)-off, "green", ", ", beam->green);
+ off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "hold", ", ", &beam->hold);
+ off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "purge", "", &beam->purge);
+
+ ap_log_cerror(APLOG_MARK, level, 0, c, "beam(%ld-%d): %s %s",
+ c->id, id, msg, buffer);
+ }
+}
/* NULL or the mutex hold by this thread, used for recursive calls
*/
@@ -104,13 +106,51 @@ static void leave_mutex(h2_mplx *m, int acquired)
}
}
-static int is_aborted(h2_mplx *m, apr_status_t *pstatus)
+static apr_status_t io_mutex_enter(void *ctx,
+ apr_thread_mutex_t **plock, int *acquired)
{
- AP_DEBUG_ASSERT(m);
- if (m->aborted) {
- *pstatus = APR_ECONNABORTED;
+ h2_mplx *m = ctx;
+ *plock = m->lock;
+ return enter_mutex(m, acquired);
+}
+
+static void io_mutex_leave(void *ctx, apr_thread_mutex_t *lock, int acquired)
+{
+ h2_mplx *m = ctx;
+ leave_mutex(m, acquired);
+}
+
+static void stream_output_consumed(void *ctx,
+ h2_bucket_beam *beam, apr_off_t length)
+{
+ h2_io *io = ctx;
+ if (length > 0 && io->task && io->task->assigned) {
+ h2_req_engine_out_consumed(io->task->assigned, io->task->c, length);
+ }
+}
+
+static void stream_input_consumed(void *ctx,
+ h2_bucket_beam *beam, apr_off_t length)
+{
+ h2_mplx *m = ctx;
+ if (m->input_consumed && length) {
+ m->input_consumed(m->input_consumed_ctx, beam->id, length);
+ }
+}
+
+static int can_beam_file(void *ctx, h2_bucket_beam *beam, apr_file_t *file)
+{
+ h2_mplx *m = ctx;
+ if (m->tx_handles_reserved > 0) {
+ --m->tx_handles_reserved;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ "h2_mplx(%ld-%d): beaming file %s, tx_avail %d",
+ m->id, beam->id, beam->tag, m->tx_handles_reserved);
return 1;
}
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ "h2_mplx(%ld-%d): can_beam_file denied on %s",
+ m->id, beam->id, beam->tag);
return 0;
}
@@ -118,9 +158,9 @@ static void have_out_data_for(h2_mplx *m, int stream_id);
static void check_tx_reservation(h2_mplx *m)
{
- if (m->tx_handles_reserved == 0) {
+ if (m->tx_handles_reserved <= 0) {
m->tx_handles_reserved += h2_workers_tx_reserve(m->workers,
- H2MIN(m->tx_chunk_size, h2_io_set_size(m->stream_ios)));
+ H2MIN(m->tx_chunk_size, h2_ilist_count(m->stream_ios)));
}
}
@@ -132,7 +172,7 @@ static void check_tx_free(h2_mplx *m)
h2_workers_tx_free(m->workers, count);
}
else if (m->tx_handles_reserved
- && (!m->stream_ios || h2_io_set_is_empty(m->stream_ios))) {
+ && (!m->stream_ios || h2_ilist_empty(m->stream_ios))) {
h2_workers_tx_free(m->workers, m->tx_handles_reserved);
m->tx_handles_reserved = 0;
}
@@ -143,7 +183,7 @@ static void h2_mplx_destroy(h2_mplx *m)
AP_DEBUG_ASSERT(m);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): destroy, ios=%d",
- m->id, (int)h2_io_set_size(m->stream_ios));
+ m->id, (int)h2_ilist_count(m->stream_ios));
check_tx_free(m);
if (m->pool) {
apr_pool_destroy(m->pool);
@@ -205,8 +245,8 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS);
m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
m->q = h2_iq_create(m->pool, m->max_streams);
- m->stream_ios = h2_io_set_create(m->pool);
- m->ready_ios = h2_io_set_create(m->pool);
+ m->stream_ios = h2_ilist_create(m->pool);
+ m->ready_ios = h2_ilist_create(m->pool);
m->stream_timeout = stream_timeout;
m->workers = workers;
m->workers_max = workers->max_workers;
@@ -240,49 +280,29 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m)
return max_stream_started;
}
-static void workers_register(h2_mplx *m)
-{
- /* h2_workers is only a hub for all the h2_worker instances.
- * At the end-of-life of this h2_mplx, we always unregister at
- * the workers. The thing to manage are all the h2_worker instances
- * out there. Those may hold a reference to this h2_mplx and we cannot
- * call them to unregister.
- *
- * Therefore: ref counting for h2_workers in not needed, ref counting
- * for h2_worker using this is critical.
- */
- m->need_registration = 0;
- h2_workers_register(m->workers, m);
-}
-
-static int io_in_consumed_signal(h2_mplx *m, h2_io *io)
+static void io_in_consumed_signal(h2_mplx *m, h2_io *io)
{
- if (io->input_consumed && m->input_consumed) {
- m->input_consumed(m->input_consumed_ctx,
- io->id, io->input_consumed);
- io->input_consumed = 0;
- return 1;
+ if (io->beam_in && io->worker_started) {
+ h2_beam_send(io->beam_in, NULL, 0); /* trigger updates */
}
- return 0;
}
static int io_out_consumed_signal(h2_mplx *m, h2_io *io)
{
- if (io->output_consumed && io->task && io->task->assigned) {
- h2_req_engine_out_consumed(io->task->assigned, io->task->c,
- io->output_consumed);
- io->output_consumed = 0;
- return 1;
+ if (io->beam_out && io->worker_started && io->task && io->task->assigned) {
+ h2_beam_send(io->beam_out, NULL, 0); /* trigger updates */
}
return 0;
}
+
static void io_destroy(h2_mplx *m, h2_io *io, int events)
{
+ conn_rec *slave = NULL;
int reuse_slave;
/* cleanup any buffered input */
- h2_io_in_shutdown(io);
+ h2_io_shutdown(io);
if (events) {
/* Process outstanding events before destruction */
io_in_consumed_signal(m, io);
@@ -291,24 +311,37 @@ static void io_destroy(h2_mplx *m, h2_io *io, int events)
/* The pool is cleared/destroyed which also closes all
* allocated file handles. Give this count back to our
* file handle pool. */
- m->tx_handles_reserved += io->files_handles_owned;
+ if (io->beam_in) {
+ m->tx_handles_reserved += h2_beam_get_files_beamed(io->beam_in);
+ }
+ if (io->beam_out) {
+ m->tx_handles_reserved += h2_beam_get_files_beamed(io->beam_out);
+ }
- h2_io_set_remove(m->stream_ios, io);
- h2_io_set_remove(m->ready_ios, io);
+ h2_ilist_remove(m->stream_ios, io->id);
+ h2_ilist_remove(m->ready_ios, io->id);
if (m->redo_ios) {
- h2_io_set_remove(m->redo_ios, io);
+ h2_ilist_remove(m->redo_ios, io->id);
}
reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc)
- && !io->rst_error && io->eor);
+ && !io->rst_error);
if (io->task) {
- conn_rec *slave = io->task->c;
+ slave = io->task->c;
h2_task_destroy(io->task);
io->task = NULL;
-
+ }
+
+ if (io->pool) {
+ if (m->spare_io_pool) {
+ apr_pool_destroy(m->spare_io_pool);
+ }
+ apr_pool_clear(io->pool);
+ m->spare_io_pool = io->pool;
+ }
+
+ if (slave) {
if (reuse_slave && slave->keepalive == AP_CONN_KEEPALIVE) {
- apr_bucket_delete(io->eor);
- io->eor = NULL;
APR_ARRAY_PUSH(m->spare_slaves, conn_rec*) = slave;
}
else {
@@ -316,18 +349,14 @@ static void io_destroy(h2_mplx *m, h2_io *io, int events)
h2_slave_destroy(slave, NULL);
}
}
-
- if (io->pool) {
- apr_pool_destroy(io->pool);
- }
-
+
check_tx_free(m);
}
static int io_stream_done(h2_mplx *m, h2_io *io, int rst_error)
{
/* Remove io from ready set, we will never submit it */
- h2_io_set_remove(m->ready_ios, io);
+ h2_ilist_remove(m->ready_ios, io->id);
if (!io->worker_started || io->worker_done) {
/* already finished or not even started yet */
h2_iq_remove(m->q, io->id);
@@ -336,39 +365,41 @@ static int io_stream_done(h2_mplx *m, h2_io *io, int rst_error)
}
else {
/* cleanup once task is done */
- h2_io_make_orphaned(io, rst_error);
+ io->orphaned = 1;
+ if (rst_error) {
+ h2_io_rst(io, rst_error);
+ }
return 1;
}
}
-static int stream_done_iter(void *ctx, h2_io *io)
+static int stream_done_iter(void *ctx, void *val)
{
- return io_stream_done((h2_mplx*)ctx, io, 0);
+ return io_stream_done((h2_mplx*)ctx, val, 0);
}
-static int stream_print(void *ctx, h2_io *io)
+static int stream_print(void *ctx, void *val)
{
h2_mplx *m = ctx;
+ h2_io *io = val;
if (io && io->request) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
"->03198: h2_stream(%ld-%d): %s %s %s -> %s %d"
- "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]",
+ "[orph=%d/started=%d/done=%d]",
m->id, io->id,
io->request->method, io->request->authority, io->request->path,
io->response? "http" : (io->rst_error? "reset" : "?"),
io->response? io->response->http_status : io->rst_error,
- io->orphaned, io->worker_started, io->worker_done,
- io->eos_in, io->eos_out);
+ io->orphaned, io->worker_started, io->worker_done);
}
else if (io) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
"->03198: h2_stream(%ld-%d): NULL -> %s %d"
- "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]",
+ "[orph=%d/started=%d/done=%d]",
m->id, io->id,
io->response? "http" : (io->rst_error? "reset" : "?"),
io->response? io->response->http_status : io->rst_error,
- io->orphaned, io->worker_started, io->worker_done,
- io->eos_in, io->eos_out);
+ io->orphaned, io->worker_started, io->worker_done);
}
else {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
@@ -392,7 +423,7 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
h2_iq_clear(m->q);
apr_thread_cond_broadcast(m->task_thawed);
- while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) {
+ while (!h2_ilist_iter(m->stream_ios, stream_done_iter, m)) {
/* iterate until all ios have been orphaned or destroyed */
}
@@ -407,9 +438,13 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
m->join_wait = wait;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): release_join, waiting on %d worker to report back",
- m->id, (int)h2_io_set_size(m->stream_ios));
+ m->id, (int)h2_ilist_count(m->stream_ios));
status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
+
+ while (!h2_ilist_iter(m->stream_ios, stream_done_iter, m)) {
+ /* iterate until all ios have been orphaned or destroyed */
+ }
if (APR_STATUS_IS_TIMEUP(status)) {
if (i > 0) {
/* Oh, oh. Still we wait for assigned workers to report that
@@ -421,9 +456,9 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
"h2_mplx(%ld): release, waiting for %d seconds now for "
"%d h2_workers to return, have still %d requests outstanding",
m->id, i*wait_secs, m->workers_busy,
- (int)h2_io_set_size(m->stream_ios));
+ (int)h2_ilist_count(m->stream_ios));
if (i == 1) {
- h2_io_set_iter(m->stream_ios, stream_print, m);
+ h2_ilist_iter(m->stream_ios, stream_print, m);
}
}
h2_mplx_abort(m);
@@ -431,10 +466,10 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
}
}
- if (!h2_io_set_is_empty(m->stream_ios)) {
+ if (!h2_ilist_empty(m->stream_ios)) {
ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c,
"h2_mplx(%ld): release_join, %d streams still open",
- m->id, (int)h2_io_set_size(m->stream_ios));
+ m->id, (int)h2_ilist_count(m->stream_ios));
}
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
"h2_mplx(%ld): release_join -> destroy", m->id);
@@ -468,7 +503,7 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error)
*/
AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+ h2_io *io = h2_ilist_get(m->stream_ios, stream_id);
/* there should be an h2_io, once the stream has been scheduled
* for processing, e.g. when we received all HEADERs. But when
@@ -484,107 +519,16 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error)
return status;
}
-apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
- int stream_id, apr_bucket_brigade *bb,
- apr_table_t *trailers,
- struct apr_thread_cond_t *iowait)
-{
- apr_status_t status;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_read_pre");
-
- h2_io_signal_init(io, H2_IO_READ, m->stream_timeout, iowait);
- status = h2_io_in_read(io, bb, -1, trailers);
- while (APR_STATUS_IS_EAGAIN(status)
- && !is_aborted(m, &status)
- && block == APR_BLOCK_READ) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
- "h2_mplx(%ld-%d): wait on in data (BLOCK_READ)",
- m->id, stream_id);
- status = h2_io_signal_wait(m, io);
- if (status == APR_SUCCESS) {
- status = h2_io_in_read(io, bb, -1, trailers);
- }
- }
- H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_read_post");
- h2_io_signal_exit(io);
- }
- else {
- status = APR_EOF;
- }
- leave_mutex(m, acquired);
- }
- return status;
-}
-
-apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id,
- const char *data, apr_size_t len, int eos)
-{
- apr_status_t status;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_pre");
- status = h2_io_in_write(io, data, len, eos);
- H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_post");
- h2_io_signal(io, H2_IO_READ);
- io_in_consumed_signal(m, io);
- }
- else {
- status = APR_ECONNABORTED;
- }
- leave_mutex(m, acquired);
- }
- return status;
-}
-
-apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id)
-{
- apr_status_t status;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- status = h2_io_in_close(io);
- H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_close");
- h2_io_signal(io, H2_IO_READ);
- io_in_consumed_signal(m, io);
- }
- else {
- status = APR_ECONNABORTED;
- }
- leave_mutex(m, acquired);
- }
- return status;
-}
-
void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
{
m->input_consumed = cb;
m->input_consumed_ctx = ctx;
}
-typedef struct {
- h2_mplx * m;
- int streams_updated;
-} update_ctx;
-
-static int update_window(void *ctx, h2_io *io)
+static int update_window(void *ctx, void *val)
{
- update_ctx *uctx = (update_ctx*)ctx;
- if (io_in_consumed_signal(uctx->m, io)) {
- ++uctx->streams_updated;
- }
+ h2_mplx *m = ctx;
+ io_in_consumed_signal(m, val);
return 1;
}
@@ -598,46 +542,11 @@ apr_status_t h2_mplx_in_update_windows(h2_mplx *m)
return APR_ECONNABORTED;
}
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- update_ctx ctx;
+ h2_ilist_iter(m->stream_ios, update_window, m);
- ctx.m = m;
- ctx.streams_updated = 0;
-
- status = APR_EAGAIN;
- h2_io_set_iter(m->stream_ios, update_window, &ctx);
-
- if (ctx.streams_updated) {
- status = APR_SUCCESS;
- }
- leave_mutex(m, acquired);
- }
- return status;
-}
-
-apr_status_t h2_mplx_out_get_brigade(h2_mplx *m, int stream_id,
- apr_bucket_brigade *bb,
- apr_off_t len, apr_table_t **ptrailers)
-{
- apr_status_t status;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_get_brigade_pre");
-
- status = h2_io_out_get_brigade(io, bb, len);
-
- H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_get_brigade_post");
- if (status == APR_SUCCESS) {
- h2_io_signal(io, H2_IO_WRITE);
- }
- }
- else {
- status = APR_ECONNABORTED;
- }
- *ptrailers = io->response? io->response->trailers : NULL;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_session(%ld): windows updated", m->id);
+ status = APR_SUCCESS;
leave_mutex(m, acquired);
}
return status;
@@ -651,7 +560,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_shift(m->ready_ios);
+ h2_io *io = h2_ilist_shift(m->ready_ios);
if (io && !m->aborted) {
stream = h2_ihash_get(streams, io->id);
if (stream) {
@@ -661,9 +570,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
}
else {
AP_DEBUG_ASSERT(io->response);
- H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_next_submit_pre");
- h2_stream_set_response(stream, io->response, io->bbout);
- H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_next_submit_post");
+ h2_stream_set_response(stream, io->response, io->beam_out);
}
}
else {
@@ -675,114 +582,62 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
"h2_mplx(%ld): stream for response %d closed, "
"resetting io to close request processing",
m->id, io->id);
- h2_io_make_orphaned(io, H2_ERR_STREAM_CLOSED);
+ io->orphaned = 1;
+ h2_io_rst(io, H2_ERR_STREAM_CLOSED);
if (!io->worker_started || io->worker_done) {
io_destroy(m, io, 1);
}
else {
/* hang around until the h2_task is done, but
- * shutdown input and send out any events (e.g. window
- * updates) asap. */
- h2_io_in_shutdown(io);
+ * shutdown input/output and send out any events asap. */
+ h2_io_shutdown(io);
io_in_consumed_signal(m, io);
}
}
-
- h2_io_signal(io, H2_IO_WRITE);
}
leave_mutex(m, acquired);
}
return stream;
}
-static apr_status_t out_write(h2_mplx *m, h2_io *io,
- ap_filter_t* f, int blocking,
- apr_bucket_brigade *bb,
- struct apr_thread_cond_t *iowait)
+static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response,
+ h2_bucket_beam *output)
{
apr_status_t status = APR_SUCCESS;
- /* We check the memory footprint queued for this stream_id
- * and block if it exceeds our configured limit.
- * We will not split buckets to enforce the limit to the last
- * byte. After all, the bucket is already in memory.
- */
- while (status == APR_SUCCESS
- && !APR_BRIGADE_EMPTY(bb)
- && !is_aborted(m, &status)) {
-
- status = h2_io_out_write(io, bb, blocking? m->stream_max_mem : INT_MAX,
- &m->tx_handles_reserved);
- io_out_consumed_signal(m, io);
-
- /* Wait for data to drain until there is room again or
- * stream timeout expires */
- h2_io_signal_init(io, H2_IO_WRITE, m->stream_timeout, iowait);
- while (status == APR_SUCCESS
- && !APR_BRIGADE_EMPTY(bb)
- && iowait
- && (m->stream_max_mem <= h2_io_out_length(io))
- && !is_aborted(m, &status)) {
- if (!blocking) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
- "h2_mplx(%ld-%d): incomplete write",
- m->id, io->id);
- return APR_INCOMPLETE;
- }
- if (f) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
- "h2_mplx(%ld-%d): waiting for out drain",
- m->id, io->id);
- }
- status = h2_io_signal_wait(m, io);
- }
- h2_io_signal_exit(io);
+
+ h2_io *io = h2_ilist_get(m->stream_ios, stream_id);
+ if (!io || io->orphaned) {
+ return APR_ECONNABORTED;
}
- apr_brigade_cleanup(bb);
- return status;
-}
-
-static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response,
- ap_filter_t* f, apr_bucket_brigade *bb,
- struct apr_thread_cond_t *iowait)
-{
- apr_status_t status = APR_SUCCESS;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld-%d): open response: %d, rst=%d",
+ m->id, stream_id, response->http_status,
+ response->rst_error);
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- if (f) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
- "h2_mplx(%ld-%d): open response: %d, rst=%d",
- m->id, stream_id, response->http_status,
- response->rst_error);
- }
-
- h2_io_set_response(io, response);
- h2_io_set_add(m->ready_ios, io);
- if (response && response->http_status < 300) {
- /* we might see some file buckets in the output, see
- * if we have enough handles reserved. */
- check_tx_reservation(m);
- }
- if (bb) {
- status = out_write(m, io, f, 0, bb, iowait);
- if (status == APR_INCOMPLETE) {
- /* write will have transferred as much data as possible.
- caller has to deal with non-empty brigade */
- status = APR_SUCCESS;
- }
- }
- have_out_data_for(m, stream_id);
+ if (output) {
+ h2_beam_buffer_size_set(output, m->stream_max_mem);
+ h2_beam_timeout_set(output, m->stream_timeout);
+ h2_beam_on_consumed(output, stream_output_consumed, io);
+ m->tx_handles_reserved -= h2_beam_get_files_beamed(output);
+ h2_beam_on_file_beam(output, can_beam_file, m);
+ h2_beam_mutex_set(output, io_mutex_enter, io_mutex_leave,
+ io->task->cond, m);
}
- else {
- status = APR_ECONNABORTED;
+ h2_io_set_response(io, response, output);
+
+ h2_ilist_add(m->ready_ios, io);
+ if (response && response->http_status < 300) {
+ /* we might see some file buckets in the output, see
+ * if we have enough handles reserved. */
+ check_tx_reservation(m);
}
+ have_out_data_for(m, stream_id);
return status;
}
apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response,
- ap_filter_t* f, apr_bucket_brigade *bb,
- struct apr_thread_cond_t *iowait)
+ h2_bucket_beam *output)
{
apr_status_t status;
int acquired;
@@ -793,37 +648,7 @@ apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response,
status = APR_ECONNABORTED;
}
else {
- status = out_open(m, stream_id, response, f, bb, iowait);
- if (APLOGctrace1(m->c)) {
- h2_util_bb_log(m->c, stream_id, APLOG_TRACE1, "h2_mplx_out_open", bb);
- }
- }
- leave_mutex(m, acquired);
- }
- return status;
-}
-
-apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id,
- ap_filter_t* f, int blocking,
- apr_bucket_brigade *bb,
- struct apr_thread_cond_t *iowait)
-{
- apr_status_t status;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- status = out_write(m, io, f, blocking, bb, iowait);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
- "h2_mplx(%ld-%d): write", m->id, io->id);
- H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_write");
-
- have_out_data_for(m, stream_id);
- }
- else {
- status = APR_ECONNABORTED;
+ status = out_open(m, stream_id, response, output);
}
leave_mutex(m, acquired);
}
@@ -837,7 +662,7 @@ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id)
AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+ h2_io *io = h2_ilist_get(m->stream_ios, stream_id);
if (io && !io->orphaned) {
if (!io->response && !io->rst_error) {
/* In case a close comes before a response was created,
@@ -846,45 +671,20 @@ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id)
*/
h2_response *r = h2_response_die(stream_id, APR_EGENERAL,
io->request, m->pool);
- status = out_open(m, stream_id, r, NULL, NULL, NULL);
+ status = out_open(m, stream_id, r, NULL);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
"h2_mplx(%ld-%d): close, no response, no rst",
m->id, io->id);
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
- "h2_mplx(%ld-%d): close with eor=%s",
- m->id, io->id, io->eor? "yes" : "no");
- status = h2_io_out_close(io);
- H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close");
- io_out_consumed_signal(m, io);
-
- have_out_data_for(m, stream_id);
- }
- else {
- status = APR_ECONNABORTED;
- }
- leave_mutex(m, acquired);
- }
- return status;
-}
-
-apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error)
-{
- apr_status_t status;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->rst_error && !io->orphaned) {
- h2_io_rst(io, error);
- if (!io->response) {
- h2_io_set_add(m->ready_ios, io);
+ "h2_mplx(%ld-%d): close", m->id, io->id);
+ if (io->beam_out) {
+ status = h2_beam_close(io->beam_out);
+ h2_beam_log(io->beam_out, stream_id, "out_close", m->c,
+ APLOG_TRACE2);
}
- H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_rst");
-
+ io_out_consumed_signal(m, io);
have_out_data_for(m, stream_id);
- h2_io_signal(io, H2_IO_WRITE);
}
else {
status = APR_ECONNABORTED;
@@ -894,26 +694,6 @@ apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error)
return status;
}
-int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id)
-{
- apr_status_t status;
- int has_data = 0;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- has_data = h2_io_out_has_data(io);
- }
- else {
- has_data = 0;
- }
- leave_mutex(m, acquired);
- }
- return has_data;
-}
-
apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
apr_thread_cond_t *iowait)
{
@@ -969,22 +749,7 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
return status;
}
-static h2_io *open_io(h2_mplx *m, int stream_id, const h2_request *request)
-{
- apr_pool_t *io_pool;
- h2_io *io;
-
- apr_pool_create(&io_pool, m->pool);
- apr_pool_tag(io_pool, "h2_io");
- io = h2_io_create(stream_id, io_pool, m->bucket_alloc, request);
- h2_io_set_add(m->stream_ios, io);
-
- return io;
-}
-
-
-apr_status_t h2_mplx_process(h2_mplx *m, int stream_id,
- const h2_request *req,
+apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
h2_stream_pri_cmp *cmp, void *ctx)
{
apr_status_t status;
@@ -997,24 +762,38 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id,
status = APR_ECONNABORTED;
}
else {
- h2_io *io = open_io(m, stream_id, req);
+ apr_pool_t *io_pool;
+ h2_io *io;
- if (!io->request->body) {
- status = h2_io_in_close(io);
+ if (!m->need_registration) {
+ m->need_registration = h2_iq_empty(m->q);
}
-
- m->need_registration = m->need_registration || h2_iq_empty(m->q);
- do_registration = (m->need_registration && m->workers_busy < m->workers_max);
+ if (m->workers_busy < m->workers_max) {
+ do_registration = m->need_registration;
+ }
+
+ io_pool = m->spare_io_pool;
+ if (io_pool) {
+ m->spare_io_pool = NULL;
+ }
+ else {
+ apr_pool_create(&io_pool, m->pool);
+ apr_pool_tag(io_pool, "h2_io");
+ }
+ io = h2_io_create(stream->id, io_pool, stream->request);
+ h2_ilist_add(m->stream_ios, io);
h2_iq_add(m->q, io->id, cmp, ctx);
+ stream->input = io->beam_in;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
- "h2_mplx(%ld-%d): process", m->c->id, stream_id);
- H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_process");
+ "h2_mplx(%ld-%d): process, body=%d",
+ m->c->id, stream->id, io->request->body);
}
leave_mutex(m, acquired);
}
- if (status == APR_SUCCESS && do_registration) {
- workers_register(m);
+ if (do_registration) {
+ m->need_registration = 0;
+ h2_workers_register(m->workers, m);
}
return status;
}
@@ -1022,20 +801,24 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id,
static h2_task *pop_task(h2_mplx *m)
{
h2_task *task = NULL;
+ h2_io *io;
int sid;
- while (!m->aborted && !task
- && (m->workers_busy < m->workers_limit)
- && (sid = h2_iq_shift(m->q)) > 0) {
- h2_io *io = h2_io_set_get(m->stream_ios, sid);
- if (io && io->orphaned) {
- io_destroy(m, io, 0);
- if (m->join_wait) {
- apr_thread_cond_signal(m->join_wait);
- }
- }
- else if (io) {
+ while (!m->aborted && !task && (m->workers_busy < m->workers_limit)
+ && (sid = h2_iq_shift(m->q)) > 0) {
+
+ io = h2_ilist_get(m->stream_ios, sid);
+ if (io) {
conn_rec *slave, **pslave;
+ if (io->orphaned) {
+ /* TODO: add to purge list */
+ io_destroy(m, io, 0);
+ if (m->join_wait) {
+ apr_thread_cond_signal(m->join_wait);
+ }
+ continue;
+ }
+
pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
if (pslave) {
slave = *pslave;
@@ -1046,12 +829,21 @@ static h2_task *pop_task(h2_mplx *m)
}
slave->sbh = m->c->sbh;
- io->task = task = h2_task_create(m->id, io->request, slave, m);
+ io->task = task = h2_task_create(slave, io->request,
+ io->beam_in, m);
m->c->keepalives++;
apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id);
io->worker_started = 1;
io->started_at = apr_time_now();
+
+ if (io->beam_in) {
+ h2_beam_timeout_set(io->beam_in, m->stream_timeout);
+ h2_beam_on_consumed(io->beam_in, stream_input_consumed, m);
+ h2_beam_on_file_beam(io->beam_in, can_beam_file, m);
+ h2_beam_mutex_set(io->beam_in, io_mutex_enter,
+ io_mutex_leave, task->cond, m);
+ }
if (sid > m->max_stream_started) {
m->max_stream_started = sid;
}
@@ -1088,7 +880,7 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
{
if (task) {
- h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
+ h2_io *io = h2_ilist_get(m->stream_ios, task->stream_id);
if (task->frozen) {
/* this task was handed over to an engine for processing
@@ -1112,14 +904,17 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
h2_mplx_out_close(m, task->stream_id);
if (ngn && io) {
- apr_off_t bytes = io->output_consumed + h2_io_out_length(io);
+ apr_off_t bytes = 0;
+ if (io->beam_out) {
+ h2_beam_send(io->beam_out, NULL, APR_NONBLOCK_READ);
+ bytes += h2_beam_get_buffered(io->beam_out);
+ }
if (bytes > 0) {
/* we need to report consumed and current buffered output
* to the engine. The request will be streamed out or cancelled,
* no more data is coming from it and the engine should update
* its calculations before we destroy this information. */
h2_req_engine_out_consumed(ngn, task->c, bytes);
- io->output_consumed = 0;
}
}
@@ -1136,10 +931,10 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
if (io) {
apr_time_t now = apr_time_now();
if (!io->orphaned && m->redo_ios
- && h2_io_set_get(m->redo_ios, io->id)) {
+ && h2_ilist_get(m->redo_ios, io->id)) {
/* reset and schedule again */
h2_io_redo(io);
- h2_io_set_remove(m->redo_ios, io);
+ h2_ilist_remove(m->redo_ios, io->id);
h2_iq_add(m->q, io->id, NULL, NULL);
}
else {
@@ -1168,6 +963,7 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
}
if (io->orphaned) {
+ /* TODO: add to purge list */
io_destroy(m, io, 0);
if (m->join_wait) {
apr_thread_cond_signal(m->join_wait);
@@ -1211,12 +1007,12 @@ typedef struct {
apr_time_t now;
} io_iter_ctx;
-static int latest_repeatable_busy_unsubmitted_iter(void *data, h2_io *io)
+static int latest_repeatable_busy_unsubmitted_iter(void *data, void *val)
{
io_iter_ctx *ctx = data;
+ h2_io *io = val;
if (io->worker_started && !io->worker_done
- && h2_io_is_repeatable(io)
- && !h2_io_set_get(ctx->m->redo_ios, io->id)) {
+ && h2_io_can_redo(io) && !h2_ilist_get(ctx->m->redo_ios, io->id)) {
/* this io occupies a worker, the response has not been submitted yet,
* not been cancelled and it is a repeatable request
* -> it can be re-scheduled later */
@@ -1233,13 +1029,14 @@ static h2_io *get_latest_repeatable_busy_unsubmitted_io(h2_mplx *m)
io_iter_ctx ctx;
ctx.m = m;
ctx.io = NULL;
- h2_io_set_iter(m->stream_ios, latest_repeatable_busy_unsubmitted_iter, &ctx);
+ h2_ilist_iter(m->stream_ios, latest_repeatable_busy_unsubmitted_iter, &ctx);
return ctx.io;
}
-static int timed_out_busy_iter(void *data, h2_io *io)
+static int timed_out_busy_iter(void *data, void *val)
{
io_iter_ctx *ctx = data;
+ h2_io *io = val;
if (io->worker_started && !io->worker_done
&& (ctx->now - io->started_at) > ctx->m->stream_timeout) {
/* timed out stream occupying a worker, found */
@@ -1254,7 +1051,7 @@ static h2_io *get_timed_out_busy_stream(h2_mplx *m)
ctx.m = m;
ctx.io = NULL;
ctx.now = apr_time_now();
- h2_io_set_iter(m->stream_ios, timed_out_busy_iter, &ctx);
+ h2_ilist_iter(m->stream_ios, timed_out_busy_iter, &ctx);
return ctx.io;
}
@@ -1264,19 +1061,19 @@ static apr_status_t unschedule_slow_ios(h2_mplx *m)
int n;
if (!m->redo_ios) {
- m->redo_ios = h2_io_set_create(m->pool);
+ m->redo_ios = h2_ilist_create(m->pool);
}
/* Try to get rid of streams that occupy workers. Look for safe requests
* that are repeatable. If none found, fail the connection.
*/
- n = (m->workers_busy - m->workers_limit - h2_io_set_size(m->redo_ios));
+ n = (m->workers_busy - m->workers_limit - h2_ilist_count(m->redo_ios));
while (n > 0 && (io = get_latest_repeatable_busy_unsubmitted_io(m))) {
- h2_io_set_add(m->redo_ios, io);
+ h2_ilist_add(m->redo_ios, io);
h2_io_rst(io, H2_ERR_CANCEL);
--n;
}
- if ((m->workers_busy - h2_io_set_size(m->redo_ios)) > m->workers_limit) {
+ if ((m->workers_busy - h2_ilist_count(m->redo_ios)) > m->workers_limit) {
io = get_timed_out_busy_stream(m);
if (io) {
/* Too many busy workers, unable to cancel enough streams
@@ -1295,7 +1092,7 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
int acquired;
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- apr_size_t scount = h2_io_set_size(m->stream_ios);
+ apr_size_t scount = h2_ilist_count(m->stream_ios);
if (scount > 0 && m->workers_busy) {
/* If we have streams in connection state 'IDLE', meaning
* all streams are ready to sent data out, but lack
@@ -1350,9 +1147,10 @@ typedef struct {
int streams_updated;
} ngn_update_ctx;
-static int ngn_update_window(void *ctx, h2_io *io)
+static int ngn_update_window(void *ctx, void *val)
{
ngn_update_ctx *uctx = ctx;
+ h2_io *io = val;
if (io && io->task && io->task->assigned == uctx->ngn
&& io_out_consumed_signal(uctx->m, io)) {
++uctx->streams_updated;
@@ -1367,7 +1165,7 @@ static apr_status_t ngn_out_update_windows(h2_mplx *m, h2_req_engine *ngn)
ctx.m = m;
ctx.ngn = ngn;
ctx.streams_updated = 0;
- h2_io_set_iter(m->stream_ios, ngn_update_window, &ctx);
+ h2_ilist_iter(m->stream_ios, ngn_update_window, &ctx);
return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN;
}
@@ -1389,7 +1187,7 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
task->r = r;
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
+ h2_io *io = h2_ilist_get(m->stream_ios, task->stream_id);
if (!io || io->orphaned) {
status = APR_ECONNABORTED;
}