summaryrefslogtreecommitdiffstats
path: root/fs/ceph/mds_client.c
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2015-07-02 20:35:00 +0200
committerLinus Torvalds <torvalds@linux-foundation.org>2015-07-02 20:35:00 +0200
commit0c76c6ba246043bbc5c0f9620a0645ae78217421 (patch)
tree644a4db58706c4e97478951f0a3a0087ddf26e5e /fs/ceph/mds_client.c
parentMerge tag 'nfs-for-4.2-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs (diff)
parentrbd: use GFP_NOIO in rbd_obj_request_create() (diff)
downloadlinux-0c76c6ba246043bbc5c0f9620a0645ae78217421.tar.xz
linux-0c76c6ba246043bbc5c0f9620a0645ae78217421.zip
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil: "We have a pile of bug fixes from Ilya, including a few patches that sync up the CRUSH code with the latest from userspace. There is also a long series from Zheng that fixes various issues with snapshots, inline data, and directory fsync, some simplification and improvement in the cap release code, and a rework of the caching of directory contents. To top it off there are a few small fixes and cleanups from Benoit and Hong" * 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (40 commits) rbd: use GFP_NOIO in rbd_obj_request_create() crush: fix a bug in tree bucket decode libceph: Fix ceph_tcp_sendpage()'s more boolean usage libceph: Remove spurious kunmap() of the zero page rbd: queue_depth map option rbd: store rbd_options in rbd_device rbd: terminate rbd_opts_tokens with Opt_err ceph: fix ceph_writepages_start() rbd: bump queue_max_segments ceph: rework dcache readdir crush: sync up with userspace crush: fix crash from invalid 'take' argument ceph: switch some GFP_NOFS memory allocation to GFP_KERNEL ceph: pre-allocate data structure that tracks caps flushing ceph: re-send flushing caps (which are revoked) in reconnect stage ceph: send TID of the oldest pending caps flush to MDS ceph: track pending caps flushing globally ceph: track pending caps flushing accurately libceph: fix wrong name "Ceph filesystem for Linux" ceph: fix directory fsync ...
Diffstat (limited to 'fs/ceph/mds_client.c')
-rw-r--r--fs/ceph/mds_client.c425
1 files changed, 235 insertions, 190 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 84f37f34f9aa..6aa07af67603 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -8,6 +8,7 @@
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/utsname.h>
+#include <linux/ratelimit.h>
#include "super.h"
#include "mds_client.h"
@@ -458,7 +459,6 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
s->s_cap_reconnect = 0;
s->s_cap_iterator = NULL;
INIT_LIST_HEAD(&s->s_cap_releases);
- INIT_LIST_HEAD(&s->s_cap_releases_done);
INIT_LIST_HEAD(&s->s_cap_flushing);
INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
@@ -629,6 +629,9 @@ static void __register_request(struct ceph_mds_client *mdsc,
req->r_uid = current_fsuid();
req->r_gid = current_fsgid();
+ if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
+ mdsc->oldest_tid = req->r_tid;
+
if (dir) {
struct ceph_inode_info *ci = ceph_inode(dir);
@@ -644,6 +647,21 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
struct ceph_mds_request *req)
{
dout("__unregister_request %p tid %lld\n", req, req->r_tid);
+
+ if (req->r_tid == mdsc->oldest_tid) {
+ struct rb_node *p = rb_next(&req->r_node);
+ mdsc->oldest_tid = 0;
+ while (p) {
+ struct ceph_mds_request *next_req =
+ rb_entry(p, struct ceph_mds_request, r_node);
+ if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
+ mdsc->oldest_tid = next_req->r_tid;
+ break;
+ }
+ p = rb_next(p);
+ }
+ }
+
rb_erase(&req->r_node, &mdsc->request_tree);
RB_CLEAR_NODE(&req->r_node);
@@ -998,27 +1016,25 @@ void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
* session caps
*/
-/*
- * Free preallocated cap messages assigned to this session
- */
-static void cleanup_cap_releases(struct ceph_mds_session *session)
+/* caller holds s_cap_lock, we drop it */
+static void cleanup_cap_releases(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
+ __releases(session->s_cap_lock)
{
- struct ceph_msg *msg;
+ LIST_HEAD(tmp_list);
+ list_splice_init(&session->s_cap_releases, &tmp_list);
+ session->s_num_cap_releases = 0;
+ spin_unlock(&session->s_cap_lock);
- spin_lock(&session->s_cap_lock);
- while (!list_empty(&session->s_cap_releases)) {
- msg = list_first_entry(&session->s_cap_releases,
- struct ceph_msg, list_head);
- list_del_init(&msg->list_head);
- ceph_msg_put(msg);
- }
- while (!list_empty(&session->s_cap_releases_done)) {
- msg = list_first_entry(&session->s_cap_releases_done,
- struct ceph_msg, list_head);
- list_del_init(&msg->list_head);
- ceph_msg_put(msg);
+ dout("cleanup_cap_releases mds%d\n", session->s_mds);
+ while (!list_empty(&tmp_list)) {
+ struct ceph_cap *cap;
+ /* zero out the in-progress message */
+ cap = list_first_entry(&tmp_list,
+ struct ceph_cap, session_caps);
+ list_del(&cap->session_caps);
+ ceph_put_cap(mdsc, cap);
}
- spin_unlock(&session->s_cap_lock);
}
static void cleanup_session_requests(struct ceph_mds_client *mdsc,
@@ -1033,7 +1049,8 @@ static void cleanup_session_requests(struct ceph_mds_client *mdsc,
req = list_first_entry(&session->s_unsafe,
struct ceph_mds_request, r_unsafe_item);
list_del_init(&req->r_unsafe_item);
- pr_info(" dropping unsafe request %llu\n", req->r_tid);
+ pr_warn_ratelimited(" dropping unsafe request %llu\n",
+ req->r_tid);
__unregister_request(mdsc, req);
}
/* zero r_attempts, so kick_requests() will re-send requests */
@@ -1095,10 +1112,16 @@ static int iterate_session_caps(struct ceph_mds_session *session,
dout("iterate_session_caps finishing cap %p removal\n",
cap);
BUG_ON(cap->session != session);
+ cap->session = NULL;
list_del_init(&cap->session_caps);
session->s_nr_caps--;
- cap->session = NULL;
- old_cap = cap; /* put_cap it w/o locks held */
+ if (cap->queue_release) {
+ list_add_tail(&cap->session_caps,
+ &session->s_cap_releases);
+ session->s_num_cap_releases++;
+ } else {
+ old_cap = cap; /* put_cap it w/o locks held */
+ }
}
if (ret < 0)
goto out;
@@ -1119,6 +1142,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
void *arg)
{
struct ceph_inode_info *ci = ceph_inode(inode);
+ LIST_HEAD(to_remove);
int drop = 0;
dout("removing cap %p, ci is %p, inode is %p\n",
@@ -1126,12 +1150,27 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
spin_lock(&ci->i_ceph_lock);
__ceph_remove_cap(cap, false);
if (!ci->i_auth_cap) {
+ struct ceph_cap_flush *cf;
struct ceph_mds_client *mdsc =
ceph_sb_to_client(inode->i_sb)->mdsc;
+ while (true) {
+ struct rb_node *n = rb_first(&ci->i_cap_flush_tree);
+ if (!n)
+ break;
+ cf = rb_entry(n, struct ceph_cap_flush, i_node);
+ rb_erase(&cf->i_node, &ci->i_cap_flush_tree);
+ list_add(&cf->list, &to_remove);
+ }
+
spin_lock(&mdsc->cap_dirty_lock);
+
+ list_for_each_entry(cf, &to_remove, list)
+ rb_erase(&cf->g_node, &mdsc->cap_flush_tree);
+
if (!list_empty(&ci->i_dirty_item)) {
- pr_info(" dropping dirty %s state for %p %lld\n",
+ pr_warn_ratelimited(
+ " dropping dirty %s state for %p %lld\n",
ceph_cap_string(ci->i_dirty_caps),
inode, ceph_ino(inode));
ci->i_dirty_caps = 0;
@@ -1139,7 +1178,8 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
drop = 1;
}
if (!list_empty(&ci->i_flushing_item)) {
- pr_info(" dropping dirty+flushing %s state for %p %lld\n",
+ pr_warn_ratelimited(
+ " dropping dirty+flushing %s state for %p %lld\n",
ceph_cap_string(ci->i_flushing_caps),
inode, ceph_ino(inode));
ci->i_flushing_caps = 0;
@@ -1148,8 +1188,20 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
drop = 1;
}
spin_unlock(&mdsc->cap_dirty_lock);
+
+ if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
+ list_add(&ci->i_prealloc_cap_flush->list, &to_remove);
+ ci->i_prealloc_cap_flush = NULL;
+ }
}
spin_unlock(&ci->i_ceph_lock);
+ while (!list_empty(&to_remove)) {
+ struct ceph_cap_flush *cf;
+ cf = list_first_entry(&to_remove,
+ struct ceph_cap_flush, list);
+ list_del(&cf->list);
+ ceph_free_cap_flush(cf);
+ }
while (drop--)
iput(inode);
return 0;
@@ -1191,11 +1243,12 @@ static void remove_session_caps(struct ceph_mds_session *session)
spin_lock(&session->s_cap_lock);
}
}
- spin_unlock(&session->s_cap_lock);
+
+ // drop cap expires and unlock s_cap_lock
+ cleanup_cap_releases(session->s_mdsc, session);
BUG_ON(session->s_nr_caps > 0);
BUG_ON(!list_empty(&session->s_cap_flushing));
- cleanup_cap_releases(session);
}
/*
@@ -1371,7 +1424,8 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
ceph_cap_string(used), ceph_cap_string(wanted));
if (cap == ci->i_auth_cap) {
- if (ci->i_dirty_caps | ci->i_flushing_caps)
+ if (ci->i_dirty_caps || ci->i_flushing_caps ||
+ !list_empty(&ci->i_cap_snaps))
goto out;
if ((used | wanted) & CEPH_CAP_ANY_WR)
goto out;
@@ -1417,121 +1471,80 @@ static int trim_caps(struct ceph_mds_client *mdsc,
session->s_trim_caps = 0;
}
- ceph_add_cap_releases(mdsc, session);
ceph_send_cap_releases(mdsc, session);
return 0;
}
-/*
- * Allocate cap_release messages. If there is a partially full message
- * in the queue, try to allocate enough to cover it's remainder, so that
- * we can send it immediately.
- *
- * Called under s_mutex.
- */
-int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
- struct ceph_mds_session *session)
+static int check_capsnap_flush(struct ceph_inode_info *ci,
+ u64 want_snap_seq)
{
- struct ceph_msg *msg, *partial = NULL;
- struct ceph_mds_cap_release *head;
- int err = -ENOMEM;
- int extra = mdsc->fsc->mount_options->cap_release_safety;
- int num;
-
- dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
- extra);
-
- spin_lock(&session->s_cap_lock);
-
- if (!list_empty(&session->s_cap_releases)) {
- msg = list_first_entry(&session->s_cap_releases,
- struct ceph_msg,
- list_head);
- head = msg->front.iov_base;
- num = le32_to_cpu(head->num);
- if (num) {
- dout(" partial %p with (%d/%d)\n", msg, num,
- (int)CEPH_CAPS_PER_RELEASE);
- extra += CEPH_CAPS_PER_RELEASE - num;
- partial = msg;
- }
- }
- while (session->s_num_cap_releases < session->s_nr_caps + extra) {
- spin_unlock(&session->s_cap_lock);
- msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
- GFP_NOFS, false);
- if (!msg)
- goto out_unlocked;
- dout("add_cap_releases %p msg %p now %d\n", session, msg,
- (int)msg->front.iov_len);
- head = msg->front.iov_base;
- head->num = cpu_to_le32(0);
- msg->front.iov_len = sizeof(*head);
- spin_lock(&session->s_cap_lock);
- list_add(&msg->list_head, &session->s_cap_releases);
- session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
- }
-
- if (partial) {
- head = partial->front.iov_base;
- num = le32_to_cpu(head->num);
- dout(" queueing partial %p with %d/%d\n", partial, num,
- (int)CEPH_CAPS_PER_RELEASE);
- list_move_tail(&partial->list_head,
- &session->s_cap_releases_done);
- session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
+ int ret = 1;
+ spin_lock(&ci->i_ceph_lock);
+ if (want_snap_seq > 0 && !list_empty(&ci->i_cap_snaps)) {
+ struct ceph_cap_snap *capsnap =
+ list_first_entry(&ci->i_cap_snaps,
+ struct ceph_cap_snap, ci_item);
+ ret = capsnap->follows >= want_snap_seq;
}
- err = 0;
- spin_unlock(&session->s_cap_lock);
-out_unlocked:
- return err;
+ spin_unlock(&ci->i_ceph_lock);
+ return ret;
}
-static int check_cap_flush(struct inode *inode, u64 want_flush_seq)
+static int check_caps_flush(struct ceph_mds_client *mdsc,
+ u64 want_flush_tid)
{
- struct ceph_inode_info *ci = ceph_inode(inode);
- int ret;
- spin_lock(&ci->i_ceph_lock);
- if (ci->i_flushing_caps)
- ret = ci->i_cap_flush_seq >= want_flush_seq;
- else
- ret = 1;
- spin_unlock(&ci->i_ceph_lock);
+ struct rb_node *n;
+ struct ceph_cap_flush *cf;
+ int ret = 1;
+
+ spin_lock(&mdsc->cap_dirty_lock);
+ n = rb_first(&mdsc->cap_flush_tree);
+ cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL;
+ if (cf && cf->tid <= want_flush_tid) {
+ dout("check_caps_flush still flushing tid %llu <= %llu\n",
+ cf->tid, want_flush_tid);
+ ret = 0;
+ }
+ spin_unlock(&mdsc->cap_dirty_lock);
return ret;
}
/*
* flush all dirty inode data to disk.
*
- * returns true if we've flushed through want_flush_seq
+ * returns true if we've flushed through want_flush_tid
*/
-static void wait_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
+static void wait_caps_flush(struct ceph_mds_client *mdsc,
+ u64 want_flush_tid, u64 want_snap_seq)
{
int mds;
- dout("check_cap_flush want %lld\n", want_flush_seq);
+ dout("check_caps_flush want %llu snap want %llu\n",
+ want_flush_tid, want_snap_seq);
mutex_lock(&mdsc->mutex);
- for (mds = 0; mds < mdsc->max_sessions; mds++) {
+ for (mds = 0; mds < mdsc->max_sessions; ) {
struct ceph_mds_session *session = mdsc->sessions[mds];
struct inode *inode = NULL;
- if (!session)
+ if (!session) {
+ mds++;
continue;
+ }
get_session(session);
mutex_unlock(&mdsc->mutex);
mutex_lock(&session->s_mutex);
- if (!list_empty(&session->s_cap_flushing)) {
- struct ceph_inode_info *ci =
- list_entry(session->s_cap_flushing.next,
- struct ceph_inode_info,
- i_flushing_item);
-
- if (!check_cap_flush(&ci->vfs_inode, want_flush_seq)) {
- dout("check_cap_flush still flushing %p "
- "seq %lld <= %lld to mds%d\n",
- &ci->vfs_inode, ci->i_cap_flush_seq,
- want_flush_seq, session->s_mds);
+ if (!list_empty(&session->s_cap_snaps_flushing)) {
+ struct ceph_cap_snap *capsnap =
+ list_first_entry(&session->s_cap_snaps_flushing,
+ struct ceph_cap_snap,
+ flushing_item);
+ struct ceph_inode_info *ci = capsnap->ci;
+ if (!check_capsnap_flush(ci, want_snap_seq)) {
+ dout("check_cap_flush still flushing snap %p "
+ "follows %lld <= %lld to mds%d\n",
+ &ci->vfs_inode, capsnap->follows,
+ want_snap_seq, mds);
inode = igrab(&ci->vfs_inode);
}
}
@@ -1540,15 +1553,21 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
if (inode) {
wait_event(mdsc->cap_flushing_wq,
- check_cap_flush(inode, want_flush_seq));
+ check_capsnap_flush(ceph_inode(inode),
+ want_snap_seq));
iput(inode);
+ } else {
+ mds++;
}
mutex_lock(&mdsc->mutex);
}
-
mutex_unlock(&mdsc->mutex);
- dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
+
+ wait_event(mdsc->cap_flushing_wq,
+ check_caps_flush(mdsc, want_flush_tid));
+
+ dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
}
/*
@@ -1557,60 +1576,74 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session)
{
- struct ceph_msg *msg;
+ struct ceph_msg *msg = NULL;
+ struct ceph_mds_cap_release *head;
+ struct ceph_mds_cap_item *item;
+ struct ceph_cap *cap;
+ LIST_HEAD(tmp_list);
+ int num_cap_releases;
- dout("send_cap_releases mds%d\n", session->s_mds);
spin_lock(&session->s_cap_lock);
- while (!list_empty(&session->s_cap_releases_done)) {
- msg = list_first_entry(&session->s_cap_releases_done,
- struct ceph_msg, list_head);
- list_del_init(&msg->list_head);
- spin_unlock(&session->s_cap_lock);
- msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
- dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
- ceph_con_send(&session->s_con, msg);
- spin_lock(&session->s_cap_lock);
- }
+again:
+ list_splice_init(&session->s_cap_releases, &tmp_list);
+ num_cap_releases = session->s_num_cap_releases;
+ session->s_num_cap_releases = 0;
spin_unlock(&session->s_cap_lock);
-}
-static void discard_cap_releases(struct ceph_mds_client *mdsc,
- struct ceph_mds_session *session)
-{
- struct ceph_msg *msg;
- struct ceph_mds_cap_release *head;
- unsigned num;
-
- dout("discard_cap_releases mds%d\n", session->s_mds);
+ while (!list_empty(&tmp_list)) {
+ if (!msg) {
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
+ PAGE_CACHE_SIZE, GFP_NOFS, false);
+ if (!msg)
+ goto out_err;
+ head = msg->front.iov_base;
+ head->num = cpu_to_le32(0);
+ msg->front.iov_len = sizeof(*head);
+ }
+ cap = list_first_entry(&tmp_list, struct ceph_cap,
+ session_caps);
+ list_del(&cap->session_caps);
+ num_cap_releases--;
- if (!list_empty(&session->s_cap_releases)) {
- /* zero out the in-progress message */
- msg = list_first_entry(&session->s_cap_releases,
- struct ceph_msg, list_head);
head = msg->front.iov_base;
- num = le32_to_cpu(head->num);
- dout("discard_cap_releases mds%d %p %u\n",
- session->s_mds, msg, num);
- head->num = cpu_to_le32(0);
- msg->front.iov_len = sizeof(*head);
- session->s_num_cap_releases += num;
+ le32_add_cpu(&head->num, 1);
+ item = msg->front.iov_base + msg->front.iov_len;
+ item->ino = cpu_to_le64(cap->cap_ino);
+ item->cap_id = cpu_to_le64(cap->cap_id);
+ item->migrate_seq = cpu_to_le32(cap->mseq);
+ item->seq = cpu_to_le32(cap->issue_seq);
+ msg->front.iov_len += sizeof(*item);
+
+ ceph_put_cap(mdsc, cap);
+
+ if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
+ msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+ dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
+ ceph_con_send(&session->s_con, msg);
+ msg = NULL;
+ }
}
- /* requeue completed messages */
- while (!list_empty(&session->s_cap_releases_done)) {
- msg = list_first_entry(&session->s_cap_releases_done,
- struct ceph_msg, list_head);
- list_del_init(&msg->list_head);
+ BUG_ON(num_cap_releases != 0);
- head = msg->front.iov_base;
- num = le32_to_cpu(head->num);
- dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
- num);
- session->s_num_cap_releases += num;
- head->num = cpu_to_le32(0);
- msg->front.iov_len = sizeof(*head);
- list_add(&msg->list_head, &session->s_cap_releases);
+ spin_lock(&session->s_cap_lock);
+ if (!list_empty(&session->s_cap_releases))
+ goto again;
+ spin_unlock(&session->s_cap_lock);
+
+ if (msg) {
+ msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+ dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
+ ceph_con_send(&session->s_con, msg);
}
+ return;
+out_err:
+ pr_err("send_cap_releases mds%d, failed to allocate message\n",
+ session->s_mds);
+ spin_lock(&session->s_cap_lock);
+ list_splice(&tmp_list, &session->s_cap_releases);
+ session->s_num_cap_releases += num_cap_releases;
+ spin_unlock(&session->s_cap_lock);
}
/*
@@ -1635,7 +1668,8 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
order = get_order(size * num_entries);
while (order >= 0) {
- rinfo->dir_in = (void*)__get_free_pages(GFP_NOFS | __GFP_NOWARN,
+ rinfo->dir_in = (void*)__get_free_pages(GFP_KERNEL |
+ __GFP_NOWARN,
order);
if (rinfo->dir_in)
break;
@@ -1697,13 +1731,9 @@ static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
struct ceph_mds_request, r_node);
}
-static u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
+static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
{
- struct ceph_mds_request *req = __get_oldest_req(mdsc);
-
- if (req)
- return req->r_tid;
- return 0;
+ return mdsc->oldest_tid;
}
/*
@@ -2267,15 +2297,18 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
/* wait */
mutex_unlock(&mdsc->mutex);
dout("do_request waiting\n");
- if (req->r_timeout) {
- err = (long)wait_for_completion_killable_timeout(
- &req->r_completion, req->r_timeout);
- if (err == 0)
- err = -EIO;
- } else if (req->r_wait_for_completion) {
+ if (!req->r_timeout && req->r_wait_for_completion) {
err = req->r_wait_for_completion(mdsc, req);
} else {
- err = wait_for_completion_killable(&req->r_completion);
+ long timeleft = wait_for_completion_killable_timeout(
+ &req->r_completion,
+ ceph_timeout_jiffies(req->r_timeout));
+ if (timeleft > 0)
+ err = 0;
+ else if (!timeleft)
+ err = -EIO; /* timed out */
+ else
+ err = timeleft; /* killed */
}
dout("do_request waited, got %d\n", err);
mutex_lock(&mdsc->mutex);
@@ -2496,7 +2529,6 @@ out_err:
}
mutex_unlock(&mdsc->mutex);
- ceph_add_cap_releases(mdsc, req->r_session);
mutex_unlock(&session->s_mutex);
/* kick calling process */
@@ -2888,8 +2920,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
*/
session->s_cap_reconnect = 1;
/* drop old cap expires; we're about to reestablish that state */
- discard_cap_releases(mdsc, session);
- spin_unlock(&session->s_cap_lock);
+ cleanup_cap_releases(mdsc, session);
/* trim unused caps to reduce MDS's cache rejoin time */
if (mdsc->fsc->sb->s_root)
@@ -2956,6 +2987,9 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
reply->hdr.data_len = cpu_to_le32(pagelist->length);
ceph_msg_data_add_pagelist(reply, pagelist);
+
+ ceph_early_kick_flushing_caps(mdsc, session);
+
ceph_con_send(&session->s_con, reply);
mutex_unlock(&session->s_mutex);
@@ -3352,7 +3386,6 @@ static void delayed_work(struct work_struct *work)
send_renew_caps(mdsc, s);
else
ceph_con_keepalive(&s->s_con);
- ceph_add_cap_releases(mdsc, s);
if (s->s_state == CEPH_MDS_SESSION_OPEN ||
s->s_state == CEPH_MDS_SESSION_HUNG)
ceph_send_cap_releases(mdsc, s);
@@ -3390,11 +3423,13 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
atomic_set(&mdsc->num_sessions, 0);
mdsc->max_sessions = 0;
mdsc->stopping = 0;
+ mdsc->last_snap_seq = 0;
init_rwsem(&mdsc->snap_rwsem);
mdsc->snap_realms = RB_ROOT;
INIT_LIST_HEAD(&mdsc->snap_empty);
spin_lock_init(&mdsc->snap_empty_lock);
mdsc->last_tid = 0;
+ mdsc->oldest_tid = 0;
mdsc->request_tree = RB_ROOT;
INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
mdsc->last_renew_caps = jiffies;
@@ -3402,7 +3437,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
spin_lock_init(&mdsc->cap_delay_lock);
INIT_LIST_HEAD(&mdsc->snap_flush_list);
spin_lock_init(&mdsc->snap_flush_lock);
- mdsc->cap_flush_seq = 0;
+ mdsc->last_cap_flush_tid = 1;
+ mdsc->cap_flush_tree = RB_ROOT;
INIT_LIST_HEAD(&mdsc->cap_dirty);
INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
mdsc->num_cap_flushing = 0;
@@ -3414,6 +3450,9 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
ceph_caps_init(mdsc);
ceph_adjust_min_caps(mdsc, fsc->min_caps);
+ init_rwsem(&mdsc->pool_perm_rwsem);
+ mdsc->pool_perm_tree = RB_ROOT;
+
return 0;
}
@@ -3423,8 +3462,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
*/
static void wait_requests(struct ceph_mds_client *mdsc)
{
+ struct ceph_options *opts = mdsc->fsc->client->options;
struct ceph_mds_request *req;
- struct ceph_fs_client *fsc = mdsc->fsc;
mutex_lock(&mdsc->mutex);
if (__get_oldest_req(mdsc)) {
@@ -3432,7 +3471,7 @@ static void wait_requests(struct ceph_mds_client *mdsc)
dout("wait_requests waiting for requests\n");
wait_for_completion_timeout(&mdsc->safe_umount_waiters,
- fsc->client->options->mount_timeout * HZ);
+ ceph_timeout_jiffies(opts->mount_timeout));
/* tear down remaining requests */
mutex_lock(&mdsc->mutex);
@@ -3485,7 +3524,8 @@ restart:
nextreq = rb_entry(n, struct ceph_mds_request, r_node);
else
nextreq = NULL;
- if ((req->r_op & CEPH_MDS_OP_WRITE)) {
+ if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
+ (req->r_op & CEPH_MDS_OP_WRITE)) {
/* write op */
ceph_mdsc_get_request(req);
if (nextreq)
@@ -3513,7 +3553,7 @@ restart:
void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
{
- u64 want_tid, want_flush;
+ u64 want_tid, want_flush, want_snap;
if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
return;
@@ -3525,13 +3565,18 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
ceph_flush_dirty_caps(mdsc);
spin_lock(&mdsc->cap_dirty_lock);
- want_flush = mdsc->cap_flush_seq;
+ want_flush = mdsc->last_cap_flush_tid;
spin_unlock(&mdsc->cap_dirty_lock);
- dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
+ down_read(&mdsc->snap_rwsem);
+ want_snap = mdsc->last_snap_seq;
+ up_read(&mdsc->snap_rwsem);
+
+ dout("sync want tid %lld flush_seq %lld snap_seq %lld\n",
+ want_tid, want_flush, want_snap);
wait_unsafe_requests(mdsc, want_tid);
- wait_caps_flush(mdsc, want_flush);
+ wait_caps_flush(mdsc, want_flush, want_snap);
}
/*
@@ -3549,10 +3594,9 @@ static bool done_closing_sessions(struct ceph_mds_client *mdsc)
*/
void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
{
+ struct ceph_options *opts = mdsc->fsc->client->options;
struct ceph_mds_session *session;
int i;
- struct ceph_fs_client *fsc = mdsc->fsc;
- unsigned long timeout = fsc->client->options->mount_timeout * HZ;
dout("close_sessions\n");
@@ -3573,7 +3617,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
dout("waiting for sessions to close\n");
wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
- timeout);
+ ceph_timeout_jiffies(opts->mount_timeout));
/* tear down remaining sessions */
mutex_lock(&mdsc->mutex);
@@ -3607,6 +3651,7 @@ static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
ceph_mdsmap_destroy(mdsc->mdsmap);
kfree(mdsc->sessions);
ceph_caps_finalize(mdsc);
+ ceph_pool_perm_destroy(mdsc);
}
void ceph_mdsc_destroy(struct ceph_fs_client *fsc)