diff options
-rw-r--r-- | include/linux/ceph/messenger.h | 76 | ||||
-rw-r--r-- | net/ceph/messenger.c | 8 | ||||
-rw-r--r-- | net/ceph/messenger_v1.c | 420 |
3 files changed, 257 insertions, 247 deletions
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index b5268127c55e..54a64e8dfce6 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -264,6 +264,43 @@ struct ceph_msg { #define BASE_DELAY_INTERVAL (HZ / 4) #define MAX_DELAY_INTERVAL (15 * HZ) +struct ceph_connection_v1_info { + struct kvec out_kvec[8], /* sending header/footer data */ + *out_kvec_cur; + int out_kvec_left; /* kvec's left in out_kvec */ + int out_skip; /* skip this many bytes */ + int out_kvec_bytes; /* total bytes left */ + bool out_more; /* there is more data after the kvecs */ + bool out_msg_done; + + struct ceph_auth_handshake *auth; + int auth_retry; /* true if we need a newer authorizer */ + + /* connection negotiation temps */ + u8 in_banner[CEPH_BANNER_MAX_LEN]; + struct ceph_entity_addr actual_peer_addr; + struct ceph_entity_addr peer_addr_for_me; + struct ceph_msg_connect out_connect; + struct ceph_msg_connect_reply in_reply; + + int in_base_pos; /* bytes read */ + + /* message in temps */ + u8 in_tag; /* protocol control byte */ + struct ceph_msg_header in_hdr; + __le64 in_temp_ack; /* for reading an ack */ + + /* message out temps */ + struct ceph_msg_header out_hdr; + __le64 out_temp_ack; /* for writing an ack */ + struct ceph_timespec out_temp_keepalive2; /* for writing keepalive2 + stamp */ + + u32 connect_seq; /* identify the most recent connection + attempt for this session */ + u32 peer_global_seq; /* peer's global seq for this connection */ +}; + /* * A single connection with another host. * @@ -281,21 +318,13 @@ struct ceph_connection { int state; /* CEPH_CON_S_* */ atomic_t sock_state; struct socket *sock; - struct ceph_entity_addr peer_addr; /* peer address */ - struct ceph_entity_addr peer_addr_for_me; unsigned long flags; /* CEPH_CON_F_* */ const char *error_msg; /* error message, if any */ struct ceph_entity_name peer_name; /* peer name */ - + struct ceph_entity_addr peer_addr; /* peer address */ u64 peer_features; - u32 connect_seq; /* identify the most recent connection - attempt for this connection, client */ - u32 peer_global_seq; /* peer's global seq for this connection */ - - struct ceph_auth_handshake *auth; - int auth_retry; /* true if we need a newer authorizer */ struct mutex mutex; @@ -306,41 +335,18 @@ struct ceph_connection { u64 in_seq, in_seq_acked; /* last message received, acked */ - /* connection negotiation temps */ - char in_banner[CEPH_BANNER_MAX_LEN]; - struct ceph_msg_connect out_connect; - struct ceph_msg_connect_reply in_reply; - struct ceph_entity_addr actual_peer_addr; - - /* message out temps */ - struct ceph_msg_header out_hdr; + struct ceph_msg *in_msg; struct ceph_msg *out_msg; /* sending message (== tail of out_sent) */ - bool out_msg_done; - - struct kvec out_kvec[8], /* sending header/footer data */ - *out_kvec_cur; - int out_kvec_left; /* kvec's left in out_kvec */ - int out_skip; /* skip this many bytes */ - int out_kvec_bytes; /* total bytes left */ - int out_more; /* there is more data after the kvecs */ - __le64 out_temp_ack; /* for writing an ack */ - struct ceph_timespec out_temp_keepalive2; /* for writing keepalive2 - stamp */ - /* message in temps */ - struct ceph_msg_header in_hdr; - struct ceph_msg *in_msg; u32 in_front_crc, in_middle_crc, in_data_crc; /* calculated crc */ - char in_tag; /* protocol control byte */ - int in_base_pos; /* bytes read */ - __le64 in_temp_ack; /* for reading an ack */ - struct timespec64 last_keepalive_ack; /* keepalive2 ack stamp */ struct delayed_work work; /* send|recv work */ unsigned long delay; /* current delay interval */ + + struct ceph_connection_v1_info v1; }; extern struct page *ceph_zero_page; diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 544cfdbe52d6..4fb3c33a7b03 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1448,11 +1448,11 @@ static void con_fault_finish(struct ceph_connection *con) * in case we faulted due to authentication, invalidate our * current tickets so that we can get new ones. */ - if (con->auth_retry) { - dout("auth_retry %d, invalidating\n", con->auth_retry); + if (con->v1.auth_retry) { + dout("auth_retry %d, invalidating\n", con->v1.auth_retry); if (con->ops->invalidate_authorizer) con->ops->invalidate_authorizer(con); - con->auth_retry = 0; + con->v1.auth_retry = 0; } if (con->ops->fault) @@ -1631,7 +1631,7 @@ static void clear_standby(struct ceph_connection *con) if (con->state == CEPH_CON_S_STANDBY) { dout("clear_standby %p and ++connect_seq\n", con); con->state = CEPH_CON_S_PREOPEN; - con->connect_seq++; + con->v1.connect_seq++; WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING)); WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)); } diff --git a/net/ceph/messenger_v1.c b/net/ceph/messenger_v1.c index 899038a9678e..04f653b3c897 100644 --- a/net/ceph/messenger_v1.c +++ b/net/ceph/messenger_v1.c @@ -110,25 +110,25 @@ static int ceph_tcp_sendpage(struct socket *sock, struct page *page, static void con_out_kvec_reset(struct ceph_connection *con) { - BUG_ON(con->out_skip); + BUG_ON(con->v1.out_skip); - con->out_kvec_left = 0; - con->out_kvec_bytes = 0; - con->out_kvec_cur = &con->out_kvec[0]; + con->v1.out_kvec_left = 0; + con->v1.out_kvec_bytes = 0; + con->v1.out_kvec_cur = &con->v1.out_kvec[0]; } static void con_out_kvec_add(struct ceph_connection *con, size_t size, void *data) { - int index = con->out_kvec_left; + int index = con->v1.out_kvec_left; - BUG_ON(con->out_skip); - BUG_ON(index >= ARRAY_SIZE(con->out_kvec)); + BUG_ON(con->v1.out_skip); + BUG_ON(index >= ARRAY_SIZE(con->v1.out_kvec)); - con->out_kvec[index].iov_len = size; - con->out_kvec[index].iov_base = data; - con->out_kvec_left++; - con->out_kvec_bytes += size; + con->v1.out_kvec[index].iov_len = size; + con->v1.out_kvec[index].iov_base = data; + con->v1.out_kvec_left++; + con->v1.out_kvec_bytes += size; } /* @@ -138,15 +138,14 @@ static void con_out_kvec_add(struct ceph_connection *con, */ static int con_out_kvec_skip(struct ceph_connection *con) { - int off = con->out_kvec_cur - con->out_kvec; int skip = 0; - if (con->out_kvec_bytes > 0) { - skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len; - BUG_ON(con->out_kvec_bytes < skip); - BUG_ON(!con->out_kvec_left); - con->out_kvec_bytes -= skip; - con->out_kvec_left--; + if (con->v1.out_kvec_bytes > 0) { + skip = con->v1.out_kvec_cur[con->v1.out_kvec_left - 1].iov_len; + BUG_ON(con->v1.out_kvec_bytes < skip); + BUG_ON(!con->v1.out_kvec_left); + con->v1.out_kvec_bytes -= skip; + con->v1.out_kvec_left--; } return skip; @@ -186,8 +185,8 @@ static void prepare_write_message_footer(struct ceph_connection *con) } else { m->old_footer.flags = m->footer.flags; } - con->out_more = m->more_to_follow; - con->out_msg_done = true; + con->v1.out_more = m->more_to_follow; + con->v1.out_msg_done = true; } /* @@ -199,16 +198,16 @@ static void prepare_write_message(struct ceph_connection *con) u32 crc; con_out_kvec_reset(con); - con->out_msg_done = false; + con->v1.out_msg_done = false; /* Sneak an ack in there first? If we can get it into the same * TCP packet that's a good thing. */ if (con->in_seq > con->in_seq_acked) { con->in_seq_acked = con->in_seq; con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); - con->out_temp_ack = cpu_to_le64(con->in_seq_acked); - con_out_kvec_add(con, sizeof (con->out_temp_ack), - &con->out_temp_ack); + con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked); + con_out_kvec_add(con, sizeof(con->v1.out_temp_ack), + &con->v1.out_temp_ack); } ceph_con_get_out_msg(con); @@ -223,7 +222,7 @@ static void prepare_write_message(struct ceph_connection *con) /* tag + hdr + front + middle */ con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); - con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr); + con_out_kvec_add(con, sizeof(con->v1.out_hdr), &con->v1.out_hdr); con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); if (m->middle) @@ -233,7 +232,7 @@ static void prepare_write_message(struct ceph_connection *con) /* fill in hdr crc and finalize hdr */ crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc)); con->out_msg->hdr.crc = cpu_to_le32(crc); - memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr)); + memcpy(&con->v1.out_hdr, &con->out_msg->hdr, sizeof(con->v1.out_hdr)); /* fill in front and middle crc, footer */ crc = crc32c(0, m->front.iov_base, m->front.iov_len); @@ -253,7 +252,7 @@ static void prepare_write_message(struct ceph_connection *con) con->out_msg->footer.data_crc = 0; if (m->data_length) { prepare_message_data(con->out_msg, m->data_length); - con->out_more = 1; /* data + footer will follow */ + con->v1.out_more = 1; /* data + footer will follow */ } else { /* no, queue up footer too and be done */ prepare_write_message_footer(con); @@ -275,11 +274,11 @@ static void prepare_write_ack(struct ceph_connection *con) con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); - con->out_temp_ack = cpu_to_le64(con->in_seq_acked); - con_out_kvec_add(con, sizeof (con->out_temp_ack), - &con->out_temp_ack); + con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked); + con_out_kvec_add(con, sizeof(con->v1.out_temp_ack), + &con->v1.out_temp_ack); - con->out_more = 1; /* more will follow.. eventually.. */ + con->v1.out_more = 1; /* more will follow.. eventually.. */ ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } @@ -294,9 +293,9 @@ static void prepare_write_seq(struct ceph_connection *con) con_out_kvec_reset(con); - con->out_temp_ack = cpu_to_le64(con->in_seq_acked); - con_out_kvec_add(con, sizeof (con->out_temp_ack), - &con->out_temp_ack); + con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked); + con_out_kvec_add(con, sizeof(con->v1.out_temp_ack), + &con->v1.out_temp_ack); ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } @@ -313,9 +312,9 @@ static void prepare_write_keepalive(struct ceph_connection *con) ktime_get_real_ts64(&now); con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2); - ceph_encode_timespec64(&con->out_temp_keepalive2, &now); - con_out_kvec_add(con, sizeof(con->out_temp_keepalive2), - &con->out_temp_keepalive2); + ceph_encode_timespec64(&con->v1.out_temp_keepalive2, &now); + con_out_kvec_add(con, sizeof(con->v1.out_temp_keepalive2), + &con->v1.out_temp_keepalive2); } else { con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive); } @@ -332,19 +331,20 @@ static int get_connect_authorizer(struct ceph_connection *con) int auth_proto; if (!con->ops->get_authorizer) { - con->auth = NULL; - con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; - con->out_connect.authorizer_len = 0; + con->v1.auth = NULL; + con->v1.out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; + con->v1.out_connect.authorizer_len = 0; return 0; } - auth = con->ops->get_authorizer(con, &auth_proto, con->auth_retry); + auth = con->ops->get_authorizer(con, &auth_proto, con->v1.auth_retry); if (IS_ERR(auth)) return PTR_ERR(auth); - con->auth = auth; - con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto); - con->out_connect.authorizer_len = cpu_to_le32(auth->authorizer_buf_len); + con->v1.auth = auth; + con->v1.out_connect.authorizer_protocol = cpu_to_le32(auth_proto); + con->v1.out_connect.authorizer_len = + cpu_to_le32(auth->authorizer_buf_len); return 0; } @@ -357,18 +357,19 @@ static void prepare_write_banner(struct ceph_connection *con) con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), &con->msgr->my_enc_addr); - con->out_more = 0; + con->v1.out_more = 0; ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } static void __prepare_write_connect(struct ceph_connection *con) { - con_out_kvec_add(con, sizeof(con->out_connect), &con->out_connect); - if (con->auth) - con_out_kvec_add(con, con->auth->authorizer_buf_len, - con->auth->authorizer_buf); + con_out_kvec_add(con, sizeof(con->v1.out_connect), + &con->v1.out_connect); + if (con->v1.auth) + con_out_kvec_add(con, con->v1.auth->authorizer_buf_len, + con->v1.auth->authorizer_buf); - con->out_more = 0; + con->v1.out_more = 0; ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); } @@ -393,15 +394,15 @@ static int prepare_write_connect(struct ceph_connection *con) } dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, - con->connect_seq, global_seq, proto); + con->v1.connect_seq, global_seq, proto); - con->out_connect.features = - cpu_to_le64(from_msgr(con->msgr)->supported_features); - con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); - con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); - con->out_connect.global_seq = cpu_to_le32(global_seq); - con->out_connect.protocol_version = cpu_to_le32(proto); - con->out_connect.flags = 0; + con->v1.out_connect.features = + cpu_to_le64(from_msgr(con->msgr)->supported_features); + con->v1.out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); + con->v1.out_connect.connect_seq = cpu_to_le32(con->v1.connect_seq); + con->v1.out_connect.global_seq = cpu_to_le32(global_seq); + con->v1.out_connect.protocol_version = cpu_to_le32(proto); + con->v1.out_connect.flags = 0; ret = get_connect_authorizer(con); if (ret) @@ -421,35 +422,36 @@ static int write_partial_kvec(struct ceph_connection *con) { int ret; - dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes); - while (con->out_kvec_bytes > 0) { - ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, - con->out_kvec_left, con->out_kvec_bytes, - con->out_more); + dout("write_partial_kvec %p %d left\n", con, con->v1.out_kvec_bytes); + while (con->v1.out_kvec_bytes > 0) { + ret = ceph_tcp_sendmsg(con->sock, con->v1.out_kvec_cur, + con->v1.out_kvec_left, + con->v1.out_kvec_bytes, + con->v1.out_more); if (ret <= 0) goto out; - con->out_kvec_bytes -= ret; - if (con->out_kvec_bytes == 0) + con->v1.out_kvec_bytes -= ret; + if (!con->v1.out_kvec_bytes) break; /* done */ /* account for full iov entries consumed */ - while (ret >= con->out_kvec_cur->iov_len) { - BUG_ON(!con->out_kvec_left); - ret -= con->out_kvec_cur->iov_len; - con->out_kvec_cur++; - con->out_kvec_left--; + while (ret >= con->v1.out_kvec_cur->iov_len) { + BUG_ON(!con->v1.out_kvec_left); + ret -= con->v1.out_kvec_cur->iov_len; + con->v1.out_kvec_cur++; + con->v1.out_kvec_left--; } /* and for a partially-consumed entry */ if (ret) { - con->out_kvec_cur->iov_len -= ret; - con->out_kvec_cur->iov_base += ret; + con->v1.out_kvec_cur->iov_len -= ret; + con->v1.out_kvec_cur->iov_base += ret; } } - con->out_kvec_left = 0; + con->v1.out_kvec_left = 0; ret = 1; out: dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, - con->out_kvec_bytes, con->out_kvec_left, ret); + con->v1.out_kvec_bytes, con->v1.out_kvec_left, ret); return ret; /* done! */ } @@ -530,17 +532,17 @@ static int write_partial_skip(struct ceph_connection *con) int more = MSG_MORE | MSG_SENDPAGE_NOTLAST; int ret; - dout("%s %p %d left\n", __func__, con, con->out_skip); - while (con->out_skip > 0) { - size_t size = min(con->out_skip, (int) PAGE_SIZE); + dout("%s %p %d left\n", __func__, con, con->v1.out_skip); + while (con->v1.out_skip > 0) { + size_t size = min(con->v1.out_skip, (int)PAGE_SIZE); - if (size == con->out_skip) + if (size == con->v1.out_skip) more = MSG_MORE; ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size, more); if (ret <= 0) goto out; - con->out_skip -= ret; + con->v1.out_skip -= ret; } ret = 1; out: @@ -553,39 +555,39 @@ out: static void prepare_read_banner(struct ceph_connection *con) { dout("prepare_read_banner %p\n", con); - con->in_base_pos = 0; + con->v1.in_base_pos = 0; } static void prepare_read_connect(struct ceph_connection *con) { dout("prepare_read_connect %p\n", con); - con->in_base_pos = 0; + con->v1.in_base_pos = 0; } static void prepare_read_ack(struct ceph_connection *con) { dout("prepare_read_ack %p\n", con); - con->in_base_pos = 0; + con->v1.in_base_pos = 0; } static void prepare_read_seq(struct ceph_connection *con) { dout("prepare_read_seq %p\n", con); - con->in_base_pos = 0; - con->in_tag = CEPH_MSGR_TAG_SEQ; + con->v1.in_base_pos = 0; + con->v1.in_tag = CEPH_MSGR_TAG_SEQ; } static void prepare_read_tag(struct ceph_connection *con) { dout("prepare_read_tag %p\n", con); - con->in_base_pos = 0; - con->in_tag = CEPH_MSGR_TAG_READY; + con->v1.in_base_pos = 0; + con->v1.in_tag = CEPH_MSGR_TAG_READY; } static void prepare_read_keepalive_ack(struct ceph_connection *con) { dout("prepare_read_keepalive_ack %p\n", con); - con->in_base_pos = 0; + con->v1.in_base_pos = 0; } /* @@ -595,7 +597,7 @@ static int prepare_read_message(struct ceph_connection *con) { dout("prepare_read_message %p\n", con); BUG_ON(con->in_msg != NULL); - con->in_base_pos = 0; + con->v1.in_base_pos = 0; con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0; return 0; } @@ -603,13 +605,13 @@ static int prepare_read_message(struct ceph_connection *con) static int read_partial(struct ceph_connection *con, int end, int size, void *object) { - while (con->in_base_pos < end) { - int left = end - con->in_base_pos; + while (con->v1.in_base_pos < end) { + int left = end - con->v1.in_base_pos; int have = size - left; int ret = ceph_tcp_recvmsg(con->sock, object + have, left); if (ret <= 0) return ret; - con->in_base_pos += ret; + con->v1.in_base_pos += ret; } return 1; } @@ -623,28 +625,28 @@ static int read_partial_banner(struct ceph_connection *con) int end; int ret; - dout("read_partial_banner %p at %d\n", con, con->in_base_pos); + dout("read_partial_banner %p at %d\n", con, con->v1.in_base_pos); /* peer's banner */ size = strlen(CEPH_BANNER); end = size; - ret = read_partial(con, end, size, con->in_banner); + ret = read_partial(con, end, size, con->v1.in_banner); if (ret <= 0) goto out; - size = sizeof (con->actual_peer_addr); + size = sizeof(con->v1.actual_peer_addr); end += size; - ret = read_partial(con, end, size, &con->actual_peer_addr); + ret = read_partial(con, end, size, &con->v1.actual_peer_addr); if (ret <= 0) goto out; - ceph_decode_banner_addr(&con->actual_peer_addr); + ceph_decode_banner_addr(&con->v1.actual_peer_addr); - size = sizeof (con->peer_addr_for_me); + size = sizeof(con->v1.peer_addr_for_me); end += size; - ret = read_partial(con, end, size, &con->peer_addr_for_me); + ret = read_partial(con, end, size, &con->v1.peer_addr_for_me); if (ret <= 0) goto out; - ceph_decode_banner_addr(&con->peer_addr_for_me); + ceph_decode_banner_addr(&con->v1.peer_addr_for_me); out: return ret; @@ -656,34 +658,34 @@ static int read_partial_connect(struct ceph_connection *con) int end; int ret; - dout("read_partial_connect %p at %d\n", con, con->in_base_pos); + dout("read_partial_connect %p at %d\n", con, con->v1.in_base_pos); - size = sizeof (con->in_reply); + size = sizeof(con->v1.in_reply); end = size; - ret = read_partial(con, end, size, &con->in_reply); + ret = read_partial(con, end, size, &con->v1.in_reply); if (ret <= 0) goto out; - if (con->auth) { - size = le32_to_cpu(con->in_reply.authorizer_len); - if (size > con->auth->authorizer_reply_buf_len) { + if (con->v1.auth) { + size = le32_to_cpu(con->v1.in_reply.authorizer_len); + if (size > con->v1.auth->authorizer_reply_buf_len) { pr_err("authorizer reply too big: %d > %zu\n", size, - con->auth->authorizer_reply_buf_len); + con->v1.auth->authorizer_reply_buf_len); ret = -EINVAL; goto out; } end += size; ret = read_partial(con, end, size, - con->auth->authorizer_reply_buf); + con->v1.auth->authorizer_reply_buf); if (ret <= 0) goto out; } dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n", - con, (int)con->in_reply.tag, - le32_to_cpu(con->in_reply.connect_seq), - le32_to_cpu(con->in_reply.global_seq)); + con, con->v1.in_reply.tag, + le32_to_cpu(con->v1.in_reply.connect_seq), + le32_to_cpu(con->v1.in_reply.global_seq)); out: return ret; } @@ -693,7 +695,7 @@ out: */ static int verify_hello(struct ceph_connection *con) { - if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) { + if (memcmp(con->v1.in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) { pr_err("connect to %s got bad banner\n", ceph_pr_addr(&con->peer_addr)); con->error_msg = "protocol error, bad banner"; @@ -716,15 +718,15 @@ static int process_banner(struct ceph_connection *con) * end may not yet know their ip address, so if it's 0.0.0.0, give * them the benefit of the doubt. */ - if (memcmp(&con->peer_addr, &con->actual_peer_addr, + if (memcmp(&con->peer_addr, &con->v1.actual_peer_addr, sizeof(con->peer_addr)) != 0 && - !(ceph_addr_is_blank(&con->actual_peer_addr) && - con->actual_peer_addr.nonce == con->peer_addr.nonce)) { + !(ceph_addr_is_blank(&con->v1.actual_peer_addr) && + con->v1.actual_peer_addr.nonce == con->peer_addr.nonce)) { pr_warn("wrong peer, want %s/%u, got %s/%u\n", ceph_pr_addr(&con->peer_addr), le32_to_cpu(con->peer_addr.nonce), - ceph_pr_addr(&con->actual_peer_addr), - le32_to_cpu(con->actual_peer_addr.nonce)); + ceph_pr_addr(&con->v1.actual_peer_addr), + le32_to_cpu(con->v1.actual_peer_addr.nonce)); con->error_msg = "wrong peer at address"; return -1; } @@ -734,8 +736,8 @@ static int process_banner(struct ceph_connection *con) */ if (ceph_addr_is_blank(my_addr)) { memcpy(&my_addr->in_addr, - &con->peer_addr_for_me.in_addr, - sizeof(con->peer_addr_for_me.in_addr)); + &con->v1.peer_addr_for_me.in_addr, + sizeof(con->v1.peer_addr_for_me.in_addr)); ceph_addr_set_port(my_addr, 0); ceph_encode_my_addr(con->msgr); dout("process_banner learned my addr is %s\n", @@ -749,13 +751,13 @@ static int process_connect(struct ceph_connection *con) { u64 sup_feat = from_msgr(con->msgr)->supported_features; u64 req_feat = from_msgr(con->msgr)->required_features; - u64 server_feat = le64_to_cpu(con->in_reply.features); + u64 server_feat = le64_to_cpu(con->v1.in_reply.features); int ret; - dout("process_connect on %p tag %d\n", con, (int)con->in_tag); + dout("process_connect on %p tag %d\n", con, con->v1.in_tag); - if (con->auth) { - int len = le32_to_cpu(con->in_reply.authorizer_len); + if (con->v1.auth) { + int len = le32_to_cpu(con->v1.in_reply.authorizer_len); /* * Any connection that defines ->get_authorizer() @@ -764,9 +766,10 @@ static int process_connect(struct ceph_connection *con) * * See get_connect_authorizer(). */ - if (con->in_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) { + if (con->v1.in_reply.tag == + CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) { ret = con->ops->add_authorizer_challenge( - con, con->auth->authorizer_reply_buf, len); + con, con->v1.auth->authorizer_reply_buf, len); if (ret < 0) return ret; @@ -785,7 +788,7 @@ static int process_connect(struct ceph_connection *con) } } - switch (con->in_reply.tag) { + switch (con->v1.in_reply.tag) { case CEPH_MSGR_TAG_FEATURES: pr_err("%s%lld %s feature set mismatch," " my %llx < server's %llx, missing %llx\n", @@ -800,16 +803,16 @@ static int process_connect(struct ceph_connection *con) " my %d != server's %d\n", ENTITY_NAME(con->peer_name), ceph_pr_addr(&con->peer_addr), - le32_to_cpu(con->out_connect.protocol_version), - le32_to_cpu(con->in_reply.protocol_version)); + le32_to_cpu(con->v1.out_connect.protocol_version), + le32_to_cpu(con->v1.in_reply.protocol_version)); con->error_msg = "protocol version mismatch"; return -1; case CEPH_MSGR_TAG_BADAUTHORIZER: - con->auth_retry++; + con->v1.auth_retry++; dout("process_connect %p got BADAUTHORIZER attempt %d\n", con, - con->auth_retry); - if (con->auth_retry == 2) { + con->v1.auth_retry); + if (con->v1.auth_retry == 2) { con->error_msg = "connect authorization failure"; return -1; } @@ -829,7 +832,7 @@ static int process_connect(struct ceph_connection *con) * dropped messages. */ dout("process_connect got RESET peer seq %u\n", - le32_to_cpu(con->in_reply.connect_seq)); + le32_to_cpu(con->v1.in_reply.connect_seq)); pr_info("%s%lld %s session reset\n", ENTITY_NAME(con->peer_name), ceph_pr_addr(&con->peer_addr)); @@ -855,9 +858,9 @@ static int process_connect(struct ceph_connection *con) * again with a larger value. */ dout("process_connect got RETRY_SESSION my seq %u, peer %u\n", - le32_to_cpu(con->out_connect.connect_seq), - le32_to_cpu(con->in_reply.connect_seq)); - con->connect_seq = le32_to_cpu(con->in_reply.connect_seq); + le32_to_cpu(con->v1.out_connect.connect_seq), + le32_to_cpu(con->v1.in_reply.connect_seq)); + con->v1.connect_seq = le32_to_cpu(con->v1.in_reply.connect_seq); con_out_kvec_reset(con); ret = prepare_write_connect(con); if (ret < 0) @@ -871,10 +874,10 @@ static int process_connect(struct ceph_connection *con) * again with a larger value. */ dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n", - con->peer_global_seq, - le32_to_cpu(con->in_reply.global_seq)); + con->v1.peer_global_seq, + le32_to_cpu(con->v1.in_reply.global_seq)); ceph_get_global_seq(con->msgr, - le32_to_cpu(con->in_reply.global_seq)); + le32_to_cpu(con->v1.in_reply.global_seq)); con_out_kvec_reset(con); ret = prepare_write_connect(con); if (ret < 0) @@ -896,23 +899,24 @@ static int process_connect(struct ceph_connection *con) WARN_ON(con->state != CEPH_CON_S_V1_CONNECT_MSG); con->state = CEPH_CON_S_OPEN; - con->auth_retry = 0; /* we authenticated; clear flag */ - con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); - con->connect_seq++; + con->v1.auth_retry = 0; /* we authenticated; clear flag */ + con->v1.peer_global_seq = + le32_to_cpu(con->v1.in_reply.global_seq); + con->v1.connect_seq++; con->peer_features = server_feat; dout("process_connect got READY gseq %d cseq %d (%d)\n", - con->peer_global_seq, - le32_to_cpu(con->in_reply.connect_seq), - con->connect_seq); - WARN_ON(con->connect_seq != - le32_to_cpu(con->in_reply.connect_seq)); + con->v1.peer_global_seq, + le32_to_cpu(con->v1.in_reply.connect_seq), + con->v1.connect_seq); + WARN_ON(con->v1.connect_seq != + le32_to_cpu(con->v1.in_reply.connect_seq)); - if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) + if (con->v1.in_reply.flags & CEPH_MSG_CONNECT_LOSSY) ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX); con->delay = 0; /* reset backoff memory */ - if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) { + if (con->v1.in_reply.tag == CEPH_MSGR_TAG_SEQ) { prepare_write_seq(con); prepare_read_seq(con); } else { @@ -942,10 +946,10 @@ static int process_connect(struct ceph_connection *con) */ static int read_partial_ack(struct ceph_connection *con) { - int size = sizeof (con->in_temp_ack); + int size = sizeof(con->v1.in_temp_ack); int end = size; - return read_partial(con, end, size, &con->in_temp_ack); + return read_partial(con, end, size, &con->v1.in_temp_ack); } /* @@ -953,9 +957,9 @@ static int read_partial_ack(struct ceph_connection *con) */ static void process_ack(struct ceph_connection *con) { - u64 ack = le64_to_cpu(con->in_temp_ack); + u64 ack = le64_to_cpu(con->v1.in_temp_ack); - if (con->in_tag == CEPH_MSGR_TAG_ACK) + if (con->v1.in_tag == CEPH_MSGR_TAG_ACK) ceph_con_discard_sent(con, ack); else ceph_con_discard_requeued(con, ack); @@ -1045,39 +1049,39 @@ static int read_partial_message(struct ceph_connection *con) dout("read_partial_message con %p msg %p\n", con, m); /* header */ - size = sizeof (con->in_hdr); + size = sizeof(con->v1.in_hdr); end = size; - ret = read_partial(con, end, size, &con->in_hdr); + ret = read_partial(con, end, size, &con->v1.in_hdr); if (ret <= 0) return ret; - crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc)); - if (cpu_to_le32(crc) != con->in_hdr.crc) { + crc = crc32c(0, &con->v1.in_hdr, offsetof(struct ceph_msg_header, crc)); + if (cpu_to_le32(crc) != con->v1.in_hdr.crc) { pr_err("read_partial_message bad hdr crc %u != expected %u\n", - crc, con->in_hdr.crc); + crc, con->v1.in_hdr.crc); return -EBADMSG; } - front_len = le32_to_cpu(con->in_hdr.front_len); + front_len = le32_to_cpu(con->v1.in_hdr.front_len); if (front_len > CEPH_MSG_MAX_FRONT_LEN) return -EIO; - middle_len = le32_to_cpu(con->in_hdr.middle_len); + middle_len = le32_to_cpu(con->v1.in_hdr.middle_len); if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN) return -EIO; - data_len = le32_to_cpu(con->in_hdr.data_len); + data_len = le32_to_cpu(con->v1.in_hdr.data_len); if (data_len > CEPH_MSG_MAX_DATA_LEN) return -EIO; /* verify seq# */ - seq = le64_to_cpu(con->in_hdr.seq); + seq = le64_to_cpu(con->v1.in_hdr.seq); if ((s64)seq - (s64)con->in_seq < 1) { pr_info("skipping %s%lld %s seq %lld expected %lld\n", ENTITY_NAME(con->peer_name), ceph_pr_addr(&con->peer_addr), seq, con->in_seq + 1); - con->in_base_pos = -front_len - middle_len - data_len - - sizeof_footer(con); - con->in_tag = CEPH_MSGR_TAG_READY; + con->v1.in_base_pos = -front_len - middle_len - data_len - + sizeof_footer(con); + con->v1.in_tag = CEPH_MSGR_TAG_READY; return 1; } else if ((s64)seq - (s64)con->in_seq > 1) { pr_err("read_partial_message bad seq %lld expected %lld\n", @@ -1090,9 +1094,9 @@ static int read_partial_message(struct ceph_connection *con) if (!con->in_msg) { int skip = 0; - dout("got hdr type %d front %d data %d\n", con->in_hdr.type, + dout("got hdr type %d front %d data %d\n", con->v1.in_hdr.type, front_len, data_len); - ret = ceph_con_in_msg_alloc(con, &con->in_hdr, &skip); + ret = ceph_con_in_msg_alloc(con, &con->v1.in_hdr, &skip); if (ret < 0) return ret; @@ -1100,9 +1104,9 @@ static int read_partial_message(struct ceph_connection *con) if (skip) { /* skip this message */ dout("alloc_msg said skip message\n"); - con->in_base_pos = -front_len - middle_len - data_len - - sizeof_footer(con); - con->in_tag = CEPH_MSGR_TAG_READY; + con->v1.in_base_pos = -front_len - middle_len - + data_len - sizeof_footer(con); + con->v1.in_tag = CEPH_MSGR_TAG_READY; con->in_seq++; return 1; } @@ -1214,8 +1218,8 @@ more: BUG_ON(!con->sock); - dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, - con->in_base_pos); + dout("try_read tag %d in_base_pos %d\n", con->v1.in_tag, + con->v1.in_base_pos); if (con->state == CEPH_CON_S_V1_BANNER) { ret = read_partial_banner(con); @@ -1253,27 +1257,27 @@ more: WARN_ON(con->state != CEPH_CON_S_OPEN); - if (con->in_base_pos < 0) { + if (con->v1.in_base_pos < 0) { /* * skipping + discarding content. */ - ret = ceph_tcp_recvmsg(con->sock, NULL, -con->in_base_pos); + ret = ceph_tcp_recvmsg(con->sock, NULL, -con->v1.in_base_pos); if (ret <= 0) goto out; - dout("skipped %d / %d bytes\n", ret, -con->in_base_pos); - con->in_base_pos += ret; - if (con->in_base_pos) + dout("skipped %d / %d bytes\n", ret, -con->v1.in_base_pos); + con->v1.in_base_pos += ret; + if (con->v1.in_base_pos) goto more; } - if (con->in_tag == CEPH_MSGR_TAG_READY) { + if (con->v1.in_tag == CEPH_MSGR_TAG_READY) { /* * what's next? */ - ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); + ret = ceph_tcp_recvmsg(con->sock, &con->v1.in_tag, 1); if (ret <= 0) goto out; - dout("try_read got tag %d\n", (int)con->in_tag); - switch (con->in_tag) { + dout("try_read got tag %d\n", con->v1.in_tag); + switch (con->v1.in_tag) { case CEPH_MSGR_TAG_MSG: prepare_read_message(con); break; @@ -1291,7 +1295,7 @@ more: goto bad_tag; } } - if (con->in_tag == CEPH_MSGR_TAG_MSG) { + if (con->v1.in_tag == CEPH_MSGR_TAG_MSG) { ret = read_partial_message(con); if (ret <= 0) { switch (ret) { @@ -1307,15 +1311,15 @@ more: } goto out; } - if (con->in_tag == CEPH_MSGR_TAG_READY) + if (con->v1.in_tag == CEPH_MSGR_TAG_READY) goto more; ceph_con_process_message(con); if (con->state == CEPH_CON_S_OPEN) prepare_read_tag(con); goto more; } - if (con->in_tag == CEPH_MSGR_TAG_ACK || - con->in_tag == CEPH_MSGR_TAG_SEQ) { + if (con->v1.in_tag == CEPH_MSGR_TAG_ACK || + con->v1.in_tag == CEPH_MSGR_TAG_SEQ) { /* * the final handshake seq exchange is semantically * equivalent to an ACK @@ -1326,7 +1330,7 @@ more: process_ack(con); goto more; } - if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { + if (con->v1.in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { ret = read_keepalive_ack(con); if (ret <= 0) goto out; @@ -1338,7 +1342,7 @@ out: return ret; bad_tag: - pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag); + pr_err("try_read bad tag %d\n", con->v1.in_tag); con->error_msg = "protocol error, garbage tag"; ret = -1; goto out; @@ -1369,7 +1373,7 @@ int ceph_con_v1_try_write(struct ceph_connection *con) prepare_read_banner(con); BUG_ON(con->in_msg); - con->in_tag = CEPH_MSGR_TAG_READY; + con->v1.in_tag = CEPH_MSGR_TAG_READY; dout("try_write initiating connect on %p new state %d\n", con, con->state); ret = ceph_tcp_connect(con); @@ -1380,16 +1384,16 @@ int ceph_con_v1_try_write(struct ceph_connection *con) } more: - dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); + dout("try_write out_kvec_bytes %d\n", con->v1.out_kvec_bytes); BUG_ON(!con->sock); /* kvec data queued? */ - if (con->out_kvec_left) { + if (con->v1.out_kvec_left) { ret = write_partial_kvec(con); if (ret <= 0) goto out; } - if (con->out_skip) { + if (con->v1.out_skip) { ret = write_partial_skip(con); if (ret <= 0) goto out; @@ -1397,7 +1401,7 @@ more: /* msg pages? */ if (con->out_msg) { - if (con->out_msg_done) { + if (con->v1.out_msg_done) { ceph_msg_put(con->out_msg); con->out_msg = NULL; /* we're done with this one */ goto do_next; @@ -1446,57 +1450,57 @@ void ceph_con_v1_revoke(struct ceph_connection *con) { struct ceph_msg *msg = con->out_msg; - WARN_ON(con->out_skip); + WARN_ON(con->v1.out_skip); /* footer */ - if (con->out_msg_done) { - con->out_skip += con_out_kvec_skip(con); + if (con->v1.out_msg_done) { + con->v1.out_skip += con_out_kvec_skip(con); } else { WARN_ON(!msg->data_length); - con->out_skip += sizeof_footer(con); + con->v1.out_skip += sizeof_footer(con); } /* data, middle, front */ if (msg->data_length) - con->out_skip += msg->cursor.total_resid; + con->v1.out_skip += msg->cursor.total_resid; if (msg->middle) - con->out_skip += con_out_kvec_skip(con); - con->out_skip += con_out_kvec_skip(con); + con->v1.out_skip += con_out_kvec_skip(con); + con->v1.out_skip += con_out_kvec_skip(con); dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con, - con->out_kvec_bytes, con->out_skip); + con->v1.out_kvec_bytes, con->v1.out_skip); } void ceph_con_v1_revoke_incoming(struct ceph_connection *con) { - unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); - unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len); - unsigned int data_len = le32_to_cpu(con->in_hdr.data_len); + unsigned int front_len = le32_to_cpu(con->v1.in_hdr.front_len); + unsigned int middle_len = le32_to_cpu(con->v1.in_hdr.middle_len); + unsigned int data_len = le32_to_cpu(con->v1.in_hdr.data_len); /* skip rest of message */ - con->in_base_pos = con->in_base_pos - + con->v1.in_base_pos = con->v1.in_base_pos - sizeof(struct ceph_msg_header) - front_len - middle_len - data_len - sizeof(struct ceph_msg_footer); - con->in_tag = CEPH_MSGR_TAG_READY; + con->v1.in_tag = CEPH_MSGR_TAG_READY; con->in_seq++; - dout("%s con %p in_base_pos %d\n", __func__, con, con->in_base_pos); + dout("%s con %p in_base_pos %d\n", __func__, con, con->v1.in_base_pos); } bool ceph_con_v1_opened(struct ceph_connection *con) { - return con->connect_seq; + return con->v1.connect_seq; } void ceph_con_v1_reset_session(struct ceph_connection *con) { - con->connect_seq = 0; - con->peer_global_seq = 0; + con->v1.connect_seq = 0; + con->v1.peer_global_seq = 0; } void ceph_con_v1_reset_protocol(struct ceph_connection *con) { - con->out_skip = 0; + con->v1.out_skip = 0; } |