diff options
-rw-r--r-- | fs/orangefs/devorangefs-req.c | 15 | ||||
-rw-r--r-- | fs/orangefs/orangefs-bufmap.c | 341 | ||||
-rw-r--r-- | fs/orangefs/orangefs-bufmap.h | 4 | ||||
-rw-r--r-- | fs/orangefs/waitqueue.c | 61 |
4 files changed, 174 insertions, 247 deletions
diff --git a/fs/orangefs/devorangefs-req.c b/fs/orangefs/devorangefs-req.c index 6a7df1204bfc..790855a72e32 100644 --- a/fs/orangefs/devorangefs-req.c +++ b/fs/orangefs/devorangefs-req.c @@ -508,8 +508,7 @@ static int orangefs_devreq_release(struct inode *inode, struct file *file) __func__); mutex_lock(&devreq_mutex); - if (orangefs_get_bufmap_init()) - orangefs_bufmap_finalize(); + orangefs_bufmap_finalize(); open_access_count = -1; @@ -527,6 +526,9 @@ static int orangefs_devreq_release(struct inode *inode, struct file *file) * them as purged and wake them up */ purge_inprogress_ops(); + + orangefs_bufmap_run_down(); + gossip_debug(GOSSIP_DEV_DEBUG, "pvfs2-client-core: device close complete\n"); open_access_count = 0; @@ -607,13 +609,8 @@ static long dispatch_ioctl_command(unsigned int command, unsigned long arg) (struct ORANGEFS_dev_map_desc __user *) arg, sizeof(struct ORANGEFS_dev_map_desc)); - if (orangefs_get_bufmap_init()) { - return -EINVAL; - } else { - return ret ? - -EIO : - orangefs_bufmap_initialize(&user_desc); - } + /* WTF -EIO and not -EFAULT? */ + return ret ? -EIO : orangefs_bufmap_initialize(&user_desc); case ORANGEFS_DEV_REMOUNT_ALL: gossip_debug(GOSSIP_DEV_DEBUG, "%s: got ORANGEFS_DEV_REMOUNT_ALL\n", diff --git a/fs/orangefs/orangefs-bufmap.c b/fs/orangefs/orangefs-bufmap.c index cd484665bf72..96faf4ee6529 100644 --- a/fs/orangefs/orangefs-bufmap.c +++ b/fs/orangefs/orangefs-bufmap.c @@ -7,7 +7,133 @@ #include "orangefs-kernel.h" #include "orangefs-bufmap.h" -DECLARE_WAIT_QUEUE_HEAD(orangefs_bufmap_init_waitq); +struct slot_map { + int c; + wait_queue_head_t q; + int count; + unsigned long *map; +}; + +static struct slot_map rw_map = { + .c = -1, + .q = __WAIT_QUEUE_HEAD_INITIALIZER(rw_map.q) +}; +static struct slot_map readdir_map = { + .c = -1, + .q = __WAIT_QUEUE_HEAD_INITIALIZER(readdir_map.q) +}; + + +static void install(struct slot_map *m, int count, unsigned long *map) +{ + spin_lock(&m->q.lock); + m->c = m->count = count; + m->map = map; + wake_up_all_locked(&m->q); + spin_unlock(&m->q.lock); +} + +static void mark_killed(struct slot_map *m) +{ + spin_lock(&m->q.lock); + m->c -= m->count + 1; + spin_unlock(&m->q.lock); +} + +static void run_down(struct slot_map *m) +{ + DEFINE_WAIT(wait); + spin_lock(&m->q.lock); + if (m->c != -1) { + for (;;) { + if (likely(list_empty(&wait.task_list))) + __add_wait_queue_tail(&m->q, &wait); + set_current_state(TASK_UNINTERRUPTIBLE); + + if (m->c == -1) + break; + + spin_unlock(&m->q.lock); + schedule(); + spin_lock(&m->q.lock); + } + __remove_wait_queue(&m->q, &wait); + __set_current_state(TASK_RUNNING); + } + m->map = NULL; + spin_unlock(&m->q.lock); +} + +static void put(struct slot_map *m, int slot) +{ + int v; + spin_lock(&m->q.lock); + __clear_bit(slot, m->map); + v = ++m->c; + if (unlikely(v == 1)) /* no free slots -> one free slot */ + wake_up_locked(&m->q); + else if (unlikely(v == -1)) /* finished dying */ + wake_up_all_locked(&m->q); + spin_unlock(&m->q.lock); +} + +static int wait_for_free(struct slot_map *m) +{ + long left = slot_timeout_secs * HZ; + DEFINE_WAIT(wait); + + do { + long n = left, t; + if (likely(list_empty(&wait.task_list))) + __add_wait_queue_tail_exclusive(&m->q, &wait); + set_current_state(TASK_INTERRUPTIBLE); + + if (m->c > 0) + break; + + if (m->c < 0) { + /* we are waiting for map to be installed */ + /* it would better be there soon, or we go away */ + if (n > ORANGEFS_BUFMAP_WAIT_TIMEOUT_SECS * HZ) + n = ORANGEFS_BUFMAP_WAIT_TIMEOUT_SECS * HZ; + } + spin_unlock(&m->q.lock); + t = schedule_timeout(n); + spin_lock(&m->q.lock); + if (unlikely(!t) && n != left && m->c < 0) + left = t; + else + left = t + (left - n); + if (unlikely(signal_pending(current))) + left = -EINTR; + } while (left > 0); + + if (!list_empty(&wait.task_list)) + list_del(&wait.task_list); + else if (left <= 0 && waitqueue_active(&m->q)) + __wake_up_locked_key(&m->q, TASK_INTERRUPTIBLE, NULL); + __set_current_state(TASK_RUNNING); + + if (likely(left > 0)) + return 0; + + return left < 0 ? -EINTR : -ETIMEDOUT; +} + +static int get(struct slot_map *m) +{ + int res = 0; + spin_lock(&m->q.lock); + if (unlikely(m->c <= 0)) + res = wait_for_free(m); + if (likely(!res)) { + m->c--; + res = find_first_zero_bit(m->map, m->count); + __set_bit(res, m->map); + } + spin_unlock(&m->q.lock); + return res; +} /* used to describe mapped buffers */ struct orangefs_bufmap_desc { @@ -18,8 +144,6 @@ struct orangefs_bufmap_desc { }; static struct orangefs_bufmap { - atomic_t refcnt; - int desc_size; int desc_shift; int desc_count; @@ -30,12 +154,12 @@ static struct orangefs_bufmap { struct orangefs_bufmap_desc *desc_array; /* array to track usage of buffer descriptors */ - int *buffer_index_array; - spinlock_t buffer_index_lock; + unsigned long *buffer_index_array; /* array to track usage of buffer descriptors for readdir */ - int readdir_index_array[ORANGEFS_READDIR_DEFAULT_DESC_COUNT]; - spinlock_t readdir_index_lock; +#define N DIV_ROUND_UP(ORANGEFS_READDIR_DEFAULT_DESC_COUNT, BITS_PER_LONG) + unsigned long readdir_index_array[N]; +#undef N } *__orangefs_bufmap; static DEFINE_SPINLOCK(orangefs_bufmap_lock); @@ -58,30 +182,6 @@ orangefs_bufmap_free(struct orangefs_bufmap *bufmap) kfree(bufmap); } -static struct orangefs_bufmap *orangefs_bufmap_ref(void) -{ - struct orangefs_bufmap *bufmap = NULL; - - spin_lock(&orangefs_bufmap_lock); - if (__orangefs_bufmap) { - bufmap = __orangefs_bufmap; - atomic_inc(&bufmap->refcnt); - } - spin_unlock(&orangefs_bufmap_lock); - return bufmap; -} - -static void orangefs_bufmap_unref(struct orangefs_bufmap *bufmap) -{ - if (atomic_dec_and_lock(&bufmap->refcnt, &orangefs_bufmap_lock)) { - __orangefs_bufmap = NULL; - spin_unlock(&orangefs_bufmap_lock); - - orangefs_bufmap_unmap(bufmap); - orangefs_bufmap_free(bufmap); - } -} - /* * XXX: Can the size and shift change while the caller gives up the * XXX: lock between calling this and doing something useful? @@ -137,21 +237,18 @@ orangefs_bufmap_alloc(struct ORANGEFS_dev_map_desc *user_desc) if (!bufmap) goto out; - atomic_set(&bufmap->refcnt, 1); bufmap->total_size = user_desc->total_size; bufmap->desc_count = user_desc->count; bufmap->desc_size = user_desc->size; bufmap->desc_shift = ilog2(bufmap->desc_size); - spin_lock_init(&bufmap->buffer_index_lock); bufmap->buffer_index_array = - kcalloc(bufmap->desc_count, sizeof(int), GFP_KERNEL); + kzalloc(DIV_ROUND_UP(bufmap->desc_count, BITS_PER_LONG), GFP_KERNEL); if (!bufmap->buffer_index_array) { gossip_err("orangefs: could not allocate %d buffer indices\n", bufmap->desc_count); goto out_free_bufmap; } - spin_lock_init(&bufmap->readdir_index_lock); bufmap->desc_array = kcalloc(bufmap->desc_count, sizeof(struct orangefs_bufmap_desc), @@ -294,24 +391,18 @@ int orangefs_bufmap_initialize(struct ORANGEFS_dev_map_desc *user_desc) if (__orangefs_bufmap) { spin_unlock(&orangefs_bufmap_lock); gossip_err("orangefs: error: bufmap already initialized.\n"); - ret = -EALREADY; + ret = -EINVAL; goto out_unmap_bufmap; } __orangefs_bufmap = bufmap; + install(&rw_map, + bufmap->desc_count, + bufmap->buffer_index_array); + install(&readdir_map, + ORANGEFS_READDIR_DEFAULT_DESC_COUNT, + bufmap->readdir_index_array); spin_unlock(&orangefs_bufmap_lock); - /* - * If there are operations in orangefs_bufmap_init_waitq, wake them up. - * This scenario occurs when the client-core is restarted and I/O - * requests in the in-progress or waiting tables are restarted. I/O - * requests cannot be restarted until the shared memory system is - * completely re-initialized, so we put the I/O requests in this - * waitq until initialization has completed. NOTE: the I/O requests - * are also on a timer, so they don't wait forever just in case the - * client-core doesn't come back up. - */ - wake_up_interruptible(&orangefs_bufmap_init_waitq); - gossip_debug(GOSSIP_BUFMAP_DEBUG, "orangefs_bufmap_initialize: exiting normally\n"); return 0; @@ -334,91 +425,28 @@ out: */ void orangefs_bufmap_finalize(void) { + struct orangefs_bufmap *bufmap = __orangefs_bufmap; + if (!bufmap) + return; gossip_debug(GOSSIP_BUFMAP_DEBUG, "orangefs_bufmap_finalize: called\n"); - BUG_ON(!__orangefs_bufmap); - orangefs_bufmap_unref(__orangefs_bufmap); + mark_killed(&rw_map); + mark_killed(&readdir_map); gossip_debug(GOSSIP_BUFMAP_DEBUG, "orangefs_bufmap_finalize: exiting normally\n"); } -struct slot_args { - int slot_count; - int *slot_array; - spinlock_t *slot_lock; - wait_queue_head_t *slot_wq; -}; - -static int wait_for_a_slot(struct slot_args *slargs, int *buffer_index) +void orangefs_bufmap_run_down(void) { - int ret = -1; - int i = 0; - DEFINE_WAIT(wait_entry); - - while (1) { - /* - * check for available desc, slot_lock is the appropriate - * index_lock - */ - spin_lock(slargs->slot_lock); - prepare_to_wait_exclusive(slargs->slot_wq, - &wait_entry, - TASK_INTERRUPTIBLE); - for (i = 0; i < slargs->slot_count; i++) - if (slargs->slot_array[i] == 0) { - slargs->slot_array[i] = 1; - *buffer_index = i; - ret = 0; - break; - } - spin_unlock(slargs->slot_lock); - - /* if we acquired a buffer, then break out of while */ - if (ret == 0) - break; - - if (!signal_pending(current)) { - gossip_debug(GOSSIP_BUFMAP_DEBUG, - "[BUFMAP]: waiting %d " - "seconds for a slot\n", - slot_timeout_secs); - if (!schedule_timeout(slot_timeout_secs * HZ)) { - gossip_debug(GOSSIP_BUFMAP_DEBUG, - "*** wait_for_a_slot timed out\n"); - ret = -ETIMEDOUT; - break; - } - gossip_debug(GOSSIP_BUFMAP_DEBUG, - "[BUFMAP]: woken up by a slot becoming available.\n"); - continue; - } - - gossip_debug(GOSSIP_BUFMAP_DEBUG, "orangefs: %s interrupted.\n", - __func__); - ret = -EINTR; - break; - } - - spin_lock(slargs->slot_lock); - finish_wait(slargs->slot_wq, &wait_entry); - spin_unlock(slargs->slot_lock); - return ret; -} - -static void put_back_slot(struct slot_args *slargs, int buffer_index) -{ - /* slot_lock is the appropriate index_lock */ - spin_lock(slargs->slot_lock); - if (buffer_index < 0 || buffer_index >= slargs->slot_count) { - spin_unlock(slargs->slot_lock); + struct orangefs_bufmap *bufmap = __orangefs_bufmap; + if (!bufmap) return; - } - - /* put the desc back on the queue */ - slargs->slot_array[buffer_index] = 0; - spin_unlock(slargs->slot_lock); - - /* wake up anyone who may be sleeping on the queue */ - wake_up_interruptible(slargs->slot_wq); + run_down(&rw_map); + run_down(&readdir_map); + spin_lock(&orangefs_bufmap_lock); + __orangefs_bufmap = NULL; + spin_unlock(&orangefs_bufmap_lock); + orangefs_bufmap_unmap(bufmap); + orangefs_bufmap_free(bufmap); } /* @@ -431,23 +459,12 @@ static void put_back_slot(struct slot_args *slargs, int buffer_index) */ int orangefs_bufmap_get(struct orangefs_bufmap **mapp, int *buffer_index) { - struct orangefs_bufmap *bufmap = orangefs_bufmap_ref(); - struct slot_args slargs; - int ret; - - if (!bufmap) { - gossip_err("orangefs: please confirm that pvfs2-client daemon is running.\n"); - return -EIO; + int ret = get(&rw_map); + if (ret >= 0) { + *mapp = __orangefs_bufmap; + *buffer_index = ret; + ret = 0; } - - slargs.slot_count = bufmap->desc_count; - slargs.slot_array = bufmap->buffer_index_array; - slargs.slot_lock = &bufmap->buffer_index_lock; - slargs.slot_wq = &bufmap_waitq; - ret = wait_for_a_slot(&slargs, buffer_index); - if (ret) - orangefs_bufmap_unref(bufmap); - *mapp = bufmap; return ret; } @@ -460,15 +477,7 @@ int orangefs_bufmap_get(struct orangefs_bufmap **mapp, int *buffer_index) */ void orangefs_bufmap_put(int buffer_index) { - struct slot_args slargs; - struct orangefs_bufmap *bufmap = __orangefs_bufmap; - - slargs.slot_count = bufmap->desc_count; - slargs.slot_array = bufmap->buffer_index_array; - slargs.slot_lock = &bufmap->buffer_index_lock; - slargs.slot_wq = &bufmap_waitq; - put_back_slot(&slargs, buffer_index); - orangefs_bufmap_unref(bufmap); + put(&rw_map, buffer_index); } /* @@ -484,36 +493,18 @@ void orangefs_bufmap_put(int buffer_index) */ int orangefs_readdir_index_get(struct orangefs_bufmap **mapp, int *buffer_index) { - struct orangefs_bufmap *bufmap = orangefs_bufmap_ref(); - struct slot_args slargs; - int ret; - - if (!bufmap) { - gossip_err("orangefs: please confirm that pvfs2-client daemon is running.\n"); - return -EIO; + int ret = get(&readdir_map); + if (ret >= 0) { + *mapp = __orangefs_bufmap; + *buffer_index = ret; + ret = 0; } - - slargs.slot_count = ORANGEFS_READDIR_DEFAULT_DESC_COUNT; - slargs.slot_array = bufmap->readdir_index_array; - slargs.slot_lock = &bufmap->readdir_index_lock; - slargs.slot_wq = &readdir_waitq; - ret = wait_for_a_slot(&slargs, buffer_index); - if (ret) - orangefs_bufmap_unref(bufmap); - *mapp = bufmap; return ret; } void orangefs_readdir_index_put(struct orangefs_bufmap *bufmap, int buffer_index) { - struct slot_args slargs; - - slargs.slot_count = ORANGEFS_READDIR_DEFAULT_DESC_COUNT; - slargs.slot_array = bufmap->readdir_index_array; - slargs.slot_lock = &bufmap->readdir_index_lock; - slargs.slot_wq = &readdir_waitq; - put_back_slot(&slargs, buffer_index); - orangefs_bufmap_unref(bufmap); + put(&readdir_map, buffer_index); } /* diff --git a/fs/orangefs/orangefs-bufmap.h b/fs/orangefs/orangefs-bufmap.h index 2a2d4269d03e..f0684f0085d1 100644 --- a/fs/orangefs/orangefs-bufmap.h +++ b/fs/orangefs/orangefs-bufmap.h @@ -15,10 +15,10 @@ int orangefs_bufmap_shift_query(void); int orangefs_bufmap_initialize(struct ORANGEFS_dev_map_desc *user_desc); -int orangefs_get_bufmap_init(void); - void orangefs_bufmap_finalize(void); +void orangefs_bufmap_run_down(void); + int orangefs_bufmap_get(struct orangefs_bufmap **mapp, int *buffer_index); void orangefs_bufmap_put(int buffer_index); diff --git a/fs/orangefs/waitqueue.c b/fs/orangefs/waitqueue.c index 378cdcf43252..36eedd6a8335 100644 --- a/fs/orangefs/waitqueue.c +++ b/fs/orangefs/waitqueue.c @@ -155,67 +155,6 @@ retry_servicing: * system */ goto retry_servicing; - - /* op uses shared memory */ - if (orangefs_get_bufmap_init() == 0) { - WARN_ON(1); - /* - * This operation uses the shared memory system AND - * the system is not yet ready. This situation occurs - * when the client-core is restarted AND there were - * operations waiting to be processed or were already - * in process. - */ - gossip_debug(GOSSIP_WAIT_DEBUG, - "uses_shared_memory is true.\n"); - gossip_debug(GOSSIP_WAIT_DEBUG, - "Client core in-service status(%d).\n", - is_daemon_in_service()); - gossip_debug(GOSSIP_WAIT_DEBUG, "bufmap_init:%d.\n", - orangefs_get_bufmap_init()); - gossip_debug(GOSSIP_WAIT_DEBUG, - "operation's status is 0x%0x.\n", - op->op_state); - - /* - * let process sleep for a few seconds so shared - * memory system can be initialized. - */ - prepare_to_wait(&orangefs_bufmap_init_waitq, - &wait_entry, - TASK_INTERRUPTIBLE); - - /* - * Wait for orangefs_bufmap_initialize() to wake me up - * within the allotted time. - */ - ret = schedule_timeout( - ORANGEFS_BUFMAP_WAIT_TIMEOUT_SECS * HZ); - - gossip_debug(GOSSIP_WAIT_DEBUG, - "Value returned from schedule_timeout:" - "%d.\n", - ret); - gossip_debug(GOSSIP_WAIT_DEBUG, - "Is shared memory available? (%d).\n", - orangefs_get_bufmap_init()); - - finish_wait(&orangefs_bufmap_init_waitq, &wait_entry); - - if (orangefs_get_bufmap_init() == 0) { - gossip_err("%s:The shared memory system has not started in %d seconds after the client core restarted. Aborting user's request(%s).\n", - __func__, - ORANGEFS_BUFMAP_WAIT_TIMEOUT_SECS, - get_opname_string(op)); - return -EIO; - } - - /* - * Return to the calling function and re-populate a - * shared memory buffer. - */ - return -EAGAIN; - } } out: |