author    Stefan Eissing <icing@apache.org>    2016-04-15 15:50:46 +0200
committer Stefan Eissing <icing@apache.org>    2016-04-15 15:50:46 +0200
commit    52cdae53be08ed37e5d73eac20a39d76f7617bcb (patch)
tree      e383ea1324254aee627932931ee9dd5250f0444f /modules/http2
parent    http: Respond with "408 Request Timeout" when a timeout occurs while (diff)
mod_http2: new bucket beams for transporting buckets across threads without buffer copy. Code cleanup.
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1739303 13f79535-47bb-0310-9956-ffa450edef68
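
The central addition is the h2_bucket_beam type (introduced in h2_bucket_beam.h/.c below), which lets a sending ("red") thread hand buckets to a receiving ("green") thread without copying their data. A minimal sketch of that flow, assuming a shared pool and already-filled brigades; the helper name beam_sketch is illustrative only and error handling is omitted:

    #include <apr_buckets.h>
    #include "h2_bucket_beam.h"

    /* Sketch only: in mod_http2 the red and green sides run on different
     * threads and a mutex is registered via h2_beam_mutex_set(). */
    static void beam_sketch(apr_pool_t *pool, apr_bucket_brigade *red_bb,
                            apr_bucket_brigade *green_bb)
    {
        h2_bucket_beam *beam;

        /* red side: create the beam and hand over buckets without copying */
        h2_beam_create(&beam, pool, 1, "demo", 64 * 1024);
        h2_beam_send(beam, red_bb, APR_BLOCK_READ);
        h2_beam_close(beam);

        /* green side: pull the buckets into its own brigade;
         * readbytes == 0 means "take whatever is available" */
        h2_beam_receive(beam, green_bb, APR_NONBLOCK_READ, 0);
    }

The red side keeps the original buckets in the beam's hold until the green side has released its wrapper buckets, which is what avoids the buffer copy.
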
Diffstat (limited to 'modules/http2')
-rw-r--r--  modules/http2/NWGNUmod_http2        |   5
-rw-r--r--  modules/http2/config2.m4            |   6
-rw-r--r--  modules/http2/h2_bucket_beam.c      | 850
-rw-r--r--  modules/http2/h2_bucket_beam.h      | 297
-rw-r--r--  modules/http2/h2_bucket_eos.c       |   2
-rw-r--r--  modules/http2/h2_filter.c           |  71
-rw-r--r--  modules/http2/h2_filter.h           |  28
-rw-r--r--  modules/http2/h2_from_h1.c          |   7
-rw-r--r--  modules/http2/h2_int_queue.c        | 187
-rw-r--r--  modules/http2/h2_int_queue.h        | 108
-rw-r--r--  modules/http2/h2_io.c               | 389
-rw-r--r--  modules/http2/h2_io.h               | 121
-rw-r--r--  modules/http2/h2_io_set.c           | 159
-rw-r--r--  modules/http2/h2_io_set.h           |  53
-rw-r--r--  modules/http2/h2_mplx.c             | 692
-rw-r--r--  modules/http2/h2_mplx.h             |  78
-rw-r--r--  modules/http2/h2_ngn_shed.c         |   2
-rw-r--r--  modules/http2/h2_proxy_session.c    |   1
-rw-r--r--  modules/http2/h2_proxy_session.h    |   4
-rw-r--r--  modules/http2/h2_session.c          |  32
-rw-r--r--  modules/http2/h2_session.h          |   4
-rw-r--r--  modules/http2/h2_stream.c           | 331
-rw-r--r--  modules/http2/h2_stream.h           |  24
-rw-r--r--  modules/http2/h2_task.c             | 403
-rw-r--r--  modules/http2/h2_task.h             |  28
-rw-r--r--  modules/http2/h2_task_input.c       | 228
-rw-r--r--  modules/http2/h2_task_input.h       |  46
-rw-r--r--  modules/http2/h2_task_output.c      | 176
-rw-r--r--  modules/http2/h2_task_output.h      |  50
-rw-r--r--  modules/http2/h2_util.c             | 796
-rw-r--r--  modules/http2/h2_util.h             | 196
-rw-r--r--  modules/http2/h2_version.h          |   4
-rw-r--r--  modules/http2/h2_worker.c           |  47
-rw-r--r--  modules/http2/h2_worker.h           |  12
-rw-r--r--  modules/http2/h2_workers.c          |   6
-rw-r--r--  modules/http2/mod_http2.c           |  23
-rw-r--r--  modules/http2/mod_http2.dsp         |  20
-rw-r--r--  modules/http2/mod_proxy_http2.c     |   1
38 files changed, 2657 insertions, 2830 deletions
diff --git a/modules/http2/NWGNUmod_http2 b/modules/http2/NWGNUmod_http2
index da470ecdc0..dd855d172a 100644
--- a/modules/http2/NWGNUmod_http2
+++ b/modules/http2/NWGNUmod_http2
@@ -185,6 +185,7 @@ TARGET_lib = \
#
FILES_nlm_objs = \
$(OBJDIR)/h2_alt_svc.o \
+ $(OBJDIR)/h2_bucket_beam.o \
$(OBJDIR)/h2_bucket_eoc.o \
$(OBJDIR)/h2_bucket_eos.o \
$(OBJDIR)/h2_config.o \
@@ -194,9 +195,7 @@ FILES_nlm_objs = \
$(OBJDIR)/h2_filter.o \
$(OBJDIR)/h2_from_h1.o \
$(OBJDIR)/h2_h2.o \
- $(OBJDIR)/h2_int_queue.o \
$(OBJDIR)/h2_io.o \
- $(OBJDIR)/h2_io_set.o \
$(OBJDIR)/h2_mplx.o \
$(OBJDIR)/h2_ngn_shed.o \
$(OBJDIR)/h2_push.o \
@@ -206,8 +205,6 @@ FILES_nlm_objs = \
$(OBJDIR)/h2_stream.o \
$(OBJDIR)/h2_switch.o \
$(OBJDIR)/h2_task.o \
- $(OBJDIR)/h2_task_input.o \
- $(OBJDIR)/h2_task_output.o \
$(OBJDIR)/h2_util.o \
$(OBJDIR)/h2_worker.o \
$(OBJDIR)/h2_workers.o \
diff --git a/modules/http2/config2.m4 b/modules/http2/config2.m4
index 1515c4dbcf..b4ba6da98a 100644
--- a/modules/http2/config2.m4
+++ b/modules/http2/config2.m4
@@ -20,6 +20,7 @@ dnl # list of module object files
http2_objs="dnl
mod_http2.lo dnl
h2_alt_svc.lo dnl
+h2_bucket_beam.lo dnl
h2_bucket_eoc.lo dnl
h2_bucket_eos.lo dnl
h2_config.lo dnl
@@ -29,9 +30,7 @@ h2_ctx.lo dnl
h2_filter.lo dnl
h2_from_h1.lo dnl
h2_h2.lo dnl
-h2_int_queue.lo dnl
h2_io.lo dnl
-h2_io_set.lo dnl
h2_mplx.lo dnl
h2_ngn_shed.lo dnl
h2_push.lo dnl
@@ -41,8 +40,6 @@ h2_session.lo dnl
h2_stream.lo dnl
h2_switch.lo dnl
h2_task.lo dnl
-h2_task_input.lo dnl
-h2_task_output.lo dnl
h2_util.lo dnl
h2_worker.lo dnl
h2_workers.lo dnl
@@ -209,7 +206,6 @@ is usually linked shared and requires loading. ], $http2_objs, , most, [
dnl # list of module object files
proxy_http2_objs="dnl
mod_proxy_http2.lo dnl
-h2_int_queue.lo dnl
h2_proxy_session.lo dnl
h2_request.lo dnl
h2_util.lo dnl
diff --git a/modules/http2/h2_bucket_beam.c b/modules/http2/h2_bucket_beam.c
new file mode 100644
index 0000000000..5406176fad
--- /dev/null
+++ b/modules/http2/h2_bucket_beam.c
@@ -0,0 +1,850 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <apr_lib.h>
+#include <apr_strings.h>
+#include <apr_time.h>
+#include <apr_buckets.h>
+#include <apr_thread_mutex.h>
+#include <apr_thread_cond.h>
+
+#include <httpd.h>
+
+#include "h2_util.h"
+#include "h2_bucket_beam.h"
+
+static void h2_beam_emitted(h2_bucket_beam *beam, apr_bucket *bred);
+
+/*******************************************************************************
+ * beam bucket with reference to beam and bucket it represents
+ ******************************************************************************/
+
+extern const apr_bucket_type_t h2_bucket_type_beam;
+
+#define H2_BUCKET_IS_BEAM(e) (e->type == &h2_bucket_type_beam)
+
+typedef struct {
+ apr_bucket_refcount refcount;
+ h2_bucket_beam *beam;
+ apr_bucket *bred;
+} h2_beam_bucket;
+
+static const char Dummy = '\0';
+
+static apr_status_t beam_bucket_read(apr_bucket *b, const char **str,
+ apr_size_t *len, apr_read_type_e block)
+{
+ h2_beam_bucket *d = b->data;
+ if (d->bred) {
+ const char *data;
+ apr_status_t status = apr_bucket_read(d->bred, &data, len, block);
+ if (status == APR_SUCCESS) {
+ *str = data + b->start;
+ *len = b->length;
+ }
+ return status;
+ }
+ *str = &Dummy;
+ *len = 0;
+ return APR_SUCCESS;
+}
+
+static void beam_bucket_destroy(void *data)
+{
+ h2_beam_bucket *d = data;
+
+ if (apr_bucket_shared_destroy(d)) {
+ if (d->bred) {
+ h2_beam_emitted(d->beam, d->bred);
+ }
+ apr_bucket_free(d);
+ }
+}
+
+static apr_bucket * h2_beam_bucket_make(apr_bucket *b,
+ h2_bucket_beam *beam,
+ apr_bucket *bred)
+{
+ h2_beam_bucket *d;
+
+ d = apr_bucket_alloc(sizeof(*d), b->list);
+ d->beam = beam;
+ d->bred = bred;
+
+ b = apr_bucket_shared_make(b, d, 0, bred? bred->length : 0);
+ b->type = &h2_bucket_type_beam;
+
+ return b;
+}
+
+static apr_bucket *h2_beam_bucket_create(h2_bucket_beam *beam,
+ apr_bucket *bred,
+ apr_bucket_alloc_t *list)
+{
+ apr_bucket *b = apr_bucket_alloc(sizeof(*b), list);
+
+ APR_BUCKET_INIT(b);
+ b->free = apr_bucket_free;
+ b->list = list;
+ return h2_beam_bucket_make(b, beam, bred);
+}
+
+APU_DECLARE_DATA const apr_bucket_type_t h2_bucket_type_beam = {
+ "BEAM", 5, APR_BUCKET_DATA,
+ beam_bucket_destroy,
+ beam_bucket_read,
+ apr_bucket_setaside_noop,
+ apr_bucket_shared_split,
+ apr_bucket_shared_copy
+};
+
+/*******************************************************************************
+ * h2_blist, a brigade without allocations
+ ******************************************************************************/
+
+apr_size_t h2_util_bl_print(char *buffer, apr_size_t bmax,
+ const char *tag, const char *sep,
+ h2_blist *bl)
+{
+ apr_size_t off = 0;
+ const char *sp = "";
+ apr_bucket *b;
+
+ if (bl) {
+ memset(buffer, 0, bmax--);
+ off += apr_snprintf(buffer+off, bmax-off, "%s(", tag);
+ for (b = H2_BLIST_FIRST(bl);
+ bmax && (b != H2_BLIST_SENTINEL(bl));
+ b = APR_BUCKET_NEXT(b)) {
+
+ off += h2_util_bucket_print(buffer+off, bmax-off, b, sp);
+ sp = " ";
+ }
+ off += apr_snprintf(buffer+off, bmax-off, ")%s", sep);
+ }
+ else {
+ off += apr_snprintf(buffer+off, bmax-off, "%s(null)%s", tag, sep);
+ }
+ return off;
+}
+
+
+
+/*******************************************************************************
+ * bucket beam that can transport buckets across threads
+ ******************************************************************************/
+
+static apr_status_t enter_yellow(h2_bucket_beam *beam,
+ apr_thread_mutex_t **plock, int *pacquired)
+{
+ if (beam->m_enter) {
+ return beam->m_enter(beam->m_ctx, plock, pacquired);
+ }
+ *plock = NULL;
+ *pacquired = 0;
+ return APR_SUCCESS;
+}
+
+static void leave_yellow(h2_bucket_beam *beam,
+ apr_thread_mutex_t *lock, int acquired)
+{
+ if (acquired && beam->m_leave) {
+ beam->m_leave(beam->m_ctx, lock, acquired);
+ }
+}
+
+static apr_off_t calc_buffered(h2_bucket_beam *beam)
+{
+ apr_off_t len = 0;
+ apr_bucket *b;
+ for (b = H2_BLIST_FIRST(&beam->red);
+ b != H2_BLIST_SENTINEL(&beam->red);
+ b = APR_BUCKET_NEXT(b)) {
+ if (b->length == ((apr_size_t)-1)) {
+ /* do not count */
+ }
+ else if (APR_BUCKET_IS_FILE(b)) {
+ /* if unread, has no real mem footprint. how to test? */
+ }
+ else {
+ len += b->length;
+ }
+ }
+ return len;
+}
+
+static void r_purge_reds(h2_bucket_beam *beam)
+{
+ apr_bucket *bred;
+ /* delete all red buckets in purge brigade, needs to be called
+ * from red thread only */
+ while (!H2_BLIST_EMPTY(&beam->purge)) {
+ bred = H2_BLIST_FIRST(&beam->purge);
+ apr_bucket_delete(bred);
+ }
+}
+
+static apr_size_t calc_space_left(h2_bucket_beam *beam)
+{
+ if (beam->max_buf_size > 0) {
+ apr_off_t len = calc_buffered(beam);
+ return (beam->max_buf_size > len? (beam->max_buf_size - len) : 0);
+ }
+ return APR_SIZE_MAX;
+}
+
+static apr_status_t wait_cond(h2_bucket_beam *beam, apr_thread_mutex_t *lock)
+{
+ if (beam->timeout > 0) {
+ return apr_thread_cond_timedwait(beam->m_cond, lock, beam->timeout);
+ }
+ else {
+ return apr_thread_cond_wait(beam->m_cond, lock);
+ }
+}
+
+static apr_status_t r_wait_space(h2_bucket_beam *beam, apr_read_type_e block,
+ apr_thread_mutex_t *lock, apr_off_t *premain)
+{
+ *premain = calc_space_left(beam);
+ while (!beam->aborted && *premain <= 0
+ && (block == APR_BLOCK_READ) && lock) {
+ apr_status_t status = wait_cond(beam, lock);
+ if (APR_STATUS_IS_TIMEUP(status)) {
+ return status;
+ }
+ r_purge_reds(beam);
+ *premain = calc_space_left(beam);
+ }
+ return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
+}
+
+static void h2_beam_prep_purge(h2_bucket_beam *beam, apr_bucket *bred)
+{
+ APR_BUCKET_REMOVE(bred);
+ H2_BLIST_INSERT_TAIL(&beam->purge, bred);
+}
+
+static void h2_beam_emitted(h2_bucket_beam *beam, apr_bucket *bred)
+{
+ apr_thread_mutex_t *lock;
+ int acquired;
+
+ if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ /* even when beam buckets are split, only the one where
+ * refcount drops to 0 will call us */
+ --beam->live_beam_buckets;
+ /* invoked from green thread, the last beam bucket for the red
+ * bucket bred is about to be destroyed.
+ * remove it from the hold, where it should be now */
+ h2_beam_prep_purge(beam, bred);
+ /* notify anyone waiting on space to become available */
+ if (!lock) {
+ r_purge_reds(beam);
+ }
+ else if (beam->m_cond) {
+ apr_thread_cond_broadcast(beam->m_cond);
+ }
+ leave_yellow(beam, lock, acquired);
+ }
+}
+
+static void report_consumption(h2_bucket_beam *beam)
+{
+ if (beam->consumed_fn && (beam->received_bytes != beam->reported_bytes)) {
+ beam->consumed_fn(beam->consumed_ctx, beam,
+ beam->received_bytes - beam->reported_bytes);
+ beam->reported_bytes = beam->received_bytes;
+ }
+}
+
+static void h2_blist_cleanup(h2_blist *bl)
+{
+ apr_bucket *e;
+
+ while (!H2_BLIST_EMPTY(bl)) {
+ e = H2_BLIST_FIRST(bl);
+ apr_bucket_delete(e);
+ }
+}
+
+static apr_status_t beam_cleanup(void *data)
+{
+ h2_bucket_beam *beam = data;
+
+ AP_DEBUG_ASSERT(beam->live_beam_buckets == 0);
+ h2_blist_cleanup(&beam->red);
+ h2_blist_cleanup(&beam->purge);
+ h2_blist_cleanup(&beam->hold);
+ return APR_SUCCESS;
+}
+
+apr_status_t h2_beam_destroy(h2_bucket_beam *beam)
+{
+ apr_pool_cleanup_kill(beam->life_pool, beam, beam_cleanup);
+ return beam_cleanup(beam);
+}
+
+apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *life_pool,
+ int id, const char *tag,
+ apr_size_t max_buf_size)
+{
+ h2_bucket_beam *beam;
+ apr_status_t status = APR_SUCCESS;
+
+ beam = apr_pcalloc(life_pool, sizeof(*beam));
+ if (!beam) {
+ return APR_ENOMEM;
+ }
+
+ beam->id = id;
+ beam->tag = tag;
+ H2_BLIST_INIT(&beam->red);
+ H2_BLIST_INIT(&beam->hold);
+ H2_BLIST_INIT(&beam->purge);
+ beam->life_pool = life_pool;
+ beam->max_buf_size = max_buf_size;
+
+ apr_pool_cleanup_register(life_pool, beam, beam_cleanup,
+ apr_pool_cleanup_null);
+ *pbeam = beam;
+
+ return status;
+}
+
+void h2_beam_buffer_size_set(h2_bucket_beam *beam, apr_size_t buffer_size)
+{
+ apr_thread_mutex_t *lock;
+ int acquired;
+
+ if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ beam->max_buf_size = buffer_size;
+ leave_yellow(beam, lock, acquired);
+ }
+}
+
+apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam)
+{
+ apr_thread_mutex_t *lock;
+ int acquired;
+ apr_size_t buffer_size = 0;
+
+ if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ buffer_size = beam->max_buf_size;
+ leave_yellow(beam, lock, acquired);
+ }
+ return buffer_size;
+}
+
+void h2_beam_mutex_set(h2_bucket_beam *beam,
+ h2_beam_mutex_enter m_enter,
+ h2_beam_mutex_leave m_leave,
+ apr_thread_cond_t *cond,
+ void *m_ctx)
+{
+ apr_thread_mutex_t *lock;
+ h2_beam_mutex_leave *prev_leave;
+ void *prev_ctx;
+ int acquired;
+
+ if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ prev_ctx = beam->m_ctx;
+ prev_leave = beam->m_leave;
+ beam->m_enter = m_enter;
+ beam->m_leave = m_leave;
+ beam->m_ctx = m_ctx;
+ beam->m_cond = cond;
+ if (acquired && prev_leave) {
+ /* special tactics when NULLing a lock */
+ prev_leave(prev_ctx, lock, acquired);
+ }
+ }
+}
+
+void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout)
+{
+ apr_thread_mutex_t *lock;
+ int acquired;
+
+ if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ beam->timeout = timeout;
+ leave_yellow(beam, lock, acquired);
+ }
+}
+
+apr_interval_time_t h2_beam_timeout_get(h2_bucket_beam *beam)
+{
+ apr_thread_mutex_t *lock;
+ int acquired;
+ apr_interval_time_t timeout = 0;
+
+ if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ timeout = beam->timeout;
+ leave_yellow(beam, lock, acquired);
+ }
+ return timeout;
+}
+
+void h2_beam_abort(h2_bucket_beam *beam)
+{
+ apr_thread_mutex_t *lock;
+ int acquired;
+
+ if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ r_purge_reds(beam);
+ h2_blist_cleanup(&beam->red);
+ beam->aborted = 1;
+ report_consumption(beam);
+ if (beam->m_cond) {
+ apr_thread_cond_broadcast(beam->m_cond);
+ }
+ leave_yellow(beam, lock, acquired);
+ }
+}
+
+static apr_status_t beam_close(h2_bucket_beam *beam)
+{
+ if (!beam->closed) {
+ beam->closed = 1;
+ if (beam->m_cond) {
+ apr_thread_cond_broadcast(beam->m_cond);
+ }
+ }
+ return APR_SUCCESS;
+}
+
+apr_status_t h2_beam_close(h2_bucket_beam *beam)
+{
+ apr_thread_mutex_t *lock;
+ int acquired;
+
+ if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ r_purge_reds(beam);
+ beam_close(beam);
+ report_consumption(beam);
+ leave_yellow(beam, lock, acquired);
+ }
+ return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
+}
+
+void h2_beam_shutdown(h2_bucket_beam *beam)
+{
+ apr_thread_mutex_t *lock;
+ int acquired;
+
+ if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ r_purge_reds(beam);
+ h2_blist_cleanup(&beam->red);
+ beam_close(beam);
+ report_consumption(beam);
+ leave_yellow(beam, lock, acquired);
+ }
+}
+
+void h2_beam_reset(h2_bucket_beam *beam)
+{
+ apr_thread_mutex_t *lock;
+ int acquired;
+
+ if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ beam_cleanup(beam);
+ beam->closed = beam->close_sent = 0;
+ beam->sent_bytes = beam->received_bytes = beam->reported_bytes = 0;
+ leave_yellow(beam, lock, acquired);
+ }
+}
+
+static apr_status_t append_bucket(h2_bucket_beam *beam,
+ apr_bucket *bred,
+ apr_read_type_e block,
+ apr_pool_t *pool,
+ apr_thread_mutex_t *lock)
+{
+ const char *data;
+ apr_size_t len;
+ apr_off_t space_left = 0;
+ apr_status_t status;
+
+ if (APR_BUCKET_IS_METADATA(bred)) {
+ if (APR_BUCKET_IS_EOS(bred)) {
+ beam->closed = 1;
+ }
+ APR_BUCKET_REMOVE(bred);
+ H2_BLIST_INSERT_TAIL(&beam->red, bred);
+ return APR_SUCCESS;
+ }
+ else if (APR_BUCKET_IS_FILE(bred)) {
+ /* file bucket lengths do not really count */
+ }
+ else {
+ space_left = calc_space_left(beam);
+ if (space_left > 0 && bred->length == ((apr_size_t)-1)) {
+ const char *data;
+ status = apr_bucket_read(bred, &data, &len, APR_BLOCK_READ);
+ if (status != APR_SUCCESS) {
+ return status;
+ }
+ }
+
+ if (space_left < bred->length) {
+ status = r_wait_space(beam, block, lock, &space_left);
+ if (status != APR_SUCCESS) {
+ return status;
+ }
+ if (space_left <= 0) {
+ return APR_EAGAIN;
+ }
+ }
+ /* space available, maybe need bucket split */
+ }
+
+
+ /* The fundamental problem is that reading a red bucket from
+ * a green thread is a total NO GO, because the bucket might use
+ * its pool/bucket_alloc from a foreign thread and that will
+ * corrupt. */
+ status = APR_ENOTIMPL;
+ if (beam->closed && bred->length > 0) {
+ status = APR_EOF;
+ }
+ else if (APR_BUCKET_IS_TRANSIENT(bred)) {
+ /* this takes care of transient buckets and converts them
+ * into heap ones. Other bucket types might or might not be
+ * affected by this. */
+ status = apr_bucket_setaside(bred, pool);
+ }
+ else if (APR_BUCKET_IS_HEAP(bred) || APR_BUCKET_IS_POOL(bred)) {
+ /* For heap/pool buckets read from a green thread is fine. The
+ * data will be there and live until the bucket itself is
+ * destroyed. */
+ status = APR_SUCCESS;
+ }
+ else if (APR_BUCKET_IS_FILE(bred)) {
+ /* For file buckets the problem is their internal readpool that
+ * is used on the first read to allocate buffer/mmap.
+ * Since setting aside a file bucket will de-register the
+ * file cleanup function from the previous pool, we need to
+ * call that from a red thread. Do it now and make our
+ * yellow pool the owner.
+ * Additionally, we allow callbacks to prevent beaming file
+ * handles across. The use case for this is to limit the number
+ * of open file handles and rather use a less efficient beam
+ * transport. */
+ apr_file_t *fd = ((apr_bucket_file *)bred->data)->fd;
+ int can_beam = 1;
+ if (beam->last_beamed != fd && beam->can_beam_fn) {
+ can_beam = beam->can_beam_fn(beam->can_beam_ctx, beam, fd);
+ }
+ if (can_beam) {
+ beam->last_beamed = fd;
+ status = apr_bucket_setaside(bred, pool);
+ }
+ }
+
+ if (status == APR_ENOTIMPL) {
+ /* we have no knowledge about the internals of this bucket,
+ * but on read, it needs to make the data available somehow.
+ * So we do this while still in a red thread. The data will
+ * live at least as long as the red bucket itself. */
+ if (space_left < APR_BUCKET_BUFF_SIZE) {
+ space_left = APR_BUCKET_BUFF_SIZE;
+ }
+ if (space_left < bred->length) {
+ apr_bucket_split(bred, space_left);
+ }
+ status = apr_bucket_read(bred, &data, &len, APR_BLOCK_READ);
+ if (status == APR_SUCCESS) {
+ status = apr_bucket_setaside(bred, pool);
+ }
+ }
+
+ if (status != APR_SUCCESS && status != APR_ENOTIMPL) {
+ return status;
+ }
+
+ APR_BUCKET_REMOVE(bred);
+ H2_BLIST_INSERT_TAIL(&beam->red, bred);
+ beam->sent_bytes += bred->length;
+
+ return APR_SUCCESS;
+}
+
+apr_status_t h2_beam_send(h2_bucket_beam *beam,
+ apr_bucket_brigade *red_brigade,
+ apr_read_type_e block)
+{
+ apr_thread_mutex_t *lock;
+ apr_bucket *bred;
+ apr_status_t status = APR_SUCCESS;
+ int acquired;
+
+ /* Called from the red thread to add buckets to the beam */
+ if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ r_purge_reds(beam);
+
+ if (beam->aborted) {
+ status = APR_ECONNABORTED;
+ }
+ else if (red_brigade) {
+ while (!APR_BRIGADE_EMPTY(red_brigade)
+ && status == APR_SUCCESS) {
+ bred = APR_BRIGADE_FIRST(red_brigade);
+ status = append_bucket(beam, bred, block, red_brigade->p, lock);
+ }
+ if (beam->m_cond) {
+ apr_thread_cond_broadcast(beam->m_cond);
+ }
+ }
+ report_consumption(beam);
+ leave_yellow(beam, lock, acquired);
+ }
+ return status;
+}
+
+apr_status_t h2_beam_receive(h2_bucket_beam *beam,
+ apr_bucket_brigade *bb,
+ apr_read_type_e block,
+ apr_off_t readbytes)
+{
+ apr_thread_mutex_t *lock;
+ apr_bucket *bred, *bgreen;
+ int acquired, transferred = 0;
+ apr_status_t status = APR_SUCCESS;
+ apr_off_t remain = readbytes;
+
+ /* Called from the green thread to take buckets from the beam */
+ if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+transfer:
+ if (beam->aborted) {
+ status = APR_ECONNABORTED;
+ goto leave;
+ }
+
+ /* transfer enough buckets from our green brigade, if we have one */
+ while (beam->green
+ && !APR_BRIGADE_EMPTY(beam->green)
+ && (readbytes <= 0 || remain >= 0)) {
+ bgreen = APR_BRIGADE_FIRST(beam->green);
+ if (readbytes > 0 && bgreen->length > 0 && remain <= 0) {
+ break;
+ }
+ APR_BUCKET_REMOVE(bgreen);
+ APR_BRIGADE_INSERT_TAIL(bb, bgreen);
+ remain -= bgreen->length;
+ ++transferred;
+ }
+
+ /* transfer from our red brigade, transforming red buckets to
+ * green ones until we have enough */
+ while (!H2_BLIST_EMPTY(&beam->red) && (readbytes <= 0 || remain >= 0)) {
+ bred = H2_BLIST_FIRST(&beam->red);
+ bgreen = NULL;
+
+ if (readbytes > 0 && bred->length > 0 && remain <= 0) {
+ break;
+ }
+
+ if (APR_BUCKET_IS_METADATA(bred)) {
+ if (APR_BUCKET_IS_EOS(bred)) {
+ beam->close_sent = 1;
+ bgreen = apr_bucket_eos_create(bb->bucket_alloc);
+ }
+ else if (APR_BUCKET_IS_FLUSH(bred)) {
+ bgreen = apr_bucket_flush_create(bb->bucket_alloc);
+ }
+ else {
+ /* put red into hold, no green sent out */
+ }
+ }
+ else if (APR_BUCKET_IS_FILE(bred)) {
+ /* This is set aside into the target brigade pool so that
+ * any read operation messes with that pool and not
+ * the red one. */
+ apr_bucket_file *f = (apr_bucket_file *)bred->data;
+ apr_file_t *fd = f->fd;
+ int setaside = (f->readpool != bb->p);
+
+ if (setaside) {
+ status = apr_file_setaside(&fd, fd, bb->p);
+ if (status != APR_SUCCESS) {
+ goto leave;
+ }
+ ++beam->files_beamed;
+ }
+ apr_brigade_insert_file(bb, fd, bred->start, bred->length,
+ bb->p);
+ remain -= bred->length;
+ ++transferred;
+ }
+ else {
+ /* create a "green" stand-in bucket. we took care of the
+ * underlying red bucket and its data when we placed it into
+ * the red brigade.
+ * the beam bucket will notify us on destruction that bred is
+ * no longer needed. */
+ bgreen = h2_beam_bucket_create(beam, bred, bb->bucket_alloc);
+ ++beam->live_beam_buckets;
+ }
+
+ /* Place the red bucket into our hold, to be destroyed when no
+ * green bucket references it any more. */
+ APR_BUCKET_REMOVE(bred);
+ H2_BLIST_INSERT_TAIL(&beam->hold, bred);
+ beam->received_bytes += bred->length;
+ if (bgreen) {
+ APR_BRIGADE_INSERT_TAIL(bb, bgreen);
+ remain -= bgreen->length;
+ ++transferred;
+ }
+ }
+
+ if (readbytes > 0 && remain < 0) {
+ /* too much, put some back */
+ remain = readbytes;
+ for (bgreen = APR_BRIGADE_FIRST(bb);
+ bgreen != APR_BRIGADE_SENTINEL(bb);
+ bgreen = APR_BUCKET_NEXT(bgreen)) {
+ remain -= bgreen->length;
+ if (remain < 0) {
+ apr_bucket_split(bgreen, bgreen->length+remain);
+ beam->green = apr_brigade_split_ex(bb,
+ APR_BUCKET_NEXT(bgreen),
+ beam->green);
+ break;
+ }
+ }
+ }
+
+ if (transferred) {
+ status = APR_SUCCESS;
+ }
+ else if (beam->closed) {
+ if (!beam->close_sent) {
+ apr_bucket *b = apr_bucket_eos_create(bb->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(bb, b);
+ beam->close_sent = 1;
+ status = APR_SUCCESS;
+ }
+ else {
+ status = APR_EOF;
+ }
+ }
+ else if (block == APR_BLOCK_READ && lock && beam->m_cond) {
+ status = wait_cond(beam, lock);
+ if (status != APR_SUCCESS) {
+ goto leave;
+ }
+ goto transfer;
+ }
+ else {
+ status = APR_EAGAIN;
+ }
+leave:
+ leave_yellow(beam, lock, acquired);
+ }
+ return status;
+}
+
+void h2_beam_on_consumed(h2_bucket_beam *beam,
+ h2_beam_consumed_callback *cb, void *ctx)
+{
+ apr_thread_mutex_t *lock;
+ int acquired;
+
+ if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ beam->consumed_fn = cb;
+ beam->consumed_ctx = ctx;
+ leave_yellow(beam, lock, acquired);
+ }
+}
+
+void h2_beam_on_file_beam(h2_bucket_beam *beam,
+ h2_beam_can_beam_callback *cb, void *ctx)
+{
+ apr_thread_mutex_t *lock;
+ int acquired;
+
+ if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ beam->can_beam_fn = cb;
+ beam->can_beam_ctx = ctx;
+ leave_yellow(beam, lock, acquired);
+ }
+}
+
+
+apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam)
+{
+ apr_thread_mutex_t *lock;
+ apr_bucket *b;
+ apr_off_t l = 0;
+ int acquired;
+
+ if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ for (b = H2_BLIST_FIRST(&beam->red);
+ b != H2_BLIST_SENTINEL(&beam->red);
+ b = APR_BUCKET_NEXT(b)) {
+ /* should all have determinate length */
+ l += b->length;
+ }
+ leave_yellow(beam, lock, acquired);
+ }
+ return l;
+}
+
+int h2_beam_empty(h2_bucket_beam *beam)
+{
+ apr_thread_mutex_t *lock;
+ int empty = 1;
+ int acquired;
+
+ if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ empty = (H2_BLIST_EMPTY(&beam->red)
+ && (!beam->green || APR_BRIGADE_EMPTY(beam->green)));
+ leave_yellow(beam, lock, acquired);
+ }
+ return empty;
+}
+
+int h2_beam_closed(h2_bucket_beam *beam)
+{
+ return beam->closed;
+}
+
+int h2_beam_was_received(h2_bucket_beam *beam)
+{
+ apr_thread_mutex_t *lock;
+ int happend = 0;
+ int acquired;
+
+ if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ happend = (beam->received_bytes > 0);
+ leave_yellow(beam, lock, acquired);
+ }
+ return happend;
+}
+
+apr_size_t h2_beam_get_files_beamed(h2_bucket_beam *beam)
+{
+ apr_thread_mutex_t *lock;
+ apr_size_t n = 0;
+ int acquired;
+
+ if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ n = beam->files_beamed;
+ leave_yellow(beam, lock, acquired);
+ }
+ return n;
+}
+
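
enter_yellow()/leave_yellow() above do no locking themselves; they delegate to whatever callbacks the beam's owner registers with h2_beam_mutex_set(). A minimal sketch of such callbacks backed by a plain apr_thread_mutex_t; the names demo_ctx, demo_enter and demo_leave are assumptions for illustration, not part of this patch:

    #include <apr_buckets.h>
    #include <apr_thread_mutex.h>
    #include "h2_bucket_beam.h"

    typedef struct {
        apr_thread_mutex_t *lock;     /* protects the beam internals */
    } demo_ctx;

    /* matches h2_beam_mutex_enter: acquire the lock, report it back */
    static apr_status_t demo_enter(void *ctx, apr_thread_mutex_t **plock,
                                   int *pacquired)
    {
        demo_ctx *c = ctx;
        apr_status_t rv = apr_thread_mutex_lock(c->lock);
        *plock = c->lock;
        *pacquired = (rv == APR_SUCCESS);
        return rv;
    }

    /* matches h2_beam_mutex_leave: release what demo_enter acquired */
    static void demo_leave(void *ctx, apr_thread_mutex_t *lock, int acquired)
    {
        (void)ctx;
        if (acquired) {
            apr_thread_mutex_unlock(lock);
        }
    }

Registered via h2_beam_mutex_set(beam, demo_enter, demo_leave, cond, &ctx), this also hands the beam the condition variable that wait_cond() and r_wait_space() rely on for blocking sends and receives.
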
diff --git a/modules/http2/h2_bucket_beam.h b/modules/http2/h2_bucket_beam.h
new file mode 100644
index 0000000000..45d98b29cc
--- /dev/null
+++ b/modules/http2/h2_bucket_beam.h
@@ -0,0 +1,297 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef h2_bucket_beam_h
+#define h2_bucket_beam_h
+
+struct apr_thread_mutex_t;
+struct apr_thread_cond_t;
+
+/*******************************************************************************
+ * apr_bucket list without bells and whistles
+ ******************************************************************************/
+
+/**
+ * h2_blist can hold a list of buckets just like apr_bucket_brigade, but
+ * does not do any allocations or provide related features.
+ */
+typedef struct {
+ APR_RING_HEAD(h2_bucket_list, apr_bucket) list;
+} h2_blist;
+
+#define H2_BLIST_INIT(b) APR_RING_INIT(&(b)->list, apr_bucket, link);
+#define H2_BLIST_SENTINEL(b) APR_RING_SENTINEL(&(b)->list, apr_bucket, link)
+#define H2_BLIST_EMPTY(b) APR_RING_EMPTY(&(b)->list, apr_bucket, link)
+#define H2_BLIST_FIRST(b) APR_RING_FIRST(&(b)->list)
+#define H2_BLIST_LAST(b) APR_RING_LAST(&(b)->list)
+#define H2_BLIST_INSERT_HEAD(b, e) do { \
+ apr_bucket *ap__b = (e); \
+ APR_RING_INSERT_HEAD(&(b)->list, ap__b, apr_bucket, link); \
+ } while (0)
+#define H2_BLIST_INSERT_TAIL(b, e) do { \
+ apr_bucket *ap__b = (e); \
+ APR_RING_INSERT_TAIL(&(b)->list, ap__b, apr_bucket, link); \
+ } while (0)
+#define H2_BLIST_CONCAT(a, b) do { \
+ APR_RING_CONCAT(&(a)->list, &(b)->list, apr_bucket, link); \
+ } while (0)
+#define H2_BLIST_PREPEND(a, b) do { \
+ APR_RING_PREPEND(&(a)->list, &(b)->list, apr_bucket, link); \
+ } while (0)
+
+/**
+ * Print the buckets in the list into the buffer (type and lengths).
+ * @param buffer the buffer to print into
+ * @param bmax max number of characters to place in buffer, incl. trailing 0
+ * @param tag tag string for this bucket list
+ * @param sep separator to use
+ * @param bl the bucket list to print
+ * @return number of characters printed
+ */
+apr_size_t h2_util_bl_print(char *buffer, apr_size_t bmax,
+ const char *tag, const char *sep,
+ h2_blist *bl);
+
+/*******************************************************************************
+ * h2_bucket_beam
+ ******************************************************************************/
+
+/**
+ * An h2_bucket_beam solves the task of transferring buckets, esp. their data,
+ * across threads with zero buffer copies.
+ *
+ * When a thread, let's call it the red thread, wants to send buckets to
+ * another, the green thread, it creates a h2_bucket_beam and adds buckets
+ * via the h2_beam_send(). It gives the beam to the green thread which then
+ * can receive buckets into its own brigade via h2_beam_receive().
+ *
+ * Sending and receiving can happen concurrently, if a thread mutex is set
+ * for the beam, see h2_beam_mutex_set.
+ *
+ * The beam can limit the amount of data it accepts via the buffer_size. This
+ * can also be adjusted during its lifetime. When the beam not only gets a
+ * mutex but also a condition variable (in h2_beam_mutex_set()), sends and
+ * receives can be done blocking. A timeout can be set for such blocking calls.
+ *
+ * Care needs to be taken when terminating the beam. The beam registers at
+ * the pool it was created with and will cleanup after itself. However, if
+ * received buckets do still exist, already freed memory might be accessed.
+ * The beam does a AP_DEBUG_ASSERT on this condition.
+ *
+ * The proper way of shutting down a beam is to first make sure there are no
+ * more green buckets out there, then clean up the beam to purge any
+ * still existing red buckets and then, possibly, terminate the beam itself
+ * (or the pool it was created with).
+ *
+ * The following restrictions apply to bucket transport:
+ * - only EOS and FLUSH meta buckets are copied through. All other meta buckets
+ * are kept in the beam's hold.
+ * - all kind of data buckets are transported through:
+ * - transient buckets are converted to heap ones on send
+ * - heap and pool buckets require no extra handling
+ * - buckets with indeterminate length are read on send
+ * - file buckets will transfer the file itself into a new bucket, if allowed
+ * - all other buckets are read on send to make sure data is present
+ *
+ * This assures that when the red thread sends its red buckets, the data
+ * is made accessible while still on the red side. The red bucket then enters
+ * the beam's hold storage.
+ * When the green thread calls receive, red buckets in the hold are wrapped
+ * into special beam buckets. Beam buckets on read present the data directly
+ * from the internal red one, but otherwise live on the green side. When a
+ * beam bucket gets destroyed, it notifies its beam that the corresponding
+ * red bucket from the hold may be destroyed.
+ * Since the destruction of green buckets happens in the green thread, any
+ * corresponding red bucket cannot immediately be destroyed, as that would
+ * result in race conditions.
+ * Instead, the beam transfers such red buckets from the hold to the purge
+ * storage. Next time there is a call from the red side, the buckets in
+ * purge will be deleted.
+ *
+ * There are callbacks that can be registered with a beam:
+ * - a "consumed" callback that gets called on the red side with the
+ * amount of data that has been received by the green side. The amount
+ * is a delta from the last callback invocation. The red side can trigger
+ * these callbacks by calling h2_beam_send() with a NULL brigade.
+ * - a "can_beam_file" callback that can prohibit the transfer of file handles
+ * through the beam. This will cause file buckets to be read on send and
+ * their data buffers will then be transported just like a heap bucket's.
+ * When no callback is registered, no restrictions apply and all files are
+ * passed through.
+ * File handles transferred to the green side will stay there until the
+ * receiving brigade's pool is destroyed/cleared. If the pool lives very
+ * long or if many different files are beamed, the process might run out
+ * of available file handles.
+ *
+ * The name "beam" of course is inspired by good old transporter
+ * technology where humans are kept inside the transporter's memory
+ * buffers until the transmission is complete. Star gates use a similar trick.
+ */
+typedef struct h2_bucket_beam h2_bucket_beam;
+
+typedef apr_status_t h2_beam_mutex_enter(void *ctx,
+ struct apr_thread_mutex_t **plock,
+ int *acquired);
+typedef void h2_beam_mutex_leave(void *ctx,
+ struct apr_thread_mutex_t *lock,
+ int acquired);
+typedef void h2_beam_consumed_callback(void *ctx, h2_bucket_beam *beam,
+ apr_off_t bytes);
+
+typedef int h2_beam_can_beam_callback(void *ctx, h2_bucket_beam *beam,
+ apr_file_t *file);
+
+struct h2_bucket_beam {
+ int id;
+ const char *tag;
+ h2_blist red;
+ h2_blist hold;
+ h2_blist purge;
+ apr_bucket_brigade *green;
+ apr_pool_t *life_pool;
+
+ apr_size_t max_buf_size;
+ apr_size_t live_beam_buckets;
+ apr_size_t files_beamed; /* how many file handles have been set aside */
+ apr_file_t *last_beamed; /* last file beamed */
+ apr_off_t sent_bytes; /* amount of bytes sent */
+ apr_off_t received_bytes; /* amount of bytes received */
+ apr_off_t reported_bytes; /* amount of bytes reported as consumed */
+
+ unsigned int aborted : 1;
+ unsigned int closed : 1;
+ unsigned int close_sent : 1;
+
+ void *m_ctx;
+ h2_beam_mutex_enter *m_enter;
+ h2_beam_mutex_leave *m_leave;
+ struct apr_thread_cond_t *m_cond;
+ apr_interval_time_t timeout;
+
+ h2_beam_consumed_callback *consumed_fn;
+ void *consumed_ctx;
+ h2_beam_can_beam_callback *can_beam_fn;
+ void *can_beam_ctx;
+};
+
+/**
+ * Creates a new bucket beam for transfer of buckets across threads.
+ *
+ * The pool the beam is created with will be protected by the given
+ * mutex and will be used in multiple threads. It needs a pool allocator
+ * that is only used inside that same mutex.
+ *
+ * @param pbeam will hold the created beam on return
+ * @param life_pool pool for allocating initial structure and cleanups
+ * @param buffer_size maximum memory footprint of buckets buffered in beam, or
+ * 0 for no limitation
+ */
+apr_status_t h2_beam_create(h2_bucket_beam **pbeam,
+ apr_pool_t *life_pool,
+ int id, const char *tag,
+ apr_size_t buffer_size);
+
+apr_status_t h2_beam_destroy(h2_bucket_beam *beam);
+
+/**
+ * Send buckets from the given brigade through the beam. Will hold buckets
+ * internally as long as they have not been processed by the receiving side.
+ * All accepted buckets are removed from the given brigade. Will return with
+ * APR_EAGAIN on non-blocking sends when not all buckets could be accepted.
+ */
+apr_status_t h2_beam_send(h2_bucket_beam *beam,
+ apr_bucket_brigade *red_buckets,
+ apr_read_type_e block);
+
+/**
+ * Receive buckets from the beam into the given brigade. Will return APR_EOF
+ * when reading past an EOS bucket. Reads can be blocking until data is
+ * available or the beam has been closed. Non-blocking calls return APR_EAGAIN
+ * if no data is available.
+ */
+apr_status_t h2_beam_receive(h2_bucket_beam *beam,
+ apr_bucket_brigade *green_buckets,
+ apr_read_type_e block,
+ apr_off_t readbytes);
+
+void h2_beam_abort(h2_bucket_beam *beam);
+
+/**
+ * Close the beam. Does not need to be invoked if certain that an EOS bucket
+ * has been sent.
+ */
+apr_status_t h2_beam_close(h2_bucket_beam *beam);
+
+/**
+ * Empty the buffer and close.
+ */
+void h2_beam_shutdown(h2_bucket_beam *beam);
+
+/**
+ * Reset the beam to its initial, empty state.
+ */
+void h2_beam_reset(h2_bucket_beam *beam);
+
+void h2_beam_mutex_set(h2_bucket_beam *beam,
+ h2_beam_mutex_enter m_enter,
+ h2_beam_mutex_leave m_leave,
+ struct apr_thread_cond_t *cond,
+ void *m_ctx);
+
+/**
+ * Set/get the timeout for blocking read/write operations. Only works
+ * if a mutex has been set for the beam.
+ */
+void h2_beam_timeout_set(h2_bucket_beam *beam,
+ apr_interval_time_t timeout);
+apr_interval_time_t h2_beam_timeout_get(h2_bucket_beam *beam);
+
+/**
+ * Set/get the maximum buffer size for beam data (memory footprint).
+ */
+void h2_beam_buffer_size_set(h2_bucket_beam *beam,
+ apr_size_t buffer_size);
+apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam);
+
+/**
+ * Register a callback to be invoked on the red side with the
+ * amount of bytes that have been consumed by the green side, since the
+ * last callback invocation or reset.
+ * @param beam the beam to set the callback on
+ * @param cb the callback or NULL
+ * @param ctx the context to use in callback invocation
+ */
+void h2_beam_on_consumed(h2_bucket_beam *beam,
+ h2_beam_consumed_callback *cb, void *ctx);
+
+void h2_beam_on_file_beam(h2_bucket_beam *beam,
+ h2_beam_can_beam_callback *cb, void *ctx);
+
+/**
+ * Get the amount of bytes currently buffered in the beam (unread).
+ */
+apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam);
+
+int h2_beam_closed(h2_bucket_beam *beam);
+int h2_beam_empty(h2_bucket_beam *beam);
+
+/**
+ * Return != 0 iff (some) data from the beam has been received.
+ */
+int h2_beam_was_received(h2_bucket_beam *beam);
+
+apr_size_t h2_beam_get_files_beamed(h2_bucket_beam *beam);
+
+#endif /* h2_bucket_beam_h */
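
The "consumed" callback described in the comment block above lets the red side learn how many bytes the green side has taken out of the beam, e.g. to drive flow-control accounting. A minimal sketch of registering such a callback; window_ctx and on_consumed are names assumed for illustration only:

    #include <apr_buckets.h>
    #include "h2_bucket_beam.h"

    typedef struct {
        apr_off_t consumed_total;   /* bytes the green side has taken so far */
    } window_ctx;

    /* matches h2_beam_consumed_callback: 'bytes' is the delta since the
     * previous invocation */
    static void on_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
    {
        window_ctx *wctx = ctx;
        (void)beam;
        wctx->consumed_total += bytes;
    }

    /* red side setup; the callback fires from red-side calls such as
     * h2_beam_send(), including h2_beam_send(beam, NULL, ...) polls */
    static void setup_consumed(h2_bucket_beam *beam, window_ctx *wctx)
    {
        h2_beam_on_consumed(beam, on_consumed, wctx);
    }
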
diff --git a/modules/http2/h2_bucket_eos.c b/modules/http2/h2_bucket_eos.c
index 98a0b365c6..3a5b1a570a 100644
--- a/modules/http2/h2_bucket_eos.c
+++ b/modules/http2/h2_bucket_eos.c
@@ -92,7 +92,7 @@ static void bucket_destroy(void *data)
if (apr_bucket_shared_destroy(h)) {
h2_stream *stream = h->stream;
if (stream) {
- h2_stream_cleanup(stream);
+ h2_stream_eos_destroy(stream);
}
apr_bucket_free(h);
}
diff --git a/modules/http2/h2_filter.c b/modules/http2/h2_filter.c
index 8bf7fbcb40..30ccb2c43e 100644
--- a/modules/http2/h2_filter.c
+++ b/modules/http2/h2_filter.c
@@ -208,18 +208,27 @@ static apr_status_t bbout(apr_bucket_brigade *bb, const char *fmt, ...)
return rv;
}
-static apr_status_t h2_sos_h2_status_buffer(h2_sos *sos, apr_bucket_brigade *bb)
+static apr_status_t h2_status_stream_filter(h2_stream *stream)
{
- h2_stream *stream = sos->stream;
h2_session *session = stream->session;
h2_mplx *mplx = session->mplx;
+ conn_rec *c = session->c;
h2_push_diary *diary;
+ apr_bucket_brigade *bb;
apr_status_t status;
- if (!bb) {
- bb = apr_brigade_create(stream->pool, session->c->bucket_alloc);
+ if (!stream->response) {
+ return APR_EINVAL;
}
+ if (!stream->buffer) {
+ stream->buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
+ }
+ bb = stream->buffer;
+
+ apr_table_unset(stream->response->headers, "Content-Length");
+ stream->response->content_length = -1;
+
bbout(bb, "{\n");
bbout(bb, " \"HTTP2\": \"on\",\n");
bbout(bb, " \"H2PUSH\": \"%s\",\n", h2_session_push_enabled(session)? "on" : "off");
@@ -266,57 +275,15 @@ static apr_status_t h2_sos_h2_status_buffer(h2_sos *sos, apr_bucket_brigade *bb)
bbout(bb, " \"bytes_sent\": %"APR_UINT64_T_FMT"\n", session->io.bytes_written);
bbout(bb, "}\n");
- return sos->prev->buffer(sos->prev, bb);
-}
-
-static apr_status_t h2_sos_h2_status_read_to(h2_sos *sos, apr_bucket_brigade *bb,
- apr_off_t *plen, int *peos)
-{
- return sos->prev->read_to(sos->prev, bb, plen, peos);
-}
-
-static apr_status_t h2_sos_h2_status_prepare(h2_sos *sos, apr_off_t *plen, int *peos)
-{
- return sos->prev->prepare(sos->prev, plen, peos);
-}
-
-static apr_status_t h2_sos_h2_status_readx(h2_sos *sos, h2_io_data_cb *cb, void *ctx,
- apr_off_t *plen, int *peos)
-{
- return sos->prev->readx(sos->prev, cb, ctx, plen, peos);
-}
-
-static apr_table_t *h2_sos_h2_status_get_trailers(h2_sos *sos)
-{
- return sos->prev->get_trailers(sos->prev);
-}
-
-static h2_sos *h2_sos_h2_status_create(h2_sos *prev)
-{
- h2_sos *sos;
- h2_response *response = prev->response;
-
- apr_table_unset(response->headers, "Content-Length");
- response->content_length = -1;
-
- sos = apr_pcalloc(prev->stream->pool, sizeof(*sos));
- sos->prev = prev;
- sos->response = response;
- sos->stream = prev->stream;
- sos->buffer = h2_sos_h2_status_buffer;
- sos->prepare = h2_sos_h2_status_prepare;
- sos->readx = h2_sos_h2_status_readx;
- sos->read_to = h2_sos_h2_status_read_to;
- sos->get_trailers = h2_sos_h2_status_get_trailers;
-
- return sos;
+ return APR_SUCCESS;
}
-h2_sos *h2_filter_sos_create(const char *name, struct h2_sos *prev)
+apr_status_t h2_stream_filter(h2_stream *stream)
{
- if (!strcmp(H2_SOS_H2_STATUS, name)) {
- return h2_sos_h2_status_create(prev);
+ const char *fname = stream->response? stream->response->sos_filter : NULL;
+ if (fname && !strcmp(H2_SOS_H2_STATUS, fname)) {
+ return h2_status_stream_filter(stream);
}
- return prev;
+ return APR_SUCCESS;
}
diff --git a/modules/http2/h2_filter.h b/modules/http2/h2_filter.h
index 2f281f8be1..5ba7d1581b 100644
--- a/modules/http2/h2_filter.h
+++ b/modules/http2/h2_filter.h
@@ -43,35 +43,9 @@ apr_status_t h2_filter_core_input(ap_filter_t* filter,
apr_read_type_e block,
apr_off_t readbytes);
-typedef struct h2_sos h2_sos;
-typedef apr_status_t h2_sos_data_cb(void *ctx, const char *data, apr_off_t len);
-
-typedef apr_status_t h2_sos_buffer(h2_sos *sos, apr_bucket_brigade *bb);
-typedef apr_status_t h2_sos_prepare(h2_sos *sos, apr_off_t *plen, int *peos);
-typedef apr_status_t h2_sos_readx(h2_sos *sos, h2_sos_data_cb *cb,
- void *ctx, apr_off_t *plen, int *peos);
-typedef apr_status_t h2_sos_read_to(h2_sos *sos, apr_bucket_brigade *bb,
- apr_off_t *plen, int *peos);
-typedef apr_table_t *h2_sos_get_trailers(h2_sos *sos);
-
-
#define H2_RESP_SOS_NOTE "h2-sos-filter"
-struct h2_sos {
- struct h2_stream *stream;
- h2_sos *prev;
- struct h2_response *response;
- void *ctx;
- h2_sos_buffer *buffer;
- h2_sos_prepare *prepare;
- h2_sos_readx *readx;
- h2_sos_read_to *read_to;
- h2_sos_get_trailers *get_trailers;
-};
-
-h2_sos *h2_filter_sos_create(const char *name, struct h2_sos *prev);
-
+apr_status_t h2_stream_filter(struct h2_stream *stream);
int h2_filter_h2_status_handler(request_rec *r);
-
#endif /* __mod_h2__h2_filter__ */
diff --git a/modules/http2/h2_from_h1.c b/modules/http2/h2_from_h1.c
index 8e1f163a2a..eb866a7911 100644
--- a/modules/http2/h2_from_h1.c
+++ b/modules/http2/h2_from_h1.c
@@ -31,7 +31,6 @@
#include "h2_response.h"
#include "h2_from_h1.h"
#include "h2_task.h"
-#include "h2_task_output.h"
#include "h2_util.h"
@@ -473,7 +472,7 @@ static h2_response *create_response(h2_from_h1 *from_h1, request_rec *r)
apr_status_t h2_response_output_filter(ap_filter_t *f, apr_bucket_brigade *bb)
{
h2_task *task = f->ctx;
- h2_from_h1 *from_h1 = task->output? task->output->from_h1 : NULL;
+ h2_from_h1 *from_h1 = task->output.from_h1;
request_rec *r = f->r;
apr_bucket *b;
ap_bucket_error *eb = NULL;
@@ -483,7 +482,7 @@ apr_status_t h2_response_output_filter(ap_filter_t *f, apr_bucket_brigade *bb)
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
"h2_from_h1(%d): output_filter called", from_h1->stream_id);
- if (r->header_only && task->output && from_h1->response) {
+ if (r->header_only && from_h1->response) {
/* throw away any data after we have compiled the response */
apr_brigade_cleanup(bb);
return OK;
@@ -552,7 +551,7 @@ apr_status_t h2_response_output_filter(ap_filter_t *f, apr_bucket_brigade *bb)
apr_status_t h2_response_trailers_filter(ap_filter_t *f, apr_bucket_brigade *bb)
{
h2_task *task = f->ctx;
- h2_from_h1 *from_h1 = task->output? task->output->from_h1 : NULL;
+ h2_from_h1 *from_h1 = task->output.from_h1;
request_rec *r = f->r;
apr_bucket *b;
diff --git a/modules/http2/h2_int_queue.c b/modules/http2/h2_int_queue.c
deleted file mode 100644
index 472ae34063..0000000000
--- a/modules/http2/h2_int_queue.c
+++ /dev/null
@@ -1,187 +0,0 @@
-/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <assert.h>
-#include <stddef.h>
-#include <apr_pools.h>
-
-#include "h2_int_queue.h"
-
-
-static void tq_grow(h2_int_queue *q, int nlen);
-static void tq_swap(h2_int_queue *q, int i, int j);
-static int tq_bubble_up(h2_int_queue *q, int i, int top,
- h2_iq_cmp *cmp, void *ctx);
-static int tq_bubble_down(h2_int_queue *q, int i, int bottom,
- h2_iq_cmp *cmp, void *ctx);
-
-h2_int_queue *h2_iq_create(apr_pool_t *pool, int capacity)
-{
- h2_int_queue *q = apr_pcalloc(pool, sizeof(h2_int_queue));
- if (q) {
- q->pool = pool;
- tq_grow(q, capacity);
- q->nelts = 0;
- }
- return q;
-}
-
-int h2_iq_empty(h2_int_queue *q)
-{
- return q->nelts == 0;
-}
-
-int h2_iq_size(h2_int_queue *q)
-{
- return q->nelts;
-}
-
-
-void h2_iq_add(h2_int_queue *q, int sid, h2_iq_cmp *cmp, void *ctx)
-{
- int i;
-
- if (q->nelts >= q->nalloc) {
- tq_grow(q, q->nalloc * 2);
- }
-
- i = (q->head + q->nelts) % q->nalloc;
- q->elts[i] = sid;
- ++q->nelts;
-
- if (cmp) {
- /* bubble it to the front of the queue */
- tq_bubble_up(q, i, q->head, cmp, ctx);
- }
-}
-
-int h2_iq_remove(h2_int_queue *q, int sid)
-{
- int i;
- for (i = 0; i < q->nelts; ++i) {
- if (sid == q->elts[(q->head + i) % q->nalloc]) {
- break;
- }
- }
-
- if (i < q->nelts) {
- ++i;
- for (; i < q->nelts; ++i) {
- q->elts[(q->head+i-1)%q->nalloc] = q->elts[(q->head+i)%q->nalloc];
- }
- --q->nelts;
- return 1;
- }
- return 0;
-}
-
-void h2_iq_clear(h2_int_queue *q)
-{
- q->nelts = 0;
-}
-
-void h2_iq_sort(h2_int_queue *q, h2_iq_cmp *cmp, void *ctx)
-{
- /* Assume that changes in ordering are minimal. This needs,
- * best case, q->nelts - 1 comparisions to check that nothing
- * changed.
- */
- if (q->nelts > 0) {
- int i, ni, prev, last;
-
- /* Start at the end of the queue and create a tail of sorted
- * entries. Make that tail one element longer in each iteration.
- */
- last = i = (q->head + q->nelts - 1) % q->nalloc;
- while (i != q->head) {
- prev = (q->nalloc + i - 1) % q->nalloc;
-
- ni = tq_bubble_up(q, i, prev, cmp, ctx);
- if (ni == prev) {
- /* i bubbled one up, bubble the new i down, which
- * keeps all tasks below i sorted. */
- tq_bubble_down(q, i, last, cmp, ctx);
- }
- i = prev;
- };
- }
-}
-
-
-int h2_iq_shift(h2_int_queue *q)
-{
- int sid;
-
- if (q->nelts <= 0) {
- return 0;
- }
-
- sid = q->elts[q->head];
- q->head = (q->head + 1) % q->nalloc;
- q->nelts--;
-
- return sid;
-}
-
-static void tq_grow(h2_int_queue *q, int nlen)
-{
- if (nlen > q->nalloc) {
- int *nq = apr_pcalloc(q->pool, sizeof(int) * nlen);
- if (q->nelts > 0) {
- int l = ((q->head + q->nelts) % q->nalloc) - q->head;
-
- memmove(nq, q->elts + q->head, sizeof(int) * l);
- if (l < q->nelts) {
- /* elts wrapped, append elts in [0, remain] to nq */
- int remain = q->nelts - l;
- memmove(nq + l, q->elts, sizeof(int) * remain);
- }
- }
- q->elts = nq;
- q->nalloc = nlen;
- q->head = 0;
- }
-}
-
-static void tq_swap(h2_int_queue *q, int i, int j)
-{
- int x = q->elts[i];
- q->elts[i] = q->elts[j];
- q->elts[j] = x;
-}
-
-static int tq_bubble_up(h2_int_queue *q, int i, int top,
- h2_iq_cmp *cmp, void *ctx)
-{
- int prev;
- while (((prev = (q->nalloc + i - 1) % q->nalloc), i != top)
- && (*cmp)(q->elts[i], q->elts[prev], ctx) < 0) {
- tq_swap(q, prev, i);
- i = prev;
- }
- return i;
-}
-
-static int tq_bubble_down(h2_int_queue *q, int i, int bottom,
- h2_iq_cmp *cmp, void *ctx)
-{
- int next;
- while (((next = (q->nalloc + i + 1) % q->nalloc), i != bottom)
- && (*cmp)(q->elts[i], q->elts[next], ctx) > 0) {
- tq_swap(q, next, i);
- i = next;
- }
- return i;
-}
diff --git a/modules/http2/h2_int_queue.h b/modules/http2/h2_int_queue.h
deleted file mode 100644
index 69f1e1c982..0000000000
--- a/modules/http2/h2_int_queue.h
+++ /dev/null
@@ -1,108 +0,0 @@
-/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __mod_h2__h2_int_queue__
-#define __mod_h2__h2_int_queue__
-
-/**
- * h2_int_queue keeps a list of sorted h2_task* in ascending order.
- */
-typedef struct h2_int_queue h2_int_queue;
-
-struct h2_int_queue {
- int *elts;
- int head;
- int nelts;
- int nalloc;
- apr_pool_t *pool;
-};
-
-/**
- * Comparator for two task to determine their order.
- *
- * @param s1 stream id to compare
- * @param s2 stream id to compare
- * @param ctx provided user data
- * @return value is the same as for strcmp() and has the effect:
- * == 0: s1 and s2 are treated equal in ordering
- * < 0: s1 should be sorted before s2
- * > 0: s2 should be sorted before s1
- */
-typedef int h2_iq_cmp(int s1, int s2, void *ctx);
-
-
-/**
- * Allocate a new queue from the pool and initialize.
- * @param id the identifier of the queue
- * @param pool the memory pool
- */
-h2_int_queue *h2_iq_create(apr_pool_t *pool, int capacity);
-
-/**
- * Return != 0 iff there are no tasks in the queue.
- * @param q the queue to check
- */
-int h2_iq_empty(h2_int_queue *q);
-
-/**
- * Return the number of int in the queue.
- * @param q the queue to get size on
- */
-int h2_iq_size(h2_int_queue *q);
-
-/**
- * Add a stream idto the queue.
- *
- * @param q the queue to append the task to
- * @param sid the stream id to add
- * @param cmp the comparator for sorting
- * @param ctx user data for comparator
- */
-void h2_iq_add(h2_int_queue *q, int sid, h2_iq_cmp *cmp, void *ctx);
-
-/**
- * Remove the stream id from the queue. Return != 0 iff task
- * was found in queue.
- * @param q the task queue
- * @param sid the stream id to remove
- * @return != 0 iff task was found in queue
- */
-int h2_iq_remove(h2_int_queue *q, int sid);
-
-/**
- * Remove all entries in the queue.
- */
-void h2_iq_clear(h2_int_queue *q);
-
-/**
- * Sort the stream idqueue again. Call if the task ordering
- * has changed.
- *
- * @param q the queue to sort
- * @param cmp the comparator for sorting
- * @param ctx user data for the comparator
- */
-void h2_iq_sort(h2_int_queue *q, h2_iq_cmp *cmp, void *ctx);
-
-/**
- * Get the first stream id from the queue or NULL if the queue is empty.
- * The task will be removed.
- *
- * @param q the queue to get the first task from
- * @return the first stream id of the queue, 0 if empty
- */
-int h2_iq_shift(h2_int_queue *q);
-
-#endif /* defined(__mod_h2__h2_int_queue__) */
diff --git a/modules/http2/h2_io.c b/modules/http2/h2_io.c
index 6c140a0893..395c34e502 100644
--- a/modules/http2/h2_io.c
+++ b/modules/http2/h2_io.c
@@ -26,6 +26,7 @@
#include <http_request.h>
#include "h2_private.h"
+#include "h2_bucket_beam.h"
#include "h2_h2.h"
#include "h2_io.h"
#include "h2_mplx.h"
@@ -34,66 +35,31 @@
#include "h2_task.h"
#include "h2_util.h"
-h2_io *h2_io_create(int id, apr_pool_t *pool,
- apr_bucket_alloc_t *bucket_alloc,
- const h2_request *request)
+h2_io *h2_io_create(int id, apr_pool_t *pool, const h2_request *request)
{
h2_io *io = apr_pcalloc(pool, sizeof(*io));
if (io) {
io->id = id;
io->pool = pool;
- io->bucket_alloc = bucket_alloc;
io->request = request;
+ if (request->body) {
+ h2_beam_create(&io->beam_in, pool, id, "input", 0);
+ }
}
return io;
}
-static void check_bbin(h2_io *io)
-{
- if (!io->bbin) {
- io->bbin = apr_brigade_create(io->pool, io->bucket_alloc);
- }
-}
-
-static void check_bbout(h2_io *io)
-{
- if (!io->bbout) {
- io->bbout = apr_brigade_create(io->pool, io->bucket_alloc);
- }
-}
-
-static void check_bbtmp(h2_io *io)
-{
- if (!io->bbtmp) {
- io->bbtmp = apr_brigade_create(io->pool, io->bucket_alloc);
- }
-}
-
-static void append_eos(h2_io *io, apr_bucket_brigade *bb)
-{
- APR_BRIGADE_INSERT_TAIL(bb, apr_bucket_eos_create(io->bucket_alloc));
-}
-
void h2_io_redo(h2_io *io)
{
io->worker_started = 0;
io->response = NULL;
io->rst_error = 0;
- if (io->bbin) {
- apr_brigade_cleanup(io->bbin);
- }
- if (io->bbout) {
- apr_brigade_cleanup(io->bbout);
- }
- if (io->bbtmp) {
- apr_brigade_cleanup(io->bbtmp);
- }
io->started_at = io->done_at = 0;
}
-int h2_io_is_repeatable(h2_io *io) {
+int h2_io_can_redo(h2_io *io) {
if (io->submitted
- || io->input_consumed > 0
+ || (io->beam_in && h2_beam_was_received(io->beam_in))
|| !io->request) {
/* cannot repeat that. */
return 0;
@@ -103,351 +69,40 @@ int h2_io_is_repeatable(h2_io *io) {
|| !strcmp("OPTIONS", io->request->method));
}
-void h2_io_set_response(h2_io *io, h2_response *response)
+void h2_io_set_response(h2_io *io, h2_response *response,
+ h2_bucket_beam *output)
{
- AP_DEBUG_ASSERT(io->pool);
AP_DEBUG_ASSERT(response);
AP_DEBUG_ASSERT(!io->response);
- /* we used to clone the response into the io->pool. But
+ /* we used to clone the response into our own pool. But
* we have much tighter control over the EOR bucket nowadays,
* so just use the instance given */
io->response = response;
+ if (output) {
+ io->beam_out = output;
+ }
if (response->rst_error) {
h2_io_rst(io, response->rst_error);
}
- else if (response->content_length == 0) {
- io->eos_out = 1;
- }
}
void h2_io_rst(h2_io *io, int error)
{
io->rst_error = error;
- io->eos_in = 1;
-}
-
-int h2_io_out_has_data(h2_io *io)
-{
- return io->bbout && h2_util_bb_has_data_or_eos(io->bbout);
-}
-
-apr_off_t h2_io_out_length(h2_io *io)
-{
- if (io->bbout) {
- apr_off_t len = 0;
- apr_brigade_length(io->bbout, 0, &len);
- return (len > 0)? len : 0;
- }
- return 0;
-}
-
-apr_status_t h2_io_in_shutdown(h2_io *io)
-{
- if (io->bbin) {
- apr_off_t end_len = 0;
- apr_brigade_length(io->bbin, 1, &end_len);
- io->input_consumed += end_len;
- apr_brigade_cleanup(io->bbin);
- }
- return h2_io_in_close(io);
-}
-
-
-void h2_io_signal_init(h2_io *io, h2_io_op op, apr_interval_time_t timeout,
- apr_thread_cond_t *cond)
-{
- io->timed_op = op;
- io->timed_cond = cond;
- if (timeout > 0) {
- io->timeout_at = apr_time_now() + timeout;
- }
- else {
- io->timeout_at = 0;
- }
-}
-
-void h2_io_signal_exit(h2_io *io)
-{
- io->timed_cond = NULL;
- io->timeout_at = 0;
-}
-
-apr_status_t h2_io_signal_wait(h2_mplx *m, h2_io *io)
-{
- apr_status_t status;
-
- if (io->timeout_at != 0) {
- status = apr_thread_cond_timedwait(io->timed_cond, m->lock, io->timeout_at);
- if (APR_STATUS_IS_TIMEUP(status)) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c, APLOGNO(03055)
- "h2_mplx(%ld-%d): stream timeout expired: %s",
- m->id, io->id,
- (io->timed_op == H2_IO_READ)? "read" : "write");
- h2_io_rst(io, H2_ERR_CANCEL);
- }
- }
- else {
- apr_thread_cond_wait(io->timed_cond, m->lock);
- status = APR_SUCCESS;
- }
- if (io->orphaned && status == APR_SUCCESS) {
- return APR_ECONNABORTED;
- }
- return status;
-}
-
-void h2_io_signal(h2_io *io, h2_io_op op)
-{
- if (io->timed_cond && (io->timed_op == op || H2_IO_ANY == op)) {
- apr_thread_cond_signal(io->timed_cond);
- }
-}
-
-void h2_io_make_orphaned(h2_io *io, int error)
-{
- io->orphaned = 1;
- if (error) {
- h2_io_rst(io, error);
- }
- /* if someone is waiting, wake him up */
- h2_io_signal(io, H2_IO_ANY);
-}
-
-static int add_trailer(void *ctx, const char *key, const char *value)
-{
- apr_bucket_brigade *bb = ctx;
- apr_status_t status;
-
- status = apr_brigade_printf(bb, NULL, NULL, "%s: %s\r\n",
- key, value);
- return (status == APR_SUCCESS);
-}
-
-static apr_status_t in_append_eos(h2_io *io, apr_bucket_brigade *bb,
- apr_table_t *trailers)
-{
- apr_status_t status = APR_SUCCESS;
- apr_table_t *t = io->request->trailers;
-
- if (trailers && t && !apr_is_empty_table(trailers)) {
- /* trailers passed in, transfer directly. */
- apr_table_overlap(trailers, t, APR_OVERLAP_TABLES_SET);
- t = NULL;
- }
-
- if (io->request->chunked) {
- if (t && !apr_is_empty_table(t)) {
- /* no trailers passed in, transfer via chunked */
- status = apr_brigade_puts(bb, NULL, NULL, "0\r\n");
- apr_table_do(add_trailer, bb, t, NULL);
- status = apr_brigade_puts(bb, NULL, NULL, "\r\n");
- }
- else {
- status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n");
- }
- }
- append_eos(io, bb);
- return status;
-}
-
-apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb,
- apr_size_t maxlen, apr_table_t *trailers)
-{
- apr_off_t start_len = 0;
- apr_status_t status;
-
- if (io->rst_error) {
- return APR_ECONNABORTED;
- }
-
- if (!io->bbin || APR_BRIGADE_EMPTY(io->bbin)) {
- if (io->eos_in) {
- if (!io->eos_in_written) {
- status = in_append_eos(io, bb, trailers);
- io->eos_in_written = 1;
- return status;
- }
- return APR_EOF;
- }
- return APR_EAGAIN;
- }
-
- if (io->request->chunked) {
- /* the reader expects HTTP/1.1 chunked encoding */
- check_bbtmp(io);
- status = h2_util_move(io->bbtmp, io->bbin, maxlen, NULL, "h2_io_in_read_chunk");
- if (status == APR_SUCCESS) {
- apr_off_t tmp_len = 0;
-
- apr_brigade_length(io->bbtmp, 1, &tmp_len);
- if (tmp_len > 0) {
- io->input_consumed += tmp_len;
- status = apr_brigade_printf(bb, NULL, NULL, "%lx\r\n",
- (unsigned long)tmp_len);
- if (status == APR_SUCCESS) {
- status = h2_util_move(bb, io->bbtmp, -1, NULL, "h2_io_in_read_tmp1");
- if (status == APR_SUCCESS) {
- status = apr_brigade_puts(bb, NULL, NULL, "\r\n");
- }
- }
- }
- else {
- status = h2_util_move(bb, io->bbtmp, -1, NULL, "h2_io_in_read_tmp2");
- }
- apr_brigade_cleanup(io->bbtmp);
- }
- }
- else {
- apr_brigade_length(bb, 1, &start_len);
-
- status = h2_util_move(bb, io->bbin, maxlen, NULL, "h2_io_in_read");
- if (status == APR_SUCCESS) {
- apr_off_t end_len = 0;
- apr_brigade_length(bb, 1, &end_len);
- io->input_consumed += (end_len - start_len);
- }
- }
-
- if (status == APR_SUCCESS && (!io->bbin || APR_BRIGADE_EMPTY(io->bbin))) {
- if (io->eos_in) {
- if (!io->eos_in_written) {
- status = in_append_eos(io, bb, trailers);
- io->eos_in_written = 1;
- }
- }
- }
-
- if (status == APR_SUCCESS && APR_BRIGADE_EMPTY(bb)) {
- return APR_EAGAIN;
- }
- return status;
-}
-
-apr_status_t h2_io_in_write(h2_io *io, const char *d, apr_size_t len, int eos)
-{
- if (io->rst_error) {
- return APR_ECONNABORTED;
- }
-
- if (io->eos_in) {
- return APR_EOF;
+ if (io->beam_in) {
+ h2_beam_abort(io->beam_in);
}
- if (eos) {
- io->eos_in = 1;
- }
- if (len > 0) {
- check_bbin(io);
- return apr_brigade_write(io->bbin, NULL, NULL, d, len);
- }
- return APR_SUCCESS;
-}
-
-apr_status_t h2_io_in_close(h2_io *io)
-{
- if (io->rst_error) {
- return APR_ECONNABORTED;
+ if (io->beam_out) {
+ h2_beam_abort(io->beam_out);
}
-
- io->eos_in = 1;
- return APR_SUCCESS;
}
-apr_status_t h2_io_out_get_brigade(h2_io *io, apr_bucket_brigade *bb,
- apr_off_t len)
+void h2_io_shutdown(h2_io *io)
{
- if (io->rst_error) {
- return APR_ECONNABORTED;
- }
- if (io->eos_out_read) {
- return APR_EOF;
- }
- else if (!io->bbout || APR_BRIGADE_EMPTY(io->bbout)) {
- return APR_EAGAIN;
+ if (io->beam_in) {
+ h2_beam_shutdown(io->beam_in);
}
- else {
- apr_status_t status;
- apr_off_t pre_len, post_len;
- /* Allow file handles pass through without limits. If they
- * already have the lifetime of this stream, we might as well
- * pass them on to the master connection */
- apr_size_t files = INT_MAX;
-
- apr_brigade_length(bb, 0, &pre_len);
- status = h2_util_move(bb, io->bbout, len, &files, "h2_io_read_to");
- if (status == APR_SUCCESS && io->eos_out
- && APR_BRIGADE_EMPTY(io->bbout)) {
- io->eos_out_read = 1;
- }
- apr_brigade_length(bb, 0, &post_len);
- io->output_consumed += (post_len - pre_len);
- return status;
- }
-}
-
-apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb,
- apr_size_t maxlen,
- apr_size_t *pfile_buckets_allowed)
-{
- apr_status_t status;
- apr_bucket *b;
- int start_allowed;
-
- if (io->rst_error) {
- return APR_ECONNABORTED;
- }
-
- /* Filter the EOR bucket and set it aside. We prefer to tear down
- * the request when the whole h2 stream is done */
- for (b = APR_BRIGADE_FIRST(bb);
- b != APR_BRIGADE_SENTINEL(bb);
- b = APR_BUCKET_NEXT(b))
- {
- if (AP_BUCKET_IS_EOR(b)) {
- APR_BUCKET_REMOVE(b);
- io->eor = b;
- break;
- }
- else if (APR_BUCKET_IS_EOS(b)) {
- io->eos_out = 1;
- break;
- }
- }
-
- /* Let's move the buckets from the request processing in here, so
- * that the main thread can read them when it has time/capacity.
- *
- * Move at most "maxlen" memory bytes. If buckets remain, it is
- * the caller's responsibility to take care of this.
- *
- * We allow passing of file buckets as long as we do not have too
- * many open files already buffered. Otherwise we will run out of
- * file handles.
- */
- check_bbout(io);
- start_allowed = *pfile_buckets_allowed;
- status = h2_util_move(io->bbout, bb, maxlen, pfile_buckets_allowed,
- "h2_io_out_write");
- /* track # file buckets moved into our pool */
- if (start_allowed != *pfile_buckets_allowed) {
- io->files_handles_owned += (start_allowed - *pfile_buckets_allowed);
- }
- return status;
-}
-
-
-apr_status_t h2_io_out_close(h2_io *io)
-{
- if (io->rst_error) {
- return APR_ECONNABORTED;
- }
- if (!io->eos_out_read) { /* EOS has not been read yet */
- if (!io->eos_out) {
- check_bbout(io);
- io->eos_out = 1;
- if (!h2_util_has_eos(io->bbout, -1)) {
- append_eos(io, io->bbout);
- }
- }
+ if (io->beam_out) {
+ h2_beam_shutdown(io->beam_out);
}
- return APR_SUCCESS;
}
diff --git a/modules/http2/h2_io.h b/modules/http2/h2_io.h
index d700f6f322..bfd130eaba 100644
--- a/modules/http2/h2_io.h
+++ b/modules/http2/h2_io.h
@@ -16,6 +16,7 @@
#ifndef __mod_h2__h2_io__
#define __mod_h2__h2_io__
+struct h2_bucket_beam;
struct h2_response;
struct apr_thread_cond_t;
struct h2_mplx;
@@ -37,40 +38,23 @@ typedef struct h2_io h2_io;
struct h2_io {
int id; /* stream identifier */
- apr_pool_t *pool; /* stream pool */
- apr_bucket_alloc_t *bucket_alloc;
+ apr_pool_t *pool; /* io pool */
- const struct h2_request *request;/* request on this io */
- struct h2_response *response; /* response to request */
- int rst_error; /* h2 related stream abort error */
+ const struct h2_request *request;/* request to process */
+ struct h2_response *response; /* response to submit */
+
+ struct h2_bucket_beam *beam_in; /* request body buckets */
+ struct h2_bucket_beam *beam_out; /* response body buckets */
- apr_bucket *eor; /* the EOR bucket, set aside */
struct h2_task *task; /* the task once started */
+ apr_time_t started_at; /* when processing started */
+ apr_time_t done_at; /* when processing was done */
- apr_bucket_brigade *bbin; /* input data for stream */
- apr_bucket_brigade *bbout; /* output data from stream */
- apr_bucket_brigade *bbtmp; /* temporary data for chunking */
-
+ int rst_error; /* h2 related stream abort error */
unsigned int orphaned : 1; /* h2_stream is gone for this io */
+ unsigned int submitted : 1; /* response has been submitted to client */
unsigned int worker_started : 1; /* h2_worker started processing for this io */
unsigned int worker_done : 1; /* h2_worker finished for this io */
- unsigned int submitted : 1; /* response has been submitted to client */
- unsigned int request_body : 1; /* iff request has body */
- unsigned int eos_in : 1; /* input eos has been seen */
- unsigned int eos_in_written : 1; /* input eos has been forwarded */
- unsigned int eos_out : 1; /* output eos is present */
- unsigned int eos_out_read : 1; /* output eos has been forwarded */
-
- h2_io_op timed_op; /* which operation is waited on, if any */
- struct apr_thread_cond_t *timed_cond; /* condition to wait on, maybe NULL */
- apr_time_t timeout_at; /* when IO wait will time out */
-
- apr_time_t started_at; /* when processing started */
- apr_time_t done_at; /* when processing was done */
- apr_size_t input_consumed; /* how many bytes have been read */
- apr_size_t output_consumed; /* how many bytes have been written out */
-
- int files_handles_owned;
};
/*******************************************************************************
@@ -80,96 +64,25 @@ struct h2_io {
/**
* Creates a new h2_io for the given stream id.
*/
-h2_io *h2_io_create(int id, apr_pool_t *pool,
- apr_bucket_alloc_t *bucket_alloc,
- const struct h2_request *request);
+h2_io *h2_io_create(int id, apr_pool_t *pool, const struct h2_request *request);
/**
* Set the response of this stream.
*/
-void h2_io_set_response(h2_io *io, struct h2_response *response);
+void h2_io_set_response(h2_io *io, struct h2_response *response,
+ struct h2_bucket_beam *output);
/**
* Reset the stream with the given error code.
*/
void h2_io_rst(h2_io *io, int error);
-int h2_io_is_repeatable(h2_io *io);
+int h2_io_can_redo(h2_io *io);
void h2_io_redo(h2_io *io);
/**
- * Output data is available.
- */
-int h2_io_out_has_data(h2_io *io);
-
-void h2_io_signal(h2_io *io, h2_io_op op);
-void h2_io_signal_init(h2_io *io, h2_io_op op, apr_interval_time_t timeout,
- struct apr_thread_cond_t *cond);
-void h2_io_signal_exit(h2_io *io);
-apr_status_t h2_io_signal_wait(struct h2_mplx *m, h2_io *io);
-
-void h2_io_make_orphaned(h2_io *io, int error);
-
-/*******************************************************************************
- * Input handling of streams.
- ******************************************************************************/
-/**
- * Reads the next bucket from the input. Returns APR_EAGAIN if none
- * is currently available, APR_EOF if end of input has been reached.
- */
-apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb,
- apr_size_t maxlen, apr_table_t *trailers);
-
-/**
- * Appends given bucket to the input.
- */
-apr_status_t h2_io_in_write(h2_io *io, const char *d, apr_size_t len, int eos);
-
-/**
- * Closes the input. After existing data has been read, APR_EOF will
- * be returned.
+ * Shuts all input/output down. Clears any buffered buckets and closes both directions.
*/
-apr_status_t h2_io_in_close(h2_io *io);
-
-/**
- * Shuts all input down. Will close input and mark any data buffered
- * as consumed.
- */
-apr_status_t h2_io_in_shutdown(h2_io *io);
-
-/*******************************************************************************
- * Output handling of streams.
- ******************************************************************************/
-
-/**
- * Read a bucket from the output head. Return APR_EAGAIN if non is available,
- * APR_EOF if none available and output has been closed.
- * May be called with buffer == NULL in order to find out how much data
- * is available.
- * @param io the h2_io to read output from
- * @param buffer the buffer to copy the data to, may be NULL
- * @param plen the requested max len, set to amount of data on return
- * @param peos != 0 iff the end of stream has been reached
- */
-apr_status_t h2_io_out_get_brigade(h2_io *io,
- apr_bucket_brigade *bb,
- apr_off_t len);
-
-apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb,
- apr_size_t maxlen,
- apr_size_t *pfile_buckets_allowed);
-
-/**
- * Closes the input. After existing data has been read, APR_EOF will
- * be returned.
- */
-apr_status_t h2_io_out_close(h2_io *io);
-
-/**
- * Gives the overall length of the data that is currently queued for
- * output.
- */
-apr_off_t h2_io_out_length(h2_io *io);
-
+void h2_io_shutdown(h2_io *io);
#endif /* defined(__mod_h2__h2_io__) */
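The h2_io surface above shrinks to creation, response/beam attachment, reset/redo and shutdown. As a reading aid, a minimal, hypothetical sketch of how a caller such as h2_mplx might drive it follows; only the h2_io_* calls and struct fields appear in this patch, while the helper name and the way the request, response and beam are obtained are assumptions.

    /* Sketch only: helper name and argument sources are assumed, not part
     * of the patch. */
    #include "h2_io.h"

    static void drive_io_sketch(apr_pool_t *pool, int stream_id,
                                const struct h2_request *req,
                                struct h2_response *resp,
                                struct h2_bucket_beam *out_beam)
    {
        h2_io *io = h2_io_create(stream_id, pool, req);

        /* attach response and output beam; a response->rst_error triggers
         * h2_io_rst(), which aborts beam_in and beam_out */
        h2_io_set_response(io, resp, out_beam);

        /* ... a worker processes the request; body buckets travel through
         * io->beam_in / io->beam_out instead of brigades ... */

        /* tear down both directions, discarding anything still buffered */
        h2_io_shutdown(io);
    }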
diff --git a/modules/http2/h2_io_set.c b/modules/http2/h2_io_set.c
deleted file mode 100644
index e09497954f..0000000000
--- a/modules/http2/h2_io_set.c
+++ /dev/null
@@ -1,159 +0,0 @@
-/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <assert.h>
-#include <stddef.h>
-
-#include <apr_strings.h>
-
-#include <httpd.h>
-#include <http_core.h>
-#include <http_connection.h>
-#include <http_log.h>
-
-#include "h2_private.h"
-#include "h2_io.h"
-#include "h2_io_set.h"
-
-#define h2_io_IDX(list, i) ((h2_io**)(list)->elts)[i]
-
-struct h2_io_set {
- apr_array_header_t *list;
-};
-
-h2_io_set *h2_io_set_create(apr_pool_t *pool)
-{
- h2_io_set *sp = apr_pcalloc(pool, sizeof(h2_io_set));
- if (sp) {
- sp->list = apr_array_make(pool, 100, sizeof(h2_io*));
- if (!sp->list) {
- return NULL;
- }
- }
- return sp;
-}
-
-static int h2_stream_id_cmp(const void *s1, const void *s2)
-{
- h2_io **pio1 = (h2_io **)s1;
- h2_io **pio2 = (h2_io **)s2;
- return (*pio1)->id - (*pio2)->id;
-}
-
-h2_io *h2_io_set_get(h2_io_set *sp, int stream_id)
-{
- /* we keep the array sorted by id, so lookup can be done
- * by bsearch.
- */
- h2_io **ps;
- h2_io key;
- h2_io *pkey = &key;
-
- memset(&key, 0, sizeof(key));
- key.id = stream_id;
- ps = bsearch(&pkey, sp->list->elts, sp->list->nelts,
- sp->list->elt_size, h2_stream_id_cmp);
- return ps? *ps : NULL;
-}
-
-static void h2_io_set_sort(h2_io_set *sp)
-{
- qsort(sp->list->elts, sp->list->nelts, sp->list->elt_size,
- h2_stream_id_cmp);
-}
-
-apr_status_t h2_io_set_add(h2_io_set *sp, h2_io *io)
-{
- h2_io *existing = h2_io_set_get(sp, io->id);
- if (!existing) {
- int last;
- APR_ARRAY_PUSH(sp->list, h2_io*) = io;
- /* Normally, streams get added in ascending order if id. We
- * keep the array sorted, so we just need to check if the newly
- * appended stream has a lower id than the last one. if not,
- * sorting is not necessary.
- */
- last = sp->list->nelts - 1;
- if (last > 0
- && (h2_io_IDX(sp->list, last)->id
- < h2_io_IDX(sp->list, last-1)->id)) {
- h2_io_set_sort(sp);
- }
- }
- return APR_SUCCESS;
-}
-
-static void remove_idx(h2_io_set *sp, int idx)
-{
- int n;
- --sp->list->nelts;
- n = sp->list->nelts - idx;
- if (n > 0) {
- /* There are n h2_io* behind idx. Move the rest down */
- h2_io **selts = (h2_io**)sp->list->elts;
- memmove(selts + idx, selts + idx + 1, n * sizeof(h2_io*));
- }
-}
-
-h2_io *h2_io_set_remove(h2_io_set *sp, h2_io *io)
-{
- int i;
- for (i = 0; i < sp->list->nelts; ++i) {
- h2_io *e = h2_io_IDX(sp->list, i);
- if (e->id == io->id) {
- remove_idx(sp, i);
- return e;
- }
- }
- return NULL;
-}
-
-h2_io *h2_io_set_shift(h2_io_set *set)
-{
- /* For now, this just removes the first element in the set.
- * the name is misleading...
- */
- if (set->list->nelts > 0) {
- h2_io *io = h2_io_IDX(set->list, 0);
- remove_idx(set, 0);
- return io;
- }
- return NULL;
-}
-
-int h2_io_set_is_empty(h2_io_set *sp)
-{
- AP_DEBUG_ASSERT(sp);
- return sp->list->nelts == 0;
-}
-
-int h2_io_set_iter(h2_io_set *sp,
- h2_io_set_iter_fn *iter, void *ctx)
-{
- int i;
- for (i = 0; i < sp->list->nelts; ++i) {
- h2_io *s = h2_io_IDX(sp->list, i);
- if (!iter(ctx, s)) {
- return 0;
- }
- }
- return 1;
-}
-
-apr_size_t h2_io_set_size(h2_io_set *sp)
-{
- return sp->list->nelts;
-}
-
diff --git a/modules/http2/h2_io_set.h b/modules/http2/h2_io_set.h
deleted file mode 100644
index 936e725222..0000000000
--- a/modules/http2/h2_io_set.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __mod_h2__h2_io_set__
-#define __mod_h2__h2_io_set__
-
-struct h2_io;
-
-/**
- * A set of h2_io instances. Allows lookup by stream id
- * and other criteria.
- */
-typedef struct h2_io_set h2_io_set;
-
-h2_io_set *h2_io_set_create(apr_pool_t *pool);
-
-apr_status_t h2_io_set_add(h2_io_set *set, struct h2_io *io);
-h2_io *h2_io_set_get(h2_io_set *set, int stream_id);
-h2_io *h2_io_set_remove(h2_io_set *set, struct h2_io *io);
-
-int h2_io_set_is_empty(h2_io_set *set);
-apr_size_t h2_io_set_size(h2_io_set *set);
-
-
-typedef int h2_io_set_iter_fn(void *ctx, struct h2_io *io);
-
-/**
- * Iterator over all h2_io* in the set or until a
- * callback returns 0. It is not safe to add or remove
- * set members during iteration.
- *
- * @param set the set of h2_io to iterate over
- * @param iter the function to call for each io
- * @param ctx user data for the callback
- * @return 1 iff iteration completed for all members
- */
-int h2_io_set_iter(h2_io_set *set, h2_io_set_iter_fn *iter, void *ctx);
-
-h2_io *h2_io_set_shift(h2_io_set *set);
-
-#endif /* defined(__mod_h2__h2_io_set__) */
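With h2_io_set gone, h2_mplx.c below keeps its ios in the generic h2_ilist_t (declared elsewhere, presumably h2_util.h), and iterator callbacks switch from a typed h2_io* parameter to void*. A small hypothetical sketch of that pattern, using only the h2_ilist_* calls visible in this patch:

    /* Sketch only: the helper names are invented for illustration; the
     * h2_ilist_* signatures are inferred from their use in h2_mplx.c. */
    static int count_busy_iter(void *ctx, void *val)
    {
        h2_io *io = val;      /* callbacks now receive void* and cast back */
        int *n = ctx;
        if (io->worker_started && !io->worker_done) {
            ++(*n);
        }
        return 1;             /* non-zero keeps the iteration going */
    }

    static int count_busy_ios(struct h2_ilist_t *stream_ios)
    {
        int n = 0;
        h2_ilist_iter(stream_ios, count_busy_iter, &n);
        return n;
    }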
diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c
index 6513a75415..d75188a78e 100644
--- a/modules/http2/h2_mplx.c
+++ b/modules/http2/h2_mplx.c
@@ -29,38 +29,40 @@
#include "mod_http2.h"
#include "h2_private.h"
+#include "h2_bucket_beam.h"
#include "h2_config.h"
#include "h2_conn.h"
#include "h2_ctx.h"
#include "h2_h2.h"
-#include "h2_int_queue.h"
#include "h2_io.h"
-#include "h2_io_set.h"
#include "h2_response.h"
#include "h2_mplx.h"
#include "h2_ngn_shed.h"
#include "h2_request.h"
#include "h2_stream.h"
#include "h2_task.h"
-#include "h2_task_input.h"
-#include "h2_task_output.h"
#include "h2_worker.h"
#include "h2_workers.h"
#include "h2_util.h"
-#define H2_MPLX_IO_OUT(lvl,m,io,msg) \
- do { \
- if (APLOG_C_IS_LEVEL((m)->c,lvl)) \
- h2_util_bb_log((m)->c,(io)->id,lvl,msg,(io)->bbout); \
- } while(0)
-
-#define H2_MPLX_IO_IN(lvl,m,io,msg) \
- do { \
- if (APLOG_C_IS_LEVEL((m)->c,lvl)) \
- h2_util_bb_log((m)->c,(io)->id,lvl,msg,(io)->bbin); \
- } while(0)
-
+static void h2_beam_log(h2_bucket_beam *beam, int id, const char *msg,
+ conn_rec *c, int level)
+{
+ if (beam && APLOG_C_IS_LEVEL(c,level)) {
+ char buffer[2048];
+ apr_size_t off = 0;
+
+ off += apr_snprintf(buffer+off, H2_ALEN(buffer)-off, "cl=%d, ", beam->closed);
+ off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "red", ", ", &beam->red);
+ off += h2_util_bb_print(buffer+off, H2_ALEN(buffer)-off, "green", ", ", beam->green);
+ off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "hold", ", ", &beam->hold);
+ off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "purge", "", &beam->purge);
+
+ ap_log_cerror(APLOG_MARK, level, 0, c, "beam(%ld-%d): %s %s",
+ c->id, id, msg, buffer);
+ }
+}
/* NULL or the mutex held by this thread, used for recursive calls
*/
@@ -104,13 +106,51 @@ static void leave_mutex(h2_mplx *m, int acquired)
}
}
-static int is_aborted(h2_mplx *m, apr_status_t *pstatus)
+static apr_status_t io_mutex_enter(void *ctx,
+ apr_thread_mutex_t **plock, int *acquired)
{
- AP_DEBUG_ASSERT(m);
- if (m->aborted) {
- *pstatus = APR_ECONNABORTED;
+ h2_mplx *m = ctx;
+ *plock = m->lock;
+ return enter_mutex(m, acquired);
+}
+
+static void io_mutex_leave(void *ctx, apr_thread_mutex_t *lock, int acquired)
+{
+ h2_mplx *m = ctx;
+ leave_mutex(m, acquired);
+}
+
+static void stream_output_consumed(void *ctx,
+ h2_bucket_beam *beam, apr_off_t length)
+{
+ h2_io *io = ctx;
+ if (length > 0 && io->task && io->task->assigned) {
+ h2_req_engine_out_consumed(io->task->assigned, io->task->c, length);
+ }
+}
+
+static void stream_input_consumed(void *ctx,
+ h2_bucket_beam *beam, apr_off_t length)
+{
+ h2_mplx *m = ctx;
+ if (m->input_consumed && length) {
+ m->input_consumed(m->input_consumed_ctx, beam->id, length);
+ }
+}
+
+static int can_beam_file(void *ctx, h2_bucket_beam *beam, apr_file_t *file)
+{
+ h2_mplx *m = ctx;
+ if (m->tx_handles_reserved > 0) {
+ --m->tx_handles_reserved;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ "h2_mplx(%ld-%d): beaming file %s, tx_avail %d",
+ m->id, beam->id, beam->tag, m->tx_handles_reserved);
return 1;
}
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ "h2_mplx(%ld-%d): can_beam_file denied on %s",
+ m->id, beam->id, beam->tag);
return 0;
}
@@ -118,9 +158,9 @@ static void have_out_data_for(h2_mplx *m, int stream_id);
static void check_tx_reservation(h2_mplx *m)
{
- if (m->tx_handles_reserved == 0) {
+ if (m->tx_handles_reserved <= 0) {
m->tx_handles_reserved += h2_workers_tx_reserve(m->workers,
- H2MIN(m->tx_chunk_size, h2_io_set_size(m->stream_ios)));
+ H2MIN(m->tx_chunk_size, h2_ilist_count(m->stream_ios)));
}
}
@@ -132,7 +172,7 @@ static void check_tx_free(h2_mplx *m)
h2_workers_tx_free(m->workers, count);
}
else if (m->tx_handles_reserved
- && (!m->stream_ios || h2_io_set_is_empty(m->stream_ios))) {
+ && (!m->stream_ios || h2_ilist_empty(m->stream_ios))) {
h2_workers_tx_free(m->workers, m->tx_handles_reserved);
m->tx_handles_reserved = 0;
}
@@ -143,7 +183,7 @@ static void h2_mplx_destroy(h2_mplx *m)
AP_DEBUG_ASSERT(m);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): destroy, ios=%d",
- m->id, (int)h2_io_set_size(m->stream_ios));
+ m->id, (int)h2_ilist_count(m->stream_ios));
check_tx_free(m);
if (m->pool) {
apr_pool_destroy(m->pool);
@@ -205,8 +245,8 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS);
m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
m->q = h2_iq_create(m->pool, m->max_streams);
- m->stream_ios = h2_io_set_create(m->pool);
- m->ready_ios = h2_io_set_create(m->pool);
+ m->stream_ios = h2_ilist_create(m->pool);
+ m->ready_ios = h2_ilist_create(m->pool);
m->stream_timeout = stream_timeout;
m->workers = workers;
m->workers_max = workers->max_workers;
@@ -240,49 +280,29 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m)
return max_stream_started;
}
-static void workers_register(h2_mplx *m)
-{
- /* h2_workers is only a hub for all the h2_worker instances.
- * At the end-of-life of this h2_mplx, we always unregister at
- * the workers. The thing to manage are all the h2_worker instances
- * out there. Those may hold a reference to this h2_mplx and we cannot
- * call them to unregister.
- *
- * Therefore: ref counting for h2_workers in not needed, ref counting
- * for h2_worker using this is critical.
- */
- m->need_registration = 0;
- h2_workers_register(m->workers, m);
-}
-
-static int io_in_consumed_signal(h2_mplx *m, h2_io *io)
+static void io_in_consumed_signal(h2_mplx *m, h2_io *io)
{
- if (io->input_consumed && m->input_consumed) {
- m->input_consumed(m->input_consumed_ctx,
- io->id, io->input_consumed);
- io->input_consumed = 0;
- return 1;
+ if (io->beam_in && io->worker_started) {
+ h2_beam_send(io->beam_in, NULL, 0); /* trigger updates */
}
- return 0;
}
static int io_out_consumed_signal(h2_mplx *m, h2_io *io)
{
- if (io->output_consumed && io->task && io->task->assigned) {
- h2_req_engine_out_consumed(io->task->assigned, io->task->c,
- io->output_consumed);
- io->output_consumed = 0;
- return 1;
+ if (io->beam_out && io->worker_started && io->task && io->task->assigned) {
+ h2_beam_send(io->beam_out, NULL, 0); /* trigger updates */
}
return 0;
}
+
static void io_destroy(h2_mplx *m, h2_io *io, int events)
{
+ conn_rec *slave = NULL;
int reuse_slave;
/* cleanup any buffered input */
- h2_io_in_shutdown(io);
+ h2_io_shutdown(io);
if (events) {
/* Process outstanding events before destruction */
io_in_consumed_signal(m, io);
@@ -291,24 +311,37 @@ static void io_destroy(h2_mplx *m, h2_io *io, int events)
/* The pool is cleared/destroyed which also closes all
* allocated file handles. Give this count back to our
* file handle pool. */
- m->tx_handles_reserved += io->files_handles_owned;
+ if (io->beam_in) {
+ m->tx_handles_reserved += h2_beam_get_files_beamed(io->beam_in);
+ }
+ if (io->beam_out) {
+ m->tx_handles_reserved += h2_beam_get_files_beamed(io->beam_out);
+ }
- h2_io_set_remove(m->stream_ios, io);
- h2_io_set_remove(m->ready_ios, io);
+ h2_ilist_remove(m->stream_ios, io->id);
+ h2_ilist_remove(m->ready_ios, io->id);
if (m->redo_ios) {
- h2_io_set_remove(m->redo_ios, io);
+ h2_ilist_remove(m->redo_ios, io->id);
}
reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc)
- && !io->rst_error && io->eor);
+ && !io->rst_error);
if (io->task) {
- conn_rec *slave = io->task->c;
+ slave = io->task->c;
h2_task_destroy(io->task);
io->task = NULL;
-
+ }
+
+ if (io->pool) {
+ if (m->spare_io_pool) {
+ apr_pool_destroy(m->spare_io_pool);
+ }
+ apr_pool_clear(io->pool);
+ m->spare_io_pool = io->pool;
+ }
+
+ if (slave) {
if (reuse_slave && slave->keepalive == AP_CONN_KEEPALIVE) {
- apr_bucket_delete(io->eor);
- io->eor = NULL;
APR_ARRAY_PUSH(m->spare_slaves, conn_rec*) = slave;
}
else {
@@ -316,18 +349,14 @@ static void io_destroy(h2_mplx *m, h2_io *io, int events)
h2_slave_destroy(slave, NULL);
}
}
-
- if (io->pool) {
- apr_pool_destroy(io->pool);
- }
-
+
check_tx_free(m);
}
static int io_stream_done(h2_mplx *m, h2_io *io, int rst_error)
{
/* Remove io from ready set, we will never submit it */
- h2_io_set_remove(m->ready_ios, io);
+ h2_ilist_remove(m->ready_ios, io->id);
if (!io->worker_started || io->worker_done) {
/* already finished or not even started yet */
h2_iq_remove(m->q, io->id);
@@ -336,39 +365,41 @@ static int io_stream_done(h2_mplx *m, h2_io *io, int rst_error)
}
else {
/* cleanup once task is done */
- h2_io_make_orphaned(io, rst_error);
+ io->orphaned = 1;
+ if (rst_error) {
+ h2_io_rst(io, rst_error);
+ }
return 1;
}
}
-static int stream_done_iter(void *ctx, h2_io *io)
+static int stream_done_iter(void *ctx, void *val)
{
- return io_stream_done((h2_mplx*)ctx, io, 0);
+ return io_stream_done((h2_mplx*)ctx, val, 0);
}
-static int stream_print(void *ctx, h2_io *io)
+static int stream_print(void *ctx, void *val)
{
h2_mplx *m = ctx;
+ h2_io *io = val;
if (io && io->request) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
"->03198: h2_stream(%ld-%d): %s %s %s -> %s %d"
- "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]",
+ "[orph=%d/started=%d/done=%d]",
m->id, io->id,
io->request->method, io->request->authority, io->request->path,
io->response? "http" : (io->rst_error? "reset" : "?"),
io->response? io->response->http_status : io->rst_error,
- io->orphaned, io->worker_started, io->worker_done,
- io->eos_in, io->eos_out);
+ io->orphaned, io->worker_started, io->worker_done);
}
else if (io) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
"->03198: h2_stream(%ld-%d): NULL -> %s %d"
- "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]",
+ "[orph=%d/started=%d/done=%d]",
m->id, io->id,
io->response? "http" : (io->rst_error? "reset" : "?"),
io->response? io->response->http_status : io->rst_error,
- io->orphaned, io->worker_started, io->worker_done,
- io->eos_in, io->eos_out);
+ io->orphaned, io->worker_started, io->worker_done);
}
else {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
@@ -392,7 +423,7 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
h2_iq_clear(m->q);
apr_thread_cond_broadcast(m->task_thawed);
- while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) {
+ while (!h2_ilist_iter(m->stream_ios, stream_done_iter, m)) {
/* iterate until all ios have been orphaned or destroyed */
}
@@ -407,9 +438,13 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
m->join_wait = wait;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): release_join, waiting on %d worker to report back",
- m->id, (int)h2_io_set_size(m->stream_ios));
+ m->id, (int)h2_ilist_count(m->stream_ios));
status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
+
+ while (!h2_ilist_iter(m->stream_ios, stream_done_iter, m)) {
+ /* iterate until all ios have been orphaned or destroyed */
+ }
if (APR_STATUS_IS_TIMEUP(status)) {
if (i > 0) {
/* Oh, oh. Still we wait for assigned workers to report that
@@ -421,9 +456,9 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
"h2_mplx(%ld): release, waiting for %d seconds now for "
"%d h2_workers to return, have still %d requests outstanding",
m->id, i*wait_secs, m->workers_busy,
- (int)h2_io_set_size(m->stream_ios));
+ (int)h2_ilist_count(m->stream_ios));
if (i == 1) {
- h2_io_set_iter(m->stream_ios, stream_print, m);
+ h2_ilist_iter(m->stream_ios, stream_print, m);
}
}
h2_mplx_abort(m);
@@ -431,10 +466,10 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
}
}
- if (!h2_io_set_is_empty(m->stream_ios)) {
+ if (!h2_ilist_empty(m->stream_ios)) {
ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c,
"h2_mplx(%ld): release_join, %d streams still open",
- m->id, (int)h2_io_set_size(m->stream_ios));
+ m->id, (int)h2_ilist_count(m->stream_ios));
}
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
"h2_mplx(%ld): release_join -> destroy", m->id);
@@ -468,7 +503,7 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error)
*/
AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+ h2_io *io = h2_ilist_get(m->stream_ios, stream_id);
/* there should be an h2_io, once the stream has been scheduled
* for processing, e.g. when we received all HEADERs. But when
@@ -484,107 +519,16 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error)
return status;
}
-apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
- int stream_id, apr_bucket_brigade *bb,
- apr_table_t *trailers,
- struct apr_thread_cond_t *iowait)
-{
- apr_status_t status;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_read_pre");
-
- h2_io_signal_init(io, H2_IO_READ, m->stream_timeout, iowait);
- status = h2_io_in_read(io, bb, -1, trailers);
- while (APR_STATUS_IS_EAGAIN(status)
- && !is_aborted(m, &status)
- && block == APR_BLOCK_READ) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
- "h2_mplx(%ld-%d): wait on in data (BLOCK_READ)",
- m->id, stream_id);
- status = h2_io_signal_wait(m, io);
- if (status == APR_SUCCESS) {
- status = h2_io_in_read(io, bb, -1, trailers);
- }
- }
- H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_read_post");
- h2_io_signal_exit(io);
- }
- else {
- status = APR_EOF;
- }
- leave_mutex(m, acquired);
- }
- return status;
-}
-
-apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id,
- const char *data, apr_size_t len, int eos)
-{
- apr_status_t status;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_pre");
- status = h2_io_in_write(io, data, len, eos);
- H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_post");
- h2_io_signal(io, H2_IO_READ);
- io_in_consumed_signal(m, io);
- }
- else {
- status = APR_ECONNABORTED;
- }
- leave_mutex(m, acquired);
- }
- return status;
-}
-
-apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id)
-{
- apr_status_t status;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- status = h2_io_in_close(io);
- H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_close");
- h2_io_signal(io, H2_IO_READ);
- io_in_consumed_signal(m, io);
- }
- else {
- status = APR_ECONNABORTED;
- }
- leave_mutex(m, acquired);
- }
- return status;
-}
-
void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
{
m->input_consumed = cb;
m->input_consumed_ctx = ctx;
}
-typedef struct {
- h2_mplx * m;
- int streams_updated;
-} update_ctx;
-
-static int update_window(void *ctx, h2_io *io)
+static int update_window(void *ctx, void *val)
{
- update_ctx *uctx = (update_ctx*)ctx;
- if (io_in_consumed_signal(uctx->m, io)) {
- ++uctx->streams_updated;
- }
+ h2_mplx *m = ctx;
+ io_in_consumed_signal(m, val);
return 1;
}
@@ -598,46 +542,11 @@ apr_status_t h2_mplx_in_update_windows(h2_mplx *m)
return APR_ECONNABORTED;
}
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- update_ctx ctx;
+ h2_ilist_iter(m->stream_ios, update_window, m);
- ctx.m = m;
- ctx.streams_updated = 0;
-
- status = APR_EAGAIN;
- h2_io_set_iter(m->stream_ios, update_window, &ctx);
-
- if (ctx.streams_updated) {
- status = APR_SUCCESS;
- }
- leave_mutex(m, acquired);
- }
- return status;
-}
-
-apr_status_t h2_mplx_out_get_brigade(h2_mplx *m, int stream_id,
- apr_bucket_brigade *bb,
- apr_off_t len, apr_table_t **ptrailers)
-{
- apr_status_t status;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_get_brigade_pre");
-
- status = h2_io_out_get_brigade(io, bb, len);
-
- H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_get_brigade_post");
- if (status == APR_SUCCESS) {
- h2_io_signal(io, H2_IO_WRITE);
- }
- }
- else {
- status = APR_ECONNABORTED;
- }
- *ptrailers = io->response? io->response->trailers : NULL;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_session(%ld): windows updated", m->id);
+ status = APR_SUCCESS;
leave_mutex(m, acquired);
}
return status;
@@ -651,7 +560,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_shift(m->ready_ios);
+ h2_io *io = h2_ilist_shift(m->ready_ios);
if (io && !m->aborted) {
stream = h2_ihash_get(streams, io->id);
if (stream) {
@@ -661,9 +570,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
}
else {
AP_DEBUG_ASSERT(io->response);
- H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_next_submit_pre");
- h2_stream_set_response(stream, io->response, io->bbout);
- H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_next_submit_post");
+ h2_stream_set_response(stream, io->response, io->beam_out);
}
}
else {
@@ -675,114 +582,62 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
"h2_mplx(%ld): stream for response %d closed, "
"resetting io to close request processing",
m->id, io->id);
- h2_io_make_orphaned(io, H2_ERR_STREAM_CLOSED);
+ io->orphaned = 1;
+ h2_io_rst(io, H2_ERR_STREAM_CLOSED);
if (!io->worker_started || io->worker_done) {
io_destroy(m, io, 1);
}
else {
/* hang around until the h2_task is done, but
- * shutdown input and send out any events (e.g. window
- * updates) asap. */
- h2_io_in_shutdown(io);
+ * shutdown input/output and send out any events asap. */
+ h2_io_shutdown(io);
io_in_consumed_signal(m, io);
}
}
-
- h2_io_signal(io, H2_IO_WRITE);
}
leave_mutex(m, acquired);
}
return stream;
}
-static apr_status_t out_write(h2_mplx *m, h2_io *io,
- ap_filter_t* f, int blocking,
- apr_bucket_brigade *bb,
- struct apr_thread_cond_t *iowait)
+static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response,
+ h2_bucket_beam *output)
{
apr_status_t status = APR_SUCCESS;
- /* We check the memory footprint queued for this stream_id
- * and block if it exceeds our configured limit.
- * We will not split buckets to enforce the limit to the last
- * byte. After all, the bucket is already in memory.
- */
- while (status == APR_SUCCESS
- && !APR_BRIGADE_EMPTY(bb)
- && !is_aborted(m, &status)) {
-
- status = h2_io_out_write(io, bb, blocking? m->stream_max_mem : INT_MAX,
- &m->tx_handles_reserved);
- io_out_consumed_signal(m, io);
-
- /* Wait for data to drain until there is room again or
- * stream timeout expires */
- h2_io_signal_init(io, H2_IO_WRITE, m->stream_timeout, iowait);
- while (status == APR_SUCCESS
- && !APR_BRIGADE_EMPTY(bb)
- && iowait
- && (m->stream_max_mem <= h2_io_out_length(io))
- && !is_aborted(m, &status)) {
- if (!blocking) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
- "h2_mplx(%ld-%d): incomplete write",
- m->id, io->id);
- return APR_INCOMPLETE;
- }
- if (f) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
- "h2_mplx(%ld-%d): waiting for out drain",
- m->id, io->id);
- }
- status = h2_io_signal_wait(m, io);
- }
- h2_io_signal_exit(io);
+
+ h2_io *io = h2_ilist_get(m->stream_ios, stream_id);
+ if (!io || io->orphaned) {
+ return APR_ECONNABORTED;
}
- apr_brigade_cleanup(bb);
- return status;
-}
-
-static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response,
- ap_filter_t* f, apr_bucket_brigade *bb,
- struct apr_thread_cond_t *iowait)
-{
- apr_status_t status = APR_SUCCESS;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld-%d): open response: %d, rst=%d",
+ m->id, stream_id, response->http_status,
+ response->rst_error);
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- if (f) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
- "h2_mplx(%ld-%d): open response: %d, rst=%d",
- m->id, stream_id, response->http_status,
- response->rst_error);
- }
-
- h2_io_set_response(io, response);
- h2_io_set_add(m->ready_ios, io);
- if (response && response->http_status < 300) {
- /* we might see some file buckets in the output, see
- * if we have enough handles reserved. */
- check_tx_reservation(m);
- }
- if (bb) {
- status = out_write(m, io, f, 0, bb, iowait);
- if (status == APR_INCOMPLETE) {
- /* write will have transferred as much data as possible.
- caller has to deal with non-empty brigade */
- status = APR_SUCCESS;
- }
- }
- have_out_data_for(m, stream_id);
+ if (output) {
+ h2_beam_buffer_size_set(output, m->stream_max_mem);
+ h2_beam_timeout_set(output, m->stream_timeout);
+ h2_beam_on_consumed(output, stream_output_consumed, io);
+ m->tx_handles_reserved -= h2_beam_get_files_beamed(output);
+ h2_beam_on_file_beam(output, can_beam_file, m);
+ h2_beam_mutex_set(output, io_mutex_enter, io_mutex_leave,
+ io->task->cond, m);
}
- else {
- status = APR_ECONNABORTED;
+ h2_io_set_response(io, response, output);
+
+ h2_ilist_add(m->ready_ios, io);
+ if (response && response->http_status < 300) {
+ /* we might see some file buckets in the output, see
+ * if we have enough handles reserved. */
+ check_tx_reservation(m);
}
+ have_out_data_for(m, stream_id);
return status;
}
apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response,
- ap_filter_t* f, apr_bucket_brigade *bb,
- struct apr_thread_cond_t *iowait)
+ h2_bucket_beam *output)
{
apr_status_t status;
int acquired;
@@ -793,37 +648,7 @@ apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response,
status = APR_ECONNABORTED;
}
else {
- status = out_open(m, stream_id, response, f, bb, iowait);
- if (APLOGctrace1(m->c)) {
- h2_util_bb_log(m->c, stream_id, APLOG_TRACE1, "h2_mplx_out_open", bb);
- }
- }
- leave_mutex(m, acquired);
- }
- return status;
-}
-
-apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id,
- ap_filter_t* f, int blocking,
- apr_bucket_brigade *bb,
- struct apr_thread_cond_t *iowait)
-{
- apr_status_t status;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- status = out_write(m, io, f, blocking, bb, iowait);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
- "h2_mplx(%ld-%d): write", m->id, io->id);
- H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_write");
-
- have_out_data_for(m, stream_id);
- }
- else {
- status = APR_ECONNABORTED;
+ status = out_open(m, stream_id, response, output);
}
leave_mutex(m, acquired);
}
@@ -837,7 +662,7 @@ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id)
AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+ h2_io *io = h2_ilist_get(m->stream_ios, stream_id);
if (io && !io->orphaned) {
if (!io->response && !io->rst_error) {
/* In case a close comes before a response was created,
@@ -846,45 +671,20 @@ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id)
*/
h2_response *r = h2_response_die(stream_id, APR_EGENERAL,
io->request, m->pool);
- status = out_open(m, stream_id, r, NULL, NULL, NULL);
+ status = out_open(m, stream_id, r, NULL);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
"h2_mplx(%ld-%d): close, no response, no rst",
m->id, io->id);
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
- "h2_mplx(%ld-%d): close with eor=%s",
- m->id, io->id, io->eor? "yes" : "no");
- status = h2_io_out_close(io);
- H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close");
- io_out_consumed_signal(m, io);
-
- have_out_data_for(m, stream_id);
- }
- else {
- status = APR_ECONNABORTED;
- }
- leave_mutex(m, acquired);
- }
- return status;
-}
-
-apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error)
-{
- apr_status_t status;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->rst_error && !io->orphaned) {
- h2_io_rst(io, error);
- if (!io->response) {
- h2_io_set_add(m->ready_ios, io);
+ "h2_mplx(%ld-%d): close", m->id, io->id);
+ if (io->beam_out) {
+ status = h2_beam_close(io->beam_out);
+ h2_beam_log(io->beam_out, stream_id, "out_close", m->c,
+ APLOG_TRACE2);
}
- H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_rst");
-
+ io_out_consumed_signal(m, io);
have_out_data_for(m, stream_id);
- h2_io_signal(io, H2_IO_WRITE);
}
else {
status = APR_ECONNABORTED;
@@ -894,26 +694,6 @@ apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error)
return status;
}
-int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id)
-{
- apr_status_t status;
- int has_data = 0;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- has_data = h2_io_out_has_data(io);
- }
- else {
- has_data = 0;
- }
- leave_mutex(m, acquired);
- }
- return has_data;
-}
-
apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
apr_thread_cond_t *iowait)
{
@@ -969,22 +749,7 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
return status;
}
-static h2_io *open_io(h2_mplx *m, int stream_id, const h2_request *request)
-{
- apr_pool_t *io_pool;
- h2_io *io;
-
- apr_pool_create(&io_pool, m->pool);
- apr_pool_tag(io_pool, "h2_io");
- io = h2_io_create(stream_id, io_pool, m->bucket_alloc, request);
- h2_io_set_add(m->stream_ios, io);
-
- return io;
-}
-
-
-apr_status_t h2_mplx_process(h2_mplx *m, int stream_id,
- const h2_request *req,
+apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
h2_stream_pri_cmp *cmp, void *ctx)
{
apr_status_t status;
@@ -997,24 +762,38 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id,
status = APR_ECONNABORTED;
}
else {
- h2_io *io = open_io(m, stream_id, req);
+ apr_pool_t *io_pool;
+ h2_io *io;
- if (!io->request->body) {
- status = h2_io_in_close(io);
+ if (!m->need_registration) {
+ m->need_registration = h2_iq_empty(m->q);
}
-
- m->need_registration = m->need_registration || h2_iq_empty(m->q);
- do_registration = (m->need_registration && m->workers_busy < m->workers_max);
+ if (m->workers_busy < m->workers_max) {
+ do_registration = m->need_registration;
+ }
+
+ io_pool = m->spare_io_pool;
+ if (io_pool) {
+ m->spare_io_pool = NULL;
+ }
+ else {
+ apr_pool_create(&io_pool, m->pool);
+ apr_pool_tag(io_pool, "h2_io");
+ }
+ io = h2_io_create(stream->id, io_pool, stream->request);
+ h2_ilist_add(m->stream_ios, io);
h2_iq_add(m->q, io->id, cmp, ctx);
+ stream->input = io->beam_in;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
- "h2_mplx(%ld-%d): process", m->c->id, stream_id);
- H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_process");
+ "h2_mplx(%ld-%d): process, body=%d",
+ m->c->id, stream->id, io->request->body);
}
leave_mutex(m, acquired);
}
- if (status == APR_SUCCESS && do_registration) {
- workers_register(m);
+ if (do_registration) {
+ m->need_registration = 0;
+ h2_workers_register(m->workers, m);
}
return status;
}
@@ -1022,20 +801,24 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id,
static h2_task *pop_task(h2_mplx *m)
{
h2_task *task = NULL;
+ h2_io *io;
int sid;
- while (!m->aborted && !task
- && (m->workers_busy < m->workers_limit)
- && (sid = h2_iq_shift(m->q)) > 0) {
- h2_io *io = h2_io_set_get(m->stream_ios, sid);
- if (io && io->orphaned) {
- io_destroy(m, io, 0);
- if (m->join_wait) {
- apr_thread_cond_signal(m->join_wait);
- }
- }
- else if (io) {
+ while (!m->aborted && !task && (m->workers_busy < m->workers_limit)
+ && (sid = h2_iq_shift(m->q)) > 0) {
+
+ io = h2_ilist_get(m->stream_ios, sid);
+ if (io) {
conn_rec *slave, **pslave;
+ if (io->orphaned) {
+ /* TODO: add to purge list */
+ io_destroy(m, io, 0);
+ if (m->join_wait) {
+ apr_thread_cond_signal(m->join_wait);
+ }
+ continue;
+ }
+
pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
if (pslave) {
slave = *pslave;
@@ -1046,12 +829,21 @@ static h2_task *pop_task(h2_mplx *m)
}
slave->sbh = m->c->sbh;
- io->task = task = h2_task_create(m->id, io->request, slave, m);
+ io->task = task = h2_task_create(slave, io->request,
+ io->beam_in, m);
m->c->keepalives++;
apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id);
io->worker_started = 1;
io->started_at = apr_time_now();
+
+ if (io->beam_in) {
+ h2_beam_timeout_set(io->beam_in, m->stream_timeout);
+ h2_beam_on_consumed(io->beam_in, stream_input_consumed, m);
+ h2_beam_on_file_beam(io->beam_in, can_beam_file, m);
+ h2_beam_mutex_set(io->beam_in, io_mutex_enter,
+ io_mutex_leave, task->cond, m);
+ }
if (sid > m->max_stream_started) {
m->max_stream_started = sid;
}
@@ -1088,7 +880,7 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
{
if (task) {
- h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
+ h2_io *io = h2_ilist_get(m->stream_ios, task->stream_id);
if (task->frozen) {
/* this task was handed over to an engine for processing
@@ -1112,14 +904,17 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
h2_mplx_out_close(m, task->stream_id);
if (ngn && io) {
- apr_off_t bytes = io->output_consumed + h2_io_out_length(io);
+ apr_off_t bytes = 0;
+ if (io->beam_out) {
+ h2_beam_send(io->beam_out, NULL, APR_NONBLOCK_READ);
+ bytes += h2_beam_get_buffered(io->beam_out);
+ }
if (bytes > 0) {
/* we need to report consumed and current buffered output
* to the engine. The request will be streamed out or cancelled,
* no more data is coming from it and the engine should update
* its calculations before we destroy this information. */
h2_req_engine_out_consumed(ngn, task->c, bytes);
- io->output_consumed = 0;
}
}
@@ -1136,10 +931,10 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
if (io) {
apr_time_t now = apr_time_now();
if (!io->orphaned && m->redo_ios
- && h2_io_set_get(m->redo_ios, io->id)) {
+ && h2_ilist_get(m->redo_ios, io->id)) {
/* reset and schedule again */
h2_io_redo(io);
- h2_io_set_remove(m->redo_ios, io);
+ h2_ilist_remove(m->redo_ios, io->id);
h2_iq_add(m->q, io->id, NULL, NULL);
}
else {
@@ -1168,6 +963,7 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
}
if (io->orphaned) {
+ /* TODO: add to purge list */
io_destroy(m, io, 0);
if (m->join_wait) {
apr_thread_cond_signal(m->join_wait);
@@ -1211,12 +1007,12 @@ typedef struct {
apr_time_t now;
} io_iter_ctx;
-static int latest_repeatable_busy_unsubmitted_iter(void *data, h2_io *io)
+static int latest_repeatable_busy_unsubmitted_iter(void *data, void *val)
{
io_iter_ctx *ctx = data;
+ h2_io *io = val;
if (io->worker_started && !io->worker_done
- && h2_io_is_repeatable(io)
- && !h2_io_set_get(ctx->m->redo_ios, io->id)) {
+ && h2_io_can_redo(io) && !h2_ilist_get(ctx->m->redo_ios, io->id)) {
/* this io occupies a worker, the response has not been submitted yet,
* not been cancelled and it is a repeatable request
* -> it can be re-scheduled later */
@@ -1233,13 +1029,14 @@ static h2_io *get_latest_repeatable_busy_unsubmitted_io(h2_mplx *m)
io_iter_ctx ctx;
ctx.m = m;
ctx.io = NULL;
- h2_io_set_iter(m->stream_ios, latest_repeatable_busy_unsubmitted_iter, &ctx);
+ h2_ilist_iter(m->stream_ios, latest_repeatable_busy_unsubmitted_iter, &ctx);
return ctx.io;
}
-static int timed_out_busy_iter(void *data, h2_io *io)
+static int timed_out_busy_iter(void *data, void *val)
{
io_iter_ctx *ctx = data;
+ h2_io *io = val;
if (io->worker_started && !io->worker_done
&& (ctx->now - io->started_at) > ctx->m->stream_timeout) {
/* timed out stream occupying a worker, found */
@@ -1254,7 +1051,7 @@ static h2_io *get_timed_out_busy_stream(h2_mplx *m)
ctx.m = m;
ctx.io = NULL;
ctx.now = apr_time_now();
- h2_io_set_iter(m->stream_ios, timed_out_busy_iter, &ctx);
+ h2_ilist_iter(m->stream_ios, timed_out_busy_iter, &ctx);
return ctx.io;
}
@@ -1264,19 +1061,19 @@ static apr_status_t unschedule_slow_ios(h2_mplx *m)
int n;
if (!m->redo_ios) {
- m->redo_ios = h2_io_set_create(m->pool);
+ m->redo_ios = h2_ilist_create(m->pool);
}
/* Try to get rid of streams that occupy workers. Look for safe requests
* that are repeatable. If none found, fail the connection.
*/
- n = (m->workers_busy - m->workers_limit - h2_io_set_size(m->redo_ios));
+ n = (m->workers_busy - m->workers_limit - h2_ilist_count(m->redo_ios));
while (n > 0 && (io = get_latest_repeatable_busy_unsubmitted_io(m))) {
- h2_io_set_add(m->redo_ios, io);
+ h2_ilist_add(m->redo_ios, io);
h2_io_rst(io, H2_ERR_CANCEL);
--n;
}
- if ((m->workers_busy - h2_io_set_size(m->redo_ios)) > m->workers_limit) {
+ if ((m->workers_busy - h2_ilist_count(m->redo_ios)) > m->workers_limit) {
io = get_timed_out_busy_stream(m);
if (io) {
/* Too many busy workers, unable to cancel enough streams
@@ -1295,7 +1092,7 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
int acquired;
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- apr_size_t scount = h2_io_set_size(m->stream_ios);
+ apr_size_t scount = h2_ilist_count(m->stream_ios);
if (scount > 0 && m->workers_busy) {
/* If we have streams in connection state 'IDLE', meaning
* all streams are ready to sent data out, but lack
@@ -1350,9 +1147,10 @@ typedef struct {
int streams_updated;
} ngn_update_ctx;
-static int ngn_update_window(void *ctx, h2_io *io)
+static int ngn_update_window(void *ctx, void *val)
{
ngn_update_ctx *uctx = ctx;
+ h2_io *io = val;
if (io && io->task && io->task->assigned == uctx->ngn
&& io_out_consumed_signal(uctx->m, io)) {
++uctx->streams_updated;
@@ -1367,7 +1165,7 @@ static apr_status_t ngn_out_update_windows(h2_mplx *m, h2_req_engine *ngn)
ctx.m = m;
ctx.ngn = ngn;
ctx.streams_updated = 0;
- h2_io_set_iter(m->stream_ios, ngn_update_window, &ctx);
+ h2_ilist_iter(m->stream_ios, ngn_update_window, &ctx);
return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN;
}
@@ -1389,7 +1187,7 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
task->r = r;
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
+ h2_io *io = h2_ilist_get(m->stream_ios, task->stream_id);
if (!io || io->orphaned) {
status = APR_ECONNABORTED;
}
diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h
index 4029847678..7e6f5bcead 100644
--- a/modules/http2/h2_mplx.h
+++ b/modules/http2/h2_mplx.h
@@ -37,16 +37,17 @@
struct apr_pool_t;
struct apr_thread_mutex_t;
struct apr_thread_cond_t;
+struct h2_bucket_beam;
struct h2_config;
struct h2_ihash_t;
+struct h2_ilist_t;
struct h2_response;
struct h2_task;
struct h2_stream;
struct h2_request;
-struct h2_io_set;
struct apr_thread_cond_t;
struct h2_workers;
-struct h2_int_queue;
+struct h2_iqueue;
struct h2_ngn_shed;
struct h2_req_engine;
@@ -72,10 +73,10 @@ struct h2_mplx {
unsigned int aborted : 1;
unsigned int need_registration : 1;
- struct h2_int_queue *q;
- struct h2_io_set *stream_ios;
- struct h2_io_set *ready_ios;
- struct h2_io_set *redo_ios;
+ struct h2_iqueue *q;
+ struct h2_ilist_t *stream_ios;
+ struct h2_ilist_t *ready_ios;
+ struct h2_ilist_t *redo_ios;
apr_uint32_t max_streams; /* max # of concurrent streams */
apr_uint32_t max_stream_started; /* highest stream id that started processing */
@@ -96,10 +97,11 @@ struct h2_mplx {
apr_size_t stream_max_mem;
apr_interval_time_t stream_timeout;
+ apr_pool_t *spare_io_pool;
apr_array_header_t *spare_slaves; /* spare slave connections */
struct h2_workers *workers;
- apr_size_t tx_handles_reserved;
+ int tx_handles_reserved;
apr_size_t tx_chunk_size;
h2_mplx_consumed_cb *input_consumed;
@@ -166,10 +168,6 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m);
*/
apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error);
-/* Return != 0 iff the multiplexer has output data for the given stream.
- */
-int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id);
-
/**
* Waits on output data from any stream in this session to become available.
* Returns APR_TIMEUP if no data arrived in the given time.
@@ -190,8 +188,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
* @param cmp the stream priority compare function
* @param ctx context data for the compare function
*/
-apr_status_t h2_mplx_process(h2_mplx *m, int stream_id,
- const struct h2_request *r,
+apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
h2_stream_pri_cmp *cmp, void *ctx);
/**
@@ -219,37 +216,11 @@ void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx);
******************************************************************************/
/**
- * Reads a buckets for the given stream_id. Will return ARP_EAGAIN when
- * called with APR_NONBLOCK_READ and no data present. Will return APR_EOF
- * when the end of the stream input has been reached.
- * The condition passed in will be used for blocking/signalling and will
- * be protected by the mplx's own mutex.
- */
-apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
- int stream_id, apr_bucket_brigade *bb,
- apr_table_t *trailers,
- struct apr_thread_cond_t *iowait);
-
-/**
- * Appends data to the input of the given stream. Storage of input data is
- * not subject to flow control.
- */
-apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id,
- const char *data, apr_size_t len, int eos);
-
-/**
- * Closes the input for the given stream_id.
- */
-apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id);
-
-/**
* Invoke the consumed callback for all streams that had bytes read since the
* last call to this function. If no stream had input data consumed, the
* callback is not invoked.
* The consumed callback may also be invoked at other times whenever
* the need arises.
- * Returns APR_SUCCESS when an update happened, APR_EAGAIN if no update
- * happened.
*/
apr_status_t h2_mplx_in_update_windows(h2_mplx *m);
@@ -267,44 +238,17 @@ struct h2_stream *h2_mplx_next_submit(h2_mplx *m,
struct h2_ihash_t *streams);
/**
- * Reads output data into the given brigade. Will never block, but
- * return APR_EAGAIN until data arrives or the stream is closed.
- */
-apr_status_t h2_mplx_out_get_brigade(h2_mplx *mplx, int stream_id,
- apr_bucket_brigade *bb,
- apr_off_t len, apr_table_t **ptrailers);
-
-/**
* Opens the output for the given stream with the specified response.
*/
apr_status_t h2_mplx_out_open(h2_mplx *mplx, int stream_id,
struct h2_response *response,
- ap_filter_t* filter, apr_bucket_brigade *bb,
- struct apr_thread_cond_t *iowait);
-
-/**
- * Append the brigade to the stream output. Might block if amount
- * of bytes buffered reaches configured max.
- * @param stream_id the stream identifier
- * @param filter the apache filter context of the data
- * @param blocking == 0 iff call should return with APR_INCOMPLETE if
- * the full brigade cannot be written at once
- * @param bb the bucket brigade to append
- * @param iowait a conditional used for block/signalling in h2_mplx
- */
-apr_status_t h2_mplx_out_write(h2_mplx *mplx, int stream_id,
- ap_filter_t* filter,
- int blocking,
- apr_bucket_brigade *bb,
- struct apr_thread_cond_t *iowait);
+ struct h2_bucket_beam *output);
/**
* Closes the output for stream stream_id.
*/
apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id);
-apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error);
-
/*******************************************************************************
* h2_mplx list Manipulation.
******************************************************************************/
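Taken together, the h2_mplx API above now schedules a whole h2_stream and exchanges response bodies via bucket beams rather than filter/brigade/condition triples. A hedged sketch of the call flow follows; in the real module h2_mplx_process runs on the master connection and the out_open/out_close calls on a worker, and m, stream, resp, beam and the compare callback are assumed to exist in the caller.

    /* Sketch only: collapses the master- and worker-side calls into one
     * function for readability; names other than the h2_mplx_* calls are
     * assumptions. */
    static apr_status_t submit_flow_sketch(h2_mplx *m, struct h2_stream *stream,
                                           struct h2_response *resp,
                                           struct h2_bucket_beam *beam,
                                           h2_stream_pri_cmp *cmp, void *ctx)
    {
        apr_status_t status;

        /* master connection: hand the whole stream over for processing */
        status = h2_mplx_process(m, stream, cmp, ctx);
        if (status != APR_SUCCESS) {
            return status;
        }
        /* worker, once a response exists: publish it with its output beam */
        status = h2_mplx_out_open(m, stream->id, resp, beam);
        if (status != APR_SUCCESS) {
            return status;
        }
        /* all output written into the beam: close the stream's output */
        return h2_mplx_out_close(m, stream->id);
    }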
diff --git a/modules/http2/h2_ngn_shed.c b/modules/http2/h2_ngn_shed.c
index 32483d9332..48ac09fae6 100644
--- a/modules/http2/h2_ngn_shed.c
+++ b/modules/http2/h2_ngn_shed.c
@@ -33,12 +33,10 @@
#include "h2_conn.h"
#include "h2_ctx.h"
#include "h2_h2.h"
-#include "h2_int_queue.h"
#include "h2_mplx.h"
#include "h2_response.h"
#include "h2_request.h"
#include "h2_task.h"
-#include "h2_task_output.h"
#include "h2_util.h"
#include "h2_ngn_shed.h"
diff --git a/modules/http2/h2_proxy_session.c b/modules/http2/h2_proxy_session.c
index 6f0298e2ef..19aff5bf6f 100644
--- a/modules/http2/h2_proxy_session.c
+++ b/modules/http2/h2_proxy_session.c
@@ -23,7 +23,6 @@
#include "mod_http2.h"
#include "h2.h"
-#include "h2_int_queue.h"
#include "h2_request.h"
#include "h2_util.h"
#include "h2_proxy_session.h"
diff --git a/modules/http2/h2_proxy_session.h b/modules/http2/h2_proxy_session.h
index 7078981c7a..52be5c6b37 100644
--- a/modules/http2/h2_proxy_session.h
+++ b/modules/http2/h2_proxy_session.h
@@ -20,7 +20,7 @@
#include <nghttp2/nghttp2.h>
-struct h2_int_queue;
+struct h2_iqueue;
struct h2_ihash_t;
typedef enum {
@@ -74,7 +74,7 @@ struct h2_proxy_session {
apr_interval_time_t wait_timeout;
struct h2_ihash_t *streams;
- struct h2_int_queue *suspended;
+ struct h2_iqueue *suspended;
apr_size_t remote_max_concurrent;
int last_stream_id; /* last stream id processed by backend, or 0 */
diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c
index 8da2e9f2bf..e491f1f1b7 100644
--- a/modules/http2/h2_session.c
+++ b/modules/http2/h2_session.c
@@ -112,7 +112,7 @@ static void cleanup_streams(h2_session *session)
while (1) {
h2_ihash_iter(session->streams, find_cleanup_stream, &ctx);
if (ctx.candidate) {
- h2_session_stream_destroy(session, ctx.candidate);
+ h2_session_stream_done(session, ctx.candidate);
ctx.candidate = NULL;
}
else {
@@ -1277,8 +1277,9 @@ static apr_status_t submit_response(h2_session *session, h2_stream *stream)
const h2_priority *prio;
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03073)
- "h2_stream(%ld-%d): submit response %d",
- session->id, stream->id, response->http_status);
+ "h2_stream(%ld-%d): submit response %d, REMOTE_WINDOW_SIZE=%u",
+ session->id, stream->id, response->http_status,
+ (unsigned int)nghttp2_session_get_stream_remote_window_size(session->ngh2, stream->id));
if (response->content_length != 0) {
memset(&provider, 0, sizeof(provider));
@@ -1504,21 +1505,24 @@ apr_status_t h2_session_set_prio(h2_session *session, h2_stream *stream,
return status;
}
-apr_status_t h2_session_stream_destroy(h2_session *session, h2_stream *stream)
+apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
{
apr_pool_t *pool = h2_stream_detach_pool(stream);
-
+ int stream_id = stream->id;
+ int rst_error = stream->rst_error;
+
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
"h2_stream(%ld-%d): cleanup by EOS bucket destroy",
- session->id, stream->id);
+ session->id, stream_id);
+ if (session->streams) {
+ h2_ihash_remove(session->streams, stream_id);
+ }
+
+ h2_stream_cleanup(stream);
/* this may be called while the session has already freed
* some internal structures or even when the mplx is locked. */
if (session->mplx) {
- h2_mplx_stream_done(session->mplx, stream->id, stream->rst_error);
- }
-
- if (session->streams) {
- h2_ihash_remove(session->streams, stream->id);
+ h2_mplx_stream_done(session->mplx, stream_id, rst_error);
}
h2_stream_destroy(stream);
@@ -1529,6 +1533,7 @@ apr_status_t h2_session_stream_destroy(h2_session *session, h2_stream *stream)
}
session->spare = pool;
}
+
return APR_SUCCESS;
}
@@ -2217,9 +2222,10 @@ apr_status_t h2_session_process(h2_session *session, int async)
}
/* send out window updates for our inputs */
status = h2_mplx_in_update_windows(session->mplx);
- if (status != APR_SUCCESS && status != APR_EAGAIN) {
+ if (status != APR_SUCCESS) {
dispatch_event(session, H2_SESSION_EV_CONN_ERROR,
- H2_ERR_INTERNAL_ERROR, "window update error");
+ H2_ERR_INTERNAL_ERROR,
+ "window update error");
break;
}
}
diff --git a/modules/http2/h2_session.h b/modules/http2/h2_session.h
index 28eedba843..bf4ded338a 100644
--- a/modules/http2/h2_session.h
+++ b/modules/http2/h2_session.h
@@ -218,8 +218,8 @@ int h2_session_push_enabled(h2_session *session);
* @param session the session to which the stream belongs
* @param stream the stream to destroy
*/
-apr_status_t h2_session_stream_destroy(h2_session *session,
- struct h2_stream *stream);
+apr_status_t h2_session_stream_done(h2_session *session,
+ struct h2_stream *stream);
/**
* Submit a push promise on the stream and schedule the new steam for
diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c
index a7979e3ffa..ae98019456 100644
--- a/modules/http2/h2_stream.c
+++ b/modules/http2/h2_stream.c
@@ -24,6 +24,7 @@
#include <nghttp2/nghttp2.h>
#include "h2_private.h"
+#include "h2_bucket_beam.h"
#include "h2_conn.h"
#include "h2_config.h"
#include "h2_h2.h"
@@ -36,7 +37,6 @@
#include "h2_stream.h"
#include "h2_task.h"
#include "h2_ctx.h"
-#include "h2_task_input.h"
#include "h2_task.h"
#include "h2_util.h"
@@ -52,6 +52,13 @@ static int state_transition[][7] = {
/*CL*/{ 1, 1, 0, 0, 1, 1, 1 },
};
+#define H2_STREAM_OUT_LOG(lvl,s,msg) \
+ do { \
+ if (APLOG_C_IS_LEVEL((s)->session->c,lvl)) \
+ h2_util_bb_log((s)->session->c,(s)->session->id,lvl,msg,(s)->buffer); \
+ } while(0)
+
+
static int set_state(h2_stream *stream, h2_stream_state_t state)
{
int allowed = state_transition[state][stream->state];
@@ -135,8 +142,6 @@ static int output_open(h2_stream *stream)
}
}
-static h2_sos *h2_sos_mplx_create(h2_stream *stream, h2_response *response);
-
h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session,
int initiated_on, const h2_request *creq)
{
@@ -166,18 +171,26 @@ h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session,
return stream;
}
-apr_status_t h2_stream_destroy(h2_stream *stream)
+void h2_stream_cleanup(h2_stream *stream)
{
AP_DEBUG_ASSERT(stream);
+ if (stream->buffer) {
+ apr_brigade_cleanup(stream->buffer);
+ }
+}
+
+void h2_stream_destroy(h2_stream *stream)
+{
+ AP_DEBUG_ASSERT(stream);
+ h2_stream_cleanup(stream);
if (stream->pool) {
apr_pool_destroy(stream->pool);
}
- return APR_SUCCESS;
}
-void h2_stream_cleanup(h2_stream *stream)
+void h2_stream_eos_destroy(h2_stream *stream)
{
- h2_session_stream_destroy(stream->session, stream);
+ h2_session_stream_done(stream->session, stream);
/* stream is gone */
}
@@ -200,33 +213,7 @@ void h2_stream_rst(h2_stream *stream, int error_code)
struct h2_response *h2_stream_get_response(h2_stream *stream)
{
- return stream->sos? stream->sos->response : NULL;
-}
-
-apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response,
- apr_bucket_brigade *bb)
-{
- apr_status_t status = APR_SUCCESS;
- h2_sos *sos;
-
- if (!output_open(stream)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
- "h2_stream(%ld-%d): output closed",
- stream->session->id, stream->id);
- return APR_ECONNRESET;
- }
-
- sos = h2_sos_mplx_create(stream, response);
- if (sos->response->sos_filter) {
- sos = h2_filter_sos_create(sos->response->sos_filter, sos);
- }
- stream->sos = sos;
-
- status = stream->sos->buffer(stream->sos, bb);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c,
- "h2_stream(%ld-%d): set_response(%d)",
- stream->session->id, stream->id, stream->sos->response->http_status);
- return status;
+ return stream->response;
}
apr_status_t h2_stream_set_request(h2_stream *stream, request_rec *r)
@@ -286,15 +273,11 @@ apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled,
status = h2_request_end_headers(stream->request, stream->pool,
eos, push_enabled);
if (status == APR_SUCCESS) {
- if (!eos) {
- stream->request->body = 1;
- }
- stream->input_remaining = stream->request->content_length;
-
- status = h2_mplx_process(stream->session->mplx, stream->id,
- stream->request, cmp, ctx);
+ stream->request->body = !eos;
stream->scheduled = 1;
+ stream->input_remaining = stream->request->content_length;
+ status = h2_mplx_process(stream->session->mplx, stream, cmp, ctx);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
"h2_stream(%ld-%d): scheduled %s %s://%s%s",
stream->session->id, stream->id,
@@ -331,8 +314,8 @@ apr_status_t h2_stream_close_input(h2_stream *stream)
return APR_ECONNRESET;
}
- if (close_input(stream)) {
- status = h2_mplx_in_close(stream->session->mplx, stream->id);
+ if (close_input(stream) && stream->input) {
+ status = h2_beam_close(stream->input);
}
return status;
}
@@ -340,25 +323,29 @@ apr_status_t h2_stream_close_input(h2_stream *stream)
apr_status_t h2_stream_write_data(h2_stream *stream,
const char *data, size_t len, int eos)
{
+ conn_rec *c = stream->session->c;
apr_status_t status = APR_SUCCESS;
AP_DEBUG_ASSERT(stream);
+ if (!stream->input) {
+ return APR_EOF;
+ }
if (input_closed(stream) || !stream->request->eoh) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d",
stream->session->id, stream->id, input_closed(stream),
stream->request->eoh);
return APR_EINVAL;
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_stream(%ld-%d): add %ld input bytes",
stream->session->id, stream->id, (long)len);
if (!stream->request->chunked) {
stream->input_remaining -= len;
if (stream->input_remaining < 0) {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c,
APLOGNO(02961)
"h2_stream(%ld-%d): got %ld more content bytes than announced "
"in content-length header: %ld",
@@ -370,10 +357,18 @@ apr_status_t h2_stream_write_data(h2_stream *stream,
}
}
- status = h2_mplx_in_write(stream->session->mplx, stream->id, data, len, eos);
+ if (!stream->tmp) {
+ stream->tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
+ }
+ apr_brigade_write(stream->tmp, NULL, NULL, data, len);
if (eos) {
+ APR_BRIGADE_INSERT_TAIL(stream->tmp,
+ apr_bucket_eos_create(c->bucket_alloc));
close_input(stream);
}
+
+ status = h2_beam_send(stream->input, stream->tmp, APR_BLOCK_READ);
+ apr_brigade_cleanup(stream->tmp);
return status;
}
@@ -392,44 +387,122 @@ int h2_stream_is_suspended(const h2_stream *stream)
return stream->suspended;
}
-apr_status_t h2_stream_out_prepare(h2_stream *stream,
+static apr_status_t fill_buffer(h2_stream *stream, apr_size_t amount)
+{
+ if (!stream->output) {
+ return APR_EOF;
+ }
+ return h2_beam_receive(stream->output, stream->buffer,
+ APR_NONBLOCK_READ, amount);
+}
+
+apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response,
+ h2_bucket_beam *output)
+{
+ apr_status_t status = APR_SUCCESS;
+ conn_rec *c = stream->session->c;
+
+ if (!output_open(stream)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+ "h2_stream(%ld-%d): output closed",
+ stream->session->id, stream->id);
+ return APR_ECONNRESET;
+ }
+
+ stream->response = response;
+ stream->output = output;
+ stream->buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
+
+ h2_stream_filter(stream);
+ if (stream->output) {
+ status = fill_buffer(stream, 0);
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
+ "h2_stream(%ld-%d): set_response(%d)",
+ stream->session->id, stream->id,
+ stream->response->http_status);
+ return status;
+}
+
+apr_status_t h2_stream_out_prepare(h2_stream *stream,
apr_off_t *plen, int *peos)
{
+ conn_rec *c = stream->session->c;
+ apr_status_t status = APR_SUCCESS;
+ apr_off_t requested = (*plen > 0)? *plen : 32*1024;
+
if (stream->rst_error) {
*plen = 0;
*peos = 1;
return APR_ECONNRESET;
}
- AP_DEBUG_ASSERT(stream->sos);
- return stream->sos->prepare(stream->sos, plen, peos);
+ H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_pre");
+ h2_util_bb_avail(stream->buffer, plen, peos);
+ if (!*peos && !*plen) {
+ /* try to get more data */
+ status = fill_buffer(stream, H2MIN(requested, 32*1024));
+ if (APR_STATUS_IS_EOF(status)) {
+ apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(stream->buffer, eos);
+ status = APR_SUCCESS;
+ }
+ h2_util_bb_avail(stream->buffer, plen, peos);
+ }
+ H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_post");
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
+ "h2_stream(%ld-%d): prepare, len=%ld eos=%d, trailers=%s",
+ c->id, stream->id, (long)*plen, *peos,
+ (stream->response && stream->response->trailers)?
+ "yes" : "no");
+ if (!*peos && !*plen && status == APR_SUCCESS) {
+ return APR_EAGAIN;
+ }
+ return status;
}
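/* Illustrative sketch (call pattern assumed; functions as declared in
 * h2_stream.h): before emitting a DATA frame, the session asks the stream
 * how much output is available, letting it pull more from the output beam,
 * and then reads exactly that amount into its own brigade.
 */
apr_off_t len = 16 * 1024;
int eos = 0;
apr_status_t status;

status = h2_stream_out_prepare(stream, &len, &eos);
if (status == APR_SUCCESS) {
    status = h2_stream_read_to(stream, bb, &len, &eos);
}
else if (APR_STATUS_IS_EAGAIN(status)) {
    /* nothing buffered yet: suspend the stream and retry later */
}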
+
apr_status_t h2_stream_readx(h2_stream *stream,
h2_io_data_cb *cb, void *ctx,
apr_off_t *plen, int *peos)
{
+ conn_rec *c = stream->session->c;
+ apr_status_t status = APR_SUCCESS;
+
if (stream->rst_error) {
return APR_ECONNRESET;
}
- if (!stream->sos) {
- return APR_EGENERAL;
+ status = h2_util_bb_readx(stream->buffer, cb, ctx, plen, peos);
+ if (status == APR_SUCCESS && !*peos && !*plen) {
+ status = APR_EAGAIN;
}
- return stream->sos->readx(stream->sos, cb, ctx, plen, peos);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
+ "h2_stream(%ld-%d): readx, len=%ld eos=%d",
+ c->id, stream->id, (long)*plen, *peos);
+ return status;
}
+
apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb,
apr_off_t *plen, int *peos)
{
+ conn_rec *c = stream->session->c;
+ apr_status_t status = APR_SUCCESS;
+
if (stream->rst_error) {
return APR_ECONNRESET;
}
- if (!stream->sos) {
- return APR_EGENERAL;
+ status = h2_append_brigade(bb, stream->buffer, plen, peos);
+ if (status == APR_SUCCESS && !*peos && !*plen) {
+ status = APR_EAGAIN;
}
- return stream->sos->read_to(stream->sos, bb, plen, peos);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
+ "h2_stream(%ld-%d): read_to, len=%ld eos=%d",
+ c->id, stream->id, (long)*plen, *peos);
+ return status;
}
+
int h2_stream_input_is_open(const h2_stream *stream)
{
return input_open(stream);
@@ -474,7 +547,7 @@ apr_status_t h2_stream_submit_pushes(h2_stream *stream)
apr_table_t *h2_stream_get_trailers(h2_stream *stream)
{
- return stream->sos? stream->sos->get_trailers(stream->sos) : NULL;
+ return stream->response? stream->response->trailers : NULL;
}
const h2_priority *h2_stream_get_priority(h2_stream *stream)
@@ -491,147 +564,3 @@ const h2_priority *h2_stream_get_priority(h2_stream *stream)
return NULL;
}
-/*******************************************************************************
- * h2_sos_mplx
- ******************************************************************************/
-
-typedef struct h2_sos_mplx {
- h2_mplx *m;
- apr_bucket_brigade *bb;
- apr_bucket_brigade *tmp;
- apr_table_t *trailers;
- apr_off_t buffer_size;
-} h2_sos_mplx;
-
-#define H2_SOS_MPLX_OUT(lvl,msos,msg) \
- do { \
- if (APLOG_C_IS_LEVEL((msos)->m->c,lvl)) \
- h2_util_bb_log((msos)->m->c,(msos)->m->id,lvl,msg,(msos)->bb); \
- } while(0)
-
-
-static apr_status_t mplx_transfer(h2_sos_mplx *msos, int stream_id,
- apr_pool_t *pool)
-{
- apr_status_t status;
- apr_table_t *trailers = NULL;
-
- if (!msos->tmp) {
- msos->tmp = apr_brigade_create(msos->bb->p, msos->bb->bucket_alloc);
- }
- status = h2_mplx_out_get_brigade(msos->m, stream_id, msos->tmp,
- msos->buffer_size-1, &trailers);
- if (!APR_BRIGADE_EMPTY(msos->tmp)) {
- h2_transfer_brigade(msos->bb, msos->tmp, pool);
- }
- if (trailers) {
- msos->trailers = trailers;
- }
- return status;
-}
-
-static apr_status_t h2_sos_mplx_read_to(h2_sos *sos, apr_bucket_brigade *bb,
- apr_off_t *plen, int *peos)
-{
- h2_sos_mplx *msos = sos->ctx;
- apr_status_t status;
-
- status = h2_append_brigade(bb, msos->bb, plen, peos);
- if (status == APR_SUCCESS && !*peos && !*plen) {
- status = APR_EAGAIN;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, msos->m->c,
- "h2_stream(%ld-%d): read_to, len=%ld eos=%d",
- msos->m->id, sos->stream->id, (long)*plen, *peos);
- }
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, msos->m->c,
- "h2_stream(%ld-%d): read_to, len=%ld eos=%d",
- msos->m->id, sos->stream->id, (long)*plen, *peos);
- return status;
-}
-
-static apr_status_t h2_sos_mplx_readx(h2_sos *sos, h2_io_data_cb *cb, void *ctx,
- apr_off_t *plen, int *peos)
-{
- h2_sos_mplx *msos = sos->ctx;
- apr_status_t status = APR_SUCCESS;
-
- status = h2_util_bb_readx(msos->bb, cb, ctx, plen, peos);
- if (status == APR_SUCCESS && !*peos && !*plen) {
- status = APR_EAGAIN;
- }
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, msos->m->c,
- "h2_stream(%ld-%d): readx, len=%ld eos=%d",
- msos->m->id, sos->stream->id, (long)*plen, *peos);
- return status;
-}
-
-static apr_status_t h2_sos_mplx_prepare(h2_sos *sos, apr_off_t *plen, int *peos)
-{
- h2_sos_mplx *msos = sos->ctx;
- apr_status_t status = APR_SUCCESS;
-
- H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx prepare_pre");
-
- if (APR_BRIGADE_EMPTY(msos->bb)) {
- status = mplx_transfer(msos, sos->stream->id, sos->stream->pool);
- }
- h2_util_bb_avail(msos->bb, plen, peos);
-
- H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx prepare_post");
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, msos->m->c,
- "h2_stream(%ld-%d): prepare, len=%ld eos=%d, trailers=%s",
- msos->m->id, sos->stream->id, (long)*plen, *peos,
- msos->trailers? "yes" : "no");
- if (!*peos && !*plen) {
- status = APR_EAGAIN;
- }
-
- return status;
-}
-
-static apr_table_t *h2_sos_mplx_get_trailers(h2_sos *sos)
-{
- h2_sos_mplx *msos = sos->ctx;
-
- return msos->trailers;
-}
-
-static apr_status_t h2_sos_mplx_buffer(h2_sos *sos, apr_bucket_brigade *bb)
-{
- h2_sos_mplx *msos = sos->ctx;
- apr_status_t status = APR_SUCCESS;
-
- if (bb && !APR_BRIGADE_EMPTY(bb)) {
- H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx set_response_pre");
- status = mplx_transfer(msos, sos->stream->id, sos->stream->pool);
- H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx set_response_post");
- }
- return status;
-}
-
-static h2_sos *h2_sos_mplx_create(h2_stream *stream, h2_response *response)
-{
- h2_sos *sos;
- h2_sos_mplx *msos;
-
- msos = apr_pcalloc(stream->pool, sizeof(*msos));
- msos->m = stream->session->mplx;
- msos->bb = apr_brigade_create(stream->pool, msos->m->c->bucket_alloc);
- msos->buffer_size = 32 * 1024;
-
- sos = apr_pcalloc(stream->pool, sizeof(*sos));
- sos->stream = stream;
- sos->response = response;
-
- sos->ctx = msos;
- sos->buffer = h2_sos_mplx_buffer;
- sos->prepare = h2_sos_mplx_prepare;
- sos->readx = h2_sos_mplx_readx;
- sos->read_to = h2_sos_mplx_read_to;
- sos->get_trailers = h2_sos_mplx_get_trailers;
-
- sos->response = response;
-
- return sos;
-}
-
diff --git a/modules/http2/h2_stream.h b/modules/http2/h2_stream.h
index 6f142d9dc0..c851f7e64b 100644
--- a/modules/http2/h2_stream.h
+++ b/modules/http2/h2_stream.h
@@ -38,6 +38,7 @@ struct h2_request;
struct h2_response;
struct h2_session;
struct h2_sos;
+struct h2_bucket_beam;
typedef struct h2_stream h2_stream;
@@ -48,8 +49,14 @@ struct h2_stream {
apr_pool_t *pool; /* the memory pool for this stream */
struct h2_request *request; /* the request made in this stream */
+ struct h2_bucket_beam *input;
+
+ struct h2_response *response;
+ struct h2_bucket_beam *output;
+ apr_bucket_brigade *buffer;
+ apr_bucket_brigade *tmp;
+
int rst_error; /* stream error for RST_STREAM */
-
unsigned int aborted : 1; /* was aborted */
unsigned int suspended : 1; /* DATA sending has been suspended */
unsigned int scheduled : 1; /* stream has been scheduled */
@@ -57,7 +64,6 @@ struct h2_stream {
apr_off_t input_remaining; /* remaining bytes on input as advertised via content-length */
- struct h2_sos *sos; /* stream output source, e.g. to read output from */
apr_off_t data_frames_sent; /* # of DATA frames sent out for this stream */
};
@@ -75,12 +81,14 @@ h2_stream *h2_stream_open(int id, apr_pool_t *pool, struct h2_session *session,
int initiated_on, const struct h2_request *req);
/**
- * Destroy any resources held by this stream. Will destroy memory pool
- * if still owned by the stream.
- *
- * @param stream the stream to destroy
+ * Cleanup any resources still held by the stream, called by last bucket.
+ */
+void h2_stream_eos_destroy(h2_stream *stream);
+
+/**
+ * Destroy memory pool if still owned by the stream.
*/
-apr_status_t h2_stream_destroy(h2_stream *stream);
+void h2_stream_destroy(h2_stream *stream);
/**
* Removes stream from h2_session and destroys it.
@@ -179,7 +187,7 @@ struct h2_response *h2_stream_get_response(h2_stream *stream);
*/
apr_status_t h2_stream_set_response(h2_stream *stream,
struct h2_response *response,
- apr_bucket_brigade *bb);
+ struct h2_bucket_beam *output);
/**
* Do a speculative read on the stream output to determine the
diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c
index e764fcc832..ba67b4198f 100644
--- a/modules/http2/h2_task.c
+++ b/modules/http2/h2_task.c
@@ -33,6 +33,7 @@
#include <scoreboard.h>
#include "h2_private.h"
+#include "h2_bucket_beam.h"
#include "h2_conn.h"
#include "h2_config.h"
#include "h2_ctx.h"
@@ -42,12 +43,344 @@
#include "h2_request.h"
#include "h2_session.h"
#include "h2_stream.h"
-#include "h2_task_input.h"
-#include "h2_task_output.h"
#include "h2_task.h"
-#include "h2_ctx.h"
#include "h2_worker.h"
+#include "h2_util.h"
+
+/*******************************************************************************
+ * task input handling
+ ******************************************************************************/
+
+static int input_ser_header(void *ctx, const char *name, const char *value)
+{
+ h2_task *task = ctx;
+ apr_brigade_printf(task->input.bb, NULL, NULL, "%s: %s\r\n", name, value);
+ return 1;
+}
+
+static apr_status_t input_append_eos(h2_task *task, request_rec *r)
+{
+ apr_status_t status = APR_SUCCESS;
+ apr_bucket_brigade *bb = task->input.bb;
+ apr_table_t *t = task->request->trailers;
+
+ if (task->input.chunked) {
+ if (t && !apr_is_empty_table(t)) {
+ status = apr_brigade_puts(bb, NULL, NULL, "0\r\n");
+ apr_table_do(input_ser_header, task, t, NULL);
+ status = apr_brigade_puts(bb, NULL, NULL, "\r\n");
+ }
+ else {
+ status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n");
+ }
+ }
+ else if (r && t && !apr_is_empty_table(t)){
+ /* trailers passed in directly. */
+ apr_table_overlap(r->trailers_in, t, APR_OVERLAP_TABLES_SET);
+ }
+ task->input.eos_written = 1;
+ APR_BRIGADE_INSERT_TAIL(bb, apr_bucket_eos_create(bb->bucket_alloc));
+ return status;
+}
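/* Illustrative example (hypothetical trailer): with task->input.chunked set
 * and a single trailer "X-Check: done", input_append_eos() writes the closing
 * bytes
 *
 *     "0\r\n" "X-Check: done\r\n" "\r\n"
 *
 * before the EOS bucket; without trailers it is simply "0\r\n\r\n".
 */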
+
+static apr_status_t input_read(h2_task *task, ap_filter_t* f,
+ apr_bucket_brigade* bb, ap_input_mode_t mode,
+ apr_read_type_e block, apr_off_t readbytes)
+{
+ apr_status_t status = APR_SUCCESS;
+ apr_bucket *b, *next;
+ apr_off_t bblen = 0;
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
+ "h2_task(%s): read, mode=%d, block=%d, readbytes=%ld",
+ task->id, mode, block, (long)readbytes);
+
+ if (mode == AP_MODE_INIT) {
+ return ap_get_brigade(f->c->input_filters, bb, mode, block, readbytes);
+ }
+
+ if (f->c->aborted) {
+ return APR_ECONNABORTED;
+ }
+
+ if (task->input.bb) {
+ /* Cleanup brigades from those nasty 0 length non-meta buckets
+ * that apr_brigade_split_line() sometimes produces. */
+ for (b = APR_BRIGADE_FIRST(task->input.bb);
+ b != APR_BRIGADE_SENTINEL(task->input.bb); b = next) {
+ next = APR_BUCKET_NEXT(b);
+ if (b->length == 0 && !APR_BUCKET_IS_METADATA(b)) {
+ apr_bucket_delete(b);
+ }
+ }
+ apr_brigade_length(task->input.bb, 0, &bblen);
+ }
+
+ if (bblen == 0) {
+ if (task->input.eos_written) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, APR_EOF, f->c,
+ "h2_task(%s): read no data", task->id);
+ return APR_EOF;
+ }
+ else if (task->input.eos) {
+ input_append_eos(task, f->r);
+ }
+ }
+
+ while (APR_BRIGADE_EMPTY(task->input.bb)) {
+ /* Get more input data for our request. */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
+ "h2_task(%s): get more data from mplx, block=%d, "
+ "readbytes=%ld, queued=%ld",
+ task->id, block, (long)readbytes, (long)bblen);
+
+ /* Override the block mode we get called with depending on the input's
+ * setting. */
+ if (task->input.beam) {
+ status = h2_beam_receive(task->input.beam, task->input.bb, block,
+ H2MIN(readbytes, 32*1024));
+ }
+ else {
+ status = APR_EOF;
+ }
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
+ "h2_task(%s): read returned", task->id);
+ if (APR_STATUS_IS_EAGAIN(status)
+ && (mode == AP_MODE_GETLINE || block == APR_BLOCK_READ)) {
+ /* chunked input handling does not seem to like it if we
+ * return with APR_EAGAIN from a GETLINE read...
+ * upload 100k test on test-ser.example.org hangs */
+ status = APR_SUCCESS;
+ }
+ else if (APR_STATUS_IS_EOF(status) && !task->input.eos_written) {
+ task->input.eos = 1;
+ }
+ else if (status != APR_SUCCESS) {
+ return status;
+ }
+
+ apr_brigade_length(task->input.bb, 0, &bblen);
+ if (bblen > 0 && task->input.chunked) {
+ /* need to add chunks since request processing expects it */
+ char buffer[128];
+ apr_bucket *b;
+ int len;
+
+ len = apr_snprintf(buffer, H2_ALEN(buffer), "%lx\r\n",
+ (unsigned long)bblen);
+ b = apr_bucket_heap_create(buffer, len, NULL,
+ task->input.bb->bucket_alloc);
+ APR_BRIGADE_INSERT_HEAD(task->input.bb, b);
+ status = apr_brigade_puts(task->input.bb, NULL, NULL, "\r\n");
+ }
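/* Illustrative example: with task->input.chunked set and 418 (0x1a2) bytes
 * received from the beam, the brigade is re-framed as
 *
 *     "1a2\r\n" <418 data bytes> "\r\n"
 *
 * so that HTTP/1.1 chunked request processing sees a valid chunk.
 */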
+
+ if (h2_util_has_eos(task->input.bb, -1)) {
+ task->input.eos = 1;
+ }
+
+ if (task->input.eos && !task->input.eos_written) {
+ input_append_eos(task, f->r);
+ }
+
+ if (h2_task_logio_add_bytes_in) {
+ h2_task_logio_add_bytes_in(f->c, bblen);
+ }
+ }
+
+ h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2,
+ "task_input.bb", task->input.bb);
+
+ if (APR_BRIGADE_EMPTY(task->input.bb)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
+ "h2_task(%s): no data", task->id);
+ return (block == APR_NONBLOCK_READ)? APR_EAGAIN : APR_EOF;
+ }
+
+ if (mode == AP_MODE_EXHAUSTIVE) {
+ /* return all we have */
+ APR_BRIGADE_CONCAT(bb, task->input.bb);
+ }
+ else if (mode == AP_MODE_READBYTES) {
+ status = h2_brigade_concat_length(bb, task->input.bb, readbytes);
+ }
+ else if (mode == AP_MODE_SPECULATIVE) {
+ status = h2_brigade_copy_length(bb, task->input.bb, readbytes);
+ }
+ else if (mode == AP_MODE_GETLINE) {
+ /* we are reading a single LF line, e.g. the HTTP headers.
+ * this has the nasty side effect to split the bucket, even
+ * though it ends with CRLF and creates a 0 length bucket */
+ status = apr_brigade_split_line(bb, task->input.bb, block,
+ HUGE_STRING_LEN);
+ if (APLOGctrace1(f->c)) {
+ char buffer[1024];
+ apr_size_t len = sizeof(buffer)-1;
+ apr_brigade_flatten(bb, buffer, &len);
+ buffer[len] = 0;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
+ "h2_task(%s): getline: %s",
+ task->id, buffer);
+ }
+ }
+ else {
+ /* Hmm, well. There is mode AP_MODE_EATCRLF, but we chose not
+ * to support it. Seems to work. */
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOTIMPL, f->c,
+ APLOGNO(02942)
+ "h2_task, unsupported READ mode %d", mode);
+ status = APR_ENOTIMPL;
+ }
+
+ if (APLOGctrace1(f->c)) {
+ apr_brigade_length(bb, 0, &bblen);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
+ "h2_task(%s): return %ld data bytes",
+ task->id, (long)bblen);
+ }
+ return status;
+}
+/*******************************************************************************
+ * task output handling
+ ******************************************************************************/
+
+static apr_status_t open_response(h2_task *task)
+{
+ h2_response *response;
+ response = h2_from_h1_get_response(task->output.from_h1);
+ if (!response) {
+ /* This happens currently when ap_die(status, r) is invoked
+ * by a read request filter. */
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, task->c, APLOGNO(03204)
+ "h2_task(%s): write without response for %s %s %s",
+ task->id,
+ task->request->method,
+ task->request->authority,
+ task->request->path);
+ task->c->aborted = 1;
+ return APR_ECONNABORTED;
+ }
+
+ if (h2_task_logio_add_bytes_out) {
+ /* count headers as if we'd do a HTTP/1.1 serialization */
+ task->output.written = h2_util_table_bytes(response->headers, 3)+1;
+ h2_task_logio_add_bytes_out(task->c, task->output.written);
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c, APLOGNO(03348)
+ "h2_task(%s): open response to %s %s %s",
+ task->id, task->request->method,
+ task->request->authority,
+ task->request->path);
+ return h2_mplx_out_open(task->mplx, task->stream_id,
+ response, task->output.beam);
+}
+
+static apr_status_t send_out(h2_task *task, apr_bucket_brigade* bb)
+{
+ apr_off_t written, left;
+ apr_status_t status;
+
+ apr_brigade_length(bb, 0, &written);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+ "h2_task(%s): write response body (%ld bytes)",
+ task->id, (long)written);
+
+ status = h2_beam_send(task->output.beam, bb,
+ task->blocking? APR_BLOCK_READ
+ : APR_NONBLOCK_READ);
+ if (APR_STATUS_IS_EAGAIN(status)) {
+ apr_brigade_length(bb, 0, &left);
+ written -= left;
+ status = APR_SUCCESS;
+ }
+ if (status == APR_SUCCESS) {
+ task->output.written += written;
+ if (h2_task_logio_add_bytes_out) {
+ h2_task_logio_add_bytes_out(task->c, written);
+ }
+ }
+ return status;
+}
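/* Illustrative example (assumed sizes): on a non-blocking send the beam may
 * accept only part of the brigade and return APR_EAGAIN. If bb held 64K and
 * 16K are left afterwards, `written` is reduced to 48K, the status is reset
 * to APR_SUCCESS, and the remaining 16K stay in bb for the caller to save
 * and retry (see output_write() below).
 */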
+
+/* Bring the data from the brigade (which represents the result of the
+ * request_rec out filter chain) into the h2_mplx for further sending
+ * on the master connection.
+ */
+static apr_status_t output_write(h2_task *task, ap_filter_t* f,
+ apr_bucket_brigade* bb)
+{
+ apr_bucket *b;
+ apr_status_t status = APR_SUCCESS;
+
+ if (APR_BRIGADE_EMPTY(bb)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+ "h2_task(%s): empty write", task->id);
+ return APR_SUCCESS;
+ }
+
+ if (task->frozen) {
+ h2_util_bb_log(task->c, task->stream_id, APLOG_TRACE2,
+ "frozen task output write, ignored", bb);
+ while (!APR_BRIGADE_EMPTY(bb)) {
+ b = APR_BRIGADE_FIRST(bb);
+ if (AP_BUCKET_IS_EOR(b)) {
+ APR_BUCKET_REMOVE(b);
+ task->eor = b;
+ }
+ else {
+ apr_bucket_delete(b);
+ }
+ }
+ return APR_SUCCESS;
+ }
+
+ if (!task->output.beam) {
+ h2_beam_create(&task->output.beam, task->pool,
+ task->stream_id, "output", 0);
+ }
+
+ /* Attempt to write saved brigade first */
+ if (task->output.bb && !APR_BRIGADE_EMPTY(task->output.bb)) {
+ status = send_out(task, task->output.bb);
+ if (status != APR_SUCCESS) {
+ return status;
+ }
+ }
+
+ /* If there is nothing saved (anymore), try to write the brigade passed */
+ if ((!task->output.bb || APR_BRIGADE_EMPTY(task->output.bb)) && !APR_BRIGADE_EMPTY(bb)) {
+ status = send_out(task, bb);
+ if (status != APR_SUCCESS) {
+ return status;
+ }
+ }
+
+ /* If the passed brigade is not empty, save it before return */
+ if (!APR_BRIGADE_EMPTY(bb)) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c,
+ "h2_task(%s): could not write all, saving brigade",
+ task->id);
+ if (!task->output.bb) {
+ task->output.bb = apr_brigade_create(task->pool,
+ task->c->bucket_alloc);
+ }
+ return ap_save_brigade(f, &task->output.bb, &bb, task->pool);
+ }
+
+ if (!task->output.response_open) {
+ /* data is in the output beam, if we have not opened the response,
+ * do so now. */
+ status = open_response(task);
+ task->output.response_open = 1;
+ }
+
+ return status;
+}
+
+/*******************************************************************************
+ * task slave connection filters
+ ******************************************************************************/
static apr_status_t h2_filter_stream_input(ap_filter_t* filter,
apr_bucket_brigade* brigade,
@@ -57,11 +390,7 @@ static apr_status_t h2_filter_stream_input(ap_filter_t* filter,
{
h2_task *task = filter->ctx;
AP_DEBUG_ASSERT(task);
- if (!task->input) {
- return APR_ECONNABORTED;
- }
- return h2_task_input_read(task->input, filter, brigade,
- mode, block, readbytes);
+ return input_read(task, filter, brigade, mode, block, readbytes);
}
static apr_status_t h2_filter_stream_output(ap_filter_t* filter,
@@ -69,10 +398,7 @@ static apr_status_t h2_filter_stream_output(ap_filter_t* filter,
{
h2_task *task = filter->ctx;
AP_DEBUG_ASSERT(task);
- if (!task->output) {
- return APR_ECONNABORTED;
- }
- return h2_task_output_write(task->output, filter, brigade);
+ return output_write(task, filter, brigade);
}
static apr_status_t h2_filter_read_response(ap_filter_t* f,
@@ -80,10 +406,10 @@ static apr_status_t h2_filter_read_response(ap_filter_t* f,
{
h2_task *task = f->ctx;
AP_DEBUG_ASSERT(task);
- if (!task->output || !task->output->from_h1) {
+ if (!task->output.from_h1) {
return APR_ECONNABORTED;
}
- return h2_from_h1_read_response(task->output->from_h1, f, bb);
+ return h2_from_h1_read_response(task->output.from_h1, f, bb);
}
/*******************************************************************************
@@ -123,8 +449,8 @@ apr_status_t h2_task_init(apr_pool_t *pool, server_rec *s)
return APR_SUCCESS;
}
-h2_task *h2_task_create(long session_id, const h2_request *req,
- conn_rec *c, h2_mplx *mplx)
+h2_task *h2_task_create(conn_rec *c, const h2_request *req,
+ h2_bucket_beam *input, h2_mplx *mplx)
{
apr_pool_t *pool;
h2_task *task;
@@ -134,21 +460,23 @@ h2_task *h2_task_create(long session_id, const h2_request *req,
if (task == NULL) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, c,
APLOGNO(02941) "h2_task(%ld-%d): create stream task",
- session_id, req->id);
+ c->id, req->id);
h2_mplx_out_close(mplx, req->id);
return NULL;
}
- task->id = apr_psprintf(pool, "%ld-%d", session_id, req->id);
+ task->id = apr_psprintf(pool, "%ld-%d", c->id, req->id);
task->stream_id = req->id;
task->c = c;
task->mplx = mplx;
task->c->keepalives = mplx->c->keepalives;
task->pool = pool;
task->request = req;
- task->input_eos = !req->body;
task->ser_headers = req->serialize;
task->blocking = 1;
+ task->input.beam = input;
+
+ apr_thread_cond_create(&task->cond, pool);
h2_ctx_create_for(c, task);
/* Add our own, network level in- and output filters. */
@@ -162,6 +490,9 @@ void h2_task_destroy(h2_task *task)
{
ap_remove_input_filter_byhandle(task->c->input_filters, "H2_TO_H1");
ap_remove_output_filter_byhandle(task->c->output_filters, "H1_TO_H2");
+ if (task->eor) {
+ apr_bucket_destroy(task->eor);
+ }
if (task->pool) {
apr_pool_destroy(task->pool);
}
@@ -172,14 +503,39 @@ void h2_task_set_io_blocking(h2_task *task, int blocking)
task->blocking = blocking;
}
-apr_status_t h2_task_do(h2_task *task, apr_thread_cond_t *cond)
+apr_status_t h2_task_do(h2_task *task)
{
apr_status_t status;
AP_DEBUG_ASSERT(task);
- task->io = cond;
- task->input = h2_task_input_create(task, task->c);
- task->output = h2_task_output_create(task, task->c);
+
+ task->input.block = APR_BLOCK_READ;
+ task->input.chunked = task->request->chunked;
+ task->input.eos = !task->request->body;
+ if (task->input.eos && !task->input.chunked && !task->ser_headers) {
+ /* We do not serialize/chunk and have eos already, no need to
+ * create a bucket brigade. */
+ task->input.bb = NULL;
+ task->input.eos_written = 1;
+ }
+ else {
+ task->input.bb = apr_brigade_create(task->pool, task->c->bucket_alloc);
+ if (task->ser_headers) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+ "h2_task(%s): serialize request %s %s",
+ task->id, task->request->method, task->request->path);
+ apr_brigade_printf(task->input.bb, NULL,
+ NULL, "%s %s HTTP/1.1\r\n",
+ task->request->method, task->request->path);
+ apr_table_do(input_ser_header, task, task->request->headers, NULL);
+ apr_brigade_puts(task->input.bb, NULL, NULL, "\r\n");
+ }
+ if (task->input.eos) {
+ input_append_eos(task, NULL);
+ }
+ }
+
+ task->output.from_h1 = h2_from_h1_create(task->stream_id, task->pool);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
"h2_task(%s): process connection", task->id);
@@ -195,6 +551,7 @@ apr_status_t h2_task_do(h2_task *task, apr_thread_cond_t *cond)
else {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
"h2_task(%s): processing done", task->id);
+ h2_mplx_out_close(task->mplx, task->stream_id);
status = APR_SUCCESS;
}
diff --git a/modules/http2/h2_task.h b/modules/http2/h2_task.h
index 15a1d3cb2c..450763ee40 100644
--- a/modules/http2/h2_task.h
+++ b/modules/http2/h2_task.h
@@ -38,6 +38,7 @@
*/
struct apr_thread_cond_t;
+struct h2_bucket_beam;
struct h2_conn;
struct h2_mplx;
struct h2_task;
@@ -55,29 +56,42 @@ struct h2_task {
struct h2_mplx *mplx;
apr_pool_t *pool;
const struct h2_request *request;
+ apr_bucket *eor;
+ struct apr_thread_cond_t *cond;
unsigned int filters_set : 1;
- unsigned int input_eos : 1;
unsigned int ser_headers : 1;
unsigned int frozen : 1;
unsigned int blocking : 1;
unsigned int detached : 1;
- struct h2_task_input *input;
- struct h2_task_output *output;
- struct apr_thread_cond_t *io; /* used to wait for events on */
+ struct {
+ struct h2_bucket_beam *beam;
+ apr_bucket_brigade *bb;
+ apr_read_type_e block;
+ unsigned int chunked : 1;
+ unsigned int eos : 1;
+ unsigned int eos_written : 1;
+ } input;
+ struct {
+ struct h2_bucket_beam *beam;
+ struct h2_from_h1 *from_h1;
+ unsigned int response_open : 1;
+ apr_off_t written;
+ apr_bucket_brigade *bb;
+ } output;
struct h2_req_engine *engine; /* engine hosted by this task */
struct h2_req_engine *assigned; /* engine that task has been assigned to */
request_rec *r; /* request being processed in this task */
};
-h2_task *h2_task_create(long session_id, const struct h2_request *req,
- conn_rec *c, struct h2_mplx *mplx);
+h2_task *h2_task_create(conn_rec *c, const struct h2_request *req,
+ struct h2_bucket_beam *input, struct h2_mplx *mplx);
void h2_task_destroy(h2_task *task);
-apr_status_t h2_task_do(h2_task *task, struct apr_thread_cond_t *cond);
+apr_status_t h2_task_do(h2_task *task);
void h2_task_register_hooks(void);
/*
diff --git a/modules/http2/h2_task_input.c b/modules/http2/h2_task_input.c
deleted file mode 100644
index 3993b6b40c..0000000000
--- a/modules/http2/h2_task_input.c
+++ /dev/null
@@ -1,228 +0,0 @@
-/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <assert.h>
-
-#include <httpd.h>
-#include <http_core.h>
-#include <http_log.h>
-#include <http_connection.h>
-
-#include "h2_private.h"
-#include "h2_conn.h"
-#include "h2_mplx.h"
-#include "h2_request.h"
-#include "h2_session.h"
-#include "h2_stream.h"
-#include "h2_task_input.h"
-#include "h2_task.h"
-#include "h2_util.h"
-
-
-static int is_aborted(ap_filter_t *f)
-{
- return (f->c->aborted);
-}
-
-static int ser_header(void *ctx, const char *name, const char *value)
-{
- h2_task_input *input = (h2_task_input*)ctx;
- apr_brigade_printf(input->bb, NULL, NULL, "%s: %s\r\n", name, value);
- return 1;
-}
-
-h2_task_input *h2_task_input_create(h2_task *task, conn_rec *c)
-{
- h2_task_input *input = apr_pcalloc(task->pool, sizeof(h2_task_input));
- if (input) {
- input->task = task;
- input->bb = NULL;
- input->block = APR_BLOCK_READ;
-
- if (task->ser_headers) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
- "h2_task_input(%s): serialize request %s %s",
- task->id, task->request->method, task->request->path);
- input->bb = apr_brigade_create(task->pool, c->bucket_alloc);
- apr_brigade_printf(input->bb, NULL, NULL, "%s %s HTTP/1.1\r\n",
- task->request->method, task->request->path);
- apr_table_do(ser_header, input, task->request->headers, NULL);
- apr_brigade_puts(input->bb, NULL, NULL, "\r\n");
- if (input->task->input_eos) {
- APR_BRIGADE_INSERT_TAIL(input->bb, apr_bucket_eos_create(c->bucket_alloc));
- }
- }
- else if (!input->task->input_eos) {
- input->bb = apr_brigade_create(task->pool, c->bucket_alloc);
- }
- else {
- /* We do not serialize and have eos already, no need to
- * create a bucket brigade. */
- }
- }
- return input;
-}
-
-void h2_task_input_block_set(h2_task_input *input, apr_read_type_e block)
-{
- input->block = block;
-}
-
-apr_status_t h2_task_input_read(h2_task_input *input,
- ap_filter_t* f,
- apr_bucket_brigade* bb,
- ap_input_mode_t mode,
- apr_read_type_e block,
- apr_off_t readbytes)
-{
- apr_status_t status = APR_SUCCESS;
- apr_off_t bblen = 0;
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
- "h2_task_input(%s): read, block=%d, mode=%d, readbytes=%ld",
- input->task->id, block, mode, (long)readbytes);
-
- if (mode == AP_MODE_INIT) {
- return ap_get_brigade(f->c->input_filters, bb, mode, block, readbytes);
- }
-
- if (is_aborted(f)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
- "h2_task_input(%s): is aborted", input->task->id);
- return APR_ECONNABORTED;
- }
-
- if (input->bb) {
- status = apr_brigade_length(input->bb, 1, &bblen);
- if (status != APR_SUCCESS) {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, f->c,
- APLOGNO(02958) "h2_task_input(%s): brigade length fail",
- input->task->id);
- return status;
- }
- }
-
- if ((bblen == 0) && input->task->input_eos) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
- "h2_task_input(%s): eos", input->task->id);
- return APR_EOF;
- }
-
- while (bblen == 0) {
- /* Get more data for our stream from mplx.
- */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
- "h2_task_input(%s): get more data from mplx, block=%d, "
- "readbytes=%ld, queued=%ld",
- input->task->id, block,
- (long)readbytes, (long)bblen);
-
- /* Override the block mode we get called with depending on the input's
- * setting.
- */
- status = h2_mplx_in_read(input->task->mplx, block,
- input->task->stream_id, input->bb,
- f->r? f->r->trailers_in : NULL,
- input->task->io);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
- "h2_task_input(%s): mplx in read returned",
- input->task->id);
- if (APR_STATUS_IS_EAGAIN(status)
- && (mode == AP_MODE_GETLINE || block == APR_BLOCK_READ)) {
- /* chunked input handling does not seem to like it if we
- * return with APR_EAGAIN from a GETLINE read...
- * upload 100k test on test-ser.example.org hangs */
- status = APR_SUCCESS;
- }
- else if (status != APR_SUCCESS) {
- return status;
- }
-
- status = apr_brigade_length(input->bb, 1, &bblen);
- if (status != APR_SUCCESS) {
- return status;
- }
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
- "h2_task_input(%s): mplx in read, %ld bytes in brigade",
- input->task->id, (long)bblen);
- if (h2_task_logio_add_bytes_in) {
- h2_task_logio_add_bytes_in(f->c, bblen);
- }
- }
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
- "h2_task_input(%s): read, mode=%d, block=%d, "
- "readbytes=%ld, queued=%ld",
- input->task->id, mode, block,
- (long)readbytes, (long)bblen);
-
- if (!APR_BRIGADE_EMPTY(input->bb)) {
- if (mode == AP_MODE_EXHAUSTIVE) {
- /* return all we have */
- status = h2_util_move(bb, input->bb, readbytes, NULL,
- "task_input_read(exhaustive)");
- }
- else if (mode == AP_MODE_READBYTES) {
- status = h2_util_move(bb, input->bb, readbytes, NULL,
- "task_input_read(readbytes)");
- }
- else if (mode == AP_MODE_SPECULATIVE) {
- /* return not more than was asked for */
- status = h2_util_copy(bb, input->bb, readbytes,
- "task_input_read(speculative)");
- }
- else if (mode == AP_MODE_GETLINE) {
- /* we are reading a single LF line, e.g. the HTTP headers */
- status = apr_brigade_split_line(bb, input->bb, block,
- HUGE_STRING_LEN);
- if (APLOGctrace1(f->c)) {
- char buffer[1024];
- apr_size_t len = sizeof(buffer)-1;
- apr_brigade_flatten(bb, buffer, &len);
- buffer[len] = 0;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
- "h2_task_input(%s): getline: %s",
- input->task->id, buffer);
- }
- }
- else {
- /* Hmm, well. There is mode AP_MODE_EATCRLF, but we chose not
- * to support it. Seems to work. */
- ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOTIMPL, f->c,
- APLOGNO(02942)
- "h2_task_input, unsupported READ mode %d", mode);
- status = APR_ENOTIMPL;
- }
-
- if (APLOGctrace1(f->c)) {
- apr_brigade_length(bb, 0, &bblen);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
- "h2_task_input(%s): return %ld data bytes",
- input->task->id, (long)bblen);
- }
- return status;
- }
-
- if (is_aborted(f)) {
- return APR_ECONNABORTED;
- }
-
- status = (block == APR_NONBLOCK_READ)? APR_EAGAIN : APR_EOF;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
- "h2_task_input(%s): no data", input->task->id);
- return status;
-}
-
diff --git a/modules/http2/h2_task_input.h b/modules/http2/h2_task_input.h
deleted file mode 100644
index c8913cacf2..0000000000
--- a/modules/http2/h2_task_input.h
+++ /dev/null
@@ -1,46 +0,0 @@
-/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __mod_h2__h2_task_input__
-#define __mod_h2__h2_task_input__
-
-/* h2_task_input places the HEADER+DATA, formatted in HTTP/1.1, into
- * a bucket brigade. The brigade is setup as the input brigade for our
- * pseudo httpd conn_rec that is handling a specific h2_task.
- */
-struct apr_thread_cond_t;
-struct h2_mplx;
-struct h2_task;
-
-typedef struct h2_task_input h2_task_input;
-struct h2_task_input {
- struct h2_task *task;
- apr_bucket_brigade *bb;
- apr_read_type_e block;
-};
-
-
-h2_task_input *h2_task_input_create(struct h2_task *task, conn_rec *c);
-
-apr_status_t h2_task_input_read(h2_task_input *input,
- ap_filter_t* filter,
- apr_bucket_brigade* brigade,
- ap_input_mode_t mode,
- apr_read_type_e block,
- apr_off_t readbytes);
-
-void h2_task_input_block_set(h2_task_input *input, apr_read_type_e block);
-
-#endif /* defined(__mod_h2__h2_task_input__) */
diff --git a/modules/http2/h2_task_output.c b/modules/http2/h2_task_output.c
deleted file mode 100644
index 80938d1fcc..0000000000
--- a/modules/http2/h2_task_output.c
+++ /dev/null
@@ -1,176 +0,0 @@
-/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <assert.h>
-
-#include <apr_thread_cond.h>
-#include <httpd.h>
-#include <http_core.h>
-#include <http_log.h>
-#include <http_connection.h>
-#include <http_request.h>
-
-#include "h2_private.h"
-#include "h2_conn.h"
-#include "h2_mplx.h"
-#include "h2_request.h"
-#include "h2_session.h"
-#include "h2_stream.h"
-#include "h2_from_h1.h"
-#include "h2_response.h"
-#include "h2_task_output.h"
-#include "h2_task.h"
-#include "h2_util.h"
-
-
-h2_task_output *h2_task_output_create(h2_task *task, conn_rec *c)
-{
- h2_task_output *output = apr_pcalloc(task->pool, sizeof(h2_task_output));
- if (output) {
- output->task = task;
- output->from_h1 = h2_from_h1_create(task->stream_id, task->pool);
- }
- return output;
-}
-
-static apr_status_t open_response(h2_task_output *output, ap_filter_t *f,
- apr_bucket_brigade *bb, const char *caller)
-{
- h2_response *response;
- response = h2_from_h1_get_response(output->from_h1);
- if (!response) {
- if (f) {
- /* This happens currently when ap_die(status, r) is invoked
- * by a read request filter. */
- ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, output->task->c, APLOGNO(03204)
- "h2_task_output(%s): write without response by %s "
- "for %s %s %s",
- output->task->id, caller,
- output->task->request->method,
- output->task->request->authority,
- output->task->request->path);
- output->task->c->aborted = 1;
- }
- if (output->task->io) {
- apr_thread_cond_broadcast(output->task->io);
- }
- return APR_ECONNABORTED;
- }
-
- if (h2_task_logio_add_bytes_out) {
- /* count headers as if we'd do a HTTP/1.1 serialization */
- output->written = h2_util_table_bytes(response->headers, 3)+1;
- h2_task_logio_add_bytes_out(output->task->c, output->written);
- }
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->task->c, APLOGNO(03348)
- "h2_task(%s): open response to %s %s %s",
- output->task->id, output->task->request->method,
- output->task->request->authority,
- output->task->request->path);
- return h2_mplx_out_open(output->task->mplx, output->task->stream_id,
- response, f, bb, output->task->io);
-}
-
-static apr_status_t write_brigade_raw(h2_task_output *output,
- ap_filter_t* f, apr_bucket_brigade* bb)
-{
- apr_off_t written, left;
- apr_status_t status;
-
- apr_brigade_length(bb, 0, &written);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, output->task->c,
- "h2_task(%s): write response body (%ld bytes)",
- output->task->id, (long)written);
-
- status = h2_mplx_out_write(output->task->mplx, output->task->stream_id,
- f, output->task->blocking, bb, output->task->io);
- if (status == APR_INCOMPLETE) {
- apr_brigade_length(bb, 0, &left);
- written -= left;
- status = APR_SUCCESS;
- }
-
- if (status == APR_SUCCESS) {
- output->written += written;
- if (h2_task_logio_add_bytes_out) {
- h2_task_logio_add_bytes_out(output->task->c, written);
- }
- }
- return status;
-}
-
-/* Bring the data from the brigade (which represents the result of the
- * request_rec out filter chain) into the h2_mplx for further sending
- * on the master connection.
- */
-apr_status_t h2_task_output_write(h2_task_output *output,
- ap_filter_t* f, apr_bucket_brigade* bb)
-{
- apr_bucket *b;
- apr_status_t status = APR_SUCCESS;
-
- if (APR_BRIGADE_EMPTY(bb)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, output->task->c,
- "h2_task(%s): empty write", output->task->id);
- return APR_SUCCESS;
- }
-
- if (output->task->frozen) {
- h2_util_bb_log(output->task->c, output->task->stream_id, APLOG_TRACE2,
- "frozen task output write, ignored", bb);
- while (!APR_BRIGADE_EMPTY(bb)) {
- b = APR_BRIGADE_FIRST(bb);
- if (AP_BUCKET_IS_EOR(b)) {
- /* TODO: keep it */
- APR_BUCKET_REMOVE(b);
- }
- else {
- apr_bucket_delete(b);
- }
- }
- return APR_SUCCESS;
- }
-
- if (!output->response_open) {
- status = open_response(output, f, bb, "write");
- output->response_open = 1;
- }
-
- /* Attempt to write saved brigade first */
- if (status == APR_SUCCESS && output->bb && !APR_BRIGADE_EMPTY(output->bb)) {
- status = write_brigade_raw(output, f, output->bb);
- }
-
- /* If there is nothing saved (anymore), try to write the brigade passed */
- if (status == APR_SUCCESS
- && (!output->bb || APR_BRIGADE_EMPTY(output->bb))
- && !APR_BRIGADE_EMPTY(bb)) {
- status = write_brigade_raw(output, f, bb);
- }
-
- /* If the passed brigade is not empty, save it before return */
- if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, output->task->c,
- "h2_task(%s): could not write all, saving brigade",
- output->task->id);
- if (!output->bb) {
- output->bb = apr_brigade_create(output->task->pool, output->task->c->bucket_alloc);
- }
- return ap_save_brigade(f, &output->bb, &bb, output->task->pool);
- }
-
- return status;
-}
-
diff --git a/modules/http2/h2_task_output.h b/modules/http2/h2_task_output.h
deleted file mode 100644
index 3135bc459e..0000000000
--- a/modules/http2/h2_task_output.h
+++ /dev/null
@@ -1,50 +0,0 @@
-/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __mod_h2__h2_task_output__
-#define __mod_h2__h2_task_output__
-
-/* h2_task_output reads a HTTP/1 response from the brigade and applies
- * them to a h2_output_converter. The brigade is setup as the output brigade
- * for our pseudo httpd conn_rec that is handling a specific h2_task.
- *
- */
-struct apr_thread_cond_t;
-struct h2_mplx;
-struct h2_task;
-struct h2_from_h1;
-
-typedef struct h2_task_output h2_task_output;
-
-struct h2_task_output {
- struct h2_task *task;
- struct h2_from_h1 *from_h1;
-
- unsigned int response_open : 1;
-
- apr_off_t written;
- apr_bucket_brigade *bb;
-};
-
-h2_task_output *h2_task_output_create(struct h2_task *task, conn_rec *c);
-
-apr_status_t h2_task_output_write(h2_task_output *output,
- ap_filter_t* filter,
- apr_bucket_brigade* brigade);
-
-apr_status_t h2_task_output_freeze(h2_task_output *output);
-apr_status_t h2_task_output_thaw(h2_task_output *output);
-
-#endif /* defined(__mod_h2__h2_task_output__) */
diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c
index 06472425f2..9beaeae048 100644
--- a/modules/http2/h2_util.c
+++ b/modules/http2/h2_util.c
@@ -332,6 +332,301 @@ void h2_ihash_clear(h2_ihash_t *ih)
}
/*******************************************************************************
+ * ilist - sorted list for structs with int identifier
+ ******************************************************************************/
+
+#define h2_ilist_IDX(list, i) ((int**)(list)->elts)[i]
+
+struct h2_ilist_t {
+ apr_array_header_t *l;
+};
+
+h2_ilist_t *h2_ilist_create(apr_pool_t *pool)
+{
+ h2_ilist_t *list = apr_pcalloc(pool, sizeof(h2_ilist_t));
+ if (list) {
+ list->l = apr_array_make(pool, 100, sizeof(int*));
+ if (!list->l) {
+ return NULL;
+ }
+ }
+ return list;
+}
+
+static int h2_ilist_cmp(const void *s1, const void *s2)
+{
+ int **pi1 = (int **)s1;
+ int **pi2 = (int **)s2;
+ return *(*pi1) - *(*pi2);
+}
+
+void *h2_ilist_get(h2_ilist_t *list, int id)
+{
+ /* we keep the array sorted by id, so lookup can be done
+ * by bsearch.
+ */
+ int **pi;
+ int *pkey = &id;
+
+ pi = bsearch(&pkey, list->l->elts, list->l->nelts,
+ list->l->elt_size, h2_ilist_cmp);
+ return pi? *pi : NULL;
+}
+
+static void h2_ilist_sort(h2_ilist_t *list)
+{
+ qsort(list->l->elts, list->l->nelts, list->l->elt_size, h2_ilist_cmp);
+}
+
+apr_status_t h2_ilist_add(h2_ilist_t *list, void *val)
+{
+ int *pi = val;
+ void *existing = h2_ilist_get(list, *pi);
+ if (!existing) {
+ int last;
+ APR_ARRAY_PUSH(list->l, void*) = val;
+ /* Often, values get added in ascending order of id. We
+ * keep the array sorted, so we just need to check if the newly
+ * appended stream has a lower id than the last one. if not,
+ * sorting is not necessary.
+ */
+ last = list->l->nelts - 1;
+ if (last > 0
+ && *h2_ilist_IDX(list->l, last) < *h2_ilist_IDX(list->l, last-1)) {
+ h2_ilist_sort(list);
+ }
+ }
+ return APR_SUCCESS;
+}
+
+static void remove_idx(h2_ilist_t *list, int idx)
+{
+ int n;
+ --list->l->nelts;
+ n = list->l->nelts - idx;
+ if (n > 0) {
+ /* There are n entries behind idx. Move the rest down */
+ int **selts = (int**)list->l->elts;
+ memmove(selts + idx, selts + idx + 1, n * sizeof(int*));
+ }
+}
+
+void *h2_ilist_remove(h2_ilist_t *list, int id)
+{
+ int i;
+ for (i = 0; i < list->l->nelts; ++i) {
+ int *e = h2_ilist_IDX(list->l, i);
+ if (id == *e) {
+ remove_idx(list, i);
+ return e;
+ }
+ }
+ return NULL;
+}
+
+void *h2_ilist_shift(h2_ilist_t *list)
+{
+ if (list->l->nelts > 0) {
+ int *pi = h2_ilist_IDX(list->l, 0);
+ remove_idx(list, 0);
+ return pi;
+ }
+ return NULL;
+}
+
+int h2_ilist_empty(h2_ilist_t *list)
+{
+ return list->l->nelts == 0;
+}
+
+int h2_ilist_iter(h2_ilist_t *list, h2_ilist_iter_t *iter, void *ctx)
+{
+ int i;
+ for (i = 0; i < list->l->nelts; ++i) {
+ int *pi = h2_ilist_IDX(list->l, i);
+ if (!iter(ctx, pi)) {
+ return 0;
+ }
+ }
+ return 1;
+}
+
+apr_size_t h2_ilist_count(h2_ilist_t *list)
+{
+ return list->l->nelts;
+}
+
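/* Illustrative sketch (hypothetical caller types): h2_ilist_t stores pointers
 * to structs whose first member is the int id, so the same pointer can be
 * added, looked up via bsearch and removed by that id.
 */
typedef struct {
    int id;                    /* must be the first member */
    const char *name;
} my_entry;

static my_entry *ilist_example(apr_pool_t *pool, h2_ilist_t *list)
{
    my_entry *e = apr_pcalloc(pool, sizeof(*e));
    e->id = 7;
    e->name = "seven";
    h2_ilist_add(list, e);             /* list stays sorted by id */
    return h2_ilist_get(list, 7);      /* returns e */
}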
+/*******************************************************************************
+ * iqueue - sorted list of int
+ ******************************************************************************/
+
+static void iq_grow(h2_iqueue *q, int nlen);
+static void iq_swap(h2_iqueue *q, int i, int j);
+static int iq_bubble_up(h2_iqueue *q, int i, int top,
+ h2_iq_cmp *cmp, void *ctx);
+static int iq_bubble_down(h2_iqueue *q, int i, int bottom,
+ h2_iq_cmp *cmp, void *ctx);
+
+h2_iqueue *h2_iq_create(apr_pool_t *pool, int capacity)
+{
+ h2_iqueue *q = apr_pcalloc(pool, sizeof(h2_iqueue));
+ if (q) {
+ q->pool = pool;
+ iq_grow(q, capacity);
+ q->nelts = 0;
+ }
+ return q;
+}
+
+int h2_iq_empty(h2_iqueue *q)
+{
+ return q->nelts == 0;
+}
+
+int h2_iq_count(h2_iqueue *q)
+{
+ return q->nelts;
+}
+
+
+void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx)
+{
+ int i;
+
+ if (q->nelts >= q->nalloc) {
+ iq_grow(q, q->nalloc * 2);
+ }
+
+ i = (q->head + q->nelts) % q->nalloc;
+ q->elts[i] = sid;
+ ++q->nelts;
+
+ if (cmp) {
+ /* bubble it to the front of the queue */
+ iq_bubble_up(q, i, q->head, cmp, ctx);
+ }
+}
+
+int h2_iq_remove(h2_iqueue *q, int sid)
+{
+ int i;
+ for (i = 0; i < q->nelts; ++i) {
+ if (sid == q->elts[(q->head + i) % q->nalloc]) {
+ break;
+ }
+ }
+
+ if (i < q->nelts) {
+ ++i;
+ for (; i < q->nelts; ++i) {
+ q->elts[(q->head+i-1)%q->nalloc] = q->elts[(q->head+i)%q->nalloc];
+ }
+ --q->nelts;
+ return 1;
+ }
+ return 0;
+}
+
+void h2_iq_clear(h2_iqueue *q)
+{
+ q->nelts = 0;
+}
+
+void h2_iq_sort(h2_iqueue *q, h2_iq_cmp *cmp, void *ctx)
+{
+ /* Assume that changes in ordering are minimal. In the best
+ * case, this needs q->nelts - 1 comparisons to verify that
+ * nothing changed.
+ */
+ if (q->nelts > 0) {
+ int i, ni, prev, last;
+
+ /* Start at the end of the queue and create a tail of sorted
+ * entries. Make that tail one element longer in each iteration.
+ */
+ last = i = (q->head + q->nelts - 1) % q->nalloc;
+ while (i != q->head) {
+ prev = (q->nalloc + i - 1) % q->nalloc;
+
+ ni = iq_bubble_up(q, i, prev, cmp, ctx);
+ if (ni == prev) {
+ /* i bubbled one up, bubble the new i down, which
+ * keeps all entries below i sorted. */
+ iq_bubble_down(q, i, last, cmp, ctx);
+ }
+ i = prev;
+ };
+ }
+}
+
+
+int h2_iq_shift(h2_iqueue *q)
+{
+ int sid;
+
+ if (q->nelts <= 0) {
+ return 0;
+ }
+
+ sid = q->elts[q->head];
+ q->head = (q->head + 1) % q->nalloc;
+ q->nelts--;
+
+ return sid;
+}
+
+static void iq_grow(h2_iqueue *q, int nlen)
+{
+ if (nlen > q->nalloc) {
+ int *nq = apr_pcalloc(q->pool, sizeof(int) * nlen);
+ if (q->nelts > 0) {
+ int l = ((q->head + q->nelts) % q->nalloc) - q->head;
+
+ memmove(nq, q->elts + q->head, sizeof(int) * l);
+ if (l < q->nelts) {
+ /* elts wrapped, append elts in [0, remain] to nq */
+ int remain = q->nelts - l;
+ memmove(nq + l, q->elts, sizeof(int) * remain);
+ }
+ }
+ q->elts = nq;
+ q->nalloc = nlen;
+ q->head = 0;
+ }
+}
+
+static void iq_swap(h2_iqueue *q, int i, int j)
+{
+ int x = q->elts[i];
+ q->elts[i] = q->elts[j];
+ q->elts[j] = x;
+}
+
+static int iq_bubble_up(h2_iqueue *q, int i, int top,
+ h2_iq_cmp *cmp, void *ctx)
+{
+ int prev;
+ while (((prev = (q->nalloc + i - 1) % q->nalloc), i != top)
+ && (*cmp)(q->elts[i], q->elts[prev], ctx) < 0) {
+ iq_swap(q, prev, i);
+ i = prev;
+ }
+ return i;
+}
+
+static int iq_bubble_down(h2_iqueue *q, int i, int bottom,
+ h2_iq_cmp *cmp, void *ctx)
+{
+ int next;
+ while (((next = (q->nalloc + i + 1) % q->nalloc), i != bottom)
+ && (*cmp)(q->elts[i], q->elts[next], ctx) > 0) {
+ iq_swap(q, next, i);
+ i = next;
+ }
+ return i;
+}
+
+/*******************************************************************************
* h2_util for apt_table_t
******************************************************************************/
@@ -368,15 +663,6 @@ apr_size_t h2_util_table_bytes(apr_table_t *t, apr_size_t pair_extra)
* h2_util for bucket brigades
******************************************************************************/
-/* DEEP_COPY==0 crashes under load. I think the setaside is fine,
- * however buckets moved to another thread will still be
- * free'd against the old bucket_alloc. *And* if the old
- * pool gets destroyed too early, the bucket disappears while
- * still needed.
- */
-static const int DEEP_COPY = 1;
-static const int FILE_MOVE = 1;
-
static apr_status_t last_not_included(apr_bucket_brigade *bb,
apr_off_t maxlen,
int same_alloc,
@@ -434,200 +720,95 @@ static apr_status_t last_not_included(apr_bucket_brigade *bb,
return status;
}
-#define LOG_BUCKETS 0
-#define LOG_LEVEL APLOG_INFO
-
-apr_status_t h2_util_move(apr_bucket_brigade *to, apr_bucket_brigade *from,
- apr_off_t maxlen, apr_size_t *pfile_buckets_allowed,
- const char *msg)
+apr_status_t h2_brigade_concat_length(apr_bucket_brigade *dest,
+ apr_bucket_brigade *src,
+ apr_off_t length)
{
+ apr_bucket *b, *next;
+ apr_off_t remain = length;
apr_status_t status = APR_SUCCESS;
- int same_alloc;
- AP_DEBUG_ASSERT(to);
- AP_DEBUG_ASSERT(from);
- same_alloc = (to->bucket_alloc == from->bucket_alloc
- || to->p == from->p);
-
- if (!FILE_MOVE) {
- pfile_buckets_allowed = NULL;
- }
-
- if (!APR_BRIGADE_EMPTY(from)) {
- apr_bucket *b, *end;
+ for (b = APR_BRIGADE_FIRST(src);
+ b != APR_BRIGADE_SENTINEL(src);
+ b = next) {
+ next = APR_BUCKET_NEXT(b);
- status = last_not_included(from, maxlen, same_alloc,
- pfile_buckets_allowed, &end);
- if (status != APR_SUCCESS) {
- return status;
+ if (APR_BUCKET_IS_METADATA(b)) {
+ /* fall through */
}
-
- while (!APR_BRIGADE_EMPTY(from) && status == APR_SUCCESS) {
- b = APR_BRIGADE_FIRST(from);
- if (b == end) {
- break;
+ else {
+ if (remain == b->length) {
+ /* fall through */
}
-
- if (same_alloc || (b->list == to->bucket_alloc)) {
- /* both brigades use the same bucket_alloc and auto-cleanups
- * have the same life time. It's therefore safe to just move
- * directly. */
- APR_BUCKET_REMOVE(b);
- APR_BRIGADE_INSERT_TAIL(to, b);
-#if LOG_BUCKETS
- ap_log_perror(APLOG_MARK, LOG_LEVEL, 0, to->p, APLOGNO(03205)
- "h2_util_move: %s, passed bucket(same bucket_alloc) "
- "%ld-%ld, type=%s",
- msg, (long)b->start, (long)b->length,
- APR_BUCKET_IS_METADATA(b)?
- (APR_BUCKET_IS_EOS(b)? "EOS":
- (APR_BUCKET_IS_FLUSH(b)? "FLUSH" : "META")) :
- (APR_BUCKET_IS_FILE(b)? "FILE" : "DATA"));
-#endif
+ else if (remain <= 0) {
+ return status;
}
- else if (DEEP_COPY) {
- /* we have not managed the magic of passing buckets from
- * one thread to another. Any attempts result in
- * cleanup of pools scrambling memory.
- */
- if (APR_BUCKET_IS_METADATA(b)) {
- if (APR_BUCKET_IS_EOS(b)) {
- APR_BRIGADE_INSERT_TAIL(to, apr_bucket_eos_create(to->bucket_alloc));
- }
- else {
- /* ignore */
- }
- }
- else if (pfile_buckets_allowed
- && *pfile_buckets_allowed > 0
- && APR_BUCKET_IS_FILE(b)) {
- /* We do not want to read files when passing buckets, if
- * we can avoid it. However, what we've come up so far
- * is not working corrently, resulting either in crashes or
- * too many open file descriptors.
- */
- apr_bucket_file *f = (apr_bucket_file *)b->data;
- apr_file_t *fd = f->fd;
- int setaside = (f->readpool != to->p);
-#if LOG_BUCKETS
- ap_log_perror(APLOG_MARK, LOG_LEVEL, 0, to->p, APLOGNO(03206)
- "h2_util_move: %s, moving FILE bucket %ld-%ld "
- "from=%lx(p=%lx) to=%lx(p=%lx), setaside=%d",
- msg, (long)b->start, (long)b->length,
- (long)from, (long)from->p,
- (long)to, (long)to->p, setaside);
-#endif
- if (setaside) {
- status = apr_file_setaside(&fd, fd, to->p);
- if (status != APR_SUCCESS) {
- ap_log_perror(APLOG_MARK, APLOG_ERR, status, to->p,
- APLOGNO(02947) "h2_util: %s, setaside FILE",
- msg);
- return status;
- }
+ else {
+ if (b->length == ((apr_size_t)-1)) {
+ const char *ign;
+ apr_size_t ilen;
+ status = apr_bucket_read(b, &ign, &ilen, APR_BLOCK_READ);
+ if (status != APR_SUCCESS) {
+ return status;
}
- apr_brigade_insert_file(to, fd, b->start, b->length,
- to->p);
- --(*pfile_buckets_allowed);
}
- else {
- const char *data;
- apr_size_t len;
-
- status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
- if (status == APR_SUCCESS && len > 0) {
- status = apr_brigade_write(to, NULL, NULL, data, len);
-#if LOG_BUCKETS
- ap_log_perror(APLOG_MARK, LOG_LEVEL, 0, to->p, APLOGNO(03207)
- "h2_util_move: %s, copied bucket %ld-%ld "
- "from=%lx(p=%lx) to=%lx(p=%lx)",
- msg, (long)b->start, (long)b->length,
- (long)from, (long)from->p,
- (long)to, (long)to->p);
-#endif
- }
+
+ if (remain < b->length) {
+ apr_bucket_split(b, remain);
}
- apr_bucket_delete(b);
- }
- else {
- apr_bucket_setaside(b, to->p);
- APR_BUCKET_REMOVE(b);
- APR_BRIGADE_INSERT_TAIL(to, b);
-#if LOG_BUCKETS
- ap_log_perror(APLOG_MARK, LOG_LEVEL, 0, to->p, APLOGNO(03208)
- "h2_util_move: %s, passed setaside bucket %ld-%ld "
- "from=%lx(p=%lx) to=%lx(p=%lx)",
- msg, (long)b->start, (long)b->length,
- (long)from, (long)from->p,
- (long)to, (long)to->p);
-#endif
}
}
+ APR_BUCKET_REMOVE(b);
+ APR_BRIGADE_INSERT_TAIL(dest, b);
+ remain -= b->length;
}
-
return status;
}
-apr_status_t h2_util_copy(apr_bucket_brigade *to, apr_bucket_brigade *from,
- apr_off_t maxlen, const char *msg)
+apr_status_t h2_brigade_copy_length(apr_bucket_brigade *dest,
+ apr_bucket_brigade *src,
+ apr_off_t length)
{
+ apr_bucket *b, *next;
+ apr_off_t remain = length;
apr_status_t status = APR_SUCCESS;
- int same_alloc;
-
- (void)msg;
- AP_DEBUG_ASSERT(to);
- AP_DEBUG_ASSERT(from);
- same_alloc = (to->bucket_alloc == from->bucket_alloc);
-
- if (!APR_BRIGADE_EMPTY(from)) {
- apr_bucket *b, *end, *cpy;
+
+ for (b = APR_BRIGADE_FIRST(src);
+ b != APR_BRIGADE_SENTINEL(src);
+ b = next) {
+ next = APR_BUCKET_NEXT(b);
- status = last_not_included(from, maxlen, 0, 0, &end);
- if (status != APR_SUCCESS) {
- return status;
+ if (APR_BUCKET_IS_METADATA(b)) {
+ /* fall through */
}
-
- for (b = APR_BRIGADE_FIRST(from);
- b != APR_BRIGADE_SENTINEL(from) && b != end;
- b = APR_BUCKET_NEXT(b))
- {
- if (same_alloc) {
- status = apr_bucket_copy(b, &cpy);
- if (status != APR_SUCCESS) {
- break;
- }
- APR_BRIGADE_INSERT_TAIL(to, cpy);
+ else {
+ if (remain == b->length) {
+ /* fall through */
+ }
+ else if (remain <= 0) {
+ return status;
}
else {
- if (APR_BUCKET_IS_METADATA(b)) {
- if (APR_BUCKET_IS_EOS(b)) {
- APR_BRIGADE_INSERT_TAIL(to, apr_bucket_eos_create(to->bucket_alloc));
- }
- else if (APR_BUCKET_IS_FLUSH(b)) {
- APR_BRIGADE_INSERT_TAIL(to, apr_bucket_flush_create(to->bucket_alloc));
- }
- else {
- /* ignore */
+ if (b->length == ((apr_size_t)-1)) {
+ const char *ign;
+ apr_size_t ilen;
+ status = apr_bucket_read(b, &ign, &ilen, APR_BLOCK_READ);
+ if (status != APR_SUCCESS) {
+ return status;
}
}
- else {
- const char *data;
- apr_size_t len;
- status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
- if (status == APR_SUCCESS && len > 0) {
- status = apr_brigade_write(to, NULL, NULL, data, len);
-#if LOG_BUCKETS
- ap_log_perror(APLOG_MARK, LOG_LEVEL, 0, to->p, APLOGNO(03209)
- "h2_util_copy: %s, copied bucket %ld-%ld "
- "from=%lx(p=%lx) to=%lx(p=%lx)",
- msg, (long)b->start, (long)b->length,
- (long)from, (long)from->p,
- (long)to, (long)to->p);
-#endif
- }
+
+ if (remain < b->length) {
+ apr_bucket_split(b, remain);
}
}
}
+ status = apr_bucket_copy(b, &b);
+ if (status != APR_SUCCESS) {
+ return status;
+ }
+ APR_BRIGADE_INSERT_TAIL(dest, b);
+ remain -= b->length;
}
return status;
}
@@ -666,25 +847,6 @@ int h2_util_bb_has_data(apr_bucket_brigade *bb)
return 0;
}
-int h2_util_bb_has_data_or_eos(apr_bucket_brigade *bb)
-{
- apr_bucket *b;
- for (b = APR_BRIGADE_FIRST(bb);
- b != APR_BRIGADE_SENTINEL(bb);
- b = APR_BUCKET_NEXT(b))
- {
- if (APR_BUCKET_IS_METADATA(b)) {
- if (APR_BUCKET_IS_EOS(b)) {
- return 1;
- }
- }
- else {
- return 1;
- }
- }
- return 0;
-}
-
apr_status_t h2_util_bb_avail(apr_bucket_brigade *bb,
apr_off_t *plen, int *peos)
{
@@ -789,186 +951,102 @@ apr_status_t h2_util_bb_readx(apr_bucket_brigade *bb,
return status;
}
-void h2_util_bb_log(conn_rec *c, int stream_id, int level,
- const char *tag, apr_bucket_brigade *bb)
+apr_size_t h2_util_bucket_print(char *buffer, apr_size_t bmax,
+ apr_bucket *b, const char *sep)
{
- char buffer[16 * 1024];
- const char *line = "(null)";
- apr_size_t bmax = sizeof(buffer)/sizeof(buffer[0]);
- int off = 0;
- apr_bucket *b;
+ apr_size_t off = 0;
+ if (sep && *sep) {
+ off += apr_snprintf(buffer+off, bmax-off, "%s", sep);
+ }
- if (bb) {
- memset(buffer, 0, bmax--);
- for (b = APR_BRIGADE_FIRST(bb);
- bmax && (b != APR_BRIGADE_SENTINEL(bb));
- b = APR_BUCKET_NEXT(b)) {
-
- if (APR_BUCKET_IS_METADATA(b)) {
- if (APR_BUCKET_IS_EOS(b)) {
- off += apr_snprintf(buffer+off, bmax-off, "eos ");
- }
- else if (APR_BUCKET_IS_FLUSH(b)) {
- off += apr_snprintf(buffer+off, bmax-off, "flush ");
- }
- else if (AP_BUCKET_IS_EOR(b)) {
- off += apr_snprintf(buffer+off, bmax-off, "eor ");
- }
- else {
- off += apr_snprintf(buffer+off, bmax-off, "meta(unknown) ");
- }
- }
- else {
- const char *btype = "data";
- if (APR_BUCKET_IS_FILE(b)) {
- btype = "file";
- }
- else if (APR_BUCKET_IS_PIPE(b)) {
- btype = "pipe";
- }
- else if (APR_BUCKET_IS_SOCKET(b)) {
- btype = "socket";
- }
- else if (APR_BUCKET_IS_HEAP(b)) {
- btype = "heap";
- }
- else if (APR_BUCKET_IS_TRANSIENT(b)) {
- btype = "transient";
- }
- else if (APR_BUCKET_IS_IMMORTAL(b)) {
- btype = "immortal";
- }
-#if APR_HAS_MMAP
- else if (APR_BUCKET_IS_MMAP(b)) {
- btype = "mmap";
- }
-#endif
- else if (APR_BUCKET_IS_POOL(b)) {
- btype = "pool";
- }
-
- off += apr_snprintf(buffer+off, bmax-off, "%s[%ld] ",
- btype,
- (long)(b->length == ((apr_size_t)-1)?
- -1 : b->length));
- }
+ if (APR_BUCKET_IS_METADATA(b)) {
+ if (APR_BUCKET_IS_EOS(b)) {
+ off += apr_snprintf(buffer+off, bmax-off, "eos");
+ }
+ else if (APR_BUCKET_IS_FLUSH(b)) {
+ off += apr_snprintf(buffer+off, bmax-off, "flush");
+ }
+ else if (AP_BUCKET_IS_EOR(b)) {
+ off += apr_snprintf(buffer+off, bmax-off, "eor");
+ }
+ else {
+ off += apr_snprintf(buffer+off, bmax-off, "meta(unknown)");
}
- line = *buffer? buffer : "(empty)";
}
- /* Intentional no APLOGNO */
- ap_log_cerror(APLOG_MARK, level, 0, c, "bb_dump(%ld-%d)-%s: %s",
- c->id, stream_id, tag, line);
-
-}
-
-apr_status_t h2_ltransfer_brigade(apr_bucket_brigade *to,
- apr_bucket_brigade *from,
- apr_pool_t *p,
- apr_off_t *plen,
- int *peos)
-{
- apr_bucket *e;
- apr_off_t len = 0, remain = *plen;
- apr_status_t rv;
-
- *peos = 0;
-
- while (!APR_BRIGADE_EMPTY(from)) {
- e = APR_BRIGADE_FIRST(from);
-
- if (APR_BUCKET_IS_METADATA(e)) {
- if (APR_BUCKET_IS_EOS(e)) {
- *peos = 1;
- }
+ else {
+ const char *btype = "data";
+ if (APR_BUCKET_IS_FILE(b)) {
+ btype = "file";
}
- else {
- if (remain > 0 && e->length == ((apr_size_t)-1)) {
- const char *ign;
- apr_size_t ilen;
- rv = apr_bucket_read(e, &ign, &ilen, APR_BLOCK_READ);
- if (rv != APR_SUCCESS) {
- return rv;
- }
- }
-
- if (remain < e->length) {
- if (remain <= 0) {
- return APR_SUCCESS;
- }
- apr_bucket_split(e, remain);
- }
+ else if (APR_BUCKET_IS_PIPE(b)) {
+ btype = "pipe";
}
-
- rv = apr_bucket_setaside(e, p);
-
- /* If the bucket type does not implement setaside, then
- * (hopefully) morph it into a bucket type which does, and set
- * *that* aside... */
- if (rv == APR_ENOTIMPL) {
- const char *s;
- apr_size_t n;
-
- rv = apr_bucket_read(e, &s, &n, APR_BLOCK_READ);
- if (rv == APR_SUCCESS) {
- rv = apr_bucket_setaside(e, p);
- }
+ else if (APR_BUCKET_IS_SOCKET(b)) {
+ btype = "socket";
}
-
- if (rv != APR_SUCCESS) {
- /* Return an error but still save the brigade if
- * ->setaside() is really not implemented. */
- if (rv != APR_ENOTIMPL) {
- return rv;
- }
+ else if (APR_BUCKET_IS_HEAP(b)) {
+ btype = "heap";
+ }
+ else if (APR_BUCKET_IS_TRANSIENT(b)) {
+ btype = "transient";
+ }
+ else if (APR_BUCKET_IS_IMMORTAL(b)) {
+ btype = "immortal";
+ }
+#if APR_HAS_MMAP
+ else if (APR_BUCKET_IS_MMAP(b)) {
+ btype = "mmap";
+ }
+#endif
+ else if (APR_BUCKET_IS_POOL(b)) {
+ btype = "pool";
}
- APR_BUCKET_REMOVE(e);
- APR_BRIGADE_INSERT_TAIL(to, e);
- len += e->length;
- remain -= e->length;
+ off += apr_snprintf(buffer+off, bmax-off, "%s[%ld]",
+ btype,
+ (long)(b->length == ((apr_size_t)-1)?
+ -1 : b->length));
}
-
- *plen = len;
- return APR_SUCCESS;
+ return off;
}
-apr_status_t h2_transfer_brigade(apr_bucket_brigade *to,
- apr_bucket_brigade *from,
- apr_pool_t *p)
+apr_size_t h2_util_bb_print(char *buffer, apr_size_t bmax,
+ const char *tag, const char *sep,
+ apr_bucket_brigade *bb)
{
- apr_bucket *e;
- apr_status_t rv;
-
- while (!APR_BRIGADE_EMPTY(from)) {
- e = APR_BRIGADE_FIRST(from);
-
- rv = apr_bucket_setaside(e, p);
-
- /* If the bucket type does not implement setaside, then
- * (hopefully) morph it into a bucket type which does, and set
- * *that* aside... */
- if (rv == APR_ENOTIMPL) {
- const char *s;
- apr_size_t n;
+ apr_size_t off = 0;
+ const char *sp = "";
+ apr_bucket *b;
+
+ if (bb) {
+ memset(buffer, 0, bmax--);
+ off += apr_snprintf(buffer+off, bmax-off, "%s(", tag);
+ for (b = APR_BRIGADE_FIRST(bb);
+ bmax && (b != APR_BRIGADE_SENTINEL(bb));
+ b = APR_BUCKET_NEXT(b)) {
- rv = apr_bucket_read(e, &s, &n, APR_BLOCK_READ);
- if (rv == APR_SUCCESS) {
- rv = apr_bucket_setaside(e, p);
- }
- }
-
- if (rv != APR_SUCCESS) {
- /* Return an error but still save the brigade if
- * ->setaside() is really not implemented. */
- if (rv != APR_ENOTIMPL) {
- return rv;
- }
+ off += h2_util_bucket_print(buffer+off, bmax-off, b, sp);
+ sp = " ";
}
-
- APR_BUCKET_REMOVE(e);
- APR_BRIGADE_INSERT_TAIL(to, e);
+ off += apr_snprintf(buffer+off, bmax-off, ")%s", sep);
}
- return APR_SUCCESS;
+ else {
+ off += apr_snprintf(buffer+off, bmax-off, "%s(null)%s", tag, sep);
+ }
+ return off;
+}
+
+void h2_util_bb_log(conn_rec *c, int stream_id, int level,
+ const char *tag, apr_bucket_brigade *bb)
+{
+ char buffer[4 * 1024];
+ const char *line = "(null)";
+ apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]);
+
+ len = h2_util_bb_print(buffer, bmax, tag, "", bb);
+ /* Intentional no APLOGNO */
+ ap_log_cerror(APLOG_MARK, level, 0, c, "bb_dump(%ld-%d): %s",
+ c->id, stream_id, len? buffer : line);
}
apr_status_t h2_append_brigade(apr_bucket_brigade *to,
diff --git a/modules/http2/h2_util.h b/modules/http2/h2_util.h
index 4ca2f9b65b..e191c1ee1d 100644
--- a/modules/http2/h2_util.h
+++ b/modules/http2/h2_util.h
@@ -16,6 +16,8 @@
#ifndef __mod_h2__h2_util__
#define __mod_h2__h2_util__
+#include <nghttp2/nghttp2.h>
+
/*******************************************************************************
* some debugging/format helpers
******************************************************************************/
@@ -63,7 +65,120 @@ int h2_ihash_iter(h2_ihash_t *ih, h2_ihash_iter_t *fn, void *ctx);
void h2_ihash_add(h2_ihash_t *ih, void *val);
void h2_ihash_remove(h2_ihash_t *ih, int id);
void h2_ihash_clear(h2_ihash_t *ih);
-
+
+/*******************************************************************************
+ * ilist - sorted list for structs with int identifier as first member
+ ******************************************************************************/
+typedef struct h2_ilist_t h2_ilist_t;
+typedef int h2_ilist_iter_t(void *ctx, void *val);
+
+h2_ilist_t *h2_ilist_create(apr_pool_t *pool);
+
+apr_status_t h2_ilist_add(h2_ilist_t *list, void *val);
+void *h2_ilist_get(h2_ilist_t *list, int id);
+void *h2_ilist_shift(h2_ilist_t *list);
+void *h2_ilist_remove(h2_ilist_t *list, int id);
+
+int h2_ilist_empty(h2_ilist_t *list);
+apr_size_t h2_ilist_count(h2_ilist_t *list);
+
+/**
+ * Iterate over all entries in the list until a
+ * callback returns 0. It is not safe to add or remove
+ * list members during iteration.
+ *
+ * @param list the list to iterate over
+ * @param iter the function to call for each entry
+ * @param ctx user data for the callback
+ * @return 1 iff iteration completed for all entries
+ */
+int h2_ilist_iter(h2_ilist_t *list, h2_ilist_iter_t *iter, void *ctx);
+
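A minimal iteration sketch; the counting callback is illustrative and not part of the module:

    static int count_entry(void *ctx, void *val)
    {
        int *count = ctx;
        (void)val;
        ++(*count);
        return 1;    /* returning 0 would abort the iteration */
    }

    static apr_size_t count_all(h2_ilist_t *list)
    {
        int n = 0;
        h2_ilist_iter(list, count_entry, &n);
        return (apr_size_t)n;    /* equals h2_ilist_count(list), nothing aborted */
    }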
+/*******************************************************************************
+ * iqueue - sorted list of int with user defined ordering
+ ******************************************************************************/
+typedef struct h2_iqueue {
+ int *elts;
+ int head;
+ int nelts;
+ int nalloc;
+ apr_pool_t *pool;
+} h2_iqueue;
+
+/**
+ * Comparator for two int to determine their order.
+ *
+ * @param i1 first int to compare
+ * @param i2 second int to compare
+ * @param ctx provided user data
+ * @return value is the same as for strcmp() and has the effect:
+ * == 0: i1 and i2 are treated equal in ordering
+ * < 0: i1 should be sorted before i2
+ * > 0: i2 should be sorted before i1
+ */
+typedef int h2_iq_cmp(int i1, int i2, void *ctx);
+
+/**
+ * Allocate a new queue from the pool and initialize.
+ * @param pool the memory pool to allocate from
+ * @param capacity the initial capacity of the queue
+ */
+h2_iqueue *h2_iq_create(apr_pool_t *pool, int capacity);
+
+/**
+ * Return != 0 iff there are no ids in the queue.
+ * @param q the queue to check
+ */
+int h2_iq_empty(h2_iqueue *q);
+
+/**
+ * Return the number of int in the queue.
+ * @param q the queue to get size on
+ */
+int h2_iq_count(h2_iqueue *q);
+
+/**
+ * Add a stream id to the queue.
+ *
+ * @param q the queue to append the id to
+ * @param sid the stream id to add
+ * @param cmp the comparator for sorting
+ * @param ctx user data for the comparator
+ */
+void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx);
+
+/**
+ * Remove the stream id from the queue. Return != 0 iff the id
+ * was found in the queue.
+ * @param q the queue
+ * @param sid the stream id to remove
+ * @return != 0 iff the id was found in the queue
+ */
+int h2_iq_remove(h2_iqueue *q, int sid);
+
+/**
+ * Remove all entries in the queue.
+ */
+void h2_iq_clear(h2_iqueue *q);
+
+/**
+ * Sort the stream id queue again. Call this when the ordering
+ * of entries has changed.
+ *
+ * @param q the queue to sort
+ * @param cmp the comparator for sorting
+ * @param ctx user data for the comparator
+ */
+void h2_iq_sort(h2_iqueue *q, h2_iq_cmp *cmp, void *ctx);
+
+/**
+ * Get the first stream id from the queue or 0 if the queue is empty.
+ * The id is removed from the queue.
+ *
+ * @param q the queue to get the first id from
+ * @return the first stream id of the queue, 0 if empty
+ */
+int h2_iq_shift(h2_iqueue *q);
+
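A sketch of intended use, assuming stream ids should simply be served in ascending order; the comparator and the demo function are illustrative:

    static int cmp_asc(int i1, int i2, void *ctx)
    {
        (void)ctx;
        return i1 - i2;    /* smaller ids sort to the front */
    }

    static void iq_demo(apr_pool_t *pool)
    {
        h2_iqueue *q = h2_iq_create(pool, 32);
        h2_iq_add(q, 5, cmp_asc, NULL);
        h2_iq_add(q, 3, cmp_asc, NULL);
        while (!h2_iq_empty(q)) {
            int sid = h2_iq_shift(q);    /* yields 3, then 5 */
            /* schedule stream sid ... */
            (void)sid;
        }
    }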
/*******************************************************************************
* common helpers
******************************************************************************/
@@ -164,33 +279,23 @@ h2_ngheader *h2_util_ngheader_make_req(apr_pool_t *p,
/*******************************************************************************
* apr brigade helpers
******************************************************************************/
+
/**
- * Moves data from one brigade into another. If maxlen > 0, it only
- * moves up to maxlen bytes into the target brigade, making bucket splits
- * if needed.
- * @param to the brigade to move the data to
- * @param from the brigade to get the data from
- * @param maxlen of bytes to move, <= 0 for all
- * @param pfile_buckets_allowed how many file buckets may be moved,
- * may be 0 or NULL
- * @param msg message for use in logging
+ * Concatenate at most length bytes from src to dest brigade, splitting
+ * buckets if necessary and reading buckets of indeterminate length.
*/
-apr_status_t h2_util_move(apr_bucket_brigade *to, apr_bucket_brigade *from,
- apr_off_t maxlen, apr_size_t *pfile_buckets_allowed,
- const char *msg);
-
+apr_status_t h2_brigade_concat_length(apr_bucket_brigade *dest,
+ apr_bucket_brigade *src,
+ apr_off_t length);
+
/**
- * Copies buckets from one brigade into another. If maxlen > 0, it only
- * copies up to maxlen bytes into the target brigade, making bucket splits
- * if needed.
- * @param to the brigade to copy the data to
- * @param from the brigade to get the data from
- * @param maxlen of bytes to copy, <= 0 for all
- * @param msg message for use in logging
+ * Copy at most length bytes from src to dest brigade, splitting
+ * buckets if necessary and reading buckets of indeterminate length.
*/
-apr_status_t h2_util_copy(apr_bucket_brigade *to, apr_bucket_brigade *from,
- apr_off_t maxlen, const char *msg);
-
+apr_status_t h2_brigade_copy_length(apr_bucket_brigade *dest,
+ apr_bucket_brigade *src,
+ apr_off_t length);
+
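A hedged sketch of feeding at most a fixed number of bytes from one brigade to another; the brigade setup and the 5 byte limit are purely illustrative:

    static apr_status_t concat_demo(apr_pool_t *pool, apr_bucket_alloc_t *alloc)
    {
        apr_bucket_brigade *src = apr_brigade_create(pool, alloc);
        apr_bucket_brigade *dest = apr_brigade_create(pool, alloc);

        APR_BRIGADE_INSERT_TAIL(src,
            apr_bucket_immortal_create("hello world", 11, alloc));
        /* move at most 5 bytes, splitting the first bucket at the limit;
         * on success, dest holds "hello" and src still holds " world" */
        return h2_brigade_concat_length(dest, src, 5);
    }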
/**
* Return != 0 iff there is a FLUSH or EOS bucket in the brigade.
* @param bb the brigade to check on
@@ -198,7 +303,6 @@ apr_status_t h2_util_copy(apr_bucket_brigade *to, apr_bucket_brigade *from,
*/
int h2_util_has_eos(apr_bucket_brigade *bb, apr_off_t len);
int h2_util_bb_has_data(apr_bucket_brigade *bb);
-int h2_util_bb_has_data_or_eos(apr_bucket_brigade *bb);
/**
* Check how many bytes of the desired amount are available and if the
@@ -230,6 +334,21 @@ apr_status_t h2_util_bb_readx(apr_bucket_brigade *bb,
apr_off_t *plen, int *peos);
/**
+ * Print a bucket's meta data (type and length) to the buffer.
+ * @return number of characters printed
+ */
+apr_size_t h2_util_bucket_print(char *buffer, apr_size_t bmax,
+ apr_bucket *b, const char *sep);
+
+/**
+ * Prints the brigade bucket types and lengths into the given buffer
+ * up to bmax.
+ * @return number of characters printed
+ */
+apr_size_t h2_util_bb_print(char *buffer, apr_size_t bmax,
+ const char *tag, const char *sep,
+ apr_bucket_brigade *bb);
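A small sketch of how the print helpers might feed a trace log line; buffer size, tag and log level are arbitrary choices:

    static void log_brigade(conn_rec *c, int stream_id, apr_bucket_brigade *bb)
    {
        char buffer[1024];
        h2_util_bb_print(buffer, sizeof(buffer), "input", "", bb);
        /* trace level, so no APLOGNO needed */
        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
                      "h2_stream(%d): %s", stream_id, buffer);
    }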
+/**
* Logs the bucket brigade (which bucket types with what length)
* to the log at the given level.
* @param c the connection to log for
@@ -243,33 +362,6 @@ void h2_util_bb_log(conn_rec *c, int stream_id, int level,
/**
* Transfer buckets from one brigade to another with a limit on the
- * maximum amount of bytes transfered. Sets aside the buckets to
- * pool p.
- * @param to brigade to transfer buckets to
- * @param from brigades to remove buckets from
- * @param p pool that buckets should be setaside to
- * @param plen maximum bytes to transfer, actual bytes transferred
- * @param peos if an EOS bucket was transferred
- */
-apr_status_t h2_ltransfer_brigade(apr_bucket_brigade *to,
- apr_bucket_brigade *from,
- apr_pool_t *p,
- apr_off_t *plen,
- int *peos);
-
-/**
- * Transfer all buckets from one brigade to another. Sets aside the buckets to
- * pool p.
- * @param to brigade to transfer buckets to
- * @param from brigades to remove buckets from
- * @param p pool that buckets should be setaside to
- */
-apr_status_t h2_transfer_brigade(apr_bucket_brigade *to,
- apr_bucket_brigade *from,
- apr_pool_t *p);
-
-/**
- * Transfer buckets from one brigade to another with a limit on the
* maximum amount of bytes transfered. Does no setaside magic, lifetime
* of brigades must fit.
* @param to brigade to transfer buckets to
diff --git a/modules/http2/h2_version.h b/modules/http2/h2_version.h
index a9bd29e895..191af8be4c 100644
--- a/modules/http2/h2_version.h
+++ b/modules/http2/h2_version.h
@@ -26,7 +26,7 @@
* @macro
* Version number of the http2 module as c string
*/
-#define MOD_HTTP2_VERSION "1.4.7-DEV"
+#define MOD_HTTP2_VERSION "1.5.0-DEV"
/**
* @macro
@@ -34,7 +34,7 @@
* release. This is a 24 bit number with 8 bits for major number, 8 bits
* for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
*/
-#define MOD_HTTP2_VERSION_NUM 0x010407
+#define MOD_HTTP2_VERSION_NUM 0x010500
#endif /* mod_h2_h2_version_h */
diff --git a/modules/http2/h2_worker.c b/modules/http2/h2_worker.c
index ca6ce3a2f2..e394298e7d 100644
--- a/modules/http2/h2_worker.c
+++ b/modules/http2/h2_worker.c
@@ -42,10 +42,8 @@ static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx)
/* Get a h2_task from the main workers queue. */
worker->get_next(worker, worker->ctx, &task, &sticky);
while (task) {
- h2_task_do(task, worker->io);
-
- /* if someone was waiting on this task, time to wake up */
- apr_thread_cond_signal(worker->io);
+
+ h2_task_do(task);
/* report the task done and maybe get another one from the same
* mplx (= master connection), if we can be sticky.
*/
@@ -64,40 +62,20 @@ static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx)
}
h2_worker *h2_worker_create(int id,
- apr_pool_t *parent_pool,
+ apr_pool_t *pool,
apr_threadattr_t *attr,
h2_worker_mplx_next_fn *get_next,
h2_worker_done_fn *worker_done,
void *ctx)
{
- apr_allocator_t *allocator = NULL;
- apr_pool_t *pool = NULL;
- h2_worker *w;
- apr_status_t status;
-
- apr_allocator_create(&allocator);
- apr_allocator_max_free_set(allocator, ap_max_mem_free);
- apr_pool_create_ex(&pool, parent_pool, NULL, allocator);
- apr_pool_tag(pool, "h2_worker");
- apr_allocator_owner_set(allocator, pool);
-
- w = apr_pcalloc(pool, sizeof(h2_worker));
+ h2_worker *w = apr_pcalloc(pool, sizeof(h2_worker));
if (w) {
- APR_RING_ELEM_INIT(w, link);
-
w->id = id;
- w->pool = pool;
-
+ APR_RING_ELEM_INIT(w, link);
w->get_next = get_next;
w->worker_done = worker_done;
w->ctx = ctx;
-
- status = apr_thread_cond_create(&w->io, w->pool);
- if (status != APR_SUCCESS) {
- return NULL;
- }
-
- apr_thread_create(&w->thread, attr, execute, w, w->pool);
+ apr_thread_create(&w->thread, attr, execute, w, pool);
}
return w;
}
@@ -109,22 +87,9 @@ apr_status_t h2_worker_destroy(h2_worker *worker)
apr_thread_join(&status, worker->thread);
worker->thread = NULL;
}
- if (worker->io) {
- apr_thread_cond_destroy(worker->io);
- worker->io = NULL;
- }
- if (worker->pool) {
- apr_pool_destroy(worker->pool);
- /* worker is gone */
- }
return APR_SUCCESS;
}
-int h2_worker_get_id(h2_worker *worker)
-{
- return worker->id;
-}
-
void h2_worker_abort(h2_worker *worker)
{
worker->aborted = 1;
diff --git a/modules/http2/h2_worker.h b/modules/http2/h2_worker.h
index 7a8c254f5d..04ff570361 100644
--- a/modules/http2/h2_worker.h
+++ b/modules/http2/h2_worker.h
@@ -16,7 +16,6 @@
#ifndef __mod_h2__h2_worker__
#define __mod_h2__h2_worker__
-struct apr_thread_cond_t;
struct h2_mplx;
struct h2_request;
struct h2_task;
@@ -39,19 +38,14 @@ typedef void h2_worker_done_fn(h2_worker *worker, void *ctx);
struct h2_worker {
+ int id;
/** Links to the rest of the workers */
APR_RING_ENTRY(h2_worker) link;
-
- int id;
apr_thread_t *thread;
- apr_pool_t *pool;
- struct apr_thread_cond_t *io;
-
h2_worker_mplx_next_fn *get_next;
h2_worker_done_fn *worker_done;
void *ctx;
-
- unsigned int aborted : 1;
+ int aborted;
};
/**
@@ -136,8 +130,6 @@ apr_status_t h2_worker_destroy(h2_worker *worker);
void h2_worker_abort(h2_worker *worker);
-int h2_worker_get_id(h2_worker *worker);
-
int h2_worker_is_aborted(h2_worker *worker);
#endif /* defined(__mod_h2__h2_worker__) */
diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c
index 2c1dc8dab4..2a1599914c 100644
--- a/modules/http2/h2_workers.c
+++ b/modules/http2/h2_workers.c
@@ -116,7 +116,7 @@ static apr_status_t get_mplx_next(h2_worker *worker, void *ctx,
if (status == APR_SUCCESS) {
++workers->idle_workers;
ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_worker(%d): looking for work", h2_worker_get_id(worker));
+ "h2_worker(%d): looking for work", worker->id);
while (!h2_worker_is_aborted(worker) && !workers->aborted
&& !(task = next_task(workers))) {
@@ -195,7 +195,7 @@ static void worker_done(h2_worker *worker, void *ctx)
apr_status_t status = apr_thread_mutex_lock(workers->lock);
if (status == APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_worker(%d): done", h2_worker_get_id(worker));
+ "h2_worker(%d): done", worker->id);
H2_WORKER_REMOVE(worker);
--workers->worker_count;
H2_WORKER_LIST_INSERT_TAIL(&workers->zombies, worker);
@@ -213,7 +213,7 @@ static apr_status_t add_worker(h2_workers *workers)
return APR_ENOMEM;
}
ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_workers: adding worker(%d)", h2_worker_get_id(w));
+ "h2_workers: adding worker(%d)", w->id);
++workers->worker_count;
H2_WORKER_LIST_INSERT_TAIL(&workers->workers, w);
return APR_SUCCESS;
diff --git a/modules/http2/mod_http2.c b/modules/http2/mod_http2.c
index 0d33969161..480917a419 100644
--- a/modules/http2/mod_http2.c
+++ b/modules/http2/mod_http2.c
@@ -57,6 +57,13 @@ AP_DECLARE_MODULE(http2) = {
static int h2_h2_fixups(request_rec *r);
+typedef struct {
+ unsigned int change_prio : 1;
+ unsigned int sha256 : 1;
+} features;
+
+static features myfeats;
+
/* The module initialization. Called once as apache hook, before any multi
* processing (threaded or not) happens. It is typically at least called twice,
* see
@@ -77,7 +84,16 @@ static int h2_post_config(apr_pool_t *p, apr_pool_t *plog,
const char *mod_h2_init_key = "mod_http2_init_counter";
nghttp2_info *ngh2;
apr_status_t status;
+ const char *sep = "";
+
(void)plog;(void)ptemp;
+#ifdef H2_NG2_CHANGE_PRIO
+ myfeats.change_prio = 1;
+ sep = "+";
+#endif
+#ifdef H2_OPENSSL
+ myfeats.sha256 = 1;
+#endif
apr_pool_userdata_get(&data, mod_h2_init_key, s->process->pool);
if ( data == NULL ) {
@@ -90,8 +106,11 @@ static int h2_post_config(apr_pool_t *p, apr_pool_t *plog,
ngh2 = nghttp2_version(0);
ap_log_error( APLOG_MARK, APLOG_INFO, 0, s, APLOGNO(03090)
- "mod_http2 (v%s, nghttp2 %s), initializing...",
- MOD_HTTP2_VERSION, ngh2? ngh2->version_str : "unknown");
+ "mod_http2 (v%s, feats=%s%s%s, nghttp2 %s), initializing...",
+ MOD_HTTP2_VERSION,
+ myfeats.change_prio? "CHPRIO" : "", sep,
+ myfeats.sha256? "SHA256" : "",
+ ngh2? ngh2->version_str : "unknown");
switch (h2_conn_mpm_type()) {
case H2_MPM_SIMPLE:
diff --git a/modules/http2/mod_http2.dsp b/modules/http2/mod_http2.dsp
index eb55028a06..cc8f666387 100644
--- a/modules/http2/mod_http2.dsp
+++ b/modules/http2/mod_http2.dsp
@@ -105,6 +105,10 @@ SOURCE=./h2_alt_svc.c
# End Source File
# Begin Source File
+SOURCE=./h2_bucket_beam.c
+# End Source File
+# Begin Source File
+
SOURCE=./h2_bucket_eoc.c
# End Source File
# Begin Source File
@@ -141,18 +145,10 @@ SOURCE=./h2_h2.c
# End Source File
# Begin Source File
-SOURCE=./h2_int_queue.c
-# End Source File
-# Begin Source File
-
SOURCE=./h2_io.c
# End Source File
# Begin Source File
-SOURCE=./h2_io_set.c
-# End Source File
-# Begin Source File
-
SOURCE=./h2_mplx.c
# End Source File
# Begin Source File
@@ -189,14 +185,6 @@ SOURCE=./h2_task.c
# End Source File
# Begin Source File
-SOURCE=./h2_task_input.c
-# End Source File
-# Begin Source File
-
-SOURCE=./h2_task_output.c
-# End Source File
-# Begin Source File
-
SOURCE=./h2_util.c
# End Source File
# Begin Source File
diff --git a/modules/http2/mod_proxy_http2.c b/modules/http2/mod_proxy_http2.c
index 7335e9527f..f643265497 100644
--- a/modules/http2/mod_proxy_http2.c
+++ b/modules/http2/mod_proxy_http2.c
@@ -21,7 +21,6 @@
#include "mod_proxy_http2.h"
-#include "h2_int_queue.h"
#include "h2_request.h"
#include "h2_util.h"
#include "h2_version.h"