summaryrefslogtreecommitdiffstats
path: root/net/ceph/messenger.c
diff options
context:
space:
mode:
authorAlex Elder <elder@inktank.com>2013-03-12 05:34:23 +0100
committerSage Weil <sage@inktank.com>2013-05-02 06:17:30 +0200
commit4c59b4a278f9b7a418ad8af933fd7b341df64393 (patch)
tree8f42d880e2ed3fb97c64ea293b42ff08f891537a /net/ceph/messenger.c
parentlibceph: get rid of read helpers (diff)
downloadlinux-4c59b4a278f9b7a418ad8af933fd7b341df64393.tar.xz
linux-4c59b4a278f9b7a418ad8af933fd7b341df64393.zip
libceph: collapse all data items into one
It turns out that only one of the data item types is ever used at any one time in a single message (currently). - A page array is used by the osd client (on behalf of the file system) and by rbd. Only one osd op (and therefore at most one data item) is ever used at a time by rbd. And the only time the file system sends two, the second op contains no data. - A bio is only used by the rbd client (and again, only one data item per message) - A page list is used by the file system and by rbd for outgoing data, but only one op (and one data item) at a time. We can therefore collapse all three of our data item fields into a single field "data", and depend on the messenger code to properly handle it based on its type. This allows us to eliminate quite a bit of duplicated code. This is related to: http://tracker.ceph.com/issues/4429 Signed-off-by: Alex Elder <elder@inktank.com> Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r--net/ceph/messenger.c123
1 files changed, 38 insertions, 85 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index a19ba00ce777..6b5b5c625547 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1085,22 +1085,15 @@ static void prepare_message_data(struct ceph_msg *msg,
/* initialize page iterator */
msg_pos->page = 0;
- if (ceph_msg_has_pages(msg))
- msg_pos->page_pos = msg->p.alignment;
+ if (ceph_msg_has_data(msg))
+ msg_pos->page_pos = msg->data.alignment;
else
msg_pos->page_pos = 0;
msg_pos->data_pos = 0;
- /* Initialize data cursors */
+ /* Initialize data cursor */
-#ifdef CONFIG_BLOCK
- if (ceph_msg_has_bio(msg))
- ceph_msg_data_cursor_init(&msg->b, data_len);
-#endif /* CONFIG_BLOCK */
- if (ceph_msg_has_pages(msg))
- ceph_msg_data_cursor_init(&msg->p, data_len);
- if (ceph_msg_has_pagelist(msg))
- ceph_msg_data_cursor_init(&msg->l, data_len);
+ ceph_msg_data_cursor_init(&msg->data, data_len);
msg_pos->did_page_crc = false;
}
@@ -1166,10 +1159,10 @@ static void prepare_write_message(struct ceph_connection *con)
m->needs_out_seq = false;
}
- dout("prepare_write_message %p seq %lld type %d len %d+%d+%d (%zd)\n",
+ dout("prepare_write_message %p seq %lld type %d len %d+%d+%d\n",
m, con->out_seq, le16_to_cpu(m->hdr.type),
le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
- le32_to_cpu(m->hdr.data_len), m->p.length);
+ le32_to_cpu(m->hdr.data_len));
BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
/* tag + hdr + front + middle */
@@ -1411,14 +1404,7 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
msg_pos->data_pos += sent;
msg_pos->page_pos += sent;
- if (ceph_msg_has_pages(msg))
- need_crc = ceph_msg_data_advance(&msg->p, sent);
- else if (ceph_msg_has_pagelist(msg))
- need_crc = ceph_msg_data_advance(&msg->l, sent);
-#ifdef CONFIG_BLOCK
- else if (ceph_msg_has_bio(msg))
- need_crc = ceph_msg_data_advance(&msg->b, sent);
-#endif /* CONFIG_BLOCK */
+ need_crc = ceph_msg_data_advance(&msg->data, sent);
BUG_ON(need_crc && sent != len);
if (sent < len)
@@ -1441,12 +1427,8 @@ static void in_msg_pos_next(struct ceph_connection *con, size_t len,
msg_pos->data_pos += received;
msg_pos->page_pos += received;
- if (ceph_msg_has_pages(msg))
- (void) ceph_msg_data_advance(&msg->p, received);
-#ifdef CONFIG_BLOCK
- else if (ceph_msg_has_bio(msg))
- (void) ceph_msg_data_advance(&msg->b, received);
-#endif /* CONFIG_BLOCK */
+ (void) ceph_msg_data_advance(&msg->data, received);
+
if (received < len)
return;
@@ -1486,6 +1468,9 @@ static int write_partial_message_data(struct ceph_connection *con)
dout("%s %p msg %p page %d offset %d\n", __func__,
con, msg, msg_pos->page, msg_pos->page_pos);
+ if (WARN_ON(!ceph_msg_has_data(msg)))
+ return -EINVAL;
+
/*
* Iterate through each page that contains data to be
* written, and send as much as possible for each.
@@ -1500,23 +1485,8 @@ static int write_partial_message_data(struct ceph_connection *con)
size_t length;
bool last_piece;
- if (ceph_msg_has_pages(msg)) {
- page = ceph_msg_data_next(&msg->p, &page_offset,
- &length, &last_piece);
- } else if (ceph_msg_has_pagelist(msg)) {
- page = ceph_msg_data_next(&msg->l, &page_offset,
- &length, &last_piece);
-#ifdef CONFIG_BLOCK
- } else if (ceph_msg_has_bio(msg)) {
- page = ceph_msg_data_next(&msg->b, &page_offset,
- &length, &last_piece);
-#endif
- } else {
- WARN(1, "con %p data_len %u but no outbound data\n",
- con, data_len);
- ret = -EINVAL;
- goto out;
- }
+ page = ceph_msg_data_next(&msg->data, &page_offset, &length,
+ &last_piece);
if (do_datacrc && !msg_pos->did_page_crc) {
u32 crc = le32_to_cpu(msg->footer.data_crc);
@@ -2197,20 +2167,13 @@ static int read_partial_msg_data(struct ceph_connection *con)
int ret;
BUG_ON(!msg);
+ if (WARN_ON(!ceph_msg_has_data(msg)))
+ return -EIO;
data_len = le32_to_cpu(con->in_hdr.data_len);
while (msg_pos->data_pos < data_len) {
- if (ceph_msg_has_pages(msg)) {
- page = ceph_msg_data_next(&msg->p, &page_offset,
- &length, NULL);
-#ifdef CONFIG_BLOCK
- } else if (ceph_msg_has_bio(msg)) {
- page = ceph_msg_data_next(&msg->b, &page_offset,
- &length, NULL);
-#endif
- } else {
- BUG_ON(1);
- }
+ page = ceph_msg_data_next(&msg->data, &page_offset, &length,
+ NULL);
ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
if (ret <= 0)
return ret;
@@ -2218,7 +2181,6 @@ static int read_partial_msg_data(struct ceph_connection *con)
if (do_datacrc)
con->in_data_crc = ceph_crc32c_page(con->in_data_crc,
page, page_offset, ret);
-
in_msg_pos_next(con, length, ret);
}
@@ -3043,12 +3005,12 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
{
BUG_ON(!pages);
BUG_ON(!length);
- BUG_ON(msg->p.type != CEPH_MSG_DATA_NONE);
+ BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
- msg->p.type = CEPH_MSG_DATA_PAGES;
- msg->p.pages = pages;
- msg->p.length = length;
- msg->p.alignment = alignment & ~PAGE_MASK;
+ msg->data.type = CEPH_MSG_DATA_PAGES;
+ msg->data.pages = pages;
+ msg->data.length = length;
+ msg->data.alignment = alignment & ~PAGE_MASK;
}
EXPORT_SYMBOL(ceph_msg_data_set_pages);
@@ -3057,20 +3019,20 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
{
BUG_ON(!pagelist);
BUG_ON(!pagelist->length);
- BUG_ON(msg->l.type != CEPH_MSG_DATA_NONE);
+ BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
- msg->l.type = CEPH_MSG_DATA_PAGELIST;
- msg->l.pagelist = pagelist;
+ msg->data.type = CEPH_MSG_DATA_PAGELIST;
+ msg->data.pagelist = pagelist;
}
EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio)
{
BUG_ON(!bio);
- BUG_ON(msg->b.type != CEPH_MSG_DATA_NONE);
+ BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
- msg->b.type = CEPH_MSG_DATA_BIO;
- msg->b.bio = bio;
+ msg->data.type = CEPH_MSG_DATA_BIO;
+ msg->data.bio = bio;
}
EXPORT_SYMBOL(ceph_msg_data_set_bio);
@@ -3094,9 +3056,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
INIT_LIST_HEAD(&m->list_head);
kref_init(&m->kref);
- ceph_msg_data_init(&m->p);
- ceph_msg_data_init(&m->l);
- ceph_msg_data_init(&m->b);
+ ceph_msg_data_init(&m->data);
/* front */
m->front_max = front_len;
@@ -3251,20 +3211,13 @@ void ceph_msg_last_put(struct kref *kref)
ceph_buffer_put(m->middle);
m->middle = NULL;
}
- if (ceph_msg_has_pages(m)) {
- m->p.length = 0;
- m->p.pages = NULL;
- m->p.type = CEPH_OSD_DATA_TYPE_NONE;
- }
- if (ceph_msg_has_pagelist(m)) {
- ceph_pagelist_release(m->l.pagelist);
- kfree(m->l.pagelist);
- m->l.pagelist = NULL;
- m->l.type = CEPH_OSD_DATA_TYPE_NONE;
- }
- if (ceph_msg_has_bio(m)) {
- m->b.bio = NULL;
- m->b.type = CEPH_OSD_DATA_TYPE_NONE;
+ if (ceph_msg_has_data(m)) {
+ if (m->data.type == CEPH_MSG_DATA_PAGELIST) {
+ ceph_pagelist_release(m->data.pagelist);
+ kfree(m->data.pagelist);
+ }
+ memset(&m->data, 0, sizeof m->data);
+ ceph_msg_data_init(&m->data);
}
if (m->pool)
@@ -3277,7 +3230,7 @@ EXPORT_SYMBOL(ceph_msg_last_put);
void ceph_msg_dump(struct ceph_msg *msg)
{
pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
- msg->front_max, msg->p.length);
+ msg->front_max, msg->data.length);
print_hex_dump(KERN_DEBUG, "header: ",
DUMP_PREFIX_OFFSET, 16, 1,
&msg->hdr, sizeof(msg->hdr), true);