author     Alexander Aring <aahringo@redhat.com>    2024-04-15 20:39:42 +0200
committer  David Teigland <teigland@redhat.com>     2024-04-16 21:45:31 +0200
commit     e91313591b29ce724fe2f1bdf29f2482878fc275 (patch)
tree       f3058f289d907f09b98a946527b388e6e6f57a93 /fs/dlm/lock.c
parent     dlm: drop dlm_scand kthread and use timers (diff)
download   linux-e91313591b29ce724fe2f1bdf29f2482878fc275.tar.xz
           linux-e91313591b29ce724fe2f1bdf29f2482878fc275.zip
dlm: use rwlock for rsb hash table
The conversion to rhashtable introduced a hash table lock per lockspace,
in place of per bucket locks. To make this more scalable, switch to
using an rwlock for hash table access. The common case fast path uses
it as a read lock.
Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
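
The heart of the change is the classic rwlock pattern: an rwlock cannot be upgraded from read to write, so the likely path performs its lookup under the read lock only, while the unlikely path (an rsb in toss state, or an insert race) drops the read lock, retakes the lock in write mode, revalidates what it found, and restarts the whole lookup if the state changed in between. Below is a minimal kernel-style sketch of that pattern, not the DLM code itself: struct foo, foo_lookup() and the FOO_TOSS bit are hypothetical stand-ins for struct dlm_rsb, dlm_search_rsb_tree() and RSB_TOSS.

#include <linux/spinlock.h>	/* rwlock_t, DEFINE_RWLOCK() */
#include <linux/kref.h>
#include <linux/bitops.h>

struct foo {
	struct kref ref;
	unsigned long flags;
};
#define FOO_TOSS 0	/* stand-in for RSB_TOSS */

static DEFINE_RWLOCK(tbl_lock);	/* stand-in for ls->ls_rsbtbl_lock */

struct foo *foo_lookup(const char *name);	/* hypothetical lookup helper */

struct foo *foo_find(const char *name)
{
	struct foo *f;

retry_lookup:
	/* likely path: the entry is active, the read lock is enough */
	read_lock_bh(&tbl_lock);
	f = foo_lookup(name);
	if (f && !test_bit(FOO_TOSS, &f->flags)) {
		kref_get(&f->ref);	/* atomic, safe under the read lock */
		read_unlock_bh(&tbl_lock);
		return f;
	}
	read_unlock_bh(&tbl_lock);
	if (!f)
		return NULL;	/* caller would create a new entry */

	/* unlikely path: the lock cannot be upgraded, so retake it in
	 * write mode and revalidate; the entry may have changed state
	 * or disappeared while no lock was held
	 */
	write_lock_bh(&tbl_lock);
	f = foo_lookup(name);
	if (!f) {
		write_unlock_bh(&tbl_lock);
		return NULL;
	}
	if (!test_bit(FOO_TOSS, &f->flags)) {
		/* another thread revived it first, start over */
		write_unlock_bh(&tbl_lock);
		goto retry_lookup;
	}
	kref_init(&f->ref);	/* revive the tossed entry under the write lock */
	write_unlock_bh(&tbl_lock);
	return f;
}

The same revalidate-and-retry shape covers the insert race in the patch below: rsb_insert() now simply returns -EEXIST, and the callers react by freeing their freshly allocated rsb and redoing the lookup from the top.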
Diffstat (limited to 'fs/dlm/lock.c')
-rw-r--r--  fs/dlm/lock.c  | 269
1 file changed, 194 insertions, 75 deletions
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 7c97181a04fe..790d0fd76bbe 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -342,15 +342,15 @@ void dlm_hold_rsb(struct dlm_rsb *r)
 
 /* TODO move this to lib/refcount.c */
 static __must_check bool
-dlm_refcount_dec_and_lock_bh(refcount_t *r, spinlock_t *lock)
+dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock)
 __cond_acquires(lock)
 {
 	if (refcount_dec_not_one(r))
 		return false;
 
-	spin_lock_bh(lock);
+	write_lock_bh(lock);
 	if (!refcount_dec_and_test(r)) {
-		spin_unlock_bh(lock);
+		write_unlock_bh(lock);
 		return false;
 	}
 
@@ -358,11 +358,11 @@ __cond_acquires(lock)
 }
 
 /* TODO move this to include/linux/kref.h */
-static inline int dlm_kref_put_lock_bh(struct kref *kref,
-				       void (*release)(struct kref *kref),
-				       spinlock_t *lock)
+static inline int dlm_kref_put_write_lock_bh(struct kref *kref,
+					     void (*release)(struct kref *kref),
+					     rwlock_t *lock)
 {
-	if (dlm_refcount_dec_and_lock_bh(&kref->refcount, lock)) {
+	if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) {
 		release(kref);
 		return 1;
 	}
@@ -378,10 +378,10 @@ static void put_rsb(struct dlm_rsb *r)
 	struct dlm_ls *ls = r->res_ls;
 	int rv;
 
-	rv = dlm_kref_put_lock_bh(&r->res_ref, toss_rsb,
-				  &ls->ls_rsbtbl_lock);
+	rv = dlm_kref_put_write_lock_bh(&r->res_ref, toss_rsb,
+					&ls->ls_rsbtbl_lock);
 	if (rv)
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
 }
 
 void dlm_put_rsb(struct dlm_rsb *r)
@@ -603,7 +603,7 @@ void dlm_rsb_toss_timer(struct timer_list *timer)
 	 * a caching handling and the other holders might to put
 	 * this rsb out of the toss state.
	 */
-	rv = spin_trylock(&ls->ls_rsbtbl_lock);
+	rv = write_trylock(&ls->ls_rsbtbl_lock);
	if (!rv) {
		spin_unlock(&ls->ls_toss_q_lock);
		/* rearm again try timer */
@@ -618,7 +618,7 @@ void dlm_rsb_toss_timer(struct timer_list *timer)
	/* not necessary to held the ls_rsbtbl_lock when
	 * calling send_remove()
	 */
-	spin_unlock(&ls->ls_rsbtbl_lock);
+	write_unlock(&ls->ls_rsbtbl_lock);
 
	/* remove the rsb out of the toss queue its gone
	 * drom DLM now
@@ -702,16 +702,8 @@ int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
 
 static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
 {
-	int rv;
-
-	rv = rhashtable_insert_fast(rhash, &rsb->res_node,
-				    dlm_rhash_rsb_params);
-	if (rv == -EEXIST) {
-		log_print("%s match", __func__);
-		dlm_dump_rsb(rsb);
-	}
-
-	return rv;
+	return rhashtable_insert_fast(rhash, &rsb->res_node,
+				      dlm_rhash_rsb_params);
 }
 
 /*
@@ -806,24 +798,47 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 		goto out;
 	}
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+ retry_lookup:
 
+	/* check if the rsb is in keep state under read lock - likely path */
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
-	if (error)
+	if (error) {
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto do_new;
+	}
 
 	/*
	 * rsb is active, so we can't check master_nodeid without lock_rsb.
	 */
 
-	if (rsb_flag(r, RSB_TOSS))
+	if (rsb_flag(r, RSB_TOSS)) {
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto do_toss;
+	}
 
 	kref_get(&r->res_ref);
-	goto out_unlock;
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
+	goto out;
 
  do_toss:
+	write_lock_bh(&ls->ls_rsbtbl_lock);
+
+	/* retry lookup under write lock to see if it's still in toss state;
+	 * if not, it's in keep state and we relookup - unlikely path
+	 */
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
+	if (!error) {
+		if (!rsb_flag(r, RSB_TOSS)) {
+			write_unlock_bh(&ls->ls_rsbtbl_lock);
+			goto retry_lookup;
+		}
+	} else {
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		goto do_new;
+	}
+
 	/*
	 * rsb found inactive (master_nodeid may be out of date unless
	 * we are the dir_nodeid or were the master)  No other thread
@@ -837,8 +852,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 		log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
 			  from_nodeid, r->res_master_nodeid, dir_nodeid,
 			  r->res_name);
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
 		error = -ENOTBLK;
-		goto out_unlock;
+		goto out;
 	}
 
 	if ((r->res_master_nodeid != our_nodeid) && from_dir) {
@@ -868,9 +884,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
	 */
 	kref_init(&r->res_ref);
 	rsb_delete_toss_timer(ls, r);
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
-	goto out_unlock;
+	goto out;
 
  do_new:
@@ -879,15 +895,13 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
	 */
 
 	if (error == -EBADR && !create)
-		goto out_unlock;
+		goto out;
 
 	error = get_rsb_struct(ls, name, len, &r);
-	if (error == -EAGAIN) {
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	if (error == -EAGAIN)
 		goto retry;
-	}
 	if (error)
-		goto out_unlock;
+		goto out;
 
 	r->res_hash = hash;
 	r->res_dir_nodeid = dir_nodeid;
@@ -909,7 +923,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 			dlm_free_rsb(r);
 			r = NULL;
 			error = -ENOTBLK;
-			goto out_unlock;
+			goto out;
 		}
 
 		if (from_other) {
@@ -929,11 +943,20 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 	}
 
  out_add:
+
+	write_lock_bh(&ls->ls_rsbtbl_lock);
 	error = rsb_insert(r, &ls->ls_rsbtbl);
-	if (!error)
+	if (error == -EEXIST) {
+		/* somebody else was faster and it seems the
+		 * rsb exists now, so we do a whole relookup
+		 */
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		dlm_free_rsb(r);
+		goto retry_lookup;
+	} else if (!error) {
 		list_add(&r->res_rsbs_list, &ls->ls_keep);
- out_unlock:
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	}
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
  out:
 	*r_ret = r;
 	return error;
@@ -957,24 +980,49 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	if (error < 0)
 		goto out;
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+ retry_lookup:
 
+	/* check if the rsb is in keep state under read lock - likely path */
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
-	if (error)
+	if (error) {
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto do_new;
+	}
 
-	if (rsb_flag(r, RSB_TOSS))
+	if (rsb_flag(r, RSB_TOSS)) {
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto do_toss;
+	}
 
 	/*
	 * rsb is active, so we can't check master_nodeid without lock_rsb.
	 */
 
 	kref_get(&r->res_ref);
-	goto out_unlock;
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
+
+	goto out;
 
  do_toss:
+	write_lock_bh(&ls->ls_rsbtbl_lock);
+
+	/* retry lookup under write lock to see if it's still in toss state;
+	 * if not, it's in keep state and we relookup - unlikely path
+	 */
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
+	if (!error) {
+		if (!rsb_flag(r, RSB_TOSS)) {
+			write_unlock_bh(&ls->ls_rsbtbl_lock);
+			goto retry_lookup;
+		}
+	} else {
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		goto do_new;
+	}
+
 	/*
	 * rsb found inactive. No other thread is using this rsb because
	 * it's on the toss list, so we can look at or update
@@ -987,8 +1035,9 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 		log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
 			  from_nodeid, r->res_master_nodeid, dir_nodeid);
 		dlm_print_rsb(r);
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
 		error = -ENOTBLK;
-		goto out_unlock;
+		goto out;
 	}
 
 	if (!recover && (r->res_master_nodeid != our_nodeid) &&
@@ -1010,9 +1059,9 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
	 */
 	kref_init(&r->res_ref);
 	rsb_delete_toss_timer(ls, r);
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
-	goto out_unlock;
+	goto out;
 
  do_new:
@@ -1022,11 +1071,10 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 
 	error = get_rsb_struct(ls, name, len, &r);
 	if (error == -EAGAIN) {
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto retry;
 	}
 	if (error)
-		goto out_unlock;
+		goto out;
 
 	r->res_hash = hash;
 	r->res_dir_nodeid = dir_nodeid;
@@ -1034,11 +1082,20 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
 	kref_init(&r->res_ref);
 
+	write_lock_bh(&ls->ls_rsbtbl_lock);
 	error = rsb_insert(r, &ls->ls_rsbtbl);
-	if (!error)
+	if (error == -EEXIST) {
+		/* somebody else was faster and it seems the
+		 * rsb exists now, so we do a whole relookup
+		 */
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		dlm_free_rsb(r);
+		goto retry_lookup;
+	} else if (!error) {
 		list_add(&r->res_rsbs_list, &ls->ls_keep);
- out_unlock:
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	}
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
+ out:
 	*r_ret = r;
 	return error;
@@ -1251,18 +1308,23 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	if (error < 0)
 		return error;
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+ retry_lookup:
+
+	/* check if the rsb is in keep state under read lock - likely path */
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 	if (!error) {
-		if (rsb_flag(r, RSB_TOSS))
+		if (rsb_flag(r, RSB_TOSS)) {
+			read_unlock_bh(&ls->ls_rsbtbl_lock);
 			goto do_toss;
+		}
 
		/* because the rsb is active, we need to lock_rsb before
		 * checking/changing re_master_nodeid
		 */
 
 		hold_rsb(r);
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
 		lock_rsb(r);
 
 		__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
@@ -1274,10 +1336,31 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 
 		return 0;
 	} else {
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto not_found;
 	}
 
  do_toss:
+	/* unlikely path - relookup under write */
+	write_lock_bh(&ls->ls_rsbtbl_lock);
+
+	/* rsb_mod_timer() requires ls_rsbtbl_lock held in write mode;
+	 * check if the rsb is still in toss state, if not relookup
+	 */
+	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
+	if (!error) {
+		if (!rsb_flag(r, RSB_TOSS)) {
+			write_unlock_bh(&ls->ls_rsbtbl_lock);
+			/* something has changed, very unlikely but
+			 * try again
+			 */
+			goto retry_lookup;
+		}
+	} else {
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		goto not_found;
+	}
+
	/* because the rsb is inactive (on toss list), it's not refcounted
	 * and lock_rsb is not used, but is protected by the rsbtbl lock
	 */
@@ -1287,18 +1370,16 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	rsb_mod_timer(ls, r);
	/* the rsb was inactive (on toss list) */
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	return 0;
 
  not_found:
 	error = get_rsb_struct(ls, name, len, &r);
-	if (error == -EAGAIN) {
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	if (error == -EAGAIN)
 		goto retry;
-	}
 	if (error)
-		goto out_unlock;
+		goto out;
 
 	r->res_hash = hash;
 	r->res_dir_nodeid = our_nodeid;
@@ -1307,22 +1388,30 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	kref_init(&r->res_ref);
 	rsb_set_flag(r, RSB_TOSS);
 
+	write_lock_bh(&ls->ls_rsbtbl_lock);
 	error = rsb_insert(r, &ls->ls_rsbtbl);
-	if (error) {
+	if (error == -EEXIST) {
+		/* somebody else was faster and it seems the
+		 * rsb exists now, so we do a whole relookup
+		 */
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
+		dlm_free_rsb(r);
+		goto retry_lookup;
+	} else if (error) {
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
		/* should never happen */
 		dlm_free_rsb(r);
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
 		goto retry;
 	}
 
 	list_add(&r->res_rsbs_list, &ls->ls_toss);
 	rsb_mod_timer(ls, r);
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	if (result)
 		*result = DLM_LU_ADD;
 	*r_nodeid = from_nodeid;
- out_unlock:
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+ out:
 	return error;
 }
 
@@ -1330,12 +1419,12 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
 {
 	struct dlm_rsb *r;
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
 		if (r->res_hash == hash)
 			dlm_dump_rsb(r);
 	}
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
 }
 
 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
@@ -1343,14 +1432,14 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
 	struct dlm_rsb *r = NULL;
 	int error;
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 	if (!error)
 		goto out;
 
 	dlm_dump_rsb(r);
  out:
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
 }
 
 static void toss_rsb(struct kref *kref)
@@ -1478,6 +1567,36 @@ static void kill_lkb(struct kref *kref)
 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
 }
 
+/* TODO move this to lib/refcount.c */
+static __must_check bool
+dlm_refcount_dec_and_lock_bh(refcount_t *r, spinlock_t *lock)
+__cond_acquires(lock)
+{
+	if (refcount_dec_not_one(r))
+		return false;
+
+	spin_lock_bh(lock);
+	if (!refcount_dec_and_test(r)) {
+		spin_unlock_bh(lock);
+		return false;
+	}
+
+	return true;
+}
+
+/* TODO move this to include/linux/kref.h */
+static inline int dlm_kref_put_lock_bh(struct kref *kref,
+				       void (*release)(struct kref *kref),
+				       spinlock_t *lock)
+{
+	if (dlm_refcount_dec_and_lock_bh(&kref->refcount, lock)) {
+		release(kref);
+		return 1;
+	}
+
+	return 0;
+}
+
 /* __put_lkb() is used when an lkb may not have an rsb attached to
    it so we need to provide the lockspace explicitly */
 
@@ -4247,14 +4366,14 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 	memset(name, 0, sizeof(name));
 	memcpy(name, ms->m_extra, len);
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+	write_lock_bh(&ls->ls_rsbtbl_lock);
 
 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 	if (rv) {
		/* should not happen */
 		log_error(ls, "%s from %d not found %s",
 			  __func__, from_nodeid, name);
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
 		return;
 	}
 
@@ -4264,14 +4383,14 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 			log_error(ls, "receive_remove keep from %d master %d",
 				  from_nodeid, r->res_master_nodeid);
 			dlm_print_rsb(r);
-			spin_unlock_bh(&ls->ls_rsbtbl_lock);
+			write_unlock_bh(&ls->ls_rsbtbl_lock);
 			return;
 		}
 
 		log_debug(ls, "receive_remove from %d master %d first %x %s",
 			  from_nodeid, r->res_master_nodeid, r->res_first_lkid,
 			  name);
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
 		return;
 	}
 
@@ -4279,14 +4398,14 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 		log_error(ls, "receive_remove toss from %d master %d",
 			  from_nodeid, r->res_master_nodeid);
 		dlm_print_rsb(r);
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+		write_unlock_bh(&ls->ls_rsbtbl_lock);
 		return;
 	}
 
 	list_del(&r->res_rsbs_list);
 	rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
 			       dlm_rhash_rsb_params);
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	write_unlock_bh(&ls->ls_rsbtbl_lock);
 
 	free_toss_rsb(r);
 }
@@ -5354,7 +5473,7 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
 {
 	struct dlm_rsb *r;
 
-	spin_lock_bh(&ls->ls_rsbtbl_lock);
+	read_lock_bh(&ls->ls_rsbtbl_lock);
 	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
 		if (!rsb_flag(r, RSB_RECOVER_GRANT))
 			continue;
@@ -5363,10 +5482,10 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
 			continue;
 		}
 		hold_rsb(r);
-		spin_unlock_bh(&ls->ls_rsbtbl_lock);
+		read_unlock_bh(&ls->ls_rsbtbl_lock);
 		return r;
 	}
-	spin_unlock_bh(&ls->ls_rsbtbl_lock);
+	read_unlock_bh(&ls->ls_rsbtbl_lock);
 	return NULL;
 }
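
The put side pairs with the lookup pattern: dlm_refcount_dec_and_write_lock_bh() uses refcount_dec_not_one() so that a put which cannot be the last one never touches the write lock at all. Here is a sketch of that fast path under the same hypothetical stand-ins as in the earlier example (foo_put() and foo_toss() are illustrative, not DLM functions):

#include <linux/refcount.h>

void foo_toss(struct foo *f);	/* hypothetical release path, like toss_rsb() */

static bool foo_put(struct foo *f)
{
	/* common case: the count stays above one, no lock is taken */
	if (refcount_dec_not_one(&f->ref.refcount))
		return false;

	write_lock_bh(&tbl_lock);
	if (!refcount_dec_and_test(&f->ref.refcount)) {
		/* lost a race against a concurrent kref_get() */
		write_unlock_bh(&tbl_lock);
		return false;
	}

	/* last reference: toss the entry while still holding the
	 * write lock, as put_rsb() does via toss_rsb()
	 */
	foo_toss(f);
	write_unlock_bh(&tbl_lock);
	return true;
}

If the locked decrement loses the race, nothing is tossed and the write lock is dropped immediately, which mirrors the false return of the helper in the patch above.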