diff options
author | Brian Pane <brianp@apache.org> | 2002-09-01 07:55:41 +0200 |
---|---|---|
committer | Brian Pane <brianp@apache.org> | 2002-09-01 07:55:41 +0200 |
commit | 2cbb1156c56960c771a7419a45acaccdd5ff8a60 (patch) | |
tree | 0b9795a39d84e6ab630893103516c0e5721ac5d8 /server/mpm/experimental/leader/leader.c | |
parent | Added support for httpd -k option (diff) | |
download | apache2-2cbb1156c56960c771a7419a45acaccdd5ff8a60.tar.xz apache2-2cbb1156c56960c771a7419a45acaccdd5ff8a60.zip |
Switched back to atomic compare-and-swap instead of mutexes
to synchronize updates to the stack of idle workers
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@96606 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to '')
-rw-r--r-- | server/mpm/experimental/leader/leader.c | 203 |
1 files changed, 89 insertions, 114 deletions
diff --git a/server/mpm/experimental/leader/leader.c b/server/mpm/experimental/leader/leader.c index 55586f8fe8..49343a8d29 100644 --- a/server/mpm/experimental/leader/leader.c +++ b/server/mpm/experimental/leader/leader.c @@ -111,6 +111,7 @@ #include <signal.h> #include <limits.h> /* for INT_MAX */ +#include "apr_atomic.h" /* Limit on the total --- clients will be locked out if more servers than * this are needed. It is intended solely to keep the server from crashing @@ -278,12 +279,15 @@ static worker_wakeup_info *worker_wakeup_create(apr_pool_t *pool) /* Structure used to hold a stack of idle worker threads */ typedef struct { - apr_thread_mutex_t *mutex; - int no_listener; - int terminated; - worker_wakeup_info **stack; - apr_size_t nelts; - apr_size_t nalloc; + /* 'state' consists of several fields concatenated into a + * single 32-bit int for use with the apr_atomic_cas() API: + * state & STACK_FIRST is the thread ID of the first thread + * in a linked list of idle threads + * state & STACK_TERMINATED indicates whether the proc is shutting down + * state & STACK_NO_LISTENER indicates whether the process has + * no current listener thread + */ + apr_uint32_t state; } worker_stack; #define STACK_FIRST 0xffff @@ -291,94 +295,82 @@ typedef struct { #define STACK_TERMINATED 0x10000 #define STACK_NO_LISTENER 0x20000 +static worker_wakeup_info **worker_wakeups = NULL; + static worker_stack* worker_stack_create(apr_pool_t *pool, apr_size_t max) { - apr_status_t rv; worker_stack *stack = (worker_stack *)apr_palloc(pool, sizeof(*stack)); - - if ((rv = apr_thread_mutex_create(&stack->mutex, APR_THREAD_MUTEX_DEFAULT, - pool)) != APR_SUCCESS) { - return NULL; - } - stack->no_listener = 1; - stack->terminated = 0; - stack->nelts = 0; - stack->nalloc = max; - stack->stack = - (worker_wakeup_info **)apr_palloc(pool, stack->nalloc * - sizeof(worker_wakeup_info *)); + stack->state = STACK_NO_LISTENER | STACK_LIST_END; return stack; } static apr_status_t worker_stack_wait(worker_stack *stack, - worker_wakeup_info *wakeup) + apr_uint32_t worker_id) { - apr_status_t rv; - if ((rv = apr_thread_mutex_lock(stack->mutex)) != APR_SUCCESS) { - return rv; - } - if (stack->terminated) { - return APR_EINVAL; - } - if (stack->no_listener) { - /* this thread should become the new listener immediately */ - stack->no_listener = 0; - if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) { - return rv; - } - return APR_SUCCESS; - } - else { - /* push this thread onto the stack of idle workers, and block - * on the condition variable until awoken - */ - if (stack->nelts == stack->nalloc) { - return APR_ENOSPC; + worker_wakeup_info *wakeup = worker_wakeups[worker_id]; + + while (1) { + apr_uint32_t state = stack->state; + if (state & (STACK_TERMINATED | STACK_NO_LISTENER)) { + if (state & STACK_TERMINATED) { + return APR_EINVAL; + } + if (apr_atomic_cas(&(stack->state), STACK_LIST_END, state) != + state) { + continue; + } + else { + return APR_SUCCESS; + } } - stack->stack[stack->nelts++] = wakeup; - if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) { - return rv; + wakeup->next = state; + if (apr_atomic_cas(&(stack->state), worker_id, state) != state) { + continue; } - if ((rv = apr_thread_cond_wait(wakeup->cond, wakeup->mutex)) != - APR_SUCCESS) { - return rv; + else { + return apr_thread_cond_wait(wakeup->cond, wakeup->mutex); } - return APR_SUCCESS; - } + } } static apr_status_t worker_stack_awaken_next(worker_stack *stack) { - apr_status_t rv; - if ((rv = apr_thread_mutex_lock(stack->mutex)) != APR_SUCCESS) { - return rv; - } - if (stack->nelts) { - worker_wakeup_info *wakeup = stack->stack[--stack->nelts]; - if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) { - return rv; - } - /* Acquire and release the idle worker's mutex to ensure - * that it's actually waiting on its condition variable - */ - if ((rv = apr_thread_mutex_lock(wakeup->mutex)) != APR_SUCCESS) { - return rv; - } - if ((rv = apr_thread_mutex_unlock(wakeup->mutex)) != APR_SUCCESS) { - return rv; - } - if ((rv = apr_thread_cond_signal(wakeup->cond)) != APR_SUCCESS) { - apr_thread_mutex_unlock(stack->mutex); - return rv; + + while (1) { + apr_uint32_t state = stack->state; + apr_uint32_t first = state & STACK_FIRST; + if (first == STACK_LIST_END) { + if (apr_atomic_cas(&(stack->state), state | STACK_NO_LISTENER, + state) != state) { + continue; + } + else { + return APR_SUCCESS; + } } - } - else { - stack->no_listener = 1; - if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) { - return rv; + else { + worker_wakeup_info *wakeup = worker_wakeups[first]; + if (apr_atomic_cas(&(stack->state), (state ^ first) | wakeup->next, + state) != state) { + continue; + } + else { + /* Acquire and release the idle worker's mutex to ensure + * that it's actually waiting on its condition variable + */ + apr_status_t rv; + if ((rv = apr_thread_mutex_lock(wakeup->mutex)) != + APR_SUCCESS) { + return rv; + } + if ((rv = apr_thread_mutex_unlock(wakeup->mutex)) != + APR_SUCCESS) { + return rv; + } + return apr_thread_cond_signal(wakeup->cond); + } } } - return APR_SUCCESS; } static apr_status_t worker_stack_term(worker_stack *stack) @@ -386,32 +378,18 @@ static apr_status_t worker_stack_term(worker_stack *stack) int i; apr_status_t rv; - if ((rv = apr_thread_mutex_lock(stack->mutex)) != APR_SUCCESS) { - return rv; - } - if (stack->terminated) { - return apr_thread_mutex_unlock(stack->mutex); - } - stack->terminated = 1; - while (stack->nelts) { - worker_wakeup_info *wakeup = stack->stack[--stack->nelts]; - /* Acquire and release the idle worker's mutex to ensure - * that it's actually waiting on its condition variable - */ - if ((rv = apr_thread_mutex_lock(wakeup->mutex)) != APR_SUCCESS) { - return rv; - } - if ((rv = apr_thread_mutex_unlock(wakeup->mutex)) != APR_SUCCESS) { - return rv; + while (1) { + apr_uint32_t state = stack->state; + if (apr_atomic_cas(&(stack->state), state | STACK_TERMINATED, + state) == state) { + break; } - if ((rv = apr_thread_cond_signal(wakeup->cond)) != APR_SUCCESS) { - apr_thread_mutex_unlock(stack->mutex); + } + for (i = 0; i < ap_threads_per_child; i++) { + if ((rv = worker_stack_awaken_next(stack)) != APR_SUCCESS) { return rv; } } - if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) { - return rv; - } return APR_SUCCESS; } @@ -584,11 +562,6 @@ static void sig_term(int sig) } } -static void child_sig_term(int sig) -{ - ap_start_shutdown(); -} - static void restart(int sig) { ap_start_restart(sig == AP_SIG_GRACEFUL); @@ -792,19 +765,11 @@ static void unblock_signal(int sig) #endif } -static void dummy_signal_handler(int sig) -{ - /* XXX If specifying SIG_IGN is guaranteed to unblock a syscall, - * then we don't need this goofy function. - */ -} - static void *worker_thread(apr_thread_t *thd, void * dummy) { proc_info * ti = dummy; int process_slot = ti->pid; int thread_slot = ti->tid; - worker_wakeup_info *wakeup; apr_uint32_t my_worker_num = (apr_uint32_t)(ti->tid); apr_pool_t *tpool = apr_thread_pool_get(thd); void *csd = NULL; @@ -831,8 +796,6 @@ static void *worker_thread(apr_thread_t *thd, void * dummy) for(lr = ap_listeners ; lr != NULL ; lr = lr->next) apr_poll_socket_add(pollset, lr->sd, APR_POLLIN); - wakeup = worker_wakeup_create(tpool); - /* TODO: Switch to a system where threads reuse the results from earlier poll calls - manoj */ is_listener = 0; @@ -842,7 +805,7 @@ static void *worker_thread(apr_thread_t *thd, void * dummy) SERVER_READY, NULL); if (!is_listener) { /* Wait until it's our turn to become the listener */ - if ((rv = worker_stack_wait(idle_worker_stack, wakeup)) != + if ((rv = worker_stack_wait(idle_worker_stack, my_worker_num)) != APR_SUCCESS) { if (rv != APR_EINVAL) { ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf, @@ -1024,15 +987,27 @@ static void * APR_THREAD_FUNC start_threads(apr_thread_t *thd, void *dummy) clean_child_exit(APEXIT_CHILDFATAL); } + worker_wakeups = (worker_wakeup_info **) + apr_palloc(pchild, sizeof(worker_wakeup_info *) * + ap_threads_per_child); + loops = prev_threads_created = 0; while (1) { for (i = 0; i < ap_threads_per_child; i++) { int status = ap_scoreboard_image->servers[child_num_arg][i].status; + worker_wakeup_info *wakeup; if (status != SERVER_GRACEFUL && status != SERVER_DEAD) { continue; } + wakeup = worker_wakeup_create(pchild); + if (wakeup == NULL) { + ap_log_error(APLOG_MARK, APLOG_ALERT|APLOG_NOERRNO, 0, + ap_server_conf, "worker_wakeup_create failed"); + clean_child_exit(APEXIT_CHILDFATAL); + } + worker_wakeups[threads_created] = wakeup; my_info = (proc_info *)malloc(sizeof(proc_info)); if (my_info == NULL) { ap_log_error(APLOG_MARK, APLOG_ALERT, errno, ap_server_conf, |