diff options
Diffstat (limited to 'fs/dlm/lock.c')
-rw-r--r-- | fs/dlm/lock.c | 654 |
1 files changed, 348 insertions, 306 deletions
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index bdb51d209ba2..226822f49d30 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -350,10 +350,12 @@ static void put_rsb(struct dlm_rsb *r) { struct dlm_ls *ls = r->res_ls; uint32_t bucket = r->res_bucket; + int rv; - spin_lock(&ls->ls_rsbtbl[bucket].lock); - kref_put(&r->res_ref, toss_rsb); - spin_unlock(&ls->ls_rsbtbl[bucket].lock); + rv = kref_put_lock(&r->res_ref, toss_rsb, + &ls->ls_rsbtbl[bucket].lock); + if (rv) + spin_unlock(&ls->ls_rsbtbl[bucket].lock); } void dlm_put_rsb(struct dlm_rsb *r) @@ -602,7 +604,6 @@ static int find_rsb_dir(struct dlm_ls *ls, char *name, int len, */ kref_get(&r->res_ref); - error = 0; goto out_unlock; @@ -880,6 +881,88 @@ static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r, } } +static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid, + int from_nodeid, bool toss_list, unsigned int flags, + int *r_nodeid, int *result) +{ + int fix_master = (flags & DLM_LU_RECOVER_MASTER); + int from_master = (flags & DLM_LU_RECOVER_DIR); + + if (r->res_dir_nodeid != our_nodeid) { + /* should not happen, but may as well fix it and carry on */ + log_error(ls, "%s res_dir %d our %d %s", __func__, + r->res_dir_nodeid, our_nodeid, r->res_name); + r->res_dir_nodeid = our_nodeid; + } + + if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) { + /* Recovery uses this function to set a new master when + * the previous master failed. Setting NEW_MASTER will + * force dlm_recover_masters to call recover_master on this + * rsb even though the res_nodeid is no longer removed. + */ + + r->res_master_nodeid = from_nodeid; + r->res_nodeid = from_nodeid; + rsb_set_flag(r, RSB_NEW_MASTER); + + if (toss_list) { + /* I don't think we should ever find it on toss list. */ + log_error(ls, "%s fix_master on toss", __func__); + dlm_dump_rsb(r); + } + } + + if (from_master && (r->res_master_nodeid != from_nodeid)) { + /* this will happen if from_nodeid became master during + * a previous recovery cycle, and we aborted the previous + * cycle before recovering this master value + */ + + log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s", + __func__, from_nodeid, r->res_master_nodeid, + r->res_nodeid, r->res_first_lkid, r->res_name); + + if (r->res_master_nodeid == our_nodeid) { + log_error(ls, "from_master %d our_master", from_nodeid); + dlm_dump_rsb(r); + goto ret_assign; + } + + r->res_master_nodeid = from_nodeid; + r->res_nodeid = from_nodeid; + rsb_set_flag(r, RSB_NEW_MASTER); + } + + if (!r->res_master_nodeid) { + /* this will happen if recovery happens while we're looking + * up the master for this rsb + */ + + log_debug(ls, "%s master 0 to %d first %x %s", __func__, + from_nodeid, r->res_first_lkid, r->res_name); + r->res_master_nodeid = from_nodeid; + r->res_nodeid = from_nodeid; + } + + if (!from_master && !fix_master && + (r->res_master_nodeid == from_nodeid)) { + /* this can happen when the master sends remove, the dir node + * finds the rsb on the keep list and ignores the remove, + * and the former master sends a lookup + */ + + log_limit(ls, "%s from master %d flags %x first %x %s", + __func__, from_nodeid, flags, r->res_first_lkid, + r->res_name); + } + + ret_assign: + *r_nodeid = r->res_master_nodeid; + if (result) + *result = DLM_LU_MATCH; +} + /* * We're the dir node for this res and another node wants to know the * master nodeid. During normal operation (non recovery) this is only @@ -914,10 +997,8 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len, { struct dlm_rsb *r = NULL; uint32_t hash, b; - int from_master = (flags & DLM_LU_RECOVER_DIR); - int fix_master = (flags & DLM_LU_RECOVER_MASTER); int our_nodeid = dlm_our_nodeid(); - int dir_nodeid, error, toss_list = 0; + int dir_nodeid, error; if (len > DLM_RESNAME_MAXLEN) return -EINVAL; @@ -949,12 +1030,21 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len, error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); if (!error) { /* because the rsb is active, we need to lock_rsb before - checking/changing re_master_nodeid */ + * checking/changing re_master_nodeid + */ hold_rsb(r); spin_unlock(&ls->ls_rsbtbl[b].lock); lock_rsb(r); - goto found; + + __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false, + flags, r_nodeid, result); + + /* the rsb was active */ + unlock_rsb(r); + put_rsb(r); + + return 0; } error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); @@ -962,90 +1052,16 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len, goto not_found; /* because the rsb is inactive (on toss list), it's not refcounted - and lock_rsb is not used, but is protected by the rsbtbl lock */ - - toss_list = 1; - found: - if (r->res_dir_nodeid != our_nodeid) { - /* should not happen, but may as well fix it and carry on */ - log_error(ls, "dlm_master_lookup res_dir %d our %d %s", - r->res_dir_nodeid, our_nodeid, r->res_name); - r->res_dir_nodeid = our_nodeid; - } - - if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) { - /* Recovery uses this function to set a new master when - the previous master failed. Setting NEW_MASTER will - force dlm_recover_masters to call recover_master on this - rsb even though the res_nodeid is no longer removed. */ - - r->res_master_nodeid = from_nodeid; - r->res_nodeid = from_nodeid; - rsb_set_flag(r, RSB_NEW_MASTER); - - if (toss_list) { - /* I don't think we should ever find it on toss list. */ - log_error(ls, "dlm_master_lookup fix_master on toss"); - dlm_dump_rsb(r); - } - } - - if (from_master && (r->res_master_nodeid != from_nodeid)) { - /* this will happen if from_nodeid became master during - a previous recovery cycle, and we aborted the previous - cycle before recovering this master value */ - - log_limit(ls, "dlm_master_lookup from_master %d " - "master_nodeid %d res_nodeid %d first %x %s", - from_nodeid, r->res_master_nodeid, r->res_nodeid, - r->res_first_lkid, r->res_name); - - if (r->res_master_nodeid == our_nodeid) { - log_error(ls, "from_master %d our_master", from_nodeid); - dlm_dump_rsb(r); - goto out_found; - } - - r->res_master_nodeid = from_nodeid; - r->res_nodeid = from_nodeid; - rsb_set_flag(r, RSB_NEW_MASTER); - } - - if (!r->res_master_nodeid) { - /* this will happen if recovery happens while we're looking - up the master for this rsb */ - - log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s", - from_nodeid, r->res_first_lkid, r->res_name); - r->res_master_nodeid = from_nodeid; - r->res_nodeid = from_nodeid; - } - - if (!from_master && !fix_master && - (r->res_master_nodeid == from_nodeid)) { - /* this can happen when the master sends remove, the dir node - finds the rsb on the keep list and ignores the remove, - and the former master sends a lookup */ + * and lock_rsb is not used, but is protected by the rsbtbl lock + */ - log_limit(ls, "dlm_master_lookup from master %d flags %x " - "first %x %s", from_nodeid, flags, - r->res_first_lkid, r->res_name); - } + __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags, + r_nodeid, result); - out_found: - *r_nodeid = r->res_master_nodeid; - if (result) - *result = DLM_LU_MATCH; + r->res_toss_time = jiffies; + /* the rsb was inactive (on toss list) */ + spin_unlock(&ls->ls_rsbtbl[b].lock); - if (toss_list) { - r->res_toss_time = jiffies; - /* the rsb was inactive (on toss list) */ - spin_unlock(&ls->ls_rsbtbl[b].lock); - } else { - /* the rsb was active */ - unlock_rsb(r); - put_rsb(r); - } return 0; not_found: @@ -1076,7 +1092,6 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len, if (result) *result = DLM_LU_ADD; *r_nodeid = from_nodeid; - error = 0; out_unlock: spin_unlock(&ls->ls_rsbtbl[b].lock); return error; @@ -1253,9 +1268,11 @@ static void kill_lkb(struct kref *kref) static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) { uint32_t lkid = lkb->lkb_id; + int rv; - spin_lock(&ls->ls_lkbidr_spin); - if (kref_put(&lkb->lkb_ref, kill_lkb)) { + rv = kref_put_lock(&lkb->lkb_ref, kill_lkb, + &ls->ls_lkbidr_spin); + if (rv) { idr_remove(&ls->ls_lkbidr, lkid); spin_unlock(&ls->ls_lkbidr_spin); @@ -1265,11 +1282,9 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) if (lkb->lkb_lvbptr && is_master_copy(lkb)) dlm_free_lvb(lkb->lkb_lvbptr); dlm_free_lkb(lkb); - return 1; - } else { - spin_unlock(&ls->ls_lkbidr_spin); - return 0; } + + return rv; } int dlm_put_lkb(struct dlm_lkb *lkb) @@ -1306,13 +1321,17 @@ static inline void unhold_lkb(struct dlm_lkb *lkb) static void lkb_add_ordered(struct list_head *new, struct list_head *head, int mode) { - struct dlm_lkb *lkb = NULL; + struct dlm_lkb *lkb = NULL, *iter; - list_for_each_entry(lkb, head, lkb_statequeue) - if (lkb->lkb_rqmode < mode) + list_for_each_entry(iter, head, lkb_statequeue) + if (iter->lkb_rqmode < mode) { + lkb = iter; + list_add_tail(new, &iter->lkb_statequeue); break; + } - __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue); + if (!lkb) + list_add_tail(new, head); } /* add/remove lkb to rsb's grant/convert/wait queue */ @@ -1559,6 +1578,7 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype, lkb->lkb_wait_type = 0; lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; lkb->lkb_wait_count--; + unhold_lkb(lkb); goto out_del; } @@ -1571,8 +1591,8 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype, } log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait", - lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid, - mstype, lkb->lkb_flags); + lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0, + lkb->lkb_remid, mstype, lkb->lkb_flags); return -1; out_del: @@ -1585,6 +1605,7 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype, log_error(ls, "remwait error %x reply %d wait_type %d overlap", lkb->lkb_id, mstype, lkb->lkb_wait_type); lkb->lkb_wait_count--; + unhold_lkb(lkb); lkb->lkb_wait_type = 0; } @@ -1617,10 +1638,10 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms) struct dlm_ls *ls = lkb->lkb_resource->res_ls; int error; - if (ms->m_flags != DLM_IFL_STUB_MS) + if (ms->m_flags != cpu_to_le32(DLM_IFL_STUB_MS)) mutex_lock(&ls->ls_waiters_mutex); - error = _remove_from_waiters(lkb, ms->m_type, ms); - if (ms->m_flags != DLM_IFL_STUB_MS) + error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms); + if (ms->m_flags != cpu_to_le32(DLM_IFL_STUB_MS)) mutex_unlock(&ls->ls_waiters_mutex); return error; } @@ -1795,7 +1816,6 @@ static void shrink_bucket(struct dlm_ls *ls, int b) memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN); spin_unlock(&ls->ls_remove_spin); spin_unlock(&ls->ls_rsbtbl[b].lock); - wake_up(&ls->ls_remove_wait); send_remove(r); @@ -1804,6 +1824,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b) ls->ls_remove_len = 0; memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN); spin_unlock(&ls->ls_remove_spin); + wake_up(&ls->ls_remove_wait); dlm_free_rsb(r); } @@ -1866,7 +1887,7 @@ static void del_timeout(struct dlm_lkb *lkb) void dlm_scan_timeout(struct dlm_ls *ls) { struct dlm_rsb *r; - struct dlm_lkb *lkb; + struct dlm_lkb *lkb = NULL, *iter; int do_cancel, do_warn; s64 wait_us; @@ -1877,27 +1898,28 @@ void dlm_scan_timeout(struct dlm_ls *ls) do_cancel = 0; do_warn = 0; mutex_lock(&ls->ls_timeout_mutex); - list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) { + list_for_each_entry(iter, &ls->ls_timeout, lkb_time_list) { wait_us = ktime_to_us(ktime_sub(ktime_get(), - lkb->lkb_timestamp)); + iter->lkb_timestamp)); - if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) && - wait_us >= (lkb->lkb_timeout_cs * 10000)) + if ((iter->lkb_exflags & DLM_LKF_TIMEOUT) && + wait_us >= (iter->lkb_timeout_cs * 10000)) do_cancel = 1; - if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) && + if ((iter->lkb_flags & DLM_IFL_WATCH_TIMEWARN) && wait_us >= dlm_config.ci_timewarn_cs * 10000) do_warn = 1; if (!do_cancel && !do_warn) continue; - hold_lkb(lkb); + hold_lkb(iter); + lkb = iter; break; } mutex_unlock(&ls->ls_timeout_mutex); - if (!do_cancel && !do_warn) + if (!lkb) break; r = lkb->lkb_resource; @@ -2051,7 +2073,7 @@ static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, if (len > r->res_ls->ls_lvblen) len = r->res_ls->ls_lvblen; memcpy(lkb->lkb_lvbptr, ms->m_extra, len); - lkb->lkb_lvbseq = ms->m_lvbseq; + lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq); } } @@ -2182,10 +2204,10 @@ static void munge_demoted(struct dlm_lkb *lkb) static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms) { - if (ms->m_type != DLM_MSG_REQUEST_REPLY && - ms->m_type != DLM_MSG_GRANT) { + if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) && + ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) { log_print("munge_altmode %x invalid reply type %d", - lkb->lkb_id, ms->m_type); + lkb->lkb_id, le32_to_cpu(ms->m_type)); return; } @@ -2912,7 +2934,8 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, if (lkb->lkb_status != DLM_LKSTS_GRANTED) goto out; - if (lkb->lkb_wait_type) + /* lock not allowed if there's any op in progress */ + if (lkb->lkb_wait_type || lkb->lkb_wait_count) goto out; if (is_overlap(lkb)) @@ -3563,13 +3586,13 @@ static int _create_message(struct dlm_ls *ls, int mb_len, ms = (struct dlm_message *) mb; - ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR); - ms->m_header.u.h_lockspace = ls->ls_global_id; - ms->m_header.h_nodeid = dlm_our_nodeid(); - ms->m_header.h_length = mb_len; + ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR); + ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id); + ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid()); + ms->m_header.h_length = cpu_to_le16(mb_len); ms->m_header.h_cmd = DLM_MSG; - ms->m_type = mstype; + ms->m_type = cpu_to_le32(mstype); *mh_ret = mh; *ms_ret = ms; @@ -3608,7 +3631,6 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms) { - dlm_message_out(ms); dlm_midcomms_commit_mhandle(mh); return 0; } @@ -3616,40 +3638,40 @@ static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms) static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb, struct dlm_message *ms) { - ms->m_nodeid = lkb->lkb_nodeid; - ms->m_pid = lkb->lkb_ownpid; - ms->m_lkid = lkb->lkb_id; - ms->m_remid = lkb->lkb_remid; - ms->m_exflags = lkb->lkb_exflags; - ms->m_sbflags = lkb->lkb_sbflags; - ms->m_flags = lkb->lkb_flags; - ms->m_lvbseq = lkb->lkb_lvbseq; - ms->m_status = lkb->lkb_status; - ms->m_grmode = lkb->lkb_grmode; - ms->m_rqmode = lkb->lkb_rqmode; - ms->m_hash = r->res_hash; + ms->m_nodeid = cpu_to_le32(lkb->lkb_nodeid); + ms->m_pid = cpu_to_le32(lkb->lkb_ownpid); + ms->m_lkid = cpu_to_le32(lkb->lkb_id); + ms->m_remid = cpu_to_le32(lkb->lkb_remid); + ms->m_exflags = cpu_to_le32(lkb->lkb_exflags); + ms->m_sbflags = cpu_to_le32(lkb->lkb_sbflags); + ms->m_flags = cpu_to_le32(lkb->lkb_flags); + ms->m_lvbseq = cpu_to_le32(lkb->lkb_lvbseq); + ms->m_status = cpu_to_le32(lkb->lkb_status); + ms->m_grmode = cpu_to_le32(lkb->lkb_grmode); + ms->m_rqmode = cpu_to_le32(lkb->lkb_rqmode); + ms->m_hash = cpu_to_le32(r->res_hash); /* m_result and m_bastmode are set from function args, not from lkb fields */ if (lkb->lkb_bastfn) - ms->m_asts |= DLM_CB_BAST; + ms->m_asts |= cpu_to_le32(DLM_CB_BAST); if (lkb->lkb_astfn) - ms->m_asts |= DLM_CB_CAST; + ms->m_asts |= cpu_to_le32(DLM_CB_CAST); /* compare with switch in create_message; send_remove() doesn't use send_args() */ switch (ms->m_type) { - case DLM_MSG_REQUEST: - case DLM_MSG_LOOKUP: + case cpu_to_le32(DLM_MSG_REQUEST): + case cpu_to_le32(DLM_MSG_LOOKUP): memcpy(ms->m_extra, r->res_name, r->res_length); break; - case DLM_MSG_CONVERT: - case DLM_MSG_UNLOCK: - case DLM_MSG_REQUEST_REPLY: - case DLM_MSG_CONVERT_REPLY: - case DLM_MSG_GRANT: + case cpu_to_le32(DLM_MSG_CONVERT): + case cpu_to_le32(DLM_MSG_UNLOCK): + case cpu_to_le32(DLM_MSG_REQUEST_REPLY): + case cpu_to_le32(DLM_MSG_CONVERT_REPLY): + case cpu_to_le32(DLM_MSG_GRANT): if (!lkb->lkb_lvbptr) break; memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen); @@ -3699,8 +3721,8 @@ static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) /* down conversions go without a reply from the master */ if (!error && down_conversion(lkb)) { remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY); - r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS; - r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; + r->res_ls->ls_stub_ms.m_flags = cpu_to_le32(DLM_IFL_STUB_MS); + r->res_ls->ls_stub_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY); r->res_ls->ls_stub_ms.m_result = 0; __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms); } @@ -3757,7 +3779,7 @@ static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode) send_args(r, lkb, ms); - ms->m_bastmode = mode; + ms->m_bastmode = cpu_to_le32(mode); error = send_message(mh, ms); out: @@ -3805,7 +3827,7 @@ static int send_remove(struct dlm_rsb *r) goto out; memcpy(ms->m_extra, r->res_name, r->res_length); - ms->m_hash = r->res_hash; + ms->m_hash = cpu_to_le32(r->res_hash); error = send_message(mh, ms); out: @@ -3827,7 +3849,7 @@ static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, send_args(r, lkb, ms); - ms->m_result = rv; + ms->m_result = cpu_to_le32(to_dlm_errno(rv)); error = send_message(mh, ms); out: @@ -3860,15 +3882,15 @@ static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in, struct dlm_rsb *r = &ls->ls_stub_rsb; struct dlm_message *ms; struct dlm_mhandle *mh; - int error, nodeid = ms_in->m_header.h_nodeid; + int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid); error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh); if (error) goto out; ms->m_lkid = ms_in->m_lkid; - ms->m_result = rv; - ms->m_nodeid = ret_nodeid; + ms->m_result = cpu_to_le32(to_dlm_errno(rv)); + ms->m_nodeid = cpu_to_le32(ret_nodeid); error = send_message(mh, ms); out: @@ -3881,25 +3903,26 @@ static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in, static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms) { - lkb->lkb_exflags = ms->m_exflags; - lkb->lkb_sbflags = ms->m_sbflags; + lkb->lkb_exflags = le32_to_cpu(ms->m_exflags); + lkb->lkb_sbflags = le32_to_cpu(ms->m_sbflags); lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) | - (ms->m_flags & 0x0000FFFF); + (le32_to_cpu(ms->m_flags) & 0x0000FFFF); } static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms) { - if (ms->m_flags == DLM_IFL_STUB_MS) + if (ms->m_flags == cpu_to_le32(DLM_IFL_STUB_MS)) return; - lkb->lkb_sbflags = ms->m_sbflags; + lkb->lkb_sbflags = le32_to_cpu(ms->m_sbflags); lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) | - (ms->m_flags & 0x0000FFFF); + (le32_to_cpu(ms->m_flags) & 0x0000FFFF); } static int receive_extralen(struct dlm_message *ms) { - return (ms->m_header.h_length - sizeof(struct dlm_message)); + return (le16_to_cpu(ms->m_header.h_length) - + sizeof(struct dlm_message)); } static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb, @@ -3933,14 +3956,14 @@ static void fake_astfn(void *astparam) static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, struct dlm_message *ms) { - lkb->lkb_nodeid = ms->m_header.h_nodeid; - lkb->lkb_ownpid = ms->m_pid; - lkb->lkb_remid = ms->m_lkid; + lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid); + lkb->lkb_ownpid = le32_to_cpu(ms->m_pid); + lkb->lkb_remid = le32_to_cpu(ms->m_lkid); lkb->lkb_grmode = DLM_LOCK_IV; - lkb->lkb_rqmode = ms->m_rqmode; + lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode); - lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL; - lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL; + lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL; + lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL; if (lkb->lkb_exflags & DLM_LKF_VALBLK) { /* lkb was just created so there won't be an lvb yet */ @@ -3961,8 +3984,8 @@ static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, if (receive_lvb(ls, lkb, ms)) return -ENOMEM; - lkb->lkb_rqmode = ms->m_rqmode; - lkb->lkb_lvbseq = ms->m_lvbseq; + lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode); + lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq); return 0; } @@ -3981,8 +4004,8 @@ static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms) { struct dlm_lkb *lkb = &ls->ls_stub_lkb; - lkb->lkb_nodeid = ms->m_header.h_nodeid; - lkb->lkb_remid = ms->m_lkid; + lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid); + lkb->lkb_remid = le32_to_cpu(ms->m_lkid); } /* This is called after the rsb is locked so that we can safely inspect @@ -3990,11 +4013,12 @@ static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms) static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms) { - int from = ms->m_header.h_nodeid; + int from = le32_to_cpu(ms->m_header.h_nodeid); int error = 0; /* currently mixing of user/kernel locks are not supported */ - if (ms->m_flags & DLM_IFL_USER && ~lkb->lkb_flags & DLM_IFL_USER) { + if (ms->m_flags & cpu_to_le32(DLM_IFL_USER) && + ~lkb->lkb_flags & DLM_IFL_USER) { log_error(lkb->lkb_resource->res_ls, "got user dlm message for a kernel lock"); error = -EINVAL; @@ -4002,23 +4026,23 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms) } switch (ms->m_type) { - case DLM_MSG_CONVERT: - case DLM_MSG_UNLOCK: - case DLM_MSG_CANCEL: + case cpu_to_le32(DLM_MSG_CONVERT): + case cpu_to_le32(DLM_MSG_UNLOCK): + case cpu_to_le32(DLM_MSG_CANCEL): if (!is_master_copy(lkb) || lkb->lkb_nodeid != from) error = -EINVAL; break; - case DLM_MSG_CONVERT_REPLY: - case DLM_MSG_UNLOCK_REPLY: - case DLM_MSG_CANCEL_REPLY: - case DLM_MSG_GRANT: - case DLM_MSG_BAST: + case cpu_to_le32(DLM_MSG_CONVERT_REPLY): + case cpu_to_le32(DLM_MSG_UNLOCK_REPLY): + case cpu_to_le32(DLM_MSG_CANCEL_REPLY): + case cpu_to_le32(DLM_MSG_GRANT): + case cpu_to_le32(DLM_MSG_BAST): if (!is_process_copy(lkb) || lkb->lkb_nodeid != from) error = -EINVAL; break; - case DLM_MSG_REQUEST_REPLY: + case cpu_to_le32(DLM_MSG_REQUEST_REPLY): if (!is_process_copy(lkb)) error = -EINVAL; else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from) @@ -4033,8 +4057,8 @@ out: if (error) log_error(lkb->lkb_resource->res_ls, "ignore invalid message %d from %d %x %x %x %d", - ms->m_type, from, lkb->lkb_id, lkb->lkb_remid, - lkb->lkb_flags, lkb->lkb_nodeid); + le32_to_cpu(ms->m_type), from, lkb->lkb_id, + lkb->lkb_remid, lkb->lkb_flags, lkb->lkb_nodeid); return error; } @@ -4079,22 +4103,23 @@ static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len) memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN); spin_unlock(&ls->ls_remove_spin); spin_unlock(&ls->ls_rsbtbl[b].lock); - wake_up(&ls->ls_remove_wait); rv = _create_message(ls, sizeof(struct dlm_message) + len, dir_nodeid, DLM_MSG_REMOVE, &ms, &mh); if (rv) - return; + goto out; memcpy(ms->m_extra, name, len); - ms->m_hash = hash; + ms->m_hash = cpu_to_le32(hash); send_message(mh, ms); +out: spin_lock(&ls->ls_remove_spin); ls->ls_remove_len = 0; memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN); spin_unlock(&ls->ls_remove_spin); + wake_up(&ls->ls_remove_wait); } static int receive_request(struct dlm_ls *ls, struct dlm_message *ms) @@ -4104,7 +4129,7 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms) int from_nodeid; int error, namelen = 0; - from_nodeid = ms->m_header.h_nodeid; + from_nodeid = le32_to_cpu(ms->m_header.h_nodeid); error = create_lkb(ls, &lkb); if (error) @@ -4177,7 +4202,7 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms) if (error != -ENOTBLK) { log_limit(ls, "receive_request %x from %d %d", - ms->m_lkid, from_nodeid, error); + le32_to_cpu(ms->m_lkid), from_nodeid, error); } if (namelen && error == -EBADR) { @@ -4196,15 +4221,16 @@ static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms) struct dlm_rsb *r; int error, reply = 1; - error = find_lkb(ls, ms->m_remid, &lkb); + error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); if (error) goto fail; - if (lkb->lkb_remid != ms->m_lkid) { + if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) { log_error(ls, "receive_convert %x remid %x recover_seq %llu " "remote %d %x", lkb->lkb_id, lkb->lkb_remid, (unsigned long long)lkb->lkb_recover_seq, - ms->m_header.h_nodeid, ms->m_lkid); + le32_to_cpu(ms->m_header.h_nodeid), + le32_to_cpu(ms->m_lkid)); error = -ENOENT; dlm_put_lkb(lkb); goto fail; @@ -4251,14 +4277,15 @@ static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) struct dlm_rsb *r; int error; - error = find_lkb(ls, ms->m_remid, &lkb); + error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); if (error) goto fail; - if (lkb->lkb_remid != ms->m_lkid) { + if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) { log_error(ls, "receive_unlock %x remid %x remote %d %x", lkb->lkb_id, lkb->lkb_remid, - ms->m_header.h_nodeid, ms->m_lkid); + le32_to_cpu(ms->m_header.h_nodeid), + le32_to_cpu(ms->m_lkid)); error = -ENOENT; dlm_put_lkb(lkb); goto fail; @@ -4302,7 +4329,7 @@ static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) struct dlm_rsb *r; int error; - error = find_lkb(ls, ms->m_remid, &lkb); + error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); if (error) goto fail; @@ -4338,7 +4365,7 @@ static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms) struct dlm_rsb *r; int error; - error = find_lkb(ls, ms->m_remid, &lkb); + error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); if (error) return error; @@ -4369,7 +4396,7 @@ static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms) struct dlm_rsb *r; int error; - error = find_lkb(ls, ms->m_remid, &lkb); + error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); if (error) return error; @@ -4382,8 +4409,8 @@ static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms) if (error) goto out; - queue_bast(r, lkb, ms->m_bastmode); - lkb->lkb_highbast = ms->m_bastmode; + queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode)); + lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode); out: unlock_rsb(r); put_rsb(r); @@ -4395,7 +4422,7 @@ static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms) { int len, error, ret_nodeid, from_nodeid, our_nodeid; - from_nodeid = ms->m_header.h_nodeid; + from_nodeid = le32_to_cpu(ms->m_header.h_nodeid); our_nodeid = dlm_our_nodeid(); len = receive_extralen(ms); @@ -4418,7 +4445,7 @@ static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) uint32_t hash, b; int rv, len, dir_nodeid, from_nodeid; - from_nodeid = ms->m_header.h_nodeid; + from_nodeid = le32_to_cpu(ms->m_header.h_nodeid); len = receive_extralen(ms); @@ -4428,7 +4455,7 @@ static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) return; } - dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash); + dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash)); if (dir_nodeid != dlm_our_nodeid()) { log_error(ls, "receive_remove from %d bad nodeid %d", from_nodeid, dir_nodeid); @@ -4501,7 +4528,7 @@ static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms) { - do_purge(ls, ms->m_nodeid, ms->m_pid); + do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid)); } static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) @@ -4509,9 +4536,9 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) struct dlm_lkb *lkb; struct dlm_rsb *r; int error, mstype, result; - int from_nodeid = ms->m_header.h_nodeid; + int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid); - error = find_lkb(ls, ms->m_remid, &lkb); + error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); if (error) return error; @@ -4527,7 +4554,8 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); if (error) { log_error(ls, "receive_request_reply %x remote %d %x result %d", - lkb->lkb_id, from_nodeid, ms->m_lkid, ms->m_result); + lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid), + from_dlm_errno(le32_to_cpu(ms->m_result))); dlm_dump_rsb(r); goto out; } @@ -4541,7 +4569,7 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) } /* this is the value returned from do_request() on the master */ - result = ms->m_result; + result = from_dlm_errno(le32_to_cpu(ms->m_result)); switch (result) { case -EAGAIN: @@ -4555,7 +4583,7 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) case 0: /* request was queued or granted on remote master */ receive_flags_reply(lkb, ms); - lkb->lkb_remid = ms->m_lkid; + lkb->lkb_remid = le32_to_cpu(ms->m_lkid); if (is_altmode(lkb)) munge_altmode(lkb, ms); if (result) { @@ -4628,7 +4656,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, struct dlm_message *ms) { /* this is the value returned from do_convert() on the master */ - switch (ms->m_result) { + switch (from_dlm_errno(le32_to_cpu(ms->m_result))) { case -EAGAIN: /* convert would block (be queued) on remote master */ queue_cast(r, lkb, -EAGAIN); @@ -4661,8 +4689,9 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, default: log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d", - lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid, - ms->m_result); + lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid), + le32_to_cpu(ms->m_lkid), + from_dlm_errno(le32_to_cpu(ms->m_result))); dlm_print_rsb(r); dlm_print_lkb(lkb); } @@ -4696,7 +4725,7 @@ static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) struct dlm_lkb *lkb; int error; - error = find_lkb(ls, ms->m_remid, &lkb); + error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); if (error) return error; @@ -4724,7 +4753,7 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) /* this is the value returned from do_unlock() on the master */ - switch (ms->m_result) { + switch (from_dlm_errno(le32_to_cpu(ms->m_result))) { case -DLM_EUNLOCK: receive_flags_reply(lkb, ms); remove_lock_pc(r, lkb); @@ -4734,7 +4763,7 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) break; default: log_error(r->res_ls, "receive_unlock_reply %x error %d", - lkb->lkb_id, ms->m_result); + lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result))); } out: unlock_rsb(r); @@ -4746,7 +4775,7 @@ static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) struct dlm_lkb *lkb; int error; - error = find_lkb(ls, ms->m_remid, &lkb); + error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); if (error) return error; @@ -4774,7 +4803,7 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) /* this is the value returned from do_cancel() on the master */ - switch (ms->m_result) { + switch (from_dlm_errno(le32_to_cpu(ms->m_result))) { case -DLM_ECANCEL: receive_flags_reply(lkb, ms); revert_lock_pc(r, lkb); @@ -4784,7 +4813,8 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) break; default: log_error(r->res_ls, "receive_cancel_reply %x error %d", - lkb->lkb_id, ms->m_result); + lkb->lkb_id, + from_dlm_errno(le32_to_cpu(ms->m_result))); } out: unlock_rsb(r); @@ -4796,7 +4826,7 @@ static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) struct dlm_lkb *lkb; int error; - error = find_lkb(ls, ms->m_remid, &lkb); + error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); if (error) return error; @@ -4812,9 +4842,10 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) int error, ret_nodeid; int do_lookup_list = 0; - error = find_lkb(ls, ms->m_lkid, &lkb); + error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb); if (error) { - log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid); + log_error(ls, "%s no lkid %x", __func__, + le32_to_cpu(ms->m_lkid)); return; } @@ -4829,7 +4860,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) if (error) goto out; - ret_nodeid = ms->m_nodeid; + ret_nodeid = le32_to_cpu(ms->m_nodeid); /* We sometimes receive a request from the dir node for this rsb before we've received the dir node's loookup_reply for it. @@ -4841,8 +4872,8 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) /* This should never happen */ log_error(ls, "receive_lookup_reply %x from %d ret %d " "master %d dir %d our %d first %x %s", - lkb->lkb_id, ms->m_header.h_nodeid, ret_nodeid, - r->res_master_nodeid, r->res_dir_nodeid, + lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid), + ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid, dlm_our_nodeid(), r->res_first_lkid, r->res_name); } @@ -4854,7 +4885,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) } else if (ret_nodeid == -1) { /* the remote node doesn't believe it's the dir node */ log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid", - lkb->lkb_id, ms->m_header.h_nodeid); + lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid)); r->res_master_nodeid = 0; r->res_nodeid = -1; lkb->lkb_nodeid = -1; @@ -4888,10 +4919,12 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms, { int error = 0, noent = 0; - if (!dlm_is_member(ls, ms->m_header.h_nodeid)) { + if (!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid))) { log_limit(ls, "receive %d from non-member %d %x %x %d", - ms->m_type, ms->m_header.h_nodeid, ms->m_lkid, - ms->m_remid, ms->m_result); + le32_to_cpu(ms->m_type), + le32_to_cpu(ms->m_header.h_nodeid), + le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid), + from_dlm_errno(le32_to_cpu(ms->m_result))); return; } @@ -4899,77 +4932,78 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms, /* messages sent to a master node */ - case DLM_MSG_REQUEST: + case cpu_to_le32(DLM_MSG_REQUEST): error = receive_request(ls, ms); break; - case DLM_MSG_CONVERT: + case cpu_to_le32(DLM_MSG_CONVERT): error = receive_convert(ls, ms); break; - case DLM_MSG_UNLOCK: + case cpu_to_le32(DLM_MSG_UNLOCK): error = receive_unlock(ls, ms); break; - case DLM_MSG_CANCEL: + case cpu_to_le32(DLM_MSG_CANCEL): noent = 1; error = receive_cancel(ls, ms); break; /* messages sent from a master node (replies to above) */ - case DLM_MSG_REQUEST_REPLY: + case cpu_to_le32(DLM_MSG_REQUEST_REPLY): error = receive_request_reply(ls, ms); break; - case DLM_MSG_CONVERT_REPLY: + case cpu_to_le32(DLM_MSG_CONVERT_REPLY): error = receive_convert_reply(ls, ms); break; - case DLM_MSG_UNLOCK_REPLY: + case cpu_to_le32(DLM_MSG_UNLOCK_REPLY): error = receive_unlock_reply(ls, ms); break; - case DLM_MSG_CANCEL_REPLY: + case cpu_to_le32(DLM_MSG_CANCEL_REPLY): error = receive_cancel_reply(ls, ms); break; /* messages sent from a master node (only two types of async msg) */ - case DLM_MSG_GRANT: + case cpu_to_le32(DLM_MSG_GRANT): noent = 1; error = receive_grant(ls, ms); break; - case DLM_MSG_BAST: + case cpu_to_le32(DLM_MSG_BAST): noent = 1; error = receive_bast(ls, ms); break; /* messages sent to a dir node */ - case DLM_MSG_LOOKUP: + case cpu_to_le32(DLM_MSG_LOOKUP): receive_lookup(ls, ms); break; - case DLM_MSG_REMOVE: + case cpu_to_le32(DLM_MSG_REMOVE): receive_remove(ls, ms); break; /* messages sent from a dir node (remove has no reply) */ - case DLM_MSG_LOOKUP_REPLY: + case cpu_to_le32(DLM_MSG_LOOKUP_REPLY): receive_lookup_reply(ls, ms); break; /* other messages */ - case DLM_MSG_PURGE: + case cpu_to_le32(DLM_MSG_PURGE): receive_purge(ls, ms); break; default: - log_error(ls, "unknown message type %d", ms->m_type); + log_error(ls, "unknown message type %d", + le32_to_cpu(ms->m_type)); } /* @@ -4985,22 +5019,26 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms, if (error == -ENOENT && noent) { log_debug(ls, "receive %d no %x remote %d %x saved_seq %u", - ms->m_type, ms->m_remid, ms->m_header.h_nodeid, - ms->m_lkid, saved_seq); + le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid), + le32_to_cpu(ms->m_header.h_nodeid), + le32_to_cpu(ms->m_lkid), saved_seq); } else if (error == -ENOENT) { log_error(ls, "receive %d no %x remote %d %x saved_seq %u", - ms->m_type, ms->m_remid, ms->m_header.h_nodeid, - ms->m_lkid, saved_seq); + le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid), + le32_to_cpu(ms->m_header.h_nodeid), + le32_to_cpu(ms->m_lkid), saved_seq); - if (ms->m_type == DLM_MSG_CONVERT) - dlm_dump_rsb_hash(ls, ms->m_hash); + if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT)) + dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash)); } if (error == -EINVAL) { log_error(ls, "receive %d inval from %d lkid %x remid %x " "saved_seq %u", - ms->m_type, ms->m_header.h_nodeid, - ms->m_lkid, ms->m_remid, saved_seq); + le32_to_cpu(ms->m_type), + le32_to_cpu(ms->m_header.h_nodeid), + le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid), + saved_seq); } } @@ -5021,7 +5059,7 @@ static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms, lockspace generation before we left. */ if (!ls->ls_generation) { log_limit(ls, "receive %d from %d ignore old gen", - ms->m_type, nodeid); + le32_to_cpu(ms->m_type), nodeid); return; } @@ -5054,30 +5092,30 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid) switch (hd->h_cmd) { case DLM_MSG: - dlm_message_in(&p->message); - type = p->message.m_type; + type = le32_to_cpu(p->message.m_type); break; case DLM_RCOM: - dlm_rcom_in(&p->rcom); - type = p->rcom.rc_type; + type = le32_to_cpu(p->rcom.rc_type); break; default: log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid); return; } - if (hd->h_nodeid != nodeid) { + if (le32_to_cpu(hd->h_nodeid) != nodeid) { log_print("invalid h_nodeid %d from %d lockspace %x", - hd->h_nodeid, nodeid, hd->u.h_lockspace); + le32_to_cpu(hd->h_nodeid), nodeid, + le32_to_cpu(hd->u.h_lockspace)); return; } - ls = dlm_find_lockspace_global(hd->u.h_lockspace); + ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace)); if (!ls) { if (dlm_config.ci_log_debug) { printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace " "%u from %d cmd %d type %d\n", - hd->u.h_lockspace, nodeid, hd->h_cmd, type); + le32_to_cpu(hd->u.h_lockspace), nodeid, + hd->h_cmd, type); } if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS) @@ -5104,10 +5142,10 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb, if (middle_conversion(lkb)) { hold_lkb(lkb); memset(ms_stub, 0, sizeof(struct dlm_message)); - ms_stub->m_flags = DLM_IFL_STUB_MS; - ms_stub->m_type = DLM_MSG_CONVERT_REPLY; - ms_stub->m_result = -EINPROGRESS; - ms_stub->m_header.h_nodeid = lkb->lkb_nodeid; + ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS); + ms_stub->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY); + ms_stub->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS)); + ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid); _receive_convert_reply(lkb, ms_stub); /* Same special case as in receive_rcom_lock_args() */ @@ -5226,10 +5264,10 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) case DLM_MSG_UNLOCK: hold_lkb(lkb); memset(ms_stub, 0, sizeof(struct dlm_message)); - ms_stub->m_flags = DLM_IFL_STUB_MS; - ms_stub->m_type = DLM_MSG_UNLOCK_REPLY; - ms_stub->m_result = stub_unlock_result; - ms_stub->m_header.h_nodeid = lkb->lkb_nodeid; + ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS); + ms_stub->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY); + ms_stub->m_result = cpu_to_le32(to_dlm_errno(stub_unlock_result)); + ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid); _receive_unlock_reply(lkb, ms_stub); dlm_put_lkb(lkb); break; @@ -5237,10 +5275,10 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) case DLM_MSG_CANCEL: hold_lkb(lkb); memset(ms_stub, 0, sizeof(struct dlm_message)); - ms_stub->m_flags = DLM_IFL_STUB_MS; - ms_stub->m_type = DLM_MSG_CANCEL_REPLY; - ms_stub->m_result = stub_cancel_result; - ms_stub->m_header.h_nodeid = lkb->lkb_nodeid; + ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS); + ms_stub->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY); + ms_stub->m_result = cpu_to_le32(to_dlm_errno(stub_cancel_result)); + ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid); _receive_cancel_reply(lkb, ms_stub); dlm_put_lkb(lkb); break; @@ -5257,21 +5295,18 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls) { - struct dlm_lkb *lkb; - int found = 0; + struct dlm_lkb *lkb = NULL, *iter; mutex_lock(&ls->ls_waiters_mutex); - list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) { - if (lkb->lkb_flags & DLM_IFL_RESEND) { - hold_lkb(lkb); - found = 1; + list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) { + if (iter->lkb_flags & DLM_IFL_RESEND) { + hold_lkb(iter); + lkb = iter; break; } } mutex_unlock(&ls->ls_waiters_mutex); - if (!found) - lkb = NULL; return lkb; } @@ -5331,11 +5366,16 @@ int dlm_recover_waiters_post(struct dlm_ls *ls) lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; lkb->lkb_wait_type = 0; - lkb->lkb_wait_count = 0; + /* drop all wait_count references we still + * hold a reference for this iteration. + */ + while (lkb->lkb_wait_count) { + lkb->lkb_wait_count--; + unhold_lkb(lkb); + } mutex_lock(&ls->ls_waiters_mutex); list_del_init(&lkb->lkb_wait_reply); mutex_unlock(&ls->ls_waiters_mutex); - unhold_lkb(lkb); /* for waiters list */ if (oc || ou) { /* do an unlock or cancel instead of resending */ @@ -5605,7 +5645,7 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, { struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; - lkb->lkb_nodeid = rc->rc_header.h_nodeid; + lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid); lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid); lkb->lkb_remid = le32_to_cpu(rl->rl_lkid); lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags); @@ -5620,8 +5660,8 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL; if (lkb->lkb_exflags & DLM_LKF_VALBLK) { - int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) - - sizeof(struct rcom_lock); + int lvblen = le16_to_cpu(rc->rc_header.h_length) - + sizeof(struct dlm_rcom) - sizeof(struct rcom_lock); if (lvblen > ls->ls_lvblen) return -EINVAL; lkb->lkb_lvbptr = dlm_allocate_lvb(ls); @@ -5657,7 +5697,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) struct dlm_rsb *r; struct dlm_lkb *lkb; uint32_t remid = 0; - int from_nodeid = rc->rc_header.h_nodeid; + int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid); int error; if (rl->rl_parent_lkid) { @@ -5707,7 +5747,6 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) attach_lkb(r, lkb); add_lkb(r, lkb, rl->rl_status); - error = 0; ls->ls_recover_locks_in++; if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue)) @@ -5747,7 +5786,8 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) error = find_lkb(ls, lkid, &lkb); if (error) { log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d", - lkid, rc->rc_header.h_nodeid, remid, result); + lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, + result); return error; } @@ -5757,7 +5797,8 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) if (!is_process_copy(lkb)) { log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d", - lkid, rc->rc_header.h_nodeid, remid, result); + lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, + result); dlm_dump_rsb(r); unlock_rsb(r); put_rsb(r); @@ -5772,7 +5813,8 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) a barrier between recover_masters and recover_locks. */ log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d", - lkid, rc->rc_header.h_nodeid, remid, result); + lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, + result); dlm_send_rcom_lock(r, lkb); goto out; @@ -5782,7 +5824,8 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) break; default: log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk", - lkid, rc->rc_header.h_nodeid, remid, result); + lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, + result); } /* an ack for dlm_recover_locks() which waits for replies from @@ -5925,37 +5968,36 @@ int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, int mode, uint32_t flags, void *name, unsigned int namelen, unsigned long timeout_cs, uint32_t *lkid) { - struct dlm_lkb *lkb; + struct dlm_lkb *lkb = NULL, *iter; struct dlm_user_args *ua; int found_other_mode = 0; - int found = 0; int rv = 0; mutex_lock(&ls->ls_orphans_mutex); - list_for_each_entry(lkb, &ls->ls_orphans, lkb_ownqueue) { - if (lkb->lkb_resource->res_length != namelen) + list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) { + if (iter->lkb_resource->res_length != namelen) continue; - if (memcmp(lkb->lkb_resource->res_name, name, namelen)) + if (memcmp(iter->lkb_resource->res_name, name, namelen)) continue; - if (lkb->lkb_grmode != mode) { + if (iter->lkb_grmode != mode) { found_other_mode = 1; continue; } - found = 1; - list_del_init(&lkb->lkb_ownqueue); - lkb->lkb_flags &= ~DLM_IFL_ORPHAN; - *lkid = lkb->lkb_id; + lkb = iter; + list_del_init(&iter->lkb_ownqueue); + iter->lkb_flags &= ~DLM_IFL_ORPHAN; + *lkid = iter->lkb_id; break; } mutex_unlock(&ls->ls_orphans_mutex); - if (!found && found_other_mode) { + if (!lkb && found_other_mode) { rv = -EAGAIN; goto out; } - if (!found) { + if (!lkb) { rv = -ENOENT; goto out; } @@ -6307,8 +6349,8 @@ static int send_purge(struct dlm_ls *ls, int nodeid, int pid) DLM_MSG_PURGE, &ms, &mh); if (error) return error; - ms->m_nodeid = nodeid; - ms->m_pid = pid; + ms->m_nodeid = cpu_to_le32(nodeid); + ms->m_pid = cpu_to_le32(pid); return send_message(mh, ms); } |