diff options
author | Steven Whitehouse <steve@men-an-tol.chygwyn.com> | 2006-02-23 10:49:43 +0100 |
---|---|---|
committer | Steven Whitehouse <swhiteho@redhat.com> | 2006-02-23 10:49:43 +0100 |
commit | d35462b4bb847b68321c55e95c926aa485aecce2 (patch) | |
tree | b08e18bf6e672633402871ee763102fdb5e63229 /fs/ocfs2/dlm/dlmrecovery.c | |
parent | [GFS2] Missed deletion of debugging code (diff) | |
parent | Merge master.kernel.org:/pub/scm/linux/kernel/git/davem/sparc-2.6 (diff) | |
download | linux-d35462b4bb847b68321c55e95c926aa485aecce2.tar.xz linux-d35462b4bb847b68321c55e95c926aa485aecce2.zip |
Merge branch 'master'
Diffstat (limited to 'fs/ocfs2/dlm/dlmrecovery.c')
-rw-r--r-- | fs/ocfs2/dlm/dlmrecovery.c | 292 |
1 files changed, 255 insertions, 37 deletions
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 0c8eb1093f00..ed76bda1a534 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -39,6 +39,7 @@ #include <linux/inet.h> #include <linux/timer.h> #include <linux/kthread.h> +#include <linux/delay.h> #include "cluster/heartbeat.h" @@ -256,6 +257,45 @@ static int dlm_recovery_thread(void *data) return 0; } +/* returns true when the recovery master has contacted us */ +static int dlm_reco_master_ready(struct dlm_ctxt *dlm) +{ + int ready; + spin_lock(&dlm->spinlock); + ready = (dlm->reco.new_master != O2NM_INVALID_NODE_NUM); + spin_unlock(&dlm->spinlock); + return ready; +} + +/* returns true if node is no longer in the domain + * could be dead or just not joined */ +int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node) +{ + int dead; + spin_lock(&dlm->spinlock); + dead = test_bit(node, dlm->domain_map); + spin_unlock(&dlm->spinlock); + return dead; +} + +int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout) +{ + if (timeout) { + mlog(ML_NOTICE, "%s: waiting %dms for notification of " + "death of node %u\n", dlm->name, timeout, node); + wait_event_timeout(dlm->dlm_reco_thread_wq, + dlm_is_node_dead(dlm, node), + msecs_to_jiffies(timeout)); + } else { + mlog(ML_NOTICE, "%s: waiting indefinitely for notification " + "of death of node %u\n", dlm->name, node); + wait_event(dlm->dlm_reco_thread_wq, + dlm_is_node_dead(dlm, node)); + } + /* for now, return 0 */ + return 0; +} + /* callers of the top-level api calls (dlmlock/dlmunlock) should * block on the dlm->reco.event when recovery is in progress. * the dlm recovery thread will set this state when it begins @@ -297,6 +337,7 @@ static void dlm_end_recovery(struct dlm_ctxt *dlm) static int dlm_do_recovery(struct dlm_ctxt *dlm) { int status = 0; + int ret; spin_lock(&dlm->spinlock); @@ -343,10 +384,13 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm) goto master_here; if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) { - /* choose a new master */ - if (!dlm_pick_recovery_master(dlm)) { + /* choose a new master, returns 0 if this node + * is the master, -EEXIST if it's another node. + * this does not return until a new master is chosen + * or recovery completes entirely. */ + ret = dlm_pick_recovery_master(dlm); + if (!ret) { /* already notified everyone. go. */ - dlm->reco.new_master = dlm->node_num; goto master_here; } mlog(0, "another node will master this recovery session.\n"); @@ -371,8 +415,13 @@ master_here: if (status < 0) { mlog(ML_ERROR, "error %d remastering locks for node %u, " "retrying.\n", status, dlm->reco.dead_node); + /* yield a bit to allow any final network messages + * to get handled on remaining nodes */ + msleep(100); } else { /* success! see if any other nodes need recovery */ + mlog(0, "DONE mastering recovery of %s:%u here(this=%u)!\n", + dlm->name, dlm->reco.dead_node, dlm->node_num); dlm_reset_recovery(dlm); } dlm_end_recovery(dlm); @@ -477,7 +526,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) BUG(); break; case DLM_RECO_NODE_DATA_DEAD: - mlog(0, "node %u died after " + mlog(ML_NOTICE, "node %u died after " "requesting recovery info for " "node %u\n", ndata->node_num, dead_node); @@ -485,6 +534,19 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) // start all over destroy = 1; status = -EAGAIN; + /* instead of spinning like crazy here, + * wait for the domain map to catch up + * with the network state. otherwise this + * can be hit hundreds of times before + * the node is really seen as dead. */ + wait_event_timeout(dlm->dlm_reco_thread_wq, + dlm_is_node_dead(dlm, + ndata->node_num), + msecs_to_jiffies(1000)); + mlog(0, "waited 1 sec for %u, " + "dead? %s\n", ndata->node_num, + dlm_is_node_dead(dlm, ndata->node_num) ? + "yes" : "no"); goto leave; case DLM_RECO_NODE_DATA_RECEIVING: case DLM_RECO_NODE_DATA_REQUESTED: @@ -678,11 +740,27 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data) dlm = item->dlm; dead_node = item->u.ral.dead_node; reco_master = item->u.ral.reco_master; + mres = (struct dlm_migratable_lockres *)data; + + if (dead_node != dlm->reco.dead_node || + reco_master != dlm->reco.new_master) { + /* show extra debug info if the recovery state is messed */ + mlog(ML_ERROR, "%s: bad reco state: reco(dead=%u, master=%u), " + "request(dead=%u, master=%u)\n", + dlm->name, dlm->reco.dead_node, dlm->reco.new_master, + dead_node, reco_master); + mlog(ML_ERROR, "%s: name=%.*s master=%u locks=%u/%u flags=%u " + "entry[0]={c=%"MLFu64",l=%u,f=%u,t=%d,ct=%d,hb=%d,n=%u}\n", + dlm->name, mres->lockname_len, mres->lockname, mres->master, + mres->num_locks, mres->total_locks, mres->flags, + mres->ml[0].cookie, mres->ml[0].list, mres->ml[0].flags, + mres->ml[0].type, mres->ml[0].convert_type, + mres->ml[0].highest_blocked, mres->ml[0].node); + BUG(); + } BUG_ON(dead_node != dlm->reco.dead_node); BUG_ON(reco_master != dlm->reco.new_master); - mres = (struct dlm_migratable_lockres *)data; - /* lock resources should have already been moved to the * dlm->reco.resources list. now move items from that list * to a temp list if the dead owner matches. note that the @@ -757,15 +835,18 @@ int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data) continue; switch (ndata->state) { + /* should have moved beyond INIT but not to FINALIZE yet */ case DLM_RECO_NODE_DATA_INIT: case DLM_RECO_NODE_DATA_DEAD: - case DLM_RECO_NODE_DATA_DONE: case DLM_RECO_NODE_DATA_FINALIZE_SENT: mlog(ML_ERROR, "bad ndata state for node %u:" " state=%d\n", ndata->node_num, ndata->state); BUG(); break; + /* these states are possible at this point, anywhere along + * the line of recovery */ + case DLM_RECO_NODE_DATA_DONE: case DLM_RECO_NODE_DATA_RECEIVING: case DLM_RECO_NODE_DATA_REQUESTED: case DLM_RECO_NODE_DATA_REQUESTING: @@ -799,13 +880,31 @@ static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm, { struct dlm_lock_resource *res; struct list_head *iter, *iter2; + struct dlm_lock *lock; spin_lock(&dlm->spinlock); list_for_each_safe(iter, iter2, &dlm->reco.resources) { res = list_entry (iter, struct dlm_lock_resource, recovering); + /* always prune any $RECOVERY entries for dead nodes, + * otherwise hangs can occur during later recovery */ if (dlm_is_recovery_lock(res->lockname.name, - res->lockname.len)) + res->lockname.len)) { + spin_lock(&res->spinlock); + list_for_each_entry(lock, &res->granted, list) { + if (lock->ml.node == dead_node) { + mlog(0, "AHA! there was " + "a $RECOVERY lock for dead " + "node %u (%s)!\n", + dead_node, dlm->name); + list_del_init(&lock->list); + dlm_lock_put(lock); + break; + } + } + spin_unlock(&res->spinlock); continue; + } + if (res->owner == dead_node) { mlog(0, "found lockres owned by dead node while " "doing recovery for node %u. sending it.\n", @@ -1179,7 +1278,7 @@ static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data) again: ret = dlm_lockres_master_requery(dlm, res, &real_master); if (ret < 0) { - mlog(0, "dlm_lockres_master_requery failure: %d\n", + mlog(0, "dlm_lockres_master_requery ret=%d\n", ret); goto again; } @@ -1757,6 +1856,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node) struct dlm_lock_resource *res; int i; struct list_head *bucket; + struct dlm_lock *lock; /* purge any stale mles */ @@ -1780,10 +1880,25 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node) bucket = &(dlm->resources[i]); list_for_each(iter, bucket) { res = list_entry (iter, struct dlm_lock_resource, list); + /* always prune any $RECOVERY entries for dead nodes, + * otherwise hangs can occur during later recovery */ if (dlm_is_recovery_lock(res->lockname.name, - res->lockname.len)) + res->lockname.len)) { + spin_lock(&res->spinlock); + list_for_each_entry(lock, &res->granted, list) { + if (lock->ml.node == dead_node) { + mlog(0, "AHA! there was " + "a $RECOVERY lock for dead " + "node %u (%s)!\n", + dead_node, dlm->name); + list_del_init(&lock->list); + dlm_lock_put(lock); + break; + } + } + spin_unlock(&res->spinlock); continue; - + } spin_lock(&res->spinlock); /* zero the lvb if necessary */ dlm_revalidate_lvb(dlm, res, dead_node); @@ -1869,12 +1984,9 @@ void dlm_hb_node_up_cb(struct o2nm_node *node, int idx, void *data) return; spin_lock(&dlm->spinlock); - set_bit(idx, dlm->live_nodes_map); - - /* notify any mles attached to the heartbeat events */ - dlm_hb_event_notify_attached(dlm, idx, 1); - + /* do NOT notify mle attached to the heartbeat events. + * new nodes are not interesting in mastery until joined. */ spin_unlock(&dlm->spinlock); dlm_put(dlm); @@ -1897,7 +2009,18 @@ static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st) mlog(0, "unlockast for recovery lock fired!\n"); } - +/* + * dlm_pick_recovery_master will continually attempt to use + * dlmlock() on the special "$RECOVERY" lockres with the + * LKM_NOQUEUE flag to get an EX. every thread that enters + * this function on each node racing to become the recovery + * master will not stop attempting this until either: + * a) this node gets the EX (and becomes the recovery master), + * or b) dlm->reco.new_master gets set to some nodenum + * != O2NM_INVALID_NODE_NUM (another node will do the reco). + * so each time a recovery master is needed, the entire cluster + * will sync at this point. if the new master dies, that will + * be detected in dlm_do_recovery */ static int dlm_pick_recovery_master(struct dlm_ctxt *dlm) { enum dlm_status ret; @@ -1906,23 +2029,69 @@ static int dlm_pick_recovery_master(struct dlm_ctxt *dlm) mlog(0, "starting recovery of %s at %lu, dead=%u, this=%u\n", dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num); -retry: +again: memset(&lksb, 0, sizeof(lksb)); ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY, DLM_RECOVERY_LOCK_NAME, dlm_reco_ast, dlm, dlm_reco_bast); + mlog(0, "%s: dlmlock($RECOVERY) returned %d, lksb=%d\n", + dlm->name, ret, lksb.status); + if (ret == DLM_NORMAL) { mlog(0, "dlm=%s dlmlock says I got it (this=%u)\n", dlm->name, dlm->node_num); - /* I am master, send message to all nodes saying - * that I am beginning a recovery session */ - status = dlm_send_begin_reco_message(dlm, - dlm->reco.dead_node); + + /* got the EX lock. check to see if another node + * just became the reco master */ + if (dlm_reco_master_ready(dlm)) { + mlog(0, "%s: got reco EX lock, but %u will " + "do the recovery\n", dlm->name, + dlm->reco.new_master); + status = -EEXIST; + } else { + status = 0; + + /* see if recovery was already finished elsewhere */ + spin_lock(&dlm->spinlock); + if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { + status = -EINVAL; + mlog(0, "%s: got reco EX lock, but " + "node got recovered already\n", dlm->name); + if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) { + mlog(ML_ERROR, "%s: new master is %u " + "but no dead node!\n", + dlm->name, dlm->reco.new_master); + BUG(); + } + } + spin_unlock(&dlm->spinlock); + } + + /* if this node has actually become the recovery master, + * set the master and send the messages to begin recovery */ + if (!status) { + mlog(0, "%s: dead=%u, this=%u, sending " + "begin_reco now\n", dlm->name, + dlm->reco.dead_node, dlm->node_num); + status = dlm_send_begin_reco_message(dlm, + dlm->reco.dead_node); + /* this always succeeds */ + BUG_ON(status); + + /* set the new_master to this node */ + spin_lock(&dlm->spinlock); + dlm->reco.new_master = dlm->node_num; + spin_unlock(&dlm->spinlock); + } /* recovery lock is a special case. ast will not get fired, * so just go ahead and unlock it. */ ret = dlmunlock(dlm, &lksb, 0, dlm_reco_unlock_ast, dlm); + if (ret == DLM_DENIED) { + mlog(0, "got DLM_DENIED, trying LKM_CANCEL\n"); + ret = dlmunlock(dlm, &lksb, LKM_CANCEL, dlm_reco_unlock_ast, dlm); + } if (ret != DLM_NORMAL) { /* this would really suck. this could only happen * if there was a network error during the unlock @@ -1930,20 +2099,42 @@ retry: * is actually "done" and the lock structure is * even freed. we can continue, but only * because this specific lock name is special. */ - mlog(0, "dlmunlock returned %d\n", ret); - } - - if (status < 0) { - mlog(0, "failed to send recovery message. " - "must retry with new node map.\n"); - goto retry; + mlog(ML_ERROR, "dlmunlock returned %d\n", ret); } } else if (ret == DLM_NOTQUEUED) { mlog(0, "dlm=%s dlmlock says another node got it (this=%u)\n", dlm->name, dlm->node_num); /* another node is master. wait on - * reco.new_master != O2NM_INVALID_NODE_NUM */ + * reco.new_master != O2NM_INVALID_NODE_NUM + * for at most one second */ + wait_event_timeout(dlm->dlm_reco_thread_wq, + dlm_reco_master_ready(dlm), + msecs_to_jiffies(1000)); + if (!dlm_reco_master_ready(dlm)) { + mlog(0, "%s: reco master taking awhile\n", + dlm->name); + goto again; + } + /* another node has informed this one that it is reco master */ + mlog(0, "%s: reco master %u is ready to recover %u\n", + dlm->name, dlm->reco.new_master, dlm->reco.dead_node); status = -EEXIST; + } else { + struct dlm_lock_resource *res; + + /* dlmlock returned something other than NOTQUEUED or NORMAL */ + mlog(ML_ERROR, "%s: got %s from dlmlock($RECOVERY), " + "lksb.status=%s\n", dlm->name, dlm_errname(ret), + dlm_errname(lksb.status)); + res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME, + DLM_RECOVERY_LOCK_NAME_LEN); + if (res) { + dlm_print_one_lock_resource(res); + dlm_lockres_put(res); + } else { + mlog(ML_ERROR, "recovery lock not found\n"); + } + BUG(); } return status; @@ -1982,7 +2173,7 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node) mlog(0, "not sending begin reco to self\n"); continue; } - +retry: ret = -EINVAL; mlog(0, "attempting to send begin reco msg to %d\n", nodenum); @@ -1991,8 +2182,17 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node) /* negative status is handled ok by caller here */ if (ret >= 0) ret = status; + if (dlm_is_host_down(ret)) { + /* node is down. not involved in recovery + * so just keep going */ + mlog(0, "%s: node %u was down when sending " + "begin reco msg (%d)\n", dlm->name, nodenum, ret); + ret = 0; + } if (ret < 0) { struct dlm_lock_resource *res; + /* this is now a serious problem, possibly ENOMEM + * in the network stack. must retry */ mlog_errno(ret); mlog(ML_ERROR, "begin reco of dlm %s to node %u " " returned %d\n", dlm->name, nodenum, ret); @@ -2004,7 +2204,10 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node) } else { mlog(ML_ERROR, "recovery lock not found\n"); } - break; + /* sleep for a bit in hopes that we can avoid + * another ENOMEM */ + msleep(100); + goto retry; } } @@ -2027,19 +2230,34 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data) spin_lock(&dlm->spinlock); if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) { - mlog(0, "new_master already set to %u!\n", - dlm->reco.new_master); + if (test_bit(dlm->reco.new_master, dlm->recovery_map)) { + mlog(0, "%s: new_master %u died, changing " + "to %u\n", dlm->name, dlm->reco.new_master, + br->node_idx); + } else { + mlog(0, "%s: new_master %u NOT DEAD, changing " + "to %u\n", dlm->name, dlm->reco.new_master, + br->node_idx); + /* may not have seen the new master as dead yet */ + } } if (dlm->reco.dead_node != O2NM_INVALID_NODE_NUM) { - mlog(0, "dead_node already set to %u!\n", - dlm->reco.dead_node); + mlog(ML_NOTICE, "%s: dead_node previously set to %u, " + "node %u changing it to %u\n", dlm->name, + dlm->reco.dead_node, br->node_idx, br->dead_node); } dlm->reco.new_master = br->node_idx; dlm->reco.dead_node = br->dead_node; if (!test_bit(br->dead_node, dlm->recovery_map)) { - mlog(ML_ERROR, "recovery master %u sees %u as dead, but this " + mlog(0, "recovery master %u sees %u as dead, but this " "node has not yet. marking %u as dead\n", br->node_idx, br->dead_node, br->dead_node); + if (!test_bit(br->dead_node, dlm->domain_map) || + !test_bit(br->dead_node, dlm->live_nodes_map)) + mlog(0, "%u not in domain/live_nodes map " + "so setting it in reco map manually\n", + br->dead_node); + set_bit(br->dead_node, dlm->recovery_map); __dlm_hb_node_down(dlm, br->dead_node); } spin_unlock(&dlm->spinlock); |