diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2017-05-10 17:42:33 +0200 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2017-05-10 17:42:33 +0200 |
commit | 26c5eaa1326e9703effd01e7cc3cc0d4ad4b3c19 (patch) | |
tree | 070c518340ae308dce62695a06a118a1df78be15 /fs/ceph | |
parent | Merge branch 'for-linus-4.12' of git://git.kernel.org/pub/scm/linux/kernel/gi... (diff) | |
parent | ceph: fix memory leak in __ceph_setxattr() (diff) | |
download | linux-26c5eaa1326e9703effd01e7cc3cc0d4ad4b3c19.tar.xz linux-26c5eaa1326e9703effd01e7cc3cc0d4ad4b3c19.zip |
Merge tag 'ceph-for-4.12-rc1' of git://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov:
"The two main items are support for disabling automatic rbd exclusive
lock transfers from myself and the long awaited -ENOSPC handling
series from Jeff.
The former will allow rbd users to take advantage of exclusive lock's
built-in blacklist/break-lock functionality while staying in control
of who owns the lock. With the latter in place, we will abort
filesystem writes on -ENOSPC instead of having them block
indefinitely.
Beyond that we've got the usual pile of filesystem fixes from Zheng,
some refcount_t conversion patches from Elena and a patch for an
ancient open() flags handling bug from Alexander"
* tag 'ceph-for-4.12-rc1' of git://github.com/ceph/ceph-client: (31 commits)
ceph: fix memory leak in __ceph_setxattr()
ceph: fix file open flags on ppc64
ceph: choose readdir frag based on previous readdir reply
rbd: exclusive map option
rbd: return ResponseMessage result from rbd_handle_request_lock()
rbd: kill rbd_is_lock_supported()
rbd: support updating the lock cookie without releasing the lock
rbd: store lock cookie
rbd: ignore unlock errors
rbd: fix error handling around rbd_init_disk()
rbd: move rbd_unregister_watch() call into rbd_dev_image_release()
rbd: move rbd_dev_destroy() call out of rbd_dev_image_release()
ceph: when seeing write errors on an inode, switch to sync writes
Revert "ceph: SetPageError() for writeback pages if writepages fails"
ceph: handle epoch barriers in cap messages
libceph: add an epoch_barrier field to struct ceph_osd_client
libceph: abort already submitted but abortable requests when map or pool goes full
libceph: allow requests to return immediately on full conditions if caller wishes
libceph: remove req->r_replay_version
ceph: make seeky readdir more efficient
...
Diffstat (limited to 'fs/ceph')
-rw-r--r-- | fs/ceph/addr.c | 10 | ||||
-rw-r--r-- | fs/ceph/caps.c | 25 | ||||
-rw-r--r-- | fs/ceph/debugfs.c | 21 | ||||
-rw-r--r-- | fs/ceph/dir.c | 23 | ||||
-rw-r--r-- | fs/ceph/file.c | 68 | ||||
-rw-r--r-- | fs/ceph/inode.c | 17 | ||||
-rw-r--r-- | fs/ceph/mds_client.c | 75 | ||||
-rw-r--r-- | fs/ceph/mds_client.h | 15 | ||||
-rw-r--r-- | fs/ceph/mdsmap.c | 44 | ||||
-rw-r--r-- | fs/ceph/snap.c | 2 | ||||
-rw-r--r-- | fs/ceph/super.c | 7 | ||||
-rw-r--r-- | fs/ceph/super.h | 31 | ||||
-rw-r--r-- | fs/ceph/xattr.c | 3 |
13 files changed, 255 insertions, 86 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 9ecb2fd348cb..1e71e6ca5ddf 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -670,8 +670,12 @@ static void writepages_finish(struct ceph_osd_request *req) bool remove_page; dout("writepages_finish %p rc %d\n", inode, rc); - if (rc < 0) + if (rc < 0) { mapping_set_error(mapping, rc); + ceph_set_error_write(ci); + } else { + ceph_clear_error_write(ci); + } /* * We lost the cache cap, need to truncate the page before @@ -703,9 +707,6 @@ static void writepages_finish(struct ceph_osd_request *req) clear_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC); - if (rc < 0) - SetPageError(page); - ceph_put_snap_context(page_snap_context(page)); page->private = 0; ClearPagePrivate(page); @@ -1892,6 +1893,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false); wr_req->r_mtime = ci->vfs_inode.i_mtime; + wr_req->r_abort_on_full = true; err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false); if (!err) diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 68c78be19d5b..a3ebb632294e 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1015,6 +1015,7 @@ static int send_cap_msg(struct cap_msg_args *arg) void *p; size_t extra_len; struct timespec zerotime = {0}; + struct ceph_osd_client *osdc = &arg->session->s_mdsc->fsc->client->osdc; dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s" " seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu" @@ -1076,8 +1077,12 @@ static int send_cap_msg(struct cap_msg_args *arg) ceph_encode_64(&p, arg->inline_data ? 0 : CEPH_INLINE_NONE); /* inline data size */ ceph_encode_32(&p, 0); - /* osd_epoch_barrier (version 5) */ - ceph_encode_32(&p, 0); + /* + * osd_epoch_barrier (version 5) + * The epoch_barrier is protected osdc->lock, so READ_ONCE here in + * case it was recently changed + */ + ceph_encode_32(&p, READ_ONCE(osdc->epoch_barrier)); /* oldest_flush_tid (version 6) */ ceph_encode_64(&p, arg->oldest_flush_tid); @@ -1389,7 +1394,7 @@ static void __ceph_flush_snaps(struct ceph_inode_info *ci, first_tid = cf->tid + 1; capsnap = container_of(cf, struct ceph_cap_snap, cap_flush); - atomic_inc(&capsnap->nref); + refcount_inc(&capsnap->nref); spin_unlock(&ci->i_ceph_lock); dout("__flush_snaps %p capsnap %p tid %llu %s\n", @@ -2202,7 +2207,7 @@ static void __kick_flushing_caps(struct ceph_mds_client *mdsc, inode, capsnap, cf->tid, ceph_cap_string(capsnap->dirty)); - atomic_inc(&capsnap->nref); + refcount_inc(&capsnap->nref); spin_unlock(&ci->i_ceph_lock); ret = __send_flush_snap(inode, session, capsnap, cap->mseq, @@ -3633,13 +3638,19 @@ void ceph_handle_caps(struct ceph_mds_session *session, p += inline_len; } + if (le16_to_cpu(msg->hdr.version) >= 5) { + struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; + u32 epoch_barrier; + + ceph_decode_32_safe(&p, end, epoch_barrier, bad); + ceph_osdc_update_epoch_barrier(osdc, epoch_barrier); + } + if (le16_to_cpu(msg->hdr.version) >= 8) { u64 flush_tid; u32 caller_uid, caller_gid; - u32 osd_epoch_barrier; u32 pool_ns_len; - /* version >= 5 */ - ceph_decode_32_safe(&p, end, osd_epoch_barrier, bad); + /* version >= 6 */ ceph_decode_64_safe(&p, end, flush_tid, bad); /* version >= 7 */ diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c index 3ef11bc8d728..4e2d112c982f 100644 --- a/fs/ceph/debugfs.c +++ b/fs/ceph/debugfs.c @@ -22,20 +22,19 @@ static int mdsmap_show(struct seq_file *s, void *p) { int i; struct ceph_fs_client *fsc = s->private; + struct ceph_mdsmap *mdsmap; if (fsc->mdsc == NULL || fsc->mdsc->mdsmap == NULL) return 0; - seq_printf(s, "epoch %d\n", fsc->mdsc->mdsmap->m_epoch); - seq_printf(s, "root %d\n", fsc->mdsc->mdsmap->m_root); - seq_printf(s, "session_timeout %d\n", - fsc->mdsc->mdsmap->m_session_timeout); - seq_printf(s, "session_autoclose %d\n", - fsc->mdsc->mdsmap->m_session_autoclose); - for (i = 0; i < fsc->mdsc->mdsmap->m_max_mds; i++) { - struct ceph_entity_addr *addr = - &fsc->mdsc->mdsmap->m_info[i].addr; - int state = fsc->mdsc->mdsmap->m_info[i].state; - + mdsmap = fsc->mdsc->mdsmap; + seq_printf(s, "epoch %d\n", mdsmap->m_epoch); + seq_printf(s, "root %d\n", mdsmap->m_root); + seq_printf(s, "max_mds %d\n", mdsmap->m_max_mds); + seq_printf(s, "session_timeout %d\n", mdsmap->m_session_timeout); + seq_printf(s, "session_autoclose %d\n", mdsmap->m_session_autoclose); + for (i = 0; i < mdsmap->m_num_mds; i++) { + struct ceph_entity_addr *addr = &mdsmap->m_info[i].addr; + int state = mdsmap->m_info[i].state; seq_printf(s, "\tmds%d\t%s\t(%s)\n", i, ceph_pr_addr(&addr->in_addr), ceph_mds_state_name(state)); diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 3e9ad501addf..e071d23f6148 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -294,7 +294,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) struct ceph_mds_client *mdsc = fsc->mdsc; int i; int err; - u32 ftype; + unsigned frag = -1; struct ceph_mds_reply_info_parsed *rinfo; dout("readdir %p file %p pos %llx\n", inode, file, ctx->pos); @@ -341,7 +341,6 @@ more: /* do we have the correct frag content buffered? */ if (need_send_readdir(fi, ctx->pos)) { struct ceph_mds_request *req; - unsigned frag; int op = ceph_snap(inode) == CEPH_SNAPDIR ? CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR; @@ -352,8 +351,11 @@ more: } if (is_hash_order(ctx->pos)) { - frag = ceph_choose_frag(ci, fpos_hash(ctx->pos), - NULL, NULL); + /* fragtree isn't always accurate. choose frag + * based on previous reply when possible. */ + if (frag == (unsigned)-1) + frag = ceph_choose_frag(ci, fpos_hash(ctx->pos), + NULL, NULL); } else { frag = fpos_frag(ctx->pos); } @@ -378,7 +380,11 @@ more: ceph_mdsc_put_request(req); return -ENOMEM; } + } else if (is_hash_order(ctx->pos)) { + req->r_args.readdir.offset_hash = + cpu_to_le32(fpos_hash(ctx->pos)); } + req->r_dir_release_cnt = fi->dir_release_count; req->r_dir_ordered_cnt = fi->dir_ordered_count; req->r_readdir_cache_idx = fi->readdir_cache_idx; @@ -476,6 +482,7 @@ more: struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i; struct ceph_vino vino; ino_t ino; + u32 ftype; BUG_ON(rde->offset < ctx->pos); @@ -498,15 +505,17 @@ more: ctx->pos++; } + ceph_mdsc_put_request(fi->last_readdir); + fi->last_readdir = NULL; + if (fi->next_offset > 2) { - ceph_mdsc_put_request(fi->last_readdir); - fi->last_readdir = NULL; + frag = fi->frag; goto more; } /* more frags? */ if (!ceph_frag_is_rightmost(fi->frag)) { - unsigned frag = ceph_frag_next(fi->frag); + frag = ceph_frag_next(fi->frag); if (is_hash_order(ctx->pos)) { loff_t new_pos = ceph_make_fpos(ceph_frag_value(frag), fi->next_offset, true); diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 18c045e2ead6..3fdde0b283c9 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -13,6 +13,38 @@ #include "mds_client.h" #include "cache.h" +static __le32 ceph_flags_sys2wire(u32 flags) +{ + u32 wire_flags = 0; + + switch (flags & O_ACCMODE) { + case O_RDONLY: + wire_flags |= CEPH_O_RDONLY; + break; + case O_WRONLY: + wire_flags |= CEPH_O_WRONLY; + break; + case O_RDWR: + wire_flags |= CEPH_O_RDWR; + break; + } + +#define ceph_sys2wire(a) if (flags & a) { wire_flags |= CEPH_##a; flags &= ~a; } + + ceph_sys2wire(O_CREAT); + ceph_sys2wire(O_EXCL); + ceph_sys2wire(O_TRUNC); + ceph_sys2wire(O_DIRECTORY); + ceph_sys2wire(O_NOFOLLOW); + +#undef ceph_sys2wire + + if (flags) + dout("unused open flags: %x", flags); + + return cpu_to_le32(wire_flags); +} + /* * Ceph file operations * @@ -120,7 +152,7 @@ prepare_open_request(struct super_block *sb, int flags, int create_mode) if (IS_ERR(req)) goto out; req->r_fmode = ceph_flags_to_mode(flags); - req->r_args.open.flags = cpu_to_le32(flags); + req->r_args.open.flags = ceph_flags_sys2wire(flags); req->r_args.open.mode = cpu_to_le32(create_mode); out: return req; @@ -189,7 +221,7 @@ int ceph_renew_caps(struct inode *inode) spin_lock(&ci->i_ceph_lock); wanted = __ceph_caps_file_wanted(ci); if (__ceph_is_any_real_caps(ci) && - (!(wanted & CEPH_CAP_ANY_WR) == 0 || ci->i_auth_cap)) { + (!(wanted & CEPH_CAP_ANY_WR) || ci->i_auth_cap)) { int issued = __ceph_caps_issued(ci, NULL); spin_unlock(&ci->i_ceph_lock); dout("renew caps %p want %s issued %s updating mds_wanted\n", @@ -778,6 +810,7 @@ static void ceph_aio_retry_work(struct work_struct *work) req->r_callback = ceph_aio_complete_req; req->r_inode = inode; req->r_priv = aio_req; + req->r_abort_on_full = true; ret = ceph_osdc_start_request(req->r_osdc, req, false); out: @@ -1085,19 +1118,22 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, out: ceph_osdc_put_request(req); - if (ret == 0) { - pos += len; - written += len; - - if (pos > i_size_read(inode)) { - check_caps = ceph_inode_set_size(inode, pos); - if (check_caps) - ceph_check_caps(ceph_inode(inode), - CHECK_CAPS_AUTHONLY, - NULL); - } - } else + if (ret != 0) { + ceph_set_error_write(ci); break; + } + + ceph_clear_error_write(ci); + pos += len; + written += len; + if (pos > i_size_read(inode)) { + check_caps = ceph_inode_set_size(inode, pos); + if (check_caps) + ceph_check_caps(ceph_inode(inode), + CHECK_CAPS_AUTHONLY, + NULL); + } + } if (ret != -EOLDSNAPC && written > 0) { @@ -1303,6 +1339,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) } retry_snap: + /* FIXME: not complete since it doesn't account for being at quota */ if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL)) { err = -ENOSPC; goto out; @@ -1324,7 +1361,8 @@ retry_snap: inode, ceph_vinop(inode), pos, count, ceph_cap_string(got)); if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || - (iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC)) { + (iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC) || + (ci->i_ceph_flags & CEPH_I_ERROR_WRITE)) { struct ceph_snap_context *snapc; struct iov_iter data; inode_unlock(inode); diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index d3119fe3ab45..dcce79b84406 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -1482,10 +1482,17 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) return readdir_prepopulate_inodes_only(req, session); - if (rinfo->hash_order && req->r_path2) { - last_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash, - req->r_path2, strlen(req->r_path2)); - last_hash = ceph_frag_value(last_hash); + if (rinfo->hash_order) { + if (req->r_path2) { + last_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash, + req->r_path2, + strlen(req->r_path2)); + last_hash = ceph_frag_value(last_hash); + } else if (rinfo->offset_hash) { + /* mds understands offset_hash */ + WARN_ON_ONCE(req->r_readdir_offset != 2); + last_hash = le32_to_cpu(rhead->args.readdir.offset_hash); + } } if (rinfo->dir_dir && @@ -1510,7 +1517,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, } if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2 && - !(rinfo->hash_order && req->r_path2)) { + !(rinfo->hash_order && last_hash)) { /* note dir version at start of readdir so we can tell * if any dentries get dropped */ req->r_dir_release_cnt = atomic64_read(&ci->i_release_count); diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 1d3fa90d40b9..f38e56fa9712 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -189,6 +189,7 @@ static int parse_reply_info_dir(void **p, void *end, info->dir_end = !!(flags & CEPH_READDIR_FRAG_END); info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE); info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER); + info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH); } if (num == 0) goto done; @@ -378,9 +379,9 @@ const char *ceph_session_state_name(int s) static struct ceph_mds_session *get_session(struct ceph_mds_session *s) { - if (atomic_inc_not_zero(&s->s_ref)) { + if (refcount_inc_not_zero(&s->s_ref)) { dout("mdsc get_session %p %d -> %d\n", s, - atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref)); + refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref)); return s; } else { dout("mdsc get_session %p 0 -- FAIL", s); @@ -391,8 +392,8 @@ static struct ceph_mds_session *get_session(struct ceph_mds_session *s) void ceph_put_mds_session(struct ceph_mds_session *s) { dout("mdsc put_session %p %d -> %d\n", s, - atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1); - if (atomic_dec_and_test(&s->s_ref)) { + refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1); + if (refcount_dec_and_test(&s->s_ref)) { if (s->s_auth.authorizer) ceph_auth_destroy_authorizer(s->s_auth.authorizer); kfree(s); @@ -411,7 +412,7 @@ struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, return NULL; session = mdsc->sessions[mds]; dout("lookup_mds_session %p %d\n", session, - atomic_read(&session->s_ref)); + refcount_read(&session->s_ref)); get_session(session); return session; } @@ -441,7 +442,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, { struct ceph_mds_session *s; - if (mds >= mdsc->mdsmap->m_max_mds) + if (mds >= mdsc->mdsmap->m_num_mds) return ERR_PTR(-EINVAL); s = kzalloc(sizeof(*s), GFP_NOFS); @@ -466,7 +467,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, INIT_LIST_HEAD(&s->s_caps); s->s_nr_caps = 0; s->s_trim_caps = 0; - atomic_set(&s->s_ref, 1); + refcount_set(&s->s_ref, 1); INIT_LIST_HEAD(&s->s_waiting); INIT_LIST_HEAD(&s->s_unsafe); s->s_num_cap_releases = 0; @@ -494,7 +495,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, } mdsc->sessions[mds] = s; atomic_inc(&mdsc->num_sessions); - atomic_inc(&s->s_ref); /* one ref to sessions[], one to caller */ + refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */ ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); @@ -1004,7 +1005,7 @@ static void __open_export_target_sessions(struct ceph_mds_client *mdsc, struct ceph_mds_session *ts; int i, mds = session->s_mds; - if (mds >= mdsc->mdsmap->m_max_mds) + if (mds >= mdsc->mdsmap->m_num_mds) return; mi = &mdsc->mdsmap->m_info[mds]; @@ -1551,9 +1552,15 @@ void ceph_send_cap_releases(struct ceph_mds_client *mdsc, struct ceph_msg *msg = NULL; struct ceph_mds_cap_release *head; struct ceph_mds_cap_item *item; + struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; struct ceph_cap *cap; LIST_HEAD(tmp_list); int num_cap_releases; + __le32 barrier, *cap_barrier; + + down_read(&osdc->lock); + barrier = cpu_to_le32(osdc->epoch_barrier); + up_read(&osdc->lock); spin_lock(&session->s_cap_lock); again: @@ -1571,7 +1578,11 @@ again: head = msg->front.iov_base; head->num = cpu_to_le32(0); msg->front.iov_len = sizeof(*head); + + msg->hdr.version = cpu_to_le16(2); + msg->hdr.compat_version = cpu_to_le16(1); } + cap = list_first_entry(&tmp_list, struct ceph_cap, session_caps); list_del(&cap->session_caps); @@ -1589,6 +1600,11 @@ again: ceph_put_cap(mdsc, cap); if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { + // Append cap_barrier field + cap_barrier = msg->front.iov_base + msg->front.iov_len; + *cap_barrier = barrier; + msg->front.iov_len += sizeof(*cap_barrier); + msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); dout("send_cap_releases mds%d %p\n", session->s_mds, msg); ceph_con_send(&session->s_con, msg); @@ -1604,6 +1620,11 @@ again: spin_unlock(&session->s_cap_lock); if (msg) { + // Append cap_barrier field + cap_barrier = msg->front.iov_base + msg->front.iov_len; + *cap_barrier = barrier; + msg->front.iov_len += sizeof(*cap_barrier); + msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); dout("send_cap_releases mds%d %p\n", session->s_mds, msg); ceph_con_send(&session->s_con, msg); @@ -1993,7 +2014,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, if (req->r_pagelist) { struct ceph_pagelist *pagelist = req->r_pagelist; - atomic_inc(&pagelist->refcnt); + refcount_inc(&pagelist->refcnt); ceph_msg_data_add_pagelist(msg, pagelist); msg->hdr.data_len = cpu_to_le32(pagelist->length); } else { @@ -2640,8 +2661,10 @@ static void handle_session(struct ceph_mds_session *session, seq = le64_to_cpu(h->seq); mutex_lock(&mdsc->mutex); - if (op == CEPH_SESSION_CLOSE) + if (op == CEPH_SESSION_CLOSE) { + get_session(session); __unregister_session(mdsc, session); + } /* FIXME: this ttl calculation is generous */ session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; mutex_unlock(&mdsc->mutex); @@ -2730,6 +2753,8 @@ static void handle_session(struct ceph_mds_session *session, kick_requests(mdsc, mds); mutex_unlock(&mdsc->mutex); } + if (op == CEPH_SESSION_CLOSE) + ceph_put_mds_session(session); return; bad: @@ -3109,7 +3134,7 @@ static void check_new_map(struct ceph_mds_client *mdsc, dout("check_new_map new %u old %u\n", newmap->m_epoch, oldmap->m_epoch); - for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) { + for (i = 0; i < oldmap->m_num_mds && i < mdsc->max_sessions; i++) { if (mdsc->sessions[i] == NULL) continue; s = mdsc->sessions[i]; @@ -3123,15 +3148,33 @@ static void check_new_map(struct ceph_mds_client *mdsc, ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", ceph_session_state_name(s->s_state)); - if (i >= newmap->m_max_mds || + if (i >= newmap->m_num_mds || memcmp(ceph_mdsmap_get_addr(oldmap, i), ceph_mdsmap_get_addr(newmap, i), sizeof(struct ceph_entity_addr))) { if (s->s_state == CEPH_MDS_SESSION_OPENING) { /* the session never opened, just close it * out now */ + get_session(s); + __unregister_session(mdsc, s); __wake_requests(mdsc, &s->s_waiting); + ceph_put_mds_session(s); + } else if (i >= newmap->m_num_mds) { + /* force close session for stopped mds */ + get_session(s); __unregister_session(mdsc, s); + __wake_requests(mdsc, &s->s_waiting); + kick_requests(mdsc, i); + mutex_unlock(&mdsc->mutex); + + mutex_lock(&s->s_mutex); + cleanup_session_requests(mdsc, s); + remove_session_caps(s); + mutex_unlock(&s->s_mutex); + + ceph_put_mds_session(s); + + mutex_lock(&mdsc->mutex); } else { /* just close it */ mutex_unlock(&mdsc->mutex); @@ -3169,7 +3212,7 @@ static void check_new_map(struct ceph_mds_client *mdsc, } } - for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) { + for (i = 0; i < newmap->m_num_mds && i < mdsc->max_sessions; i++) { s = mdsc->sessions[i]; if (!s) continue; @@ -3883,7 +3926,7 @@ static struct ceph_connection *con_get(struct ceph_connection *con) struct ceph_mds_session *s = con->private; if (get_session(s)) { - dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref)); + dout("mdsc con_get %p ok (%d)\n", s, refcount_read(&s->s_ref)); return con; } dout("mdsc con_get %p FAIL\n", s); @@ -3894,7 +3937,7 @@ static void con_put(struct ceph_connection *con) { struct ceph_mds_session *s = con->private; - dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1); + dout("mdsc con_put %p (%d)\n", s, refcount_read(&s->s_ref) - 1); ceph_put_mds_session(s); } diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index ac0475a2daa7..db57ae98ed34 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -7,6 +7,7 @@ #include <linux/mutex.h> #include <linux/rbtree.h> #include <linux/spinlock.h> +#include <linux/refcount.h> #include <linux/ceph/types.h> #include <linux/ceph/messenger.h> @@ -82,9 +83,10 @@ struct ceph_mds_reply_info_parsed { struct ceph_mds_reply_dirfrag *dir_dir; size_t dir_buf_size; int dir_nr; - bool dir_complete; bool dir_end; + bool dir_complete; bool hash_order; + bool offset_hash; struct ceph_mds_reply_dir_entry *dir_entries; }; @@ -104,10 +106,13 @@ struct ceph_mds_reply_info_parsed { /* * cap releases are batched and sent to the MDS en masse. + * + * Account for per-message overhead of mds_cap_release header + * and __le32 for osd epoch barrier trailing field. */ -#define CEPH_CAPS_PER_RELEASE ((PAGE_SIZE - \ +#define CEPH_CAPS_PER_RELEASE ((PAGE_SIZE - sizeof(u32) - \ sizeof(struct ceph_mds_cap_release)) / \ - sizeof(struct ceph_mds_cap_item)) + sizeof(struct ceph_mds_cap_item)) /* @@ -156,7 +161,7 @@ struct ceph_mds_session { unsigned long s_renew_requested; /* last time we sent a renew req */ u64 s_renew_seq; - atomic_t s_ref; + refcount_t s_ref; struct list_head s_waiting; /* waiting requests */ struct list_head s_unsafe; /* unsafe requests */ }; @@ -373,7 +378,7 @@ __ceph_lookup_mds_session(struct ceph_mds_client *, int mds); static inline struct ceph_mds_session * ceph_get_mds_session(struct ceph_mds_session *s) { - atomic_inc(&s->s_ref); + refcount_inc(&s->s_ref); return s; } diff --git a/fs/ceph/mdsmap.c b/fs/ceph/mdsmap.c index 5454e2327a5f..1a748cf88535 100644 --- a/fs/ceph/mdsmap.c +++ b/fs/ceph/mdsmap.c @@ -22,11 +22,11 @@ int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m) int i; /* special case for one mds */ - if (1 == m->m_max_mds && m->m_info[0].state > 0) + if (1 == m->m_num_mds && m->m_info[0].state > 0) return 0; /* count */ - for (i = 0; i < m->m_max_mds; i++) + for (i = 0; i < m->m_num_mds; i++) if (m->m_info[i].state > 0) n++; if (n == 0) @@ -135,8 +135,9 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end) m->m_session_autoclose = ceph_decode_32(p); m->m_max_file_size = ceph_decode_64(p); m->m_max_mds = ceph_decode_32(p); + m->m_num_mds = m->m_max_mds; - m->m_info = kcalloc(m->m_max_mds, sizeof(*m->m_info), GFP_NOFS); + m->m_info = kcalloc(m->m_num_mds, sizeof(*m->m_info), GFP_NOFS); if (m->m_info == NULL) goto nomem; @@ -207,9 +208,20 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end) ceph_pr_addr(&addr.in_addr), ceph_mds_state_name(state)); - if (mds < 0 || mds >= m->m_max_mds || state <= 0) + if (mds < 0 || state <= 0) continue; + if (mds >= m->m_num_mds) { + int new_num = max(mds + 1, m->m_num_mds * 2); + void *new_m_info = krealloc(m->m_info, + new_num * sizeof(*m->m_info), + GFP_NOFS | __GFP_ZERO); + if (!new_m_info) + goto nomem; + m->m_info = new_m_info; + m->m_num_mds = new_num; + } + info = &m->m_info[mds]; info->global_id = global_id; info->state = state; @@ -229,6 +241,14 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end) info->export_targets = NULL; } } + if (m->m_num_mds > m->m_max_mds) { + /* find max up mds */ + for (i = m->m_num_mds; i >= m->m_max_mds; i--) { + if (i == 0 || m->m_info[i-1].state > 0) + break; + } + m->m_num_mds = i; + } /* pg_pools */ ceph_decode_32_safe(p, end, n, bad); @@ -270,12 +290,22 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end) for (i = 0; i < n; i++) { s32 mds = ceph_decode_32(p); - if (mds >= 0 && mds < m->m_max_mds) { + if (mds >= 0 && mds < m->m_num_mds) { if (m->m_info[mds].laggy) num_laggy++; } } m->m_num_laggy = num_laggy; + + if (n > m->m_num_mds) { + void *new_m_info = krealloc(m->m_info, + n * sizeof(*m->m_info), + GFP_NOFS | __GFP_ZERO); + if (!new_m_info) + goto nomem; + m->m_info = new_m_info; + } + m->m_num_mds = n; } /* inc */ @@ -341,7 +371,7 @@ void ceph_mdsmap_destroy(struct ceph_mdsmap *m) { int i; - for (i = 0; i < m->m_max_mds; i++) + for (i = 0; i < m->m_num_mds; i++) kfree(m->m_info[i].export_targets); kfree(m->m_info); kfree(m->m_data_pg_pools); @@ -357,7 +387,7 @@ bool ceph_mdsmap_is_cluster_available(struct ceph_mdsmap *m) return false; if (m->m_num_laggy > 0) return false; - for (i = 0; i < m->m_max_mds; i++) { + for (i = 0; i < m->m_num_mds; i++) { if (m->m_info[i].state == CEPH_MDS_STATE_ACTIVE) nr_active++; } diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index 8f8b41c2ef0f..dab5d6732345 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -519,7 +519,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) capsnap->need_flush ? "" : "no_flush"); ihold(inode); - atomic_set(&capsnap->nref, 1); + refcount_set(&capsnap->nref, 1); INIT_LIST_HEAD(&capsnap->ci_item); capsnap->follows = old_snapc->seq; diff --git a/fs/ceph/super.c b/fs/ceph/super.c index a8c81b2052ca..8d7918ce694a 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -544,10 +544,6 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, struct ceph_options *opt) { struct ceph_fs_client *fsc; - const u64 supported_features = - CEPH_FEATURE_FLOCK | CEPH_FEATURE_DIRLAYOUTHASH | - CEPH_FEATURE_MDSENC | CEPH_FEATURE_MDS_INLINE_DATA; - const u64 required_features = 0; int page_count; size_t size; int err = -ENOMEM; @@ -556,8 +552,7 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, if (!fsc) return ERR_PTR(-ENOMEM); - fsc->client = ceph_create_client(opt, fsc, supported_features, - required_features); + fsc->client = ceph_create_client(opt, fsc); if (IS_ERR(fsc->client)) { err = PTR_ERR(fsc->client); goto fail; diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 176186b12457..a973acd8beaf 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -14,6 +14,7 @@ #include <linux/writeback.h> #include <linux/slab.h> #include <linux/posix_acl.h> +#include <linux/refcount.h> #include <linux/ceph/libceph.h> @@ -160,7 +161,7 @@ struct ceph_cap_flush { * data before flushing the snapped state (tracked here) back to the MDS. */ struct ceph_cap_snap { - atomic_t nref; + refcount_t nref; struct list_head ci_item; struct ceph_cap_flush cap_flush; @@ -189,7 +190,7 @@ struct ceph_cap_snap { static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap) { - if (atomic_dec_and_test(&capsnap->nref)) { + if (refcount_dec_and_test(&capsnap->nref)) { if (capsnap->xattr_blob) ceph_buffer_put(capsnap->xattr_blob); kfree(capsnap); @@ -471,6 +472,32 @@ static inline struct inode *ceph_find_inode(struct super_block *sb, #define CEPH_I_CAP_DROPPED (1 << 8) /* caps were forcibly dropped */ #define CEPH_I_KICK_FLUSH (1 << 9) /* kick flushing caps */ #define CEPH_I_FLUSH_SNAPS (1 << 10) /* need flush snapss */ +#define CEPH_I_ERROR_WRITE (1 << 11) /* have seen write errors */ + +/* + * We set the ERROR_WRITE bit when we start seeing write errors on an inode + * and then clear it when they start succeeding. Note that we do a lockless + * check first, and only take the lock if it looks like it needs to be changed. + * The write submission code just takes this as a hint, so we're not too + * worried if a few slip through in either direction. + */ +static inline void ceph_set_error_write(struct ceph_inode_info *ci) +{ + if (!(READ_ONCE(ci->i_ceph_flags) & CEPH_I_ERROR_WRITE)) { + spin_lock(&ci->i_ceph_lock); + ci->i_ceph_flags |= CEPH_I_ERROR_WRITE; + spin_unlock(&ci->i_ceph_lock); + } +} + +static inline void ceph_clear_error_write(struct ceph_inode_info *ci) +{ + if (READ_ONCE(ci->i_ceph_flags) & CEPH_I_ERROR_WRITE) { + spin_lock(&ci->i_ceph_lock); + ci->i_ceph_flags &= ~CEPH_I_ERROR_WRITE; + spin_unlock(&ci->i_ceph_lock); + } +} static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci, long long release_count, diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index febc28f9e2c2..75267cdd5dfd 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -392,6 +392,7 @@ static int __set_xattr(struct ceph_inode_info *ci, if (update_xattr) { int err = 0; + if (xattr && (flags & XATTR_CREATE)) err = -EEXIST; else if (!xattr && (flags & XATTR_REPLACE)) @@ -399,12 +400,14 @@ static int __set_xattr(struct ceph_inode_info *ci, if (err) { kfree(name); kfree(val); + kfree(*newxattr); return err; } if (update_xattr < 0) { if (xattr) __remove_xattr(ci, xattr); kfree(name); + kfree(*newxattr); return 0; } } |