diff options
Diffstat (limited to '')
-rw-r--r-- | fs/dlm/lowcomms.c (renamed from fs/dlm/lowcomms-tcp.c) | 788 |
1 files changed, 628 insertions, 160 deletions
diff --git a/fs/dlm/lowcomms-tcp.c b/fs/dlm/lowcomms.c index 07e0a122c32f..27970a58d29b 100644 --- a/fs/dlm/lowcomms-tcp.c +++ b/fs/dlm/lowcomms.c @@ -36,30 +36,36 @@ * of high load. Also, this way, the sending thread can collect together * messages bound for one node and send them in one block. * - * I don't see any problem with the recv thread executing the locking - * code on behalf of remote processes as the locking code is - * short, efficient and never waits. + * lowcomms will choose to use wither TCP or SCTP as its transport layer + * depending on the configuration variable 'protocol'. This should be set + * to 0 (default) for TCP or 1 for SCTP. It shouldbe configured using a + * cluster-wide mechanism as it must be the same on all nodes of the cluster + * for the DLM to function. * */ - #include <asm/ioctls.h> #include <net/sock.h> #include <net/tcp.h> #include <linux/pagemap.h> +#include <linux/idr.h> +#include <linux/file.h> +#include <linux/sctp.h> +#include <net/sctp/user.h> #include "dlm_internal.h" #include "lowcomms.h" #include "midcomms.h" #include "config.h" +#define NEEDED_RMEM (4*1024*1024) + struct cbuf { unsigned int base; unsigned int len; unsigned int mask; }; -#define NODE_INCREMENT 32 static void cbuf_add(struct cbuf *cb, int n) { cb->len += n; @@ -88,28 +94,25 @@ static bool cbuf_empty(struct cbuf *cb) return cb->len == 0; } -/* Maximum number of incoming messages to process before - doing a cond_resched() -*/ -#define MAX_RX_MSG_COUNT 25 - struct connection { struct socket *sock; /* NULL if not connected */ uint32_t nodeid; /* So we know who we are in the list */ struct mutex sock_mutex; - unsigned long flags; /* bit 1,2 = We are on the read/write lists */ + unsigned long flags; #define CF_READ_PENDING 1 #define CF_WRITE_PENDING 2 #define CF_CONNECT_PENDING 3 -#define CF_IS_OTHERCON 4 +#define CF_INIT_PENDING 4 +#define CF_IS_OTHERCON 5 struct list_head writequeue; /* List of outgoing writequeue_entries */ - struct list_head listenlist; /* List of allocated listening sockets */ spinlock_t writequeue_lock; int (*rx_action) (struct connection *); /* What to do when active */ + void (*connect_action) (struct connection *); /* What to do to connect */ struct page *rx_page; struct cbuf cb; int retries; #define MAX_CONNECT_RETRIES 3 + int sctp_assoc; struct connection *othercon; struct work_struct rwork; /* Receive workqueue */ struct work_struct swork; /* Send workqueue */ @@ -127,68 +130,136 @@ struct writequeue_entry { struct connection *con; }; -static struct sockaddr_storage dlm_local_addr; +static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT]; +static int dlm_local_count; /* Work queues */ static struct workqueue_struct *recv_workqueue; static struct workqueue_struct *send_workqueue; -/* An array of pointers to connections, indexed by NODEID */ -static struct connection **connections; +static DEFINE_IDR(connections_idr); static DECLARE_MUTEX(connections_lock); +static int max_nodeid; static struct kmem_cache *con_cache; -static int conn_array_size; static void process_recv_sockets(struct work_struct *work); static void process_send_sockets(struct work_struct *work); -static struct connection *nodeid2con(int nodeid, gfp_t allocation) +/* + * If 'allocation' is zero then we don't attempt to create a new + * connection structure for this node. + */ +static struct connection *__nodeid2con(int nodeid, gfp_t alloc) { struct connection *con = NULL; + int r; + int n; - down(&connections_lock); - if (nodeid >= conn_array_size) { - int new_size = nodeid + NODE_INCREMENT; - struct connection **new_conns; + con = idr_find(&connections_idr, nodeid); + if (con || !alloc) + return con; - new_conns = kzalloc(sizeof(struct connection *) * - new_size, allocation); - if (!new_conns) - goto finish; + r = idr_pre_get(&connections_idr, alloc); + if (!r) + return NULL; + + con = kmem_cache_zalloc(con_cache, alloc); + if (!con) + return NULL; - memcpy(new_conns, connections, sizeof(struct connection *) * conn_array_size); - conn_array_size = new_size; - kfree(connections); - connections = new_conns; + r = idr_get_new_above(&connections_idr, con, nodeid, &n); + if (r) { + kmem_cache_free(con_cache, con); + return NULL; + } + if (n != nodeid) { + idr_remove(&connections_idr, n); + kmem_cache_free(con_cache, con); + return NULL; } - con = connections[nodeid]; - if (con == NULL && allocation) { - con = kmem_cache_zalloc(con_cache, allocation); - if (!con) - goto finish; + con->nodeid = nodeid; + mutex_init(&con->sock_mutex); + INIT_LIST_HEAD(&con->writequeue); + spin_lock_init(&con->writequeue_lock); + INIT_WORK(&con->swork, process_send_sockets); + INIT_WORK(&con->rwork, process_recv_sockets); - con->nodeid = nodeid; - mutex_init(&con->sock_mutex); - INIT_LIST_HEAD(&con->writequeue); - spin_lock_init(&con->writequeue_lock); - INIT_WORK(&con->swork, process_send_sockets); - INIT_WORK(&con->rwork, process_recv_sockets); + /* Setup action pointers for child sockets */ + if (con->nodeid) { + struct connection *zerocon = idr_find(&connections_idr, 0); - connections[nodeid] = con; + con->connect_action = zerocon->connect_action; + if (!con->rx_action) + con->rx_action = zerocon->rx_action; } -finish: + if (nodeid > max_nodeid) + max_nodeid = nodeid; + + return con; +} + +static struct connection *nodeid2con(int nodeid, gfp_t allocation) +{ + struct connection *con; + + down(&connections_lock); + con = __nodeid2con(nodeid, allocation); up(&connections_lock); + return con; } +/* This is a bit drastic, but only called when things go wrong */ +static struct connection *assoc2con(int assoc_id) +{ + int i; + struct connection *con; + + down(&connections_lock); + for (i=0; i<=max_nodeid; i++) { + con = __nodeid2con(i, 0); + if (con && con->sctp_assoc == assoc_id) { + up(&connections_lock); + return con; + } + } + up(&connections_lock); + return NULL; +} + +static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr) +{ + struct sockaddr_storage addr; + int error; + + if (!dlm_local_count) + return -1; + + error = dlm_nodeid_to_addr(nodeid, &addr); + if (error) + return error; + + if (dlm_local_addr[0]->ss_family == AF_INET) { + struct sockaddr_in *in4 = (struct sockaddr_in *) &addr; + struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr; + ret4->sin_addr.s_addr = in4->sin_addr.s_addr; + } else { + struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr; + struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr; + memcpy(&ret6->sin6_addr, &in6->sin6_addr, + sizeof(in6->sin6_addr)); + } + + return 0; +} + /* Data available on socket or listen socket received a connect */ static void lowcomms_data_ready(struct sock *sk, int count_unused) { struct connection *con = sock2con(sk); - if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) queue_work(recv_workqueue, &con->rwork); } @@ -222,20 +293,21 @@ static int add_sock(struct socket *sock, struct connection *con) con->sock->sk->sk_data_ready = lowcomms_data_ready; con->sock->sk->sk_write_space = lowcomms_write_space; con->sock->sk->sk_state_change = lowcomms_state_change; - + con->sock->sk->sk_user_data = con; return 0; } -/* Add the port number to an IP6 or 4 sockaddr and return the address +/* Add the port number to an IPv6 or 4 sockaddr and return the address length */ static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, int *addr_len) { - saddr->ss_family = dlm_local_addr.ss_family; + saddr->ss_family = dlm_local_addr[0]->ss_family; if (saddr->ss_family == AF_INET) { struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; in4_addr->sin_port = cpu_to_be16(port); *addr_len = sizeof(struct sockaddr_in); + memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero)); } else { struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; in6_addr->sin6_port = cpu_to_be16(port); @@ -264,6 +336,193 @@ static void close_connection(struct connection *con, bool and_other) mutex_unlock(&con->sock_mutex); } +/* We only send shutdown messages to nodes that are not part of the cluster */ +static void sctp_send_shutdown(sctp_assoc_t associd) +{ + static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; + struct msghdr outmessage; + struct cmsghdr *cmsg; + struct sctp_sndrcvinfo *sinfo; + int ret; + struct connection *con; + + con = nodeid2con(0,0); + BUG_ON(con == NULL); + + outmessage.msg_name = NULL; + outmessage.msg_namelen = 0; + outmessage.msg_control = outcmsg; + outmessage.msg_controllen = sizeof(outcmsg); + outmessage.msg_flags = MSG_EOR; + + cmsg = CMSG_FIRSTHDR(&outmessage); + cmsg->cmsg_level = IPPROTO_SCTP; + cmsg->cmsg_type = SCTP_SNDRCV; + cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); + outmessage.msg_controllen = cmsg->cmsg_len; + sinfo = CMSG_DATA(cmsg); + memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); + + sinfo->sinfo_flags |= MSG_EOF; + sinfo->sinfo_assoc_id = associd; + + ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0); + + if (ret != 0) + log_print("send EOF to node failed: %d", ret); +} + +/* INIT failed but we don't know which node... + restart INIT on all pending nodes */ +static void sctp_init_failed(void) +{ + int i; + struct connection *con; + + down(&connections_lock); + for (i=1; i<=max_nodeid; i++) { + con = __nodeid2con(i, 0); + if (!con) + continue; + con->sctp_assoc = 0; + if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) { + if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) { + queue_work(send_workqueue, &con->swork); + } + } + } + up(&connections_lock); +} + +/* Something happened to an association */ +static void process_sctp_notification(struct connection *con, + struct msghdr *msg, char *buf) +{ + union sctp_notification *sn = (union sctp_notification *)buf; + + if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) { + switch (sn->sn_assoc_change.sac_state) { + + case SCTP_COMM_UP: + case SCTP_RESTART: + { + /* Check that the new node is in the lockspace */ + struct sctp_prim prim; + int nodeid; + int prim_len, ret; + int addr_len; + struct connection *new_con; + struct file *file; + sctp_peeloff_arg_t parg; + int parglen = sizeof(parg); + + /* + * We get this before any data for an association. + * We verify that the node is in the cluster and + * then peel off a socket for it. + */ + if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) { + log_print("COMM_UP for invalid assoc ID %d", + (int)sn->sn_assoc_change.sac_assoc_id); + sctp_init_failed(); + return; + } + memset(&prim, 0, sizeof(struct sctp_prim)); + prim_len = sizeof(struct sctp_prim); + prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id; + + ret = kernel_getsockopt(con->sock, + IPPROTO_SCTP, + SCTP_PRIMARY_ADDR, + (char*)&prim, + &prim_len); + if (ret < 0) { + log_print("getsockopt/sctp_primary_addr on " + "new assoc %d failed : %d", + (int)sn->sn_assoc_change.sac_assoc_id, + ret); + + /* Retry INIT later */ + new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id); + if (new_con) + clear_bit(CF_CONNECT_PENDING, &con->flags); + return; + } + make_sockaddr(&prim.ssp_addr, 0, &addr_len); + if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) { + int i; + unsigned char *b=(unsigned char *)&prim.ssp_addr; + log_print("reject connect from unknown addr"); + for (i=0; i<sizeof(struct sockaddr_storage);i++) + printk("%02x ", b[i]); + printk("\n"); + sctp_send_shutdown(prim.ssp_assoc_id); + return; + } + + new_con = nodeid2con(nodeid, GFP_KERNEL); + if (!new_con) + return; + + /* Peel off a new sock */ + parg.associd = sn->sn_assoc_change.sac_assoc_id; + ret = kernel_getsockopt(con->sock, IPPROTO_SCTP, + SCTP_SOCKOPT_PEELOFF, + (void *)&parg, &parglen); + if (ret) { + log_print("Can't peel off a socket for " + "connection %d to node %d: err=%d\n", + parg.associd, nodeid, ret); + } + file = fget(parg.sd); + new_con->sock = SOCKET_I(file->f_dentry->d_inode); + add_sock(new_con->sock, new_con); + fput(file); + put_unused_fd(parg.sd); + + log_print("got new/restarted association %d nodeid %d", + (int)sn->sn_assoc_change.sac_assoc_id, nodeid); + + /* Send any pending writes */ + clear_bit(CF_CONNECT_PENDING, &new_con->flags); + clear_bit(CF_INIT_PENDING, &con->flags); + if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) { + queue_work(send_workqueue, &new_con->swork); + } + if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags)) + queue_work(recv_workqueue, &new_con->rwork); + } + break; + + case SCTP_COMM_LOST: + case SCTP_SHUTDOWN_COMP: + { + con = assoc2con(sn->sn_assoc_change.sac_assoc_id); + if (con) { + con->sctp_assoc = 0; + } + } + break; + + /* We don't know which INIT failed, so clear the PENDING flags + * on them all. if assoc_id is zero then it will then try + * again */ + + case SCTP_CANT_STR_ASSOC: + { + log_print("Can't start SCTP association - retrying"); + sctp_init_failed(); + } + break; + + default: + log_print("unexpected SCTP assoc change id=%d state=%d", + (int)sn->sn_assoc_change.sac_assoc_id, + sn->sn_assoc_change.sac_state); + } + } +} + /* Data received from remote end */ static int receive_from_sock(struct connection *con) { @@ -274,6 +533,7 @@ static int receive_from_sock(struct connection *con) int r; int call_again_soon = 0; int nvec; + char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; mutex_lock(&con->sock_mutex); @@ -293,12 +553,18 @@ static int receive_from_sock(struct connection *con) cbuf_init(&con->cb, PAGE_CACHE_SIZE); } + /* Only SCTP needs these really */ + memset(&incmsg, 0, sizeof(incmsg)); + msg.msg_control = incmsg; + msg.msg_controllen = sizeof(incmsg); + /* * iov[0] is the bit of the circular buffer between the current end * point (cb.base + cb.len) and the end of the buffer. */ iov[0].iov_len = con->cb.base - cbuf_data(&con->cb); iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb); + iov[1].iov_len = 0; nvec = 1; /* @@ -315,11 +581,20 @@ static int receive_from_sock(struct connection *con) r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len, MSG_DONTWAIT | MSG_NOSIGNAL); - if (ret <= 0) goto out_close; - if (ret == -EAGAIN) - goto out_resched; + + /* Process SCTP notifications */ + if (msg.msg_flags & MSG_NOTIFICATION) { + msg.msg_control = incmsg; + msg.msg_controllen = sizeof(incmsg); + + process_sctp_notification(con, &msg, + page_address(con->rx_page) + con->cb.base); + mutex_unlock(&con->sock_mutex); + return 0; + } + BUG_ON(con->nodeid == 0); if (ret == len) call_again_soon = 1; @@ -329,10 +604,10 @@ static int receive_from_sock(struct connection *con) con->cb.base, con->cb.len, PAGE_CACHE_SIZE); if (ret == -EBADMSG) { - printk(KERN_INFO "dlm: lowcomms: addr=%p, base=%u, len=%u, " - "iov_len=%u, iov_base[0]=%p, read=%d\n", - page_address(con->rx_page), con->cb.base, con->cb.len, - len, iov[0].iov_base, r); + log_print("lowcomms: addr=%p, base=%u, len=%u, " + "iov_len=%u, iov_base[0]=%p, read=%d", + page_address(con->rx_page), con->cb.base, con->cb.len, + len, iov[0].iov_base, r); } if (ret < 0) goto out_close; @@ -368,7 +643,7 @@ out_close: } /* Listening socket is busy, accept a connection */ -static int accept_from_sock(struct connection *con) +static int tcp_accept_from_sock(struct connection *con) { int result; struct sockaddr_storage peeraddr; @@ -379,7 +654,7 @@ static int accept_from_sock(struct connection *con) struct connection *addcon; memset(&peeraddr, 0, sizeof(peeraddr)); - result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, + result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, IPPROTO_TCP, &newsock); if (result < 0) return -ENOMEM; @@ -408,7 +683,7 @@ static int accept_from_sock(struct connection *con) /* Get the new node's NODEID */ make_sockaddr(&peeraddr, 0, &len); if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) { - printk("dlm: connect from non cluster node\n"); + log_print("connect from non cluster node"); sock_release(newsock); mutex_unlock(&con->sock_mutex); return -1; @@ -419,7 +694,6 @@ static int accept_from_sock(struct connection *con) /* Check to see if we already have a connection to this node. This * could happen if the two nodes initiate a connection at roughly * the same time and the connections cross on the wire. - * TEMPORARY FIX: * In this case we store the incoming one in "othercon" */ newcon = nodeid2con(nodeid, GFP_KERNEL); @@ -434,7 +708,7 @@ static int accept_from_sock(struct connection *con) if (!othercon) { othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL); if (!othercon) { - printk("dlm: failed to allocate incoming socket\n"); + log_print("failed to allocate incoming socket"); mutex_unlock(&newcon->sock_mutex); result = -ENOMEM; goto accept_err; @@ -477,12 +751,107 @@ accept_err: sock_release(newsock); if (result != -EAGAIN) - printk("dlm: error accepting connection from node: %d\n", result); + log_print("error accepting connection from node: %d", result); return result; } +static void free_entry(struct writequeue_entry *e) +{ + __free_page(e->page); + kfree(e); +} + +/* Initiate an SCTP association. + This is a special case of send_to_sock() in that we don't yet have a + peeled-off socket for this association, so we use the listening socket + and add the primary IP address of the remote node. + */ +static void sctp_init_assoc(struct connection *con) +{ + struct sockaddr_storage rem_addr; + char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; + struct msghdr outmessage; + struct cmsghdr *cmsg; + struct sctp_sndrcvinfo *sinfo; + struct connection *base_con; + struct writequeue_entry *e; + int len, offset; + int ret; + int addrlen; + struct kvec iov[1]; + + if (test_and_set_bit(CF_INIT_PENDING, &con->flags)) + return; + + if (con->retries++ > MAX_CONNECT_RETRIES) + return; + + log_print("Initiating association with node %d", con->nodeid); + + if (nodeid_to_addr(con->nodeid, (struct sockaddr *)&rem_addr)) { + log_print("no address for nodeid %d", con->nodeid); + return; + } + base_con = nodeid2con(0, 0); + BUG_ON(base_con == NULL); + + make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen); + + outmessage.msg_name = &rem_addr; + outmessage.msg_namelen = addrlen; + outmessage.msg_control = outcmsg; + outmessage.msg_controllen = sizeof(outcmsg); + outmessage.msg_flags = MSG_EOR; + + spin_lock(&con->writequeue_lock); + e = list_entry(con->writequeue.next, struct writequeue_entry, + list); + + BUG_ON((struct list_head *) e == &con->writequeue); + + len = e->len; + offset = e->offset; + spin_unlock(&con->writequeue_lock); + kmap(e->page); + + /* Send the first block off the write queue */ + iov[0].iov_base = page_address(e->page)+offset; + iov[0].iov_len = len; + + cmsg = CMSG_FIRSTHDR(&outmessage); + cmsg->cmsg_level = IPPROTO_SCTP; + cmsg->cmsg_type = SCTP_SNDRCV; + cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); + sinfo = CMSG_DATA(cmsg); + memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); + sinfo->sinfo_ppid = cpu_to_le32(dlm_our_nodeid()); + outmessage.msg_controllen = cmsg->cmsg_len; + + ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len); + if (ret < 0) { + log_print("Send first packet to node %d failed: %d", + con->nodeid, ret); + + /* Try again later */ + clear_bit(CF_CONNECT_PENDING, &con->flags); + clear_bit(CF_INIT_PENDING, &con->flags); + } + else { + spin_lock(&con->writequeue_lock); + e->offset += ret; + e->len -= ret; + + if (e->len == 0 && e->users == 0) { + list_del(&e->list); + kunmap(e->page); + free_entry(e); + } + spin_unlock(&con->writequeue_lock); + } +} + /* Connect a new socket to its peer */ -static void connect_to_sock(struct connection *con) +static void tcp_connect_to_sock(struct connection *con) { int result = -EHOSTUNREACH; struct sockaddr_storage saddr; @@ -505,7 +874,7 @@ static void connect_to_sock(struct connection *con) } /* Create a socket to communicate with */ - result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, + result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, IPPROTO_TCP, &sock); if (result < 0) goto out_err; @@ -516,11 +885,11 @@ static void connect_to_sock(struct connection *con) sock->sk->sk_user_data = con; con->rx_action = receive_from_sock; + con->connect_action = tcp_connect_to_sock; + add_sock(sock, con); make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len); - add_sock(sock, con); - log_print("connecting to %d", con->nodeid); result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len, @@ -550,64 +919,57 @@ out: return; } -static struct socket *create_listen_sock(struct connection *con, - struct sockaddr_storage *saddr) +static struct socket *tcp_create_listen_sock(struct connection *con, + struct sockaddr_storage *saddr) { struct socket *sock = NULL; - mm_segment_t fs; int result = 0; int one = 1; int addr_len; - if (dlm_local_addr.ss_family == AF_INET) + if (dlm_local_addr[0]->ss_family == AF_INET) addr_len = sizeof(struct sockaddr_in); else addr_len = sizeof(struct sockaddr_in6); /* Create a socket to communicate with */ - result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock); + result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, + IPPROTO_TCP, &sock); if (result < 0) { - printk("dlm: Can't create listening comms socket\n"); + log_print("Can't create listening comms socket"); goto create_out; } - fs = get_fs(); - set_fs(get_ds()); - result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, - (char *)&one, sizeof(one)); - set_fs(fs); + result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, + (char *)&one, sizeof(one)); + if (result < 0) { - printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n", - result); + log_print("Failed to set SO_REUSEADDR on socket: %d", result); } sock->sk->sk_user_data = con; - con->rx_action = accept_from_sock; + con->rx_action = tcp_accept_from_sock; + con->connect_action = tcp_connect_to_sock; con->sock = sock; /* Bind to our port */ make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len); result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len); if (result < 0) { - printk("dlm: Can't bind to port %d\n", dlm_config.ci_tcp_port); + log_print("Can't bind to port %d", dlm_config.ci_tcp_port); sock_release(sock); sock = NULL; con->sock = NULL; goto create_out; } - - fs = get_fs(); - set_fs(get_ds()); - - result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, + result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&one, sizeof(one)); - set_fs(fs); if (result < 0) { - printk("dlm: Set keepalive failed: %d\n", result); + log_print("Set keepalive failed: %d", result); } result = sock->ops->listen(sock, 5); if (result < 0) { - printk("dlm: Can't listen on port %d\n", dlm_config.ci_tcp_port); + log_print("Can't listen on port %d", dlm_config.ci_tcp_port); sock_release(sock); sock = NULL; goto create_out; @@ -617,18 +979,146 @@ create_out: return sock; } +/* Get local addresses */ +static void init_local(void) +{ + struct sockaddr_storage sas, *addr; + int i; + + dlm_local_count = 0; + for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) { + if (dlm_our_addr(&sas, i)) + break; + + addr = kmalloc(sizeof(*addr), GFP_KERNEL); + if (!addr) + break; + memcpy(addr, &sas, sizeof(*addr)); + dlm_local_addr[dlm_local_count++] = addr; + } +} + +/* Bind to an IP address. SCTP allows multiple address so it can do + multi-homing */ +static int add_sctp_bind_addr(struct connection *sctp_con, + struct sockaddr_storage *addr, + int addr_len, int num) +{ + int result = 0; + + if (num == 1) + result = kernel_bind(sctp_con->sock, + (struct sockaddr *) addr, + addr_len); + else + result = kernel_setsockopt(sctp_con->sock, SOL_SCTP, + SCTP_SOCKOPT_BINDX_ADD, + (char *)addr, addr_len); + + if (result < 0) + log_print("Can't bind to port %d addr number %d", + dlm_config.ci_tcp_port, num); + + return result; +} -/* Listen on all interfaces */ -static int listen_for_all(void) +/* Initialise SCTP socket and bind to all interfaces */ +static int sctp_listen_for_all(void) +{ + struct socket *sock = NULL; + struct sockaddr_storage localaddr; + struct sctp_event_subscribe subscribe; + int result = -EINVAL, num = 1, i, addr_len; + struct connection *con = nodeid2con(0, GFP_KERNEL); + int bufsize = NEEDED_RMEM; + + if (!con) + return -ENOMEM; + + log_print("Using SCTP for communications"); + + result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET, + IPPROTO_SCTP, &sock); + if (result < 0) { + log_print("Can't create comms socket, check SCTP is loaded"); + goto out; + } + + /* Listen for events */ + memset(&subscribe, 0, sizeof(subscribe)); + subscribe.sctp_data_io_event = 1; + subscribe.sctp_association_event = 1; + subscribe.sctp_send_failure_event = 1; + subscribe.sctp_shutdown_event = 1; + subscribe.sctp_partial_delivery_event = 1; + + result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUF, + (char *)&bufsize, sizeof(bufsize)); + if (result) + log_print("Error increasing buffer space on socket %d", result); + + result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS, + (char *)&subscribe, sizeof(subscribe)); + if (result < 0) { + log_print("Failed to set SCTP_EVENTS on socket: result=%d", + result); + goto create_delsock; + } + + /* Init con struct */ + sock->sk->sk_user_data = con; + con->sock = sock; + con->sock->sk->sk_data_ready = lowcomms_data_ready; + con->rx_action = receive_from_sock; + con->connect_action = sctp_init_assoc; + + /* Bind to all interfaces. */ + for (i = 0; i < dlm_local_count; i++) { + memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr)); + make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len); + + result = add_sctp_bind_addr(con, &localaddr, addr_len, num); + if (result) + goto create_delsock; + ++num; + } + + result = sock->ops->listen(sock, 5); + if (result < 0) { + log_print("Can't set socket listening"); + goto create_delsock; + } + + return 0; + +create_delsock: + sock_release(sock); + con->sock = NULL; +out: + return result; +} + +static int tcp_listen_for_all(void) { struct socket *sock = NULL; struct connection *con = nodeid2con(0, GFP_KERNEL); int result = -EINVAL; + if (!con) + return -ENOMEM; + /* We don't support multi-homed hosts */ + if (dlm_local_addr[1] != NULL) { + log_print("TCP protocol can't handle multi-homed hosts, " + "try SCTP"); + return -EINVAL; + } + + log_print("Using TCP for communications"); + set_bit(CF_IS_OTHERCON, &con->flags); - sock = create_listen_sock(con, &dlm_local_addr); + sock = tcp_create_listen_sock(con, dlm_local_addr[0]); if (sock) { add_sock(sock, con); result = 0; @@ -666,8 +1156,7 @@ static struct writequeue_entry *new_writequeue_entry(struct connection *con, return entry; } -void *dlm_lowcomms_get_buffer(int nodeid, int len, - gfp_t allocation, char **ppc) +void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc) { struct connection *con; struct writequeue_entry *e; @@ -735,12 +1224,6 @@ out: return; } -static void free_entry(struct writequeue_entry *e) -{ - __free_page(e->page); - kfree(e); -} - /* Send a message */ static void send_to_sock(struct connection *con) { @@ -777,8 +1260,7 @@ static void send_to_sock(struct connection *con) goto out; if (ret <= 0) goto send_error; - } - else { + } else { /* Don't starve people filling buffers */ cond_resched(); } @@ -807,7 +1289,8 @@ send_error: out_connect: mutex_unlock(&con->sock_mutex); - connect_to_sock(con); + if (!test_bit(CF_INIT_PENDING, &con->flags)) + lowcomms_connect_sock(con); return; } @@ -832,9 +1315,6 @@ int dlm_lowcomms_close(int nodeid) { struct connection *con; - if (!connections) - goto out; - log_print("closing connection to node %d", nodeid); con = nodeid2con(nodeid, 0); if (con) { @@ -842,12 +1322,9 @@ int dlm_lowcomms_close(int nodeid) close_connection(con, true); } return 0; - -out: - return -1; } -/* Look for activity on active sockets */ +/* Receive workqueue function */ static void process_recv_sockets(struct work_struct *work) { struct connection *con = container_of(work, struct connection, rwork); @@ -859,15 +1336,14 @@ static void process_recv_sockets(struct work_struct *work) } while (!err); } - +/* Send workqueue function */ static void process_send_sockets(struct work_struct *work) { struct connection *con = container_of(work, struct connection, swork); if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) { - connect_to_sock(con); + con->connect_action(con); } - clear_bit(CF_WRITE_PENDING, &con->flags); send_to_sock(con); } @@ -878,8 +1354,8 @@ static void clean_writequeues(void) { int nodeid; - for (nodeid = 1; nodeid < conn_array_size; nodeid++) { - struct connection *con = nodeid2con(nodeid, 0); + for (nodeid = 1; nodeid <= max_nodeid; nodeid++) { + struct connection *con = __nodeid2con(nodeid, 0); if (con) clean_one_writequeue(con); @@ -916,64 +1392,67 @@ static int work_start(void) void dlm_lowcomms_stop(void) { int i; + struct connection *con; /* Set all the flags to prevent any socket activity. */ - for (i = 0; i < conn_array_size; i++) { - if (connections[i]) - connections[i]->flags |= 0xFF; + down(&connections_lock); + for (i = 0; i <= max_nodeid; i++) { + con = __nodeid2con(i, 0); + if (con) + con->flags |= 0xFF; } + up(&connections_lock); work_stop(); + + down(&connections_lock); clean_writequeues(); - for (i = 0; i < conn_array_size; i++) { - if (connections[i]) { - close_connection(connections[i], true); - if (connections[i]->othercon) - kmem_cache_free(con_cache, connections[i]->othercon); - kmem_cache_free(con_cache, connections[i]); + for (i = 0; i <= max_nodeid; i++) { + con = __nodeid2con(i, 0); + if (con) { + close_connection(con, true); + if (con->othercon) + kmem_cache_free(con_cache, con->othercon); + kmem_cache_free(con_cache, con); } } - - kfree(connections); - connections = NULL; - + max_nodeid = 0; + up(&connections_lock); kmem_cache_destroy(con_cache); + idr_init(&connections_idr); } -/* This is quite likely to sleep... */ int dlm_lowcomms_start(void) { - int error = 0; - - error = -ENOMEM; - connections = kzalloc(sizeof(struct connection *) * - NODE_INCREMENT, GFP_KERNEL); - if (!connections) - goto out; - - conn_array_size = NODE_INCREMENT; + int error = -EINVAL; + struct connection *con; - if (dlm_our_addr(&dlm_local_addr, 0)) { + init_local(); + if (!dlm_local_count) { + error = -ENOTCONN; log_print("no local IP address has been set"); - goto fail_free_conn; - } - if (!dlm_our_addr(&dlm_local_addr, 1)) { - log_print("This dlm comms module does not support multi-homed clustering"); - goto fail_free_conn; + goto out; } + error = -ENOMEM; con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection), __alignof__(struct connection), 0, NULL, NULL); if (!con_cache) - goto fail_free_conn; + goto out; + /* Set some sysctl minima */ + if (sysctl_rmem_max < NEEDED_RMEM) + sysctl_rmem_max = NEEDED_RMEM; /* Start listening */ - error = listen_for_all(); + if (dlm_config.ci_protocol == 0) + error = tcp_listen_for_all(); + else + error = sctp_listen_for_all(); if (error) goto fail_unlisten; @@ -984,24 +1463,13 @@ int dlm_lowcomms_start(void) return 0; fail_unlisten: - close_connection(connections[0], false); - kmem_cache_free(con_cache, connections[0]); + con = nodeid2con(0,0); + if (con) { + close_connection(con, false); + kmem_cache_free(con_cache, con); + } kmem_cache_destroy(con_cache); -fail_free_conn: - kfree(connections); - out: return error; } - -/* - * Overrides for Emacs so that we follow Linus's tabbing style. - * Emacs will notice this stuff at the end of the file and automatically - * adjust the settings for this buffer only. This must remain at the end - * of the file. - * --------------------------------------------------------------------------- - * Local variables: - * c-file-style: "linux" - * End: - */ |