Diffstat (limited to 'lib/thread.c')
-rw-r--r-- | lib/thread.c | 377
1 file changed, 268 insertions, 109 deletions
diff --git a/lib/thread.c b/lib/thread.c
index e707fc584..d4ed5d1a0 100644
--- a/lib/thread.c
+++ b/lib/thread.c
@@ -41,7 +41,7 @@ DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats")
 #include <mach/mach_time.h>
 #endif
 
-/* Relative time, since startup */
+static pthread_mutex_t cpu_record_mtx = PTHREAD_MUTEX_INITIALIZER;
 static struct hash *cpu_record = NULL;
 
 static unsigned long
@@ -137,9 +137,14 @@ cpu_record_print(struct vty *vty, thread_type filter)
   vty_out(vty, "Active   Runtime(ms)   Invoked Avg uSec Max uSecs");
   vty_out(vty, " Avg uSec Max uSecs");
   vty_out(vty, "  Type  Thread%s", VTY_NEWLINE);
-  hash_iterate(cpu_record,
-               (void(*)(struct hash_backet*,void*))cpu_record_hash_print,
-               args);
+
+  pthread_mutex_lock (&cpu_record_mtx);
+  {
+    hash_iterate(cpu_record,
+                 (void(*)(struct hash_backet*,void*))cpu_record_hash_print,
+                 args);
+  }
+  pthread_mutex_unlock (&cpu_record_mtx);
 
   if (tmp.total_calls > 0)
     vty_out_cpu_thread_history(vty, &tmp);
@@ -216,16 +221,25 @@ cpu_record_hash_clear (struct hash_backet *bucket,
   if ( !(a->types & *filter) )
     return;
 
-  hash_release (cpu_record, bucket->data);
+  pthread_mutex_lock (&cpu_record_mtx);
+  {
+    hash_release (cpu_record, bucket->data);
+  }
+  pthread_mutex_unlock (&cpu_record_mtx);
 }
 
 static void
 cpu_record_clear (thread_type filter)
 {
   thread_type *tmp = &filter;
-  hash_iterate (cpu_record,
-                (void (*) (struct hash_backet*,void*)) cpu_record_hash_clear,
-                tmp);
+
+  pthread_mutex_lock (&cpu_record_mtx);
+  {
+    hash_iterate (cpu_record,
+                  (void (*) (struct hash_backet*,void*)) cpu_record_hash_clear,
+                  tmp);
+  }
+  pthread_mutex_unlock (&cpu_record_mtx);
 }
 
 DEFUN (clear_thread_cpu,
@@ -326,16 +340,20 @@ thread_master_create (void)
 
   getrlimit(RLIMIT_NOFILE, &limit);
 
-  if (cpu_record == NULL)
-    cpu_record
-      = hash_create ((unsigned int (*) (void *))cpu_record_hash_key,
-                     (int (*) (const void *, const void *))cpu_record_hash_cmp);
+  pthread_mutex_lock (&cpu_record_mtx);
+  {
+    if (cpu_record == NULL)
+      cpu_record = hash_create ((unsigned int (*) (void *))cpu_record_hash_key,
+                                (int (*) (const void *, const void *))
+                                cpu_record_hash_cmp);
+  }
+  pthread_mutex_unlock (&cpu_record_mtx);
 
   rv = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct thread_master));
   if (rv == NULL)
-    {
-      return NULL;
-    }
+    return NULL;
+
+  pthread_mutex_init (&rv->mtx, NULL);
 
   rv->fd_limit = (int)limit.rlim_cur;
   rv->read = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit);
@@ -358,6 +376,8 @@ thread_master_create (void)
   rv->background = pqueue_create();
   rv->timer->cmp = rv->background->cmp = thread_timer_cmp;
   rv->timer->update = rv->background->update = thread_timer_update;
+  rv->spin = true;
+  rv->handle_signals = true;
 
 #if defined(HAVE_POLL)
   rv->handler.pfdsize = rv->fd_limit;
@@ -498,11 +518,16 @@ thread_queue_free (struct thread_master *m, struct pqueue *queue)
 void
 thread_master_free_unused (struct thread_master *m)
 {
-  struct thread *t;
-  while ((t = thread_trim_head(&m->unuse)) != NULL)
-    {
-      XFREE(MTYPE_THREAD, t);
-    }
+  pthread_mutex_lock (&m->mtx);
+  {
+    struct thread *t;
+    while ((t = thread_trim_head(&m->unuse)) != NULL)
+      {
+        pthread_mutex_destroy (&t->mtx);
+        XFREE(MTYPE_THREAD, t);
+      }
+  }
+  pthread_mutex_unlock (&m->mtx);
 }
 
 /* Stop thread scheduler. */
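
The hunks above funnel every access to the global cpu_record hash (lazy creation, iteration, release) through one static mutex, using braces after lock/unlock to mark the critical section visually. A minimal standalone sketch of that guarded-global pattern, with illustrative names rather than Quagga's real types:

    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>

    /* Illustrative stand-in for a lazily created global like cpu_record. */
    static pthread_mutex_t table_mtx = PTHREAD_MUTEX_INITIALIZER;
    static int *table = NULL;

    static void
    table_touch (int val)
    {
      pthread_mutex_lock (&table_mtx);
      {
        if (table == NULL)             /* first caller creates the table */
          table = calloc (16, sizeof (int));
        table[val % 16]++;             /* every access stays under the lock */
      }
      pthread_mutex_unlock (&table_mtx);
    }

    int
    main (void)
    {
      table_touch (3);
      table_touch (19);                /* hashes to the same slot */
      printf ("slot 3 = %d\n", table[3]);
      return 0;
    }
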
@@ -516,25 +541,37 @@ thread_master_free (struct thread_master *m)
   thread_list_free (m, &m->ready);
   thread_list_free (m, &m->unuse);
   thread_queue_free (m, m->background);
+  pthread_mutex_destroy (&m->mtx);
 
 #if defined(HAVE_POLL)
   XFREE (MTYPE_THREAD_MASTER, m->handler.pfds);
 #endif
 
   XFREE (MTYPE_THREAD_MASTER, m);
 
-  if (cpu_record)
-    {
-      hash_clean (cpu_record, cpu_record_hash_free);
-      hash_free (cpu_record);
-      cpu_record = NULL;
-    }
+  pthread_mutex_lock (&cpu_record_mtx);
+  {
+    if (cpu_record)
+      {
+        hash_clean (cpu_record, cpu_record_hash_free);
+        hash_free (cpu_record);
+        cpu_record = NULL;
+      }
+  }
+  pthread_mutex_unlock (&cpu_record_mtx);
 }
 
 /* Return remain time in second. */
 unsigned long
 thread_timer_remain_second (struct thread *thread)
 {
-  int64_t remain = monotime_until(&thread->u.sands, NULL) / 1000000LL;
+  int64_t remain;
+
+  pthread_mutex_lock (&thread->mtx);
+  {
+    remain = monotime_until(&thread->u.sands, NULL) / 1000000LL;
+  }
+  pthread_mutex_unlock (&thread->mtx);
+
   return remain < 0 ? 0 : remain;
 }
 
@@ -545,7 +582,11 @@ struct timeval
 thread_timer_remain(struct thread *thread)
 {
   struct timeval remain;
-  monotime_until(&thread->u.sands, &remain);
+  pthread_mutex_lock (&thread->mtx);
+  {
+    monotime_until(&thread->u.sands, &remain);
+  }
+  pthread_mutex_unlock (&thread->mtx);
   return remain;
 }
 
@@ -560,8 +601,11 @@ thread_get (struct thread_master *m, u_char type,
   if (! thread)
     {
       thread = XCALLOC (MTYPE_THREAD, sizeof (struct thread));
+      /* mutex only needs to be initialized at struct creation. */
+      pthread_mutex_init (&thread->mtx, NULL);
      m->alloc++;
     }
+
   thread->type = type;
   thread->add_type = type;
   thread->master = m;
@@ -584,8 +628,12 @@ thread_get (struct thread_master *m, u_char type,
     {
       tmp.func = func;
       tmp.funcname = funcname;
-      thread->hist = hash_get (cpu_record, &tmp,
-                               (void * (*) (void *))cpu_record_hash_alloc);
+      pthread_mutex_lock (&cpu_record_mtx);
+      {
+        thread->hist = hash_get (cpu_record, &tmp,
+                                 (void * (*) (void *))cpu_record_hash_alloc);
+      }
+      pthread_mutex_unlock (&cpu_record_mtx);
     }
   thread->hist->total_active++;
   thread->func = func;
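
These hunks tie the lifetime of each struct thread's mutex to the struct itself: pthread_mutex_init runs once when the struct is XCALLOC'd in thread_get, and pthread_mutex_destroy runs only in thread_master_free_unused just before XFREE, so threads recycled through the unuse list keep a valid mutex. A standalone sketch of that init-once/destroy-once pairing (the item type and its fields are illustrative):

    #include <pthread.h>
    #include <stdlib.h>

    struct item
    {
      pthread_mutex_t mtx;   /* lives exactly as long as the struct */
      int value;
    };

    static struct item *
    item_new (void)
    {
      struct item *it = calloc (1, sizeof (*it));
      pthread_mutex_init (&it->mtx, NULL);  /* init once, at allocation */
      return it;
    }

    static void
    item_free (struct item *it)
    {
      pthread_mutex_destroy (&it->mtx);     /* destroy only when freeing */
      free (it);
    }

    int
    main (void)
    {
      struct item *it = item_new ();
      pthread_mutex_lock (&it->mtx);
      it->value = 42;
      pthread_mutex_unlock (&it->mtx);
      item_free (it);                       /* a recycled struct would skip this */
      return 0;
    }
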
@@ -650,15 +698,45 @@ static int
 fd_select (struct thread_master *m, int size, thread_fd_set *read,
            thread_fd_set *write, thread_fd_set *except, struct timeval *timer_wait)
 {
   int num;
+
+  /* If timer_wait is null here, that means either select() or poll() should
+   * block indefinitely, unless the thread_master has overriden it. select()
+   * and poll() differ in the timeout values they interpret as an indefinite
+   * block; select() requires a null pointer, while poll takes a millisecond
+   * value of -1.
+   *
+   * The thread_master owner has the option of overriding the default behavior
+   * by setting ->selectpoll_timeout. If the value is positive, it specifies
+   * the maximum number of milliseconds to wait. If the timeout is -1, it
+   * specifies that we should never wait and always return immediately even if
+   * no event is detected. If the value is zero, the behavior is default.
+   */
+
 #if defined(HAVE_POLL)
-  /* recalc timeout for poll. Attention NULL pointer is no timeout with
-     select, where with poll no timeount is -1 */
   int timeout = -1;
-  if (timer_wait != NULL)
+
+  if (timer_wait != NULL && m->selectpoll_timeout == 0) // use the default value
     timeout = (timer_wait->tv_sec*1000) + (timer_wait->tv_usec/1000);
+  else if (m->selectpoll_timeout > 0) // use the user's timeout
+    timeout = m->selectpoll_timeout;
+  else if (m->selectpoll_timeout < 0) // effect a poll (return immediately)
+    timeout = 0;
 
   num = poll (m->handler.pfds, m->handler.pfdcount + m->handler.pfdcountsnmp, timeout);
 #else
+  struct timeval timeout;
+  if (m->selectpoll_timeout > 0) // use the user's timeout
+    {
+      timeout.tv_sec = m->selectpoll_timeout / 1000;
+      timeout.tv_usec = (m->selectpoll_timeout % 1000) * 1000;
+      timer_wait = &timeout;
+    }
+  else if (m->selectpoll_timeout < 0) // effect a poll (return immediately)
+    {
+      timeout.tv_sec = 0;
+      timeout.tv_usec = 0;
+      timer_wait = &timeout;
+    }
   num = select (size, read, write, except, timer_wait);
 #endif
@@ -703,36 +781,43 @@ funcname_thread_add_read_write (int dir, struct thread_master *m,
 {
   struct thread *thread = NULL;
 
-#if !defined(HAVE_POLL)
-  thread_fd_set *fdset = NULL;
-  if (dir == THREAD_READ)
-    fdset = &m->handler.readfd;
-  else
-    fdset = &m->handler.writefd;
-#endif
-
+  pthread_mutex_lock (&m->mtx);
+  {
 #if defined (HAVE_POLL)
-  thread = generic_thread_add(m, func, arg, fd, dir, debugargpass);
-
-  if (thread == NULL)
-    return NULL;
+    thread = generic_thread_add(m, func, arg, fd, dir, debugargpass);
 #else
-  if (FD_ISSET (fd, fdset))
-    {
-      zlog_warn ("There is already %s fd [%d]",
-                 (dir == THREAD_READ) ? "read" : "write", fd);
-      return NULL;
-    }
+    thread_fd_set *fdset = NULL;
+    if (dir == THREAD_READ)
+      fdset = &m->handler.readfd;
+    else
+      fdset = &m->handler.writefd;
 
-  FD_SET (fd, fdset);
-  thread = thread_get (m, dir, func, arg, debugargpass);
+    if (FD_ISSET (fd, fdset))
+      {
+        zlog_warn ("There is already %s fd [%d]",
+                   (dir == THREAD_READ) ? "read" : "write", fd);
+      }
+    else
+      {
+        FD_SET (fd, fdset);
+        thread = thread_get (m, dir, func, arg, debugargpass);
+      }
 #endif
-  thread->u.fd = fd;
-  if (dir == THREAD_READ)
-    thread_add_fd (m->read, thread);
-  else
-    thread_add_fd (m->write, thread);
+
+    if (thread)
+      {
+        pthread_mutex_lock (&thread->mtx);
+        {
+          thread->u.fd = fd;
+          if (dir == THREAD_READ)
+            thread_add_fd (m->read, thread);
+          else
+            thread_add_fd (m->write, thread);
+        }
+        pthread_mutex_unlock (&thread->mtx);
+      }
+  }
+  pthread_mutex_unlock (&m->mtx);
 
   return thread;
 }
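
The fd_select() comment above defines a tri-state override: a selectpoll_timeout of 0 keeps the default behavior (block per timer_wait, or indefinitely when it is null), a positive value caps the wait in milliseconds, and a negative value forces an immediate return. A standalone sketch of that mapping onto a poll(2)-style millisecond timeout (the function name is illustrative, not part of the patch):

    #include <stdio.h>

    /* Map the tri-state override onto a poll(2) timeout in milliseconds.
     * selectpoll_timeout: 0 = default, >0 = max wait in ms, <0 = don't block.
     * default_ms of -1 means "no timer pending, block indefinitely". */
    static int
    compute_poll_timeout (long selectpoll_timeout, int default_ms)
    {
      if (selectpoll_timeout > 0)   /* caller-imposed ceiling */
        return (int) selectpoll_timeout;
      if (selectpoll_timeout < 0)   /* pure poll: return immediately */
        return 0;
      return default_ms;            /* 0: keep the default behavior */
    }

    int
    main (void)
    {
      printf ("%d\n", compute_poll_timeout (0, -1));    /* -1: block forever */
      printf ("%d\n", compute_poll_timeout (250, -1));  /* 250: wait <= 250 ms */
      printf ("%d\n", compute_poll_timeout (-1, 5000)); /* 0: nonblocking */
      return 0;
    }
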
@@ -753,13 +838,21 @@ funcname_thread_add_timer_timeval (struct thread_master *m,
   assert (type == THREAD_TIMER || type == THREAD_BACKGROUND);
   assert (time_relative);
 
-  queue = ((type == THREAD_TIMER) ? m->timer : m->background);
-  thread = thread_get (m, type, func, arg, debugargpass);
+  pthread_mutex_lock (&m->mtx);
+  {
+    queue = ((type == THREAD_TIMER) ? m->timer : m->background);
+    thread = thread_get (m, type, func, arg, debugargpass);
 
-  monotime(&thread->u.sands);
-  timeradd(&thread->u.sands, time_relative, &thread->u.sands);
+    pthread_mutex_lock (&thread->mtx);
+    {
+      monotime(&thread->u.sands);
+      timeradd(&thread->u.sands, time_relative, &thread->u.sands);
+      pqueue_enqueue(thread, queue);
+    }
+    pthread_mutex_unlock (&thread->mtx);
+  }
+  pthread_mutex_unlock (&m->mtx);
 
-  pqueue_enqueue(thread, queue);
   return thread;
 }
 
@@ -847,9 +940,17 @@ funcname_thread_add_event (struct thread_master *m,
 
   assert (m != NULL);
 
-  thread = thread_get (m, THREAD_EVENT, func, arg, debugargpass);
-  thread->u.val = val;
-  thread_list_add (&m->event, thread);
+  pthread_mutex_lock (&m->mtx);
+  {
+    thread = thread_get (m, THREAD_EVENT, func, arg, debugargpass);
+    pthread_mutex_lock (&thread->mtx);
+    {
+      thread->u.val = val;
+      thread_list_add (&m->event, thread);
+    }
+    pthread_mutex_unlock (&thread->mtx);
+  }
+  pthread_mutex_unlock (&m->mtx);
 
   return thread;
 }
@@ -880,14 +981,22 @@ thread_cancel_read_or_write (struct thread *thread, short int state)
       fd_clear_read_write (thread);
 }
 
-/* Cancel thread from scheduler. */
+/**
+ * Cancel thread from scheduler.
+ *
+ * This function is *NOT* MT-safe. DO NOT call it from any other pthread except
+ * the one which owns thread->master.
+ */
 void
 thread_cancel (struct thread *thread)
 {
   struct thread_list *list = NULL;
   struct pqueue *queue = NULL;
   struct thread **thread_array = NULL;
-
+
+  pthread_mutex_lock (&thread->master->mtx);
+  pthread_mutex_lock (&thread->mtx);
+
   switch (thread->type)
     {
     case THREAD_READ:
@@ -919,15 +1028,14 @@ thread_cancel (struct thread *thread)
       queue = thread->master->background;
       break;
     default:
-      return;
+      goto done;
       break;
     }
 
   if (queue)
     {
       assert(thread->index >= 0);
-      assert(thread == queue->array[thread->index]);
-      pqueue_remove_at(thread->index, queue);
+      pqueue_remove (thread, queue);
     }
   else if (list)
     {
@@ -943,6 +1051,10 @@ thread_cancel (struct thread *thread)
     }
 
   thread_add_unuse (thread->master, thread);
+
+done:
+  pthread_mutex_unlock (&thread->mtx);
+  pthread_mutex_unlock (&thread->master->mtx);
 }
 
 /* Delete all events which has argument value arg. */
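
Note the lock ordering in the scheduling hunks above: m->mtx is always acquired before thread->mtx and released in reverse order (thread_cancel now takes both, and its new doc comment restricts it to the pthread that owns the master). Keeping a single acquisition order is what prevents two pthreads that each need both locks from deadlocking against each other. A minimal sketch of the discipline, with illustrative names:

    #include <pthread.h>
    #include <stdio.h>

    /* Illustrative stand-ins for the master and thread mutexes. */
    static pthread_mutex_t master_mtx = PTHREAD_MUTEX_INITIALIZER;
    static pthread_mutex_t thread_mtx = PTHREAD_MUTEX_INITIALIZER;

    static int scheduled;

    /* Every path that needs both locks takes master_mtx first, then
     * thread_mtx, and releases in reverse order; no two such paths can
     * then wait on each other's held lock. */
    static void
    schedule_one (void)
    {
      pthread_mutex_lock (&master_mtx);
      {
        pthread_mutex_lock (&thread_mtx);
        {
          scheduled++;
        }
        pthread_mutex_unlock (&thread_mtx);
      }
      pthread_mutex_unlock (&master_mtx);
    }

    int
    main (void)
    {
      schedule_one ();
      printf ("scheduled = %d\n", scheduled);
      return 0;
    }
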
@@ -951,39 +1063,48 @@ thread_cancel_event (struct thread_master *m, void *arg)
 {
   unsigned int ret = 0;
   struct thread *thread;
+  struct thread *t;
 
-  thread = m->event.head;
-  while (thread)
-    {
-      struct thread *t;
-
-      t = thread;
-      thread = t->next;
-
-      if (t->arg == arg)
+  pthread_mutex_lock (&m->mtx);
+  {
+    thread = m->event.head;
+    while (thread)
+      {
+        t = thread;
+        pthread_mutex_lock (&t->mtx);
         {
-          ret++;
-          thread_list_delete (&m->event, t);
-          thread_add_unuse (m, t);
+          thread = t->next;
+
+          if (t->arg == arg)
+            {
+              ret++;
+              thread_list_delete (&m->event, t);
+              thread_add_unuse (m, t);
+            }
         }
-    }
-
-  /* thread can be on the ready list too */
-  thread = m->ready.head;
-  while (thread)
-    {
-      struct thread *t;
-
-      t = thread;
-      thread = t->next;
+        pthread_mutex_unlock (&t->mtx);
+      }
 
-      if (t->arg == arg)
+    /* thread can be on the ready list too */
+    thread = m->ready.head;
+    while (thread)
+      {
+        t = thread;
+        pthread_mutex_lock (&t->mtx);
         {
-          ret++;
-          thread_list_delete (&m->ready, t);
-          thread_add_unuse (m, t);
+          thread = t->next;
+
+          if (t->arg == arg)
+            {
+              ret++;
+              thread_list_delete (&m->ready, t);
+              thread_add_unuse (m, t);
+            }
         }
-    }
+        pthread_mutex_unlock (&t->mtx);
+      }
+  }
+  pthread_mutex_unlock (&m->mtx);
   return ret;
 }
@@ -1143,18 +1264,24 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
   struct timeval *timer_wait = &timer_val;
   struct timeval *timer_wait_bg;
 
-  while (1)
+  do
     {
       int num = 0;
 
       /* Signals pre-empt everything */
-      quagga_sigevent_process ();
+      if (m->handle_signals)
+        quagga_sigevent_process ();
+
+      pthread_mutex_lock (&m->mtx);
 
       /* Drain the ready queue of already scheduled jobs, before scheduling
        * more.
        */
       if ((thread = thread_trim_head (&m->ready)) != NULL)
-        return thread_run (m, thread, fetch);
+        {
+          fetch = thread_run (m, thread, fetch);
+          pthread_mutex_unlock (&m->mtx);
+          return fetch;
+        }
 
       /* To be fair to all kinds of threads, and avoid starvation, we
        * need to be careful to consider all thread types for scheduling
@@ -1194,8 +1321,12 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
       if (num < 0)
         {
           if (errno == EINTR)
-            continue; /* signal received - process it */
+            {
+              pthread_mutex_unlock (&m->mtx);
+              continue; /* signal received - process it */
+            }
 
           zlog_warn ("select() error: %s", safe_strerror (errno));
+          pthread_mutex_unlock (&m->mtx);
           return NULL;
         }
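
Once thread_fetch() holds m->mtx across a scheduling pass, every early exit from the loop body (the EINTR continue as well as the error return) has to drop the lock first. A standalone sketch of that unlock-on-every-path discipline, where wait_for_events is an illustrative stand-in for the select()/poll() step:

    #include <errno.h>
    #include <pthread.h>
    #include <stdio.h>

    static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;

    /* Illustrative wait step: pretend the first call is interrupted. */
    static int
    wait_for_events (int attempt)
    {
      if (attempt == 0)
        {
          errno = EINTR;
          return -1;
        }
      return 1; /* one event ready */
    }

    int
    main (void)
    {
      for (int attempt = 0;; attempt++)
        {
          pthread_mutex_lock (&mtx);
          int num = wait_for_events (attempt);
          if (num < 0)
            {
              if (errno == EINTR)
                {
                  pthread_mutex_unlock (&mtx); /* unlock before continue */
                  continue;
                }
              pthread_mutex_unlock (&mtx);     /* unlock before error return */
              return 1;
            }
          printf ("%d event(s)\n", num);
          pthread_mutex_unlock (&mtx);         /* unlock before normal exit */
          return 0;
        }
    }
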
@@ -1215,15 +1346,28 @@ thread_fetch (struct thread_master *m, struct thread *fetch)
          list at this time.  If this is code is uncommented, then background
          timer threads will not run unless there is nothing else to do. */
       if ((thread = thread_trim_head (&m->ready)) != NULL)
-        return thread_run (m, thread, fetch);
+        {
+          fetch = thread_run (m, thread, fetch);
+          pthread_mutex_unlock (&m->mtx);
+          return fetch;
+        }
 #endif
 
       /* Background timer/events, lowest priority */
       thread_timer_process (m->background, &now);
 
       if ((thread = thread_trim_head (&m->ready)) != NULL)
-        return thread_run (m, thread, fetch);
-    }
+        {
+          fetch = thread_run (m, thread, fetch);
+          pthread_mutex_unlock (&m->mtx);
+          return fetch;
+        }
+
+      pthread_mutex_unlock (&m->mtx);
+
+    } while (m->spin);
+
+  return NULL;
 }
 
 unsigned long
@@ -1248,13 +1392,23 @@ thread_consumed_time (RUSAGE_T *now, RUSAGE_T *start, unsigned long *cputime)
 int
 thread_should_yield (struct thread *thread)
 {
-  return monotime_since(&thread->real, NULL) > (int64_t)thread->yield;
+  int result;
+  pthread_mutex_lock (&thread->mtx);
+  {
+    result = monotime_since(&thread->real, NULL) > (int64_t)thread->yield;
+  }
+  pthread_mutex_unlock (&thread->mtx);
+  return result;
 }
 
 void
 thread_set_yield_time (struct thread *thread, unsigned long yield_time)
 {
-  thread->yield = yield_time;
+  pthread_mutex_lock (&thread->mtx);
+  {
+    thread->yield = yield_time;
+  }
+  pthread_mutex_unlock (&thread->mtx);
 }
 
 void
@@ -1324,6 +1478,7 @@ funcname_thread_execute (struct thread_master *m,
 
   memset (&dummy, 0, sizeof (struct thread));
 
+  pthread_mutex_init (&dummy.mtx, NULL);
   dummy.type = THREAD_EVENT;
   dummy.add_type = THREAD_EXECUTE;
   dummy.master = NULL;
@@ -1332,8 +1487,12 @@ funcname_thread_execute (struct thread_master *m,
 
   tmp.func = dummy.func = func;
   tmp.funcname = dummy.funcname = funcname;
-  dummy.hist = hash_get (cpu_record, &tmp,
-                         (void * (*) (void *))cpu_record_hash_alloc);
+  pthread_mutex_lock (&cpu_record_mtx);
+  {
+    dummy.hist = hash_get (cpu_record, &tmp,
+                           (void * (*) (void *))cpu_record_hash_alloc);
+  }
+  pthread_mutex_unlock (&cpu_record_mtx);
 
   dummy.schedfrom = schedfrom;
   dummy.schedfrom_line = fromln;
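
The do/while (m->spin) rewrite above turns the scheduler's formerly infinite loop into one that can be wound down by clearing the flag, at which point thread_fetch() returns NULL. The patch itself does not show an API for clearing spin from another pthread, so the sketch below is an assumed usage with illustrative names (stopper thread, sleep lengths):

    #include <pthread.h>
    #include <stdbool.h>
    #include <stdio.h>
    #include <unistd.h>

    /* Illustrative event loop gated by a 'spin' flag, so another pthread
     * can ask it to finish the current pass and return. */
    static volatile bool spin = true;

    static void *
    stopper (void *arg)
    {
      (void) arg;
      usleep (100 * 1000);  /* let the loop run for ~100 ms */
      spin = false;         /* request shutdown */
      return NULL;
    }

    int
    main (void)
    {
      pthread_t tid;
      unsigned long iterations = 0;

      pthread_create (&tid, NULL, stopper, NULL);
      do
        {
          usleep (1000);    /* stand-in for one scheduling pass */
          iterations++;
        }
      while (spin);

      pthread_join (tid, NULL);
      printf ("loop ran %lu times\n", iterations);
      return 0;
    }
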
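thread_should_yield() above compares the monotonic time since the thread started running against its yield budget, now under the thread's mutex. As a usage illustration, a long-running job can poll such a check and hand the CPU back once it exceeds its slice; this standalone approximation uses clock_gettime in place of the library's monotime_since and a local constant in place of thread->yield:

    #include <stdio.h>
    #include <time.h>

    /* Microseconds elapsed since 'start' on the monotonic clock. */
    static long
    monotime_since_us (const struct timespec *start)
    {
      struct timespec now;
      clock_gettime (CLOCK_MONOTONIC, &now);
      return (now.tv_sec - start->tv_sec) * 1000000L
             + (now.tv_nsec - start->tv_nsec) / 1000L;
    }

    int
    main (void)
    {
      const long yield_us = 10 * 1000;  /* 10 ms budget, as an example */
      struct timespec start;
      clock_gettime (CLOCK_MONOTONIC, &start);

      unsigned long work = 0;
      while (monotime_since_us (&start) < yield_us)
        work++;  /* do a slice of work, then give the CPU back */

      printf ("did %lu units before yielding\n", work);
      return 0;
    }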