diff options
Diffstat (limited to 'fs/ceph/caps.c')
-rw-r--r-- | fs/ceph/caps.c | 836 |
1 files changed, 577 insertions, 259 deletions
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index be5ea6af8366..dc10c9dd36c1 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -833,7 +833,9 @@ int __ceph_caps_used(struct ceph_inode_info *ci) used |= CEPH_CAP_PIN; if (ci->i_rd_ref) used |= CEPH_CAP_FILE_RD; - if (ci->i_rdcache_ref || ci->vfs_inode.i_data.nrpages) + if (ci->i_rdcache_ref || + (!S_ISDIR(ci->vfs_inode.i_mode) && /* ignore readdir cache */ + ci->vfs_inode.i_data.nrpages)) used |= CEPH_CAP_FILE_CACHE; if (ci->i_wr_ref) used |= CEPH_CAP_FILE_WR; @@ -926,16 +928,6 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release) /* remove from session list */ spin_lock(&session->s_cap_lock); - /* - * s_cap_reconnect is protected by s_cap_lock. no one changes - * s_cap_gen while session is in the reconnect state. - */ - if (queue_release && - (!session->s_cap_reconnect || - cap->cap_gen == session->s_cap_gen)) - __queue_cap_release(session, ci->i_vino.ino, cap->cap_id, - cap->mseq, cap->issue_seq); - if (session->s_cap_iterator == cap) { /* not yet, we are iterating over this very cap */ dout("__ceph_remove_cap delaying %p removal from session %p\n", @@ -948,6 +940,25 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release) } /* protect backpointer with s_cap_lock: see iterate_session_caps */ cap->ci = NULL; + + /* + * s_cap_reconnect is protected by s_cap_lock. no one changes + * s_cap_gen while session is in the reconnect state. + */ + if (queue_release && + (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) { + cap->queue_release = 1; + if (removed) { + list_add_tail(&cap->session_caps, + &session->s_cap_releases); + session->s_num_cap_releases++; + removed = 0; + } + } else { + cap->queue_release = 0; + } + cap->cap_ino = ci->i_vino.ino; + spin_unlock(&session->s_cap_lock); /* remove from inode list */ @@ -977,8 +988,8 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release) static int send_cap_msg(struct ceph_mds_session *session, u64 ino, u64 cid, int op, int caps, int wanted, int dirty, - u32 seq, u64 flush_tid, u32 issue_seq, u32 mseq, - u64 size, u64 max_size, + u32 seq, u64 flush_tid, u64 oldest_flush_tid, + u32 issue_seq, u32 mseq, u64 size, u64 max_size, struct timespec *mtime, struct timespec *atime, u64 time_warp_seq, kuid_t uid, kgid_t gid, umode_t mode, @@ -992,20 +1003,23 @@ static int send_cap_msg(struct ceph_mds_session *session, size_t extra_len; dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s" - " seq %u/%u mseq %u follows %lld size %llu/%llu" + " seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu" " xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(op), cid, ino, ceph_cap_string(caps), ceph_cap_string(wanted), ceph_cap_string(dirty), - seq, issue_seq, mseq, follows, size, max_size, + seq, issue_seq, flush_tid, oldest_flush_tid, + mseq, follows, size, max_size, xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0); - /* flock buffer size + inline version + inline data size */ - extra_len = 4 + 8 + 4; + /* flock buffer size + inline version + inline data size + + * osd_epoch_barrier + oldest_flush_tid */ + extra_len = 4 + 8 + 4 + 4 + 8; msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len, GFP_NOFS, false); if (!msg) return -ENOMEM; + msg->hdr.version = cpu_to_le16(6); msg->hdr.tid = cpu_to_le64(flush_tid); fc = msg->front.iov_base; @@ -1041,6 +1055,10 @@ static int send_cap_msg(struct ceph_mds_session *session, ceph_encode_64(&p, inline_data ? 0 : CEPH_INLINE_NONE); /* inline data size */ ceph_encode_32(&p, 0); + /* osd_epoch_barrier */ + ceph_encode_32(&p, 0); + /* oldest_flush_tid */ + ceph_encode_64(&p, oldest_flush_tid); fc->xattr_version = cpu_to_le64(xattr_version); if (xattrs_buf) { @@ -1053,44 +1071,6 @@ static int send_cap_msg(struct ceph_mds_session *session, return 0; } -void __queue_cap_release(struct ceph_mds_session *session, - u64 ino, u64 cap_id, u32 migrate_seq, - u32 issue_seq) -{ - struct ceph_msg *msg; - struct ceph_mds_cap_release *head; - struct ceph_mds_cap_item *item; - - BUG_ON(!session->s_num_cap_releases); - msg = list_first_entry(&session->s_cap_releases, - struct ceph_msg, list_head); - - dout(" adding %llx release to mds%d msg %p (%d left)\n", - ino, session->s_mds, msg, session->s_num_cap_releases); - - BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE); - head = msg->front.iov_base; - le32_add_cpu(&head->num, 1); - item = msg->front.iov_base + msg->front.iov_len; - item->ino = cpu_to_le64(ino); - item->cap_id = cpu_to_le64(cap_id); - item->migrate_seq = cpu_to_le32(migrate_seq); - item->seq = cpu_to_le32(issue_seq); - - session->s_num_cap_releases--; - - msg->front.iov_len += sizeof(*item); - if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { - dout(" release msg %p full\n", msg); - list_move_tail(&msg->list_head, &session->s_cap_releases_done); - } else { - dout(" release msg %p at %d/%d (%d)\n", msg, - (int)le32_to_cpu(head->num), - (int)CEPH_CAPS_PER_RELEASE, - (int)msg->front.iov_len); - } -} - /* * Queue cap releases when an inode is dropped from our cache. Since * inode is about to be destroyed, there is no need for i_ceph_lock. @@ -1127,7 +1107,7 @@ void ceph_queue_caps_release(struct inode *inode) */ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, int op, int used, int want, int retain, int flushing, - unsigned *pflush_tid) + u64 flush_tid, u64 oldest_flush_tid) __releases(cap->ci->i_ceph_lock) { struct ceph_inode_info *ci = cap->ci; @@ -1145,8 +1125,6 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, u64 xattr_version = 0; struct ceph_buffer *xattr_blob = NULL; int delayed = 0; - u64 flush_tid = 0; - int i; int ret; bool inline_data; @@ -1190,26 +1168,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, cap->implemented &= cap->issued | used; cap->mds_wanted = want; - if (flushing) { - /* - * assign a tid for flush operations so we can avoid - * flush1 -> dirty1 -> flush2 -> flushack1 -> mark - * clean type races. track latest tid for every bit - * so we can handle flush AxFw, flush Fw, and have the - * first ack clean Ax. - */ - flush_tid = ++ci->i_cap_flush_last_tid; - if (pflush_tid) - *pflush_tid = flush_tid; - dout(" cap_flush_tid %d\n", (int)flush_tid); - for (i = 0; i < CEPH_CAP_BITS; i++) - if (flushing & (1 << i)) - ci->i_cap_flush_tid[i] = flush_tid; - - follows = ci->i_head_snapc->seq; - } else { - follows = 0; - } + follows = flushing ? ci->i_head_snapc->seq : 0; keep = cap->implemented; seq = cap->seq; @@ -1237,7 +1196,8 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, spin_unlock(&ci->i_ceph_lock); ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id, - op, keep, want, flushing, seq, flush_tid, issue_seq, mseq, + op, keep, want, flushing, seq, + flush_tid, oldest_flush_tid, issue_seq, mseq, size, max_size, &mtime, &atime, time_warp_seq, uid, gid, mode, xattr_version, xattr_blob, follows, inline_data); @@ -1259,14 +1219,14 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, * asynchronously back to the MDS once sync writes complete and dirty * data is written out. * - * Unless @again is true, skip cap_snaps that were already sent to + * Unless @kick is true, skip cap_snaps that were already sent to * the MDS (i.e., during this session). * * Called under i_ceph_lock. Takes s_mutex as needed. */ void __ceph_flush_snaps(struct ceph_inode_info *ci, struct ceph_mds_session **psession, - int again) + int kick) __releases(ci->i_ceph_lock) __acquires(ci->i_ceph_lock) { @@ -1297,11 +1257,8 @@ retry: if (capsnap->dirty_pages || capsnap->writing) break; - /* - * if cap writeback already occurred, we should have dropped - * the capsnap in ceph_put_wrbuffer_cap_refs. - */ - BUG_ON(capsnap->dirty == 0); + /* should be removed by ceph_try_drop_cap_snap() */ + BUG_ON(!capsnap->need_flush); /* pick mds, take s_mutex */ if (ci->i_auth_cap == NULL) { @@ -1310,7 +1267,7 @@ retry: } /* only flush each capsnap once */ - if (!again && !list_empty(&capsnap->flushing_item)) { + if (!kick && !list_empty(&capsnap->flushing_item)) { dout("already flushed %p, skipping\n", capsnap); continue; } @@ -1320,6 +1277,9 @@ retry: if (session && session->s_mds != mds) { dout("oops, wrong session %p mutex\n", session); + if (kick) + goto out; + mutex_unlock(&session->s_mutex); ceph_put_mds_session(session); session = NULL; @@ -1343,20 +1303,22 @@ retry: goto retry; } - capsnap->flush_tid = ++ci->i_cap_flush_last_tid; + spin_lock(&mdsc->cap_dirty_lock); + capsnap->flush_tid = ++mdsc->last_cap_flush_tid; + spin_unlock(&mdsc->cap_dirty_lock); + atomic_inc(&capsnap->nref); - if (!list_empty(&capsnap->flushing_item)) - list_del_init(&capsnap->flushing_item); - list_add_tail(&capsnap->flushing_item, - &session->s_cap_snaps_flushing); + if (list_empty(&capsnap->flushing_item)) + list_add_tail(&capsnap->flushing_item, + &session->s_cap_snaps_flushing); spin_unlock(&ci->i_ceph_lock); dout("flush_snaps %p cap_snap %p follows %lld tid %llu\n", inode, capsnap, capsnap->follows, capsnap->flush_tid); send_cap_msg(session, ceph_vino(inode).ino, 0, CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0, - capsnap->dirty, 0, capsnap->flush_tid, 0, mseq, - capsnap->size, 0, + capsnap->dirty, 0, capsnap->flush_tid, 0, + 0, mseq, capsnap->size, 0, &capsnap->mtime, &capsnap->atime, capsnap->time_warp_seq, capsnap->uid, capsnap->gid, capsnap->mode, @@ -1396,7 +1358,8 @@ static void ceph_flush_snaps(struct ceph_inode_info *ci) * Caller is then responsible for calling __mark_inode_dirty with the * returned flags value. */ -int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) +int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask, + struct ceph_cap_flush **pcf) { struct ceph_mds_client *mdsc = ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc; @@ -1416,9 +1379,14 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) ceph_cap_string(was | mask)); ci->i_dirty_caps |= mask; if (was == 0) { - if (!ci->i_head_snapc) + WARN_ON_ONCE(ci->i_prealloc_cap_flush); + swap(ci->i_prealloc_cap_flush, *pcf); + + if (!ci->i_head_snapc) { + WARN_ON_ONCE(!rwsem_is_locked(&mdsc->snap_rwsem)); ci->i_head_snapc = ceph_get_snap_context( ci->i_snap_realm->cached_context); + } dout(" inode %p now dirty snapc %p auth cap %p\n", &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap); BUG_ON(!list_empty(&ci->i_dirty_item)); @@ -1429,6 +1397,8 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) ihold(inode); dirty |= I_DIRTY_SYNC; } + } else { + WARN_ON_ONCE(!ci->i_prealloc_cap_flush); } BUG_ON(list_empty(&ci->i_dirty_item)); if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) && @@ -1438,6 +1408,74 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) return dirty; } +static void __add_cap_flushing_to_inode(struct ceph_inode_info *ci, + struct ceph_cap_flush *cf) +{ + struct rb_node **p = &ci->i_cap_flush_tree.rb_node; + struct rb_node *parent = NULL; + struct ceph_cap_flush *other = NULL; + + while (*p) { + parent = *p; + other = rb_entry(parent, struct ceph_cap_flush, i_node); + + if (cf->tid < other->tid) + p = &(*p)->rb_left; + else if (cf->tid > other->tid) + p = &(*p)->rb_right; + else + BUG(); + } + + rb_link_node(&cf->i_node, parent, p); + rb_insert_color(&cf->i_node, &ci->i_cap_flush_tree); +} + +static void __add_cap_flushing_to_mdsc(struct ceph_mds_client *mdsc, + struct ceph_cap_flush *cf) +{ + struct rb_node **p = &mdsc->cap_flush_tree.rb_node; + struct rb_node *parent = NULL; + struct ceph_cap_flush *other = NULL; + + while (*p) { + parent = *p; + other = rb_entry(parent, struct ceph_cap_flush, g_node); + + if (cf->tid < other->tid) + p = &(*p)->rb_left; + else if (cf->tid > other->tid) + p = &(*p)->rb_right; + else + BUG(); + } + + rb_link_node(&cf->g_node, parent, p); + rb_insert_color(&cf->g_node, &mdsc->cap_flush_tree); +} + +struct ceph_cap_flush *ceph_alloc_cap_flush(void) +{ + return kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL); +} + +void ceph_free_cap_flush(struct ceph_cap_flush *cf) +{ + if (cf) + kmem_cache_free(ceph_cap_flush_cachep, cf); +} + +static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc) +{ + struct rb_node *n = rb_first(&mdsc->cap_flush_tree); + if (n) { + struct ceph_cap_flush *cf = + rb_entry(n, struct ceph_cap_flush, g_node); + return cf->tid; + } + return 0; +} + /* * Add dirty inode to the flushing list. Assigned a seq number so we * can wait for caps to flush without starving. @@ -1445,14 +1483,17 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) * Called under i_ceph_lock. */ static int __mark_caps_flushing(struct inode *inode, - struct ceph_mds_session *session) + struct ceph_mds_session *session, + u64 *flush_tid, u64 *oldest_flush_tid) { struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_cap_flush *cf = NULL; int flushing; BUG_ON(ci->i_dirty_caps == 0); BUG_ON(list_empty(&ci->i_dirty_item)); + BUG_ON(!ci->i_prealloc_cap_flush); flushing = ci->i_dirty_caps; dout("__mark_caps_flushing flushing %s, flushing_caps %s -> %s\n", @@ -1463,22 +1504,31 @@ static int __mark_caps_flushing(struct inode *inode, ci->i_dirty_caps = 0; dout(" inode %p now !dirty\n", inode); + swap(cf, ci->i_prealloc_cap_flush); + cf->caps = flushing; + cf->kick = false; + spin_lock(&mdsc->cap_dirty_lock); list_del_init(&ci->i_dirty_item); + cf->tid = ++mdsc->last_cap_flush_tid; + __add_cap_flushing_to_mdsc(mdsc, cf); + *oldest_flush_tid = __get_oldest_flush_tid(mdsc); + if (list_empty(&ci->i_flushing_item)) { - ci->i_cap_flush_seq = ++mdsc->cap_flush_seq; list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing); mdsc->num_cap_flushing++; - dout(" inode %p now flushing seq %lld\n", inode, - ci->i_cap_flush_seq); + dout(" inode %p now flushing tid %llu\n", inode, cf->tid); } else { list_move_tail(&ci->i_flushing_item, &session->s_cap_flushing); - dout(" inode %p now flushing (more) seq %lld\n", inode, - ci->i_cap_flush_seq); + dout(" inode %p now flushing (more) tid %llu\n", + inode, cf->tid); } spin_unlock(&mdsc->cap_dirty_lock); + __add_cap_flushing_to_inode(ci, cf); + + *flush_tid = cf->tid; return flushing; } @@ -1524,6 +1574,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, struct ceph_mds_client *mdsc = fsc->mdsc; struct inode *inode = &ci->vfs_inode; struct ceph_cap *cap; + u64 flush_tid, oldest_flush_tid; int file_wanted, used, cap_used; int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */ int issued, implemented, want, retain, revoking, flushing = 0; @@ -1553,13 +1604,13 @@ retry: retry_locked: file_wanted = __ceph_caps_file_wanted(ci); used = __ceph_caps_used(ci); - want = file_wanted | used; issued = __ceph_caps_issued(ci, &implemented); revoking = implemented & ~issued; - retain = want | CEPH_CAP_PIN; + want = file_wanted; + retain = file_wanted | used | CEPH_CAP_PIN; if (!mdsc->stopping && inode->i_nlink > 0) { - if (want) { + if (file_wanted) { retain |= CEPH_CAP_ANY; /* be greedy */ } else if (S_ISDIR(inode->i_mode) && (issued & CEPH_CAP_FILE_SHARED) && @@ -1602,9 +1653,10 @@ retry_locked: * If we fail, it's because pages are locked.... try again later. */ if ((!is_delayed || mdsc->stopping) && - ci->i_wrbuffer_ref == 0 && /* no dirty pages... */ - inode->i_data.nrpages && /* have cached pages */ - (file_wanted == 0 || /* no open files */ + !S_ISDIR(inode->i_mode) && /* ignore readdir cache */ + ci->i_wrbuffer_ref == 0 && /* no dirty pages... */ + inode->i_data.nrpages && /* have cached pages */ + (file_wanted == 0 || /* no open files */ (revoking & (CEPH_CAP_FILE_CACHE| CEPH_CAP_FILE_LAZYIO))) && /* or revoking cache */ !tried_invalidate) { @@ -1742,17 +1794,25 @@ ack: took_snap_rwsem = 1; } - if (cap == ci->i_auth_cap && ci->i_dirty_caps) - flushing = __mark_caps_flushing(inode, session); - else + if (cap == ci->i_auth_cap && ci->i_dirty_caps) { + flushing = __mark_caps_flushing(inode, session, + &flush_tid, + &oldest_flush_tid); + } else { flushing = 0; + flush_tid = 0; + spin_lock(&mdsc->cap_dirty_lock); + oldest_flush_tid = __get_oldest_flush_tid(mdsc); + spin_unlock(&mdsc->cap_dirty_lock); + } mds = cap->mds; /* remember mds, so we don't repeat */ sent++; /* __send_cap drops i_ceph_lock */ delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used, - want, retain, flushing, NULL); + want, retain, flushing, + flush_tid, oldest_flush_tid); goto retry; /* retake i_ceph_lock and restart our cap scan. */ } @@ -1781,12 +1841,13 @@ ack: /* * Try to flush dirty caps back to the auth mds. */ -static int try_flush_caps(struct inode *inode, unsigned *flush_tid) +static int try_flush_caps(struct inode *inode, u64 *ptid) { struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); - int flushing = 0; struct ceph_mds_session *session = NULL; + int flushing = 0; + u64 flush_tid = 0, oldest_flush_tid = 0; retry: spin_lock(&ci->i_ceph_lock); @@ -1811,42 +1872,54 @@ retry: if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) goto out; - flushing = __mark_caps_flushing(inode, session); + flushing = __mark_caps_flushing(inode, session, &flush_tid, + &oldest_flush_tid); /* __send_cap drops i_ceph_lock */ delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want, - cap->issued | cap->implemented, flushing, - flush_tid); - if (!delayed) - goto out_unlocked; + (cap->issued | cap->implemented), + flushing, flush_tid, oldest_flush_tid); - spin_lock(&ci->i_ceph_lock); - __cap_delay_requeue(mdsc, ci); + if (delayed) { + spin_lock(&ci->i_ceph_lock); + __cap_delay_requeue(mdsc, ci); + spin_unlock(&ci->i_ceph_lock); + } + } else { + struct rb_node *n = rb_last(&ci->i_cap_flush_tree); + if (n) { + struct ceph_cap_flush *cf = + rb_entry(n, struct ceph_cap_flush, i_node); + flush_tid = cf->tid; + } + flushing = ci->i_flushing_caps; + spin_unlock(&ci->i_ceph_lock); } out: - spin_unlock(&ci->i_ceph_lock); -out_unlocked: if (session) mutex_unlock(&session->s_mutex); + + *ptid = flush_tid; return flushing; } /* * Return true if we've flushed caps through the given flush_tid. */ -static int caps_are_flushed(struct inode *inode, unsigned tid) +static int caps_are_flushed(struct inode *inode, u64 flush_tid) { struct ceph_inode_info *ci = ceph_inode(inode); - int i, ret = 1; + struct ceph_cap_flush *cf; + struct rb_node *n; + int ret = 1; spin_lock(&ci->i_ceph_lock); - for (i = 0; i < CEPH_CAP_BITS; i++) - if ((ci->i_flushing_caps & (1 << i)) && - ci->i_cap_flush_tid[i] <= tid) { - /* still flushing this bit */ + n = rb_first(&ci->i_cap_flush_tree); + if (n) { + cf = rb_entry(n, struct ceph_cap_flush, i_node); + if (cf->tid <= flush_tid) ret = 0; - break; - } + } spin_unlock(&ci->i_ceph_lock); return ret; } @@ -1864,13 +1937,16 @@ static void sync_write_wait(struct inode *inode) struct ceph_osd_request *req; u64 last_tid; + if (!S_ISREG(inode->i_mode)) + return; + spin_lock(&ci->i_unsafe_lock); if (list_empty(head)) goto out; /* set upper bound as _last_ entry in chain */ - req = list_entry(head->prev, struct ceph_osd_request, - r_unsafe_item); + req = list_last_entry(head, struct ceph_osd_request, + r_unsafe_item); last_tid = req->r_tid; do { @@ -1888,18 +1964,64 @@ static void sync_write_wait(struct inode *inode) */ if (list_empty(head)) break; - req = list_entry(head->next, struct ceph_osd_request, - r_unsafe_item); + req = list_first_entry(head, struct ceph_osd_request, + r_unsafe_item); } while (req->r_tid < last_tid); out: spin_unlock(&ci->i_unsafe_lock); } +/* + * wait for any uncommitted directory operations to commit. + */ +static int unsafe_dirop_wait(struct inode *inode) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + struct list_head *head = &ci->i_unsafe_dirops; + struct ceph_mds_request *req; + u64 last_tid; + int ret = 0; + + if (!S_ISDIR(inode->i_mode)) + return 0; + + spin_lock(&ci->i_unsafe_lock); + if (list_empty(head)) + goto out; + + req = list_last_entry(head, struct ceph_mds_request, + r_unsafe_dir_item); + last_tid = req->r_tid; + + do { + ceph_mdsc_get_request(req); + spin_unlock(&ci->i_unsafe_lock); + + dout("unsafe_dirop_wait %p wait on tid %llu (until %llu)\n", + inode, req->r_tid, last_tid); + ret = !wait_for_completion_timeout(&req->r_safe_completion, + ceph_timeout_jiffies(req->r_timeout)); + if (ret) + ret = -EIO; /* timed out */ + + ceph_mdsc_put_request(req); + + spin_lock(&ci->i_unsafe_lock); + if (ret || list_empty(head)) + break; + req = list_first_entry(head, struct ceph_mds_request, + r_unsafe_dir_item); + } while (req->r_tid < last_tid); +out: + spin_unlock(&ci->i_unsafe_lock); + return ret; +} + int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) { struct inode *inode = file->f_mapping->host; struct ceph_inode_info *ci = ceph_inode(inode); - unsigned flush_tid; + u64 flush_tid; int ret; int dirty; @@ -1908,25 +2030,30 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) ret = filemap_write_and_wait_range(inode->i_mapping, start, end); if (ret < 0) - return ret; + goto out; + + if (datasync) + goto out; + mutex_lock(&inode->i_mutex); dirty = try_flush_caps(inode, &flush_tid); dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); + ret = unsafe_dirop_wait(inode); + /* * only wait on non-file metadata writeback (the mds * can recover size and mtime, so we don't need to * wait for that) */ - if (!datasync && (dirty & ~CEPH_CAP_ANY_FILE_WR)) { - dout("fsync waiting for flush_tid %u\n", flush_tid); + if (!ret && (dirty & ~CEPH_CAP_ANY_FILE_WR)) { ret = wait_event_interruptible(ci->i_cap_wq, - caps_are_flushed(inode, flush_tid)); + caps_are_flushed(inode, flush_tid)); } - - dout("fsync %p%s done\n", inode, datasync ? " datasync" : ""); mutex_unlock(&inode->i_mutex); +out: + dout("fsync %p%s result=%d\n", inode, datasync ? " datasync" : "", ret); return ret; } @@ -1939,7 +2066,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) int ceph_write_inode(struct inode *inode, struct writeback_control *wbc) { struct ceph_inode_info *ci = ceph_inode(inode); - unsigned flush_tid; + u64 flush_tid; int err = 0; int dirty; int wait = wbc->sync_mode == WB_SYNC_ALL; @@ -1994,6 +2121,104 @@ static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc, } } +static int __kick_flushing_caps(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session, + struct ceph_inode_info *ci, + bool kick_all) +{ + struct inode *inode = &ci->vfs_inode; + struct ceph_cap *cap; + struct ceph_cap_flush *cf; + struct rb_node *n; + int delayed = 0; + u64 first_tid = 0; + u64 oldest_flush_tid; + + spin_lock(&mdsc->cap_dirty_lock); + oldest_flush_tid = __get_oldest_flush_tid(mdsc); + spin_unlock(&mdsc->cap_dirty_lock); + + while (true) { + spin_lock(&ci->i_ceph_lock); + cap = ci->i_auth_cap; + if (!(cap && cap->session == session)) { + pr_err("%p auth cap %p not mds%d ???\n", inode, + cap, session->s_mds); + spin_unlock(&ci->i_ceph_lock); + break; + } + + for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) { + cf = rb_entry(n, struct ceph_cap_flush, i_node); + if (cf->tid < first_tid) + continue; + if (kick_all || cf->kick) + break; + } + if (!n) { + spin_unlock(&ci->i_ceph_lock); + break; + } + + cf = rb_entry(n, struct ceph_cap_flush, i_node); + cf->kick = false; + + first_tid = cf->tid + 1; + + dout("kick_flushing_caps %p cap %p tid %llu %s\n", inode, + cap, cf->tid, ceph_cap_string(cf->caps)); + delayed |= __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, + __ceph_caps_used(ci), + __ceph_caps_wanted(ci), + cap->issued | cap->implemented, + cf->caps, cf->tid, oldest_flush_tid); + } + return delayed; +} + +void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) +{ + struct ceph_inode_info *ci; + struct ceph_cap *cap; + struct ceph_cap_flush *cf; + struct rb_node *n; + + dout("early_kick_flushing_caps mds%d\n", session->s_mds); + list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) { + spin_lock(&ci->i_ceph_lock); + cap = ci->i_auth_cap; + if (!(cap && cap->session == session)) { + pr_err("%p auth cap %p not mds%d ???\n", + &ci->vfs_inode, cap, session->s_mds); + spin_unlock(&ci->i_ceph_lock); + continue; + } + + + /* + * if flushing caps were revoked, we re-send the cap flush + * in client reconnect stage. This guarantees MDS * processes + * the cap flush message before issuing the flushing caps to + * other client. + */ + if ((cap->issued & ci->i_flushing_caps) != + ci->i_flushing_caps) { + spin_unlock(&ci->i_ceph_lock); + if (!__kick_flushing_caps(mdsc, session, ci, true)) + continue; + spin_lock(&ci->i_ceph_lock); + } + + for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) { + cf = rb_entry(n, struct ceph_cap_flush, i_node); + cf->kick = true; + } + + spin_unlock(&ci->i_ceph_lock); + } +} + void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { @@ -2003,28 +2228,10 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, dout("kick_flushing_caps mds%d\n", session->s_mds); list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) { - struct inode *inode = &ci->vfs_inode; - struct ceph_cap *cap; - int delayed = 0; - - spin_lock(&ci->i_ceph_lock); - cap = ci->i_auth_cap; - if (cap && cap->session == session) { - dout("kick_flushing_caps %p cap %p %s\n", inode, - cap, ceph_cap_string(ci->i_flushing_caps)); - delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, - __ceph_caps_used(ci), - __ceph_caps_wanted(ci), - cap->issued | cap->implemented, - ci->i_flushing_caps, NULL); - if (delayed) { - spin_lock(&ci->i_ceph_lock); - __cap_delay_requeue(mdsc, ci); - spin_unlock(&ci->i_ceph_lock); - } - } else { - pr_err("%p auth cap %p not mds%d ???\n", inode, - cap, session->s_mds); + int delayed = __kick_flushing_caps(mdsc, session, ci, false); + if (delayed) { + spin_lock(&ci->i_ceph_lock); + __cap_delay_requeue(mdsc, ci); spin_unlock(&ci->i_ceph_lock); } } @@ -2036,26 +2243,25 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, { struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_cap *cap; - int delayed = 0; spin_lock(&ci->i_ceph_lock); cap = ci->i_auth_cap; - dout("kick_flushing_inode_caps %p flushing %s flush_seq %lld\n", inode, - ceph_cap_string(ci->i_flushing_caps), ci->i_cap_flush_seq); + dout("kick_flushing_inode_caps %p flushing %s\n", inode, + ceph_cap_string(ci->i_flushing_caps)); __ceph_flush_snaps(ci, &session, 1); if (ci->i_flushing_caps) { + int delayed; + spin_lock(&mdsc->cap_dirty_lock); list_move_tail(&ci->i_flushing_item, &cap->session->s_cap_flushing); spin_unlock(&mdsc->cap_dirty_lock); - delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, - __ceph_caps_used(ci), - __ceph_caps_wanted(ci), - cap->issued | cap->implemented, - ci->i_flushing_caps, NULL); + spin_unlock(&ci->i_ceph_lock); + + delayed = __kick_flushing_caps(mdsc, session, ci, true); if (delayed) { spin_lock(&ci->i_ceph_lock); __cap_delay_requeue(mdsc, ci); @@ -2073,7 +2279,8 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, * * Protected by i_ceph_lock. */ -static void __take_cap_refs(struct ceph_inode_info *ci, int got) +static void __take_cap_refs(struct ceph_inode_info *ci, int got, + bool snap_rwsem_locked) { if (got & CEPH_CAP_PIN) ci->i_pin_ref++; @@ -2081,8 +2288,14 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got) ci->i_rd_ref++; if (got & CEPH_CAP_FILE_CACHE) ci->i_rdcache_ref++; - if (got & CEPH_CAP_FILE_WR) + if (got & CEPH_CAP_FILE_WR) { + if (ci->i_wr_ref == 0 && !ci->i_head_snapc) { + BUG_ON(!snap_rwsem_locked); + ci->i_head_snapc = ceph_get_snap_context( + ci->i_snap_realm->cached_context); + } ci->i_wr_ref++; + } if (got & CEPH_CAP_FILE_BUFFER) { if (ci->i_wb_ref == 0) ihold(&ci->vfs_inode); @@ -2100,16 +2313,19 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got) * requested from the MDS. */ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, - loff_t endoff, int *got, int *check_max, int *err) + loff_t endoff, bool nonblock, int *got, int *err) { struct inode *inode = &ci->vfs_inode; + struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; int ret = 0; int have, implemented; int file_wanted; + bool snap_rwsem_locked = false; dout("get_cap_refs %p need %s want %s\n", inode, ceph_cap_string(need), ceph_cap_string(want)); +again: spin_lock(&ci->i_ceph_lock); /* make sure file is actually open */ @@ -2125,6 +2341,10 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, /* finish pending truncate */ while (ci->i_truncate_pending) { spin_unlock(&ci->i_ceph_lock); + if (snap_rwsem_locked) { + up_read(&mdsc->snap_rwsem); + snap_rwsem_locked = false; + } __ceph_do_pending_vmtruncate(inode); spin_lock(&ci->i_ceph_lock); } @@ -2136,7 +2356,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, dout("get_cap_refs %p endoff %llu > maxsize %llu\n", inode, endoff, ci->i_max_size); if (endoff > ci->i_requested_max_size) { - *check_max = 1; + *err = -EAGAIN; ret = 1; } goto out_unlock; @@ -2164,8 +2384,29 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, inode, ceph_cap_string(have), ceph_cap_string(not), ceph_cap_string(revoking)); if ((revoking & not) == 0) { + if (!snap_rwsem_locked && + !ci->i_head_snapc && + (need & CEPH_CAP_FILE_WR)) { + if (!down_read_trylock(&mdsc->snap_rwsem)) { + /* + * we can not call down_read() when + * task isn't in TASK_RUNNING state + */ + if (nonblock) { + *err = -EAGAIN; + ret = 1; + goto out_unlock; + } + + spin_unlock(&ci->i_ceph_lock); + down_read(&mdsc->snap_rwsem); + snap_rwsem_locked = true; + goto again; + } + snap_rwsem_locked = true; + } *got = need | (have & want); - __take_cap_refs(ci, *got); + __take_cap_refs(ci, *got, true); ret = 1; } } else { @@ -2189,6 +2430,8 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, } out_unlock: spin_unlock(&ci->i_ceph_lock); + if (snap_rwsem_locked) + up_read(&mdsc->snap_rwsem); dout("get_cap_refs %p ret %d got %s\n", inode, ret, ceph_cap_string(*got)); @@ -2231,50 +2474,70 @@ static void check_max_size(struct inode *inode, loff_t endoff) int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, loff_t endoff, int *got, struct page **pinned_page) { - int _got, check_max, ret, err = 0; + int _got, ret, err = 0; -retry: - if (endoff > 0) - check_max_size(&ci->vfs_inode, endoff); - _got = 0; - check_max = 0; - ret = wait_event_interruptible(ci->i_cap_wq, - try_get_cap_refs(ci, need, want, endoff, - &_got, &check_max, &err)); - if (err) - ret = err; + ret = ceph_pool_perm_check(ci, need); if (ret < 0) return ret; - if (check_max) - goto retry; + while (true) { + if (endoff > 0) + check_max_size(&ci->vfs_inode, endoff); - if (ci->i_inline_version != CEPH_INLINE_NONE && - (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) && - i_size_read(&ci->vfs_inode) > 0) { - struct page *page = find_get_page(ci->vfs_inode.i_mapping, 0); - if (page) { - if (PageUptodate(page)) { - *pinned_page = page; - goto out; - } - page_cache_release(page); - } - /* - * drop cap refs first because getattr while holding - * caps refs can cause deadlock. - */ - ceph_put_cap_refs(ci, _got); + err = 0; _got = 0; + ret = try_get_cap_refs(ci, need, want, endoff, + false, &_got, &err); + if (ret) { + if (err == -EAGAIN) + continue; + if (err < 0) + return err; + } else { + ret = wait_event_interruptible(ci->i_cap_wq, + try_get_cap_refs(ci, need, want, endoff, + true, &_got, &err)); + if (err == -EAGAIN) + continue; + if (err < 0) + ret = err; + if (ret < 0) + return ret; + } - /* getattr request will bring inline data into page cache */ - ret = __ceph_do_getattr(&ci->vfs_inode, NULL, - CEPH_STAT_CAP_INLINE_DATA, true); - if (ret < 0) - return ret; - goto retry; + if (ci->i_inline_version != CEPH_INLINE_NONE && + (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) && + i_size_read(&ci->vfs_inode) > 0) { + struct page *page = + find_get_page(ci->vfs_inode.i_mapping, 0); + if (page) { + if (PageUptodate(page)) { + *pinned_page = page; + break; + } + page_cache_release(page); + } + /* + * drop cap refs first because getattr while + * holding * caps refs can cause deadlock. + */ + ceph_put_cap_refs(ci, _got); + _got = 0; + + /* + * getattr request will bring inline data into + * page cache + */ + ret = __ceph_do_getattr(&ci->vfs_inode, NULL, + CEPH_STAT_CAP_INLINE_DATA, + true); + if (ret < 0) + return ret; + continue; + } + break; } -out: + *got = _got; return 0; } @@ -2286,10 +2549,31 @@ out: void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps) { spin_lock(&ci->i_ceph_lock); - __take_cap_refs(ci, caps); + __take_cap_refs(ci, caps, false); spin_unlock(&ci->i_ceph_lock); } + +/* + * drop cap_snap that is not associated with any snapshot. + * we don't need to send FLUSHSNAP message for it. + */ +static int ceph_try_drop_cap_snap(struct ceph_cap_snap *capsnap) +{ + if (!capsnap->need_flush && + !capsnap->writing && !capsnap->dirty_pages) { + + dout("dropping cap_snap %p follows %llu\n", + capsnap, capsnap->follows); + ceph_put_snap_context(capsnap->context); + list_del(&capsnap->ci_item); + list_del(&capsnap->flushing_item); + ceph_put_cap_snap(capsnap); + return 1; + } + return 0; +} + /* * Release cap refs. * @@ -2303,7 +2587,6 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) { struct inode *inode = &ci->vfs_inode; int last = 0, put = 0, flushsnaps = 0, wake = 0; - struct ceph_cap_snap *capsnap; spin_lock(&ci->i_ceph_lock); if (had & CEPH_CAP_PIN) @@ -2325,17 +2608,24 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) if (had & CEPH_CAP_FILE_WR) if (--ci->i_wr_ref == 0) { last++; - if (!list_empty(&ci->i_cap_snaps)) { - capsnap = list_first_entry(&ci->i_cap_snaps, - struct ceph_cap_snap, - ci_item); - if (capsnap->writing) { - capsnap->writing = 0; - flushsnaps = - __ceph_finish_cap_snap(ci, - capsnap); - wake = 1; - } + if (__ceph_have_pending_cap_snap(ci)) { + struct ceph_cap_snap *capsnap = + list_last_entry(&ci->i_cap_snaps, + struct ceph_cap_snap, + ci_item); + capsnap->writing = 0; + if (ceph_try_drop_cap_snap(capsnap)) + put++; + else if (__ceph_finish_cap_snap(ci, capsnap)) + flushsnaps = 1; + wake = 1; + } + if (ci->i_wrbuffer_ref_head == 0 && + ci->i_dirty_caps == 0 && + ci->i_flushing_caps == 0) { + BUG_ON(!ci->i_head_snapc); + ceph_put_snap_context(ci->i_head_snapc); + ci->i_head_snapc = NULL; } /* see comment in __ceph_remove_cap() */ if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) @@ -2352,7 +2642,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) ceph_flush_snaps(ci); if (wake) wake_up_all(&ci->i_cap_wq); - if (put) + while (put-- > 0) iput(inode); } @@ -2380,7 +2670,9 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, if (ci->i_head_snapc == snapc) { ci->i_wrbuffer_ref_head -= nr; if (ci->i_wrbuffer_ref_head == 0 && - ci->i_dirty_caps == 0 && ci->i_flushing_caps == 0) { + ci->i_wr_ref == 0 && + ci->i_dirty_caps == 0 && + ci->i_flushing_caps == 0) { BUG_ON(!ci->i_head_snapc); ceph_put_snap_context(ci->i_head_snapc); ci->i_head_snapc = NULL; @@ -2401,25 +2693,15 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, capsnap->dirty_pages -= nr; if (capsnap->dirty_pages == 0) { complete_capsnap = 1; - if (capsnap->dirty == 0) - /* cap writeback completed before we created - * the cap_snap; no FLUSHSNAP is needed */ - drop_capsnap = 1; + drop_capsnap = ceph_try_drop_cap_snap(capsnap); } dout("put_wrbuffer_cap_refs on %p cap_snap %p " - " snap %lld %d/%d -> %d/%d %s%s%s\n", + " snap %lld %d/%d -> %d/%d %s%s\n", inode, capsnap, capsnap->context->seq, ci->i_wrbuffer_ref+nr, capsnap->dirty_pages + nr, ci->i_wrbuffer_ref, capsnap->dirty_pages, last ? " (wrbuffer last)" : "", - complete_capsnap ? " (complete capsnap)" : "", - drop_capsnap ? " (drop capsnap)" : ""); - if (drop_capsnap) { - ceph_put_snap_context(capsnap->context); - list_del(&capsnap->ci_item); - list_del(&capsnap->flushing_item); - ceph_put_cap_snap(capsnap); - } + complete_capsnap ? " (complete capsnap)" : ""); } spin_unlock(&ci->i_ceph_lock); @@ -2526,7 +2808,8 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, * try to invalidate (once). (If there are dirty buffers, we * will invalidate _after_ writeback.) */ - if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) && + if (!S_ISDIR(inode->i_mode) && /* don't invalidate readdir cache */ + ((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) && (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && !ci->i_wrbuffer_ref) { if (try_nonblocking_invalidate(inode)) { @@ -2732,16 +3015,29 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, { struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; + struct ceph_cap_flush *cf; + struct rb_node *n; + LIST_HEAD(to_remove); unsigned seq = le32_to_cpu(m->seq); int dirty = le32_to_cpu(m->dirty); int cleaned = 0; int drop = 0; - int i; - for (i = 0; i < CEPH_CAP_BITS; i++) - if ((dirty & (1 << i)) && - (u16)flush_tid == ci->i_cap_flush_tid[i]) - cleaned |= 1 << i; + n = rb_first(&ci->i_cap_flush_tree); + while (n) { + cf = rb_entry(n, struct ceph_cap_flush, i_node); + n = rb_next(&cf->i_node); + if (cf->tid == flush_tid) + cleaned = cf->caps; + if (cf->tid <= flush_tid) { + rb_erase(&cf->i_node, &ci->i_cap_flush_tree); + list_add_tail(&cf->list, &to_remove); + } else { + cleaned &= ~cf->caps; + if (!cleaned) + break; + } + } dout("handle_cap_flush_ack inode %p mds%d seq %d on %s cleaned %s," " flushing %s -> %s\n", @@ -2749,12 +3045,23 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, ceph_cap_string(cleaned), ceph_cap_string(ci->i_flushing_caps), ceph_cap_string(ci->i_flushing_caps & ~cleaned)); - if (ci->i_flushing_caps == (ci->i_flushing_caps & ~cleaned)) + if (list_empty(&to_remove) && !cleaned) goto out; ci->i_flushing_caps &= ~cleaned; spin_lock(&mdsc->cap_dirty_lock); + + if (!list_empty(&to_remove)) { + list_for_each_entry(cf, &to_remove, list) + rb_erase(&cf->g_node, &mdsc->cap_flush_tree); + + n = rb_first(&mdsc->cap_flush_tree); + cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL; + if (!cf || cf->tid > flush_tid) + wake_up_all(&mdsc->cap_flushing_wq); + } + if (ci->i_flushing_caps == 0) { list_del_init(&ci->i_flushing_item); if (!list_empty(&session->s_cap_flushing)) @@ -2764,14 +3071,14 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, struct ceph_inode_info, i_flushing_item)->vfs_inode); mdsc->num_cap_flushing--; - wake_up_all(&mdsc->cap_flushing_wq); dout(" inode %p now !flushing\n", inode); if (ci->i_dirty_caps == 0) { dout(" inode %p now clean\n", inode); BUG_ON(!list_empty(&ci->i_dirty_item)); drop = 1; - if (ci->i_wrbuffer_ref_head == 0) { + if (ci->i_wr_ref == 0 && + ci->i_wrbuffer_ref_head == 0) { BUG_ON(!ci->i_head_snapc); ceph_put_snap_context(ci->i_head_snapc); ci->i_head_snapc = NULL; @@ -2785,6 +3092,13 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, out: spin_unlock(&ci->i_ceph_lock); + + while (!list_empty(&to_remove)) { + cf = list_first_entry(&to_remove, + struct ceph_cap_flush, list); + list_del(&cf->list); + ceph_free_cap_flush(cf); + } if (drop) iput(inode); } @@ -2800,6 +3114,7 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid, struct ceph_mds_session *session) { struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; u64 follows = le64_to_cpu(m->snap_follows); struct ceph_cap_snap *capsnap; int drop = 0; @@ -2823,6 +3138,7 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid, list_del(&capsnap->ci_item); list_del(&capsnap->flushing_item); ceph_put_cap_snap(capsnap); + wake_up_all(&mdsc->cap_flushing_wq); drop = 1; break; } else { @@ -2971,7 +3287,6 @@ retry: mutex_lock_nested(&session->s_mutex, SINGLE_DEPTH_NESTING); } - ceph_add_cap_releases(mdsc, tsession); new_cap = ceph_get_cap(mdsc, NULL); } else { WARN_ON(1); @@ -3167,16 +3482,20 @@ void ceph_handle_caps(struct ceph_mds_session *session, dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq, (unsigned)seq); - if (op == CEPH_CAP_OP_IMPORT) - ceph_add_cap_releases(mdsc, session); - if (!inode) { dout(" i don't have ino %llx\n", vino.ino); if (op == CEPH_CAP_OP_IMPORT) { + cap = ceph_get_cap(mdsc, NULL); + cap->cap_ino = vino.ino; + cap->queue_release = 1; + cap->cap_id = cap_id; + cap->mseq = mseq; + cap->seq = seq; spin_lock(&session->s_cap_lock); - __queue_cap_release(session, vino.ino, cap_id, - mseq, seq); + list_add_tail(&cap->session_caps, + &session->s_cap_releases); + session->s_num_cap_releases++; spin_unlock(&session->s_cap_lock); } goto flush_cap_releases; @@ -3252,11 +3571,10 @@ void ceph_handle_caps(struct ceph_mds_session *session, flush_cap_releases: /* - * send any full release message to try to move things + * send any cap release message to try to move things * along for the mds (who clearly thinks we still have this * cap). */ - ceph_add_cap_releases(mdsc, session); ceph_send_cap_releases(mdsc, session); done: |