diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2015-07-02 20:35:00 +0200 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2015-07-02 20:35:00 +0200 |
commit | 0c76c6ba246043bbc5c0f9620a0645ae78217421 (patch) | |
tree | 644a4db58706c4e97478951f0a3a0087ddf26e5e /fs/ceph/mds_client.c | |
parent | Merge tag 'nfs-for-4.2-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs (diff) | |
parent | rbd: use GFP_NOIO in rbd_obj_request_create() (diff) | |
download | linux-0c76c6ba246043bbc5c0f9620a0645ae78217421.tar.xz linux-0c76c6ba246043bbc5c0f9620a0645ae78217421.zip |
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil:
"We have a pile of bug fixes from Ilya, including a few patches that
sync up the CRUSH code with the latest from userspace.
There is also a long series from Zheng that fixes various issues with
snapshots, inline data, and directory fsync, some simplification and
improvement in the cap release code, and a rework of the caching of
directory contents.
To top it off there are a few small fixes and cleanups from Benoit and
Hong"
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (40 commits)
rbd: use GFP_NOIO in rbd_obj_request_create()
crush: fix a bug in tree bucket decode
libceph: Fix ceph_tcp_sendpage()'s more boolean usage
libceph: Remove spurious kunmap() of the zero page
rbd: queue_depth map option
rbd: store rbd_options in rbd_device
rbd: terminate rbd_opts_tokens with Opt_err
ceph: fix ceph_writepages_start()
rbd: bump queue_max_segments
ceph: rework dcache readdir
crush: sync up with userspace
crush: fix crash from invalid 'take' argument
ceph: switch some GFP_NOFS memory allocation to GFP_KERNEL
ceph: pre-allocate data structure that tracks caps flushing
ceph: re-send flushing caps (which are revoked) in reconnect stage
ceph: send TID of the oldest pending caps flush to MDS
ceph: track pending caps flushing globally
ceph: track pending caps flushing accurately
libceph: fix wrong name "Ceph filesystem for Linux"
ceph: fix directory fsync
...
Diffstat (limited to 'fs/ceph/mds_client.c')
-rw-r--r-- | fs/ceph/mds_client.c | 425 |
1 files changed, 235 insertions, 190 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 84f37f34f9aa..6aa07af67603 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -8,6 +8,7 @@ #include <linux/debugfs.h> #include <linux/seq_file.h> #include <linux/utsname.h> +#include <linux/ratelimit.h> #include "super.h" #include "mds_client.h" @@ -458,7 +459,6 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, s->s_cap_reconnect = 0; s->s_cap_iterator = NULL; INIT_LIST_HEAD(&s->s_cap_releases); - INIT_LIST_HEAD(&s->s_cap_releases_done); INIT_LIST_HEAD(&s->s_cap_flushing); INIT_LIST_HEAD(&s->s_cap_snaps_flushing); @@ -629,6 +629,9 @@ static void __register_request(struct ceph_mds_client *mdsc, req->r_uid = current_fsuid(); req->r_gid = current_fsgid(); + if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) + mdsc->oldest_tid = req->r_tid; + if (dir) { struct ceph_inode_info *ci = ceph_inode(dir); @@ -644,6 +647,21 @@ static void __unregister_request(struct ceph_mds_client *mdsc, struct ceph_mds_request *req) { dout("__unregister_request %p tid %lld\n", req, req->r_tid); + + if (req->r_tid == mdsc->oldest_tid) { + struct rb_node *p = rb_next(&req->r_node); + mdsc->oldest_tid = 0; + while (p) { + struct ceph_mds_request *next_req = + rb_entry(p, struct ceph_mds_request, r_node); + if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { + mdsc->oldest_tid = next_req->r_tid; + break; + } + p = rb_next(p); + } + } + rb_erase(&req->r_node, &mdsc->request_tree); RB_CLEAR_NODE(&req->r_node); @@ -998,27 +1016,25 @@ void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, * session caps */ -/* - * Free preallocated cap messages assigned to this session - */ -static void cleanup_cap_releases(struct ceph_mds_session *session) +/* caller holds s_cap_lock, we drop it */ +static void cleanup_cap_releases(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) + __releases(session->s_cap_lock) { - struct ceph_msg *msg; + LIST_HEAD(tmp_list); + list_splice_init(&session->s_cap_releases, &tmp_list); + session->s_num_cap_releases = 0; + spin_unlock(&session->s_cap_lock); - spin_lock(&session->s_cap_lock); - while (!list_empty(&session->s_cap_releases)) { - msg = list_first_entry(&session->s_cap_releases, - struct ceph_msg, list_head); - list_del_init(&msg->list_head); - ceph_msg_put(msg); - } - while (!list_empty(&session->s_cap_releases_done)) { - msg = list_first_entry(&session->s_cap_releases_done, - struct ceph_msg, list_head); - list_del_init(&msg->list_head); - ceph_msg_put(msg); + dout("cleanup_cap_releases mds%d\n", session->s_mds); + while (!list_empty(&tmp_list)) { + struct ceph_cap *cap; + /* zero out the in-progress message */ + cap = list_first_entry(&tmp_list, + struct ceph_cap, session_caps); + list_del(&cap->session_caps); + ceph_put_cap(mdsc, cap); } - spin_unlock(&session->s_cap_lock); } static void cleanup_session_requests(struct ceph_mds_client *mdsc, @@ -1033,7 +1049,8 @@ static void cleanup_session_requests(struct ceph_mds_client *mdsc, req = list_first_entry(&session->s_unsafe, struct ceph_mds_request, r_unsafe_item); list_del_init(&req->r_unsafe_item); - pr_info(" dropping unsafe request %llu\n", req->r_tid); + pr_warn_ratelimited(" dropping unsafe request %llu\n", + req->r_tid); __unregister_request(mdsc, req); } /* zero r_attempts, so kick_requests() will re-send requests */ @@ -1095,10 +1112,16 @@ static int iterate_session_caps(struct ceph_mds_session *session, dout("iterate_session_caps finishing cap %p removal\n", cap); BUG_ON(cap->session != session); + cap->session = NULL; list_del_init(&cap->session_caps); session->s_nr_caps--; - cap->session = NULL; - old_cap = cap; /* put_cap it w/o locks held */ + if (cap->queue_release) { + list_add_tail(&cap->session_caps, + &session->s_cap_releases); + session->s_num_cap_releases++; + } else { + old_cap = cap; /* put_cap it w/o locks held */ + } } if (ret < 0) goto out; @@ -1119,6 +1142,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) { struct ceph_inode_info *ci = ceph_inode(inode); + LIST_HEAD(to_remove); int drop = 0; dout("removing cap %p, ci is %p, inode is %p\n", @@ -1126,12 +1150,27 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, spin_lock(&ci->i_ceph_lock); __ceph_remove_cap(cap, false); if (!ci->i_auth_cap) { + struct ceph_cap_flush *cf; struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; + while (true) { + struct rb_node *n = rb_first(&ci->i_cap_flush_tree); + if (!n) + break; + cf = rb_entry(n, struct ceph_cap_flush, i_node); + rb_erase(&cf->i_node, &ci->i_cap_flush_tree); + list_add(&cf->list, &to_remove); + } + spin_lock(&mdsc->cap_dirty_lock); + + list_for_each_entry(cf, &to_remove, list) + rb_erase(&cf->g_node, &mdsc->cap_flush_tree); + if (!list_empty(&ci->i_dirty_item)) { - pr_info(" dropping dirty %s state for %p %lld\n", + pr_warn_ratelimited( + " dropping dirty %s state for %p %lld\n", ceph_cap_string(ci->i_dirty_caps), inode, ceph_ino(inode)); ci->i_dirty_caps = 0; @@ -1139,7 +1178,8 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, drop = 1; } if (!list_empty(&ci->i_flushing_item)) { - pr_info(" dropping dirty+flushing %s state for %p %lld\n", + pr_warn_ratelimited( + " dropping dirty+flushing %s state for %p %lld\n", ceph_cap_string(ci->i_flushing_caps), inode, ceph_ino(inode)); ci->i_flushing_caps = 0; @@ -1148,8 +1188,20 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, drop = 1; } spin_unlock(&mdsc->cap_dirty_lock); + + if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) { + list_add(&ci->i_prealloc_cap_flush->list, &to_remove); + ci->i_prealloc_cap_flush = NULL; + } } spin_unlock(&ci->i_ceph_lock); + while (!list_empty(&to_remove)) { + struct ceph_cap_flush *cf; + cf = list_first_entry(&to_remove, + struct ceph_cap_flush, list); + list_del(&cf->list); + ceph_free_cap_flush(cf); + } while (drop--) iput(inode); return 0; @@ -1191,11 +1243,12 @@ static void remove_session_caps(struct ceph_mds_session *session) spin_lock(&session->s_cap_lock); } } - spin_unlock(&session->s_cap_lock); + + // drop cap expires and unlock s_cap_lock + cleanup_cap_releases(session->s_mdsc, session); BUG_ON(session->s_nr_caps > 0); BUG_ON(!list_empty(&session->s_cap_flushing)); - cleanup_cap_releases(session); } /* @@ -1371,7 +1424,8 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), ceph_cap_string(used), ceph_cap_string(wanted)); if (cap == ci->i_auth_cap) { - if (ci->i_dirty_caps | ci->i_flushing_caps) + if (ci->i_dirty_caps || ci->i_flushing_caps || + !list_empty(&ci->i_cap_snaps)) goto out; if ((used | wanted) & CEPH_CAP_ANY_WR) goto out; @@ -1417,121 +1471,80 @@ static int trim_caps(struct ceph_mds_client *mdsc, session->s_trim_caps = 0; } - ceph_add_cap_releases(mdsc, session); ceph_send_cap_releases(mdsc, session); return 0; } -/* - * Allocate cap_release messages. If there is a partially full message - * in the queue, try to allocate enough to cover it's remainder, so that - * we can send it immediately. - * - * Called under s_mutex. - */ -int ceph_add_cap_releases(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session) +static int check_capsnap_flush(struct ceph_inode_info *ci, + u64 want_snap_seq) { - struct ceph_msg *msg, *partial = NULL; - struct ceph_mds_cap_release *head; - int err = -ENOMEM; - int extra = mdsc->fsc->mount_options->cap_release_safety; - int num; - - dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds, - extra); - - spin_lock(&session->s_cap_lock); - - if (!list_empty(&session->s_cap_releases)) { - msg = list_first_entry(&session->s_cap_releases, - struct ceph_msg, - list_head); - head = msg->front.iov_base; - num = le32_to_cpu(head->num); - if (num) { - dout(" partial %p with (%d/%d)\n", msg, num, - (int)CEPH_CAPS_PER_RELEASE); - extra += CEPH_CAPS_PER_RELEASE - num; - partial = msg; - } - } - while (session->s_num_cap_releases < session->s_nr_caps + extra) { - spin_unlock(&session->s_cap_lock); - msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE, - GFP_NOFS, false); - if (!msg) - goto out_unlocked; - dout("add_cap_releases %p msg %p now %d\n", session, msg, - (int)msg->front.iov_len); - head = msg->front.iov_base; - head->num = cpu_to_le32(0); - msg->front.iov_len = sizeof(*head); - spin_lock(&session->s_cap_lock); - list_add(&msg->list_head, &session->s_cap_releases); - session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE; - } - - if (partial) { - head = partial->front.iov_base; - num = le32_to_cpu(head->num); - dout(" queueing partial %p with %d/%d\n", partial, num, - (int)CEPH_CAPS_PER_RELEASE); - list_move_tail(&partial->list_head, - &session->s_cap_releases_done); - session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num; + int ret = 1; + spin_lock(&ci->i_ceph_lock); + if (want_snap_seq > 0 && !list_empty(&ci->i_cap_snaps)) { + struct ceph_cap_snap *capsnap = + list_first_entry(&ci->i_cap_snaps, + struct ceph_cap_snap, ci_item); + ret = capsnap->follows >= want_snap_seq; } - err = 0; - spin_unlock(&session->s_cap_lock); -out_unlocked: - return err; + spin_unlock(&ci->i_ceph_lock); + return ret; } -static int check_cap_flush(struct inode *inode, u64 want_flush_seq) +static int check_caps_flush(struct ceph_mds_client *mdsc, + u64 want_flush_tid) { - struct ceph_inode_info *ci = ceph_inode(inode); - int ret; - spin_lock(&ci->i_ceph_lock); - if (ci->i_flushing_caps) - ret = ci->i_cap_flush_seq >= want_flush_seq; - else - ret = 1; - spin_unlock(&ci->i_ceph_lock); + struct rb_node *n; + struct ceph_cap_flush *cf; + int ret = 1; + + spin_lock(&mdsc->cap_dirty_lock); + n = rb_first(&mdsc->cap_flush_tree); + cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL; + if (cf && cf->tid <= want_flush_tid) { + dout("check_caps_flush still flushing tid %llu <= %llu\n", + cf->tid, want_flush_tid); + ret = 0; + } + spin_unlock(&mdsc->cap_dirty_lock); return ret; } /* * flush all dirty inode data to disk. * - * returns true if we've flushed through want_flush_seq + * returns true if we've flushed through want_flush_tid */ -static void wait_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq) +static void wait_caps_flush(struct ceph_mds_client *mdsc, + u64 want_flush_tid, u64 want_snap_seq) { int mds; - dout("check_cap_flush want %lld\n", want_flush_seq); + dout("check_caps_flush want %llu snap want %llu\n", + want_flush_tid, want_snap_seq); mutex_lock(&mdsc->mutex); - for (mds = 0; mds < mdsc->max_sessions; mds++) { + for (mds = 0; mds < mdsc->max_sessions; ) { struct ceph_mds_session *session = mdsc->sessions[mds]; struct inode *inode = NULL; - if (!session) + if (!session) { + mds++; continue; + } get_session(session); mutex_unlock(&mdsc->mutex); mutex_lock(&session->s_mutex); - if (!list_empty(&session->s_cap_flushing)) { - struct ceph_inode_info *ci = - list_entry(session->s_cap_flushing.next, - struct ceph_inode_info, - i_flushing_item); - - if (!check_cap_flush(&ci->vfs_inode, want_flush_seq)) { - dout("check_cap_flush still flushing %p " - "seq %lld <= %lld to mds%d\n", - &ci->vfs_inode, ci->i_cap_flush_seq, - want_flush_seq, session->s_mds); + if (!list_empty(&session->s_cap_snaps_flushing)) { + struct ceph_cap_snap *capsnap = + list_first_entry(&session->s_cap_snaps_flushing, + struct ceph_cap_snap, + flushing_item); + struct ceph_inode_info *ci = capsnap->ci; + if (!check_capsnap_flush(ci, want_snap_seq)) { + dout("check_cap_flush still flushing snap %p " + "follows %lld <= %lld to mds%d\n", + &ci->vfs_inode, capsnap->follows, + want_snap_seq, mds); inode = igrab(&ci->vfs_inode); } } @@ -1540,15 +1553,21 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq) if (inode) { wait_event(mdsc->cap_flushing_wq, - check_cap_flush(inode, want_flush_seq)); + check_capsnap_flush(ceph_inode(inode), + want_snap_seq)); iput(inode); + } else { + mds++; } mutex_lock(&mdsc->mutex); } - mutex_unlock(&mdsc->mutex); - dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq); + + wait_event(mdsc->cap_flushing_wq, + check_caps_flush(mdsc, want_flush_tid)); + + dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid); } /* @@ -1557,60 +1576,74 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq) void ceph_send_cap_releases(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { - struct ceph_msg *msg; + struct ceph_msg *msg = NULL; + struct ceph_mds_cap_release *head; + struct ceph_mds_cap_item *item; + struct ceph_cap *cap; + LIST_HEAD(tmp_list); + int num_cap_releases; - dout("send_cap_releases mds%d\n", session->s_mds); spin_lock(&session->s_cap_lock); - while (!list_empty(&session->s_cap_releases_done)) { - msg = list_first_entry(&session->s_cap_releases_done, - struct ceph_msg, list_head); - list_del_init(&msg->list_head); - spin_unlock(&session->s_cap_lock); - msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); - dout("send_cap_releases mds%d %p\n", session->s_mds, msg); - ceph_con_send(&session->s_con, msg); - spin_lock(&session->s_cap_lock); - } +again: + list_splice_init(&session->s_cap_releases, &tmp_list); + num_cap_releases = session->s_num_cap_releases; + session->s_num_cap_releases = 0; spin_unlock(&session->s_cap_lock); -} -static void discard_cap_releases(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session) -{ - struct ceph_msg *msg; - struct ceph_mds_cap_release *head; - unsigned num; - - dout("discard_cap_releases mds%d\n", session->s_mds); + while (!list_empty(&tmp_list)) { + if (!msg) { + msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, + PAGE_CACHE_SIZE, GFP_NOFS, false); + if (!msg) + goto out_err; + head = msg->front.iov_base; + head->num = cpu_to_le32(0); + msg->front.iov_len = sizeof(*head); + } + cap = list_first_entry(&tmp_list, struct ceph_cap, + session_caps); + list_del(&cap->session_caps); + num_cap_releases--; - if (!list_empty(&session->s_cap_releases)) { - /* zero out the in-progress message */ - msg = list_first_entry(&session->s_cap_releases, - struct ceph_msg, list_head); head = msg->front.iov_base; - num = le32_to_cpu(head->num); - dout("discard_cap_releases mds%d %p %u\n", - session->s_mds, msg, num); - head->num = cpu_to_le32(0); - msg->front.iov_len = sizeof(*head); - session->s_num_cap_releases += num; + le32_add_cpu(&head->num, 1); + item = msg->front.iov_base + msg->front.iov_len; + item->ino = cpu_to_le64(cap->cap_ino); + item->cap_id = cpu_to_le64(cap->cap_id); + item->migrate_seq = cpu_to_le32(cap->mseq); + item->seq = cpu_to_le32(cap->issue_seq); + msg->front.iov_len += sizeof(*item); + + ceph_put_cap(mdsc, cap); + + if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { + msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); + dout("send_cap_releases mds%d %p\n", session->s_mds, msg); + ceph_con_send(&session->s_con, msg); + msg = NULL; + } } - /* requeue completed messages */ - while (!list_empty(&session->s_cap_releases_done)) { - msg = list_first_entry(&session->s_cap_releases_done, - struct ceph_msg, list_head); - list_del_init(&msg->list_head); + BUG_ON(num_cap_releases != 0); - head = msg->front.iov_base; - num = le32_to_cpu(head->num); - dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, - num); - session->s_num_cap_releases += num; - head->num = cpu_to_le32(0); - msg->front.iov_len = sizeof(*head); - list_add(&msg->list_head, &session->s_cap_releases); + spin_lock(&session->s_cap_lock); + if (!list_empty(&session->s_cap_releases)) + goto again; + spin_unlock(&session->s_cap_lock); + + if (msg) { + msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); + dout("send_cap_releases mds%d %p\n", session->s_mds, msg); + ceph_con_send(&session->s_con, msg); } + return; +out_err: + pr_err("send_cap_releases mds%d, failed to allocate message\n", + session->s_mds); + spin_lock(&session->s_cap_lock); + list_splice(&tmp_list, &session->s_cap_releases); + session->s_num_cap_releases += num_cap_releases; + spin_unlock(&session->s_cap_lock); } /* @@ -1635,7 +1668,8 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, order = get_order(size * num_entries); while (order >= 0) { - rinfo->dir_in = (void*)__get_free_pages(GFP_NOFS | __GFP_NOWARN, + rinfo->dir_in = (void*)__get_free_pages(GFP_KERNEL | + __GFP_NOWARN, order); if (rinfo->dir_in) break; @@ -1697,13 +1731,9 @@ static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) struct ceph_mds_request, r_node); } -static u64 __get_oldest_tid(struct ceph_mds_client *mdsc) +static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) { - struct ceph_mds_request *req = __get_oldest_req(mdsc); - - if (req) - return req->r_tid; - return 0; + return mdsc->oldest_tid; } /* @@ -2267,15 +2297,18 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, /* wait */ mutex_unlock(&mdsc->mutex); dout("do_request waiting\n"); - if (req->r_timeout) { - err = (long)wait_for_completion_killable_timeout( - &req->r_completion, req->r_timeout); - if (err == 0) - err = -EIO; - } else if (req->r_wait_for_completion) { + if (!req->r_timeout && req->r_wait_for_completion) { err = req->r_wait_for_completion(mdsc, req); } else { - err = wait_for_completion_killable(&req->r_completion); + long timeleft = wait_for_completion_killable_timeout( + &req->r_completion, + ceph_timeout_jiffies(req->r_timeout)); + if (timeleft > 0) + err = 0; + else if (!timeleft) + err = -EIO; /* timed out */ + else + err = timeleft; /* killed */ } dout("do_request waited, got %d\n", err); mutex_lock(&mdsc->mutex); @@ -2496,7 +2529,6 @@ out_err: } mutex_unlock(&mdsc->mutex); - ceph_add_cap_releases(mdsc, req->r_session); mutex_unlock(&session->s_mutex); /* kick calling process */ @@ -2888,8 +2920,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, */ session->s_cap_reconnect = 1; /* drop old cap expires; we're about to reestablish that state */ - discard_cap_releases(mdsc, session); - spin_unlock(&session->s_cap_lock); + cleanup_cap_releases(mdsc, session); /* trim unused caps to reduce MDS's cache rejoin time */ if (mdsc->fsc->sb->s_root) @@ -2956,6 +2987,9 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, reply->hdr.data_len = cpu_to_le32(pagelist->length); ceph_msg_data_add_pagelist(reply, pagelist); + + ceph_early_kick_flushing_caps(mdsc, session); + ceph_con_send(&session->s_con, reply); mutex_unlock(&session->s_mutex); @@ -3352,7 +3386,6 @@ static void delayed_work(struct work_struct *work) send_renew_caps(mdsc, s); else ceph_con_keepalive(&s->s_con); - ceph_add_cap_releases(mdsc, s); if (s->s_state == CEPH_MDS_SESSION_OPEN || s->s_state == CEPH_MDS_SESSION_HUNG) ceph_send_cap_releases(mdsc, s); @@ -3390,11 +3423,13 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) atomic_set(&mdsc->num_sessions, 0); mdsc->max_sessions = 0; mdsc->stopping = 0; + mdsc->last_snap_seq = 0; init_rwsem(&mdsc->snap_rwsem); mdsc->snap_realms = RB_ROOT; INIT_LIST_HEAD(&mdsc->snap_empty); spin_lock_init(&mdsc->snap_empty_lock); mdsc->last_tid = 0; + mdsc->oldest_tid = 0; mdsc->request_tree = RB_ROOT; INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); mdsc->last_renew_caps = jiffies; @@ -3402,7 +3437,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) spin_lock_init(&mdsc->cap_delay_lock); INIT_LIST_HEAD(&mdsc->snap_flush_list); spin_lock_init(&mdsc->snap_flush_lock); - mdsc->cap_flush_seq = 0; + mdsc->last_cap_flush_tid = 1; + mdsc->cap_flush_tree = RB_ROOT; INIT_LIST_HEAD(&mdsc->cap_dirty); INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); mdsc->num_cap_flushing = 0; @@ -3414,6 +3450,9 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) ceph_caps_init(mdsc); ceph_adjust_min_caps(mdsc, fsc->min_caps); + init_rwsem(&mdsc->pool_perm_rwsem); + mdsc->pool_perm_tree = RB_ROOT; + return 0; } @@ -3423,8 +3462,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) */ static void wait_requests(struct ceph_mds_client *mdsc) { + struct ceph_options *opts = mdsc->fsc->client->options; struct ceph_mds_request *req; - struct ceph_fs_client *fsc = mdsc->fsc; mutex_lock(&mdsc->mutex); if (__get_oldest_req(mdsc)) { @@ -3432,7 +3471,7 @@ static void wait_requests(struct ceph_mds_client *mdsc) dout("wait_requests waiting for requests\n"); wait_for_completion_timeout(&mdsc->safe_umount_waiters, - fsc->client->options->mount_timeout * HZ); + ceph_timeout_jiffies(opts->mount_timeout)); /* tear down remaining requests */ mutex_lock(&mdsc->mutex); @@ -3485,7 +3524,8 @@ restart: nextreq = rb_entry(n, struct ceph_mds_request, r_node); else nextreq = NULL; - if ((req->r_op & CEPH_MDS_OP_WRITE)) { + if (req->r_op != CEPH_MDS_OP_SETFILELOCK && + (req->r_op & CEPH_MDS_OP_WRITE)) { /* write op */ ceph_mdsc_get_request(req); if (nextreq) @@ -3513,7 +3553,7 @@ restart: void ceph_mdsc_sync(struct ceph_mds_client *mdsc) { - u64 want_tid, want_flush; + u64 want_tid, want_flush, want_snap; if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN) return; @@ -3525,13 +3565,18 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc) ceph_flush_dirty_caps(mdsc); spin_lock(&mdsc->cap_dirty_lock); - want_flush = mdsc->cap_flush_seq; + want_flush = mdsc->last_cap_flush_tid; spin_unlock(&mdsc->cap_dirty_lock); - dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush); + down_read(&mdsc->snap_rwsem); + want_snap = mdsc->last_snap_seq; + up_read(&mdsc->snap_rwsem); + + dout("sync want tid %lld flush_seq %lld snap_seq %lld\n", + want_tid, want_flush, want_snap); wait_unsafe_requests(mdsc, want_tid); - wait_caps_flush(mdsc, want_flush); + wait_caps_flush(mdsc, want_flush, want_snap); } /* @@ -3549,10 +3594,9 @@ static bool done_closing_sessions(struct ceph_mds_client *mdsc) */ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) { + struct ceph_options *opts = mdsc->fsc->client->options; struct ceph_mds_session *session; int i; - struct ceph_fs_client *fsc = mdsc->fsc; - unsigned long timeout = fsc->client->options->mount_timeout * HZ; dout("close_sessions\n"); @@ -3573,7 +3617,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) dout("waiting for sessions to close\n"); wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc), - timeout); + ceph_timeout_jiffies(opts->mount_timeout)); /* tear down remaining sessions */ mutex_lock(&mdsc->mutex); @@ -3607,6 +3651,7 @@ static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) ceph_mdsmap_destroy(mdsc->mdsmap); kfree(mdsc->sessions); ceph_caps_finalize(mdsc); + ceph_pool_perm_destroy(mdsc); } void ceph_mdsc_destroy(struct ceph_fs_client *fsc) |