summaryrefslogtreecommitdiffstats
path: root/fs/dlm/rcom.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/rcom.c')
-rw-r--r--fs/dlm/rcom.c147
1 files changed, 112 insertions, 35 deletions
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 64d3e2b958c7..87f1a56eab32 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -23,8 +23,6 @@
#include "memory.h"
#include "lock.h"
#include "util.h"
-#include "member.h"
-
static int rcom_response(struct dlm_ls *ls)
{
@@ -275,19 +273,9 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
int error = 0;
- int max_size = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom);
ls->ls_recover_nodeid = nodeid;
- if (nodeid == dlm_our_nodeid()) {
- ls->ls_recover_buf->rc_header.h_length =
- dlm_config.ci_buffer_size;
- dlm_copy_master_names(ls, last_name, last_len,
- ls->ls_recover_buf->rc_buf,
- max_size, nodeid);
- goto out;
- }
-
error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
if (error)
goto out;
@@ -337,7 +325,26 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
if (error)
goto out;
memcpy(rc->rc_buf, r->res_name, r->res_length);
- rc->rc_id = (unsigned long) r;
+ rc->rc_id = (unsigned long) r->res_id;
+
+ send_rcom(ls, mh, rc);
+ out:
+ return error;
+}
+
+int dlm_send_rcom_lookup_dump(struct dlm_rsb *r, int to_nodeid)
+{
+ struct dlm_rcom *rc;
+ struct dlm_mhandle *mh;
+ struct dlm_ls *ls = r->res_ls;
+ int error;
+
+ error = create_rcom(ls, to_nodeid, DLM_RCOM_LOOKUP, r->res_length,
+ &rc, &mh);
+ if (error)
+ goto out;
+ memcpy(rc->rc_buf, r->res_name, r->res_length);
+ rc->rc_id = 0xFFFFFFFF;
send_rcom(ls, mh, rc);
out:
@@ -355,7 +362,14 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
if (error)
return;
- error = dlm_dir_lookup(ls, nodeid, rc_in->rc_buf, len, &ret_nodeid);
+ if (rc_in->rc_id == 0xFFFFFFFF) {
+ log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
+ dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
+ return;
+ }
+
+ error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
+ DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
if (error)
ret_nodeid = error;
rc->rc_result = ret_nodeid;
@@ -486,17 +500,76 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
return 0;
}
+/*
+ * Ignore messages for stage Y before we set
+ * recover_status bit for stage X:
+ *
+ * recover_status = 0
+ *
+ * dlm_recover_members()
+ * - send nothing
+ * - recv nothing
+ * - ignore NAMES, NAMES_REPLY
+ * - ignore LOOKUP, LOOKUP_REPLY
+ * - ignore LOCK, LOCK_REPLY
+ *
+ * recover_status |= NODES
+ *
+ * dlm_recover_members_wait()
+ *
+ * dlm_recover_directory()
+ * - send NAMES
+ * - recv NAMES_REPLY
+ * - ignore LOOKUP, LOOKUP_REPLY
+ * - ignore LOCK, LOCK_REPLY
+ *
+ * recover_status |= DIR
+ *
+ * dlm_recover_directory_wait()
+ *
+ * dlm_recover_masters()
+ * - send LOOKUP
+ * - recv LOOKUP_REPLY
+ *
+ * dlm_recover_locks()
+ * - send LOCKS
+ * - recv LOCKS_REPLY
+ *
+ * recover_status |= LOCKS
+ *
+ * dlm_recover_locks_wait()
+ *
+ * recover_status |= DONE
+ */
+
/* Called by dlm_recv; corresponds to dlm_receive_message() but special
recovery-only comms are sent through here. */
void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
{
int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
- int stop, reply = 0, lock = 0;
+ int stop, reply = 0, names = 0, lookup = 0, lock = 0;
uint32_t status;
uint64_t seq;
switch (rc->rc_type) {
+ case DLM_RCOM_STATUS_REPLY:
+ reply = 1;
+ break;
+ case DLM_RCOM_NAMES:
+ names = 1;
+ break;
+ case DLM_RCOM_NAMES_REPLY:
+ names = 1;
+ reply = 1;
+ break;
+ case DLM_RCOM_LOOKUP:
+ lookup = 1;
+ break;
+ case DLM_RCOM_LOOKUP_REPLY:
+ lookup = 1;
+ reply = 1;
+ break;
case DLM_RCOM_LOCK:
lock = 1;
break;
@@ -504,10 +577,6 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
lock = 1;
reply = 1;
break;
- case DLM_RCOM_STATUS_REPLY:
- case DLM_RCOM_NAMES_REPLY:
- case DLM_RCOM_LOOKUP_REPLY:
- reply = 1;
};
spin_lock(&ls->ls_recover_lock);
@@ -516,19 +585,17 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
seq = ls->ls_recover_seq;
spin_unlock(&ls->ls_recover_lock);
- if ((stop && (rc->rc_type != DLM_RCOM_STATUS)) ||
- (reply && (rc->rc_seq_reply != seq)) ||
- (lock && !(status & DLM_RS_DIR))) {
- log_limit(ls, "dlm_receive_rcom ignore msg %d "
- "from %d %llu %llu recover seq %llu sts %x gen %u",
- rc->rc_type,
- nodeid,
- (unsigned long long)rc->rc_seq,
- (unsigned long long)rc->rc_seq_reply,
- (unsigned long long)seq,
- status, ls->ls_generation);
- goto out;
- }
+ if (stop && (rc->rc_type != DLM_RCOM_STATUS))
+ goto ignore;
+
+ if (reply && (rc->rc_seq_reply != seq))
+ goto ignore;
+
+ if (!(status & DLM_RS_NODES) && (names || lookup || lock))
+ goto ignore;
+
+ if (!(status & DLM_RS_DIR) && (lookup || lock))
+ goto ignore;
switch (rc->rc_type) {
case DLM_RCOM_STATUS:
@@ -570,10 +637,20 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
default:
log_error(ls, "receive_rcom bad type %d", rc->rc_type);
}
-out:
+ return;
+
+ignore:
+ log_limit(ls, "dlm_receive_rcom ignore msg %d "
+ "from %d %llu %llu recover seq %llu sts %x gen %u",
+ rc->rc_type,
+ nodeid,
+ (unsigned long long)rc->rc_seq,
+ (unsigned long long)rc->rc_seq_reply,
+ (unsigned long long)seq,
+ status, ls->ls_generation);
return;
Eshort:
- log_error(ls, "recovery message %x from %d is too short",
- rc->rc_type, nodeid);
+ log_error(ls, "recovery message %d from %d is too short",
+ rc->rc_type, nodeid);
}