diff options
Diffstat (limited to 'fs/ceph')
-rw-r--r-- | fs/ceph/acl.c | 4 | ||||
-rw-r--r-- | fs/ceph/addr.c | 308 | ||||
-rw-r--r-- | fs/ceph/caps.c | 836 | ||||
-rw-r--r-- | fs/ceph/dir.c | 383 | ||||
-rw-r--r-- | fs/ceph/file.c | 61 | ||||
-rw-r--r-- | fs/ceph/inode.c | 155 | ||||
-rw-r--r-- | fs/ceph/mds_client.c | 425 | ||||
-rw-r--r-- | fs/ceph/mds_client.h | 23 | ||||
-rw-r--r-- | fs/ceph/snap.c | 173 | ||||
-rw-r--r-- | fs/ceph/super.c | 25 | ||||
-rw-r--r-- | fs/ceph/super.h | 125 | ||||
-rw-r--r-- | fs/ceph/xattr.c | 65 |
12 files changed, 1689 insertions, 894 deletions
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c index 64fa248343f6..8f84646f10e9 100644 --- a/fs/ceph/acl.c +++ b/fs/ceph/acl.c @@ -187,10 +187,10 @@ int ceph_pre_init_acls(struct inode *dir, umode_t *mode, val_size2 = posix_acl_xattr_size(default_acl->a_count); err = -ENOMEM; - tmp_buf = kmalloc(max(val_size1, val_size2), GFP_NOFS); + tmp_buf = kmalloc(max(val_size1, val_size2), GFP_KERNEL); if (!tmp_buf) goto out_err; - pagelist = kmalloc(sizeof(struct ceph_pagelist), GFP_NOFS); + pagelist = kmalloc(sizeof(struct ceph_pagelist), GFP_KERNEL); if (!pagelist) goto out_err; ceph_pagelist_init(pagelist); diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index e162bcd105ee..890c50971a69 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -87,17 +87,21 @@ static int ceph_set_page_dirty(struct page *page) inode = mapping->host; ci = ceph_inode(inode); - /* - * Note that we're grabbing a snapc ref here without holding - * any locks! - */ - snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context); - /* dirty the head */ spin_lock(&ci->i_ceph_lock); - if (ci->i_head_snapc == NULL) - ci->i_head_snapc = ceph_get_snap_context(snapc); - ++ci->i_wrbuffer_ref_head; + BUG_ON(ci->i_wr_ref == 0); // caller should hold Fw reference + if (__ceph_have_pending_cap_snap(ci)) { + struct ceph_cap_snap *capsnap = + list_last_entry(&ci->i_cap_snaps, + struct ceph_cap_snap, + ci_item); + snapc = ceph_get_snap_context(capsnap->context); + capsnap->dirty_pages++; + } else { + BUG_ON(!ci->i_head_snapc); + snapc = ceph_get_snap_context(ci->i_head_snapc); + ++ci->i_wrbuffer_ref_head; + } if (ci->i_wrbuffer_ref == 0) ihold(inode); ++ci->i_wrbuffer_ref; @@ -346,7 +350,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) /* build page vector */ nr_pages = calc_pages_for(0, len); - pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS); + pages = kmalloc(sizeof(*pages) * nr_pages, GFP_KERNEL); ret = -ENOMEM; if (!pages) goto out; @@ -358,7 +362,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) dout("start_read %p adding %p idx %lu\n", inode, page, page->index); if (add_to_page_cache_lru(page, &inode->i_data, page->index, - GFP_NOFS)) { + GFP_KERNEL)) { ceph_fscache_uncache_page(inode, page); page_cache_release(page); dout("start_read %p add_to_page_cache failed %p\n", @@ -436,7 +440,7 @@ out: * only snap context we are allowed to write back. */ static struct ceph_snap_context *get_oldest_context(struct inode *inode, - u64 *snap_size) + loff_t *snap_size) { struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_snap_context *snapc = NULL; @@ -476,8 +480,9 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) struct ceph_osd_client *osdc; struct ceph_snap_context *snapc, *oldest; loff_t page_off = page_offset(page); + loff_t snap_size = -1; long writeback_stat; - u64 truncate_size, snap_size = 0; + u64 truncate_size; u32 truncate_seq; int err = 0, len = PAGE_CACHE_SIZE; @@ -512,7 +517,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) spin_lock(&ci->i_ceph_lock); truncate_seq = ci->i_truncate_seq; truncate_size = ci->i_truncate_size; - if (!snap_size) + if (snap_size == -1) snap_size = i_size_read(inode); spin_unlock(&ci->i_ceph_lock); @@ -695,7 +700,8 @@ static int ceph_writepages_start(struct address_space *mapping, unsigned wsize = 1 << inode->i_blkbits; struct ceph_osd_request *req = NULL; int do_sync = 0; - u64 truncate_size, snap_size; + loff_t snap_size, i_size; + u64 truncate_size; u32 truncate_seq; /* @@ -741,7 +747,7 @@ static int ceph_writepages_start(struct address_space *mapping, retry: /* find oldest snap context with dirty data */ ceph_put_snap_context(snapc); - snap_size = 0; + snap_size = -1; snapc = get_oldest_context(inode, &snap_size); if (!snapc) { /* hmm, why does writepages get called when there @@ -749,16 +755,13 @@ retry: dout(" no snap context with dirty data?\n"); goto out; } - if (snap_size == 0) - snap_size = i_size_read(inode); dout(" oldest snapc is %p seq %lld (%d snaps)\n", snapc, snapc->seq, snapc->num_snaps); spin_lock(&ci->i_ceph_lock); truncate_seq = ci->i_truncate_seq; truncate_size = ci->i_truncate_size; - if (!snap_size) - snap_size = i_size_read(inode); + i_size = i_size_read(inode); spin_unlock(&ci->i_ceph_lock); if (last_snapc && snapc != last_snapc) { @@ -828,8 +831,10 @@ get_more_pages: dout("waiting on writeback %p\n", page); wait_on_page_writeback(page); } - if (page_offset(page) >= snap_size) { - dout("%p page eof %llu\n", page, snap_size); + if (page_offset(page) >= + (snap_size == -1 ? i_size : snap_size)) { + dout("%p page eof %llu\n", page, + (snap_size == -1 ? i_size : snap_size)); done = 1; unlock_page(page); break; @@ -884,7 +889,8 @@ get_more_pages: } if (do_sync) - osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC); + osd_req_op_init(req, 1, + CEPH_OSD_OP_STARTSYNC, 0); req->r_callback = writepages_finish; req->r_inode = inode; @@ -944,10 +950,18 @@ get_more_pages: } /* Format the osd request message and submit the write */ - offset = page_offset(pages[0]); - len = min(snap_size - offset, - (u64)locked_pages << PAGE_CACHE_SHIFT); + len = (u64)locked_pages << PAGE_CACHE_SHIFT; + if (snap_size == -1) { + len = min(len, (u64)i_size_read(inode) - offset); + /* writepages_finish() clears writeback pages + * according to the data length, so make sure + * data length covers all locked pages */ + len = max(len, 1 + + ((u64)(locked_pages - 1) << PAGE_CACHE_SHIFT)); + } else { + len = min(len, snap_size - offset); + } dout("writepages got %d pages at %llu~%llu\n", locked_pages, offset, len); @@ -1032,7 +1046,6 @@ static int ceph_update_writeable_page(struct file *file, { struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; loff_t page_off = pos & PAGE_CACHE_MASK; int pos_in_page = pos & ~PAGE_CACHE_MASK; int end_in_page = pos_in_page + len; @@ -1044,10 +1057,6 @@ retry_locked: /* writepages currently holds page lock, but if we change that later, */ wait_on_page_writeback(page); - /* check snap context */ - BUG_ON(!ci->i_snap_realm); - down_read(&mdsc->snap_rwsem); - BUG_ON(!ci->i_snap_realm->cached_context); snapc = page_snap_context(page); if (snapc && snapc != ci->i_head_snapc) { /* @@ -1055,7 +1064,6 @@ retry_locked: * context! is it writeable now? */ oldest = get_oldest_context(inode, NULL); - up_read(&mdsc->snap_rwsem); if (snapc->seq > oldest->seq) { ceph_put_snap_context(oldest); @@ -1112,7 +1120,6 @@ retry_locked: } /* we need to read it. */ - up_read(&mdsc->snap_rwsem); r = readpage_nounlock(file, page); if (r < 0) goto fail_nosnap; @@ -1157,16 +1164,13 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping, /* * we don't do anything in here that simple_write_end doesn't do - * except adjust dirty page accounting and drop read lock on - * mdsc->snap_rwsem. + * except adjust dirty page accounting */ static int ceph_write_end(struct file *file, struct address_space *mapping, loff_t pos, unsigned len, unsigned copied, struct page *page, void *fsdata) { struct inode *inode = file_inode(file); - struct ceph_fs_client *fsc = ceph_inode_to_client(inode); - struct ceph_mds_client *mdsc = fsc->mdsc; unsigned from = pos & (PAGE_CACHE_SIZE - 1); int check_cap = 0; @@ -1188,7 +1192,6 @@ static int ceph_write_end(struct file *file, struct address_space *mapping, set_page_dirty(page); unlock_page(page); - up_read(&mdsc->snap_rwsem); page_cache_release(page); if (check_cap) @@ -1314,13 +1317,17 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) struct inode *inode = file_inode(vma->vm_file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_file_info *fi = vma->vm_file->private_data; - struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; + struct ceph_cap_flush *prealloc_cf; struct page *page = vmf->page; loff_t off = page_offset(page); loff_t size = i_size_read(inode); size_t len; int want, got, ret; + prealloc_cf = ceph_alloc_cap_flush(); + if (!prealloc_cf) + return VM_FAULT_SIGBUS; + if (ci->i_inline_version != CEPH_INLINE_NONE) { struct page *locked_page = NULL; if (off == 0) { @@ -1330,8 +1337,10 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) ret = ceph_uninline_data(vma->vm_file, locked_page); if (locked_page) unlock_page(locked_page); - if (ret < 0) - return VM_FAULT_SIGBUS; + if (ret < 0) { + ret = VM_FAULT_SIGBUS; + goto out_free; + } } if (off + PAGE_CACHE_SIZE <= size) @@ -1353,7 +1362,8 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) break; if (ret != -ERESTARTSYS) { WARN_ON(1); - return VM_FAULT_SIGBUS; + ret = VM_FAULT_SIGBUS; + goto out_free; } } dout("page_mkwrite %p %llu~%zd got cap refs on %s\n", @@ -1373,7 +1383,6 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) if (ret == 0) { /* success. we'll keep the page locked. */ set_page_dirty(page); - up_read(&mdsc->snap_rwsem); ret = VM_FAULT_LOCKED; } else { if (ret == -ENOMEM) @@ -1389,7 +1398,8 @@ out: int dirty; spin_lock(&ci->i_ceph_lock); ci->i_inline_version = CEPH_INLINE_NONE; - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, + &prealloc_cf); spin_unlock(&ci->i_ceph_lock); if (dirty) __mark_inode_dirty(inode, dirty); @@ -1398,6 +1408,8 @@ out: dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %d\n", inode, off, len, ceph_cap_string(got), ret); ceph_put_cap_refs(ci, got); +out_free: + ceph_free_cap_flush(prealloc_cf); return ret; } @@ -1509,8 +1521,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page) ceph_vino(inode), 0, &len, 0, 1, CEPH_OSD_OP_CREATE, CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, - ci->i_snap_realm->cached_context, - 0, 0, false); + ceph_empty_snapc, 0, 0, false); if (IS_ERR(req)) { err = PTR_ERR(req); goto out; @@ -1528,7 +1539,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page) ceph_vino(inode), 0, &len, 1, 3, CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, - ci->i_snap_realm->cached_context, + ceph_empty_snapc, ci->i_truncate_seq, ci->i_truncate_size, false); if (IS_ERR(req)) { @@ -1597,3 +1608,206 @@ int ceph_mmap(struct file *file, struct vm_area_struct *vma) vma->vm_ops = &ceph_vmops; return 0; } + +enum { + POOL_READ = 1, + POOL_WRITE = 2, +}; + +static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool) +{ + struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode); + struct ceph_mds_client *mdsc = fsc->mdsc; + struct ceph_osd_request *rd_req = NULL, *wr_req = NULL; + struct rb_node **p, *parent; + struct ceph_pool_perm *perm; + struct page **pages; + int err = 0, err2 = 0, have = 0; + + down_read(&mdsc->pool_perm_rwsem); + p = &mdsc->pool_perm_tree.rb_node; + while (*p) { + perm = rb_entry(*p, struct ceph_pool_perm, node); + if (pool < perm->pool) + p = &(*p)->rb_left; + else if (pool > perm->pool) + p = &(*p)->rb_right; + else { + have = perm->perm; + break; + } + } + up_read(&mdsc->pool_perm_rwsem); + if (*p) + goto out; + + dout("__ceph_pool_perm_get pool %u no perm cached\n", pool); + + down_write(&mdsc->pool_perm_rwsem); + parent = NULL; + while (*p) { + parent = *p; + perm = rb_entry(parent, struct ceph_pool_perm, node); + if (pool < perm->pool) + p = &(*p)->rb_left; + else if (pool > perm->pool) + p = &(*p)->rb_right; + else { + have = perm->perm; + break; + } + } + if (*p) { + up_write(&mdsc->pool_perm_rwsem); + goto out; + } + + rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, + ceph_empty_snapc, + 1, false, GFP_NOFS); + if (!rd_req) { + err = -ENOMEM; + goto out_unlock; + } + + rd_req->r_flags = CEPH_OSD_FLAG_READ; + osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0); + rd_req->r_base_oloc.pool = pool; + snprintf(rd_req->r_base_oid.name, sizeof(rd_req->r_base_oid.name), + "%llx.00000000", ci->i_vino.ino); + rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name); + + wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, + ceph_empty_snapc, + 1, false, GFP_NOFS); + if (!wr_req) { + err = -ENOMEM; + goto out_unlock; + } + + wr_req->r_flags = CEPH_OSD_FLAG_WRITE | + CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK; + osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL); + wr_req->r_base_oloc.pool = pool; + wr_req->r_base_oid = rd_req->r_base_oid; + + /* one page should be large enough for STAT data */ + pages = ceph_alloc_page_vector(1, GFP_KERNEL); + if (IS_ERR(pages)) { + err = PTR_ERR(pages); + goto out_unlock; + } + + osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE, + 0, false, true); + ceph_osdc_build_request(rd_req, 0, NULL, CEPH_NOSNAP, + &ci->vfs_inode.i_mtime); + err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false); + + ceph_osdc_build_request(wr_req, 0, NULL, CEPH_NOSNAP, + &ci->vfs_inode.i_mtime); + err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false); + + if (!err) + err = ceph_osdc_wait_request(&fsc->client->osdc, rd_req); + if (!err2) + err2 = ceph_osdc_wait_request(&fsc->client->osdc, wr_req); + + if (err >= 0 || err == -ENOENT) + have |= POOL_READ; + else if (err != -EPERM) + goto out_unlock; + + if (err2 == 0 || err2 == -EEXIST) + have |= POOL_WRITE; + else if (err2 != -EPERM) { + err = err2; + goto out_unlock; + } + + perm = kmalloc(sizeof(*perm), GFP_NOFS); + if (!perm) { + err = -ENOMEM; + goto out_unlock; + } + + perm->pool = pool; + perm->perm = have; + rb_link_node(&perm->node, parent, p); + rb_insert_color(&perm->node, &mdsc->pool_perm_tree); + err = 0; +out_unlock: + up_write(&mdsc->pool_perm_rwsem); + + if (rd_req) + ceph_osdc_put_request(rd_req); + if (wr_req) + ceph_osdc_put_request(wr_req); +out: + if (!err) + err = have; + dout("__ceph_pool_perm_get pool %u result = %d\n", pool, err); + return err; +} + +int ceph_pool_perm_check(struct ceph_inode_info *ci, int need) +{ + u32 pool; + int ret, flags; + + if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode), + NOPOOLPERM)) + return 0; + + spin_lock(&ci->i_ceph_lock); + flags = ci->i_ceph_flags; + pool = ceph_file_layout_pg_pool(ci->i_layout); + spin_unlock(&ci->i_ceph_lock); +check: + if (flags & CEPH_I_POOL_PERM) { + if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) { + dout("ceph_pool_perm_check pool %u no read perm\n", + pool); + return -EPERM; + } + if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) { + dout("ceph_pool_perm_check pool %u no write perm\n", + pool); + return -EPERM; + } + return 0; + } + + ret = __ceph_pool_perm_get(ci, pool); + if (ret < 0) + return ret; + + flags = CEPH_I_POOL_PERM; + if (ret & POOL_READ) + flags |= CEPH_I_POOL_RD; + if (ret & POOL_WRITE) + flags |= CEPH_I_POOL_WR; + + spin_lock(&ci->i_ceph_lock); + if (pool == ceph_file_layout_pg_pool(ci->i_layout)) { + ci->i_ceph_flags = flags; + } else { + pool = ceph_file_layout_pg_pool(ci->i_layout); + flags = ci->i_ceph_flags; + } + spin_unlock(&ci->i_ceph_lock); + goto check; +} + +void ceph_pool_perm_destroy(struct ceph_mds_client *mdsc) +{ + struct ceph_pool_perm *perm; + struct rb_node *n; + + while (!RB_EMPTY_ROOT(&mdsc->pool_perm_tree)) { + n = rb_first(&mdsc->pool_perm_tree); + perm = rb_entry(n, struct ceph_pool_perm, node); + rb_erase(n, &mdsc->pool_perm_tree); + kfree(perm); + } +} diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index be5ea6af8366..dc10c9dd36c1 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -833,7 +833,9 @@ int __ceph_caps_used(struct ceph_inode_info *ci) used |= CEPH_CAP_PIN; if (ci->i_rd_ref) used |= CEPH_CAP_FILE_RD; - if (ci->i_rdcache_ref || ci->vfs_inode.i_data.nrpages) + if (ci->i_rdcache_ref || + (!S_ISDIR(ci->vfs_inode.i_mode) && /* ignore readdir cache */ + ci->vfs_inode.i_data.nrpages)) used |= CEPH_CAP_FILE_CACHE; if (ci->i_wr_ref) used |= CEPH_CAP_FILE_WR; @@ -926,16 +928,6 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release) /* remove from session list */ spin_lock(&session->s_cap_lock); - /* - * s_cap_reconnect is protected by s_cap_lock. no one changes - * s_cap_gen while session is in the reconnect state. - */ - if (queue_release && - (!session->s_cap_reconnect || - cap->cap_gen == session->s_cap_gen)) - __queue_cap_release(session, ci->i_vino.ino, cap->cap_id, - cap->mseq, cap->issue_seq); - if (session->s_cap_iterator == cap) { /* not yet, we are iterating over this very cap */ dout("__ceph_remove_cap delaying %p removal from session %p\n", @@ -948,6 +940,25 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release) } /* protect backpointer with s_cap_lock: see iterate_session_caps */ cap->ci = NULL; + + /* + * s_cap_reconnect is protected by s_cap_lock. no one changes + * s_cap_gen while session is in the reconnect state. + */ + if (queue_release && + (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) { + cap->queue_release = 1; + if (removed) { + list_add_tail(&cap->session_caps, + &session->s_cap_releases); + session->s_num_cap_releases++; + removed = 0; + } + } else { + cap->queue_release = 0; + } + cap->cap_ino = ci->i_vino.ino; + spin_unlock(&session->s_cap_lock); /* remove from inode list */ @@ -977,8 +988,8 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release) static int send_cap_msg(struct ceph_mds_session *session, u64 ino, u64 cid, int op, int caps, int wanted, int dirty, - u32 seq, u64 flush_tid, u32 issue_seq, u32 mseq, - u64 size, u64 max_size, + u32 seq, u64 flush_tid, u64 oldest_flush_tid, + u32 issue_seq, u32 mseq, u64 size, u64 max_size, struct timespec *mtime, struct timespec *atime, u64 time_warp_seq, kuid_t uid, kgid_t gid, umode_t mode, @@ -992,20 +1003,23 @@ static int send_cap_msg(struct ceph_mds_session *session, size_t extra_len; dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s" - " seq %u/%u mseq %u follows %lld size %llu/%llu" + " seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu" " xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(op), cid, ino, ceph_cap_string(caps), ceph_cap_string(wanted), ceph_cap_string(dirty), - seq, issue_seq, mseq, follows, size, max_size, + seq, issue_seq, flush_tid, oldest_flush_tid, + mseq, follows, size, max_size, xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0); - /* flock buffer size + inline version + inline data size */ - extra_len = 4 + 8 + 4; + /* flock buffer size + inline version + inline data size + + * osd_epoch_barrier + oldest_flush_tid */ + extra_len = 4 + 8 + 4 + 4 + 8; msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len, GFP_NOFS, false); if (!msg) return -ENOMEM; + msg->hdr.version = cpu_to_le16(6); msg->hdr.tid = cpu_to_le64(flush_tid); fc = msg->front.iov_base; @@ -1041,6 +1055,10 @@ static int send_cap_msg(struct ceph_mds_session *session, ceph_encode_64(&p, inline_data ? 0 : CEPH_INLINE_NONE); /* inline data size */ ceph_encode_32(&p, 0); + /* osd_epoch_barrier */ + ceph_encode_32(&p, 0); + /* oldest_flush_tid */ + ceph_encode_64(&p, oldest_flush_tid); fc->xattr_version = cpu_to_le64(xattr_version); if (xattrs_buf) { @@ -1053,44 +1071,6 @@ static int send_cap_msg(struct ceph_mds_session *session, return 0; } -void __queue_cap_release(struct ceph_mds_session *session, - u64 ino, u64 cap_id, u32 migrate_seq, - u32 issue_seq) -{ - struct ceph_msg *msg; - struct ceph_mds_cap_release *head; - struct ceph_mds_cap_item *item; - - BUG_ON(!session->s_num_cap_releases); - msg = list_first_entry(&session->s_cap_releases, - struct ceph_msg, list_head); - - dout(" adding %llx release to mds%d msg %p (%d left)\n", - ino, session->s_mds, msg, session->s_num_cap_releases); - - BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE); - head = msg->front.iov_base; - le32_add_cpu(&head->num, 1); - item = msg->front.iov_base + msg->front.iov_len; - item->ino = cpu_to_le64(ino); - item->cap_id = cpu_to_le64(cap_id); - item->migrate_seq = cpu_to_le32(migrate_seq); - item->seq = cpu_to_le32(issue_seq); - - session->s_num_cap_releases--; - - msg->front.iov_len += sizeof(*item); - if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { - dout(" release msg %p full\n", msg); - list_move_tail(&msg->list_head, &session->s_cap_releases_done); - } else { - dout(" release msg %p at %d/%d (%d)\n", msg, - (int)le32_to_cpu(head->num), - (int)CEPH_CAPS_PER_RELEASE, - (int)msg->front.iov_len); - } -} - /* * Queue cap releases when an inode is dropped from our cache. Since * inode is about to be destroyed, there is no need for i_ceph_lock. @@ -1127,7 +1107,7 @@ void ceph_queue_caps_release(struct inode *inode) */ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, int op, int used, int want, int retain, int flushing, - unsigned *pflush_tid) + u64 flush_tid, u64 oldest_flush_tid) __releases(cap->ci->i_ceph_lock) { struct ceph_inode_info *ci = cap->ci; @@ -1145,8 +1125,6 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, u64 xattr_version = 0; struct ceph_buffer *xattr_blob = NULL; int delayed = 0; - u64 flush_tid = 0; - int i; int ret; bool inline_data; @@ -1190,26 +1168,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, cap->implemented &= cap->issued | used; cap->mds_wanted = want; - if (flushing) { - /* - * assign a tid for flush operations so we can avoid - * flush1 -> dirty1 -> flush2 -> flushack1 -> mark - * clean type races. track latest tid for every bit - * so we can handle flush AxFw, flush Fw, and have the - * first ack clean Ax. - */ - flush_tid = ++ci->i_cap_flush_last_tid; - if (pflush_tid) - *pflush_tid = flush_tid; - dout(" cap_flush_tid %d\n", (int)flush_tid); - for (i = 0; i < CEPH_CAP_BITS; i++) - if (flushing & (1 << i)) - ci->i_cap_flush_tid[i] = flush_tid; - - follows = ci->i_head_snapc->seq; - } else { - follows = 0; - } + follows = flushing ? ci->i_head_snapc->seq : 0; keep = cap->implemented; seq = cap->seq; @@ -1237,7 +1196,8 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, spin_unlock(&ci->i_ceph_lock); ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id, - op, keep, want, flushing, seq, flush_tid, issue_seq, mseq, + op, keep, want, flushing, seq, + flush_tid, oldest_flush_tid, issue_seq, mseq, size, max_size, &mtime, &atime, time_warp_seq, uid, gid, mode, xattr_version, xattr_blob, follows, inline_data); @@ -1259,14 +1219,14 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, * asynchronously back to the MDS once sync writes complete and dirty * data is written out. * - * Unless @again is true, skip cap_snaps that were already sent to + * Unless @kick is true, skip cap_snaps that were already sent to * the MDS (i.e., during this session). * * Called under i_ceph_lock. Takes s_mutex as needed. */ void __ceph_flush_snaps(struct ceph_inode_info *ci, struct ceph_mds_session **psession, - int again) + int kick) __releases(ci->i_ceph_lock) __acquires(ci->i_ceph_lock) { @@ -1297,11 +1257,8 @@ retry: if (capsnap->dirty_pages || capsnap->writing) break; - /* - * if cap writeback already occurred, we should have dropped - * the capsnap in ceph_put_wrbuffer_cap_refs. - */ - BUG_ON(capsnap->dirty == 0); + /* should be removed by ceph_try_drop_cap_snap() */ + BUG_ON(!capsnap->need_flush); /* pick mds, take s_mutex */ if (ci->i_auth_cap == NULL) { @@ -1310,7 +1267,7 @@ retry: } /* only flush each capsnap once */ - if (!again && !list_empty(&capsnap->flushing_item)) { + if (!kick && !list_empty(&capsnap->flushing_item)) { dout("already flushed %p, skipping\n", capsnap); continue; } @@ -1320,6 +1277,9 @@ retry: if (session && session->s_mds != mds) { dout("oops, wrong session %p mutex\n", session); + if (kick) + goto out; + mutex_unlock(&session->s_mutex); ceph_put_mds_session(session); session = NULL; @@ -1343,20 +1303,22 @@ retry: goto retry; } - capsnap->flush_tid = ++ci->i_cap_flush_last_tid; + spin_lock(&mdsc->cap_dirty_lock); + capsnap->flush_tid = ++mdsc->last_cap_flush_tid; + spin_unlock(&mdsc->cap_dirty_lock); + atomic_inc(&capsnap->nref); - if (!list_empty(&capsnap->flushing_item)) - list_del_init(&capsnap->flushing_item); - list_add_tail(&capsnap->flushing_item, - &session->s_cap_snaps_flushing); + if (list_empty(&capsnap->flushing_item)) + list_add_tail(&capsnap->flushing_item, + &session->s_cap_snaps_flushing); spin_unlock(&ci->i_ceph_lock); dout("flush_snaps %p cap_snap %p follows %lld tid %llu\n", inode, capsnap, capsnap->follows, capsnap->flush_tid); send_cap_msg(session, ceph_vino(inode).ino, 0, CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0, - capsnap->dirty, 0, capsnap->flush_tid, 0, mseq, - capsnap->size, 0, + capsnap->dirty, 0, capsnap->flush_tid, 0, + 0, mseq, capsnap->size, 0, &capsnap->mtime, &capsnap->atime, capsnap->time_warp_seq, capsnap->uid, capsnap->gid, capsnap->mode, @@ -1396,7 +1358,8 @@ static void ceph_flush_snaps(struct ceph_inode_info *ci) * Caller is then responsible for calling __mark_inode_dirty with the * returned flags value. */ -int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) +int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask, + struct ceph_cap_flush **pcf) { struct ceph_mds_client *mdsc = ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc; @@ -1416,9 +1379,14 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) ceph_cap_string(was | mask)); ci->i_dirty_caps |= mask; if (was == 0) { - if (!ci->i_head_snapc) + WARN_ON_ONCE(ci->i_prealloc_cap_flush); + swap(ci->i_prealloc_cap_flush, *pcf); + + if (!ci->i_head_snapc) { + WARN_ON_ONCE(!rwsem_is_locked(&mdsc->snap_rwsem)); ci->i_head_snapc = ceph_get_snap_context( ci->i_snap_realm->cached_context); + } dout(" inode %p now dirty snapc %p auth cap %p\n", &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap); BUG_ON(!list_empty(&ci->i_dirty_item)); @@ -1429,6 +1397,8 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) ihold(inode); dirty |= I_DIRTY_SYNC; } + } else { + WARN_ON_ONCE(!ci->i_prealloc_cap_flush); } BUG_ON(list_empty(&ci->i_dirty_item)); if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) && @@ -1438,6 +1408,74 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) return dirty; } +static void __add_cap_flushing_to_inode(struct ceph_inode_info *ci, + struct ceph_cap_flush *cf) +{ + struct rb_node **p = &ci->i_cap_flush_tree.rb_node; + struct rb_node *parent = NULL; + struct ceph_cap_flush *other = NULL; + + while (*p) { + parent = *p; + other = rb_entry(parent, struct ceph_cap_flush, i_node); + + if (cf->tid < other->tid) + p = &(*p)->rb_left; + else if (cf->tid > other->tid) + p = &(*p)->rb_right; + else + BUG(); + } + + rb_link_node(&cf->i_node, parent, p); + rb_insert_color(&cf->i_node, &ci->i_cap_flush_tree); +} + +static void __add_cap_flushing_to_mdsc(struct ceph_mds_client *mdsc, + struct ceph_cap_flush *cf) +{ + struct rb_node **p = &mdsc->cap_flush_tree.rb_node; + struct rb_node *parent = NULL; + struct ceph_cap_flush *other = NULL; + + while (*p) { + parent = *p; + other = rb_entry(parent, struct ceph_cap_flush, g_node); + + if (cf->tid < other->tid) + p = &(*p)->rb_left; + else if (cf->tid > other->tid) + p = &(*p)->rb_right; + else + BUG(); + } + + rb_link_node(&cf->g_node, parent, p); + rb_insert_color(&cf->g_node, &mdsc->cap_flush_tree); +} + +struct ceph_cap_flush *ceph_alloc_cap_flush(void) +{ + return kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL); +} + +void ceph_free_cap_flush(struct ceph_cap_flush *cf) +{ + if (cf) + kmem_cache_free(ceph_cap_flush_cachep, cf); +} + +static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc) +{ + struct rb_node *n = rb_first(&mdsc->cap_flush_tree); + if (n) { + struct ceph_cap_flush *cf = + rb_entry(n, struct ceph_cap_flush, g_node); + return cf->tid; + } + return 0; +} + /* * Add dirty inode to the flushing list. Assigned a seq number so we * can wait for caps to flush without starving. @@ -1445,14 +1483,17 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) * Called under i_ceph_lock. */ static int __mark_caps_flushing(struct inode *inode, - struct ceph_mds_session *session) + struct ceph_mds_session *session, + u64 *flush_tid, u64 *oldest_flush_tid) { struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_cap_flush *cf = NULL; int flushing; BUG_ON(ci->i_dirty_caps == 0); BUG_ON(list_empty(&ci->i_dirty_item)); + BUG_ON(!ci->i_prealloc_cap_flush); flushing = ci->i_dirty_caps; dout("__mark_caps_flushing flushing %s, flushing_caps %s -> %s\n", @@ -1463,22 +1504,31 @@ static int __mark_caps_flushing(struct inode *inode, ci->i_dirty_caps = 0; dout(" inode %p now !dirty\n", inode); + swap(cf, ci->i_prealloc_cap_flush); + cf->caps = flushing; + cf->kick = false; + spin_lock(&mdsc->cap_dirty_lock); list_del_init(&ci->i_dirty_item); + cf->tid = ++mdsc->last_cap_flush_tid; + __add_cap_flushing_to_mdsc(mdsc, cf); + *oldest_flush_tid = __get_oldest_flush_tid(mdsc); + if (list_empty(&ci->i_flushing_item)) { - ci->i_cap_flush_seq = ++mdsc->cap_flush_seq; list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing); mdsc->num_cap_flushing++; - dout(" inode %p now flushing seq %lld\n", inode, - ci->i_cap_flush_seq); + dout(" inode %p now flushing tid %llu\n", inode, cf->tid); } else { list_move_tail(&ci->i_flushing_item, &session->s_cap_flushing); - dout(" inode %p now flushing (more) seq %lld\n", inode, - ci->i_cap_flush_seq); + dout(" inode %p now flushing (more) tid %llu\n", + inode, cf->tid); } spin_unlock(&mdsc->cap_dirty_lock); + __add_cap_flushing_to_inode(ci, cf); + + *flush_tid = cf->tid; return flushing; } @@ -1524,6 +1574,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, struct ceph_mds_client *mdsc = fsc->mdsc; struct inode *inode = &ci->vfs_inode; struct ceph_cap *cap; + u64 flush_tid, oldest_flush_tid; int file_wanted, used, cap_used; int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */ int issued, implemented, want, retain, revoking, flushing = 0; @@ -1553,13 +1604,13 @@ retry: retry_locked: file_wanted = __ceph_caps_file_wanted(ci); used = __ceph_caps_used(ci); - want = file_wanted | used; issued = __ceph_caps_issued(ci, &implemented); revoking = implemented & ~issued; - retain = want | CEPH_CAP_PIN; + want = file_wanted; + retain = file_wanted | used | CEPH_CAP_PIN; if (!mdsc->stopping && inode->i_nlink > 0) { - if (want) { + if (file_wanted) { retain |= CEPH_CAP_ANY; /* be greedy */ } else if (S_ISDIR(inode->i_mode) && (issued & CEPH_CAP_FILE_SHARED) && @@ -1602,9 +1653,10 @@ retry_locked: * If we fail, it's because pages are locked.... try again later. */ if ((!is_delayed || mdsc->stopping) && - ci->i_wrbuffer_ref == 0 && /* no dirty pages... */ - inode->i_data.nrpages && /* have cached pages */ - (file_wanted == 0 || /* no open files */ + !S_ISDIR(inode->i_mode) && /* ignore readdir cache */ + ci->i_wrbuffer_ref == 0 && /* no dirty pages... */ + inode->i_data.nrpages && /* have cached pages */ + (file_wanted == 0 || /* no open files */ (revoking & (CEPH_CAP_FILE_CACHE| CEPH_CAP_FILE_LAZYIO))) && /* or revoking cache */ !tried_invalidate) { @@ -1742,17 +1794,25 @@ ack: took_snap_rwsem = 1; } - if (cap == ci->i_auth_cap && ci->i_dirty_caps) - flushing = __mark_caps_flushing(inode, session); - else + if (cap == ci->i_auth_cap && ci->i_dirty_caps) { + flushing = __mark_caps_flushing(inode, session, + &flush_tid, + &oldest_flush_tid); + } else { flushing = 0; + flush_tid = 0; + spin_lock(&mdsc->cap_dirty_lock); + oldest_flush_tid = __get_oldest_flush_tid(mdsc); + spin_unlock(&mdsc->cap_dirty_lock); + } mds = cap->mds; /* remember mds, so we don't repeat */ sent++; /* __send_cap drops i_ceph_lock */ delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used, - want, retain, flushing, NULL); + want, retain, flushing, + flush_tid, oldest_flush_tid); goto retry; /* retake i_ceph_lock and restart our cap scan. */ } @@ -1781,12 +1841,13 @@ ack: /* * Try to flush dirty caps back to the auth mds. */ -static int try_flush_caps(struct inode *inode, unsigned *flush_tid) +static int try_flush_caps(struct inode *inode, u64 *ptid) { struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); - int flushing = 0; struct ceph_mds_session *session = NULL; + int flushing = 0; + u64 flush_tid = 0, oldest_flush_tid = 0; retry: spin_lock(&ci->i_ceph_lock); @@ -1811,42 +1872,54 @@ retry: if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) goto out; - flushing = __mark_caps_flushing(inode, session); + flushing = __mark_caps_flushing(inode, session, &flush_tid, + &oldest_flush_tid); /* __send_cap drops i_ceph_lock */ delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want, - cap->issued | cap->implemented, flushing, - flush_tid); - if (!delayed) - goto out_unlocked; + (cap->issued | cap->implemented), + flushing, flush_tid, oldest_flush_tid); - spin_lock(&ci->i_ceph_lock); - __cap_delay_requeue(mdsc, ci); + if (delayed) { + spin_lock(&ci->i_ceph_lock); + __cap_delay_requeue(mdsc, ci); + spin_unlock(&ci->i_ceph_lock); + } + } else { + struct rb_node *n = rb_last(&ci->i_cap_flush_tree); + if (n) { + struct ceph_cap_flush *cf = + rb_entry(n, struct ceph_cap_flush, i_node); + flush_tid = cf->tid; + } + flushing = ci->i_flushing_caps; + spin_unlock(&ci->i_ceph_lock); } out: - spin_unlock(&ci->i_ceph_lock); -out_unlocked: if (session) mutex_unlock(&session->s_mutex); + + *ptid = flush_tid; return flushing; } /* * Return true if we've flushed caps through the given flush_tid. */ -static int caps_are_flushed(struct inode *inode, unsigned tid) +static int caps_are_flushed(struct inode *inode, u64 flush_tid) { struct ceph_inode_info *ci = ceph_inode(inode); - int i, ret = 1; + struct ceph_cap_flush *cf; + struct rb_node *n; + int ret = 1; spin_lock(&ci->i_ceph_lock); - for (i = 0; i < CEPH_CAP_BITS; i++) - if ((ci->i_flushing_caps & (1 << i)) && - ci->i_cap_flush_tid[i] <= tid) { - /* still flushing this bit */ + n = rb_first(&ci->i_cap_flush_tree); + if (n) { + cf = rb_entry(n, struct ceph_cap_flush, i_node); + if (cf->tid <= flush_tid) ret = 0; - break; - } + } spin_unlock(&ci->i_ceph_lock); return ret; } @@ -1864,13 +1937,16 @@ static void sync_write_wait(struct inode *inode) struct ceph_osd_request *req; u64 last_tid; + if (!S_ISREG(inode->i_mode)) + return; + spin_lock(&ci->i_unsafe_lock); if (list_empty(head)) goto out; /* set upper bound as _last_ entry in chain */ - req = list_entry(head->prev, struct ceph_osd_request, - r_unsafe_item); + req = list_last_entry(head, struct ceph_osd_request, + r_unsafe_item); last_tid = req->r_tid; do { @@ -1888,18 +1964,64 @@ static void sync_write_wait(struct inode *inode) */ if (list_empty(head)) break; - req = list_entry(head->next, struct ceph_osd_request, - r_unsafe_item); + req = list_first_entry(head, struct ceph_osd_request, + r_unsafe_item); } while (req->r_tid < last_tid); out: spin_unlock(&ci->i_unsafe_lock); } +/* + * wait for any uncommitted directory operations to commit. + */ +static int unsafe_dirop_wait(struct inode *inode) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + struct list_head *head = &ci->i_unsafe_dirops; + struct ceph_mds_request *req; + u64 last_tid; + int ret = 0; + + if (!S_ISDIR(inode->i_mode)) + return 0; + + spin_lock(&ci->i_unsafe_lock); + if (list_empty(head)) + goto out; + + req = list_last_entry(head, struct ceph_mds_request, + r_unsafe_dir_item); + last_tid = req->r_tid; + + do { + ceph_mdsc_get_request(req); + spin_unlock(&ci->i_unsafe_lock); + + dout("unsafe_dirop_wait %p wait on tid %llu (until %llu)\n", + inode, req->r_tid, last_tid); + ret = !wait_for_completion_timeout(&req->r_safe_completion, + ceph_timeout_jiffies(req->r_timeout)); + if (ret) + ret = -EIO; /* timed out */ + + ceph_mdsc_put_request(req); + + spin_lock(&ci->i_unsafe_lock); + if (ret || list_empty(head)) + break; + req = list_first_entry(head, struct ceph_mds_request, + r_unsafe_dir_item); + } while (req->r_tid < last_tid); +out: + spin_unlock(&ci->i_unsafe_lock); + return ret; +} + int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) { struct inode *inode = file->f_mapping->host; struct ceph_inode_info *ci = ceph_inode(inode); - unsigned flush_tid; + u64 flush_tid; int ret; int dirty; @@ -1908,25 +2030,30 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) ret = filemap_write_and_wait_range(inode->i_mapping, start, end); if (ret < 0) - return ret; + goto out; + + if (datasync) + goto out; + mutex_lock(&inode->i_mutex); dirty = try_flush_caps(inode, &flush_tid); dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); + ret = unsafe_dirop_wait(inode); + /* * only wait on non-file metadata writeback (the mds * can recover size and mtime, so we don't need to * wait for that) */ - if (!datasync && (dirty & ~CEPH_CAP_ANY_FILE_WR)) { - dout("fsync waiting for flush_tid %u\n", flush_tid); + if (!ret && (dirty & ~CEPH_CAP_ANY_FILE_WR)) { ret = wait_event_interruptible(ci->i_cap_wq, - caps_are_flushed(inode, flush_tid)); + caps_are_flushed(inode, flush_tid)); } - - dout("fsync %p%s done\n", inode, datasync ? " datasync" : ""); mutex_unlock(&inode->i_mutex); +out: + dout("fsync %p%s result=%d\n", inode, datasync ? " datasync" : "", ret); return ret; } @@ -1939,7 +2066,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) int ceph_write_inode(struct inode *inode, struct writeback_control *wbc) { struct ceph_inode_info *ci = ceph_inode(inode); - unsigned flush_tid; + u64 flush_tid; int err = 0; int dirty; int wait = wbc->sync_mode == WB_SYNC_ALL; @@ -1994,6 +2121,104 @@ static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc, } } +static int __kick_flushing_caps(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session, + struct ceph_inode_info *ci, + bool kick_all) +{ + struct inode *inode = &ci->vfs_inode; + struct ceph_cap *cap; + struct ceph_cap_flush *cf; + struct rb_node *n; + int delayed = 0; + u64 first_tid = 0; + u64 oldest_flush_tid; + + spin_lock(&mdsc->cap_dirty_lock); + oldest_flush_tid = __get_oldest_flush_tid(mdsc); + spin_unlock(&mdsc->cap_dirty_lock); + + while (true) { + spin_lock(&ci->i_ceph_lock); + cap = ci->i_auth_cap; + if (!(cap && cap->session == session)) { + pr_err("%p auth cap %p not mds%d ???\n", inode, + cap, session->s_mds); + spin_unlock(&ci->i_ceph_lock); + break; + } + + for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) { + cf = rb_entry(n, struct ceph_cap_flush, i_node); + if (cf->tid < first_tid) + continue; + if (kick_all || cf->kick) + break; + } + if (!n) { + spin_unlock(&ci->i_ceph_lock); + break; + } + + cf = rb_entry(n, struct ceph_cap_flush, i_node); + cf->kick = false; + + first_tid = cf->tid + 1; + + dout("kick_flushing_caps %p cap %p tid %llu %s\n", inode, + cap, cf->tid, ceph_cap_string(cf->caps)); + delayed |= __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, + __ceph_caps_used(ci), + __ceph_caps_wanted(ci), + cap->issued | cap->implemented, + cf->caps, cf->tid, oldest_flush_tid); + } + return delayed; +} + +void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) +{ + struct ceph_inode_info *ci; + struct ceph_cap *cap; + struct ceph_cap_flush *cf; + struct rb_node *n; + + dout("early_kick_flushing_caps mds%d\n", session->s_mds); + list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) { + spin_lock(&ci->i_ceph_lock); + cap = ci->i_auth_cap; + if (!(cap && cap->session == session)) { + pr_err("%p auth cap %p not mds%d ???\n", + &ci->vfs_inode, cap, session->s_mds); + spin_unlock(&ci->i_ceph_lock); + continue; + } + + + /* + * if flushing caps were revoked, we re-send the cap flush + * in client reconnect stage. This guarantees MDS * processes + * the cap flush message before issuing the flushing caps to + * other client. + */ + if ((cap->issued & ci->i_flushing_caps) != + ci->i_flushing_caps) { + spin_unlock(&ci->i_ceph_lock); + if (!__kick_flushing_caps(mdsc, session, ci, true)) + continue; + spin_lock(&ci->i_ceph_lock); + } + + for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) { + cf = rb_entry(n, struct ceph_cap_flush, i_node); + cf->kick = true; + } + + spin_unlock(&ci->i_ceph_lock); + } +} + void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { @@ -2003,28 +2228,10 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, dout("kick_flushing_caps mds%d\n", session->s_mds); list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) { - struct inode *inode = &ci->vfs_inode; - struct ceph_cap *cap; - int delayed = 0; - - spin_lock(&ci->i_ceph_lock); - cap = ci->i_auth_cap; - if (cap && cap->session == session) { - dout("kick_flushing_caps %p cap %p %s\n", inode, - cap, ceph_cap_string(ci->i_flushing_caps)); - delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, - __ceph_caps_used(ci), - __ceph_caps_wanted(ci), - cap->issued | cap->implemented, - ci->i_flushing_caps, NULL); - if (delayed) { - spin_lock(&ci->i_ceph_lock); - __cap_delay_requeue(mdsc, ci); - spin_unlock(&ci->i_ceph_lock); - } - } else { - pr_err("%p auth cap %p not mds%d ???\n", inode, - cap, session->s_mds); + int delayed = __kick_flushing_caps(mdsc, session, ci, false); + if (delayed) { + spin_lock(&ci->i_ceph_lock); + __cap_delay_requeue(mdsc, ci); spin_unlock(&ci->i_ceph_lock); } } @@ -2036,26 +2243,25 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, { struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_cap *cap; - int delayed = 0; spin_lock(&ci->i_ceph_lock); cap = ci->i_auth_cap; - dout("kick_flushing_inode_caps %p flushing %s flush_seq %lld\n", inode, - ceph_cap_string(ci->i_flushing_caps), ci->i_cap_flush_seq); + dout("kick_flushing_inode_caps %p flushing %s\n", inode, + ceph_cap_string(ci->i_flushing_caps)); __ceph_flush_snaps(ci, &session, 1); if (ci->i_flushing_caps) { + int delayed; + spin_lock(&mdsc->cap_dirty_lock); list_move_tail(&ci->i_flushing_item, &cap->session->s_cap_flushing); spin_unlock(&mdsc->cap_dirty_lock); - delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, - __ceph_caps_used(ci), - __ceph_caps_wanted(ci), - cap->issued | cap->implemented, - ci->i_flushing_caps, NULL); + spin_unlock(&ci->i_ceph_lock); + + delayed = __kick_flushing_caps(mdsc, session, ci, true); if (delayed) { spin_lock(&ci->i_ceph_lock); __cap_delay_requeue(mdsc, ci); @@ -2073,7 +2279,8 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, * * Protected by i_ceph_lock. */ -static void __take_cap_refs(struct ceph_inode_info *ci, int got) +static void __take_cap_refs(struct ceph_inode_info *ci, int got, + bool snap_rwsem_locked) { if (got & CEPH_CAP_PIN) ci->i_pin_ref++; @@ -2081,8 +2288,14 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got) ci->i_rd_ref++; if (got & CEPH_CAP_FILE_CACHE) ci->i_rdcache_ref++; - if (got & CEPH_CAP_FILE_WR) + if (got & CEPH_CAP_FILE_WR) { + if (ci->i_wr_ref == 0 && !ci->i_head_snapc) { + BUG_ON(!snap_rwsem_locked); + ci->i_head_snapc = ceph_get_snap_context( + ci->i_snap_realm->cached_context); + } ci->i_wr_ref++; + } if (got & CEPH_CAP_FILE_BUFFER) { if (ci->i_wb_ref == 0) ihold(&ci->vfs_inode); @@ -2100,16 +2313,19 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got) * requested from the MDS. */ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, - loff_t endoff, int *got, int *check_max, int *err) + loff_t endoff, bool nonblock, int *got, int *err) { struct inode *inode = &ci->vfs_inode; + struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; int ret = 0; int have, implemented; int file_wanted; + bool snap_rwsem_locked = false; dout("get_cap_refs %p need %s want %s\n", inode, ceph_cap_string(need), ceph_cap_string(want)); +again: spin_lock(&ci->i_ceph_lock); /* make sure file is actually open */ @@ -2125,6 +2341,10 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, /* finish pending truncate */ while (ci->i_truncate_pending) { spin_unlock(&ci->i_ceph_lock); + if (snap_rwsem_locked) { + up_read(&mdsc->snap_rwsem); + snap_rwsem_locked = false; + } __ceph_do_pending_vmtruncate(inode); spin_lock(&ci->i_ceph_lock); } @@ -2136,7 +2356,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, dout("get_cap_refs %p endoff %llu > maxsize %llu\n", inode, endoff, ci->i_max_size); if (endoff > ci->i_requested_max_size) { - *check_max = 1; + *err = -EAGAIN; ret = 1; } goto out_unlock; @@ -2164,8 +2384,29 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, inode, ceph_cap_string(have), ceph_cap_string(not), ceph_cap_string(revoking)); if ((revoking & not) == 0) { + if (!snap_rwsem_locked && + !ci->i_head_snapc && + (need & CEPH_CAP_FILE_WR)) { + if (!down_read_trylock(&mdsc->snap_rwsem)) { + /* + * we can not call down_read() when + * task isn't in TASK_RUNNING state + */ + if (nonblock) { + *err = -EAGAIN; + ret = 1; + goto out_unlock; + } + + spin_unlock(&ci->i_ceph_lock); + down_read(&mdsc->snap_rwsem); + snap_rwsem_locked = true; + goto again; + } + snap_rwsem_locked = true; + } *got = need | (have & want); - __take_cap_refs(ci, *got); + __take_cap_refs(ci, *got, true); ret = 1; } } else { @@ -2189,6 +2430,8 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, } out_unlock: spin_unlock(&ci->i_ceph_lock); + if (snap_rwsem_locked) + up_read(&mdsc->snap_rwsem); dout("get_cap_refs %p ret %d got %s\n", inode, ret, ceph_cap_string(*got)); @@ -2231,50 +2474,70 @@ static void check_max_size(struct inode *inode, loff_t endoff) int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, loff_t endoff, int *got, struct page **pinned_page) { - int _got, check_max, ret, err = 0; + int _got, ret, err = 0; -retry: - if (endoff > 0) - check_max_size(&ci->vfs_inode, endoff); - _got = 0; - check_max = 0; - ret = wait_event_interruptible(ci->i_cap_wq, - try_get_cap_refs(ci, need, want, endoff, - &_got, &check_max, &err)); - if (err) - ret = err; + ret = ceph_pool_perm_check(ci, need); if (ret < 0) return ret; - if (check_max) - goto retry; + while (true) { + if (endoff > 0) + check_max_size(&ci->vfs_inode, endoff); - if (ci->i_inline_version != CEPH_INLINE_NONE && - (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) && - i_size_read(&ci->vfs_inode) > 0) { - struct page *page = find_get_page(ci->vfs_inode.i_mapping, 0); - if (page) { - if (PageUptodate(page)) { - *pinned_page = page; - goto out; - } - page_cache_release(page); - } - /* - * drop cap refs first because getattr while holding - * caps refs can cause deadlock. - */ - ceph_put_cap_refs(ci, _got); + err = 0; _got = 0; + ret = try_get_cap_refs(ci, need, want, endoff, + false, &_got, &err); + if (ret) { + if (err == -EAGAIN) + continue; + if (err < 0) + return err; + } else { + ret = wait_event_interruptible(ci->i_cap_wq, + try_get_cap_refs(ci, need, want, endoff, + true, &_got, &err)); + if (err == -EAGAIN) + continue; + if (err < 0) + ret = err; + if (ret < 0) + return ret; + } - /* getattr request will bring inline data into page cache */ - ret = __ceph_do_getattr(&ci->vfs_inode, NULL, - CEPH_STAT_CAP_INLINE_DATA, true); - if (ret < 0) - return ret; - goto retry; + if (ci->i_inline_version != CEPH_INLINE_NONE && + (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) && + i_size_read(&ci->vfs_inode) > 0) { + struct page *page = + find_get_page(ci->vfs_inode.i_mapping, 0); + if (page) { + if (PageUptodate(page)) { + *pinned_page = page; + break; + } + page_cache_release(page); + } + /* + * drop cap refs first because getattr while + * holding * caps refs can cause deadlock. + */ + ceph_put_cap_refs(ci, _got); + _got = 0; + + /* + * getattr request will bring inline data into + * page cache + */ + ret = __ceph_do_getattr(&ci->vfs_inode, NULL, + CEPH_STAT_CAP_INLINE_DATA, + true); + if (ret < 0) + return ret; + continue; + } + break; } -out: + *got = _got; return 0; } @@ -2286,10 +2549,31 @@ out: void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps) { spin_lock(&ci->i_ceph_lock); - __take_cap_refs(ci, caps); + __take_cap_refs(ci, caps, false); spin_unlock(&ci->i_ceph_lock); } + +/* + * drop cap_snap that is not associated with any snapshot. + * we don't need to send FLUSHSNAP message for it. + */ +static int ceph_try_drop_cap_snap(struct ceph_cap_snap *capsnap) +{ + if (!capsnap->need_flush && + !capsnap->writing && !capsnap->dirty_pages) { + + dout("dropping cap_snap %p follows %llu\n", + capsnap, capsnap->follows); + ceph_put_snap_context(capsnap->context); + list_del(&capsnap->ci_item); + list_del(&capsnap->flushing_item); + ceph_put_cap_snap(capsnap); + return 1; + } + return 0; +} + /* * Release cap refs. * @@ -2303,7 +2587,6 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) { struct inode *inode = &ci->vfs_inode; int last = 0, put = 0, flushsnaps = 0, wake = 0; - struct ceph_cap_snap *capsnap; spin_lock(&ci->i_ceph_lock); if (had & CEPH_CAP_PIN) @@ -2325,17 +2608,24 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) if (had & CEPH_CAP_FILE_WR) if (--ci->i_wr_ref == 0) { last++; - if (!list_empty(&ci->i_cap_snaps)) { - capsnap = list_first_entry(&ci->i_cap_snaps, - struct ceph_cap_snap, - ci_item); - if (capsnap->writing) { - capsnap->writing = 0; - flushsnaps = - __ceph_finish_cap_snap(ci, - capsnap); - wake = 1; - } + if (__ceph_have_pending_cap_snap(ci)) { + struct ceph_cap_snap *capsnap = + list_last_entry(&ci->i_cap_snaps, + struct ceph_cap_snap, + ci_item); + capsnap->writing = 0; + if (ceph_try_drop_cap_snap(capsnap)) + put++; + else if (__ceph_finish_cap_snap(ci, capsnap)) + flushsnaps = 1; + wake = 1; + } + if (ci->i_wrbuffer_ref_head == 0 && + ci->i_dirty_caps == 0 && + ci->i_flushing_caps == 0) { + BUG_ON(!ci->i_head_snapc); + ceph_put_snap_context(ci->i_head_snapc); + ci->i_head_snapc = NULL; } /* see comment in __ceph_remove_cap() */ if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) @@ -2352,7 +2642,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) ceph_flush_snaps(ci); if (wake) wake_up_all(&ci->i_cap_wq); - if (put) + while (put-- > 0) iput(inode); } @@ -2380,7 +2670,9 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, if (ci->i_head_snapc == snapc) { ci->i_wrbuffer_ref_head -= nr; if (ci->i_wrbuffer_ref_head == 0 && - ci->i_dirty_caps == 0 && ci->i_flushing_caps == 0) { + ci->i_wr_ref == 0 && + ci->i_dirty_caps == 0 && + ci->i_flushing_caps == 0) { BUG_ON(!ci->i_head_snapc); ceph_put_snap_context(ci->i_head_snapc); ci->i_head_snapc = NULL; @@ -2401,25 +2693,15 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, capsnap->dirty_pages -= nr; if (capsnap->dirty_pages == 0) { complete_capsnap = 1; - if (capsnap->dirty == 0) - /* cap writeback completed before we created - * the cap_snap; no FLUSHSNAP is needed */ - drop_capsnap = 1; + drop_capsnap = ceph_try_drop_cap_snap(capsnap); } dout("put_wrbuffer_cap_refs on %p cap_snap %p " - " snap %lld %d/%d -> %d/%d %s%s%s\n", + " snap %lld %d/%d -> %d/%d %s%s\n", inode, capsnap, capsnap->context->seq, ci->i_wrbuffer_ref+nr, capsnap->dirty_pages + nr, ci->i_wrbuffer_ref, capsnap->dirty_pages, last ? " (wrbuffer last)" : "", - complete_capsnap ? " (complete capsnap)" : "", - drop_capsnap ? " (drop capsnap)" : ""); - if (drop_capsnap) { - ceph_put_snap_context(capsnap->context); - list_del(&capsnap->ci_item); - list_del(&capsnap->flushing_item); - ceph_put_cap_snap(capsnap); - } + complete_capsnap ? " (complete capsnap)" : ""); } spin_unlock(&ci->i_ceph_lock); @@ -2526,7 +2808,8 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, * try to invalidate (once). (If there are dirty buffers, we * will invalidate _after_ writeback.) */ - if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) && + if (!S_ISDIR(inode->i_mode) && /* don't invalidate readdir cache */ + ((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) && (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && !ci->i_wrbuffer_ref) { if (try_nonblocking_invalidate(inode)) { @@ -2732,16 +3015,29 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, { struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; + struct ceph_cap_flush *cf; + struct rb_node *n; + LIST_HEAD(to_remove); unsigned seq = le32_to_cpu(m->seq); int dirty = le32_to_cpu(m->dirty); int cleaned = 0; int drop = 0; - int i; - for (i = 0; i < CEPH_CAP_BITS; i++) - if ((dirty & (1 << i)) && - (u16)flush_tid == ci->i_cap_flush_tid[i]) - cleaned |= 1 << i; + n = rb_first(&ci->i_cap_flush_tree); + while (n) { + cf = rb_entry(n, struct ceph_cap_flush, i_node); + n = rb_next(&cf->i_node); + if (cf->tid == flush_tid) + cleaned = cf->caps; + if (cf->tid <= flush_tid) { + rb_erase(&cf->i_node, &ci->i_cap_flush_tree); + list_add_tail(&cf->list, &to_remove); + } else { + cleaned &= ~cf->caps; + if (!cleaned) + break; + } + } dout("handle_cap_flush_ack inode %p mds%d seq %d on %s cleaned %s," " flushing %s -> %s\n", @@ -2749,12 +3045,23 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, ceph_cap_string(cleaned), ceph_cap_string(ci->i_flushing_caps), ceph_cap_string(ci->i_flushing_caps & ~cleaned)); - if (ci->i_flushing_caps == (ci->i_flushing_caps & ~cleaned)) + if (list_empty(&to_remove) && !cleaned) goto out; ci->i_flushing_caps &= ~cleaned; spin_lock(&mdsc->cap_dirty_lock); + + if (!list_empty(&to_remove)) { + list_for_each_entry(cf, &to_remove, list) + rb_erase(&cf->g_node, &mdsc->cap_flush_tree); + + n = rb_first(&mdsc->cap_flush_tree); + cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL; + if (!cf || cf->tid > flush_tid) + wake_up_all(&mdsc->cap_flushing_wq); + } + if (ci->i_flushing_caps == 0) { list_del_init(&ci->i_flushing_item); if (!list_empty(&session->s_cap_flushing)) @@ -2764,14 +3071,14 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, struct ceph_inode_info, i_flushing_item)->vfs_inode); mdsc->num_cap_flushing--; - wake_up_all(&mdsc->cap_flushing_wq); dout(" inode %p now !flushing\n", inode); if (ci->i_dirty_caps == 0) { dout(" inode %p now clean\n", inode); BUG_ON(!list_empty(&ci->i_dirty_item)); drop = 1; - if (ci->i_wrbuffer_ref_head == 0) { + if (ci->i_wr_ref == 0 && + ci->i_wrbuffer_ref_head == 0) { BUG_ON(!ci->i_head_snapc); ceph_put_snap_context(ci->i_head_snapc); ci->i_head_snapc = NULL; @@ -2785,6 +3092,13 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, out: spin_unlock(&ci->i_ceph_lock); + + while (!list_empty(&to_remove)) { + cf = list_first_entry(&to_remove, + struct ceph_cap_flush, list); + list_del(&cf->list); + ceph_free_cap_flush(cf); + } if (drop) iput(inode); } @@ -2800,6 +3114,7 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid, struct ceph_mds_session *session) { struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; u64 follows = le64_to_cpu(m->snap_follows); struct ceph_cap_snap *capsnap; int drop = 0; @@ -2823,6 +3138,7 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid, list_del(&capsnap->ci_item); list_del(&capsnap->flushing_item); ceph_put_cap_snap(capsnap); + wake_up_all(&mdsc->cap_flushing_wq); drop = 1; break; } else { @@ -2971,7 +3287,6 @@ retry: mutex_lock_nested(&session->s_mutex, SINGLE_DEPTH_NESTING); } - ceph_add_cap_releases(mdsc, tsession); new_cap = ceph_get_cap(mdsc, NULL); } else { WARN_ON(1); @@ -3167,16 +3482,20 @@ void ceph_handle_caps(struct ceph_mds_session *session, dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq, (unsigned)seq); - if (op == CEPH_CAP_OP_IMPORT) - ceph_add_cap_releases(mdsc, session); - if (!inode) { dout(" i don't have ino %llx\n", vino.ino); if (op == CEPH_CAP_OP_IMPORT) { + cap = ceph_get_cap(mdsc, NULL); + cap->cap_ino = vino.ino; + cap->queue_release = 1; + cap->cap_id = cap_id; + cap->mseq = mseq; + cap->seq = seq; spin_lock(&session->s_cap_lock); - __queue_cap_release(session, vino.ino, cap_id, - mseq, seq); + list_add_tail(&cap->session_caps, + &session->s_cap_releases); + session->s_num_cap_releases++; spin_unlock(&session->s_cap_lock); } goto flush_cap_releases; @@ -3252,11 +3571,10 @@ void ceph_handle_caps(struct ceph_mds_session *session, flush_cap_releases: /* - * send any full release message to try to move things + * send any cap release message to try to move things * along for the mds (who clearly thinks we still have this * cap). */ - ceph_add_cap_releases(mdsc, session); ceph_send_cap_releases(mdsc, session); done: diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 4248307fea90..9314b4ea2375 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -38,7 +38,7 @@ int ceph_init_dentry(struct dentry *dentry) if (dentry->d_fsdata) return 0; - di = kmem_cache_alloc(ceph_dentry_cachep, GFP_NOFS | __GFP_ZERO); + di = kmem_cache_alloc(ceph_dentry_cachep, GFP_KERNEL | __GFP_ZERO); if (!di) return -ENOMEM; /* oh well */ @@ -107,6 +107,27 @@ static int fpos_cmp(loff_t l, loff_t r) } /* + * make note of the last dentry we read, so we can + * continue at the same lexicographical point, + * regardless of what dir changes take place on the + * server. + */ +static int note_last_dentry(struct ceph_file_info *fi, const char *name, + int len, unsigned next_offset) +{ + char *buf = kmalloc(len+1, GFP_KERNEL); + if (!buf) + return -ENOMEM; + kfree(fi->last_name); + fi->last_name = buf; + memcpy(fi->last_name, name, len); + fi->last_name[len] = 0; + fi->next_offset = next_offset; + dout("note_last_dentry '%s'\n", fi->last_name); + return 0; +} + +/* * When possible, we try to satisfy a readdir by peeking at the * dcache. We make this work by carefully ordering dentries on * d_child when we initially get results back from the MDS, and @@ -123,123 +144,113 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx, struct ceph_file_info *fi = file->private_data; struct dentry *parent = file->f_path.dentry; struct inode *dir = d_inode(parent); - struct list_head *p; - struct dentry *dentry, *last; + struct dentry *dentry, *last = NULL; struct ceph_dentry_info *di; + unsigned nsize = PAGE_CACHE_SIZE / sizeof(struct dentry *); int err = 0; + loff_t ptr_pos = 0; + struct ceph_readdir_cache_control cache_ctl = {}; - /* claim ref on last dentry we returned */ - last = fi->dentry; - fi->dentry = NULL; - - dout("__dcache_readdir %p v%u at %llu (last %p)\n", - dir, shared_gen, ctx->pos, last); + dout("__dcache_readdir %p v%u at %llu\n", dir, shared_gen, ctx->pos); - spin_lock(&parent->d_lock); - - /* start at beginning? */ - if (ctx->pos == 2 || last == NULL || - fpos_cmp(ctx->pos, ceph_dentry(last)->offset) < 0) { - if (list_empty(&parent->d_subdirs)) - goto out_unlock; - p = parent->d_subdirs.prev; - dout(" initial p %p/%p\n", p->prev, p->next); - } else { - p = last->d_child.prev; + /* we can calculate cache index for the first dirfrag */ + if (ceph_frag_is_leftmost(fpos_frag(ctx->pos))) { + cache_ctl.index = fpos_off(ctx->pos) - 2; + BUG_ON(cache_ctl.index < 0); + ptr_pos = cache_ctl.index * sizeof(struct dentry *); } -more: - dentry = list_entry(p, struct dentry, d_child); - di = ceph_dentry(dentry); - while (1) { - dout(" p %p/%p %s d_subdirs %p/%p\n", p->prev, p->next, - d_unhashed(dentry) ? "!hashed" : "hashed", - parent->d_subdirs.prev, parent->d_subdirs.next); - if (p == &parent->d_subdirs) { + while (true) { + pgoff_t pgoff; + bool emit_dentry; + + if (ptr_pos >= i_size_read(dir)) { fi->flags |= CEPH_F_ATEND; - goto out_unlock; + err = 0; + break; + } + + err = -EAGAIN; + pgoff = ptr_pos >> PAGE_CACHE_SHIFT; + if (!cache_ctl.page || pgoff != page_index(cache_ctl.page)) { + ceph_readdir_cache_release(&cache_ctl); + cache_ctl.page = find_lock_page(&dir->i_data, pgoff); + if (!cache_ctl.page) { + dout(" page %lu not found\n", pgoff); + break; + } + /* reading/filling the cache are serialized by + * i_mutex, no need to use page lock */ + unlock_page(cache_ctl.page); + cache_ctl.dentries = kmap(cache_ctl.page); } - spin_lock_nested(&dentry->d_lock, DENTRY_D_LOCK_NESTED); + + rcu_read_lock(); + spin_lock(&parent->d_lock); + /* check i_size again here, because empty directory can be + * marked as complete while not holding the i_mutex. */ + if (ceph_dir_is_complete_ordered(dir) && + ptr_pos < i_size_read(dir)) + dentry = cache_ctl.dentries[cache_ctl.index % nsize]; + else + dentry = NULL; + spin_unlock(&parent->d_lock); + if (dentry && !lockref_get_not_dead(&dentry->d_lockref)) + dentry = NULL; + rcu_read_unlock(); + if (!dentry) + break; + + emit_dentry = false; + di = ceph_dentry(dentry); + spin_lock(&dentry->d_lock); if (di->lease_shared_gen == shared_gen && - !d_unhashed(dentry) && d_really_is_positive(dentry) && + d_really_is_positive(dentry) && ceph_snap(d_inode(dentry)) != CEPH_SNAPDIR && ceph_ino(d_inode(dentry)) != CEPH_INO_CEPH && - fpos_cmp(ctx->pos, di->offset) <= 0) - break; - dout(" skipping %p %pd at %llu (%llu)%s%s\n", dentry, - dentry, di->offset, - ctx->pos, d_unhashed(dentry) ? " unhashed" : "", - !d_inode(dentry) ? " null" : ""); + fpos_cmp(ctx->pos, di->offset) <= 0) { + emit_dentry = true; + } spin_unlock(&dentry->d_lock); - p = p->prev; - dentry = list_entry(p, struct dentry, d_child); - di = ceph_dentry(dentry); - } - - dget_dlock(dentry); - spin_unlock(&dentry->d_lock); - spin_unlock(&parent->d_lock); - /* make sure a dentry wasn't dropped while we didn't have parent lock */ - if (!ceph_dir_is_complete_ordered(dir)) { - dout(" lost dir complete on %p; falling back to mds\n", dir); - dput(dentry); - err = -EAGAIN; - goto out; - } + if (emit_dentry) { + dout(" %llu (%llu) dentry %p %pd %p\n", di->offset, ctx->pos, + dentry, dentry, d_inode(dentry)); + ctx->pos = di->offset; + if (!dir_emit(ctx, dentry->d_name.name, + dentry->d_name.len, + ceph_translate_ino(dentry->d_sb, + d_inode(dentry)->i_ino), + d_inode(dentry)->i_mode >> 12)) { + dput(dentry); + err = 0; + break; + } + ctx->pos++; - dout(" %llu (%llu) dentry %p %pd %p\n", di->offset, ctx->pos, - dentry, dentry, d_inode(dentry)); - if (!dir_emit(ctx, dentry->d_name.name, - dentry->d_name.len, - ceph_translate_ino(dentry->d_sb, d_inode(dentry)->i_ino), - d_inode(dentry)->i_mode >> 12)) { - if (last) { - /* remember our position */ - fi->dentry = last; - fi->next_offset = fpos_off(di->offset); + if (last) + dput(last); + last = dentry; + } else { + dput(dentry); } - dput(dentry); - return 0; - } - - ctx->pos = di->offset + 1; - if (last) - dput(last); - last = dentry; - - spin_lock(&parent->d_lock); - p = p->prev; /* advance to next dentry */ - goto more; - -out_unlock: - spin_unlock(&parent->d_lock); -out: - if (last) + cache_ctl.index++; + ptr_pos += sizeof(struct dentry *); + } + ceph_readdir_cache_release(&cache_ctl); + if (last) { + int ret; + di = ceph_dentry(last); + ret = note_last_dentry(fi, last->d_name.name, last->d_name.len, + fpos_off(di->offset) + 1); + if (ret < 0) + err = ret; dput(last); + } return err; } -/* - * make note of the last dentry we read, so we can - * continue at the same lexicographical point, - * regardless of what dir changes take place on the - * server. - */ -static int note_last_dentry(struct ceph_file_info *fi, const char *name, - int len) -{ - kfree(fi->last_name); - fi->last_name = kmalloc(len+1, GFP_NOFS); - if (!fi->last_name) - return -ENOMEM; - memcpy(fi->last_name, name, len); - fi->last_name[len] = 0; - dout("note_last_dentry '%s'\n", fi->last_name); - return 0; -} - static int ceph_readdir(struct file *file, struct dir_context *ctx) { struct ceph_file_info *fi = file->private_data; @@ -280,8 +291,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) /* can we use the dcache? */ spin_lock(&ci->i_ceph_lock); - if ((ctx->pos == 2 || fi->dentry) && - ceph_test_mount_opt(fsc, DCACHE) && + if (ceph_test_mount_opt(fsc, DCACHE) && !ceph_test_mount_opt(fsc, NOASYNCREADDIR) && ceph_snap(inode) != CEPH_SNAPDIR && __ceph_dir_is_complete_ordered(ci) && @@ -296,24 +306,8 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) } else { spin_unlock(&ci->i_ceph_lock); } - if (fi->dentry) { - err = note_last_dentry(fi, fi->dentry->d_name.name, - fi->dentry->d_name.len); - if (err) - return err; - dput(fi->dentry); - fi->dentry = NULL; - } /* proceed with a normal readdir */ - - if (ctx->pos == 2) { - /* note dir version at start of readdir so we can tell - * if any dentries get dropped */ - fi->dir_release_count = atomic_read(&ci->i_release_count); - fi->dir_ordered_count = ci->i_ordered_count; - } - more: /* do we have the correct frag content buffered? */ if (fi->frag != frag || fi->last_readdir == NULL) { @@ -342,12 +336,15 @@ more: req->r_direct_hash = ceph_frag_value(frag); req->r_direct_is_hash = true; if (fi->last_name) { - req->r_path2 = kstrdup(fi->last_name, GFP_NOFS); + req->r_path2 = kstrdup(fi->last_name, GFP_KERNEL); if (!req->r_path2) { ceph_mdsc_put_request(req); return -ENOMEM; } } + req->r_dir_release_cnt = fi->dir_release_count; + req->r_dir_ordered_cnt = fi->dir_ordered_count; + req->r_readdir_cache_idx = fi->readdir_cache_idx; req->r_readdir_offset = fi->next_offset; req->r_args.readdir.frag = cpu_to_le32(frag); @@ -364,26 +361,38 @@ more: (int)req->r_reply_info.dir_end, (int)req->r_reply_info.dir_complete); - if (!req->r_did_prepopulate) { - dout("readdir !did_prepopulate"); - /* preclude from marking dir complete */ - fi->dir_release_count--; - } /* note next offset and last dentry name */ rinfo = &req->r_reply_info; if (le32_to_cpu(rinfo->dir_dir->frag) != frag) { frag = le32_to_cpu(rinfo->dir_dir->frag); - if (ceph_frag_is_leftmost(frag)) - fi->next_offset = 2; - else - fi->next_offset = 0; - off = fi->next_offset; + off = req->r_readdir_offset; + fi->next_offset = off; } + fi->frag = frag; fi->offset = fi->next_offset; fi->last_readdir = req; + if (req->r_did_prepopulate) { + fi->readdir_cache_idx = req->r_readdir_cache_idx; + if (fi->readdir_cache_idx < 0) { + /* preclude from marking dir ordered */ + fi->dir_ordered_count = 0; + } else if (ceph_frag_is_leftmost(frag) && off == 2) { + /* note dir version at start of readdir so + * we can tell if any dentries get dropped */ + fi->dir_release_count = req->r_dir_release_cnt; + fi->dir_ordered_count = req->r_dir_ordered_cnt; + } + } else { + dout("readdir !did_prepopulate"); + /* disable readdir cache */ + fi->readdir_cache_idx = -1; + /* preclude from marking dir complete */ + fi->dir_release_count = 0; + } + if (req->r_reply_info.dir_end) { kfree(fi->last_name); fi->last_name = NULL; @@ -394,10 +403,10 @@ more: } else { err = note_last_dentry(fi, rinfo->dir_dname[rinfo->dir_nr-1], - rinfo->dir_dname_len[rinfo->dir_nr-1]); + rinfo->dir_dname_len[rinfo->dir_nr-1], + fi->next_offset + rinfo->dir_nr); if (err) return err; - fi->next_offset += rinfo->dir_nr; } } @@ -453,16 +462,22 @@ more: * were released during the whole readdir, and we should have * the complete dir contents in our cache. */ - spin_lock(&ci->i_ceph_lock); - if (atomic_read(&ci->i_release_count) == fi->dir_release_count) { - if (ci->i_ordered_count == fi->dir_ordered_count) + if (atomic64_read(&ci->i_release_count) == fi->dir_release_count) { + spin_lock(&ci->i_ceph_lock); + if (fi->dir_ordered_count == atomic64_read(&ci->i_ordered_count)) { dout(" marking %p complete and ordered\n", inode); - else + /* use i_size to track number of entries in + * readdir cache */ + BUG_ON(fi->readdir_cache_idx < 0); + i_size_write(inode, fi->readdir_cache_idx * + sizeof(struct dentry*)); + } else { dout(" marking %p complete\n", inode); + } __ceph_dir_set_complete(ci, fi->dir_release_count, fi->dir_ordered_count); + spin_unlock(&ci->i_ceph_lock); } - spin_unlock(&ci->i_ceph_lock); dout("readdir %p file %p done.\n", inode, file); return 0; @@ -476,14 +491,12 @@ static void reset_readdir(struct ceph_file_info *fi, unsigned frag) } kfree(fi->last_name); fi->last_name = NULL; + fi->dir_release_count = 0; + fi->readdir_cache_idx = -1; if (ceph_frag_is_leftmost(frag)) fi->next_offset = 2; /* compensate for . and .. */ else fi->next_offset = 0; - if (fi->dentry) { - dput(fi->dentry); - fi->dentry = NULL; - } fi->flags &= ~CEPH_F_ATEND; } @@ -497,13 +510,12 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence) mutex_lock(&inode->i_mutex); retval = -EINVAL; switch (whence) { - case SEEK_END: - offset += inode->i_size + 2; /* FIXME */ - break; case SEEK_CUR: offset += file->f_pos; case SEEK_SET: break; + case SEEK_END: + retval = -EOPNOTSUPP; default: goto out; } @@ -516,20 +528,18 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence) } retval = offset; - /* - * discard buffered readdir content on seekdir(0), or - * seek to new frag, or seek prior to current chunk. - */ if (offset == 0 || fpos_frag(offset) != fi->frag || fpos_off(offset) < fi->offset) { + /* discard buffered readdir content on seekdir(0), or + * seek to new frag, or seek prior to current chunk */ dout("dir_llseek dropping %p content\n", file); reset_readdir(fi, fpos_frag(offset)); + } else if (fpos_cmp(offset, old_offset) > 0) { + /* reset dir_release_count if we did a forward seek */ + fi->dir_release_count = 0; + fi->readdir_cache_idx = -1; } - - /* bump dir_release_count if we did a forward seek */ - if (fpos_cmp(offset, old_offset) > 0) - fi->dir_release_count--; } out: mutex_unlock(&inode->i_mutex); @@ -764,7 +774,7 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry, err = PTR_ERR(req); goto out; } - req->r_path2 = kstrdup(dest, GFP_NOFS); + req->r_path2 = kstrdup(dest, GFP_KERNEL); if (!req->r_path2) { err = -ENOMEM; ceph_mdsc_put_request(req); @@ -985,16 +995,15 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry, * to do it here. */ + /* d_move screws up sibling dentries' offsets */ + ceph_dir_clear_complete(old_dir); + ceph_dir_clear_complete(new_dir); + d_move(old_dentry, new_dentry); /* ensure target dentry is invalidated, despite rehashing bug in vfs_rename_dir */ ceph_invalidate_dentry_lease(new_dentry); - - /* d_move screws up sibling dentries' offsets */ - ceph_dir_clear_complete(old_dir); - ceph_dir_clear_complete(new_dir); - } ceph_mdsc_put_request(req); return err; @@ -1189,7 +1198,7 @@ static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size, return -EISDIR; if (!cf->dir_info) { - cf->dir_info = kmalloc(bufsize, GFP_NOFS); + cf->dir_info = kmalloc(bufsize, GFP_KERNEL); if (!cf->dir_info) return -ENOMEM; cf->dir_info_len = @@ -1224,66 +1233,6 @@ static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size, } /* - * an fsync() on a dir will wait for any uncommitted directory - * operations to commit. - */ -static int ceph_dir_fsync(struct file *file, loff_t start, loff_t end, - int datasync) -{ - struct inode *inode = file_inode(file); - struct ceph_inode_info *ci = ceph_inode(inode); - struct list_head *head = &ci->i_unsafe_dirops; - struct ceph_mds_request *req; - u64 last_tid; - int ret = 0; - - dout("dir_fsync %p\n", inode); - ret = filemap_write_and_wait_range(inode->i_mapping, start, end); - if (ret) - return ret; - mutex_lock(&inode->i_mutex); - - spin_lock(&ci->i_unsafe_lock); - if (list_empty(head)) - goto out; - - req = list_entry(head->prev, - struct ceph_mds_request, r_unsafe_dir_item); - last_tid = req->r_tid; - - do { - ceph_mdsc_get_request(req); - spin_unlock(&ci->i_unsafe_lock); - - dout("dir_fsync %p wait on tid %llu (until %llu)\n", - inode, req->r_tid, last_tid); - if (req->r_timeout) { - unsigned long time_left = wait_for_completion_timeout( - &req->r_safe_completion, - req->r_timeout); - if (time_left > 0) - ret = 0; - else - ret = -EIO; /* timed out */ - } else { - wait_for_completion(&req->r_safe_completion); - } - ceph_mdsc_put_request(req); - - spin_lock(&ci->i_unsafe_lock); - if (ret || list_empty(head)) - break; - req = list_entry(head->next, - struct ceph_mds_request, r_unsafe_dir_item); - } while (req->r_tid < last_tid); -out: - spin_unlock(&ci->i_unsafe_lock); - mutex_unlock(&inode->i_mutex); - - return ret; -} - -/* * We maintain a private dentry LRU. * * FIXME: this needs to be changed to a per-mds lru to be useful. @@ -1353,7 +1302,7 @@ const struct file_operations ceph_dir_fops = { .open = ceph_open, .release = ceph_release, .unlocked_ioctl = ceph_ioctl, - .fsync = ceph_dir_fsync, + .fsync = ceph_fsync, }; const struct file_operations ceph_snapdir_fops = { diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 3b6b522b4b31..faf92095e105 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -89,13 +89,14 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode) case S_IFDIR: dout("init_file %p %p 0%o (regular)\n", inode, file, inode->i_mode); - cf = kmem_cache_alloc(ceph_file_cachep, GFP_NOFS | __GFP_ZERO); + cf = kmem_cache_alloc(ceph_file_cachep, GFP_KERNEL | __GFP_ZERO); if (cf == NULL) { ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */ return -ENOMEM; } cf->fmode = fmode; cf->next_offset = 2; + cf->readdir_cache_idx = -1; file->private_data = cf; BUG_ON(inode->i_fop->release != ceph_release); break; @@ -324,7 +325,6 @@ int ceph_release(struct inode *inode, struct file *file) ceph_mdsc_put_request(cf->last_readdir); kfree(cf->last_name); kfree(cf->dir_info); - dput(cf->dentry); kmem_cache_free(ceph_file_cachep, cf); /* wake up anyone waiting for caps on this inode */ @@ -483,7 +483,7 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i, } } else { num_pages = calc_pages_for(off, len); - pages = ceph_alloc_page_vector(num_pages, GFP_NOFS); + pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); if (IS_ERR(pages)) return PTR_ERR(pages); ret = striped_read(inode, off, len, pages, @@ -557,13 +557,13 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe) * objects, rollback on failure, etc.) */ static ssize_t -ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos) +ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, + struct ceph_snap_context *snapc) { struct file *file = iocb->ki_filp; struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); - struct ceph_snap_context *snapc; struct ceph_vino vino; struct ceph_osd_request *req; struct page **pages; @@ -600,7 +600,6 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos) size_t start; ssize_t n; - snapc = ci->i_snap_realm->cached_context; vino = ceph_vino(inode); req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino, pos, &len, 0, @@ -614,7 +613,7 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos) break; } - osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC); + osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0); n = iov_iter_get_pages_alloc(from, &pages, len, &start); if (unlikely(n < 0)) { @@ -674,13 +673,13 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos) * objects, rollback on failure, etc.) */ static ssize_t -ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos) +ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, + struct ceph_snap_context *snapc) { struct file *file = iocb->ki_filp; struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); - struct ceph_snap_context *snapc; struct ceph_vino vino; struct ceph_osd_request *req; struct page **pages; @@ -717,7 +716,6 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos) size_t left; int n; - snapc = ci->i_snap_realm->cached_context; vino = ceph_vino(inode); req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino, pos, &len, 0, 1, @@ -736,7 +734,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos) */ num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT; - pages = ceph_alloc_page_vector(num_pages, GFP_NOFS); + pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); if (IS_ERR(pages)) { ret = PTR_ERR(pages); goto out; @@ -860,7 +858,7 @@ again: struct page *page = NULL; loff_t i_size; if (retry_op == READ_INLINE) { - page = __page_cache_alloc(GFP_NOFS); + page = __page_cache_alloc(GFP_KERNEL); if (!page) return -ENOMEM; } @@ -941,6 +939,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_osd_client *osdc = &ceph_sb_to_client(inode->i_sb)->client->osdc; + struct ceph_cap_flush *prealloc_cf; ssize_t count, written = 0; int err, want, got; loff_t pos; @@ -948,6 +947,10 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) if (ceph_snap(inode) != CEPH_NOSNAP) return -EROFS; + prealloc_cf = ceph_alloc_cap_flush(); + if (!prealloc_cf) + return -ENOMEM; + mutex_lock(&inode->i_mutex); /* We can write back this queue in page reclaim */ @@ -996,14 +999,30 @@ retry_snap: if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || (iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC)) { + struct ceph_snap_context *snapc; struct iov_iter data; mutex_unlock(&inode->i_mutex); + + spin_lock(&ci->i_ceph_lock); + if (__ceph_have_pending_cap_snap(ci)) { + struct ceph_cap_snap *capsnap = + list_last_entry(&ci->i_cap_snaps, + struct ceph_cap_snap, + ci_item); + snapc = ceph_get_snap_context(capsnap->context); + } else { + BUG_ON(!ci->i_head_snapc); + snapc = ceph_get_snap_context(ci->i_head_snapc); + } + spin_unlock(&ci->i_ceph_lock); + /* we might need to revert back to that point */ data = *from; if (iocb->ki_flags & IOCB_DIRECT) - written = ceph_sync_direct_write(iocb, &data, pos); + written = ceph_sync_direct_write(iocb, &data, pos, + snapc); else - written = ceph_sync_write(iocb, &data, pos); + written = ceph_sync_write(iocb, &data, pos, snapc); if (written == -EOLDSNAPC) { dout("aio_write %p %llx.%llx %llu~%u" "got EOLDSNAPC, retrying\n", @@ -1014,6 +1033,7 @@ retry_snap: } if (written > 0) iov_iter_advance(from, written); + ceph_put_snap_context(snapc); } else { loff_t old_size = inode->i_size; /* @@ -1035,7 +1055,8 @@ retry_snap: int dirty; spin_lock(&ci->i_ceph_lock); ci->i_inline_version = CEPH_INLINE_NONE; - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, + &prealloc_cf); spin_unlock(&ci->i_ceph_lock); if (dirty) __mark_inode_dirty(inode, dirty); @@ -1059,6 +1080,7 @@ retry_snap: out: mutex_unlock(&inode->i_mutex); out_unlocked: + ceph_free_cap_flush(prealloc_cf); current->backing_dev_info = NULL; return written ? written : err; } @@ -1255,6 +1277,7 @@ static long ceph_fallocate(struct file *file, int mode, struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->client->osdc; + struct ceph_cap_flush *prealloc_cf; int want, got = 0; int dirty; int ret = 0; @@ -1267,6 +1290,10 @@ static long ceph_fallocate(struct file *file, int mode, if (!S_ISREG(inode->i_mode)) return -EOPNOTSUPP; + prealloc_cf = ceph_alloc_cap_flush(); + if (!prealloc_cf) + return -ENOMEM; + mutex_lock(&inode->i_mutex); if (ceph_snap(inode) != CEPH_NOSNAP) { @@ -1313,7 +1340,8 @@ static long ceph_fallocate(struct file *file, int mode, if (!ret) { spin_lock(&ci->i_ceph_lock); ci->i_inline_version = CEPH_INLINE_NONE; - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, + &prealloc_cf); spin_unlock(&ci->i_ceph_lock); if (dirty) __mark_inode_dirty(inode, dirty); @@ -1322,6 +1350,7 @@ static long ceph_fallocate(struct file *file, int mode, ceph_put_cap_refs(ci, got); unlock: mutex_unlock(&inode->i_mutex); + ceph_free_cap_flush(prealloc_cf); return ret; } diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 571acd88606c..96d2bd829902 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -389,9 +389,10 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_inline_version = 0; ci->i_time_warp_seq = 0; ci->i_ceph_flags = 0; - ci->i_ordered_count = 0; - atomic_set(&ci->i_release_count, 1); - atomic_set(&ci->i_complete_count, 0); + atomic64_set(&ci->i_ordered_count, 1); + atomic64_set(&ci->i_release_count, 1); + atomic64_set(&ci->i_complete_seq[0], 0); + atomic64_set(&ci->i_complete_seq[1], 0); ci->i_symlink = NULL; memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout)); @@ -415,9 +416,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_flushing_caps = 0; INIT_LIST_HEAD(&ci->i_dirty_item); INIT_LIST_HEAD(&ci->i_flushing_item); - ci->i_cap_flush_seq = 0; - ci->i_cap_flush_last_tid = 0; - memset(&ci->i_cap_flush_tid, 0, sizeof(ci->i_cap_flush_tid)); + ci->i_prealloc_cap_flush = NULL; + ci->i_cap_flush_tree = RB_ROOT; init_waitqueue_head(&ci->i_cap_wq); ci->i_hold_caps_min = 0; ci->i_hold_caps_max = 0; @@ -752,7 +752,10 @@ static int fill_inode(struct inode *inode, struct page *locked_page, if (new_version || (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) { + if (ci->i_layout.fl_pg_pool != info->layout.fl_pg_pool) + ci->i_ceph_flags &= ~CEPH_I_POOL_PERM; ci->i_layout = info->layout; + queue_trunc = ceph_fill_file_size(inode, issued, le32_to_cpu(info->truncate_seq), le64_to_cpu(info->truncate_size), @@ -858,9 +861,10 @@ static int fill_inode(struct inode *inode, struct page *locked_page, (issued & CEPH_CAP_FILE_EXCL) == 0 && !__ceph_dir_is_complete(ci)) { dout(" marking %p complete (empty)\n", inode); + i_size_write(inode, 0); __ceph_dir_set_complete(ci, - atomic_read(&ci->i_release_count), - ci->i_ordered_count); + atomic64_read(&ci->i_release_count), + atomic64_read(&ci->i_ordered_count)); } wake = true; @@ -1212,6 +1216,10 @@ retry_lookup: dout("fill_trace doing d_move %p -> %p\n", req->r_old_dentry, dn); + /* d_move screws up sibling dentries' offsets */ + ceph_dir_clear_ordered(dir); + ceph_dir_clear_ordered(olddir); + d_move(req->r_old_dentry, dn); dout(" src %p '%pd' dst %p '%pd'\n", req->r_old_dentry, @@ -1222,10 +1230,6 @@ retry_lookup: rehashing bug in vfs_rename_dir */ ceph_invalidate_dentry_lease(dn); - /* d_move screws up sibling dentries' offsets */ - ceph_dir_clear_ordered(dir); - ceph_dir_clear_ordered(olddir); - dout("dn %p gets new offset %lld\n", req->r_old_dentry, ceph_dentry(req->r_old_dentry)->offset); @@ -1333,6 +1337,49 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req, return err; } +void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl) +{ + if (ctl->page) { + kunmap(ctl->page); + page_cache_release(ctl->page); + ctl->page = NULL; + } +} + +static int fill_readdir_cache(struct inode *dir, struct dentry *dn, + struct ceph_readdir_cache_control *ctl, + struct ceph_mds_request *req) +{ + struct ceph_inode_info *ci = ceph_inode(dir); + unsigned nsize = PAGE_CACHE_SIZE / sizeof(struct dentry*); + unsigned idx = ctl->index % nsize; + pgoff_t pgoff = ctl->index / nsize; + + if (!ctl->page || pgoff != page_index(ctl->page)) { + ceph_readdir_cache_release(ctl); + ctl->page = grab_cache_page(&dir->i_data, pgoff); + if (!ctl->page) { + ctl->index = -1; + return -ENOMEM; + } + /* reading/filling the cache are serialized by + * i_mutex, no need to use page lock */ + unlock_page(ctl->page); + ctl->dentries = kmap(ctl->page); + } + + if (req->r_dir_release_cnt == atomic64_read(&ci->i_release_count) && + req->r_dir_ordered_cnt == atomic64_read(&ci->i_ordered_count)) { + dout("readdir cache dn %p idx %d\n", dn, ctl->index); + ctl->dentries[idx] = dn; + ctl->index++; + } else { + dout("disable readdir cache\n"); + ctl->index = -1; + } + return 0; +} + int ceph_readdir_prepopulate(struct ceph_mds_request *req, struct ceph_mds_session *session) { @@ -1345,8 +1392,11 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, struct inode *snapdir = NULL; struct ceph_mds_request_head *rhead = req->r_request->front.iov_base; struct ceph_dentry_info *di; - u64 r_readdir_offset = req->r_readdir_offset; u32 frag = le32_to_cpu(rhead->args.readdir.frag); + struct ceph_readdir_cache_control cache_ctl = {}; + + if (req->r_aborted) + return readdir_prepopulate_inodes_only(req, session); if (rinfo->dir_dir && le32_to_cpu(rinfo->dir_dir->frag) != frag) { @@ -1354,14 +1404,11 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, frag, le32_to_cpu(rinfo->dir_dir->frag)); frag = le32_to_cpu(rinfo->dir_dir->frag); if (ceph_frag_is_leftmost(frag)) - r_readdir_offset = 2; + req->r_readdir_offset = 2; else - r_readdir_offset = 0; + req->r_readdir_offset = 0; } - if (req->r_aborted) - return readdir_prepopulate_inodes_only(req, session); - if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) { snapdir = ceph_get_snapdir(d_inode(parent)); parent = d_find_alias(snapdir); @@ -1374,6 +1421,17 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, ceph_fill_dirfrag(d_inode(parent), rinfo->dir_dir); } + if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2) { + /* note dir version at start of readdir so we can tell + * if any dentries get dropped */ + struct ceph_inode_info *ci = ceph_inode(d_inode(parent)); + req->r_dir_release_cnt = atomic64_read(&ci->i_release_count); + req->r_dir_ordered_cnt = atomic64_read(&ci->i_ordered_count); + req->r_readdir_cache_idx = 0; + } + + cache_ctl.index = req->r_readdir_cache_idx; + /* FIXME: release caps/leases if error occurs */ for (i = 0; i < rinfo->dir_nr; i++) { struct ceph_vino vino; @@ -1413,13 +1471,6 @@ retry_lookup: d_delete(dn); dput(dn); goto retry_lookup; - } else { - /* reorder parent's d_subdirs */ - spin_lock(&parent->d_lock); - spin_lock_nested(&dn->d_lock, DENTRY_D_LOCK_NESTED); - list_move(&dn->d_child, &parent->d_subdirs); - spin_unlock(&dn->d_lock); - spin_unlock(&parent->d_lock); } /* inode */ @@ -1436,13 +1487,15 @@ retry_lookup: } } - if (fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session, - req->r_request_started, -1, - &req->r_caps_reservation) < 0) { + ret = fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session, + req->r_request_started, -1, + &req->r_caps_reservation); + if (ret < 0) { pr_err("fill_inode badness on %p\n", in); if (d_really_is_negative(dn)) iput(in); d_drop(dn); + err = ret; goto next_item; } @@ -1458,19 +1511,28 @@ retry_lookup: } di = dn->d_fsdata; - di->offset = ceph_make_fpos(frag, i + r_readdir_offset); + di->offset = ceph_make_fpos(frag, i + req->r_readdir_offset); update_dentry_lease(dn, rinfo->dir_dlease[i], req->r_session, req->r_request_started); + + if (err == 0 && cache_ctl.index >= 0) { + ret = fill_readdir_cache(d_inode(parent), dn, + &cache_ctl, req); + if (ret < 0) + err = ret; + } next_item: if (dn) dput(dn); } - if (err == 0) - req->r_did_prepopulate = true; - out: + if (err == 0) { + req->r_did_prepopulate = true; + req->r_readdir_cache_idx = cache_ctl.index; + } + ceph_readdir_cache_release(&cache_ctl); if (snapdir) { iput(snapdir); dput(parent); @@ -1712,11 +1774,13 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) const unsigned int ia_valid = attr->ia_valid; struct ceph_mds_request *req; struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc; + struct ceph_cap_flush *prealloc_cf; int issued; int release = 0, dirtied = 0; int mask = 0; int err = 0; int inode_dirty_flags = 0; + bool lock_snap_rwsem = false; if (ceph_snap(inode) != CEPH_NOSNAP) return -EROFS; @@ -1725,13 +1789,31 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) if (err != 0) return err; + prealloc_cf = ceph_alloc_cap_flush(); + if (!prealloc_cf) + return -ENOMEM; + req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETATTR, USE_AUTH_MDS); - if (IS_ERR(req)) + if (IS_ERR(req)) { + ceph_free_cap_flush(prealloc_cf); return PTR_ERR(req); + } spin_lock(&ci->i_ceph_lock); issued = __ceph_caps_issued(ci, NULL); + + if (!ci->i_head_snapc && + (issued & (CEPH_CAP_ANY_EXCL | CEPH_CAP_FILE_WR))) { + lock_snap_rwsem = true; + if (!down_read_trylock(&mdsc->snap_rwsem)) { + spin_unlock(&ci->i_ceph_lock); + down_read(&mdsc->snap_rwsem); + spin_lock(&ci->i_ceph_lock); + issued = __ceph_caps_issued(ci, NULL); + } + } + dout("setattr %p issued %s\n", inode, ceph_cap_string(issued)); if (ia_valid & ATTR_UID) { @@ -1874,12 +1956,15 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) dout("setattr %p ATTR_FILE ... hrm!\n", inode); if (dirtied) { - inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied); + inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied, + &prealloc_cf); inode->i_ctime = CURRENT_TIME; } release &= issued; spin_unlock(&ci->i_ceph_lock); + if (lock_snap_rwsem) + up_read(&mdsc->snap_rwsem); if (inode_dirty_flags) __mark_inode_dirty(inode, inode_dirty_flags); @@ -1904,9 +1989,11 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) ceph_mdsc_put_request(req); if (mask & CEPH_SETATTR_SIZE) __ceph_do_pending_vmtruncate(inode); + ceph_free_cap_flush(prealloc_cf); return err; out_put: ceph_mdsc_put_request(req); + ceph_free_cap_flush(prealloc_cf); return err; } diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 84f37f34f9aa..6aa07af67603 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -8,6 +8,7 @@ #include <linux/debugfs.h> #include <linux/seq_file.h> #include <linux/utsname.h> +#include <linux/ratelimit.h> #include "super.h" #include "mds_client.h" @@ -458,7 +459,6 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, s->s_cap_reconnect = 0; s->s_cap_iterator = NULL; INIT_LIST_HEAD(&s->s_cap_releases); - INIT_LIST_HEAD(&s->s_cap_releases_done); INIT_LIST_HEAD(&s->s_cap_flushing); INIT_LIST_HEAD(&s->s_cap_snaps_flushing); @@ -629,6 +629,9 @@ static void __register_request(struct ceph_mds_client *mdsc, req->r_uid = current_fsuid(); req->r_gid = current_fsgid(); + if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) + mdsc->oldest_tid = req->r_tid; + if (dir) { struct ceph_inode_info *ci = ceph_inode(dir); @@ -644,6 +647,21 @@ static void __unregister_request(struct ceph_mds_client *mdsc, struct ceph_mds_request *req) { dout("__unregister_request %p tid %lld\n", req, req->r_tid); + + if (req->r_tid == mdsc->oldest_tid) { + struct rb_node *p = rb_next(&req->r_node); + mdsc->oldest_tid = 0; + while (p) { + struct ceph_mds_request *next_req = + rb_entry(p, struct ceph_mds_request, r_node); + if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { + mdsc->oldest_tid = next_req->r_tid; + break; + } + p = rb_next(p); + } + } + rb_erase(&req->r_node, &mdsc->request_tree); RB_CLEAR_NODE(&req->r_node); @@ -998,27 +1016,25 @@ void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, * session caps */ -/* - * Free preallocated cap messages assigned to this session - */ -static void cleanup_cap_releases(struct ceph_mds_session *session) +/* caller holds s_cap_lock, we drop it */ +static void cleanup_cap_releases(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) + __releases(session->s_cap_lock) { - struct ceph_msg *msg; + LIST_HEAD(tmp_list); + list_splice_init(&session->s_cap_releases, &tmp_list); + session->s_num_cap_releases = 0; + spin_unlock(&session->s_cap_lock); - spin_lock(&session->s_cap_lock); - while (!list_empty(&session->s_cap_releases)) { - msg = list_first_entry(&session->s_cap_releases, - struct ceph_msg, list_head); - list_del_init(&msg->list_head); - ceph_msg_put(msg); - } - while (!list_empty(&session->s_cap_releases_done)) { - msg = list_first_entry(&session->s_cap_releases_done, - struct ceph_msg, list_head); - list_del_init(&msg->list_head); - ceph_msg_put(msg); + dout("cleanup_cap_releases mds%d\n", session->s_mds); + while (!list_empty(&tmp_list)) { + struct ceph_cap *cap; + /* zero out the in-progress message */ + cap = list_first_entry(&tmp_list, + struct ceph_cap, session_caps); + list_del(&cap->session_caps); + ceph_put_cap(mdsc, cap); } - spin_unlock(&session->s_cap_lock); } static void cleanup_session_requests(struct ceph_mds_client *mdsc, @@ -1033,7 +1049,8 @@ static void cleanup_session_requests(struct ceph_mds_client *mdsc, req = list_first_entry(&session->s_unsafe, struct ceph_mds_request, r_unsafe_item); list_del_init(&req->r_unsafe_item); - pr_info(" dropping unsafe request %llu\n", req->r_tid); + pr_warn_ratelimited(" dropping unsafe request %llu\n", + req->r_tid); __unregister_request(mdsc, req); } /* zero r_attempts, so kick_requests() will re-send requests */ @@ -1095,10 +1112,16 @@ static int iterate_session_caps(struct ceph_mds_session *session, dout("iterate_session_caps finishing cap %p removal\n", cap); BUG_ON(cap->session != session); + cap->session = NULL; list_del_init(&cap->session_caps); session->s_nr_caps--; - cap->session = NULL; - old_cap = cap; /* put_cap it w/o locks held */ + if (cap->queue_release) { + list_add_tail(&cap->session_caps, + &session->s_cap_releases); + session->s_num_cap_releases++; + } else { + old_cap = cap; /* put_cap it w/o locks held */ + } } if (ret < 0) goto out; @@ -1119,6 +1142,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) { struct ceph_inode_info *ci = ceph_inode(inode); + LIST_HEAD(to_remove); int drop = 0; dout("removing cap %p, ci is %p, inode is %p\n", @@ -1126,12 +1150,27 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, spin_lock(&ci->i_ceph_lock); __ceph_remove_cap(cap, false); if (!ci->i_auth_cap) { + struct ceph_cap_flush *cf; struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; + while (true) { + struct rb_node *n = rb_first(&ci->i_cap_flush_tree); + if (!n) + break; + cf = rb_entry(n, struct ceph_cap_flush, i_node); + rb_erase(&cf->i_node, &ci->i_cap_flush_tree); + list_add(&cf->list, &to_remove); + } + spin_lock(&mdsc->cap_dirty_lock); + + list_for_each_entry(cf, &to_remove, list) + rb_erase(&cf->g_node, &mdsc->cap_flush_tree); + if (!list_empty(&ci->i_dirty_item)) { - pr_info(" dropping dirty %s state for %p %lld\n", + pr_warn_ratelimited( + " dropping dirty %s state for %p %lld\n", ceph_cap_string(ci->i_dirty_caps), inode, ceph_ino(inode)); ci->i_dirty_caps = 0; @@ -1139,7 +1178,8 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, drop = 1; } if (!list_empty(&ci->i_flushing_item)) { - pr_info(" dropping dirty+flushing %s state for %p %lld\n", + pr_warn_ratelimited( + " dropping dirty+flushing %s state for %p %lld\n", ceph_cap_string(ci->i_flushing_caps), inode, ceph_ino(inode)); ci->i_flushing_caps = 0; @@ -1148,8 +1188,20 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, drop = 1; } spin_unlock(&mdsc->cap_dirty_lock); + + if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) { + list_add(&ci->i_prealloc_cap_flush->list, &to_remove); + ci->i_prealloc_cap_flush = NULL; + } } spin_unlock(&ci->i_ceph_lock); + while (!list_empty(&to_remove)) { + struct ceph_cap_flush *cf; + cf = list_first_entry(&to_remove, + struct ceph_cap_flush, list); + list_del(&cf->list); + ceph_free_cap_flush(cf); + } while (drop--) iput(inode); return 0; @@ -1191,11 +1243,12 @@ static void remove_session_caps(struct ceph_mds_session *session) spin_lock(&session->s_cap_lock); } } - spin_unlock(&session->s_cap_lock); + + // drop cap expires and unlock s_cap_lock + cleanup_cap_releases(session->s_mdsc, session); BUG_ON(session->s_nr_caps > 0); BUG_ON(!list_empty(&session->s_cap_flushing)); - cleanup_cap_releases(session); } /* @@ -1371,7 +1424,8 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), ceph_cap_string(used), ceph_cap_string(wanted)); if (cap == ci->i_auth_cap) { - if (ci->i_dirty_caps | ci->i_flushing_caps) + if (ci->i_dirty_caps || ci->i_flushing_caps || + !list_empty(&ci->i_cap_snaps)) goto out; if ((used | wanted) & CEPH_CAP_ANY_WR) goto out; @@ -1417,121 +1471,80 @@ static int trim_caps(struct ceph_mds_client *mdsc, session->s_trim_caps = 0; } - ceph_add_cap_releases(mdsc, session); ceph_send_cap_releases(mdsc, session); return 0; } -/* - * Allocate cap_release messages. If there is a partially full message - * in the queue, try to allocate enough to cover it's remainder, so that - * we can send it immediately. - * - * Called under s_mutex. - */ -int ceph_add_cap_releases(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session) +static int check_capsnap_flush(struct ceph_inode_info *ci, + u64 want_snap_seq) { - struct ceph_msg *msg, *partial = NULL; - struct ceph_mds_cap_release *head; - int err = -ENOMEM; - int extra = mdsc->fsc->mount_options->cap_release_safety; - int num; - - dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds, - extra); - - spin_lock(&session->s_cap_lock); - - if (!list_empty(&session->s_cap_releases)) { - msg = list_first_entry(&session->s_cap_releases, - struct ceph_msg, - list_head); - head = msg->front.iov_base; - num = le32_to_cpu(head->num); - if (num) { - dout(" partial %p with (%d/%d)\n", msg, num, - (int)CEPH_CAPS_PER_RELEASE); - extra += CEPH_CAPS_PER_RELEASE - num; - partial = msg; - } - } - while (session->s_num_cap_releases < session->s_nr_caps + extra) { - spin_unlock(&session->s_cap_lock); - msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE, - GFP_NOFS, false); - if (!msg) - goto out_unlocked; - dout("add_cap_releases %p msg %p now %d\n", session, msg, - (int)msg->front.iov_len); - head = msg->front.iov_base; - head->num = cpu_to_le32(0); - msg->front.iov_len = sizeof(*head); - spin_lock(&session->s_cap_lock); - list_add(&msg->list_head, &session->s_cap_releases); - session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE; - } - - if (partial) { - head = partial->front.iov_base; - num = le32_to_cpu(head->num); - dout(" queueing partial %p with %d/%d\n", partial, num, - (int)CEPH_CAPS_PER_RELEASE); - list_move_tail(&partial->list_head, - &session->s_cap_releases_done); - session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num; + int ret = 1; + spin_lock(&ci->i_ceph_lock); + if (want_snap_seq > 0 && !list_empty(&ci->i_cap_snaps)) { + struct ceph_cap_snap *capsnap = + list_first_entry(&ci->i_cap_snaps, + struct ceph_cap_snap, ci_item); + ret = capsnap->follows >= want_snap_seq; } - err = 0; - spin_unlock(&session->s_cap_lock); -out_unlocked: - return err; + spin_unlock(&ci->i_ceph_lock); + return ret; } -static int check_cap_flush(struct inode *inode, u64 want_flush_seq) +static int check_caps_flush(struct ceph_mds_client *mdsc, + u64 want_flush_tid) { - struct ceph_inode_info *ci = ceph_inode(inode); - int ret; - spin_lock(&ci->i_ceph_lock); - if (ci->i_flushing_caps) - ret = ci->i_cap_flush_seq >= want_flush_seq; - else - ret = 1; - spin_unlock(&ci->i_ceph_lock); + struct rb_node *n; + struct ceph_cap_flush *cf; + int ret = 1; + + spin_lock(&mdsc->cap_dirty_lock); + n = rb_first(&mdsc->cap_flush_tree); + cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL; + if (cf && cf->tid <= want_flush_tid) { + dout("check_caps_flush still flushing tid %llu <= %llu\n", + cf->tid, want_flush_tid); + ret = 0; + } + spin_unlock(&mdsc->cap_dirty_lock); return ret; } /* * flush all dirty inode data to disk. * - * returns true if we've flushed through want_flush_seq + * returns true if we've flushed through want_flush_tid */ -static void wait_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq) +static void wait_caps_flush(struct ceph_mds_client *mdsc, + u64 want_flush_tid, u64 want_snap_seq) { int mds; - dout("check_cap_flush want %lld\n", want_flush_seq); + dout("check_caps_flush want %llu snap want %llu\n", + want_flush_tid, want_snap_seq); mutex_lock(&mdsc->mutex); - for (mds = 0; mds < mdsc->max_sessions; mds++) { + for (mds = 0; mds < mdsc->max_sessions; ) { struct ceph_mds_session *session = mdsc->sessions[mds]; struct inode *inode = NULL; - if (!session) + if (!session) { + mds++; continue; + } get_session(session); mutex_unlock(&mdsc->mutex); mutex_lock(&session->s_mutex); - if (!list_empty(&session->s_cap_flushing)) { - struct ceph_inode_info *ci = - list_entry(session->s_cap_flushing.next, - struct ceph_inode_info, - i_flushing_item); - - if (!check_cap_flush(&ci->vfs_inode, want_flush_seq)) { - dout("check_cap_flush still flushing %p " - "seq %lld <= %lld to mds%d\n", - &ci->vfs_inode, ci->i_cap_flush_seq, - want_flush_seq, session->s_mds); + if (!list_empty(&session->s_cap_snaps_flushing)) { + struct ceph_cap_snap *capsnap = + list_first_entry(&session->s_cap_snaps_flushing, + struct ceph_cap_snap, + flushing_item); + struct ceph_inode_info *ci = capsnap->ci; + if (!check_capsnap_flush(ci, want_snap_seq)) { + dout("check_cap_flush still flushing snap %p " + "follows %lld <= %lld to mds%d\n", + &ci->vfs_inode, capsnap->follows, + want_snap_seq, mds); inode = igrab(&ci->vfs_inode); } } @@ -1540,15 +1553,21 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq) if (inode) { wait_event(mdsc->cap_flushing_wq, - check_cap_flush(inode, want_flush_seq)); + check_capsnap_flush(ceph_inode(inode), + want_snap_seq)); iput(inode); + } else { + mds++; } mutex_lock(&mdsc->mutex); } - mutex_unlock(&mdsc->mutex); - dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq); + + wait_event(mdsc->cap_flushing_wq, + check_caps_flush(mdsc, want_flush_tid)); + + dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid); } /* @@ -1557,60 +1576,74 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq) void ceph_send_cap_releases(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { - struct ceph_msg *msg; + struct ceph_msg *msg = NULL; + struct ceph_mds_cap_release *head; + struct ceph_mds_cap_item *item; + struct ceph_cap *cap; + LIST_HEAD(tmp_list); + int num_cap_releases; - dout("send_cap_releases mds%d\n", session->s_mds); spin_lock(&session->s_cap_lock); - while (!list_empty(&session->s_cap_releases_done)) { - msg = list_first_entry(&session->s_cap_releases_done, - struct ceph_msg, list_head); - list_del_init(&msg->list_head); - spin_unlock(&session->s_cap_lock); - msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); - dout("send_cap_releases mds%d %p\n", session->s_mds, msg); - ceph_con_send(&session->s_con, msg); - spin_lock(&session->s_cap_lock); - } +again: + list_splice_init(&session->s_cap_releases, &tmp_list); + num_cap_releases = session->s_num_cap_releases; + session->s_num_cap_releases = 0; spin_unlock(&session->s_cap_lock); -} -static void discard_cap_releases(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session) -{ - struct ceph_msg *msg; - struct ceph_mds_cap_release *head; - unsigned num; - - dout("discard_cap_releases mds%d\n", session->s_mds); + while (!list_empty(&tmp_list)) { + if (!msg) { + msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, + PAGE_CACHE_SIZE, GFP_NOFS, false); + if (!msg) + goto out_err; + head = msg->front.iov_base; + head->num = cpu_to_le32(0); + msg->front.iov_len = sizeof(*head); + } + cap = list_first_entry(&tmp_list, struct ceph_cap, + session_caps); + list_del(&cap->session_caps); + num_cap_releases--; - if (!list_empty(&session->s_cap_releases)) { - /* zero out the in-progress message */ - msg = list_first_entry(&session->s_cap_releases, - struct ceph_msg, list_head); head = msg->front.iov_base; - num = le32_to_cpu(head->num); - dout("discard_cap_releases mds%d %p %u\n", - session->s_mds, msg, num); - head->num = cpu_to_le32(0); - msg->front.iov_len = sizeof(*head); - session->s_num_cap_releases += num; + le32_add_cpu(&head->num, 1); + item = msg->front.iov_base + msg->front.iov_len; + item->ino = cpu_to_le64(cap->cap_ino); + item->cap_id = cpu_to_le64(cap->cap_id); + item->migrate_seq = cpu_to_le32(cap->mseq); + item->seq = cpu_to_le32(cap->issue_seq); + msg->front.iov_len += sizeof(*item); + + ceph_put_cap(mdsc, cap); + + if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { + msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); + dout("send_cap_releases mds%d %p\n", session->s_mds, msg); + ceph_con_send(&session->s_con, msg); + msg = NULL; + } } - /* requeue completed messages */ - while (!list_empty(&session->s_cap_releases_done)) { - msg = list_first_entry(&session->s_cap_releases_done, - struct ceph_msg, list_head); - list_del_init(&msg->list_head); + BUG_ON(num_cap_releases != 0); - head = msg->front.iov_base; - num = le32_to_cpu(head->num); - dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, - num); - session->s_num_cap_releases += num; - head->num = cpu_to_le32(0); - msg->front.iov_len = sizeof(*head); - list_add(&msg->list_head, &session->s_cap_releases); + spin_lock(&session->s_cap_lock); + if (!list_empty(&session->s_cap_releases)) + goto again; + spin_unlock(&session->s_cap_lock); + + if (msg) { + msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); + dout("send_cap_releases mds%d %p\n", session->s_mds, msg); + ceph_con_send(&session->s_con, msg); } + return; +out_err: + pr_err("send_cap_releases mds%d, failed to allocate message\n", + session->s_mds); + spin_lock(&session->s_cap_lock); + list_splice(&tmp_list, &session->s_cap_releases); + session->s_num_cap_releases += num_cap_releases; + spin_unlock(&session->s_cap_lock); } /* @@ -1635,7 +1668,8 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, order = get_order(size * num_entries); while (order >= 0) { - rinfo->dir_in = (void*)__get_free_pages(GFP_NOFS | __GFP_NOWARN, + rinfo->dir_in = (void*)__get_free_pages(GFP_KERNEL | + __GFP_NOWARN, order); if (rinfo->dir_in) break; @@ -1697,13 +1731,9 @@ static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) struct ceph_mds_request, r_node); } -static u64 __get_oldest_tid(struct ceph_mds_client *mdsc) +static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) { - struct ceph_mds_request *req = __get_oldest_req(mdsc); - - if (req) - return req->r_tid; - return 0; + return mdsc->oldest_tid; } /* @@ -2267,15 +2297,18 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, /* wait */ mutex_unlock(&mdsc->mutex); dout("do_request waiting\n"); - if (req->r_timeout) { - err = (long)wait_for_completion_killable_timeout( - &req->r_completion, req->r_timeout); - if (err == 0) - err = -EIO; - } else if (req->r_wait_for_completion) { + if (!req->r_timeout && req->r_wait_for_completion) { err = req->r_wait_for_completion(mdsc, req); } else { - err = wait_for_completion_killable(&req->r_completion); + long timeleft = wait_for_completion_killable_timeout( + &req->r_completion, + ceph_timeout_jiffies(req->r_timeout)); + if (timeleft > 0) + err = 0; + else if (!timeleft) + err = -EIO; /* timed out */ + else + err = timeleft; /* killed */ } dout("do_request waited, got %d\n", err); mutex_lock(&mdsc->mutex); @@ -2496,7 +2529,6 @@ out_err: } mutex_unlock(&mdsc->mutex); - ceph_add_cap_releases(mdsc, req->r_session); mutex_unlock(&session->s_mutex); /* kick calling process */ @@ -2888,8 +2920,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, */ session->s_cap_reconnect = 1; /* drop old cap expires; we're about to reestablish that state */ - discard_cap_releases(mdsc, session); - spin_unlock(&session->s_cap_lock); + cleanup_cap_releases(mdsc, session); /* trim unused caps to reduce MDS's cache rejoin time */ if (mdsc->fsc->sb->s_root) @@ -2956,6 +2987,9 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, reply->hdr.data_len = cpu_to_le32(pagelist->length); ceph_msg_data_add_pagelist(reply, pagelist); + + ceph_early_kick_flushing_caps(mdsc, session); + ceph_con_send(&session->s_con, reply); mutex_unlock(&session->s_mutex); @@ -3352,7 +3386,6 @@ static void delayed_work(struct work_struct *work) send_renew_caps(mdsc, s); else ceph_con_keepalive(&s->s_con); - ceph_add_cap_releases(mdsc, s); if (s->s_state == CEPH_MDS_SESSION_OPEN || s->s_state == CEPH_MDS_SESSION_HUNG) ceph_send_cap_releases(mdsc, s); @@ -3390,11 +3423,13 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) atomic_set(&mdsc->num_sessions, 0); mdsc->max_sessions = 0; mdsc->stopping = 0; + mdsc->last_snap_seq = 0; init_rwsem(&mdsc->snap_rwsem); mdsc->snap_realms = RB_ROOT; INIT_LIST_HEAD(&mdsc->snap_empty); spin_lock_init(&mdsc->snap_empty_lock); mdsc->last_tid = 0; + mdsc->oldest_tid = 0; mdsc->request_tree = RB_ROOT; INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); mdsc->last_renew_caps = jiffies; @@ -3402,7 +3437,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) spin_lock_init(&mdsc->cap_delay_lock); INIT_LIST_HEAD(&mdsc->snap_flush_list); spin_lock_init(&mdsc->snap_flush_lock); - mdsc->cap_flush_seq = 0; + mdsc->last_cap_flush_tid = 1; + mdsc->cap_flush_tree = RB_ROOT; INIT_LIST_HEAD(&mdsc->cap_dirty); INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); mdsc->num_cap_flushing = 0; @@ -3414,6 +3450,9 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) ceph_caps_init(mdsc); ceph_adjust_min_caps(mdsc, fsc->min_caps); + init_rwsem(&mdsc->pool_perm_rwsem); + mdsc->pool_perm_tree = RB_ROOT; + return 0; } @@ -3423,8 +3462,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) */ static void wait_requests(struct ceph_mds_client *mdsc) { + struct ceph_options *opts = mdsc->fsc->client->options; struct ceph_mds_request *req; - struct ceph_fs_client *fsc = mdsc->fsc; mutex_lock(&mdsc->mutex); if (__get_oldest_req(mdsc)) { @@ -3432,7 +3471,7 @@ static void wait_requests(struct ceph_mds_client *mdsc) dout("wait_requests waiting for requests\n"); wait_for_completion_timeout(&mdsc->safe_umount_waiters, - fsc->client->options->mount_timeout * HZ); + ceph_timeout_jiffies(opts->mount_timeout)); /* tear down remaining requests */ mutex_lock(&mdsc->mutex); @@ -3485,7 +3524,8 @@ restart: nextreq = rb_entry(n, struct ceph_mds_request, r_node); else nextreq = NULL; - if ((req->r_op & CEPH_MDS_OP_WRITE)) { + if (req->r_op != CEPH_MDS_OP_SETFILELOCK && + (req->r_op & CEPH_MDS_OP_WRITE)) { /* write op */ ceph_mdsc_get_request(req); if (nextreq) @@ -3513,7 +3553,7 @@ restart: void ceph_mdsc_sync(struct ceph_mds_client *mdsc) { - u64 want_tid, want_flush; + u64 want_tid, want_flush, want_snap; if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN) return; @@ -3525,13 +3565,18 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc) ceph_flush_dirty_caps(mdsc); spin_lock(&mdsc->cap_dirty_lock); - want_flush = mdsc->cap_flush_seq; + want_flush = mdsc->last_cap_flush_tid; spin_unlock(&mdsc->cap_dirty_lock); - dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush); + down_read(&mdsc->snap_rwsem); + want_snap = mdsc->last_snap_seq; + up_read(&mdsc->snap_rwsem); + + dout("sync want tid %lld flush_seq %lld snap_seq %lld\n", + want_tid, want_flush, want_snap); wait_unsafe_requests(mdsc, want_tid); - wait_caps_flush(mdsc, want_flush); + wait_caps_flush(mdsc, want_flush, want_snap); } /* @@ -3549,10 +3594,9 @@ static bool done_closing_sessions(struct ceph_mds_client *mdsc) */ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) { + struct ceph_options *opts = mdsc->fsc->client->options; struct ceph_mds_session *session; int i; - struct ceph_fs_client *fsc = mdsc->fsc; - unsigned long timeout = fsc->client->options->mount_timeout * HZ; dout("close_sessions\n"); @@ -3573,7 +3617,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) dout("waiting for sessions to close\n"); wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc), - timeout); + ceph_timeout_jiffies(opts->mount_timeout)); /* tear down remaining sessions */ mutex_lock(&mdsc->mutex); @@ -3607,6 +3651,7 @@ static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) ceph_mdsmap_destroy(mdsc->mdsmap); kfree(mdsc->sessions); ceph_caps_finalize(mdsc); + ceph_pool_perm_destroy(mdsc); } void ceph_mdsc_destroy(struct ceph_fs_client *fsc) diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 1875b5d985c6..762757e6cebf 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -139,7 +139,6 @@ struct ceph_mds_session { int s_cap_reconnect; int s_readonly; struct list_head s_cap_releases; /* waiting cap_release messages */ - struct list_head s_cap_releases_done; /* ready to send */ struct ceph_cap *s_cap_iterator; /* protected by mutex */ @@ -228,7 +227,7 @@ struct ceph_mds_request { int r_err; bool r_aborted; - unsigned long r_timeout; /* optional. jiffies */ + unsigned long r_timeout; /* optional. jiffies, 0 is "wait forever" */ unsigned long r_started; /* start time to measure timeout against */ unsigned long r_request_started; /* start time for mds request only, used to measure lease durations */ @@ -254,12 +253,21 @@ struct ceph_mds_request { bool r_got_unsafe, r_got_safe, r_got_result; bool r_did_prepopulate; + long long r_dir_release_cnt; + long long r_dir_ordered_cnt; + int r_readdir_cache_idx; u32 r_readdir_offset; struct ceph_cap_reservation r_caps_reservation; int r_num_caps; }; +struct ceph_pool_perm { + struct rb_node node; + u32 pool; + int perm; +}; + /* * mds client state */ @@ -284,12 +292,15 @@ struct ceph_mds_client { * references (implying they contain no inodes with caps) that * should be destroyed. */ + u64 last_snap_seq; struct rw_semaphore snap_rwsem; struct rb_root snap_realms; struct list_head snap_empty; spinlock_t snap_empty_lock; /* protect snap_empty */ u64 last_tid; /* most recent mds request */ + u64 oldest_tid; /* oldest incomplete mds request, + excluding setfilelock requests */ struct rb_root request_tree; /* pending mds requests */ struct delayed_work delayed_work; /* delayed work */ unsigned long last_renew_caps; /* last time we renewed our caps */ @@ -298,7 +309,8 @@ struct ceph_mds_client { struct list_head snap_flush_list; /* cap_snaps ready to flush */ spinlock_t snap_flush_lock; - u64 cap_flush_seq; + u64 last_cap_flush_tid; + struct rb_root cap_flush_tree; struct list_head cap_dirty; /* inodes with dirty caps */ struct list_head cap_dirty_migrating; /* ...that are migration... */ int num_cap_flushing; /* # caps we are flushing */ @@ -328,6 +340,9 @@ struct ceph_mds_client { spinlock_t dentry_lru_lock; struct list_head dentry_lru; int num_dentry; + + struct rw_semaphore pool_perm_rwsem; + struct rb_root pool_perm_tree; }; extern const char *ceph_mds_op_name(int op); @@ -379,8 +394,6 @@ static inline void ceph_mdsc_put_request(struct ceph_mds_request *req) kref_put(&req->r_kref, ceph_mdsc_release_request); } -extern int ceph_add_cap_releases(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session); extern void ceph_send_cap_releases(struct ceph_mds_client *mdsc, struct ceph_mds_session *session); diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index a97e39f09ba6..233d906aec02 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -296,7 +296,7 @@ static int cmpu64_rev(const void *a, const void *b) } -static struct ceph_snap_context *empty_snapc; +struct ceph_snap_context *ceph_empty_snapc; /* * build the snap context for a given realm. @@ -338,9 +338,9 @@ static int build_snap_context(struct ceph_snap_realm *realm) return 0; } - if (num == 0 && realm->seq == empty_snapc->seq) { - ceph_get_snap_context(empty_snapc); - snapc = empty_snapc; + if (num == 0 && realm->seq == ceph_empty_snapc->seq) { + ceph_get_snap_context(ceph_empty_snapc); + snapc = ceph_empty_snapc; goto done; } @@ -436,6 +436,14 @@ static int dup_array(u64 **dst, __le64 *src, u32 num) return 0; } +static bool has_new_snaps(struct ceph_snap_context *o, + struct ceph_snap_context *n) +{ + if (n->num_snaps == 0) + return false; + /* snaps are in descending order */ + return n->snaps[0] > o->seq; +} /* * When a snapshot is applied, the size/mtime inode metadata is queued @@ -455,6 +463,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) { struct inode *inode = &ci->vfs_inode; struct ceph_cap_snap *capsnap; + struct ceph_snap_context *old_snapc, *new_snapc; int used, dirty; capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS); @@ -467,6 +476,9 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) used = __ceph_caps_used(ci); dirty = __ceph_caps_dirty(ci); + old_snapc = ci->i_head_snapc; + new_snapc = ci->i_snap_realm->cached_context; + /* * If there is a write in progress, treat that as a dirty Fw, * even though it hasn't completed yet; by the time we finish @@ -481,76 +493,95 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) writes in progress now were started before the previous cap_snap. lucky us. */ dout("queue_cap_snap %p already pending\n", inode); - kfree(capsnap); - } else if (ci->i_snap_realm->cached_context == empty_snapc) { - dout("queue_cap_snap %p empty snapc\n", inode); - kfree(capsnap); - } else if (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL| - CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) { - struct ceph_snap_context *snapc = ci->i_head_snapc; - - /* - * if we are a sync write, we may need to go to the snaprealm - * to get the current snapc. - */ - if (!snapc) - snapc = ci->i_snap_realm->cached_context; + goto update_snapc; + } + if (ci->i_wrbuffer_ref_head == 0 && + !(dirty & (CEPH_CAP_ANY_EXCL|CEPH_CAP_FILE_WR))) { + dout("queue_cap_snap %p nothing dirty|writing\n", inode); + goto update_snapc; + } - dout("queue_cap_snap %p cap_snap %p queuing under %p %s\n", - inode, capsnap, snapc, ceph_cap_string(dirty)); - ihold(inode); + BUG_ON(!old_snapc); - atomic_set(&capsnap->nref, 1); - capsnap->ci = ci; - INIT_LIST_HEAD(&capsnap->ci_item); - INIT_LIST_HEAD(&capsnap->flushing_item); - - capsnap->follows = snapc->seq; - capsnap->issued = __ceph_caps_issued(ci, NULL); - capsnap->dirty = dirty; - - capsnap->mode = inode->i_mode; - capsnap->uid = inode->i_uid; - capsnap->gid = inode->i_gid; - - if (dirty & CEPH_CAP_XATTR_EXCL) { - __ceph_build_xattrs_blob(ci); - capsnap->xattr_blob = - ceph_buffer_get(ci->i_xattrs.blob); - capsnap->xattr_version = ci->i_xattrs.version; - } else { - capsnap->xattr_blob = NULL; - capsnap->xattr_version = 0; + /* + * There is no need to send FLUSHSNAP message to MDS if there is + * no new snapshot. But when there is dirty pages or on-going + * writes, we still need to create cap_snap. cap_snap is needed + * by the write path and page writeback path. + * + * also see ceph_try_drop_cap_snap() + */ + if (has_new_snaps(old_snapc, new_snapc)) { + if (dirty & (CEPH_CAP_ANY_EXCL|CEPH_CAP_FILE_WR)) + capsnap->need_flush = true; + } else { + if (!(used & CEPH_CAP_FILE_WR) && + ci->i_wrbuffer_ref_head == 0) { + dout("queue_cap_snap %p " + "no new_snap|dirty_page|writing\n", inode); + goto update_snapc; } + } - capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE; - - /* dirty page count moved from _head to this cap_snap; - all subsequent writes page dirties occur _after_ this - snapshot. */ - capsnap->dirty_pages = ci->i_wrbuffer_ref_head; - ci->i_wrbuffer_ref_head = 0; - capsnap->context = snapc; - ci->i_head_snapc = - ceph_get_snap_context(ci->i_snap_realm->cached_context); - dout(" new snapc is %p\n", ci->i_head_snapc); - list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps); - - if (used & CEPH_CAP_FILE_WR) { - dout("queue_cap_snap %p cap_snap %p snapc %p" - " seq %llu used WR, now pending\n", inode, - capsnap, snapc, snapc->seq); - capsnap->writing = 1; - } else { - /* note mtime, size NOW. */ - __ceph_finish_cap_snap(ci, capsnap); - } + dout("queue_cap_snap %p cap_snap %p queuing under %p %s %s\n", + inode, capsnap, old_snapc, ceph_cap_string(dirty), + capsnap->need_flush ? "" : "no_flush"); + ihold(inode); + + atomic_set(&capsnap->nref, 1); + capsnap->ci = ci; + INIT_LIST_HEAD(&capsnap->ci_item); + INIT_LIST_HEAD(&capsnap->flushing_item); + + capsnap->follows = old_snapc->seq; + capsnap->issued = __ceph_caps_issued(ci, NULL); + capsnap->dirty = dirty; + + capsnap->mode = inode->i_mode; + capsnap->uid = inode->i_uid; + capsnap->gid = inode->i_gid; + + if (dirty & CEPH_CAP_XATTR_EXCL) { + __ceph_build_xattrs_blob(ci); + capsnap->xattr_blob = + ceph_buffer_get(ci->i_xattrs.blob); + capsnap->xattr_version = ci->i_xattrs.version; } else { - dout("queue_cap_snap %p nothing dirty|writing\n", inode); - kfree(capsnap); + capsnap->xattr_blob = NULL; + capsnap->xattr_version = 0; } + capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE; + + /* dirty page count moved from _head to this cap_snap; + all subsequent writes page dirties occur _after_ this + snapshot. */ + capsnap->dirty_pages = ci->i_wrbuffer_ref_head; + ci->i_wrbuffer_ref_head = 0; + capsnap->context = old_snapc; + list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps); + old_snapc = NULL; + + if (used & CEPH_CAP_FILE_WR) { + dout("queue_cap_snap %p cap_snap %p snapc %p" + " seq %llu used WR, now pending\n", inode, + capsnap, old_snapc, old_snapc->seq); + capsnap->writing = 1; + } else { + /* note mtime, size NOW. */ + __ceph_finish_cap_snap(ci, capsnap); + } + capsnap = NULL; + +update_snapc: + if (ci->i_head_snapc) { + ci->i_head_snapc = ceph_get_snap_context(new_snapc); + dout(" new snapc is %p\n", new_snapc); + } spin_unlock(&ci->i_ceph_lock); + + kfree(capsnap); + ceph_put_snap_context(old_snapc); } /* @@ -699,6 +730,8 @@ more: /* queue realm for cap_snap creation */ list_add(&realm->dirty_item, &dirty_realms); + if (realm->seq > mdsc->last_snap_seq) + mdsc->last_snap_seq = realm->seq; invalidate = 1; } else if (!realm->cached_context) { @@ -964,14 +997,14 @@ out: int __init ceph_snap_init(void) { - empty_snapc = ceph_create_snap_context(0, GFP_NOFS); - if (!empty_snapc) + ceph_empty_snapc = ceph_create_snap_context(0, GFP_NOFS); + if (!ceph_empty_snapc) return -ENOMEM; - empty_snapc->seq = 1; + ceph_empty_snapc->seq = 1; return 0; } void ceph_snap_exit(void) { - ceph_put_snap_context(empty_snapc); + ceph_put_snap_context(ceph_empty_snapc); } diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 4e9905374078..d1c833c321b9 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -134,10 +134,12 @@ enum { Opt_noino32, Opt_fscache, Opt_nofscache, + Opt_poolperm, + Opt_nopoolperm, #ifdef CONFIG_CEPH_FS_POSIX_ACL Opt_acl, #endif - Opt_noacl + Opt_noacl, }; static match_table_t fsopt_tokens = { @@ -165,6 +167,8 @@ static match_table_t fsopt_tokens = { {Opt_noino32, "noino32"}, {Opt_fscache, "fsc"}, {Opt_nofscache, "nofsc"}, + {Opt_poolperm, "poolperm"}, + {Opt_nopoolperm, "nopoolperm"}, #ifdef CONFIG_CEPH_FS_POSIX_ACL {Opt_acl, "acl"}, #endif @@ -268,6 +272,13 @@ static int parse_fsopt_token(char *c, void *private) case Opt_nofscache: fsopt->flags &= ~CEPH_MOUNT_OPT_FSCACHE; break; + case Opt_poolperm: + fsopt->flags &= ~CEPH_MOUNT_OPT_NOPOOLPERM; + printk ("pool perm"); + break; + case Opt_nopoolperm: + fsopt->flags |= CEPH_MOUNT_OPT_NOPOOLPERM; + break; #ifdef CONFIG_CEPH_FS_POSIX_ACL case Opt_acl: fsopt->sb_flags |= MS_POSIXACL; @@ -436,6 +447,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root) seq_puts(m, ",nodcache"); if (fsopt->flags & CEPH_MOUNT_OPT_FSCACHE) seq_puts(m, ",fsc"); + if (fsopt->flags & CEPH_MOUNT_OPT_NOPOOLPERM) + seq_puts(m, ",nopoolperm"); #ifdef CONFIG_CEPH_FS_POSIX_ACL if (fsopt->sb_flags & MS_POSIXACL) @@ -609,6 +622,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc) */ struct kmem_cache *ceph_inode_cachep; struct kmem_cache *ceph_cap_cachep; +struct kmem_cache *ceph_cap_flush_cachep; struct kmem_cache *ceph_dentry_cachep; struct kmem_cache *ceph_file_cachep; @@ -634,6 +648,10 @@ static int __init init_caches(void) SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD); if (ceph_cap_cachep == NULL) goto bad_cap; + ceph_cap_flush_cachep = KMEM_CACHE(ceph_cap_flush, + SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD); + if (ceph_cap_flush_cachep == NULL) + goto bad_cap_flush; ceph_dentry_cachep = KMEM_CACHE(ceph_dentry_info, SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD); @@ -652,6 +670,8 @@ static int __init init_caches(void) bad_file: kmem_cache_destroy(ceph_dentry_cachep); bad_dentry: + kmem_cache_destroy(ceph_cap_flush_cachep); +bad_cap_flush: kmem_cache_destroy(ceph_cap_cachep); bad_cap: kmem_cache_destroy(ceph_inode_cachep); @@ -668,6 +688,7 @@ static void destroy_caches(void) kmem_cache_destroy(ceph_inode_cachep); kmem_cache_destroy(ceph_cap_cachep); + kmem_cache_destroy(ceph_cap_flush_cachep); kmem_cache_destroy(ceph_dentry_cachep); kmem_cache_destroy(ceph_file_cachep); @@ -729,7 +750,7 @@ static struct dentry *open_root_dentry(struct ceph_fs_client *fsc, req->r_ino1.ino = CEPH_INO_ROOT; req->r_ino1.snap = CEPH_NOSNAP; req->r_started = started; - req->r_timeout = fsc->client->options->mount_timeout * HZ; + req->r_timeout = fsc->client->options->mount_timeout; req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INODE); req->r_num_caps = 2; err = ceph_mdsc_do_request(mdsc, NULL, req); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index fa20e1318939..860cc016e70d 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -35,6 +35,7 @@ #define CEPH_MOUNT_OPT_INO32 (1<<8) /* 32 bit inos */ #define CEPH_MOUNT_OPT_DCACHE (1<<9) /* use dcache for readdir etc */ #define CEPH_MOUNT_OPT_FSCACHE (1<<10) /* use fscache */ +#define CEPH_MOUNT_OPT_NOPOOLPERM (1<<11) /* no pool permission check */ #define CEPH_MOUNT_OPT_DEFAULT (CEPH_MOUNT_OPT_RBYTES | \ CEPH_MOUNT_OPT_DCACHE) @@ -121,11 +122,21 @@ struct ceph_cap { struct rb_node ci_node; /* per-ci cap tree */ struct ceph_mds_session *session; struct list_head session_caps; /* per-session caplist */ - int mds; u64 cap_id; /* unique cap id (mds provided) */ - int issued; /* latest, from the mds */ - int implemented; /* implemented superset of issued (for revocation) */ - int mds_wanted; + union { + /* in-use caps */ + struct { + int issued; /* latest, from the mds */ + int implemented; /* implemented superset of + issued (for revocation) */ + int mds, mds_wanted; + }; + /* caps to release */ + struct { + u64 cap_ino; + int queue_release; + }; + }; u32 seq, issue_seq, mseq; u32 cap_gen; /* active/stale cycle */ unsigned long last_used; @@ -163,6 +174,7 @@ struct ceph_cap_snap { int writing; /* a sync write is still in progress */ int dirty_pages; /* dirty pages awaiting writeback */ bool inline_data; + bool need_flush; }; static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap) @@ -174,6 +186,17 @@ static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap) } } +struct ceph_cap_flush { + u64 tid; + int caps; + bool kick; + struct rb_node g_node; // global + union { + struct rb_node i_node; // inode + struct list_head list; + }; +}; + /* * The frag tree describes how a directory is fragmented, potentially across * multiple metadata servers. It is also used to indicate points where @@ -259,9 +282,9 @@ struct ceph_inode_info { u32 i_time_warp_seq; unsigned i_ceph_flags; - int i_ordered_count; - atomic_t i_release_count; - atomic_t i_complete_count; + atomic64_t i_release_count; + atomic64_t i_ordered_count; + atomic64_t i_complete_seq[2]; struct ceph_dir_layout i_dir_layout; struct ceph_file_layout i_layout; @@ -283,11 +306,11 @@ struct ceph_inode_info { struct ceph_cap *i_auth_cap; /* authoritative cap, if any */ unsigned i_dirty_caps, i_flushing_caps; /* mask of dirtied fields */ struct list_head i_dirty_item, i_flushing_item; - u64 i_cap_flush_seq; /* we need to track cap writeback on a per-cap-bit basis, to allow * overlapping, pipelined cap flushes to the mds. we can probably * reduce the tid to 8 bits if we're concerned about inode size. */ - u16 i_cap_flush_last_tid, i_cap_flush_tid[CEPH_CAP_BITS]; + struct ceph_cap_flush *i_prealloc_cap_flush; + struct rb_root i_cap_flush_tree; wait_queue_head_t i_cap_wq; /* threads waiting on a capability */ unsigned long i_hold_caps_min; /* jiffies */ unsigned long i_hold_caps_max; /* jiffies */ @@ -438,36 +461,46 @@ static inline struct inode *ceph_find_inode(struct super_block *sb, /* * Ceph inode. */ -#define CEPH_I_DIR_ORDERED 1 /* dentries in dir are ordered */ -#define CEPH_I_NODELAY 4 /* do not delay cap release */ -#define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */ -#define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */ +#define CEPH_I_DIR_ORDERED (1 << 0) /* dentries in dir are ordered */ +#define CEPH_I_NODELAY (1 << 1) /* do not delay cap release */ +#define CEPH_I_FLUSH (1 << 2) /* do not delay flush of dirty metadata */ +#define CEPH_I_NOFLUSH (1 << 3) /* do not flush dirty caps */ +#define CEPH_I_POOL_PERM (1 << 4) /* pool rd/wr bits are valid */ +#define CEPH_I_POOL_RD (1 << 5) /* can read from pool */ +#define CEPH_I_POOL_WR (1 << 6) /* can write to pool */ + static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci, - int release_count, int ordered_count) + long long release_count, + long long ordered_count) { - atomic_set(&ci->i_complete_count, release_count); - if (ci->i_ordered_count == ordered_count) - ci->i_ceph_flags |= CEPH_I_DIR_ORDERED; - else - ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED; + smp_mb__before_atomic(); + atomic64_set(&ci->i_complete_seq[0], release_count); + atomic64_set(&ci->i_complete_seq[1], ordered_count); } static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci) { - atomic_inc(&ci->i_release_count); + atomic64_inc(&ci->i_release_count); +} + +static inline void __ceph_dir_clear_ordered(struct ceph_inode_info *ci) +{ + atomic64_inc(&ci->i_ordered_count); } static inline bool __ceph_dir_is_complete(struct ceph_inode_info *ci) { - return atomic_read(&ci->i_complete_count) == - atomic_read(&ci->i_release_count); + return atomic64_read(&ci->i_complete_seq[0]) == + atomic64_read(&ci->i_release_count); } static inline bool __ceph_dir_is_complete_ordered(struct ceph_inode_info *ci) { - return __ceph_dir_is_complete(ci) && - (ci->i_ceph_flags & CEPH_I_DIR_ORDERED); + return atomic64_read(&ci->i_complete_seq[0]) == + atomic64_read(&ci->i_release_count) && + atomic64_read(&ci->i_complete_seq[1]) == + atomic64_read(&ci->i_ordered_count); } static inline void ceph_dir_clear_complete(struct inode *inode) @@ -477,20 +510,13 @@ static inline void ceph_dir_clear_complete(struct inode *inode) static inline void ceph_dir_clear_ordered(struct inode *inode) { - struct ceph_inode_info *ci = ceph_inode(inode); - spin_lock(&ci->i_ceph_lock); - ci->i_ordered_count++; - ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED; - spin_unlock(&ci->i_ceph_lock); + __ceph_dir_clear_ordered(ceph_inode(inode)); } static inline bool ceph_dir_is_complete_ordered(struct inode *inode) { - struct ceph_inode_info *ci = ceph_inode(inode); - bool ret; - spin_lock(&ci->i_ceph_lock); - ret = __ceph_dir_is_complete_ordered(ci); - spin_unlock(&ci->i_ceph_lock); + bool ret = __ceph_dir_is_complete_ordered(ceph_inode(inode)); + smp_rmb(); return ret; } @@ -552,7 +578,10 @@ static inline int __ceph_caps_dirty(struct ceph_inode_info *ci) { return ci->i_dirty_caps | ci->i_flushing_caps; } -extern int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask); +extern struct ceph_cap_flush *ceph_alloc_cap_flush(void); +extern void ceph_free_cap_flush(struct ceph_cap_flush *cf); +extern int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask, + struct ceph_cap_flush **pcf); extern int __ceph_caps_revoking_other(struct ceph_inode_info *ci, struct ceph_cap *ocap, int mask); @@ -606,16 +635,20 @@ struct ceph_file_info { unsigned offset; /* offset of last chunk, adjusted for . and .. */ unsigned next_offset; /* offset of next chunk (last_name's + 1) */ char *last_name; /* last entry in previous chunk */ - struct dentry *dentry; /* next dentry (for dcache readdir) */ - int dir_release_count; - int dir_ordered_count; + long long dir_release_count; + long long dir_ordered_count; + int readdir_cache_idx; /* used for -o dirstat read() on directory thing */ char *dir_info; int dir_info_len; }; - +struct ceph_readdir_cache_control { + struct page *page; + struct dentry **dentries; + int index; +}; /* * A "snap realm" describes a subset of the file hierarchy sharing @@ -687,6 +720,7 @@ static inline int default_congestion_kb(void) /* snap.c */ +extern struct ceph_snap_context *ceph_empty_snapc; struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc, u64 ino); extern void ceph_get_snap_realm(struct ceph_mds_client *mdsc, @@ -713,8 +747,8 @@ extern void ceph_snap_exit(void); static inline bool __ceph_have_pending_cap_snap(struct ceph_inode_info *ci) { return !list_empty(&ci->i_cap_snaps) && - list_entry(ci->i_cap_snaps.prev, struct ceph_cap_snap, - ci_item)->writing; + list_last_entry(&ci->i_cap_snaps, struct ceph_cap_snap, + ci_item)->writing; } /* inode.c */ @@ -838,12 +872,12 @@ extern void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap); extern int ceph_is_any_caps(struct inode *inode); -extern void __queue_cap_release(struct ceph_mds_session *session, u64 ino, - u64 cap_id, u32 migrate_seq, u32 issue_seq); extern void ceph_queue_caps_release(struct inode *inode); extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc); extern int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync); +extern void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session); extern void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session); extern struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, @@ -879,6 +913,9 @@ extern void ceph_put_fmode(struct ceph_inode_info *ci, int mode); /* addr.c */ extern const struct address_space_operations ceph_aops; extern int ceph_mmap(struct file *file, struct vm_area_struct *vma); +extern int ceph_uninline_data(struct file *filp, struct page *locked_page); +extern int ceph_pool_perm_check(struct ceph_inode_info *ci, int need); +extern void ceph_pool_perm_destroy(struct ceph_mds_client* mdsc); /* file.c */ extern const struct file_operations ceph_file_fops; @@ -890,7 +927,6 @@ extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry, extern int ceph_release(struct inode *inode, struct file *filp); extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page, char *data, size_t len); -int ceph_uninline_data(struct file *filp, struct page *locked_page); /* dir.c */ extern const struct file_operations ceph_dir_fops; extern const struct file_operations ceph_snapdir_fops; @@ -911,6 +947,7 @@ extern void ceph_dentry_lru_del(struct dentry *dn); extern void ceph_invalidate_dentry_lease(struct dentry *dentry); extern unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn); extern struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry); +extern void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl); /* * our d_ops vary depending on whether the inode is live, diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index cd7ffad4041d..819163d8313b 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -911,6 +911,8 @@ int __ceph_setxattr(struct dentry *dentry, const char *name, struct inode *inode = d_inode(dentry); struct ceph_vxattr *vxattr; struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc; + struct ceph_cap_flush *prealloc_cf = NULL; int issued; int err; int dirty = 0; @@ -920,6 +922,7 @@ int __ceph_setxattr(struct dentry *dentry, const char *name, char *newval = NULL; struct ceph_inode_xattr *xattr = NULL; int required_blob_size; + bool lock_snap_rwsem = false; if (!ceph_is_valid_xattr(name)) return -EOPNOTSUPP; @@ -948,12 +951,27 @@ int __ceph_setxattr(struct dentry *dentry, const char *name, if (!xattr) goto out; + prealloc_cf = ceph_alloc_cap_flush(); + if (!prealloc_cf) + goto out; + spin_lock(&ci->i_ceph_lock); retry: issued = __ceph_caps_issued(ci, NULL); - dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued)); if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL)) goto do_sync; + + if (!lock_snap_rwsem && !ci->i_head_snapc) { + lock_snap_rwsem = true; + if (!down_read_trylock(&mdsc->snap_rwsem)) { + spin_unlock(&ci->i_ceph_lock); + down_read(&mdsc->snap_rwsem); + spin_lock(&ci->i_ceph_lock); + goto retry; + } + } + + dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued)); __build_xattrs(inode); required_blob_size = __get_required_blob_size(ci, name_len, val_len); @@ -966,7 +984,7 @@ retry: dout(" preaallocating new blob size=%d\n", required_blob_size); blob = ceph_buffer_new(required_blob_size, GFP_NOFS); if (!blob) - goto out; + goto do_sync_unlocked; spin_lock(&ci->i_ceph_lock); if (ci->i_xattrs.prealloc_blob) ceph_buffer_put(ci->i_xattrs.prealloc_blob); @@ -978,21 +996,28 @@ retry: flags, value ? 1 : -1, &xattr); if (!err) { - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL); + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL, + &prealloc_cf); ci->i_xattrs.dirty = true; inode->i_ctime = CURRENT_TIME; } spin_unlock(&ci->i_ceph_lock); + if (lock_snap_rwsem) + up_read(&mdsc->snap_rwsem); if (dirty) __mark_inode_dirty(inode, dirty); + ceph_free_cap_flush(prealloc_cf); return err; do_sync: spin_unlock(&ci->i_ceph_lock); do_sync_unlocked: + if (lock_snap_rwsem) + up_read(&mdsc->snap_rwsem); err = ceph_sync_setxattr(dentry, name, value, size, flags); out: + ceph_free_cap_flush(prealloc_cf); kfree(newname); kfree(newval); kfree(xattr); @@ -1044,10 +1069,13 @@ int __ceph_removexattr(struct dentry *dentry, const char *name) struct inode *inode = d_inode(dentry); struct ceph_vxattr *vxattr; struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc; + struct ceph_cap_flush *prealloc_cf = NULL; int issued; int err; int required_blob_size; int dirty; + bool lock_snap_rwsem = false; if (!ceph_is_valid_xattr(name)) return -EOPNOTSUPP; @@ -1060,14 +1088,29 @@ int __ceph_removexattr(struct dentry *dentry, const char *name) if (!strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN)) goto do_sync_unlocked; + prealloc_cf = ceph_alloc_cap_flush(); + if (!prealloc_cf) + return -ENOMEM; + err = -ENOMEM; spin_lock(&ci->i_ceph_lock); retry: issued = __ceph_caps_issued(ci, NULL); - dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued)); - if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL)) goto do_sync; + + if (!lock_snap_rwsem && !ci->i_head_snapc) { + lock_snap_rwsem = true; + if (!down_read_trylock(&mdsc->snap_rwsem)) { + spin_unlock(&ci->i_ceph_lock); + down_read(&mdsc->snap_rwsem); + spin_lock(&ci->i_ceph_lock); + goto retry; + } + } + + dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued)); + __build_xattrs(inode); required_blob_size = __get_required_blob_size(ci, 0, 0); @@ -1080,7 +1123,7 @@ retry: dout(" preaallocating new blob size=%d\n", required_blob_size); blob = ceph_buffer_new(required_blob_size, GFP_NOFS); if (!blob) - goto out; + goto do_sync_unlocked; spin_lock(&ci->i_ceph_lock); if (ci->i_xattrs.prealloc_blob) ceph_buffer_put(ci->i_xattrs.prealloc_blob); @@ -1090,18 +1133,24 @@ retry: err = __remove_xattr_by_name(ceph_inode(inode), name); - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL); + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL, + &prealloc_cf); ci->i_xattrs.dirty = true; inode->i_ctime = CURRENT_TIME; spin_unlock(&ci->i_ceph_lock); + if (lock_snap_rwsem) + up_read(&mdsc->snap_rwsem); if (dirty) __mark_inode_dirty(inode, dirty); + ceph_free_cap_flush(prealloc_cf); return err; do_sync: spin_unlock(&ci->i_ceph_lock); do_sync_unlocked: + if (lock_snap_rwsem) + up_read(&mdsc->snap_rwsem); + ceph_free_cap_flush(prealloc_cf); err = ceph_send_removexattr(dentry, name); -out: return err; } |