summaryrefslogtreecommitdiffstats
path: root/net/tipc/socket.c
diff options
context:
space:
mode:
authorYing Xue <ying.xue@windriver.com>2013-06-17 16:54:39 +0200
committerDavid S. Miller <davem@davemloft.net>2013-06-18 00:53:00 +0200
commitc5fa7b3cf3cb22e4ac60485fc2dc187fe012910f (patch)
tree9cc84cfb44bd0962132f9506adec55fe8fb433d4 /net/tipc/socket.c
parenttipc: allow implicit connect for stream sockets (diff)
downloadlinux-c5fa7b3cf3cb22e4ac60485fc2dc187fe012910f.tar.xz
linux-c5fa7b3cf3cb22e4ac60485fc2dc187fe012910f.zip
tipc: introduce new TIPC server infrastructure
TIPC has two internal servers, one providing a subscription service for topology events, and another providing the configuration interface. These servers have previously been running in BH context, accessing the TIPC-port (aka native) API directly. Apart from these servers, even the TIPC socket implementation is partially built on this API. As this API may simultaneously be called via different paths and in different contexts, a complex and costly lock policiy is required in order to protect TIPC internal resources. To eliminate the need for this complex lock policiy, we introduce a new, generic service API that uses kernel sockets for message passing instead of the native API. Once the toplogy and configuration servers are converted to use this new service, all code pertaining to the native API can be removed. This entails a significant reduction in code amount and complexity, and opens up for a complete rework of the locking policy in TIPC. The new service also solves another problem: As the current topology server works in BH context, it cannot easily be blocked when sending of events fails due to congestion. In such cases events may have to be silently dropped, something that is unacceptable. Therefore, the new service keeps a dedicated outbound queue receiving messages from BH context. Once messages are inserted into this queue, we will immediately schedule a work from a special workqueue. This way, messages/events from the topology server are in reality sent in process context, and the server can block if necessary. Analogously, there is a new workqueue for receiving messages. Once a notification about an arriving message is received in BH context, we schedule a work from the receive workqueue to do the job of receiving the message in process context. As both sending and receive messages are now finished in processes, subscribed events cannot be dropped any more. As of this commit, this new server infrastructure is built, but not actually yet called by the existing TIPC code, but since the conversion changes required in order to use it are significant, the addition is kept here as a separate commit. Signed-off-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: Paul Gortmaker <paul.gortmaker@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to '')
-rw-r--r--net/tipc/socket.c99
1 files changed, 92 insertions, 7 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index d5fa708f037d..bd8e2cdeceef 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -2,7 +2,7 @@
* net/tipc/socket.c: TIPC socket API
*
* Copyright (c) 2001-2007, 2012 Ericsson AB
- * Copyright (c) 2004-2008, 2010-2012, Wind River Systems
+ * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -63,12 +63,15 @@ static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);
static void wakeupdispatch(struct tipc_port *tport);
static void tipc_data_ready(struct sock *sk, int len);
static void tipc_write_space(struct sock *sk);
+static int release(struct socket *sock);
+static int accept(struct socket *sock, struct socket *new_sock, int flags);
static const struct proto_ops packet_ops;
static const struct proto_ops stream_ops;
static const struct proto_ops msg_ops;
static struct proto tipc_proto;
+static struct proto tipc_proto_kern;
static int sockets_enabled;
@@ -141,7 +144,7 @@ static void reject_rx_queue(struct sock *sk)
}
/**
- * tipc_create - create a TIPC socket
+ * tipc_sk_create - create a TIPC socket
* @net: network namespace (must be default network)
* @sock: pre-allocated socket structure
* @protocol: protocol indicator (must be 0)
@@ -152,8 +155,8 @@ static void reject_rx_queue(struct sock *sk)
*
* Returns 0 on success, errno otherwise
*/
-static int tipc_create(struct net *net, struct socket *sock, int protocol,
- int kern)
+static int tipc_sk_create(struct net *net, struct socket *sock, int protocol,
+ int kern)
{
const struct proto_ops *ops;
socket_state state;
@@ -183,7 +186,11 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol,
}
/* Allocate socket's protocol area */
- sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
+ if (!kern)
+ sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
+ else
+ sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto_kern);
+
if (sk == NULL)
return -ENOMEM;
@@ -219,6 +226,78 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol,
}
/**
+ * tipc_sock_create_local - create TIPC socket from inside TIPC module
+ * @type: socket type - SOCK_RDM or SOCK_SEQPACKET
+ *
+ * We cannot use sock_creat_kern here because it bumps module user count.
+ * Since socket owner and creator is the same module we must make sure
+ * that module count remains zero for module local sockets, otherwise
+ * we cannot do rmmod.
+ *
+ * Returns 0 on success, errno otherwise
+ */
+int tipc_sock_create_local(int type, struct socket **res)
+{
+ int rc;
+ struct sock *sk;
+
+ rc = sock_create_lite(AF_TIPC, type, 0, res);
+ if (rc < 0) {
+ pr_err("Failed to create kernel socket\n");
+ return rc;
+ }
+ tipc_sk_create(&init_net, *res, 0, 1);
+
+ sk = (*res)->sk;
+
+ return 0;
+}
+
+/**
+ * tipc_sock_release_local - release socket created by tipc_sock_create_local
+ * @sock: the socket to be released.
+ *
+ * Module reference count is not incremented when such sockets are created,
+ * so we must keep it from being decremented when they are released.
+ */
+void tipc_sock_release_local(struct socket *sock)
+{
+ release(sock);
+ sock->ops = NULL;
+ sock_release(sock);
+}
+
+/**
+ * tipc_sock_accept_local - accept a connection on a socket created
+ * with tipc_sock_create_local. Use this function to avoid that
+ * module reference count is inadvertently incremented.
+ *
+ * @sock: the accepting socket
+ * @newsock: reference to the new socket to be created
+ * @flags: socket flags
+ */
+
+int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
+ int flags)
+{
+ struct sock *sk = sock->sk;
+ int ret;
+
+ ret = sock_create_lite(sk->sk_family, sk->sk_type,
+ sk->sk_protocol, newsock);
+ if (ret < 0)
+ return ret;
+
+ ret = accept(sock, *newsock, flags);
+ if (ret < 0) {
+ sock_release(*newsock);
+ return ret;
+ }
+ (*newsock)->ops = sock->ops;
+ return ret;
+}
+
+/**
* release - destroy a TIPC socket
* @sock: socket to destroy
*
@@ -1529,7 +1608,7 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)
buf = skb_peek(&sk->sk_receive_queue);
- res = tipc_create(sock_net(sock->sk), new_sock, 0, 0);
+ res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);
if (res)
goto exit;
@@ -1839,7 +1918,7 @@ static const struct proto_ops stream_ops = {
static const struct net_proto_family tipc_family_ops = {
.owner = THIS_MODULE,
.family = AF_TIPC,
- .create = tipc_create
+ .create = tipc_sk_create
};
static struct proto tipc_proto = {
@@ -1849,6 +1928,12 @@ static struct proto tipc_proto = {
.sysctl_rmem = sysctl_tipc_rmem
};
+static struct proto tipc_proto_kern = {
+ .name = "TIPC",
+ .obj_size = sizeof(struct tipc_sock),
+ .sysctl_rmem = sysctl_tipc_rmem
+};
+
/**
* tipc_socket_init - initialize TIPC socket interface
*