commit    ff2ed5d73957148453b0b5bfd9ec4c311101473f (patch)
author    Stefan Eissing <icing@apache.org>  2022-06-17 11:24:57 +0200
committer Stefan Eissing <icing@apache.org>  2022-06-17 11:24:57 +0200
tree      31803642a264b4edc33de579a4fe9eb843c60b1e /modules/http2
parent    *) mod_http2: fix an edge case in h2_fifo_remove, (diff)
*) mod_http2: new implementation of the h2 worker pool.
   - O(1) cost for registering connection-processing producers
   - no limit on the number of registered producers
   - ongoing work is joined when a producer unregisters
   - callbacks remove direct dependencies on other h2 code
   - memory cleanup on worker deactivation (idle timeouts)
   - idle_limit is an apr_time_t instead of a number of seconds
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1902005 13f79535-47bb-0310-9956-ffa450edef68
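At a glance, the rewrite replaces the old fifo-of-mplx scheduling with a pull model: connection producers register callbacks and idle workers poll them. The sketch below distills that lifecycle from the new h2_workers.h in this patch; the h2_workers_* calls and the callback signatures come from the patch, while my_queue and its helpers are hypothetical stand-ins:

    #include "httpd.h"
    #include "h2_workers.h"

    /* Hypothetical producer baton; any source of prepared conn_rec's works. */
    typedef struct my_queue my_queue;
    extern conn_rec *my_queue_pop(my_queue *q);  /* assumed: NULL when empty */
    extern int my_queue_empty(my_queue *q);      /* assumed helper */

    static conn_rec *my_prod_next(void *baton, int *phas_more)
    {
        my_queue *q = baton;
        conn_rec *c = my_queue_pop(q);
        /* reporting "more" keeps this producer on the workers' active list */
        *phas_more = (c != NULL) && !my_queue_empty(q);
        return c;
    }

    static void my_prod_done(void *baton, conn_rec *c)
    {
        /* called once per handed-out connection; reclaim resources here */
        (void)baton; (void)c;
    }

    static void example(h2_workers *workers, apr_pool_t *pool, my_queue *q)
    {
        /* O(1) registration; a producer starts out idle */
        ap_conn_producer_t *prod;
        prod = h2_workers_register(workers, pool, "example",
                                   my_prod_next, my_prod_done, q);
        h2_workers_activate(workers, prod); /* when work arrives */
        h2_workers_join(workers, prod);     /* teardown: waits for ongoing work */
    }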
Diffstat (limited to 'modules/http2')
-rw-r--r--  modules/http2/h2_c1.c      |  13
-rw-r--r--  modules/http2/h2_config.c  |  64
-rw-r--r--  modules/http2/h2_config.h  |   5
-rw-r--r--  modules/http2/h2_mplx.c    |  63
-rw-r--r--  modules/http2/h2_mplx.h    |  10
-rw-r--r--  modules/http2/h2_workers.c | 694
-rw-r--r--  modules/http2/h2_workers.h | 111
-rw-r--r--  modules/http2/mod_http2.c  |   4
8 files changed, 539 insertions, 425 deletions
diff --git a/modules/http2/h2_c1.c b/modules/http2/h2_c1.c
index 1dc0de7c60..7662a0e4fe 100644
--- a/modules/http2/h2_c1.c
+++ b/modules/http2/h2_c1.c
@@ -56,11 +56,8 @@ apr_status_t h2_c1_child_init(apr_pool_t *pool, server_rec *s)
 {
     apr_status_t status = APR_SUCCESS;
     int minw, maxw;
-    int max_threads_per_child = 0;
-    int idle_secs = 0;
+    apr_time_t idle_limit;
 
-    ap_mpm_query(AP_MPMQ_MAX_THREADS, &max_threads_per_child);
-
     status = ap_mpm_query(AP_MPMQ_IS_ASYNC, &async_mpm);
     if (status != APR_SUCCESS) {
         /* some MPMs do not implement this */
@@ -70,12 +67,8 @@ apr_status_t h2_c1_child_init(apr_pool_t *pool, server_rec *s)
 
     h2_config_init(pool);
 
-    h2_get_num_workers(s, &minw, &maxw);
-    idle_secs = h2_config_sgeti(s, H2_CONF_MAX_WORKER_IDLE_SECS);
-    ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s,
-                 "h2_workers: min=%d max=%d, mthrpchild=%d, idle_secs=%d",
-                 minw, maxw, max_threads_per_child, idle_secs);
-    workers = h2_workers_create(s, pool, minw, maxw, idle_secs);
+    h2_get_workers_config(s, &minw, &maxw, &idle_limit);
+    workers = h2_workers_create(s, pool, maxw, minw, idle_limit);
 
     h2_c_logio_add_bytes_in = APR_RETRIEVE_OPTIONAL_FN(ap_logio_add_bytes_in);
     h2_c_logio_add_bytes_out = APR_RETRIEVE_OPTIONAL_FN(ap_logio_add_bytes_out);
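Note that the rewritten h2_workers_create() takes its limits in the order (max_slots, min_active), the reverse of the old (min, max) signature, so the call h2_workers_create(s, pool, maxw, minw, idle_limit) above is intentional and not a transposition; see the new prototype in h2_workers.h further down.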
diff --git a/modules/http2/h2_config.c b/modules/http2/h2_config.c
index 4df058d95d..da1cf79a07 100644
--- a/modules/http2/h2_config.c
+++ b/modules/http2/h2_config.c
@@ -57,7 +57,7 @@ typedef struct h2_config {
     int h2_window_size;             /* stream window size (http2) */
     int min_workers;                /* min # of worker threads/child */
     int max_workers;                /* max # of worker threads/child */
-    int max_worker_idle_secs;       /* max # of idle seconds for worker */
+    apr_interval_time_t idle_limit; /* max duration for idle workers */
    int stream_max_mem_size;        /* max # bytes held in memory/stream */
     int h2_direct;                  /* if mod_h2 is active directly */
     int modern_tls_only;            /* Accept only modern TLS in HTTP/2 connections */
@@ -93,7 +93,7 @@ static h2_config defconf = {
     H2_INITIAL_WINDOW_SIZE,     /* window_size */
     -1,                         /* min workers */
     -1,                         /* max workers */
-    10 * 60,                    /* max workers idle secs */
+    apr_time_from_sec(10 * 60), /* workers idle limit */
     32 * 1024,                  /* stream max mem size */
     -1,                         /* h2 direct mode */
     1,                          /* modern TLS only */
@@ -136,7 +136,7 @@ void *h2_config_create_svr(apr_pool_t *pool, server_rec *s)
     conf->h2_window_size       = DEF_VAL;
     conf->min_workers          = DEF_VAL;
     conf->max_workers          = DEF_VAL;
-    conf->max_worker_idle_secs = DEF_VAL;
+    conf->idle_limit           = DEF_VAL;
     conf->stream_max_mem_size  = DEF_VAL;
     conf->h2_direct            = DEF_VAL;
     conf->modern_tls_only      = DEF_VAL;
@@ -152,7 +152,7 @@ void *h2_config_create_svr(apr_pool_t *pool, server_rec *s)
     conf->padding_bits         = DEF_VAL;
     conf->padding_always       = DEF_VAL;
     conf->output_buffered      = DEF_VAL;
-    conf->stream_timeout        = DEF_VAL;
+    conf->stream_timeout       = DEF_VAL;
     return conf;
 }
@@ -168,7 +168,7 @@ static void *h2_config_merge(apr_pool_t *pool, void *basev, void *addv)
     n->h2_window_size       = H2_CONFIG_GET(add, base, h2_window_size);
     n->min_workers          = H2_CONFIG_GET(add, base, min_workers);
     n->max_workers          = H2_CONFIG_GET(add, base, max_workers);
-    n->max_worker_idle_secs = H2_CONFIG_GET(add, base, max_worker_idle_secs);
+    n->idle_limit           = H2_CONFIG_GET(add, base, idle_limit);
     n->stream_max_mem_size  = H2_CONFIG_GET(add, base, stream_max_mem_size);
     n->h2_direct            = H2_CONFIG_GET(add, base, h2_direct);
     n->modern_tls_only      = H2_CONFIG_GET(add, base, modern_tls_only);
@@ -194,7 +194,7 @@ static void *h2_config_merge(apr_pool_t *pool, void *basev, void *addv)
     n->early_hints          = H2_CONFIG_GET(add, base, early_hints);
     n->padding_bits         = H2_CONFIG_GET(add, base, padding_bits);
     n->padding_always       = H2_CONFIG_GET(add, base, padding_always);
-    n->stream_timeout        = H2_CONFIG_GET(add, base, stream_timeout);
+    n->stream_timeout       = H2_CONFIG_GET(add, base, stream_timeout);
     return n;
 }
@@ -248,8 +248,8 @@ static apr_int64_t h2_srv_config_geti64(const h2_config *conf, h2_config_var_t v
         case H2_CONF_MIN_WORKERS:
             return H2_CONFIG_GET(conf, &defconf, min_workers);
         case H2_CONF_MAX_WORKERS:
             return H2_CONFIG_GET(conf, &defconf, max_workers);
-        case H2_CONF_MAX_WORKER_IDLE_SECS:
-            return H2_CONFIG_GET(conf, &defconf, max_worker_idle_secs);
+        case H2_CONF_MAX_WORKER_IDLE_LIMIT:
+            return H2_CONFIG_GET(conf, &defconf, idle_limit);
         case H2_CONF_STREAM_MAX_MEM:
             return H2_CONFIG_GET(conf, &defconf, stream_max_mem_size);
         case H2_CONF_MODERN_TLS_ONLY:
@@ -298,9 +298,6 @@ static void h2_srv_config_seti(h2_config *conf, h2_config_var_t var, int val)
         case H2_CONF_MAX_WORKERS:
             H2_CONFIG_SET(conf, max_workers, val);
             break;
-        case H2_CONF_MAX_WORKER_IDLE_SECS:
-            H2_CONFIG_SET(conf, max_worker_idle_secs, val);
-            break;
         case H2_CONF_STREAM_MAX_MEM:
             H2_CONFIG_SET(conf, stream_max_mem_size, val);
             break;
@@ -354,6 +351,9 @@ static void h2_srv_config_seti64(h2_config *conf, h2_config_var_t var, apr_int64
         case H2_CONF_STREAM_TIMEOUT:
             H2_CONFIG_SET(conf, stream_timeout, val);
             break;
+        case H2_CONF_MAX_WORKER_IDLE_LIMIT:
+            H2_CONFIG_SET(conf, idle_limit, val);
+            break;
         default:
             h2_srv_config_seti(conf, var, (int)val);
             break;
@@ -557,14 +557,15 @@ static const char *h2_conf_set_max_workers(cmd_parms *cmd,
     return NULL;
 }
 
-static const char *h2_conf_set_max_worker_idle_secs(cmd_parms *cmd,
-                                                    void *dirconf, const char *value)
+static const char *h2_conf_set_max_worker_idle_limit(cmd_parms *cmd,
+                                                     void *dirconf, const char *value)
 {
-    int val = (int)apr_atoi64(value);
-    if (val < 1) {
-        return "value must be > 0";
+    apr_interval_time_t timeout;
+    apr_status_t rv = ap_timeout_parameter_parse(value, &timeout, "s");
+    if (rv != APR_SUCCESS) {
+        return "Invalid idle limit value";
     }
-    CONFIG_CMD_SET(cmd, dirconf, H2_CONF_MAX_WORKER_IDLE_SECS, val);
+    CONFIG_CMD_SET64(cmd, dirconf, H2_CONF_MAX_WORKER_IDLE_LIMIT, timeout);
     return NULL;
 }
 
@@ -868,27 +869,22 @@ static const char *h2_conf_set_stream_timeout(cmd_parms *cmd,
     return NULL;
 }
 
-void h2_get_num_workers(server_rec *s, int *minw, int *maxw)
+void h2_get_workers_config(server_rec *s, int *pminw, int *pmaxw,
+                           apr_time_t *pidle_limit)
 {
     int threads_per_child = 0;
 
-    *minw = h2_config_sgeti(s, H2_CONF_MIN_WORKERS);
-    *maxw = h2_config_sgeti(s, H2_CONF_MAX_WORKERS);
-    ap_mpm_query(AP_MPMQ_MAX_THREADS, &threads_per_child);
+    *pminw = h2_config_sgeti(s, H2_CONF_MIN_WORKERS);
+    *pmaxw = h2_config_sgeti(s, H2_CONF_MAX_WORKERS);
 
-    if (*minw <= 0) {
-        *minw = threads_per_child;
-    }
-    if (*maxw <= 0) {
-        /* As a default, this seems to work quite well under mpm_event.
-         * For people enabling http2 under mpm_prefork, start 4 threads unless
-         * configured otherwise. People get unhappy if their http2 requests are
-         * blocking each other. */
-        *maxw = 3 * (*minw) / 2;
-        if (*maxw < 4) {
-            *maxw = 4;
-        }
+    ap_mpm_query(AP_MPMQ_MAX_THREADS, &threads_per_child);
+    if (*pminw <= 0) {
+        *pminw = threads_per_child;
+    }
+    if (*pmaxw <= 0) {
+        *pmaxw = H2MAX(4, 3 * (*pminw) / 2);
     }
+    *pidle_limit = h2_config_sgeti64(s, H2_CONF_MAX_WORKER_IDLE_LIMIT);
 }
 
 #define AP_END_CMD AP_INIT_TAKE1(NULL, NULL, NULL, RSRC_CONF, NULL)
@@ -902,7 +898,7 @@ const command_rec h2_cmds[] = {
                   RSRC_CONF, "minimum number of worker threads per child"),
     AP_INIT_TAKE1("H2MaxWorkers", h2_conf_set_max_workers, NULL,
                   RSRC_CONF, "maximum number of worker threads per child"),
-    AP_INIT_TAKE1("H2MaxWorkerIdleSeconds", h2_conf_set_max_worker_idle_secs, NULL,
+    AP_INIT_TAKE1("H2MaxWorkerIdleSeconds", h2_conf_set_max_worker_idle_limit, NULL,
                   RSRC_CONF, "maximum number of idle seconds before a worker shuts down"),
     AP_INIT_TAKE1("H2StreamMaxMemSize", h2_conf_set_stream_max_mem_size, NULL,
                   RSRC_CONF, "maximum number of bytes buffered in memory for a stream"),
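With the directive handler now delegating to ap_timeout_parameter_parse(), H2MaxWorkerIdleSeconds keeps seconds as its default unit but also accepts httpd's usual time-unit suffixes. A small sketch of the parsing semantics, assuming the standard suffix set (h, min, s, ms) of that helper:

    #include "httpd.h"   /* ap_timeout_parameter_parse() */

    static void idle_limit_parse_examples(void)
    {
        apr_interval_time_t t;

        /* a bare number falls back to the default unit passed in, here "s" */
        ap_timeout_parameter_parse("600", &t, "s");   /* t == apr_time_from_sec(600) */

        /* explicit suffixes override the default unit */
        ap_timeout_parameter_parse("10min", &t, "s"); /* also 600 seconds */
        ap_timeout_parameter_parse("500ms", &t, "s"); /* half a second */
    }

So a configuration value like "90s" or even "2min" now parses, despite the directive name promising plain seconds.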
diff --git a/modules/http2/h2_config.h b/modules/http2/h2_config.h
index c150fe21d8..6d2e65f926 100644
--- a/modules/http2/h2_config.h
+++ b/modules/http2/h2_config.h
@@ -28,7 +28,7 @@ typedef enum {
     H2_CONF_WIN_SIZE,
     H2_CONF_MIN_WORKERS,
     H2_CONF_MAX_WORKERS,
-    H2_CONF_MAX_WORKER_IDLE_SECS,
+    H2_CONF_MAX_WORKER_IDLE_LIMIT,
     H2_CONF_STREAM_MAX_MEM,
     H2_CONF_DIRECT,
     H2_CONF_MODERN_TLS_ONLY,
@@ -88,7 +88,8 @@ apr_int64_t h2_config_rgeti64(request_rec *r, h2_config_var_t var);
 
 apr_array_header_t *h2_config_push_list(request_rec *r);
 
-void h2_get_num_workers(server_rec *s, int *minw, int *maxw);
+void h2_get_workers_config(server_rec *s, int *pminw, int *pmaxw,
+                           apr_time_t *pidle_limit);
 
 void h2_config_init(apr_pool_t *pool);
 
 const struct h2_priority *h2_cconfig_get_priority(conn_rec *c, const char *content_type);
diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c
index 205d19f020..bb6b324115 100644
--- a/modules/http2/h2_mplx.c
+++ b/modules/http2/h2_mplx.c
@@ -58,6 +58,9 @@ typedef struct {
     apr_size_t count;
 } stream_iter_ctx;
 
+static conn_rec *c2_prod_next(void *baton, int *phas_more);
+static void c2_prod_done(void *baton, conn_rec *c2);
+
 static void s_mplx_be_happy(h2_mplx *m, conn_rec *c, h2_conn_ctx_t *conn_ctx);
 static void m_be_annoyed(h2_mplx *m);
 
@@ -303,7 +306,7 @@ h2_mplx *h2_mplx_c1_create(int child_num, apr_uint32_t id, h2_stream *stream0,
     m->q = h2_iq_create(m->pool, m->max_streams);
 
     m->workers = workers;
-    m->processing_max = m->max_streams;
+    m->processing_max = H2MIN(h2_workers_get_max_workers(workers), m->max_streams);
     m->processing_limit = 6; /* the original h1 max parallel connections */
     m->last_mood_change = apr_time_now();
     m->mood_update_interval = apr_time_from_msec(100);
@@ -332,6 +335,9 @@ h2_mplx *h2_mplx_c1_create(int child_num, apr_uint32_t id, h2_stream *stream0,
     m->max_spare_transits = 3;
     m->c2_transits = apr_array_make(m->pool, m->max_spare_transits, sizeof(h2_c2_transit*));
 
+    m->producer = h2_workers_register(workers, m->pool,
+                                      apr_psprintf(m->pool, "h2-%d", (int)m->id),
+                                      c2_prod_next, c2_prod_done, m);
     return m;
 
 failure:
@@ -440,8 +446,7 @@ void h2_mplx_c1_destroy(h2_mplx *m)
     /* How to shut down a h2 connection:
      * 0. abort and tell the workers that no more work will come from us */
     m->aborted = 1;
-    h2_workers_unregister(m->workers, m);
-
+
     H2_MPLX_ENTER_ALWAYS(m);
 
     /* While really terminating any c2 connections, treat the master
@@ -485,6 +490,10 @@ void h2_mplx_c1_destroy(h2_mplx *m)
         }
     }
 
+    H2_MPLX_LEAVE(m);
+    h2_workers_join(m->workers, m->producer);
+    H2_MPLX_ENTER_ALWAYS(m);
+
     /* 4. With all workers done, all streams should be in spurge */
     ap_assert(m->processing_count == 0);
     if (!h2_ihash_empty(m->shold)) {
@@ -687,15 +696,13 @@ void h2_mplx_c1_process(h2_mplx *m,
                          H2_MPLX_MSG(m, "stream %d not found to process"), sid);
         }
     }
-    if (!m->is_registered && !h2_iq_empty(m->q)) {
-        m->is_registered = 1;
+    if ((m->processing_count < m->processing_limit) && !h2_iq_empty(m->q)) {
         H2_MPLX_LEAVE(m);
-        rv = h2_workers_register(m->workers, m);
+        rv = h2_workers_activate(m->workers, m->producer);
         H2_MPLX_ENTER_ALWAYS(m);
         if (rv != APR_SUCCESS) {
-            m->is_registered = 0;
             ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1, APLOGNO(10021)
-                          H2_MPLX_MSG(m, "register at workers"));
+                          H2_MPLX_MSG(m, "activate at workers"));
         }
     }
     *pstream_count = (int)h2_ihash_count(m->streams);
@@ -863,24 +870,18 @@ cleanup:
     return c2;
 }
 
-apr_status_t h2_mplx_worker_pop_c2(h2_mplx *m, conn_rec **out_c)
+static conn_rec *c2_prod_next(void *baton, int *phas_more)
 {
-    apr_status_t rv;
+    h2_mplx *m = baton;
+    conn_rec *c = NULL;
 
     H2_MPLX_ENTER_ALWAYS(m);
-    if (m->aborted) {
-        *out_c = NULL;
-        rv = APR_EOF;
-    }
-    else {
-        *out_c = s_next_c2(m);
-        rv = (*out_c != NULL && !h2_iq_empty(m->q))? APR_EAGAIN : APR_SUCCESS;
-    }
-    if (APR_EAGAIN != rv) {
-        m->is_registered = 0; /* h2_workers will discard this mplx */
+    if (!m->aborted) {
+        c = s_next_c2(m);
+        *phas_more = (c != NULL && !h2_iq_empty(m->q));
     }
     H2_MPLX_LEAVE(m);
-    return rv;
+    return c;
 }
 
 static void s_c2_done(h2_mplx *m, conn_rec *c2, h2_conn_ctx_t *conn_ctx)
@@ -947,34 +948,18 @@ static void s_c2_done(h2_mplx *m, conn_rec *c2, h2_conn_ctx_t *conn_ctx)
     }
 }
 
-void h2_mplx_worker_c2_done(conn_rec *c2)
+static void c2_prod_done(void *baton, conn_rec *c2)
 {
+    h2_mplx *m = baton;
     h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c2);
-    h2_mplx *m;
 
     AP_DEBUG_ASSERT(conn_ctx);
-    m = conn_ctx->mplx;
 
     H2_MPLX_ENTER_ALWAYS(m);
     --m->processing_count;
     s_c2_done(m, c2, conn_ctx);
     if (m->join_wait) apr_thread_cond_signal(m->join_wait);
-    if (!m->aborted && !m->is_registered
-        && (m->processing_count < m->processing_limit)
-        && !h2_iq_empty(m->q)) {
-        /* We have a limit on the amount of c2s we process at a time. When
-         * this is reached, we do no longer have things to do for h2 workers
-         * and they remove such an mplx from its queue.
-         * When a c2 is done, there might then be room for more processing
-         * and we need then to register this mplx at h2 workers again.
-         */
-        m->is_registered = 1;
-        H2_MPLX_LEAVE(m);
-        h2_workers_register(m->workers, m);
-        return;
-    }
-
     H2_MPLX_LEAVE(m);
 }
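A detail worth pausing on in h2_mplx_c1_destroy() above: the mplx lock is released around the join. h2_workers_join() blocks until every handed-out c2 connection has been reported back, and c2_prod_done() takes the mplx lock for each of them, so holding the lock across the join would deadlock. The three lines from the hunk, annotated:

    H2_MPLX_LEAVE(m);                          /* drop the mplx lock first */
    h2_workers_join(m->workers, m->producer);  /* wait for all in-flight c2s */
    H2_MPLX_ENTER_ALWAYS(m);                   /* re-acquire to purge streams */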
diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h
index 42faf051ad..e056acacdd 100644
--- a/modules/http2/h2_mplx.h
+++ b/modules/http2/h2_mplx.h
@@ -44,6 +44,8 @@ struct h2_iqueue;
 
 #include <apr_queue.h>
 
+#include "h2_workers.h"
+
 typedef struct h2_c2_transit h2_c2_transit;
 
 struct h2_c2_transit {
@@ -63,7 +65,7 @@ struct h2_mplx {
     int aborted;
     int polling;                  /* is waiting/processing pollset events */
-    int is_registered;            /* is registered at h2_workers */
+    ap_conn_producer_t *producer; /* registered producer at h2_workers */
 
     struct h2_ihash_t *streams;   /* all streams active */
     struct h2_ihash_t *shold;     /* all streams done with c2 processing ongoing */
@@ -218,12 +220,6 @@ const struct h2_stream *h2_mplx_c2_stream_get(h2_mplx *m, int stream_id);
  */
 apr_status_t h2_mplx_worker_pop_c2(h2_mplx *m, conn_rec **out_c2);
 
-/**
- * A h2 worker reports a secondary connection processing done.
- * @param c2 the secondary connection finished processing
- */
-void h2_mplx_worker_c2_done(conn_rec *c2);
-
 #define H2_MPLX_MSG(m, msg) \
     "h2_mplx(%d-%lu): "msg, m->child_num, (unsigned long)m->id
diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c
index 22c31f4f83..c8796aeeac 100644
--- a/modules/http2/h2_workers.c
+++ b/modules/http2/h2_workers.c
@@ -15,7 +15,7 @@
  */
 
 #include <assert.h>
-#include <apr_atomic.h>
+#include <apr_ring.h>
 #include <apr_thread_mutex.h>
 #include <apr_thread_cond.h>
 
@@ -33,309 +33,329 @@
 #include "h2_workers.h"
 #include "h2_util.h"
 
+typedef enum {
+    PROD_IDLE,
+    PROD_ACTIVE,
+    PROD_JOINED,
+} prod_state_t;
+
+struct ap_conn_producer_t {
+    APR_RING_ENTRY(ap_conn_producer_t) link;
+    const char *name;
+    void *baton;
+    ap_conn_producer_next *fn_next;
+    ap_conn_producer_done *fn_done;
+    volatile prod_state_t state;
+    volatile int conns_active;
+};
+
+
+typedef enum {
+    H2_SLOT_FREE,
+    H2_SLOT_RUN,
+    H2_SLOT_ZOMBIE,
+} h2_slot_state_t;
+
 typedef struct h2_slot h2_slot;
 struct h2_slot {
+    APR_RING_ENTRY(h2_slot) link;
     int id;
-    h2_slot *next;
+    apr_pool_t *pool;
+    h2_slot_state_t state;
+    volatile int should_shutdown;
+    volatile int is_idle;
     h2_workers *workers;
-    conn_rec *connection;
+    ap_conn_producer_t *prod;
     apr_thread_t *thread;
-    apr_thread_mutex_t *lock;
-    apr_thread_cond_t *not_idle;
-    /* atomic */ apr_uint32_t timed_out;
+    struct apr_thread_cond_t *more_work;
+    int activations;
 };
 
-static h2_slot *pop_slot(h2_slot *volatile *phead)
-{
-    /* Atomically pop a slot from the list */
-    for (;;) {
-        h2_slot *first = *phead;
-        if (first == NULL) {
-            return NULL;
-        }
-        if (apr_atomic_casptr((void*)phead, first->next, first) == first) {
-            first->next = NULL;
-            return first;
-        }
-    }
-}
+struct h2_workers {
+    server_rec *s;
+    apr_pool_t *pool;
+
+    apr_uint32_t max_slots;
+    apr_uint32_t min_active;
+    volatile apr_time_t idle_limit;
+    volatile int aborted;
+    volatile int shutdown;
+    int dynamic;
+
+    volatile apr_uint32_t active_slots;
+    volatile apr_uint32_t idle_slots;
+
+    apr_threadattr_t *thread_attr;
+    h2_slot *slots;
+
+    APR_RING_HEAD(h2_slots_free, h2_slot) free;
+    APR_RING_HEAD(h2_slots_idle, h2_slot) idle;
+    APR_RING_HEAD(h2_slots_busy, h2_slot) busy;
+    APR_RING_HEAD(h2_slots_zombie, h2_slot) zombie;
+
+    APR_RING_HEAD(ap_conn_producer_active, ap_conn_producer_t) prod_active;
+    APR_RING_HEAD(ap_conn_producer_idle, ap_conn_producer_t) prod_idle;
+
+    struct apr_thread_mutex_t *lock;
+    struct apr_thread_cond_t *prod_done;
+    struct apr_thread_cond_t *all_done;
+};
 
-static void push_slot(h2_slot *volatile *phead, h2_slot *slot)
-{
-    /* Atomically push a slot to the list */
-    ap_assert(!slot->next);
-    for (;;) {
-        h2_slot *next = slot->next = *phead;
-        if (apr_atomic_casptr((void*)phead, slot, next) == next) {
-            return;
-        }
-    }
-}
 
 static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx);
-static void slot_done(h2_slot *slot);
 
-static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot)
+static apr_status_t activate_slot(h2_workers *workers)
 {
+    h2_slot *slot;
+    apr_pool_t *pool;
     apr_status_t rv;
-
-    slot->workers = workers;
-    slot->connection = NULL;
 
-    apr_thread_mutex_lock(workers->lock);
-    if (!slot->lock) {
-        rv = apr_thread_mutex_create(&slot->lock,
-                                     APR_THREAD_MUTEX_DEFAULT,
-                                     workers->pool);
-        if (rv != APR_SUCCESS) goto cleanup;
+    if (APR_RING_EMPTY(&workers->free, h2_slot, link)) {
+        return APR_EAGAIN;
     }
+    slot = APR_RING_FIRST(&workers->free);
+    ap_assert(slot->state == H2_SLOT_FREE);
+    APR_RING_REMOVE(slot, link);
 
-    if (!slot->not_idle) {
-        rv = apr_thread_cond_create(&slot->not_idle, workers->pool);
-        if (rv != APR_SUCCESS) goto cleanup;
-    }
-
     ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
-                 "h2_workers: new thread for slot %d", slot->id);
+                 "h2_workers: activate slot %d", slot->id);
+
+    slot->state = H2_SLOT_RUN;
+    slot->should_shutdown = 0;
+    slot->is_idle = 0;
+    slot->pool = NULL;
+    ++workers->active_slots;
+    rv = apr_pool_create(&pool, workers->pool);
+    if (APR_SUCCESS != rv) goto cleanup;
+    apr_pool_tag(pool, "h2_worker_slot");
+    slot->pool = pool;
 
-    /* thread will either immediately start work or add itself
-     * to the idle queue */
-    apr_atomic_inc32(&workers->worker_count);
-    apr_atomic_set32(&slot->timed_out, 0);
     rv = ap_thread_create(&slot->thread, workers->thread_attr,
-                          slot_run, slot, workers->pool);
-    if (rv != APR_SUCCESS) {
-        apr_atomic_dec32(&workers->worker_count);
-    }
+                          slot_run, slot, slot->pool);
 
 cleanup:
-    apr_thread_mutex_unlock(workers->lock);
     if (rv != APR_SUCCESS) {
-        push_slot(&workers->free, slot);
+        AP_DEBUG_ASSERT(0);
+        slot->state = H2_SLOT_FREE;
+        if (slot->pool) {
+            apr_pool_destroy(slot->pool);
+            slot->pool = NULL;
+        }
+        APR_RING_INSERT_TAIL(&workers->free, slot, h2_slot, link);
+        --workers->active_slots;
     }
     return rv;
 }
 
-static apr_status_t add_worker(h2_workers *workers)
-{
-    h2_slot *slot = pop_slot(&workers->free);
-    if (slot) {
-        return activate_slot(workers, slot);
-    }
-    return APR_EAGAIN;
-}
-
-static void wake_idle_worker(h2_workers *workers)
-{
-    h2_slot *slot;;
-    for (;;) {
-        slot = pop_slot(&workers->idle);
-        if (!slot) {
-            if (workers->dynamic && apr_atomic_read32(&workers->shutdown) == 0) {
-                add_worker(workers);
-            }
-            return;
-        }
-        if (!apr_atomic_read32(&slot->timed_out)) {
-            apr_thread_mutex_lock(slot->lock);
-            apr_thread_cond_signal(slot->not_idle);
-            apr_thread_mutex_unlock(slot->lock);
-            return;
-        }
-        slot_done(slot);
-    }
-}
-
 static void join_zombies(h2_workers *workers)
 {
     h2_slot *slot;
-    while ((slot = pop_slot(&workers->zombies))) {
-        apr_status_t status;
+    apr_status_t status;
+
+    while (!APR_RING_EMPTY(&workers->zombie, h2_slot, link)) {
+        slot = APR_RING_FIRST(&workers->zombie);
+        APR_RING_REMOVE(slot, link);
+        ap_assert(slot->state == H2_SLOT_ZOMBIE);
         ap_assert(slot->thread != NULL);
+
+        apr_thread_mutex_unlock(workers->lock);
         apr_thread_join(&status, slot->thread);
-        slot->thread = NULL;
+        apr_thread_mutex_lock(workers->lock);
 
-        push_slot(&workers->free, slot);
+        slot->thread = NULL;
+        slot->state = H2_SLOT_FREE;
+        if (slot->pool) {
+            apr_pool_destroy(slot->pool);
+            slot->pool = NULL;
+        }
+        APR_RING_INSERT_TAIL(&workers->free, slot, h2_slot, link);
     }
 }
 
-static apr_status_t slot_pull_c2(h2_slot *slot, h2_mplx *m)
+static void wake_idle_worker(h2_workers *workers, ap_conn_producer_t *prod)
 {
-    apr_status_t rv;
-
-    rv = h2_mplx_worker_pop_c2(m, &slot->connection);
-    if (slot->connection) {
-        return rv;
-    }
-    return APR_EOF;
-}
-
-static h2_fifo_op_t mplx_peek(void *head, void *ctx)
-{
-    h2_mplx *m = head;
-    h2_slot *slot = ctx;
-
-    if (slot_pull_c2(slot, m) == APR_EAGAIN) {
-        wake_idle_worker(slot->workers);
-        return H2_FIFO_OP_REPUSH;
-    }
-    return H2_FIFO_OP_PULL;
+    if (!APR_RING_EMPTY(&workers->idle, h2_slot, link)) {
+        h2_slot *slot;
+        for (slot = APR_RING_FIRST(&workers->idle);
+             slot != APR_RING_SENTINEL(&workers->idle, h2_slot, link);
+             slot = APR_RING_NEXT(slot, link)) {
+            if (slot->is_idle && !slot->should_shutdown) {
+                apr_thread_cond_signal(slot->more_work);
+                slot->is_idle = 0;
+                return;
+            }
+        }
+    }
+    if (workers->dynamic && !workers->shutdown
+        && (workers->active_slots < workers->max_slots)) {
+        activate_slot(workers);
+    }
 }
 
 /**
- * Get the next c2 for the given worker. Will block until a c2 arrives
- * or the max_wait timer expires and more than min workers exist.
+ * Get the next connection to work on.
  */
-static int get_next(h2_slot *slot)
+static conn_rec *get_next(h2_slot *slot)
 {
     h2_workers *workers = slot->workers;
-    int non_essential = slot->id >= workers->min_workers;
-    apr_status_t rv;
-
-    while (apr_atomic_read32(&workers->aborted) == 0
-           && apr_atomic_read32(&slot->timed_out) == 0) {
-        ap_assert(slot->connection == NULL);
-        if (non_essential && apr_atomic_read32(&workers->shutdown)) {
-            /* Terminate non-essential worker on shutdown */
-            break;
-        }
-        if (h2_fifo_try_peek(workers->mplxs, mplx_peek, slot) == APR_EOF) {
-            /* The queue is terminated with the MPM child being cleaned up,
-             * just leave. */
-            break;
-        }
-        if (slot->connection) {
-            return 1;
-        }
-
-        join_zombies(workers);
-
-        apr_thread_mutex_lock(slot->lock);
-        if (apr_atomic_read32(&workers->aborted) == 0) {
-            apr_uint32_t idle_secs;
-
-            push_slot(&workers->idle, slot);
-            if (non_essential
-                && (idle_secs = apr_atomic_read32(&workers->max_idle_secs))) {
-                rv = apr_thread_cond_timedwait(slot->not_idle, slot->lock,
-                                               apr_time_from_sec(idle_secs));
-                if (APR_TIMEUP == rv) {
-                    apr_atomic_set32(&slot->timed_out, 1);
-                }
-            }
-            else {
-                apr_thread_cond_wait(slot->not_idle, slot->lock);
-            }
-        }
-        apr_thread_mutex_unlock(slot->lock);
-    }
-    return 0;
+    conn_rec *c = NULL;
+    ap_conn_producer_t *prod;
+    int has_more;
+
+    slot->prod = NULL;
+    if (!APR_RING_EMPTY(&workers->prod_active, ap_conn_producer_t, link)) {
+        slot->prod = prod = APR_RING_FIRST(&workers->prod_active);
+        APR_RING_REMOVE(prod, link);
+        AP_DEBUG_ASSERT(PROD_ACTIVE == prod->state);
+
+        c = prod->fn_next(prod->baton, &has_more);
+        if (c && has_more) {
+            APR_RING_INSERT_TAIL(&workers->prod_active, prod, ap_conn_producer_t, link);
+            wake_idle_worker(workers, slot->prod);
+        }
+        else {
+            prod->state = PROD_IDLE;
+            APR_RING_INSERT_TAIL(&workers->prod_idle, prod, ap_conn_producer_t, link);
+        }
+        if (c) {
+            ++prod->conns_active;
+        }
+    }
+    return c;
 }
 
-static void slot_done(h2_slot *slot)
-{
-    h2_workers *workers = slot->workers;
-
-    push_slot(&workers->zombies, slot);
-
-    /* If this worker is the last one exiting and the MPM child is stopping,
-     * unblock workers_pool_cleanup(). */
-    if (!apr_atomic_dec32(&workers->worker_count)
-        && apr_atomic_read32(&workers->aborted)) {
-        apr_thread_mutex_lock(workers->lock);
-        apr_thread_cond_signal(workers->all_done);
-        apr_thread_mutex_unlock(workers->lock);
-    }
-}
-
-static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx)
-{
-    h2_slot *slot = wctx;
-    conn_rec *c;
-
-    /* Get the next c2 from mplx to process. */
-    while (get_next(slot)) {
-        /* See the discussion at <https://github.com/icing/mod_h2/issues/195>
-         *
-         * Each conn_rec->id is supposed to be unique at a point in time. Since
-         * some modules (and maybe external code) uses this id as an identifier
-         * for the request_rec they handle, it needs to be unique for secondary
-         * connections also.
-         *
-         * The MPM module assigns the connection ids and mod_unique_id is using
-         * that one to generate identifier for requests. While the implementation
-         * works for HTTP/1.x, the parallel execution of several requests per
-         * connection will generate duplicate identifiers on load.
-         *
-         * The original implementation for secondary connection identifiers used
-         * to shift the master connection id up and assign the stream id to the
-         * lower bits. This was cramped on 32 bit systems, but on 64bit there was
-         * enough space.
-         *
-         * As issue 195 showed, mod_unique_id only uses the lower 32 bit of the
-         * connection id, even on 64bit systems. Therefore collisions in request ids.
-         *
-         * The way master connection ids are generated, there is some space "at the
-         * top" of the lower 32 bits on allmost all systems. If you have a setup
-         * with 64k threads per child and 255 child processes, you live on the edge.
-         *
-         * The new implementation shifts 8 bits and XORs in the worker
-         * id. This will experience collisions with > 256 h2 workers and heavy
-         * load still. There seems to be no way to solve this in all possible
-         * configurations by mod_h2 alone.
-         */
-        AP_DEBUG_ASSERT(slot->connection != NULL);
-        c = slot->connection;
-        slot->connection = NULL;
-        c->id = (c->master->id << 8)^slot->id;
-        c->current_thread = thread;
-
-        ap_process_connection(c, ap_get_conn_socket(c));
-
-        h2_mplx_worker_c2_done(c);
-    }
-
-    if (apr_atomic_read32(&slot->timed_out) == 0) {
-        slot_done(slot);
-    }
-
+static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx)
+{
+    h2_slot *slot = wctx;
+    h2_workers *workers = slot->workers;
+    conn_rec *c;
+    apr_status_t rv;
+
+    apr_thread_mutex_lock(workers->lock);
+    slot->state = H2_SLOT_RUN;
+    ++slot->activations;
+    APR_RING_ELEM_INIT(slot, link);
+    for(;;) {
+        if (APR_RING_NEXT(slot, link) != slot) {
+            /* slot is part of the idle ring from the last loop */
+            APR_RING_REMOVE(slot, link);
+            --workers->idle_slots;
+        }
+        slot->is_idle = 0;
+
+        if (!workers->aborted && !slot->should_shutdown) {
+            APR_RING_INSERT_TAIL(&workers->busy, slot, h2_slot, link);
+            do {
+                c = get_next(slot);
+                if (!c) {
+                    break;
+                }
+                apr_thread_mutex_unlock(workers->lock);
+                /* See the discussion at <https://github.com/icing/mod_h2/issues/195>
+                 *
+                 * Each conn_rec->id is supposed to be unique at a point in time. Since
+                 * some modules (and maybe external code) uses this id as an identifier
+                 * for the request_rec they handle, it needs to be unique for secondary
+                 * connections also.
+                 *
+                 * The MPM module assigns the connection ids and mod_unique_id is using
+                 * that one to generate identifier for requests. While the implementation
+                 * works for HTTP/1.x, the parallel execution of several requests per
+                 * connection will generate duplicate identifiers on load.
+                 *
+                 * The original implementation for secondary connection identifiers used
+                 * to shift the master connection id up and assign the stream id to the
+                 * lower bits. This was cramped on 32 bit systems, but on 64bit there was
+                 * enough space.
+                 *
+                 * As issue 195 showed, mod_unique_id only uses the lower 32 bit of the
+                 * connection id, even on 64bit systems. Therefore collisions in request ids.
+                 *
+                 * The way master connection ids are generated, there is some space "at the
+                 * top" of the lower 32 bits on almost all systems. If you have a setup
+                 * with 64k threads per child and 255 child processes, you live on the edge.
+                 *
+                 * The new implementation shifts 8 bits and XORs in the worker
+                 * id. This will experience collisions with > 256 h2 workers and heavy
+                 * load still. There seems to be no way to solve this in all possible
+                 * configurations by mod_h2 alone.
+                 */
+                if (c->master) {
+                    c->id = (c->master->id << 8)^slot->id;
+                }
+                c->current_thread = thread;
+                AP_DEBUG_ASSERT(slot->prod);
+
+                ap_process_connection(c, ap_get_conn_socket(c));
+                slot->prod->fn_done(slot->prod->baton, c);
+
+                apr_thread_mutex_lock(workers->lock);
+                if (--slot->prod->conns_active <= 0) {
+                    apr_thread_cond_broadcast(workers->prod_done);
+                }
+                if (slot->prod->state == PROD_IDLE) {
+                    APR_RING_REMOVE(slot->prod, link);
+                    slot->prod->state = PROD_ACTIVE;
+                    APR_RING_INSERT_TAIL(&workers->prod_active, slot->prod, ap_conn_producer_t, link);
+                }
+
+            } while (!workers->aborted && !slot->should_shutdown);
+            APR_RING_REMOVE(slot, link); /* no longer busy */
+        }
+
+        if (workers->aborted || slot->should_shutdown) {
+            break;
+        }
+
+        join_zombies(workers);
+
+        /* we are idle */
+        APR_RING_INSERT_TAIL(&workers->idle, slot, h2_slot, link);
+        ++workers->idle_slots;
+        slot->is_idle = 1;
+        if (slot->id >= workers->min_active && workers->idle_limit) {
+            rv = apr_thread_cond_timedwait(slot->more_work, workers->lock,
+                                           workers->idle_limit);
+            if (APR_TIMEUP == rv) {
+                APR_RING_REMOVE(slot, link);
+                --workers->idle_slots;
+                ap_log_error(APLOG_MARK, APLOG_ERR, 0, workers->s,
+                             "h2_workers: idle timeout slot %d in state %d (%d activations)",
+                             slot->id, slot->state, slot->activations);
+                break;
+            }
+        }
+        else {
+            apr_thread_cond_wait(slot->more_work, workers->lock);
+        }
+    }
+
+    ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
+                 "h2_workers: terminate slot %d in state %d (%d activations)",
+                 slot->id, slot->state, slot->activations);
+    slot->is_idle = 0;
+    slot->state = H2_SLOT_ZOMBIE;
+    slot->should_shutdown = 0;
+    APR_RING_INSERT_TAIL(&workers->zombie, slot, h2_slot, link);
+    --workers->active_slots;
+    if (workers->active_slots <= 0) {
+        apr_thread_cond_broadcast(workers->all_done);
+    }
+    apr_thread_mutex_unlock(workers->lock);
+
     apr_thread_exit(thread, APR_SUCCESS);
     return NULL;
 }
 
-static void wake_non_essential_workers(h2_workers *workers)
+static void wake_all_idles(h2_workers *workers)
 {
     h2_slot *slot;
-    /* pop all idle, signal the non essentials and add the others again */
-    if ((slot = pop_slot(&workers->idle))) {
-        wake_non_essential_workers(workers);
-        if (slot->id > workers->min_workers) {
-            apr_thread_mutex_lock(slot->lock);
-            apr_thread_cond_signal(slot->not_idle);
-            apr_thread_mutex_unlock(slot->lock);
-        }
-        else {
-            push_slot(&workers->idle, slot);
-        }
-    }
-}
-
-static void workers_abort_idle(h2_workers *workers)
-{
-    h2_slot *slot;
-
-    apr_atomic_set32(&workers->shutdown, 1);
-    apr_atomic_set32(&workers->aborted, 1);
-    h2_fifo_term(workers->mplxs);
-
-    /* abort all idle slots */
-    while ((slot = pop_slot(&workers->idle))) {
-        apr_thread_mutex_lock(slot->lock);
-        apr_thread_cond_signal(slot->not_idle);
-        apr_thread_mutex_unlock(slot->lock);
+    for (slot = APR_RING_FIRST(&workers->idle);
+         slot != APR_RING_SENTINEL(&workers->idle, h2_slot, link);
+         slot = APR_RING_NEXT(slot, link))
+    {
+        apr_thread_cond_signal(slot->more_work);
     }
 }
 
@@ -347,37 +367,47 @@ static apr_status_t workers_pool_cleanup(void *data)
     int n, wait_sec = 5;
 
     ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
-                 "h2_workers: cleanup %d workers idling",
-                 (int)apr_atomic_read32(&workers->worker_count));
-    workers_abort_idle(workers);
+                 "h2_workers: cleanup %d workers (%d idle)",
+                 workers->active_slots, workers->idle_slots);
+    apr_thread_mutex_lock(workers->lock);
+    workers->shutdown = 1;
+    workers->aborted = 1;
+    wake_all_idles(workers);
+    apr_thread_mutex_unlock(workers->lock);
 
     /* wait for all the workers to become zombies and join them.
     * this gets called after the mpm shuts down and all connections
      * have either been handled (graceful) or we are forced exiting
      * (ungrateful). Either way, we show limited patience. */
-    apr_thread_mutex_lock(workers->lock);
     end = apr_time_now() + apr_time_from_sec(wait_sec);
-    while ((n = apr_atomic_read32(&workers->worker_count)) > 0
-           && apr_time_now() < end) {
+    while (apr_time_now() < end) {
+        apr_thread_mutex_lock(workers->lock);
+        if (!(n = workers->active_slots)) {
+            apr_thread_mutex_unlock(workers->lock);
+            break;
+        }
+        wake_all_idles(workers);
         rv = apr_thread_cond_timedwait(workers->all_done, workers->lock, timeout);
+        apr_thread_mutex_unlock(workers->lock);
+
         if (APR_TIMEUP == rv) {
             ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
-                         APLOGNO(10290) "h2_workers: waiting for idle workers to close, "
-                         "still seeing %d workers living",
-                         apr_atomic_read32(&workers->worker_count));
-            continue;
+                         APLOGNO(10290) "h2_workers: waiting for workers to close, "
+                         "still seeing %d workers (%d idle) living",
+                         workers->active_slots, workers->idle_slots);
         }
     }
+
     if (n) {
         ap_log_error(APLOG_MARK, APLOG_WARNING, 0, workers->s,
-                     APLOGNO(10291) "h2_workers: cleanup, %d idle workers "
+                     APLOGNO(10291) "h2_workers: cleanup, %d workers (%d idle) "
                      "did not exit after %d seconds.",
-                     n, wait_sec);
+                     n, workers->idle_slots, wait_sec);
     }
-    apr_thread_mutex_unlock(workers->lock);
     ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
                  "h2_workers: cleanup all workers terminated");
+    apr_thread_mutex_lock(workers->lock);
     join_zombies(workers);
+    apr_thread_mutex_unlock(workers->lock);
     ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
                  "h2_workers: cleanup zombie workers joined");
 
@@ -385,13 +415,13 @@ static apr_status_t workers_pool_cleanup(void *data)
 }
 
 h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pchild,
-                              int min_workers, int max_workers,
-                              int idle_secs)
+                              int max_slots, int min_active, apr_time_t idle_limit)
 {
     apr_status_t rv;
     h2_workers *workers;
     apr_pool_t *pool;
-    int i, n;
+    apr_allocator_t *allocator;
+    int i, locked = 0;
 
     ap_assert(s);
     ap_assert(pchild);
@@ -401,7 +431,16 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pchild,
     * guarded by our lock. Without this pool, all subpool creations would
     * happen on the pool handed to us, which we do not guard.
     */
-    apr_pool_create(&pool, pchild);
+    rv = apr_allocator_create(&allocator);
+    if (rv != APR_SUCCESS) {
+        goto cleanup;
+    }
+    rv = apr_pool_create_ex(&pool, pchild, NULL, allocator);
+    if (rv != APR_SUCCESS) {
+        apr_allocator_destroy(allocator);
+        goto cleanup;
+    }
+    apr_allocator_owner_set(allocator, pool);
     apr_pool_tag(pool, "h2_workers");
     workers = apr_pcalloc(pool, sizeof(h2_workers));
     if (!workers) {
@@ -410,19 +449,23 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pchild,
 
     workers->s = s;
     workers->pool = pool;
-    workers->min_workers = min_workers;
-    workers->max_workers = max_workers;
-    workers->max_idle_secs = (idle_secs > 0)? idle_secs : 10;
+    workers->min_active = min_active;
+    workers->max_slots = max_slots;
+    workers->idle_limit = (idle_limit > 0)? idle_limit : apr_time_from_sec(10);
+    workers->dynamic = (workers->min_active < workers->max_slots);
 
-    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
-                 "h2_workers: created with min=%d max=%d idle_timeout=%d sec",
-                 workers->min_workers, workers->max_workers,
-                 (int)workers->max_idle_secs);
-    /* FIXME: the fifo set we use here has limited capacity. Once the
-     * set is full, connections with new requests do a wait. */
-    rv = h2_fifo_set_create(&workers->mplxs, pool, 16 * 1024);
-    if (rv != APR_SUCCESS) goto cleanup;
+    ap_log_error(APLOG_MARK, APLOG_INFO, 0, workers->s,
+                 "h2_workers: created with min=%d max=%d idle_ms=%d",
+                 workers->min_active, workers->max_slots,
+                 (int)apr_time_as_msec(idle_limit));
+
+    APR_RING_INIT(&workers->idle, h2_slot, link);
+    APR_RING_INIT(&workers->busy, h2_slot, link);
+    APR_RING_INIT(&workers->free, h2_slot, link);
+    APR_RING_INIT(&workers->zombie, h2_slot, link);
+
+    APR_RING_INIT(&workers->prod_active, ap_conn_producer_t, link);
+    APR_RING_INIT(&workers->prod_idle, ap_conn_producer_t, link);
 
     rv = apr_threadattr_create(&workers->thread_attr, workers->pool);
     if (rv != APR_SUCCESS) goto cleanup;
@@ -441,32 +484,35 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pchild,
     if (rv != APR_SUCCESS) goto cleanup;
     rv = apr_thread_cond_create(&workers->all_done, workers->pool);
     if (rv != APR_SUCCESS) goto cleanup;
+    rv = apr_thread_cond_create(&workers->prod_done, workers->pool);
+    if (rv != APR_SUCCESS) goto cleanup;
 
-    n = workers->nslots = workers->max_workers;
-    workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot));
-    if (workers->slots == NULL) {
-        n = workers->nslots = 0;
-        rv = APR_ENOMEM;
-        goto cleanup;
-    }
-    for (i = 0; i < n; ++i) {
+    apr_thread_mutex_lock(workers->lock);
+    locked = 1;
+
+    /* create the slots and put them on the free list */
+    workers->slots = apr_pcalloc(workers->pool, workers->max_slots * sizeof(h2_slot));
+
+    for (i = 0; i < workers->max_slots; ++i) {
         workers->slots[i].id = i;
-    }
-    /* we activate all for now, TODO: support min_workers again.
-     * do this in reverse for vanity reasons so slot 0 will most
-     * likely be at head of idle queue. */
-    n = workers->min_workers;
-    for (i = n-1; i >= 0; --i) {
-        rv = activate_slot(workers, &workers->slots[i]);
+        workers->slots[i].state = H2_SLOT_FREE;
+        workers->slots[i].workers = workers;
+        APR_RING_ELEM_INIT(&workers->slots[i], link);
+        APR_RING_INSERT_TAIL(&workers->free, &workers->slots[i], h2_slot, link);
+        rv = apr_thread_cond_create(&workers->slots[i].more_work, workers->pool);
         if (rv != APR_SUCCESS) goto cleanup;
     }
-    /* the rest of the slots go on the free list */
-    for(i = n; i < workers->nslots; ++i) {
-        push_slot(&workers->free, &workers->slots[i]);
+
+    /* activate the min amount of workers */
+    for (i = 0; i < workers->min_active; ++i) {
+        rv = activate_slot(workers);
+        if (rv != APR_SUCCESS) goto cleanup;
     }
-    workers->dynamic = (workers->worker_count < workers->max_workers);
 
 cleanup:
+    if (locked) {
+        apr_thread_mutex_unlock(workers->lock);
+    }
     if (rv == APR_SUCCESS) {
         /* Stop/join the workers threads when the MPM child exits (pchild is
         * destroyed), and as a pre_cleanup of pchild thus before the threads
@@ -476,24 +522,84 @@ cleanup:
         apr_pool_pre_cleanup_register(pchild, workers, workers_pool_cleanup);
         return workers;
     }
+    ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, workers->s,
+                 "h2_workers: errors initializing");
     return NULL;
 }
 
-apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
+apr_size_t h2_workers_get_max_workers(h2_workers *workers)
 {
-    apr_status_t status = h2_fifo_push(workers->mplxs, m);
-    wake_idle_worker(workers);
-    return status;
+    return workers->max_slots;
 }
 
-apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m)
+void h2_workers_graceful_shutdown(h2_workers *workers)
 {
-    return h2_fifo_remove(workers->mplxs, m);
+    apr_thread_mutex_lock(workers->lock);
+    workers->shutdown = 1;
+    workers->idle_limit = apr_time_from_sec(1);
+    wake_all_idles(workers);
+    apr_thread_mutex_unlock(workers->lock);
 }
 
-void h2_workers_graceful_shutdown(h2_workers *workers)
+ap_conn_producer_t *h2_workers_register(h2_workers *workers,
+                                        apr_pool_t *producer_pool,
+                                        const char *name,
+                                        ap_conn_producer_next *fn_next,
+                                        ap_conn_producer_done *fn_done,
+                                        void *baton)
 {
-    apr_atomic_set32(&workers->shutdown, 1);
-    apr_atomic_set32(&workers->max_idle_secs, 1);
-    wake_non_essential_workers(workers);
+    ap_conn_producer_t *prod;
+
+    prod = apr_pcalloc(producer_pool, sizeof(*prod));
+    APR_RING_ELEM_INIT(prod, link);
+    prod->name = name;
+    prod->fn_next = fn_next;
+    prod->fn_done = fn_done;
+    prod->baton = baton;
+
+    apr_thread_mutex_lock(workers->lock);
+    prod->state = PROD_IDLE;
+    APR_RING_INSERT_TAIL(&workers->prod_idle, prod, ap_conn_producer_t, link);
+    apr_thread_mutex_unlock(workers->lock);
+
+    return prod;
+}
+
+apr_status_t h2_workers_join(h2_workers *workers, ap_conn_producer_t *prod)
+{
+    apr_status_t rv = APR_SUCCESS;
+
+    apr_thread_mutex_lock(workers->lock);
+    if (PROD_JOINED == prod->state) {
+        AP_DEBUG_ASSERT(APR_RING_NEXT(prod, link) == prod); /* should be in no ring */
+        rv = APR_EINVAL;
+    }
+    else {
+        AP_DEBUG_ASSERT(PROD_ACTIVE == prod->state || PROD_IDLE == prod->state);
+        APR_RING_REMOVE(prod, link);
+        prod->state = PROD_JOINED; /* prevent further activations */
+        while (prod->conns_active > 0) {
+            apr_thread_cond_wait(workers->prod_done, workers->lock);
+        }
+        APR_RING_ELEM_INIT(prod, link); /* make it link to itself */
+    }
+    apr_thread_mutex_unlock(workers->lock);
+    return rv;
+}
+
+apr_status_t h2_workers_activate(h2_workers *workers, ap_conn_producer_t *prod)
+{
+    apr_status_t rv = APR_SUCCESS;
+    apr_thread_mutex_lock(workers->lock);
+    if (PROD_IDLE == prod->state) {
+        APR_RING_REMOVE(prod, link);
+        prod->state = PROD_ACTIVE;
+        APR_RING_INSERT_TAIL(&workers->prod_active, prod, ap_conn_producer_t, link);
+        wake_idle_worker(workers, prod);
+    }
+    else if (PROD_JOINED == prod->state) {
+        rv = APR_EINVAL;
+    }
+    apr_thread_mutex_unlock(workers->lock);
+    return rv;
 }
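The rewrite above trades the old lock-free CAS stacks (pop_slot/push_slot) and the h2_fifo of mplx objects for intrusive APR_RING lists, all guarded by the single workers->lock mutex. For readers new to apr_ring.h, here is a minimal self-contained sketch of the macros this file now leans on; the elem_t type is illustrative only:

    #include <stdio.h>
    #include <apr_ring.h>

    typedef struct elem_t elem_t;
    struct elem_t {
        APR_RING_ENTRY(elem_t) link;   /* intrusive prev/next links */
        int id;
    };

    int main(void)
    {
        APR_RING_HEAD(elem_head_t, elem_t) head;
        elem_t a, b, *e;

        a.id = 1;
        b.id = 2;
        APR_RING_INIT(&head, elem_t, link);
        APR_RING_ELEM_INIT(&a, link);  /* element points at itself: "in no ring" */
        APR_RING_ELEM_INIT(&b, link);

        APR_RING_INSERT_TAIL(&head, &a, elem_t, link);
        APR_RING_INSERT_TAIL(&head, &b, elem_t, link);

        /* iteration ends at the sentinel, not at NULL */
        for (e = APR_RING_FIRST(&head);
             e != APR_RING_SENTINEL(&head, elem_t, link);
             e = APR_RING_NEXT(e, link)) {
            printf("elem %d\n", e->id);
        }

        APR_RING_REMOVE(&a, link);     /* O(1) unlink; no head pointer needed */
        return 0;
    }

The APR_RING_ELEM_INIT self-link is the same trick slot_run() uses above to test APR_RING_NEXT(slot, link) != slot for ring membership, and h2_workers_join() uses it to mark a joined producer as belonging to no list.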
diff --git a/modules/http2/h2_workers.h b/modules/http2/h2_workers.h
index 0de3040676..20169a0d50 100644
--- a/modules/http2/h2_workers.h
+++ b/modules/http2/h2_workers.h
@@ -28,59 +28,94 @@ struct h2_mplx;
 struct h2_request;
 struct h2_fifo;
 
-struct h2_slot;
-
 typedef struct h2_workers h2_workers;
 
-struct h2_workers {
-    server_rec *s;
-    apr_pool_t *pool;
-
-    int next_worker_id;
-    apr_uint32_t max_workers;
-    apr_uint32_t min_workers;
-    /* atomic */ apr_uint32_t worker_count;
-    /* atomic */ apr_uint32_t max_idle_secs;
-    /* atomic */ apr_uint32_t aborted;
-    /* atomic */ apr_uint32_t shutdown;
-    int dynamic;
-
-    apr_threadattr_t *thread_attr;
-    int nslots;
-    struct h2_slot *slots;
+/**
+ * Create a worker set with a maximum number of 'slots', e.g. worker
+ * threads to run. Always keep `min_active` workers running. Shut down
+ * any additional workers after `idle_limit` of doing nothing.
+ *
+ * @param s the base server
+ * @param pool for allocations
+ * @param min_active minimum number of workers to run
+ * @param max_slots maximum number of worker slots
+ * @param idle_limit idle duration after which a non-minimal slot shuts down
+ */
+h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pool,
+                              int max_slots, int min_active, apr_time_t idle_limit);
 
-    struct h2_slot *free;
-    struct h2_slot *idle;
-    struct h2_slot *zombies;
-
-    struct h2_fifo *mplxs;
-
-    struct apr_thread_mutex_t *lock;
-    struct apr_thread_cond_t *all_done;
-};
+/**
+ * Shut down processing gracefully by terminating all idle workers.
+ */
+void h2_workers_graceful_shutdown(h2_workers *workers);
 
+/**
+ * Get the maximum number of workers.
+ */
+apr_size_t h2_workers_get_max_workers(h2_workers *workers);
 
-/* Create a worker pool with the given minimum and maximum number of
- * threads.
+/**
+ * ap_conn_producer_t is the source of connections (conn_rec*) to run.
+ *
+ * Active producers are queried by idle workers for connections.
+ * If they do not hand one back, they become inactive and are not
+ * queried further. `h2_workers_activate()` places them on the active
+ * list again.
+ *
+ * A producer finishing MUST call `h2_workers_join()` which removes
+ * it completely from workers processing and waits for all ongoing
+ * work for this producer to be done.
  */
-h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pool,
-                              int min_size, int max_size, int idle_secs);
+typedef struct ap_conn_producer_t ap_conn_producer_t;
 
 /**
- * Registers a h2_mplx for scheduling. If this h2_mplx runs
- * out of work, it will be automatically be unregistered. Should
- * new work arrive, it needs to be registered again.
+ * Ask a producer for the next connection to process.
+ * @param baton value from producer registration
+ * @param pmore set to !=0 if the producer has more connections that may be retrieved
+ * @return the connection to process, or NULL if none is available at the time
  */
-apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m);
+typedef conn_rec *ap_conn_producer_next(void *baton, int *pmore);
 
 /**
- * Remove a h2_mplx from the worker registry.
+ * Tell the producer that processing the connection is done.
+ * @param baton value from producer registration
+ * @param conn the connection that has been processed.
 */
-apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m);
+typedef void ap_conn_producer_done(void *baton, conn_rec *conn);
 
 /**
- * Shut down processing gracefully by terminating all idle workers.
+ * Register a new producer with the given `baton` and callback functions.
+ * Will allocate internal structures from the given pool (but make no use
+ * of the pool after registration).
+ * Producers are inactive on registration. See `h2_workers_activate()`.
+ * @param producer_pool to allocate the producer from
+ * @param name descriptive name of the producer, need not be unique
+ * @param fn_next callback for retrieving connections to process
+ * @param fn_done callback for processed connections
+ * @param baton provided value passed on in callbacks
+ * @return the producer instance created
  */
-void h2_workers_graceful_shutdown(h2_workers *workers);
+ap_conn_producer_t *h2_workers_register(h2_workers *workers,
+                                        apr_pool_t *producer_pool,
+                                        const char *name,
+                                        ap_conn_producer_next *fn_next,
+                                        ap_conn_producer_done *fn_done,
+                                        void *baton);
+
+/**
+ * Stop retrieving more connections from the producer and wait
+ * for all ongoing work from that producer to be done.
+ */
+apr_status_t h2_workers_join(h2_workers *workers, ap_conn_producer_t *producer);
+
+/**
+ * Activate a producer. A worker will query the producer for a connection
+ * to process, once a worker is available.
+ * This may be called regardless of the producer's active/inactive state.
+ */
+apr_status_t h2_workers_activate(h2_workers *workers, ap_conn_producer_t *producer);
 
 #endif /* defined(__mod_h2__h2_workers__) */
diff --git a/modules/http2/mod_http2.c b/modules/http2/mod_http2.c
index 36acc432f9..a4800c148b 100644
--- a/modules/http2/mod_http2.c
+++ b/modules/http2/mod_http2.c
@@ -150,7 +150,9 @@ static int http2_is_h2(conn_rec *);
 
 static void http2_get_num_workers(server_rec *s, int *minw, int *maxw)
 {
-    h2_get_num_workers(s, minw, maxw);
+    apr_time_t tdummy;
+
+    h2_get_workers_config(s, minw, maxw, &tdummy);
 }
 
 /* Runs once per created child process. Perform any process