diff options
author | Stefan Eissing <icing@apache.org> | 2017-03-30 18:05:06 +0200 |
---|---|---|
committer | Stefan Eissing <icing@apache.org> | 2017-03-30 18:05:06 +0200 |
commit | 5440a7562b4a53658a17592b5b1ec16f88df5484 (patch) | |
tree | 5a75beae121178dfc656c2c53f677b25ac8edea9 /modules/http2 | |
parent | On the trunk: (diff) | |
download | apache2-5440a7562b4a53658a17592b5b1ec16f88df5484.tar.xz apache2-5440a7562b4a53658a17592b5b1ec16f88df5484.zip |
On the trunk:
mod_http2: move stuff from master connection to worker threads, increase spare slave connections, create output beams in worker when needed.
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1789535 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'modules/http2')
-rw-r--r-- | modules/http2/h2_bucket_beam.c | 3 | ||||
-rw-r--r-- | modules/http2/h2_bucket_beam.h | 11 | ||||
-rw-r--r-- | modules/http2/h2_conn.c | 2 | ||||
-rw-r--r-- | modules/http2/h2_mplx.c | 118 | ||||
-rw-r--r-- | modules/http2/h2_mplx.h | 6 | ||||
-rw-r--r-- | modules/http2/h2_session.c | 4 | ||||
-rw-r--r-- | modules/http2/h2_stream.c | 24 | ||||
-rw-r--r-- | modules/http2/h2_task.c | 83 | ||||
-rw-r--r-- | modules/http2/h2_task.h | 7 |
9 files changed, 153 insertions, 105 deletions
diff --git a/modules/http2/h2_bucket_beam.c b/modules/http2/h2_bucket_beam.c index 7fb6cb6f79..17ad3d95f1 100644 --- a/modules/http2/h2_bucket_beam.c +++ b/modules/http2/h2_bucket_beam.c @@ -832,11 +832,10 @@ static apr_status_t append_bucket(h2_bucket_beam *beam, apr_bucket_file *bf = b->data; apr_file_t *fd = bf->fd; int can_beam = (bf->refcount.refcount == 1); - if (can_beam && beam->last_beamed != fd && beam->can_beam_fn) { + if (can_beam && beam->can_beam_fn) { can_beam = beam->can_beam_fn(beam->can_beam_ctx, beam, fd); } if (can_beam) { - beam->last_beamed = fd; status = apr_bucket_setaside(b, beam->send_pool); } /* else: enter ENOTIMPL case below */ diff --git a/modules/http2/h2_bucket_beam.h b/modules/http2/h2_bucket_beam.h index 18bc32629f..64117ff159 100644 --- a/modules/http2/h2_bucket_beam.h +++ b/modules/http2/h2_bucket_beam.h @@ -183,7 +183,6 @@ struct h2_bucket_beam { apr_size_t buckets_sent; /* # of beam buckets sent */ apr_size_t files_beamed; /* how many file handles have been set aside */ - apr_file_t *last_beamed; /* last file beamed */ unsigned int aborted : 1; unsigned int closed : 1; @@ -376,6 +375,16 @@ int h2_beam_report_consumption(h2_bucket_beam *beam); void h2_beam_on_produced(h2_bucket_beam *beam, h2_beam_io_callback *io_cb, void *ctx); +/** + * Register a callback that may prevent a file from being beam as + * file handle, forcing the file content to be copied. Then no callback + * is set (NULL), file handles are transferred directly. + * @param beam the beam to set the callback on + * @param io_cb the callback or NULL, called on receiver with bytes produced + * @param ctx the context to use in callback invocation + * + * Call from the receiver side, callbacks invoked on either side. + */ void h2_beam_on_file_beam(h2_bucket_beam *beam, h2_beam_can_beam_callback *cb, void *ctx); diff --git a/modules/http2/h2_conn.c b/modules/http2/h2_conn.c index 73c838e0e8..fcf6bad4d4 100644 --- a/modules/http2/h2_conn.c +++ b/modules/http2/h2_conn.c @@ -308,6 +308,7 @@ conn_rec *h2_slave_create(conn_rec *master, int slave_id, apr_pool_t *parent) master->id, slave_id); /* Simulate that we had already a request on this connection. */ c->keepalives = 1; + c->aborted = 0; /* We cannot install the master connection socket on the slaves, as * modules mess with timeouts/blocking of the socket, with * unwanted side effects to the master connection processing. @@ -335,6 +336,7 @@ void h2_slave_destroy(conn_rec *slave) ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, slave, "h2_stream(%s): destroy slave", apr_table_get(slave->notes, H2_TASK_ID_NOTE)); + slave->sbh = NULL; apr_pool_destroy(slave->pool); } diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 7eb101db49..78ae81365f 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -131,11 +131,6 @@ static void stream_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t len h2_stream_in_consumed(ctx, length); } -static int can_always_beam_file(void *ctx, h2_bucket_beam *beam, apr_file_t *file) -{ - return 1; -} - static void stream_joined(h2_mplx *m, h2_stream *stream) { ap_assert(!stream->task || stream->task->worker_done); @@ -152,8 +147,10 @@ static void stream_cleanup(h2_mplx *m, h2_stream *stream) h2_beam_on_consumed(stream->input, NULL, NULL, NULL); h2_beam_abort(stream->input); } - h2_beam_on_produced(stream->output, NULL, NULL); - h2_beam_leave(stream->output); + if (stream->output) { + h2_beam_on_produced(stream->output, NULL, NULL); + h2_beam_leave(stream->output); + } h2_stream_cleanup(stream); @@ -246,10 +243,10 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, m->readyq = h2_iq_create(m->pool, m->max_streams); m->workers = workers; - m->workers_max = workers->max_workers; - m->workers_limit = 6; /* the original h1 max parallel connections */ + m->max_active = workers->max_workers; + m->limit_active = 6; /* the original h1 max parallel connections */ m->last_limit_change = m->last_idle_block = apr_time_now(); - m->limit_change_interval = apr_time_from_msec(200); + m->limit_change_interval = apr_time_from_msec(100); m->spare_slaves = apr_array_make(m->pool, 10, sizeof(conn_rec*)); @@ -301,7 +298,7 @@ static void task_destroy(h2_mplx *m, h2_task *task) int reuse_slave = 0; slave = task->c; - reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc) + reuse_slave = ((m->spare_slaves->nelts < (m->limit_active * 3 / 2)) && !task->rst_error); if (slave) { @@ -328,12 +325,6 @@ static int stream_destroy_iter(void *ctx, void *val) h2_ihash_remove(m->spurge, stream->id); ap_assert(stream->state == H2_SS_CLEANUP); - if (stream->output == NULL) { - ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, m->c, - H2_STRM_MSG(stream, "already with beams==NULL")); - return 0; - } - if (stream->input) { /* Process outstanding events before destruction */ input_consumed_signal(m, stream); @@ -341,10 +332,7 @@ static int stream_destroy_iter(void *ctx, void *val) h2_beam_destroy(stream->input); stream->input = NULL; } - - h2_beam_log(stream->output, m->c, APLOG_TRACE2, "stream_destroy"); - h2_beam_destroy(stream->output); - stream->output = NULL; + if (stream->task) { task_destroy(m, stream->task); stream->task = NULL; @@ -547,10 +535,13 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) apr_status_t status = APR_SUCCESS; h2_stream *stream = h2_ihash_get(m->streams, stream_id); - if (!stream || !stream->task) { + if (!stream || !stream->task || m->aborted) { return APR_ECONNABORTED; } + ap_assert(stream->output == NULL); + stream->output = beam; + if (APLOGctrace2(m->c)) { h2_beam_log(beam, m->c, APLOG_TRACE2, "out_open"); } @@ -561,8 +552,8 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, stream); h2_beam_on_produced(stream->output, output_produced, m); - if (!stream->task->output.copy_files) { - h2_beam_on_file_beam(stream->output, can_always_beam_file, m); + if (stream->task->output.copy_files) { + h2_beam_on_file_beam(stream->output, h2_beam_no_files, NULL); } /* time to protect the beam against multi-threaded use */ @@ -721,7 +712,7 @@ static h2_task *next_stream_task(h2_mplx *m) { h2_stream *stream; int sid; - while (!m->aborted && (m->workers_busy < m->workers_limit) + while (!m->aborted && (m->tasks_active < m->limit_active) && (sid = h2_iq_shift(m->q)) > 0) { stream = h2_ihash_get(m->streams, sid); @@ -731,36 +722,37 @@ static h2_task *next_stream_task(h2_mplx *m) pslave = (conn_rec **)apr_array_pop(m->spare_slaves); if (pslave) { slave = *pslave; + slave->aborted = 0; } else { slave = h2_slave_create(m->c, stream->id, m->pool); } - slave->sbh = m->c->sbh; - slave->aborted = 0; if (!stream->task) { - stream->task = h2_task_create(stream, slave); - + m->c->keepalives++; - apr_table_setn(slave->notes, H2_TASK_ID_NOTE, stream->task->id); - h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave)); - if (sid > m->max_stream_started) { m->max_stream_started = sid; } - if (stream->input) { h2_beam_on_consumed(stream->input, stream_input_ev, stream_input_consumed, stream); - h2_beam_on_file_beam(stream->input, can_always_beam_file, m); - h2_beam_mutex_enable(stream->input); } - h2_beam_buffer_size_set(stream->output, m->stream_max_mem); + stream->task = h2_task_create(slave, stream->id, + stream->request, m, stream->input, + stream->session->s->timeout, + m->stream_max_mem); + if (!stream->task) { + ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, slave, + H2_STRM_LOG(APLOGNO(02941), stream, + "create task")); + return NULL; + } + } - stream->task->worker_started = 1; - stream->task->started_at = apr_time_now(); - ++m->workers_busy; + + ++m->tasks_active; return stream->task; } } @@ -841,14 +833,14 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) * a block by flow control. */ if (task->done_at- m->last_limit_change >= m->limit_change_interval - && m->workers_limit < m->workers_max) { + && m->limit_active < m->max_active) { /* Well behaving stream, allow it more workers */ - m->workers_limit = H2MIN(m->workers_limit * 2, - m->workers_max); + m->limit_active = H2MIN(m->limit_active * 2, + m->max_active); m->last_limit_change = task->done_at; ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): increase worker limit to %d", - m->id, m->workers_limit); + m->id, m->limit_active); } } @@ -870,7 +862,9 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) h2_beam_mutex_disable(stream->input); h2_beam_leave(stream->input); } - h2_beam_mutex_disable(stream->output); + if (stream->output) { + h2_beam_mutex_disable(stream->output); + } check_data_for(m, stream->id); } else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) { @@ -881,7 +875,9 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) h2_beam_mutex_disable(stream->input); h2_beam_leave(stream->input); } - h2_beam_mutex_disable(stream->output); + if (stream->output) { + h2_beam_mutex_disable(stream->output); + } stream_joined(m, stream); } else if ((stream = h2_ihash_get(m->spurge, task->stream_id)) != NULL) { @@ -903,7 +899,7 @@ void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask) if (enter_mutex(m, &acquired) == APR_SUCCESS) { task_done(m, task, NULL); - --m->workers_busy; + --m->tasks_active; if (m->join_wait) { apr_thread_cond_signal(m->join_wait); } @@ -982,14 +978,14 @@ static apr_status_t unschedule_slow_tasks(h2_mplx *m) /* Try to get rid of streams that occupy workers. Look for safe requests * that are repeatable. If none found, fail the connection. */ - n = (m->workers_busy - m->workers_limit - (int)h2_ihash_count(m->sredo)); + n = (m->tasks_active - m->limit_active - (int)h2_ihash_count(m->sredo)); while (n > 0 && (stream = get_latest_repeatable_unsubmitted_stream(m))) { h2_task_rst(stream->task, H2_ERR_CANCEL); h2_ihash_add(m->sredo, stream); --n; } - if ((m->workers_busy - h2_ihash_count(m->sredo)) > m->workers_limit) { + if ((m->tasks_active - h2_ihash_count(m->sredo)) > m->limit_active) { h2_stream *stream = get_timed_out_busy_stream(m); if (stream) { /* Too many busy workers, unable to cancel enough streams @@ -1009,7 +1005,7 @@ apr_status_t h2_mplx_idle(h2_mplx *m) if (enter_mutex(m, &acquired) == APR_SUCCESS) { apr_size_t scount = h2_ihash_count(m->streams); - if (scount > 0 && m->workers_busy) { + if (scount > 0 && m->tasks_active) { /* If we have streams in connection state 'IDLE', meaning * all streams are ready to sent data out, but lack * WINDOW_UPDATEs. @@ -1024,27 +1020,27 @@ apr_status_t h2_mplx_idle(h2_mplx *m) */ now = apr_time_now(); m->last_idle_block = now; - if (m->workers_limit > 2 + if (m->limit_active > 2 && now - m->last_limit_change >= m->limit_change_interval) { - if (m->workers_limit > 16) { - m->workers_limit = 16; + if (m->limit_active > 16) { + m->limit_active = 16; } - else if (m->workers_limit > 8) { - m->workers_limit = 8; + else if (m->limit_active > 8) { + m->limit_active = 8; } - else if (m->workers_limit > 4) { - m->workers_limit = 4; + else if (m->limit_active > 4) { + m->limit_active = 4; } - else if (m->workers_limit > 2) { - m->workers_limit = 2; + else if (m->limit_active > 2) { + m->limit_active = 2; } m->last_limit_change = now; ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): decrease worker limit to %d", - m->id, m->workers_limit); + m->id, m->limit_active); } - if (m->workers_busy > m->workers_limit) { + if (m->tasks_active > m->limit_active) { status = unschedule_slow_tasks(m); } } @@ -1257,7 +1253,7 @@ int h2_mplx_awaits_data(h2_mplx *m) if (h2_ihash_empty(m->streams)) { waiting = 0; } - if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->workers_busy) { + if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->tasks_active) { waiting = 0; } leave_mutex(m, acquired); diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index 23162e8eb8..82a98fce0a 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -74,9 +74,9 @@ struct h2_mplx { int max_streams; /* max # of concurrent streams */ int max_stream_started; /* highest stream id that started processing */ - int workers_busy; /* # of workers processing on this mplx */ - int workers_limit; /* current # of workers limit, dynamic */ - int workers_max; /* max, hard limit # of workers in a process */ + int tasks_active; /* # of tasks being processed from this mplx */ + int limit_active; /* current limit on active tasks, dynamic */ + int max_active; /* max, hard limit # of active tasks in a process */ apr_time_t last_idle_block; /* last time, this mplx entered IDLE while * streams were ready */ apr_time_t last_limit_change; /* last time, worker limit changed */ diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index c7126beb95..f37741b61f 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -870,8 +870,8 @@ static apr_status_t h2_session_create_int(h2_session **psession, "push_diary(type=%d,N=%d)"), (int)session->max_stream_count, (int)session->max_stream_mem, - session->mplx->workers_limit, - session->mplx->workers_max, + session->mplx->limit_active, + session->mplx->max_active, session->push_diary->dtype, (int)session->push_diary->N); } diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index 0b66af84e7..7bf35aa3b2 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -241,7 +241,7 @@ static apr_status_t close_input(h2_stream *stream) static apr_status_t close_output(h2_stream *stream) { - if (h2_beam_is_closed(stream->output)) { + if (!stream->output || h2_beam_is_closed(stream->output)) { return APR_SUCCESS; } ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, @@ -531,9 +531,6 @@ h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session, stream->monitor = monitor; stream->max_mem = session->max_stream_mem; - h2_beam_create(&stream->output, pool, id, "output", H2_BEAM_OWNER_RECV, 0, - session->s->timeout); - #ifdef H2_NG2_LOCAL_WIN_SIZE stream->in_window_size = nghttp2_session_get_stream_local_window_size( @@ -607,7 +604,9 @@ void h2_stream_rst(h2_stream *stream, int error_code) if (stream->input) { h2_beam_abort(stream->input); } - h2_beam_leave(stream->output); + if (stream->output) { + h2_beam_leave(stream->output); + } ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, H2_STRM_MSG(stream, "reset, error=%d"), error_code); h2_stream_dispatch(stream, H2_SEV_CANCELLED); @@ -777,6 +776,8 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen, *presponse = NULL; } + ap_assert(stream); + if (stream->rst_error) { *plen = 0; *peos = 1; @@ -785,7 +786,7 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen, c = stream->session->c; prep_output(stream); - + /* determine how much we'd like to send. We cannot send more than * is requested. But we can reduce the size in case the master * connection operates in smaller chunks. (TSL warmup) */ @@ -797,8 +798,15 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen, h2_util_bb_avail(stream->out_buffer, plen, peos); if (!*peos && *plen < requested && *plen < stream->max_mem) { H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre"); - status = h2_beam_receive(stream->output, stream->out_buffer, - APR_NONBLOCK_READ, stream->max_mem - *plen); + if (stream->output) { + status = h2_beam_receive(stream->output, stream->out_buffer, + APR_NONBLOCK_READ, + stream->max_mem - *plen); + } + else { + status = APR_EOF; + } + if (APR_STATUS_IS_EOF(status)) { apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc); APR_BRIGADE_INSERT_TAIL(stream->out_buffer, eos); diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index a93fce4cd2..5ab485faab 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -496,42 +496,44 @@ static int h2_task_pre_conn(conn_rec* c, void *arg) return OK; } -h2_task *h2_task_create(h2_stream *stream, conn_rec *slave) +h2_task *h2_task_create(conn_rec *slave, int stream_id, + const h2_request *req, h2_mplx *m, + h2_bucket_beam *input, + apr_interval_time_t timeout, + apr_size_t output_max_mem) { apr_pool_t *pool; h2_task *task; ap_assert(slave); - ap_assert(stream); - ap_assert(stream->request); + ap_assert(req); apr_pool_create(&pool, slave->pool); task = apr_pcalloc(pool, sizeof(h2_task)); if (task == NULL) { - ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, slave, - H2_STRM_LOG(APLOGNO(02941), stream, "create task")); return NULL; } - task->id = apr_psprintf(pool, "%ld-%d", - stream->session->id, stream->id); - task->stream_id = stream->id; + task->id = "000"; + task->stream_id = stream_id; task->c = slave; - task->mplx = stream->session->mplx; - task->c->keepalives = slave->master->keepalives; + task->mplx = m; task->pool = pool; - task->request = stream->request; - task->input.beam = stream->input; - task->output.beam = stream->output; - task->timeout = stream->session->s->timeout; - - h2_beam_send_from(stream->output, task->pool); - h2_ctx_create_for(slave, task); - + task->request = req; + task->timeout = timeout; + task->input.beam = input; + task->output.max_buffer = output_max_mem; + return task; } void h2_task_destroy(h2_task *task) { + if (task->output.beam) { + h2_beam_log(task->output.beam, task->c, APLOG_TRACE2, "task_destroy"); + h2_beam_destroy(task->output.beam); + task->output.beam = NULL; + } + if (task->eor) { apr_bucket_destroy(task->eor); } @@ -542,9 +544,14 @@ void h2_task_destroy(h2_task *task) apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread, int worker_id) { + conn_rec *c; + ap_assert(task); - - if (task->c->master) { + c = task->c; + task->worker_started = 1; + task->started_at = apr_time_now(); + + if (c->master) { /* Each conn_rec->id is supposed to be unique at a point in time. Since * some modules (and maybe external code) uses this id as an identifier * for the request_rec they handle, it needs to be unique for slave @@ -562,6 +569,8 @@ apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread, int worker_id) */ int slave_id, free_bits; + task->id = apr_psprintf(task->pool, "%ld-%d", c->master->id, + task->stream_id); if (sizeof(unsigned long) >= 8) { free_bits = 32; slave_id = task->stream_id; @@ -573,12 +582,31 @@ apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread, int worker_id) free_bits = 8; slave_id = worker_id; } - task->c->id = (task->c->master->id << free_bits)^slave_id; + task->c->id = (c->master->id << free_bits)^slave_id; + c->keepalive = AP_CONN_KEEPALIVE; + } + + h2_beam_create(&task->output.beam, c->pool, task->stream_id, "output", + H2_BEAM_OWNER_SEND, 0, task->timeout); + if (!task->output.beam) { + return APR_ENOMEM; } - task->input.bb = apr_brigade_create(task->pool, task->c->bucket_alloc); + h2_beam_buffer_size_set(task->output.beam, task->output.max_buffer); + h2_beam_send_from(task->output.beam, task->pool); + + h2_ctx_create_for(c, task); + apr_table_setn(c->notes, H2_TASK_ID_NOTE, task->id); + + if (task->input.beam) { + h2_beam_mutex_enable(task->input.beam); + } + + h2_slave_run_pre_connection(c, ap_get_conn_socket(c)); + + task->input.bb = apr_brigade_create(task->pool, c->bucket_alloc); if (task->request->serialize) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_task(%s): serialize request %s %s", task->id, task->request->method, task->request->path); apr_brigade_printf(task->input.bb, NULL, @@ -588,20 +616,21 @@ apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread, int worker_id) apr_brigade_puts(task->input.bb, NULL, NULL, "\r\n"); } - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_task(%s): process connection", task->id); + task->c->current_thread = thread; - ap_run_process_connection(task->c); + ap_run_process_connection(c); if (task->frozen) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_task(%s): process_conn returned frozen task", task->id); /* cleanup delayed */ return APR_EAGAIN; } else { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_task(%s): processing done", task->id); return output_finish(task); } diff --git a/modules/http2/h2_task.h b/modules/http2/h2_task.h index b2aaf80777..a0875574ec 100644 --- a/modules/http2/h2_task.h +++ b/modules/http2/h2_task.h @@ -73,6 +73,7 @@ struct h2_task { unsigned int copy_files : 1; struct h2_response_parser *rparser; apr_bucket_brigade *bb; + apr_size_t max_buffer; } output; struct h2_mplx *mplx; @@ -91,7 +92,11 @@ struct h2_task { struct h2_req_engine *assigned; /* engine that task has been assigned to */ }; -h2_task *h2_task_create(struct h2_stream *stream, conn_rec *slave); +h2_task *h2_task_create(conn_rec *slave, int stream_id, + const h2_request *req, struct h2_mplx *m, + struct h2_bucket_beam *input, + apr_interval_time_t timeout, + apr_size_t output_max_mem); void h2_task_destroy(h2_task *task); |