diff options
author | Stefan Eissing <icing@apache.org> | 2015-11-16 18:29:30 +0100 |
---|---|---|
committer | Stefan Eissing <icing@apache.org> | 2015-11-16 18:29:30 +0100 |
commit | 181e0dbc89e393957c96a6c3e8d6f5f2f4571988 (patch) | |
tree | 3ad84518aa23535ef5d5eee133627bd1804525a9 /modules | |
parent | improvements in handling orphaned stream resources, where client prematurely ... (diff) | |
download | apache2-181e0dbc89e393957c96a6c3e8d6f5f2f4571988.tar.xz apache2-181e0dbc89e393957c96a6c3e8d6f5f2f4571988.zip |
task creation moved into h2_worker domain (pool+thread)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1714635 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'modules')
-rw-r--r-- | modules/http2/h2_io.c | 36 | ||||
-rw-r--r-- | modules/http2/h2_io.h | 9 | ||||
-rw-r--r-- | modules/http2/h2_mplx.c | 67 | ||||
-rw-r--r-- | modules/http2/h2_mplx.h | 3 | ||||
-rw-r--r-- | modules/http2/h2_task.c | 8 | ||||
-rw-r--r-- | modules/http2/h2_task_queue.c | 45 | ||||
-rw-r--r-- | modules/http2/h2_task_queue.h | 38 | ||||
-rw-r--r-- | modules/http2/h2_worker.c | 29 | ||||
-rw-r--r-- | modules/http2/h2_worker.h | 4 | ||||
-rw-r--r-- | modules/http2/h2_workers.c | 4 |
10 files changed, 143 insertions, 100 deletions
diff --git a/modules/http2/h2_io.c b/modules/http2/h2_io.c index 205b99cc12..715bc5616c 100644 --- a/modules/http2/h2_io.c +++ b/modules/http2/h2_io.c @@ -32,19 +32,19 @@ h2_io *h2_io_create(int id, apr_pool_t *pool, apr_bucket_alloc_t *bucket_alloc) if (io) { io->id = id; io->pool = pool; + io->bucket_alloc = bucket_alloc; io->bbin = NULL; - io->bbout = apr_brigade_create(pool, bucket_alloc); + io->bbout = NULL; } return io; } -static void h2_io_cleanup(h2_io *io) -{ -} - void h2_io_destroy(h2_io *io) { - h2_io_cleanup(io); + if (io->pool) { + apr_pool_destroy(io->pool); + /* gone */ + } } void h2_io_set_response(h2_io *io, h2_response *response) @@ -72,7 +72,7 @@ int h2_io_in_has_eos_for(h2_io *io) int h2_io_out_has_data(h2_io *io) { - return h2_util_bb_has_data_or_eos(io->bbout); + return io->bbout && h2_util_bb_has_data_or_eos(io->bbout); } apr_off_t h2_io_out_length(h2_io *io) @@ -127,8 +127,7 @@ apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb) io->eos_in = h2_util_has_eos(bb, 0); if (!APR_BRIGADE_EMPTY(bb)) { if (!io->bbin) { - io->bbin = apr_brigade_create(io->bbout->p, - io->bbout->bucket_alloc); + io->bbin = apr_brigade_create(io->pool, io->bucket_alloc); } return h2_util_move(io->bbin, bb, 0, NULL, "h2_io_in_write"); } @@ -164,6 +163,11 @@ apr_status_t h2_io_out_readx(h2_io *io, *peos = 1; return APR_SUCCESS; } + else if (!io->bbout) { + *plen = 0; + *peos = 0; + return APR_EAGAIN; + } if (cb == NULL) { /* just checking length available */ @@ -191,7 +195,11 @@ apr_status_t h2_io_out_read_to(h2_io *io, apr_bucket_brigade *bb, *peos = 1; return APR_SUCCESS; } - + else if (!io->bbout) { + *plen = 0; + *peos = 0; + return APR_EAGAIN; + } io->eos_out = *peos = h2_util_has_eos(io->bbout, *plen); return h2_util_move(bb, io->bbout, *plen, NULL, "h2_io_read_to"); @@ -224,6 +232,10 @@ apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb, } return status; } + + if (!io->bbout) { + io->bbout = apr_brigade_create(io->pool, io->bucket_alloc); + } /* Let's move the buckets from the request processing in here, so * that the main thread can read them when it has time/capacity. @@ -236,7 +248,6 @@ apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb, * file handles. */ start_allowed = *pfile_handles_allowed; - status = h2_util_move(io->bbout, bb, maxlen, pfile_handles_allowed, "h2_io_out_write"); /* track # file buckets moved into our pool */ @@ -252,6 +263,9 @@ apr_status_t h2_io_out_close(h2_io *io) if (io->rst_error) { return APR_ECONNABORTED; } + if (!io->bbout) { + io->bbout = apr_brigade_create(io->pool, io->bucket_alloc); + } if (!io->eos_out && !h2_util_has_eos(io->bbout, 0)) { APR_BRIGADE_INSERT_TAIL(io->bbout, apr_bucket_eos_create(io->bbout->bucket_alloc)); diff --git a/modules/http2/h2_io.h b/modules/http2/h2_io.h index 1d08f43345..1fd1167747 100644 --- a/modules/http2/h2_io.h +++ b/modules/http2/h2_io.h @@ -18,7 +18,7 @@ struct h2_response; struct apr_thread_cond_t; -struct h2_task; +struct h2_request; typedef apr_status_t h2_io_data_cb(void *ctx, const char *data, apr_off_t len); @@ -34,9 +34,9 @@ struct h2_io { int orphaned; /* h2_stream is gone for this io */ int task_done; - struct h2_task *task; /* task created for this io */ - - struct h2_response *response;/* submittable response created */ + const struct h2_request *request; /* request on this io */ + int request_body; /* == 0 iff request has no body */ + struct h2_response *response;/* response for submit, once created */ int rst_error; int eos_in; @@ -46,6 +46,7 @@ struct h2_io { int eos_out; apr_bucket_brigade *bbout; /* output data from stream */ + apr_bucket_alloc_t *bucket_alloc; struct apr_thread_cond_t *output_drained; /* block on writing */ int files_handles_owned; diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 9d0b33b913..253868461b 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -41,6 +41,7 @@ #include "h2_task_input.h" #include "h2_task_output.h" #include "h2_task_queue.h" +#include "h2_worker.h" #include "h2_workers.h" #include "h2_util.h" @@ -228,14 +229,8 @@ void h2_mplx_abort(h2_mplx *m) static void io_destroy(h2_mplx *m, h2_io *io) { apr_pool_t *pool = io->pool; - if (pool) { - io->pool = NULL; - apr_pool_clear(pool); - if (m->spare_pool) { - apr_pool_destroy(m->spare_pool); - } - m->spare_pool = pool; - } + + io->pool = NULL; /* The pool is cleared/destroyed which also closes all * allocated file handles. Give this count back to our * file handle pool. */ @@ -243,6 +238,14 @@ static void io_destroy(h2_mplx *m, h2_io *io) h2_io_set_remove(m->stream_ios, io); h2_io_set_remove(m->ready_ios, io); h2_io_destroy(io); + + if (pool) { + apr_pool_clear(pool); + if (m->spare_pool) { + apr_pool_destroy(m->spare_pool); + } + m->spare_pool = pool; + } } apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error) @@ -263,8 +266,8 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error) if (io) { /* Remove io from ready set, we will never submit it */ h2_io_set_remove(m->ready_ios, io); - - if (io->task_done) { + if (io->task_done || h2_tq_remove(m->q, io->id)) { + /* already finished or not even started yet */ io_destroy(m, io); } else { @@ -274,6 +277,7 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error) h2_io_rst(io, rst_error); } } + } apr_thread_mutex_unlock(m->lock); @@ -812,17 +816,6 @@ static void have_out_data_for(h2_mplx *m, int stream_id) } } -typedef struct { - h2_stream_pri_cmp *cmp; - void *ctx; -} cmp_ctx; - -static int task_cmp(h2_task *t1, h2_task *t2, void *ctx) -{ - cmp_ctx *x = ctx; - return x->cmp(t1->stream_id, t2->stream_id, x->ctx); -} - apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx) { apr_status_t status; @@ -833,11 +826,7 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx) } status = apr_thread_mutex_lock(m->lock); if (APR_SUCCESS == status) { - cmp_ctx x; - - x.cmp = cmp; - x.ctx = ctx; - h2_tq_sort(m->q, task_cmp, &x); + h2_tq_sort(m->q, cmp, ctx); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): reprioritize tasks", m->id); @@ -878,19 +867,15 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, } status = apr_thread_mutex_lock(m->lock); if (APR_SUCCESS == status) { - h2_io *io; - cmp_ctx x; - - io = open_io(m, stream_id); - io->task = h2_task_create(m->id, req, io->pool, m, eos); + h2_io *io = open_io(m, stream_id); + io->request = req; + io->request_body = !eos; if (eos) { status = h2_io_in_close(io); } - x.cmp = cmp; - x.ctx = ctx; - h2_tq_add(m->q, io->task, task_cmp, &x); + h2_tq_add(m->q, io->id, cmp, ctx); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, "h2_mplx(%ld-%d): process", m->c->id, stream_id); @@ -903,10 +888,11 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, return status; } -h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more) +h2_task *h2_mplx_pop_task(h2_mplx *m, h2_worker *w, int *has_more) { h2_task *task = NULL; apr_status_t status; + AP_DEBUG_ASSERT(m); if (m->aborted) { *has_more = 0; @@ -914,14 +900,17 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more) } status = apr_thread_mutex_lock(m->lock); if (APR_SUCCESS == status) { - task = h2_tq_shift(m->q); - *has_more = !h2_tq_empty(m->q); - if (task) { + int sid; + while (!task && (sid = h2_tq_shift(m->q)) > 0) { /* Anything not already setup correctly in the task * needs to be so now, as task will be executed right about * when this method returns. */ - + h2_io *io = h2_io_set_get(m->stream_ios, sid); + if (io) { + task = h2_worker_create_task(w, m, io->request, !io->request_body); + } } + *has_more = !h2_tq_empty(m->q); apr_thread_mutex_unlock(m->lock); } return task; diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index fc68b1addc..60aa74d535 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -44,6 +44,7 @@ struct h2_stream; struct h2_request; struct h2_io_set; struct apr_thread_cond_t; +struct h2_worker; struct h2_workers; struct h2_stream_set; struct h2_task_queue; @@ -169,7 +170,7 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, */ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx); -struct h2_task *h2_mplx_pop_task(h2_mplx *mplx, int *has_more); +struct h2_task *h2_mplx_pop_task(h2_mplx *mplx, struct h2_worker *w, int *has_more); /******************************************************************************* * Input handling of streams. diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index 4dacc04d3e..b7d48a1bf6 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -167,8 +167,9 @@ h2_task *h2_task_create(long session_id, const h2_request *req, task->id = apr_psprintf(pool, "%ld-%d", session_id, req->id); task->stream_id = req->id; + task->pool = pool; task->mplx = mplx; - task->c = h2_conn_create(mplx->c, pool); + task->c = h2_conn_create(mplx->c, task->pool); task->request = req; task->input_eos = eos; @@ -193,8 +194,7 @@ apr_status_t h2_task_do(h2_task *task, h2_worker *worker) status = h2_worker_setup_task(worker, task); - /* save in connection that this one is a pseudo connection, prevents - * other hooks from messing with it. */ + /* save in connection that this one is a pseudo connection */ h2_ctx_create_for(task->c, task); if (status == APR_SUCCESS) { @@ -228,8 +228,8 @@ apr_status_t h2_task_do(h2_task *task, h2_worker *worker) apr_thread_cond_signal(task->io); } - h2_worker_release_task(worker, task); h2_mplx_task_done(task->mplx, task->stream_id); + h2_worker_release_task(worker, task); return status; } diff --git a/modules/http2/h2_task_queue.c b/modules/http2/h2_task_queue.c index 1653aa26e2..23bad194b9 100644 --- a/modules/http2/h2_task_queue.c +++ b/modules/http2/h2_task_queue.c @@ -46,8 +46,7 @@ int h2_tq_empty(h2_task_queue *q) return q->nelts == 0; } -void h2_tq_add(h2_task_queue *q, struct h2_task *task, - h2_tq_cmp *cmp, void *ctx) +void h2_tq_add(h2_task_queue *q, int sid, h2_tq_cmp *cmp, void *ctx) { int i; @@ -56,13 +55,33 @@ void h2_tq_add(h2_task_queue *q, struct h2_task *task, } i = (q->head + q->nelts) % q->nalloc; - q->elts[i] = task; + q->elts[i] = sid; ++q->nelts; /* bubble it to the front of the queue */ tq_bubble_up(q, i, q->head, cmp, ctx); } +int h2_tq_remove(h2_task_queue *q, int sid) +{ + int i; + for (i = 0; i < q->nelts; ++i) { + if (sid == q->elts[(q->head + i) % q->nalloc]) { + break; + } + } + + if (i < q->nelts) { + ++i; + for (; i < q->nelts; ++i) { + q->elts[(q->head+i-1)%q->nalloc] = q->elts[(q->head+i)%q->nalloc]; + } + --q->nelts; + return 1; + } + return 0; +} + void h2_tq_sort(h2_task_queue *q, h2_tq_cmp *cmp, void *ctx) { /* Assume that changes in ordering are minimal. This needs, @@ -91,34 +110,34 @@ void h2_tq_sort(h2_task_queue *q, h2_tq_cmp *cmp, void *ctx) } -h2_task *h2_tq_shift(h2_task_queue *q) +int h2_tq_shift(h2_task_queue *q) { - h2_task *t; + int sid; if (q->nelts <= 0) { - return NULL; + return 0; } - t = q->elts[q->head]; + sid = q->elts[q->head]; q->head = (q->head + 1) % q->nalloc; q->nelts--; - return t; + return sid; } static void tq_grow(h2_task_queue *q, int nlen) { AP_DEBUG_ASSERT(q->nalloc <= nlen); if (nlen > q->nalloc) { - h2_task **nq = apr_pcalloc(q->pool, sizeof(h2_task *) * nlen); + int *nq = apr_pcalloc(q->pool, sizeof(h2_task *) * nlen); if (q->nelts > 0) { int l = ((q->head + q->nelts) % q->nalloc) - q->head; - memmove(nq, q->elts + q->head, sizeof(h2_task *) * l); + memmove(nq, q->elts + q->head, sizeof(int) * l); if (l < q->nelts) { /* elts wrapped, append elts in [0, remain] to nq */ int remain = q->nelts - l; - memmove(nq + l, q->elts, sizeof(h2_task *) * remain); + memmove(nq + l, q->elts, sizeof(int) * remain); } } q->elts = nq; @@ -129,9 +148,9 @@ static void tq_grow(h2_task_queue *q, int nlen) static void tq_swap(h2_task_queue *q, int i, int j) { - h2_task *t = q->elts[i]; + int x = q->elts[i]; q->elts[i] = q->elts[j]; - q->elts[j] = t; + q->elts[j] = x; } static int tq_bubble_up(h2_task_queue *q, int i, int top, diff --git a/modules/http2/h2_task_queue.h b/modules/http2/h2_task_queue.h index 36fad2c4d8..dcc46d037a 100644 --- a/modules/http2/h2_task_queue.h +++ b/modules/http2/h2_task_queue.h @@ -24,7 +24,7 @@ struct h2_task; typedef struct h2_task_queue h2_task_queue; struct h2_task_queue { - struct h2_task **elts; + int *elts; int head; int nelts; int nalloc; @@ -34,15 +34,15 @@ struct h2_task_queue { /** * Comparator for two task to determine their order. * - * @param t1 task to compare - * @param t2 task to compare + * @param s1 stream id to compare + * @param s2 stream id to compare * @param ctx provided user data * @return value is the same as for strcmp() and has the effect: - * == 0: t1 and t2 are treated equal in ordering - * < 0: t1 should be sorted before t2 - * > 0: t2 should be sorted before t1 + * == 0: s1 and s2 are treated equal in ordering + * < 0: s1 should be sorted before s2 + * > 0: s2 should be sorted before s1 */ -typedef int h2_tq_cmp(struct h2_task *t1, struct h2_task *t2, void *ctx); +typedef int h2_tq_cmp(int s1, int s2, void *ctx); /** @@ -59,18 +59,26 @@ h2_task_queue *h2_tq_create(apr_pool_t *pool, int capacity); int h2_tq_empty(h2_task_queue *q); /** - * Add the task to the queue. + * Add a stream idto the queue. * * @param q the queue to append the task to - * @param task the task to add + * @param sid the stream id to add * @param cmp the comparator for sorting * @param ctx user data for comparator */ -void h2_tq_add(h2_task_queue *q, struct h2_task *task, - h2_tq_cmp *cmp, void *ctx); +void h2_tq_add(h2_task_queue *q, int sid, h2_tq_cmp *cmp, void *ctx); /** - * Sort the tasks queue again. Call if the task ordering + * Remove the stream id from the queue. Return != 0 iff task + * was found in queue. + * @param q the task queue + * @param sid the stream id to remove + * @return != 0 iff task was found in queue + */ +int h2_tq_remove(h2_task_queue *q, int sid); + +/** + * Sort the stream idqueue again. Call if the task ordering * has changed. * * @param q the queue to sort @@ -80,12 +88,12 @@ void h2_tq_add(h2_task_queue *q, struct h2_task *task, void h2_tq_sort(h2_task_queue *q, h2_tq_cmp *cmp, void *ctx); /** - * Get the first task from the queue or NULL if the queue is empty. + * Get the first stream id from the queue or NULL if the queue is empty. * The task will be removed. * * @param q the queue to get the first task from - * @return the first task of the queue, NULL if empty + * @return the first stream id of the queue, 0 if empty */ -h2_task *h2_tq_shift(h2_task_queue *q); +int h2_tq_shift(h2_task_queue *q); #endif /* defined(__mod_h2__h2_task_queue__) */ diff --git a/modules/http2/h2_worker.c b/modules/http2/h2_worker.c index 0100421371..b11e8549ff 100644 --- a/modules/http2/h2_worker.c +++ b/modules/http2/h2_worker.c @@ -24,6 +24,7 @@ #include "h2_private.h" #include "h2_conn.h" #include "h2_mplx.h" +#include "h2_request.h" #include "h2_task.h" #include "h2_worker.h" @@ -123,8 +124,9 @@ h2_worker *h2_worker_create(int id, return NULL; } - apr_pool_pre_cleanup_register(pool, w, cleanup_join_thread); - apr_thread_create(&w->thread, attr, execute, w, pool); + apr_pool_pre_cleanup_register(w->pool, w, cleanup_join_thread); + apr_thread_create(&w->thread, attr, execute, w, w->pool); + apr_pool_create(&w->task_pool, w->pool); } return w; } @@ -157,18 +159,26 @@ int h2_worker_is_aborted(h2_worker *worker) return worker->aborted; } -apr_status_t h2_worker_setup_task(h2_worker *worker, h2_task *task) { - apr_status_t status; +h2_task *h2_worker_create_task(h2_worker *worker, h2_mplx *m, + const h2_request *req, int eos) +{ + h2_task *task; /* Create a subpool from the worker one to be used for all things * with life-time of this task execution. */ - apr_pool_create(&task->pool, worker->pool); - + task = h2_task_create(m->id, req, worker->task_pool, m, eos); /* Link the task to the worker which provides useful things such * as mutex, a socket etc. */ task->io = worker->io; + return task; +} + +apr_status_t h2_worker_setup_task(h2_worker *worker, h2_task *task) { + apr_status_t status; + + status = h2_conn_setup(task, apr_bucket_alloc_create(task->pool), worker->thread, worker->socket); @@ -178,11 +188,8 @@ apr_status_t h2_worker_setup_task(h2_worker *worker, h2_task *task) { void h2_worker_release_task(h2_worker *worker, struct h2_task *task) { task->io = NULL; - - if (task->pool) { - apr_pool_destroy(task->pool); - task->pool = NULL; - } + task->pool = NULL; + apr_pool_clear(worker->task_pool); } apr_socket_t *h2_worker_get_socket(h2_worker *worker) diff --git a/modules/http2/h2_worker.h b/modules/http2/h2_worker.h index ce934f745b..035448e5db 100644 --- a/modules/http2/h2_worker.h +++ b/modules/http2/h2_worker.h @@ -18,6 +18,7 @@ struct apr_thread_cond_t; struct h2_mplx; +struct h2_request; struct h2_task; /* h2_worker is a basically a apr_thread_t that reads fromt he h2_workers @@ -44,6 +45,7 @@ struct h2_worker { int id; apr_thread_t *thread; apr_pool_t *pool; + apr_pool_t *task_pool; struct apr_thread_cond_t *io; apr_socket_t *socket; @@ -141,6 +143,8 @@ int h2_worker_get_id(h2_worker *worker); int h2_worker_is_aborted(h2_worker *worker); +struct h2_task *h2_worker_create_task(h2_worker *worker, struct h2_mplx *m, + const struct h2_request *req, int eos); apr_status_t h2_worker_setup_task(h2_worker *worker, struct h2_task *task); void h2_worker_release_task(h2_worker *worker, struct h2_task *task); diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c index bebbcd2d29..3c08ff35d1 100644 --- a/modules/http2/h2_workers.c +++ b/modules/http2/h2_workers.c @@ -79,7 +79,7 @@ static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm, if (*pm && ptask != NULL) { /* We have a h2_mplx instance and the worker wants the next task. * Try to get one from the given mplx. */ - *ptask = h2_mplx_pop_task(*pm, &has_more); + *ptask = h2_mplx_pop_task(*pm, worker, &has_more); if (*ptask) { return APR_SUCCESS; } @@ -124,7 +124,7 @@ static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm, m = H2_MPLX_LIST_FIRST(&workers->mplxs); H2_MPLX_REMOVE(m); - task = h2_mplx_pop_task(m, &has_more); + task = h2_mplx_pop_task(m, worker, &has_more); if (task) { if (has_more) { H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m); |