diff options
-rw-r--r-- | server/mpm/event/config.m4 | 2 | ||||
-rw-r--r-- | server/mpm/event/config3.m4 | 2 | ||||
-rw-r--r-- | server/mpm/event/equeue.c | 125 | ||||
-rw-r--r-- | server/mpm/event/equeue.h | 50 | ||||
-rw-r--r-- | server/mpm/event/event.c | 246 |
5 files changed, 101 insertions, 324 deletions
diff --git a/server/mpm/event/config.m4 b/server/mpm/event/config.m4 index 5308af8f29..351f1acf4b 100644 --- a/server/mpm/event/config.m4 +++ b/server/mpm/event/config.m4 @@ -5,6 +5,8 @@ elif test $ac_cv_define_APR_HAS_THREADS != yes; then AC_MSG_RESULT(no - APR does not support threads) elif test $have_threaded_sig_graceful != yes; then AC_MSG_RESULT(no - SIG_GRACEFUL cannot be used with a threaded MPM) +elif test $ac_cv_have_threadsafe_pollset != yes; then + AC_MSG_RESULT(no - APR_POLLSET_THREADSAFE is not supported) else AC_MSG_RESULT(yes) APACHE_MPM_SUPPORTED(event, yes, yes) diff --git a/server/mpm/event/config3.m4 b/server/mpm/event/config3.m4 index c0bf202b5d..5c96fe3c30 100644 --- a/server/mpm/event/config3.m4 +++ b/server/mpm/event/config3.m4 @@ -6,6 +6,6 @@ if test "$ac_cv_serf" = yes ; then fi APACHE_SUBST(MOD_MPM_EVENT_LDADD) -APACHE_MPM_MODULE(event, $enable_mpm_event, event.lo fdqueue.lo equeue.lo pod.lo,[ +APACHE_MPM_MODULE(event, $enable_mpm_event, event.lo fdqueue.lo pod.lo,[ AC_CHECK_FUNCS(pthread_kill) ], , [\$(MOD_MPM_EVENT_LDADD)]) diff --git a/server/mpm/event/equeue.c b/server/mpm/event/equeue.c deleted file mode 100644 index 4750ab1c53..0000000000 --- a/server/mpm/event/equeue.c +++ /dev/null @@ -1,125 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "equeue.h" - -#include <apr_atomic.h> -#include <sched.h> - -struct ap_equeue_t { - apr_uint32_t nelem; - apr_size_t elem_size; - uint8_t *bytes; - volatile apr_uint32_t writeCount; - volatile apr_uint32_t readCount; -}; - - -static APR_INLINE apr_uint32_t count_to_index(ap_equeue_t *eq, apr_uint32_t count) -{ - return (count & (eq->nelem - 1)); -} - -static APR_INLINE void* index_to_bytes(ap_equeue_t *eq, apr_uint32_t idx) -{ - apr_size_t offset = idx * eq->elem_size; - return (void*)&eq->bytes[offset]; -} - -static APR_INLINE apr_uint32_t nearest_power(apr_uint32_t num) -{ - apr_uint32_t n = 1; - while (n < num) { - n <<= 1; - } - - return n; -} - -#if 0 -static void dump_queue(ap_equeue_t *eq) -{ - apr_uint32_t i; - - fprintf(stderr, "dumping %p\n", eq); - fprintf(stderr, " nelem: %u\n", eq->nelem); - fprintf(stderr, " esize: %"APR_SIZE_T_FMT"\n", eq->elem_size); - fprintf(stderr, " wcnt: %u\n", eq->writeCount); - fprintf(stderr, " rcnt: %u\n", eq->writeCount); - fprintf(stderr, " bytes: %p\n", eq->bytes); - for (i = 0; i < eq->nelem; i++) { - fprintf(stderr, " [%u] = %p\n", i, index_to_bytes(eq, i)); - } - - fprintf(stderr, "\n"); - fflush(stderr); -} -#endif - -apr_status_t -ap_equeue_create(apr_pool_t *p, apr_uint32_t nelem, apr_size_t elem_size, ap_equeue_t **eqout) -{ - ap_equeue_t *eq; - - *eqout = NULL; - - eq = apr_palloc(p, sizeof(ap_equeue_t)); - eq->nelem = nearest_power(nelem); - eq->bytes = apr_palloc(p, eq->nelem * elem_size); - eq->elem_size = elem_size; - eq->writeCount = 0; - eq->readCount = 0; - *eqout = eq; - - return APR_SUCCESS; -} - -void * -ap_equeue_reader_next(ap_equeue_t *eq) -{ - if (apr_atomic_read32(&eq->writeCount) == eq->readCount) { - return NULL; - } - else { - apr_uint32_t idx = count_to_index(eq, apr_atomic_inc32(&eq->readCount)); - return index_to_bytes(eq, idx); - } -} - -void * -ap_equeue_writer_value(ap_equeue_t *eq) -{ - apr_uint32_t idx; - - while (1) { - apr_uint32_t readCount = apr_atomic_read32(&eq->readCount); - - if (count_to_index(eq, eq->writeCount + 1) != count_to_index(eq, readCount)) { - break; - } - /* TODO: research if sched_yield is even worth doing */ - sched_yield(); - } - - idx = count_to_index(eq, eq->writeCount); - return index_to_bytes(eq, idx); -} - - -void ap_equeue_writer_onward(ap_equeue_t *eq) -{ - apr_atomic_inc32(&eq->writeCount); -} diff --git a/server/mpm/event/equeue.h b/server/mpm/event/equeue.h deleted file mode 100644 index 9738b00b11..0000000000 --- a/server/mpm/event/equeue.h +++ /dev/null @@ -1,50 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef _event_mpm_equeue_h_ -#define _event_mpm_equeue_h_ - -#include "httpd.h" - -typedef struct ap_equeue_t ap_equeue_t; - -apr_status_t -ap_equeue_create(apr_pool_t *p, - unsigned int nelem, - apr_size_t elem_size, - ap_equeue_t **eqout); - - -/** - * Current value of the reader, returns NULL if the reader is caught up - * with the writer - */ -void* ap_equeue_reader_next(ap_equeue_t *eq); - -/** - * Returns pointer to next available write slot. May block - * in a spin lock if none are available. - */ -void* ap_equeue_writer_value(ap_equeue_t *eq); - -/** - * Move the write position up one, making the previously - * editted value available to the reader. - */ -void ap_equeue_writer_onward(ap_equeue_t *eq); - - -#endif diff --git a/server/mpm/event/event.c b/server/mpm/event/event.c index 982e8e5d1e..d3aba1e4ce 100644 --- a/server/mpm/event/event.c +++ b/server/mpm/event/event.c @@ -97,8 +97,6 @@ #include <limits.h> /* for INT_MAX */ -#include "equeue.h" - #if HAVE_SERF #include "mod_serf.h" #include "serf.h" @@ -185,12 +183,7 @@ static fd_queue_t *worker_queue; static fd_queue_info_t *worker_queue_info; static int mpm_state = AP_MPMQ_STARTING; -typedef enum { - TIMEOUT_WRITE_COMPLETION, - TIMEOUT_KEEPALIVE, - TIMEOUT_LINGER, - TIMEOUT_SHORT_LINGER -} timeout_type_e; +static apr_thread_mutex_t *timeout_mutex; struct event_conn_state_t { /** APR_RING of expiration timeouts */ @@ -208,15 +201,8 @@ struct event_conn_state_t { /** public parts of the connection state */ conn_state_t pub; }; - -typedef struct pollset_op_t { - timeout_type_e timeout_type; - event_conn_state_t *cs; - const char *tag; -} pollset_op_t; - - APR_RING_HEAD(timeout_head_t, event_conn_state_t); + struct timeout_queue { struct timeout_head_t head; int count; @@ -388,7 +374,6 @@ static apr_os_thread_t *listener_os_thread; * perform a non-graceful (forced) shutdown of the server. */ static apr_socket_t **worker_sockets; -static ap_equeue_t **worker_equeues; static void disable_listensocks(int process_slot) { @@ -770,50 +755,20 @@ static void set_signals(void) #endif } -static void process_pollop(pollset_op_t *op) -{ - apr_status_t rv; - event_conn_state_t *cs = op->cs; - - switch (op->timeout_type) { - case TIMEOUT_WRITE_COMPLETION: - TO_QUEUE_APPEND(write_completion_q, cs); - break; - case TIMEOUT_KEEPALIVE: - TO_QUEUE_APPEND(keepalive_q, cs); - break; - case TIMEOUT_LINGER: - TO_QUEUE_APPEND(linger_q, cs); - break; - case TIMEOUT_SHORT_LINGER: - TO_QUEUE_APPEND(short_linger_q, cs); - break; - } - - rv = apr_pollset_add(event_pollset, &op->cs->pfd); - - if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) { - ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(00467) - "%s: apr_pollset_add failure", op->tag); - } -} - /* * close our side of the connection * Pre-condition: cs is not in any timeout queue and not in the pollset, * timeout_mutex is not locked * return: 0 if connection is fully closed, * 1 if connection is lingering - * may be called by listener or by worker thread. - * the eq may be null if called from the listener thread, - * and the pollset operations are done directly by this function. + * may be called by listener or by worker thread */ -static int start_lingering_close(event_conn_state_t *cs, ap_equeue_t *eq) +static int start_lingering_close(event_conn_state_t *cs) { apr_status_t rv; cs->c->sbh = NULL; /* prevent scoreboard updates from the listener - * worker will loop around soon and set SERVER_READY + * worker will loop around and set SERVER_READY soon */ if (ap_start_lingering_close(cs->c)) { @@ -823,15 +778,7 @@ static int start_lingering_close(event_conn_state_t *cs, ap_equeue_t *eq) } else { apr_socket_t *csd = ap_get_conn_socket(cs->c); - pollset_op_t localv; - pollset_op_t *v; - - if (eq) { - v = ap_equeue_writer_value(eq); - } - else { - v = &localv; - } + struct timeout_queue *q; #ifdef AP_DEBUG { @@ -849,26 +796,30 @@ static int start_lingering_close(event_conn_state_t *cs, ap_equeue_t *eq) if (apr_table_get(cs->c->notes, "short-lingering-close")) { cs->expiration_time = apr_time_now() + apr_time_from_sec(SECONDS_TO_LINGER); - v->timeout_type = TIMEOUT_SHORT_LINGER; - v->tag = "start_lingering_close(short)"; + q = &short_linger_q; cs->pub.state = CONN_STATE_LINGER_SHORT; } else { cs->expiration_time = apr_time_now() + apr_time_from_sec(MAX_SECS_TO_LINGER); - v->timeout_type = TIMEOUT_LINGER; - v->tag = "start_lingering_close(normal)"; + q = &linger_q; cs->pub.state = CONN_STATE_LINGER_NORMAL; } - + apr_thread_mutex_lock(timeout_mutex); + TO_QUEUE_APPEND(*q, cs); cs->pfd.reqevents = APR_POLLIN | APR_POLLHUP | APR_POLLERR; - v->cs = cs; - if (eq != NULL) { - ap_equeue_writer_onward(eq); - apr_pollset_wakeup(event_pollset); - } - else { - process_pollop(v); + rv = apr_pollset_add(event_pollset, &cs->pfd); + apr_thread_mutex_unlock(timeout_mutex); + if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) { + ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, + "start_lingering_close: apr_pollset_add failure"); + apr_thread_mutex_lock(timeout_mutex); + TO_QUEUE_REMOVE(*q, cs); + apr_thread_mutex_unlock(timeout_mutex); + apr_socket_close(cs->pfd.desc.s); + apr_pool_clear(cs->p); + ap_push_pool(worker_queue_info, cs->p); + return 0; } } return 1; @@ -880,7 +831,7 @@ static int start_lingering_close(event_conn_state_t *cs, ap_equeue_t *eq) * Pre-condition: cs is not in any timeout queue and not in the pollset * return: irrelevant (need same prototype as start_lingering_close) */ -static int stop_lingering_close(event_conn_state_t *cs, ap_equeue_t *eq) +static int stop_lingering_close(event_conn_state_t *cs) { apr_status_t rv; apr_socket_t *csd = ap_get_conn_socket(cs->c); @@ -902,9 +853,7 @@ static int stop_lingering_close(event_conn_state_t *cs, ap_equeue_t *eq) * 0 if it is still open and waiting for some event */ static int process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * sock, - event_conn_state_t * cs, - ap_equeue_t *eq, - int my_child_num, + event_conn_state_t * cs, int my_child_num, int my_thread_num) { conn_rec *c; @@ -938,6 +887,7 @@ static int process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * sock pt->type = PT_CSD; pt->baton = cs; cs->pfd.client_data = pt; + TO_QUEUE_ELEM_INIT(cs); ap_update_vhost_given_ip(c); @@ -1014,17 +964,12 @@ read_request: * Set a write timeout for this connection, and let the * event thread poll for writeability. */ - pollset_op_t *v = ap_equeue_writer_value(eq); - cs->expiration_time = ap_server_conf->timeout + apr_time_now(); + apr_thread_mutex_lock(timeout_mutex); + TO_QUEUE_APPEND(write_completion_q, cs); cs->pfd.reqevents = APR_POLLOUT | APR_POLLHUP | APR_POLLERR; - - v->cs = cs; - v->timeout_type = TIMEOUT_WRITE_COMPLETION; - v->tag = "process_socket(write_completion)"; - - ap_equeue_writer_onward(eq); - apr_pollset_wakeup(event_pollset); + rc = apr_pollset_add(event_pollset, &cs->pfd); + apr_thread_mutex_unlock(timeout_mutex); return 1; } else if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted || @@ -1041,12 +986,11 @@ read_request: } if (cs->pub.state == CONN_STATE_LINGER) { - if (!start_lingering_close(cs, eq)) { + if (!start_lingering_close(cs)) return 0; - } } else if (cs->pub.state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) { - pollset_op_t *v; + apr_status_t rc; /* It greatly simplifies the logic to use a single timeout value here * because the new element can just be added to the end of the list and @@ -1058,15 +1002,19 @@ read_request: */ cs->expiration_time = ap_server_conf->keep_alive_timeout + apr_time_now(); + apr_thread_mutex_lock(timeout_mutex); + TO_QUEUE_APPEND(keepalive_q, cs); /* Add work to pollset. */ - v = ap_equeue_writer_value(eq); - v->timeout_type = TIMEOUT_KEEPALIVE; - v->cs = cs; cs->pfd.reqevents = APR_POLLIN; - v->tag = "process_socket(keepalive)"; - ap_equeue_writer_onward(eq); - apr_pollset_wakeup(event_pollset); + rc = apr_pollset_add(event_pollset, &cs->pfd); + apr_thread_mutex_unlock(timeout_mutex); + + if (rc != APR_SUCCESS) { + ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, + "process_socket: apr_pollset_add failure"); + AP_DEBUG_ASSERT(rc == APR_SUCCESS); + } } return 1; } @@ -1354,6 +1302,7 @@ static void process_lingering_close(event_conn_state_t *cs, const apr_pollfd_t * return; } + apr_thread_mutex_lock(timeout_mutex); rv = apr_pollset_remove(event_pollset, pfd); AP_DEBUG_ASSERT(rv == APR_SUCCESS); @@ -1361,6 +1310,7 @@ static void process_lingering_close(event_conn_state_t *cs, const apr_pollfd_t * AP_DEBUG_ASSERT(rv == APR_SUCCESS); TO_QUEUE_REMOVE(*q, cs); + apr_thread_mutex_unlock(timeout_mutex); TO_QUEUE_ELEM_INIT(cs); apr_pool_clear(cs->p); @@ -1373,7 +1323,7 @@ static void process_lingering_close(event_conn_state_t *cs, const apr_pollfd_t * */ static void process_timeout_queue(struct timeout_queue *q, apr_time_t timeout_time, - int (*func)(event_conn_state_t *, ap_equeue_t *eq)) + int (*func)(event_conn_state_t *)) { int count = 0; event_conn_state_t *first, *cs, *last; @@ -1401,13 +1351,15 @@ static void process_timeout_queue(struct timeout_queue *q, APR_RING_UNSPLICE(first, last, timeout_list); AP_DEBUG_ASSERT(q->count >= count); q->count -= count; + apr_thread_mutex_unlock(timeout_mutex); while (count) { cs = APR_RING_NEXT(first, timeout_list); TO_QUEUE_ELEM_INIT(first); - func(first, NULL); + func(first); first = cs; count--; } + apr_thread_mutex_lock(timeout_mutex); } static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) @@ -1474,12 +1426,14 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) /* trace log status every second */ if (now - last_log > apr_time_from_msec(1000)) { last_log = now; + apr_thread_mutex_lock(timeout_mutex); ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf, "connections: %d (write-completion: %d " "keep-alive: %d lingering: %d)", connection_count, write_completion_q.count, keepalive_q.count, linger_q.count + short_linger_q.count); + apr_thread_mutex_unlock(timeout_mutex); } } @@ -1505,13 +1459,16 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) } #endif rc = apr_pollset_poll(event_pollset, timeout_interval, &num, &out_pfd); - if (rc != APR_SUCCESS - && !APR_STATUS_IS_EINTR(rc) - && !APR_STATUS_IS_TIMEUP(rc)) { - ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf, - "apr_pollset_poll failed. Attempting to " - "shutdown process gracefully"); - signal_threads(ST_GRACEFUL); + if (rc != APR_SUCCESS) { + if (APR_STATUS_IS_EINTR(rc)) { + continue; + } + if (!APR_STATUS_IS_TIMEUP(rc)) { + ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf, + "apr_pollset_poll failed. Attempting to " + "shutdown process gracefully"); + signal_threads(ST_GRACEFUL); + } } if (listener_may_exit) { @@ -1544,7 +1501,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) /* one of the sockets is readable */ struct timeout_queue *remove_from_q = &write_completion_q; int blocking = 1; - cs = (event_conn_state_t *)pt->baton; + cs = (event_conn_state_t *) pt->baton; switch (cs->pub.state) { case CONN_STATE_CHECK_REQUEST_LINE_READABLE: cs->pub.state = CONN_STATE_READ_REQUEST_LINE; @@ -1555,6 +1512,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) case CONN_STATE_WRITE_COMPLETION: get_worker(&have_idle_worker, blocking, &workers_were_busy); + apr_thread_mutex_lock(timeout_mutex); TO_QUEUE_REMOVE(*remove_from_q, cs); rc = apr_pollset_remove(event_pollset, &cs->pfd); @@ -1567,17 +1525,19 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) { ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, "pollset remove failed"); - start_lingering_close(cs, NULL); + apr_thread_mutex_unlock(timeout_mutex); + start_lingering_close(cs); break; } + apr_thread_mutex_unlock(timeout_mutex); TO_QUEUE_ELEM_INIT(cs); /* If we didn't get a worker immediately for a keep-alive * request, we close the connection, so that the client can * re-connect to a different process. */ if (!have_idle_worker) { - start_lingering_close(cs, NULL); + start_lingering_close(cs); break; } rc = push2worker(out_pfd, event_pollset); @@ -1602,35 +1562,35 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) } } else if (pt->type == PT_ACCEPT) { - int skip_accept = 0; - int connection_count_local = connection_count; - /* A Listener Socket is ready for an accept() */ if (workers_were_busy) { - skip_accept = 1; + if (!listeners_disabled) + disable_listensocks(process_slot); + listeners_disabled = 1; ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, "All workers busy, not accepting new conns" "in this process"); } - else if (listeners_disabled) { - listeners_disabled = 0; - enable_listensocks(process_slot); - } - else if (connection_count_local > threads_per_child + else if (apr_atomic_read32(&connection_count) > threads_per_child + ap_queue_info_get_idlers(worker_queue_info) * worker_factor / WORKER_FACTOR_SCALE) { - skip_accept = 1; + if (!listeners_disabled) + disable_listensocks(process_slot); ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, "Too many open connections (%u), " "not accepting new conns in this process", - connection_count_local); + apr_atomic_read32(&connection_count)); ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf, "Idle workers: %u", ap_queue_info_get_idlers(worker_queue_info)); + listeners_disabled = 1; } - - if (skip_accept == 0) { + else if (listeners_disabled) { + listeners_disabled = 0; + enable_listensocks(process_slot); + } + if (!listeners_disabled) { lr = (ap_listen_rec *) pt->baton; ap_pop_pool(&ptrans, worker_queue_info); @@ -1701,20 +1661,6 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) num--; } /* while for processing poll */ - { - /* TODO: break out to separate function */ - int i; - - for (i = 0; i < threads_per_child; i++) { - ap_equeue_t *eq = worker_equeues[i]; - pollset_op_t *op = NULL; - - while ((op = ap_equeue_reader_next(eq)) != NULL) { - process_pollop(op); - } - } - } - /* XXX possible optimization: stash the current time for use as * r->request_time for new requests */ @@ -1725,6 +1671,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) timeout_time = now + TIMEOUT_FUDGE_FACTOR; /* handle timed out sockets */ + apr_thread_mutex_lock(timeout_mutex); /* Step 1: keepalive timeouts */ /* If all workers are busy, we kill older keep-alive connections so that they @@ -1754,6 +1701,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) ps->write_completion = write_completion_q.count; ps->lingering_close = linger_q.count + short_linger_q.count; ps->keep_alive = keepalive_q.count; + apr_thread_mutex_unlock(timeout_mutex); ps->connections = apr_atomic_read32(&connection_count); /* XXX: should count CONN_STATE_SUSPENDED and set ps->suspended */ @@ -1797,7 +1745,6 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy) apr_status_t rv; int is_idle = 0; timer_event_t *te = NULL; - ap_equeue_t *eq = worker_equeues[thread_slot]; free(ti); @@ -1870,7 +1817,7 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy) else { is_idle = 0; worker_sockets[thread_slot] = csd; - rv = process_socket(thd, ptrans, csd, cs, eq, process_slot, thread_slot); + rv = process_socket(thd, ptrans, csd, cs, process_slot, thread_slot); if (!rv) { requests_this_child--; } @@ -1967,32 +1914,33 @@ static void *APR_THREAD_FUNC start_threads(apr_thread_t * thd, void *dummy) clean_child_exit(APEXIT_CHILDFATAL); } + /* Create the timeout mutex and main pollset before the listener + * thread starts. + */ + rv = apr_thread_mutex_create(&timeout_mutex, APR_THREAD_MUTEX_DEFAULT, + pchild); + if (rv != APR_SUCCESS) { + ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, + "creation of the timeout mutex failed."); + clean_child_exit(APEXIT_CHILDFATAL); + } + /* Create the main pollset */ rv = apr_pollset_create(&event_pollset, threads_per_child, /* XXX don't we need more, to handle * connections in K-A or lingering * close? */ - pchild, APR_POLLSET_WAKEABLE|APR_POLLSET_NOCOPY); + pchild, APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY); if (rv != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, - "apr_pollset_create failed; check system or user limits"); + "apr_pollset_create with Thread Safety failed."); clean_child_exit(APEXIT_CHILDFATAL); } worker_sockets = apr_pcalloc(pchild, threads_per_child * sizeof(apr_socket_t *)); - worker_equeues = apr_palloc(pchild, threads_per_child * sizeof(ap_equeue_t*)); - - for (i = 0; i < threads_per_child; i++) { - ap_equeue_t* eq = NULL; - /* TODO: research/test optimal size of queue here */ - ap_equeue_create(pchild, 16, sizeof(pollset_op_t), &eq); - /* same as thread ID */ - worker_equeues[i] = eq; - } - loops = prev_threads_created = 0; while (1) { /* threads_per_child does not include the listener thread */ @@ -2889,10 +2837,12 @@ static int event_pre_config(apr_pool_t * pconf, apr_pool_t * plog, ++retained->module_loads; if (retained->module_loads == 2) { rv = apr_pollset_create(&event_pollset, 1, plog, - APR_POLLSET_WAKEABLE|APR_POLLSET_NOCOPY); + APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY); if (rv != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL, APLOGNO(00495) - "apr_pollset_create failed; check system or user limits"); + "Couldn't create a Thread Safe Pollset. " + "Is it supported on your platform?" + "Also check system or user limits!"); return HTTP_INTERNAL_SERVER_ERROR; } apr_pollset_destroy(event_pollset); |