diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2017-05-10 17:42:33 +0200 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2017-05-10 17:42:33 +0200 |
commit | 26c5eaa1326e9703effd01e7cc3cc0d4ad4b3c19 (patch) | |
tree | 070c518340ae308dce62695a06a118a1df78be15 /net/ceph/osd_client.c | |
parent | Merge branch 'for-linus-4.12' of git://git.kernel.org/pub/scm/linux/kernel/gi... (diff) | |
parent | ceph: fix memory leak in __ceph_setxattr() (diff) | |
download | linux-26c5eaa1326e9703effd01e7cc3cc0d4ad4b3c19.tar.xz linux-26c5eaa1326e9703effd01e7cc3cc0d4ad4b3c19.zip |
Merge tag 'ceph-for-4.12-rc1' of git://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov:
"The two main items are support for disabling automatic rbd exclusive
lock transfers from myself and the long awaited -ENOSPC handling
series from Jeff.
The former will allow rbd users to take advantage of exclusive lock's
built-in blacklist/break-lock functionality while staying in control
of who owns the lock. With the latter in place, we will abort
filesystem writes on -ENOSPC instead of having them block
indefinitely.
Beyond that we've got the usual pile of filesystem fixes from Zheng,
some refcount_t conversion patches from Elena and a patch for an
ancient open() flags handling bug from Alexander"
* tag 'ceph-for-4.12-rc1' of git://github.com/ceph/ceph-client: (31 commits)
ceph: fix memory leak in __ceph_setxattr()
ceph: fix file open flags on ppc64
ceph: choose readdir frag based on previous readdir reply
rbd: exclusive map option
rbd: return ResponseMessage result from rbd_handle_request_lock()
rbd: kill rbd_is_lock_supported()
rbd: support updating the lock cookie without releasing the lock
rbd: store lock cookie
rbd: ignore unlock errors
rbd: fix error handling around rbd_init_disk()
rbd: move rbd_unregister_watch() call into rbd_dev_image_release()
rbd: move rbd_dev_destroy() call out of rbd_dev_image_release()
ceph: when seeing write errors on an inode, switch to sync writes
Revert "ceph: SetPageError() for writeback pages if writepages fails"
ceph: handle epoch barriers in cap messages
libceph: add an epoch_barrier field to struct ceph_osd_client
libceph: abort already submitted but abortable requests when map or pool goes full
libceph: allow requests to return immediately on full conditions if caller wishes
libceph: remove req->r_replay_version
ceph: make seeky readdir more efficient
...
Diffstat (limited to 'net/ceph/osd_client.c')
-rw-r--r-- | net/ceph/osd_client.c | 139 |
1 files changed, 123 insertions, 16 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 242d7c0d92f8..924f07c36ddb 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -961,6 +961,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, truncate_size, truncate_seq); } + req->r_abort_on_full = true; req->r_flags = flags; req->r_base_oloc.pool = layout->pool_id; req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns); @@ -1005,7 +1006,7 @@ static bool osd_registered(struct ceph_osd *osd) */ static void osd_init(struct ceph_osd *osd) { - atomic_set(&osd->o_ref, 1); + refcount_set(&osd->o_ref, 1); RB_CLEAR_NODE(&osd->o_node); osd->o_requests = RB_ROOT; osd->o_linger_requests = RB_ROOT; @@ -1050,9 +1051,9 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum) static struct ceph_osd *get_osd(struct ceph_osd *osd) { - if (atomic_inc_not_zero(&osd->o_ref)) { - dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1, - atomic_read(&osd->o_ref)); + if (refcount_inc_not_zero(&osd->o_ref)) { + dout("get_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref)-1, + refcount_read(&osd->o_ref)); return osd; } else { dout("get_osd %p FAIL\n", osd); @@ -1062,9 +1063,9 @@ static struct ceph_osd *get_osd(struct ceph_osd *osd) static void put_osd(struct ceph_osd *osd) { - dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref), - atomic_read(&osd->o_ref) - 1); - if (atomic_dec_and_test(&osd->o_ref)) { + dout("put_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref), + refcount_read(&osd->o_ref) - 1); + if (refcount_dec_and_test(&osd->o_ref)) { osd_cleanup(osd); kfree(osd); } @@ -1297,8 +1298,9 @@ static bool target_should_be_paused(struct ceph_osd_client *osdc, __pool_full(pi); WARN_ON(pi->id != t->base_oloc.pool); - return (t->flags & CEPH_OSD_FLAG_READ && pauserd) || - (t->flags & CEPH_OSD_FLAG_WRITE && pausewr); + return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) || + ((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) || + (osdc->osdmap->epoch < osdc->epoch_barrier); } enum calc_target_result { @@ -1503,9 +1505,10 @@ static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg) ceph_encode_32(&p, req->r_flags); ceph_encode_timespec(p, &req->r_mtime); p += sizeof(struct ceph_timespec); - /* aka reassert_version */ - memcpy(p, &req->r_replay_version, sizeof(req->r_replay_version)); - p += sizeof(req->r_replay_version); + + /* reassert_version */ + memset(p, 0, sizeof(struct ceph_eversion)); + p += sizeof(struct ceph_eversion); /* oloc */ ceph_start_encoding(&p, 5, 4, @@ -1626,6 +1629,7 @@ static void maybe_request_map(struct ceph_osd_client *osdc) ceph_monc_renew_subs(&osdc->client->monc); } +static void complete_request(struct ceph_osd_request *req, int err); static void send_map_check(struct ceph_osd_request *req); static void __submit_request(struct ceph_osd_request *req, bool wrlocked) @@ -1635,6 +1639,7 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked) enum calc_target_result ct_res; bool need_send = false; bool promoted = false; + bool need_abort = false; WARN_ON(req->r_tid); dout("%s req %p wrlocked %d\n", __func__, req, wrlocked); @@ -1650,8 +1655,13 @@ again: goto promote; } - if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && - ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) { + if (osdc->osdmap->epoch < osdc->epoch_barrier) { + dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch, + osdc->epoch_barrier); + req->r_t.paused = true; + maybe_request_map(osdc); + } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && + ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) { dout("req %p pausewr\n", req); req->r_t.paused = true; maybe_request_map(osdc); @@ -1669,6 +1679,8 @@ again: pr_warn_ratelimited("FULL or reached pool quota\n"); req->r_t.paused = true; maybe_request_map(osdc); + if (req->r_abort_on_full) + need_abort = true; } else if (!osd_homeless(osd)) { need_send = true; } else { @@ -1685,6 +1697,8 @@ again: link_request(osd, req); if (need_send) send_request(req); + else if (need_abort) + complete_request(req, -ENOSPC); mutex_unlock(&osd->lock); if (ct_res == CALC_TARGET_POOL_DNE) @@ -1799,6 +1813,97 @@ static void abort_request(struct ceph_osd_request *req, int err) complete_request(req, err); } +static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb) +{ + if (likely(eb > osdc->epoch_barrier)) { + dout("updating epoch_barrier from %u to %u\n", + osdc->epoch_barrier, eb); + osdc->epoch_barrier = eb; + /* Request map if we're not to the barrier yet */ + if (eb > osdc->osdmap->epoch) + maybe_request_map(osdc); + } +} + +void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb) +{ + down_read(&osdc->lock); + if (unlikely(eb > osdc->epoch_barrier)) { + up_read(&osdc->lock); + down_write(&osdc->lock); + update_epoch_barrier(osdc, eb); + up_write(&osdc->lock); + } else { + up_read(&osdc->lock); + } +} +EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier); + +/* + * Drop all pending requests that are stalled waiting on a full condition to + * clear, and complete them with ENOSPC as the return code. Set the + * osdc->epoch_barrier to the latest map epoch that we've seen if any were + * cancelled. + */ +static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc) +{ + struct rb_node *n; + bool victims = false; + + dout("enter abort_on_full\n"); + + if (!ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) && !have_pool_full(osdc)) + goto out; + + /* Scan list and see if there is anything to abort */ + for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { + struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); + struct rb_node *m; + + m = rb_first(&osd->o_requests); + while (m) { + struct ceph_osd_request *req = rb_entry(m, + struct ceph_osd_request, r_node); + m = rb_next(m); + + if (req->r_abort_on_full) { + victims = true; + break; + } + } + if (victims) + break; + } + + if (!victims) + goto out; + + /* + * Update the barrier to current epoch if it's behind that point, + * since we know we have some calls to be aborted in the tree. + */ + update_epoch_barrier(osdc, osdc->osdmap->epoch); + + for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { + struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); + struct rb_node *m; + + m = rb_first(&osd->o_requests); + while (m) { + struct ceph_osd_request *req = rb_entry(m, + struct ceph_osd_request, r_node); + m = rb_next(m); + + if (req->r_abort_on_full && + (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || + pool_full(osdc, req->r_t.target_oloc.pool))) + abort_request(req, -ENOSPC); + } + } +out: + dout("return abort_on_full barrier=%u\n", osdc->epoch_barrier); +} + static void check_pool_dne(struct ceph_osd_request *req) { struct ceph_osd_client *osdc = req->r_osdc; @@ -3252,11 +3357,13 @@ done: pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) || ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || have_pool_full(osdc); - if (was_pauserd || was_pausewr || pauserd || pausewr) + if (was_pauserd || was_pausewr || pauserd || pausewr || + osdc->osdmap->epoch < osdc->epoch_barrier) maybe_request_map(osdc); kick_requests(osdc, &need_resend, &need_resend_linger); + ceph_osdc_abort_on_full(osdc); ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP, osdc->osdmap->epoch); up_write(&osdc->lock); @@ -4126,7 +4233,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc) close_osd(osd); } up_write(&osdc->lock); - WARN_ON(atomic_read(&osdc->homeless_osd.o_ref) != 1); + WARN_ON(refcount_read(&osdc->homeless_osd.o_ref) != 1); osd_cleanup(&osdc->homeless_osd); WARN_ON(!list_empty(&osdc->osd_lru)); |