diff options
Diffstat (limited to 'fs/ceph/mds_client.c')
-rw-r--r-- | fs/ceph/mds_client.c | 223 |
1 files changed, 86 insertions, 137 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 2bb9264b9225..76eb14489bfa 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -458,7 +458,6 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, s->s_cap_reconnect = 0; s->s_cap_iterator = NULL; INIT_LIST_HEAD(&s->s_cap_releases); - INIT_LIST_HEAD(&s->s_cap_releases_done); INIT_LIST_HEAD(&s->s_cap_flushing); INIT_LIST_HEAD(&s->s_cap_snaps_flushing); @@ -998,27 +997,25 @@ void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, * session caps */ -/* - * Free preallocated cap messages assigned to this session - */ -static void cleanup_cap_releases(struct ceph_mds_session *session) +/* caller holds s_cap_lock, we drop it */ +static void cleanup_cap_releases(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) + __releases(session->s_cap_lock) { - struct ceph_msg *msg; + LIST_HEAD(tmp_list); + list_splice_init(&session->s_cap_releases, &tmp_list); + session->s_num_cap_releases = 0; + spin_unlock(&session->s_cap_lock); - spin_lock(&session->s_cap_lock); - while (!list_empty(&session->s_cap_releases)) { - msg = list_first_entry(&session->s_cap_releases, - struct ceph_msg, list_head); - list_del_init(&msg->list_head); - ceph_msg_put(msg); - } - while (!list_empty(&session->s_cap_releases_done)) { - msg = list_first_entry(&session->s_cap_releases_done, - struct ceph_msg, list_head); - list_del_init(&msg->list_head); - ceph_msg_put(msg); + dout("cleanup_cap_releases mds%d\n", session->s_mds); + while (!list_empty(&tmp_list)) { + struct ceph_cap *cap; + /* zero out the in-progress message */ + cap = list_first_entry(&tmp_list, + struct ceph_cap, session_caps); + list_del(&cap->session_caps); + ceph_put_cap(mdsc, cap); } - spin_unlock(&session->s_cap_lock); } static void cleanup_session_requests(struct ceph_mds_client *mdsc, @@ -1095,10 +1092,16 @@ static int iterate_session_caps(struct ceph_mds_session *session, dout("iterate_session_caps finishing cap %p removal\n", cap); BUG_ON(cap->session != session); + cap->session = NULL; list_del_init(&cap->session_caps); session->s_nr_caps--; - cap->session = NULL; - old_cap = cap; /* put_cap it w/o locks held */ + if (cap->queue_release) { + list_add_tail(&cap->session_caps, + &session->s_cap_releases); + session->s_num_cap_releases++; + } else { + old_cap = cap; /* put_cap it w/o locks held */ + } } if (ret < 0) goto out; @@ -1191,11 +1194,12 @@ static void remove_session_caps(struct ceph_mds_session *session) spin_lock(&session->s_cap_lock); } } - spin_unlock(&session->s_cap_lock); + + // drop cap expires and unlock s_cap_lock + cleanup_cap_releases(session->s_mdsc, session); BUG_ON(session->s_nr_caps > 0); BUG_ON(!list_empty(&session->s_cap_flushing)); - cleanup_cap_releases(session); } /* @@ -1418,76 +1422,10 @@ static int trim_caps(struct ceph_mds_client *mdsc, session->s_trim_caps = 0; } - ceph_add_cap_releases(mdsc, session); ceph_send_cap_releases(mdsc, session); return 0; } -/* - * Allocate cap_release messages. If there is a partially full message - * in the queue, try to allocate enough to cover it's remainder, so that - * we can send it immediately. - * - * Called under s_mutex. - */ -int ceph_add_cap_releases(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session) -{ - struct ceph_msg *msg, *partial = NULL; - struct ceph_mds_cap_release *head; - int err = -ENOMEM; - int extra = mdsc->fsc->mount_options->cap_release_safety; - int num; - - dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds, - extra); - - spin_lock(&session->s_cap_lock); - - if (!list_empty(&session->s_cap_releases)) { - msg = list_first_entry(&session->s_cap_releases, - struct ceph_msg, - list_head); - head = msg->front.iov_base; - num = le32_to_cpu(head->num); - if (num) { - dout(" partial %p with (%d/%d)\n", msg, num, - (int)CEPH_CAPS_PER_RELEASE); - extra += CEPH_CAPS_PER_RELEASE - num; - partial = msg; - } - } - while (session->s_num_cap_releases < session->s_nr_caps + extra) { - spin_unlock(&session->s_cap_lock); - msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE, - GFP_NOFS, false); - if (!msg) - goto out_unlocked; - dout("add_cap_releases %p msg %p now %d\n", session, msg, - (int)msg->front.iov_len); - head = msg->front.iov_base; - head->num = cpu_to_le32(0); - msg->front.iov_len = sizeof(*head); - spin_lock(&session->s_cap_lock); - list_add(&msg->list_head, &session->s_cap_releases); - session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE; - } - - if (partial) { - head = partial->front.iov_base; - num = le32_to_cpu(head->num); - dout(" queueing partial %p with %d/%d\n", partial, num, - (int)CEPH_CAPS_PER_RELEASE); - list_move_tail(&partial->list_head, - &session->s_cap_releases_done); - session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num; - } - err = 0; - spin_unlock(&session->s_cap_lock); -out_unlocked: - return err; -} - static int check_cap_flush(struct ceph_inode_info *ci, u64 want_flush_seq, u64 want_snap_seq) { @@ -1590,60 +1528,74 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc, void ceph_send_cap_releases(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { - struct ceph_msg *msg; + struct ceph_msg *msg = NULL; + struct ceph_mds_cap_release *head; + struct ceph_mds_cap_item *item; + struct ceph_cap *cap; + LIST_HEAD(tmp_list); + int num_cap_releases; - dout("send_cap_releases mds%d\n", session->s_mds); spin_lock(&session->s_cap_lock); - while (!list_empty(&session->s_cap_releases_done)) { - msg = list_first_entry(&session->s_cap_releases_done, - struct ceph_msg, list_head); - list_del_init(&msg->list_head); - spin_unlock(&session->s_cap_lock); - msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); - dout("send_cap_releases mds%d %p\n", session->s_mds, msg); - ceph_con_send(&session->s_con, msg); - spin_lock(&session->s_cap_lock); - } +again: + list_splice_init(&session->s_cap_releases, &tmp_list); + num_cap_releases = session->s_num_cap_releases; + session->s_num_cap_releases = 0; spin_unlock(&session->s_cap_lock); -} - -static void discard_cap_releases(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session) -{ - struct ceph_msg *msg; - struct ceph_mds_cap_release *head; - unsigned num; - dout("discard_cap_releases mds%d\n", session->s_mds); + while (!list_empty(&tmp_list)) { + if (!msg) { + msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, + PAGE_CACHE_SIZE, GFP_NOFS, false); + if (!msg) + goto out_err; + head = msg->front.iov_base; + head->num = cpu_to_le32(0); + msg->front.iov_len = sizeof(*head); + } + cap = list_first_entry(&tmp_list, struct ceph_cap, + session_caps); + list_del(&cap->session_caps); + num_cap_releases--; - if (!list_empty(&session->s_cap_releases)) { - /* zero out the in-progress message */ - msg = list_first_entry(&session->s_cap_releases, - struct ceph_msg, list_head); head = msg->front.iov_base; - num = le32_to_cpu(head->num); - dout("discard_cap_releases mds%d %p %u\n", - session->s_mds, msg, num); - head->num = cpu_to_le32(0); - msg->front.iov_len = sizeof(*head); - session->s_num_cap_releases += num; + le32_add_cpu(&head->num, 1); + item = msg->front.iov_base + msg->front.iov_len; + item->ino = cpu_to_le64(cap->cap_ino); + item->cap_id = cpu_to_le64(cap->cap_id); + item->migrate_seq = cpu_to_le32(cap->mseq); + item->seq = cpu_to_le32(cap->issue_seq); + msg->front.iov_len += sizeof(*item); + + ceph_put_cap(mdsc, cap); + + if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { + msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); + dout("send_cap_releases mds%d %p\n", session->s_mds, msg); + ceph_con_send(&session->s_con, msg); + msg = NULL; + } } - /* requeue completed messages */ - while (!list_empty(&session->s_cap_releases_done)) { - msg = list_first_entry(&session->s_cap_releases_done, - struct ceph_msg, list_head); - list_del_init(&msg->list_head); + BUG_ON(num_cap_releases != 0); - head = msg->front.iov_base; - num = le32_to_cpu(head->num); - dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, - num); - session->s_num_cap_releases += num; - head->num = cpu_to_le32(0); - msg->front.iov_len = sizeof(*head); - list_add(&msg->list_head, &session->s_cap_releases); + spin_lock(&session->s_cap_lock); + if (!list_empty(&session->s_cap_releases)) + goto again; + spin_unlock(&session->s_cap_lock); + + if (msg) { + msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); + dout("send_cap_releases mds%d %p\n", session->s_mds, msg); + ceph_con_send(&session->s_con, msg); } + return; +out_err: + pr_err("send_cap_releases mds%d, failed to allocate message\n", + session->s_mds); + spin_lock(&session->s_cap_lock); + list_splice(&tmp_list, &session->s_cap_releases); + session->s_num_cap_releases += num_cap_releases; + spin_unlock(&session->s_cap_lock); } /* @@ -2529,7 +2481,6 @@ out_err: } mutex_unlock(&mdsc->mutex); - ceph_add_cap_releases(mdsc, req->r_session); mutex_unlock(&session->s_mutex); /* kick calling process */ @@ -2921,8 +2872,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, */ session->s_cap_reconnect = 1; /* drop old cap expires; we're about to reestablish that state */ - discard_cap_releases(mdsc, session); - spin_unlock(&session->s_cap_lock); + cleanup_cap_releases(mdsc, session); /* trim unused caps to reduce MDS's cache rejoin time */ if (mdsc->fsc->sb->s_root) @@ -3385,7 +3335,6 @@ static void delayed_work(struct work_struct *work) send_renew_caps(mdsc, s); else ceph_con_keepalive(&s->s_con); - ceph_add_cap_releases(mdsc, s); if (s->s_state == CEPH_MDS_SESSION_OPEN || s->s_state == CEPH_MDS_SESSION_HUNG) ceph_send_cap_releases(mdsc, s); |