diff options
author | Paul Querna <pquerna@apache.org> | 2008-09-20 13:58:08 +0200 |
---|---|---|
committer | Paul Querna <pquerna@apache.org> | 2008-09-20 13:58:08 +0200 |
commit | 767cc30c029cabba2796303f705bf6e8595083a5 (patch) | |
tree | 1c06e44258316a618c2a481e33848ba7a9fc13d0 | |
parent | Cleanup the explanation. (diff) | |
download | apache2-767cc30c029cabba2796303f705bf6e8595083a5.tar.xz apache2-767cc30c029cabba2796303f705bf6e8595083a5.zip |
Introduce Suspendable Requests to the Event MPM.
Using this basic framework, you can return SUSPENDED from an HTTP Handler,
and then register a callback that is invoked by the MPM at a later time.
This initial version only supports _timers_ as callbacks, but in the future I
would like to add things like wait for socket activity, on a socket specified by
the handler.
Once in a callback, It is then the responsibility of the callback fucntion
to finish the HTTP Request handling, but this alows you to do cool things like
a fully async proxy, COMET support, or even rate limiting.
To prove I'm not insane, I've inlcuded an example module, mod_dialup.
You can configure it like this:
<Location "/docs">
ModemStandard "V.32"
</Location>
And for static files inside that path, you will be rate limited to V.32 speeds,
aka 9.6 kilobits/second.
Does anyone besides Rüdiger read commit emails :-) ?
I know there are likely huge problems with this, but I would like to see how far
we can push the Event MPM, figure out what to do better, if there is anything,
and then really dive into the 3.0 development before ApacheCon.
* server/mpm/experimental/event/fdqueue.h:
(timer_event_t): New structure to hold timer events and callback functions.
* server/mpm/experimental/event/fdqueue.c
(ap_queue_empty): Modify to also look at Timer Ring.
(ap_queue_init): Initialize Timer Ring.
(ap_queue_push_timer): New function, pushes a timer event into the queue.
(ap_queue_pop_something): Renamed function, returns a timer event or
a socket/pool for a worker thread to run.
* server/mpm/experimental/event/event.c
(process_socket): If the connection is in SUSPENDED state, don't force it
into linger mode yet, the callback will have to take care of that.
(push_timer2worker): New shortcut function, pushes timer event into queue
for a worker to run.
(timer_free_ring): New global data structure to recycle memory used by
timer events.
(timer_ring): New global data structure to hold active timer events.
(g_timer_ring_mtx): Thread mutex to protect timer event data structures.
(ap_mpm_register_timed_callback): New Function, registers a callback to be
invoked by the MPM at a later time.
(listener_thread): Calculate our wakeup time based on the upcoming Event
Queue, and after pollset_poll runs, push any Timers that have passed
onto worker threads to run.
(worker_thread): Call new queue pop method, and if the Timer Event is
non-null, invoke the callback. Once the callback is done, push the
structure onto the timer_free_ring, to be recycled.
(child_main): Initialize new mutex and ring structures.
* server/config.c
(ap_invoke_handler): Allow SUSPENDED aa valid return code from handlers.
* modules/http/http_core.c
(ap_process_http_async_connection): Don't close the connection when in
SUSPENDED state.
* modules/http/http_request.c
(ap_process_request_after_handler): New function, body pulled from the old,
ap_process_async_request. Split to let handlers invoke this so they
don't need to know all of the details of finishing a request.
(ap_process_async_request): If the handler returns SUSPENDED, don't do
anything but return.
* include/ap_mmn.h: Bump MMN.
* include/ap_mpm.h
(ap_mpm_register_timed_callback): New function.
* include/httpd.h:
(SUSPENDED): New return code for handlers.
(request_rec::invoke_mtx): New mutex to protect callback invokcations
from being run before the original handler finishes running.
(conn_state_e): Add a suspended state.
* include/http_request.h
(ap_process_request_after_handler): New function to make it easier for
handlers to finish the HTTP Request.
* modules/test/config.m4: Add mod_dialup to build.
* modules/test/mod_dialup.c: New rate limiting module, requires the Event MPM
to work.
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@697357 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | include/ap_mmn.h | 3 | ||||
-rw-r--r-- | include/ap_mpm.h | 8 | ||||
-rw-r--r-- | include/http_request.h | 5 | ||||
-rw-r--r-- | include/httpd.h | 5 | ||||
-rw-r--r-- | modules/http/http_core.c | 3 | ||||
-rw-r--r-- | modules/http/http_request.c | 65 | ||||
-rw-r--r-- | modules/test/config.m4 | 2 | ||||
-rw-r--r-- | modules/test/mod_dialup.c | 308 | ||||
-rw-r--r-- | server/config.c | 2 | ||||
-rw-r--r-- | server/mpm/experimental/event/event.c | 155 | ||||
-rw-r--r-- | server/mpm/experimental/event/fdqueue.c | 52 | ||||
-rw-r--r-- | server/mpm/experimental/event/fdqueue.h | 19 |
12 files changed, 573 insertions, 54 deletions
diff --git a/include/ap_mmn.h b/include/ap_mmn.h index 713713df85..c085299fa9 100644 --- a/include/ap_mmn.h +++ b/include/ap_mmn.h @@ -168,13 +168,14 @@ * 20080722.2 (2.3.0-dev) Add scolonsep to proxy_balancer * 20080829.0 (2.3.0-dev) Add cookie attributes when removing cookies * 20080830.0 (2.3.0-dev) Cookies can be set on headers_out and err_headers_out + * 20080920.0 (2.3.0-dev) Add ap_mpm_register_timed_callback. * */ #define MODULE_MAGIC_COOKIE 0x41503234UL /* "AP24" */ #ifndef MODULE_MAGIC_NUMBER_MAJOR -#define MODULE_MAGIC_NUMBER_MAJOR 20080830 +#define MODULE_MAGIC_NUMBER_MAJOR 20080920 #endif #define MODULE_MAGIC_NUMBER_MINOR 0 /* 0...n */ diff --git a/include/ap_mpm.h b/include/ap_mpm.h index eb3ef51663..787ceeca96 100644 --- a/include/ap_mpm.h +++ b/include/ap_mpm.h @@ -152,6 +152,14 @@ AP_DECLARE(apr_status_t) ap_os_create_privileged_process( */ AP_DECLARE(apr_status_t) ap_mpm_query(int query_code, int *result); + +typedef void (ap_mpm_callback_fn_t)(void *baton); + +/* XXXXXXX: only added support in the Event MPM.... */ +AP_DECLARE(void) ap_mpm_register_timed_callback(apr_time_t t, + ap_mpm_callback_fn_t *cbfn, + void *baton); + /* Defining GPROF when compiling uses the moncontrol() function to * disable gprof profiling in the parent, and enable it only for * request processing in children (or in one_process mode). It's diff --git a/include/http_request.h b/include/http_request.h index cda1b955d5..de87d99742 100644 --- a/include/http_request.h +++ b/include/http_request.h @@ -315,7 +315,10 @@ AP_DECLARE(void) ap_allow_standard_methods(request_rec *r, int reset, ...); */ void ap_process_request(request_rec *); -/** +/* For post-processing after a handler has finished with a request. (Commonly used after it was suspended) */ +void ap_process_request_after_handler(request_rec *r); + + /** * Process a top-level request from a client, allowing some or all of * the response to remain buffered in the core output filter for later, * asynchronous write completion diff --git a/include/httpd.h b/include/httpd.h index 279713e336..2be3bf7361 100644 --- a/include/httpd.h +++ b/include/httpd.h @@ -457,6 +457,8 @@ AP_DECLARE(const char *) ap_get_server_built(void); #define DONE -2 /**< Module has served the response completely * - it's safe to die() with no more output */ +#define SUSPENDED -3 /**< Module will handle the remainder of the request. + * The core will never invoke the request again, */ #define OK 0 /**< Module has handled this stage. */ @@ -989,6 +991,8 @@ struct request_rec { /** The optional kept body of the request. */ apr_bucket_brigade *kept_body; + apr_thread_mutex_t *invoke_mtx; + /* Things placed at the end of the record to avoid breaking binary * compatibility. It would be nice to remember to reorder the entire * record to improve 64bit alignment the next time we need to break @@ -1105,6 +1109,7 @@ typedef enum { CONN_STATE_READ_REQUEST_LINE, CONN_STATE_HANDLER, CONN_STATE_WRITE_COMPLETION, + CONN_STATE_SUSPENDED, CONN_STATE_LINGER } conn_state_e; diff --git a/modules/http/http_core.c b/modules/http/http_core.c index 0fd6b758dd..fdf86c0644 100644 --- a/modules/http/http_core.c +++ b/modules/http/http_core.c @@ -154,7 +154,8 @@ static int ap_process_http_async_connection(conn_rec *c) r = NULL; } - if (cs->state != CONN_STATE_WRITE_COMPLETION) { + if (cs->state != CONN_STATE_WRITE_COMPLETION && + cs->state != CONN_STATE_SUSPENDED) { /* Something went wrong; close the connection */ cs->state = CONN_STATE_LINGER; } diff --git a/modules/http/http_request.c b/modules/http/http_request.c index 3b17c76e54..5bac240da2 100644 --- a/modules/http/http_request.c +++ b/modules/http/http_request.c @@ -213,13 +213,39 @@ static void check_pipeline(conn_rec *c) } -void ap_process_async_request(request_rec *r) +void ap_process_request_after_handler(request_rec *r) { - int access_status; apr_bucket_brigade *bb; apr_bucket *b; conn_rec *c = r->connection; + /* Send an EOR bucket through the output filter chain. When + * this bucket is destroyed, the request will be logged and + * its pool will be freed + */ + bb = apr_brigade_create(r->connection->pool, r->connection->bucket_alloc); + b = ap_bucket_eor_create(r->connection->bucket_alloc, r); + APR_BRIGADE_INSERT_HEAD(bb, b); + + ap_pass_brigade(r->connection->output_filters, bb); + + /* From here onward, it is no longer safe to reference r + * or r->pool, because r->pool may have been destroyed + * already by the EOR bucket's cleanup function. + */ + + c->cs->state = CONN_STATE_WRITE_COMPLETION; + check_pipeline(c); + if (ap_extended_status) { + ap_time_process_request(c->sbh, STOP_PREQUEST); + } +} + +void ap_process_async_request(request_rec *r) +{ + conn_rec *c = r->connection; + int access_status; + /* Give quick handlers a shot at serving the request on the fast * path, bypassing all of the other Apache hooks. * @@ -234,8 +260,12 @@ void ap_process_async_request(request_rec *r) * Use this hook with extreme care and only if you know what you are * doing. */ - if (ap_extended_status) + if (ap_extended_status) { ap_time_process_request(r->connection->sbh, START_PREQUEST); + } + + apr_thread_mutex_create(&r->invoke_mtx, APR_THREAD_MUTEX_DEFAULT, r->pool); + apr_thread_mutex_lock(r->invoke_mtx); access_status = ap_run_quick_handler(r, 0); /* Not a look-up request */ if (access_status == DECLINED) { access_status = ap_process_request_internal(r); @@ -244,6 +274,16 @@ void ap_process_async_request(request_rec *r) } } + if (access_status == SUSPENDED) { + if (ap_extended_status) { + ap_time_process_request(c->sbh, STOP_PREQUEST); + } + c->cs->state = CONN_STATE_SUSPENDED; + apr_thread_mutex_unlock(r->invoke_mtx); + return; + } + apr_thread_mutex_unlock(r->invoke_mtx); + if (access_status == DONE) { /* e.g., something not in storage like TRACE */ access_status = OK; @@ -257,24 +297,7 @@ void ap_process_async_request(request_rec *r) ap_die(access_status, r); } - /* Send an EOR bucket through the output filter chain. When - * this bucket is destroyed, the request will be logged and - * its pool will be freed - */ - bb = apr_brigade_create(r->connection->pool, r->connection->bucket_alloc); - b = ap_bucket_eor_create(r->connection->bucket_alloc, r); - APR_BRIGADE_INSERT_HEAD(bb, b); - ap_pass_brigade(r->connection->output_filters, bb); - - /* From here onward, it is no longer safe to reference r - * or r->pool, because r->pool may have been destroyed - * already by the EOR bucket's cleanup function. - */ - - c->cs->state = CONN_STATE_WRITE_COMPLETION; - check_pipeline(c); - if (ap_extended_status) - ap_time_process_request(c->sbh, STOP_PREQUEST); + return ap_process_request_after_handler(r); } void ap_process_request(request_rec *r) diff --git a/modules/test/config.m4 b/modules/test/config.m4 index 01bc0fa971..9c150f488f 100644 --- a/modules/test/config.m4 +++ b/modules/test/config.m4 @@ -6,4 +6,6 @@ APACHE_MODULE(optional_hook_import, example optional hook importer, , , no) APACHE_MODULE(optional_fn_import, example optional function importer, , , no) APACHE_MODULE(optional_fn_export, example optional function exporter, , , no) +APACHE_MODULE(dialup, rate limits static files to dialup modem speeds, , , no) + APACHE_MODPATH_FINISH diff --git a/modules/test/mod_dialup.c b/modules/test/mod_dialup.c new file mode 100644 index 0000000000..349cd2c6e6 --- /dev/null +++ b/modules/test/mod_dialup.c @@ -0,0 +1,308 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +#include "httpd.h" +#include "util_filter.h" +#include "http_log.h" +#include "http_config.h" +#include "http_request.h" + +/* to detect sendfile enabled, we need CORE_PRIVATE. Someone should fix this. */ +#define CORE_PRIVATE +#include "http_core.h" + + +module AP_MODULE_DECLARE_DATA dialup_module; + +#ifndef apr_time_from_msec +#define apr_time_from_msec(x) (x * 1000) +#endif + + +typedef struct dialup_dcfg_t { + apr_size_t bytes_per_second; +} dialup_dcfg_t; + +typedef struct dialup_baton_t { + apr_size_t bytes_per_second; + request_rec *r; + apr_file_t *fd; + apr_bucket_brigade *bb; + apr_bucket_brigade *tmpbb; +} dialup_baton_t; + +static int +dialup_send_pulse(dialup_baton_t *db) +{ + int status; + apr_off_t len = 0; + apr_size_t bytes_sent = 0; + + while (!APR_BRIGADE_EMPTY(db->bb) && bytes_sent < db->bytes_per_second) { + apr_bucket *e; + + if (db->r->connection->aborted) { + return HTTP_INTERNAL_SERVER_ERROR; + } + + status = apr_brigade_partition(db->bb, db->bytes_per_second, &e); + + if (status != APR_SUCCESS && status != APR_INCOMPLETE) { + /* XXXXXX: Log me. */ + return HTTP_INTERNAL_SERVER_ERROR; + } + + if (e != APR_BRIGADE_SENTINEL(db->bb)) { + apr_bucket *f; + apr_bucket *b = APR_BUCKET_PREV(e); + f = APR_RING_FIRST(&db->bb->list); + APR_RING_UNSPLICE(f, b, link); + APR_RING_SPLICE_HEAD(&db->tmpbb->list, f, b, apr_bucket, link); + } + else { + APR_BRIGADE_CONCAT(db->tmpbb, db->bb); + } + + e = apr_bucket_flush_create(db->r->connection->bucket_alloc); + + APR_BRIGADE_INSERT_TAIL(db->tmpbb, e); + + apr_brigade_length(db->tmpbb, 1, &len); + bytes_sent += len; + status = ap_pass_brigade(db->r->output_filters, db->tmpbb); + + apr_brigade_cleanup(db->tmpbb); + + if (status != OK) { + ap_log_rerror(APLOG_MARK, APLOG_ERR, status, db->r, + "dialup: pulse: ap_pass_brigade failed:"); + return status; + } + } + + if (APR_BRIGADE_EMPTY(db->bb)) { + return DONE; + } + else { + return SUSPENDED; + } +} + +void +dialup_callback(void *baton) +{ + int status; + dialup_baton_t *db = (dialup_baton_t *)baton; + + apr_thread_mutex_lock(db->r->invoke_mtx); + + status = dialup_send_pulse(db); + + if (status == SUSPENDED) { + ap_mpm_register_timed_callback(apr_time_from_sec(1), dialup_callback, baton); + } + else if (status == DONE) { + apr_thread_mutex_unlock(db->r->invoke_mtx); + ap_finalize_request_protocol(db->r); + ap_process_request_after_handler(db->r); + return; + } + else { + ap_log_rerror(APLOG_MARK, APLOG_ERR, status, db->r, + "dialup: pulse returned: %d", status); + db->r->status = HTTP_OK; + ap_die(status, db->r); + } + + apr_thread_mutex_unlock(db->r->invoke_mtx); +} + +static int +dialup_handler(request_rec *r) +{ + int status; + apr_status_t rv; + + /* See core.c, default handler for all of the cases we just decline. */ + if (r->method_number != M_GET || + r->finfo.filetype == 0 || + r->finfo.filetype == APR_DIR) { + return DECLINED; + } + + dialup_dcfg_t *dcfg = ap_get_module_config(r->per_dir_config, + &dialup_module); + if (dcfg->bytes_per_second == 0) { + return DECLINED; + } + core_dir_config *ccfg = ap_get_module_config(r->per_dir_config, + &core_module); + + apr_file_t *fd; + + rv = apr_file_open(&fd, r->filename, APR_READ | APR_BINARY +#if APR_HAS_SENDFILE + | ((ccfg->enable_sendfile == ENABLE_SENDFILE_OFF) + ? 0 : APR_SENDFILE_ENABLED) +#endif + , 0, r->pool); + + if (rv) { + return DECLINED; + } + + /* copied from default handler: */ + ap_update_mtime(r, r->finfo.mtime); + ap_set_last_modified(r); + ap_set_etag(r); + apr_table_setn(r->headers_out, "Accept-Ranges", "bytes"); + ap_set_content_length(r, r->finfo.size); + + status = ap_meets_conditions(r); + if (status != OK) { + ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, + "dialup: declined, meets conditions, good luck core handler"); + return DECLINED; + } + + apr_bucket_brigade *bb; + + dialup_baton_t *db = apr_palloc(r->pool, sizeof(dialup_baton_t)); + + db->bb = apr_brigade_create(r->pool, r->connection->bucket_alloc); + db->tmpbb = apr_brigade_create(r->pool, r->connection->bucket_alloc); + + apr_bucket *e; + + e = apr_brigade_insert_file(db->bb, fd, 0, r->finfo.size, r->pool); + +#if APR_HAS_MMAP + if (ccfg->enable_mmap == ENABLE_MMAP_OFF) { + apr_bucket_file_enable_mmap(e, 0); + } +#endif + + + db->bytes_per_second = dcfg->bytes_per_second; + db->r = r; + db->fd = fd; + + e = apr_bucket_eos_create(r->connection->bucket_alloc); + + APR_BRIGADE_INSERT_TAIL(db->bb, e); + + status = dialup_send_pulse(db); + if (status != SUSPENDED && status != DONE) { + ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, + "dialup: failed, send pulse"); + return status; + } + + ap_mpm_register_timed_callback(apr_time_from_sec(1), dialup_callback, db); + + return SUSPENDED; +} + + + +#ifndef APR_HOOK_ALMOST_LAST +#define APR_HOOK_ALMOST_LAST (APR_HOOK_REALLY_LAST - 1) +#endif + +static void +dialup_register_hooks(apr_pool_t *p) +{ + ap_hook_handler(dialup_handler, NULL, NULL, APR_HOOK_ALMOST_LAST); +} + +typedef struct modem_speed_t { + const char *name; + apr_size_t bytes_per_second; +} modem_speed_t; + +#ifndef BITRATE_TO_BYTES +#define BITRATE_TO_BYTES(x) ((1000 * x)/8) +#endif + +static const modem_speed_t modem_bitrates[] = +{ + {"V.21", BITRATE_TO_BYTES(0.1)}, + {"V.26bis", BITRATE_TO_BYTES(2.4)}, + {"V.32", BITRATE_TO_BYTES(9.6)}, + {"V.34", BITRATE_TO_BYTES(28.8)}, + {"V.92", BITRATE_TO_BYTES(56.0)}, + {"i-was-rich-and-got-a-leased-line", BITRATE_TO_BYTES(1500)}, + {NULL, 0} +}; + +static const char * +cmd_modem_standard(cmd_parms *cmd, + void *dconf, + const char *input) +{ + const modem_speed_t *standard; + int i = 0; + dialup_dcfg_t *dcfg = (dialup_dcfg_t*)dconf; + + dcfg->bytes_per_second = 0; + + while (modem_bitrates[i].name != NULL) { + standard = &modem_bitrates[i]; + if (strcasecmp(standard->name, input) == 0) { + dcfg->bytes_per_second = standard->bytes_per_second; + break; + } + i++; + } + + if (dcfg->bytes_per_second == 0) { + return "mod_diaulup: Unkonwn Modem Standard specified."; + } + + return NULL; +} + +static void * +dialup_dcfg_create(apr_pool_t *p, char *dummy) +{ + dialup_dcfg_t *cfg = apr_palloc(p, sizeof(dialup_dcfg_t)); + + cfg->bytes_per_second = 0; + + return cfg; +} + + +static const command_rec dialup_cmds[] = +{ + AP_INIT_TAKE1("ModemStandard", cmd_modem_standard, NULL, ACCESS_CONF, + "Modem Standard to.. simulate. " + "Must be one of: 'V.21', 'V.26bis', 'V.32', 'V.34', or 'V.92'"), + NULL +}; + +module AP_MODULE_DECLARE_DATA dialup_module = +{ + STANDARD20_MODULE_STUFF, + dialup_dcfg_create, + NULL, + NULL, + NULL, + dialup_cmds, + dialup_register_hooks +}; diff --git a/server/config.c b/server/config.c index 3b8a00a111..9dbeb25efb 100644 --- a/server/config.c +++ b/server/config.c @@ -381,7 +381,7 @@ AP_CORE_DECLARE(int) ap_invoke_handler(request_rec *r) ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r, "handler \"%s\" not found for: %s", r->handler, r->filename); } - if ((result != OK) && (result != DONE) && (result != DECLINED) + if ((result != OK) && (result != DONE) && (result != DECLINED) && (result != SUSPENDED) && !ap_is_HTTP_VALID_RESPONSE(result)) { /* If a module is deliberately returning something else * (request_rec in non-HTTP or proprietary extension?) diff --git a/server/mpm/experimental/event/event.c b/server/mpm/experimental/event/event.c index 33868509c4..3aa10e686a 100644 --- a/server/mpm/experimental/event/event.c +++ b/server/mpm/experimental/event/event.c @@ -612,7 +612,9 @@ static int process_socket(apr_pool_t * p, apr_socket_t * sock, * like the Worker MPM does. */ ap_run_process_connection(c); - cs->state = CONN_STATE_LINGER; + if (cs->state != CONN_STATE_SUSPENDED) { + cs->state = CONN_STATE_LINGER; + } } read_request: @@ -796,6 +798,11 @@ static apr_status_t init_pollset(apr_pool_t *p) return APR_SUCCESS; } +static apr_status_t push_timer2worker(timer_event_t* te) +{ + return ap_queue_push_timer(worker_queue, te); +} + static apr_status_t push2worker(const apr_pollfd_t * pfd, apr_pollset_t * pollset) { @@ -871,8 +878,70 @@ static int get_worker(int *have_idle_worker_p) } } +/* XXXXXX: Convert to skiplist or other better data structure + * (yes, this is VERY VERY VERY VERY BAD) + */ + +/* Structures to reuse */ +static APR_RING_HEAD(timer_free_ring_t, timer_event_t) timer_free_ring; +/* Active timers */ +static APR_RING_HEAD(timer_ring_t, timer_event_t) timer_ring; + +static apr_thread_mutex_t *g_timer_ring_mtx; + +AP_DECLARE(void) ap_mpm_register_timed_callback(apr_time_t t, + ap_mpm_callback_fn_t *cbfn, + void *baton) +{ + timer_event_t *ep; + timer_event_t *te; + /* oh yeah, and make locking smarter/fine grained. */ + apr_thread_mutex_lock(g_timer_ring_mtx); + + if (!APR_RING_EMPTY(&timer_free_ring, timer_event_t, link)) { + te = APR_RING_FIRST(&timer_free_ring); + APR_RING_REMOVE(te, link); + } + else { + /* XXXXX: lol, pool allocation without a context from any thread.Yeah. Right. MPMs Suck. */ + te = malloc(sizeof(timer_event_t)); + APR_RING_ELEM_INIT(te, link); + } + + te->cbfunc = cbfn; + te->baton = baton; + /* XXXXX: optimize */ + te->when = t + apr_time_now(); + + /* Okay, insert sorted by when.. */ + int inserted = 0; + for (ep = APR_RING_FIRST(&timer_ring); + ep != APR_RING_SENTINEL(&timer_ring, + timer_event_t, link); + ep = APR_RING_NEXT(ep, link)) + { + if (ep->when > te->when) { + inserted = 1; + APR_RING_INSERT_BEFORE(ep, te, link); + break; + } + } + + if (!inserted) { + APR_RING_INSERT_TAIL(&timer_ring, te, timer_event_t, link); + } + + apr_thread_mutex_unlock(g_timer_ring_mtx); +} + +#ifndef apr_time_from_msec +#define apr_time_from_msec(x) (x * 1000) +#endif + static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) { + timer_event_t *ep; + timer_event_t *te; apr_status_t rc; proc_info *ti = dummy; int process_slot = ti->pid; @@ -891,20 +960,13 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) free(ti); - /* We set this to force apr_pollset to wakeup if there hasn't been any IO - * on any of its sockets. This allows sockets to have been added - * when no other keepalive operations where going on. - * - * current value is 1 second - */ - timeout_interval = 1000000; - /* the following times out events that are really close in the future * to prevent extra poll calls * * current value is .1 second */ #define TIMEOUT_FUDGE_FACTOR 100000 +#define EVENT_FUDGE_FACTOR 10000 rc = init_pollset(tpool); if (rc != APR_SUCCESS) { @@ -927,6 +989,26 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) check_infinite_requests(); } + + { + apr_time_t now = apr_time_now(); + apr_thread_mutex_lock(g_timer_ring_mtx); + + if (!APR_RING_EMPTY(&timer_ring, timer_event_t, link)) { + te = APR_RING_FIRST(&timer_ring); + if (te->when > now) { + timeout_interval = te->when - now; + } + else { + timeout_interval = 1; + } + } + else { + timeout_interval = apr_time_from_msec(100); + } + apr_thread_mutex_unlock(g_timer_ring_mtx); + } + rc = apr_pollset_poll(event_pollset, timeout_interval, &num, &out_pfd); @@ -945,6 +1027,25 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) if (listener_may_exit) break; + { + apr_time_t now = apr_time_now(); + apr_thread_mutex_lock(g_timer_ring_mtx); + for (ep = APR_RING_FIRST(&timer_ring); + ep != APR_RING_SENTINEL(&timer_ring, + timer_event_t, link); + ep = APR_RING_FIRST(&timer_ring)) + { + if (ep->when < now + EVENT_FUDGE_FACTOR) { + APR_RING_REMOVE(ep, link); + push_timer2worker(ep); + } + else { + break; + } + } + apr_thread_mutex_unlock(g_timer_ring_mtx); + } + while (num && get_worker(&have_idle_worker)) { pt = (listener_poll_type *) out_pfd->client_data; if (pt->type == PT_CSD) { @@ -1143,7 +1244,8 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy) apr_pool_t *ptrans; /* Pool for per-transaction stuff */ apr_status_t rv; int is_idle = 0; - + timer_event_t *te = NULL; + free(ti); ap_scoreboard_image->servers[process_slot][thread_slot].pid = ap_my_pid; @@ -1171,7 +1273,10 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy) if (workers_may_exit) { break; } - rv = ap_queue_pop(worker_queue, &csd, &cs, &ptrans); + + te = NULL; + + rv = ap_queue_pop_something(worker_queue, &csd, &cs, &ptrans, &te); if (rv != APR_SUCCESS) { /* We get APR_EOF during a graceful shutdown once all the @@ -1201,13 +1306,25 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy) } continue; } - is_idle = 0; - worker_sockets[thread_slot] = csd; - rv = process_socket(ptrans, csd, cs, process_slot, thread_slot); - if (!rv) { - requests_this_child--; + if (te != NULL) { + + te->cbfunc(te->baton); + + { + apr_thread_mutex_lock(g_timer_ring_mtx); + APR_RING_INSERT_TAIL(&timer_free_ring, te, timer_event_t, link); + apr_thread_mutex_unlock(g_timer_ring_mtx); + } + } + else { + is_idle = 0; + worker_sockets[thread_slot] = csd; + rv = process_socket(ptrans, csd, cs, process_slot, thread_slot); + if (!rv) { + requests_this_child--; + } + worker_sockets[thread_slot] = NULL; } - worker_sockets[thread_slot] = NULL; } ap_update_child_status_from_indexes(process_slot, thread_slot, @@ -1462,6 +1579,10 @@ static void child_main(int child_num_arg) clean_child_exit(APEXIT_CHILDFATAL); } + apr_thread_mutex_create(&g_timer_ring_mtx, APR_THREAD_MUTEX_DEFAULT, pchild); + APR_RING_INIT(&timer_free_ring, timer_event_t, link); + APR_RING_INIT(&timer_ring, timer_event_t, link); + ap_run_child_init(pchild, ap_server_conf); /* done with init critical section */ diff --git a/server/mpm/experimental/event/fdqueue.c b/server/mpm/experimental/event/fdqueue.c index e9f5f5345d..2100504394 100644 --- a/server/mpm/experimental/event/fdqueue.c +++ b/server/mpm/experimental/event/fdqueue.c @@ -268,7 +268,7 @@ apr_status_t ap_queue_info_term(fd_queue_info_t * queue_info) * Detects when the fd_queue_t is empty. This utility function is expected * to be called from within critical sections, and is not threadsafe. */ -#define ap_queue_empty(queue) ((queue)->nelts == 0) +#define ap_queue_empty(queue) ((queue)->nelts == 0 && APR_RING_EMPTY(&queue->timers ,timer_event_t, link)) /** * Callback routine that is called to destroy this @@ -305,6 +305,8 @@ apr_status_t ap_queue_init(fd_queue_t * queue, int queue_capacity, return rv; } + APR_RING_INIT(&queue->timers, timer_event_t, link); + queue->data = apr_palloc(a, queue_capacity * sizeof(fd_queue_elem_t)); queue->bounds = queue_capacity; queue->nelts = 0; @@ -353,14 +355,36 @@ apr_status_t ap_queue_push(fd_queue_t * queue, apr_socket_t * sd, return APR_SUCCESS; } +apr_status_t ap_queue_push_timer(fd_queue_t * queue, timer_event_t *te) +{ + apr_status_t rv; + + if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { + return rv; + } + + AP_DEBUG_ASSERT(!queue->terminated); + + APR_RING_INSERT_TAIL(&queue->timers, te, timer_event_t, link); + + apr_thread_cond_signal(queue->not_empty); + + if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) { + return rv; + } + + return APR_SUCCESS; +} + /** * Retrieves the next available socket from the queue. If there are no * sockets available, it will block until one becomes available. * Once retrieved, the socket is placed into the address specified by * 'sd'. */ -apr_status_t ap_queue_pop(fd_queue_t * queue, apr_socket_t ** sd, - conn_state_t ** cs, apr_pool_t ** p) +apr_status_t ap_queue_pop_something(fd_queue_t * queue, apr_socket_t ** sd, + conn_state_t ** cs, apr_pool_t ** p, + timer_event_t ** te_out) { fd_queue_elem_t *elem; apr_status_t rv; @@ -389,15 +413,23 @@ apr_status_t ap_queue_pop(fd_queue_t * queue, apr_socket_t ** sd, } } - elem = &queue->data[--queue->nelts]; - *sd = elem->sd; - *cs = elem->cs; - *p = elem->p; + *te_out = NULL; + + if (!APR_RING_EMPTY(&queue->timers, timer_event_t, link)) { + *te_out = APR_RING_FIRST(&queue->timers); + APR_RING_REMOVE(*te_out, link); + } + else { + elem = &queue->data[--queue->nelts]; + *sd = elem->sd; + *cs = elem->cs; + *p = elem->p; #ifdef AP_DEBUG - elem->sd = NULL; - elem->p = NULL; + elem->sd = NULL; + elem->p = NULL; #endif /* AP_DEBUG */ - + } + rv = apr_thread_mutex_unlock(queue->one_big_mutex); return rv; } diff --git a/server/mpm/experimental/event/fdqueue.h b/server/mpm/experimental/event/fdqueue.h index bd18adfd18..9482d71b0c 100644 --- a/server/mpm/experimental/event/fdqueue.h +++ b/server/mpm/experimental/event/fdqueue.h @@ -37,6 +37,8 @@ #endif #include <apr_errno.h> +#include "ap_mpm.h" + typedef struct fd_queue_info_t fd_queue_info_t; apr_status_t ap_queue_info_create(fd_queue_info_t ** queue_info, @@ -54,8 +56,19 @@ struct fd_queue_elem_t }; typedef struct fd_queue_elem_t fd_queue_elem_t; +typedef struct timer_event_t timer_event_t; + +struct timer_event_t { + APR_RING_ENTRY(timer_event_t) link; + apr_time_t when; + ap_mpm_callback_fn_t *cbfunc; + void *baton; +}; + + struct fd_queue_t { + APR_RING_HEAD(timers_t, timer_event_t) timers; fd_queue_elem_t *data; int nelts; int bounds; @@ -73,8 +86,10 @@ apr_status_t ap_queue_init(fd_queue_t * queue, int queue_capacity, apr_pool_t * a); apr_status_t ap_queue_push(fd_queue_t * queue, apr_socket_t * sd, conn_state_t * cs, apr_pool_t * p); -apr_status_t ap_queue_pop(fd_queue_t * queue, apr_socket_t ** sd, - conn_state_t ** cs, apr_pool_t ** p); +apr_status_t ap_queue_push_timer(fd_queue_t *queue, timer_event_t *te); +apr_status_t ap_queue_pop_something(fd_queue_t * queue, apr_socket_t ** sd, + conn_state_t ** cs, apr_pool_t ** p, + timer_event_t ** te); apr_status_t ap_queue_interrupt_all(fd_queue_t * queue); apr_status_t ap_queue_term(fd_queue_t * queue); |