diff options
author | Graham Leggett <minfrin@apache.org> | 2015-10-04 12:10:51 +0200 |
---|---|---|
committer | Graham Leggett <minfrin@apache.org> | 2015-10-04 12:10:51 +0200 |
commit | 615f97f93364fd7189ce973478266ce3d229d76b (patch) | |
tree | 84b9f0601b3a3ba6ecb0caf794378d7019f850e5 /server | |
parent | leave LoadModule of mod_http2 commented-out by default (diff) | |
download | apache2-615f97f93364fd7189ce973478266ce3d229d76b.tar.xz apache2-615f97f93364fd7189ce973478266ce3d229d76b.zip |
core: Extend support for asynchronous write completion from the
network filter to any connection or request filter.
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1706669 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'server')
mode | file | lines changed |
---|---|---|
-rw-r--r-- | server/core.c | 6 |
-rw-r--r-- | server/core_filters.c | 206 |
-rw-r--r-- | server/mpm/event/event.c | 35 |
-rw-r--r-- | server/mpm/motorz/motorz.c | 37 |
-rw-r--r-- | server/mpm/simple/simple_io.c | 35 |
-rw-r--r-- | server/request.c | 58 |
-rw-r--r-- | server/util_filter.c | 263 |
7 files changed, 431 insertions, 209 deletions
diff --git a/server/core.c b/server/core.c index 4d33a00f72..de3fa23f91 100644 --- a/server/core.c +++ b/server/core.c @@ -112,6 +112,7 @@ AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, insert_network_bucket, /* Handles for core filters */ AP_DECLARE_DATA ap_filter_rec_t *ap_subreq_core_filter_handle; +AP_DECLARE_DATA ap_filter_rec_t *ap_request_core_filter_handle; AP_DECLARE_DATA ap_filter_rec_t *ap_core_output_filter_handle; AP_DECLARE_DATA ap_filter_rec_t *ap_content_length_filter_handle; AP_DECLARE_DATA ap_filter_rec_t *ap_core_input_filter_handle; @@ -5007,6 +5008,8 @@ static conn_rec *core_create_conn(apr_pool_t *ptrans, server_rec *s, c->id = id; c->bucket_alloc = alloc; + c->empty = apr_brigade_create(c->pool, c->bucket_alloc); + c->filters = apr_hash_make(c->pool); c->clogging_input_filters = 0; @@ -5395,6 +5398,9 @@ static void register_hooks(apr_pool_t *p) ap_core_output_filter_handle = ap_register_output_filter("CORE", ap_core_output_filter, NULL, AP_FTYPE_NETWORK); + ap_request_core_filter_handle = + ap_register_output_filter("REQ_CORE", ap_request_core_filter, + NULL, AP_FTYPE_TRANSCODE); ap_subreq_core_filter_handle = ap_register_output_filter("SUBREQ_CORE", ap_sub_req_output_filter, NULL, AP_FTYPE_CONTENT_SET); diff --git a/server/core_filters.c b/server/core_filters.c index a6c2bd666b..0f530f868f 100644 --- a/server/core_filters.c +++ b/server/core_filters.c @@ -78,9 +78,7 @@ do { \ #define APLOG_MODULE_INDEX AP_CORE_MODULE_INDEX struct core_output_filter_ctx { - apr_bucket_brigade *buffered_bb; apr_bucket_brigade *tmp_flush_bb; - apr_pool_t *deferred_write_pool; apr_size_t bytes_written; }; @@ -328,11 +326,6 @@ apr_status_t ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b, return APR_SUCCESS; } -static void setaside_remaining_output(ap_filter_t *f, - core_output_filter_ctx_t *ctx, - apr_bucket_brigade *bb, - conn_rec *c); - static apr_status_t send_brigade_nonblocking(apr_socket_t *s, apr_bucket_brigade *bb, apr_size_t *bytes_written, @@ 
-358,33 +351,23 @@ static apr_status_t sendfile_nonblocking(apr_socket_t *s, conn_rec *c); #endif -/* XXX: Should these be configurable parameters? */ -#define THRESHOLD_MIN_WRITE 4096 -#define THRESHOLD_MAX_BUFFER 65536 -#define MAX_REQUESTS_IN_PIPELINE 5 - /* Optional function coming from mod_logio, used for logging of output * traffic */ extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *ap__logio_add_bytes_out; -apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb) +apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *bb) { conn_rec *c = f->c; core_net_rec *net = f->ctx; core_output_filter_ctx_t *ctx = net->out_ctx; - apr_bucket_brigade *bb = NULL; - apr_bucket *bucket, *next, *flush_upto = NULL; - apr_size_t bytes_in_brigade, non_file_bytes_in_brigade; - int eor_buckets_in_brigade, morphing_bucket_in_brigade; + apr_bucket *flush_upto = NULL; apr_status_t rv; int loglevel = ap_get_conn_module_loglevel(c, APLOG_MODULE_INDEX); /* Fail quickly if the connection has already been aborted. */ if (c->aborted) { - if (new_bb != NULL) { - apr_brigade_cleanup(new_bb); - } + apr_brigade_cleanup(bb); return APR_ECONNABORTED; } @@ -397,33 +380,14 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb) * allocated from bb->pool which might be wrong. 
*/ ctx->tmp_flush_bb = apr_brigade_create(c->pool, c->bucket_alloc); - /* same for buffered_bb and ap_save_brigade */ - ctx->buffered_bb = apr_brigade_create(c->pool, c->bucket_alloc); - } - - if (new_bb != NULL) - bb = new_bb; - - if ((ctx->buffered_bb != NULL) && - !APR_BRIGADE_EMPTY(ctx->buffered_bb)) { - if (new_bb != NULL) { - APR_BRIGADE_PREPEND(bb, ctx->buffered_bb); - } - else { - bb = ctx->buffered_bb; - } - c->data_in_output_filters = 0; - } - else if (new_bb == NULL) { - return APR_SUCCESS; } /* Scan through the brigade and decide whether to attempt a write, * and how much to write, based on the following rules: * - * 1) The new_bb is null: Do a nonblocking write of as much as + * 1) The bb is empty: Do a nonblocking write of as much as * possible: do a nonblocking write of as much data as possible, - * then save the rest in ctx->buffered_bb. (If new_bb == NULL, + * then save the rest in ctx->buffered_bb. (If bb is empty, * it probably means that the MPM is doing asynchronous write * completion and has just determined that this connection * is writable.) @@ -459,91 +423,12 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb) * 3) Actually do the blocking write up to the last bucket determined * by rules 2a-d. The point of doing only one flush is to make as * few calls to writev() as possible. - * - * 4) If the brigade contains at least THRESHOLD_MIN_WRITE - * bytes: Do a nonblocking write of as much data as possible, - * then save the rest in ctx->buffered_bb. 
*/ - if (new_bb == NULL) { - rv = send_brigade_nonblocking(net->client_socket, bb, - &(ctx->bytes_written), c); - if (rv != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(rv)) { - /* The client has aborted the connection */ - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c, - "core_output_filter: writing data to the network"); - apr_brigade_cleanup(bb); - c->aborted = 1; - return rv; - } - setaside_remaining_output(f, ctx, bb, c); - return APR_SUCCESS; - } - - bytes_in_brigade = 0; - non_file_bytes_in_brigade = 0; - eor_buckets_in_brigade = 0; - morphing_bucket_in_brigade = 0; - - for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb); - bucket = next) { - next = APR_BUCKET_NEXT(bucket); - - if (!APR_BUCKET_IS_METADATA(bucket)) { - if (bucket->length == (apr_size_t)-1) { - /* - * A setaside of morphing buckets would read everything into - * memory. Instead, we will flush everything up to and - * including this bucket. - */ - morphing_bucket_in_brigade = 1; - } - else { - bytes_in_brigade += bucket->length; - if (!APR_BUCKET_IS_FILE(bucket)) - non_file_bytes_in_brigade += bucket->length; - } - } - else if (AP_BUCKET_IS_EOR(bucket)) { - eor_buckets_in_brigade++; - } - - if (APR_BUCKET_IS_FLUSH(bucket) - || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER - || morphing_bucket_in_brigade - || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) { - /* this segment of the brigade MUST be sent before returning. */ - - if (loglevel >= APLOG_TRACE6) { - char *reason = APR_BUCKET_IS_FLUSH(bucket) ? - "FLUSH bucket" : - (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ? - "THRESHOLD_MAX_BUFFER" : - morphing_bucket_in_brigade ? "morphing bucket" : - "MAX_REQUESTS_IN_PIPELINE"; - ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, c, - "will flush because of %s", reason); - ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c, - "seen in brigade%s: bytes: %" APR_SIZE_T_FMT - ", non-file bytes: %" APR_SIZE_T_FMT ", eor " - "buckets: %d, morphing buckets: %d", - flush_upto == NULL ? 
" so far" - : " since last flush point", - bytes_in_brigade, - non_file_bytes_in_brigade, - eor_buckets_in_brigade, - morphing_bucket_in_brigade); - } - /* - * Defer the actual blocking write to avoid doing many writes. - */ - flush_upto = next; + ap_filter_reinstate_brigade(f, bb, &flush_upto); - bytes_in_brigade = 0; - non_file_bytes_in_brigade = 0; - eor_buckets_in_brigade = 0; - morphing_bucket_in_brigade = 0; - } + if (APR_BRIGADE_EMPTY(bb)) { + return APR_SUCCESS; } if (flush_upto != NULL) { @@ -571,71 +456,30 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb) APR_BRIGADE_CONCAT(bb, ctx->tmp_flush_bb); } + rv = send_brigade_nonblocking(net->client_socket, bb, &(ctx->bytes_written), + c); + if ((rv != APR_SUCCESS) && (!APR_STATUS_IS_EAGAIN(rv))) { + /* The client has aborted the connection */ + ap_log_cerror( + APLOG_MARK, APLOG_TRACE1, rv, c, + "core_output_filter: writing data to the network"); + apr_brigade_cleanup(bb); + c->aborted = 1; + return rv; + } if (loglevel >= APLOG_TRACE8) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c, - "brigade contains: bytes: %" APR_SIZE_T_FMT - ", non-file bytes: %" APR_SIZE_T_FMT - ", eor buckets: %d, morphing buckets: %d", - bytes_in_brigade, non_file_bytes_in_brigade, - eor_buckets_in_brigade, morphing_bucket_in_brigade); + ap_log_cerror( + APLOG_MARK, APLOG_TRACE8, 0, c, + "tried nonblocking write, total bytes " + "written: %" APR_SIZE_T_FMT, ctx->bytes_written); } - if (bytes_in_brigade >= THRESHOLD_MIN_WRITE) { - rv = send_brigade_nonblocking(net->client_socket, bb, - &(ctx->bytes_written), c); - if ((rv != APR_SUCCESS) && (!APR_STATUS_IS_EAGAIN(rv))) { - /* The client has aborted the connection */ - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c, - "core_output_filter: writing data to the network"); - apr_brigade_cleanup(bb); - c->aborted = 1; - return rv; - } - if (loglevel >= APLOG_TRACE8) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c, - "tried nonblocking write, total bytes " - 
"written: %" APR_SIZE_T_FMT, - ctx->bytes_written); - } - } + remove_empty_buckets(bb); + ap_filter_setaside_brigade(f, bb); - setaside_remaining_output(f, ctx, bb, c); return APR_SUCCESS; } -/* - * This function assumes that either ctx->buffered_bb == NULL, or - * ctx->buffered_bb is empty, or ctx->buffered_bb == bb - */ -static void setaside_remaining_output(ap_filter_t *f, - core_output_filter_ctx_t *ctx, - apr_bucket_brigade *bb, - conn_rec *c) -{ - if (bb == NULL) { - return; - } - remove_empty_buckets(bb); - if (!APR_BRIGADE_EMPTY(bb)) { - c->data_in_output_filters = 1; - if (bb != ctx->buffered_bb) { - if (!ctx->deferred_write_pool) { - apr_pool_create(&ctx->deferred_write_pool, c->pool); - apr_pool_tag(ctx->deferred_write_pool, "deferred_write"); - } - ap_save_brigade(f, &(ctx->buffered_bb), &bb, - ctx->deferred_write_pool); - } - } - else if (ctx->deferred_write_pool) { - /* - * There are no more requests in the pipeline. We can just clear the - * pool. - */ - apr_pool_clear(ctx->deferred_write_pool); - } -} - #ifndef APR_MAX_IOVEC_SIZE #define MAX_IOVEC_TO_WRITE 16 #else diff --git a/server/mpm/event/event.c b/server/mpm/event/event.c index ee0d8fe690..1cdc52c762 100644 --- a/server/mpm/event/event.c +++ b/server/mpm/event/event.c @@ -1146,19 +1146,38 @@ read_request: } if (cs->pub.state == CONN_STATE_WRITE_COMPLETION) { - ap_filter_t *output_filter = c->output_filters; - apr_status_t rv; + apr_hash_index_t *rindex; + apr_status_t rv = APR_SUCCESS; + int data_in_output_filters = 0; ap_update_child_status_from_conn(sbh, SERVER_BUSY_WRITE, c); - while (output_filter->next != NULL) { - output_filter = output_filter->next; + + rindex = apr_hash_first(NULL, c->filters); + while (rindex) { + ap_filter_t *f = apr_hash_this_val(rindex); + + if (!APR_BRIGADE_EMPTY(f->bb)) { + + rv = ap_pass_brigade(f, c->empty); + apr_brigade_cleanup(c->empty); + if (APR_SUCCESS != rv) { + ap_log_cerror( + APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(00470) + "write failure in '%s' 
output filter", f->frec->name); + break; + } + + if (ap_filter_should_yield(f)) { + data_in_output_filters = 1; + } + } + + rindex = apr_hash_next(rindex); } - rv = output_filter->frec->filter_func.out_func(output_filter, NULL); + if (rv != APR_SUCCESS) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(00470) - "network write failure in core output filter"); cs->pub.state = CONN_STATE_LINGER; } - else if (c->data_in_output_filters) { + else if (data_in_output_filters) { /* Still in WRITE_COMPLETION_STATE: * Set a write timeout for this connection, and let the * event thread poll for writeability. diff --git a/server/mpm/motorz/motorz.c b/server/mpm/motorz/motorz.c index f359dbafb9..a10ead0f59 100644 --- a/server/mpm/motorz/motorz.c +++ b/server/mpm/motorz/motorz.c @@ -359,21 +359,38 @@ static apr_status_t motorz_io_process(motorz_conn_t *scon) } if (scon->cs.state == CONN_STATE_WRITE_COMPLETION) { - ap_filter_t *output_filter = c->output_filters; - ap_update_child_status_from_conn(scon->sbh, SERVER_BUSY_WRITE, c); - while (output_filter->next != NULL) { - output_filter = output_filter->next; - } + apr_hash_index_t *rindex; + apr_status_t rv = APR_SUCCESS; + int data_in_output_filters = 0; + ap_update_child_status_from_conn(sbh, SERVER_BUSY_WRITE, c); + + rindex = apr_hash_first(NULL, c->filters); + while (rindex) { + ap_filter_t *f = apr_hash_this_val(rindex); + + if (!APR_BRIGADE_EMPTY(f->bb)) { + + rv = ap_pass_brigade(f, c->empty); + apr_brigade_cleanup(c->empty); + if (APR_SUCCESS != rv) { + ap_log_cerror( + APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(02848) + "write failure in '%s' output filter", f->frec->name); + break; + } + + if (ap_filter_should_yield(f)) { + data_in_output_filters = 1; + } + } - rv = output_filter->frec->filter_func.out_func(output_filter, - NULL); + rindex = apr_hash_next(rindex); + } if (rv != APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_WARNING, rv, ap_server_conf, APLOGNO(02848) - "network write failure in core output 
filter"); scon->cs.state = CONN_STATE_LINGER; } - else if (c->data_in_output_filters) { + else if (data_in_output_filters) { /* Still in WRITE_COMPLETION_STATE: * Set a write timeout for this connection, and let the * event thread poll for writeability. diff --git a/server/mpm/simple/simple_io.c b/server/mpm/simple/simple_io.c index b14aae474f..47c13d71c6 100644 --- a/server/mpm/simple/simple_io.c +++ b/server/mpm/simple/simple_io.c @@ -92,20 +92,37 @@ static apr_status_t simple_io_process(simple_conn_t * scon) } if (scon->cs.state == CONN_STATE_WRITE_COMPLETION) { - ap_filter_t *output_filter = c->output_filters; - while (output_filter->next != NULL) { - output_filter = output_filter->next; - } + apr_hash_index_t *rindex; + apr_status_t rv = APR_SUCCESS; + int data_in_output_filters = 0; + + rindex = apr_hash_first(NULL, c->filters); + while (rindex) { + ap_filter_t *f = apr_hash_this_val(rindex); + + if (!APR_BRIGADE_EMPTY(f->bb)) { + + rv = ap_pass_brigade(f, c->empty); + apr_brigade_cleanup(c->empty); + if (APR_SUCCESS != rv) { + ap_log_cerror( + APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(00249) + "write failure in '%s' output filter", f->frec->name); + break; + } + + if (ap_filter_should_yield(f)) { + data_in_output_filters = 1; + } + } - rv = output_filter->frec->filter_func.out_func(output_filter, - NULL); + rindex = apr_hash_next(rindex); + } if (rv != APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_WARNING, rv, ap_server_conf, APLOGNO(00249) - "network write failure in core output filter"); scon->cs.state = CONN_STATE_LINGER; } - else if (c->data_in_output_filters) { + else if (data_in_output_filters) { /* Still in WRITE_COMPLETION_STATE: * Set a write timeout for this connection, and let the * event thread poll for writeability. 
diff --git a/server/request.c b/server/request.c index 67e535d14c..9c9ad9f93b 100644 --- a/server/request.c +++ b/server/request.c @@ -2036,6 +2036,64 @@ AP_CORE_DECLARE_NONSTD(apr_status_t) ap_sub_req_output_filter(ap_filter_t *f, return APR_SUCCESS; } +AP_CORE_DECLARE_NONSTD(apr_status_t) ap_request_core_filter(ap_filter_t *f, + apr_bucket_brigade *bb) +{ + apr_bucket *flush_upto = NULL; + apr_status_t status = APR_SUCCESS; + apr_bucket_brigade *tmp_bb = f->ctx; + + if (!tmp_bb) { + tmp_bb = f->ctx = apr_brigade_create(f->r->pool, f->c->bucket_alloc); + } + + /* Reinstate any buffered content */ + ap_filter_reinstate_brigade(f, bb, &flush_upto); + + while (!APR_BRIGADE_EMPTY(bb)) { + apr_bucket *bucket = APR_BRIGADE_FIRST(bb); + + /* if the core has set aside data, back off and try later */ + if (!flush_upto) { + if (ap_filter_should_yield(f)) { + break; + } + } + else if (flush_upto == bucket) { + flush_upto = NULL; + } + + /* have we found a morphing bucket? if so, force it to morph into something + * safe to pass down to the connection filters without needing to be set + * aside. 
+ */ + if (!APR_BUCKET_IS_METADATA(bucket)) { + if (bucket->length == (apr_size_t) - 1) { + const char *data; + apr_size_t size; + if (APR_SUCCESS + != (status = apr_bucket_read(bucket, &data, &size, + APR_BLOCK_READ))) { + return status; + } + } + } + + /* pass each bucket down the chain */ + APR_BUCKET_REMOVE(bucket); + APR_BRIGADE_INSERT_TAIL(tmp_bb, bucket); + + status = ap_pass_brigade(f->next, tmp_bb); + if (!APR_STATUS_IS_EOF(status) && (status != APR_SUCCESS)) { + return status; + } + + } + + ap_filter_setaside_brigade(f, bb); + return status; +} + extern APR_OPTIONAL_FN_TYPE(authz_some_auth_required) *ap__authz_ap_some_auth_required; AP_DECLARE(int) ap_some_auth_required(request_rec *r) diff --git a/server/util_filter.c b/server/util_filter.c index 01eb533520..ad14112ddb 100644 --- a/server/util_filter.c +++ b/server/util_filter.c @@ -24,6 +24,7 @@ #include "http_config.h" #include "http_core.h" #include "http_log.h" +#include "http_request.h" #include "util_filter.h" /* NOTE: Apache's current design doesn't allow a pool to be passed thru, @@ -32,6 +33,10 @@ #define FILTER_POOL apr_hook_global_pool #include "ap_hooks.h" /* for apr_hook_global_pool */ +/* XXX: Should these be configurable parameters? */ +#define THRESHOLD_MAX_BUFFER 65536 +#define MAX_REQUESTS_IN_PIPELINE 5 + /* ** This macro returns true/false if a given filter should be inserted BEFORE ** another filter. This will happen when one of: 1) there isn't another @@ -319,6 +324,8 @@ static ap_filter_t *add_any_filter_handle(ap_filter_rec_t *frec, void *ctx, f->r = frec->ftype < AP_FTYPE_CONNECTION ? 
r : NULL; f->c = c; f->next = NULL; + f->bb = NULL; + f->deferred_pool = NULL; if (INSERT_BEFORE(f, *outf)) { f->next = *outf; @@ -474,6 +481,16 @@ AP_DECLARE(void) ap_remove_input_filter(ap_filter_t *f) AP_DECLARE(void) ap_remove_output_filter(ap_filter_t *f) { + + if ((f->bb) && !APR_BRIGADE_EMPTY(f->bb)) { + apr_brigade_cleanup(f->bb); + } + + if (f->deferred_pool) { + apr_pool_destroy(f->deferred_pool); + f->deferred_pool = NULL; + } + remove_any_filter(f, f->r ? &f->r->output_filters : NULL, f->r ? &f->r->proto_output_filters : NULL, &f->c->output_filters); @@ -566,6 +583,7 @@ AP_DECLARE(apr_status_t) ap_pass_brigade(ap_filter_t *next, { if (next) { apr_bucket *e; + if ((e = APR_BRIGADE_LAST(bb)) && APR_BUCKET_IS_EOS(e) && next->r) { /* This is only safe because HTTP_HEADER filter is always in * the filter stack. This ensures that there is ALWAYS a @@ -635,7 +653,8 @@ AP_DECLARE(apr_status_t) ap_save_brigade(ap_filter_t *f, apr_status_t rv, srv = APR_SUCCESS; /* If have never stored any data in the filter, then we had better - * create an empty bucket brigade so that we can concat. + * create an empty bucket brigade so that we can concat. Register + * a cleanup to zero out the pointer if the pool is cleared. */ if (!(*saveto)) { *saveto = apr_brigade_create(p, f->c->bucket_alloc); @@ -673,6 +692,248 @@ AP_DECLARE(apr_status_t) ap_save_brigade(ap_filter_t *f, return srv; } +static apr_status_t filters_cleanup(void *data) +{ + ap_filter_t **key = data; + + apr_hash_set((*key)->c->filters, key, sizeof(ap_filter_t **), NULL); + + return APR_SUCCESS; +} + +AP_DECLARE(apr_status_t) ap_filter_setaside_brigade(ap_filter_t *f, + apr_bucket_brigade *bb) +{ + int loglevel = ap_get_conn_module_loglevel(f->c, APLOG_MODULE_INDEX); + + if (loglevel >= APLOG_TRACE6) { + ap_log_cerror( + APLOG_MARK, APLOG_TRACE6, 0, f->c, + "setaside %s brigade to %s brigade in '%s' output filter", + (APR_BRIGADE_EMPTY(bb) ? "empty" : "full"), + (!f->bb || APR_BRIGADE_EMPTY(f->bb) ? 
"empty" : "full"), f->frec->name); + } + + if (!APR_BRIGADE_EMPTY(bb)) { + apr_pool_t *pool; + /* + * Set aside the brigade bb within f->bb. + */ + if (!f->bb) { + ap_filter_t **key; + + pool = f->r ? f->r->pool : f->c->pool; + + key = apr_palloc(pool, sizeof(ap_filter_t **)); + *key = f; + apr_hash_set(f->c->filters, key, sizeof(ap_filter_t **), f); + + f->bb = apr_brigade_create(pool, f->c->bucket_alloc); + + apr_pool_pre_cleanup_register(pool, key, filters_cleanup); + + } + + /* decide what pool we setaside to, request pool or deferred pool? */ + if (f->r) { + pool = f->r->pool; + APR_BRIGADE_CONCAT(f->bb, bb); + } + else { + if (!f->deferred_pool) { + apr_pool_create(&f->deferred_pool, f->c->pool); + apr_pool_tag(f->deferred_pool, "deferred_pool"); + } + pool = f->deferred_pool; + return ap_save_brigade(f, &f->bb, &bb, pool); + } + + } + else if (f->deferred_pool) { + /* + * There are no more requests in the pipeline. We can just clear the + * pool. + */ + apr_brigade_cleanup(f->bb); + apr_pool_clear(f->deferred_pool); + } + return APR_SUCCESS; +} + +AP_DECLARE(apr_status_t) ap_filter_reinstate_brigade(ap_filter_t *f, + apr_bucket_brigade *bb, + apr_bucket **flush_upto) +{ + apr_bucket *bucket, *next; + apr_size_t bytes_in_brigade, non_file_bytes_in_brigade; + int eor_buckets_in_brigade, morphing_bucket_in_brigade; + int loglevel = ap_get_conn_module_loglevel(f->c, APLOG_MODULE_INDEX); + + if (loglevel >= APLOG_TRACE6) { + ap_log_cerror( + APLOG_MARK, APLOG_TRACE6, 0, f->c, + "reinstate %s brigade to %s brigade in '%s' output filter", + (!f->bb || APR_BRIGADE_EMPTY(f->bb) ? "empty" : "full"), + (APR_BRIGADE_EMPTY(bb) ? "empty" : "full"), f->frec->name); + } + + if (f->bb && !APR_BRIGADE_EMPTY(f->bb)) { + APR_BRIGADE_PREPEND(bb, f->bb); + } + + /* + * Determine if and up to which bucket we need to do a blocking write: + * + * a) The brigade contains a flush bucket: Do a blocking write + * of everything up that point. 
+ * + * b) The request is in CONN_STATE_HANDLER state, and the brigade + * contains at least THRESHOLD_MAX_BUFFER bytes in non-file + * buckets: Do blocking writes until the amount of data in the + * buffer is less than THRESHOLD_MAX_BUFFER. (The point of this + * rule is to provide flow control, in case a handler is + * streaming out lots of data faster than the data can be + * sent to the client.) + * + * c) The request is in CONN_STATE_HANDLER state, and the brigade + * contains at least MAX_REQUESTS_IN_PIPELINE EOR buckets: + * Do blocking writes until less than MAX_REQUESTS_IN_PIPELINE EOR + * buckets are left. (The point of this rule is to prevent too many + * FDs being kept open by pipelined requests, possibly allowing a + * DoS). + * + * d) The request is being served by a connection filter and the + * brigade contains a morphing bucket: If there was no other + * reason to do a blocking write yet, try reading the bucket. If its + * contents fit into memory before THRESHOLD_MAX_BUFFER is reached, + * everything is fine. Otherwise we need to do a blocking write the + * up to and including the morphing bucket, because ap_save_brigade() + * would read the whole bucket into memory later on. + */ + + *flush_upto = NULL; + + bytes_in_brigade = 0; + non_file_bytes_in_brigade = 0; + eor_buckets_in_brigade = 0; + morphing_bucket_in_brigade = 0; + + for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb); + bucket = next) { + next = APR_BUCKET_NEXT(bucket); + + if (!APR_BUCKET_IS_METADATA(bucket)) { + if (bucket->length == (apr_size_t)-1) { + /* + * A setaside of morphing buckets would read everything into + * memory. Instead, we will flush everything up to and + * including this bucket. 
+ */ + morphing_bucket_in_brigade = 1; + } + else { + bytes_in_brigade += bucket->length; + if (!APR_BUCKET_IS_FILE(bucket)) + non_file_bytes_in_brigade += bucket->length; + } + } + else if (AP_BUCKET_IS_EOR(bucket)) { + eor_buckets_in_brigade++; + } + + if (APR_BUCKET_IS_FLUSH(bucket) + || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER + || (!f->r && morphing_bucket_in_brigade) + || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) { + /* this segment of the brigade MUST be sent before returning. */ + + if (loglevel >= APLOG_TRACE6) { + char *reason = APR_BUCKET_IS_FLUSH(bucket) ? + "FLUSH bucket" : + (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ? + "THRESHOLD_MAX_BUFFER" : + (!f->r && morphing_bucket_in_brigade) ? "morphing bucket" : + "MAX_REQUESTS_IN_PIPELINE"; + ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, f->c, + "will flush because of %s", reason); + ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, f->c, + "seen in brigade%s: bytes: %" APR_SIZE_T_FMT + ", non-file bytes: %" APR_SIZE_T_FMT ", eor " + "buckets: %d, morphing buckets: %d", + flush_upto == NULL ? " so far" + : " since last flush point", + bytes_in_brigade, + non_file_bytes_in_brigade, + eor_buckets_in_brigade, + morphing_bucket_in_brigade); + } + /* + * Defer the actual blocking write to avoid doing many writes. + */ + *flush_upto = next; + + bytes_in_brigade = 0; + non_file_bytes_in_brigade = 0; + eor_buckets_in_brigade = 0; + morphing_bucket_in_brigade = 0; + } + } + + if (loglevel >= APLOG_TRACE8) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, f->c, + "brigade contains: bytes: %" APR_SIZE_T_FMT + ", non-file bytes: %" APR_SIZE_T_FMT + ", eor buckets: %d, morphing buckets: %d", + bytes_in_brigade, non_file_bytes_in_brigade, + eor_buckets_in_brigade, morphing_bucket_in_brigade); + } + + return APR_SUCCESS; +} + +AP_DECLARE(int) ap_filter_should_yield(ap_filter_t *f) +{ + /* + * This function decides whether a filter should yield due to buffered + * data in a downstream filter. 
If a downstream filter buffers we + * must back off so we don't overwhelm the server. If this function + * returns true, the filter should call ap_filter_setaside_brigade() + * to save unprocessed buckets, and then reinstate those buckets on + * the next call with ap_filter_reinstate_brigade() and continue + * where it left off. + * + * If this function is forced to return zero, we return back to + * synchronous filter behaviour. + * + * Subrequests present us with a problem - we don't know how much data + * they will produce and therefore how much buffering we'll need, and + * if a subrequest had to trigger buffering, but next subrequest wouldn't + * know when the previous one had finished sending data and buckets + * could be sent out of order. + * + * In the case of subrequests, deny the ability to yield. When the data + * reaches the filters from the main request, they will be setaside + * there in the right order and the request will be given the + * opportunity to yield. + */ + if (f->r && f->r->main) { + return 0; + } + + /* + * This is either a main request or internal redirect, or it is a + * connection filter. Yield if there is any buffered data downstream + * from us. + */ + while (f) { + if (f->bb && !APR_BRIGADE_EMPTY(f->bb)) { + return 1; + } + f = f->next; + } + return 0; +} + AP_DECLARE_NONSTD(apr_status_t) ap_filter_flush(apr_bucket_brigade *bb, void *ctx) { |