summaryrefslogtreecommitdiffstats
path: root/net/ceph/osd_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph/osd_client.c')
-rw-r--r--net/ceph/osd_client.c355
1 files changed, 178 insertions, 177 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 013101598c41..8a008f083283 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -34,8 +34,6 @@ static void __unregister_request(struct ceph_osd_client *osdc,
static void __unregister_linger_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
static void __enqueue_request(struct ceph_osd_request *req);
-static void __send_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req);
/*
* Implement client access to distributed object storage cluster.
@@ -209,6 +207,8 @@ void osd_req_op_cls_request_data_pagelist(
osd_data = osd_req_op_data(osd_req, which, cls, request_data);
ceph_osd_data_pagelist_init(osd_data, pagelist);
+ osd_req->r_ops[which].cls.indata_len += pagelist->length;
+ osd_req->r_ops[which].indata_len += pagelist->length;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);
@@ -221,6 +221,8 @@ void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
osd_data = osd_req_op_data(osd_req, which, cls, request_data);
ceph_osd_data_pages_init(osd_data, pages, length, alignment,
pages_from_pool, own_pages);
+ osd_req->r_ops[which].cls.indata_len += length;
+ osd_req->r_ops[which].indata_len += length;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
@@ -610,8 +612,6 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
- op->cls.argc = 0; /* currently unused */
-
op->indata_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_cls_init);
@@ -709,16 +709,9 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
}
}
-static u64 osd_req_encode_op(struct ceph_osd_request *req,
- struct ceph_osd_op *dst, unsigned int which)
+static u32 osd_req_encode_op(struct ceph_osd_op *dst,
+ const struct ceph_osd_req_op *src)
{
- struct ceph_osd_req_op *src;
- struct ceph_osd_data *osd_data;
- u64 request_data_len = 0;
- u64 data_length;
-
- BUG_ON(which >= req->r_num_ops);
- src = &req->r_ops[which];
if (WARN_ON(!osd_req_opcode_valid(src->op))) {
pr_err("unrecognized osd opcode %d\n", src->op);
@@ -727,49 +720,23 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
switch (src->op) {
case CEPH_OSD_OP_STAT:
- osd_data = &src->raw_data_in;
- ceph_osdc_msg_data_add(req->r_reply, osd_data);
break;
case CEPH_OSD_OP_READ:
case CEPH_OSD_OP_WRITE:
case CEPH_OSD_OP_WRITEFULL:
case CEPH_OSD_OP_ZERO:
case CEPH_OSD_OP_TRUNCATE:
- if (src->op == CEPH_OSD_OP_WRITE ||
- src->op == CEPH_OSD_OP_WRITEFULL)
- request_data_len = src->extent.length;
dst->extent.offset = cpu_to_le64(src->extent.offset);
dst->extent.length = cpu_to_le64(src->extent.length);
dst->extent.truncate_size =
cpu_to_le64(src->extent.truncate_size);
dst->extent.truncate_seq =
cpu_to_le32(src->extent.truncate_seq);
- osd_data = &src->extent.osd_data;
- if (src->op == CEPH_OSD_OP_WRITE ||
- src->op == CEPH_OSD_OP_WRITEFULL)
- ceph_osdc_msg_data_add(req->r_request, osd_data);
- else
- ceph_osdc_msg_data_add(req->r_reply, osd_data);
break;
case CEPH_OSD_OP_CALL:
dst->cls.class_len = src->cls.class_len;
dst->cls.method_len = src->cls.method_len;
- osd_data = &src->cls.request_info;
- ceph_osdc_msg_data_add(req->r_request, osd_data);
- BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGELIST);
- request_data_len = osd_data->pagelist->length;
-
- osd_data = &src->cls.request_data;
- data_length = ceph_osd_data_length(osd_data);
- if (data_length) {
- BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
- dst->cls.indata_len = cpu_to_le32(data_length);
- ceph_osdc_msg_data_add(req->r_request, osd_data);
- src->indata_len += data_length;
- request_data_len += data_length;
- }
- osd_data = &src->cls.response_data;
- ceph_osdc_msg_data_add(req->r_reply, osd_data);
+ dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
break;
case CEPH_OSD_OP_STARTSYNC:
break;
@@ -791,9 +758,6 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
dst->xattr.cmp_op = src->xattr.cmp_op;
dst->xattr.cmp_mode = src->xattr.cmp_mode;
- osd_data = &src->xattr.osd_data;
- ceph_osdc_msg_data_add(req->r_request, osd_data);
- request_data_len = osd_data->pagelist->length;
break;
case CEPH_OSD_OP_CREATE:
case CEPH_OSD_OP_DELETE:
@@ -810,7 +774,7 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
dst->flags = cpu_to_le32(src->flags);
dst->payload_len = cpu_to_le32(src->indata_len);
- return request_data_len;
+ return src->indata_len;
}
/*
@@ -852,8 +816,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
goto fail;
}
- req->r_flags = flags;
-
/* calculate max write size */
r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
if (r)
@@ -877,9 +839,14 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
truncate_size, truncate_seq);
}
+ req->r_flags = flags;
req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout);
ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);
+ req->r_snapid = vino.snap;
+ if (flags & CEPH_OSD_FLAG_WRITE)
+ req->r_data_offset = off;
+
r = ceph_osdc_alloc_messages(req, GFP_NOFS);
if (r)
goto fail;
@@ -1509,37 +1476,173 @@ out:
return err;
}
-/*
- * caller should hold map_sem (for read) and request_mutex
- */
-static void __send_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req)
+static void setup_request_data(struct ceph_osd_request *req,
+ struct ceph_msg *msg)
{
- void *p;
+ u32 data_len = 0;
+ int i;
+
+ if (!list_empty(&msg->data))
+ return;
- dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
- req, req->r_tid, req->r_osd->o_osd, req->r_flags,
- req->r_t.pgid.pool, req->r_t.pgid.seed);
+ WARN_ON(msg->data_length);
+ for (i = 0; i < req->r_num_ops; i++) {
+ struct ceph_osd_req_op *op = &req->r_ops[i];
+
+ switch (op->op) {
+ /* request */
+ case CEPH_OSD_OP_WRITE:
+ case CEPH_OSD_OP_WRITEFULL:
+ WARN_ON(op->indata_len != op->extent.length);
+ ceph_osdc_msg_data_add(msg, &op->extent.osd_data);
+ break;
+ case CEPH_OSD_OP_SETXATTR:
+ case CEPH_OSD_OP_CMPXATTR:
+ WARN_ON(op->indata_len != op->xattr.name_len +
+ op->xattr.value_len);
+ ceph_osdc_msg_data_add(msg, &op->xattr.osd_data);
+ break;
+
+ /* reply */
+ case CEPH_OSD_OP_STAT:
+ ceph_osdc_msg_data_add(req->r_reply,
+ &op->raw_data_in);
+ break;
+ case CEPH_OSD_OP_READ:
+ ceph_osdc_msg_data_add(req->r_reply,
+ &op->extent.osd_data);
+ break;
+
+ /* both */
+ case CEPH_OSD_OP_CALL:
+ WARN_ON(op->indata_len != op->cls.class_len +
+ op->cls.method_len +
+ op->cls.indata_len);
+ ceph_osdc_msg_data_add(msg, &op->cls.request_info);
+ /* optional, can be NONE */
+ ceph_osdc_msg_data_add(msg, &op->cls.request_data);
+ /* optional, can be NONE */
+ ceph_osdc_msg_data_add(req->r_reply,
+ &op->cls.response_data);
+ break;
+ }
+
+ data_len += op->indata_len;
+ }
- /* fill in message content that changes each time we send it */
- put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
- put_unaligned_le32(req->r_flags, req->r_request_flags);
- put_unaligned_le64(req->r_t.target_oloc.pool, req->r_request_pool);
- p = req->r_request_pgid;
+ WARN_ON(data_len != msg->data_length);
+}
+
+static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
+{
+ void *p = msg->front.iov_base;
+ void *const end = p + msg->front_alloc_len;
+ u32 data_len = 0;
+ int i;
+
+ if (req->r_flags & CEPH_OSD_FLAG_WRITE) {
+ /* snapshots aren't writeable */
+ WARN_ON(req->r_snapid != CEPH_NOSNAP);
+ } else {
+ WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec ||
+ req->r_data_offset || req->r_snapc);
+ }
+
+ setup_request_data(req, msg);
+
+ ceph_encode_32(&p, 1); /* client_inc, always 1 */
+ ceph_encode_32(&p, req->r_osdc->osdmap->epoch);
+ ceph_encode_32(&p, req->r_flags);
+ ceph_encode_timespec(p, &req->r_mtime);
+ p += sizeof(struct ceph_timespec);
+ /* aka reassert_version */
+ memcpy(p, &req->r_replay_version, sizeof(req->r_replay_version));
+ p += sizeof(req->r_replay_version);
+
+ /* oloc */
+ ceph_encode_8(&p, 4);
+ ceph_encode_8(&p, 4);
+ ceph_encode_32(&p, 8 + 4 + 4);
+ ceph_encode_64(&p, req->r_t.target_oloc.pool);
+ ceph_encode_32(&p, -1); /* preferred */
+ ceph_encode_32(&p, 0); /* key len */
+
+ /* pgid */
+ ceph_encode_8(&p, 1);
ceph_encode_64(&p, req->r_t.pgid.pool);
ceph_encode_32(&p, req->r_t.pgid.seed);
- put_unaligned_le64(1, req->r_request_attempts); /* FIXME */
- memcpy(req->r_request_reassert_version, &req->r_reassert_version,
- sizeof(req->r_reassert_version));
+ ceph_encode_32(&p, -1); /* preferred */
- req->r_stamp = jiffies;
- list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
+ /* oid */
+ ceph_encode_32(&p, req->r_t.target_oid.name_len);
+ memcpy(p, req->r_t.target_oid.name, req->r_t.target_oid.name_len);
+ p += req->r_t.target_oid.name_len;
- ceph_msg_get(req->r_request); /* send consumes a ref */
+ /* ops, can imply data */
+ ceph_encode_16(&p, req->r_num_ops);
+ for (i = 0; i < req->r_num_ops; i++) {
+ data_len += osd_req_encode_op(p, &req->r_ops[i]);
+ p += sizeof(struct ceph_osd_op);
+ }
- req->r_sent = req->r_osd->o_incarnation;
+ ceph_encode_64(&p, req->r_snapid); /* snapid */
+ if (req->r_snapc) {
+ ceph_encode_64(&p, req->r_snapc->seq);
+ ceph_encode_32(&p, req->r_snapc->num_snaps);
+ for (i = 0; i < req->r_snapc->num_snaps; i++)
+ ceph_encode_64(&p, req->r_snapc->snaps[i]);
+ } else {
+ ceph_encode_64(&p, 0); /* snap_seq */
+ ceph_encode_32(&p, 0); /* snaps len */
+ }
+
+ ceph_encode_32(&p, req->r_attempts); /* retry_attempt */
+
+ BUG_ON(p > end);
+ msg->front.iov_len = p - msg->front.iov_base;
+ msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
+ msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+ msg->hdr.data_len = cpu_to_le32(data_len);
+ /*
+ * The header "data_off" is a hint to the receiver allowing it
+ * to align received data into its buffers such that there's no
+ * need to re-copy it before writing it to disk (direct I/O).
+ */
+ msg->hdr.data_off = cpu_to_le16(req->r_data_offset);
- ceph_con_send(&req->r_osd->o_con, req->r_request);
+ dout("%s req %p oid %*pE oid_len %d front %zu data %u\n", __func__,
+ req, req->r_t.target_oid.name_len, req->r_t.target_oid.name,
+ req->r_t.target_oid.name_len, msg->front.iov_len, data_len);
+}
+
+/*
+ * @req has to be assigned a tid and registered.
+ */
+static void send_request(struct ceph_osd_request *req)
+{
+ struct ceph_osd *osd = req->r_osd;
+
+ WARN_ON(osd->o_osd != req->r_t.osd);
+
+ req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
+ if (req->r_attempts)
+ req->r_flags |= CEPH_OSD_FLAG_RETRY;
+ else
+ WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY);
+
+ encode_request(req, req->r_request);
+
+ dout("%s req %p tid %llu to pg %llu.%x osd%d flags 0x%x attempt %d\n",
+ __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed,
+ req->r_t.osd, req->r_flags, req->r_attempts);
+
+ req->r_t.paused = false;
+ req->r_stamp = jiffies;
+ req->r_attempts++;
+
+ req->r_sent = osd->o_incarnation;
+ req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
+ ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
}
/*
@@ -1550,8 +1653,10 @@ static void __send_queued(struct ceph_osd_client *osdc)
struct ceph_osd_request *req, *tmp;
dout("__send_queued\n");
- list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item)
- __send_request(osdc, req);
+ list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
+ list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
+ send_request(req);
+ }
}
/*
@@ -1915,8 +2020,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
req->r_result = bytes;
/* in case this is a write and we need to replay, */
- req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch);
- req->r_reassert_version.version = cpu_to_le64(reassert_version);
+ req->r_replay_version.epoch = cpu_to_le32(reassert_epoch);
+ req->r_replay_version.version = cpu_to_le64(reassert_version);
req->r_got_reply = 1;
} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
@@ -2433,105 +2538,6 @@ bad:
}
/*
- * build new request AND message
- *
- */
-void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
- struct ceph_snap_context *snapc, u64 snap_id,
- struct timespec *mtime)
-{
- struct ceph_msg *msg = req->r_request;
- void *p;
- size_t msg_size;
- int flags = req->r_flags;
- u64 data_len;
- unsigned int i;
-
- req->r_snapid = snap_id;
- WARN_ON(snapc != req->r_snapc);
-
- /* encode request */
- msg->hdr.version = cpu_to_le16(4);
-
- p = msg->front.iov_base;
- ceph_encode_32(&p, 1); /* client_inc is always 1 */
- req->r_request_osdmap_epoch = p;
- p += 4;
- req->r_request_flags = p;
- p += 4;
- if (req->r_flags & CEPH_OSD_FLAG_WRITE)
- ceph_encode_timespec(p, mtime);
- p += sizeof(struct ceph_timespec);
- req->r_request_reassert_version = p;
- p += sizeof(struct ceph_eversion); /* will get filled in */
-
- /* oloc */
- ceph_encode_8(&p, 4);
- ceph_encode_8(&p, 4);
- ceph_encode_32(&p, 8 + 4 + 4);
- req->r_request_pool = p;
- p += 8;
- ceph_encode_32(&p, -1); /* preferred */
- ceph_encode_32(&p, 0); /* key len */
-
- ceph_encode_8(&p, 1);
- req->r_request_pgid = p;
- p += 8 + 4;
- ceph_encode_32(&p, -1); /* preferred */
-
- /* oid */
- ceph_encode_32(&p, req->r_base_oid.name_len);
- memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len);
- dout("oid %*pE len %d\n", req->r_base_oid.name_len,
- req->r_base_oid.name, req->r_base_oid.name_len);
- p += req->r_base_oid.name_len;
-
- /* ops--can imply data */
- ceph_encode_16(&p, (u16)req->r_num_ops);
- data_len = 0;
- for (i = 0; i < req->r_num_ops; i++) {
- data_len += osd_req_encode_op(req, p, i);
- p += sizeof(struct ceph_osd_op);
- }
-
- /* snaps */
- ceph_encode_64(&p, req->r_snapid);
- ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
- ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
- if (req->r_snapc) {
- for (i = 0; i < req->r_snapc->num_snaps; i++) {
- ceph_encode_64(&p, req->r_snapc->snaps[i]);
- }
- }
-
- req->r_request_attempts = p;
- p += 4;
-
- /* data */
- if (flags & CEPH_OSD_FLAG_WRITE) {
- u16 data_off;
-
- /*
- * The header "data_off" is a hint to the receiver
- * allowing it to align received data into its
- * buffers such that there's no need to re-copy
- * it before writing it to disk (direct I/O).
- */
- data_off = (u16) (off & 0xffff);
- req->r_request->hdr.data_off = cpu_to_le16(data_off);
- }
- req->r_request->hdr.data_len = cpu_to_le32(data_len);
-
- BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
- msg_size = p - msg->front.iov_base;
- msg->front.iov_len = msg_size;
- msg->hdr.front_len = cpu_to_le32(msg_size);
-
- dout("build_request msg_size was %d\n", (int)msg_size);
-}
-EXPORT_SYMBOL(ceph_osdc_build_request);
-
-/*
* Register request, send initial attempt.
*/
int ceph_osdc_start_request(struct ceph_osd_client *osdc,
@@ -2749,15 +2755,12 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
return PTR_ERR(req);
/* it may be a short read due to an object boundary */
-
osd_req_op_extent_osd_data_pages(req, 0,
pages, *plen, page_align, false, false);
dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n",
off, *plen, *plen, page_align);
- ceph_osdc_build_request(req, off, NULL, vino.snap, NULL);
-
rc = ceph_osdc_start_request(osdc, req, false);
if (!rc)
rc = ceph_osdc_wait_request(osdc, req);
@@ -2783,7 +2786,6 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
int rc = 0;
int page_align = off & ~PAGE_MASK;
- BUG_ON(vino.snap != CEPH_NOSNAP); /* snapshots aren't writeable */
req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
CEPH_OSD_OP_WRITE,
CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
@@ -2797,8 +2799,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
false, false);
dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
- ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, mtime);
-
+ req->r_mtime = *mtime;
rc = ceph_osdc_start_request(osdc, req, true);
if (!rc)
rc = ceph_osdc_wait_request(osdc, req);