summaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/dlmglue.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlmglue.c')
-rw-r--r--fs/ocfs2/dlmglue.c164
1 files changed, 128 insertions, 36 deletions
diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index 4e97dcceaf8f..b3068ade3f7b 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -55,7 +55,6 @@
#include "slot_map.h"
#include "super.h"
#include "uptodate.h"
-#include "vote.h"
#include "buffer_head_io.h"
@@ -153,10 +152,10 @@ struct ocfs2_lock_res_ops {
struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);
/*
- * Optionally called in the downconvert (or "vote") thread
- * after a successful downconvert. The lockres will not be
- * referenced after this callback is called, so it is safe to
- * free memory, etc.
+ * Optionally called in the downconvert thread after a
+ * successful downconvert. The lockres will not be referenced
+ * after this callback is called, so it is safe to free
+ * memory, etc.
*
* The exact semantics of when this is called are controlled
* by ->downconvert_worker()
@@ -310,8 +309,9 @@ static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
"resource %s: %s\n", dlm_errname(_stat), _func, \
_lockres->l_name, dlm_errmsg(_stat)); \
} while (0)
-static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
- struct ocfs2_lock_res *lockres);
+static int ocfs2_downconvert_thread(void *arg);
+static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
+ struct ocfs2_lock_res *lockres);
static int ocfs2_meta_lock_update(struct inode *inode,
struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
@@ -732,7 +732,7 @@ static void ocfs2_blocking_ast(void *opaque, int level)
wake_up(&lockres->l_event);
- ocfs2_kick_vote_thread(osb);
+ ocfs2_wake_downconvert_thread(osb);
}
static void ocfs2_locking_ast(void *opaque)
@@ -1089,7 +1089,7 @@ static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
mlog_entry_void();
spin_lock_irqsave(&lockres->l_lock, flags);
ocfs2_dec_holders(lockres, level);
- ocfs2_vote_on_unlock(osb, lockres);
+ ocfs2_downconvert_on_unlock(osb, lockres);
spin_unlock_irqrestore(&lockres->l_lock, flags);
mlog_exit_void();
}
@@ -1372,15 +1372,15 @@ int ocfs2_data_lock_with_page(struct inode *inode,
return ret;
}
-static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
- struct ocfs2_lock_res *lockres)
+static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
+ struct ocfs2_lock_res *lockres)
{
int kick = 0;
mlog_entry_void();
/* If we know that another node is waiting on our lock, kick
- * the vote thread * pre-emptively when we reach a release
+ * the downconvert thread * pre-emptively when we reach a release
* condition. */
if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
switch(lockres->l_blocking) {
@@ -1398,7 +1398,7 @@ static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
}
if (kick)
- ocfs2_kick_vote_thread(osb);
+ ocfs2_wake_downconvert_thread(osb);
mlog_exit_void();
}
@@ -1832,19 +1832,20 @@ bail:
}
/*
- * This is working around a lock inversion between tasks acquiring DLM locks
- * while holding a page lock and the vote thread which blocks dlm lock acquiry
- * while acquiring page locks.
+ * This is working around a lock inversion between tasks acquiring DLM
+ * locks while holding a page lock and the downconvert thread which
+ * blocks dlm lock acquiry while acquiring page locks.
*
* ** These _with_page variantes are only intended to be called from aop
* methods that hold page locks and return a very specific *positive* error
* code that aop methods pass up to the VFS -- test for errors with != 0. **
*
- * The DLM is called such that it returns -EAGAIN if it would have blocked
- * waiting for the vote thread. In that case we unlock our page so the vote
- * thread can make progress. Once we've done this we have to return
- * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up
- * into the VFS who will then immediately retry the aop call.
+ * The DLM is called such that it returns -EAGAIN if it would have
+ * blocked waiting for the downconvert thread. In that case we unlock
+ * our page so the downconvert thread can make progress. Once we've
+ * done this we have to return AOP_TRUNCATED_PAGE so the aop method
+ * that called us can bubble that back up into the VFS who will then
+ * immediately retry the aop call.
*
* We do a blocking lock and immediate unlock before returning, though, so that
* the lock has a great chance of being cached on this node by the time the VFS
@@ -2320,11 +2321,11 @@ int ocfs2_dlm_init(struct ocfs2_super *osb)
goto bail;
}
- /* launch vote thread */
- osb->vote_task = kthread_run(ocfs2_vote_thread, osb, "ocfs2vote");
- if (IS_ERR(osb->vote_task)) {
- status = PTR_ERR(osb->vote_task);
- osb->vote_task = NULL;
+ /* launch downconvert thread */
+ osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc");
+ if (IS_ERR(osb->dc_task)) {
+ status = PTR_ERR(osb->dc_task);
+ osb->dc_task = NULL;
mlog_errno(status);
goto bail;
}
@@ -2353,8 +2354,8 @@ local:
bail:
if (status < 0) {
ocfs2_dlm_shutdown_debug(osb);
- if (osb->vote_task)
- kthread_stop(osb->vote_task);
+ if (osb->dc_task)
+ kthread_stop(osb->dc_task);
}
mlog_exit(status);
@@ -2369,9 +2370,9 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
ocfs2_drop_osb_locks(osb);
- if (osb->vote_task) {
- kthread_stop(osb->vote_task);
- osb->vote_task = NULL;
+ if (osb->dc_task) {
+ kthread_stop(osb->dc_task);
+ osb->dc_task = NULL;
}
ocfs2_lock_res_free(&osb->osb_super_lockres);
@@ -2527,7 +2528,7 @@ out:
/* Mark the lockres as being dropped. It will no longer be
* queued if blocking, but we still may have to wait on it
- * being dequeued from the vote thread before we can consider
+ * being dequeued from the downconvert thread before we can consider
* it safe to drop.
*
* You can *not* attempt to call cluster_lock on this lockres anymore. */
@@ -2903,7 +2904,7 @@ static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
/*
* Does the final reference drop on our dentry lock. Right now this
- * happens in the vote thread, but we could choose to simplify the
+ * happens in the downconvert thread, but we could choose to simplify the
* dlmglue API and push these off to the ocfs2_wq in the future.
*/
static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
@@ -3042,7 +3043,7 @@ void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
mlog(0, "lockres %s blocked.\n", lockres->l_name);
/* Detect whether a lock has been marked as going away while
- * the vote thread was processing other things. A lock can
+ * the downconvert thread was processing other things. A lock can
* still be marked with OCFS2_LOCK_FREEING after this check,
* but short circuiting here will still save us some
* performance. */
@@ -3091,13 +3092,104 @@ static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
- spin_lock(&osb->vote_task_lock);
+ spin_lock(&osb->dc_task_lock);
if (list_empty(&lockres->l_blocked_list)) {
list_add_tail(&lockres->l_blocked_list,
&osb->blocked_lock_list);
osb->blocked_lock_count++;
}
- spin_unlock(&osb->vote_task_lock);
+ spin_unlock(&osb->dc_task_lock);
mlog_exit_void();
}
+
+static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
+{
+ unsigned long processed;
+ struct ocfs2_lock_res *lockres;
+
+ mlog_entry_void();
+
+ spin_lock(&osb->dc_task_lock);
+ /* grab this early so we know to try again if a state change and
+ * wake happens part-way through our work */
+ osb->dc_work_sequence = osb->dc_wake_sequence;
+
+ processed = osb->blocked_lock_count;
+ while (processed) {
+ BUG_ON(list_empty(&osb->blocked_lock_list));
+
+ lockres = list_entry(osb->blocked_lock_list.next,
+ struct ocfs2_lock_res, l_blocked_list);
+ list_del_init(&lockres->l_blocked_list);
+ osb->blocked_lock_count--;
+ spin_unlock(&osb->dc_task_lock);
+
+ BUG_ON(!processed);
+ processed--;
+
+ ocfs2_process_blocked_lock(osb, lockres);
+
+ spin_lock(&osb->dc_task_lock);
+ }
+ spin_unlock(&osb->dc_task_lock);
+
+ mlog_exit_void();
+}
+
+static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
+{
+ int empty = 0;
+
+ spin_lock(&osb->dc_task_lock);
+ if (list_empty(&osb->blocked_lock_list))
+ empty = 1;
+
+ spin_unlock(&osb->dc_task_lock);
+ return empty;
+}
+
+static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
+{
+ int should_wake = 0;
+
+ spin_lock(&osb->dc_task_lock);
+ if (osb->dc_work_sequence != osb->dc_wake_sequence)
+ should_wake = 1;
+ spin_unlock(&osb->dc_task_lock);
+
+ return should_wake;
+}
+
+int ocfs2_downconvert_thread(void *arg)
+{
+ int status = 0;
+ struct ocfs2_super *osb = arg;
+
+ /* only quit once we've been asked to stop and there is no more
+ * work available */
+ while (!(kthread_should_stop() &&
+ ocfs2_downconvert_thread_lists_empty(osb))) {
+
+ wait_event_interruptible(osb->dc_event,
+ ocfs2_downconvert_thread_should_wake(osb) ||
+ kthread_should_stop());
+
+ mlog(0, "downconvert_thread: awoken\n");
+
+ ocfs2_downconvert_thread_do_work(osb);
+ }
+
+ osb->dc_task = NULL;
+ return status;
+}
+
+void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
+{
+ spin_lock(&osb->dc_task_lock);
+ /* make sure the voting thread gets a swipe at whatever changes
+ * the caller may have made to the voting state */
+ osb->dc_wake_sequence++;
+ spin_unlock(&osb->dc_task_lock);
+ wake_up(&osb->dc_event);
+}