summaryrefslogtreecommitdiffstats
path: root/fs/dlm/recover.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/recover.c')
-rw-r--r--fs/dlm/recover.c295
1 files changed, 190 insertions, 105 deletions
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 7554e4dac6bb..4a7a76e42fc3 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -36,30 +36,23 @@
* (LS_RECOVERY_STOP set due to failure of a node in ls_nodes). When another
* function thinks it could have completed the waited-on task, they should wake
* up ls_wait_general to get an immediate response rather than waiting for the
- * timer to detect the result. A timer wakes us up periodically while waiting
- * to see if we should abort due to a node failure. This should only be called
- * by the dlm_recoverd thread.
+ * timeout. This uses a timeout so it can check periodically if the wait
+ * should abort due to node failure (which doesn't cause a wake_up).
+ * This should only be called by the dlm_recoverd thread.
*/
-static void dlm_wait_timer_fn(unsigned long data)
-{
- struct dlm_ls *ls = (struct dlm_ls *) data;
- mod_timer(&ls->ls_timer, jiffies + (dlm_config.ci_recover_timer * HZ));
- wake_up(&ls->ls_wait_general);
-}
-
int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))
{
int error = 0;
+ int rv;
- init_timer(&ls->ls_timer);
- ls->ls_timer.function = dlm_wait_timer_fn;
- ls->ls_timer.data = (long) ls;
- ls->ls_timer.expires = jiffies + (dlm_config.ci_recover_timer * HZ);
- add_timer(&ls->ls_timer);
-
- wait_event(ls->ls_wait_general, testfn(ls) || dlm_recovery_stopped(ls));
- del_timer_sync(&ls->ls_timer);
+ while (1) {
+ rv = wait_event_timeout(ls->ls_wait_general,
+ testfn(ls) || dlm_recovery_stopped(ls),
+ dlm_config.ci_recover_timer * HZ);
+ if (rv)
+ break;
+ }
if (dlm_recovery_stopped(ls)) {
log_debug(ls, "dlm_wait_function aborted");
@@ -277,22 +270,6 @@ static void recover_list_del(struct dlm_rsb *r)
dlm_put_rsb(r);
}
-static struct dlm_rsb *recover_list_find(struct dlm_ls *ls, uint64_t id)
-{
- struct dlm_rsb *r = NULL;
-
- spin_lock(&ls->ls_recover_list_lock);
-
- list_for_each_entry(r, &ls->ls_recover_list, res_recover_list) {
- if (id == (unsigned long) r)
- goto out;
- }
- r = NULL;
- out:
- spin_unlock(&ls->ls_recover_list_lock);
- return r;
-}
-
static void recover_list_clear(struct dlm_ls *ls)
{
struct dlm_rsb *r, *s;
@@ -313,6 +290,94 @@ static void recover_list_clear(struct dlm_ls *ls)
spin_unlock(&ls->ls_recover_list_lock);
}
+static int recover_idr_empty(struct dlm_ls *ls)
+{
+ int empty = 1;
+
+ spin_lock(&ls->ls_recover_idr_lock);
+ if (ls->ls_recover_list_count)
+ empty = 0;
+ spin_unlock(&ls->ls_recover_idr_lock);
+
+ return empty;
+}
+
+static int recover_idr_add(struct dlm_rsb *r)
+{
+ struct dlm_ls *ls = r->res_ls;
+ int rv, id;
+
+ rv = idr_pre_get(&ls->ls_recover_idr, GFP_NOFS);
+ if (!rv)
+ return -ENOMEM;
+
+ spin_lock(&ls->ls_recover_idr_lock);
+ if (r->res_id) {
+ spin_unlock(&ls->ls_recover_idr_lock);
+ return -1;
+ }
+ rv = idr_get_new_above(&ls->ls_recover_idr, r, 1, &id);
+ if (rv) {
+ spin_unlock(&ls->ls_recover_idr_lock);
+ return rv;
+ }
+ r->res_id = id;
+ ls->ls_recover_list_count++;
+ dlm_hold_rsb(r);
+ spin_unlock(&ls->ls_recover_idr_lock);
+ return 0;
+}
+
+static void recover_idr_del(struct dlm_rsb *r)
+{
+ struct dlm_ls *ls = r->res_ls;
+
+ spin_lock(&ls->ls_recover_idr_lock);
+ idr_remove(&ls->ls_recover_idr, r->res_id);
+ r->res_id = 0;
+ ls->ls_recover_list_count--;
+ spin_unlock(&ls->ls_recover_idr_lock);
+
+ dlm_put_rsb(r);
+}
+
+static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id)
+{
+ struct dlm_rsb *r;
+
+ spin_lock(&ls->ls_recover_idr_lock);
+ r = idr_find(&ls->ls_recover_idr, (int)id);
+ spin_unlock(&ls->ls_recover_idr_lock);
+ return r;
+}
+
+static int recover_idr_clear_rsb(int id, void *p, void *data)
+{
+ struct dlm_ls *ls = data;
+ struct dlm_rsb *r = p;
+
+ r->res_id = 0;
+ r->res_recover_locks_count = 0;
+ ls->ls_recover_list_count--;
+
+ dlm_put_rsb(r);
+ return 0;
+}
+
+static void recover_idr_clear(struct dlm_ls *ls)
+{
+ spin_lock(&ls->ls_recover_idr_lock);
+ idr_for_each(&ls->ls_recover_idr, recover_idr_clear_rsb, ls);
+ idr_remove_all(&ls->ls_recover_idr);
+
+ if (ls->ls_recover_list_count != 0) {
+ log_error(ls, "warning: recover_list_count %d",
+ ls->ls_recover_list_count);
+ ls->ls_recover_list_count = 0;
+ }
+ spin_unlock(&ls->ls_recover_idr_lock);
+}
+
/* Master recovery: find new master node for rsb's that were
mastered on nodes that have been removed.
@@ -361,9 +426,8 @@ static void set_master_lkbs(struct dlm_rsb *r)
* rsb's to consider.
*/
-static void set_new_master(struct dlm_rsb *r, int nodeid)
+static void set_new_master(struct dlm_rsb *r)
{
- r->res_nodeid = nodeid;
set_master_lkbs(r);
rsb_set_flag(r, RSB_NEW_MASTER);
rsb_set_flag(r, RSB_NEW_MASTER2);
@@ -372,31 +436,48 @@ static void set_new_master(struct dlm_rsb *r, int nodeid)
/*
* We do async lookups on rsb's that need new masters. The rsb's
* waiting for a lookup reply are kept on the recover_list.
+ *
+ * Another node recovering the master may have sent us a rcom lookup,
+ * and our dlm_master_lookup() set it as the new master, along with
+ * NEW_MASTER so that we'll recover it here (this implies dir_nodeid
+ * equals our_nodeid below).
*/
-static int recover_master(struct dlm_rsb *r)
+static int recover_master(struct dlm_rsb *r, unsigned int *count)
{
struct dlm_ls *ls = r->res_ls;
- int error, ret_nodeid;
- int our_nodeid = dlm_our_nodeid();
- int dir_nodeid = dlm_dir_nodeid(r);
+ int our_nodeid, dir_nodeid;
+ int is_removed = 0;
+ int error;
+
+ if (is_master(r))
+ return 0;
+
+ is_removed = dlm_is_removed(ls, r->res_nodeid);
+
+ if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER))
+ return 0;
+
+ our_nodeid = dlm_our_nodeid();
+ dir_nodeid = dlm_dir_nodeid(r);
if (dir_nodeid == our_nodeid) {
- error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
- r->res_length, &ret_nodeid);
- if (error)
- log_error(ls, "recover dir lookup error %d", error);
+ if (is_removed) {
+ r->res_master_nodeid = our_nodeid;
+ r->res_nodeid = 0;
+ }
- if (ret_nodeid == our_nodeid)
- ret_nodeid = 0;
- lock_rsb(r);
- set_new_master(r, ret_nodeid);
- unlock_rsb(r);
+ /* set master of lkbs to ourself when is_removed, or to
+ another new master which we set along with NEW_MASTER
+ in dlm_master_lookup */
+ set_new_master(r);
+ error = 0;
} else {
- recover_list_add(r);
+ recover_idr_add(r);
error = dlm_send_rcom_lookup(r, dir_nodeid);
}
+ (*count)++;
return error;
}
@@ -415,7 +496,7 @@ static int recover_master(struct dlm_rsb *r)
* resent.
*/
-static int recover_master_static(struct dlm_rsb *r)
+static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
{
int dir_nodeid = dlm_dir_nodeid(r);
int new_master = dir_nodeid;
@@ -423,11 +504,12 @@ static int recover_master_static(struct dlm_rsb *r)
if (dir_nodeid == dlm_our_nodeid())
new_master = 0;
- lock_rsb(r);
dlm_purge_mstcpy_locks(r);
- set_new_master(r, new_master);
- unlock_rsb(r);
- return 1;
+ r->res_master_nodeid = dir_nodeid;
+ r->res_nodeid = new_master;
+ set_new_master(r);
+ (*count)++;
+ return 0;
}
/*
@@ -443,7 +525,10 @@ static int recover_master_static(struct dlm_rsb *r)
int dlm_recover_masters(struct dlm_ls *ls)
{
struct dlm_rsb *r;
- int error = 0, count = 0;
+ unsigned int total = 0;
+ unsigned int count = 0;
+ int nodir = dlm_no_directory(ls);
+ int error;
log_debug(ls, "dlm_recover_masters");
@@ -455,50 +540,58 @@ int dlm_recover_masters(struct dlm_ls *ls)
goto out;
}
- if (dlm_no_directory(ls))
- count += recover_master_static(r);
- else if (!is_master(r) &&
- (dlm_is_removed(ls, r->res_nodeid) ||
- rsb_flag(r, RSB_NEW_MASTER))) {
- recover_master(r);
- count++;
- }
+ lock_rsb(r);
+ if (nodir)
+ error = recover_master_static(r, &count);
+ else
+ error = recover_master(r, &count);
+ unlock_rsb(r);
+ cond_resched();
+ total++;
- schedule();
+ if (error) {
+ up_read(&ls->ls_root_sem);
+ goto out;
+ }
}
up_read(&ls->ls_root_sem);
- log_debug(ls, "dlm_recover_masters %d resources", count);
+ log_debug(ls, "dlm_recover_masters %u of %u", count, total);
- error = dlm_wait_function(ls, &recover_list_empty);
+ error = dlm_wait_function(ls, &recover_idr_empty);
out:
if (error)
- recover_list_clear(ls);
+ recover_idr_clear(ls);
return error;
}
int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
{
struct dlm_rsb *r;
- int nodeid;
+ int ret_nodeid, new_master;
- r = recover_list_find(ls, rc->rc_id);
+ r = recover_idr_find(ls, rc->rc_id);
if (!r) {
log_error(ls, "dlm_recover_master_reply no id %llx",
(unsigned long long)rc->rc_id);
goto out;
}
- nodeid = rc->rc_result;
- if (nodeid == dlm_our_nodeid())
- nodeid = 0;
+ ret_nodeid = rc->rc_result;
+
+ if (ret_nodeid == dlm_our_nodeid())
+ new_master = 0;
+ else
+ new_master = ret_nodeid;
lock_rsb(r);
- set_new_master(r, nodeid);
+ r->res_master_nodeid = ret_nodeid;
+ r->res_nodeid = new_master;
+ set_new_master(r);
unlock_rsb(r);
- recover_list_del(r);
+ recover_idr_del(r);
- if (recover_list_empty(ls))
+ if (recover_idr_empty(ls))
wake_up(&ls->ls_wait_general);
out:
return 0;
@@ -711,6 +804,7 @@ static void recover_lvb(struct dlm_rsb *r)
static void recover_conversion(struct dlm_rsb *r)
{
+ struct dlm_ls *ls = r->res_ls;
struct dlm_lkb *lkb;
int grmode = -1;
@@ -725,10 +819,15 @@ static void recover_conversion(struct dlm_rsb *r)
list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
if (lkb->lkb_grmode != DLM_LOCK_IV)
continue;
- if (grmode == -1)
+ if (grmode == -1) {
+ log_debug(ls, "recover_conversion %x set gr to rq %d",
+ lkb->lkb_id, lkb->lkb_rqmode);
lkb->lkb_grmode = lkb->lkb_rqmode;
- else
+ } else {
+ log_debug(ls, "recover_conversion %x set gr %d",
+ lkb->lkb_id, grmode);
lkb->lkb_grmode = grmode;
+ }
}
}
@@ -791,20 +890,8 @@ int dlm_create_root_list(struct dlm_ls *ls)
dlm_hold_rsb(r);
}
- /* If we're using a directory, add tossed rsbs to the root
- list; they'll have entries created in the new directory,
- but no other recovery steps should do anything with them. */
-
- if (dlm_no_directory(ls)) {
- spin_unlock(&ls->ls_rsbtbl[i].lock);
- continue;
- }
-
- for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = rb_next(n)) {
- r = rb_entry(n, struct dlm_rsb, res_hashnode);
- list_add(&r->res_root_list, &ls->ls_root_list);
- dlm_hold_rsb(r);
- }
+ if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
+ log_error(ls, "dlm_create_root_list toss not empty");
spin_unlock(&ls->ls_rsbtbl[i].lock);
}
out:
@@ -824,28 +911,26 @@ void dlm_release_root_list(struct dlm_ls *ls)
up_write(&ls->ls_root_sem);
}
-/* If not using a directory, clear the entire toss list, there's no benefit to
- caching the master value since it's fixed. If we are using a dir, keep the
- rsb's we're the master of. Recovery will add them to the root list and from
- there they'll be entered in the rebuilt directory. */
-
-void dlm_clear_toss_list(struct dlm_ls *ls)
+void dlm_clear_toss(struct dlm_ls *ls)
{
struct rb_node *n, *next;
- struct dlm_rsb *rsb;
+ struct dlm_rsb *r;
+ unsigned int count = 0;
int i;
for (i = 0; i < ls->ls_rsbtbl_size; i++) {
spin_lock(&ls->ls_rsbtbl[i].lock);
for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
- next = rb_next(n);;
- rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
- if (dlm_no_directory(ls) || !is_master(rsb)) {
- rb_erase(n, &ls->ls_rsbtbl[i].toss);
- dlm_free_rsb(rsb);
- }
+ next = rb_next(n);
+ r = rb_entry(n, struct dlm_rsb, res_hashnode);
+ rb_erase(n, &ls->ls_rsbtbl[i].toss);
+ dlm_free_rsb(r);
+ count++;
}
spin_unlock(&ls->ls_rsbtbl[i].lock);
}
+
+ if (count)
+ log_debug(ls, "dlm_clear_toss %u done", count);
}