summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fs/dlm/lowcomms.c1
-rw-r--r--fs/dlm/lowcomms.h1
-rw-r--r--fs/dlm/midcomms.c56
3 files changed, 50 insertions, 8 deletions
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index cce1d50aa09f..8f715c620e1f 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -947,6 +947,7 @@ static int receive_from_sock(struct connection *con)
}
}
+ dlm_midcomms_receive_done(con->nodeid);
mutex_unlock(&con->sock_mutex);
return 0;
diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h
index aaae7115c00d..4ccae07cf005 100644
--- a/fs/dlm/lowcomms.h
+++ b/fs/dlm/lowcomms.h
@@ -46,6 +46,7 @@ int dlm_lowcomms_resend_msg(struct dlm_msg *msg);
int dlm_lowcomms_connect_node(int nodeid);
int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark);
int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len);
+void dlm_midcomms_receive_done(int nodeid);
#endif /* __LOWCOMMS_DOT_H__ */
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index e3de268898ed..7ae39ec8d9b0 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -109,12 +109,6 @@
* compatibility. There exists better ways to make a better handling.
* However this should be changed in the next major version bump of dlm.
*
- * Ack handling:
- *
- * Currently we send an ack message for every dlm message. However we
- * can ack multiple dlm messages with one ack by just delaying the ack
- * message. Will reduce some traffic but makes the drop detection slower.
- *
* Tail Size checking:
*
* There exists a message tail payload in e.g. DLM_MSG however we don't
@@ -169,6 +163,7 @@ struct midcomms_node {
#define DLM_NODE_FLAG_CLOSE 1
#define DLM_NODE_FLAG_STOP_TX 2
#define DLM_NODE_FLAG_STOP_RX 3
+#define DLM_NODE_ULP_DELIVERED 4
unsigned long flags;
wait_queue_head_t shutdown_wait;
@@ -480,11 +475,12 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
{
if (seq == node->seq_next) {
node->seq_next++;
- /* send ack before fin */
- dlm_send_ack(node->nodeid, node->seq_next);
switch (p->header.h_cmd) {
case DLM_FIN:
+ /* send ack before fin */
+ dlm_send_ack(node->nodeid, node->seq_next);
+
spin_lock(&node->state_lock);
pr_debug("receive fin msg from node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
@@ -534,6 +530,7 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
default:
WARN_ON(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
dlm_receive_buffer(p, node->nodeid);
+ set_bit(DLM_NODE_ULP_DELIVERED, &node->flags);
break;
}
} else {
@@ -933,6 +930,49 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
return ret;
}
+void dlm_midcomms_receive_done(int nodeid)
+{
+ struct midcomms_node *node;
+ int idx;
+
+ idx = srcu_read_lock(&nodes_srcu);
+ node = nodeid2node(nodeid, 0);
+ if (!node) {
+ srcu_read_unlock(&nodes_srcu, idx);
+ return;
+ }
+
+ /* old protocol, we do nothing */
+ switch (node->version) {
+ case DLM_VERSION_3_2:
+ break;
+ default:
+ srcu_read_unlock(&nodes_srcu, idx);
+ return;
+ }
+
+ /* do nothing if we didn't delivered stateful to ulp */
+ if (!test_and_clear_bit(DLM_NODE_ULP_DELIVERED,
+ &node->flags)) {
+ srcu_read_unlock(&nodes_srcu, idx);
+ return;
+ }
+
+ spin_lock(&node->state_lock);
+ /* we only ack if state is ESTABLISHED */
+ switch (node->state) {
+ case DLM_ESTABLISHED:
+ spin_unlock(&node->state_lock);
+ dlm_send_ack(node->nodeid, node->seq_next);
+ break;
+ default:
+ spin_unlock(&node->state_lock);
+ /* do nothing FIN has it's own ack send */
+ break;
+ };
+ srcu_read_unlock(&nodes_srcu, idx);
+}
+
void dlm_midcomms_unack_msg_resend(int nodeid)
{
struct midcomms_node *node;