diff options
Diffstat (limited to 'fs/dlm')
-rw-r--r-- | fs/dlm/ast.c | 16 | ||||
-rw-r--r-- | fs/dlm/debug_fs.c | 96 | ||||
-rw-r--r-- | fs/dlm/dir.c | 3 | ||||
-rw-r--r-- | fs/dlm/dlm_internal.h | 12 | ||||
-rw-r--r-- | fs/dlm/lock.c | 109 | ||||
-rw-r--r-- | fs/dlm/lock.h | 4 | ||||
-rw-r--r-- | fs/dlm/lockspace.c | 41 | ||||
-rw-r--r-- | fs/dlm/lowcomms.c | 209 | ||||
-rw-r--r-- | fs/dlm/lowcomms.h | 6 | ||||
-rw-r--r-- | fs/dlm/main.c | 3 | ||||
-rw-r--r-- | fs/dlm/member.c | 3 | ||||
-rw-r--r-- | fs/dlm/memory.c | 68 | ||||
-rw-r--r-- | fs/dlm/memory.h | 6 | ||||
-rw-r--r-- | fs/dlm/midcomms.c | 85 | ||||
-rw-r--r-- | fs/dlm/midcomms.h | 3 | ||||
-rw-r--r-- | fs/dlm/rcom.c | 2 | ||||
-rw-r--r-- | fs/dlm/recoverd.c | 3 | ||||
-rw-r--r-- | fs/dlm/requestqueue.c | 17 |
18 files changed, 511 insertions, 175 deletions
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c index 283c7b94edda..bfac462dd3e8 100644 --- a/fs/dlm/ast.c +++ b/fs/dlm/ast.c @@ -9,6 +9,8 @@ ******************************************************************************* ******************************************************************************/ +#include <trace/events/dlm.h> + #include "dlm_internal.h" #include "lock.h" #include "user.h" @@ -254,10 +256,12 @@ void dlm_callback_work(struct work_struct *work) continue; } else if (callbacks[i].flags & DLM_CB_BAST) { bastfn(lkb->lkb_astparam, callbacks[i].mode); + trace_dlm_bast(ls, lkb, callbacks[i].mode); } else if (callbacks[i].flags & DLM_CB_CAST) { lkb->lkb_lksb->sb_status = callbacks[i].sb_status; lkb->lkb_lksb->sb_flags = callbacks[i].sb_flags; castfn(lkb->lkb_astparam); + trace_dlm_ast(ls, lkb, lkb->lkb_lksb); } } @@ -295,7 +299,8 @@ void dlm_callback_suspend(struct dlm_ls *ls) void dlm_callback_resume(struct dlm_ls *ls) { struct dlm_lkb *lkb, *safe; - int count = 0; + int count = 0, sum = 0; + bool empty; clear_bit(LSFL_CB_DELAY, &ls->ls_flags); @@ -311,14 +316,17 @@ more: if (count == MAX_CB_QUEUE) break; } + empty = list_empty(&ls->ls_cb_delay); mutex_unlock(&ls->ls_cb_mutex); - if (count) - log_rinfo(ls, "dlm_callback_resume %d", count); - if (count == MAX_CB_QUEUE) { + sum += count; + if (!empty) { count = 0; cond_resched(); goto more; } + + if (sum) + log_rinfo(ls, "%s %d", __func__, sum); } diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c index 47e9d57e4cae..8fb04ebbafb5 100644 --- a/fs/dlm/debug_fs.c +++ b/fs/dlm/debug_fs.c @@ -635,6 +635,35 @@ static int table_open2(struct inode *inode, struct file *file) return 0; } +static ssize_t table_write2(struct file *file, const char __user *user_buf, + size_t count, loff_t *ppos) +{ + struct seq_file *seq = file->private_data; + int n, len, lkb_nodeid, lkb_status, error; + char name[DLM_RESNAME_MAXLEN + 1] = {}; + struct dlm_ls *ls = seq->private; + unsigned int lkb_flags; + char buf[256] = {}; + uint32_t lkb_id; + + if (copy_from_user(buf, user_buf, + min_t(size_t, sizeof(buf) - 1, count))) + return -EFAULT; + + n = sscanf(buf, "%x %" __stringify(DLM_RESNAME_MAXLEN) "s %x %d %d", + &lkb_id, name, &lkb_flags, &lkb_nodeid, &lkb_status); + if (n != 5) + return -EINVAL; + + len = strnlen(name, DLM_RESNAME_MAXLEN); + error = dlm_debug_add_lkb(ls, lkb_id, name, len, lkb_flags, + lkb_nodeid, lkb_status); + if (error) + return error; + + return count; +} + static int table_open3(struct inode *inode, struct file *file) { struct seq_file *seq; @@ -675,6 +704,7 @@ static const struct file_operations format2_fops = { .owner = THIS_MODULE, .open = table_open2, .read = seq_read, + .write = table_write2, .llseek = seq_lseek, .release = seq_release }; @@ -724,10 +754,35 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf, return rv; } +static ssize_t waiters_write(struct file *file, const char __user *user_buf, + size_t count, loff_t *ppos) +{ + struct dlm_ls *ls = file->private_data; + int mstype, to_nodeid; + char buf[128] = {}; + uint32_t lkb_id; + int n, error; + + if (copy_from_user(buf, user_buf, + min_t(size_t, sizeof(buf) - 1, count))) + return -EFAULT; + + n = sscanf(buf, "%x %d %d", &lkb_id, &mstype, &to_nodeid); + if (n != 3) + return -EINVAL; + + error = dlm_debug_add_lkb_to_waiters(ls, lkb_id, mstype, to_nodeid); + if (error) + return error; + + return count; +} + static const struct file_operations waiters_fops = { .owner = THIS_MODULE, .open = simple_open, .read = waiters_read, + .write = waiters_write, .llseek = default_llseek, }; @@ -768,6 +823,42 @@ static int dlm_version_show(struct seq_file *file, void *offset) } DEFINE_SHOW_ATTRIBUTE(dlm_version); +static ssize_t dlm_rawmsg_write(struct file *fp, const char __user *user_buf, + size_t count, loff_t *ppos) +{ + void *buf; + int ret; + + if (count > PAGE_SIZE || count < sizeof(struct dlm_header)) + return -EINVAL; + + buf = kmalloc(PAGE_SIZE, GFP_NOFS); + if (!buf) + return -ENOMEM; + + if (copy_from_user(buf, user_buf, count)) { + ret = -EFAULT; + goto out; + } + + ret = dlm_midcomms_rawmsg_send(fp->private_data, buf, count); + if (ret) + goto out; + + kfree(buf); + return count; + +out: + kfree(buf); + return ret; +} + +static const struct file_operations dlm_rawmsg_fops = { + .open = simple_open, + .write = dlm_rawmsg_write, + .llseek = no_llseek, +}; + void *dlm_create_debug_comms_file(int nodeid, void *data) { struct dentry *d_node; @@ -782,6 +873,7 @@ void *dlm_create_debug_comms_file(int nodeid, void *data) debugfs_create_file("send_queue_count", 0444, d_node, data, &dlm_send_queue_cnt_fops); debugfs_create_file("version", 0444, d_node, data, &dlm_version_fops); + debugfs_create_file("rawmsg", 0200, d_node, data, &dlm_rawmsg_fops); return d_node; } @@ -809,7 +901,7 @@ void dlm_create_debug_file(struct dlm_ls *ls) snprintf(name, DLM_LOCKSPACE_LEN + 8, "%s_locks", ls->ls_name); ls->ls_debug_locks_dentry = debugfs_create_file(name, - S_IFREG | S_IRUGO, + 0644, dlm_root, ls, &format2_fops); @@ -840,7 +932,7 @@ void dlm_create_debug_file(struct dlm_ls *ls) snprintf(name, DLM_LOCKSPACE_LEN + 8, "%s_waiters", ls->ls_name); ls->ls_debug_waiters_dentry = debugfs_create_file(name, - S_IFREG | S_IRUGO, + 0644, dlm_root, ls, &waiters_fops); diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c index 45ebbe602bbf..b6692f81ec83 100644 --- a/fs/dlm/dir.c +++ b/fs/dlm/dir.c @@ -84,8 +84,7 @@ int dlm_recover_directory(struct dlm_ls *ls) for (;;) { int left; - error = dlm_recovery_stopped(ls); - if (error) { + if (dlm_recovery_stopped(ls)) { error = -EINTR; goto out_free; } diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 5f57538b5d45..74a9590a4dd5 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -41,12 +41,6 @@ #include <linux/dlm.h> #include "config.h" -/* Size of the temp buffer midcomms allocates on the stack. - We try to make this large enough so most messages fit. - FIXME: should sctp make this unnecessary? */ - -#define DLM_INBUF_LEN 148 - struct dlm_ls; struct dlm_lkb; struct dlm_rsb; @@ -554,8 +548,9 @@ struct dlm_ls { uint32_t ls_generation; uint32_t ls_exflags; int ls_lvblen; - int ls_count; /* refcount of processes in + atomic_t ls_count; /* refcount of processes in the dlm using this ls */ + wait_queue_head_t ls_count_wait; int ls_create_count; /* create/release refcount */ unsigned long ls_flags; /* LSFL_ */ unsigned long ls_scan_time; @@ -581,6 +576,7 @@ struct dlm_ls { struct list_head ls_new_rsb; /* new rsb structs */ spinlock_t ls_remove_spin; + wait_queue_head_t ls_remove_wait; char ls_remove_name[DLM_RESNAME_MAXLEN+1]; char *ls_remove_names[DLM_REMOVE_NAMES_MAX]; int ls_remove_len; @@ -632,6 +628,8 @@ struct dlm_ls { struct rw_semaphore ls_in_recovery; /* block local requests */ struct rw_semaphore ls_recv_active; /* block dlm_recv */ struct list_head ls_requestqueue;/* queue remote requests */ + atomic_t ls_requestqueue_cnt; + wait_queue_head_t ls_requestqueue_wait; struct mutex ls_requestqueue_mutex; struct dlm_rcom *ls_recover_buf; int ls_recover_nodeid; /* for debugging */ diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index c502c065d007..bdb51d209ba2 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -53,6 +53,8 @@ R: do_xxxx() L: receive_xxxx_reply() <- R: send_xxxx_reply() */ +#include <trace/events/dlm.h> + #include <linux/types.h> #include <linux/rbtree.h> #include <linux/slab.h> @@ -1178,7 +1180,8 @@ static void detach_lkb(struct dlm_lkb *lkb) } } -static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) +static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret, + int start, int end) { struct dlm_lkb *lkb; int rv; @@ -1199,7 +1202,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) idr_preload(GFP_NOFS); spin_lock(&ls->ls_lkbidr_spin); - rv = idr_alloc(&ls->ls_lkbidr, lkb, 1, 0, GFP_NOWAIT); + rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT); if (rv >= 0) lkb->lkb_id = rv; spin_unlock(&ls->ls_lkbidr_spin); @@ -1215,6 +1218,11 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) return 0; } +static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) +{ + return _create_lkb(ls, lkb_ret, 1, 0); +} + static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret) { struct dlm_lkb *lkb; @@ -1618,21 +1626,24 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms) } /* If there's an rsb for the same resource being removed, ensure - that the remove message is sent before the new lookup message. - It should be rare to need a delay here, but if not, then it may - be worthwhile to add a proper wait mechanism rather than a delay. */ + * that the remove message is sent before the new lookup message. + */ + +#define DLM_WAIT_PENDING_COND(ls, r) \ + (ls->ls_remove_len && \ + !rsb_cmp(r, ls->ls_remove_name, \ + ls->ls_remove_len)) static void wait_pending_remove(struct dlm_rsb *r) { struct dlm_ls *ls = r->res_ls; restart: spin_lock(&ls->ls_remove_spin); - if (ls->ls_remove_len && - !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) { + if (DLM_WAIT_PENDING_COND(ls, r)) { log_debug(ls, "delay lookup for remove dir %d %s", - r->res_dir_nodeid, r->res_name); + r->res_dir_nodeid, r->res_name); spin_unlock(&ls->ls_remove_spin); - msleep(1); + wait_event(ls->ls_remove_wait, !DLM_WAIT_PENDING_COND(ls, r)); goto restart; } spin_unlock(&ls->ls_remove_spin); @@ -1784,6 +1795,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b) memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN); spin_unlock(&ls->ls_remove_spin); spin_unlock(&ls->ls_rsbtbl[b].lock); + wake_up(&ls->ls_remove_wait); send_remove(r); @@ -3437,6 +3449,8 @@ int dlm_lock(dlm_lockspace_t *lockspace, if (error) goto out; + trace_dlm_lock_start(ls, lkb, mode, flags); + error = set_lock_args(mode, lksb, flags, namelen, 0, ast, astarg, bast, &args); if (error) @@ -3450,6 +3464,8 @@ int dlm_lock(dlm_lockspace_t *lockspace, if (error == -EINPROGRESS) error = 0; out_put: + trace_dlm_lock_end(ls, lkb, mode, flags, error); + if (convert || error) __put_lkb(ls, lkb); if (error == -EAGAIN || error == -EDEADLK) @@ -3481,6 +3497,8 @@ int dlm_unlock(dlm_lockspace_t *lockspace, if (error) goto out; + trace_dlm_unlock_start(ls, lkb, flags); + error = set_unlock_args(flags, astarg, &args); if (error) goto out_put; @@ -3495,6 +3513,8 @@ int dlm_unlock(dlm_lockspace_t *lockspace, if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK))) error = 0; out_put: + trace_dlm_unlock_end(ls, lkb, flags, error); + dlm_put_lkb(lkb); out: dlm_unlock_recovery(ls); @@ -3973,6 +3993,14 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms) int from = ms->m_header.h_nodeid; int error = 0; + /* currently mixing of user/kernel locks are not supported */ + if (ms->m_flags & DLM_IFL_USER && ~lkb->lkb_flags & DLM_IFL_USER) { + log_error(lkb->lkb_resource->res_ls, + "got user dlm message for a kernel lock"); + error = -EINVAL; + goto out; + } + switch (ms->m_type) { case DLM_MSG_CONVERT: case DLM_MSG_UNLOCK: @@ -4001,6 +4029,7 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms) error = -EINVAL; } +out: if (error) log_error(lkb->lkb_resource->res_ls, "ignore invalid message %d from %d %x %x %x %d", @@ -4050,6 +4079,7 @@ static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len) memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN); spin_unlock(&ls->ls_remove_spin); spin_unlock(&ls->ls_rsbtbl[b].lock); + wake_up(&ls->ls_remove_wait); rv = _create_message(ls, sizeof(struct dlm_message) + len, dir_nodeid, DLM_MSG_REMOVE, &ms, &mh); @@ -6301,3 +6331,64 @@ int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc, return error; } +/* debug functionality */ +int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len, + int lkb_nodeid, unsigned int lkb_flags, int lkb_status) +{ + struct dlm_lksb *lksb; + struct dlm_lkb *lkb; + struct dlm_rsb *r; + int error; + + /* we currently can't set a valid user lock */ + if (lkb_flags & DLM_IFL_USER) + return -EOPNOTSUPP; + + lksb = kzalloc(sizeof(*lksb), GFP_NOFS); + if (!lksb) + return -ENOMEM; + + error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1); + if (error) { + kfree(lksb); + return error; + } + + lkb->lkb_flags = lkb_flags; + lkb->lkb_nodeid = lkb_nodeid; + lkb->lkb_lksb = lksb; + /* user specific pointer, just don't have it NULL for kernel locks */ + if (~lkb_flags & DLM_IFL_USER) + lkb->lkb_astparam = (void *)0xDEADBEEF; + + error = find_rsb(ls, name, len, 0, R_REQUEST, &r); + if (error) { + kfree(lksb); + __put_lkb(ls, lkb); + return error; + } + + lock_rsb(r); + attach_lkb(r, lkb); + add_lkb(r, lkb, lkb_status); + unlock_rsb(r); + put_rsb(r); + + return 0; +} + +int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id, + int mstype, int to_nodeid) +{ + struct dlm_lkb *lkb; + int error; + + error = find_lkb(ls, lkb_id, &lkb); + if (error) + return error; + + error = add_to_waiters(lkb, mstype, to_nodeid); + dlm_put_lkb(lkb); + return error; +} + diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h index 456c6ec3ef6f..252a5898f908 100644 --- a/fs/dlm/lock.h +++ b/fs/dlm/lock.h @@ -58,6 +58,10 @@ int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc, int nodeid, int pid); int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid); void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc); +int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len, + int lkb_nodeid, unsigned int lkb_flags, int lkb_status); +int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id, + int mstype, int to_nodeid); static inline int is_master(struct dlm_rsb *r) { diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index 10eddfa6c3d7..0d3833a124a3 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -216,8 +216,7 @@ static int do_uevent(struct dlm_ls *ls, int in) return ls->ls_uevent_result; } -static int dlm_uevent(struct kset *kset, struct kobject *kobj, - struct kobj_uevent_env *env) +static int dlm_uevent(struct kobject *kobj, struct kobj_uevent_env *env) { struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj); @@ -314,7 +313,7 @@ struct dlm_ls *dlm_find_lockspace_global(uint32_t id) list_for_each_entry(ls, &lslist, ls_list) { if (ls->ls_global_id == id) { - ls->ls_count++; + atomic_inc(&ls->ls_count); goto out; } } @@ -331,7 +330,7 @@ struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace) spin_lock(&lslist_lock); list_for_each_entry(ls, &lslist, ls_list) { if (ls->ls_local_handle == lockspace) { - ls->ls_count++; + atomic_inc(&ls->ls_count); goto out; } } @@ -348,7 +347,7 @@ struct dlm_ls *dlm_find_lockspace_device(int minor) spin_lock(&lslist_lock); list_for_each_entry(ls, &lslist, ls_list) { if (ls->ls_device.minor == minor) { - ls->ls_count++; + atomic_inc(&ls->ls_count); goto out; } } @@ -360,24 +359,24 @@ struct dlm_ls *dlm_find_lockspace_device(int minor) void dlm_put_lockspace(struct dlm_ls *ls) { - spin_lock(&lslist_lock); - ls->ls_count--; - spin_unlock(&lslist_lock); + if (atomic_dec_and_test(&ls->ls_count)) + wake_up(&ls->ls_count_wait); } static void remove_lockspace(struct dlm_ls *ls) { - for (;;) { - spin_lock(&lslist_lock); - if (ls->ls_count == 0) { - WARN_ON(ls->ls_create_count != 0); - list_del(&ls->ls_list); - spin_unlock(&lslist_lock); - return; - } +retry: + wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0); + + spin_lock(&lslist_lock); + if (atomic_read(&ls->ls_count) != 0) { spin_unlock(&lslist_lock); - ssleep(1); + goto retry; } + + WARN_ON(ls->ls_create_count != 0); + list_del(&ls->ls_list); + spin_unlock(&lslist_lock); } static int threads_start(void) @@ -481,7 +480,8 @@ static int new_lockspace(const char *name, const char *cluster, memcpy(ls->ls_name, name, namelen); ls->ls_namelen = namelen; ls->ls_lvblen = lvblen; - ls->ls_count = 0; + atomic_set(&ls->ls_count, 0); + init_waitqueue_head(&ls->ls_count_wait); ls->ls_flags = 0; ls->ls_scan_time = jiffies; @@ -511,6 +511,7 @@ static int new_lockspace(const char *name, const char *cluster, } spin_lock_init(&ls->ls_remove_spin); + init_waitqueue_head(&ls->ls_remove_wait); for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) { ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1, @@ -564,6 +565,8 @@ static int new_lockspace(const char *name, const char *cluster, init_rwsem(&ls->ls_in_recovery); init_rwsem(&ls->ls_recv_active); INIT_LIST_HEAD(&ls->ls_requestqueue); + atomic_set(&ls->ls_requestqueue_cnt, 0); + init_waitqueue_head(&ls->ls_requestqueue_wait); mutex_init(&ls->ls_requestqueue_mutex); mutex_init(&ls->ls_clear_proc_locks); @@ -868,7 +871,7 @@ static int release_lockspace(struct dlm_ls *ls, int force) * until this returns. * * Force has 4 possible values: - * 0 - don't destroy locksapce if it has any LKBs + * 0 - don't destroy lockspace if it has any LKBs * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs * 2 - destroy lockspace regardless of LKBs * 3 - destroy lockspace as part of a forced shutdown diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 8f715c620e1f..e284d696c1fd 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -53,9 +53,12 @@ #include <net/sctp/sctp.h> #include <net/ipv6.h> +#include <trace/events/dlm.h> + #include "dlm_internal.h" #include "lowcomms.h" #include "midcomms.h" +#include "memory.h" #include "config.h" #define NEEDED_RMEM (4*1024*1024) @@ -84,7 +87,6 @@ struct connection { struct list_head writequeue; /* List of outgoing writequeue_entries */ spinlock_t writequeue_lock; atomic_t writequeue_cnt; - struct mutex wq_alloc; int retries; #define MAX_CONNECT_RETRIES 3 struct hlist_node list; @@ -189,6 +191,24 @@ static const struct dlm_proto_ops *dlm_proto_ops; static void process_recv_sockets(struct work_struct *work); static void process_send_sockets(struct work_struct *work); +static void writequeue_entry_ctor(void *data) +{ + struct writequeue_entry *entry = data; + + INIT_LIST_HEAD(&entry->msgs); +} + +struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void) +{ + return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry), + 0, 0, writequeue_entry_ctor); +} + +struct kmem_cache *dlm_lowcomms_msg_cache_create(void) +{ + return kmem_cache_create("dlm_msg", sizeof(struct dlm_msg), 0, 0, NULL); +} + /* need to held writequeue_lock */ static struct writequeue_entry *con_next_wq(struct connection *con) { @@ -199,7 +219,10 @@ static struct writequeue_entry *con_next_wq(struct connection *con) e = list_first_entry(&con->writequeue, struct writequeue_entry, list); - if (e->len == 0) + /* if len is zero nothing is to send, if there are users filling + * buffers we wait until the users are done so we can send more. + */ + if (e->users || e->len == 0) return NULL; return e; @@ -265,8 +288,6 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc) return NULL; } - mutex_init(&con->wq_alloc); - spin_lock(&connections_lock); /* Because multiple workqueues/threads calls this function it can * race on multiple cpu's. Instead of locking hot path __find_con() @@ -486,11 +507,9 @@ static void lowcomms_data_ready(struct sock *sk) { struct connection *con; - read_lock_bh(&sk->sk_callback_lock); con = sock2con(sk); if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags)) queue_work(recv_workqueue, &con->rwork); - read_unlock_bh(&sk->sk_callback_lock); } static void lowcomms_listen_data_ready(struct sock *sk) @@ -505,15 +524,14 @@ static void lowcomms_write_space(struct sock *sk) { struct connection *con; - read_lock_bh(&sk->sk_callback_lock); con = sock2con(sk); if (!con) - goto out; + return; if (!test_and_set_bit(CF_CONNECTED, &con->flags)) { log_print("successful connected to node %d", con->nodeid); queue_work(send_workqueue, &con->swork); - goto out; + return; } clear_bit(SOCK_NOSPACE, &con->sock->flags); @@ -524,8 +542,6 @@ static void lowcomms_write_space(struct sock *sk) } queue_work(send_workqueue, &con->swork); -out: - read_unlock_bh(&sk->sk_callback_lock); } static inline void lowcomms_connect_sock(struct connection *con) @@ -592,42 +608,41 @@ int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark) static void lowcomms_error_report(struct sock *sk) { struct connection *con; - struct sockaddr_storage saddr; void (*orig_report)(struct sock *) = NULL; + struct inet_sock *inet; - read_lock_bh(&sk->sk_callback_lock); con = sock2con(sk); if (con == NULL) goto out; orig_report = listen_sock.sk_error_report; - if (kernel_getpeername(sk->sk_socket, (struct sockaddr *)&saddr) < 0) { - printk_ratelimited(KERN_ERR "dlm: node %d: socket error " - "sending to node %d, port %d, " - "sk_err=%d/%d\n", dlm_our_nodeid(), - con->nodeid, dlm_config.ci_tcp_port, - sk->sk_err, sk->sk_err_soft); - } else if (saddr.ss_family == AF_INET) { - struct sockaddr_in *sin4 = (struct sockaddr_in *)&saddr; + inet = inet_sk(sk); + switch (sk->sk_family) { + case AF_INET: printk_ratelimited(KERN_ERR "dlm: node %d: socket error " - "sending to node %d at %pI4, port %d, " + "sending to node %d at %pI4, dport %d, " "sk_err=%d/%d\n", dlm_our_nodeid(), - con->nodeid, &sin4->sin_addr.s_addr, - dlm_config.ci_tcp_port, sk->sk_err, + con->nodeid, &inet->inet_daddr, + ntohs(inet->inet_dport), sk->sk_err, sk->sk_err_soft); - } else { - struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&saddr; - + break; +#if IS_ENABLED(CONFIG_IPV6) + case AF_INET6: printk_ratelimited(KERN_ERR "dlm: node %d: socket error " - "sending to node %d at %u.%u.%u.%u, " - "port %d, sk_err=%d/%d\n", dlm_our_nodeid(), - con->nodeid, sin6->sin6_addr.s6_addr32[0], - sin6->sin6_addr.s6_addr32[1], - sin6->sin6_addr.s6_addr32[2], - sin6->sin6_addr.s6_addr32[3], - dlm_config.ci_tcp_port, sk->sk_err, + "sending to node %d at %pI6c, " + "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(), + con->nodeid, &sk->sk_v6_daddr, + ntohs(inet->inet_dport), sk->sk_err, sk->sk_err_soft); + break; +#endif + default: + printk_ratelimited(KERN_ERR "dlm: node %d: socket error " + "invalid socket family %d set, " + "sk_err=%d/%d\n", dlm_our_nodeid(), + sk->sk_family, sk->sk_err, sk->sk_err_soft); + goto out; } /* below sendcon only handling */ @@ -646,7 +661,6 @@ static void lowcomms_error_report(struct sock *sk) queue_work(send_workqueue, &con->swork); out: - read_unlock_bh(&sk->sk_callback_lock); if (orig_report) orig_report(sk); } @@ -666,20 +680,20 @@ static void restore_callbacks(struct socket *sock) { struct sock *sk = sock->sk; - write_lock_bh(&sk->sk_callback_lock); + lock_sock(sk); sk->sk_user_data = NULL; sk->sk_data_ready = listen_sock.sk_data_ready; sk->sk_state_change = listen_sock.sk_state_change; sk->sk_write_space = listen_sock.sk_write_space; sk->sk_error_report = listen_sock.sk_error_report; - write_unlock_bh(&sk->sk_callback_lock); + release_sock(sk); } static void add_listen_sock(struct socket *sock, struct listen_connection *con) { struct sock *sk = sock->sk; - write_lock_bh(&sk->sk_callback_lock); + lock_sock(sk); save_listen_callbacks(sock); con->sock = sock; @@ -687,7 +701,7 @@ static void add_listen_sock(struct socket *sock, struct listen_connection *con) sk->sk_allocation = GFP_NOFS; /* Install a data_ready callback */ sk->sk_data_ready = lowcomms_listen_data_ready; - write_unlock_bh(&sk->sk_callback_lock); + release_sock(sk); } /* Make a socket active */ @@ -695,7 +709,7 @@ static void add_sock(struct socket *sock, struct connection *con) { struct sock *sk = sock->sk; - write_lock_bh(&sk->sk_callback_lock); + lock_sock(sk); con->sock = sock; sk->sk_user_data = con; @@ -705,7 +719,7 @@ static void add_sock(struct socket *sock, struct connection *con) sk->sk_state_change = lowcomms_state_change; sk->sk_allocation = GFP_NOFS; sk->sk_error_report = lowcomms_error_report; - write_unlock_bh(&sk->sk_callback_lock); + release_sock(sk); } /* Add the port number to an IPv6 or 4 sockaddr and return the address @@ -733,7 +747,7 @@ static void dlm_page_release(struct kref *kref) ref); __free_page(e->page); - kfree(e); + dlm_free_writequeue(e); } static void dlm_msg_release(struct kref *kref) @@ -741,7 +755,7 @@ static void dlm_msg_release(struct kref *kref) struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref); kref_put(&msg->entry->ref, dlm_page_release); - kfree(msg); + dlm_free_msg(msg); } static void free_entry(struct writequeue_entry *e) @@ -925,6 +939,7 @@ static int receive_from_sock(struct connection *con) msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, msg.msg_flags); + trace_dlm_recv(con->nodeid, ret); if (ret == -EAGAIN) break; else if (ret <= 0) @@ -1013,10 +1028,28 @@ static int accept_from_sock(struct listen_connection *con) /* Get the new node's NODEID */ make_sockaddr(&peeraddr, 0, &len); if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) { - unsigned char *b=(unsigned char *)&peeraddr; - log_print("connect from non cluster node"); - print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, - b, sizeof(struct sockaddr_storage)); + switch (peeraddr.ss_family) { + case AF_INET: { + struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr; + + log_print("connect from non cluster IPv4 node %pI4", + &sin->sin_addr); + break; + } +#if IS_ENABLED(CONFIG_IPV6) + case AF_INET6: { + struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr; + + log_print("connect from non cluster IPv6 node %pI6c", + &sin6->sin6_addr); + break; + } +#endif + default: + log_print("invalid family from non cluster node"); + break; + } + sock_release(newsock); return -1; } @@ -1177,33 +1210,33 @@ static void deinit_local(void) kfree(dlm_local_addr[i]); } -static struct writequeue_entry *new_writequeue_entry(struct connection *con, - gfp_t allocation) +static struct writequeue_entry *new_writequeue_entry(struct connection *con) { struct writequeue_entry *entry; - entry = kzalloc(sizeof(*entry), allocation); + entry = dlm_allocate_writequeue(); if (!entry) return NULL; - entry->page = alloc_page(allocation | __GFP_ZERO); + entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO); if (!entry->page) { - kfree(entry); + dlm_free_writequeue(entry); return NULL; } + entry->offset = 0; + entry->len = 0; + entry->end = 0; + entry->dirty = false; entry->con = con; entry->users = 1; kref_init(&entry->ref); - INIT_LIST_HEAD(&entry->msgs); - return entry; } static struct writequeue_entry *new_wq_entry(struct connection *con, int len, - gfp_t allocation, char **ppc, - void (*cb)(struct dlm_mhandle *mh), - struct dlm_mhandle *mh) + char **ppc, void (*cb)(void *data), + void *data) { struct writequeue_entry *e; @@ -1215,74 +1248,54 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len, *ppc = page_address(e->page) + e->end; if (cb) - cb(mh); + cb(data); e->end += len; e->users++; - spin_unlock(&con->writequeue_lock); - - return e; + goto out; } } - spin_unlock(&con->writequeue_lock); - e = new_writequeue_entry(con, allocation); + e = new_writequeue_entry(con); if (!e) - return NULL; + goto out; kref_get(&e->ref); *ppc = page_address(e->page); e->end += len; atomic_inc(&con->writequeue_cnt); - - spin_lock(&con->writequeue_lock); if (cb) - cb(mh); + cb(data); list_add_tail(&e->list, &con->writequeue); - spin_unlock(&con->writequeue_lock); +out: + spin_unlock(&con->writequeue_lock); return e; }; static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len, gfp_t allocation, char **ppc, - void (*cb)(struct dlm_mhandle *mh), - struct dlm_mhandle *mh) + void (*cb)(void *data), + void *data) { struct writequeue_entry *e; struct dlm_msg *msg; - bool sleepable; - msg = kzalloc(sizeof(*msg), allocation); + msg = dlm_allocate_msg(allocation); if (!msg) return NULL; - /* this mutex is being used as a wait to avoid multiple "fast" - * new writequeue page list entry allocs in new_wq_entry in - * normal operation which is sleepable context. Without it - * we could end in multiple writequeue entries with one - * dlm message because multiple callers were waiting at - * the writequeue_lock in new_wq_entry(). - */ - sleepable = gfpflags_normal_context(allocation); - if (sleepable) - mutex_lock(&con->wq_alloc); - kref_init(&msg->ref); - e = new_wq_entry(con, len, allocation, ppc, cb, mh); + e = new_wq_entry(con, len, ppc, cb, data); if (!e) { - if (sleepable) - mutex_unlock(&con->wq_alloc); - - kfree(msg); + dlm_free_msg(msg); return NULL; } - if (sleepable) - mutex_unlock(&con->wq_alloc); - + msg->retransmit = false; + msg->orig_msg = NULL; msg->ppc = *ppc; msg->len = len; msg->entry = e; @@ -1291,8 +1304,8 @@ static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len, } struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation, - char **ppc, void (*cb)(struct dlm_mhandle *mh), - struct dlm_mhandle *mh) + char **ppc, void (*cb)(void *data), + void *data) { struct connection *con; struct dlm_msg *msg; @@ -1313,7 +1326,7 @@ struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation, return NULL; } - msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, mh); + msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, data); if (!msg) { srcu_read_unlock(&connections_srcu, idx); return NULL; @@ -1403,7 +1416,6 @@ static void send_to_sock(struct connection *con) if (!e) break; - e = list_first_entry(&con->writequeue, struct writequeue_entry, list); len = e->len; offset = e->offset; BUG_ON(len == 0 && e->users == 0); @@ -1411,6 +1423,7 @@ static void send_to_sock(struct connection *con) ret = kernel_sendpage(con->sock, e->page, offset, len, msg_flags); + trace_dlm_send(con->nodeid, ret); if (ret == -EAGAIN || ret == 0) { if (ret == -EAGAIN && test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && @@ -1680,9 +1693,9 @@ static void _stop_conn(struct connection *con, bool and_other) set_bit(CF_READ_PENDING, &con->flags); set_bit(CF_WRITE_PENDING, &con->flags); if (con->sock && con->sock->sk) { - write_lock_bh(&con->sock->sk->sk_callback_lock); + lock_sock(con->sock->sk); con->sock->sk->sk_user_data = NULL; - write_unlock_bh(&con->sock->sk->sk_callback_lock); + release_sock(con->sock->sk); } if (con->othercon && and_other) _stop_conn(con->othercon, false); @@ -1775,7 +1788,7 @@ static int dlm_listen_for_all(void) result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, SOCK_STREAM, dlm_proto_ops->proto, &sock); if (result < 0) { - log_print("Can't create comms socket, check SCTP is loaded"); + log_print("Can't create comms socket: %d", result); goto out; } diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h index 4ccae07cf005..29369feea991 100644 --- a/fs/dlm/lowcomms.h +++ b/fs/dlm/lowcomms.h @@ -38,8 +38,8 @@ void dlm_lowcomms_stop(void); void dlm_lowcomms_exit(void); int dlm_lowcomms_close(int nodeid); struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation, - char **ppc, void (*cb)(struct dlm_mhandle *mh), - struct dlm_mhandle *mh); + char **ppc, void (*cb)(void *data), + void *data); void dlm_lowcomms_commit_msg(struct dlm_msg *msg); void dlm_lowcomms_put_msg(struct dlm_msg *msg); int dlm_lowcomms_resend_msg(struct dlm_msg *msg); @@ -47,6 +47,8 @@ int dlm_lowcomms_connect_node(int nodeid); int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark); int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len); void dlm_midcomms_receive_done(int nodeid); +struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void); +struct kmem_cache *dlm_lowcomms_msg_cache_create(void); #endif /* __LOWCOMMS_DOT_H__ */ diff --git a/fs/dlm/main.c b/fs/dlm/main.c index afc66a1346d3..1c5be4b70ac1 100644 --- a/fs/dlm/main.c +++ b/fs/dlm/main.c @@ -19,6 +19,9 @@ #include "config.h" #include "lowcomms.h" +#define CREATE_TRACE_POINTS +#include <trace/events/dlm.h> + static int __init init_dlm(void) { int error; diff --git a/fs/dlm/member.c b/fs/dlm/member.c index 731d489aa323..61f906e705db 100644 --- a/fs/dlm/member.c +++ b/fs/dlm/member.c @@ -442,8 +442,7 @@ static int ping_members(struct dlm_ls *ls) int error = 0; list_for_each_entry(memb, &ls->ls_nodes, list) { - error = dlm_recovery_stopped(ls); - if (error) { + if (dlm_recovery_stopped(ls)) { error = -EINTR; break; } diff --git a/fs/dlm/memory.c b/fs/dlm/memory.c index 5918f4d39586..ce35c3c19aeb 100644 --- a/fs/dlm/memory.c +++ b/fs/dlm/memory.c @@ -10,32 +10,61 @@ ******************************************************************************/ #include "dlm_internal.h" +#include "midcomms.h" +#include "lowcomms.h" #include "config.h" #include "memory.h" +static struct kmem_cache *writequeue_cache; +static struct kmem_cache *mhandle_cache; +static struct kmem_cache *msg_cache; static struct kmem_cache *lkb_cache; static struct kmem_cache *rsb_cache; int __init dlm_memory_init(void) { + writequeue_cache = dlm_lowcomms_writequeue_cache_create(); + if (!writequeue_cache) + goto out; + + mhandle_cache = dlm_midcomms_cache_create(); + if (!mhandle_cache) + goto mhandle; + lkb_cache = kmem_cache_create("dlm_lkb", sizeof(struct dlm_lkb), __alignof__(struct dlm_lkb), 0, NULL); if (!lkb_cache) - return -ENOMEM; + goto lkb; + + msg_cache = dlm_lowcomms_msg_cache_create(); + if (!msg_cache) + goto msg; rsb_cache = kmem_cache_create("dlm_rsb", sizeof(struct dlm_rsb), __alignof__(struct dlm_rsb), 0, NULL); - if (!rsb_cache) { - kmem_cache_destroy(lkb_cache); - return -ENOMEM; - } + if (!rsb_cache) + goto rsb; return 0; + +rsb: + kmem_cache_destroy(msg_cache); +msg: + kmem_cache_destroy(lkb_cache); +lkb: + kmem_cache_destroy(mhandle_cache); +mhandle: + kmem_cache_destroy(writequeue_cache); +out: + return -ENOMEM; } void dlm_memory_exit(void) { + kmem_cache_destroy(writequeue_cache); + kmem_cache_destroy(mhandle_cache); + kmem_cache_destroy(msg_cache); kmem_cache_destroy(lkb_cache); kmem_cache_destroy(rsb_cache); } @@ -89,3 +118,32 @@ void dlm_free_lkb(struct dlm_lkb *lkb) kmem_cache_free(lkb_cache, lkb); } +struct dlm_mhandle *dlm_allocate_mhandle(void) +{ + return kmem_cache_alloc(mhandle_cache, GFP_NOFS); +} + +void dlm_free_mhandle(struct dlm_mhandle *mhandle) +{ + kmem_cache_free(mhandle_cache, mhandle); +} + +struct writequeue_entry *dlm_allocate_writequeue(void) +{ + return kmem_cache_alloc(writequeue_cache, GFP_ATOMIC); +} + +void dlm_free_writequeue(struct writequeue_entry *writequeue) +{ + kmem_cache_free(writequeue_cache, writequeue); +} + +struct dlm_msg *dlm_allocate_msg(gfp_t allocation) +{ + return kmem_cache_alloc(msg_cache, allocation); +} + +void dlm_free_msg(struct dlm_msg *msg) +{ + kmem_cache_free(msg_cache, msg); +} diff --git a/fs/dlm/memory.h b/fs/dlm/memory.h index 4f218ea4b187..7bd3f1a391ca 100644 --- a/fs/dlm/memory.h +++ b/fs/dlm/memory.h @@ -20,6 +20,12 @@ struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls); void dlm_free_lkb(struct dlm_lkb *l); char *dlm_allocate_lvb(struct dlm_ls *ls); void dlm_free_lvb(char *l); +struct dlm_mhandle *dlm_allocate_mhandle(void); +void dlm_free_mhandle(struct dlm_mhandle *mhandle); +struct writequeue_entry *dlm_allocate_writequeue(void); +void dlm_free_writequeue(struct writequeue_entry *writequeue); +struct dlm_msg *dlm_allocate_msg(gfp_t allocation); +void dlm_free_msg(struct dlm_msg *msg); #endif /* __MEMORY_DOT_H__ */ diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c index 7ae39ec8d9b0..3635e42b0669 100644 --- a/fs/dlm/midcomms.c +++ b/fs/dlm/midcomms.c @@ -137,6 +137,7 @@ #include "dlm_internal.h" #include "lowcomms.h" #include "config.h" +#include "memory.h" #include "lock.h" #include "util.h" #include "midcomms.h" @@ -220,6 +221,12 @@ DEFINE_STATIC_SRCU(nodes_srcu); */ static DEFINE_MUTEX(close_lock); +struct kmem_cache *dlm_midcomms_cache_create(void) +{ + return kmem_cache_create("dlm_mhandle", sizeof(struct dlm_mhandle), + 0, 0, NULL); +} + static inline const char *dlm_state_str(int state) { switch (state) { @@ -279,7 +286,7 @@ static void dlm_mhandle_release(struct rcu_head *rcu) struct dlm_mhandle *mh = container_of(rcu, struct dlm_mhandle, rcu); dlm_lowcomms_put_msg(mh->msg); - kfree(mh); + dlm_free_mhandle(mh); } static void dlm_mhandle_delete(struct midcomms_node *node, @@ -909,11 +916,11 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len) if (msglen > len) break; - switch (le32_to_cpu(hd->h_version)) { - case DLM_VERSION_3_1: + switch (hd->h_version) { + case cpu_to_le32(DLM_VERSION_3_1): dlm_midcomms_receive_buffer_3_1((union dlm_packet *)ptr, nodeid); break; - case DLM_VERSION_3_2: + case cpu_to_le32(DLM_VERSION_3_2): dlm_midcomms_receive_buffer_3_2((union dlm_packet *)ptr, nodeid); break; default: @@ -969,7 +976,7 @@ void dlm_midcomms_receive_done(int nodeid) spin_unlock(&node->state_lock); /* do nothing FIN has it's own ack send */ break; - }; + } srcu_read_unlock(&nodes_srcu, idx); } @@ -1020,8 +1027,10 @@ static void dlm_fill_opts_header(struct dlm_opts *opts, uint16_t inner_len, header_out(&opts->o_header); } -static void midcomms_new_msg_cb(struct dlm_mhandle *mh) +static void midcomms_new_msg_cb(void *data) { + struct dlm_mhandle *mh = data; + atomic_inc(&mh->node->send_queue_cnt); spin_lock(&mh->node->send_queue_lock); @@ -1071,10 +1080,12 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, /* this is a bug, however we going on and hope it will be resolved */ WARN_ON(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags)); - mh = kzalloc(sizeof(*mh), GFP_NOFS); + mh = dlm_allocate_mhandle(); if (!mh) goto err; + mh->committed = false; + mh->ack_rcv = NULL; mh->idx = idx; mh->node = node; @@ -1083,7 +1094,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, msg = dlm_lowcomms_new_msg(nodeid, len, allocation, ppc, NULL, NULL); if (!msg) { - kfree(mh); + dlm_free_mhandle(mh); goto err; } @@ -1092,13 +1103,13 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, msg = dlm_midcomms_get_msg_3_2(mh, nodeid, len, allocation, ppc); if (!msg) { - kfree(mh); + dlm_free_mhandle(mh); goto err; } break; default: - kfree(mh); + dlm_free_mhandle(mh); WARN_ON(1); goto err; } @@ -1134,7 +1145,7 @@ void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh) dlm_lowcomms_commit_msg(mh->msg); dlm_lowcomms_put_msg(mh->msg); /* mh is not part of rcu list in this case */ - kfree(mh); + dlm_free_mhandle(mh); break; case DLM_VERSION_3_2: dlm_midcomms_commit_msg_3_2(mh); @@ -1231,7 +1242,7 @@ void dlm_midcomms_add_member(int nodeid) } node->users++; - pr_debug("users inc count %d\n", node->users); + pr_debug("node %d users inc count %d\n", nodeid, node->users); spin_unlock(&node->state_lock); srcu_read_unlock(&nodes_srcu, idx); @@ -1254,7 +1265,7 @@ void dlm_midcomms_remove_member(int nodeid) spin_lock(&node->state_lock); node->users--; - pr_debug("users dec count %d\n", node->users); + pr_debug("node %d users dec count %d\n", nodeid, node->users); /* hitting users count to zero means the * other side is running dlm_midcomms_stop() @@ -1425,3 +1436,51 @@ int dlm_midcomms_close(int nodeid) return ret; } + +/* debug functionality to send raw dlm msg from user space */ +struct dlm_rawmsg_data { + struct midcomms_node *node; + void *buf; +}; + +static void midcomms_new_rawmsg_cb(void *data) +{ + struct dlm_rawmsg_data *rd = data; + struct dlm_header *h = rd->buf; + + switch (h->h_version) { + case cpu_to_le32(DLM_VERSION_3_1): + break; + default: + switch (h->h_cmd) { + case DLM_OPTS: + if (!h->u.h_seq) + h->u.h_seq = rd->node->seq_send++; + break; + default: + break; + } + break; + } +} + +int dlm_midcomms_rawmsg_send(struct midcomms_node *node, void *buf, + int buflen) +{ + struct dlm_rawmsg_data rd; + struct dlm_msg *msg; + char *msgbuf; + + rd.node = node; + rd.buf = buf; + + msg = dlm_lowcomms_new_msg(node->nodeid, buflen, GFP_NOFS, + &msgbuf, midcomms_new_rawmsg_cb, &rd); + if (!msg) + return -ENOMEM; + + memcpy(msgbuf, buf, buflen); + dlm_lowcomms_commit_msg(msg); + return 0; +} + diff --git a/fs/dlm/midcomms.h b/fs/dlm/midcomms.h index 579abc6929be..82bcd9661922 100644 --- a/fs/dlm/midcomms.h +++ b/fs/dlm/midcomms.h @@ -28,6 +28,9 @@ const char *dlm_midcomms_state(struct midcomms_node *node); unsigned long dlm_midcomms_flags(struct midcomms_node *node); int dlm_midcomms_send_queue_cnt(struct midcomms_node *node); uint32_t dlm_midcomms_version(struct midcomms_node *node); +int dlm_midcomms_rawmsg_send(struct midcomms_node *node, void *buf, + int buflen); +struct kmem_cache *dlm_midcomms_cache_create(void); #endif /* __MIDCOMMS_DOT_H__ */ diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c index 6cba86470278..5821b777a1a7 100644 --- a/fs/dlm/rcom.c +++ b/fs/dlm/rcom.c @@ -601,7 +601,7 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) spin_lock(&ls->ls_recover_lock); status = ls->ls_recover_status; - stop = test_bit(LSFL_RECOVER_STOP, &ls->ls_flags); + stop = dlm_recovery_stopped(ls); seq = ls->ls_recover_seq; spin_unlock(&ls->ls_recover_lock); diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c index 97d052cea5a9..a55dfce705dd 100644 --- a/fs/dlm/recoverd.c +++ b/fs/dlm/recoverd.c @@ -124,8 +124,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) dlm_recover_waiters_pre(ls); - error = dlm_recovery_stopped(ls); - if (error) { + if (dlm_recovery_stopped(ls)) { error = -EINTR; goto fail; } diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c index e89e0ff8bfa3..ccb5307c21e9 100644 --- a/fs/dlm/requestqueue.c +++ b/fs/dlm/requestqueue.c @@ -44,6 +44,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms) e->nodeid = nodeid; memcpy(&e->request, ms, ms->m_header.h_length); + atomic_inc(&ls->ls_requestqueue_cnt); mutex_lock(&ls->ls_requestqueue_mutex); list_add_tail(&e->list, &ls->ls_requestqueue); mutex_unlock(&ls->ls_requestqueue_mutex); @@ -89,6 +90,8 @@ int dlm_process_requestqueue(struct dlm_ls *ls) mutex_lock(&ls->ls_requestqueue_mutex); list_del(&e->list); + if (atomic_dec_and_test(&ls->ls_requestqueue_cnt)) + wake_up(&ls->ls_requestqueue_wait); kfree(e); if (dlm_locking_stopped(ls)) { @@ -115,14 +118,8 @@ int dlm_process_requestqueue(struct dlm_ls *ls) void dlm_wait_requestqueue(struct dlm_ls *ls) { - for (;;) { - mutex_lock(&ls->ls_requestqueue_mutex); - if (list_empty(&ls->ls_requestqueue)) - break; - mutex_unlock(&ls->ls_requestqueue_mutex); - schedule(); - } - mutex_unlock(&ls->ls_requestqueue_mutex); + wait_event(ls->ls_requestqueue_wait, + atomic_read(&ls->ls_requestqueue_cnt) == 0); } static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid) @@ -130,7 +127,7 @@ static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid) uint32_t type = ms->m_type; /* the ls is being cleaned up and freed by release_lockspace */ - if (!ls->ls_count) + if (!atomic_read(&ls->ls_count)) return 1; if (dlm_is_removed(ls, nodeid)) @@ -161,6 +158,8 @@ void dlm_purge_requestqueue(struct dlm_ls *ls) if (purge_request(ls, ms, e->nodeid)) { list_del(&e->list); + if (atomic_dec_and_test(&ls->ls_requestqueue_cnt)) + wake_up(&ls->ls_requestqueue_wait); kfree(e); } } |