diff options
Diffstat (limited to 'net')
-rw-r--r-- | net/ceph/messenger.c | 31 | ||||
-rw-r--r-- | net/ceph/osd_client.c | 216 | ||||
-rw-r--r-- | net/ceph/osdmap.c | 19 |
3 files changed, 149 insertions, 117 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 3b3d33ea9ed8..c6413c360771 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -168,12 +168,6 @@ static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2; static struct lock_class_key socket_class; #endif -/* - * When skipping (ignoring) a block of input we read it into a "skip - * buffer," which is this many bytes in size. - */ -#define SKIP_BUF_SIZE 1024 - static void queue_con(struct ceph_connection *con); static void cancel_con(struct ceph_connection *con); static void ceph_con_workfn(struct work_struct *); @@ -520,12 +514,18 @@ static int ceph_tcp_connect(struct ceph_connection *con) return 0; } +/* + * If @buf is NULL, discard up to @len bytes. + */ static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len) { struct kvec iov = {buf, len}; struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; int r; + if (!buf) + msg.msg_flags |= MSG_TRUNC; + iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, len); r = sock_recvmsg(sock, &msg, msg.msg_flags); if (r == -EAGAIN) @@ -2575,9 +2575,6 @@ static int try_write(struct ceph_connection *con) con->state != CON_STATE_OPEN) return 0; -more: - dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); - /* open the socket first? */ if (con->state == CON_STATE_PREOPEN) { BUG_ON(con->sock); @@ -2598,7 +2595,8 @@ more: } } -more_kvec: +more: + dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); BUG_ON(!con->sock); /* kvec data queued? */ @@ -2623,7 +2621,7 @@ more_kvec: ret = write_partial_message_data(con); if (ret == 1) - goto more_kvec; /* we need to send the footer, too! */ + goto more; /* we need to send the footer, too! */ if (ret == 0) goto out; if (ret < 0) { @@ -2659,8 +2657,6 @@ out: return ret; } - - /* * Read what we can from the socket. */ @@ -2721,16 +2717,11 @@ more: if (con->in_base_pos < 0) { /* * skipping + discarding content. - * - * FIXME: there must be a better way to do this! */ - static char buf[SKIP_BUF_SIZE]; - int skip = min((int) sizeof (buf), -con->in_base_pos); - - dout("skipping %d / %d bytes\n", skip, -con->in_base_pos); - ret = ceph_tcp_recvmsg(con->sock, buf, skip); + ret = ceph_tcp_recvmsg(con->sock, NULL, -con->in_base_pos); if (ret <= 0) goto out; + dout("skipped %d / %d bytes\n", ret, -con->in_base_pos); con->in_base_pos += ret; if (con->in_base_pos) goto more; diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 69a2581ddbba..a00c74f1154e 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -766,7 +766,7 @@ void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req, } EXPORT_SYMBOL(osd_req_op_extent_dup_last); -void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, +int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, const char *class, const char *method) { struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, @@ -778,7 +778,9 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, BUG_ON(opcode != CEPH_OSD_OP_CALL); pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); - BUG_ON(!pagelist); + if (!pagelist) + return -ENOMEM; + ceph_pagelist_init(pagelist); op->cls.class_name = class; @@ -798,6 +800,7 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist); op->indata_len = payload_len; + return 0; } EXPORT_SYMBOL(osd_req_op_cls_init); @@ -1026,7 +1029,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, truncate_size, truncate_seq); } - req->r_abort_on_full = true; req->r_flags = flags; req->r_base_oloc.pool = layout->pool_id; req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns); @@ -1054,6 +1056,38 @@ EXPORT_SYMBOL(ceph_osdc_new_request); DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node) DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node) +/* + * Call @fn on each OSD request as long as @fn returns 0. + */ +static void for_each_request(struct ceph_osd_client *osdc, + int (*fn)(struct ceph_osd_request *req, void *arg), + void *arg) +{ + struct rb_node *n, *p; + + for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { + struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); + + for (p = rb_first(&osd->o_requests); p; ) { + struct ceph_osd_request *req = + rb_entry(p, struct ceph_osd_request, r_node); + + p = rb_next(p); + if (fn(req, arg)) + return; + } + } + + for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) { + struct ceph_osd_request *req = + rb_entry(p, struct ceph_osd_request, r_node); + + p = rb_next(p); + if (fn(req, arg)) + return; + } +} + static bool osd_homeless(struct ceph_osd *osd) { return osd->o_osd == CEPH_HOMELESS_OSD; @@ -1395,7 +1429,6 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc, bool recovery_deletes = ceph_osdmap_flag(osdc, CEPH_OSDMAP_RECOVERY_DELETES); enum calc_target_result ct_res; - int ret; t->epoch = osdc->osdmap->epoch; pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool); @@ -1431,14 +1464,7 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc, } } - ret = __ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc, - &pgid); - if (ret) { - WARN_ON(ret != -ENOENT); - t->osd = CEPH_HOMELESS_OSD; - ct_res = CALC_TARGET_POOL_DNE; - goto out; - } + __ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc, &pgid); last_pgid.pool = pgid.pool; last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask); @@ -2161,9 +2187,9 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked) struct ceph_osd_client *osdc = req->r_osdc; struct ceph_osd *osd; enum calc_target_result ct_res; + int err = 0; bool need_send = false; bool promoted = false; - bool need_abort = false; WARN_ON(req->r_tid); dout("%s req %p wrlocked %d\n", __func__, req, wrlocked); @@ -2179,7 +2205,10 @@ again: goto promote; } - if (osdc->osdmap->epoch < osdc->epoch_barrier) { + if (osdc->abort_err) { + dout("req %p abort_err %d\n", req, osdc->abort_err); + err = osdc->abort_err; + } else if (osdc->osdmap->epoch < osdc->epoch_barrier) { dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch, osdc->epoch_barrier); req->r_t.paused = true; @@ -2200,11 +2229,13 @@ again: (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || pool_full(osdc, req->r_t.base_oloc.pool))) { dout("req %p full/pool_full\n", req); - pr_warn_ratelimited("FULL or reached pool quota\n"); - req->r_t.paused = true; - maybe_request_map(osdc); - if (req->r_abort_on_full) - need_abort = true; + if (osdc->abort_on_full) { + err = -ENOSPC; + } else { + pr_warn_ratelimited("FULL or reached pool quota\n"); + req->r_t.paused = true; + maybe_request_map(osdc); + } } else if (!osd_homeless(osd)) { need_send = true; } else { @@ -2221,11 +2252,11 @@ again: link_request(osd, req); if (need_send) send_request(req); - else if (need_abort) - complete_request(req, -ENOSPC); + else if (err) + complete_request(req, err); mutex_unlock(&osd->lock); - if (ct_res == CALC_TARGET_POOL_DNE) + if (!err && ct_res == CALC_TARGET_POOL_DNE) send_map_check(req); if (promoted) @@ -2281,11 +2312,21 @@ static void finish_request(struct ceph_osd_request *req) static void __complete_request(struct ceph_osd_request *req) { - if (req->r_callback) { - dout("%s req %p tid %llu cb %pf result %d\n", __func__, req, - req->r_tid, req->r_callback, req->r_result); + dout("%s req %p tid %llu cb %pf result %d\n", __func__, req, + req->r_tid, req->r_callback, req->r_result); + + if (req->r_callback) req->r_callback(req); - } + complete_all(&req->r_completion); + ceph_osdc_put_request(req); +} + +static void complete_request_workfn(struct work_struct *work) +{ + struct ceph_osd_request *req = + container_of(work, struct ceph_osd_request, r_complete_work); + + __complete_request(req); } /* @@ -2297,9 +2338,9 @@ static void complete_request(struct ceph_osd_request *req, int err) req->r_result = err; finish_request(req); - __complete_request(req); - complete_all(&req->r_completion); - ceph_osdc_put_request(req); + + INIT_WORK(&req->r_complete_work, complete_request_workfn); + queue_work(req->r_osdc->completion_wq, &req->r_complete_work); } static void cancel_map_check(struct ceph_osd_request *req) @@ -2336,6 +2377,28 @@ static void abort_request(struct ceph_osd_request *req, int err) complete_request(req, err); } +static int abort_fn(struct ceph_osd_request *req, void *arg) +{ + int err = *(int *)arg; + + abort_request(req, err); + return 0; /* continue iteration */ +} + +/* + * Abort all in-flight requests with @err and arrange for all future + * requests to be failed immediately. + */ +void ceph_osdc_abort_requests(struct ceph_osd_client *osdc, int err) +{ + dout("%s osdc %p err %d\n", __func__, osdc, err); + down_write(&osdc->lock); + for_each_request(osdc, abort_fn, &err); + osdc->abort_err = err; + up_write(&osdc->lock); +} +EXPORT_SYMBOL(ceph_osdc_abort_requests); + static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb) { if (likely(eb > osdc->epoch_barrier)) { @@ -2363,6 +2426,30 @@ void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb) EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier); /* + * We can end up releasing caps as a result of abort_request(). + * In that case, we probably want to ensure that the cap release message + * has an updated epoch barrier in it, so set the epoch barrier prior to + * aborting the first request. + */ +static int abort_on_full_fn(struct ceph_osd_request *req, void *arg) +{ + struct ceph_osd_client *osdc = req->r_osdc; + bool *victims = arg; + + if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && + (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || + pool_full(osdc, req->r_t.base_oloc.pool))) { + if (!*victims) { + update_epoch_barrier(osdc, osdc->osdmap->epoch); + *victims = true; + } + abort_request(req, -ENOSPC); + } + + return 0; /* continue iteration */ +} + +/* * Drop all pending requests that are stalled waiting on a full condition to * clear, and complete them with ENOSPC as the return code. Set the * osdc->epoch_barrier to the latest map epoch that we've seen if any were @@ -2370,61 +2457,11 @@ EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier); */ static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc) { - struct rb_node *n; bool victims = false; - dout("enter abort_on_full\n"); - - if (!ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) && !have_pool_full(osdc)) - goto out; - - /* Scan list and see if there is anything to abort */ - for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { - struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); - struct rb_node *m; - - m = rb_first(&osd->o_requests); - while (m) { - struct ceph_osd_request *req = rb_entry(m, - struct ceph_osd_request, r_node); - m = rb_next(m); - - if (req->r_abort_on_full) { - victims = true; - break; - } - } - if (victims) - break; - } - - if (!victims) - goto out; - - /* - * Update the barrier to current epoch if it's behind that point, - * since we know we have some calls to be aborted in the tree. - */ - update_epoch_barrier(osdc, osdc->osdmap->epoch); - - for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { - struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); - struct rb_node *m; - - m = rb_first(&osd->o_requests); - while (m) { - struct ceph_osd_request *req = rb_entry(m, - struct ceph_osd_request, r_node); - m = rb_next(m); - - if (req->r_abort_on_full && - (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || - pool_full(osdc, req->r_t.target_oloc.pool))) - abort_request(req, -ENOSPC); - } - } -out: - dout("return abort_on_full barrier=%u\n", osdc->epoch_barrier); + if (osdc->abort_on_full && + (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || have_pool_full(osdc))) + for_each_request(osdc, abort_on_full_fn, &victims); } static void check_pool_dne(struct ceph_osd_request *req) @@ -3541,8 +3578,6 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) up_read(&osdc->lock); __complete_request(req); - complete_all(&req->r_completion); - ceph_osdc_put_request(req); return; fail_request: @@ -4927,7 +4962,10 @@ int ceph_osdc_call(struct ceph_osd_client *osdc, if (ret) goto out_put_req; - osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method); + ret = osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method); + if (ret) + goto out_put_req; + if (req_page) osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len, 0, false, false); @@ -4996,6 +5034,10 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) if (!osdc->notify_wq) goto out_msgpool_reply; + osdc->completion_wq = create_singlethread_workqueue("ceph-completion"); + if (!osdc->completion_wq) + goto out_notify_wq; + schedule_delayed_work(&osdc->timeout_work, osdc->client->options->osd_keepalive_timeout); schedule_delayed_work(&osdc->osds_timeout_work, @@ -5003,6 +5045,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) return 0; +out_notify_wq: + destroy_workqueue(osdc->notify_wq); out_msgpool_reply: ceph_msgpool_destroy(&osdc->msgpool_op_reply); out_msgpool: @@ -5017,7 +5061,7 @@ out: void ceph_osdc_stop(struct ceph_osd_client *osdc) { - flush_workqueue(osdc->notify_wq); + destroy_workqueue(osdc->completion_wq); destroy_workqueue(osdc->notify_wq); cancel_delayed_work_sync(&osdc->timeout_work); cancel_delayed_work_sync(&osdc->osds_timeout_work); diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index e22820e24f50..98c0ff3d6441 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -2146,10 +2146,10 @@ bool ceph_osds_changed(const struct ceph_osds *old_acting, * Should only be called with target_oid and target_oloc (as opposed to * base_oid and base_oloc), since tiering isn't taken into account. */ -int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi, - const struct ceph_object_id *oid, - const struct ceph_object_locator *oloc, - struct ceph_pg *raw_pgid) +void __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi, + const struct ceph_object_id *oid, + const struct ceph_object_locator *oloc, + struct ceph_pg *raw_pgid) { WARN_ON(pi->id != oloc->pool); @@ -2165,11 +2165,8 @@ int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi, int nsl = oloc->pool_ns->len; size_t total = nsl + 1 + oid->name_len; - if (total > sizeof(stack_buf)) { - buf = kmalloc(total, GFP_NOIO); - if (!buf) - return -ENOMEM; - } + if (total > sizeof(stack_buf)) + buf = kmalloc(total, GFP_NOIO | __GFP_NOFAIL); memcpy(buf, oloc->pool_ns->str, nsl); buf[nsl] = '\037'; memcpy(buf + nsl + 1, oid->name, oid->name_len); @@ -2181,7 +2178,6 @@ int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi, oid->name, nsl, oloc->pool_ns->str, raw_pgid->pool, raw_pgid->seed); } - return 0; } int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap, @@ -2195,7 +2191,8 @@ int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap, if (!pi) return -ENOENT; - return __ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid); + __ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid); + return 0; } EXPORT_SYMBOL(ceph_object_locator_to_pg); |