diff options
Diffstat (limited to 'fs/ocfs2/dlm/dlmmaster.c')
-rw-r--r-- | fs/ocfs2/dlm/dlmmaster.c | 103 |
1 files changed, 103 insertions, 0 deletions
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 78ac3a00eb54..940be4c13b1f 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -239,6 +239,8 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm, static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, u8 target); +static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res); int dlm_is_host_down(int errno) @@ -677,6 +679,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm, struct dlm_node_iter iter; unsigned int namelen; int tries = 0; + int bit, wait_on_recovery = 0; BUG_ON(!lockid); @@ -762,6 +765,18 @@ lookup: dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0); set_bit(dlm->node_num, mle->maybe_map); list_add(&mle->list, &dlm->master_list); + + /* still holding the dlm spinlock, check the recovery map + * to see if there are any nodes that still need to be + * considered. these will not appear in the mle nodemap + * but they might own this lockres. wait on them. */ + bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0); + if (bit < O2NM_MAX_NODES) { + mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to" + "recover before lock mastery can begin\n", + dlm->name, namelen, (char *)lockid, bit); + wait_on_recovery = 1; + } } /* at this point there is either a DLM_MLE_BLOCK or a @@ -779,6 +794,39 @@ lookup: spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); + while (wait_on_recovery) { + /* any cluster changes that occurred after dropping the + * dlm spinlock would be detectable be a change on the mle, + * so we only need to clear out the recovery map once. */ + if (dlm_is_recovery_lock(lockid, namelen)) { + mlog(ML_NOTICE, "%s: recovery map is not empty, but " + "must master $RECOVERY lock now\n", dlm->name); + if (!dlm_pre_master_reco_lockres(dlm, res)) + wait_on_recovery = 0; + else { + mlog(0, "%s: waiting 500ms for heartbeat state " + "change\n", dlm->name); + msleep(500); + } + continue; + } + + dlm_kick_recovery_thread(dlm); + msleep(100); + dlm_wait_for_recovery(dlm); + + spin_lock(&dlm->spinlock); + bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0); + if (bit < O2NM_MAX_NODES) { + mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to" + "recover before lock mastery can begin\n", + dlm->name, namelen, (char *)lockid, bit); + wait_on_recovery = 1; + } else + wait_on_recovery = 0; + spin_unlock(&dlm->spinlock); + } + /* must wait for lock to be mastered elsewhere */ if (blocked) goto wait; @@ -1835,6 +1883,61 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data) mlog(0, "finished with dlm_assert_master_worker\n"); } +/* SPECIAL CASE for the $RECOVERY lock used by the recovery thread. + * We cannot wait for node recovery to complete to begin mastering this + * lockres because this lockres is used to kick off recovery! ;-) + * So, do a pre-check on all living nodes to see if any of those nodes + * think that $RECOVERY is currently mastered by a dead node. If so, + * we wait a short time to allow that node to get notified by its own + * heartbeat stack, then check again. All $RECOVERY lock resources + * mastered by dead nodes are purged when the hearbeat callback is + * fired, so we can know for sure that it is safe to continue once + * the node returns a live node or no node. */ +static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res) +{ + struct dlm_node_iter iter; + int nodenum; + int ret = 0; + u8 master = DLM_LOCK_RES_OWNER_UNKNOWN; + + spin_lock(&dlm->spinlock); + dlm_node_iter_init(dlm->domain_map, &iter); + spin_unlock(&dlm->spinlock); + + while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { + /* do not send to self */ + if (nodenum == dlm->node_num) + continue; + ret = dlm_do_master_requery(dlm, res, nodenum, &master); + if (ret < 0) { + mlog_errno(ret); + if (!dlm_is_host_down(ret)) + BUG(); + /* host is down, so answer for that node would be + * DLM_LOCK_RES_OWNER_UNKNOWN. continue. */ + } + + if (master != DLM_LOCK_RES_OWNER_UNKNOWN) { + /* check to see if this master is in the recovery map */ + spin_lock(&dlm->spinlock); + if (test_bit(master, dlm->recovery_map)) { + mlog(ML_NOTICE, "%s: node %u has not seen " + "node %u go down yet, and thinks the " + "dead node is mastering the recovery " + "lock. must wait.\n", dlm->name, + nodenum, master); + ret = -EAGAIN; + } + spin_unlock(&dlm->spinlock); + mlog(0, "%s: reco lock master is %u\n", dlm->name, + master); + break; + } + } + return ret; +} + /* * DLM_MIGRATE_LOCKRES |