diff options
author | Russ White <russ@riw.us> | 2017-05-02 17:26:20 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-05-02 17:26:20 +0200 |
commit | b2ab6b282ce1569ed348aebe17432094adfe7d24 (patch) | |
tree | e3b42a7e43e98b1ab3e02169ea52ffde11f96b71 | |
parent | Merge pull request #428 from qlyoung/fix-isis-mt (diff) | |
parent | lib: allow nonblocking thread_fetch() (diff) | |
download | frr-b2ab6b282ce1569ed348aebe17432094adfe7d24.tar.xz frr-b2ab6b282ce1569ed348aebe17432094adfe7d24.zip |
Merge pull request #377 from qlyoung/frr-pthreads
lib: MT-safe thread.c + add pthread manager
-rw-r--r-- | lib/Makefile.am | 2 | ||||
-rw-r--r-- | lib/frr_pthread.c | 184 | ||||
-rw-r--r-- | lib/frr_pthread.h | 145 | ||||
-rw-r--r-- | lib/thread.c | 377 | ||||
-rw-r--r-- | lib/thread.h | 6 |
5 files changed, 605 insertions, 109 deletions
diff --git a/lib/Makefile.am b/lib/Makefile.am index 75947e614..ad8a48868 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -34,6 +34,7 @@ libfrr_la_SOURCES = \ strlcat.c \ module.c \ hook.c \ + frr_pthread.c \ # end BUILT_SOURCES = route_types.h gitversion.h command_parse.h command_lex.h @@ -74,6 +75,7 @@ pkginclude_HEADERS = \ module.h \ hook.h \ libfrr.h \ + frr_pthread.h \ # end noinst_HEADERS = \ diff --git a/lib/frr_pthread.c b/lib/frr_pthread.c new file mode 100644 index 000000000..0408bca09 --- /dev/null +++ b/lib/frr_pthread.c @@ -0,0 +1,184 @@ +/* + Utilities and interfaces for managing POSIX threads + Copyright (C) 2017 Cumulus Networks + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING; if not, write to the + Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, + MA 02110-1301 USA + */ + +#include <zebra.h> +#include <pthread.h> + +#include "frr_pthread.h" +#include "memory.h" +#include "hash.h" + +DEFINE_MTYPE_STATIC(LIB, FRR_PTHREAD, "FRR POSIX Thread"); + +static unsigned int next_id = 0; + +/* Hash table of all frr_pthreads along with synchronization primitive(s) and + * hash table callbacks. + * ------------------------------------------------------------------------ */ +static struct hash *pthread_table; +static pthread_mutex_t pthread_table_mtx = PTHREAD_MUTEX_INITIALIZER; + +/* pthread_table->hash_cmp */ +static int pthread_table_hash_cmp(const void *value1, const void *value2) +{ + const struct frr_pthread *tq1 = value1; + const struct frr_pthread *tq2 = value2; + + return (tq1->id == tq2->id); +} + +/* pthread_table->hash_key */ +static unsigned int pthread_table_hash_key(void *value) +{ + return ((struct frr_pthread *)value)->id; +} +/* ------------------------------------------------------------------------ */ + +void frr_pthread_init() +{ + pthread_mutex_lock(&pthread_table_mtx); + { + pthread_table = + hash_create(pthread_table_hash_key, pthread_table_hash_cmp); + } + pthread_mutex_unlock(&pthread_table_mtx); +} + +void frr_pthread_finish() +{ + pthread_mutex_lock(&pthread_table_mtx); + { + hash_clean(pthread_table, (void (*)(void *))frr_pthread_destroy); + hash_free(pthread_table); + } + pthread_mutex_unlock(&pthread_table_mtx); +} + +struct frr_pthread *frr_pthread_new(const char *name, unsigned int id, + void *(*start_routine) (void *), + int (*stop_routine) (void **, struct frr_pthread *)) +{ + static struct frr_pthread holder = { 0 }; + struct frr_pthread *fpt = NULL; + + pthread_mutex_lock(&pthread_table_mtx); + { + holder.id = id; + + if (!hash_lookup(pthread_table, &holder)) { + struct frr_pthread *fpt = + XCALLOC(MTYPE_FRR_PTHREAD, + sizeof(struct frr_pthread)); + fpt->id = id; + fpt->master = thread_master_create(); + fpt->start_routine = start_routine; + fpt->stop_routine = stop_routine; + fpt->name = XSTRDUP(MTYPE_FRR_PTHREAD, name); + + hash_get(pthread_table, fpt, hash_alloc_intern); + } + } + pthread_mutex_unlock(&pthread_table_mtx); + + return fpt; +} + +void frr_pthread_destroy(struct frr_pthread *fpt) +{ + thread_master_free(fpt->master); + XFREE(MTYPE_FRR_PTHREAD, fpt->name); + XFREE(MTYPE_FRR_PTHREAD, fpt); +} + +struct frr_pthread *frr_pthread_get(unsigned int id) +{ + static struct frr_pthread holder = { 0 }; + struct frr_pthread *fpt; + + pthread_mutex_lock(&pthread_table_mtx); + { + holder.id = id; + fpt = hash_lookup(pthread_table, &holder); + } + pthread_mutex_unlock(&pthread_table_mtx); + + return fpt; +} + +int frr_pthread_run(unsigned int id, const pthread_attr_t * attr, void *arg) +{ + struct frr_pthread *fpt = frr_pthread_get(id); + int ret; + + if (!fpt) + return -1; + + ret = pthread_create(&fpt->thread, attr, fpt->start_routine, arg); + + /* Per pthread_create(3), the contents of fpt->thread are undefined if + * pthread_create() did not succeed. Reset this value to zero. */ + if (ret < 0) + memset(&fpt->thread, 0x00, sizeof(fpt->thread)); + + return ret; +} + +/** + * Calls the stop routine for the frr_pthread and resets any relevant fields. + * + * @param fpt - the frr_pthread to stop + * @param result - pointer to result pointer + * @return the return code from the stop routine + */ +static int frr_pthread_stop_actual(struct frr_pthread *fpt, void **result) +{ + int ret = (*fpt->stop_routine) (result, fpt); + memset(&fpt->thread, 0x00, sizeof(fpt->thread)); + return ret; +} + +int frr_pthread_stop(unsigned int id, void **result) +{ + struct frr_pthread *fpt = frr_pthread_get(id); + return frr_pthread_stop_actual(fpt, result); +} + +/** + * Callback for hash_iterate to stop all frr_pthread's. + */ +static void frr_pthread_stop_all_iter(struct hash_backet *hb, void *arg) +{ + struct frr_pthread *fpt = hb->data; + frr_pthread_stop_actual(fpt, NULL); +} + +void frr_pthread_stop_all() +{ + pthread_mutex_lock(&pthread_table_mtx); + { + hash_iterate(pthread_table, frr_pthread_stop_all_iter, NULL); + } + pthread_mutex_unlock(&pthread_table_mtx); +} + +unsigned int frr_pthread_get_id() +{ + return next_id++; +} diff --git a/lib/frr_pthread.h b/lib/frr_pthread.h new file mode 100644 index 000000000..b4954367f --- /dev/null +++ b/lib/frr_pthread.h @@ -0,0 +1,145 @@ +/* + Utilities and interfaces for managing POSIX threads + Copyright (C) 2017 Cumulus Networks + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING; if not, write to the + Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, + MA 02110-1301 USA + */ + +#ifndef _FRR_PTHREAD_H +#define _FRR_PTHREAD_H + +#include <pthread.h> +#include "thread.h" + +struct frr_pthread { + + /* pthread id */ + pthread_t thread; + + /* frr thread identifier */ + unsigned int id; + + /* thread master for this pthread's thread.c event loop */ + struct thread_master *master; + + /* start routine */ + void *(*start_routine) (void *); + + /* stop routine */ + int (*stop_routine) (void **, struct frr_pthread *); + + /* the (hopefully descriptive) name of this thread */ + char *name; +}; + +/* Initializes this module. + * + * Must be called before using any of the other functions. + */ +void frr_pthread_init(void); + +/* Uninitializes this module. + * + * Destroys all registered frr_pthread's and internal data structures. + * + * It is safe to call frr_pthread_init() after this function to reinitialize + * the module. + */ +void frr_pthread_finish(void); + +/* Creates a new frr_pthread. + * + * If the provided ID is already assigned to an existing frr_pthread, the + * return value will be NULL. + * + * @param name - the name of the thread. Doesn't have to be unique, but it + * probably should be. This value is copied and may be safely free'd upon + * return. + * + * @param id - the integral ID of the thread. MUST be unique. The caller may + * use this id to retrieve the thread. + * + * @param start_routine - start routine for the pthread, will be passed to + * pthread_create (see those docs for details) + * + * @param stop_routine - stop routine for the pthread, called to terminate the + * thread. This function should gracefully stop the pthread and clean up any + * thread-specific resources. The passed pointer is used to return a data + * result. + * + * @return the created frr_pthread upon success, or NULL upon failure + */ +struct frr_pthread *frr_pthread_new(const char *name, unsigned int id, + void *(*start_routine) (void *), + int (*stop_routine) (void **, struct frr_pthread *)); + +/* Destroys an frr_pthread. + * + * Assumes that the associated pthread, if any, has already terminated. + * + * @param fpt - the frr_pthread to destroy + */ +void frr_pthread_destroy(struct frr_pthread *fpt); + +/* Gets an existing frr_pthread by its id. + * + * @return frr_thread associated with the provided id, or NULL on error + */ +struct frr_pthread *frr_pthread_get(unsigned int id); + +/* Creates a new pthread and binds it to a frr_pthread. + * + * This function is a wrapper for pthread_create. The first parameter is the + * frr_pthread to bind the created pthread to. All subsequent arguments are + * passed unmodified to pthread_create(). + * + * This function returns the same code as pthread_create(). If the value is + * zero, the provided frr_pthread is bound to a running POSIX thread. If the + * value is less than zero, the provided frr_pthread is guaranteed to be a + * clean instance that may be susbsequently passed to frr_pthread_run(). + * + * @param id - frr_pthread to bind the created pthread to + * @param attr - see pthread_create(3) + * @param arg - see pthread_create(3) + * + * @return see pthread_create(3) + */ +int frr_pthread_run(unsigned int id, const pthread_attr_t * attr, void *arg); + +/* Stops an frr_pthread with a result. + * + * @param id - frr_pthread to stop + * @param result - where to store the thread's result, if any. May be NULL if a + * result is not needed. + */ +int frr_pthread_stop(unsigned int id, void **result); + +/* Stops all frr_pthread's. */ +void frr_pthread_stop_all(void); + +/* Returns a unique identifier for use with frr_pthread_new(). + * + * Internally, this is an integer that increments after each call to this + * function. Because the number of pthreads created should never exceed INT_MAX + * during the life of the program, there is no overflow protection. If by + * chance this function returns an ID which is already in use, + * frr_pthread_new() will fail when it is provided. + * + * @return unique identifier + */ +unsigned int frr_pthread_get_id(void); + +#endif /* _FRR_PTHREAD_H */ diff --git a/lib/thread.c b/lib/thread.c index e707fc584..d4ed5d1a0 100644 --- a/lib/thread.c +++ b/lib/thread.c @@ -41,7 +41,7 @@ DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats") #include <mach/mach_time.h> #endif -/* Relative time, since startup */ +static pthread_mutex_t cpu_record_mtx = PTHREAD_MUTEX_INITIALIZER; static struct hash *cpu_record = NULL; static unsigned long @@ -137,9 +137,14 @@ cpu_record_print(struct vty *vty, thread_type filter) vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs"); vty_out(vty, " Avg uSec Max uSecs"); vty_out(vty, " Type Thread%s", VTY_NEWLINE); - hash_iterate(cpu_record, - (void(*)(struct hash_backet*,void*))cpu_record_hash_print, - args); + + pthread_mutex_lock (&cpu_record_mtx); + { + hash_iterate(cpu_record, + (void(*)(struct hash_backet*,void*))cpu_record_hash_print, + args); + } + pthread_mutex_unlock (&cpu_record_mtx); if (tmp.total_calls > 0) vty_out_cpu_thread_history(vty, &tmp); @@ -216,16 +221,25 @@ cpu_record_hash_clear (struct hash_backet *bucket, if ( !(a->types & *filter) ) return; - hash_release (cpu_record, bucket->data); + pthread_mutex_lock (&cpu_record_mtx); + { + hash_release (cpu_record, bucket->data); + } + pthread_mutex_unlock (&cpu_record_mtx); } static void cpu_record_clear (thread_type filter) { thread_type *tmp = &filter; - hash_iterate (cpu_record, - (void (*) (struct hash_backet*,void*)) cpu_record_hash_clear, - tmp); + + pthread_mutex_lock (&cpu_record_mtx); + { + hash_iterate (cpu_record, + (void (*) (struct hash_backet*,void*)) cpu_record_hash_clear, + tmp); + } + pthread_mutex_unlock (&cpu_record_mtx); } DEFUN (clear_thread_cpu, @@ -326,16 +340,20 @@ thread_master_create (void) getrlimit(RLIMIT_NOFILE, &limit); - if (cpu_record == NULL) - cpu_record - = hash_create ((unsigned int (*) (void *))cpu_record_hash_key, - (int (*) (const void *, const void *))cpu_record_hash_cmp); + pthread_mutex_lock (&cpu_record_mtx); + { + if (cpu_record == NULL) + cpu_record = hash_create ((unsigned int (*) (void *))cpu_record_hash_key, + (int (*) (const void *, const void *)) + cpu_record_hash_cmp); + } + pthread_mutex_unlock (&cpu_record_mtx); rv = XCALLOC (MTYPE_THREAD_MASTER, sizeof (struct thread_master)); if (rv == NULL) - { - return NULL; - } + return NULL; + + pthread_mutex_init (&rv->mtx, NULL); rv->fd_limit = (int)limit.rlim_cur; rv->read = XCALLOC (MTYPE_THREAD, sizeof (struct thread *) * rv->fd_limit); @@ -358,6 +376,8 @@ thread_master_create (void) rv->background = pqueue_create(); rv->timer->cmp = rv->background->cmp = thread_timer_cmp; rv->timer->update = rv->background->update = thread_timer_update; + rv->spin = true; + rv->handle_signals = true; #if defined(HAVE_POLL) rv->handler.pfdsize = rv->fd_limit; @@ -498,11 +518,16 @@ thread_queue_free (struct thread_master *m, struct pqueue *queue) void thread_master_free_unused (struct thread_master *m) { - struct thread *t; - while ((t = thread_trim_head(&m->unuse)) != NULL) - { - XFREE(MTYPE_THREAD, t); - } + pthread_mutex_lock (&m->mtx); + { + struct thread *t; + while ((t = thread_trim_head(&m->unuse)) != NULL) + { + pthread_mutex_destroy (&t->mtx); + XFREE(MTYPE_THREAD, t); + } + } + pthread_mutex_unlock (&m->mtx); } /* Stop thread scheduler. */ @@ -516,25 +541,37 @@ thread_master_free (struct thread_master *m) thread_list_free (m, &m->ready); thread_list_free (m, &m->unuse); thread_queue_free (m, m->background); + pthread_mutex_destroy (&m->mtx); #if defined(HAVE_POLL) XFREE (MTYPE_THREAD_MASTER, m->handler.pfds); #endif XFREE (MTYPE_THREAD_MASTER, m); - if (cpu_record) - { - hash_clean (cpu_record, cpu_record_hash_free); - hash_free (cpu_record); - cpu_record = NULL; - } + pthread_mutex_lock (&cpu_record_mtx); + { + if (cpu_record) + { + hash_clean (cpu_record, cpu_record_hash_free); + hash_free (cpu_record); + cpu_record = NULL; + } + } + pthread_mutex_unlock (&cpu_record_mtx); } /* Return remain time in second. */ unsigned long thread_timer_remain_second (struct thread *thread) { - int64_t remain = monotime_until(&thread->u.sands, NULL) / 1000000LL; + int64_t remain; + + pthread_mutex_lock (&thread->mtx); + { + remain = monotime_until(&thread->u.sands, NULL) / 1000000LL; + } + pthread_mutex_unlock (&thread->mtx); + return remain < 0 ? 0 : remain; } @@ -545,7 +582,11 @@ struct timeval thread_timer_remain(struct thread *thread) { struct timeval remain; - monotime_until(&thread->u.sands, &remain); + pthread_mutex_lock (&thread->mtx); + { + monotime_until(&thread->u.sands, &remain); + } + pthread_mutex_unlock (&thread->mtx); return remain; } @@ -560,8 +601,11 @@ thread_get (struct thread_master *m, u_char type, if (! thread) { thread = XCALLOC (MTYPE_THREAD, sizeof (struct thread)); + /* mutex only needs to be initialized at struct creation. */ + pthread_mutex_init (&thread->mtx, NULL); m->alloc++; } + thread->type = type; thread->add_type = type; thread->master = m; @@ -584,8 +628,12 @@ thread_get (struct thread_master *m, u_char type, { tmp.func = func; tmp.funcname = funcname; - thread->hist = hash_get (cpu_record, &tmp, - (void * (*) (void *))cpu_record_hash_alloc); + pthread_mutex_lock (&cpu_record_mtx); + { + thread->hist = hash_get (cpu_record, &tmp, + (void * (*) (void *))cpu_record_hash_alloc); + } + pthread_mutex_unlock (&cpu_record_mtx); } thread->hist->total_active++; thread->func = func; @@ -650,15 +698,45 @@ static int fd_select (struct thread_master *m, int size, thread_fd_set *read, thread_fd_set *write, thread_fd_set *except, struct timeval *timer_wait) { int num; + + /* If timer_wait is null here, that means either select() or poll() should + * block indefinitely, unless the thread_master has overriden it. select() + * and poll() differ in the timeout values they interpret as an indefinite + * block; select() requires a null pointer, while poll takes a millisecond + * value of -1. + * + * The thread_master owner has the option of overriding the default behavior + * by setting ->selectpoll_timeout. If the value is positive, it specifies + * the maximum number of milliseconds to wait. If the timeout is -1, it + * specifies that we should never wait and always return immediately even if + * no event is detected. If the value is zero, the behavior is default. + */ + #if defined(HAVE_POLL) - /* recalc timeout for poll. Attention NULL pointer is no timeout with - select, where with poll no timeount is -1 */ int timeout = -1; - if (timer_wait != NULL) + + if (timer_wait != NULL && m->selectpoll_timeout == 0) // use the default value timeout = (timer_wait->tv_sec*1000) + (timer_wait->tv_usec/1000); + else if (m->selectpoll_timeout > 0) // use the user's timeout + timeout = m->selectpoll_timeout; + else if (m->selectpoll_timeout < 0) // effect a poll (return immediately) + timeout = 0; num = poll (m->handler.pfds, m->handler.pfdcount + m->handler.pfdcountsnmp, timeout); #else + struct timeval timeout; + if (m->selectpoll_timeout > 0) // use the user's timeout + { + timeout.tv_sec = m->selectpoll_timeout / 1000; + timeout.tv_usec = (m->selectpoll_timeout % 1000) * 1000; + timer_wait = &timeout; + } + else if (m->selectpoll_timeout < 0) // effect a poll (return immediately) + { + timeout.tv_sec = 0; + timeout.tv_usec = 0; + timer_wait = &timeout; + } num = select (size, read, write, except, timer_wait); #endif @@ -703,36 +781,43 @@ funcname_thread_add_read_write (int dir, struct thread_master *m, { struct thread *thread = NULL; -#if !defined(HAVE_POLL) - thread_fd_set *fdset = NULL; - if (dir == THREAD_READ) - fdset = &m->handler.readfd; - else - fdset = &m->handler.writefd; -#endif - + pthread_mutex_lock (&m->mtx); + { #if defined (HAVE_POLL) - thread = generic_thread_add(m, func, arg, fd, dir, debugargpass); - - if (thread == NULL) - return NULL; + thread = generic_thread_add(m, func, arg, fd, dir, debugargpass); #else - if (FD_ISSET (fd, fdset)) - { - zlog_warn ("There is already %s fd [%d]", - (dir == THREAD_READ) ? "read" : "write", fd); - return NULL; - } + thread_fd_set *fdset = NULL; + if (dir == THREAD_READ) + fdset = &m->handler.readfd; + else + fdset = &m->handler.writefd; - FD_SET (fd, fdset); - thread = thread_get (m, dir, func, arg, debugargpass); + if (FD_ISSET (fd, fdset)) + { + zlog_warn ("There is already %s fd [%d]", + (dir == THREAD_READ) ? "read" : "write", fd); + } + else + { + FD_SET (fd, fdset); + thread = thread_get (m, dir, func, arg, debugargpass); + } #endif - thread->u.fd = fd; - if (dir == THREAD_READ) - thread_add_fd (m->read, thread); - else - thread_add_fd (m->write, thread); + if (thread) + { + pthread_mutex_lock (&thread->mtx); + { + thread->u.fd = fd; + if (dir == THREAD_READ) + thread_add_fd (m->read, thread); + else + thread_add_fd (m->write, thread); + } + pthread_mutex_unlock (&thread->mtx); + } + } + pthread_mutex_unlock (&m->mtx); return thread; } @@ -753,13 +838,21 @@ funcname_thread_add_timer_timeval (struct thread_master *m, assert (type == THREAD_TIMER || type == THREAD_BACKGROUND); assert (time_relative); - queue = ((type == THREAD_TIMER) ? m->timer : m->background); - thread = thread_get (m, type, func, arg, debugargpass); + pthread_mutex_lock (&m->mtx); + { + queue = ((type == THREAD_TIMER) ? m->timer : m->background); + thread = thread_get (m, type, func, arg, debugargpass); - monotime(&thread->u.sands); - timeradd(&thread->u.sands, time_relative, &thread->u.sands); + pthread_mutex_lock (&thread->mtx); + { + monotime(&thread->u.sands); + timeradd(&thread->u.sands, time_relative, &thread->u.sands); + pqueue_enqueue(thread, queue); + } + pthread_mutex_unlock (&thread->mtx); + } + pthread_mutex_unlock (&m->mtx); - pqueue_enqueue(thread, queue); return thread; } @@ -847,9 +940,17 @@ funcname_thread_add_event (struct thread_master *m, assert (m != NULL); - thread = thread_get (m, THREAD_EVENT, func, arg, debugargpass); - thread->u.val = val; - thread_list_add (&m->event, thread); + pthread_mutex_lock (&m->mtx); + { + thread = thread_get (m, THREAD_EVENT, func, arg, debugargpass); + pthread_mutex_lock (&thread->mtx); + { + thread->u.val = val; + thread_list_add (&m->event, thread); + } + pthread_mutex_unlock (&thread->mtx); + } + pthread_mutex_unlock (&m->mtx); return thread; } @@ -880,14 +981,22 @@ thread_cancel_read_or_write (struct thread *thread, short int state) fd_clear_read_write (thread); } -/* Cancel thread from scheduler. */ +/** + * Cancel thread from scheduler. + * + * This function is *NOT* MT-safe. DO NOT call it from any other pthread except + * the one which owns thread->master. + */ void thread_cancel (struct thread *thread) { struct thread_list *list = NULL; struct pqueue *queue = NULL; struct thread **thread_array = NULL; - + + pthread_mutex_lock (&thread->master->mtx); + pthread_mutex_lock (&thread->mtx); + switch (thread->type) { case THREAD_READ: @@ -919,15 +1028,14 @@ thread_cancel (struct thread *thread) queue = thread->master->background; break; default: - return; + goto done; break; } if (queue) { assert(thread->index >= 0); - assert(thread == queue->array[thread->index]); - pqueue_remove_at(thread->index, queue); + pqueue_remove (thread, queue); } else if (list) { @@ -943,6 +1051,10 @@ thread_cancel (struct thread *thread) } thread_add_unuse (thread->master, thread); + +done: + pthread_mutex_unlock (&thread->mtx); + pthread_mutex_unlock (&thread->master->mtx); } /* Delete all events which has argument value arg. */ @@ -951,39 +1063,48 @@ thread_cancel_event (struct thread_master *m, void *arg) { unsigned int ret = 0; struct thread *thread; + struct thread *t; - thread = m->event.head; - while (thread) - { - struct thread *t; - - t = thread; - thread = t->next; - - if (t->arg == arg) + pthread_mutex_lock (&m->mtx); + { + thread = m->event.head; + while (thread) + { + t = thread; + pthread_mutex_lock (&t->mtx); { - ret++; - thread_list_delete (&m->event, t); - thread_add_unuse (m, t); + thread = t->next; + + if (t->arg == arg) + { + ret++; + thread_list_delete (&m->event, t); + thread_add_unuse (m, t); + } } - } - - /* thread can be on the ready list too */ - thread = m->ready.head; - while (thread) - { - struct thread *t; - - t = thread; - thread = t->next; + pthread_mutex_unlock (&t->mtx); + } - if (t->arg == arg) + /* thread can be on the ready list too */ + thread = m->ready.head; + while (thread) + { + t = thread; + pthread_mutex_lock (&t->mtx); { - ret++; - thread_list_delete (&m->ready, t); - thread_add_unuse (m, t); + thread = t->next; + + if (t->arg == arg) + { + ret++; + thread_list_delete (&m->ready, t); + thread_add_unuse (m, t); + } } - } + pthread_mutex_unlock (&t->mtx); + } + } + pthread_mutex_unlock (&m->mtx); return ret; } @@ -1143,18 +1264,24 @@ thread_fetch (struct thread_master *m, struct thread *fetch) struct timeval *timer_wait = &timer_val; struct timeval *timer_wait_bg; - while (1) + do { int num = 0; /* Signals pre-empt everything */ - quagga_sigevent_process (); + if (m->handle_signals) + quagga_sigevent_process (); + pthread_mutex_lock (&m->mtx); /* Drain the ready queue of already scheduled jobs, before scheduling * more. */ if ((thread = thread_trim_head (&m->ready)) != NULL) - return thread_run (m, thread, fetch); + { + fetch = thread_run (m, thread, fetch); + pthread_mutex_unlock (&m->mtx); + return fetch; + } /* To be fair to all kinds of threads, and avoid starvation, we * need to be careful to consider all thread types for scheduling @@ -1194,8 +1321,12 @@ thread_fetch (struct thread_master *m, struct thread *fetch) if (num < 0) { if (errno == EINTR) - continue; /* signal received - process it */ + { + pthread_mutex_unlock (&m->mtx); + continue; /* signal received - process it */ + } zlog_warn ("select() error: %s", safe_strerror (errno)); + pthread_mutex_unlock (&m->mtx); return NULL; } @@ -1215,15 +1346,28 @@ thread_fetch (struct thread_master *m, struct thread *fetch) list at this time. If this is code is uncommented, then background timer threads will not run unless there is nothing else to do. */ if ((thread = thread_trim_head (&m->ready)) != NULL) - return thread_run (m, thread, fetch); + { + fetch = thread_run (m, thread, fetch); + pthread_mutex_unlock (&m->mtx); + return fetch; + } #endif /* Background timer/events, lowest priority */ thread_timer_process (m->background, &now); if ((thread = thread_trim_head (&m->ready)) != NULL) - return thread_run (m, thread, fetch); - } + { + fetch = thread_run (m, thread, fetch); + pthread_mutex_unlock (&m->mtx); + return fetch; + } + + pthread_mutex_unlock (&m->mtx); + + } while (m->spin); + + return NULL; } unsigned long @@ -1248,13 +1392,23 @@ thread_consumed_time (RUSAGE_T *now, RUSAGE_T *start, unsigned long *cputime) int thread_should_yield (struct thread *thread) { - return monotime_since(&thread->real, NULL) > (int64_t)thread->yield; + int result; + pthread_mutex_lock (&thread->mtx); + { + result = monotime_since(&thread->real, NULL) > (int64_t)thread->yield; + } + pthread_mutex_unlock (&thread->mtx); + return result; } void thread_set_yield_time (struct thread *thread, unsigned long yield_time) { - thread->yield = yield_time; + pthread_mutex_lock (&thread->mtx); + { + thread->yield = yield_time; + } + pthread_mutex_unlock (&thread->mtx); } void @@ -1324,6 +1478,7 @@ funcname_thread_execute (struct thread_master *m, memset (&dummy, 0, sizeof (struct thread)); + pthread_mutex_init (&dummy.mtx, NULL); dummy.type = THREAD_EVENT; dummy.add_type = THREAD_EXECUTE; dummy.master = NULL; @@ -1332,8 +1487,12 @@ funcname_thread_execute (struct thread_master *m, tmp.func = dummy.func = func; tmp.funcname = dummy.funcname = funcname; - dummy.hist = hash_get (cpu_record, &tmp, - (void * (*) (void *))cpu_record_hash_alloc); + pthread_mutex_lock (&cpu_record_mtx); + { + dummy.hist = hash_get (cpu_record, &tmp, + (void * (*) (void *))cpu_record_hash_alloc); + } + pthread_mutex_unlock (&cpu_record_mtx); dummy.schedfrom = schedfrom; dummy.schedfrom_line = fromln; diff --git a/lib/thread.h b/lib/thread.h index 34adcc4d0..18fd340ba 100644 --- a/lib/thread.h +++ b/lib/thread.h @@ -24,6 +24,7 @@ #include <zebra.h> #include "monotime.h" +#include <pthread.h> struct rusage_t { @@ -84,6 +85,10 @@ struct thread_master int fd_limit; struct fd_handler handler; unsigned long alloc; + long selectpoll_timeout; + bool spin; + bool handle_signals; + pthread_mutex_t mtx; }; typedef unsigned char thread_type; @@ -110,6 +115,7 @@ struct thread const char *funcname; const char *schedfrom; int schedfrom_line; + pthread_mutex_t mtx; }; struct cpu_thread_history |