diff options
-rw-r--r-- | fs/dlm/lowcomms.c | 1 | ||||
-rw-r--r-- | fs/dlm/lowcomms.h | 1 | ||||
-rw-r--r-- | fs/dlm/midcomms.c | 56 |
3 files changed, 50 insertions, 8 deletions
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index cce1d50aa09f..8f715c620e1f 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -947,6 +947,7 @@ static int receive_from_sock(struct connection *con) } } + dlm_midcomms_receive_done(con->nodeid); mutex_unlock(&con->sock_mutex); return 0; diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h index aaae7115c00d..4ccae07cf005 100644 --- a/fs/dlm/lowcomms.h +++ b/fs/dlm/lowcomms.h @@ -46,6 +46,7 @@ int dlm_lowcomms_resend_msg(struct dlm_msg *msg); int dlm_lowcomms_connect_node(int nodeid); int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark); int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len); +void dlm_midcomms_receive_done(int nodeid); #endif /* __LOWCOMMS_DOT_H__ */ diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c index e3de268898ed..7ae39ec8d9b0 100644 --- a/fs/dlm/midcomms.c +++ b/fs/dlm/midcomms.c @@ -109,12 +109,6 @@ * compatibility. There exists better ways to make a better handling. * However this should be changed in the next major version bump of dlm. * - * Ack handling: - * - * Currently we send an ack message for every dlm message. However we - * can ack multiple dlm messages with one ack by just delaying the ack - * message. Will reduce some traffic but makes the drop detection slower. - * * Tail Size checking: * * There exists a message tail payload in e.g. DLM_MSG however we don't @@ -169,6 +163,7 @@ struct midcomms_node { #define DLM_NODE_FLAG_CLOSE 1 #define DLM_NODE_FLAG_STOP_TX 2 #define DLM_NODE_FLAG_STOP_RX 3 +#define DLM_NODE_ULP_DELIVERED 4 unsigned long flags; wait_queue_head_t shutdown_wait; @@ -480,11 +475,12 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p, { if (seq == node->seq_next) { node->seq_next++; - /* send ack before fin */ - dlm_send_ack(node->nodeid, node->seq_next); switch (p->header.h_cmd) { case DLM_FIN: + /* send ack before fin */ + dlm_send_ack(node->nodeid, node->seq_next); + spin_lock(&node->state_lock); pr_debug("receive fin msg from node %d with state %s\n", node->nodeid, dlm_state_str(node->state)); @@ -534,6 +530,7 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p, default: WARN_ON(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags)); dlm_receive_buffer(p, node->nodeid); + set_bit(DLM_NODE_ULP_DELIVERED, &node->flags); break; } } else { @@ -933,6 +930,49 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len) return ret; } +void dlm_midcomms_receive_done(int nodeid) +{ + struct midcomms_node *node; + int idx; + + idx = srcu_read_lock(&nodes_srcu); + node = nodeid2node(nodeid, 0); + if (!node) { + srcu_read_unlock(&nodes_srcu, idx); + return; + } + + /* old protocol, we do nothing */ + switch (node->version) { + case DLM_VERSION_3_2: + break; + default: + srcu_read_unlock(&nodes_srcu, idx); + return; + } + + /* do nothing if we didn't delivered stateful to ulp */ + if (!test_and_clear_bit(DLM_NODE_ULP_DELIVERED, + &node->flags)) { + srcu_read_unlock(&nodes_srcu, idx); + return; + } + + spin_lock(&node->state_lock); + /* we only ack if state is ESTABLISHED */ + switch (node->state) { + case DLM_ESTABLISHED: + spin_unlock(&node->state_lock); + dlm_send_ack(node->nodeid, node->seq_next); + break; + default: + spin_unlock(&node->state_lock); + /* do nothing FIN has it's own ack send */ + break; + }; + srcu_read_unlock(&nodes_srcu, idx); +} + void dlm_midcomms_unack_msg_resend(int nodeid) { struct midcomms_node *node; |