summaryrefslogtreecommitdiffstats
path: root/server/mpm/experimental/leader/leader.c
diff options
context:
space:
mode:
authorBrian Pane <brianp@apache.org>2002-09-01 07:55:41 +0200
committerBrian Pane <brianp@apache.org>2002-09-01 07:55:41 +0200
commit2cbb1156c56960c771a7419a45acaccdd5ff8a60 (patch)
tree0b9795a39d84e6ab630893103516c0e5721ac5d8 /server/mpm/experimental/leader/leader.c
parentAdded support for httpd -k option (diff)
downloadapache2-2cbb1156c56960c771a7419a45acaccdd5ff8a60.tar.xz
apache2-2cbb1156c56960c771a7419a45acaccdd5ff8a60.zip
Switched back to atomic compare-and-swap instead of mutexes
to synchronize updates to the stack of idle workers git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@96606 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to '')
-rw-r--r--server/mpm/experimental/leader/leader.c203
1 files changed, 89 insertions, 114 deletions
diff --git a/server/mpm/experimental/leader/leader.c b/server/mpm/experimental/leader/leader.c
index 55586f8fe8..49343a8d29 100644
--- a/server/mpm/experimental/leader/leader.c
+++ b/server/mpm/experimental/leader/leader.c
@@ -111,6 +111,7 @@
#include <signal.h>
#include <limits.h> /* for INT_MAX */
+#include "apr_atomic.h"
/* Limit on the total --- clients will be locked out if more servers than
* this are needed. It is intended solely to keep the server from crashing
@@ -278,12 +279,15 @@ static worker_wakeup_info *worker_wakeup_create(apr_pool_t *pool)
/* Structure used to hold a stack of idle worker threads
*/
typedef struct {
- apr_thread_mutex_t *mutex;
- int no_listener;
- int terminated;
- worker_wakeup_info **stack;
- apr_size_t nelts;
- apr_size_t nalloc;
+ /* 'state' consists of several fields concatenated into a
+ * single 32-bit int for use with the apr_atomic_cas() API:
+ * state & STACK_FIRST is the thread ID of the first thread
+ * in a linked list of idle threads
+ * state & STACK_TERMINATED indicates whether the proc is shutting down
+ * state & STACK_NO_LISTENER indicates whether the process has
+ * no current listener thread
+ */
+ apr_uint32_t state;
} worker_stack;
#define STACK_FIRST 0xffff
@@ -291,94 +295,82 @@ typedef struct {
#define STACK_TERMINATED 0x10000
#define STACK_NO_LISTENER 0x20000
+static worker_wakeup_info **worker_wakeups = NULL;
+
static worker_stack* worker_stack_create(apr_pool_t *pool, apr_size_t max)
{
- apr_status_t rv;
worker_stack *stack = (worker_stack *)apr_palloc(pool, sizeof(*stack));
-
- if ((rv = apr_thread_mutex_create(&stack->mutex, APR_THREAD_MUTEX_DEFAULT,
- pool)) != APR_SUCCESS) {
- return NULL;
- }
- stack->no_listener = 1;
- stack->terminated = 0;
- stack->nelts = 0;
- stack->nalloc = max;
- stack->stack =
- (worker_wakeup_info **)apr_palloc(pool, stack->nalloc *
- sizeof(worker_wakeup_info *));
+ stack->state = STACK_NO_LISTENER | STACK_LIST_END;
return stack;
}
static apr_status_t worker_stack_wait(worker_stack *stack,
- worker_wakeup_info *wakeup)
+ apr_uint32_t worker_id)
{
- apr_status_t rv;
- if ((rv = apr_thread_mutex_lock(stack->mutex)) != APR_SUCCESS) {
- return rv;
- }
- if (stack->terminated) {
- return APR_EINVAL;
- }
- if (stack->no_listener) {
- /* this thread should become the new listener immediately */
- stack->no_listener = 0;
- if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) {
- return rv;
- }
- return APR_SUCCESS;
- }
- else {
- /* push this thread onto the stack of idle workers, and block
- * on the condition variable until awoken
- */
- if (stack->nelts == stack->nalloc) {
- return APR_ENOSPC;
+ worker_wakeup_info *wakeup = worker_wakeups[worker_id];
+
+ while (1) {
+ apr_uint32_t state = stack->state;
+ if (state & (STACK_TERMINATED | STACK_NO_LISTENER)) {
+ if (state & STACK_TERMINATED) {
+ return APR_EINVAL;
+ }
+ if (apr_atomic_cas(&(stack->state), STACK_LIST_END, state) !=
+ state) {
+ continue;
+ }
+ else {
+ return APR_SUCCESS;
+ }
}
- stack->stack[stack->nelts++] = wakeup;
- if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) {
- return rv;
+ wakeup->next = state;
+ if (apr_atomic_cas(&(stack->state), worker_id, state) != state) {
+ continue;
}
- if ((rv = apr_thread_cond_wait(wakeup->cond, wakeup->mutex)) !=
- APR_SUCCESS) {
- return rv;
+ else {
+ return apr_thread_cond_wait(wakeup->cond, wakeup->mutex);
}
- return APR_SUCCESS;
- }
+ }
}
static apr_status_t worker_stack_awaken_next(worker_stack *stack)
{
- apr_status_t rv;
- if ((rv = apr_thread_mutex_lock(stack->mutex)) != APR_SUCCESS) {
- return rv;
- }
- if (stack->nelts) {
- worker_wakeup_info *wakeup = stack->stack[--stack->nelts];
- if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) {
- return rv;
- }
- /* Acquire and release the idle worker's mutex to ensure
- * that it's actually waiting on its condition variable
- */
- if ((rv = apr_thread_mutex_lock(wakeup->mutex)) != APR_SUCCESS) {
- return rv;
- }
- if ((rv = apr_thread_mutex_unlock(wakeup->mutex)) != APR_SUCCESS) {
- return rv;
- }
- if ((rv = apr_thread_cond_signal(wakeup->cond)) != APR_SUCCESS) {
- apr_thread_mutex_unlock(stack->mutex);
- return rv;
+
+ while (1) {
+ apr_uint32_t state = stack->state;
+ apr_uint32_t first = state & STACK_FIRST;
+ if (first == STACK_LIST_END) {
+ if (apr_atomic_cas(&(stack->state), state | STACK_NO_LISTENER,
+ state) != state) {
+ continue;
+ }
+ else {
+ return APR_SUCCESS;
+ }
}
- }
- else {
- stack->no_listener = 1;
- if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) {
- return rv;
+ else {
+ worker_wakeup_info *wakeup = worker_wakeups[first];
+ if (apr_atomic_cas(&(stack->state), (state ^ first) | wakeup->next,
+ state) != state) {
+ continue;
+ }
+ else {
+ /* Acquire and release the idle worker's mutex to ensure
+ * that it's actually waiting on its condition variable
+ */
+ apr_status_t rv;
+ if ((rv = apr_thread_mutex_lock(wakeup->mutex)) !=
+ APR_SUCCESS) {
+ return rv;
+ }
+ if ((rv = apr_thread_mutex_unlock(wakeup->mutex)) !=
+ APR_SUCCESS) {
+ return rv;
+ }
+ return apr_thread_cond_signal(wakeup->cond);
+ }
}
}
- return APR_SUCCESS;
}
static apr_status_t worker_stack_term(worker_stack *stack)
@@ -386,32 +378,18 @@ static apr_status_t worker_stack_term(worker_stack *stack)
int i;
apr_status_t rv;
- if ((rv = apr_thread_mutex_lock(stack->mutex)) != APR_SUCCESS) {
- return rv;
- }
- if (stack->terminated) {
- return apr_thread_mutex_unlock(stack->mutex);
- }
- stack->terminated = 1;
- while (stack->nelts) {
- worker_wakeup_info *wakeup = stack->stack[--stack->nelts];
- /* Acquire and release the idle worker's mutex to ensure
- * that it's actually waiting on its condition variable
- */
- if ((rv = apr_thread_mutex_lock(wakeup->mutex)) != APR_SUCCESS) {
- return rv;
- }
- if ((rv = apr_thread_mutex_unlock(wakeup->mutex)) != APR_SUCCESS) {
- return rv;
+ while (1) {
+ apr_uint32_t state = stack->state;
+ if (apr_atomic_cas(&(stack->state), state | STACK_TERMINATED,
+ state) == state) {
+ break;
}
- if ((rv = apr_thread_cond_signal(wakeup->cond)) != APR_SUCCESS) {
- apr_thread_mutex_unlock(stack->mutex);
+ }
+ for (i = 0; i < ap_threads_per_child; i++) {
+ if ((rv = worker_stack_awaken_next(stack)) != APR_SUCCESS) {
return rv;
}
}
- if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) {
- return rv;
- }
return APR_SUCCESS;
}
@@ -584,11 +562,6 @@ static void sig_term(int sig)
}
}
-static void child_sig_term(int sig)
-{
- ap_start_shutdown();
-}
-
static void restart(int sig)
{
ap_start_restart(sig == AP_SIG_GRACEFUL);
@@ -792,19 +765,11 @@ static void unblock_signal(int sig)
#endif
}
-static void dummy_signal_handler(int sig)
-{
- /* XXX If specifying SIG_IGN is guaranteed to unblock a syscall,
- * then we don't need this goofy function.
- */
-}
-
static void *worker_thread(apr_thread_t *thd, void * dummy)
{
proc_info * ti = dummy;
int process_slot = ti->pid;
int thread_slot = ti->tid;
- worker_wakeup_info *wakeup;
apr_uint32_t my_worker_num = (apr_uint32_t)(ti->tid);
apr_pool_t *tpool = apr_thread_pool_get(thd);
void *csd = NULL;
@@ -831,8 +796,6 @@ static void *worker_thread(apr_thread_t *thd, void * dummy)
for(lr = ap_listeners ; lr != NULL ; lr = lr->next)
apr_poll_socket_add(pollset, lr->sd, APR_POLLIN);
- wakeup = worker_wakeup_create(tpool);
-
/* TODO: Switch to a system where threads reuse the results from earlier
poll calls - manoj */
is_listener = 0;
@@ -842,7 +805,7 @@ static void *worker_thread(apr_thread_t *thd, void * dummy)
SERVER_READY, NULL);
if (!is_listener) {
/* Wait until it's our turn to become the listener */
- if ((rv = worker_stack_wait(idle_worker_stack, wakeup)) !=
+ if ((rv = worker_stack_wait(idle_worker_stack, my_worker_num)) !=
APR_SUCCESS) {
if (rv != APR_EINVAL) {
ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
@@ -1024,15 +987,27 @@ static void * APR_THREAD_FUNC start_threads(apr_thread_t *thd, void *dummy)
clean_child_exit(APEXIT_CHILDFATAL);
}
+ worker_wakeups = (worker_wakeup_info **)
+ apr_palloc(pchild, sizeof(worker_wakeup_info *) *
+ ap_threads_per_child);
+
loops = prev_threads_created = 0;
while (1) {
for (i = 0; i < ap_threads_per_child; i++) {
int status = ap_scoreboard_image->servers[child_num_arg][i].status;
+ worker_wakeup_info *wakeup;
if (status != SERVER_GRACEFUL && status != SERVER_DEAD) {
continue;
}
+ wakeup = worker_wakeup_create(pchild);
+ if (wakeup == NULL) {
+ ap_log_error(APLOG_MARK, APLOG_ALERT|APLOG_NOERRNO, 0,
+ ap_server_conf, "worker_wakeup_create failed");
+ clean_child_exit(APEXIT_CHILDFATAL);
+ }
+ worker_wakeups[threads_created] = wakeup;
my_info = (proc_info *)malloc(sizeof(proc_info));
if (my_info == NULL) {
ap_log_error(APLOG_MARK, APLOG_ALERT, errno, ap_server_conf,