summaryrefslogtreecommitdiffstats
path: root/fs/dlm/lock.c
diff options
context:
space:
mode:
authorDavid Teigland <teigland@redhat.com>2007-09-27 22:53:38 +0200
committerSteven Whitehouse <swhiteho@redhat.com>2007-10-10 09:56:38 +0200
commitc36258b5925e6cf6bf72904635100593573bfcff (patch)
tree565f1ce29a7f8a2cd1c25f2d36c932727adbdbc2 /fs/dlm/lock.c
parent[DLM] don't overwrite castparam if it's NULL (diff)
downloadlinux-c36258b5925e6cf6bf72904635100593573bfcff.tar.xz
linux-c36258b5925e6cf6bf72904635100593573bfcff.zip
[DLM] block dlm_recv in recovery transition
Introduce a per-lockspace rwsem that's held in read mode by dlm_recv threads while working in the dlm. This allows dlm_recv activity to be suspended when the lockspace transitions to, from and between recovery cycles. The specific bug prompting this change is one where an in-progress recovery cycle is aborted by a new recovery cycle. While dlm_recv was processing a recovery message, the recovery cycle was aborted and dlm_recoverd began cleaning up. dlm_recv decremented recover_locks_count on an rsb after dlm_recoverd had reset it to zero. This is fixed by suspending dlm_recv (taking write lock on the rwsem) before aborting the current recovery. The transitions to/from normal and recovery modes are simplified by using this new ability to block dlm_recv. The switch from normal to recovery mode means dlm_recv goes from processing locking messages, to saving them for later, and vice versa. Races are avoided by blocking dlm_recv when setting the flag that switches between modes. Signed-off-by: David Teigland <teigland@redhat.com> Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
Diffstat (limited to 'fs/dlm/lock.c')
-rw-r--r--fs/dlm/lock.c136
1 files changed, 81 insertions, 55 deletions
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 031229f144fa..3915b8e14146 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -3638,55 +3638,8 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
dlm_put_lkb(lkb);
}
-int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
+static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
{
- struct dlm_message *ms = (struct dlm_message *) hd;
- struct dlm_ls *ls;
- int error = 0;
-
- if (!recovery)
- dlm_message_in(ms);
-
- ls = dlm_find_lockspace_global(hd->h_lockspace);
- if (!ls) {
- log_print("drop message %d from %d for unknown lockspace %d",
- ms->m_type, nodeid, hd->h_lockspace);
- return -EINVAL;
- }
-
- /* recovery may have just ended leaving a bunch of backed-up requests
- in the requestqueue; wait while dlm_recoverd clears them */
-
- if (!recovery)
- dlm_wait_requestqueue(ls);
-
- /* recovery may have just started while there were a bunch of
- in-flight requests -- save them in requestqueue to be processed
- after recovery. we can't let dlm_recvd block on the recovery
- lock. if dlm_recoverd is calling this function to clear the
- requestqueue, it needs to be interrupted (-EINTR) if another
- recovery operation is starting. */
-
- while (1) {
- if (dlm_locking_stopped(ls)) {
- if (recovery) {
- error = -EINTR;
- goto out;
- }
- error = dlm_add_requestqueue(ls, nodeid, hd);
- if (error == -EAGAIN)
- continue;
- else {
- error = -EINTR;
- goto out;
- }
- }
-
- if (dlm_lock_recovery_try(ls))
- break;
- schedule();
- }
-
switch (ms->m_type) {
/* messages sent to a master node */
@@ -3761,17 +3714,90 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
log_error(ls, "unknown message type %d", ms->m_type);
}
- dlm_unlock_recovery(ls);
- out:
- dlm_put_lockspace(ls);
dlm_astd_wake();
- return error;
}
+/* If the lockspace is in recovery mode (locking stopped), then normal
+ messages are saved on the requestqueue for processing after recovery is
+ done. When not in recovery mode, we wait for dlm_recoverd to drain saved
+ messages off the requestqueue before we process new ones. This occurs right
+ after recovery completes when we transition from saving all messages on
+ requestqueue, to processing all the saved messages, to processing new
+ messages as they arrive. */
-/*
- * Recovery related
- */
+static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
+ int nodeid)
+{
+ if (dlm_locking_stopped(ls)) {
+ dlm_add_requestqueue(ls, nodeid, (struct dlm_header *) ms);
+ } else {
+ dlm_wait_requestqueue(ls);
+ _receive_message(ls, ms);
+ }
+}
+
+/* This is called by dlm_recoverd to process messages that were saved on
+ the requestqueue. */
+
+void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
+{
+ _receive_message(ls, ms);
+}
+
+/* This is called by the midcomms layer when something is received for
+ the lockspace. It could be either a MSG (normal message sent as part of
+ standard locking activity) or an RCOM (recovery message sent as part of
+ lockspace recovery). */
+
+void dlm_receive_buffer(struct dlm_header *hd, int nodeid)
+{
+ struct dlm_message *ms = (struct dlm_message *) hd;
+ struct dlm_rcom *rc = (struct dlm_rcom *) hd;
+ struct dlm_ls *ls;
+ int type = 0;
+
+ switch (hd->h_cmd) {
+ case DLM_MSG:
+ dlm_message_in(ms);
+ type = ms->m_type;
+ break;
+ case DLM_RCOM:
+ dlm_rcom_in(rc);
+ type = rc->rc_type;
+ break;
+ default:
+ log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
+ return;
+ }
+
+ if (hd->h_nodeid != nodeid) {
+ log_print("invalid h_nodeid %d from %d lockspace %x",
+ hd->h_nodeid, nodeid, hd->h_lockspace);
+ return;
+ }
+
+ ls = dlm_find_lockspace_global(hd->h_lockspace);
+ if (!ls) {
+ log_print("invalid h_lockspace %x from %d cmd %d type %d",
+ hd->h_lockspace, nodeid, hd->h_cmd, type);
+
+ if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
+ dlm_send_ls_not_ready(nodeid, rc);
+ return;
+ }
+
+ /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
+ be inactive (in this ls) before transitioning to recovery mode */
+
+ down_read(&ls->ls_recv_active);
+ if (hd->h_cmd == DLM_MSG)
+ dlm_receive_message(ls, ms, nodeid);
+ else
+ dlm_receive_rcom(ls, rc, nodeid);
+ up_read(&ls->ls_recv_active);
+
+ dlm_put_lockspace(ls);
+}
static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
{