diff options
author | Stefan Eissing <icing@apache.org> | 2016-04-15 15:50:46 +0200 |
---|---|---|
committer | Stefan Eissing <icing@apache.org> | 2016-04-15 15:50:46 +0200 |
commit | 52cdae53be08ed37e5d73eac20a39d76f7617bcb (patch) | |
tree | e383ea1324254aee627932931ee9dd5250f0444f /modules/http2 | |
parent | http: Respond with "408 Request Timeout" when a timeout occurs while (diff) | |
download | apache2-52cdae53be08ed37e5d73eac20a39d76f7617bcb.tar.xz apache2-52cdae53be08ed37e5d73eac20a39d76f7617bcb.zip |
mod_http2: new bucket beams for tranporting buckets across threads without buffer copy. Code cleanup
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1739303 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'modules/http2')
38 files changed, 2657 insertions, 2830 deletions
diff --git a/modules/http2/NWGNUmod_http2 b/modules/http2/NWGNUmod_http2 index da470ecdc0..dd855d172a 100644 --- a/modules/http2/NWGNUmod_http2 +++ b/modules/http2/NWGNUmod_http2 @@ -185,6 +185,7 @@ TARGET_lib = \ # FILES_nlm_objs = \ $(OBJDIR)/h2_alt_svc.o \ + $(OBJDIR)/h2_bucket_beam.o \ $(OBJDIR)/h2_bucket_eoc.o \ $(OBJDIR)/h2_bucket_eos.o \ $(OBJDIR)/h2_config.o \ @@ -194,9 +195,7 @@ FILES_nlm_objs = \ $(OBJDIR)/h2_filter.o \ $(OBJDIR)/h2_from_h1.o \ $(OBJDIR)/h2_h2.o \ - $(OBJDIR)/h2_int_queue.o \ $(OBJDIR)/h2_io.o \ - $(OBJDIR)/h2_io_set.o \ $(OBJDIR)/h2_mplx.o \ $(OBJDIR)/h2_ngn_shed.o \ $(OBJDIR)/h2_push.o \ @@ -206,8 +205,6 @@ FILES_nlm_objs = \ $(OBJDIR)/h2_stream.o \ $(OBJDIR)/h2_switch.o \ $(OBJDIR)/h2_task.o \ - $(OBJDIR)/h2_task_input.o \ - $(OBJDIR)/h2_task_output.o \ $(OBJDIR)/h2_util.o \ $(OBJDIR)/h2_worker.o \ $(OBJDIR)/h2_workers.o \ diff --git a/modules/http2/config2.m4 b/modules/http2/config2.m4 index 1515c4dbcf..b4ba6da98a 100644 --- a/modules/http2/config2.m4 +++ b/modules/http2/config2.m4 @@ -20,6 +20,7 @@ dnl # list of module object files http2_objs="dnl mod_http2.lo dnl h2_alt_svc.lo dnl +h2_bucket_beam.lo dnl h2_bucket_eoc.lo dnl h2_bucket_eos.lo dnl h2_config.lo dnl @@ -29,9 +30,7 @@ h2_ctx.lo dnl h2_filter.lo dnl h2_from_h1.lo dnl h2_h2.lo dnl -h2_int_queue.lo dnl h2_io.lo dnl -h2_io_set.lo dnl h2_mplx.lo dnl h2_ngn_shed.lo dnl h2_push.lo dnl @@ -41,8 +40,6 @@ h2_session.lo dnl h2_stream.lo dnl h2_switch.lo dnl h2_task.lo dnl -h2_task_input.lo dnl -h2_task_output.lo dnl h2_util.lo dnl h2_worker.lo dnl h2_workers.lo dnl @@ -209,7 +206,6 @@ is usually linked shared and requires loading. ], $http2_objs, , most, [ dnl # list of module object files proxy_http2_objs="dnl mod_proxy_http2.lo dnl -h2_int_queue.lo dnl h2_proxy_session.lo dnl h2_request.lo dnl h2_util.lo dnl diff --git a/modules/http2/h2_bucket_beam.c b/modules/http2/h2_bucket_beam.c new file mode 100644 index 0000000000..5406176fad --- /dev/null +++ b/modules/http2/h2_bucket_beam.c @@ -0,0 +1,850 @@ +/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <apr_lib.h> +#include <apr_strings.h> +#include <apr_time.h> +#include <apr_buckets.h> +#include <apr_thread_mutex.h> +#include <apr_thread_cond.h> + +#include <httpd.h> + +#include "h2_util.h" +#include "h2_bucket_beam.h" + +static void h2_beam_emitted(h2_bucket_beam *beam, apr_bucket *bred); + +/******************************************************************************* + * beam bucket with reference to beam and bucket it represents + ******************************************************************************/ + +extern const apr_bucket_type_t h2_bucket_type_beam; + +#define H2_BUCKET_IS_BEAM(e) (e->type == &h2_bucket_type_beam) + +typedef struct { + apr_bucket_refcount refcount; + h2_bucket_beam *beam; + apr_bucket *bred; +} h2_beam_bucket; + +static const char Dummy = '\0'; + +static apr_status_t beam_bucket_read(apr_bucket *b, const char **str, + apr_size_t *len, apr_read_type_e block) +{ + h2_beam_bucket *d = b->data; + if (d->bred) { + const char *data; + apr_status_t status = apr_bucket_read(d->bred, &data, len, block); + if (status == APR_SUCCESS) { + *str = data + b->start; + *len = b->length; + } + return status; + } + *str = &Dummy; + *len = 0; + return APR_SUCCESS; +} + +static void beam_bucket_destroy(void *data) +{ + h2_beam_bucket *d = data; + + if (apr_bucket_shared_destroy(d)) { + if (d->bred) { + h2_beam_emitted(d->beam, d->bred); + } + apr_bucket_free(d); + } +} + +static apr_bucket * h2_beam_bucket_make(apr_bucket *b, + h2_bucket_beam *beam, + apr_bucket *bred) +{ + h2_beam_bucket *d; + + d = apr_bucket_alloc(sizeof(*d), b->list); + d->beam = beam; + d->bred = bred; + + b = apr_bucket_shared_make(b, d, 0, bred? bred->length : 0); + b->type = &h2_bucket_type_beam; + + return b; +} + +static apr_bucket *h2_beam_bucket_create(h2_bucket_beam *beam, + apr_bucket *bred, + apr_bucket_alloc_t *list) +{ + apr_bucket *b = apr_bucket_alloc(sizeof(*b), list); + + APR_BUCKET_INIT(b); + b->free = apr_bucket_free; + b->list = list; + return h2_beam_bucket_make(b, beam, bred); +} + +APU_DECLARE_DATA const apr_bucket_type_t h2_bucket_type_beam = { + "BEAM", 5, APR_BUCKET_DATA, + beam_bucket_destroy, + beam_bucket_read, + apr_bucket_setaside_noop, + apr_bucket_shared_split, + apr_bucket_shared_copy +}; + +/******************************************************************************* + * h2_blist, a brigade without allocations + ******************************************************************************/ + +apr_size_t h2_util_bl_print(char *buffer, apr_size_t bmax, + const char *tag, const char *sep, + h2_blist *bl) +{ + apr_size_t off = 0; + const char *sp = ""; + apr_bucket *b; + + if (bl) { + memset(buffer, 0, bmax--); + off += apr_snprintf(buffer+off, bmax-off, "%s(", tag); + for (b = H2_BLIST_FIRST(bl); + bmax && (b != H2_BLIST_SENTINEL(bl)); + b = APR_BUCKET_NEXT(b)) { + + off += h2_util_bucket_print(buffer+off, bmax-off, b, sp); + sp = " "; + } + off += apr_snprintf(buffer+off, bmax-off, ")%s", sep); + } + else { + off += apr_snprintf(buffer+off, bmax-off, "%s(null)%s", tag, sep); + } + return off; +} + + + +/******************************************************************************* + * bucket beam that can transport buckets across threads + ******************************************************************************/ + +static apr_status_t enter_yellow(h2_bucket_beam *beam, + apr_thread_mutex_t **plock, int *pacquired) +{ + if (beam->m_enter) { + return beam->m_enter(beam->m_ctx, plock, pacquired); + } + *plock = NULL; + *pacquired = 0; + return APR_SUCCESS; +} + +static void leave_yellow(h2_bucket_beam *beam, + apr_thread_mutex_t *lock, int acquired) +{ + if (acquired && beam->m_leave) { + beam->m_leave(beam->m_ctx, lock, acquired); + } +} + +static apr_off_t calc_buffered(h2_bucket_beam *beam) +{ + apr_off_t len = 0; + apr_bucket *b; + for (b = H2_BLIST_FIRST(&beam->red); + b != H2_BLIST_SENTINEL(&beam->red); + b = APR_BUCKET_NEXT(b)) { + if (b->length == ((apr_size_t)-1)) { + /* do not count */ + } + else if (APR_BUCKET_IS_FILE(b)) { + /* if unread, has no real mem footprint. how to test? */ + } + else { + len += b->length; + } + } + return len; +} + +static void r_purge_reds(h2_bucket_beam *beam) +{ + apr_bucket *bred; + /* delete all red buckets in purge brigade, needs to be called + * from red thread only */ + while (!H2_BLIST_EMPTY(&beam->purge)) { + bred = H2_BLIST_FIRST(&beam->purge); + apr_bucket_delete(bred); + } +} + +static apr_size_t calc_space_left(h2_bucket_beam *beam) +{ + if (beam->max_buf_size > 0) { + apr_off_t len = calc_buffered(beam); + return (beam->max_buf_size > len? (beam->max_buf_size - len) : 0); + } + return APR_SIZE_MAX; +} + +static apr_status_t wait_cond(h2_bucket_beam *beam, apr_thread_mutex_t *lock) +{ + if (beam->timeout > 0) { + return apr_thread_cond_timedwait(beam->m_cond, lock, beam->timeout); + } + else { + return apr_thread_cond_wait(beam->m_cond, lock); + } +} + +static apr_status_t r_wait_space(h2_bucket_beam *beam, apr_read_type_e block, + apr_thread_mutex_t *lock, apr_off_t *premain) +{ + *premain = calc_space_left(beam); + while (!beam->aborted && *premain <= 0 + && (block == APR_BLOCK_READ) && lock) { + apr_status_t status = wait_cond(beam, lock); + if (APR_STATUS_IS_TIMEUP(status)) { + return status; + } + r_purge_reds(beam); + *premain = calc_space_left(beam); + } + return beam->aborted? APR_ECONNABORTED : APR_SUCCESS; +} + +static void h2_beam_prep_purge(h2_bucket_beam *beam, apr_bucket *bred) +{ + APR_BUCKET_REMOVE(bred); + H2_BLIST_INSERT_TAIL(&beam->purge, bred); +} + +static void h2_beam_emitted(h2_bucket_beam *beam, apr_bucket *bred) +{ + apr_thread_mutex_t *lock; + int acquired; + + if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + /* even when beam buckets are split, only the one where + * refcount drops to 0 will call us */ + --beam->live_beam_buckets; + /* invoked from green thread, the last beam bucket for the red + * bucket bred is about to be destroyed. + * remove it from the hold, where it should be now */ + h2_beam_prep_purge(beam, bred); + /* notify anyone waiting on space to become available */ + if (!lock) { + r_purge_reds(beam); + } + else if (beam->m_cond) { + apr_thread_cond_broadcast(beam->m_cond); + } + leave_yellow(beam, lock, acquired); + } +} + +static void report_consumption(h2_bucket_beam *beam) +{ + if (beam->consumed_fn && (beam->received_bytes != beam->reported_bytes)) { + beam->consumed_fn(beam->consumed_ctx, beam, + beam->received_bytes - beam->reported_bytes); + beam->reported_bytes = beam->received_bytes; + } +} + +static void h2_blist_cleanup(h2_blist *bl) +{ + apr_bucket *e; + + while (!H2_BLIST_EMPTY(bl)) { + e = H2_BLIST_FIRST(bl); + apr_bucket_delete(e); + } +} + +static apr_status_t beam_cleanup(void *data) +{ + h2_bucket_beam *beam = data; + + AP_DEBUG_ASSERT(beam->live_beam_buckets == 0); + h2_blist_cleanup(&beam->red); + h2_blist_cleanup(&beam->purge); + h2_blist_cleanup(&beam->hold); + return APR_SUCCESS; +} + +apr_status_t h2_beam_destroy(h2_bucket_beam *beam) +{ + apr_pool_cleanup_kill(beam->life_pool, beam, beam_cleanup); + return beam_cleanup(beam); +} + +apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *life_pool, + int id, const char *tag, + apr_size_t max_buf_size) +{ + h2_bucket_beam *beam; + apr_status_t status = APR_SUCCESS; + + beam = apr_pcalloc(life_pool, sizeof(*beam)); + if (!beam) { + return APR_ENOMEM; + } + + beam->id = id; + beam->tag = tag; + H2_BLIST_INIT(&beam->red); + H2_BLIST_INIT(&beam->hold); + H2_BLIST_INIT(&beam->purge); + beam->life_pool = life_pool; + beam->max_buf_size = max_buf_size; + + apr_pool_cleanup_register(life_pool, beam, beam_cleanup, + apr_pool_cleanup_null); + *pbeam = beam; + + return status; +} + +void h2_beam_buffer_size_set(h2_bucket_beam *beam, apr_size_t buffer_size) +{ + apr_thread_mutex_t *lock; + int acquired; + + if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + beam->max_buf_size = buffer_size; + leave_yellow(beam, lock, acquired); + } +} + +apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam) +{ + apr_thread_mutex_t *lock; + int acquired; + apr_size_t buffer_size = 0; + + if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + buffer_size = beam->max_buf_size; + leave_yellow(beam, lock, acquired); + } + return buffer_size; +} + +void h2_beam_mutex_set(h2_bucket_beam *beam, + h2_beam_mutex_enter m_enter, + h2_beam_mutex_leave m_leave, + apr_thread_cond_t *cond, + void *m_ctx) +{ + apr_thread_mutex_t *lock; + h2_beam_mutex_leave *prev_leave; + void *prev_ctx; + int acquired; + + if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + prev_ctx = beam->m_ctx; + prev_leave = beam->m_leave; + beam->m_enter = m_enter; + beam->m_leave = m_leave; + beam->m_ctx = m_ctx; + beam->m_cond = cond; + if (acquired && prev_leave) { + /* special tactics when NULLing a lock */ + prev_leave(prev_ctx, lock, acquired); + } + } +} + +void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout) +{ + apr_thread_mutex_t *lock; + int acquired; + + if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + beam->timeout = timeout; + leave_yellow(beam, lock, acquired); + } +} + +apr_interval_time_t h2_beam_timeout_get(h2_bucket_beam *beam) +{ + apr_thread_mutex_t *lock; + int acquired; + apr_interval_time_t timeout = 0; + + if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + timeout = beam->timeout; + leave_yellow(beam, lock, acquired); + } + return timeout; +} + +void h2_beam_abort(h2_bucket_beam *beam) +{ + apr_thread_mutex_t *lock; + int acquired; + + if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + r_purge_reds(beam); + h2_blist_cleanup(&beam->red); + beam->aborted = 1; + report_consumption(beam); + if (beam->m_cond) { + apr_thread_cond_broadcast(beam->m_cond); + } + leave_yellow(beam, lock, acquired); + } +} + +static apr_status_t beam_close(h2_bucket_beam *beam) +{ + if (!beam->closed) { + beam->closed = 1; + if (beam->m_cond) { + apr_thread_cond_broadcast(beam->m_cond); + } + } + return APR_SUCCESS; +} + +apr_status_t h2_beam_close(h2_bucket_beam *beam) +{ + apr_thread_mutex_t *lock; + int acquired; + + if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + r_purge_reds(beam); + beam_close(beam); + report_consumption(beam); + leave_yellow(beam, lock, acquired); + } + return beam->aborted? APR_ECONNABORTED : APR_SUCCESS; +} + +void h2_beam_shutdown(h2_bucket_beam *beam) +{ + apr_thread_mutex_t *lock; + int acquired; + + if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + r_purge_reds(beam); + h2_blist_cleanup(&beam->red); + beam_close(beam); + report_consumption(beam); + leave_yellow(beam, lock, acquired); + } +} + +void h2_beam_reset(h2_bucket_beam *beam) +{ + apr_thread_mutex_t *lock; + int acquired; + + if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + beam_cleanup(beam); + beam->closed = beam->close_sent = 0; + beam->sent_bytes = beam->received_bytes = beam->reported_bytes = 0; + leave_yellow(beam, lock, acquired); + } +} + +static apr_status_t append_bucket(h2_bucket_beam *beam, + apr_bucket *bred, + apr_read_type_e block, + apr_pool_t *pool, + apr_thread_mutex_t *lock) +{ + const char *data; + apr_size_t len; + apr_off_t space_left = 0; + apr_status_t status; + + if (APR_BUCKET_IS_METADATA(bred)) { + if (APR_BUCKET_IS_EOS(bred)) { + beam->closed = 1; + } + APR_BUCKET_REMOVE(bred); + H2_BLIST_INSERT_TAIL(&beam->red, bred); + return APR_SUCCESS; + } + else if (APR_BUCKET_IS_FILE(bred)) { + /* file bucket lengths do not really count */ + } + else { + space_left = calc_space_left(beam); + if (space_left > 0 && bred->length == ((apr_size_t)-1)) { + const char *data; + status = apr_bucket_read(bred, &data, &len, APR_BLOCK_READ); + if (status != APR_SUCCESS) { + return status; + } + } + + if (space_left < bred->length) { + status = r_wait_space(beam, block, lock, &space_left); + if (status != APR_SUCCESS) { + return status; + } + if (space_left <= 0) { + return APR_EAGAIN; + } + } + /* space available, maybe need bucket split */ + } + + + /* The fundamental problem is that reading a red bucket from + * a green thread is a total NO GO, because the bucket might use + * its pool/bucket_alloc from a foreign thread and that will + * corrupt. */ + status = APR_ENOTIMPL; + if (beam->closed && bred->length > 0) { + status = APR_EOF; + } + else if (APR_BUCKET_IS_TRANSIENT(bred)) { + /* this takes care of transient buckets and converts them + * into heap ones. Other bucket types might or might not be + * affected by this. */ + status = apr_bucket_setaside(bred, pool); + } + else if (APR_BUCKET_IS_HEAP(bred) || APR_BUCKET_IS_POOL(bred)) { + /* For heap/pool buckets read from a green thread is fine. The + * data will be there and live until the bucket itself is + * destroyed. */ + status = APR_SUCCESS; + } + else if (APR_BUCKET_IS_FILE(bred)) { + /* For file buckets the problem is their internal readpool that + * is used on the first read to allocate buffer/mmap. + * Since setting aside a file bucket will de-register the + * file cleanup function from the previous pool, we need to + * call that from a red thread. Do it now and make our + * yellow pool the owner. + * Additionally, we allow callbacks to prevent beaming file + * handles across. The use case for this is to limit the number + * of open file handles and rather use a less efficient beam + * transport. */ + apr_file_t *fd = ((apr_bucket_file *)bred->data)->fd; + int can_beam = 1; + if (beam->last_beamed != fd && beam->can_beam_fn) { + can_beam = beam->can_beam_fn(beam->can_beam_ctx, beam, fd); + } + if (can_beam) { + beam->last_beamed = fd; + status = apr_bucket_setaside(bred, pool); + } + } + + if (status == APR_ENOTIMPL) { + /* we have no knowledge about the internals of this bucket, + * but on read, it needs to make the data available somehow. + * So we do this while still in a red thread. The data will + * live at least os long as the red bucket itself. */ + if (space_left < APR_BUCKET_BUFF_SIZE) { + space_left = APR_BUCKET_BUFF_SIZE; + } + if (space_left < bred->length) { + apr_bucket_split(bred, space_left); + } + status = apr_bucket_read(bred, &data, &len, APR_BLOCK_READ); + if (status == APR_SUCCESS) { + status = apr_bucket_setaside(bred, pool); + } + } + + if (status != APR_SUCCESS && status != APR_ENOTIMPL) { + return status; + } + + APR_BUCKET_REMOVE(bred); + H2_BLIST_INSERT_TAIL(&beam->red, bred); + beam->sent_bytes += bred->length; + + return APR_SUCCESS; +} + +apr_status_t h2_beam_send(h2_bucket_beam *beam, + apr_bucket_brigade *red_brigade, + apr_read_type_e block) +{ + apr_thread_mutex_t *lock; + apr_bucket *bred; + apr_status_t status = APR_SUCCESS; + int acquired; + + /* Called from the red thread to add buckets to the beam */ + if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + r_purge_reds(beam); + + if (beam->aborted) { + status = APR_ECONNABORTED; + } + else if (red_brigade) { + while (!APR_BRIGADE_EMPTY(red_brigade) + && status == APR_SUCCESS) { + bred = APR_BRIGADE_FIRST(red_brigade); + status = append_bucket(beam, bred, block, red_brigade->p, lock); + } + if (beam->m_cond) { + apr_thread_cond_broadcast(beam->m_cond); + } + } + report_consumption(beam); + leave_yellow(beam, lock, acquired); + } + return status; +} + +apr_status_t h2_beam_receive(h2_bucket_beam *beam, + apr_bucket_brigade *bb, + apr_read_type_e block, + apr_off_t readbytes) +{ + apr_thread_mutex_t *lock; + apr_bucket *bred, *bgreen; + int acquired, transferred = 0; + apr_status_t status = APR_SUCCESS; + apr_off_t remain = readbytes; + + /* Called from the green thread to take buckets from the beam */ + if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { +transfer: + if (beam->aborted) { + status = APR_ECONNABORTED; + goto leave; + } + + /* transfer enough buckets from our green brigade, if we have one */ + while (beam->green + && !APR_BRIGADE_EMPTY(beam->green) + && (readbytes <= 0 || remain >= 0)) { + bgreen = APR_BRIGADE_FIRST(beam->green); + if (readbytes > 0 && bgreen->length > 0 && remain <= 0) { + break; + } + APR_BUCKET_REMOVE(bgreen); + APR_BRIGADE_INSERT_TAIL(bb, bgreen); + remain -= bgreen->length; + ++transferred; + } + + /* transfer from our red brigade, transforming red buckets to + * green ones until we have enough */ + while (!H2_BLIST_EMPTY(&beam->red) && (readbytes <= 0 || remain >= 0)) { + bred = H2_BLIST_FIRST(&beam->red); + bgreen = NULL; + + if (readbytes > 0 && bred->length > 0 && remain <= 0) { + break; + } + + if (APR_BUCKET_IS_METADATA(bred)) { + if (APR_BUCKET_IS_EOS(bred)) { + beam->close_sent = 1; + bgreen = apr_bucket_eos_create(bb->bucket_alloc); + } + else if (APR_BUCKET_IS_FLUSH(bred)) { + bgreen = apr_bucket_flush_create(bb->bucket_alloc); + } + else { + /* put red into hold, no green sent out */ + } + } + else if (APR_BUCKET_IS_FILE(bred)) { + /* This is set aside into the target brigade pool so that + * any read operation messes with that pool and not + * the red one. */ + apr_bucket_file *f = (apr_bucket_file *)bred->data; + apr_file_t *fd = f->fd; + int setaside = (f->readpool != bb->p); + + if (setaside) { + status = apr_file_setaside(&fd, fd, bb->p); + if (status != APR_SUCCESS) { + goto leave; + } + ++beam->files_beamed; + } + apr_brigade_insert_file(bb, fd, bred->start, bred->length, + bb->p); + remain -= bred->length; + ++transferred; + } + else { + /* create a "green" standin bucket. we took care about the + * underlying red bucket and its data when we placed it into + * the red brigade. + * the beam bucket will notify us on destruction that bred is + * no longer needed. */ + bgreen = h2_beam_bucket_create(beam, bred, bb->bucket_alloc); + ++beam->live_beam_buckets; + } + + /* Place the red bucket into our hold, to be destroyed when no + * green bucket references it any more. */ + APR_BUCKET_REMOVE(bred); + H2_BLIST_INSERT_TAIL(&beam->hold, bred); + beam->received_bytes += bred->length; + if (bgreen) { + APR_BRIGADE_INSERT_TAIL(bb, bgreen); + remain -= bgreen->length; + ++transferred; + } + } + + if (readbytes > 0 && remain < 0) { + /* too much, put some back */ + remain = readbytes; + for (bgreen = APR_BRIGADE_FIRST(bb); + bgreen != APR_BRIGADE_SENTINEL(bb); + bgreen = APR_BUCKET_NEXT(bgreen)) { + remain -= bgreen->length; + if (remain < 0) { + apr_bucket_split(bgreen, bgreen->length+remain); + beam->green = apr_brigade_split_ex(bb, + APR_BUCKET_NEXT(bgreen), + beam->green); + break; + } + } + } + + if (transferred) { + status = APR_SUCCESS; + } + else if (beam->closed) { + if (!beam->close_sent) { + apr_bucket *b = apr_bucket_eos_create(bb->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(bb, b); + beam->close_sent = 1; + status = APR_SUCCESS; + } + else { + status = APR_EOF; + } + } + else if (block == APR_BLOCK_READ && lock && beam->m_cond) { + status = wait_cond(beam, lock); + if (status != APR_SUCCESS) { + goto leave; + } + goto transfer; + } + else { + status = APR_EAGAIN; + } +leave: + leave_yellow(beam, lock, acquired); + } + return status; +} + +void h2_beam_on_consumed(h2_bucket_beam *beam, + h2_beam_consumed_callback *cb, void *ctx) +{ + apr_thread_mutex_t *lock; + int acquired; + + if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + beam->consumed_fn = cb; + beam->consumed_ctx = ctx; + leave_yellow(beam, lock, acquired); + } +} + +void h2_beam_on_file_beam(h2_bucket_beam *beam, + h2_beam_can_beam_callback *cb, void *ctx) +{ + apr_thread_mutex_t *lock; + int acquired; + + if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + beam->can_beam_fn = cb; + beam->can_beam_ctx = ctx; + leave_yellow(beam, lock, acquired); + } +} + + +apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam) +{ + apr_thread_mutex_t *lock; + apr_bucket *b; + apr_off_t l = 0; + int acquired; + + if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + for (b = H2_BLIST_FIRST(&beam->red); + b != H2_BLIST_SENTINEL(&beam->red); + b = APR_BUCKET_NEXT(b)) { + /* should all have determinate length */ + l += b->length; + } + leave_yellow(beam, lock, acquired); + } + return l; +} + +int h2_beam_empty(h2_bucket_beam *beam) +{ + apr_thread_mutex_t *lock; + int empty = 1; + int acquired; + + if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + empty = (H2_BLIST_EMPTY(&beam->red) + && (!beam->green || APR_BRIGADE_EMPTY(beam->green))); + leave_yellow(beam, lock, acquired); + } + return empty; +} + +int h2_beam_closed(h2_bucket_beam *beam) +{ + return beam->closed; +} + +int h2_beam_was_received(h2_bucket_beam *beam) +{ + apr_thread_mutex_t *lock; + int happend = 0; + int acquired; + + if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + happend = (beam->received_bytes > 0); + leave_yellow(beam, lock, acquired); + } + return happend; +} + +apr_size_t h2_beam_get_files_beamed(h2_bucket_beam *beam) +{ + apr_thread_mutex_t *lock; + apr_size_t n = 0; + int acquired; + + if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + n = beam->files_beamed; + leave_yellow(beam, lock, acquired); + } + return n; +} + diff --git a/modules/http2/h2_bucket_beam.h b/modules/http2/h2_bucket_beam.h new file mode 100644 index 0000000000..45d98b29cc --- /dev/null +++ b/modules/http2/h2_bucket_beam.h @@ -0,0 +1,297 @@ +/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef h2_bucket_beam_h +#define h2_bucket_beam_h + +struct apr_thread_mutex_t; +struct apr_thread_cond_t; + +/******************************************************************************* + * apr_bucket list without bells and whistles + ******************************************************************************/ + +/** + * h2_blist can hold a list of buckets just like apr_bucket_brigade, but + * does not to any allocations or related features. + */ +typedef struct { + APR_RING_HEAD(h2_bucket_list, apr_bucket) list; +} h2_blist; + +#define H2_BLIST_INIT(b) APR_RING_INIT(&(b)->list, apr_bucket, link); +#define H2_BLIST_SENTINEL(b) APR_RING_SENTINEL(&(b)->list, apr_bucket, link) +#define H2_BLIST_EMPTY(b) APR_RING_EMPTY(&(b)->list, apr_bucket, link) +#define H2_BLIST_FIRST(b) APR_RING_FIRST(&(b)->list) +#define H2_BLIST_LAST(b) APR_RING_LAST(&(b)->list) +#define H2_BLIST_INSERT_HEAD(b, e) do { \ + apr_bucket *ap__b = (e); \ + APR_RING_INSERT_HEAD(&(b)->list, ap__b, apr_bucket, link); \ + } while (0) +#define H2_BLIST_INSERT_TAIL(b, e) do { \ + apr_bucket *ap__b = (e); \ + APR_RING_INSERT_TAIL(&(b)->list, ap__b, apr_bucket, link); \ + } while (0) +#define H2_BLIST_CONCAT(a, b) do { \ + APR_RING_CONCAT(&(a)->list, &(b)->list, apr_bucket, link); \ + } while (0) +#define H2_BLIST_PREPEND(a, b) do { \ + APR_RING_PREPEND(&(a)->list, &(b)->list, apr_bucket, link); \ + } while (0) + +/** + * Print the buckets in the list into the buffer (type and lengths). + * @param buffer the buffer to print into + * @param bmax max number of characters to place in buffer, incl. trailing 0 + * @param tag tag string for this bucket list + * @param sep separator to use + * @param bl the bucket list to print + * @return number of characters printed + */ +apr_size_t h2_util_bl_print(char *buffer, apr_size_t bmax, + const char *tag, const char *sep, + h2_blist *bl); + +/******************************************************************************* + * h2_bucket_beam + ******************************************************************************/ + +/** + * A h2_bucket_beam solves the task of transferring buckets, esp. their data, + * across threads with zero buffer copies. + * + * When a thread, let's call it the red thread, wants to send buckets to + * another, the green thread, it creates a h2_bucket_beam and adds buckets + * via the h2_beam_send(). It gives the beam to the green thread which then + * can receive buckets into its own brigade via h2_beam_receive(). + * + * Sending and receiving can happen concurrently, if a thread mutex is set + * for the beam, see h2_beam_mutex_set. + * + * The beam can limit the amount of data it accepts via the buffer_size. This + * can also be adjusted during its lifetime. When the beam not only gets a + * mutex but als a condition variable (in h2_beam_mutex_set()), sends and + * receives can be done blocking. A timeout can be set for such blocks. + * + * Care needs to be taken when terminating the beam. The beam registers at + * the pool it was created with and will cleanup after itself. However, if + * received buckets do still exist, already freed memory might be accessed. + * The beam does a AP_DEBUG_ASSERT on this condition. + * + * The proper way of shutting down a beam is to first make sure there are no + * more green buckets out there, then cleanup the beam to purge eventually + * still existing red buckets and then, possibly, terminate the beam itself + * (or the pool it was created with). + * + * The following restrictions apply to bucket transport: + * - only EOS and FLUSH meta buckets are copied through. All other meta buckets + * are kept in the beams hold. + * - all kind of data buckets are transported through: + * - transient buckets are converted to heap ones on send + * - heap and pool buckets require no extra handling + * - buckets with indeterminate length are read on send + * - file buckets will transfer the file itself into a new bucket, if allowed + * - all other buckets are read on send to make sure data is present + * + * This assures that when the red thread sends its red buckets, the data + * is made accessible while still on the red side. The red bucket then enters + * the beams hold storage. + * When the green thread calls receive, red buckets in the hold are wrapped + * into special beam buckets. Beam buckets on read present the data directly + * from the internal red one, but otherwise live on the green side. When a + * beam bucket gets destroyed, it notifies its beam that the corresponding + * red bucket from the hold may be destroyed. + * Since the destruction of green buckets happens in the green thread, any + * corresponding red bucket can not immediately be destroyed, as that would + * result in race conditions. + * Instead, the beam transfers such red buckets from the hold to the purge + * storage. Next time there is a call from the red side, the buckets in + * purge will be deleted. + * + * There are callbacks that can be registered with a beam: + * - a "consumed" callback that gets called on the red side with the + * amount of data that has been received by the green side. The amount + * is a delta from the last callback invocation. The red side can trigger + * these callbacks by calling h2_beam_send() with a NULL brigade. + * - a "can_beam_file" callback that can prohibit the transfer of file handles + * through the beam. This will cause file buckets to be read on send and + * its data buffer will then be transports just like a heap bucket would. + * When no callback is registered, no restrictions apply and all files are + * passed through. + * File handles transferred to the green side will stay there until the + * receiving brigade's pool is destroyed/cleared. If the pool lives very + * long or if many different files are beamed, the process might run out + * of available file handles. + * + * The name "beam" of course is inspired by good old transporter + * technology where humans are kept inside the transporter's memory + * buffers until the transmission is complete. Star gates use a similar trick. + */ +typedef struct h2_bucket_beam h2_bucket_beam; + +typedef apr_status_t h2_beam_mutex_enter(void *ctx, + struct apr_thread_mutex_t **plock, + int *acquired); +typedef void h2_beam_mutex_leave(void *ctx, + struct apr_thread_mutex_t *lock, + int acquired); +typedef void h2_beam_consumed_callback(void *ctx, h2_bucket_beam *beam, + apr_off_t bytes); + +typedef int h2_beam_can_beam_callback(void *ctx, h2_bucket_beam *beam, + apr_file_t *file); + +struct h2_bucket_beam { + int id; + const char *tag; + h2_blist red; + h2_blist hold; + h2_blist purge; + apr_bucket_brigade *green; + apr_pool_t *life_pool; + + apr_size_t max_buf_size; + apr_size_t live_beam_buckets; + apr_size_t files_beamed; /* how many file handles have been set aside */ + apr_file_t *last_beamed; /* last file beamed */ + apr_off_t sent_bytes; /* amount of bytes send */ + apr_off_t received_bytes; /* amount of bytes received */ + apr_off_t reported_bytes; /* amount of bytes reported as consumed */ + + unsigned int aborted : 1; + unsigned int closed : 1; + unsigned int close_sent : 1; + + void *m_ctx; + h2_beam_mutex_enter *m_enter; + h2_beam_mutex_leave *m_leave; + struct apr_thread_cond_t *m_cond; + apr_interval_time_t timeout; + + h2_beam_consumed_callback *consumed_fn; + void *consumed_ctx; + h2_beam_can_beam_callback *can_beam_fn; + void *can_beam_ctx; +}; + +/** + * Creates a new bucket beam for transfer of buckets across threads. + * + * The pool the beam is created with will be protected by the given + * mutex and will be used in multiple threads. It needs a pool allocator + * that is only used inside that same mutex. + * + * @param pbeam will hold the created beam on return + * @param life_pool pool for allocating initial structure and cleanups + * @param buffer_size maximum memory footprint of buckets buffered in beam, or + * 0 for no limitation + */ +apr_status_t h2_beam_create(h2_bucket_beam **pbeam, + apr_pool_t *life_pool, + int id, const char *tag, + apr_size_t buffer_size); + +apr_status_t h2_beam_destroy(h2_bucket_beam *beam); + +/** + * Send buckets from the given brigade through the beam. Will hold buckets + * internally as long as they have not been processed by the receiving side. + * All accepted buckets are removed from the given brigade. Will return with + * APR_EAGAIN on non-blocking sends when not all buckets could be accepted. + */ +apr_status_t h2_beam_send(h2_bucket_beam *beam, + apr_bucket_brigade *red_buckets, + apr_read_type_e block); + +/** + * Receive buckets from the beam into the given brigade. Will return APR_EOF + * when reading past an EOS bucket. Reads can be blocking until data is + * available or the beam has been closed. Non-blocking calls return APR_EAGAIN + * if no data is available. + */ +apr_status_t h2_beam_receive(h2_bucket_beam *beam, + apr_bucket_brigade *green_buckets, + apr_read_type_e block, + apr_off_t readbytes); + +void h2_beam_abort(h2_bucket_beam *beam); + +/** + * Close the beam. Does not need to be invoked if certain that an EOS bucket + * has been sent. + */ +apr_status_t h2_beam_close(h2_bucket_beam *beam); + +/** + * Empty the buffer and close. + */ +void h2_beam_shutdown(h2_bucket_beam *beam); + +/** + * Reset the beam to its intial, empty state. + */ +void h2_beam_reset(h2_bucket_beam *beam); + +void h2_beam_mutex_set(h2_bucket_beam *beam, + h2_beam_mutex_enter m_enter, + h2_beam_mutex_leave m_leave, + struct apr_thread_cond_t *cond, + void *m_ctx); + +/** + * Set/get the timeout for blocking read/write operations. Only works + * if a mutex has been set for the beam. + */ +void h2_beam_timeout_set(h2_bucket_beam *beam, + apr_interval_time_t timeout); +apr_interval_time_t h2_beam_timeout_get(h2_bucket_beam *beam); + +/** + * Set/get the maximum buffer size for beam data (memory footprint). + */ +void h2_beam_buffer_size_set(h2_bucket_beam *beam, + apr_size_t buffer_size); +apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam); + +/** + * Register a callback to be invoked on the red side with the + * amount of bytes that have been consumed by the red side, since the + * last callback invocation or reset. + * @param beam the beam to set the callback on + * @param cb the callback or NULL + * @param ctx the context to use in callback invocation + */ +void h2_beam_on_consumed(h2_bucket_beam *beam, + h2_beam_consumed_callback *cb, void *ctx); + +void h2_beam_on_file_beam(h2_bucket_beam *beam, + h2_beam_can_beam_callback *cb, void *ctx); + +/** + * Get the amount of bytes currently buffered in the beam (unread). + */ +apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam); + +int h2_beam_closed(h2_bucket_beam *beam); +int h2_beam_empty(h2_bucket_beam *beam); + +/** + * Return != 0 iff (some) data from the beam has been received. + */ +int h2_beam_was_received(h2_bucket_beam *beam); + +apr_size_t h2_beam_get_files_beamed(h2_bucket_beam *beam); + +#endif /* h2_bucket_beam_h */ diff --git a/modules/http2/h2_bucket_eos.c b/modules/http2/h2_bucket_eos.c index 98a0b365c6..3a5b1a570a 100644 --- a/modules/http2/h2_bucket_eos.c +++ b/modules/http2/h2_bucket_eos.c @@ -92,7 +92,7 @@ static void bucket_destroy(void *data) if (apr_bucket_shared_destroy(h)) { h2_stream *stream = h->stream; if (stream) { - h2_stream_cleanup(stream); + h2_stream_eos_destroy(stream); } apr_bucket_free(h); } diff --git a/modules/http2/h2_filter.c b/modules/http2/h2_filter.c index 8bf7fbcb40..30ccb2c43e 100644 --- a/modules/http2/h2_filter.c +++ b/modules/http2/h2_filter.c @@ -208,18 +208,27 @@ static apr_status_t bbout(apr_bucket_brigade *bb, const char *fmt, ...) return rv; } -static apr_status_t h2_sos_h2_status_buffer(h2_sos *sos, apr_bucket_brigade *bb) +static apr_status_t h2_status_stream_filter(h2_stream *stream) { - h2_stream *stream = sos->stream; h2_session *session = stream->session; h2_mplx *mplx = session->mplx; + conn_rec *c = session->c; h2_push_diary *diary; + apr_bucket_brigade *bb; apr_status_t status; - if (!bb) { - bb = apr_brigade_create(stream->pool, session->c->bucket_alloc); + if (!stream->response) { + return APR_EINVAL; } + if (!stream->buffer) { + stream->buffer = apr_brigade_create(stream->pool, c->bucket_alloc); + } + bb = stream->buffer; + + apr_table_unset(stream->response->headers, "Content-Length"); + stream->response->content_length = -1; + bbout(bb, "{\n"); bbout(bb, " \"HTTP2\": \"on\",\n"); bbout(bb, " \"H2PUSH\": \"%s\",\n", h2_session_push_enabled(session)? "on" : "off"); @@ -266,57 +275,15 @@ static apr_status_t h2_sos_h2_status_buffer(h2_sos *sos, apr_bucket_brigade *bb) bbout(bb, " \"bytes_sent\": %"APR_UINT64_T_FMT"\n", session->io.bytes_written); bbout(bb, "}\n"); - return sos->prev->buffer(sos->prev, bb); -} - -static apr_status_t h2_sos_h2_status_read_to(h2_sos *sos, apr_bucket_brigade *bb, - apr_off_t *plen, int *peos) -{ - return sos->prev->read_to(sos->prev, bb, plen, peos); -} - -static apr_status_t h2_sos_h2_status_prepare(h2_sos *sos, apr_off_t *plen, int *peos) -{ - return sos->prev->prepare(sos->prev, plen, peos); -} - -static apr_status_t h2_sos_h2_status_readx(h2_sos *sos, h2_io_data_cb *cb, void *ctx, - apr_off_t *plen, int *peos) -{ - return sos->prev->readx(sos->prev, cb, ctx, plen, peos); -} - -static apr_table_t *h2_sos_h2_status_get_trailers(h2_sos *sos) -{ - return sos->prev->get_trailers(sos->prev); -} - -static h2_sos *h2_sos_h2_status_create(h2_sos *prev) -{ - h2_sos *sos; - h2_response *response = prev->response; - - apr_table_unset(response->headers, "Content-Length"); - response->content_length = -1; - - sos = apr_pcalloc(prev->stream->pool, sizeof(*sos)); - sos->prev = prev; - sos->response = response; - sos->stream = prev->stream; - sos->buffer = h2_sos_h2_status_buffer; - sos->prepare = h2_sos_h2_status_prepare; - sos->readx = h2_sos_h2_status_readx; - sos->read_to = h2_sos_h2_status_read_to; - sos->get_trailers = h2_sos_h2_status_get_trailers; - - return sos; + return APR_SUCCESS; } -h2_sos *h2_filter_sos_create(const char *name, struct h2_sos *prev) +apr_status_t h2_stream_filter(h2_stream *stream) { - if (!strcmp(H2_SOS_H2_STATUS, name)) { - return h2_sos_h2_status_create(prev); + const char *fname = stream->response? stream->response->sos_filter : NULL; + if (fname && !strcmp(H2_SOS_H2_STATUS, fname)) { + return h2_status_stream_filter(stream); } - return prev; + return APR_SUCCESS; } diff --git a/modules/http2/h2_filter.h b/modules/http2/h2_filter.h index 2f281f8be1..5ba7d1581b 100644 --- a/modules/http2/h2_filter.h +++ b/modules/http2/h2_filter.h @@ -43,35 +43,9 @@ apr_status_t h2_filter_core_input(ap_filter_t* filter, apr_read_type_e block, apr_off_t readbytes); -typedef struct h2_sos h2_sos; -typedef apr_status_t h2_sos_data_cb(void *ctx, const char *data, apr_off_t len); - -typedef apr_status_t h2_sos_buffer(h2_sos *sos, apr_bucket_brigade *bb); -typedef apr_status_t h2_sos_prepare(h2_sos *sos, apr_off_t *plen, int *peos); -typedef apr_status_t h2_sos_readx(h2_sos *sos, h2_sos_data_cb *cb, - void *ctx, apr_off_t *plen, int *peos); -typedef apr_status_t h2_sos_read_to(h2_sos *sos, apr_bucket_brigade *bb, - apr_off_t *plen, int *peos); -typedef apr_table_t *h2_sos_get_trailers(h2_sos *sos); - - #define H2_RESP_SOS_NOTE "h2-sos-filter" -struct h2_sos { - struct h2_stream *stream; - h2_sos *prev; - struct h2_response *response; - void *ctx; - h2_sos_buffer *buffer; - h2_sos_prepare *prepare; - h2_sos_readx *readx; - h2_sos_read_to *read_to; - h2_sos_get_trailers *get_trailers; -}; - -h2_sos *h2_filter_sos_create(const char *name, struct h2_sos *prev); - +apr_status_t h2_stream_filter(struct h2_stream *stream); int h2_filter_h2_status_handler(request_rec *r); - #endif /* __mod_h2__h2_filter__ */ diff --git a/modules/http2/h2_from_h1.c b/modules/http2/h2_from_h1.c index 8e1f163a2a..eb866a7911 100644 --- a/modules/http2/h2_from_h1.c +++ b/modules/http2/h2_from_h1.c @@ -31,7 +31,6 @@ #include "h2_response.h" #include "h2_from_h1.h" #include "h2_task.h" -#include "h2_task_output.h" #include "h2_util.h" @@ -473,7 +472,7 @@ static h2_response *create_response(h2_from_h1 *from_h1, request_rec *r) apr_status_t h2_response_output_filter(ap_filter_t *f, apr_bucket_brigade *bb) { h2_task *task = f->ctx; - h2_from_h1 *from_h1 = task->output? task->output->from_h1 : NULL; + h2_from_h1 *from_h1 = task->output.from_h1; request_rec *r = f->r; apr_bucket *b; ap_bucket_error *eb = NULL; @@ -483,7 +482,7 @@ apr_status_t h2_response_output_filter(ap_filter_t *f, apr_bucket_brigade *bb) ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, "h2_from_h1(%d): output_filter called", from_h1->stream_id); - if (r->header_only && task->output && from_h1->response) { + if (r->header_only && from_h1->response) { /* throw away any data after we have compiled the response */ apr_brigade_cleanup(bb); return OK; @@ -552,7 +551,7 @@ apr_status_t h2_response_output_filter(ap_filter_t *f, apr_bucket_brigade *bb) apr_status_t h2_response_trailers_filter(ap_filter_t *f, apr_bucket_brigade *bb) { h2_task *task = f->ctx; - h2_from_h1 *from_h1 = task->output? task->output->from_h1 : NULL; + h2_from_h1 *from_h1 = task->output.from_h1; request_rec *r = f->r; apr_bucket *b; diff --git a/modules/http2/h2_int_queue.c b/modules/http2/h2_int_queue.c deleted file mode 100644 index 472ae34063..0000000000 --- a/modules/http2/h2_int_queue.c +++ /dev/null @@ -1,187 +0,0 @@ -/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <assert.h> -#include <stddef.h> -#include <apr_pools.h> - -#include "h2_int_queue.h" - - -static void tq_grow(h2_int_queue *q, int nlen); -static void tq_swap(h2_int_queue *q, int i, int j); -static int tq_bubble_up(h2_int_queue *q, int i, int top, - h2_iq_cmp *cmp, void *ctx); -static int tq_bubble_down(h2_int_queue *q, int i, int bottom, - h2_iq_cmp *cmp, void *ctx); - -h2_int_queue *h2_iq_create(apr_pool_t *pool, int capacity) -{ - h2_int_queue *q = apr_pcalloc(pool, sizeof(h2_int_queue)); - if (q) { - q->pool = pool; - tq_grow(q, capacity); - q->nelts = 0; - } - return q; -} - -int h2_iq_empty(h2_int_queue *q) -{ - return q->nelts == 0; -} - -int h2_iq_size(h2_int_queue *q) -{ - return q->nelts; -} - - -void h2_iq_add(h2_int_queue *q, int sid, h2_iq_cmp *cmp, void *ctx) -{ - int i; - - if (q->nelts >= q->nalloc) { - tq_grow(q, q->nalloc * 2); - } - - i = (q->head + q->nelts) % q->nalloc; - q->elts[i] = sid; - ++q->nelts; - - if (cmp) { - /* bubble it to the front of the queue */ - tq_bubble_up(q, i, q->head, cmp, ctx); - } -} - -int h2_iq_remove(h2_int_queue *q, int sid) -{ - int i; - for (i = 0; i < q->nelts; ++i) { - if (sid == q->elts[(q->head + i) % q->nalloc]) { - break; - } - } - - if (i < q->nelts) { - ++i; - for (; i < q->nelts; ++i) { - q->elts[(q->head+i-1)%q->nalloc] = q->elts[(q->head+i)%q->nalloc]; - } - --q->nelts; - return 1; - } - return 0; -} - -void h2_iq_clear(h2_int_queue *q) -{ - q->nelts = 0; -} - -void h2_iq_sort(h2_int_queue *q, h2_iq_cmp *cmp, void *ctx) -{ - /* Assume that changes in ordering are minimal. This needs, - * best case, q->nelts - 1 comparisions to check that nothing - * changed. - */ - if (q->nelts > 0) { - int i, ni, prev, last; - - /* Start at the end of the queue and create a tail of sorted - * entries. Make that tail one element longer in each iteration. - */ - last = i = (q->head + q->nelts - 1) % q->nalloc; - while (i != q->head) { - prev = (q->nalloc + i - 1) % q->nalloc; - - ni = tq_bubble_up(q, i, prev, cmp, ctx); - if (ni == prev) { - /* i bubbled one up, bubble the new i down, which - * keeps all tasks below i sorted. */ - tq_bubble_down(q, i, last, cmp, ctx); - } - i = prev; - }; - } -} - - -int h2_iq_shift(h2_int_queue *q) -{ - int sid; - - if (q->nelts <= 0) { - return 0; - } - - sid = q->elts[q->head]; - q->head = (q->head + 1) % q->nalloc; - q->nelts--; - - return sid; -} - -static void tq_grow(h2_int_queue *q, int nlen) -{ - if (nlen > q->nalloc) { - int *nq = apr_pcalloc(q->pool, sizeof(int) * nlen); - if (q->nelts > 0) { - int l = ((q->head + q->nelts) % q->nalloc) - q->head; - - memmove(nq, q->elts + q->head, sizeof(int) * l); - if (l < q->nelts) { - /* elts wrapped, append elts in [0, remain] to nq */ - int remain = q->nelts - l; - memmove(nq + l, q->elts, sizeof(int) * remain); - } - } - q->elts = nq; - q->nalloc = nlen; - q->head = 0; - } -} - -static void tq_swap(h2_int_queue *q, int i, int j) -{ - int x = q->elts[i]; - q->elts[i] = q->elts[j]; - q->elts[j] = x; -} - -static int tq_bubble_up(h2_int_queue *q, int i, int top, - h2_iq_cmp *cmp, void *ctx) -{ - int prev; - while (((prev = (q->nalloc + i - 1) % q->nalloc), i != top) - && (*cmp)(q->elts[i], q->elts[prev], ctx) < 0) { - tq_swap(q, prev, i); - i = prev; - } - return i; -} - -static int tq_bubble_down(h2_int_queue *q, int i, int bottom, - h2_iq_cmp *cmp, void *ctx) -{ - int next; - while (((next = (q->nalloc + i + 1) % q->nalloc), i != bottom) - && (*cmp)(q->elts[i], q->elts[next], ctx) > 0) { - tq_swap(q, next, i); - i = next; - } - return i; -} diff --git a/modules/http2/h2_int_queue.h b/modules/http2/h2_int_queue.h deleted file mode 100644 index 69f1e1c982..0000000000 --- a/modules/http2/h2_int_queue.h +++ /dev/null @@ -1,108 +0,0 @@ -/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef __mod_h2__h2_int_queue__ -#define __mod_h2__h2_int_queue__ - -/** - * h2_int_queue keeps a list of sorted h2_task* in ascending order. - */ -typedef struct h2_int_queue h2_int_queue; - -struct h2_int_queue { - int *elts; - int head; - int nelts; - int nalloc; - apr_pool_t *pool; -}; - -/** - * Comparator for two task to determine their order. - * - * @param s1 stream id to compare - * @param s2 stream id to compare - * @param ctx provided user data - * @return value is the same as for strcmp() and has the effect: - * == 0: s1 and s2 are treated equal in ordering - * < 0: s1 should be sorted before s2 - * > 0: s2 should be sorted before s1 - */ -typedef int h2_iq_cmp(int s1, int s2, void *ctx); - - -/** - * Allocate a new queue from the pool and initialize. - * @param id the identifier of the queue - * @param pool the memory pool - */ -h2_int_queue *h2_iq_create(apr_pool_t *pool, int capacity); - -/** - * Return != 0 iff there are no tasks in the queue. - * @param q the queue to check - */ -int h2_iq_empty(h2_int_queue *q); - -/** - * Return the number of int in the queue. - * @param q the queue to get size on - */ -int h2_iq_size(h2_int_queue *q); - -/** - * Add a stream idto the queue. - * - * @param q the queue to append the task to - * @param sid the stream id to add - * @param cmp the comparator for sorting - * @param ctx user data for comparator - */ -void h2_iq_add(h2_int_queue *q, int sid, h2_iq_cmp *cmp, void *ctx); - -/** - * Remove the stream id from the queue. Return != 0 iff task - * was found in queue. - * @param q the task queue - * @param sid the stream id to remove - * @return != 0 iff task was found in queue - */ -int h2_iq_remove(h2_int_queue *q, int sid); - -/** - * Remove all entries in the queue. - */ -void h2_iq_clear(h2_int_queue *q); - -/** - * Sort the stream idqueue again. Call if the task ordering - * has changed. - * - * @param q the queue to sort - * @param cmp the comparator for sorting - * @param ctx user data for the comparator - */ -void h2_iq_sort(h2_int_queue *q, h2_iq_cmp *cmp, void *ctx); - -/** - * Get the first stream id from the queue or NULL if the queue is empty. - * The task will be removed. - * - * @param q the queue to get the first task from - * @return the first stream id of the queue, 0 if empty - */ -int h2_iq_shift(h2_int_queue *q); - -#endif /* defined(__mod_h2__h2_int_queue__) */ diff --git a/modules/http2/h2_io.c b/modules/http2/h2_io.c index 6c140a0893..395c34e502 100644 --- a/modules/http2/h2_io.c +++ b/modules/http2/h2_io.c @@ -26,6 +26,7 @@ #include <http_request.h> #include "h2_private.h" +#include "h2_bucket_beam.h" #include "h2_h2.h" #include "h2_io.h" #include "h2_mplx.h" @@ -34,66 +35,31 @@ #include "h2_task.h" #include "h2_util.h" -h2_io *h2_io_create(int id, apr_pool_t *pool, - apr_bucket_alloc_t *bucket_alloc, - const h2_request *request) +h2_io *h2_io_create(int id, apr_pool_t *pool, const h2_request *request) { h2_io *io = apr_pcalloc(pool, sizeof(*io)); if (io) { io->id = id; io->pool = pool; - io->bucket_alloc = bucket_alloc; io->request = request; + if (request->body) { + h2_beam_create(&io->beam_in, pool, id, "input", 0); + } } return io; } -static void check_bbin(h2_io *io) -{ - if (!io->bbin) { - io->bbin = apr_brigade_create(io->pool, io->bucket_alloc); - } -} - -static void check_bbout(h2_io *io) -{ - if (!io->bbout) { - io->bbout = apr_brigade_create(io->pool, io->bucket_alloc); - } -} - -static void check_bbtmp(h2_io *io) -{ - if (!io->bbtmp) { - io->bbtmp = apr_brigade_create(io->pool, io->bucket_alloc); - } -} - -static void append_eos(h2_io *io, apr_bucket_brigade *bb) -{ - APR_BRIGADE_INSERT_TAIL(bb, apr_bucket_eos_create(io->bucket_alloc)); -} - void h2_io_redo(h2_io *io) { io->worker_started = 0; io->response = NULL; io->rst_error = 0; - if (io->bbin) { - apr_brigade_cleanup(io->bbin); - } - if (io->bbout) { - apr_brigade_cleanup(io->bbout); - } - if (io->bbtmp) { - apr_brigade_cleanup(io->bbtmp); - } io->started_at = io->done_at = 0; } -int h2_io_is_repeatable(h2_io *io) { +int h2_io_can_redo(h2_io *io) { if (io->submitted - || io->input_consumed > 0 + || (io->beam_in && h2_beam_was_received(io->beam_in)) || !io->request) { /* cannot repeat that. */ return 0; @@ -103,351 +69,40 @@ int h2_io_is_repeatable(h2_io *io) { || !strcmp("OPTIONS", io->request->method)); } -void h2_io_set_response(h2_io *io, h2_response *response) +void h2_io_set_response(h2_io *io, h2_response *response, + h2_bucket_beam *output) { - AP_DEBUG_ASSERT(io->pool); AP_DEBUG_ASSERT(response); AP_DEBUG_ASSERT(!io->response); - /* we used to clone the response into the io->pool. But + /* we used to clone the response into out own pool. But * we have much tighter control over the EOR bucket nowadays, * so just use the instance given */ io->response = response; + if (output) { + io->beam_out = output; + } if (response->rst_error) { h2_io_rst(io, response->rst_error); } - else if (response->content_length == 0) { - io->eos_out = 1; - } } void h2_io_rst(h2_io *io, int error) { io->rst_error = error; - io->eos_in = 1; -} - -int h2_io_out_has_data(h2_io *io) -{ - return io->bbout && h2_util_bb_has_data_or_eos(io->bbout); -} - -apr_off_t h2_io_out_length(h2_io *io) -{ - if (io->bbout) { - apr_off_t len = 0; - apr_brigade_length(io->bbout, 0, &len); - return (len > 0)? len : 0; - } - return 0; -} - -apr_status_t h2_io_in_shutdown(h2_io *io) -{ - if (io->bbin) { - apr_off_t end_len = 0; - apr_brigade_length(io->bbin, 1, &end_len); - io->input_consumed += end_len; - apr_brigade_cleanup(io->bbin); - } - return h2_io_in_close(io); -} - - -void h2_io_signal_init(h2_io *io, h2_io_op op, apr_interval_time_t timeout, - apr_thread_cond_t *cond) -{ - io->timed_op = op; - io->timed_cond = cond; - if (timeout > 0) { - io->timeout_at = apr_time_now() + timeout; - } - else { - io->timeout_at = 0; - } -} - -void h2_io_signal_exit(h2_io *io) -{ - io->timed_cond = NULL; - io->timeout_at = 0; -} - -apr_status_t h2_io_signal_wait(h2_mplx *m, h2_io *io) -{ - apr_status_t status; - - if (io->timeout_at != 0) { - status = apr_thread_cond_timedwait(io->timed_cond, m->lock, io->timeout_at); - if (APR_STATUS_IS_TIMEUP(status)) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c, APLOGNO(03055) - "h2_mplx(%ld-%d): stream timeout expired: %s", - m->id, io->id, - (io->timed_op == H2_IO_READ)? "read" : "write"); - h2_io_rst(io, H2_ERR_CANCEL); - } - } - else { - apr_thread_cond_wait(io->timed_cond, m->lock); - status = APR_SUCCESS; - } - if (io->orphaned && status == APR_SUCCESS) { - return APR_ECONNABORTED; - } - return status; -} - -void h2_io_signal(h2_io *io, h2_io_op op) -{ - if (io->timed_cond && (io->timed_op == op || H2_IO_ANY == op)) { - apr_thread_cond_signal(io->timed_cond); - } -} - -void h2_io_make_orphaned(h2_io *io, int error) -{ - io->orphaned = 1; - if (error) { - h2_io_rst(io, error); - } - /* if someone is waiting, wake him up */ - h2_io_signal(io, H2_IO_ANY); -} - -static int add_trailer(void *ctx, const char *key, const char *value) -{ - apr_bucket_brigade *bb = ctx; - apr_status_t status; - - status = apr_brigade_printf(bb, NULL, NULL, "%s: %s\r\n", - key, value); - return (status == APR_SUCCESS); -} - -static apr_status_t in_append_eos(h2_io *io, apr_bucket_brigade *bb, - apr_table_t *trailers) -{ - apr_status_t status = APR_SUCCESS; - apr_table_t *t = io->request->trailers; - - if (trailers && t && !apr_is_empty_table(trailers)) { - /* trailers passed in, transfer directly. */ - apr_table_overlap(trailers, t, APR_OVERLAP_TABLES_SET); - t = NULL; - } - - if (io->request->chunked) { - if (t && !apr_is_empty_table(t)) { - /* no trailers passed in, transfer via chunked */ - status = apr_brigade_puts(bb, NULL, NULL, "0\r\n"); - apr_table_do(add_trailer, bb, t, NULL); - status = apr_brigade_puts(bb, NULL, NULL, "\r\n"); - } - else { - status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n"); - } - } - append_eos(io, bb); - return status; -} - -apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb, - apr_size_t maxlen, apr_table_t *trailers) -{ - apr_off_t start_len = 0; - apr_status_t status; - - if (io->rst_error) { - return APR_ECONNABORTED; - } - - if (!io->bbin || APR_BRIGADE_EMPTY(io->bbin)) { - if (io->eos_in) { - if (!io->eos_in_written) { - status = in_append_eos(io, bb, trailers); - io->eos_in_written = 1; - return status; - } - return APR_EOF; - } - return APR_EAGAIN; - } - - if (io->request->chunked) { - /* the reader expects HTTP/1.1 chunked encoding */ - check_bbtmp(io); - status = h2_util_move(io->bbtmp, io->bbin, maxlen, NULL, "h2_io_in_read_chunk"); - if (status == APR_SUCCESS) { - apr_off_t tmp_len = 0; - - apr_brigade_length(io->bbtmp, 1, &tmp_len); - if (tmp_len > 0) { - io->input_consumed += tmp_len; - status = apr_brigade_printf(bb, NULL, NULL, "%lx\r\n", - (unsigned long)tmp_len); - if (status == APR_SUCCESS) { - status = h2_util_move(bb, io->bbtmp, -1, NULL, "h2_io_in_read_tmp1"); - if (status == APR_SUCCESS) { - status = apr_brigade_puts(bb, NULL, NULL, "\r\n"); - } - } - } - else { - status = h2_util_move(bb, io->bbtmp, -1, NULL, "h2_io_in_read_tmp2"); - } - apr_brigade_cleanup(io->bbtmp); - } - } - else { - apr_brigade_length(bb, 1, &start_len); - - status = h2_util_move(bb, io->bbin, maxlen, NULL, "h2_io_in_read"); - if (status == APR_SUCCESS) { - apr_off_t end_len = 0; - apr_brigade_length(bb, 1, &end_len); - io->input_consumed += (end_len - start_len); - } - } - - if (status == APR_SUCCESS && (!io->bbin || APR_BRIGADE_EMPTY(io->bbin))) { - if (io->eos_in) { - if (!io->eos_in_written) { - status = in_append_eos(io, bb, trailers); - io->eos_in_written = 1; - } - } - } - - if (status == APR_SUCCESS && APR_BRIGADE_EMPTY(bb)) { - return APR_EAGAIN; - } - return status; -} - -apr_status_t h2_io_in_write(h2_io *io, const char *d, apr_size_t len, int eos) -{ - if (io->rst_error) { - return APR_ECONNABORTED; - } - - if (io->eos_in) { - return APR_EOF; + if (io->beam_in) { + h2_beam_abort(io->beam_in); } - if (eos) { - io->eos_in = 1; - } - if (len > 0) { - check_bbin(io); - return apr_brigade_write(io->bbin, NULL, NULL, d, len); - } - return APR_SUCCESS; -} - -apr_status_t h2_io_in_close(h2_io *io) -{ - if (io->rst_error) { - return APR_ECONNABORTED; + if (io->beam_out) { + h2_beam_abort(io->beam_out); } - - io->eos_in = 1; - return APR_SUCCESS; } -apr_status_t h2_io_out_get_brigade(h2_io *io, apr_bucket_brigade *bb, - apr_off_t len) +void h2_io_shutdown(h2_io *io) { - if (io->rst_error) { - return APR_ECONNABORTED; - } - if (io->eos_out_read) { - return APR_EOF; - } - else if (!io->bbout || APR_BRIGADE_EMPTY(io->bbout)) { - return APR_EAGAIN; + if (io->beam_in) { + h2_beam_shutdown(io->beam_in); } - else { - apr_status_t status; - apr_off_t pre_len, post_len; - /* Allow file handles pass through without limits. If they - * already have the lifetime of this stream, we might as well - * pass them on to the master connection */ - apr_size_t files = INT_MAX; - - apr_brigade_length(bb, 0, &pre_len); - status = h2_util_move(bb, io->bbout, len, &files, "h2_io_read_to"); - if (status == APR_SUCCESS && io->eos_out - && APR_BRIGADE_EMPTY(io->bbout)) { - io->eos_out_read = 1; - } - apr_brigade_length(bb, 0, &post_len); - io->output_consumed += (post_len - pre_len); - return status; - } -} - -apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb, - apr_size_t maxlen, - apr_size_t *pfile_buckets_allowed) -{ - apr_status_t status; - apr_bucket *b; - int start_allowed; - - if (io->rst_error) { - return APR_ECONNABORTED; - } - - /* Filter the EOR bucket and set it aside. We prefer to tear down - * the request when the whole h2 stream is done */ - for (b = APR_BRIGADE_FIRST(bb); - b != APR_BRIGADE_SENTINEL(bb); - b = APR_BUCKET_NEXT(b)) - { - if (AP_BUCKET_IS_EOR(b)) { - APR_BUCKET_REMOVE(b); - io->eor = b; - break; - } - else if (APR_BUCKET_IS_EOS(b)) { - io->eos_out = 1; - break; - } - } - - /* Let's move the buckets from the request processing in here, so - * that the main thread can read them when it has time/capacity. - * - * Move at most "maxlen" memory bytes. If buckets remain, it is - * the caller's responsibility to take care of this. - * - * We allow passing of file buckets as long as we do not have too - * many open files already buffered. Otherwise we will run out of - * file handles. - */ - check_bbout(io); - start_allowed = *pfile_buckets_allowed; - status = h2_util_move(io->bbout, bb, maxlen, pfile_buckets_allowed, - "h2_io_out_write"); - /* track # file buckets moved into our pool */ - if (start_allowed != *pfile_buckets_allowed) { - io->files_handles_owned += (start_allowed - *pfile_buckets_allowed); - } - return status; -} - - -apr_status_t h2_io_out_close(h2_io *io) -{ - if (io->rst_error) { - return APR_ECONNABORTED; - } - if (!io->eos_out_read) { /* EOS has not been read yet */ - if (!io->eos_out) { - check_bbout(io); - io->eos_out = 1; - if (!h2_util_has_eos(io->bbout, -1)) { - append_eos(io, io->bbout); - } - } + if (io->beam_out) { + h2_beam_shutdown(io->beam_out); } - return APR_SUCCESS; } diff --git a/modules/http2/h2_io.h b/modules/http2/h2_io.h index d700f6f322..bfd130eaba 100644 --- a/modules/http2/h2_io.h +++ b/modules/http2/h2_io.h @@ -16,6 +16,7 @@ #ifndef __mod_h2__h2_io__ #define __mod_h2__h2_io__ +struct h2_bucket_beam; struct h2_response; struct apr_thread_cond_t; struct h2_mplx; @@ -37,40 +38,23 @@ typedef struct h2_io h2_io; struct h2_io { int id; /* stream identifier */ - apr_pool_t *pool; /* stream pool */ - apr_bucket_alloc_t *bucket_alloc; + apr_pool_t *pool; /* io pool */ - const struct h2_request *request;/* request on this io */ - struct h2_response *response; /* response to request */ - int rst_error; /* h2 related stream abort error */ + const struct h2_request *request;/* request to process */ + struct h2_response *response; /* response to submit */ + + struct h2_bucket_beam *beam_in; /* request body buckets */ + struct h2_bucket_beam *beam_out; /* response body buckets */ - apr_bucket *eor; /* the EOR bucket, set aside */ struct h2_task *task; /* the task once started */ + apr_time_t started_at; /* when processing started */ + apr_time_t done_at; /* when processing was done */ - apr_bucket_brigade *bbin; /* input data for stream */ - apr_bucket_brigade *bbout; /* output data from stream */ - apr_bucket_brigade *bbtmp; /* temporary data for chunking */ - + int rst_error; /* h2 related stream abort error */ unsigned int orphaned : 1; /* h2_stream is gone for this io */ + unsigned int submitted : 1; /* response has been submitted to client */ unsigned int worker_started : 1; /* h2_worker started processing for this io */ unsigned int worker_done : 1; /* h2_worker finished for this io */ - unsigned int submitted : 1; /* response has been submitted to client */ - unsigned int request_body : 1; /* iff request has body */ - unsigned int eos_in : 1; /* input eos has been seen */ - unsigned int eos_in_written : 1; /* input eos has been forwarded */ - unsigned int eos_out : 1; /* output eos is present */ - unsigned int eos_out_read : 1; /* output eos has been forwarded */ - - h2_io_op timed_op; /* which operation is waited on, if any */ - struct apr_thread_cond_t *timed_cond; /* condition to wait on, maybe NULL */ - apr_time_t timeout_at; /* when IO wait will time out */ - - apr_time_t started_at; /* when processing started */ - apr_time_t done_at; /* when processing was done */ - apr_size_t input_consumed; /* how many bytes have been read */ - apr_size_t output_consumed; /* how many bytes have been written out */ - - int files_handles_owned; }; /******************************************************************************* @@ -80,96 +64,25 @@ struct h2_io { /** * Creates a new h2_io for the given stream id. */ -h2_io *h2_io_create(int id, apr_pool_t *pool, - apr_bucket_alloc_t *bucket_alloc, - const struct h2_request *request); +h2_io *h2_io_create(int id, apr_pool_t *pool, const struct h2_request *request); /** * Set the response of this stream. */ -void h2_io_set_response(h2_io *io, struct h2_response *response); +void h2_io_set_response(h2_io *io, struct h2_response *response, + struct h2_bucket_beam *output); /** * Reset the stream with the given error code. */ void h2_io_rst(h2_io *io, int error); -int h2_io_is_repeatable(h2_io *io); +int h2_io_can_redo(h2_io *io); void h2_io_redo(h2_io *io); /** - * Output data is available. - */ -int h2_io_out_has_data(h2_io *io); - -void h2_io_signal(h2_io *io, h2_io_op op); -void h2_io_signal_init(h2_io *io, h2_io_op op, apr_interval_time_t timeout, - struct apr_thread_cond_t *cond); -void h2_io_signal_exit(h2_io *io); -apr_status_t h2_io_signal_wait(struct h2_mplx *m, h2_io *io); - -void h2_io_make_orphaned(h2_io *io, int error); - -/******************************************************************************* - * Input handling of streams. - ******************************************************************************/ -/** - * Reads the next bucket from the input. Returns APR_EAGAIN if none - * is currently available, APR_EOF if end of input has been reached. - */ -apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb, - apr_size_t maxlen, apr_table_t *trailers); - -/** - * Appends given bucket to the input. - */ -apr_status_t h2_io_in_write(h2_io *io, const char *d, apr_size_t len, int eos); - -/** - * Closes the input. After existing data has been read, APR_EOF will - * be returned. + * Shuts all input/output down. Clears any buckets buffered and closes. */ -apr_status_t h2_io_in_close(h2_io *io); - -/** - * Shuts all input down. Will close input and mark any data buffered - * as consumed. - */ -apr_status_t h2_io_in_shutdown(h2_io *io); - -/******************************************************************************* - * Output handling of streams. - ******************************************************************************/ - -/** - * Read a bucket from the output head. Return APR_EAGAIN if non is available, - * APR_EOF if none available and output has been closed. - * May be called with buffer == NULL in order to find out how much data - * is available. - * @param io the h2_io to read output from - * @param buffer the buffer to copy the data to, may be NULL - * @param plen the requested max len, set to amount of data on return - * @param peos != 0 iff the end of stream has been reached - */ -apr_status_t h2_io_out_get_brigade(h2_io *io, - apr_bucket_brigade *bb, - apr_off_t len); - -apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb, - apr_size_t maxlen, - apr_size_t *pfile_buckets_allowed); - -/** - * Closes the input. After existing data has been read, APR_EOF will - * be returned. - */ -apr_status_t h2_io_out_close(h2_io *io); - -/** - * Gives the overall length of the data that is currently queued for - * output. - */ -apr_off_t h2_io_out_length(h2_io *io); - +void h2_io_shutdown(h2_io *io); #endif /* defined(__mod_h2__h2_io__) */ diff --git a/modules/http2/h2_io_set.c b/modules/http2/h2_io_set.c deleted file mode 100644 index e09497954f..0000000000 --- a/modules/http2/h2_io_set.c +++ /dev/null @@ -1,159 +0,0 @@ -/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <assert.h> -#include <stddef.h> - -#include <apr_strings.h> - -#include <httpd.h> -#include <http_core.h> -#include <http_connection.h> -#include <http_log.h> - -#include "h2_private.h" -#include "h2_io.h" -#include "h2_io_set.h" - -#define h2_io_IDX(list, i) ((h2_io**)(list)->elts)[i] - -struct h2_io_set { - apr_array_header_t *list; -}; - -h2_io_set *h2_io_set_create(apr_pool_t *pool) -{ - h2_io_set *sp = apr_pcalloc(pool, sizeof(h2_io_set)); - if (sp) { - sp->list = apr_array_make(pool, 100, sizeof(h2_io*)); - if (!sp->list) { - return NULL; - } - } - return sp; -} - -static int h2_stream_id_cmp(const void *s1, const void *s2) -{ - h2_io **pio1 = (h2_io **)s1; - h2_io **pio2 = (h2_io **)s2; - return (*pio1)->id - (*pio2)->id; -} - -h2_io *h2_io_set_get(h2_io_set *sp, int stream_id) -{ - /* we keep the array sorted by id, so lookup can be done - * by bsearch. - */ - h2_io **ps; - h2_io key; - h2_io *pkey = &key; - - memset(&key, 0, sizeof(key)); - key.id = stream_id; - ps = bsearch(&pkey, sp->list->elts, sp->list->nelts, - sp->list->elt_size, h2_stream_id_cmp); - return ps? *ps : NULL; -} - -static void h2_io_set_sort(h2_io_set *sp) -{ - qsort(sp->list->elts, sp->list->nelts, sp->list->elt_size, - h2_stream_id_cmp); -} - -apr_status_t h2_io_set_add(h2_io_set *sp, h2_io *io) -{ - h2_io *existing = h2_io_set_get(sp, io->id); - if (!existing) { - int last; - APR_ARRAY_PUSH(sp->list, h2_io*) = io; - /* Normally, streams get added in ascending order if id. We - * keep the array sorted, so we just need to check if the newly - * appended stream has a lower id than the last one. if not, - * sorting is not necessary. - */ - last = sp->list->nelts - 1; - if (last > 0 - && (h2_io_IDX(sp->list, last)->id - < h2_io_IDX(sp->list, last-1)->id)) { - h2_io_set_sort(sp); - } - } - return APR_SUCCESS; -} - -static void remove_idx(h2_io_set *sp, int idx) -{ - int n; - --sp->list->nelts; - n = sp->list->nelts - idx; - if (n > 0) { - /* There are n h2_io* behind idx. Move the rest down */ - h2_io **selts = (h2_io**)sp->list->elts; - memmove(selts + idx, selts + idx + 1, n * sizeof(h2_io*)); - } -} - -h2_io *h2_io_set_remove(h2_io_set *sp, h2_io *io) -{ - int i; - for (i = 0; i < sp->list->nelts; ++i) { - h2_io *e = h2_io_IDX(sp->list, i); - if (e->id == io->id) { - remove_idx(sp, i); - return e; - } - } - return NULL; -} - -h2_io *h2_io_set_shift(h2_io_set *set) -{ - /* For now, this just removes the first element in the set. - * the name is misleading... - */ - if (set->list->nelts > 0) { - h2_io *io = h2_io_IDX(set->list, 0); - remove_idx(set, 0); - return io; - } - return NULL; -} - -int h2_io_set_is_empty(h2_io_set *sp) -{ - AP_DEBUG_ASSERT(sp); - return sp->list->nelts == 0; -} - -int h2_io_set_iter(h2_io_set *sp, - h2_io_set_iter_fn *iter, void *ctx) -{ - int i; - for (i = 0; i < sp->list->nelts; ++i) { - h2_io *s = h2_io_IDX(sp->list, i); - if (!iter(ctx, s)) { - return 0; - } - } - return 1; -} - -apr_size_t h2_io_set_size(h2_io_set *sp) -{ - return sp->list->nelts; -} - diff --git a/modules/http2/h2_io_set.h b/modules/http2/h2_io_set.h deleted file mode 100644 index 936e725222..0000000000 --- a/modules/http2/h2_io_set.h +++ /dev/null @@ -1,53 +0,0 @@ -/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef __mod_h2__h2_io_set__ -#define __mod_h2__h2_io_set__ - -struct h2_io; - -/** - * A set of h2_io instances. Allows lookup by stream id - * and other criteria. - */ -typedef struct h2_io_set h2_io_set; - -h2_io_set *h2_io_set_create(apr_pool_t *pool); - -apr_status_t h2_io_set_add(h2_io_set *set, struct h2_io *io); -h2_io *h2_io_set_get(h2_io_set *set, int stream_id); -h2_io *h2_io_set_remove(h2_io_set *set, struct h2_io *io); - -int h2_io_set_is_empty(h2_io_set *set); -apr_size_t h2_io_set_size(h2_io_set *set); - - -typedef int h2_io_set_iter_fn(void *ctx, struct h2_io *io); - -/** - * Iterator over all h2_io* in the set or until a - * callback returns 0. It is not safe to add or remove - * set members during iteration. - * - * @param set the set of h2_io to iterate over - * @param iter the function to call for each io - * @param ctx user data for the callback - * @return 1 iff iteration completed for all members - */ -int h2_io_set_iter(h2_io_set *set, h2_io_set_iter_fn *iter, void *ctx); - -h2_io *h2_io_set_shift(h2_io_set *set); - -#endif /* defined(__mod_h2__h2_io_set__) */ diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 6513a75415..d75188a78e 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -29,38 +29,40 @@ #include "mod_http2.h" #include "h2_private.h" +#include "h2_bucket_beam.h" #include "h2_config.h" #include "h2_conn.h" #include "h2_ctx.h" #include "h2_h2.h" -#include "h2_int_queue.h" #include "h2_io.h" -#include "h2_io_set.h" #include "h2_response.h" #include "h2_mplx.h" #include "h2_ngn_shed.h" #include "h2_request.h" #include "h2_stream.h" #include "h2_task.h" -#include "h2_task_input.h" -#include "h2_task_output.h" #include "h2_worker.h" #include "h2_workers.h" #include "h2_util.h" -#define H2_MPLX_IO_OUT(lvl,m,io,msg) \ - do { \ - if (APLOG_C_IS_LEVEL((m)->c,lvl)) \ - h2_util_bb_log((m)->c,(io)->id,lvl,msg,(io)->bbout); \ - } while(0) - -#define H2_MPLX_IO_IN(lvl,m,io,msg) \ - do { \ - if (APLOG_C_IS_LEVEL((m)->c,lvl)) \ - h2_util_bb_log((m)->c,(io)->id,lvl,msg,(io)->bbin); \ - } while(0) - +static void h2_beam_log(h2_bucket_beam *beam, int id, const char *msg, + conn_rec *c, int level) +{ + if (beam && APLOG_C_IS_LEVEL(c,level)) { + char buffer[2048]; + apr_size_t off = 0; + + off += apr_snprintf(buffer+off, H2_ALEN(buffer)-off, "cl=%d, ", beam->closed); + off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "red", ", ", &beam->red); + off += h2_util_bb_print(buffer+off, H2_ALEN(buffer)-off, "green", ", ", beam->green); + off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "hold", ", ", &beam->hold); + off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "purge", "", &beam->purge); + + ap_log_cerror(APLOG_MARK, level, 0, c, "beam(%ld-%d): %s %s", + c->id, id, msg, buffer); + } +} /* NULL or the mutex hold by this thread, used for recursive calls */ @@ -104,13 +106,51 @@ static void leave_mutex(h2_mplx *m, int acquired) } } -static int is_aborted(h2_mplx *m, apr_status_t *pstatus) +static apr_status_t io_mutex_enter(void *ctx, + apr_thread_mutex_t **plock, int *acquired) { - AP_DEBUG_ASSERT(m); - if (m->aborted) { - *pstatus = APR_ECONNABORTED; + h2_mplx *m = ctx; + *plock = m->lock; + return enter_mutex(m, acquired); +} + +static void io_mutex_leave(void *ctx, apr_thread_mutex_t *lock, int acquired) +{ + h2_mplx *m = ctx; + leave_mutex(m, acquired); +} + +static void stream_output_consumed(void *ctx, + h2_bucket_beam *beam, apr_off_t length) +{ + h2_io *io = ctx; + if (length > 0 && io->task && io->task->assigned) { + h2_req_engine_out_consumed(io->task->assigned, io->task->c, length); + } +} + +static void stream_input_consumed(void *ctx, + h2_bucket_beam *beam, apr_off_t length) +{ + h2_mplx *m = ctx; + if (m->input_consumed && length) { + m->input_consumed(m->input_consumed_ctx, beam->id, length); + } +} + +static int can_beam_file(void *ctx, h2_bucket_beam *beam, apr_file_t *file) +{ + h2_mplx *m = ctx; + if (m->tx_handles_reserved > 0) { + --m->tx_handles_reserved; + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, + "h2_mplx(%ld-%d): beaming file %s, tx_avail %d", + m->id, beam->id, beam->tag, m->tx_handles_reserved); return 1; } + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, + "h2_mplx(%ld-%d): can_beam_file denied on %s", + m->id, beam->id, beam->tag); return 0; } @@ -118,9 +158,9 @@ static void have_out_data_for(h2_mplx *m, int stream_id); static void check_tx_reservation(h2_mplx *m) { - if (m->tx_handles_reserved == 0) { + if (m->tx_handles_reserved <= 0) { m->tx_handles_reserved += h2_workers_tx_reserve(m->workers, - H2MIN(m->tx_chunk_size, h2_io_set_size(m->stream_ios))); + H2MIN(m->tx_chunk_size, h2_ilist_count(m->stream_ios))); } } @@ -132,7 +172,7 @@ static void check_tx_free(h2_mplx *m) h2_workers_tx_free(m->workers, count); } else if (m->tx_handles_reserved - && (!m->stream_ios || h2_io_set_is_empty(m->stream_ios))) { + && (!m->stream_ios || h2_ilist_empty(m->stream_ios))) { h2_workers_tx_free(m->workers, m->tx_handles_reserved); m->tx_handles_reserved = 0; } @@ -143,7 +183,7 @@ static void h2_mplx_destroy(h2_mplx *m) AP_DEBUG_ASSERT(m); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): destroy, ios=%d", - m->id, (int)h2_io_set_size(m->stream_ios)); + m->id, (int)h2_ilist_count(m->stream_ios)); check_tx_free(m); if (m->pool) { apr_pool_destroy(m->pool); @@ -205,8 +245,8 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS); m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM); m->q = h2_iq_create(m->pool, m->max_streams); - m->stream_ios = h2_io_set_create(m->pool); - m->ready_ios = h2_io_set_create(m->pool); + m->stream_ios = h2_ilist_create(m->pool); + m->ready_ios = h2_ilist_create(m->pool); m->stream_timeout = stream_timeout; m->workers = workers; m->workers_max = workers->max_workers; @@ -240,49 +280,29 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m) return max_stream_started; } -static void workers_register(h2_mplx *m) -{ - /* h2_workers is only a hub for all the h2_worker instances. - * At the end-of-life of this h2_mplx, we always unregister at - * the workers. The thing to manage are all the h2_worker instances - * out there. Those may hold a reference to this h2_mplx and we cannot - * call them to unregister. - * - * Therefore: ref counting for h2_workers in not needed, ref counting - * for h2_worker using this is critical. - */ - m->need_registration = 0; - h2_workers_register(m->workers, m); -} - -static int io_in_consumed_signal(h2_mplx *m, h2_io *io) +static void io_in_consumed_signal(h2_mplx *m, h2_io *io) { - if (io->input_consumed && m->input_consumed) { - m->input_consumed(m->input_consumed_ctx, - io->id, io->input_consumed); - io->input_consumed = 0; - return 1; + if (io->beam_in && io->worker_started) { + h2_beam_send(io->beam_in, NULL, 0); /* trigger updates */ } - return 0; } static int io_out_consumed_signal(h2_mplx *m, h2_io *io) { - if (io->output_consumed && io->task && io->task->assigned) { - h2_req_engine_out_consumed(io->task->assigned, io->task->c, - io->output_consumed); - io->output_consumed = 0; - return 1; + if (io->beam_out && io->worker_started && io->task && io->task->assigned) { + h2_beam_send(io->beam_out, NULL, 0); /* trigger updates */ } return 0; } + static void io_destroy(h2_mplx *m, h2_io *io, int events) { + conn_rec *slave = NULL; int reuse_slave; /* cleanup any buffered input */ - h2_io_in_shutdown(io); + h2_io_shutdown(io); if (events) { /* Process outstanding events before destruction */ io_in_consumed_signal(m, io); @@ -291,24 +311,37 @@ static void io_destroy(h2_mplx *m, h2_io *io, int events) /* The pool is cleared/destroyed which also closes all * allocated file handles. Give this count back to our * file handle pool. */ - m->tx_handles_reserved += io->files_handles_owned; + if (io->beam_in) { + m->tx_handles_reserved += h2_beam_get_files_beamed(io->beam_in); + } + if (io->beam_out) { + m->tx_handles_reserved += h2_beam_get_files_beamed(io->beam_out); + } - h2_io_set_remove(m->stream_ios, io); - h2_io_set_remove(m->ready_ios, io); + h2_ilist_remove(m->stream_ios, io->id); + h2_ilist_remove(m->ready_ios, io->id); if (m->redo_ios) { - h2_io_set_remove(m->redo_ios, io); + h2_ilist_remove(m->redo_ios, io->id); } reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc) - && !io->rst_error && io->eor); + && !io->rst_error); if (io->task) { - conn_rec *slave = io->task->c; + slave = io->task->c; h2_task_destroy(io->task); io->task = NULL; - + } + + if (io->pool) { + if (m->spare_io_pool) { + apr_pool_destroy(m->spare_io_pool); + } + apr_pool_clear(io->pool); + m->spare_io_pool = io->pool; + } + + if (slave) { if (reuse_slave && slave->keepalive == AP_CONN_KEEPALIVE) { - apr_bucket_delete(io->eor); - io->eor = NULL; APR_ARRAY_PUSH(m->spare_slaves, conn_rec*) = slave; } else { @@ -316,18 +349,14 @@ static void io_destroy(h2_mplx *m, h2_io *io, int events) h2_slave_destroy(slave, NULL); } } - - if (io->pool) { - apr_pool_destroy(io->pool); - } - + check_tx_free(m); } static int io_stream_done(h2_mplx *m, h2_io *io, int rst_error) { /* Remove io from ready set, we will never submit it */ - h2_io_set_remove(m->ready_ios, io); + h2_ilist_remove(m->ready_ios, io->id); if (!io->worker_started || io->worker_done) { /* already finished or not even started yet */ h2_iq_remove(m->q, io->id); @@ -336,39 +365,41 @@ static int io_stream_done(h2_mplx *m, h2_io *io, int rst_error) } else { /* cleanup once task is done */ - h2_io_make_orphaned(io, rst_error); + io->orphaned = 1; + if (rst_error) { + h2_io_rst(io, rst_error); + } return 1; } } -static int stream_done_iter(void *ctx, h2_io *io) +static int stream_done_iter(void *ctx, void *val) { - return io_stream_done((h2_mplx*)ctx, io, 0); + return io_stream_done((h2_mplx*)ctx, val, 0); } -static int stream_print(void *ctx, h2_io *io) +static int stream_print(void *ctx, void *val) { h2_mplx *m = ctx; + h2_io *io = val; if (io && io->request) { ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ "->03198: h2_stream(%ld-%d): %s %s %s -> %s %d" - "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]", + "[orph=%d/started=%d/done=%d]", m->id, io->id, io->request->method, io->request->authority, io->request->path, io->response? "http" : (io->rst_error? "reset" : "?"), io->response? io->response->http_status : io->rst_error, - io->orphaned, io->worker_started, io->worker_done, - io->eos_in, io->eos_out); + io->orphaned, io->worker_started, io->worker_done); } else if (io) { ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ "->03198: h2_stream(%ld-%d): NULL -> %s %d" - "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]", + "[orph=%d/started=%d/done=%d]", m->id, io->id, io->response? "http" : (io->rst_error? "reset" : "?"), io->response? io->response->http_status : io->rst_error, - io->orphaned, io->worker_started, io->worker_done, - io->eos_in, io->eos_out); + io->orphaned, io->worker_started, io->worker_done); } else { ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ @@ -392,7 +423,7 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) h2_iq_clear(m->q); apr_thread_cond_broadcast(m->task_thawed); - while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) { + while (!h2_ilist_iter(m->stream_ios, stream_done_iter, m)) { /* iterate until all ios have been orphaned or destroyed */ } @@ -407,9 +438,13 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) m->join_wait = wait; ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): release_join, waiting on %d worker to report back", - m->id, (int)h2_io_set_size(m->stream_ios)); + m->id, (int)h2_ilist_count(m->stream_ios)); status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs)); + + while (!h2_ilist_iter(m->stream_ios, stream_done_iter, m)) { + /* iterate until all ios have been orphaned or destroyed */ + } if (APR_STATUS_IS_TIMEUP(status)) { if (i > 0) { /* Oh, oh. Still we wait for assigned workers to report that @@ -421,9 +456,9 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) "h2_mplx(%ld): release, waiting for %d seconds now for " "%d h2_workers to return, have still %d requests outstanding", m->id, i*wait_secs, m->workers_busy, - (int)h2_io_set_size(m->stream_ios)); + (int)h2_ilist_count(m->stream_ios)); if (i == 1) { - h2_io_set_iter(m->stream_ios, stream_print, m); + h2_ilist_iter(m->stream_ios, stream_print, m); } } h2_mplx_abort(m); @@ -431,10 +466,10 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) } } - if (!h2_io_set_is_empty(m->stream_ios)) { + if (!h2_ilist_empty(m->stream_ios)) { ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c, "h2_mplx(%ld): release_join, %d streams still open", - m->id, (int)h2_io_set_size(m->stream_ios)); + m->id, (int)h2_ilist_count(m->stream_ios)); } ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056) "h2_mplx(%ld): release_join -> destroy", m->id); @@ -468,7 +503,7 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error) */ AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); + h2_io *io = h2_ilist_get(m->stream_ios, stream_id); /* there should be an h2_io, once the stream has been scheduled * for processing, e.g. when we received all HEADERs. But when @@ -484,107 +519,16 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error) return status; } -apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block, - int stream_id, apr_bucket_brigade *bb, - apr_table_t *trailers, - struct apr_thread_cond_t *iowait) -{ - apr_status_t status; - int acquired; - - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_read_pre"); - - h2_io_signal_init(io, H2_IO_READ, m->stream_timeout, iowait); - status = h2_io_in_read(io, bb, -1, trailers); - while (APR_STATUS_IS_EAGAIN(status) - && !is_aborted(m, &status) - && block == APR_BLOCK_READ) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, - "h2_mplx(%ld-%d): wait on in data (BLOCK_READ)", - m->id, stream_id); - status = h2_io_signal_wait(m, io); - if (status == APR_SUCCESS) { - status = h2_io_in_read(io, bb, -1, trailers); - } - } - H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_read_post"); - h2_io_signal_exit(io); - } - else { - status = APR_EOF; - } - leave_mutex(m, acquired); - } - return status; -} - -apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id, - const char *data, apr_size_t len, int eos) -{ - apr_status_t status; - int acquired; - - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_pre"); - status = h2_io_in_write(io, data, len, eos); - H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_post"); - h2_io_signal(io, H2_IO_READ); - io_in_consumed_signal(m, io); - } - else { - status = APR_ECONNABORTED; - } - leave_mutex(m, acquired); - } - return status; -} - -apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id) -{ - apr_status_t status; - int acquired; - - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - status = h2_io_in_close(io); - H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_close"); - h2_io_signal(io, H2_IO_READ); - io_in_consumed_signal(m, io); - } - else { - status = APR_ECONNABORTED; - } - leave_mutex(m, acquired); - } - return status; -} - void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx) { m->input_consumed = cb; m->input_consumed_ctx = ctx; } -typedef struct { - h2_mplx * m; - int streams_updated; -} update_ctx; - -static int update_window(void *ctx, h2_io *io) +static int update_window(void *ctx, void *val) { - update_ctx *uctx = (update_ctx*)ctx; - if (io_in_consumed_signal(uctx->m, io)) { - ++uctx->streams_updated; - } + h2_mplx *m = ctx; + io_in_consumed_signal(m, val); return 1; } @@ -598,46 +542,11 @@ apr_status_t h2_mplx_in_update_windows(h2_mplx *m) return APR_ECONNABORTED; } if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - update_ctx ctx; + h2_ilist_iter(m->stream_ios, update_window, m); - ctx.m = m; - ctx.streams_updated = 0; - - status = APR_EAGAIN; - h2_io_set_iter(m->stream_ios, update_window, &ctx); - - if (ctx.streams_updated) { - status = APR_SUCCESS; - } - leave_mutex(m, acquired); - } - return status; -} - -apr_status_t h2_mplx_out_get_brigade(h2_mplx *m, int stream_id, - apr_bucket_brigade *bb, - apr_off_t len, apr_table_t **ptrailers) -{ - apr_status_t status; - int acquired; - - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_get_brigade_pre"); - - status = h2_io_out_get_brigade(io, bb, len); - - H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_get_brigade_post"); - if (status == APR_SUCCESS) { - h2_io_signal(io, H2_IO_WRITE); - } - } - else { - status = APR_ECONNABORTED; - } - *ptrailers = io->response? io->response->trailers : NULL; + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_session(%ld): windows updated", m->id); + status = APR_SUCCESS; leave_mutex(m, acquired); } return status; @@ -651,7 +560,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams) AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_shift(m->ready_ios); + h2_io *io = h2_ilist_shift(m->ready_ios); if (io && !m->aborted) { stream = h2_ihash_get(streams, io->id); if (stream) { @@ -661,9 +570,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams) } else { AP_DEBUG_ASSERT(io->response); - H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_next_submit_pre"); - h2_stream_set_response(stream, io->response, io->bbout); - H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_next_submit_post"); + h2_stream_set_response(stream, io->response, io->beam_out); } } else { @@ -675,114 +582,62 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams) "h2_mplx(%ld): stream for response %d closed, " "resetting io to close request processing", m->id, io->id); - h2_io_make_orphaned(io, H2_ERR_STREAM_CLOSED); + io->orphaned = 1; + h2_io_rst(io, H2_ERR_STREAM_CLOSED); if (!io->worker_started || io->worker_done) { io_destroy(m, io, 1); } else { /* hang around until the h2_task is done, but - * shutdown input and send out any events (e.g. window - * updates) asap. */ - h2_io_in_shutdown(io); + * shutdown input/output and send out any events asap. */ + h2_io_shutdown(io); io_in_consumed_signal(m, io); } } - - h2_io_signal(io, H2_IO_WRITE); } leave_mutex(m, acquired); } return stream; } -static apr_status_t out_write(h2_mplx *m, h2_io *io, - ap_filter_t* f, int blocking, - apr_bucket_brigade *bb, - struct apr_thread_cond_t *iowait) +static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response, + h2_bucket_beam *output) { apr_status_t status = APR_SUCCESS; - /* We check the memory footprint queued for this stream_id - * and block if it exceeds our configured limit. - * We will not split buckets to enforce the limit to the last - * byte. After all, the bucket is already in memory. - */ - while (status == APR_SUCCESS - && !APR_BRIGADE_EMPTY(bb) - && !is_aborted(m, &status)) { - - status = h2_io_out_write(io, bb, blocking? m->stream_max_mem : INT_MAX, - &m->tx_handles_reserved); - io_out_consumed_signal(m, io); - - /* Wait for data to drain until there is room again or - * stream timeout expires */ - h2_io_signal_init(io, H2_IO_WRITE, m->stream_timeout, iowait); - while (status == APR_SUCCESS - && !APR_BRIGADE_EMPTY(bb) - && iowait - && (m->stream_max_mem <= h2_io_out_length(io)) - && !is_aborted(m, &status)) { - if (!blocking) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_mplx(%ld-%d): incomplete write", - m->id, io->id); - return APR_INCOMPLETE; - } - if (f) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, - "h2_mplx(%ld-%d): waiting for out drain", - m->id, io->id); - } - status = h2_io_signal_wait(m, io); - } - h2_io_signal_exit(io); + + h2_io *io = h2_ilist_get(m->stream_ios, stream_id); + if (!io || io->orphaned) { + return APR_ECONNABORTED; } - apr_brigade_cleanup(bb); - return status; -} - -static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response, - ap_filter_t* f, apr_bucket_brigade *bb, - struct apr_thread_cond_t *iowait) -{ - apr_status_t status = APR_SUCCESS; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld-%d): open response: %d, rst=%d", + m->id, stream_id, response->http_status, + response->rst_error); - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - if (f) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_mplx(%ld-%d): open response: %d, rst=%d", - m->id, stream_id, response->http_status, - response->rst_error); - } - - h2_io_set_response(io, response); - h2_io_set_add(m->ready_ios, io); - if (response && response->http_status < 300) { - /* we might see some file buckets in the output, see - * if we have enough handles reserved. */ - check_tx_reservation(m); - } - if (bb) { - status = out_write(m, io, f, 0, bb, iowait); - if (status == APR_INCOMPLETE) { - /* write will have transferred as much data as possible. - caller has to deal with non-empty brigade */ - status = APR_SUCCESS; - } - } - have_out_data_for(m, stream_id); + if (output) { + h2_beam_buffer_size_set(output, m->stream_max_mem); + h2_beam_timeout_set(output, m->stream_timeout); + h2_beam_on_consumed(output, stream_output_consumed, io); + m->tx_handles_reserved -= h2_beam_get_files_beamed(output); + h2_beam_on_file_beam(output, can_beam_file, m); + h2_beam_mutex_set(output, io_mutex_enter, io_mutex_leave, + io->task->cond, m); } - else { - status = APR_ECONNABORTED; + h2_io_set_response(io, response, output); + + h2_ilist_add(m->ready_ios, io); + if (response && response->http_status < 300) { + /* we might see some file buckets in the output, see + * if we have enough handles reserved. */ + check_tx_reservation(m); } + have_out_data_for(m, stream_id); return status; } apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response, - ap_filter_t* f, apr_bucket_brigade *bb, - struct apr_thread_cond_t *iowait) + h2_bucket_beam *output) { apr_status_t status; int acquired; @@ -793,37 +648,7 @@ apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response, status = APR_ECONNABORTED; } else { - status = out_open(m, stream_id, response, f, bb, iowait); - if (APLOGctrace1(m->c)) { - h2_util_bb_log(m->c, stream_id, APLOG_TRACE1, "h2_mplx_out_open", bb); - } - } - leave_mutex(m, acquired); - } - return status; -} - -apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id, - ap_filter_t* f, int blocking, - apr_bucket_brigade *bb, - struct apr_thread_cond_t *iowait) -{ - apr_status_t status; - int acquired; - - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - status = out_write(m, io, f, blocking, bb, iowait); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, - "h2_mplx(%ld-%d): write", m->id, io->id); - H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_write"); - - have_out_data_for(m, stream_id); - } - else { - status = APR_ECONNABORTED; + status = out_open(m, stream_id, response, output); } leave_mutex(m, acquired); } @@ -837,7 +662,7 @@ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id) AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); + h2_io *io = h2_ilist_get(m->stream_ios, stream_id); if (io && !io->orphaned) { if (!io->response && !io->rst_error) { /* In case a close comes before a response was created, @@ -846,45 +671,20 @@ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id) */ h2_response *r = h2_response_die(stream_id, APR_EGENERAL, io->request, m->pool); - status = out_open(m, stream_id, r, NULL, NULL, NULL); + status = out_open(m, stream_id, r, NULL); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c, "h2_mplx(%ld-%d): close, no response, no rst", m->id, io->id); } ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, - "h2_mplx(%ld-%d): close with eor=%s", - m->id, io->id, io->eor? "yes" : "no"); - status = h2_io_out_close(io); - H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close"); - io_out_consumed_signal(m, io); - - have_out_data_for(m, stream_id); - } - else { - status = APR_ECONNABORTED; - } - leave_mutex(m, acquired); - } - return status; -} - -apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error) -{ - apr_status_t status; - int acquired; - - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->rst_error && !io->orphaned) { - h2_io_rst(io, error); - if (!io->response) { - h2_io_set_add(m->ready_ios, io); + "h2_mplx(%ld-%d): close", m->id, io->id); + if (io->beam_out) { + status = h2_beam_close(io->beam_out); + h2_beam_log(io->beam_out, stream_id, "out_close", m->c, + APLOG_TRACE2); } - H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_rst"); - + io_out_consumed_signal(m, io); have_out_data_for(m, stream_id); - h2_io_signal(io, H2_IO_WRITE); } else { status = APR_ECONNABORTED; @@ -894,26 +694,6 @@ apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error) return status; } -int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id) -{ - apr_status_t status; - int has_data = 0; - int acquired; - - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - has_data = h2_io_out_has_data(io); - } - else { - has_data = 0; - } - leave_mutex(m, acquired); - } - return has_data; -} - apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, apr_thread_cond_t *iowait) { @@ -969,22 +749,7 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx) return status; } -static h2_io *open_io(h2_mplx *m, int stream_id, const h2_request *request) -{ - apr_pool_t *io_pool; - h2_io *io; - - apr_pool_create(&io_pool, m->pool); - apr_pool_tag(io_pool, "h2_io"); - io = h2_io_create(stream_id, io_pool, m->bucket_alloc, request); - h2_io_set_add(m->stream_ios, io); - - return io; -} - - -apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, - const h2_request *req, +apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, h2_stream_pri_cmp *cmp, void *ctx) { apr_status_t status; @@ -997,24 +762,38 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, status = APR_ECONNABORTED; } else { - h2_io *io = open_io(m, stream_id, req); + apr_pool_t *io_pool; + h2_io *io; - if (!io->request->body) { - status = h2_io_in_close(io); + if (!m->need_registration) { + m->need_registration = h2_iq_empty(m->q); } - - m->need_registration = m->need_registration || h2_iq_empty(m->q); - do_registration = (m->need_registration && m->workers_busy < m->workers_max); + if (m->workers_busy < m->workers_max) { + do_registration = m->need_registration; + } + + io_pool = m->spare_io_pool; + if (io_pool) { + m->spare_io_pool = NULL; + } + else { + apr_pool_create(&io_pool, m->pool); + apr_pool_tag(io_pool, "h2_io"); + } + io = h2_io_create(stream->id, io_pool, stream->request); + h2_ilist_add(m->stream_ios, io); h2_iq_add(m->q, io->id, cmp, ctx); + stream->input = io->beam_in; ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, - "h2_mplx(%ld-%d): process", m->c->id, stream_id); - H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_process"); + "h2_mplx(%ld-%d): process, body=%d", + m->c->id, stream->id, io->request->body); } leave_mutex(m, acquired); } - if (status == APR_SUCCESS && do_registration) { - workers_register(m); + if (do_registration) { + m->need_registration = 0; + h2_workers_register(m->workers, m); } return status; } @@ -1022,20 +801,24 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, static h2_task *pop_task(h2_mplx *m) { h2_task *task = NULL; + h2_io *io; int sid; - while (!m->aborted && !task - && (m->workers_busy < m->workers_limit) - && (sid = h2_iq_shift(m->q)) > 0) { - h2_io *io = h2_io_set_get(m->stream_ios, sid); - if (io && io->orphaned) { - io_destroy(m, io, 0); - if (m->join_wait) { - apr_thread_cond_signal(m->join_wait); - } - } - else if (io) { + while (!m->aborted && !task && (m->workers_busy < m->workers_limit) + && (sid = h2_iq_shift(m->q)) > 0) { + + io = h2_ilist_get(m->stream_ios, sid); + if (io) { conn_rec *slave, **pslave; + if (io->orphaned) { + /* TODO: add to purge list */ + io_destroy(m, io, 0); + if (m->join_wait) { + apr_thread_cond_signal(m->join_wait); + } + continue; + } + pslave = (conn_rec **)apr_array_pop(m->spare_slaves); if (pslave) { slave = *pslave; @@ -1046,12 +829,21 @@ static h2_task *pop_task(h2_mplx *m) } slave->sbh = m->c->sbh; - io->task = task = h2_task_create(m->id, io->request, slave, m); + io->task = task = h2_task_create(slave, io->request, + io->beam_in, m); m->c->keepalives++; apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id); io->worker_started = 1; io->started_at = apr_time_now(); + + if (io->beam_in) { + h2_beam_timeout_set(io->beam_in, m->stream_timeout); + h2_beam_on_consumed(io->beam_in, stream_input_consumed, m); + h2_beam_on_file_beam(io->beam_in, can_beam_file, m); + h2_beam_mutex_set(io->beam_in, io_mutex_enter, + io_mutex_leave, task->cond, m); + } if (sid > m->max_stream_started) { m->max_stream_started = sid; } @@ -1088,7 +880,7 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more) static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) { if (task) { - h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id); + h2_io *io = h2_ilist_get(m->stream_ios, task->stream_id); if (task->frozen) { /* this task was handed over to an engine for processing @@ -1112,14 +904,17 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) h2_mplx_out_close(m, task->stream_id); if (ngn && io) { - apr_off_t bytes = io->output_consumed + h2_io_out_length(io); + apr_off_t bytes = 0; + if (io->beam_out) { + h2_beam_send(io->beam_out, NULL, APR_NONBLOCK_READ); + bytes += h2_beam_get_buffered(io->beam_out); + } if (bytes > 0) { /* we need to report consumed and current buffered output * to the engine. The request will be streamed out or cancelled, * no more data is coming from it and the engine should update * its calculations before we destroy this information. */ h2_req_engine_out_consumed(ngn, task->c, bytes); - io->output_consumed = 0; } } @@ -1136,10 +931,10 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) if (io) { apr_time_t now = apr_time_now(); if (!io->orphaned && m->redo_ios - && h2_io_set_get(m->redo_ios, io->id)) { + && h2_ilist_get(m->redo_ios, io->id)) { /* reset and schedule again */ h2_io_redo(io); - h2_io_set_remove(m->redo_ios, io); + h2_ilist_remove(m->redo_ios, io->id); h2_iq_add(m->q, io->id, NULL, NULL); } else { @@ -1168,6 +963,7 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) } if (io->orphaned) { + /* TODO: add to purge list */ io_destroy(m, io, 0); if (m->join_wait) { apr_thread_cond_signal(m->join_wait); @@ -1211,12 +1007,12 @@ typedef struct { apr_time_t now; } io_iter_ctx; -static int latest_repeatable_busy_unsubmitted_iter(void *data, h2_io *io) +static int latest_repeatable_busy_unsubmitted_iter(void *data, void *val) { io_iter_ctx *ctx = data; + h2_io *io = val; if (io->worker_started && !io->worker_done - && h2_io_is_repeatable(io) - && !h2_io_set_get(ctx->m->redo_ios, io->id)) { + && h2_io_can_redo(io) && !h2_ilist_get(ctx->m->redo_ios, io->id)) { /* this io occupies a worker, the response has not been submitted yet, * not been cancelled and it is a repeatable request * -> it can be re-scheduled later */ @@ -1233,13 +1029,14 @@ static h2_io *get_latest_repeatable_busy_unsubmitted_io(h2_mplx *m) io_iter_ctx ctx; ctx.m = m; ctx.io = NULL; - h2_io_set_iter(m->stream_ios, latest_repeatable_busy_unsubmitted_iter, &ctx); + h2_ilist_iter(m->stream_ios, latest_repeatable_busy_unsubmitted_iter, &ctx); return ctx.io; } -static int timed_out_busy_iter(void *data, h2_io *io) +static int timed_out_busy_iter(void *data, void *val) { io_iter_ctx *ctx = data; + h2_io *io = val; if (io->worker_started && !io->worker_done && (ctx->now - io->started_at) > ctx->m->stream_timeout) { /* timed out stream occupying a worker, found */ @@ -1254,7 +1051,7 @@ static h2_io *get_timed_out_busy_stream(h2_mplx *m) ctx.m = m; ctx.io = NULL; ctx.now = apr_time_now(); - h2_io_set_iter(m->stream_ios, timed_out_busy_iter, &ctx); + h2_ilist_iter(m->stream_ios, timed_out_busy_iter, &ctx); return ctx.io; } @@ -1264,19 +1061,19 @@ static apr_status_t unschedule_slow_ios(h2_mplx *m) int n; if (!m->redo_ios) { - m->redo_ios = h2_io_set_create(m->pool); + m->redo_ios = h2_ilist_create(m->pool); } /* Try to get rid of streams that occupy workers. Look for safe requests * that are repeatable. If none found, fail the connection. */ - n = (m->workers_busy - m->workers_limit - h2_io_set_size(m->redo_ios)); + n = (m->workers_busy - m->workers_limit - h2_ilist_count(m->redo_ios)); while (n > 0 && (io = get_latest_repeatable_busy_unsubmitted_io(m))) { - h2_io_set_add(m->redo_ios, io); + h2_ilist_add(m->redo_ios, io); h2_io_rst(io, H2_ERR_CANCEL); --n; } - if ((m->workers_busy - h2_io_set_size(m->redo_ios)) > m->workers_limit) { + if ((m->workers_busy - h2_ilist_count(m->redo_ios)) > m->workers_limit) { io = get_timed_out_busy_stream(m); if (io) { /* Too many busy workers, unable to cancel enough streams @@ -1295,7 +1092,7 @@ apr_status_t h2_mplx_idle(h2_mplx *m) int acquired; if (enter_mutex(m, &acquired) == APR_SUCCESS) { - apr_size_t scount = h2_io_set_size(m->stream_ios); + apr_size_t scount = h2_ilist_count(m->stream_ios); if (scount > 0 && m->workers_busy) { /* If we have streams in connection state 'IDLE', meaning * all streams are ready to sent data out, but lack @@ -1350,9 +1147,10 @@ typedef struct { int streams_updated; } ngn_update_ctx; -static int ngn_update_window(void *ctx, h2_io *io) +static int ngn_update_window(void *ctx, void *val) { ngn_update_ctx *uctx = ctx; + h2_io *io = val; if (io && io->task && io->task->assigned == uctx->ngn && io_out_consumed_signal(uctx->m, io)) { ++uctx->streams_updated; @@ -1367,7 +1165,7 @@ static apr_status_t ngn_out_update_windows(h2_mplx *m, h2_req_engine *ngn) ctx.m = m; ctx.ngn = ngn; ctx.streams_updated = 0; - h2_io_set_iter(m->stream_ios, ngn_update_window, &ctx); + h2_ilist_iter(m->stream_ios, ngn_update_window, &ctx); return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN; } @@ -1389,7 +1187,7 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type, task->r = r; if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id); + h2_io *io = h2_ilist_get(m->stream_ios, task->stream_id); if (!io || io->orphaned) { status = APR_ECONNABORTED; } diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index 4029847678..7e6f5bcead 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -37,16 +37,17 @@ struct apr_pool_t; struct apr_thread_mutex_t; struct apr_thread_cond_t; +struct h2_bucket_beam; struct h2_config; struct h2_ihash_t; +struct h2_ilist_t; struct h2_response; struct h2_task; struct h2_stream; struct h2_request; -struct h2_io_set; struct apr_thread_cond_t; struct h2_workers; -struct h2_int_queue; +struct h2_iqueue; struct h2_ngn_shed; struct h2_req_engine; @@ -72,10 +73,10 @@ struct h2_mplx { unsigned int aborted : 1; unsigned int need_registration : 1; - struct h2_int_queue *q; - struct h2_io_set *stream_ios; - struct h2_io_set *ready_ios; - struct h2_io_set *redo_ios; + struct h2_iqueue *q; + struct h2_ilist_t *stream_ios; + struct h2_ilist_t *ready_ios; + struct h2_ilist_t *redo_ios; apr_uint32_t max_streams; /* max # of concurrent streams */ apr_uint32_t max_stream_started; /* highest stream id that started processing */ @@ -96,10 +97,11 @@ struct h2_mplx { apr_size_t stream_max_mem; apr_interval_time_t stream_timeout; + apr_pool_t *spare_io_pool; apr_array_header_t *spare_slaves; /* spare slave connections */ struct h2_workers *workers; - apr_size_t tx_handles_reserved; + int tx_handles_reserved; apr_size_t tx_chunk_size; h2_mplx_consumed_cb *input_consumed; @@ -166,10 +168,6 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m); */ apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error); -/* Return != 0 iff the multiplexer has output data for the given stream. - */ -int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id); - /** * Waits on output data from any stream in this session to become available. * Returns APR_TIMEUP if no data arrived in the given time. @@ -190,8 +188,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, * @param cmp the stream priority compare function * @param ctx context data for the compare function */ -apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, - const struct h2_request *r, +apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, h2_stream_pri_cmp *cmp, void *ctx); /** @@ -219,37 +216,11 @@ void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx); ******************************************************************************/ /** - * Reads a buckets for the given stream_id. Will return ARP_EAGAIN when - * called with APR_NONBLOCK_READ and no data present. Will return APR_EOF - * when the end of the stream input has been reached. - * The condition passed in will be used for blocking/signalling and will - * be protected by the mplx's own mutex. - */ -apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block, - int stream_id, apr_bucket_brigade *bb, - apr_table_t *trailers, - struct apr_thread_cond_t *iowait); - -/** - * Appends data to the input of the given stream. Storage of input data is - * not subject to flow control. - */ -apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id, - const char *data, apr_size_t len, int eos); - -/** - * Closes the input for the given stream_id. - */ -apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id); - -/** * Invoke the consumed callback for all streams that had bytes read since the * last call to this function. If no stream had input data consumed, the * callback is not invoked. * The consumed callback may also be invoked at other times whenever * the need arises. - * Returns APR_SUCCESS when an update happened, APR_EAGAIN if no update - * happened. */ apr_status_t h2_mplx_in_update_windows(h2_mplx *m); @@ -267,44 +238,17 @@ struct h2_stream *h2_mplx_next_submit(h2_mplx *m, struct h2_ihash_t *streams); /** - * Reads output data into the given brigade. Will never block, but - * return APR_EAGAIN until data arrives or the stream is closed. - */ -apr_status_t h2_mplx_out_get_brigade(h2_mplx *mplx, int stream_id, - apr_bucket_brigade *bb, - apr_off_t len, apr_table_t **ptrailers); - -/** * Opens the output for the given stream with the specified response. */ apr_status_t h2_mplx_out_open(h2_mplx *mplx, int stream_id, struct h2_response *response, - ap_filter_t* filter, apr_bucket_brigade *bb, - struct apr_thread_cond_t *iowait); - -/** - * Append the brigade to the stream output. Might block if amount - * of bytes buffered reaches configured max. - * @param stream_id the stream identifier - * @param filter the apache filter context of the data - * @param blocking == 0 iff call should return with APR_INCOMPLETE if - * the full brigade cannot be written at once - * @param bb the bucket brigade to append - * @param iowait a conditional used for block/signalling in h2_mplx - */ -apr_status_t h2_mplx_out_write(h2_mplx *mplx, int stream_id, - ap_filter_t* filter, - int blocking, - apr_bucket_brigade *bb, - struct apr_thread_cond_t *iowait); + struct h2_bucket_beam *output); /** * Closes the output for stream stream_id. */ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id); -apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error); - /******************************************************************************* * h2_mplx list Manipulation. ******************************************************************************/ diff --git a/modules/http2/h2_ngn_shed.c b/modules/http2/h2_ngn_shed.c index 32483d9332..48ac09fae6 100644 --- a/modules/http2/h2_ngn_shed.c +++ b/modules/http2/h2_ngn_shed.c @@ -33,12 +33,10 @@ #include "h2_conn.h" #include "h2_ctx.h" #include "h2_h2.h" -#include "h2_int_queue.h" #include "h2_mplx.h" #include "h2_response.h" #include "h2_request.h" #include "h2_task.h" -#include "h2_task_output.h" #include "h2_util.h" #include "h2_ngn_shed.h" diff --git a/modules/http2/h2_proxy_session.c b/modules/http2/h2_proxy_session.c index 6f0298e2ef..19aff5bf6f 100644 --- a/modules/http2/h2_proxy_session.c +++ b/modules/http2/h2_proxy_session.c @@ -23,7 +23,6 @@ #include "mod_http2.h" #include "h2.h" -#include "h2_int_queue.h" #include "h2_request.h" #include "h2_util.h" #include "h2_proxy_session.h" diff --git a/modules/http2/h2_proxy_session.h b/modules/http2/h2_proxy_session.h index 7078981c7a..52be5c6b37 100644 --- a/modules/http2/h2_proxy_session.h +++ b/modules/http2/h2_proxy_session.h @@ -20,7 +20,7 @@ #include <nghttp2/nghttp2.h> -struct h2_int_queue; +struct h2_iqueue; struct h2_ihash_t; typedef enum { @@ -74,7 +74,7 @@ struct h2_proxy_session { apr_interval_time_t wait_timeout; struct h2_ihash_t *streams; - struct h2_int_queue *suspended; + struct h2_iqueue *suspended; apr_size_t remote_max_concurrent; int last_stream_id; /* last stream id processed by backend, or 0 */ diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index 8da2e9f2bf..e491f1f1b7 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -112,7 +112,7 @@ static void cleanup_streams(h2_session *session) while (1) { h2_ihash_iter(session->streams, find_cleanup_stream, &ctx); if (ctx.candidate) { - h2_session_stream_destroy(session, ctx.candidate); + h2_session_stream_done(session, ctx.candidate); ctx.candidate = NULL; } else { @@ -1277,8 +1277,9 @@ static apr_status_t submit_response(h2_session *session, h2_stream *stream) const h2_priority *prio; ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03073) - "h2_stream(%ld-%d): submit response %d", - session->id, stream->id, response->http_status); + "h2_stream(%ld-%d): submit response %d, REMOTE_WINDOW_SIZE=%u", + session->id, stream->id, response->http_status, + (unsigned int)nghttp2_session_get_stream_remote_window_size(session->ngh2, stream->id)); if (response->content_length != 0) { memset(&provider, 0, sizeof(provider)); @@ -1504,21 +1505,24 @@ apr_status_t h2_session_set_prio(h2_session *session, h2_stream *stream, return status; } -apr_status_t h2_session_stream_destroy(h2_session *session, h2_stream *stream) +apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream) { apr_pool_t *pool = h2_stream_detach_pool(stream); - + int stream_id = stream->id; + int rst_error = stream->rst_error; + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, "h2_stream(%ld-%d): cleanup by EOS bucket destroy", - session->id, stream->id); + session->id, stream_id); + if (session->streams) { + h2_ihash_remove(session->streams, stream_id); + } + + h2_stream_cleanup(stream); /* this may be called while the session has already freed * some internal structures or even when the mplx is locked. */ if (session->mplx) { - h2_mplx_stream_done(session->mplx, stream->id, stream->rst_error); - } - - if (session->streams) { - h2_ihash_remove(session->streams, stream->id); + h2_mplx_stream_done(session->mplx, stream_id, rst_error); } h2_stream_destroy(stream); @@ -1529,6 +1533,7 @@ apr_status_t h2_session_stream_destroy(h2_session *session, h2_stream *stream) } session->spare = pool; } + return APR_SUCCESS; } @@ -2217,9 +2222,10 @@ apr_status_t h2_session_process(h2_session *session, int async) } /* send out window updates for our inputs */ status = h2_mplx_in_update_windows(session->mplx); - if (status != APR_SUCCESS && status != APR_EAGAIN) { + if (status != APR_SUCCESS) { dispatch_event(session, H2_SESSION_EV_CONN_ERROR, - H2_ERR_INTERNAL_ERROR, "window update error"); + H2_ERR_INTERNAL_ERROR, + "window update error"); break; } } diff --git a/modules/http2/h2_session.h b/modules/http2/h2_session.h index 28eedba843..bf4ded338a 100644 --- a/modules/http2/h2_session.h +++ b/modules/http2/h2_session.h @@ -218,8 +218,8 @@ int h2_session_push_enabled(h2_session *session); * @param session the session to which the stream belongs * @param stream the stream to destroy */ -apr_status_t h2_session_stream_destroy(h2_session *session, - struct h2_stream *stream); +apr_status_t h2_session_stream_done(h2_session *session, + struct h2_stream *stream); /** * Submit a push promise on the stream and schedule the new steam for diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index a7979e3ffa..ae98019456 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -24,6 +24,7 @@ #include <nghttp2/nghttp2.h> #include "h2_private.h" +#include "h2_bucket_beam.h" #include "h2_conn.h" #include "h2_config.h" #include "h2_h2.h" @@ -36,7 +37,6 @@ #include "h2_stream.h" #include "h2_task.h" #include "h2_ctx.h" -#include "h2_task_input.h" #include "h2_task.h" #include "h2_util.h" @@ -52,6 +52,13 @@ static int state_transition[][7] = { /*CL*/{ 1, 1, 0, 0, 1, 1, 1 }, }; +#define H2_STREAM_OUT_LOG(lvl,s,msg) \ + do { \ + if (APLOG_C_IS_LEVEL((s)->session->c,lvl)) \ + h2_util_bb_log((s)->session->c,(s)->session->id,lvl,msg,(s)->buffer); \ + } while(0) + + static int set_state(h2_stream *stream, h2_stream_state_t state) { int allowed = state_transition[state][stream->state]; @@ -135,8 +142,6 @@ static int output_open(h2_stream *stream) } } -static h2_sos *h2_sos_mplx_create(h2_stream *stream, h2_response *response); - h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session, int initiated_on, const h2_request *creq) { @@ -166,18 +171,26 @@ h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session, return stream; } -apr_status_t h2_stream_destroy(h2_stream *stream) +void h2_stream_cleanup(h2_stream *stream) { AP_DEBUG_ASSERT(stream); + if (stream->buffer) { + apr_brigade_cleanup(stream->buffer); + } +} + +void h2_stream_destroy(h2_stream *stream) +{ + AP_DEBUG_ASSERT(stream); + h2_stream_cleanup(stream); if (stream->pool) { apr_pool_destroy(stream->pool); } - return APR_SUCCESS; } -void h2_stream_cleanup(h2_stream *stream) +void h2_stream_eos_destroy(h2_stream *stream) { - h2_session_stream_destroy(stream->session, stream); + h2_session_stream_done(stream->session, stream); /* stream is gone */ } @@ -200,33 +213,7 @@ void h2_stream_rst(h2_stream *stream, int error_code) struct h2_response *h2_stream_get_response(h2_stream *stream) { - return stream->sos? stream->sos->response : NULL; -} - -apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response, - apr_bucket_brigade *bb) -{ - apr_status_t status = APR_SUCCESS; - h2_sos *sos; - - if (!output_open(stream)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, - "h2_stream(%ld-%d): output closed", - stream->session->id, stream->id); - return APR_ECONNRESET; - } - - sos = h2_sos_mplx_create(stream, response); - if (sos->response->sos_filter) { - sos = h2_filter_sos_create(sos->response->sos_filter, sos); - } - stream->sos = sos; - - status = stream->sos->buffer(stream->sos, bb); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c, - "h2_stream(%ld-%d): set_response(%d)", - stream->session->id, stream->id, stream->sos->response->http_status); - return status; + return stream->response; } apr_status_t h2_stream_set_request(h2_stream *stream, request_rec *r) @@ -286,15 +273,11 @@ apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled, status = h2_request_end_headers(stream->request, stream->pool, eos, push_enabled); if (status == APR_SUCCESS) { - if (!eos) { - stream->request->body = 1; - } - stream->input_remaining = stream->request->content_length; - - status = h2_mplx_process(stream->session->mplx, stream->id, - stream->request, cmp, ctx); + stream->request->body = !eos; stream->scheduled = 1; + stream->input_remaining = stream->request->content_length; + status = h2_mplx_process(stream->session->mplx, stream, cmp, ctx); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, "h2_stream(%ld-%d): scheduled %s %s://%s%s", stream->session->id, stream->id, @@ -331,8 +314,8 @@ apr_status_t h2_stream_close_input(h2_stream *stream) return APR_ECONNRESET; } - if (close_input(stream)) { - status = h2_mplx_in_close(stream->session->mplx, stream->id); + if (close_input(stream) && stream->input) { + status = h2_beam_close(stream->input); } return status; } @@ -340,25 +323,29 @@ apr_status_t h2_stream_close_input(h2_stream *stream) apr_status_t h2_stream_write_data(h2_stream *stream, const char *data, size_t len, int eos) { + conn_rec *c = stream->session->c; apr_status_t status = APR_SUCCESS; AP_DEBUG_ASSERT(stream); + if (!stream->input) { + return APR_EOF; + } if (input_closed(stream) || !stream->request->eoh) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d", stream->session->id, stream->id, input_closed(stream), stream->request->eoh); return APR_EINVAL; } - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_stream(%ld-%d): add %ld input bytes", stream->session->id, stream->id, (long)len); if (!stream->request->chunked) { stream->input_remaining -= len; if (stream->input_remaining < 0) { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c, + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c, APLOGNO(02961) "h2_stream(%ld-%d): got %ld more content bytes than announced " "in content-length header: %ld", @@ -370,10 +357,18 @@ apr_status_t h2_stream_write_data(h2_stream *stream, } } - status = h2_mplx_in_write(stream->session->mplx, stream->id, data, len, eos); + if (!stream->tmp) { + stream->tmp = apr_brigade_create(stream->pool, c->bucket_alloc); + } + apr_brigade_write(stream->tmp, NULL, NULL, data, len); if (eos) { + APR_BRIGADE_INSERT_TAIL(stream->tmp, + apr_bucket_eos_create(c->bucket_alloc)); close_input(stream); } + + status = h2_beam_send(stream->input, stream->tmp, APR_BLOCK_READ); + apr_brigade_cleanup(stream->tmp); return status; } @@ -392,44 +387,122 @@ int h2_stream_is_suspended(const h2_stream *stream) return stream->suspended; } -apr_status_t h2_stream_out_prepare(h2_stream *stream, +static apr_status_t fill_buffer(h2_stream *stream, apr_size_t amount) +{ + if (!stream->output) { + return APR_EOF; + } + return h2_beam_receive(stream->output, stream->buffer, + APR_NONBLOCK_READ, amount); +} + +apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response, + h2_bucket_beam *output) +{ + apr_status_t status = APR_SUCCESS; + conn_rec *c = stream->session->c; + + if (!output_open(stream)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, + "h2_stream(%ld-%d): output closed", + stream->session->id, stream->id); + return APR_ECONNRESET; + } + + stream->response = response; + stream->output = output; + stream->buffer = apr_brigade_create(stream->pool, c->bucket_alloc); + + h2_stream_filter(stream); + if (stream->output) { + status = fill_buffer(stream, 0); + } + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, + "h2_stream(%ld-%d): set_response(%d)", + stream->session->id, stream->id, + stream->response->http_status); + return status; +} + +apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen, int *peos) { + conn_rec *c = stream->session->c; + apr_status_t status = APR_SUCCESS; + apr_off_t requested = (*plen > 0)? *plen : 32*1024; + if (stream->rst_error) { *plen = 0; *peos = 1; return APR_ECONNRESET; } - AP_DEBUG_ASSERT(stream->sos); - return stream->sos->prepare(stream->sos, plen, peos); + H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_pre"); + h2_util_bb_avail(stream->buffer, plen, peos); + if (!*peos && !*plen) { + /* try to get more data */ + status = fill_buffer(stream, H2MIN(requested, 32*1024)); + if (APR_STATUS_IS_EOF(status)) { + apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(stream->buffer, eos); + status = APR_SUCCESS; + } + h2_util_bb_avail(stream->buffer, plen, peos); + } + H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_post"); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, + "h2_stream(%ld-%d): prepare, len=%ld eos=%d, trailers=%s", + c->id, stream->id, (long)*plen, *peos, + (stream->response && stream->response->trailers)? + "yes" : "no"); + if (!*peos && !*plen && status == APR_SUCCESS) { + return APR_EAGAIN; + } + return status; } + apr_status_t h2_stream_readx(h2_stream *stream, h2_io_data_cb *cb, void *ctx, apr_off_t *plen, int *peos) { + conn_rec *c = stream->session->c; + apr_status_t status = APR_SUCCESS; + if (stream->rst_error) { return APR_ECONNRESET; } - if (!stream->sos) { - return APR_EGENERAL; + status = h2_util_bb_readx(stream->buffer, cb, ctx, plen, peos); + if (status == APR_SUCCESS && !*peos && !*plen) { + status = APR_EAGAIN; } - return stream->sos->readx(stream->sos, cb, ctx, plen, peos); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c, + "h2_stream(%ld-%d): readx, len=%ld eos=%d", + c->id, stream->id, (long)*plen, *peos); + return status; } + apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, apr_off_t *plen, int *peos) { + conn_rec *c = stream->session->c; + apr_status_t status = APR_SUCCESS; + if (stream->rst_error) { return APR_ECONNRESET; } - if (!stream->sos) { - return APR_EGENERAL; + status = h2_append_brigade(bb, stream->buffer, plen, peos); + if (status == APR_SUCCESS && !*peos && !*plen) { + status = APR_EAGAIN; } - return stream->sos->read_to(stream->sos, bb, plen, peos); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c, + "h2_stream(%ld-%d): read_to, len=%ld eos=%d", + c->id, stream->id, (long)*plen, *peos); + return status; } + int h2_stream_input_is_open(const h2_stream *stream) { return input_open(stream); @@ -474,7 +547,7 @@ apr_status_t h2_stream_submit_pushes(h2_stream *stream) apr_table_t *h2_stream_get_trailers(h2_stream *stream) { - return stream->sos? stream->sos->get_trailers(stream->sos) : NULL; + return stream->response? stream->response->trailers : NULL; } const h2_priority *h2_stream_get_priority(h2_stream *stream) @@ -491,147 +564,3 @@ const h2_priority *h2_stream_get_priority(h2_stream *stream) return NULL; } -/******************************************************************************* - * h2_sos_mplx - ******************************************************************************/ - -typedef struct h2_sos_mplx { - h2_mplx *m; - apr_bucket_brigade *bb; - apr_bucket_brigade *tmp; - apr_table_t *trailers; - apr_off_t buffer_size; -} h2_sos_mplx; - -#define H2_SOS_MPLX_OUT(lvl,msos,msg) \ - do { \ - if (APLOG_C_IS_LEVEL((msos)->m->c,lvl)) \ - h2_util_bb_log((msos)->m->c,(msos)->m->id,lvl,msg,(msos)->bb); \ - } while(0) - - -static apr_status_t mplx_transfer(h2_sos_mplx *msos, int stream_id, - apr_pool_t *pool) -{ - apr_status_t status; - apr_table_t *trailers = NULL; - - if (!msos->tmp) { - msos->tmp = apr_brigade_create(msos->bb->p, msos->bb->bucket_alloc); - } - status = h2_mplx_out_get_brigade(msos->m, stream_id, msos->tmp, - msos->buffer_size-1, &trailers); - if (!APR_BRIGADE_EMPTY(msos->tmp)) { - h2_transfer_brigade(msos->bb, msos->tmp, pool); - } - if (trailers) { - msos->trailers = trailers; - } - return status; -} - -static apr_status_t h2_sos_mplx_read_to(h2_sos *sos, apr_bucket_brigade *bb, - apr_off_t *plen, int *peos) -{ - h2_sos_mplx *msos = sos->ctx; - apr_status_t status; - - status = h2_append_brigade(bb, msos->bb, plen, peos); - if (status == APR_SUCCESS && !*peos && !*plen) { - status = APR_EAGAIN; - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, msos->m->c, - "h2_stream(%ld-%d): read_to, len=%ld eos=%d", - msos->m->id, sos->stream->id, (long)*plen, *peos); - } - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, msos->m->c, - "h2_stream(%ld-%d): read_to, len=%ld eos=%d", - msos->m->id, sos->stream->id, (long)*plen, *peos); - return status; -} - -static apr_status_t h2_sos_mplx_readx(h2_sos *sos, h2_io_data_cb *cb, void *ctx, - apr_off_t *plen, int *peos) -{ - h2_sos_mplx *msos = sos->ctx; - apr_status_t status = APR_SUCCESS; - - status = h2_util_bb_readx(msos->bb, cb, ctx, plen, peos); - if (status == APR_SUCCESS && !*peos && !*plen) { - status = APR_EAGAIN; - } - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, msos->m->c, - "h2_stream(%ld-%d): readx, len=%ld eos=%d", - msos->m->id, sos->stream->id, (long)*plen, *peos); - return status; -} - -static apr_status_t h2_sos_mplx_prepare(h2_sos *sos, apr_off_t *plen, int *peos) -{ - h2_sos_mplx *msos = sos->ctx; - apr_status_t status = APR_SUCCESS; - - H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx prepare_pre"); - - if (APR_BRIGADE_EMPTY(msos->bb)) { - status = mplx_transfer(msos, sos->stream->id, sos->stream->pool); - } - h2_util_bb_avail(msos->bb, plen, peos); - - H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx prepare_post"); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, msos->m->c, - "h2_stream(%ld-%d): prepare, len=%ld eos=%d, trailers=%s", - msos->m->id, sos->stream->id, (long)*plen, *peos, - msos->trailers? "yes" : "no"); - if (!*peos && !*plen) { - status = APR_EAGAIN; - } - - return status; -} - -static apr_table_t *h2_sos_mplx_get_trailers(h2_sos *sos) -{ - h2_sos_mplx *msos = sos->ctx; - - return msos->trailers; -} - -static apr_status_t h2_sos_mplx_buffer(h2_sos *sos, apr_bucket_brigade *bb) -{ - h2_sos_mplx *msos = sos->ctx; - apr_status_t status = APR_SUCCESS; - - if (bb && !APR_BRIGADE_EMPTY(bb)) { - H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx set_response_pre"); - status = mplx_transfer(msos, sos->stream->id, sos->stream->pool); - H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx set_response_post"); - } - return status; -} - -static h2_sos *h2_sos_mplx_create(h2_stream *stream, h2_response *response) -{ - h2_sos *sos; - h2_sos_mplx *msos; - - msos = apr_pcalloc(stream->pool, sizeof(*msos)); - msos->m = stream->session->mplx; - msos->bb = apr_brigade_create(stream->pool, msos->m->c->bucket_alloc); - msos->buffer_size = 32 * 1024; - - sos = apr_pcalloc(stream->pool, sizeof(*sos)); - sos->stream = stream; - sos->response = response; - - sos->ctx = msos; - sos->buffer = h2_sos_mplx_buffer; - sos->prepare = h2_sos_mplx_prepare; - sos->readx = h2_sos_mplx_readx; - sos->read_to = h2_sos_mplx_read_to; - sos->get_trailers = h2_sos_mplx_get_trailers; - - sos->response = response; - - return sos; -} - diff --git a/modules/http2/h2_stream.h b/modules/http2/h2_stream.h index 6f142d9dc0..c851f7e64b 100644 --- a/modules/http2/h2_stream.h +++ b/modules/http2/h2_stream.h @@ -38,6 +38,7 @@ struct h2_request; struct h2_response; struct h2_session; struct h2_sos; +struct h2_bucket_beam; typedef struct h2_stream h2_stream; @@ -48,8 +49,14 @@ struct h2_stream { apr_pool_t *pool; /* the memory pool for this stream */ struct h2_request *request; /* the request made in this stream */ + struct h2_bucket_beam *input; + + struct h2_response *response; + struct h2_bucket_beam *output; + apr_bucket_brigade *buffer; + apr_bucket_brigade *tmp; + int rst_error; /* stream error for RST_STREAM */ - unsigned int aborted : 1; /* was aborted */ unsigned int suspended : 1; /* DATA sending has been suspended */ unsigned int scheduled : 1; /* stream has been scheduled */ @@ -57,7 +64,6 @@ struct h2_stream { apr_off_t input_remaining; /* remaining bytes on input as advertised via content-length */ - struct h2_sos *sos; /* stream output source, e.g. to read output from */ apr_off_t data_frames_sent; /* # of DATA frames sent out for this stream */ }; @@ -75,12 +81,14 @@ h2_stream *h2_stream_open(int id, apr_pool_t *pool, struct h2_session *session, int initiated_on, const struct h2_request *req); /** - * Destroy any resources held by this stream. Will destroy memory pool - * if still owned by the stream. - * - * @param stream the stream to destroy + * Cleanup any resources still held by the stream, called by last bucket. + */ +void h2_stream_eos_destroy(h2_stream *stream); + +/** + * Destroy memory pool if still owned by the stream. */ -apr_status_t h2_stream_destroy(h2_stream *stream); +void h2_stream_destroy(h2_stream *stream); /** * Removes stream from h2_session and destroys it. @@ -179,7 +187,7 @@ struct h2_response *h2_stream_get_response(h2_stream *stream); */ apr_status_t h2_stream_set_response(h2_stream *stream, struct h2_response *response, - apr_bucket_brigade *bb); + struct h2_bucket_beam *output); /** * Do a speculative read on the stream output to determine the diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index e764fcc832..ba67b4198f 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -33,6 +33,7 @@ #include <scoreboard.h> #include "h2_private.h" +#include "h2_bucket_beam.h" #include "h2_conn.h" #include "h2_config.h" #include "h2_ctx.h" @@ -42,12 +43,344 @@ #include "h2_request.h" #include "h2_session.h" #include "h2_stream.h" -#include "h2_task_input.h" -#include "h2_task_output.h" #include "h2_task.h" -#include "h2_ctx.h" #include "h2_worker.h" +#include "h2_util.h" + +/******************************************************************************* + * task input handling + ******************************************************************************/ + +static int input_ser_header(void *ctx, const char *name, const char *value) +{ + h2_task *task = ctx; + apr_brigade_printf(task->input.bb, NULL, NULL, "%s: %s\r\n", name, value); + return 1; +} + +static apr_status_t input_append_eos(h2_task *task, request_rec *r) +{ + apr_status_t status = APR_SUCCESS; + apr_bucket_brigade *bb = task->input.bb; + apr_table_t *t = task->request->trailers; + + if (task->input.chunked) { + if (t && !apr_is_empty_table(t)) { + status = apr_brigade_puts(bb, NULL, NULL, "0\r\n"); + apr_table_do(input_ser_header, task, t, NULL); + status = apr_brigade_puts(bb, NULL, NULL, "\r\n"); + } + else { + status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n"); + } + } + else if (r && t && !apr_is_empty_table(t)){ + /* trailers passed in directly. */ + apr_table_overlap(r->trailers_in, t, APR_OVERLAP_TABLES_SET); + } + task->input.eos_written = 1; + APR_BRIGADE_INSERT_TAIL(bb, apr_bucket_eos_create(bb->bucket_alloc)); + return status; +} + +static apr_status_t input_read(h2_task *task, ap_filter_t* f, + apr_bucket_brigade* bb, ap_input_mode_t mode, + apr_read_type_e block, apr_off_t readbytes) +{ + apr_status_t status = APR_SUCCESS; + apr_bucket *b, *next; + apr_off_t bblen = 0; + + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, + "h2_task(%s): read, mode=%d, block=%d, readbytes=%ld", + task->id, mode, block, (long)readbytes); + + if (mode == AP_MODE_INIT) { + return ap_get_brigade(f->c->input_filters, bb, mode, block, readbytes); + } + + if (f->c->aborted) { + return APR_ECONNABORTED; + } + + if (task->input.bb) { + /* Cleanup brigades from those nasty 0 length non-meta buckets + * that apr_brigade_split_line() sometimes produces. */ + for (b = APR_BRIGADE_FIRST(task->input.bb); + b != APR_BRIGADE_SENTINEL(task->input.bb); b = next) { + next = APR_BUCKET_NEXT(b); + if (b->length == 0 && !APR_BUCKET_IS_METADATA(b)) { + apr_bucket_delete(b); + } + } + apr_brigade_length(task->input.bb, 0, &bblen); + } + + if (bblen == 0) { + if (task->input.eos_written) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, APR_EOF, f->c, + "h2_task(%s): read no data", task->id); + return APR_EOF; + } + else if (task->input.eos) { + input_append_eos(task, f->r); + } + } + + while (APR_BRIGADE_EMPTY(task->input.bb)) { + /* Get more input data for our request. */ + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, + "h2_task(%s): get more data from mplx, block=%d, " + "readbytes=%ld, queued=%ld", + task->id, block, (long)readbytes, (long)bblen); + + /* Override the block mode we get called with depending on the input's + * setting. */ + if (task->input.beam) { + status = h2_beam_receive(task->input.beam, task->input.bb, block, + H2MIN(readbytes, 32*1024)); + } + else { + status = APR_EOF; + } + + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, + "h2_task(%s): read returned", task->id); + if (APR_STATUS_IS_EAGAIN(status) + && (mode == AP_MODE_GETLINE || block == APR_BLOCK_READ)) { + /* chunked input handling does not seem to like it if we + * return with APR_EAGAIN from a GETLINE read... + * upload 100k test on test-ser.example.org hangs */ + status = APR_SUCCESS; + } + else if (APR_STATUS_IS_EOF(status) && !task->input.eos_written) { + task->input.eos = 1; + } + else if (status != APR_SUCCESS) { + return status; + } + + apr_brigade_length(task->input.bb, 0, &bblen); + if (bblen > 0 && task->input.chunked) { + /* need to add chunks since request processing expects it */ + char buffer[128]; + apr_bucket *b; + int len; + + len = apr_snprintf(buffer, H2_ALEN(buffer), "%lx\r\n", + (unsigned long)bblen); + b = apr_bucket_heap_create(buffer, len, NULL, + task->input.bb->bucket_alloc); + APR_BRIGADE_INSERT_HEAD(task->input.bb, b); + status = apr_brigade_puts(task->input.bb, NULL, NULL, "\r\n"); + } + + if (h2_util_has_eos(task->input.bb, -1)) { + task->input.eos = 1; + } + + if (task->input.eos && !task->input.eos_written) { + input_append_eos(task, f->r); + } + + if (h2_task_logio_add_bytes_in) { + h2_task_logio_add_bytes_in(f->c, bblen); + } + } + + h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2, + "task_input.bb", task->input.bb); + + if (APR_BRIGADE_EMPTY(task->input.bb)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, + "h2_task(%s): no data", task->id); + return (block == APR_NONBLOCK_READ)? APR_EAGAIN : APR_EOF; + } + + if (mode == AP_MODE_EXHAUSTIVE) { + /* return all we have */ + APR_BRIGADE_CONCAT(bb, task->input.bb); + } + else if (mode == AP_MODE_READBYTES) { + status = h2_brigade_concat_length(bb, task->input.bb, readbytes); + } + else if (mode == AP_MODE_SPECULATIVE) { + status = h2_brigade_copy_length(bb, task->input.bb, readbytes); + } + else if (mode == AP_MODE_GETLINE) { + /* we are reading a single LF line, e.g. the HTTP headers. + * this has the nasty side effect to split the bucket, even + * though it ends with CRLF and creates a 0 length bucket */ + status = apr_brigade_split_line(bb, task->input.bb, block, + HUGE_STRING_LEN); + if (APLOGctrace1(f->c)) { + char buffer[1024]; + apr_size_t len = sizeof(buffer)-1; + apr_brigade_flatten(bb, buffer, &len); + buffer[len] = 0; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, + "h2_task(%s): getline: %s", + task->id, buffer); + } + } + else { + /* Hmm, well. There is mode AP_MODE_EATCRLF, but we chose not + * to support it. Seems to work. */ + ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOTIMPL, f->c, + APLOGNO(02942) + "h2_task, unsupported READ mode %d", mode); + status = APR_ENOTIMPL; + } + + if (APLOGctrace1(f->c)) { + apr_brigade_length(bb, 0, &bblen); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, + "h2_task(%s): return %ld data bytes", + task->id, (long)bblen); + } + return status; +} +/******************************************************************************* + * task input handling + ******************************************************************************/ + +static apr_status_t open_response(h2_task *task) +{ + h2_response *response; + response = h2_from_h1_get_response(task->output.from_h1); + if (!response) { + /* This happens currently when ap_die(status, r) is invoked + * by a read request filter. */ + ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, task->c, APLOGNO(03204) + "h2_task(%s): write without response for %s %s %s", + task->id, + task->request->method, + task->request->authority, + task->request->path); + task->c->aborted = 1; + return APR_ECONNABORTED; + } + + if (h2_task_logio_add_bytes_out) { + /* count headers as if we'd do a HTTP/1.1 serialization */ + task->output.written = h2_util_table_bytes(response->headers, 3)+1; + h2_task_logio_add_bytes_out(task->c, task->output.written); + } + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c, APLOGNO(03348) + "h2_task(%s): open response to %s %s %s", + task->id, task->request->method, + task->request->authority, + task->request->path); + return h2_mplx_out_open(task->mplx, task->stream_id, + response, task->output.beam); +} + +static apr_status_t send_out(h2_task *task, apr_bucket_brigade* bb) +{ + apr_off_t written, left; + apr_status_t status; + + apr_brigade_length(bb, 0, &written); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, + "h2_task(%s): write response body (%ld bytes)", + task->id, (long)written); + + status = h2_beam_send(task->output.beam, bb, + task->blocking? APR_BLOCK_READ + : APR_NONBLOCK_READ); + if (APR_STATUS_IS_EAGAIN(status)) { + apr_brigade_length(bb, 0, &left); + written -= left; + status = APR_SUCCESS; + } + if (status == APR_SUCCESS) { + task->output.written += written; + if (h2_task_logio_add_bytes_out) { + h2_task_logio_add_bytes_out(task->c, written); + } + } + return status; +} + +/* Bring the data from the brigade (which represents the result of the + * request_rec out filter chain) into the h2_mplx for further sending + * on the master connection. + */ +static apr_status_t output_write(h2_task *task, ap_filter_t* f, + apr_bucket_brigade* bb) +{ + apr_bucket *b; + apr_status_t status = APR_SUCCESS; + + if (APR_BRIGADE_EMPTY(bb)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, + "h2_task(%s): empty write", task->id); + return APR_SUCCESS; + } + + if (task->frozen) { + h2_util_bb_log(task->c, task->stream_id, APLOG_TRACE2, + "frozen task output write, ignored", bb); + while (!APR_BRIGADE_EMPTY(bb)) { + b = APR_BRIGADE_FIRST(bb); + if (AP_BUCKET_IS_EOR(b)) { + APR_BUCKET_REMOVE(b); + task->eor = b; + } + else { + apr_bucket_delete(b); + } + } + return APR_SUCCESS; + } + + if (!task->output.beam) { + h2_beam_create(&task->output.beam, task->pool, + task->stream_id, "output", 0); + } + + /* Attempt to write saved brigade first */ + if (task->output.bb && !APR_BRIGADE_EMPTY(task->output.bb)) { + status = send_out(task, task->output.bb); + if (status != APR_SUCCESS) { + return status; + } + } + + /* If there is nothing saved (anymore), try to write the brigade passed */ + if ((!task->output.bb || APR_BRIGADE_EMPTY(task->output.bb)) && !APR_BRIGADE_EMPTY(bb)) { + status = send_out(task, bb); + if (status != APR_SUCCESS) { + return status; + } + } + + /* If the passed brigade is not empty, save it before return */ + if (!APR_BRIGADE_EMPTY(bb)) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c, + "h2_task(%s): could not write all, saving brigade", + task->id); + if (!task->output.bb) { + task->output.bb = apr_brigade_create(task->pool, + task->c->bucket_alloc); + } + return ap_save_brigade(f, &task->output.bb, &bb, task->pool); + } + + if (!task->output.response_open) { + /* data is in the output beam, if we have not opened the response, + * do so now. */ + status = open_response(task); + task->output.response_open = 1; + } + + return status; +} + +/******************************************************************************* + * task slave connection filters + ******************************************************************************/ static apr_status_t h2_filter_stream_input(ap_filter_t* filter, apr_bucket_brigade* brigade, @@ -57,11 +390,7 @@ static apr_status_t h2_filter_stream_input(ap_filter_t* filter, { h2_task *task = filter->ctx; AP_DEBUG_ASSERT(task); - if (!task->input) { - return APR_ECONNABORTED; - } - return h2_task_input_read(task->input, filter, brigade, - mode, block, readbytes); + return input_read(task, filter, brigade, mode, block, readbytes); } static apr_status_t h2_filter_stream_output(ap_filter_t* filter, @@ -69,10 +398,7 @@ static apr_status_t h2_filter_stream_output(ap_filter_t* filter, { h2_task *task = filter->ctx; AP_DEBUG_ASSERT(task); - if (!task->output) { - return APR_ECONNABORTED; - } - return h2_task_output_write(task->output, filter, brigade); + return output_write(task, filter, brigade); } static apr_status_t h2_filter_read_response(ap_filter_t* f, @@ -80,10 +406,10 @@ static apr_status_t h2_filter_read_response(ap_filter_t* f, { h2_task *task = f->ctx; AP_DEBUG_ASSERT(task); - if (!task->output || !task->output->from_h1) { + if (!task->output.from_h1) { return APR_ECONNABORTED; } - return h2_from_h1_read_response(task->output->from_h1, f, bb); + return h2_from_h1_read_response(task->output.from_h1, f, bb); } /******************************************************************************* @@ -123,8 +449,8 @@ apr_status_t h2_task_init(apr_pool_t *pool, server_rec *s) return APR_SUCCESS; } -h2_task *h2_task_create(long session_id, const h2_request *req, - conn_rec *c, h2_mplx *mplx) +h2_task *h2_task_create(conn_rec *c, const h2_request *req, + h2_bucket_beam *input, h2_mplx *mplx) { apr_pool_t *pool; h2_task *task; @@ -134,21 +460,23 @@ h2_task *h2_task_create(long session_id, const h2_request *req, if (task == NULL) { ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, c, APLOGNO(02941) "h2_task(%ld-%d): create stream task", - session_id, req->id); + c->id, req->id); h2_mplx_out_close(mplx, req->id); return NULL; } - task->id = apr_psprintf(pool, "%ld-%d", session_id, req->id); + task->id = apr_psprintf(pool, "%ld-%d", c->id, req->id); task->stream_id = req->id; task->c = c; task->mplx = mplx; task->c->keepalives = mplx->c->keepalives; task->pool = pool; task->request = req; - task->input_eos = !req->body; task->ser_headers = req->serialize; task->blocking = 1; + task->input.beam = input; + + apr_thread_cond_create(&task->cond, pool); h2_ctx_create_for(c, task); /* Add our own, network level in- and output filters. */ @@ -162,6 +490,9 @@ void h2_task_destroy(h2_task *task) { ap_remove_input_filter_byhandle(task->c->input_filters, "H2_TO_H1"); ap_remove_output_filter_byhandle(task->c->output_filters, "H1_TO_H2"); + if (task->eor) { + apr_bucket_destroy(task->eor); + } if (task->pool) { apr_pool_destroy(task->pool); } @@ -172,14 +503,39 @@ void h2_task_set_io_blocking(h2_task *task, int blocking) task->blocking = blocking; } -apr_status_t h2_task_do(h2_task *task, apr_thread_cond_t *cond) +apr_status_t h2_task_do(h2_task *task) { apr_status_t status; AP_DEBUG_ASSERT(task); - task->io = cond; - task->input = h2_task_input_create(task, task->c); - task->output = h2_task_output_create(task, task->c); + + task->input.block = APR_BLOCK_READ; + task->input.chunked = task->request->chunked; + task->input.eos = !task->request->body; + if (task->input.eos && !task->input.chunked && !task->ser_headers) { + /* We do not serialize/chunk and have eos already, no need to + * create a bucket brigade. */ + task->input.bb = NULL; + task->input.eos_written = 1; + } + else { + task->input.bb = apr_brigade_create(task->pool, task->c->bucket_alloc); + if (task->ser_headers) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, + "h2_task(%s): serialize request %s %s", + task->id, task->request->method, task->request->path); + apr_brigade_printf(task->input.bb, NULL, + NULL, "%s %s HTTP/1.1\r\n", + task->request->method, task->request->path); + apr_table_do(input_ser_header, task, task->request->headers, NULL); + apr_brigade_puts(task->input.bb, NULL, NULL, "\r\n"); + } + if (task->input.eos) { + input_append_eos(task, NULL); + } + } + + task->output.from_h1 = h2_from_h1_create(task->stream_id, task->pool); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, "h2_task(%s): process connection", task->id); @@ -195,6 +551,7 @@ apr_status_t h2_task_do(h2_task *task, apr_thread_cond_t *cond) else { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, "h2_task(%s): processing done", task->id); + h2_mplx_out_close(task->mplx, task->stream_id); status = APR_SUCCESS; } diff --git a/modules/http2/h2_task.h b/modules/http2/h2_task.h index 15a1d3cb2c..450763ee40 100644 --- a/modules/http2/h2_task.h +++ b/modules/http2/h2_task.h @@ -38,6 +38,7 @@ */ struct apr_thread_cond_t; +struct h2_bucket_beam; struct h2_conn; struct h2_mplx; struct h2_task; @@ -55,29 +56,42 @@ struct h2_task { struct h2_mplx *mplx; apr_pool_t *pool; const struct h2_request *request; + apr_bucket *eor; + struct apr_thread_cond_t *cond; unsigned int filters_set : 1; - unsigned int input_eos : 1; unsigned int ser_headers : 1; unsigned int frozen : 1; unsigned int blocking : 1; unsigned int detached : 1; - struct h2_task_input *input; - struct h2_task_output *output; - struct apr_thread_cond_t *io; /* used to wait for events on */ + struct { + struct h2_bucket_beam *beam; + apr_bucket_brigade *bb; + apr_read_type_e block; + unsigned int chunked : 1; + unsigned int eos : 1; + unsigned int eos_written : 1; + } input; + struct { + struct h2_bucket_beam *beam; + struct h2_from_h1 *from_h1; + unsigned int response_open : 1; + apr_off_t written; + apr_bucket_brigade *bb; + } output; struct h2_req_engine *engine; /* engine hosted by this task */ struct h2_req_engine *assigned; /* engine that task has been assigned to */ request_rec *r; /* request being processed in this task */ }; -h2_task *h2_task_create(long session_id, const struct h2_request *req, - conn_rec *c, struct h2_mplx *mplx); +h2_task *h2_task_create(conn_rec *c, const struct h2_request *req, + struct h2_bucket_beam *input, struct h2_mplx *mplx); void h2_task_destroy(h2_task *task); -apr_status_t h2_task_do(h2_task *task, struct apr_thread_cond_t *cond); +apr_status_t h2_task_do(h2_task *task); void h2_task_register_hooks(void); /* diff --git a/modules/http2/h2_task_input.c b/modules/http2/h2_task_input.c deleted file mode 100644 index 3993b6b40c..0000000000 --- a/modules/http2/h2_task_input.c +++ /dev/null @@ -1,228 +0,0 @@ -/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <assert.h> - -#include <httpd.h> -#include <http_core.h> -#include <http_log.h> -#include <http_connection.h> - -#include "h2_private.h" -#include "h2_conn.h" -#include "h2_mplx.h" -#include "h2_request.h" -#include "h2_session.h" -#include "h2_stream.h" -#include "h2_task_input.h" -#include "h2_task.h" -#include "h2_util.h" - - -static int is_aborted(ap_filter_t *f) -{ - return (f->c->aborted); -} - -static int ser_header(void *ctx, const char *name, const char *value) -{ - h2_task_input *input = (h2_task_input*)ctx; - apr_brigade_printf(input->bb, NULL, NULL, "%s: %s\r\n", name, value); - return 1; -} - -h2_task_input *h2_task_input_create(h2_task *task, conn_rec *c) -{ - h2_task_input *input = apr_pcalloc(task->pool, sizeof(h2_task_input)); - if (input) { - input->task = task; - input->bb = NULL; - input->block = APR_BLOCK_READ; - - if (task->ser_headers) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, - "h2_task_input(%s): serialize request %s %s", - task->id, task->request->method, task->request->path); - input->bb = apr_brigade_create(task->pool, c->bucket_alloc); - apr_brigade_printf(input->bb, NULL, NULL, "%s %s HTTP/1.1\r\n", - task->request->method, task->request->path); - apr_table_do(ser_header, input, task->request->headers, NULL); - apr_brigade_puts(input->bb, NULL, NULL, "\r\n"); - if (input->task->input_eos) { - APR_BRIGADE_INSERT_TAIL(input->bb, apr_bucket_eos_create(c->bucket_alloc)); - } - } - else if (!input->task->input_eos) { - input->bb = apr_brigade_create(task->pool, c->bucket_alloc); - } - else { - /* We do not serialize and have eos already, no need to - * create a bucket brigade. */ - } - } - return input; -} - -void h2_task_input_block_set(h2_task_input *input, apr_read_type_e block) -{ - input->block = block; -} - -apr_status_t h2_task_input_read(h2_task_input *input, - ap_filter_t* f, - apr_bucket_brigade* bb, - ap_input_mode_t mode, - apr_read_type_e block, - apr_off_t readbytes) -{ - apr_status_t status = APR_SUCCESS; - apr_off_t bblen = 0; - - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_task_input(%s): read, block=%d, mode=%d, readbytes=%ld", - input->task->id, block, mode, (long)readbytes); - - if (mode == AP_MODE_INIT) { - return ap_get_brigade(f->c->input_filters, bb, mode, block, readbytes); - } - - if (is_aborted(f)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_task_input(%s): is aborted", input->task->id); - return APR_ECONNABORTED; - } - - if (input->bb) { - status = apr_brigade_length(input->bb, 1, &bblen); - if (status != APR_SUCCESS) { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, f->c, - APLOGNO(02958) "h2_task_input(%s): brigade length fail", - input->task->id); - return status; - } - } - - if ((bblen == 0) && input->task->input_eos) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_task_input(%s): eos", input->task->id); - return APR_EOF; - } - - while (bblen == 0) { - /* Get more data for our stream from mplx. - */ - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, - "h2_task_input(%s): get more data from mplx, block=%d, " - "readbytes=%ld, queued=%ld", - input->task->id, block, - (long)readbytes, (long)bblen); - - /* Override the block mode we get called with depending on the input's - * setting. - */ - status = h2_mplx_in_read(input->task->mplx, block, - input->task->stream_id, input->bb, - f->r? f->r->trailers_in : NULL, - input->task->io); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, - "h2_task_input(%s): mplx in read returned", - input->task->id); - if (APR_STATUS_IS_EAGAIN(status) - && (mode == AP_MODE_GETLINE || block == APR_BLOCK_READ)) { - /* chunked input handling does not seem to like it if we - * return with APR_EAGAIN from a GETLINE read... - * upload 100k test on test-ser.example.org hangs */ - status = APR_SUCCESS; - } - else if (status != APR_SUCCESS) { - return status; - } - - status = apr_brigade_length(input->bb, 1, &bblen); - if (status != APR_SUCCESS) { - return status; - } - - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, - "h2_task_input(%s): mplx in read, %ld bytes in brigade", - input->task->id, (long)bblen); - if (h2_task_logio_add_bytes_in) { - h2_task_logio_add_bytes_in(f->c, bblen); - } - } - - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, - "h2_task_input(%s): read, mode=%d, block=%d, " - "readbytes=%ld, queued=%ld", - input->task->id, mode, block, - (long)readbytes, (long)bblen); - - if (!APR_BRIGADE_EMPTY(input->bb)) { - if (mode == AP_MODE_EXHAUSTIVE) { - /* return all we have */ - status = h2_util_move(bb, input->bb, readbytes, NULL, - "task_input_read(exhaustive)"); - } - else if (mode == AP_MODE_READBYTES) { - status = h2_util_move(bb, input->bb, readbytes, NULL, - "task_input_read(readbytes)"); - } - else if (mode == AP_MODE_SPECULATIVE) { - /* return not more than was asked for */ - status = h2_util_copy(bb, input->bb, readbytes, - "task_input_read(speculative)"); - } - else if (mode == AP_MODE_GETLINE) { - /* we are reading a single LF line, e.g. the HTTP headers */ - status = apr_brigade_split_line(bb, input->bb, block, - HUGE_STRING_LEN); - if (APLOGctrace1(f->c)) { - char buffer[1024]; - apr_size_t len = sizeof(buffer)-1; - apr_brigade_flatten(bb, buffer, &len); - buffer[len] = 0; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, - "h2_task_input(%s): getline: %s", - input->task->id, buffer); - } - } - else { - /* Hmm, well. There is mode AP_MODE_EATCRLF, but we chose not - * to support it. Seems to work. */ - ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOTIMPL, f->c, - APLOGNO(02942) - "h2_task_input, unsupported READ mode %d", mode); - status = APR_ENOTIMPL; - } - - if (APLOGctrace1(f->c)) { - apr_brigade_length(bb, 0, &bblen); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, - "h2_task_input(%s): return %ld data bytes", - input->task->id, (long)bblen); - } - return status; - } - - if (is_aborted(f)) { - return APR_ECONNABORTED; - } - - status = (block == APR_NONBLOCK_READ)? APR_EAGAIN : APR_EOF; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, - "h2_task_input(%s): no data", input->task->id); - return status; -} - diff --git a/modules/http2/h2_task_input.h b/modules/http2/h2_task_input.h deleted file mode 100644 index c8913cacf2..0000000000 --- a/modules/http2/h2_task_input.h +++ /dev/null @@ -1,46 +0,0 @@ -/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef __mod_h2__h2_task_input__ -#define __mod_h2__h2_task_input__ - -/* h2_task_input places the HEADER+DATA, formatted in HTTP/1.1, into - * a bucket brigade. The brigade is setup as the input brigade for our - * pseudo httpd conn_rec that is handling a specific h2_task. - */ -struct apr_thread_cond_t; -struct h2_mplx; -struct h2_task; - -typedef struct h2_task_input h2_task_input; -struct h2_task_input { - struct h2_task *task; - apr_bucket_brigade *bb; - apr_read_type_e block; -}; - - -h2_task_input *h2_task_input_create(struct h2_task *task, conn_rec *c); - -apr_status_t h2_task_input_read(h2_task_input *input, - ap_filter_t* filter, - apr_bucket_brigade* brigade, - ap_input_mode_t mode, - apr_read_type_e block, - apr_off_t readbytes); - -void h2_task_input_block_set(h2_task_input *input, apr_read_type_e block); - -#endif /* defined(__mod_h2__h2_task_input__) */ diff --git a/modules/http2/h2_task_output.c b/modules/http2/h2_task_output.c deleted file mode 100644 index 80938d1fcc..0000000000 --- a/modules/http2/h2_task_output.c +++ /dev/null @@ -1,176 +0,0 @@ -/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <assert.h> - -#include <apr_thread_cond.h> -#include <httpd.h> -#include <http_core.h> -#include <http_log.h> -#include <http_connection.h> -#include <http_request.h> - -#include "h2_private.h" -#include "h2_conn.h" -#include "h2_mplx.h" -#include "h2_request.h" -#include "h2_session.h" -#include "h2_stream.h" -#include "h2_from_h1.h" -#include "h2_response.h" -#include "h2_task_output.h" -#include "h2_task.h" -#include "h2_util.h" - - -h2_task_output *h2_task_output_create(h2_task *task, conn_rec *c) -{ - h2_task_output *output = apr_pcalloc(task->pool, sizeof(h2_task_output)); - if (output) { - output->task = task; - output->from_h1 = h2_from_h1_create(task->stream_id, task->pool); - } - return output; -} - -static apr_status_t open_response(h2_task_output *output, ap_filter_t *f, - apr_bucket_brigade *bb, const char *caller) -{ - h2_response *response; - response = h2_from_h1_get_response(output->from_h1); - if (!response) { - if (f) { - /* This happens currently when ap_die(status, r) is invoked - * by a read request filter. */ - ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, output->task->c, APLOGNO(03204) - "h2_task_output(%s): write without response by %s " - "for %s %s %s", - output->task->id, caller, - output->task->request->method, - output->task->request->authority, - output->task->request->path); - output->task->c->aborted = 1; - } - if (output->task->io) { - apr_thread_cond_broadcast(output->task->io); - } - return APR_ECONNABORTED; - } - - if (h2_task_logio_add_bytes_out) { - /* count headers as if we'd do a HTTP/1.1 serialization */ - output->written = h2_util_table_bytes(response->headers, 3)+1; - h2_task_logio_add_bytes_out(output->task->c, output->written); - } - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->task->c, APLOGNO(03348) - "h2_task(%s): open response to %s %s %s", - output->task->id, output->task->request->method, - output->task->request->authority, - output->task->request->path); - return h2_mplx_out_open(output->task->mplx, output->task->stream_id, - response, f, bb, output->task->io); -} - -static apr_status_t write_brigade_raw(h2_task_output *output, - ap_filter_t* f, apr_bucket_brigade* bb) -{ - apr_off_t written, left; - apr_status_t status; - - apr_brigade_length(bb, 0, &written); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, output->task->c, - "h2_task(%s): write response body (%ld bytes)", - output->task->id, (long)written); - - status = h2_mplx_out_write(output->task->mplx, output->task->stream_id, - f, output->task->blocking, bb, output->task->io); - if (status == APR_INCOMPLETE) { - apr_brigade_length(bb, 0, &left); - written -= left; - status = APR_SUCCESS; - } - - if (status == APR_SUCCESS) { - output->written += written; - if (h2_task_logio_add_bytes_out) { - h2_task_logio_add_bytes_out(output->task->c, written); - } - } - return status; -} - -/* Bring the data from the brigade (which represents the result of the - * request_rec out filter chain) into the h2_mplx for further sending - * on the master connection. - */ -apr_status_t h2_task_output_write(h2_task_output *output, - ap_filter_t* f, apr_bucket_brigade* bb) -{ - apr_bucket *b; - apr_status_t status = APR_SUCCESS; - - if (APR_BRIGADE_EMPTY(bb)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, output->task->c, - "h2_task(%s): empty write", output->task->id); - return APR_SUCCESS; - } - - if (output->task->frozen) { - h2_util_bb_log(output->task->c, output->task->stream_id, APLOG_TRACE2, - "frozen task output write, ignored", bb); - while (!APR_BRIGADE_EMPTY(bb)) { - b = APR_BRIGADE_FIRST(bb); - if (AP_BUCKET_IS_EOR(b)) { - /* TODO: keep it */ - APR_BUCKET_REMOVE(b); - } - else { - apr_bucket_delete(b); - } - } - return APR_SUCCESS; - } - - if (!output->response_open) { - status = open_response(output, f, bb, "write"); - output->response_open = 1; - } - - /* Attempt to write saved brigade first */ - if (status == APR_SUCCESS && output->bb && !APR_BRIGADE_EMPTY(output->bb)) { - status = write_brigade_raw(output, f, output->bb); - } - - /* If there is nothing saved (anymore), try to write the brigade passed */ - if (status == APR_SUCCESS - && (!output->bb || APR_BRIGADE_EMPTY(output->bb)) - && !APR_BRIGADE_EMPTY(bb)) { - status = write_brigade_raw(output, f, bb); - } - - /* If the passed brigade is not empty, save it before return */ - if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, output->task->c, - "h2_task(%s): could not write all, saving brigade", - output->task->id); - if (!output->bb) { - output->bb = apr_brigade_create(output->task->pool, output->task->c->bucket_alloc); - } - return ap_save_brigade(f, &output->bb, &bb, output->task->pool); - } - - return status; -} - diff --git a/modules/http2/h2_task_output.h b/modules/http2/h2_task_output.h deleted file mode 100644 index 3135bc459e..0000000000 --- a/modules/http2/h2_task_output.h +++ /dev/null @@ -1,50 +0,0 @@ -/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef __mod_h2__h2_task_output__ -#define __mod_h2__h2_task_output__ - -/* h2_task_output reads a HTTP/1 response from the brigade and applies - * them to a h2_output_converter. The brigade is setup as the output brigade - * for our pseudo httpd conn_rec that is handling a specific h2_task. - * - */ -struct apr_thread_cond_t; -struct h2_mplx; -struct h2_task; -struct h2_from_h1; - -typedef struct h2_task_output h2_task_output; - -struct h2_task_output { - struct h2_task *task; - struct h2_from_h1 *from_h1; - - unsigned int response_open : 1; - - apr_off_t written; - apr_bucket_brigade *bb; -}; - -h2_task_output *h2_task_output_create(struct h2_task *task, conn_rec *c); - -apr_status_t h2_task_output_write(h2_task_output *output, - ap_filter_t* filter, - apr_bucket_brigade* brigade); - -apr_status_t h2_task_output_freeze(h2_task_output *output); -apr_status_t h2_task_output_thaw(h2_task_output *output); - -#endif /* defined(__mod_h2__h2_task_output__) */ diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c index 06472425f2..9beaeae048 100644 --- a/modules/http2/h2_util.c +++ b/modules/http2/h2_util.c @@ -332,6 +332,301 @@ void h2_ihash_clear(h2_ihash_t *ih) } /******************************************************************************* + * ilist - sorted list for structs with int identifier + ******************************************************************************/ + +#define h2_ilist_IDX(list, i) ((int**)(list)->elts)[i] + +struct h2_ilist_t { + apr_array_header_t *l; +}; + +h2_ilist_t *h2_ilist_create(apr_pool_t *pool) +{ + h2_ilist_t *list = apr_pcalloc(pool, sizeof(h2_ilist_t)); + if (list) { + list->l = apr_array_make(pool, 100, sizeof(int*)); + if (!list->l) { + return NULL; + } + } + return list; +} + +static int h2_ilist_cmp(const void *s1, const void *s2) +{ + int **pi1 = (int **)s1; + int **pi2 = (int **)s2; + return *(*pi1) - *(*pi2); +} + +void *h2_ilist_get(h2_ilist_t *list, int id) +{ + /* we keep the array sorted by id, so lookup can be done + * by bsearch. + */ + int **pi; + int *pkey = &id; + + pi = bsearch(&pkey, list->l->elts, list->l->nelts, + list->l->elt_size, h2_ilist_cmp); + return pi? *pi : NULL; +} + +static void h2_ilist_sort(h2_ilist_t *list) +{ + qsort(list->l->elts, list->l->nelts, list->l->elt_size, h2_ilist_cmp); +} + +apr_status_t h2_ilist_add(h2_ilist_t *list, void *val) +{ + int *pi = val; + void *existing = h2_ilist_get(list, *pi); + if (!existing) { + int last; + APR_ARRAY_PUSH(list->l, void*) = val; + /* Often, values get added in ascending order of id. We + * keep the array sorted, so we just need to check if the newly + * appended stream has a lower id than the last one. if not, + * sorting is not necessary. + */ + last = list->l->nelts - 1; + if (last > 0 + && *h2_ilist_IDX(list->l, last) < *h2_ilist_IDX(list->l, last-1)) { + h2_ilist_sort(list); + } + } + return APR_SUCCESS; +} + +static void remove_idx(h2_ilist_t *list, int idx) +{ + int n; + --list->l->nelts; + n = list->l->nelts - idx; + if (n > 0) { + /* There are n h2_io* behind idx. Move the rest down */ + int **selts = (int**)list->l->elts; + memmove(selts + idx, selts + idx + 1, n * sizeof(int*)); + } +} + +void *h2_ilist_remove(h2_ilist_t *list, int id) +{ + int i; + for (i = 0; i < list->l->nelts; ++i) { + int *e = h2_ilist_IDX(list->l, i); + if (id == *e) { + remove_idx(list, i); + return e; + } + } + return NULL; +} + +void *h2_ilist_shift(h2_ilist_t *list) +{ + if (list->l->nelts > 0) { + int *pi = h2_ilist_IDX(list->l, 0); + remove_idx(list, 0); + return pi; + } + return NULL; +} + +int h2_ilist_empty(h2_ilist_t *list) +{ + return list->l->nelts == 0; +} + +int h2_ilist_iter(h2_ilist_t *list, h2_ilist_iter_t *iter, void *ctx) +{ + int i; + for (i = 0; i < list->l->nelts; ++i) { + int *pi = h2_ilist_IDX(list->l, i); + if (!iter(ctx, pi)) { + return 0; + } + } + return 1; +} + +apr_size_t h2_ilist_count(h2_ilist_t *list) +{ + return list->l->nelts; +} + +/******************************************************************************* + * iqueue - sorted list of int + ******************************************************************************/ + +static void iq_grow(h2_iqueue *q, int nlen); +static void iq_swap(h2_iqueue *q, int i, int j); +static int iq_bubble_up(h2_iqueue *q, int i, int top, + h2_iq_cmp *cmp, void *ctx); +static int iq_bubble_down(h2_iqueue *q, int i, int bottom, + h2_iq_cmp *cmp, void *ctx); + +h2_iqueue *h2_iq_create(apr_pool_t *pool, int capacity) +{ + h2_iqueue *q = apr_pcalloc(pool, sizeof(h2_iqueue)); + if (q) { + q->pool = pool; + iq_grow(q, capacity); + q->nelts = 0; + } + return q; +} + +int h2_iq_empty(h2_iqueue *q) +{ + return q->nelts == 0; +} + +int h2_iq_count(h2_iqueue *q) +{ + return q->nelts; +} + + +void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx) +{ + int i; + + if (q->nelts >= q->nalloc) { + iq_grow(q, q->nalloc * 2); + } + + i = (q->head + q->nelts) % q->nalloc; + q->elts[i] = sid; + ++q->nelts; + + if (cmp) { + /* bubble it to the front of the queue */ + iq_bubble_up(q, i, q->head, cmp, ctx); + } +} + +int h2_iq_remove(h2_iqueue *q, int sid) +{ + int i; + for (i = 0; i < q->nelts; ++i) { + if (sid == q->elts[(q->head + i) % q->nalloc]) { + break; + } + } + + if (i < q->nelts) { + ++i; + for (; i < q->nelts; ++i) { + q->elts[(q->head+i-1)%q->nalloc] = q->elts[(q->head+i)%q->nalloc]; + } + --q->nelts; + return 1; + } + return 0; +} + +void h2_iq_clear(h2_iqueue *q) +{ + q->nelts = 0; +} + +void h2_iq_sort(h2_iqueue *q, h2_iq_cmp *cmp, void *ctx) +{ + /* Assume that changes in ordering are minimal. This needs, + * best case, q->nelts - 1 comparisions to check that nothing + * changed. + */ + if (q->nelts > 0) { + int i, ni, prev, last; + + /* Start at the end of the queue and create a tail of sorted + * entries. Make that tail one element longer in each iteration. + */ + last = i = (q->head + q->nelts - 1) % q->nalloc; + while (i != q->head) { + prev = (q->nalloc + i - 1) % q->nalloc; + + ni = iq_bubble_up(q, i, prev, cmp, ctx); + if (ni == prev) { + /* i bubbled one up, bubble the new i down, which + * keeps all tasks below i sorted. */ + iq_bubble_down(q, i, last, cmp, ctx); + } + i = prev; + }; + } +} + + +int h2_iq_shift(h2_iqueue *q) +{ + int sid; + + if (q->nelts <= 0) { + return 0; + } + + sid = q->elts[q->head]; + q->head = (q->head + 1) % q->nalloc; + q->nelts--; + + return sid; +} + +static void iq_grow(h2_iqueue *q, int nlen) +{ + if (nlen > q->nalloc) { + int *nq = apr_pcalloc(q->pool, sizeof(int) * nlen); + if (q->nelts > 0) { + int l = ((q->head + q->nelts) % q->nalloc) - q->head; + + memmove(nq, q->elts + q->head, sizeof(int) * l); + if (l < q->nelts) { + /* elts wrapped, append elts in [0, remain] to nq */ + int remain = q->nelts - l; + memmove(nq + l, q->elts, sizeof(int) * remain); + } + } + q->elts = nq; + q->nalloc = nlen; + q->head = 0; + } +} + +static void iq_swap(h2_iqueue *q, int i, int j) +{ + int x = q->elts[i]; + q->elts[i] = q->elts[j]; + q->elts[j] = x; +} + +static int iq_bubble_up(h2_iqueue *q, int i, int top, + h2_iq_cmp *cmp, void *ctx) +{ + int prev; + while (((prev = (q->nalloc + i - 1) % q->nalloc), i != top) + && (*cmp)(q->elts[i], q->elts[prev], ctx) < 0) { + iq_swap(q, prev, i); + i = prev; + } + return i; +} + +static int iq_bubble_down(h2_iqueue *q, int i, int bottom, + h2_iq_cmp *cmp, void *ctx) +{ + int next; + while (((next = (q->nalloc + i + 1) % q->nalloc), i != bottom) + && (*cmp)(q->elts[i], q->elts[next], ctx) > 0) { + iq_swap(q, next, i); + i = next; + } + return i; +} + +/******************************************************************************* * h2_util for apt_table_t ******************************************************************************/ @@ -368,15 +663,6 @@ apr_size_t h2_util_table_bytes(apr_table_t *t, apr_size_t pair_extra) * h2_util for bucket brigades ******************************************************************************/ -/* DEEP_COPY==0 crashes under load. I think the setaside is fine, - * however buckets moved to another thread will still be - * free'd against the old bucket_alloc. *And* if the old - * pool gets destroyed too early, the bucket disappears while - * still needed. - */ -static const int DEEP_COPY = 1; -static const int FILE_MOVE = 1; - static apr_status_t last_not_included(apr_bucket_brigade *bb, apr_off_t maxlen, int same_alloc, @@ -434,200 +720,95 @@ static apr_status_t last_not_included(apr_bucket_brigade *bb, return status; } -#define LOG_BUCKETS 0 -#define LOG_LEVEL APLOG_INFO - -apr_status_t h2_util_move(apr_bucket_brigade *to, apr_bucket_brigade *from, - apr_off_t maxlen, apr_size_t *pfile_buckets_allowed, - const char *msg) +apr_status_t h2_brigade_concat_length(apr_bucket_brigade *dest, + apr_bucket_brigade *src, + apr_off_t length) { + apr_bucket *b, *next; + apr_off_t remain = length; apr_status_t status = APR_SUCCESS; - int same_alloc; - AP_DEBUG_ASSERT(to); - AP_DEBUG_ASSERT(from); - same_alloc = (to->bucket_alloc == from->bucket_alloc - || to->p == from->p); - - if (!FILE_MOVE) { - pfile_buckets_allowed = NULL; - } - - if (!APR_BRIGADE_EMPTY(from)) { - apr_bucket *b, *end; + for (b = APR_BRIGADE_FIRST(src); + b != APR_BRIGADE_SENTINEL(src); + b = next) { + next = APR_BUCKET_NEXT(b); - status = last_not_included(from, maxlen, same_alloc, - pfile_buckets_allowed, &end); - if (status != APR_SUCCESS) { - return status; + if (APR_BUCKET_IS_METADATA(b)) { + /* fall through */ } - - while (!APR_BRIGADE_EMPTY(from) && status == APR_SUCCESS) { - b = APR_BRIGADE_FIRST(from); - if (b == end) { - break; + else { + if (remain == b->length) { + /* fall through */ } - - if (same_alloc || (b->list == to->bucket_alloc)) { - /* both brigades use the same bucket_alloc and auto-cleanups - * have the same life time. It's therefore safe to just move - * directly. */ - APR_BUCKET_REMOVE(b); - APR_BRIGADE_INSERT_TAIL(to, b); -#if LOG_BUCKETS - ap_log_perror(APLOG_MARK, LOG_LEVEL, 0, to->p, APLOGNO(03205) - "h2_util_move: %s, passed bucket(same bucket_alloc) " - "%ld-%ld, type=%s", - msg, (long)b->start, (long)b->length, - APR_BUCKET_IS_METADATA(b)? - (APR_BUCKET_IS_EOS(b)? "EOS": - (APR_BUCKET_IS_FLUSH(b)? "FLUSH" : "META")) : - (APR_BUCKET_IS_FILE(b)? "FILE" : "DATA")); -#endif + else if (remain <= 0) { + return status; } - else if (DEEP_COPY) { - /* we have not managed the magic of passing buckets from - * one thread to another. Any attempts result in - * cleanup of pools scrambling memory. - */ - if (APR_BUCKET_IS_METADATA(b)) { - if (APR_BUCKET_IS_EOS(b)) { - APR_BRIGADE_INSERT_TAIL(to, apr_bucket_eos_create(to->bucket_alloc)); - } - else { - /* ignore */ - } - } - else if (pfile_buckets_allowed - && *pfile_buckets_allowed > 0 - && APR_BUCKET_IS_FILE(b)) { - /* We do not want to read files when passing buckets, if - * we can avoid it. However, what we've come up so far - * is not working corrently, resulting either in crashes or - * too many open file descriptors. - */ - apr_bucket_file *f = (apr_bucket_file *)b->data; - apr_file_t *fd = f->fd; - int setaside = (f->readpool != to->p); -#if LOG_BUCKETS - ap_log_perror(APLOG_MARK, LOG_LEVEL, 0, to->p, APLOGNO(03206) - "h2_util_move: %s, moving FILE bucket %ld-%ld " - "from=%lx(p=%lx) to=%lx(p=%lx), setaside=%d", - msg, (long)b->start, (long)b->length, - (long)from, (long)from->p, - (long)to, (long)to->p, setaside); -#endif - if (setaside) { - status = apr_file_setaside(&fd, fd, to->p); - if (status != APR_SUCCESS) { - ap_log_perror(APLOG_MARK, APLOG_ERR, status, to->p, - APLOGNO(02947) "h2_util: %s, setaside FILE", - msg); - return status; - } + else { + if (b->length == ((apr_size_t)-1)) { + const char *ign; + apr_size_t ilen; + status = apr_bucket_read(b, &ign, &ilen, APR_BLOCK_READ); + if (status != APR_SUCCESS) { + return status; } - apr_brigade_insert_file(to, fd, b->start, b->length, - to->p); - --(*pfile_buckets_allowed); } - else { - const char *data; - apr_size_t len; - - status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ); - if (status == APR_SUCCESS && len > 0) { - status = apr_brigade_write(to, NULL, NULL, data, len); -#if LOG_BUCKETS - ap_log_perror(APLOG_MARK, LOG_LEVEL, 0, to->p, APLOGNO(03207) - "h2_util_move: %s, copied bucket %ld-%ld " - "from=%lx(p=%lx) to=%lx(p=%lx)", - msg, (long)b->start, (long)b->length, - (long)from, (long)from->p, - (long)to, (long)to->p); -#endif - } + + if (remain < b->length) { + apr_bucket_split(b, remain); } - apr_bucket_delete(b); - } - else { - apr_bucket_setaside(b, to->p); - APR_BUCKET_REMOVE(b); - APR_BRIGADE_INSERT_TAIL(to, b); -#if LOG_BUCKETS - ap_log_perror(APLOG_MARK, LOG_LEVEL, 0, to->p, APLOGNO(03208) - "h2_util_move: %s, passed setaside bucket %ld-%ld " - "from=%lx(p=%lx) to=%lx(p=%lx)", - msg, (long)b->start, (long)b->length, - (long)from, (long)from->p, - (long)to, (long)to->p); -#endif } } + APR_BUCKET_REMOVE(b); + APR_BRIGADE_INSERT_TAIL(dest, b); + remain -= b->length; } - return status; } -apr_status_t h2_util_copy(apr_bucket_brigade *to, apr_bucket_brigade *from, - apr_off_t maxlen, const char *msg) +apr_status_t h2_brigade_copy_length(apr_bucket_brigade *dest, + apr_bucket_brigade *src, + apr_off_t length) { + apr_bucket *b, *next; + apr_off_t remain = length; apr_status_t status = APR_SUCCESS; - int same_alloc; - - (void)msg; - AP_DEBUG_ASSERT(to); - AP_DEBUG_ASSERT(from); - same_alloc = (to->bucket_alloc == from->bucket_alloc); - - if (!APR_BRIGADE_EMPTY(from)) { - apr_bucket *b, *end, *cpy; + + for (b = APR_BRIGADE_FIRST(src); + b != APR_BRIGADE_SENTINEL(src); + b = next) { + next = APR_BUCKET_NEXT(b); - status = last_not_included(from, maxlen, 0, 0, &end); - if (status != APR_SUCCESS) { - return status; + if (APR_BUCKET_IS_METADATA(b)) { + /* fall through */ } - - for (b = APR_BRIGADE_FIRST(from); - b != APR_BRIGADE_SENTINEL(from) && b != end; - b = APR_BUCKET_NEXT(b)) - { - if (same_alloc) { - status = apr_bucket_copy(b, &cpy); - if (status != APR_SUCCESS) { - break; - } - APR_BRIGADE_INSERT_TAIL(to, cpy); + else { + if (remain == b->length) { + /* fall through */ + } + else if (remain <= 0) { + return status; } else { - if (APR_BUCKET_IS_METADATA(b)) { - if (APR_BUCKET_IS_EOS(b)) { - APR_BRIGADE_INSERT_TAIL(to, apr_bucket_eos_create(to->bucket_alloc)); - } - else if (APR_BUCKET_IS_FLUSH(b)) { - APR_BRIGADE_INSERT_TAIL(to, apr_bucket_flush_create(to->bucket_alloc)); - } - else { - /* ignore */ + if (b->length == ((apr_size_t)-1)) { + const char *ign; + apr_size_t ilen; + status = apr_bucket_read(b, &ign, &ilen, APR_BLOCK_READ); + if (status != APR_SUCCESS) { + return status; } } - else { - const char *data; - apr_size_t len; - status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ); - if (status == APR_SUCCESS && len > 0) { - status = apr_brigade_write(to, NULL, NULL, data, len); -#if LOG_BUCKETS - ap_log_perror(APLOG_MARK, LOG_LEVEL, 0, to->p, APLOGNO(03209) - "h2_util_copy: %s, copied bucket %ld-%ld " - "from=%lx(p=%lx) to=%lx(p=%lx)", - msg, (long)b->start, (long)b->length, - (long)from, (long)from->p, - (long)to, (long)to->p); -#endif - } + + if (remain < b->length) { + apr_bucket_split(b, remain); } } } + status = apr_bucket_copy(b, &b); + if (status != APR_SUCCESS) { + return status; + } + APR_BRIGADE_INSERT_TAIL(dest, b); + remain -= b->length; } return status; } @@ -666,25 +847,6 @@ int h2_util_bb_has_data(apr_bucket_brigade *bb) return 0; } -int h2_util_bb_has_data_or_eos(apr_bucket_brigade *bb) -{ - apr_bucket *b; - for (b = APR_BRIGADE_FIRST(bb); - b != APR_BRIGADE_SENTINEL(bb); - b = APR_BUCKET_NEXT(b)) - { - if (APR_BUCKET_IS_METADATA(b)) { - if (APR_BUCKET_IS_EOS(b)) { - return 1; - } - } - else { - return 1; - } - } - return 0; -} - apr_status_t h2_util_bb_avail(apr_bucket_brigade *bb, apr_off_t *plen, int *peos) { @@ -789,186 +951,102 @@ apr_status_t h2_util_bb_readx(apr_bucket_brigade *bb, return status; } -void h2_util_bb_log(conn_rec *c, int stream_id, int level, - const char *tag, apr_bucket_brigade *bb) +apr_size_t h2_util_bucket_print(char *buffer, apr_size_t bmax, + apr_bucket *b, const char *sep) { - char buffer[16 * 1024]; - const char *line = "(null)"; - apr_size_t bmax = sizeof(buffer)/sizeof(buffer[0]); - int off = 0; - apr_bucket *b; + apr_size_t off = 0; + if (sep && *sep) { + off += apr_snprintf(buffer+off, bmax-off, "%s", sep); + } - if (bb) { - memset(buffer, 0, bmax--); - for (b = APR_BRIGADE_FIRST(bb); - bmax && (b != APR_BRIGADE_SENTINEL(bb)); - b = APR_BUCKET_NEXT(b)) { - - if (APR_BUCKET_IS_METADATA(b)) { - if (APR_BUCKET_IS_EOS(b)) { - off += apr_snprintf(buffer+off, bmax-off, "eos "); - } - else if (APR_BUCKET_IS_FLUSH(b)) { - off += apr_snprintf(buffer+off, bmax-off, "flush "); - } - else if (AP_BUCKET_IS_EOR(b)) { - off += apr_snprintf(buffer+off, bmax-off, "eor "); - } - else { - off += apr_snprintf(buffer+off, bmax-off, "meta(unknown) "); - } - } - else { - const char *btype = "data"; - if (APR_BUCKET_IS_FILE(b)) { - btype = "file"; - } - else if (APR_BUCKET_IS_PIPE(b)) { - btype = "pipe"; - } - else if (APR_BUCKET_IS_SOCKET(b)) { - btype = "socket"; - } - else if (APR_BUCKET_IS_HEAP(b)) { - btype = "heap"; - } - else if (APR_BUCKET_IS_TRANSIENT(b)) { - btype = "transient"; - } - else if (APR_BUCKET_IS_IMMORTAL(b)) { - btype = "immortal"; - } -#if APR_HAS_MMAP - else if (APR_BUCKET_IS_MMAP(b)) { - btype = "mmap"; - } -#endif - else if (APR_BUCKET_IS_POOL(b)) { - btype = "pool"; - } - - off += apr_snprintf(buffer+off, bmax-off, "%s[%ld] ", - btype, - (long)(b->length == ((apr_size_t)-1)? - -1 : b->length)); - } + if (APR_BUCKET_IS_METADATA(b)) { + if (APR_BUCKET_IS_EOS(b)) { + off += apr_snprintf(buffer+off, bmax-off, "eos"); + } + else if (APR_BUCKET_IS_FLUSH(b)) { + off += apr_snprintf(buffer+off, bmax-off, "flush"); + } + else if (AP_BUCKET_IS_EOR(b)) { + off += apr_snprintf(buffer+off, bmax-off, "eor"); + } + else { + off += apr_snprintf(buffer+off, bmax-off, "meta(unknown)"); } - line = *buffer? buffer : "(empty)"; } - /* Intentional no APLOGNO */ - ap_log_cerror(APLOG_MARK, level, 0, c, "bb_dump(%ld-%d)-%s: %s", - c->id, stream_id, tag, line); - -} - -apr_status_t h2_ltransfer_brigade(apr_bucket_brigade *to, - apr_bucket_brigade *from, - apr_pool_t *p, - apr_off_t *plen, - int *peos) -{ - apr_bucket *e; - apr_off_t len = 0, remain = *plen; - apr_status_t rv; - - *peos = 0; - - while (!APR_BRIGADE_EMPTY(from)) { - e = APR_BRIGADE_FIRST(from); - - if (APR_BUCKET_IS_METADATA(e)) { - if (APR_BUCKET_IS_EOS(e)) { - *peos = 1; - } + else { + const char *btype = "data"; + if (APR_BUCKET_IS_FILE(b)) { + btype = "file"; } - else { - if (remain > 0 && e->length == ((apr_size_t)-1)) { - const char *ign; - apr_size_t ilen; - rv = apr_bucket_read(e, &ign, &ilen, APR_BLOCK_READ); - if (rv != APR_SUCCESS) { - return rv; - } - } - - if (remain < e->length) { - if (remain <= 0) { - return APR_SUCCESS; - } - apr_bucket_split(e, remain); - } + else if (APR_BUCKET_IS_PIPE(b)) { + btype = "pipe"; } - - rv = apr_bucket_setaside(e, p); - - /* If the bucket type does not implement setaside, then - * (hopefully) morph it into a bucket type which does, and set - * *that* aside... */ - if (rv == APR_ENOTIMPL) { - const char *s; - apr_size_t n; - - rv = apr_bucket_read(e, &s, &n, APR_BLOCK_READ); - if (rv == APR_SUCCESS) { - rv = apr_bucket_setaside(e, p); - } + else if (APR_BUCKET_IS_SOCKET(b)) { + btype = "socket"; } - - if (rv != APR_SUCCESS) { - /* Return an error but still save the brigade if - * ->setaside() is really not implemented. */ - if (rv != APR_ENOTIMPL) { - return rv; - } + else if (APR_BUCKET_IS_HEAP(b)) { + btype = "heap"; + } + else if (APR_BUCKET_IS_TRANSIENT(b)) { + btype = "transient"; + } + else if (APR_BUCKET_IS_IMMORTAL(b)) { + btype = "immortal"; + } +#if APR_HAS_MMAP + else if (APR_BUCKET_IS_MMAP(b)) { + btype = "mmap"; + } +#endif + else if (APR_BUCKET_IS_POOL(b)) { + btype = "pool"; } - APR_BUCKET_REMOVE(e); - APR_BRIGADE_INSERT_TAIL(to, e); - len += e->length; - remain -= e->length; + off += apr_snprintf(buffer+off, bmax-off, "%s[%ld]", + btype, + (long)(b->length == ((apr_size_t)-1)? + -1 : b->length)); } - - *plen = len; - return APR_SUCCESS; + return off; } -apr_status_t h2_transfer_brigade(apr_bucket_brigade *to, - apr_bucket_brigade *from, - apr_pool_t *p) +apr_size_t h2_util_bb_print(char *buffer, apr_size_t bmax, + const char *tag, const char *sep, + apr_bucket_brigade *bb) { - apr_bucket *e; - apr_status_t rv; - - while (!APR_BRIGADE_EMPTY(from)) { - e = APR_BRIGADE_FIRST(from); - - rv = apr_bucket_setaside(e, p); - - /* If the bucket type does not implement setaside, then - * (hopefully) morph it into a bucket type which does, and set - * *that* aside... */ - if (rv == APR_ENOTIMPL) { - const char *s; - apr_size_t n; + apr_size_t off = 0; + const char *sp = ""; + apr_bucket *b; + + if (bb) { + memset(buffer, 0, bmax--); + off += apr_snprintf(buffer+off, bmax-off, "%s(", tag); + for (b = APR_BRIGADE_FIRST(bb); + bmax && (b != APR_BRIGADE_SENTINEL(bb)); + b = APR_BUCKET_NEXT(b)) { - rv = apr_bucket_read(e, &s, &n, APR_BLOCK_READ); - if (rv == APR_SUCCESS) { - rv = apr_bucket_setaside(e, p); - } - } - - if (rv != APR_SUCCESS) { - /* Return an error but still save the brigade if - * ->setaside() is really not implemented. */ - if (rv != APR_ENOTIMPL) { - return rv; - } + off += h2_util_bucket_print(buffer+off, bmax-off, b, sp); + sp = " "; } - - APR_BUCKET_REMOVE(e); - APR_BRIGADE_INSERT_TAIL(to, e); + off += apr_snprintf(buffer+off, bmax-off, ")%s", sep); } - return APR_SUCCESS; + else { + off += apr_snprintf(buffer+off, bmax-off, "%s(null)%s", tag, sep); + } + return off; +} + +void h2_util_bb_log(conn_rec *c, int stream_id, int level, + const char *tag, apr_bucket_brigade *bb) +{ + char buffer[4 * 1024]; + const char *line = "(null)"; + apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]); + + len = h2_util_bb_print(buffer, bmax, tag, "", bb); + /* Intentional no APLOGNO */ + ap_log_cerror(APLOG_MARK, level, 0, c, "bb_dump(%ld-%d): %s", + c->id, stream_id, len? buffer : line); } apr_status_t h2_append_brigade(apr_bucket_brigade *to, diff --git a/modules/http2/h2_util.h b/modules/http2/h2_util.h index 4ca2f9b65b..e191c1ee1d 100644 --- a/modules/http2/h2_util.h +++ b/modules/http2/h2_util.h @@ -16,6 +16,8 @@ #ifndef __mod_h2__h2_util__ #define __mod_h2__h2_util__ +#include <nghttp2/nghttp2.h> + /******************************************************************************* * some debugging/format helpers ******************************************************************************/ @@ -63,7 +65,120 @@ int h2_ihash_iter(h2_ihash_t *ih, h2_ihash_iter_t *fn, void *ctx); void h2_ihash_add(h2_ihash_t *ih, void *val); void h2_ihash_remove(h2_ihash_t *ih, int id); void h2_ihash_clear(h2_ihash_t *ih); - + +/******************************************************************************* + * ilist - sorted list for structs with int identifier as first member + ******************************************************************************/ +typedef struct h2_ilist_t h2_ilist_t; +typedef int h2_ilist_iter_t(void *ctx, void *val); + +h2_ilist_t *h2_ilist_create(apr_pool_t *pool); + +apr_status_t h2_ilist_add(h2_ilist_t *list, void *val); +void *h2_ilist_get(h2_ilist_t *list, int id); +void *h2_ilist_shift(h2_ilist_t *list); +void *h2_ilist_remove(h2_ilist_t *list, int id); + +int h2_ilist_empty(h2_ilist_t *list); +apr_size_t h2_ilist_count(h2_ilist_t *list); + +/* Iterator over all h2_io* in the set or until a + * callback returns 0. It is not safe to add or remove + * set members during iteration. + * + * @param set the set of h2_io to iterate over + * @param iter the function to call for each io + * @param ctx user data for the callback + * @return 1 iff iteration completed for all members + */ +int h2_ilist_iter(h2_ilist_t *lis, h2_ilist_iter_t *iter, void *ctx); + +/******************************************************************************* + * iqueue - sorted list of int with user defined ordering + ******************************************************************************/ +typedef struct h2_iqueue { + int *elts; + int head; + int nelts; + int nalloc; + apr_pool_t *pool; +} h2_iqueue; + +/** + * Comparator for two int to determine their order. + * + * @param i1 first int to compare + * @param i2 second int to compare + * @param ctx provided user data + * @return value is the same as for strcmp() and has the effect: + * == 0: s1 and s2 are treated equal in ordering + * < 0: s1 should be sorted before s2 + * > 0: s2 should be sorted before s1 + */ +typedef int h2_iq_cmp(int i1, int i2, void *ctx); + +/** + * Allocate a new queue from the pool and initialize. + * @param id the identifier of the queue + * @param pool the memory pool + */ +h2_iqueue *h2_iq_create(apr_pool_t *pool, int capacity); + +/** + * Return != 0 iff there are no tasks in the queue. + * @param q the queue to check + */ +int h2_iq_empty(h2_iqueue *q); + +/** + * Return the number of int in the queue. + * @param q the queue to get size on + */ +int h2_iq_count(h2_iqueue *q); + +/** + * Add a stream idto the queue. + * + * @param q the queue to append the task to + * @param sid the stream id to add + * @param cmp the comparator for sorting + * @param ctx user data for comparator + */ +void h2_iq_add(h2_iqueue *q, int i, h2_iq_cmp *cmp, void *ctx); + +/** + * Remove the stream id from the queue. Return != 0 iff task + * was found in queue. + * @param q the task queue + * @param sid the stream id to remove + * @return != 0 iff task was found in queue + */ +int h2_iq_remove(h2_iqueue *q, int i); + +/** + * Remove all entries in the queue. + */ +void h2_iq_clear(h2_iqueue *q); + +/** + * Sort the stream idqueue again. Call if the task ordering + * has changed. + * + * @param q the queue to sort + * @param cmp the comparator for sorting + * @param ctx user data for the comparator + */ +void h2_iq_sort(h2_iqueue *q, h2_iq_cmp *cmp, void *ctx); + +/** + * Get the first stream id from the queue or NULL if the queue is empty. + * The task will be removed. + * + * @param q the queue to get the first task from + * @return the first stream id of the queue, 0 if empty + */ +int h2_iq_shift(h2_iqueue *q); + /******************************************************************************* * common helpers ******************************************************************************/ @@ -164,33 +279,23 @@ h2_ngheader *h2_util_ngheader_make_req(apr_pool_t *p, /******************************************************************************* * apr brigade helpers ******************************************************************************/ + /** - * Moves data from one brigade into another. If maxlen > 0, it only - * moves up to maxlen bytes into the target brigade, making bucket splits - * if needed. - * @param to the brigade to move the data to - * @param from the brigade to get the data from - * @param maxlen of bytes to move, <= 0 for all - * @param pfile_buckets_allowed how many file buckets may be moved, - * may be 0 or NULL - * @param msg message for use in logging + * Concatenate at most length bytes from src to dest brigade, splitting + * buckets if necessary and reading buckets of indeterminate length. */ -apr_status_t h2_util_move(apr_bucket_brigade *to, apr_bucket_brigade *from, - apr_off_t maxlen, apr_size_t *pfile_buckets_allowed, - const char *msg); - +apr_status_t h2_brigade_concat_length(apr_bucket_brigade *dest, + apr_bucket_brigade *src, + apr_off_t length); + /** - * Copies buckets from one brigade into another. If maxlen > 0, it only - * copies up to maxlen bytes into the target brigade, making bucket splits - * if needed. - * @param to the brigade to copy the data to - * @param from the brigade to get the data from - * @param maxlen of bytes to copy, <= 0 for all - * @param msg message for use in logging + * Copy at most length bytes from src to dest brigade, splitting + * buckets if necessary and reading buckets of indeterminate length. */ -apr_status_t h2_util_copy(apr_bucket_brigade *to, apr_bucket_brigade *from, - apr_off_t maxlen, const char *msg); - +apr_status_t h2_brigade_copy_length(apr_bucket_brigade *dest, + apr_bucket_brigade *src, + apr_off_t length); + /** * Return != 0 iff there is a FLUSH or EOS bucket in the brigade. * @param bb the brigade to check on @@ -198,7 +303,6 @@ apr_status_t h2_util_copy(apr_bucket_brigade *to, apr_bucket_brigade *from, */ int h2_util_has_eos(apr_bucket_brigade *bb, apr_off_t len); int h2_util_bb_has_data(apr_bucket_brigade *bb); -int h2_util_bb_has_data_or_eos(apr_bucket_brigade *bb); /** * Check how many bytes of the desired amount are available and if the @@ -230,6 +334,21 @@ apr_status_t h2_util_bb_readx(apr_bucket_brigade *bb, apr_off_t *plen, int *peos); /** + * Print a bucket's meta data (type and length) to the buffer. + * @return number of characters printed + */ +apr_size_t h2_util_bucket_print(char *buffer, apr_size_t bmax, + apr_bucket *b, const char *sep); + +/** + * Prints the brigade bucket types and lengths into the given buffer + * up to bmax. + * @return number of characters printed + */ +apr_size_t h2_util_bb_print(char *buffer, apr_size_t bmax, + const char *tag, const char *sep, + apr_bucket_brigade *bb); +/** * Logs the bucket brigade (which bucket types with what length) * to the log at the given level. * @param c the connection to log for @@ -243,33 +362,6 @@ void h2_util_bb_log(conn_rec *c, int stream_id, int level, /** * Transfer buckets from one brigade to another with a limit on the - * maximum amount of bytes transfered. Sets aside the buckets to - * pool p. - * @param to brigade to transfer buckets to - * @param from brigades to remove buckets from - * @param p pool that buckets should be setaside to - * @param plen maximum bytes to transfer, actual bytes transferred - * @param peos if an EOS bucket was transferred - */ -apr_status_t h2_ltransfer_brigade(apr_bucket_brigade *to, - apr_bucket_brigade *from, - apr_pool_t *p, - apr_off_t *plen, - int *peos); - -/** - * Transfer all buckets from one brigade to another. Sets aside the buckets to - * pool p. - * @param to brigade to transfer buckets to - * @param from brigades to remove buckets from - * @param p pool that buckets should be setaside to - */ -apr_status_t h2_transfer_brigade(apr_bucket_brigade *to, - apr_bucket_brigade *from, - apr_pool_t *p); - -/** - * Transfer buckets from one brigade to another with a limit on the * maximum amount of bytes transfered. Does no setaside magic, lifetime * of brigades must fit. * @param to brigade to transfer buckets to diff --git a/modules/http2/h2_version.h b/modules/http2/h2_version.h index a9bd29e895..191af8be4c 100644 --- a/modules/http2/h2_version.h +++ b/modules/http2/h2_version.h @@ -26,7 +26,7 @@ * @macro * Version number of the http2 module as c string */ -#define MOD_HTTP2_VERSION "1.4.7-DEV" +#define MOD_HTTP2_VERSION "1.5.0-DEV" /** * @macro @@ -34,7 +34,7 @@ * release. This is a 24 bit number with 8 bits for major number, 8 bits * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203. */ -#define MOD_HTTP2_VERSION_NUM 0x010407 +#define MOD_HTTP2_VERSION_NUM 0x010500 #endif /* mod_h2_h2_version_h */ diff --git a/modules/http2/h2_worker.c b/modules/http2/h2_worker.c index ca6ce3a2f2..e394298e7d 100644 --- a/modules/http2/h2_worker.c +++ b/modules/http2/h2_worker.c @@ -42,10 +42,8 @@ static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx) /* Get a h2_task from the main workers queue. */ worker->get_next(worker, worker->ctx, &task, &sticky); while (task) { - h2_task_do(task, worker->io); - - /* if someone was waiting on this task, time to wake up */ - apr_thread_cond_signal(worker->io); + + h2_task_do(task); /* report the task done and maybe get another one from the same * mplx (= master connection), if we can be sticky. */ @@ -64,40 +62,20 @@ static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx) } h2_worker *h2_worker_create(int id, - apr_pool_t *parent_pool, + apr_pool_t *pool, apr_threadattr_t *attr, h2_worker_mplx_next_fn *get_next, h2_worker_done_fn *worker_done, void *ctx) { - apr_allocator_t *allocator = NULL; - apr_pool_t *pool = NULL; - h2_worker *w; - apr_status_t status; - - apr_allocator_create(&allocator); - apr_allocator_max_free_set(allocator, ap_max_mem_free); - apr_pool_create_ex(&pool, parent_pool, NULL, allocator); - apr_pool_tag(pool, "h2_worker"); - apr_allocator_owner_set(allocator, pool); - - w = apr_pcalloc(pool, sizeof(h2_worker)); + h2_worker *w = apr_pcalloc(pool, sizeof(h2_worker)); if (w) { - APR_RING_ELEM_INIT(w, link); - w->id = id; - w->pool = pool; - + APR_RING_ELEM_INIT(w, link); w->get_next = get_next; w->worker_done = worker_done; w->ctx = ctx; - - status = apr_thread_cond_create(&w->io, w->pool); - if (status != APR_SUCCESS) { - return NULL; - } - - apr_thread_create(&w->thread, attr, execute, w, w->pool); + apr_thread_create(&w->thread, attr, execute, w, pool); } return w; } @@ -109,22 +87,9 @@ apr_status_t h2_worker_destroy(h2_worker *worker) apr_thread_join(&status, worker->thread); worker->thread = NULL; } - if (worker->io) { - apr_thread_cond_destroy(worker->io); - worker->io = NULL; - } - if (worker->pool) { - apr_pool_destroy(worker->pool); - /* worker is gone */ - } return APR_SUCCESS; } -int h2_worker_get_id(h2_worker *worker) -{ - return worker->id; -} - void h2_worker_abort(h2_worker *worker) { worker->aborted = 1; diff --git a/modules/http2/h2_worker.h b/modules/http2/h2_worker.h index 7a8c254f5d..04ff570361 100644 --- a/modules/http2/h2_worker.h +++ b/modules/http2/h2_worker.h @@ -16,7 +16,6 @@ #ifndef __mod_h2__h2_worker__ #define __mod_h2__h2_worker__ -struct apr_thread_cond_t; struct h2_mplx; struct h2_request; struct h2_task; @@ -39,19 +38,14 @@ typedef void h2_worker_done_fn(h2_worker *worker, void *ctx); struct h2_worker { + int id; /** Links to the rest of the workers */ APR_RING_ENTRY(h2_worker) link; - - int id; apr_thread_t *thread; - apr_pool_t *pool; - struct apr_thread_cond_t *io; - h2_worker_mplx_next_fn *get_next; h2_worker_done_fn *worker_done; void *ctx; - - unsigned int aborted : 1; + int aborted; }; /** @@ -136,8 +130,6 @@ apr_status_t h2_worker_destroy(h2_worker *worker); void h2_worker_abort(h2_worker *worker); -int h2_worker_get_id(h2_worker *worker); - int h2_worker_is_aborted(h2_worker *worker); #endif /* defined(__mod_h2__h2_worker__) */ diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c index 2c1dc8dab4..2a1599914c 100644 --- a/modules/http2/h2_workers.c +++ b/modules/http2/h2_workers.c @@ -116,7 +116,7 @@ static apr_status_t get_mplx_next(h2_worker *worker, void *ctx, if (status == APR_SUCCESS) { ++workers->idle_workers; ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, - "h2_worker(%d): looking for work", h2_worker_get_id(worker)); + "h2_worker(%d): looking for work", worker->id); while (!h2_worker_is_aborted(worker) && !workers->aborted && !(task = next_task(workers))) { @@ -195,7 +195,7 @@ static void worker_done(h2_worker *worker, void *ctx) apr_status_t status = apr_thread_mutex_lock(workers->lock); if (status == APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, - "h2_worker(%d): done", h2_worker_get_id(worker)); + "h2_worker(%d): done", worker->id); H2_WORKER_REMOVE(worker); --workers->worker_count; H2_WORKER_LIST_INSERT_TAIL(&workers->zombies, worker); @@ -213,7 +213,7 @@ static apr_status_t add_worker(h2_workers *workers) return APR_ENOMEM; } ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, - "h2_workers: adding worker(%d)", h2_worker_get_id(w)); + "h2_workers: adding worker(%d)", w->id); ++workers->worker_count; H2_WORKER_LIST_INSERT_TAIL(&workers->workers, w); return APR_SUCCESS; diff --git a/modules/http2/mod_http2.c b/modules/http2/mod_http2.c index 0d33969161..480917a419 100644 --- a/modules/http2/mod_http2.c +++ b/modules/http2/mod_http2.c @@ -57,6 +57,13 @@ AP_DECLARE_MODULE(http2) = { static int h2_h2_fixups(request_rec *r); +typedef struct { + unsigned int change_prio : 1; + unsigned int sha256 : 1; +} features; + +static features myfeats; + /* The module initialization. Called once as apache hook, before any multi * processing (threaded or not) happens. It is typically at least called twice, * see @@ -77,7 +84,16 @@ static int h2_post_config(apr_pool_t *p, apr_pool_t *plog, const char *mod_h2_init_key = "mod_http2_init_counter"; nghttp2_info *ngh2; apr_status_t status; + const char *sep = ""; + (void)plog;(void)ptemp; +#ifdef H2_NG2_CHANGE_PRIO + myfeats.change_prio = 1; + sep = "+"; +#endif +#ifdef H2_OPENSSL + myfeats.sha256 = 1; +#endif apr_pool_userdata_get(&data, mod_h2_init_key, s->process->pool); if ( data == NULL ) { @@ -90,8 +106,11 @@ static int h2_post_config(apr_pool_t *p, apr_pool_t *plog, ngh2 = nghttp2_version(0); ap_log_error( APLOG_MARK, APLOG_INFO, 0, s, APLOGNO(03090) - "mod_http2 (v%s, nghttp2 %s), initializing...", - MOD_HTTP2_VERSION, ngh2? ngh2->version_str : "unknown"); + "mod_http2 (v%s, feats=%s%s%s, nghttp2 %s), initializing...", + MOD_HTTP2_VERSION, + myfeats.change_prio? "CHPRIO" : "", sep, + myfeats.sha256? "SHA256" : "", + ngh2? ngh2->version_str : "unknown"); switch (h2_conn_mpm_type()) { case H2_MPM_SIMPLE: diff --git a/modules/http2/mod_http2.dsp b/modules/http2/mod_http2.dsp index eb55028a06..cc8f666387 100644 --- a/modules/http2/mod_http2.dsp +++ b/modules/http2/mod_http2.dsp @@ -105,6 +105,10 @@ SOURCE=./h2_alt_svc.c # End Source File # Begin Source File +SOURCE=./h2_bucket_beam.c +# End Source File +# Begin Source File + SOURCE=./h2_bucket_eoc.c # End Source File # Begin Source File @@ -141,18 +145,10 @@ SOURCE=./h2_h2.c # End Source File # Begin Source File -SOURCE=./h2_int_queue.c -# End Source File -# Begin Source File - SOURCE=./h2_io.c # End Source File # Begin Source File -SOURCE=./h2_io_set.c -# End Source File -# Begin Source File - SOURCE=./h2_mplx.c # End Source File # Begin Source File @@ -189,14 +185,6 @@ SOURCE=./h2_task.c # End Source File # Begin Source File -SOURCE=./h2_task_input.c -# End Source File -# Begin Source File - -SOURCE=./h2_task_output.c -# End Source File -# Begin Source File - SOURCE=./h2_util.c # End Source File # Begin Source File diff --git a/modules/http2/mod_proxy_http2.c b/modules/http2/mod_proxy_http2.c index 7335e9527f..f643265497 100644 --- a/modules/http2/mod_proxy_http2.c +++ b/modules/http2/mod_proxy_http2.c @@ -21,7 +21,6 @@ #include "mod_proxy_http2.h" -#include "h2_int_queue.h" #include "h2_request.h" #include "h2_util.h" #include "h2_version.h" |