summaryrefslogtreecommitdiffstats
path: root/fs/dlm/lowcomms.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/lowcomms.c')
-rw-r--r--fs/dlm/lowcomms.c209
1 files changed, 111 insertions, 98 deletions
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 8f715c620e1f..e284d696c1fd 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -53,9 +53,12 @@
#include <net/sctp/sctp.h>
#include <net/ipv6.h>
+#include <trace/events/dlm.h>
+
#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
+#include "memory.h"
#include "config.h"
#define NEEDED_RMEM (4*1024*1024)
@@ -84,7 +87,6 @@ struct connection {
struct list_head writequeue; /* List of outgoing writequeue_entries */
spinlock_t writequeue_lock;
atomic_t writequeue_cnt;
- struct mutex wq_alloc;
int retries;
#define MAX_CONNECT_RETRIES 3
struct hlist_node list;
@@ -189,6 +191,24 @@ static const struct dlm_proto_ops *dlm_proto_ops;
static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
+static void writequeue_entry_ctor(void *data)
+{
+ struct writequeue_entry *entry = data;
+
+ INIT_LIST_HEAD(&entry->msgs);
+}
+
+struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
+{
+ return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry),
+ 0, 0, writequeue_entry_ctor);
+}
+
+struct kmem_cache *dlm_lowcomms_msg_cache_create(void)
+{
+ return kmem_cache_create("dlm_msg", sizeof(struct dlm_msg), 0, 0, NULL);
+}
+
/* need to held writequeue_lock */
static struct writequeue_entry *con_next_wq(struct connection *con)
{
@@ -199,7 +219,10 @@ static struct writequeue_entry *con_next_wq(struct connection *con)
e = list_first_entry(&con->writequeue, struct writequeue_entry,
list);
- if (e->len == 0)
+ /* if len is zero nothing is to send, if there are users filling
+ * buffers we wait until the users are done so we can send more.
+ */
+ if (e->users || e->len == 0)
return NULL;
return e;
@@ -265,8 +288,6 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
return NULL;
}
- mutex_init(&con->wq_alloc);
-
spin_lock(&connections_lock);
/* Because multiple workqueues/threads calls this function it can
* race on multiple cpu's. Instead of locking hot path __find_con()
@@ -486,11 +507,9 @@ static void lowcomms_data_ready(struct sock *sk)
{
struct connection *con;
- read_lock_bh(&sk->sk_callback_lock);
con = sock2con(sk);
if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
queue_work(recv_workqueue, &con->rwork);
- read_unlock_bh(&sk->sk_callback_lock);
}
static void lowcomms_listen_data_ready(struct sock *sk)
@@ -505,15 +524,14 @@ static void lowcomms_write_space(struct sock *sk)
{
struct connection *con;
- read_lock_bh(&sk->sk_callback_lock);
con = sock2con(sk);
if (!con)
- goto out;
+ return;
if (!test_and_set_bit(CF_CONNECTED, &con->flags)) {
log_print("successful connected to node %d", con->nodeid);
queue_work(send_workqueue, &con->swork);
- goto out;
+ return;
}
clear_bit(SOCK_NOSPACE, &con->sock->flags);
@@ -524,8 +542,6 @@ static void lowcomms_write_space(struct sock *sk)
}
queue_work(send_workqueue, &con->swork);
-out:
- read_unlock_bh(&sk->sk_callback_lock);
}
static inline void lowcomms_connect_sock(struct connection *con)
@@ -592,42 +608,41 @@ int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
static void lowcomms_error_report(struct sock *sk)
{
struct connection *con;
- struct sockaddr_storage saddr;
void (*orig_report)(struct sock *) = NULL;
+ struct inet_sock *inet;
- read_lock_bh(&sk->sk_callback_lock);
con = sock2con(sk);
if (con == NULL)
goto out;
orig_report = listen_sock.sk_error_report;
- if (kernel_getpeername(sk->sk_socket, (struct sockaddr *)&saddr) < 0) {
- printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
- "sending to node %d, port %d, "
- "sk_err=%d/%d\n", dlm_our_nodeid(),
- con->nodeid, dlm_config.ci_tcp_port,
- sk->sk_err, sk->sk_err_soft);
- } else if (saddr.ss_family == AF_INET) {
- struct sockaddr_in *sin4 = (struct sockaddr_in *)&saddr;
+ inet = inet_sk(sk);
+ switch (sk->sk_family) {
+ case AF_INET:
printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
- "sending to node %d at %pI4, port %d, "
+ "sending to node %d at %pI4, dport %d, "
"sk_err=%d/%d\n", dlm_our_nodeid(),
- con->nodeid, &sin4->sin_addr.s_addr,
- dlm_config.ci_tcp_port, sk->sk_err,
+ con->nodeid, &inet->inet_daddr,
+ ntohs(inet->inet_dport), sk->sk_err,
sk->sk_err_soft);
- } else {
- struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&saddr;
-
+ break;
+#if IS_ENABLED(CONFIG_IPV6)
+ case AF_INET6:
printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
- "sending to node %d at %u.%u.%u.%u, "
- "port %d, sk_err=%d/%d\n", dlm_our_nodeid(),
- con->nodeid, sin6->sin6_addr.s6_addr32[0],
- sin6->sin6_addr.s6_addr32[1],
- sin6->sin6_addr.s6_addr32[2],
- sin6->sin6_addr.s6_addr32[3],
- dlm_config.ci_tcp_port, sk->sk_err,
+ "sending to node %d at %pI6c, "
+ "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(),
+ con->nodeid, &sk->sk_v6_daddr,
+ ntohs(inet->inet_dport), sk->sk_err,
sk->sk_err_soft);
+ break;
+#endif
+ default:
+ printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
+ "invalid socket family %d set, "
+ "sk_err=%d/%d\n", dlm_our_nodeid(),
+ sk->sk_family, sk->sk_err, sk->sk_err_soft);
+ goto out;
}
/* below sendcon only handling */
@@ -646,7 +661,6 @@ static void lowcomms_error_report(struct sock *sk)
queue_work(send_workqueue, &con->swork);
out:
- read_unlock_bh(&sk->sk_callback_lock);
if (orig_report)
orig_report(sk);
}
@@ -666,20 +680,20 @@ static void restore_callbacks(struct socket *sock)
{
struct sock *sk = sock->sk;
- write_lock_bh(&sk->sk_callback_lock);
+ lock_sock(sk);
sk->sk_user_data = NULL;
sk->sk_data_ready = listen_sock.sk_data_ready;
sk->sk_state_change = listen_sock.sk_state_change;
sk->sk_write_space = listen_sock.sk_write_space;
sk->sk_error_report = listen_sock.sk_error_report;
- write_unlock_bh(&sk->sk_callback_lock);
+ release_sock(sk);
}
static void add_listen_sock(struct socket *sock, struct listen_connection *con)
{
struct sock *sk = sock->sk;
- write_lock_bh(&sk->sk_callback_lock);
+ lock_sock(sk);
save_listen_callbacks(sock);
con->sock = sock;
@@ -687,7 +701,7 @@ static void add_listen_sock(struct socket *sock, struct listen_connection *con)
sk->sk_allocation = GFP_NOFS;
/* Install a data_ready callback */
sk->sk_data_ready = lowcomms_listen_data_ready;
- write_unlock_bh(&sk->sk_callback_lock);
+ release_sock(sk);
}
/* Make a socket active */
@@ -695,7 +709,7 @@ static void add_sock(struct socket *sock, struct connection *con)
{
struct sock *sk = sock->sk;
- write_lock_bh(&sk->sk_callback_lock);
+ lock_sock(sk);
con->sock = sock;
sk->sk_user_data = con;
@@ -705,7 +719,7 @@ static void add_sock(struct socket *sock, struct connection *con)
sk->sk_state_change = lowcomms_state_change;
sk->sk_allocation = GFP_NOFS;
sk->sk_error_report = lowcomms_error_report;
- write_unlock_bh(&sk->sk_callback_lock);
+ release_sock(sk);
}
/* Add the port number to an IPv6 or 4 sockaddr and return the address
@@ -733,7 +747,7 @@ static void dlm_page_release(struct kref *kref)
ref);
__free_page(e->page);
- kfree(e);
+ dlm_free_writequeue(e);
}
static void dlm_msg_release(struct kref *kref)
@@ -741,7 +755,7 @@ static void dlm_msg_release(struct kref *kref)
struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);
kref_put(&msg->entry->ref, dlm_page_release);
- kfree(msg);
+ dlm_free_msg(msg);
}
static void free_entry(struct writequeue_entry *e)
@@ -925,6 +939,7 @@ static int receive_from_sock(struct connection *con)
msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
msg.msg_flags);
+ trace_dlm_recv(con->nodeid, ret);
if (ret == -EAGAIN)
break;
else if (ret <= 0)
@@ -1013,10 +1028,28 @@ static int accept_from_sock(struct listen_connection *con)
/* Get the new node's NODEID */
make_sockaddr(&peeraddr, 0, &len);
if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) {
- unsigned char *b=(unsigned char *)&peeraddr;
- log_print("connect from non cluster node");
- print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
- b, sizeof(struct sockaddr_storage));
+ switch (peeraddr.ss_family) {
+ case AF_INET: {
+ struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr;
+
+ log_print("connect from non cluster IPv4 node %pI4",
+ &sin->sin_addr);
+ break;
+ }
+#if IS_ENABLED(CONFIG_IPV6)
+ case AF_INET6: {
+ struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr;
+
+ log_print("connect from non cluster IPv6 node %pI6c",
+ &sin6->sin6_addr);
+ break;
+ }
+#endif
+ default:
+ log_print("invalid family from non cluster node");
+ break;
+ }
+
sock_release(newsock);
return -1;
}
@@ -1177,33 +1210,33 @@ static void deinit_local(void)
kfree(dlm_local_addr[i]);
}
-static struct writequeue_entry *new_writequeue_entry(struct connection *con,
- gfp_t allocation)
+static struct writequeue_entry *new_writequeue_entry(struct connection *con)
{
struct writequeue_entry *entry;
- entry = kzalloc(sizeof(*entry), allocation);
+ entry = dlm_allocate_writequeue();
if (!entry)
return NULL;
- entry->page = alloc_page(allocation | __GFP_ZERO);
+ entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO);
if (!entry->page) {
- kfree(entry);
+ dlm_free_writequeue(entry);
return NULL;
}
+ entry->offset = 0;
+ entry->len = 0;
+ entry->end = 0;
+ entry->dirty = false;
entry->con = con;
entry->users = 1;
kref_init(&entry->ref);
- INIT_LIST_HEAD(&entry->msgs);
-
return entry;
}
static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
- gfp_t allocation, char **ppc,
- void (*cb)(struct dlm_mhandle *mh),
- struct dlm_mhandle *mh)
+ char **ppc, void (*cb)(void *data),
+ void *data)
{
struct writequeue_entry *e;
@@ -1215,74 +1248,54 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
*ppc = page_address(e->page) + e->end;
if (cb)
- cb(mh);
+ cb(data);
e->end += len;
e->users++;
- spin_unlock(&con->writequeue_lock);
-
- return e;
+ goto out;
}
}
- spin_unlock(&con->writequeue_lock);
- e = new_writequeue_entry(con, allocation);
+ e = new_writequeue_entry(con);
if (!e)
- return NULL;
+ goto out;
kref_get(&e->ref);
*ppc = page_address(e->page);
e->end += len;
atomic_inc(&con->writequeue_cnt);
-
- spin_lock(&con->writequeue_lock);
if (cb)
- cb(mh);
+ cb(data);
list_add_tail(&e->list, &con->writequeue);
- spin_unlock(&con->writequeue_lock);
+out:
+ spin_unlock(&con->writequeue_lock);
return e;
};
static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
gfp_t allocation, char **ppc,
- void (*cb)(struct dlm_mhandle *mh),
- struct dlm_mhandle *mh)
+ void (*cb)(void *data),
+ void *data)
{
struct writequeue_entry *e;
struct dlm_msg *msg;
- bool sleepable;
- msg = kzalloc(sizeof(*msg), allocation);
+ msg = dlm_allocate_msg(allocation);
if (!msg)
return NULL;
- /* this mutex is being used as a wait to avoid multiple "fast"
- * new writequeue page list entry allocs in new_wq_entry in
- * normal operation which is sleepable context. Without it
- * we could end in multiple writequeue entries with one
- * dlm message because multiple callers were waiting at
- * the writequeue_lock in new_wq_entry().
- */
- sleepable = gfpflags_normal_context(allocation);
- if (sleepable)
- mutex_lock(&con->wq_alloc);
-
kref_init(&msg->ref);
- e = new_wq_entry(con, len, allocation, ppc, cb, mh);
+ e = new_wq_entry(con, len, ppc, cb, data);
if (!e) {
- if (sleepable)
- mutex_unlock(&con->wq_alloc);
-
- kfree(msg);
+ dlm_free_msg(msg);
return NULL;
}
- if (sleepable)
- mutex_unlock(&con->wq_alloc);
-
+ msg->retransmit = false;
+ msg->orig_msg = NULL;
msg->ppc = *ppc;
msg->len = len;
msg->entry = e;
@@ -1291,8 +1304,8 @@ static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
}
struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
- char **ppc, void (*cb)(struct dlm_mhandle *mh),
- struct dlm_mhandle *mh)
+ char **ppc, void (*cb)(void *data),
+ void *data)
{
struct connection *con;
struct dlm_msg *msg;
@@ -1313,7 +1326,7 @@ struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
return NULL;
}
- msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, mh);
+ msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, data);
if (!msg) {
srcu_read_unlock(&connections_srcu, idx);
return NULL;
@@ -1403,7 +1416,6 @@ static void send_to_sock(struct connection *con)
if (!e)
break;
- e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
len = e->len;
offset = e->offset;
BUG_ON(len == 0 && e->users == 0);
@@ -1411,6 +1423,7 @@ static void send_to_sock(struct connection *con)
ret = kernel_sendpage(con->sock, e->page, offset, len,
msg_flags);
+ trace_dlm_send(con->nodeid, ret);
if (ret == -EAGAIN || ret == 0) {
if (ret == -EAGAIN &&
test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
@@ -1680,9 +1693,9 @@ static void _stop_conn(struct connection *con, bool and_other)
set_bit(CF_READ_PENDING, &con->flags);
set_bit(CF_WRITE_PENDING, &con->flags);
if (con->sock && con->sock->sk) {
- write_lock_bh(&con->sock->sk->sk_callback_lock);
+ lock_sock(con->sock->sk);
con->sock->sk->sk_user_data = NULL;
- write_unlock_bh(&con->sock->sk->sk_callback_lock);
+ release_sock(con->sock->sk);
}
if (con->othercon && and_other)
_stop_conn(con->othercon, false);
@@ -1775,7 +1788,7 @@ static int dlm_listen_for_all(void)
result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
SOCK_STREAM, dlm_proto_ops->proto, &sock);
if (result < 0) {
- log_print("Can't create comms socket, check SCTP is loaded");
+ log_print("Can't create comms socket: %d", result);
goto out;
}