diff options
Diffstat (limited to 'fs/ceph/file.c')
-rw-r--r-- | fs/ceph/file.c | 299 |
1 files changed, 257 insertions, 42 deletions
diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 2ddf061c1c4a..3de89829e2a1 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -8,9 +8,11 @@ #include <linux/namei.h> #include <linux/writeback.h> #include <linux/aio.h> +#include <linux/falloc.h> #include "super.h" #include "mds_client.h" +#include "cache.h" /* * Ceph file operations @@ -68,9 +70,23 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode) { struct ceph_file_info *cf; int ret = 0; + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb); + struct ceph_mds_client *mdsc = fsc->mdsc; switch (inode->i_mode & S_IFMT) { case S_IFREG: + /* First file open request creates the cookie, we want to keep + * this cookie around for the filetime of the inode as not to + * have to worry about fscache register / revoke / operation + * races. + * + * Also, if we know the operation is going to invalidate data + * (non readonly) just nuke the cache right away. + */ + ceph_fscache_register_inode_cookie(mdsc->fsc, ci); + if ((fmode & CEPH_FILE_MODE_WR)) + ceph_fscache_invalidate(inode); case S_IFDIR: dout("init_file %p %p 0%o (regular)\n", inode, file, inode->i_mode); @@ -181,6 +197,7 @@ int ceph_open(struct inode *inode, struct file *file) spin_unlock(&ci->i_ceph_lock); return ceph_init_file(inode, file, fmode); } + spin_unlock(&ci->i_ceph_lock); dout("open fmode %d wants %s\n", fmode, ceph_cap_string(wanted)); @@ -191,6 +208,7 @@ int ceph_open(struct inode *inode, struct file *file) } req->r_inode = inode; ihold(inode); + req->r_num_caps = 1; if (flags & (O_CREAT|O_TRUNC)) parent_inode = ceph_get_dentry_parent_inode(file->f_dentry); @@ -313,9 +331,9 @@ static int striped_read(struct inode *inode, { struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_inode_info *ci = ceph_inode(inode); - u64 pos, this_len; + u64 pos, this_len, left; int io_align, page_align; - int left, pages_left; + int pages_left; int read; struct page **page_pos; int ret; @@ -346,47 +364,40 @@ more: ret = 0; hit_stripe = this_len < left; was_short = ret >= 0 && ret < this_len; - dout("striped_read %llu~%u (read %u) got %d%s%s\n", pos, left, read, + dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read, ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : ""); - if (ret > 0) { - int didpages = (page_align + ret) >> PAGE_CACHE_SHIFT; - - if (read < pos - off) { - dout(" zero gap %llu to %llu\n", off + read, pos); - ceph_zero_page_vector_range(page_align + read, - pos - off - read, pages); + if (ret >= 0) { + int didpages; + if (was_short && (pos + ret < inode->i_size)) { + u64 tmp = min(this_len - ret, + inode->i_size - pos - ret); + dout(" zero gap %llu to %llu\n", + pos + ret, pos + ret + tmp); + ceph_zero_page_vector_range(page_align + read + ret, + tmp, pages); + ret += tmp; } + + didpages = (page_align + ret) >> PAGE_CACHE_SHIFT; pos += ret; read = pos - off; left -= ret; page_pos += didpages; pages_left -= didpages; - /* hit stripe? */ - if (left && hit_stripe) + /* hit stripe and need continue*/ + if (left && hit_stripe && pos < inode->i_size) goto more; } - if (was_short) { + if (read > 0) { + ret = read; /* did we bounce off eof? */ if (pos + left > inode->i_size) *checkeof = 1; - - /* zero trailing bytes (inside i_size) */ - if (left > 0 && pos < inode->i_size) { - if (pos + left > inode->i_size) - left = inode->i_size - pos; - - dout("zero tail %d\n", left); - ceph_zero_page_vector_range(page_align + read, left, - pages); - read += left; - } } - if (ret >= 0) - ret = read; dout("striped_read returns %d\n", ret); return ret; } @@ -618,6 +629,8 @@ out: if (check_caps) ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL); + } else if (ret != -EOLDSNAPC && written > 0) { + ret = written; } return ret; } @@ -659,7 +672,6 @@ again: if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 || (iocb->ki_filp->f_flags & O_DIRECT) || - (inode->i_sb->s_flags & MS_SYNCHRONOUS) || (fi->flags & CEPH_F_SYNC)) /* hmm, this isn't really async... */ ret = ceph_sync_read(filp, base, len, ppos, &checkeof); @@ -711,13 +723,11 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov, &ceph_sb_to_client(inode->i_sb)->client->osdc; ssize_t count, written = 0; int err, want, got; - bool hold_mutex; if (ceph_snap(inode) != CEPH_NOSNAP) return -EROFS; mutex_lock(&inode->i_mutex); - hold_mutex = true; err = generic_segment_checks(iov, &nr_segs, &count, VERIFY_READ); if (err) @@ -763,18 +773,31 @@ retry_snap: if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || (iocb->ki_filp->f_flags & O_DIRECT) || - (inode->i_sb->s_flags & MS_SYNCHRONOUS) || (fi->flags & CEPH_F_SYNC)) { mutex_unlock(&inode->i_mutex); written = ceph_sync_write(file, iov->iov_base, count, pos, &iocb->ki_pos); + if (written == -EOLDSNAPC) { + dout("aio_write %p %llx.%llx %llu~%u" + "got EOLDSNAPC, retrying\n", + inode, ceph_vinop(inode), + pos, (unsigned)iov->iov_len); + mutex_lock(&inode->i_mutex); + goto retry_snap; + } } else { + /* + * No need to acquire the i_truncate_mutex. Because + * the MDS revokes Fwb caps before sending truncate + * message to us. We can't get Fwb cap while there + * are pending vmtruncate. So write and vmtruncate + * can not run at the same time + */ written = generic_file_buffered_write(iocb, iov, nr_segs, pos, &iocb->ki_pos, count, 0); mutex_unlock(&inode->i_mutex); } - hold_mutex = false; if (written >= 0) { int dirty; @@ -798,18 +821,12 @@ retry_snap: written = err; } - if (written == -EOLDSNAPC) { - dout("aio_write %p %llx.%llx %llu~%u got EOLDSNAPC, retrying\n", - inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len); - mutex_lock(&inode->i_mutex); - hold_mutex = true; - goto retry_snap; - } + goto out_unlocked; + out: - if (hold_mutex) - mutex_unlock(&inode->i_mutex); + mutex_unlock(&inode->i_mutex); +out_unlocked: current->backing_dev_info = NULL; - return written ? written : err; } @@ -822,7 +839,6 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence) int ret; mutex_lock(&inode->i_mutex); - __ceph_do_pending_vmtruncate(inode); if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) { ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE); @@ -871,6 +887,204 @@ out: return offset; } +static inline void ceph_zero_partial_page( + struct inode *inode, loff_t offset, unsigned size) +{ + struct page *page; + pgoff_t index = offset >> PAGE_CACHE_SHIFT; + + page = find_lock_page(inode->i_mapping, index); + if (page) { + wait_on_page_writeback(page); + zero_user(page, offset & (PAGE_CACHE_SIZE - 1), size); + unlock_page(page); + page_cache_release(page); + } +} + +static void ceph_zero_pagecache_range(struct inode *inode, loff_t offset, + loff_t length) +{ + loff_t nearly = round_up(offset, PAGE_CACHE_SIZE); + if (offset < nearly) { + loff_t size = nearly - offset; + if (length < size) + size = length; + ceph_zero_partial_page(inode, offset, size); + offset += size; + length -= size; + } + if (length >= PAGE_CACHE_SIZE) { + loff_t size = round_down(length, PAGE_CACHE_SIZE); + truncate_pagecache_range(inode, offset, offset + size - 1); + offset += size; + length -= size; + } + if (length) + ceph_zero_partial_page(inode, offset, length); +} + +static int ceph_zero_partial_object(struct inode *inode, + loff_t offset, loff_t *length) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_fs_client *fsc = ceph_inode_to_client(inode); + struct ceph_osd_request *req; + int ret = 0; + loff_t zero = 0; + int op; + + if (!length) { + op = offset ? CEPH_OSD_OP_DELETE : CEPH_OSD_OP_TRUNCATE; + length = &zero; + } else { + op = CEPH_OSD_OP_ZERO; + } + + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, + ceph_vino(inode), + offset, length, + 1, op, + CEPH_OSD_FLAG_WRITE | + CEPH_OSD_FLAG_ONDISK, + NULL, 0, 0, false); + if (IS_ERR(req)) { + ret = PTR_ERR(req); + goto out; + } + + ceph_osdc_build_request(req, offset, NULL, ceph_vino(inode).snap, + &inode->i_mtime); + + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); + if (!ret) { + ret = ceph_osdc_wait_request(&fsc->client->osdc, req); + if (ret == -ENOENT) + ret = 0; + } + ceph_osdc_put_request(req); + +out: + return ret; +} + +static int ceph_zero_objects(struct inode *inode, loff_t offset, loff_t length) +{ + int ret = 0; + struct ceph_inode_info *ci = ceph_inode(inode); + s32 stripe_unit = ceph_file_layout_su(ci->i_layout); + s32 stripe_count = ceph_file_layout_stripe_count(ci->i_layout); + s32 object_size = ceph_file_layout_object_size(ci->i_layout); + u64 object_set_size = object_size * stripe_count; + u64 nearly, t; + + /* round offset up to next period boundary */ + nearly = offset + object_set_size - 1; + t = nearly; + nearly -= do_div(t, object_set_size); + + while (length && offset < nearly) { + loff_t size = length; + ret = ceph_zero_partial_object(inode, offset, &size); + if (ret < 0) + return ret; + offset += size; + length -= size; + } + while (length >= object_set_size) { + int i; + loff_t pos = offset; + for (i = 0; i < stripe_count; ++i) { + ret = ceph_zero_partial_object(inode, pos, NULL); + if (ret < 0) + return ret; + pos += stripe_unit; + } + offset += object_set_size; + length -= object_set_size; + } + while (length) { + loff_t size = length; + ret = ceph_zero_partial_object(inode, offset, &size); + if (ret < 0) + return ret; + offset += size; + length -= size; + } + return ret; +} + +static long ceph_fallocate(struct file *file, int mode, + loff_t offset, loff_t length) +{ + struct ceph_file_info *fi = file->private_data; + struct inode *inode = file->f_dentry->d_inode; + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_osd_client *osdc = + &ceph_inode_to_client(inode)->client->osdc; + int want, got = 0; + int dirty; + int ret = 0; + loff_t endoff = 0; + loff_t size; + + if (!S_ISREG(inode->i_mode)) + return -EOPNOTSUPP; + + if (IS_SWAPFILE(inode)) + return -ETXTBSY; + + mutex_lock(&inode->i_mutex); + + if (ceph_snap(inode) != CEPH_NOSNAP) { + ret = -EROFS; + goto unlock; + } + + if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) && + !(mode & FALLOC_FL_PUNCH_HOLE)) { + ret = -ENOSPC; + goto unlock; + } + + size = i_size_read(inode); + if (!(mode & FALLOC_FL_KEEP_SIZE)) + endoff = offset + length; + + if (fi->fmode & CEPH_FILE_MODE_LAZY) + want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; + else + want = CEPH_CAP_FILE_BUFFER; + + ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff); + if (ret < 0) + goto unlock; + + if (mode & FALLOC_FL_PUNCH_HOLE) { + if (offset < size) + ceph_zero_pagecache_range(inode, offset, length); + ret = ceph_zero_objects(inode, offset, length); + } else if (endoff > size) { + truncate_pagecache_range(inode, size, -1); + if (ceph_inode_set_size(inode, endoff)) + ceph_check_caps(ceph_inode(inode), + CHECK_CAPS_AUTHONLY, NULL); + } + + if (!ret) { + spin_lock(&ci->i_ceph_lock); + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); + spin_unlock(&ci->i_ceph_lock); + if (dirty) + __mark_inode_dirty(inode, dirty); + } + + ceph_put_cap_refs(ci, got); +unlock: + mutex_unlock(&inode->i_mutex); + return ret; +} + const struct file_operations ceph_file_fops = { .open = ceph_open, .release = ceph_release, @@ -887,5 +1101,6 @@ const struct file_operations ceph_file_fops = { .splice_write = generic_file_splice_write, .unlocked_ioctl = ceph_ioctl, .compat_ioctl = ceph_ioctl, + .fallocate = ceph_fallocate, }; |