summaryrefslogtreecommitdiffstats
path: root/server
diff options
context:
space:
mode:
authorGraham Leggett <minfrin@apache.org>2015-10-04 12:10:51 +0200
committerGraham Leggett <minfrin@apache.org>2015-10-04 12:10:51 +0200
commit615f97f93364fd7189ce973478266ce3d229d76b (patch)
tree84b9f0601b3a3ba6ecb0caf794378d7019f850e5 /server
parentleave LoadModule of mod_http2 commented-out by default (diff)
downloadapache2-615f97f93364fd7189ce973478266ce3d229d76b.tar.xz
apache2-615f97f93364fd7189ce973478266ce3d229d76b.zip
core: Extend support for asynchronous write completion from the
network filter to any connection or request filter. git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1706669 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'server')
-rw-r--r--server/core.c6
-rw-r--r--server/core_filters.c206
-rw-r--r--server/mpm/event/event.c35
-rw-r--r--server/mpm/motorz/motorz.c37
-rw-r--r--server/mpm/simple/simple_io.c35
-rw-r--r--server/request.c58
-rw-r--r--server/util_filter.c263
7 files changed, 431 insertions, 209 deletions
diff --git a/server/core.c b/server/core.c
index 4d33a00f72..de3fa23f91 100644
--- a/server/core.c
+++ b/server/core.c
@@ -112,6 +112,7 @@ AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, insert_network_bucket,
/* Handles for core filters */
AP_DECLARE_DATA ap_filter_rec_t *ap_subreq_core_filter_handle;
+AP_DECLARE_DATA ap_filter_rec_t *ap_request_core_filter_handle;
AP_DECLARE_DATA ap_filter_rec_t *ap_core_output_filter_handle;
AP_DECLARE_DATA ap_filter_rec_t *ap_content_length_filter_handle;
AP_DECLARE_DATA ap_filter_rec_t *ap_core_input_filter_handle;
@@ -5007,6 +5008,8 @@ static conn_rec *core_create_conn(apr_pool_t *ptrans, server_rec *s,
c->id = id;
c->bucket_alloc = alloc;
+ c->empty = apr_brigade_create(c->pool, c->bucket_alloc);
+ c->filters = apr_hash_make(c->pool);
c->clogging_input_filters = 0;
@@ -5395,6 +5398,9 @@ static void register_hooks(apr_pool_t *p)
ap_core_output_filter_handle =
ap_register_output_filter("CORE", ap_core_output_filter,
NULL, AP_FTYPE_NETWORK);
+ ap_request_core_filter_handle =
+ ap_register_output_filter("REQ_CORE", ap_request_core_filter,
+ NULL, AP_FTYPE_TRANSCODE);
ap_subreq_core_filter_handle =
ap_register_output_filter("SUBREQ_CORE", ap_sub_req_output_filter,
NULL, AP_FTYPE_CONTENT_SET);
diff --git a/server/core_filters.c b/server/core_filters.c
index a6c2bd666b..0f530f868f 100644
--- a/server/core_filters.c
+++ b/server/core_filters.c
@@ -78,9 +78,7 @@ do { \
#define APLOG_MODULE_INDEX AP_CORE_MODULE_INDEX
struct core_output_filter_ctx {
- apr_bucket_brigade *buffered_bb;
apr_bucket_brigade *tmp_flush_bb;
- apr_pool_t *deferred_write_pool;
apr_size_t bytes_written;
};
@@ -328,11 +326,6 @@ apr_status_t ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
return APR_SUCCESS;
}
-static void setaside_remaining_output(ap_filter_t *f,
- core_output_filter_ctx_t *ctx,
- apr_bucket_brigade *bb,
- conn_rec *c);
-
static apr_status_t send_brigade_nonblocking(apr_socket_t *s,
apr_bucket_brigade *bb,
apr_size_t *bytes_written,
@@ -358,33 +351,23 @@ static apr_status_t sendfile_nonblocking(apr_socket_t *s,
conn_rec *c);
#endif
-/* XXX: Should these be configurable parameters? */
-#define THRESHOLD_MIN_WRITE 4096
-#define THRESHOLD_MAX_BUFFER 65536
-#define MAX_REQUESTS_IN_PIPELINE 5
-
/* Optional function coming from mod_logio, used for logging of output
* traffic
*/
extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *ap__logio_add_bytes_out;
-apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
+apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *bb)
{
conn_rec *c = f->c;
core_net_rec *net = f->ctx;
core_output_filter_ctx_t *ctx = net->out_ctx;
- apr_bucket_brigade *bb = NULL;
- apr_bucket *bucket, *next, *flush_upto = NULL;
- apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
- int eor_buckets_in_brigade, morphing_bucket_in_brigade;
+ apr_bucket *flush_upto = NULL;
apr_status_t rv;
int loglevel = ap_get_conn_module_loglevel(c, APLOG_MODULE_INDEX);
/* Fail quickly if the connection has already been aborted. */
if (c->aborted) {
- if (new_bb != NULL) {
- apr_brigade_cleanup(new_bb);
- }
+ apr_brigade_cleanup(bb);
return APR_ECONNABORTED;
}
@@ -397,33 +380,14 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
* allocated from bb->pool which might be wrong.
*/
ctx->tmp_flush_bb = apr_brigade_create(c->pool, c->bucket_alloc);
- /* same for buffered_bb and ap_save_brigade */
- ctx->buffered_bb = apr_brigade_create(c->pool, c->bucket_alloc);
- }
-
- if (new_bb != NULL)
- bb = new_bb;
-
- if ((ctx->buffered_bb != NULL) &&
- !APR_BRIGADE_EMPTY(ctx->buffered_bb)) {
- if (new_bb != NULL) {
- APR_BRIGADE_PREPEND(bb, ctx->buffered_bb);
- }
- else {
- bb = ctx->buffered_bb;
- }
- c->data_in_output_filters = 0;
- }
- else if (new_bb == NULL) {
- return APR_SUCCESS;
}
/* Scan through the brigade and decide whether to attempt a write,
* and how much to write, based on the following rules:
*
- * 1) The new_bb is null: Do a nonblocking write of as much as
+ * 1) The bb is empty: Do a nonblocking write of as much as
* possible: do a nonblocking write of as much data as possible,
- * then save the rest in ctx->buffered_bb. (If new_bb == NULL,
+ * then save the rest in ctx->buffered_bb. (If bb is empty,
* it probably means that the MPM is doing asynchronous write
* completion and has just determined that this connection
* is writable.)
@@ -459,91 +423,12 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
* 3) Actually do the blocking write up to the last bucket determined
* by rules 2a-d. The point of doing only one flush is to make as
* few calls to writev() as possible.
- *
- * 4) If the brigade contains at least THRESHOLD_MIN_WRITE
- * bytes: Do a nonblocking write of as much data as possible,
- * then save the rest in ctx->buffered_bb.
*/
- if (new_bb == NULL) {
- rv = send_brigade_nonblocking(net->client_socket, bb,
- &(ctx->bytes_written), c);
- if (rv != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(rv)) {
- /* The client has aborted the connection */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
- "core_output_filter: writing data to the network");
- apr_brigade_cleanup(bb);
- c->aborted = 1;
- return rv;
- }
- setaside_remaining_output(f, ctx, bb, c);
- return APR_SUCCESS;
- }
-
- bytes_in_brigade = 0;
- non_file_bytes_in_brigade = 0;
- eor_buckets_in_brigade = 0;
- morphing_bucket_in_brigade = 0;
-
- for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
- bucket = next) {
- next = APR_BUCKET_NEXT(bucket);
-
- if (!APR_BUCKET_IS_METADATA(bucket)) {
- if (bucket->length == (apr_size_t)-1) {
- /*
- * A setaside of morphing buckets would read everything into
- * memory. Instead, we will flush everything up to and
- * including this bucket.
- */
- morphing_bucket_in_brigade = 1;
- }
- else {
- bytes_in_brigade += bucket->length;
- if (!APR_BUCKET_IS_FILE(bucket))
- non_file_bytes_in_brigade += bucket->length;
- }
- }
- else if (AP_BUCKET_IS_EOR(bucket)) {
- eor_buckets_in_brigade++;
- }
-
- if (APR_BUCKET_IS_FLUSH(bucket)
- || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER
- || morphing_bucket_in_brigade
- || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) {
- /* this segment of the brigade MUST be sent before returning. */
-
- if (loglevel >= APLOG_TRACE6) {
- char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
- "FLUSH bucket" :
- (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ?
- "THRESHOLD_MAX_BUFFER" :
- morphing_bucket_in_brigade ? "morphing bucket" :
- "MAX_REQUESTS_IN_PIPELINE";
- ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, c,
- "will flush because of %s", reason);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c,
- "seen in brigade%s: bytes: %" APR_SIZE_T_FMT
- ", non-file bytes: %" APR_SIZE_T_FMT ", eor "
- "buckets: %d, morphing buckets: %d",
- flush_upto == NULL ? " so far"
- : " since last flush point",
- bytes_in_brigade,
- non_file_bytes_in_brigade,
- eor_buckets_in_brigade,
- morphing_bucket_in_brigade);
- }
- /*
- * Defer the actual blocking write to avoid doing many writes.
- */
- flush_upto = next;
+ ap_filter_reinstate_brigade(f, bb, &flush_upto);
- bytes_in_brigade = 0;
- non_file_bytes_in_brigade = 0;
- eor_buckets_in_brigade = 0;
- morphing_bucket_in_brigade = 0;
- }
+ if (APR_BRIGADE_EMPTY(bb)) {
+ return APR_SUCCESS;
}
if (flush_upto != NULL) {
@@ -571,71 +456,30 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
APR_BRIGADE_CONCAT(bb, ctx->tmp_flush_bb);
}
+ rv = send_brigade_nonblocking(net->client_socket, bb, &(ctx->bytes_written),
+ c);
+ if ((rv != APR_SUCCESS) && (!APR_STATUS_IS_EAGAIN(rv))) {
+ /* The client has aborted the connection */
+ ap_log_cerror(
+ APLOG_MARK, APLOG_TRACE1, rv, c,
+ "core_output_filter: writing data to the network");
+ apr_brigade_cleanup(bb);
+ c->aborted = 1;
+ return rv;
+ }
if (loglevel >= APLOG_TRACE8) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c,
- "brigade contains: bytes: %" APR_SIZE_T_FMT
- ", non-file bytes: %" APR_SIZE_T_FMT
- ", eor buckets: %d, morphing buckets: %d",
- bytes_in_brigade, non_file_bytes_in_brigade,
- eor_buckets_in_brigade, morphing_bucket_in_brigade);
+ ap_log_cerror(
+ APLOG_MARK, APLOG_TRACE8, 0, c,
+ "tried nonblocking write, total bytes "
+ "written: %" APR_SIZE_T_FMT, ctx->bytes_written);
}
- if (bytes_in_brigade >= THRESHOLD_MIN_WRITE) {
- rv = send_brigade_nonblocking(net->client_socket, bb,
- &(ctx->bytes_written), c);
- if ((rv != APR_SUCCESS) && (!APR_STATUS_IS_EAGAIN(rv))) {
- /* The client has aborted the connection */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
- "core_output_filter: writing data to the network");
- apr_brigade_cleanup(bb);
- c->aborted = 1;
- return rv;
- }
- if (loglevel >= APLOG_TRACE8) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c,
- "tried nonblocking write, total bytes "
- "written: %" APR_SIZE_T_FMT,
- ctx->bytes_written);
- }
- }
+ remove_empty_buckets(bb);
+ ap_filter_setaside_brigade(f, bb);
- setaside_remaining_output(f, ctx, bb, c);
return APR_SUCCESS;
}
-/*
- * This function assumes that either ctx->buffered_bb == NULL, or
- * ctx->buffered_bb is empty, or ctx->buffered_bb == bb
- */
-static void setaside_remaining_output(ap_filter_t *f,
- core_output_filter_ctx_t *ctx,
- apr_bucket_brigade *bb,
- conn_rec *c)
-{
- if (bb == NULL) {
- return;
- }
- remove_empty_buckets(bb);
- if (!APR_BRIGADE_EMPTY(bb)) {
- c->data_in_output_filters = 1;
- if (bb != ctx->buffered_bb) {
- if (!ctx->deferred_write_pool) {
- apr_pool_create(&ctx->deferred_write_pool, c->pool);
- apr_pool_tag(ctx->deferred_write_pool, "deferred_write");
- }
- ap_save_brigade(f, &(ctx->buffered_bb), &bb,
- ctx->deferred_write_pool);
- }
- }
- else if (ctx->deferred_write_pool) {
- /*
- * There are no more requests in the pipeline. We can just clear the
- * pool.
- */
- apr_pool_clear(ctx->deferred_write_pool);
- }
-}
-
#ifndef APR_MAX_IOVEC_SIZE
#define MAX_IOVEC_TO_WRITE 16
#else
diff --git a/server/mpm/event/event.c b/server/mpm/event/event.c
index ee0d8fe690..1cdc52c762 100644
--- a/server/mpm/event/event.c
+++ b/server/mpm/event/event.c
@@ -1146,19 +1146,38 @@ read_request:
}
if (cs->pub.state == CONN_STATE_WRITE_COMPLETION) {
- ap_filter_t *output_filter = c->output_filters;
- apr_status_t rv;
+ apr_hash_index_t *rindex;
+ apr_status_t rv = APR_SUCCESS;
+ int data_in_output_filters = 0;
ap_update_child_status_from_conn(sbh, SERVER_BUSY_WRITE, c);
- while (output_filter->next != NULL) {
- output_filter = output_filter->next;
+
+ rindex = apr_hash_first(NULL, c->filters);
+ while (rindex) {
+ ap_filter_t *f = apr_hash_this_val(rindex);
+
+ if (!APR_BRIGADE_EMPTY(f->bb)) {
+
+ rv = ap_pass_brigade(f, c->empty);
+ apr_brigade_cleanup(c->empty);
+ if (APR_SUCCESS != rv) {
+ ap_log_cerror(
+ APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(00470)
+ "write failure in '%s' output filter", f->frec->name);
+ break;
+ }
+
+ if (ap_filter_should_yield(f)) {
+ data_in_output_filters = 1;
+ }
+ }
+
+ rindex = apr_hash_next(rindex);
}
- rv = output_filter->frec->filter_func.out_func(output_filter, NULL);
+
if (rv != APR_SUCCESS) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(00470)
- "network write failure in core output filter");
cs->pub.state = CONN_STATE_LINGER;
}
- else if (c->data_in_output_filters) {
+ else if (data_in_output_filters) {
/* Still in WRITE_COMPLETION_STATE:
* Set a write timeout for this connection, and let the
* event thread poll for writeability.
diff --git a/server/mpm/motorz/motorz.c b/server/mpm/motorz/motorz.c
index f359dbafb9..a10ead0f59 100644
--- a/server/mpm/motorz/motorz.c
+++ b/server/mpm/motorz/motorz.c
@@ -359,21 +359,38 @@ static apr_status_t motorz_io_process(motorz_conn_t *scon)
}
if (scon->cs.state == CONN_STATE_WRITE_COMPLETION) {
- ap_filter_t *output_filter = c->output_filters;
- ap_update_child_status_from_conn(scon->sbh, SERVER_BUSY_WRITE, c);
- while (output_filter->next != NULL) {
- output_filter = output_filter->next;
- }
+ apr_hash_index_t *rindex;
+ apr_status_t rv = APR_SUCCESS;
+ int data_in_output_filters = 0;
+ ap_update_child_status_from_conn(sbh, SERVER_BUSY_WRITE, c);
+
+ rindex = apr_hash_first(NULL, c->filters);
+ while (rindex) {
+ ap_filter_t *f = apr_hash_this_val(rindex);
+
+ if (!APR_BRIGADE_EMPTY(f->bb)) {
+
+ rv = ap_pass_brigade(f, c->empty);
+ apr_brigade_cleanup(c->empty);
+ if (APR_SUCCESS != rv) {
+ ap_log_cerror(
+ APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(02848)
+ "write failure in '%s' output filter", f->frec->name);
+ break;
+ }
+
+ if (ap_filter_should_yield(f)) {
+ data_in_output_filters = 1;
+ }
+ }
- rv = output_filter->frec->filter_func.out_func(output_filter,
- NULL);
+ rindex = apr_hash_next(rindex);
+ }
if (rv != APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_WARNING, rv, ap_server_conf, APLOGNO(02848)
- "network write failure in core output filter");
scon->cs.state = CONN_STATE_LINGER;
}
- else if (c->data_in_output_filters) {
+ else if (data_in_output_filters) {
/* Still in WRITE_COMPLETION_STATE:
* Set a write timeout for this connection, and let the
* event thread poll for writeability.
diff --git a/server/mpm/simple/simple_io.c b/server/mpm/simple/simple_io.c
index b14aae474f..47c13d71c6 100644
--- a/server/mpm/simple/simple_io.c
+++ b/server/mpm/simple/simple_io.c
@@ -92,20 +92,37 @@ static apr_status_t simple_io_process(simple_conn_t * scon)
}
if (scon->cs.state == CONN_STATE_WRITE_COMPLETION) {
- ap_filter_t *output_filter = c->output_filters;
- while (output_filter->next != NULL) {
- output_filter = output_filter->next;
- }
+ apr_hash_index_t *rindex;
+ apr_status_t rv = APR_SUCCESS;
+ int data_in_output_filters = 0;
+
+ rindex = apr_hash_first(NULL, c->filters);
+ while (rindex) {
+ ap_filter_t *f = apr_hash_this_val(rindex);
+
+ if (!APR_BRIGADE_EMPTY(f->bb)) {
+
+ rv = ap_pass_brigade(f, c->empty);
+ apr_brigade_cleanup(c->empty);
+ if (APR_SUCCESS != rv) {
+ ap_log_cerror(
+ APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(00249)
+ "write failure in '%s' output filter", f->frec->name);
+ break;
+ }
+
+ if (ap_filter_should_yield(f)) {
+ data_in_output_filters = 1;
+ }
+ }
- rv = output_filter->frec->filter_func.out_func(output_filter,
- NULL);
+ rindex = apr_hash_next(rindex);
+ }
if (rv != APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_WARNING, rv, ap_server_conf, APLOGNO(00249)
- "network write failure in core output filter");
scon->cs.state = CONN_STATE_LINGER;
}
- else if (c->data_in_output_filters) {
+ else if (data_in_output_filters) {
/* Still in WRITE_COMPLETION_STATE:
* Set a write timeout for this connection, and let the
* event thread poll for writeability.
diff --git a/server/request.c b/server/request.c
index 67e535d14c..9c9ad9f93b 100644
--- a/server/request.c
+++ b/server/request.c
@@ -2036,6 +2036,64 @@ AP_CORE_DECLARE_NONSTD(apr_status_t) ap_sub_req_output_filter(ap_filter_t *f,
return APR_SUCCESS;
}
+AP_CORE_DECLARE_NONSTD(apr_status_t) ap_request_core_filter(ap_filter_t *f,
+ apr_bucket_brigade *bb)
+{
+ apr_bucket *flush_upto = NULL;
+ apr_status_t status = APR_SUCCESS;
+ apr_bucket_brigade *tmp_bb = f->ctx;
+
+ if (!tmp_bb) {
+ tmp_bb = f->ctx = apr_brigade_create(f->r->pool, f->c->bucket_alloc);
+ }
+
+ /* Reinstate any buffered content */
+ ap_filter_reinstate_brigade(f, bb, &flush_upto);
+
+ while (!APR_BRIGADE_EMPTY(bb)) {
+ apr_bucket *bucket = APR_BRIGADE_FIRST(bb);
+
+ /* if the core has set aside data, back off and try later */
+ if (!flush_upto) {
+ if (ap_filter_should_yield(f)) {
+ break;
+ }
+ }
+ else if (flush_upto == bucket) {
+ flush_upto = NULL;
+ }
+
+ /* have we found a morphing bucket? if so, force it to morph into something
+ * safe to pass down to the connection filters without needing to be set
+ * aside.
+ */
+ if (!APR_BUCKET_IS_METADATA(bucket)) {
+ if (bucket->length == (apr_size_t) - 1) {
+ const char *data;
+ apr_size_t size;
+ if (APR_SUCCESS
+ != (status = apr_bucket_read(bucket, &data, &size,
+ APR_BLOCK_READ))) {
+ return status;
+ }
+ }
+ }
+
+ /* pass each bucket down the chain */
+ APR_BUCKET_REMOVE(bucket);
+ APR_BRIGADE_INSERT_TAIL(tmp_bb, bucket);
+
+ status = ap_pass_brigade(f->next, tmp_bb);
+ if (!APR_STATUS_IS_EOF(status) && (status != APR_SUCCESS)) {
+ return status;
+ }
+
+ }
+
+ ap_filter_setaside_brigade(f, bb);
+ return status;
+}
+
extern APR_OPTIONAL_FN_TYPE(authz_some_auth_required) *ap__authz_ap_some_auth_required;
AP_DECLARE(int) ap_some_auth_required(request_rec *r)
diff --git a/server/util_filter.c b/server/util_filter.c
index 01eb533520..ad14112ddb 100644
--- a/server/util_filter.c
+++ b/server/util_filter.c
@@ -24,6 +24,7 @@
#include "http_config.h"
#include "http_core.h"
#include "http_log.h"
+#include "http_request.h"
#include "util_filter.h"
/* NOTE: Apache's current design doesn't allow a pool to be passed thru,
@@ -32,6 +33,10 @@
#define FILTER_POOL apr_hook_global_pool
#include "ap_hooks.h" /* for apr_hook_global_pool */
+/* XXX: Should these be configurable parameters? */
+#define THRESHOLD_MAX_BUFFER 65536
+#define MAX_REQUESTS_IN_PIPELINE 5
+
/*
** This macro returns true/false if a given filter should be inserted BEFORE
** another filter. This will happen when one of: 1) there isn't another
@@ -319,6 +324,8 @@ static ap_filter_t *add_any_filter_handle(ap_filter_rec_t *frec, void *ctx,
f->r = frec->ftype < AP_FTYPE_CONNECTION ? r : NULL;
f->c = c;
f->next = NULL;
+ f->bb = NULL;
+ f->deferred_pool = NULL;
if (INSERT_BEFORE(f, *outf)) {
f->next = *outf;
@@ -474,6 +481,16 @@ AP_DECLARE(void) ap_remove_input_filter(ap_filter_t *f)
AP_DECLARE(void) ap_remove_output_filter(ap_filter_t *f)
{
+
+ if ((f->bb) && !APR_BRIGADE_EMPTY(f->bb)) {
+ apr_brigade_cleanup(f->bb);
+ }
+
+ if (f->deferred_pool) {
+ apr_pool_destroy(f->deferred_pool);
+ f->deferred_pool = NULL;
+ }
+
remove_any_filter(f, f->r ? &f->r->output_filters : NULL,
f->r ? &f->r->proto_output_filters : NULL,
&f->c->output_filters);
@@ -566,6 +583,7 @@ AP_DECLARE(apr_status_t) ap_pass_brigade(ap_filter_t *next,
{
if (next) {
apr_bucket *e;
+
if ((e = APR_BRIGADE_LAST(bb)) && APR_BUCKET_IS_EOS(e) && next->r) {
/* This is only safe because HTTP_HEADER filter is always in
* the filter stack. This ensures that there is ALWAYS a
@@ -635,7 +653,8 @@ AP_DECLARE(apr_status_t) ap_save_brigade(ap_filter_t *f,
apr_status_t rv, srv = APR_SUCCESS;
/* If have never stored any data in the filter, then we had better
- * create an empty bucket brigade so that we can concat.
+ * create an empty bucket brigade so that we can concat. Register
+ * a cleanup to zero out the pointer if the pool is cleared.
*/
if (!(*saveto)) {
*saveto = apr_brigade_create(p, f->c->bucket_alloc);
@@ -673,6 +692,248 @@ AP_DECLARE(apr_status_t) ap_save_brigade(ap_filter_t *f,
return srv;
}
+static apr_status_t filters_cleanup(void *data)
+{
+ ap_filter_t **key = data;
+
+ apr_hash_set((*key)->c->filters, key, sizeof(ap_filter_t **), NULL);
+
+ return APR_SUCCESS;
+}
+
+AP_DECLARE(apr_status_t) ap_filter_setaside_brigade(ap_filter_t *f,
+ apr_bucket_brigade *bb)
+{
+ int loglevel = ap_get_conn_module_loglevel(f->c, APLOG_MODULE_INDEX);
+
+ if (loglevel >= APLOG_TRACE6) {
+ ap_log_cerror(
+ APLOG_MARK, APLOG_TRACE6, 0, f->c,
+ "setaside %s brigade to %s brigade in '%s' output filter",
+ (APR_BRIGADE_EMPTY(bb) ? "empty" : "full"),
+ (!f->bb || APR_BRIGADE_EMPTY(f->bb) ? "empty" : "full"), f->frec->name);
+ }
+
+ if (!APR_BRIGADE_EMPTY(bb)) {
+ apr_pool_t *pool;
+ /*
+ * Set aside the brigade bb within f->bb.
+ */
+ if (!f->bb) {
+ ap_filter_t **key;
+
+ pool = f->r ? f->r->pool : f->c->pool;
+
+ key = apr_palloc(pool, sizeof(ap_filter_t **));
+ *key = f;
+ apr_hash_set(f->c->filters, key, sizeof(ap_filter_t **), f);
+
+ f->bb = apr_brigade_create(pool, f->c->bucket_alloc);
+
+ apr_pool_pre_cleanup_register(pool, key, filters_cleanup);
+
+ }
+
+ /* decide what pool we setaside to, request pool or deferred pool? */
+ if (f->r) {
+ pool = f->r->pool;
+ APR_BRIGADE_CONCAT(f->bb, bb);
+ }
+ else {
+ if (!f->deferred_pool) {
+ apr_pool_create(&f->deferred_pool, f->c->pool);
+ apr_pool_tag(f->deferred_pool, "deferred_pool");
+ }
+ pool = f->deferred_pool;
+ return ap_save_brigade(f, &f->bb, &bb, pool);
+ }
+
+ }
+ else if (f->deferred_pool) {
+ /*
+ * There are no more requests in the pipeline. We can just clear the
+ * pool.
+ */
+ apr_brigade_cleanup(f->bb);
+ apr_pool_clear(f->deferred_pool);
+ }
+ return APR_SUCCESS;
+}
+
+AP_DECLARE(apr_status_t) ap_filter_reinstate_brigade(ap_filter_t *f,
+ apr_bucket_brigade *bb,
+ apr_bucket **flush_upto)
+{
+ apr_bucket *bucket, *next;
+ apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
+ int eor_buckets_in_brigade, morphing_bucket_in_brigade;
+ int loglevel = ap_get_conn_module_loglevel(f->c, APLOG_MODULE_INDEX);
+
+ if (loglevel >= APLOG_TRACE6) {
+ ap_log_cerror(
+ APLOG_MARK, APLOG_TRACE6, 0, f->c,
+ "reinstate %s brigade to %s brigade in '%s' output filter",
+ (!f->bb || APR_BRIGADE_EMPTY(f->bb) ? "empty" : "full"),
+ (APR_BRIGADE_EMPTY(bb) ? "empty" : "full"), f->frec->name);
+ }
+
+ if (f->bb && !APR_BRIGADE_EMPTY(f->bb)) {
+ APR_BRIGADE_PREPEND(bb, f->bb);
+ }
+
+ /*
+ * Determine if and up to which bucket we need to do a blocking write:
+ *
+ * a) The brigade contains a flush bucket: Do a blocking write
+ * of everything up that point.
+ *
+ * b) The request is in CONN_STATE_HANDLER state, and the brigade
+ * contains at least THRESHOLD_MAX_BUFFER bytes in non-file
+ * buckets: Do blocking writes until the amount of data in the
+ * buffer is less than THRESHOLD_MAX_BUFFER. (The point of this
+ * rule is to provide flow control, in case a handler is
+ * streaming out lots of data faster than the data can be
+ * sent to the client.)
+ *
+ * c) The request is in CONN_STATE_HANDLER state, and the brigade
+ * contains at least MAX_REQUESTS_IN_PIPELINE EOR buckets:
+ * Do blocking writes until less than MAX_REQUESTS_IN_PIPELINE EOR
+ * buckets are left. (The point of this rule is to prevent too many
+ * FDs being kept open by pipelined requests, possibly allowing a
+ * DoS).
+ *
+ * d) The request is being served by a connection filter and the
+ * brigade contains a morphing bucket: If there was no other
+ * reason to do a blocking write yet, try reading the bucket. If its
+ * contents fit into memory before THRESHOLD_MAX_BUFFER is reached,
+ * everything is fine. Otherwise we need to do a blocking write the
+ * up to and including the morphing bucket, because ap_save_brigade()
+ * would read the whole bucket into memory later on.
+ */
+
+ *flush_upto = NULL;
+
+ bytes_in_brigade = 0;
+ non_file_bytes_in_brigade = 0;
+ eor_buckets_in_brigade = 0;
+ morphing_bucket_in_brigade = 0;
+
+ for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
+ bucket = next) {
+ next = APR_BUCKET_NEXT(bucket);
+
+ if (!APR_BUCKET_IS_METADATA(bucket)) {
+ if (bucket->length == (apr_size_t)-1) {
+ /*
+ * A setaside of morphing buckets would read everything into
+ * memory. Instead, we will flush everything up to and
+ * including this bucket.
+ */
+ morphing_bucket_in_brigade = 1;
+ }
+ else {
+ bytes_in_brigade += bucket->length;
+ if (!APR_BUCKET_IS_FILE(bucket))
+ non_file_bytes_in_brigade += bucket->length;
+ }
+ }
+ else if (AP_BUCKET_IS_EOR(bucket)) {
+ eor_buckets_in_brigade++;
+ }
+
+ if (APR_BUCKET_IS_FLUSH(bucket)
+ || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER
+ || (!f->r && morphing_bucket_in_brigade)
+ || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) {
+ /* this segment of the brigade MUST be sent before returning. */
+
+ if (loglevel >= APLOG_TRACE6) {
+ char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
+ "FLUSH bucket" :
+ (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ?
+ "THRESHOLD_MAX_BUFFER" :
+ (!f->r && morphing_bucket_in_brigade) ? "morphing bucket" :
+ "MAX_REQUESTS_IN_PIPELINE";
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, f->c,
+ "will flush because of %s", reason);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, f->c,
+ "seen in brigade%s: bytes: %" APR_SIZE_T_FMT
+ ", non-file bytes: %" APR_SIZE_T_FMT ", eor "
+ "buckets: %d, morphing buckets: %d",
+ flush_upto == NULL ? " so far"
+ : " since last flush point",
+ bytes_in_brigade,
+ non_file_bytes_in_brigade,
+ eor_buckets_in_brigade,
+ morphing_bucket_in_brigade);
+ }
+ /*
+ * Defer the actual blocking write to avoid doing many writes.
+ */
+ *flush_upto = next;
+
+ bytes_in_brigade = 0;
+ non_file_bytes_in_brigade = 0;
+ eor_buckets_in_brigade = 0;
+ morphing_bucket_in_brigade = 0;
+ }
+ }
+
+ if (loglevel >= APLOG_TRACE8) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, f->c,
+ "brigade contains: bytes: %" APR_SIZE_T_FMT
+ ", non-file bytes: %" APR_SIZE_T_FMT
+ ", eor buckets: %d, morphing buckets: %d",
+ bytes_in_brigade, non_file_bytes_in_brigade,
+ eor_buckets_in_brigade, morphing_bucket_in_brigade);
+ }
+
+ return APR_SUCCESS;
+}
+
+AP_DECLARE(int) ap_filter_should_yield(ap_filter_t *f)
+{
+ /*
+ * This function decides whether a filter should yield due to buffered
+ * data in a downstream filter. If a downstream filter buffers we
+ * must back off so we don't overwhelm the server. If this function
+ * returns true, the filter should call ap_filter_setaside_brigade()
+ * to save unprocessed buckets, and then reinstate those buckets on
+ * the next call with ap_filter_reinstate_brigade() and continue
+ * where it left off.
+ *
+ * If this function is forced to return zero, we return back to
+ * synchronous filter behaviour.
+ *
+ * Subrequests present us with a problem - we don't know how much data
+ * they will produce and therefore how much buffering we'll need, and
+ * if a subrequest had to trigger buffering, but next subrequest wouldn't
+ * know when the previous one had finished sending data and buckets
+ * could be sent out of order.
+ *
+ * In the case of subrequests, deny the ability to yield. When the data
+ * reaches the filters from the main request, they will be setaside
+ * there in the right order and the request will be given the
+ * opportunity to yield.
+ */
+ if (f->r && f->r->main) {
+ return 0;
+ }
+
+ /*
+ * This is either a main request or internal redirect, or it is a
+ * connection filter. Yield if there is any buffered data downstream
+ * from us.
+ */
+ while (f) {
+ if (f->bb && !APR_BRIGADE_EMPTY(f->bb)) {
+ return 1;
+ }
+ f = f->next;
+ }
+ return 0;
+}
+
AP_DECLARE_NONSTD(apr_status_t) ap_filter_flush(apr_bucket_brigade *bb,
void *ctx)
{