summaryrefslogtreecommitdiffstats
path: root/net/ceph
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph')
-rw-r--r--net/ceph/Kconfig4
-rw-r--r--net/ceph/Makefile2
-rw-r--r--net/ceph/auth.c117
-rw-r--r--net/ceph/auth_x.c24
-rw-r--r--net/ceph/auth_x.h1
-rw-r--r--net/ceph/ceph_common.c34
-rw-r--r--net/ceph/ceph_strings.c39
-rw-r--r--net/ceph/crush/mapper.c15
-rw-r--r--net/ceph/crypto.c7
-rw-r--r--net/ceph/debugfs.c31
-rw-r--r--net/ceph/messenger.c1275
-rw-r--r--net/ceph/mon_client.c9
-rw-r--r--net/ceph/osd_client.c1414
-rw-r--r--net/ceph/osdmap.c305
-rw-r--r--net/ceph/pagevec.c24
-rw-r--r--net/ceph/snapshot.c78
16 files changed, 2257 insertions, 1122 deletions
diff --git a/net/ceph/Kconfig b/net/ceph/Kconfig
index cc04dd667a10..e50cc69ae8ca 100644
--- a/net/ceph/Kconfig
+++ b/net/ceph/Kconfig
@@ -1,6 +1,6 @@
config CEPH_LIB
- tristate "Ceph core library (EXPERIMENTAL)"
- depends on INET && EXPERIMENTAL
+ tristate "Ceph core library"
+ depends on INET
select LIBCRC32C
select CRYPTO_AES
select CRYPTO
diff --git a/net/ceph/Makefile b/net/ceph/Makefile
index e87ef435e11b..958d9856912c 100644
--- a/net/ceph/Makefile
+++ b/net/ceph/Makefile
@@ -11,5 +11,5 @@ libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
crypto.o armor.o \
auth_x.o \
ceph_fs.o ceph_strings.o ceph_hash.o \
- pagevec.o
+ pagevec.o snapshot.o
diff --git a/net/ceph/auth.c b/net/ceph/auth.c
index b4bf4ac090f1..6b923bcaa2a4 100644
--- a/net/ceph/auth.c
+++ b/net/ceph/auth.c
@@ -47,6 +47,7 @@ struct ceph_auth_client *ceph_auth_init(const char *name, const struct ceph_cryp
if (!ac)
goto out;
+ mutex_init(&ac->mutex);
ac->negotiating = true;
if (name)
ac->name = name;
@@ -73,10 +74,12 @@ void ceph_auth_destroy(struct ceph_auth_client *ac)
*/
void ceph_auth_reset(struct ceph_auth_client *ac)
{
+ mutex_lock(&ac->mutex);
dout("auth_reset %p\n", ac);
if (ac->ops && !ac->negotiating)
ac->ops->reset(ac);
ac->negotiating = true;
+ mutex_unlock(&ac->mutex);
}
int ceph_entity_name_encode(const char *name, void **p, void *end)
@@ -102,6 +105,7 @@ int ceph_auth_build_hello(struct ceph_auth_client *ac, void *buf, size_t len)
int i, num;
int ret;
+ mutex_lock(&ac->mutex);
dout("auth_build_hello\n");
monhdr->have_version = 0;
monhdr->session_mon = cpu_to_le16(-1);
@@ -122,15 +126,19 @@ int ceph_auth_build_hello(struct ceph_auth_client *ac, void *buf, size_t len)
ret = ceph_entity_name_encode(ac->name, &p, end);
if (ret < 0)
- return ret;
+ goto out;
ceph_decode_need(&p, end, sizeof(u64), bad);
ceph_encode_64(&p, ac->global_id);
ceph_encode_32(&lenp, p - lenp - sizeof(u32));
- return p - buf;
+ ret = p - buf;
+out:
+ mutex_unlock(&ac->mutex);
+ return ret;
bad:
- return -ERANGE;
+ ret = -ERANGE;
+ goto out;
}
static int ceph_build_auth_request(struct ceph_auth_client *ac,
@@ -151,11 +159,13 @@ static int ceph_build_auth_request(struct ceph_auth_client *ac,
if (ret < 0) {
pr_err("error %d building auth method %s request\n", ret,
ac->ops->name);
- return ret;
+ goto out;
}
dout(" built request %d bytes\n", ret);
ceph_encode_32(&p, ret);
- return p + ret - msg_buf;
+ ret = p + ret - msg_buf;
+out:
+ return ret;
}
/*
@@ -176,6 +186,7 @@ int ceph_handle_auth_reply(struct ceph_auth_client *ac,
int result_msg_len;
int ret = -EINVAL;
+ mutex_lock(&ac->mutex);
dout("handle_auth_reply %p %p\n", p, end);
ceph_decode_need(&p, end, sizeof(u32) * 3 + sizeof(u64), bad);
protocol = ceph_decode_32(&p);
@@ -227,33 +238,103 @@ int ceph_handle_auth_reply(struct ceph_auth_client *ac,
ret = ac->ops->handle_reply(ac, result, payload, payload_end);
if (ret == -EAGAIN) {
- return ceph_build_auth_request(ac, reply_buf, reply_len);
+ ret = ceph_build_auth_request(ac, reply_buf, reply_len);
} else if (ret) {
pr_err("auth method '%s' error %d\n", ac->ops->name, ret);
- return ret;
}
- return 0;
-bad:
- pr_err("failed to decode auth msg\n");
out:
+ mutex_unlock(&ac->mutex);
return ret;
+
+bad:
+ pr_err("failed to decode auth msg\n");
+ ret = -EINVAL;
+ goto out;
}
int ceph_build_auth(struct ceph_auth_client *ac,
void *msg_buf, size_t msg_len)
{
+ int ret = 0;
+
+ mutex_lock(&ac->mutex);
if (!ac->protocol)
- return ceph_auth_build_hello(ac, msg_buf, msg_len);
- BUG_ON(!ac->ops);
- if (ac->ops->should_authenticate(ac))
- return ceph_build_auth_request(ac, msg_buf, msg_len);
- return 0;
+ ret = ceph_auth_build_hello(ac, msg_buf, msg_len);
+ else if (ac->ops->should_authenticate(ac))
+ ret = ceph_build_auth_request(ac, msg_buf, msg_len);
+ mutex_unlock(&ac->mutex);
+ return ret;
}
int ceph_auth_is_authenticated(struct ceph_auth_client *ac)
{
- if (!ac->ops)
- return 0;
- return ac->ops->is_authenticated(ac);
+ int ret = 0;
+
+ mutex_lock(&ac->mutex);
+ if (ac->ops)
+ ret = ac->ops->is_authenticated(ac);
+ mutex_unlock(&ac->mutex);
+ return ret;
+}
+EXPORT_SYMBOL(ceph_auth_is_authenticated);
+
+int ceph_auth_create_authorizer(struct ceph_auth_client *ac,
+ int peer_type,
+ struct ceph_auth_handshake *auth)
+{
+ int ret = 0;
+
+ mutex_lock(&ac->mutex);
+ if (ac->ops && ac->ops->create_authorizer)
+ ret = ac->ops->create_authorizer(ac, peer_type, auth);
+ mutex_unlock(&ac->mutex);
+ return ret;
+}
+EXPORT_SYMBOL(ceph_auth_create_authorizer);
+
+void ceph_auth_destroy_authorizer(struct ceph_auth_client *ac,
+ struct ceph_authorizer *a)
+{
+ mutex_lock(&ac->mutex);
+ if (ac->ops && ac->ops->destroy_authorizer)
+ ac->ops->destroy_authorizer(ac, a);
+ mutex_unlock(&ac->mutex);
+}
+EXPORT_SYMBOL(ceph_auth_destroy_authorizer);
+
+int ceph_auth_update_authorizer(struct ceph_auth_client *ac,
+ int peer_type,
+ struct ceph_auth_handshake *a)
+{
+ int ret = 0;
+
+ mutex_lock(&ac->mutex);
+ if (ac->ops && ac->ops->update_authorizer)
+ ret = ac->ops->update_authorizer(ac, peer_type, a);
+ mutex_unlock(&ac->mutex);
+ return ret;
+}
+EXPORT_SYMBOL(ceph_auth_update_authorizer);
+
+int ceph_auth_verify_authorizer_reply(struct ceph_auth_client *ac,
+ struct ceph_authorizer *a, size_t len)
+{
+ int ret = 0;
+
+ mutex_lock(&ac->mutex);
+ if (ac->ops && ac->ops->verify_authorizer_reply)
+ ret = ac->ops->verify_authorizer_reply(ac, a, len);
+ mutex_unlock(&ac->mutex);
+ return ret;
+}
+EXPORT_SYMBOL(ceph_auth_verify_authorizer_reply);
+
+void ceph_auth_invalidate_authorizer(struct ceph_auth_client *ac, int peer_type)
+{
+ mutex_lock(&ac->mutex);
+ if (ac->ops && ac->ops->invalidate_authorizer)
+ ac->ops->invalidate_authorizer(ac, peer_type);
+ mutex_unlock(&ac->mutex);
}
+EXPORT_SYMBOL(ceph_auth_invalidate_authorizer);
diff --git a/net/ceph/auth_x.c b/net/ceph/auth_x.c
index a16bf14eb027..96238ba95f2b 100644
--- a/net/ceph/auth_x.c
+++ b/net/ceph/auth_x.c
@@ -298,6 +298,7 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
return -ENOMEM;
}
au->service = th->service;
+ au->secret_id = th->secret_id;
msg_a = au->buf->vec.iov_base;
msg_a->struct_v = 1;
@@ -555,6 +556,26 @@ static int ceph_x_create_authorizer(
return 0;
}
+static int ceph_x_update_authorizer(
+ struct ceph_auth_client *ac, int peer_type,
+ struct ceph_auth_handshake *auth)
+{
+ struct ceph_x_authorizer *au;
+ struct ceph_x_ticket_handler *th;
+
+ th = get_ticket_handler(ac, peer_type);
+ if (IS_ERR(th))
+ return PTR_ERR(th);
+
+ au = (struct ceph_x_authorizer *)auth->authorizer;
+ if (au->secret_id < th->secret_id) {
+ dout("ceph_x_update_authorizer service %u secret %llu < %llu\n",
+ au->service, au->secret_id, th->secret_id);
+ return ceph_x_build_authorizer(ac, th, au);
+ }
+ return 0;
+}
+
static int ceph_x_verify_authorizer_reply(struct ceph_auth_client *ac,
struct ceph_authorizer *a, size_t len)
{
@@ -630,7 +651,7 @@ static void ceph_x_invalidate_authorizer(struct ceph_auth_client *ac,
th = get_ticket_handler(ac, peer_type);
if (!IS_ERR(th))
- remove_ticket_handler(ac, th);
+ memset(&th->validity, 0, sizeof(th->validity));
}
@@ -641,6 +662,7 @@ static const struct ceph_auth_client_ops ceph_x_ops = {
.build_request = ceph_x_build_request,
.handle_reply = ceph_x_handle_reply,
.create_authorizer = ceph_x_create_authorizer,
+ .update_authorizer = ceph_x_update_authorizer,
.verify_authorizer_reply = ceph_x_verify_authorizer_reply,
.destroy_authorizer = ceph_x_destroy_authorizer,
.invalidate_authorizer = ceph_x_invalidate_authorizer,
diff --git a/net/ceph/auth_x.h b/net/ceph/auth_x.h
index f459e93b774f..c5a058da7ac8 100644
--- a/net/ceph/auth_x.h
+++ b/net/ceph/auth_x.h
@@ -29,6 +29,7 @@ struct ceph_x_authorizer {
struct ceph_buffer *buf;
unsigned int service;
u64 nonce;
+ u64 secret_id;
char reply_buf[128]; /* big enough for encrypted blob */
};
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index ee71ea26777a..34b11ee8124e 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -15,6 +15,8 @@
#include <linux/slab.h>
#include <linux/statfs.h>
#include <linux/string.h>
+#include <linux/nsproxy.h>
+#include <net/net_namespace.h>
#include <linux/ceph/ceph_features.h>
@@ -26,6 +28,22 @@
#include "crypto.h"
+/*
+ * Module compatibility interface. For now it doesn't do anything,
+ * but its existence signals a certain level of functionality.
+ *
+ * The data buffer is used to pass information both to and from
+ * libceph. The return value indicates whether libceph determines
+ * it is compatible with the caller (from another kernel module),
+ * given the provided data.
+ *
+ * The data pointer can be null.
+ */
+bool libceph_compatible(void *data)
+{
+ return true;
+}
+EXPORT_SYMBOL(libceph_compatible);
/*
* find filename portion of a path (/foo/bar/baz -> baz)
@@ -292,6 +310,9 @@ ceph_parse_options(char *options, const char *dev_name,
int err = -ENOMEM;
substring_t argstr[MAX_OPT_ARGS];
+ if (current->nsproxy->net_ns != &init_net)
+ return ERR_PTR(-EINVAL);
+
opt = kzalloc(sizeof(*opt), GFP_KERNEL);
if (!opt)
return ERR_PTR(-ENOMEM);
@@ -585,13 +606,17 @@ static int __init init_ceph_lib(void)
if (ret < 0)
goto out_crypto;
- pr_info("loaded (mon/osd proto %d/%d, osdmap %d/%d %d/%d)\n",
- CEPH_MONC_PROTOCOL, CEPH_OSDC_PROTOCOL,
- CEPH_OSDMAP_VERSION, CEPH_OSDMAP_VERSION_EXT,
- CEPH_OSDMAP_INC_VERSION, CEPH_OSDMAP_INC_VERSION_EXT);
+ ret = ceph_osdc_setup();
+ if (ret < 0)
+ goto out_msgr;
+
+ pr_info("loaded (mon/osd proto %d/%d)\n",
+ CEPH_MONC_PROTOCOL, CEPH_OSDC_PROTOCOL);
return 0;
+out_msgr:
+ ceph_msgr_exit();
out_crypto:
ceph_crypto_shutdown();
out_debugfs:
@@ -603,6 +628,7 @@ out:
static void __exit exit_ceph_lib(void)
{
dout("exit_ceph_lib\n");
+ ceph_osdc_cleanup();
ceph_msgr_exit();
ceph_crypto_shutdown();
ceph_debugfs_cleanup();
diff --git a/net/ceph/ceph_strings.c b/net/ceph/ceph_strings.c
index 3fbda04de29c..1348df96fe15 100644
--- a/net/ceph/ceph_strings.c
+++ b/net/ceph/ceph_strings.c
@@ -21,9 +21,15 @@ const char *ceph_osd_op_name(int op)
switch (op) {
case CEPH_OSD_OP_READ: return "read";
case CEPH_OSD_OP_STAT: return "stat";
+ case CEPH_OSD_OP_MAPEXT: return "mapext";
+ case CEPH_OSD_OP_SPARSE_READ: return "sparse-read";
+ case CEPH_OSD_OP_NOTIFY: return "notify";
+ case CEPH_OSD_OP_NOTIFY_ACK: return "notify-ack";
+ case CEPH_OSD_OP_ASSERT_VER: return "assert-version";
case CEPH_OSD_OP_MASKTRUNC: return "masktrunc";
+ case CEPH_OSD_OP_CREATE: return "create";
case CEPH_OSD_OP_WRITE: return "write";
case CEPH_OSD_OP_DELETE: return "delete";
case CEPH_OSD_OP_TRUNCATE: return "truncate";
@@ -39,6 +45,11 @@ const char *ceph_osd_op_name(int op)
case CEPH_OSD_OP_TMAPUP: return "tmapup";
case CEPH_OSD_OP_TMAPGET: return "tmapget";
case CEPH_OSD_OP_TMAPPUT: return "tmapput";
+ case CEPH_OSD_OP_WATCH: return "watch";
+
+ case CEPH_OSD_OP_CLONERANGE: return "clonerange";
+ case CEPH_OSD_OP_ASSERT_SRC_VERSION: return "assert-src-version";
+ case CEPH_OSD_OP_SRC_CMPXATTR: return "src-cmpxattr";
case CEPH_OSD_OP_GETXATTR: return "getxattr";
case CEPH_OSD_OP_GETXATTRS: return "getxattrs";
@@ -53,6 +64,10 @@ const char *ceph_osd_op_name(int op)
case CEPH_OSD_OP_BALANCEREADS: return "balance-reads";
case CEPH_OSD_OP_UNBALANCEREADS: return "unbalance-reads";
case CEPH_OSD_OP_SCRUB: return "scrub";
+ case CEPH_OSD_OP_SCRUB_RESERVE: return "scrub-reserve";
+ case CEPH_OSD_OP_SCRUB_UNRESERVE: return "scrub-unreserve";
+ case CEPH_OSD_OP_SCRUB_STOP: return "scrub-stop";
+ case CEPH_OSD_OP_SCRUB_MAP: return "scrub-map";
case CEPH_OSD_OP_WRLOCK: return "wrlock";
case CEPH_OSD_OP_WRUNLOCK: return "wrunlock";
@@ -64,10 +79,34 @@ const char *ceph_osd_op_name(int op)
case CEPH_OSD_OP_CALL: return "call";
case CEPH_OSD_OP_PGLS: return "pgls";
+ case CEPH_OSD_OP_PGLS_FILTER: return "pgls-filter";
+ case CEPH_OSD_OP_OMAPGETKEYS: return "omap-get-keys";
+ case CEPH_OSD_OP_OMAPGETVALS: return "omap-get-vals";
+ case CEPH_OSD_OP_OMAPGETHEADER: return "omap-get-header";
+ case CEPH_OSD_OP_OMAPGETVALSBYKEYS: return "omap-get-vals-by-keys";
+ case CEPH_OSD_OP_OMAPSETVALS: return "omap-set-vals";
+ case CEPH_OSD_OP_OMAPSETHEADER: return "omap-set-header";
+ case CEPH_OSD_OP_OMAPCLEAR: return "omap-clear";
+ case CEPH_OSD_OP_OMAPRMKEYS: return "omap-rm-keys";
}
return "???";
}
+const char *ceph_osd_state_name(int s)
+{
+ switch (s) {
+ case CEPH_OSD_EXISTS:
+ return "exists";
+ case CEPH_OSD_UP:
+ return "up";
+ case CEPH_OSD_AUTOOUT:
+ return "autoout";
+ case CEPH_OSD_NEW:
+ return "new";
+ default:
+ return "???";
+ }
+}
const char *ceph_pool_op_name(int op)
{
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index 35fce755ce10..cbd06a91941c 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -287,6 +287,7 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in
* @outpos: our position in that vector
* @firstn: true if choosing "first n" items, false if choosing "indep"
* @recurse_to_leaf: true if we want one device under each item of given type
+ * @descend_once: true if we should only try one descent before giving up
* @out2: second output vector for leaf items (if @recurse_to_leaf)
*/
static int crush_choose(const struct crush_map *map,
@@ -295,7 +296,7 @@ static int crush_choose(const struct crush_map *map,
int x, int numrep, int type,
int *out, int outpos,
int firstn, int recurse_to_leaf,
- int *out2)
+ int descend_once, int *out2)
{
int rep;
unsigned int ftotal, flocal;
@@ -391,7 +392,7 @@ static int crush_choose(const struct crush_map *map,
}
reject = 0;
- if (recurse_to_leaf) {
+ if (!collide && recurse_to_leaf) {
if (item < 0) {
if (crush_choose(map,
map->buckets[-1-item],
@@ -399,6 +400,7 @@ static int crush_choose(const struct crush_map *map,
x, outpos+1, 0,
out2, outpos,
firstn, 0,
+ map->chooseleaf_descend_once,
NULL) <= outpos)
/* didn't get leaf */
reject = 1;
@@ -422,7 +424,10 @@ reject:
ftotal++;
flocal++;
- if (collide && flocal <= map->choose_local_tries)
+ if (reject && descend_once)
+ /* let outer call try again */
+ skip_rep = 1;
+ else if (collide && flocal <= map->choose_local_tries)
/* retry locally a few times */
retry_bucket = 1;
else if (map->choose_local_fallback_tries > 0 &&
@@ -485,6 +490,7 @@ int crush_do_rule(const struct crush_map *map,
int i, j;
int numrep;
int firstn;
+ const int descend_once = 0;
if ((__u32)ruleno >= map->max_rules) {
dprintk(" bad ruleno %d\n", ruleno);
@@ -544,7 +550,8 @@ int crush_do_rule(const struct crush_map *map,
curstep->arg2,
o+osize, j,
firstn,
- recurse_to_leaf, c+osize);
+ recurse_to_leaf,
+ descend_once, c+osize);
}
if (recurse_to_leaf)
diff --git a/net/ceph/crypto.c b/net/ceph/crypto.c
index af14cb425164..6e7a236525b6 100644
--- a/net/ceph/crypto.c
+++ b/net/ceph/crypto.c
@@ -423,7 +423,8 @@ int ceph_encrypt2(struct ceph_crypto_key *secret, void *dst, size_t *dst_len,
}
}
-int ceph_key_instantiate(struct key *key, struct key_preparsed_payload *prep)
+static int ceph_key_instantiate(struct key *key,
+ struct key_preparsed_payload *prep)
{
struct ceph_crypto_key *ckey;
size_t datalen = prep->datalen;
@@ -458,12 +459,12 @@ err:
return ret;
}
-int ceph_key_match(const struct key *key, const void *description)
+static int ceph_key_match(const struct key *key, const void *description)
{
return strcmp(key->description, description) == 0;
}
-void ceph_key_destroy(struct key *key) {
+static void ceph_key_destroy(struct key *key) {
struct ceph_crypto_key *ckey = key->payload.data;
ceph_crypto_key_destroy(ckey);
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 38b5dc1823d4..83661cdc0766 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -66,9 +66,9 @@ static int osdmap_show(struct seq_file *s, void *p)
for (n = rb_first(&client->osdc.osdmap->pg_pools); n; n = rb_next(n)) {
struct ceph_pg_pool_info *pool =
rb_entry(n, struct ceph_pg_pool_info, node);
- seq_printf(s, "pg_pool %d pg_num %d / %d, lpg_num %d / %d\n",
- pool->id, pool->v.pg_num, pool->pg_num_mask,
- pool->v.lpg_num, pool->lpg_num_mask);
+ seq_printf(s, "pg_pool %llu pg_num %d / %d\n",
+ (unsigned long long)pool->id, pool->pg_num,
+ pool->pg_num_mask);
}
for (i = 0; i < client->osdc.osdmap->max_osd; i++) {
struct ceph_entity_addr *addr =
@@ -123,26 +123,16 @@ static int osdc_show(struct seq_file *s, void *pp)
mutex_lock(&osdc->request_mutex);
for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
struct ceph_osd_request *req;
- struct ceph_osd_request_head *head;
- struct ceph_osd_op *op;
- int num_ops;
- int opcode, olen;
- int i;
+ unsigned int i;
+ int opcode;
req = rb_entry(p, struct ceph_osd_request, r_node);
- seq_printf(s, "%lld\tosd%d\t%d.%x\t", req->r_tid,
+ seq_printf(s, "%lld\tosd%d\t%lld.%x\t", req->r_tid,
req->r_osd ? req->r_osd->o_osd : -1,
- le32_to_cpu(req->r_pgid.pool),
- le16_to_cpu(req->r_pgid.ps));
+ req->r_pgid.pool, req->r_pgid.seed);
- head = req->r_request->front.iov_base;
- op = (void *)(head + 1);
-
- num_ops = le16_to_cpu(head->num_ops);
- olen = le32_to_cpu(head->object_len);
- seq_printf(s, "%.*s", olen,
- (const char *)(head->ops + num_ops));
+ seq_printf(s, "%.*s", req->r_oid_len, req->r_oid);
if (req->r_reassert_version.epoch)
seq_printf(s, "\t%u'%llu",
@@ -151,10 +141,9 @@ static int osdc_show(struct seq_file *s, void *pp)
else
seq_printf(s, "\t");
- for (i = 0; i < num_ops; i++) {
- opcode = le16_to_cpu(op->op);
+ for (i = 0; i < req->r_num_ops; i++) {
+ opcode = req->r_ops[i].op;
seq_printf(s, "\t%s", ceph_osd_op_name(opcode));
- op++;
}
seq_printf(s, "\n");
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 5ccf87ed8d68..eb0a46a49bd4 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -9,8 +9,9 @@
#include <linux/slab.h>
#include <linux/socket.h>
#include <linux/string.h>
+#ifdef CONFIG_BLOCK
#include <linux/bio.h>
-#include <linux/blkdev.h>
+#endif /* CONFIG_BLOCK */
#include <linux/dns_resolver.h>
#include <net/tcp.h>
@@ -20,6 +21,9 @@
#include <linux/ceph/pagelist.h>
#include <linux/export.h>
+#define list_entry_next(pos, member) \
+ list_entry(pos->member.next, typeof(*pos), member)
+
/*
* Ceph uses the messenger to exchange ceph_msg messages with other
* hosts in the system. The messenger provides ordered and reliable
@@ -97,6 +101,62 @@
#define CON_FLAG_SOCK_CLOSED 3 /* socket state changed to closed */
#define CON_FLAG_BACKOFF 4 /* need to retry queuing delayed work */
+static bool con_flag_valid(unsigned long con_flag)
+{
+ switch (con_flag) {
+ case CON_FLAG_LOSSYTX:
+ case CON_FLAG_KEEPALIVE_PENDING:
+ case CON_FLAG_WRITE_PENDING:
+ case CON_FLAG_SOCK_CLOSED:
+ case CON_FLAG_BACKOFF:
+ return true;
+ default:
+ return false;
+ }
+}
+
+static void con_flag_clear(struct ceph_connection *con, unsigned long con_flag)
+{
+ BUG_ON(!con_flag_valid(con_flag));
+
+ clear_bit(con_flag, &con->flags);
+}
+
+static void con_flag_set(struct ceph_connection *con, unsigned long con_flag)
+{
+ BUG_ON(!con_flag_valid(con_flag));
+
+ set_bit(con_flag, &con->flags);
+}
+
+static bool con_flag_test(struct ceph_connection *con, unsigned long con_flag)
+{
+ BUG_ON(!con_flag_valid(con_flag));
+
+ return test_bit(con_flag, &con->flags);
+}
+
+static bool con_flag_test_and_clear(struct ceph_connection *con,
+ unsigned long con_flag)
+{
+ BUG_ON(!con_flag_valid(con_flag));
+
+ return test_and_clear_bit(con_flag, &con->flags);
+}
+
+static bool con_flag_test_and_set(struct ceph_connection *con,
+ unsigned long con_flag)
+{
+ BUG_ON(!con_flag_valid(con_flag));
+
+ return test_and_set_bit(con_flag, &con->flags);
+}
+
+/* Slab caches for frequently-allocated structures */
+
+static struct kmem_cache *ceph_msg_cache;
+static struct kmem_cache *ceph_msg_data_cache;
+
/* static tag bytes (protocol control messages) */
static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK;
@@ -114,7 +174,7 @@ static struct lock_class_key socket_class;
static void queue_con(struct ceph_connection *con);
static void con_work(struct work_struct *);
-static void ceph_fault(struct ceph_connection *con);
+static void con_fault(struct ceph_connection *con);
/*
* Nicely render a sockaddr as a string. An array of formatted
@@ -171,13 +231,50 @@ static void encode_my_addr(struct ceph_messenger *msgr)
*/
static struct workqueue_struct *ceph_msgr_wq;
-void _ceph_msgr_exit(void)
+static int ceph_msgr_slab_init(void)
+{
+ BUG_ON(ceph_msg_cache);
+ ceph_msg_cache = kmem_cache_create("ceph_msg",
+ sizeof (struct ceph_msg),
+ __alignof__(struct ceph_msg), 0, NULL);
+
+ if (!ceph_msg_cache)
+ return -ENOMEM;
+
+ BUG_ON(ceph_msg_data_cache);
+ ceph_msg_data_cache = kmem_cache_create("ceph_msg_data",
+ sizeof (struct ceph_msg_data),
+ __alignof__(struct ceph_msg_data),
+ 0, NULL);
+ if (ceph_msg_data_cache)
+ return 0;
+
+ kmem_cache_destroy(ceph_msg_cache);
+ ceph_msg_cache = NULL;
+
+ return -ENOMEM;
+}
+
+static void ceph_msgr_slab_exit(void)
+{
+ BUG_ON(!ceph_msg_data_cache);
+ kmem_cache_destroy(ceph_msg_data_cache);
+ ceph_msg_data_cache = NULL;
+
+ BUG_ON(!ceph_msg_cache);
+ kmem_cache_destroy(ceph_msg_cache);
+ ceph_msg_cache = NULL;
+}
+
+static void _ceph_msgr_exit(void)
{
if (ceph_msgr_wq) {
destroy_workqueue(ceph_msgr_wq);
ceph_msgr_wq = NULL;
}
+ ceph_msgr_slab_exit();
+
BUG_ON(zero_page == NULL);
kunmap(zero_page);
page_cache_release(zero_page);
@@ -190,6 +287,9 @@ int ceph_msgr_init(void)
zero_page = ZERO_PAGE(0);
page_cache_get(zero_page);
+ if (ceph_msgr_slab_init())
+ return -ENOMEM;
+
ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
if (ceph_msgr_wq)
return 0;
@@ -308,7 +408,7 @@ static void ceph_sock_write_space(struct sock *sk)
* buffer. See net/ipv4/tcp_input.c:tcp_check_space()
* and net/core/stream.c:sk_stream_write_space().
*/
- if (test_bit(CON_FLAG_WRITE_PENDING, &con->flags)) {
+ if (con_flag_test(con, CON_FLAG_WRITE_PENDING)) {
if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
dout("%s %p queueing write work\n", __func__, con);
clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
@@ -333,7 +433,7 @@ static void ceph_sock_state_change(struct sock *sk)
case TCP_CLOSE_WAIT:
dout("%s TCP_CLOSE_WAIT\n", __func__);
con_sock_state_closing(con);
- set_bit(CON_FLAG_SOCK_CLOSED, &con->flags);
+ con_flag_set(con, CON_FLAG_SOCK_CLOSED);
queue_con(con);
break;
case TCP_ESTABLISHED:
@@ -419,6 +519,22 @@ static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
return r;
}
+static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
+ int page_offset, size_t length)
+{
+ void *kaddr;
+ int ret;
+
+ BUG_ON(page_offset + length > PAGE_SIZE);
+
+ kaddr = kmap(page);
+ BUG_ON(!kaddr);
+ ret = ceph_tcp_recvmsg(sock, kaddr + page_offset, length);
+ kunmap(page);
+
+ return ret;
+}
+
/*
* write something. @more is true if caller will be sending more data
* shortly.
@@ -441,7 +557,7 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
}
static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
- int offset, size_t size, int more)
+ int offset, size_t size, bool more)
{
int flags = MSG_DONTWAIT | MSG_NOSIGNAL | (more ? MSG_MORE : MSG_EOR);
int ret;
@@ -474,7 +590,7 @@ static int con_close_socket(struct ceph_connection *con)
* received a socket close event before we had the chance to
* shut the socket down.
*/
- clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags);
+ con_flag_clear(con, CON_FLAG_SOCK_CLOSED);
con_sock_state_closed(con);
return rc;
@@ -538,11 +654,10 @@ void ceph_con_close(struct ceph_connection *con)
ceph_pr_addr(&con->peer_addr.in_addr));
con->state = CON_STATE_CLOSED;
- clear_bit(CON_FLAG_LOSSYTX, &con->flags); /* so we retry next connect */
- clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags);
- clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
- clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags);
- clear_bit(CON_FLAG_BACKOFF, &con->flags);
+ con_flag_clear(con, CON_FLAG_LOSSYTX); /* so we retry next connect */
+ con_flag_clear(con, CON_FLAG_KEEPALIVE_PENDING);
+ con_flag_clear(con, CON_FLAG_WRITE_PENDING);
+ con_flag_clear(con, CON_FLAG_BACKOFF);
reset_connection(con);
con->peer_global_seq = 0;
@@ -646,50 +761,397 @@ static void con_out_kvec_add(struct ceph_connection *con,
}
#ifdef CONFIG_BLOCK
-static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
+
+/*
+ * For a bio data item, a piece is whatever remains of the next
+ * entry in the current bio iovec, or the first entry in the next
+ * bio in the list.
+ */
+static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
+ size_t length)
{
- if (!bio) {
- *iter = NULL;
- *seg = 0;
- return;
+ struct ceph_msg_data *data = cursor->data;
+ struct bio *bio;
+
+ BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+
+ bio = data->bio;
+ BUG_ON(!bio);
+ BUG_ON(!bio->bi_vcnt);
+
+ cursor->resid = min(length, data->bio_length);
+ cursor->bio = bio;
+ cursor->vector_index = 0;
+ cursor->vector_offset = 0;
+ cursor->last_piece = length <= bio->bi_io_vec[0].bv_len;
+}
+
+static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
+ size_t *page_offset,
+ size_t *length)
+{
+ struct ceph_msg_data *data = cursor->data;
+ struct bio *bio;
+ struct bio_vec *bio_vec;
+ unsigned int index;
+
+ BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+
+ bio = cursor->bio;
+ BUG_ON(!bio);
+
+ index = cursor->vector_index;
+ BUG_ON(index >= (unsigned int) bio->bi_vcnt);
+
+ bio_vec = &bio->bi_io_vec[index];
+ BUG_ON(cursor->vector_offset >= bio_vec->bv_len);
+ *page_offset = (size_t) (bio_vec->bv_offset + cursor->vector_offset);
+ BUG_ON(*page_offset >= PAGE_SIZE);
+ if (cursor->last_piece) /* pagelist offset is always 0 */
+ *length = cursor->resid;
+ else
+ *length = (size_t) (bio_vec->bv_len - cursor->vector_offset);
+ BUG_ON(*length > cursor->resid);
+ BUG_ON(*page_offset + *length > PAGE_SIZE);
+
+ return bio_vec->bv_page;
+}
+
+static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
+ size_t bytes)
+{
+ struct bio *bio;
+ struct bio_vec *bio_vec;
+ unsigned int index;
+
+ BUG_ON(cursor->data->type != CEPH_MSG_DATA_BIO);
+
+ bio = cursor->bio;
+ BUG_ON(!bio);
+
+ index = cursor->vector_index;
+ BUG_ON(index >= (unsigned int) bio->bi_vcnt);
+ bio_vec = &bio->bi_io_vec[index];
+
+ /* Advance the cursor offset */
+
+ BUG_ON(cursor->resid < bytes);
+ cursor->resid -= bytes;
+ cursor->vector_offset += bytes;
+ if (cursor->vector_offset < bio_vec->bv_len)
+ return false; /* more bytes to process in this segment */
+ BUG_ON(cursor->vector_offset != bio_vec->bv_len);
+
+ /* Move on to the next segment, and possibly the next bio */
+
+ if (++index == (unsigned int) bio->bi_vcnt) {
+ bio = bio->bi_next;
+ index = 0;
+ }
+ cursor->bio = bio;
+ cursor->vector_index = index;
+ cursor->vector_offset = 0;
+
+ if (!cursor->last_piece) {
+ BUG_ON(!cursor->resid);
+ BUG_ON(!bio);
+ /* A short read is OK, so use <= rather than == */
+ if (cursor->resid <= bio->bi_io_vec[index].bv_len)
+ cursor->last_piece = true;
}
- *iter = bio;
- *seg = bio->bi_idx;
+
+ return true;
}
+#endif /* CONFIG_BLOCK */
-static void iter_bio_next(struct bio **bio_iter, int *seg)
+/*
+ * For a page array, a piece comes from the first page in the array
+ * that has not already been fully consumed.
+ */
+static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
+ size_t length)
{
- if (*bio_iter == NULL)
- return;
+ struct ceph_msg_data *data = cursor->data;
+ int page_count;
+
+ BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
- BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
+ BUG_ON(!data->pages);
+ BUG_ON(!data->length);
- (*seg)++;
- if (*seg == (*bio_iter)->bi_vcnt)
- init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
+ cursor->resid = min(length, data->length);
+ page_count = calc_pages_for(data->alignment, (u64)data->length);
+ cursor->page_offset = data->alignment & ~PAGE_MASK;
+ cursor->page_index = 0;
+ BUG_ON(page_count > (int)USHRT_MAX);
+ cursor->page_count = (unsigned short)page_count;
+ BUG_ON(length > SIZE_MAX - cursor->page_offset);
+ cursor->last_piece = (size_t)cursor->page_offset + length <= PAGE_SIZE;
}
-#endif
-static void prepare_write_message_data(struct ceph_connection *con)
+static struct page *
+ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor,
+ size_t *page_offset, size_t *length)
{
- struct ceph_msg *msg = con->out_msg;
+ struct ceph_msg_data *data = cursor->data;
- BUG_ON(!msg);
- BUG_ON(!msg->hdr.data_len);
+ BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
+
+ BUG_ON(cursor->page_index >= cursor->page_count);
+ BUG_ON(cursor->page_offset >= PAGE_SIZE);
- /* initialize page iterator */
- con->out_msg_pos.page = 0;
- if (msg->pages)
- con->out_msg_pos.page_pos = msg->page_alignment;
+ *page_offset = cursor->page_offset;
+ if (cursor->last_piece)
+ *length = cursor->resid;
else
- con->out_msg_pos.page_pos = 0;
+ *length = PAGE_SIZE - *page_offset;
+
+ return data->pages[cursor->page_index];
+}
+
+static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
+ size_t bytes)
+{
+ BUG_ON(cursor->data->type != CEPH_MSG_DATA_PAGES);
+
+ BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);
+
+ /* Advance the cursor page offset */
+
+ cursor->resid -= bytes;
+ cursor->page_offset = (cursor->page_offset + bytes) & ~PAGE_MASK;
+ if (!bytes || cursor->page_offset)
+ return false; /* more bytes to process in the current page */
+
+ /* Move on to the next page; offset is already at 0 */
+
+ BUG_ON(cursor->page_index >= cursor->page_count);
+ cursor->page_index++;
+ cursor->last_piece = cursor->resid <= PAGE_SIZE;
+
+ return true;
+}
+
+/*
+ * For a pagelist, a piece is whatever remains to be consumed in the
+ * first page in the list, or the front of the next page.
+ */
+static void
+ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
+ size_t length)
+{
+ struct ceph_msg_data *data = cursor->data;
+ struct ceph_pagelist *pagelist;
+ struct page *page;
+
+ BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
+
+ pagelist = data->pagelist;
+ BUG_ON(!pagelist);
+
+ if (!length)
+ return; /* pagelist can be assigned but empty */
+
+ BUG_ON(list_empty(&pagelist->head));
+ page = list_first_entry(&pagelist->head, struct page, lru);
+
+ cursor->resid = min(length, pagelist->length);
+ cursor->page = page;
+ cursor->offset = 0;
+ cursor->last_piece = cursor->resid <= PAGE_SIZE;
+}
+
+static struct page *
+ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor,
+ size_t *page_offset, size_t *length)
+{
+ struct ceph_msg_data *data = cursor->data;
+ struct ceph_pagelist *pagelist;
+
+ BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
+
+ pagelist = data->pagelist;
+ BUG_ON(!pagelist);
+
+ BUG_ON(!cursor->page);
+ BUG_ON(cursor->offset + cursor->resid != pagelist->length);
+
+ /* offset of first page in pagelist is always 0 */
+ *page_offset = cursor->offset & ~PAGE_MASK;
+ if (cursor->last_piece)
+ *length = cursor->resid;
+ else
+ *length = PAGE_SIZE - *page_offset;
+
+ return cursor->page;
+}
+
+static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
+ size_t bytes)
+{
+ struct ceph_msg_data *data = cursor->data;
+ struct ceph_pagelist *pagelist;
+
+ BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
+
+ pagelist = data->pagelist;
+ BUG_ON(!pagelist);
+
+ BUG_ON(cursor->offset + cursor->resid != pagelist->length);
+ BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);
+
+ /* Advance the cursor offset */
+
+ cursor->resid -= bytes;
+ cursor->offset += bytes;
+ /* offset of first page in pagelist is always 0 */
+ if (!bytes || cursor->offset & ~PAGE_MASK)
+ return false; /* more bytes to process in the current page */
+
+ /* Move on to the next page */
+
+ BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
+ cursor->page = list_entry_next(cursor->page, lru);
+ cursor->last_piece = cursor->resid <= PAGE_SIZE;
+
+ return true;
+}
+
+/*
+ * Message data is handled (sent or received) in pieces, where each
+ * piece resides on a single page. The network layer might not
+ * consume an entire piece at once. A data item's cursor keeps
+ * track of which piece is next to process and how much remains to
+ * be processed in that piece. It also tracks whether the current
+ * piece is the last one in the data item.
+ */
+static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
+{
+ size_t length = cursor->total_resid;
+
+ switch (cursor->data->type) {
+ case CEPH_MSG_DATA_PAGELIST:
+ ceph_msg_data_pagelist_cursor_init(cursor, length);
+ break;
+ case CEPH_MSG_DATA_PAGES:
+ ceph_msg_data_pages_cursor_init(cursor, length);
+ break;
#ifdef CONFIG_BLOCK
- if (msg->bio)
- init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
-#endif
- con->out_msg_pos.data_pos = 0;
- con->out_msg_pos.did_page_crc = false;
- con->out_more = 1; /* data + footer will follow */
+ case CEPH_MSG_DATA_BIO:
+ ceph_msg_data_bio_cursor_init(cursor, length);
+ break;
+#endif /* CONFIG_BLOCK */
+ case CEPH_MSG_DATA_NONE:
+ default:
+ /* BUG(); */
+ break;
+ }
+ cursor->need_crc = true;
+}
+
+static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
+{
+ struct ceph_msg_data_cursor *cursor = &msg->cursor;
+ struct ceph_msg_data *data;
+
+ BUG_ON(!length);
+ BUG_ON(length > msg->data_length);
+ BUG_ON(list_empty(&msg->data));
+
+ cursor->data_head = &msg->data;
+ cursor->total_resid = length;
+ data = list_first_entry(&msg->data, struct ceph_msg_data, links);
+ cursor->data = data;
+
+ __ceph_msg_data_cursor_init(cursor);
+}
+
+/*
+ * Return the page containing the next piece to process for a given
+ * data item, and supply the page offset and length of that piece.
+ * Indicate whether this is the last piece in this data item.
+ */
+static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
+ size_t *page_offset, size_t *length,
+ bool *last_piece)
+{
+ struct page *page;
+
+ switch (cursor->data->type) {
+ case CEPH_MSG_DATA_PAGELIST:
+ page = ceph_msg_data_pagelist_next(cursor, page_offset, length);
+ break;
+ case CEPH_MSG_DATA_PAGES:
+ page = ceph_msg_data_pages_next(cursor, page_offset, length);
+ break;
+#ifdef CONFIG_BLOCK
+ case CEPH_MSG_DATA_BIO:
+ page = ceph_msg_data_bio_next(cursor, page_offset, length);
+ break;
+#endif /* CONFIG_BLOCK */
+ case CEPH_MSG_DATA_NONE:
+ default:
+ page = NULL;
+ break;
+ }
+ BUG_ON(!page);
+ BUG_ON(*page_offset + *length > PAGE_SIZE);
+ BUG_ON(!*length);
+ if (last_piece)
+ *last_piece = cursor->last_piece;
+
+ return page;
+}
+
+/*
+ * Returns true if the result moves the cursor on to the next piece
+ * of the data item.
+ */
+static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
+ size_t bytes)
+{
+ bool new_piece;
+
+ BUG_ON(bytes > cursor->resid);
+ switch (cursor->data->type) {
+ case CEPH_MSG_DATA_PAGELIST:
+ new_piece = ceph_msg_data_pagelist_advance(cursor, bytes);
+ break;
+ case CEPH_MSG_DATA_PAGES:
+ new_piece = ceph_msg_data_pages_advance(cursor, bytes);
+ break;
+#ifdef CONFIG_BLOCK
+ case CEPH_MSG_DATA_BIO:
+ new_piece = ceph_msg_data_bio_advance(cursor, bytes);
+ break;
+#endif /* CONFIG_BLOCK */
+ case CEPH_MSG_DATA_NONE:
+ default:
+ BUG();
+ break;
+ }
+ cursor->total_resid -= bytes;
+
+ if (!cursor->resid && cursor->total_resid) {
+ WARN_ON(!cursor->last_piece);
+ BUG_ON(list_is_last(&cursor->data->links, cursor->data_head));
+ cursor->data = list_entry_next(cursor->data, links);
+ __ceph_msg_data_cursor_init(cursor);
+ new_piece = true;
+ }
+ cursor->need_crc = new_piece;
+
+ return new_piece;
+}
+
+static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
+{
+ BUG_ON(!msg);
+ BUG_ON(!data_len);
+
+ /* Initialize data cursor */
+
+ ceph_msg_data_cursor_init(msg, (size_t)data_len);
}
/*
@@ -752,16 +1214,12 @@ static void prepare_write_message(struct ceph_connection *con)
m->hdr.seq = cpu_to_le64(++con->out_seq);
m->needs_out_seq = false;
}
-#ifdef CONFIG_BLOCK
- else
- m->bio_iter = NULL;
-#endif
+ WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
- dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
+ dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
m, con->out_seq, le16_to_cpu(m->hdr.type),
le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
- le32_to_cpu(m->hdr.data_len),
- m->nr_pages);
+ m->data_length);
BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
/* tag + hdr + front + middle */
@@ -792,13 +1250,15 @@ static void prepare_write_message(struct ceph_connection *con)
/* is there a data payload? */
con->out_msg->footer.data_crc = 0;
- if (m->hdr.data_len)
- prepare_write_message_data(con);
- else
+ if (m->data_length) {
+ prepare_message_data(con->out_msg, m->data_length);
+ con->out_more = 1; /* data + footer will follow */
+ } else {
/* no, queue up footer too and be done */
prepare_write_message_footer(con);
+ }
- set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
+ con_flag_set(con, CON_FLAG_WRITE_PENDING);
}
/*
@@ -819,7 +1279,25 @@ static void prepare_write_ack(struct ceph_connection *con)
&con->out_temp_ack);
con->out_more = 1; /* more will follow.. eventually.. */
- set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
+ con_flag_set(con, CON_FLAG_WRITE_PENDING);
+}
+
+/*
+ * Prepare to share the seq during handshake
+ */
+static void prepare_write_seq(struct ceph_connection *con)
+{
+ dout("prepare_write_seq %p %llu -> %llu\n", con,
+ con->in_seq_acked, con->in_seq);
+ con->in_seq_acked = con->in_seq;
+
+ con_out_kvec_reset(con);
+
+ con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
+ con_out_kvec_add(con, sizeof (con->out_temp_ack),
+ &con->out_temp_ack);
+
+ con_flag_set(con, CON_FLAG_WRITE_PENDING);
}
/*
@@ -830,7 +1308,7 @@ static void prepare_write_keepalive(struct ceph_connection *con)
dout("prepare_write_keepalive %p\n", con);
con_out_kvec_reset(con);
con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
- set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
+ con_flag_set(con, CON_FLAG_WRITE_PENDING);
}
/*
@@ -873,7 +1351,7 @@ static void prepare_write_banner(struct ceph_connection *con)
&con->msgr->my_enc_addr);
con->out_more = 0;
- set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
+ con_flag_set(con, CON_FLAG_WRITE_PENDING);
}
static int prepare_write_connect(struct ceph_connection *con)
@@ -923,7 +1401,7 @@ static int prepare_write_connect(struct ceph_connection *con)
auth->authorizer_buf);
con->out_more = 0;
- set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
+ con_flag_set(con, CON_FLAG_WRITE_PENDING);
return 0;
}
@@ -971,35 +1449,19 @@ out:
return ret; /* done! */
}
-static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
- size_t len, size_t sent, bool in_trail)
+static u32 ceph_crc32c_page(u32 crc, struct page *page,
+ unsigned int page_offset,
+ unsigned int length)
{
- struct ceph_msg *msg = con->out_msg;
-
- BUG_ON(!msg);
- BUG_ON(!sent);
+ char *kaddr;
- con->out_msg_pos.data_pos += sent;
- con->out_msg_pos.page_pos += sent;
- if (sent < len)
- return;
+ kaddr = kmap(page);
+ BUG_ON(kaddr == NULL);
+ crc = crc32c(crc, kaddr + page_offset, length);
+ kunmap(page);
- BUG_ON(sent != len);
- con->out_msg_pos.page_pos = 0;
- con->out_msg_pos.page++;
- con->out_msg_pos.did_page_crc = false;
- if (in_trail)
- list_move_tail(&page->lru,
- &msg->trail->head);
- else if (msg->pagelist)
- list_move_tail(&page->lru,
- &msg->pagelist->head);
-#ifdef CONFIG_BLOCK
- else if (msg->bio)
- iter_bio_next(&msg->bio_iter, &msg->bio_seg);
-#endif
+ return crc;
}
-
/*
* Write as much message data payload as we can. If we finish, queue
* up the footer.
@@ -1007,21 +1469,17 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
* 0 -> socket full, but more to do
* <0 -> error
*/
-static int write_partial_msg_pages(struct ceph_connection *con)
+static int write_partial_message_data(struct ceph_connection *con)
{
struct ceph_msg *msg = con->out_msg;
- unsigned int data_len = le32_to_cpu(msg->hdr.data_len);
- size_t len;
+ struct ceph_msg_data_cursor *cursor = &msg->cursor;
bool do_datacrc = !con->msgr->nocrc;
- int ret;
- int total_max_write;
- bool in_trail = false;
- const size_t trail_len = (msg->trail ? msg->trail->length : 0);
- const size_t trail_off = data_len - trail_len;
+ u32 crc;
+
+ dout("%s %p msg %p\n", __func__, con, msg);
- dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
- con, msg, con->out_msg_pos.page, msg->nr_pages,
- con->out_msg_pos.page_pos);
+ if (list_empty(&msg->data))
+ return -EINVAL;
/*
* Iterate through each page that contains data to be
@@ -1031,72 +1489,41 @@ static int write_partial_msg_pages(struct ceph_connection *con)
* need to map the page. If we have no pages, they have
* been revoked, so use the zero page.
*/
- while (data_len > con->out_msg_pos.data_pos) {
- struct page *page = NULL;
- int max_write = PAGE_SIZE;
- int bio_offset = 0;
-
- in_trail = in_trail || con->out_msg_pos.data_pos >= trail_off;
- if (!in_trail)
- total_max_write = trail_off - con->out_msg_pos.data_pos;
-
- if (in_trail) {
- total_max_write = data_len - con->out_msg_pos.data_pos;
-
- page = list_first_entry(&msg->trail->head,
- struct page, lru);
- } else if (msg->pages) {
- page = msg->pages[con->out_msg_pos.page];
- } else if (msg->pagelist) {
- page = list_first_entry(&msg->pagelist->head,
- struct page, lru);
-#ifdef CONFIG_BLOCK
- } else if (msg->bio) {
- struct bio_vec *bv;
+ crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
+ while (cursor->resid) {
+ struct page *page;
+ size_t page_offset;
+ size_t length;
+ bool last_piece;
+ bool need_crc;
+ int ret;
+
+ page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
+ &last_piece);
+ ret = ceph_tcp_sendpage(con->sock, page, page_offset,
+ length, last_piece);
+ if (ret <= 0) {
+ if (do_datacrc)
+ msg->footer.data_crc = cpu_to_le32(crc);
- bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
- page = bv->bv_page;
- bio_offset = bv->bv_offset;
- max_write = bv->bv_len;
-#endif
- } else {
- page = zero_page;
- }
- len = min_t(int, max_write - con->out_msg_pos.page_pos,
- total_max_write);
-
- if (do_datacrc && !con->out_msg_pos.did_page_crc) {
- void *base;
- u32 crc = le32_to_cpu(msg->footer.data_crc);
- char *kaddr;
-
- kaddr = kmap(page);
- BUG_ON(kaddr == NULL);
- base = kaddr + con->out_msg_pos.page_pos + bio_offset;
- crc = crc32c(crc, base, len);
- kunmap(page);
- msg->footer.data_crc = cpu_to_le32(crc);
- con->out_msg_pos.did_page_crc = true;
+ return ret;
}
- ret = ceph_tcp_sendpage(con->sock, page,
- con->out_msg_pos.page_pos + bio_offset,
- len, 1);
- if (ret <= 0)
- goto out;
-
- out_msg_pos_next(con, page, len, (size_t) ret, in_trail);
+ if (do_datacrc && cursor->need_crc)
+ crc = ceph_crc32c_page(crc, page, page_offset, length);
+ need_crc = ceph_msg_data_advance(&msg->cursor, (size_t)ret);
}
- dout("write_partial_msg_pages %p msg %p done\n", con, msg);
+ dout("%s %p msg %p done\n", __func__, con, msg);
/* prepare and queue up footer, too */
- if (!do_datacrc)
+ if (do_datacrc)
+ msg->footer.data_crc = cpu_to_le32(crc);
+ else
msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
con_out_kvec_reset(con);
prepare_write_message_footer(con);
- ret = 1;
-out:
- return ret;
+
+ return 1; /* must return > 0 to indicate success */
}
/*
@@ -1109,7 +1536,7 @@ static int write_partial_skip(struct ceph_connection *con)
while (con->out_skip > 0) {
size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE);
- ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, 1);
+ ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, true);
if (ret <= 0)
goto out;
con->out_skip -= ret;
@@ -1140,6 +1567,13 @@ static void prepare_read_ack(struct ceph_connection *con)
con->in_base_pos = 0;
}
+static void prepare_read_seq(struct ceph_connection *con)
+{
+ dout("prepare_read_seq %p\n", con);
+ con->in_base_pos = 0;
+ con->in_tag = CEPH_MSGR_TAG_SEQ;
+}
+
static void prepare_read_tag(struct ceph_connection *con)
{
dout("prepare_read_tag %p\n", con);
@@ -1546,7 +1980,6 @@ static int process_connect(struct ceph_connection *con)
con->error_msg = "connect authorization failure";
return -1;
}
- con->auth_retry = 1;
con_out_kvec_reset(con);
ret = prepare_write_connect(con);
if (ret < 0)
@@ -1617,6 +2050,7 @@ static int process_connect(struct ceph_connection *con)
prepare_read_connect(con);
break;
+ case CEPH_MSGR_TAG_SEQ:
case CEPH_MSGR_TAG_READY:
if (req_feat & ~server_feat) {
pr_err("%s%lld %s protocol feature mismatch,"
@@ -1631,7 +2065,7 @@ static int process_connect(struct ceph_connection *con)
WARN_ON(con->state != CON_STATE_NEGOTIATING);
con->state = CON_STATE_OPEN;
-
+ con->auth_retry = 0; /* we authenticated; clear flag */
con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
con->connect_seq++;
con->peer_features = server_feat;
@@ -1643,11 +2077,16 @@ static int process_connect(struct ceph_connection *con)
le32_to_cpu(con->in_reply.connect_seq));
if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
- set_bit(CON_FLAG_LOSSYTX, &con->flags);
+ con_flag_set(con, CON_FLAG_LOSSYTX);
con->delay = 0; /* reset backoff memory */
- prepare_read_tag(con);
+ if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) {
+ prepare_write_seq(con);
+ prepare_read_seq(con);
+ } else {
+ prepare_read_tag(con);
+ }
break;
case CEPH_MSGR_TAG_WAIT:
@@ -1681,7 +2120,6 @@ static int read_partial_ack(struct ceph_connection *con)
return read_partial(con, end, size, &con->in_temp_ack);
}
-
/*
* We can finally discard anything that's been acked.
*/
@@ -1706,8 +2144,6 @@ static void process_ack(struct ceph_connection *con)
}
-
-
static int read_partial_message_section(struct ceph_connection *con,
struct kvec *section,
unsigned int sec_len, u32 *crc)
@@ -1731,77 +2167,49 @@ static int read_partial_message_section(struct ceph_connection *con,
return 1;
}
-static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip);
-
-static int read_partial_message_pages(struct ceph_connection *con,
- struct page **pages,
- unsigned int data_len, bool do_datacrc)
+static int read_partial_msg_data(struct ceph_connection *con)
{
- void *p;
+ struct ceph_msg *msg = con->in_msg;
+ struct ceph_msg_data_cursor *cursor = &msg->cursor;
+ const bool do_datacrc = !con->msgr->nocrc;
+ struct page *page;
+ size_t page_offset;
+ size_t length;
+ u32 crc = 0;
int ret;
- int left;
-
- left = min((int)(data_len - con->in_msg_pos.data_pos),
- (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
- /* (page) data */
- BUG_ON(pages == NULL);
- p = kmap(pages[con->in_msg_pos.page]);
- ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
- left);
- if (ret > 0 && do_datacrc)
- con->in_data_crc =
- crc32c(con->in_data_crc,
- p + con->in_msg_pos.page_pos, ret);
- kunmap(pages[con->in_msg_pos.page]);
- if (ret <= 0)
- return ret;
- con->in_msg_pos.data_pos += ret;
- con->in_msg_pos.page_pos += ret;
- if (con->in_msg_pos.page_pos == PAGE_SIZE) {
- con->in_msg_pos.page_pos = 0;
- con->in_msg_pos.page++;
- }
-
- return ret;
-}
-#ifdef CONFIG_BLOCK
-static int read_partial_message_bio(struct ceph_connection *con,
- struct bio **bio_iter, int *bio_seg,
- unsigned int data_len, bool do_datacrc)
-{
- struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
- void *p;
- int ret, left;
+ BUG_ON(!msg);
+ if (list_empty(&msg->data))
+ return -EIO;
- left = min((int)(data_len - con->in_msg_pos.data_pos),
- (int)(bv->bv_len - con->in_msg_pos.page_pos));
+ if (do_datacrc)
+ crc = con->in_data_crc;
+ while (cursor->resid) {
+ page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
+ NULL);
+ ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
+ if (ret <= 0) {
+ if (do_datacrc)
+ con->in_data_crc = crc;
- p = kmap(bv->bv_page) + bv->bv_offset;
+ return ret;
+ }
- ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
- left);
- if (ret > 0 && do_datacrc)
- con->in_data_crc =
- crc32c(con->in_data_crc,
- p + con->in_msg_pos.page_pos, ret);
- kunmap(bv->bv_page);
- if (ret <= 0)
- return ret;
- con->in_msg_pos.data_pos += ret;
- con->in_msg_pos.page_pos += ret;
- if (con->in_msg_pos.page_pos == bv->bv_len) {
- con->in_msg_pos.page_pos = 0;
- iter_bio_next(bio_iter, bio_seg);
+ if (do_datacrc)
+ crc = ceph_crc32c_page(crc, page, page_offset, ret);
+ (void) ceph_msg_data_advance(&msg->cursor, (size_t)ret);
}
+ if (do_datacrc)
+ con->in_data_crc = crc;
- return ret;
+ return 1; /* must return > 0 to indicate success */
}
-#endif
/*
* read (part of) a message.
*/
+static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip);
+
static int read_partial_message(struct ceph_connection *con)
{
struct ceph_msg *m = con->in_msg;
@@ -1834,7 +2242,7 @@ static int read_partial_message(struct ceph_connection *con)
if (front_len > CEPH_MSG_MAX_FRONT_LEN)
return -EIO;
middle_len = le32_to_cpu(con->in_hdr.middle_len);
- if (middle_len > CEPH_MSG_MAX_DATA_LEN)
+ if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN)
return -EIO;
data_len = le32_to_cpu(con->in_hdr.data_len);
if (data_len > CEPH_MSG_MAX_DATA_LEN)
@@ -1863,14 +2271,22 @@ static int read_partial_message(struct ceph_connection *con)
int skip = 0;
dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
- con->in_hdr.front_len, con->in_hdr.data_len);
+ front_len, data_len);
ret = ceph_con_in_msg_alloc(con, &skip);
if (ret < 0)
return ret;
+
+ BUG_ON(!con->in_msg ^ skip);
+ if (con->in_msg && data_len > con->in_msg->data_length) {
+ pr_warning("%s skipping long message (%u > %zd)\n",
+ __func__, data_len, con->in_msg->data_length);
+ ceph_msg_put(con->in_msg);
+ con->in_msg = NULL;
+ skip = 1;
+ }
if (skip) {
/* skip this message */
dout("alloc_msg said skip message\n");
- BUG_ON(con->in_msg);
con->in_base_pos = -front_len - middle_len - data_len -
sizeof(m->footer);
con->in_tag = CEPH_MSGR_TAG_READY;
@@ -1885,17 +2301,10 @@ static int read_partial_message(struct ceph_connection *con)
if (m->middle)
m->middle->vec.iov_len = 0;
- con->in_msg_pos.page = 0;
- if (m->pages)
- con->in_msg_pos.page_pos = m->page_alignment;
- else
- con->in_msg_pos.page_pos = 0;
- con->in_msg_pos.data_pos = 0;
+ /* prepare for data payload, if any */
-#ifdef CONFIG_BLOCK
- if (m->bio)
- init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
-#endif
+ if (data_len)
+ prepare_message_data(con->in_msg, data_len);
}
/* front */
@@ -1914,24 +2323,10 @@ static int read_partial_message(struct ceph_connection *con)
}
/* (page) data */
- while (con->in_msg_pos.data_pos < data_len) {
- if (m->pages) {
- ret = read_partial_message_pages(con, m->pages,
- data_len, do_datacrc);
- if (ret <= 0)
- return ret;
-#ifdef CONFIG_BLOCK
- } else if (m->bio) {
- BUG_ON(!m->bio_iter);
- ret = read_partial_message_bio(con,
- &m->bio_iter, &m->bio_seg,
- data_len, do_datacrc);
- if (ret <= 0)
- return ret;
-#endif
- } else {
- BUG_ON(1);
- }
+ if (data_len) {
+ ret = read_partial_msg_data(con);
+ if (ret <= 0)
+ return ret;
}
/* footer */
@@ -2057,13 +2452,13 @@ more_kvec:
goto do_next;
}
- ret = write_partial_msg_pages(con);
+ ret = write_partial_message_data(con);
if (ret == 1)
goto more_kvec; /* we need to send the footer, too! */
if (ret == 0)
goto out;
if (ret < 0) {
- dout("try_write write_partial_msg_pages err %d\n",
+ dout("try_write write_partial_message_data err %d\n",
ret);
goto out;
}
@@ -2080,15 +2475,14 @@ do_next:
prepare_write_ack(con);
goto more;
}
- if (test_and_clear_bit(CON_FLAG_KEEPALIVE_PENDING,
- &con->flags)) {
+ if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) {
prepare_write_keepalive(con);
goto more;
}
}
/* Nothing to do! */
- clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
+ con_flag_clear(con, CON_FLAG_WRITE_PENDING);
dout("try_write nothing else to write.\n");
ret = 0;
out:
@@ -2216,7 +2610,12 @@ more:
prepare_read_tag(con);
goto more;
}
- if (con->in_tag == CEPH_MSGR_TAG_ACK) {
+ if (con->in_tag == CEPH_MSGR_TAG_ACK ||
+ con->in_tag == CEPH_MSGR_TAG_SEQ) {
+ /*
+ * the final handshake seq exchange is semantically
+ * equivalent to an ACK
+ */
ret = read_partial_ack(con);
if (ret <= 0)
goto out;
@@ -2268,7 +2667,7 @@ static void queue_con(struct ceph_connection *con)
static bool con_sock_closed(struct ceph_connection *con)
{
- if (!test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags))
+ if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED))
return false;
#define CASE(x) \
@@ -2295,6 +2694,41 @@ static bool con_sock_closed(struct ceph_connection *con)
return true;
}
+static bool con_backoff(struct ceph_connection *con)
+{
+ int ret;
+
+ if (!con_flag_test_and_clear(con, CON_FLAG_BACKOFF))
+ return false;
+
+ ret = queue_con_delay(con, round_jiffies_relative(con->delay));
+ if (ret) {
+ dout("%s: con %p FAILED to back off %lu\n", __func__,
+ con, con->delay);
+ BUG_ON(ret == -ENOENT);
+ con_flag_set(con, CON_FLAG_BACKOFF);
+ }
+
+ return true;
+}
+
+/* Finish fault handling; con->mutex must *not* be held here */
+
+static void con_fault_finish(struct ceph_connection *con)
+{
+ /*
+ * in case we faulted due to authentication, invalidate our
+ * current tickets so that we can get new ones.
+ */
+ if (con->auth_retry && con->ops->invalidate_authorizer) {
+ dout("calling invalidate_authorizer()\n");
+ con->ops->invalidate_authorizer(con);
+ }
+
+ if (con->ops->fault)
+ con->ops->fault(con);
+}
+
/*
* Do some work on a connection. Drop a connection ref when we're done.
*/
@@ -2302,73 +2736,68 @@ static void con_work(struct work_struct *work)
{
struct ceph_connection *con = container_of(work, struct ceph_connection,
work.work);
- int ret;
+ bool fault;
mutex_lock(&con->mutex);
-restart:
- if (con_sock_closed(con))
- goto fault;
+ while (true) {
+ int ret;
- if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) {
- dout("con_work %p backing off\n", con);
- ret = queue_con_delay(con, round_jiffies_relative(con->delay));
- if (ret) {
- dout("con_work %p FAILED to back off %lu\n", con,
- con->delay);
- BUG_ON(ret == -ENOENT);
- set_bit(CON_FLAG_BACKOFF, &con->flags);
+ if ((fault = con_sock_closed(con))) {
+ dout("%s: con %p SOCK_CLOSED\n", __func__, con);
+ break;
+ }
+ if (con_backoff(con)) {
+ dout("%s: con %p BACKOFF\n", __func__, con);
+ break;
+ }
+ if (con->state == CON_STATE_STANDBY) {
+ dout("%s: con %p STANDBY\n", __func__, con);
+ break;
+ }
+ if (con->state == CON_STATE_CLOSED) {
+ dout("%s: con %p CLOSED\n", __func__, con);
+ BUG_ON(con->sock);
+ break;
+ }
+ if (con->state == CON_STATE_PREOPEN) {
+ dout("%s: con %p PREOPEN\n", __func__, con);
+ BUG_ON(con->sock);
}
- goto done;
- }
- if (con->state == CON_STATE_STANDBY) {
- dout("con_work %p STANDBY\n", con);
- goto done;
- }
- if (con->state == CON_STATE_CLOSED) {
- dout("con_work %p CLOSED\n", con);
- BUG_ON(con->sock);
- goto done;
- }
- if (con->state == CON_STATE_PREOPEN) {
- dout("con_work OPENING\n");
- BUG_ON(con->sock);
- }
+ ret = try_read(con);
+ if (ret < 0) {
+ if (ret == -EAGAIN)
+ continue;
+ con->error_msg = "socket error on read";
+ fault = true;
+ break;
+ }
- ret = try_read(con);
- if (ret == -EAGAIN)
- goto restart;
- if (ret < 0) {
- con->error_msg = "socket error on read";
- goto fault;
- }
+ ret = try_write(con);
+ if (ret < 0) {
+ if (ret == -EAGAIN)
+ continue;
+ con->error_msg = "socket error on write";
+ fault = true;
+ }
- ret = try_write(con);
- if (ret == -EAGAIN)
- goto restart;
- if (ret < 0) {
- con->error_msg = "socket error on write";
- goto fault;
+ break; /* If we make it to here, we're done */
}
-
-done:
+ if (fault)
+ con_fault(con);
mutex_unlock(&con->mutex);
-done_unlocked:
- con->ops->put(con);
- return;
-fault:
- ceph_fault(con); /* error/fault path */
- goto done_unlocked;
-}
+ if (fault)
+ con_fault_finish(con);
+ con->ops->put(con);
+}
/*
* Generic error/fault handler. A retry mechanism is used with
* exponential backoff
*/
-static void ceph_fault(struct ceph_connection *con)
- __releases(con->mutex)
+static void con_fault(struct ceph_connection *con)
{
pr_warning("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
@@ -2381,10 +2810,10 @@ static void ceph_fault(struct ceph_connection *con)
con_close_socket(con);
- if (test_bit(CON_FLAG_LOSSYTX, &con->flags)) {
+ if (con_flag_test(con, CON_FLAG_LOSSYTX)) {
dout("fault on LOSSYTX channel, marking CLOSED\n");
con->state = CON_STATE_CLOSED;
- goto out_unlock;
+ return;
}
if (con->in_msg) {
@@ -2401,9 +2830,9 @@ static void ceph_fault(struct ceph_connection *con)
/* If there are no messages queued or keepalive pending, place
* the connection in a STANDBY state */
if (list_empty(&con->out_queue) &&
- !test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)) {
+ !con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)) {
dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
- clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
+ con_flag_clear(con, CON_FLAG_WRITE_PENDING);
con->state = CON_STATE_STANDBY;
} else {
/* retry after a delay. */
@@ -2412,23 +2841,9 @@ static void ceph_fault(struct ceph_connection *con)
con->delay = BASE_DELAY_INTERVAL;
else if (con->delay < MAX_DELAY_INTERVAL)
con->delay *= 2;
- set_bit(CON_FLAG_BACKOFF, &con->flags);
+ con_flag_set(con, CON_FLAG_BACKOFF);
queue_con(con);
}
-
-out_unlock:
- mutex_unlock(&con->mutex);
- /*
- * in case we faulted due to authentication, invalidate our
- * current tickets so that we can get new ones.
- */
- if (con->auth_retry && con->ops->invalidate_authorizer) {
- dout("calling invalidate_authorizer()\n");
- con->ops->invalidate_authorizer(con);
- }
-
- if (con->ops->fault)
- con->ops->fault(con);
}
@@ -2469,8 +2884,8 @@ static void clear_standby(struct ceph_connection *con)
dout("clear_standby %p and ++connect_seq\n", con);
con->state = CON_STATE_PREOPEN;
con->connect_seq++;
- WARN_ON(test_bit(CON_FLAG_WRITE_PENDING, &con->flags));
- WARN_ON(test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags));
+ WARN_ON(con_flag_test(con, CON_FLAG_WRITE_PENDING));
+ WARN_ON(con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING));
}
}
@@ -2511,7 +2926,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
/* if there wasn't anything waiting to send before, queue
* new work */
- if (test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0)
+ if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0)
queue_con(con);
}
EXPORT_SYMBOL(ceph_con_send);
@@ -2600,12 +3015,94 @@ void ceph_con_keepalive(struct ceph_connection *con)
mutex_lock(&con->mutex);
clear_standby(con);
mutex_unlock(&con->mutex);
- if (test_and_set_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags) == 0 &&
- test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0)
+ if (con_flag_test_and_set(con, CON_FLAG_KEEPALIVE_PENDING) == 0 &&
+ con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0)
queue_con(con);
}
EXPORT_SYMBOL(ceph_con_keepalive);
+static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
+{
+ struct ceph_msg_data *data;
+
+ if (WARN_ON(!ceph_msg_data_type_valid(type)))
+ return NULL;
+
+ data = kmem_cache_zalloc(ceph_msg_data_cache, GFP_NOFS);
+ if (data)
+ data->type = type;
+ INIT_LIST_HEAD(&data->links);
+
+ return data;
+}
+
+static void ceph_msg_data_destroy(struct ceph_msg_data *data)
+{
+ if (!data)
+ return;
+
+ WARN_ON(!list_empty(&data->links));
+ if (data->type == CEPH_MSG_DATA_PAGELIST) {
+ ceph_pagelist_release(data->pagelist);
+ kfree(data->pagelist);
+ }
+ kmem_cache_free(ceph_msg_data_cache, data);
+}
+
+void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
+ size_t length, size_t alignment)
+{
+ struct ceph_msg_data *data;
+
+ BUG_ON(!pages);
+ BUG_ON(!length);
+
+ data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
+ BUG_ON(!data);
+ data->pages = pages;
+ data->length = length;
+ data->alignment = alignment & ~PAGE_MASK;
+
+ list_add_tail(&data->links, &msg->data);
+ msg->data_length += length;
+}
+EXPORT_SYMBOL(ceph_msg_data_add_pages);
+
+void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
+ struct ceph_pagelist *pagelist)
+{
+ struct ceph_msg_data *data;
+
+ BUG_ON(!pagelist);
+ BUG_ON(!pagelist->length);
+
+ data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
+ BUG_ON(!data);
+ data->pagelist = pagelist;
+
+ list_add_tail(&data->links, &msg->data);
+ msg->data_length += pagelist->length;
+}
+EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
+
+#ifdef CONFIG_BLOCK
+void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
+ size_t length)
+{
+ struct ceph_msg_data *data;
+
+ BUG_ON(!bio);
+
+ data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
+ BUG_ON(!data);
+ data->bio = bio;
+ data->bio_length = length;
+
+ list_add_tail(&data->links, &msg->data);
+ msg->data_length += length;
+}
+EXPORT_SYMBOL(ceph_msg_data_add_bio);
+#endif /* CONFIG_BLOCK */
/*
* construct a new message with given type, size
@@ -2616,47 +3113,20 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
{
struct ceph_msg *m;
- m = kmalloc(sizeof(*m), flags);
+ m = kmem_cache_zalloc(ceph_msg_cache, flags);
if (m == NULL)
goto out;
- kref_init(&m->kref);
- m->con = NULL;
- INIT_LIST_HEAD(&m->list_head);
-
- m->hdr.tid = 0;
m->hdr.type = cpu_to_le16(type);
m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
- m->hdr.version = 0;
m->hdr.front_len = cpu_to_le32(front_len);
- m->hdr.middle_len = 0;
- m->hdr.data_len = 0;
- m->hdr.data_off = 0;
- m->hdr.reserved = 0;
- m->footer.front_crc = 0;
- m->footer.middle_crc = 0;
- m->footer.data_crc = 0;
- m->footer.flags = 0;
- m->front_max = front_len;
- m->front_is_vmalloc = false;
- m->more_to_follow = false;
- m->ack_stamp = 0;
- m->pool = NULL;
- /* middle */
- m->middle = NULL;
-
- /* data */
- m->nr_pages = 0;
- m->page_alignment = 0;
- m->pages = NULL;
- m->pagelist = NULL;
- m->bio = NULL;
- m->bio_iter = NULL;
- m->bio_seg = 0;
- m->trail = NULL;
+ INIT_LIST_HEAD(&m->list_head);
+ kref_init(&m->kref);
+ INIT_LIST_HEAD(&m->data);
/* front */
+ m->front_max = front_len;
if (front_len) {
if (front_len > PAGE_CACHE_SIZE) {
m->front.iov_base = __vmalloc(front_len, flags,
@@ -2734,49 +3204,37 @@ static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
{
struct ceph_msg_header *hdr = &con->in_hdr;
- int type = le16_to_cpu(hdr->type);
- int front_len = le32_to_cpu(hdr->front_len);
int middle_len = le32_to_cpu(hdr->middle_len);
+ struct ceph_msg *msg;
int ret = 0;
BUG_ON(con->in_msg != NULL);
+ BUG_ON(!con->ops->alloc_msg);
- if (con->ops->alloc_msg) {
- struct ceph_msg *msg;
-
- mutex_unlock(&con->mutex);
- msg = con->ops->alloc_msg(con, hdr, skip);
- mutex_lock(&con->mutex);
- if (con->state != CON_STATE_OPEN) {
- if (msg)
- ceph_msg_put(msg);
- return -EAGAIN;
- }
- con->in_msg = msg;
- if (con->in_msg) {
- con->in_msg->con = con->ops->get(con);
- BUG_ON(con->in_msg->con == NULL);
- }
- if (*skip) {
- con->in_msg = NULL;
- return 0;
- }
- if (!con->in_msg) {
- con->error_msg =
- "error allocating memory for incoming message";
- return -ENOMEM;
- }
+ mutex_unlock(&con->mutex);
+ msg = con->ops->alloc_msg(con, hdr, skip);
+ mutex_lock(&con->mutex);
+ if (con->state != CON_STATE_OPEN) {
+ if (msg)
+ ceph_msg_put(msg);
+ return -EAGAIN;
}
- if (!con->in_msg) {
- con->in_msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
- if (!con->in_msg) {
- pr_err("unable to allocate msg type %d len %d\n",
- type, front_len);
- return -ENOMEM;
- }
+ if (msg) {
+ BUG_ON(*skip);
+ con->in_msg = msg;
con->in_msg->con = con->ops->get(con);
BUG_ON(con->in_msg->con == NULL);
- con->in_msg->page_alignment = le16_to_cpu(hdr->data_off);
+ } else {
+ /*
+ * Null message pointer means either we should skip
+ * this message or we couldn't allocate memory. The
+ * former is not an error.
+ */
+ if (*skip)
+ return 0;
+ con->error_msg = "error allocating memory for incoming message";
+
+ return -ENOMEM;
}
memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
@@ -2802,7 +3260,7 @@ void ceph_msg_kfree(struct ceph_msg *m)
vfree(m->front.iov_base);
else
kfree(m->front.iov_base);
- kfree(m);
+ kmem_cache_free(ceph_msg_cache, m);
}
/*
@@ -2811,6 +3269,9 @@ void ceph_msg_kfree(struct ceph_msg *m)
void ceph_msg_last_put(struct kref *kref)
{
struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
+ LIST_HEAD(data);
+ struct list_head *links;
+ struct list_head *next;
dout("ceph_msg_put last one on %p\n", m);
WARN_ON(!list_empty(&m->list_head));
@@ -2820,16 +3281,16 @@ void ceph_msg_last_put(struct kref *kref)
ceph_buffer_put(m->middle);
m->middle = NULL;
}
- m->nr_pages = 0;
- m->pages = NULL;
- if (m->pagelist) {
- ceph_pagelist_release(m->pagelist);
- kfree(m->pagelist);
- m->pagelist = NULL;
- }
+ list_splice_init(&m->data, &data);
+ list_for_each_safe(links, next, &data) {
+ struct ceph_msg_data *data;
- m->trail = NULL;
+ data = list_entry(links, struct ceph_msg_data, links);
+ list_del_init(links);
+ ceph_msg_data_destroy(data);
+ }
+ m->data_length = 0;
if (m->pool)
ceph_msgpool_put(m->pool, m);
@@ -2840,8 +3301,8 @@ EXPORT_SYMBOL(ceph_msg_last_put);
void ceph_msg_dump(struct ceph_msg *msg)
{
- pr_debug("msg_dump %p (front_max %d nr_pages %d)\n", msg,
- msg->front_max, msg->nr_pages);
+ pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
+ msg->front_max, msg->data_length);
print_hex_dump(KERN_DEBUG, "header: ",
DUMP_PREFIX_OFFSET, 16, 1,
&msg->hdr, sizeof(msg->hdr), true);
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index 812eb3b46c1f..1fe25cd29d0e 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -697,7 +697,7 @@ int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
u32 pool, u64 snapid)
{
return do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
- pool, snapid, 0, 0);
+ pool, snapid, NULL, 0);
}
@@ -737,7 +737,7 @@ static void delayed_work(struct work_struct *work)
__validate_auth(monc);
- if (monc->auth->ops->is_authenticated(monc->auth))
+ if (ceph_auth_is_authenticated(monc->auth))
__send_subscribe(monc);
}
__schedule_delayed(monc);
@@ -892,8 +892,7 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
mutex_lock(&monc->mutex);
had_debugfs_info = have_debugfs_info(monc);
- if (monc->auth->ops)
- was_auth = monc->auth->ops->is_authenticated(monc->auth);
+ was_auth = ceph_auth_is_authenticated(monc->auth);
monc->pending_auth = 0;
ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
msg->front.iov_len,
@@ -904,7 +903,7 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
wake_up_all(&monc->client->auth_wq);
} else if (ret > 0) {
__send_prepared_auth_request(monc, ret);
- } else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) {
+ } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
dout("authenticated, starting session\n");
monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index eb9a44478764..a3395fdfbd4f 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -1,3 +1,4 @@
+
#include <linux/ceph/ceph_debug.h>
#include <linux/module.h>
@@ -21,9 +22,11 @@
#define OSD_OP_FRONT_LEN 4096
#define OSD_OPREPLY_FRONT_LEN 512
+static struct kmem_cache *ceph_osd_request_cache;
+
static const struct ceph_connection_operations osd_con_ops;
-static void send_queued(struct ceph_osd_client *osdc);
+static void __send_queued(struct ceph_osd_client *osdc);
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
static void __register_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
@@ -32,64 +35,6 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
static void __send_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
-static int op_needs_trail(int op)
-{
- switch (op) {
- case CEPH_OSD_OP_GETXATTR:
- case CEPH_OSD_OP_SETXATTR:
- case CEPH_OSD_OP_CMPXATTR:
- case CEPH_OSD_OP_CALL:
- case CEPH_OSD_OP_NOTIFY:
- return 1;
- default:
- return 0;
- }
-}
-
-static int op_has_extent(int op)
-{
- return (op == CEPH_OSD_OP_READ ||
- op == CEPH_OSD_OP_WRITE);
-}
-
-int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
- struct ceph_file_layout *layout,
- u64 snapid,
- u64 off, u64 *plen, u64 *bno,
- struct ceph_osd_request *req,
- struct ceph_osd_req_op *op)
-{
- struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
- u64 orig_len = *plen;
- u64 objoff, objlen; /* extent in object */
- int r;
-
- reqhead->snapid = cpu_to_le64(snapid);
-
- /* object extent? */
- r = ceph_calc_file_object_mapping(layout, off, plen, bno,
- &objoff, &objlen);
- if (r < 0)
- return r;
- if (*plen < orig_len)
- dout(" skipping last %llu, final file extent %llu~%llu\n",
- orig_len - *plen, off, *plen);
-
- if (op_has_extent(op->op)) {
- op->extent.offset = objoff;
- op->extent.length = objlen;
- }
- req->r_num_pages = calc_pages_for(off, *plen);
- req->r_page_alignment = off & ~PAGE_MASK;
- if (op->op == CEPH_OSD_OP_WRITE)
- op->payload_len = *plen;
-
- dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
- *bno, objoff, objlen, req->r_num_pages);
- return 0;
-}
-EXPORT_SYMBOL(ceph_calc_raw_layout);
-
/*
* Implement client access to distributed object storage cluster.
*
@@ -115,25 +60,238 @@ EXPORT_SYMBOL(ceph_calc_raw_layout);
*
* fill osd op in request message.
*/
-static int calc_layout(struct ceph_osd_client *osdc,
- struct ceph_vino vino,
- struct ceph_file_layout *layout,
- u64 off, u64 *plen,
- struct ceph_osd_request *req,
- struct ceph_osd_req_op *op)
-{
- u64 bno;
+static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
+ u64 *objnum, u64 *objoff, u64 *objlen)
+{
+ u64 orig_len = *plen;
int r;
- r = ceph_calc_raw_layout(osdc, layout, vino.snap, off,
- plen, &bno, req, op);
+ /* object extent? */
+ r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
+ objoff, objlen);
if (r < 0)
return r;
+ if (*objlen < orig_len) {
+ *plen = *objlen;
+ dout(" skipping last %llu, final file extent %llu~%llu\n",
+ orig_len - *plen, off, *plen);
+ }
- snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
- req->r_oid_len = strlen(req->r_oid);
+ dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);
- return r;
+ return 0;
+}
+
+static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
+{
+ memset(osd_data, 0, sizeof (*osd_data));
+ osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
+}
+
+static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
+ struct page **pages, u64 length, u32 alignment,
+ bool pages_from_pool, bool own_pages)
+{
+ osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
+ osd_data->pages = pages;
+ osd_data->length = length;
+ osd_data->alignment = alignment;
+ osd_data->pages_from_pool = pages_from_pool;
+ osd_data->own_pages = own_pages;
+}
+
+static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
+ struct ceph_pagelist *pagelist)
+{
+ osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
+ osd_data->pagelist = pagelist;
+}
+
+#ifdef CONFIG_BLOCK
+static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
+ struct bio *bio, size_t bio_length)
+{
+ osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
+ osd_data->bio = bio;
+ osd_data->bio_length = bio_length;
+}
+#endif /* CONFIG_BLOCK */
+
+#define osd_req_op_data(oreq, whch, typ, fld) \
+ ({ \
+ BUG_ON(whch >= (oreq)->r_num_ops); \
+ &(oreq)->r_ops[whch].typ.fld; \
+ })
+
+static struct ceph_osd_data *
+osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
+{
+ BUG_ON(which >= osd_req->r_num_ops);
+
+ return &osd_req->r_ops[which].raw_data_in;
+}
+
+struct ceph_osd_data *
+osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
+ unsigned int which)
+{
+ return osd_req_op_data(osd_req, which, extent, osd_data);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data);
+
+struct ceph_osd_data *
+osd_req_op_cls_response_data(struct ceph_osd_request *osd_req,
+ unsigned int which)
+{
+ return osd_req_op_data(osd_req, which, cls, response_data);
+}
+EXPORT_SYMBOL(osd_req_op_cls_response_data); /* ??? */
+
+void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
+ unsigned int which, struct page **pages,
+ u64 length, u32 alignment,
+ bool pages_from_pool, bool own_pages)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_raw_data_in(osd_req, which);
+ ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+ pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);
+
+void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
+ unsigned int which, struct page **pages,
+ u64 length, u32 alignment,
+ bool pages_from_pool, bool own_pages)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+ ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+ pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);
+
+void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
+ unsigned int which, struct ceph_pagelist *pagelist)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+ ceph_osd_data_pagelist_init(osd_data, pagelist);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);
+
+#ifdef CONFIG_BLOCK
+void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
+ unsigned int which, struct bio *bio, size_t bio_length)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+ ceph_osd_data_bio_init(osd_data, bio, bio_length);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
+#endif /* CONFIG_BLOCK */
+
+static void osd_req_op_cls_request_info_pagelist(
+ struct ceph_osd_request *osd_req,
+ unsigned int which, struct ceph_pagelist *pagelist)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, cls, request_info);
+ ceph_osd_data_pagelist_init(osd_data, pagelist);
+}
+
+void osd_req_op_cls_request_data_pagelist(
+ struct ceph_osd_request *osd_req,
+ unsigned int which, struct ceph_pagelist *pagelist)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, cls, request_data);
+ ceph_osd_data_pagelist_init(osd_data, pagelist);
+}
+EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);
+
+void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
+ unsigned int which, struct page **pages, u64 length,
+ u32 alignment, bool pages_from_pool, bool own_pages)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, cls, request_data);
+ ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+ pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
+
+void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
+ unsigned int which, struct page **pages, u64 length,
+ u32 alignment, bool pages_from_pool, bool own_pages)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, cls, response_data);
+ ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+ pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
+
+static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
+{
+ switch (osd_data->type) {
+ case CEPH_OSD_DATA_TYPE_NONE:
+ return 0;
+ case CEPH_OSD_DATA_TYPE_PAGES:
+ return osd_data->length;
+ case CEPH_OSD_DATA_TYPE_PAGELIST:
+ return (u64)osd_data->pagelist->length;
+#ifdef CONFIG_BLOCK
+ case CEPH_OSD_DATA_TYPE_BIO:
+ return (u64)osd_data->bio_length;
+#endif /* CONFIG_BLOCK */
+ default:
+ WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
+ return 0;
+ }
+}
+
+static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
+{
+ if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
+ int num_pages;
+
+ num_pages = calc_pages_for((u64)osd_data->alignment,
+ (u64)osd_data->length);
+ ceph_release_page_vector(osd_data->pages, num_pages);
+ }
+ ceph_osd_data_init(osd_data);
+}
+
+static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
+ unsigned int which)
+{
+ struct ceph_osd_req_op *op;
+
+ BUG_ON(which >= osd_req->r_num_ops);
+ op = &osd_req->r_ops[which];
+
+ switch (op->op) {
+ case CEPH_OSD_OP_READ:
+ case CEPH_OSD_OP_WRITE:
+ ceph_osd_data_release(&op->extent.osd_data);
+ break;
+ case CEPH_OSD_OP_CALL:
+ ceph_osd_data_release(&op->cls.request_info);
+ ceph_osd_data_release(&op->cls.request_data);
+ ceph_osd_data_release(&op->cls.response_data);
+ break;
+ default:
+ break;
+ }
}
/*
@@ -141,82 +299,64 @@ static int calc_layout(struct ceph_osd_client *osdc,
*/
void ceph_osdc_release_request(struct kref *kref)
{
- struct ceph_osd_request *req = container_of(kref,
- struct ceph_osd_request,
- r_kref);
+ struct ceph_osd_request *req;
+ unsigned int which;
+ req = container_of(kref, struct ceph_osd_request, r_kref);
if (req->r_request)
ceph_msg_put(req->r_request);
- if (req->r_con_filling_msg) {
- dout("%s revoking pages %p from con %p\n", __func__,
- req->r_pages, req->r_con_filling_msg);
+ if (req->r_reply) {
ceph_msg_revoke_incoming(req->r_reply);
- req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
- }
- if (req->r_reply)
ceph_msg_put(req->r_reply);
- if (req->r_own_pages)
- ceph_release_page_vector(req->r_pages,
- req->r_num_pages);
-#ifdef CONFIG_BLOCK
- if (req->r_bio)
- bio_put(req->r_bio);
-#endif
- ceph_put_snap_context(req->r_snapc);
- if (req->r_trail) {
- ceph_pagelist_release(req->r_trail);
- kfree(req->r_trail);
}
+
+ for (which = 0; which < req->r_num_ops; which++)
+ osd_req_op_data_release(req, which);
+
+ ceph_put_snap_context(req->r_snapc);
if (req->r_mempool)
mempool_free(req, req->r_osdc->req_mempool);
else
- kfree(req);
-}
-EXPORT_SYMBOL(ceph_osdc_release_request);
+ kmem_cache_free(ceph_osd_request_cache, req);
-static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
-{
- int i = 0;
-
- if (needs_trail)
- *needs_trail = 0;
- while (ops[i].op) {
- if (needs_trail && op_needs_trail(ops[i].op))
- *needs_trail = 1;
- i++;
- }
-
- return i;
}
+EXPORT_SYMBOL(ceph_osdc_release_request);
struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
- int flags,
struct ceph_snap_context *snapc,
- struct ceph_osd_req_op *ops,
+ unsigned int num_ops,
bool use_mempool,
- gfp_t gfp_flags,
- struct page **pages,
- struct bio *bio)
+ gfp_t gfp_flags)
{
struct ceph_osd_request *req;
struct ceph_msg *msg;
- int needs_trail;
- int num_op = get_num_ops(ops, &needs_trail);
- size_t msg_size = sizeof(struct ceph_osd_request_head);
+ size_t msg_size;
+
+ BUILD_BUG_ON(CEPH_OSD_MAX_OP > U16_MAX);
+ BUG_ON(num_ops > CEPH_OSD_MAX_OP);
- msg_size += num_op*sizeof(struct ceph_osd_op);
+ msg_size = 4 + 4 + 8 + 8 + 4+8;
+ msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
+ msg_size += 1 + 8 + 4 + 4; /* pg_t */
+ msg_size += 4 + MAX_OBJ_NAME_SIZE;
+ msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
+ msg_size += 8; /* snapid */
+ msg_size += 8; /* snap_seq */
+ msg_size += 8 * (snapc ? snapc->num_snaps : 0); /* snaps */
+ msg_size += 4;
if (use_mempool) {
req = mempool_alloc(osdc->req_mempool, gfp_flags);
memset(req, 0, sizeof(*req));
} else {
- req = kzalloc(sizeof(*req), gfp_flags);
+ req = kmem_cache_zalloc(ceph_osd_request_cache, gfp_flags);
}
if (req == NULL)
return NULL;
req->r_osdc = osdc;
req->r_mempool = use_mempool;
+ req->r_num_ops = num_ops;
kref_init(&req->r_kref);
init_completion(&req->r_completion);
@@ -228,10 +368,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
INIT_LIST_HEAD(&req->r_req_lru_item);
INIT_LIST_HEAD(&req->r_osd_item);
- req->r_flags = flags;
-
- WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
-
/* create reply message */
if (use_mempool)
msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
@@ -244,20 +380,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
}
req->r_reply = msg;
- /* allocate space for the trailing data */
- if (needs_trail) {
- req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
- if (!req->r_trail) {
- ceph_osdc_put_request(req);
- return NULL;
- }
- ceph_pagelist_init(req->r_trail);
- }
-
/* create request message; allow space for oid */
- msg_size += MAX_OBJ_NAME_SIZE;
- if (snapc)
- msg_size += sizeof(u64) * snapc->num_snaps;
if (use_mempool)
msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
else
@@ -270,82 +393,280 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
memset(msg->front.iov_base, 0, msg->front.iov_len);
req->r_request = msg;
- req->r_pages = pages;
-#ifdef CONFIG_BLOCK
- if (bio) {
- req->r_bio = bio;
- bio_get(req->r_bio);
- }
-#endif
return req;
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);
-static void osd_req_encode_op(struct ceph_osd_request *req,
- struct ceph_osd_op *dst,
- struct ceph_osd_req_op *src)
+static bool osd_req_opcode_valid(u16 opcode)
{
- dst->op = cpu_to_le16(src->op);
+ switch (opcode) {
+ case CEPH_OSD_OP_READ:
+ case CEPH_OSD_OP_STAT:
+ case CEPH_OSD_OP_MAPEXT:
+ case CEPH_OSD_OP_MASKTRUNC:
+ case CEPH_OSD_OP_SPARSE_READ:
+ case CEPH_OSD_OP_NOTIFY:
+ case CEPH_OSD_OP_NOTIFY_ACK:
+ case CEPH_OSD_OP_ASSERT_VER:
+ case CEPH_OSD_OP_WRITE:
+ case CEPH_OSD_OP_WRITEFULL:
+ case CEPH_OSD_OP_TRUNCATE:
+ case CEPH_OSD_OP_ZERO:
+ case CEPH_OSD_OP_DELETE:
+ case CEPH_OSD_OP_APPEND:
+ case CEPH_OSD_OP_STARTSYNC:
+ case CEPH_OSD_OP_SETTRUNC:
+ case CEPH_OSD_OP_TRIMTRUNC:
+ case CEPH_OSD_OP_TMAPUP:
+ case CEPH_OSD_OP_TMAPPUT:
+ case CEPH_OSD_OP_TMAPGET:
+ case CEPH_OSD_OP_CREATE:
+ case CEPH_OSD_OP_ROLLBACK:
+ case CEPH_OSD_OP_WATCH:
+ case CEPH_OSD_OP_OMAPGETKEYS:
+ case CEPH_OSD_OP_OMAPGETVALS:
+ case CEPH_OSD_OP_OMAPGETHEADER:
+ case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
+ case CEPH_OSD_OP_OMAPSETVALS:
+ case CEPH_OSD_OP_OMAPSETHEADER:
+ case CEPH_OSD_OP_OMAPCLEAR:
+ case CEPH_OSD_OP_OMAPRMKEYS:
+ case CEPH_OSD_OP_OMAP_CMP:
+ case CEPH_OSD_OP_CLONERANGE:
+ case CEPH_OSD_OP_ASSERT_SRC_VERSION:
+ case CEPH_OSD_OP_SRC_CMPXATTR:
+ case CEPH_OSD_OP_GETXATTR:
+ case CEPH_OSD_OP_GETXATTRS:
+ case CEPH_OSD_OP_CMPXATTR:
+ case CEPH_OSD_OP_SETXATTR:
+ case CEPH_OSD_OP_SETXATTRS:
+ case CEPH_OSD_OP_RESETXATTRS:
+ case CEPH_OSD_OP_RMXATTR:
+ case CEPH_OSD_OP_PULL:
+ case CEPH_OSD_OP_PUSH:
+ case CEPH_OSD_OP_BALANCEREADS:
+ case CEPH_OSD_OP_UNBALANCEREADS:
+ case CEPH_OSD_OP_SCRUB:
+ case CEPH_OSD_OP_SCRUB_RESERVE:
+ case CEPH_OSD_OP_SCRUB_UNRESERVE:
+ case CEPH_OSD_OP_SCRUB_STOP:
+ case CEPH_OSD_OP_SCRUB_MAP:
+ case CEPH_OSD_OP_WRLOCK:
+ case CEPH_OSD_OP_WRUNLOCK:
+ case CEPH_OSD_OP_RDLOCK:
+ case CEPH_OSD_OP_RDUNLOCK:
+ case CEPH_OSD_OP_UPLOCK:
+ case CEPH_OSD_OP_DNLOCK:
+ case CEPH_OSD_OP_CALL:
+ case CEPH_OSD_OP_PGLS:
+ case CEPH_OSD_OP_PGLS_FILTER:
+ return true;
+ default:
+ return false;
+ }
+}
+
+/*
+ * This is an osd op init function for opcodes that have no data or
+ * other information associated with them. It also serves as a
+ * common init routine for all the other init functions, below.
+ */
+static struct ceph_osd_req_op *
+_osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
+ u16 opcode)
+{
+ struct ceph_osd_req_op *op;
+
+ BUG_ON(which >= osd_req->r_num_ops);
+ BUG_ON(!osd_req_opcode_valid(opcode));
+
+ op = &osd_req->r_ops[which];
+ memset(op, 0, sizeof (*op));
+ op->op = opcode;
+
+ return op;
+}
+
+void osd_req_op_init(struct ceph_osd_request *osd_req,
+ unsigned int which, u16 opcode)
+{
+ (void)_osd_req_op_init(osd_req, which, opcode);
+}
+EXPORT_SYMBOL(osd_req_op_init);
+
+void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
+ unsigned int which, u16 opcode,
+ u64 offset, u64 length,
+ u64 truncate_size, u32 truncate_seq)
+{
+ struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+ size_t payload_len = 0;
+
+ BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
+
+ op->extent.offset = offset;
+ op->extent.length = length;
+ op->extent.truncate_size = truncate_size;
+ op->extent.truncate_seq = truncate_seq;
+ if (opcode == CEPH_OSD_OP_WRITE)
+ payload_len += length;
+
+ op->payload_len = payload_len;
+}
+EXPORT_SYMBOL(osd_req_op_extent_init);
+
+void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
+ unsigned int which, u64 length)
+{
+ struct ceph_osd_req_op *op;
+ u64 previous;
+
+ BUG_ON(which >= osd_req->r_num_ops);
+ op = &osd_req->r_ops[which];
+ previous = op->extent.length;
+
+ if (length == previous)
+ return; /* Nothing to do */
+ BUG_ON(length > previous);
+
+ op->extent.length = length;
+ op->payload_len -= previous - length;
+}
+EXPORT_SYMBOL(osd_req_op_extent_update);
+
+void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
+ u16 opcode, const char *class, const char *method)
+{
+ struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+ struct ceph_pagelist *pagelist;
+ size_t payload_len = 0;
+ size_t size;
+
+ BUG_ON(opcode != CEPH_OSD_OP_CALL);
+
+ pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
+ BUG_ON(!pagelist);
+ ceph_pagelist_init(pagelist);
+
+ op->cls.class_name = class;
+ size = strlen(class);
+ BUG_ON(size > (size_t) U8_MAX);
+ op->cls.class_len = size;
+ ceph_pagelist_append(pagelist, class, size);
+ payload_len += size;
+
+ op->cls.method_name = method;
+ size = strlen(method);
+ BUG_ON(size > (size_t) U8_MAX);
+ op->cls.method_len = size;
+ ceph_pagelist_append(pagelist, method, size);
+ payload_len += size;
+
+ osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
+
+ op->cls.argc = 0; /* currently unused */
+
+ op->payload_len = payload_len;
+}
+EXPORT_SYMBOL(osd_req_op_cls_init);
+
+void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
+ unsigned int which, u16 opcode,
+ u64 cookie, u64 version, int flag)
+{
+ struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+
+ BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);
+
+ op->watch.cookie = cookie;
+ op->watch.ver = version;
+ if (opcode == CEPH_OSD_OP_WATCH && flag)
+ op->watch.flag = (u8)1;
+}
+EXPORT_SYMBOL(osd_req_op_watch_init);
+
+static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
+ struct ceph_osd_data *osd_data)
+{
+ u64 length = ceph_osd_data_length(osd_data);
+
+ if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
+ BUG_ON(length > (u64) SIZE_MAX);
+ if (length)
+ ceph_msg_data_add_pages(msg, osd_data->pages,
+ length, osd_data->alignment);
+ } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
+ BUG_ON(!length);
+ ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
+#ifdef CONFIG_BLOCK
+ } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
+ ceph_msg_data_add_bio(msg, osd_data->bio, length);
+#endif
+ } else {
+ BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
+ }
+}
+
+static u64 osd_req_encode_op(struct ceph_osd_request *req,
+ struct ceph_osd_op *dst, unsigned int which)
+{
+ struct ceph_osd_req_op *src;
+ struct ceph_osd_data *osd_data;
+ u64 request_data_len = 0;
+ u64 data_length;
+
+ BUG_ON(which >= req->r_num_ops);
+ src = &req->r_ops[which];
+ if (WARN_ON(!osd_req_opcode_valid(src->op))) {
+ pr_err("unrecognized osd opcode %d\n", src->op);
+
+ return 0;
+ }
switch (src->op) {
+ case CEPH_OSD_OP_STAT:
+ osd_data = &src->raw_data_in;
+ ceph_osdc_msg_data_add(req->r_reply, osd_data);
+ break;
case CEPH_OSD_OP_READ:
case CEPH_OSD_OP_WRITE:
- dst->extent.offset =
- cpu_to_le64(src->extent.offset);
- dst->extent.length =
- cpu_to_le64(src->extent.length);
+ if (src->op == CEPH_OSD_OP_WRITE)
+ request_data_len = src->extent.length;
+ dst->extent.offset = cpu_to_le64(src->extent.offset);
+ dst->extent.length = cpu_to_le64(src->extent.length);
dst->extent.truncate_size =
cpu_to_le64(src->extent.truncate_size);
dst->extent.truncate_seq =
cpu_to_le32(src->extent.truncate_seq);
- break;
-
- case CEPH_OSD_OP_GETXATTR:
- case CEPH_OSD_OP_SETXATTR:
- case CEPH_OSD_OP_CMPXATTR:
- BUG_ON(!req->r_trail);
-
- dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
- dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
- dst->xattr.cmp_op = src->xattr.cmp_op;
- dst->xattr.cmp_mode = src->xattr.cmp_mode;
- ceph_pagelist_append(req->r_trail, src->xattr.name,
- src->xattr.name_len);
- ceph_pagelist_append(req->r_trail, src->xattr.val,
- src->xattr.value_len);
+ osd_data = &src->extent.osd_data;
+ if (src->op == CEPH_OSD_OP_WRITE)
+ ceph_osdc_msg_data_add(req->r_request, osd_data);
+ else
+ ceph_osdc_msg_data_add(req->r_reply, osd_data);
break;
case CEPH_OSD_OP_CALL:
- BUG_ON(!req->r_trail);
-
dst->cls.class_len = src->cls.class_len;
dst->cls.method_len = src->cls.method_len;
- dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
-
- ceph_pagelist_append(req->r_trail, src->cls.class_name,
- src->cls.class_len);
- ceph_pagelist_append(req->r_trail, src->cls.method_name,
- src->cls.method_len);
- ceph_pagelist_append(req->r_trail, src->cls.indata,
- src->cls.indata_len);
- break;
- case CEPH_OSD_OP_ROLLBACK:
- dst->snap.snapid = cpu_to_le64(src->snap.snapid);
+ osd_data = &src->cls.request_info;
+ ceph_osdc_msg_data_add(req->r_request, osd_data);
+ BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGELIST);
+ request_data_len = osd_data->pagelist->length;
+
+ osd_data = &src->cls.request_data;
+ data_length = ceph_osd_data_length(osd_data);
+ if (data_length) {
+ BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
+ dst->cls.indata_len = cpu_to_le32(data_length);
+ ceph_osdc_msg_data_add(req->r_request, osd_data);
+ src->payload_len += data_length;
+ request_data_len += data_length;
+ }
+ osd_data = &src->cls.response_data;
+ ceph_osdc_msg_data_add(req->r_reply, osd_data);
break;
case CEPH_OSD_OP_STARTSYNC:
break;
- case CEPH_OSD_OP_NOTIFY:
- {
- __le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
- __le32 timeout = cpu_to_le32(src->watch.timeout);
-
- BUG_ON(!req->r_trail);
-
- ceph_pagelist_append(req->r_trail,
- &prot_ver, sizeof(prot_ver));
- ceph_pagelist_append(req->r_trail,
- &timeout, sizeof(timeout));
- }
case CEPH_OSD_OP_NOTIFY_ACK:
case CEPH_OSD_OP_WATCH:
dst->watch.cookie = cpu_to_le64(src->watch.cookie);
@@ -353,90 +674,17 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
dst->watch.flag = src->watch.flag;
break;
default:
- pr_err("unrecognized osd opcode %d\n", dst->op);
+ pr_err("unsupported osd opcode %s\n",
+ ceph_osd_op_name(src->op));
WARN_ON(1);
- break;
- }
- dst->payload_len = cpu_to_le32(src->payload_len);
-}
-
-/*
- * build new request AND message
- *
- */
-void ceph_osdc_build_request(struct ceph_osd_request *req,
- u64 off, u64 *plen,
- struct ceph_osd_req_op *src_ops,
- struct ceph_snap_context *snapc,
- struct timespec *mtime,
- const char *oid,
- int oid_len)
-{
- struct ceph_msg *msg = req->r_request;
- struct ceph_osd_request_head *head;
- struct ceph_osd_req_op *src_op;
- struct ceph_osd_op *op;
- void *p;
- int num_op = get_num_ops(src_ops, NULL);
- size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
- int flags = req->r_flags;
- u64 data_len = 0;
- int i;
-
- head = msg->front.iov_base;
- op = (void *)(head + 1);
- p = (void *)(op + num_op);
- req->r_snapc = ceph_get_snap_context(snapc);
-
- head->client_inc = cpu_to_le32(1); /* always, for now. */
- head->flags = cpu_to_le32(flags);
- if (flags & CEPH_OSD_FLAG_WRITE)
- ceph_encode_timespec(&head->mtime, mtime);
- head->num_ops = cpu_to_le16(num_op);
-
-
- /* fill in oid */
- head->object_len = cpu_to_le32(oid_len);
- memcpy(p, oid, oid_len);
- p += oid_len;
-
- src_op = src_ops;
- while (src_op->op) {
- osd_req_encode_op(req, op, src_op);
- src_op++;
- op++;
- }
-
- if (req->r_trail)
- data_len += req->r_trail->length;
-
- if (snapc) {
- head->snap_seq = cpu_to_le64(snapc->seq);
- head->num_snaps = cpu_to_le32(snapc->num_snaps);
- for (i = 0; i < snapc->num_snaps; i++) {
- put_unaligned_le64(snapc->snaps[i], p);
- p += sizeof(u64);
- }
- }
-
- if (flags & CEPH_OSD_FLAG_WRITE) {
- req->r_request->hdr.data_off = cpu_to_le16(off);
- req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
- } else if (data_len) {
- req->r_request->hdr.data_off = 0;
- req->r_request->hdr.data_len = cpu_to_le32(data_len);
+ return 0;
}
+ dst->op = cpu_to_le16(src->op);
+ dst->payload_len = cpu_to_le32(src->payload_len);
- req->r_request->page_alignment = req->r_page_alignment;
-
- BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
- msg_size = p - msg->front.iov_base;
- msg->front.iov_len = msg_size;
- msg->hdr.front_len = cpu_to_le32(msg_size);
- return;
+ return request_data_len;
}
-EXPORT_SYMBOL(ceph_osdc_build_request);
/*
* build new request AND message, calculate layout, and adjust file
@@ -452,54 +700,63 @@ EXPORT_SYMBOL(ceph_osdc_build_request);
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
struct ceph_file_layout *layout,
struct ceph_vino vino,
- u64 off, u64 *plen,
+ u64 off, u64 *plen, int num_ops,
int opcode, int flags,
struct ceph_snap_context *snapc,
- int do_sync,
u32 truncate_seq,
u64 truncate_size,
- struct timespec *mtime,
- bool use_mempool, int num_reply,
- int page_align)
+ bool use_mempool)
{
- struct ceph_osd_req_op ops[3];
struct ceph_osd_request *req;
+ u64 objnum = 0;
+ u64 objoff = 0;
+ u64 objlen = 0;
+ u32 object_size;
+ u64 object_base;
int r;
- ops[0].op = opcode;
- ops[0].extent.truncate_seq = truncate_seq;
- ops[0].extent.truncate_size = truncate_size;
- ops[0].payload_len = 0;
-
- if (do_sync) {
- ops[1].op = CEPH_OSD_OP_STARTSYNC;
- ops[1].payload_len = 0;
- ops[2].op = 0;
- } else
- ops[1].op = 0;
-
- req = ceph_osdc_alloc_request(osdc, flags,
- snapc, ops,
- use_mempool,
- GFP_NOFS, NULL, NULL);
+ BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
+
+ req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
+ GFP_NOFS);
if (!req)
return ERR_PTR(-ENOMEM);
+ req->r_flags = flags;
+
/* calculate max write size */
- r = calc_layout(osdc, vino, layout, off, plen, req, ops);
- if (r < 0)
+ r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
+ if (r < 0) {
+ ceph_osdc_put_request(req);
return ERR_PTR(r);
- req->r_file_layout = *layout; /* keep a copy */
+ }
+
+ object_size = le32_to_cpu(layout->fl_object_size);
+ object_base = off - objoff;
+ if (truncate_size <= object_base) {
+ truncate_size = 0;
+ } else {
+ truncate_size -= object_base;
+ if (truncate_size > object_size)
+ truncate_size = object_size;
+ }
- /* in case it differs from natural (file) alignment that
- calc_layout filled in for us */
- req->r_num_pages = calc_pages_for(page_align, *plen);
- req->r_page_alignment = page_align;
+ osd_req_op_extent_init(req, 0, opcode, objoff, objlen,
+ truncate_size, truncate_seq);
- ceph_osdc_build_request(req, off, plen, ops,
- snapc,
- mtime,
- req->r_oid, req->r_oid_len);
+ /*
+ * A second op in the ops array means the caller wants to
+ * also issue a include a 'startsync' command so that the
+ * osd will flush data quickly.
+ */
+ if (num_ops > 1)
+ osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
+
+ req->r_file_layout = *layout; /* keep a copy */
+
+ snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx",
+ vino.ino, objnum);
+ req->r_oid_len = strlen(req->r_oid);
return req;
}
@@ -577,21 +834,46 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
struct ceph_osd *osd)
{
struct ceph_osd_request *req, *nreq;
+ LIST_HEAD(resend);
int err;
dout("__kick_osd_requests osd%d\n", osd->o_osd);
err = __reset_osd(osdc, osd);
if (err)
return;
-
+ /*
+ * Build up a list of requests to resend by traversing the
+ * osd's list of requests. Requests for a given object are
+ * sent in tid order, and that is also the order they're
+ * kept on this list. Therefore all requests that are in
+ * flight will be found first, followed by all requests that
+ * have not yet been sent. And to resend requests while
+ * preserving this order we will want to put any sent
+ * requests back on the front of the osd client's unsent
+ * list.
+ *
+ * So we build a separate ordered list of already-sent
+ * requests for the affected osd and splice it onto the
+ * front of the osd client's unsent list. Once we've seen a
+ * request that has not yet been sent we're done. Those
+ * requests are already sitting right where they belong.
+ */
list_for_each_entry(req, &osd->o_requests, r_osd_item) {
- list_move(&req->r_req_lru_item, &osdc->req_unsent);
- dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
+ if (!req->r_sent)
+ break;
+ list_move_tail(&req->r_req_lru_item, &resend);
+ dout("requeueing %p tid %llu osd%d\n", req, req->r_tid,
osd->o_osd);
if (!req->r_linger)
req->r_flags |= CEPH_OSD_FLAG_RETRY;
}
+ list_splice(&resend, &osdc->req_unsent);
+ /*
+ * Linger requests are re-registered before sending, which
+ * sets up a new tid for each. We add them to the unsent
+ * list at the end to keep things in tid order.
+ */
list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
r_linger_osd) {
/*
@@ -600,8 +882,8 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
*/
BUG_ON(!list_empty(&req->r_req_lru_item));
__register_request(osdc, req);
- list_add(&req->r_req_lru_item, &osdc->req_unsent);
- list_add(&req->r_osd_item, &req->r_osd->o_requests);
+ list_add_tail(&req->r_req_lru_item, &osdc->req_unsent);
+ list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
__unregister_linger_request(osdc, req);
dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
osd->o_osd);
@@ -623,8 +905,8 @@ static void osd_reset(struct ceph_connection *con)
down_read(&osdc->map_sem);
mutex_lock(&osdc->request_mutex);
__kick_osd_requests(osdc, osd);
+ __send_queued(osdc);
mutex_unlock(&osdc->request_mutex);
- send_queued(osdc);
up_read(&osdc->map_sem);
}
@@ -673,8 +955,7 @@ static void put_osd(struct ceph_osd *osd)
if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) {
struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
- if (ac->ops && ac->ops->destroy_authorizer)
- ac->ops->destroy_authorizer(ac, osd->o_auth.authorizer);
+ ceph_auth_destroy_authorizer(ac, osd->o_auth.authorizer);
kfree(osd);
}
}
@@ -739,31 +1020,35 @@ static void remove_old_osds(struct ceph_osd_client *osdc)
*/
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
- struct ceph_osd_request *req;
- int ret = 0;
+ struct ceph_entity_addr *peer_addr;
dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
if (list_empty(&osd->o_requests) &&
list_empty(&osd->o_linger_requests)) {
__remove_osd(osdc, osd);
- ret = -ENODEV;
- } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
- &osd->o_con.peer_addr,
- sizeof(osd->o_con.peer_addr)) == 0 &&
- !ceph_con_opened(&osd->o_con)) {
+
+ return -ENODEV;
+ }
+
+ peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
+ if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
+ !ceph_con_opened(&osd->o_con)) {
+ struct ceph_osd_request *req;
+
dout(" osd addr hasn't changed and connection never opened,"
" letting msgr retry");
/* touch each r_stamp for handle_timeout()'s benfit */
list_for_each_entry(req, &osd->o_requests, r_osd_item)
req->r_stamp = jiffies;
- ret = -EAGAIN;
- } else {
- ceph_con_close(&osd->o_con);
- ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
- &osdc->osdmap->osd_addr[osd->o_osd]);
- osd->o_incarnation++;
+
+ return -EAGAIN;
}
- return ret;
+
+ ceph_con_close(&osd->o_con);
+ ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
+ osd->o_incarnation++;
+
+ return 0;
}
static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
@@ -835,14 +1120,6 @@ static void __register_request(struct ceph_osd_client *osdc,
}
}
-static void register_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req)
-{
- mutex_lock(&osdc->request_mutex);
- __register_request(osdc, req);
- mutex_unlock(&osdc->request_mutex);
-}
-
/*
* called under osdc->request_mutex
*/
@@ -961,20 +1238,18 @@ EXPORT_SYMBOL(ceph_osdc_set_request_linger);
static int __map_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req, int force_resend)
{
- struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
struct ceph_pg pgid;
int acting[CEPH_PG_MAX_SIZE];
int o = -1, num = 0;
int err;
dout("map_request %p tid %lld\n", req, req->r_tid);
- err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
- &req->r_file_layout, osdc->osdmap);
+ err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap,
+ ceph_file_layout_pg_pool(req->r_file_layout));
if (err) {
list_move(&req->r_req_lru_item, &osdc->req_notarget);
return err;
}
- pgid = reqhead->layout.ol_pgid;
req->r_pgid = pgid;
err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
@@ -991,8 +1266,8 @@ static int __map_request(struct ceph_osd_client *osdc,
(req->r_osd == NULL && o == -1))
return 0; /* no change */
- dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n",
- req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
+ dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
+ req->r_tid, pgid.pool, pgid.seed, o,
req->r_osd ? req->r_osd->o_osd : -1);
/* record full pg acting set */
@@ -1024,10 +1299,10 @@ static int __map_request(struct ceph_osd_client *osdc,
if (req->r_osd) {
__remove_osd_from_lru(req->r_osd);
- list_add(&req->r_osd_item, &req->r_osd->o_requests);
- list_move(&req->r_req_lru_item, &osdc->req_unsent);
+ list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
+ list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
} else {
- list_move(&req->r_req_lru_item, &osdc->req_notarget);
+ list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
}
err = 1; /* osd or pg changed */
@@ -1041,37 +1316,47 @@ out:
static void __send_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
- struct ceph_osd_request_head *reqhead;
-
- dout("send_request %p tid %llu to osd%d flags %d\n",
- req, req->r_tid, req->r_osd->o_osd, req->r_flags);
+ void *p;
- reqhead = req->r_request->front.iov_base;
- reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
- reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */
- reqhead->reassert_version = req->r_reassert_version;
+ dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
+ req, req->r_tid, req->r_osd->o_osd, req->r_flags,
+ (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);
+
+ /* fill in message content that changes each time we send it */
+ put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
+ put_unaligned_le32(req->r_flags, req->r_request_flags);
+ put_unaligned_le64(req->r_pgid.pool, req->r_request_pool);
+ p = req->r_request_pgid;
+ ceph_encode_64(&p, req->r_pgid.pool);
+ ceph_encode_32(&p, req->r_pgid.seed);
+ put_unaligned_le64(1, req->r_request_attempts); /* FIXME */
+ memcpy(req->r_request_reassert_version, &req->r_reassert_version,
+ sizeof(req->r_reassert_version));
req->r_stamp = jiffies;
list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
ceph_msg_get(req->r_request); /* send consumes a ref */
- ceph_con_send(&req->r_osd->o_con, req->r_request);
+
+ /* Mark the request unsafe if this is the first timet's being sent. */
+
+ if (!req->r_sent && req->r_unsafe_callback)
+ req->r_unsafe_callback(req, true);
req->r_sent = req->r_osd->o_incarnation;
+
+ ceph_con_send(&req->r_osd->o_con, req->r_request);
}
/*
* Send any requests in the queue (req_unsent).
*/
-static void send_queued(struct ceph_osd_client *osdc)
+static void __send_queued(struct ceph_osd_client *osdc)
{
struct ceph_osd_request *req, *tmp;
- dout("send_queued\n");
- mutex_lock(&osdc->request_mutex);
- list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
+ dout("__send_queued\n");
+ list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item)
__send_request(osdc, req);
- }
- mutex_unlock(&osdc->request_mutex);
}
/*
@@ -1123,8 +1408,8 @@ static void handle_timeout(struct work_struct *work)
}
__schedule_osd_timeout(osdc);
+ __send_queued(osdc);
mutex_unlock(&osdc->request_mutex);
- send_queued(osdc);
up_read(&osdc->map_sem);
}
@@ -1147,8 +1432,8 @@ static void handle_osds_timeout(struct work_struct *work)
static void complete_request(struct ceph_osd_request *req)
{
- if (req->r_safe_callback)
- req->r_safe_callback(req, NULL);
+ if (req->r_unsafe_callback)
+ req->r_unsafe_callback(req, false);
complete_all(&req->r_safe_completion); /* fsync waiter */
}
@@ -1159,55 +1444,98 @@ static void complete_request(struct ceph_osd_request *req)
static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
struct ceph_connection *con)
{
- struct ceph_osd_reply_head *rhead = msg->front.iov_base;
+ void *p, *end;
struct ceph_osd_request *req;
u64 tid;
- int numops, object_len, flags;
+ int object_len;
+ unsigned int numops;
+ int payload_len, flags;
s32 result;
+ s32 retry_attempt;
+ struct ceph_pg pg;
+ int err;
+ u32 reassert_epoch;
+ u64 reassert_version;
+ u32 osdmap_epoch;
+ int already_completed;
+ u32 bytes;
+ unsigned int i;
tid = le64_to_cpu(msg->hdr.tid);
- if (msg->front.iov_len < sizeof(*rhead))
- goto bad;
- numops = le32_to_cpu(rhead->num_ops);
- object_len = le32_to_cpu(rhead->object_len);
- result = le32_to_cpu(rhead->result);
- if (msg->front.iov_len != sizeof(*rhead) + object_len +
- numops * sizeof(struct ceph_osd_op))
+ dout("handle_reply %p tid %llu\n", msg, tid);
+
+ p = msg->front.iov_base;
+ end = p + msg->front.iov_len;
+
+ ceph_decode_need(&p, end, 4, bad);
+ object_len = ceph_decode_32(&p);
+ ceph_decode_need(&p, end, object_len, bad);
+ p += object_len;
+
+ err = ceph_decode_pgid(&p, end, &pg);
+ if (err)
goto bad;
- dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
+
+ ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad);
+ flags = ceph_decode_64(&p);
+ result = ceph_decode_32(&p);
+ reassert_epoch = ceph_decode_32(&p);
+ reassert_version = ceph_decode_64(&p);
+ osdmap_epoch = ceph_decode_32(&p);
+
/* lookup */
mutex_lock(&osdc->request_mutex);
req = __lookup_request(osdc, tid);
if (req == NULL) {
dout("handle_reply tid %llu dne\n", tid);
- mutex_unlock(&osdc->request_mutex);
- return;
+ goto bad_mutex;
}
ceph_osdc_get_request(req);
- flags = le32_to_cpu(rhead->flags);
- /*
- * if this connection filled our message, drop our reference now, to
- * avoid a (safe but slower) revoke later.
- */
- if (req->r_con_filling_msg == con && req->r_reply == msg) {
- dout(" dropping con_filling_msg ref %p\n", con);
- req->r_con_filling_msg = NULL;
- con->ops->put(con);
- }
+ dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
+ req, result);
+
+ ceph_decode_need(&p, end, 4, bad);
+ numops = ceph_decode_32(&p);
+ if (numops > CEPH_OSD_MAX_OP)
+ goto bad_put;
+ if (numops != req->r_num_ops)
+ goto bad_put;
+ payload_len = 0;
+ ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad);
+ for (i = 0; i < numops; i++) {
+ struct ceph_osd_op *op = p;
+ int len;
+
+ len = le32_to_cpu(op->payload_len);
+ req->r_reply_op_len[i] = len;
+ dout(" op %d has %d bytes\n", i, len);
+ payload_len += len;
+ p += sizeof(*op);
+ }
+ bytes = le32_to_cpu(msg->hdr.data_len);
+ if (payload_len != bytes) {
+ pr_warning("sum of op payload lens %d != data_len %d",
+ payload_len, bytes);
+ goto bad_put;
+ }
+
+ ceph_decode_need(&p, end, 4 + numops * 4, bad);
+ retry_attempt = ceph_decode_32(&p);
+ for (i = 0; i < numops; i++)
+ req->r_reply_op_result[i] = ceph_decode_32(&p);
if (!req->r_got_reply) {
- unsigned int bytes;
- req->r_result = le32_to_cpu(rhead->result);
- bytes = le32_to_cpu(msg->hdr.data_len);
+ req->r_result = result;
dout("handle_reply result %d bytes %d\n", req->r_result,
bytes);
if (req->r_result == 0)
req->r_result = bytes;
/* in case this is a write and we need to replay, */
- req->r_reassert_version = rhead->reassert_version;
+ req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch);
+ req->r_reassert_version.version = cpu_to_le64(reassert_version);
req->r_got_reply = 1;
} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
@@ -1227,7 +1555,11 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
((flags & CEPH_OSD_FLAG_WRITE) == 0))
__unregister_request(osdc, req);
+ already_completed = req->r_completed;
+ req->r_completed = 1;
mutex_unlock(&osdc->request_mutex);
+ if (already_completed)
+ goto done;
if (req->r_callback)
req->r_callback(req, msg);
@@ -1242,10 +1574,13 @@ done:
ceph_osdc_put_request(req);
return;
+bad_put:
+ ceph_osdc_put_request(req);
+bad_mutex:
+ mutex_unlock(&osdc->request_mutex);
bad:
- pr_err("corrupt osd_op_reply got %d %d expected %d\n",
- (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
- (int)sizeof(*rhead));
+ pr_err("corrupt osd_op_reply got %d %d\n",
+ (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
ceph_msg_dump(msg);
}
@@ -1462,7 +1797,9 @@ done:
if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
ceph_monc_request_next_osdmap(&osdc->client->monc);
- send_queued(osdc);
+ mutex_lock(&osdc->request_mutex);
+ __send_queued(osdc);
+ mutex_unlock(&osdc->request_mutex);
up_read(&osdc->map_sem);
wake_up_all(&osdc->client->auth_wq);
return;
@@ -1556,8 +1893,7 @@ static void __remove_event(struct ceph_osd_event *event)
int ceph_osdc_create_event(struct ceph_osd_client *osdc,
void (*event_cb)(u64, u64, u8, void *),
- int one_shot, void *data,
- struct ceph_osd_event **pevent)
+ void *data, struct ceph_osd_event **pevent)
{
struct ceph_osd_event *event;
@@ -1567,14 +1903,13 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc,
dout("create_event %p\n", event);
event->cb = event_cb;
- event->one_shot = one_shot;
+ event->one_shot = 0;
event->data = data;
event->osdc = osdc;
INIT_LIST_HEAD(&event->osd_node);
RB_CLEAR_NODE(&event->node);
kref_init(&event->kref); /* one ref for us */
kref_get(&event->kref); /* one ref for the caller */
- init_completion(&event->completion);
spin_lock(&osdc->event_lock);
event->cookie = ++osdc->event_count;
@@ -1610,7 +1945,6 @@ static void do_event_work(struct work_struct *work)
dout("do_event_work completing %p\n", event);
event->cb(ver, notify_id, opcode, event->data);
- complete(&event->completion);
dout("do_event_work completed %p\n", event);
ceph_osdc_put_event(event);
kfree(event_work);
@@ -1620,7 +1954,8 @@ static void do_event_work(struct work_struct *work)
/*
* Process osd watch notifications
*/
-void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+static void handle_watch_notify(struct ceph_osd_client *osdc,
+ struct ceph_msg *msg)
{
void *p, *end;
u8 proto_ver;
@@ -1641,9 +1976,8 @@ void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
spin_lock(&osdc->event_lock);
event = __find_event(osdc, cookie);
if (event) {
+ BUG_ON(event->one_shot);
get_event(event);
- if (event->one_shot)
- __remove_event(event);
}
spin_unlock(&osdc->event_lock);
dout("handle_watch_notify cookie %lld ver %lld event %p\n",
@@ -1668,7 +2002,6 @@ void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
return;
done_err:
- complete(&event->completion);
ceph_osdc_put_event(event);
return;
@@ -1677,20 +2010,103 @@ bad:
return;
}
-int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
+/*
+ * build new request AND message
+ *
+ */
+void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
+ struct ceph_snap_context *snapc, u64 snap_id,
+ struct timespec *mtime)
{
- int err;
+ struct ceph_msg *msg = req->r_request;
+ void *p;
+ size_t msg_size;
+ int flags = req->r_flags;
+ u64 data_len;
+ unsigned int i;
- dout("wait_event %p\n", event);
- err = wait_for_completion_interruptible_timeout(&event->completion,
- timeout * HZ);
- ceph_osdc_put_event(event);
- if (err > 0)
- err = 0;
- dout("wait_event %p returns %d\n", event, err);
- return err;
+ req->r_snapid = snap_id;
+ req->r_snapc = ceph_get_snap_context(snapc);
+
+ /* encode request */
+ msg->hdr.version = cpu_to_le16(4);
+
+ p = msg->front.iov_base;
+ ceph_encode_32(&p, 1); /* client_inc is always 1 */
+ req->r_request_osdmap_epoch = p;
+ p += 4;
+ req->r_request_flags = p;
+ p += 4;
+ if (req->r_flags & CEPH_OSD_FLAG_WRITE)
+ ceph_encode_timespec(p, mtime);
+ p += sizeof(struct ceph_timespec);
+ req->r_request_reassert_version = p;
+ p += sizeof(struct ceph_eversion); /* will get filled in */
+
+ /* oloc */
+ ceph_encode_8(&p, 4);
+ ceph_encode_8(&p, 4);
+ ceph_encode_32(&p, 8 + 4 + 4);
+ req->r_request_pool = p;
+ p += 8;
+ ceph_encode_32(&p, -1); /* preferred */
+ ceph_encode_32(&p, 0); /* key len */
+
+ ceph_encode_8(&p, 1);
+ req->r_request_pgid = p;
+ p += 8 + 4;
+ ceph_encode_32(&p, -1); /* preferred */
+
+ /* oid */
+ ceph_encode_32(&p, req->r_oid_len);
+ memcpy(p, req->r_oid, req->r_oid_len);
+ dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
+ p += req->r_oid_len;
+
+ /* ops--can imply data */
+ ceph_encode_16(&p, (u16)req->r_num_ops);
+ data_len = 0;
+ for (i = 0; i < req->r_num_ops; i++) {
+ data_len += osd_req_encode_op(req, p, i);
+ p += sizeof(struct ceph_osd_op);
+ }
+
+ /* snaps */
+ ceph_encode_64(&p, req->r_snapid);
+ ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
+ ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
+ if (req->r_snapc) {
+ for (i = 0; i < snapc->num_snaps; i++) {
+ ceph_encode_64(&p, req->r_snapc->snaps[i]);
+ }
+ }
+
+ req->r_request_attempts = p;
+ p += 4;
+
+ /* data */
+ if (flags & CEPH_OSD_FLAG_WRITE) {
+ u16 data_off;
+
+ /*
+ * The header "data_off" is a hint to the receiver
+ * allowing it to align received data into its
+ * buffers such that there's no need to re-copy
+ * it before writing it to disk (direct I/O).
+ */
+ data_off = (u16) (off & 0xffff);
+ req->r_request->hdr.data_off = cpu_to_le16(data_off);
+ }
+ req->r_request->hdr.data_len = cpu_to_le32(data_len);
+
+ BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
+ msg_size = p - msg->front.iov_base;
+ msg->front.iov_len = msg_size;
+ msg->hdr.front_len = cpu_to_le32(msg_size);
+
+ dout("build_request msg_size was %d\n", (int)msg_size);
}
-EXPORT_SYMBOL(ceph_osdc_wait_event);
+EXPORT_SYMBOL(ceph_osdc_build_request);
/*
* Register request, send initial attempt.
@@ -1701,41 +2117,26 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
{
int rc = 0;
- req->r_request->pages = req->r_pages;
- req->r_request->nr_pages = req->r_num_pages;
-#ifdef CONFIG_BLOCK
- req->r_request->bio = req->r_bio;
-#endif
- req->r_request->trail = req->r_trail;
-
- register_request(osdc, req);
-
down_read(&osdc->map_sem);
mutex_lock(&osdc->request_mutex);
- /*
- * a racing kick_requests() may have sent the message for us
- * while we dropped request_mutex above, so only send now if
- * the request still han't been touched yet.
- */
- if (req->r_sent == 0) {
- rc = __map_request(osdc, req, 0);
- if (rc < 0) {
- if (nofail) {
- dout("osdc_start_request failed map, "
- " will retry %lld\n", req->r_tid);
- rc = 0;
- }
- goto out_unlock;
- }
- if (req->r_osd == NULL) {
- dout("send_request %p no up osds in pg\n", req);
- ceph_monc_request_next_osdmap(&osdc->client->monc);
- } else {
- __send_request(osdc, req);
+ __register_request(osdc, req);
+ WARN_ON(req->r_sent);
+ rc = __map_request(osdc, req, 0);
+ if (rc < 0) {
+ if (nofail) {
+ dout("osdc_start_request failed map, "
+ " will retry %lld\n", req->r_tid);
+ rc = 0;
}
- rc = 0;
+ goto out_unlock;
}
-
+ if (req->r_osd == NULL) {
+ dout("send_request %p no up osds in pg\n", req);
+ ceph_monc_request_next_osdmap(&osdc->client->monc);
+ } else {
+ __send_queued(osdc);
+ }
+ rc = 0;
out_unlock:
mutex_unlock(&osdc->request_mutex);
up_read(&osdc->map_sem);
@@ -1865,7 +2266,6 @@ out_mempool:
out:
return err;
}
-EXPORT_SYMBOL(ceph_osdc_init);
void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
@@ -1882,7 +2282,6 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
ceph_msgpool_destroy(&osdc->msgpool_op);
ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}
-EXPORT_SYMBOL(ceph_osdc_stop);
/*
* Read some contiguous pages. If we cross a stripe boundary, shorten
@@ -1899,18 +2298,22 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
vino.snap, off, *plen);
- req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
+ req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1,
CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
- NULL, 0, truncate_seq, truncate_size, NULL,
- false, 1, page_align);
+ NULL, truncate_seq, truncate_size,
+ false);
if (IS_ERR(req))
return PTR_ERR(req);
/* it may be a short read due to an object boundary */
- req->r_pages = pages;
- dout("readpages final extent is %llu~%llu (%d pages align %d)\n",
- off, *plen, req->r_num_pages, page_align);
+ osd_req_op_extent_osd_data_pages(req, 0,
+ pages, *plen, page_align, false, false);
+
+ dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n",
+ off, *plen, *plen, page_align);
+
+ ceph_osdc_build_request(req, off, NULL, vino.snap, NULL);
rc = ceph_osdc_start_request(osdc, req, false);
if (!rc)
@@ -1931,30 +2334,29 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
u64 off, u64 len,
u32 truncate_seq, u64 truncate_size,
struct timespec *mtime,
- struct page **pages, int num_pages,
- int flags, int do_sync, bool nofail)
+ struct page **pages, int num_pages)
{
struct ceph_osd_request *req;
int rc = 0;
int page_align = off & ~PAGE_MASK;
- BUG_ON(vino.snap != CEPH_NOSNAP);
- req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
+ BUG_ON(vino.snap != CEPH_NOSNAP); /* snapshots aren't writeable */
+ req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1,
CEPH_OSD_OP_WRITE,
- flags | CEPH_OSD_FLAG_ONDISK |
- CEPH_OSD_FLAG_WRITE,
- snapc, do_sync,
- truncate_seq, truncate_size, mtime,
- nofail, 1, page_align);
+ CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
+ snapc, truncate_seq, truncate_size,
+ true);
if (IS_ERR(req))
return PTR_ERR(req);
/* it may be a short write due to an object boundary */
- req->r_pages = pages;
- dout("writepages %llu~%llu (%d pages)\n", off, len,
- req->r_num_pages);
+ osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
+ false, false);
+ dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
- rc = ceph_osdc_start_request(osdc, req, nofail);
+ ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, mtime);
+
+ rc = ceph_osdc_start_request(osdc, req, true);
if (!rc)
rc = ceph_osdc_wait_request(osdc, req);
@@ -1966,6 +2368,26 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
}
EXPORT_SYMBOL(ceph_osdc_writepages);
+int ceph_osdc_setup(void)
+{
+ BUG_ON(ceph_osd_request_cache);
+ ceph_osd_request_cache = kmem_cache_create("ceph_osd_request",
+ sizeof (struct ceph_osd_request),
+ __alignof__(struct ceph_osd_request),
+ 0, NULL);
+
+ return ceph_osd_request_cache ? 0 : -ENOMEM;
+}
+EXPORT_SYMBOL(ceph_osdc_setup);
+
+void ceph_osdc_cleanup(void)
+{
+ BUG_ON(!ceph_osd_request_cache);
+ kmem_cache_destroy(ceph_osd_request_cache);
+ ceph_osd_request_cache = NULL;
+}
+EXPORT_SYMBOL(ceph_osdc_cleanup);
+
/*
* handle incoming message
*/
@@ -2025,13 +2447,10 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
goto out;
}
- if (req->r_con_filling_msg) {
+ if (req->r_reply->con)
dout("%s revoking msg %p from old con %p\n", __func__,
- req->r_reply, req->r_con_filling_msg);
- ceph_msg_revoke_incoming(req->r_reply);
- req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
- req->r_con_filling_msg = NULL;
- }
+ req->r_reply, req->r_reply->con);
+ ceph_msg_revoke_incoming(req->r_reply);
if (front > req->r_reply->front.iov_len) {
pr_warning("get_reply front %d > preallocated %d\n",
@@ -2045,26 +2464,29 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
m = ceph_msg_get(req->r_reply);
if (data_len > 0) {
- int want = calc_pages_for(req->r_page_alignment, data_len);
-
- if (unlikely(req->r_num_pages < want)) {
- pr_warning("tid %lld reply has %d bytes %d pages, we"
- " had only %d pages ready\n", tid, data_len,
- want, req->r_num_pages);
- *skip = 1;
- ceph_msg_put(m);
- m = NULL;
- goto out;
+ struct ceph_osd_data *osd_data;
+
+ /*
+ * XXX This is assuming there is only one op containing
+ * XXX page data. Probably OK for reads, but this
+ * XXX ought to be done more generally.
+ */
+ osd_data = osd_req_op_extent_osd_data(req, 0);
+ if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
+ if (osd_data->pages &&
+ unlikely(osd_data->length < data_len)) {
+
+ pr_warning("tid %lld reply has %d bytes "
+ "we had only %llu bytes ready\n",
+ tid, data_len, osd_data->length);
+ *skip = 1;
+ ceph_msg_put(m);
+ m = NULL;
+ goto out;
+ }
}
- m->pages = req->r_pages;
- m->nr_pages = req->r_num_pages;
- m->page_alignment = req->r_page_alignment;
-#ifdef CONFIG_BLOCK
- m->bio = req->r_bio;
-#endif
}
*skip = 0;
- req->r_con_filling_msg = con->ops->get(con);
dout("get_reply tid %lld %p\n", tid, m);
out:
@@ -2129,13 +2551,17 @@ static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
struct ceph_auth_handshake *auth = &o->o_auth;
if (force_new && auth->authorizer) {
- if (ac->ops && ac->ops->destroy_authorizer)
- ac->ops->destroy_authorizer(ac, auth->authorizer);
+ ceph_auth_destroy_authorizer(ac, auth->authorizer);
auth->authorizer = NULL;
}
- if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) {
- int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
- auth);
+ if (!auth->authorizer) {
+ int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
+ auth);
+ if (ret)
+ return ERR_PTR(ret);
+ } else {
+ int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
+ auth);
if (ret)
return ERR_PTR(ret);
}
@@ -2151,11 +2577,7 @@ static int verify_authorizer_reply(struct ceph_connection *con, int len)
struct ceph_osd_client *osdc = o->o_osdc;
struct ceph_auth_client *ac = osdc->client->monc.auth;
- /*
- * XXX If ac->ops or ac->ops->verify_authorizer_reply is null,
- * XXX which do we do: succeed or fail?
- */
- return ac->ops->verify_authorizer_reply(ac, o->o_auth.authorizer, len);
+ return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer, len);
}
static int invalidate_authorizer(struct ceph_connection *con)
@@ -2164,9 +2586,7 @@ static int invalidate_authorizer(struct ceph_connection *con)
struct ceph_osd_client *osdc = o->o_osdc;
struct ceph_auth_client *ac = osdc->client->monc.auth;
- if (ac->ops && ac->ops->invalidate_authorizer)
- ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
-
+ ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
return ceph_monc_validate_auth(&osdc->client->monc);
}
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index de73214b5d26..603ddd92db19 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -13,26 +13,18 @@
char *ceph_osdmap_state_str(char *str, int len, int state)
{
- int flag = 0;
-
if (!len)
- goto done;
-
- *str = '\0';
- if (state) {
- if (state & CEPH_OSD_EXISTS) {
- snprintf(str, len, "exists");
- flag = 1;
- }
- if (state & CEPH_OSD_UP) {
- snprintf(str, len, "%s%s%s", str, (flag ? ", " : ""),
- "up");
- flag = 1;
- }
- } else {
+ return str;
+
+ if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP))
+ snprintf(str, len, "exists, up");
+ else if (state & CEPH_OSD_EXISTS)
+ snprintf(str, len, "exists");
+ else if (state & CEPH_OSD_UP)
+ snprintf(str, len, "up");
+ else
snprintf(str, len, "doesn't exist");
- }
-done:
+
return str;
}
@@ -53,13 +45,8 @@ static int calc_bits_of(unsigned int t)
*/
static void calc_pg_masks(struct ceph_pg_pool_info *pi)
{
- pi->pg_num_mask = (1 << calc_bits_of(le32_to_cpu(pi->v.pg_num)-1)) - 1;
- pi->pgp_num_mask =
- (1 << calc_bits_of(le32_to_cpu(pi->v.pgp_num)-1)) - 1;
- pi->lpg_num_mask =
- (1 << calc_bits_of(le32_to_cpu(pi->v.lpg_num)-1)) - 1;
- pi->lpgp_num_mask =
- (1 << calc_bits_of(le32_to_cpu(pi->v.lpgp_num)-1)) - 1;
+ pi->pg_num_mask = (1 << calc_bits_of(pi->pg_num-1)) - 1;
+ pi->pgp_num_mask = (1 << calc_bits_of(pi->pgp_num-1)) - 1;
}
/*
@@ -170,6 +157,7 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
c->choose_local_tries = 2;
c->choose_local_fallback_tries = 5;
c->choose_total_tries = 19;
+ c->chooseleaf_descend_once = 0;
ceph_decode_need(p, end, 4*sizeof(u32), bad);
magic = ceph_decode_32(p);
@@ -336,6 +324,11 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
dout("crush decode tunable choose_total_tries = %d",
c->choose_total_tries);
+ ceph_decode_need(p, end, sizeof(u32), done);
+ c->chooseleaf_descend_once = ceph_decode_32(p);
+ dout("crush decode tunable chooseleaf_descend_once = %d",
+ c->chooseleaf_descend_once);
+
done:
dout("crush_decode success\n");
return c;
@@ -354,12 +347,13 @@ bad:
*/
static int pgid_cmp(struct ceph_pg l, struct ceph_pg r)
{
- u64 a = *(u64 *)&l;
- u64 b = *(u64 *)&r;
-
- if (a < b)
+ if (l.pool < r.pool)
return -1;
- if (a > b)
+ if (l.pool > r.pool)
+ return 1;
+ if (l.seed < r.seed)
+ return -1;
+ if (l.seed > r.seed)
return 1;
return 0;
}
@@ -405,8 +399,8 @@ static struct ceph_pg_mapping *__lookup_pg_mapping(struct rb_root *root,
} else if (c > 0) {
n = n->rb_right;
} else {
- dout("__lookup_pg_mapping %llx got %p\n",
- *(u64 *)&pgid, pg);
+ dout("__lookup_pg_mapping %lld.%x got %p\n",
+ pgid.pool, pgid.seed, pg);
return pg;
}
}
@@ -418,12 +412,13 @@ static int __remove_pg_mapping(struct rb_root *root, struct ceph_pg pgid)
struct ceph_pg_mapping *pg = __lookup_pg_mapping(root, pgid);
if (pg) {
- dout("__remove_pg_mapping %llx %p\n", *(u64 *)&pgid, pg);
+ dout("__remove_pg_mapping %lld.%x %p\n", pgid.pool, pgid.seed,
+ pg);
rb_erase(&pg->node, root);
kfree(pg);
return 0;
}
- dout("__remove_pg_mapping %llx dne\n", *(u64 *)&pgid);
+ dout("__remove_pg_mapping %lld.%x dne\n", pgid.pool, pgid.seed);
return -ENOENT;
}
@@ -452,7 +447,7 @@ static int __insert_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *new)
return 0;
}
-static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, int id)
+static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, u64 id)
{
struct ceph_pg_pool_info *pi;
struct rb_node *n = root->rb_node;
@@ -508,24 +503,57 @@ static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
{
- unsigned int n, m;
+ u8 ev, cv;
+ unsigned len, num;
+ void *pool_end;
+
+ ceph_decode_need(p, end, 2 + 4, bad);
+ ev = ceph_decode_8(p); /* encoding version */
+ cv = ceph_decode_8(p); /* compat version */
+ if (ev < 5) {
+ pr_warning("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv);
+ return -EINVAL;
+ }
+ if (cv > 7) {
+ pr_warning("got v %d cv %d > 7 of ceph_pg_pool\n", ev, cv);
+ return -EINVAL;
+ }
+ len = ceph_decode_32(p);
+ ceph_decode_need(p, end, len, bad);
+ pool_end = *p + len;
- ceph_decode_copy(p, &pi->v, sizeof(pi->v));
- calc_pg_masks(pi);
+ pi->type = ceph_decode_8(p);
+ pi->size = ceph_decode_8(p);
+ pi->crush_ruleset = ceph_decode_8(p);
+ pi->object_hash = ceph_decode_8(p);
+
+ pi->pg_num = ceph_decode_32(p);
+ pi->pgp_num = ceph_decode_32(p);
+
+ *p += 4 + 4; /* skip lpg* */
+ *p += 4; /* skip last_change */
+ *p += 8 + 4; /* skip snap_seq, snap_epoch */
- /* num_snaps * snap_info_t */
- n = le32_to_cpu(pi->v.num_snaps);
- while (n--) {
- ceph_decode_need(p, end, sizeof(u64) + 1 + sizeof(u64) +
- sizeof(struct ceph_timespec), bad);
- *p += sizeof(u64) + /* key */
- 1 + sizeof(u64) + /* u8, snapid */
- sizeof(struct ceph_timespec);
- m = ceph_decode_32(p); /* snap name */
- *p += m;
+ /* skip snaps */
+ num = ceph_decode_32(p);
+ while (num--) {
+ *p += 8; /* snapid key */
+ *p += 1 + 1; /* versions */
+ len = ceph_decode_32(p);
+ *p += len;
}
- *p += le32_to_cpu(pi->v.num_removed_snap_intervals) * sizeof(u64) * 2;
+ /* skip removed snaps */
+ num = ceph_decode_32(p);
+ *p += num * (8 + 8);
+
+ *p += 8; /* skip auid */
+ pi->flags = ceph_decode_64(p);
+
+ /* ignore the rest */
+
+ *p = pool_end;
+ calc_pg_masks(pi);
return 0;
bad:
@@ -535,14 +563,15 @@ bad:
static int __decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
{
struct ceph_pg_pool_info *pi;
- u32 num, len, pool;
+ u32 num, len;
+ u64 pool;
ceph_decode_32_safe(p, end, num, bad);
dout(" %d pool names\n", num);
while (num--) {
- ceph_decode_32_safe(p, end, pool, bad);
+ ceph_decode_64_safe(p, end, pool, bad);
ceph_decode_32_safe(p, end, len, bad);
- dout(" pool %d len %d\n", pool, len);
+ dout(" pool %llu len %d\n", pool, len);
ceph_decode_need(p, end, len, bad);
pi = __lookup_pg_pool(&map->pg_pools, pool);
if (pi) {
@@ -633,7 +662,6 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
struct ceph_osdmap *map;
u16 version;
u32 len, max, i;
- u8 ev;
int err = -EINVAL;
void *start = *p;
struct ceph_pg_pool_info *pi;
@@ -646,9 +674,12 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
map->pg_temp = RB_ROOT;
ceph_decode_16_safe(p, end, version, bad);
- if (version > CEPH_OSDMAP_VERSION) {
- pr_warning("got unknown v %d > %d of osdmap\n", version,
- CEPH_OSDMAP_VERSION);
+ if (version > 6) {
+ pr_warning("got unknown v %d > 6 of osdmap\n", version);
+ goto bad;
+ }
+ if (version < 6) {
+ pr_warning("got old v %d < 6 of osdmap\n", version);
goto bad;
}
@@ -660,20 +691,12 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
ceph_decode_32_safe(p, end, max, bad);
while (max--) {
- ceph_decode_need(p, end, 4 + 1 + sizeof(pi->v), bad);
+ ceph_decode_need(p, end, 8 + 2, bad);
err = -ENOMEM;
pi = kzalloc(sizeof(*pi), GFP_NOFS);
if (!pi)
goto bad;
- pi->id = ceph_decode_32(p);
- err = -EINVAL;
- ev = ceph_decode_8(p); /* encoding version */
- if (ev > CEPH_PG_POOL_VERSION) {
- pr_warning("got unknown v %d > %d of ceph_pg_pool\n",
- ev, CEPH_PG_POOL_VERSION);
- kfree(pi);
- goto bad;
- }
+ pi->id = ceph_decode_64(p);
err = __decode_pool(p, end, pi);
if (err < 0) {
kfree(pi);
@@ -682,12 +705,10 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
__insert_pg_pool(&map->pg_pools, pi);
}
- if (version >= 5) {
- err = __decode_pool_names(p, end, map);
- if (err < 0) {
- dout("fail to decode pool names");
- goto bad;
- }
+ err = __decode_pool_names(p, end, map);
+ if (err < 0) {
+ dout("fail to decode pool names");
+ goto bad;
}
ceph_decode_32_safe(p, end, map->pool_max, bad);
@@ -726,8 +747,10 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
struct ceph_pg pgid;
struct ceph_pg_mapping *pg;
- ceph_decode_need(p, end, sizeof(u32) + sizeof(u64), bad);
- ceph_decode_copy(p, &pgid, sizeof(pgid));
+ err = ceph_decode_pgid(p, end, &pgid);
+ if (err)
+ goto bad;
+ ceph_decode_need(p, end, sizeof(u32), bad);
n = ceph_decode_32(p);
err = -EINVAL;
if (n > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
@@ -745,7 +768,8 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
err = __insert_pg_mapping(pg, &map->pg_temp);
if (err)
goto bad;
- dout(" added pg_temp %llx len %d\n", *(u64 *)&pgid, len);
+ dout(" added pg_temp %lld.%x len %d\n", pgid.pool, pgid.seed,
+ len);
}
/* crush */
@@ -784,16 +808,17 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
struct ceph_fsid fsid;
u32 epoch = 0;
struct ceph_timespec modified;
- u32 len, pool;
- __s32 new_pool_max, new_flags, max;
+ s32 len;
+ u64 pool;
+ __s64 new_pool_max;
+ __s32 new_flags, max;
void *start = *p;
int err = -EINVAL;
u16 version;
ceph_decode_16_safe(p, end, version, bad);
- if (version > CEPH_OSDMAP_INC_VERSION) {
- pr_warning("got unknown v %d > %d of inc osdmap\n", version,
- CEPH_OSDMAP_INC_VERSION);
+ if (version != 6) {
+ pr_warning("got unknown v %d != 6 of inc osdmap\n", version);
goto bad;
}
@@ -803,7 +828,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
epoch = ceph_decode_32(p);
BUG_ON(epoch != map->epoch+1);
ceph_decode_copy(p, &modified, sizeof(modified));
- new_pool_max = ceph_decode_32(p);
+ new_pool_max = ceph_decode_64(p);
new_flags = ceph_decode_32(p);
/* full map? */
@@ -853,18 +878,9 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
/* new_pool */
ceph_decode_32_safe(p, end, len, bad);
while (len--) {
- __u8 ev;
struct ceph_pg_pool_info *pi;
- ceph_decode_32_safe(p, end, pool, bad);
- ceph_decode_need(p, end, 1 + sizeof(pi->v), bad);
- ev = ceph_decode_8(p); /* encoding version */
- if (ev > CEPH_PG_POOL_VERSION) {
- pr_warning("got unknown v %d > %d of ceph_pg_pool\n",
- ev, CEPH_PG_POOL_VERSION);
- err = -EINVAL;
- goto bad;
- }
+ ceph_decode_64_safe(p, end, pool, bad);
pi = __lookup_pg_pool(&map->pg_pools, pool);
if (!pi) {
pi = kzalloc(sizeof(*pi), GFP_NOFS);
@@ -890,7 +906,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
while (len--) {
struct ceph_pg_pool_info *pi;
- ceph_decode_32_safe(p, end, pool, bad);
+ ceph_decode_64_safe(p, end, pool, bad);
pi = __lookup_pg_pool(&map->pg_pools, pool);
if (pi)
__remove_pg_pool(&map->pg_pools, pi);
@@ -948,10 +964,12 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
int j;
struct ceph_pg pgid;
u32 pglen;
- ceph_decode_need(p, end, sizeof(u64) + sizeof(u32), bad);
- ceph_decode_copy(p, &pgid, sizeof(pgid));
- pglen = ceph_decode_32(p);
+ err = ceph_decode_pgid(p, end, &pgid);
+ if (err)
+ goto bad;
+ ceph_decode_need(p, end, sizeof(u32), bad);
+ pglen = ceph_decode_32(p);
if (pglen) {
ceph_decode_need(p, end, pglen*sizeof(u32), bad);
@@ -975,8 +993,8 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
kfree(pg);
goto bad;
}
- dout(" added pg_temp %llx len %d\n", *(u64 *)&pgid,
- pglen);
+ dout(" added pg_temp %lld.%x len %d\n", pgid.pool,
+ pgid.seed, pglen);
} else {
/* remove */
__remove_pg_mapping(&map->pg_temp, pgid);
@@ -1010,7 +1028,7 @@ bad:
* pass a stride back to the caller.
*/
int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
- u64 off, u64 *plen,
+ u64 off, u64 len,
u64 *ono,
u64 *oxoff, u64 *oxlen)
{
@@ -1021,7 +1039,7 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
u32 su_per_object;
u64 t, su_offset;
- dout("mapping %llu~%llu osize %u fl_su %u\n", off, *plen,
+ dout("mapping %llu~%llu osize %u fl_su %u\n", off, len,
osize, su);
if (su == 0 || sc == 0)
goto invalid;
@@ -1054,11 +1072,10 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
/*
* Calculate the length of the extent being written to the selected
- * object. This is the minimum of the full length requested (plen) or
+ * object. This is the minimum of the full length requested (len) or
* the remainder of the current stripe being written to.
*/
- *oxlen = min_t(u64, *plen, su - su_offset);
- *plen = *oxlen;
+ *oxlen = min_t(u64, len, su - su_offset);
dout(" obj extent %llu~%llu\n", *oxoff, *oxlen);
return 0;
@@ -1076,36 +1093,22 @@ EXPORT_SYMBOL(ceph_calc_file_object_mapping);
* calculate an object layout (i.e. pgid) from an oid,
* file_layout, and osdmap
*/
-int ceph_calc_object_layout(struct ceph_object_layout *ol,
- const char *oid,
- struct ceph_file_layout *fl,
- struct ceph_osdmap *osdmap)
+int ceph_calc_ceph_pg(struct ceph_pg *pg, const char *oid,
+ struct ceph_osdmap *osdmap, uint64_t pool)
{
- unsigned int num, num_mask;
- struct ceph_pg pgid;
- int poolid = le32_to_cpu(fl->fl_pg_pool);
- struct ceph_pg_pool_info *pool;
- unsigned int ps;
+ struct ceph_pg_pool_info *pool_info;
BUG_ON(!osdmap);
-
- pool = __lookup_pg_pool(&osdmap->pg_pools, poolid);
- if (!pool)
+ pool_info = __lookup_pg_pool(&osdmap->pg_pools, pool);
+ if (!pool_info)
return -EIO;
- ps = ceph_str_hash(pool->v.object_hash, oid, strlen(oid));
- num = le32_to_cpu(pool->v.pg_num);
- num_mask = pool->pg_num_mask;
-
- pgid.ps = cpu_to_le16(ps);
- pgid.preferred = cpu_to_le16(-1);
- pgid.pool = fl->fl_pg_pool;
- dout("calc_object_layout '%s' pgid %d.%x\n", oid, poolid, ps);
+ pg->pool = pool;
+ pg->seed = ceph_str_hash(pool_info->object_hash, oid, strlen(oid));
- ol->ol_pgid = pgid;
- ol->ol_stripe_unit = fl->fl_object_stripe_unit;
+ dout("%s '%s' pgid %lld.%x\n", __func__, oid, pg->pool, pg->seed);
return 0;
}
-EXPORT_SYMBOL(ceph_calc_object_layout);
+EXPORT_SYMBOL(ceph_calc_ceph_pg);
/*
* Calculate raw osd vector for the given pgid. Return pointer to osd
@@ -1117,19 +1120,16 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
struct ceph_pg_mapping *pg;
struct ceph_pg_pool_info *pool;
int ruleno;
- unsigned int poolid, ps, pps, t, r;
+ int r;
+ u32 pps;
- poolid = le32_to_cpu(pgid.pool);
- ps = le16_to_cpu(pgid.ps);
-
- pool = __lookup_pg_pool(&osdmap->pg_pools, poolid);
+ pool = __lookup_pg_pool(&osdmap->pg_pools, pgid.pool);
if (!pool)
return NULL;
/* pg_temp? */
- t = ceph_stable_mod(ps, le32_to_cpu(pool->v.pg_num),
- pool->pgp_num_mask);
- pgid.ps = cpu_to_le16(t);
+ pgid.seed = ceph_stable_mod(pgid.seed, pool->pg_num,
+ pool->pgp_num_mask);
pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid);
if (pg) {
*num = pg->len;
@@ -1137,26 +1137,39 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
}
/* crush */
- ruleno = crush_find_rule(osdmap->crush, pool->v.crush_ruleset,
- pool->v.type, pool->v.size);
+ ruleno = crush_find_rule(osdmap->crush, pool->crush_ruleset,
+ pool->type, pool->size);
if (ruleno < 0) {
- pr_err("no crush rule pool %d ruleset %d type %d size %d\n",
- poolid, pool->v.crush_ruleset, pool->v.type,
- pool->v.size);
+ pr_err("no crush rule pool %lld ruleset %d type %d size %d\n",
+ pgid.pool, pool->crush_ruleset, pool->type,
+ pool->size);
return NULL;
}
- pps = ceph_stable_mod(ps,
- le32_to_cpu(pool->v.pgp_num),
- pool->pgp_num_mask);
- pps += poolid;
+ if (pool->flags & CEPH_POOL_FLAG_HASHPSPOOL) {
+ /* hash pool id and seed sothat pool PGs do not overlap */
+ pps = crush_hash32_2(CRUSH_HASH_RJENKINS1,
+ ceph_stable_mod(pgid.seed, pool->pgp_num,
+ pool->pgp_num_mask),
+ pgid.pool);
+ } else {
+ /*
+ * legacy ehavior: add ps and pool together. this is
+ * not a great approach because the PGs from each pool
+ * will overlap on top of each other: 0.5 == 1.4 ==
+ * 2.3 == ...
+ */
+ pps = ceph_stable_mod(pgid.seed, pool->pgp_num,
+ pool->pgp_num_mask) +
+ (unsigned)pgid.pool;
+ }
r = crush_do_rule(osdmap->crush, ruleno, pps, osds,
- min_t(int, pool->v.size, *num),
+ min_t(int, pool->size, *num),
osdmap->osd_weight);
if (r < 0) {
- pr_err("error %d from crush rule: pool %d ruleset %d type %d"
- " size %d\n", r, poolid, pool->v.crush_ruleset,
- pool->v.type, pool->v.size);
+ pr_err("error %d from crush rule: pool %lld ruleset %d type %d"
+ " size %d\n", r, pgid.pool, pool->crush_ruleset,
+ pool->type, pool->size);
return NULL;
}
*num = r;
diff --git a/net/ceph/pagevec.c b/net/ceph/pagevec.c
index cd9c21df87d1..815a2249cfa9 100644
--- a/net/ceph/pagevec.c
+++ b/net/ceph/pagevec.c
@@ -12,7 +12,7 @@
/*
* build a vector of user pages
*/
-struct page **ceph_get_direct_page_vector(const char __user *data,
+struct page **ceph_get_direct_page_vector(const void __user *data,
int num_pages, bool write_page)
{
struct page **pages;
@@ -93,7 +93,7 @@ EXPORT_SYMBOL(ceph_alloc_page_vector);
* copy user data into a page vector
*/
int ceph_copy_user_to_page_vector(struct page **pages,
- const char __user *data,
+ const void __user *data,
loff_t off, size_t len)
{
int i = 0;
@@ -118,17 +118,17 @@ int ceph_copy_user_to_page_vector(struct page **pages,
}
EXPORT_SYMBOL(ceph_copy_user_to_page_vector);
-int ceph_copy_to_page_vector(struct page **pages,
- const char *data,
+void ceph_copy_to_page_vector(struct page **pages,
+ const void *data,
loff_t off, size_t len)
{
int i = 0;
size_t po = off & ~PAGE_CACHE_MASK;
size_t left = len;
- size_t l;
while (left > 0) {
- l = min_t(size_t, PAGE_CACHE_SIZE-po, left);
+ size_t l = min_t(size_t, PAGE_CACHE_SIZE-po, left);
+
memcpy(page_address(pages[i]) + po, data, l);
data += l;
left -= l;
@@ -138,21 +138,20 @@ int ceph_copy_to_page_vector(struct page **pages,
i++;
}
}
- return len;
}
EXPORT_SYMBOL(ceph_copy_to_page_vector);
-int ceph_copy_from_page_vector(struct page **pages,
- char *data,
+void ceph_copy_from_page_vector(struct page **pages,
+ void *data,
loff_t off, size_t len)
{
int i = 0;
size_t po = off & ~PAGE_CACHE_MASK;
size_t left = len;
- size_t l;
while (left > 0) {
- l = min_t(size_t, PAGE_CACHE_SIZE-po, left);
+ size_t l = min_t(size_t, PAGE_CACHE_SIZE-po, left);
+
memcpy(data, page_address(pages[i]) + po, l);
data += l;
left -= l;
@@ -162,7 +161,6 @@ int ceph_copy_from_page_vector(struct page **pages,
i++;
}
}
- return len;
}
EXPORT_SYMBOL(ceph_copy_from_page_vector);
@@ -170,7 +168,7 @@ EXPORT_SYMBOL(ceph_copy_from_page_vector);
* copy user data from a page vector into a user pointer
*/
int ceph_copy_page_vector_to_user(struct page **pages,
- char __user *data,
+ void __user *data,
loff_t off, size_t len)
{
int i = 0;
diff --git a/net/ceph/snapshot.c b/net/ceph/snapshot.c
new file mode 100644
index 000000000000..154683f5f14c
--- /dev/null
+++ b/net/ceph/snapshot.c
@@ -0,0 +1,78 @@
+/*
+ * snapshot.c Ceph snapshot context utility routines (part of libceph)
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * version 2 as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+#include <stddef.h>
+
+#include <linux/types.h>
+#include <linux/export.h>
+#include <linux/ceph/libceph.h>
+
+/*
+ * Ceph snapshot contexts are reference counted objects, and the
+ * returned structure holds a single reference. Acquire additional
+ * references with ceph_get_snap_context(), and release them with
+ * ceph_put_snap_context(). When the reference count reaches zero
+ * the entire structure is freed.
+ */
+
+/*
+ * Create a new ceph snapshot context large enough to hold the
+ * indicated number of snapshot ids (which can be 0). Caller has
+ * to fill in snapc->seq and snapc->snaps[0..snap_count-1].
+ *
+ * Returns a null pointer if an error occurs.
+ */
+struct ceph_snap_context *ceph_create_snap_context(u32 snap_count,
+ gfp_t gfp_flags)
+{
+ struct ceph_snap_context *snapc;
+ size_t size;
+
+ size = sizeof (struct ceph_snap_context);
+ size += snap_count * sizeof (snapc->snaps[0]);
+ snapc = kzalloc(size, gfp_flags);
+ if (!snapc)
+ return NULL;
+
+ atomic_set(&snapc->nref, 1);
+ snapc->num_snaps = snap_count;
+
+ return snapc;
+}
+EXPORT_SYMBOL(ceph_create_snap_context);
+
+struct ceph_snap_context *ceph_get_snap_context(struct ceph_snap_context *sc)
+{
+ if (sc)
+ atomic_inc(&sc->nref);
+ return sc;
+}
+EXPORT_SYMBOL(ceph_get_snap_context);
+
+void ceph_put_snap_context(struct ceph_snap_context *sc)
+{
+ if (!sc)
+ return;
+ if (atomic_dec_and_test(&sc->nref)) {
+ /*printk(" deleting snap_context %p\n", sc);*/
+ kfree(sc);
+ }
+}
+EXPORT_SYMBOL(ceph_put_snap_context);