diff options
Diffstat (limited to 'fs')
-rw-r--r-- | fs/ceph/addr.c | 98 | ||||
-rw-r--r-- | fs/ceph/caps.c | 323 | ||||
-rw-r--r-- | fs/ceph/file.c | 127 | ||||
-rw-r--r-- | fs/ceph/mds_client.c | 23 | ||||
-rw-r--r-- | fs/ceph/mdsmap.c | 163 | ||||
-rw-r--r-- | fs/ceph/snap.c | 2 | ||||
-rw-r--r-- | fs/ceph/super.c | 10 | ||||
-rw-r--r-- | fs/ceph/super.h | 5 |
8 files changed, 540 insertions, 211 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index ef3ebd780aff..a0f1e2b91c8e 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -315,7 +315,32 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) struct page **pages; pgoff_t next_index; int nr_pages = 0; - int ret; + int got = 0; + int ret = 0; + + if (!current->journal_info) { + /* caller of readpages does not hold buffer and read caps + * (fadvise, madvise and readahead cases) */ + int want = CEPH_CAP_FILE_CACHE; + ret = ceph_try_get_caps(ci, CEPH_CAP_FILE_RD, want, &got); + if (ret < 0) { + dout("start_read %p, error getting cap\n", inode); + } else if (!(got & want)) { + dout("start_read %p, no cache cap\n", inode); + ret = 0; + } + if (ret <= 0) { + if (got) + ceph_put_cap_refs(ci, got); + while (!list_empty(page_list)) { + page = list_entry(page_list->prev, + struct page, lru); + list_del(&page->lru); + put_page(page); + } + return ret; + } + } off = (u64) page_offset(page); @@ -338,15 +363,18 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) CEPH_OSD_FLAG_READ, NULL, ci->i_truncate_seq, ci->i_truncate_size, false); - if (IS_ERR(req)) - return PTR_ERR(req); + if (IS_ERR(req)) { + ret = PTR_ERR(req); + goto out; + } /* build page vector */ nr_pages = calc_pages_for(0, len); pages = kmalloc(sizeof(*pages) * nr_pages, GFP_KERNEL); - ret = -ENOMEM; - if (!pages) - goto out; + if (!pages) { + ret = -ENOMEM; + goto out_put; + } for (i = 0; i < nr_pages; ++i) { page = list_entry(page_list->prev, struct page, lru); BUG_ON(PageLocked(page)); @@ -378,6 +406,12 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) if (ret < 0) goto out_pages; ceph_osdc_put_request(req); + + /* After adding locked pages to page cache, the inode holds cache cap. + * So we can drop our cap refs. */ + if (got) + ceph_put_cap_refs(ci, got); + return nr_pages; out_pages: @@ -386,8 +420,11 @@ out_pages: unlock_page(pages[i]); } ceph_put_page_vector(pages, nr_pages, false); -out: +out_put: ceph_osdc_put_request(req); +out: + if (got) + ceph_put_cap_refs(ci, got); return ret; } @@ -424,7 +461,6 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, rc = start_read(inode, page_list, max); if (rc < 0) goto out; - BUG_ON(rc == 0); } out: ceph_fscache_readpages_cancel(inode, page_list); @@ -438,7 +474,9 @@ out: * only snap context we are allowed to write back. */ static struct ceph_snap_context *get_oldest_context(struct inode *inode, - loff_t *snap_size) + loff_t *snap_size, + u64 *truncate_size, + u32 *truncate_seq) { struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_snap_context *snapc = NULL; @@ -452,6 +490,10 @@ static struct ceph_snap_context *get_oldest_context(struct inode *inode, snapc = ceph_get_snap_context(capsnap->context); if (snap_size) *snap_size = capsnap->size; + if (truncate_size) + *truncate_size = capsnap->truncate_size; + if (truncate_seq) + *truncate_seq = capsnap->truncate_seq; break; } } @@ -459,6 +501,10 @@ static struct ceph_snap_context *get_oldest_context(struct inode *inode, snapc = ceph_get_snap_context(ci->i_head_snapc); dout(" head snapc %p has %d dirty pages\n", snapc, ci->i_wrbuffer_ref_head); + if (truncate_size) + *truncate_size = capsnap->truncate_size; + if (truncate_seq) + *truncate_seq = capsnap->truncate_seq; } spin_unlock(&ci->i_ceph_lock); return snapc; @@ -501,7 +547,8 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) dout("writepage %p page %p not dirty?\n", inode, page); goto out; } - oldest = get_oldest_context(inode, &snap_size); + oldest = get_oldest_context(inode, &snap_size, + &truncate_size, &truncate_seq); if (snapc->seq > oldest->seq) { dout("writepage %p page %p snapc %p not writeable - noop\n", inode, page, snapc); @@ -512,12 +559,8 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) } ceph_put_snap_context(oldest); - spin_lock(&ci->i_ceph_lock); - truncate_seq = ci->i_truncate_seq; - truncate_size = ci->i_truncate_size; if (snap_size == -1) snap_size = i_size_read(inode); - spin_unlock(&ci->i_ceph_lock); /* is this a partial page at end of file? */ if (page_off >= snap_size) { @@ -764,7 +807,8 @@ retry: /* find oldest snap context with dirty data */ ceph_put_snap_context(snapc); snap_size = -1; - snapc = get_oldest_context(inode, &snap_size); + snapc = get_oldest_context(inode, &snap_size, + &truncate_size, &truncate_seq); if (!snapc) { /* hmm, why does writepages get called when there is no dirty data? */ @@ -774,11 +818,7 @@ retry: dout(" oldest snapc is %p seq %lld (%d snaps)\n", snapc, snapc->seq, snapc->num_snaps); - spin_lock(&ci->i_ceph_lock); - truncate_seq = ci->i_truncate_seq; - truncate_size = ci->i_truncate_size; i_size = i_size_read(inode); - spin_unlock(&ci->i_ceph_lock); if (last_snapc && snapc != last_snapc) { /* if we switched to a newer snapc, restart our scan at the @@ -1124,7 +1164,8 @@ out: static int context_is_writeable_or_written(struct inode *inode, struct ceph_snap_context *snapc) { - struct ceph_snap_context *oldest = get_oldest_context(inode, NULL); + struct ceph_snap_context *oldest = get_oldest_context(inode, NULL, + NULL, NULL); int ret = !oldest || snapc->seq <= oldest->seq; ceph_put_snap_context(oldest); @@ -1169,7 +1210,7 @@ retry_locked: * this page is already dirty in another (older) snap * context! is it writeable now? */ - oldest = get_oldest_context(inode, NULL); + oldest = get_oldest_context(inode, NULL, NULL, NULL); if (snapc->seq > oldest->seq) { ceph_put_snap_context(oldest); @@ -1371,9 +1412,11 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) inode, off, (size_t)PAGE_SIZE, ceph_cap_string(got)); if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) || - ci->i_inline_version == CEPH_INLINE_NONE) + ci->i_inline_version == CEPH_INLINE_NONE) { + current->journal_info = vma->vm_file; ret = filemap_fault(vma, vmf); - else + current->journal_info = NULL; + } else ret = -EAGAIN; dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n", @@ -1905,6 +1948,15 @@ int ceph_pool_perm_check(struct ceph_inode_info *ci, int need) struct ceph_string *pool_ns; int ret, flags; + if (ci->i_vino.snap != CEPH_NOSNAP) { + /* + * Pool permission check needs to write to the first object. + * But for snapshot, head of the first object may have alread + * been deleted. Skip check to avoid creating orphan object. + */ + return 0; + } + if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode), NOPOOLPERM)) return 0; diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 16e6ded0b7f2..baea866a6751 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -987,96 +987,127 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release) __cap_delay_cancel(mdsc, ci); } +struct cap_msg_args { + struct ceph_mds_session *session; + u64 ino, cid, follows; + u64 flush_tid, oldest_flush_tid, size, max_size; + u64 xattr_version; + struct ceph_buffer *xattr_buf; + struct timespec atime, mtime, ctime; + int op, caps, wanted, dirty; + u32 seq, issue_seq, mseq, time_warp_seq; + u32 flags; + kuid_t uid; + kgid_t gid; + umode_t mode; + bool inline_data; +}; + /* * Build and send a cap message to the given MDS. * * Caller should be holding s_mutex. */ -static int send_cap_msg(struct ceph_mds_session *session, - u64 ino, u64 cid, int op, - int caps, int wanted, int dirty, - u32 seq, u64 flush_tid, u64 oldest_flush_tid, - u32 issue_seq, u32 mseq, u64 size, u64 max_size, - struct timespec *mtime, struct timespec *atime, - struct timespec *ctime, u32 time_warp_seq, - kuid_t uid, kgid_t gid, umode_t mode, - u64 xattr_version, - struct ceph_buffer *xattrs_buf, - u64 follows, bool inline_data) +static int send_cap_msg(struct cap_msg_args *arg) { struct ceph_mds_caps *fc; struct ceph_msg *msg; void *p; size_t extra_len; + struct timespec zerotime = {0}; dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s" " seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu" - " xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(op), - cid, ino, ceph_cap_string(caps), ceph_cap_string(wanted), - ceph_cap_string(dirty), - seq, issue_seq, flush_tid, oldest_flush_tid, - mseq, follows, size, max_size, - xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0); + " xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(arg->op), + arg->cid, arg->ino, ceph_cap_string(arg->caps), + ceph_cap_string(arg->wanted), ceph_cap_string(arg->dirty), + arg->seq, arg->issue_seq, arg->flush_tid, arg->oldest_flush_tid, + arg->mseq, arg->follows, arg->size, arg->max_size, + arg->xattr_version, + arg->xattr_buf ? (int)arg->xattr_buf->vec.iov_len : 0); /* flock buffer size + inline version + inline data size + * osd_epoch_barrier + oldest_flush_tid */ - extra_len = 4 + 8 + 4 + 4 + 8; + extra_len = 4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4; msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len, GFP_NOFS, false); if (!msg) return -ENOMEM; - msg->hdr.version = cpu_to_le16(6); - msg->hdr.tid = cpu_to_le64(flush_tid); + msg->hdr.version = cpu_to_le16(10); + msg->hdr.tid = cpu_to_le64(arg->flush_tid); fc = msg->front.iov_base; memset(fc, 0, sizeof(*fc)); - fc->cap_id = cpu_to_le64(cid); - fc->op = cpu_to_le32(op); - fc->seq = cpu_to_le32(seq); - fc->issue_seq = cpu_to_le32(issue_seq); - fc->migrate_seq = cpu_to_le32(mseq); - fc->caps = cpu_to_le32(caps); - fc->wanted = cpu_to_le32(wanted); - fc->dirty = cpu_to_le32(dirty); - fc->ino = cpu_to_le64(ino); - fc->snap_follows = cpu_to_le64(follows); - - fc->size = cpu_to_le64(size); - fc->max_size = cpu_to_le64(max_size); - if (mtime) - ceph_encode_timespec(&fc->mtime, mtime); - if (atime) - ceph_encode_timespec(&fc->atime, atime); - if (ctime) - ceph_encode_timespec(&fc->ctime, ctime); - fc->time_warp_seq = cpu_to_le32(time_warp_seq); - - fc->uid = cpu_to_le32(from_kuid(&init_user_ns, uid)); - fc->gid = cpu_to_le32(from_kgid(&init_user_ns, gid)); - fc->mode = cpu_to_le32(mode); + fc->cap_id = cpu_to_le64(arg->cid); + fc->op = cpu_to_le32(arg->op); + fc->seq = cpu_to_le32(arg->seq); + fc->issue_seq = cpu_to_le32(arg->issue_seq); + fc->migrate_seq = cpu_to_le32(arg->mseq); + fc->caps = cpu_to_le32(arg->caps); + fc->wanted = cpu_to_le32(arg->wanted); + fc->dirty = cpu_to_le32(arg->dirty); + fc->ino = cpu_to_le64(arg->ino); + fc->snap_follows = cpu_to_le64(arg->follows); + + fc->size = cpu_to_le64(arg->size); + fc->max_size = cpu_to_le64(arg->max_size); + ceph_encode_timespec(&fc->mtime, &arg->mtime); + ceph_encode_timespec(&fc->atime, &arg->atime); + ceph_encode_timespec(&fc->ctime, &arg->ctime); + fc->time_warp_seq = cpu_to_le32(arg->time_warp_seq); + + fc->uid = cpu_to_le32(from_kuid(&init_user_ns, arg->uid)); + fc->gid = cpu_to_le32(from_kgid(&init_user_ns, arg->gid)); + fc->mode = cpu_to_le32(arg->mode); + + fc->xattr_version = cpu_to_le64(arg->xattr_version); + if (arg->xattr_buf) { + msg->middle = ceph_buffer_get(arg->xattr_buf); + fc->xattr_len = cpu_to_le32(arg->xattr_buf->vec.iov_len); + msg->hdr.middle_len = cpu_to_le32(arg->xattr_buf->vec.iov_len); + } p = fc + 1; - /* flock buffer size */ + /* flock buffer size (version 2) */ ceph_encode_32(&p, 0); - /* inline version */ - ceph_encode_64(&p, inline_data ? 0 : CEPH_INLINE_NONE); + /* inline version (version 4) */ + ceph_encode_64(&p, arg->inline_data ? 0 : CEPH_INLINE_NONE); /* inline data size */ ceph_encode_32(&p, 0); - /* osd_epoch_barrier */ + /* osd_epoch_barrier (version 5) */ ceph_encode_32(&p, 0); - /* oldest_flush_tid */ - ceph_encode_64(&p, oldest_flush_tid); + /* oldest_flush_tid (version 6) */ + ceph_encode_64(&p, arg->oldest_flush_tid); - fc->xattr_version = cpu_to_le64(xattr_version); - if (xattrs_buf) { - msg->middle = ceph_buffer_get(xattrs_buf); - fc->xattr_len = cpu_to_le32(xattrs_buf->vec.iov_len); - msg->hdr.middle_len = cpu_to_le32(xattrs_buf->vec.iov_len); - } + /* + * caller_uid/caller_gid (version 7) + * + * Currently, we don't properly track which caller dirtied the caps + * last, and force a flush of them when there is a conflict. For now, + * just set this to 0:0, to emulate how the MDS has worked up to now. + */ + ceph_encode_32(&p, 0); + ceph_encode_32(&p, 0); + + /* pool namespace (version 8) (mds always ignores this) */ + ceph_encode_32(&p, 0); - ceph_con_send(&session->s_con, msg); + /* + * btime and change_attr (version 9) + * + * We just zero these out for now, as the MDS ignores them unless + * the requisite feature flags are set (which we don't do yet). + */ + ceph_encode_timespec(p, &zerotime); + p += sizeof(struct ceph_timespec); + ceph_encode_64(&p, 0); + + /* Advisory flags (version 10) */ + ceph_encode_32(&p, arg->flags); + + ceph_con_send(&arg->session->s_con, msg); return 0; } @@ -1115,27 +1146,17 @@ void ceph_queue_caps_release(struct inode *inode) * caller should hold snap_rwsem (read), s_mutex. */ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, - int op, int used, int want, int retain, int flushing, - u64 flush_tid, u64 oldest_flush_tid) + int op, bool sync, int used, int want, int retain, + int flushing, u64 flush_tid, u64 oldest_flush_tid) __releases(cap->ci->i_ceph_lock) { struct ceph_inode_info *ci = cap->ci; struct inode *inode = &ci->vfs_inode; - u64 cap_id = cap->cap_id; - int held, revoking, dropping, keep; - u64 follows, size, max_size; - u32 seq, issue_seq, mseq, time_warp_seq; - struct timespec mtime, atime, ctime; + struct cap_msg_args arg; + int held, revoking, dropping; int wake = 0; - umode_t mode; - kuid_t uid; - kgid_t gid; - struct ceph_mds_session *session; - u64 xattr_version = 0; - struct ceph_buffer *xattr_blob = NULL; int delayed = 0; int ret; - bool inline_data; held = cap->issued | cap->implemented; revoking = cap->implemented & ~cap->issued; @@ -1148,7 +1169,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, ceph_cap_string(revoking)); BUG_ON((retain & CEPH_CAP_PIN) == 0); - session = cap->session; + arg.session = cap->session; /* don't release wanted unless we've waited a bit. */ if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 && @@ -1177,40 +1198,51 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, cap->implemented &= cap->issued | used; cap->mds_wanted = want; - follows = flushing ? ci->i_head_snapc->seq : 0; - - keep = cap->implemented; - seq = cap->seq; - issue_seq = cap->issue_seq; - mseq = cap->mseq; - size = inode->i_size; - ci->i_reported_size = size; - max_size = ci->i_wanted_max_size; - ci->i_requested_max_size = max_size; - mtime = inode->i_mtime; - atime = inode->i_atime; - ctime = inode->i_ctime; - time_warp_seq = ci->i_time_warp_seq; - uid = inode->i_uid; - gid = inode->i_gid; - mode = inode->i_mode; + arg.ino = ceph_vino(inode).ino; + arg.cid = cap->cap_id; + arg.follows = flushing ? ci->i_head_snapc->seq : 0; + arg.flush_tid = flush_tid; + arg.oldest_flush_tid = oldest_flush_tid; + + arg.size = inode->i_size; + ci->i_reported_size = arg.size; + arg.max_size = ci->i_wanted_max_size; + ci->i_requested_max_size = arg.max_size; if (flushing & CEPH_CAP_XATTR_EXCL) { __ceph_build_xattrs_blob(ci); - xattr_blob = ci->i_xattrs.blob; - xattr_version = ci->i_xattrs.version; + arg.xattr_version = ci->i_xattrs.version; + arg.xattr_buf = ci->i_xattrs.blob; + } else { + arg.xattr_buf = NULL; } - inline_data = ci->i_inline_version != CEPH_INLINE_NONE; + arg.mtime = inode->i_mtime; + arg.atime = inode->i_atime; + arg.ctime = inode->i_ctime; + + arg.op = op; + arg.caps = cap->implemented; + arg.wanted = want; + arg.dirty = flushing; + + arg.seq = cap->seq; + arg.issue_seq = cap->issue_seq; + arg.mseq = cap->mseq; + arg.time_warp_seq = ci->i_time_warp_seq; + + arg.uid = inode->i_uid; + arg.gid = inode->i_gid; + arg.mode = inode->i_mode; + + arg.inline_data = ci->i_inline_version != CEPH_INLINE_NONE; + arg.flags = 0; + if (sync) + arg.flags |= CEPH_CLIENT_CAPS_SYNC; spin_unlock(&ci->i_ceph_lock); - ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id, - op, keep, want, flushing, seq, - flush_tid, oldest_flush_tid, issue_seq, mseq, - size, max_size, &mtime, &atime, &ctime, time_warp_seq, - uid, gid, mode, xattr_version, xattr_blob, - follows, inline_data); + ret = send_cap_msg(&arg); if (ret < 0) { dout("error sending cap msg, must requeue %p\n", inode); delayed = 1; @@ -1227,15 +1259,42 @@ static inline int __send_flush_snap(struct inode *inode, struct ceph_cap_snap *capsnap, u32 mseq, u64 oldest_flush_tid) { - return send_cap_msg(session, ceph_vino(inode).ino, 0, - CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0, - capsnap->dirty, 0, capsnap->cap_flush.tid, - oldest_flush_tid, 0, mseq, capsnap->size, 0, - &capsnap->mtime, &capsnap->atime, - &capsnap->ctime, capsnap->time_warp_seq, - capsnap->uid, capsnap->gid, capsnap->mode, - capsnap->xattr_version, capsnap->xattr_blob, - capsnap->follows, capsnap->inline_data); + struct cap_msg_args arg; + + arg.session = session; + arg.ino = ceph_vino(inode).ino; + arg.cid = 0; + arg.follows = capsnap->follows; + arg.flush_tid = capsnap->cap_flush.tid; + arg.oldest_flush_tid = oldest_flush_tid; + + arg.size = capsnap->size; + arg.max_size = 0; + arg.xattr_version = capsnap->xattr_version; + arg.xattr_buf = capsnap->xattr_blob; + + arg.atime = capsnap->atime; + arg.mtime = capsnap->mtime; + arg.ctime = capsnap->ctime; + + arg.op = CEPH_CAP_OP_FLUSHSNAP; + arg.caps = capsnap->issued; + arg.wanted = 0; + arg.dirty = capsnap->dirty; + + arg.seq = 0; + arg.issue_seq = 0; + arg.mseq = mseq; + arg.time_warp_seq = capsnap->time_warp_seq; + + arg.uid = capsnap->uid; + arg.gid = capsnap->gid; + arg.mode = capsnap->mode; + + arg.inline_data = capsnap->inline_data; + arg.flags = 0; + + return send_cap_msg(&arg); } /* @@ -1858,9 +1917,9 @@ ack: sent++; /* __send_cap drops i_ceph_lock */ - delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used, - want, retain, flushing, - flush_tid, oldest_flush_tid); + delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, false, + cap_used, want, retain, flushing, + flush_tid, oldest_flush_tid); goto retry; /* retake i_ceph_lock and restart our cap scan. */ } @@ -1924,9 +1983,9 @@ retry: &flush_tid, &oldest_flush_tid); /* __send_cap drops i_ceph_lock */ - delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want, - (cap->issued | cap->implemented), - flushing, flush_tid, oldest_flush_tid); + delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, true, + used, want, (cap->issued | cap->implemented), + flushing, flush_tid, oldest_flush_tid); if (delayed) { spin_lock(&ci->i_ceph_lock); @@ -1996,7 +2055,7 @@ static int unsafe_request_wait(struct inode *inode) } spin_unlock(&ci->i_unsafe_lock); - dout("unsafe_requeset_wait %p wait on tid %llu %llu\n", + dout("unsafe_request_wait %p wait on tid %llu %llu\n", inode, req1 ? req1->r_tid : 0ULL, req2 ? req2->r_tid : 0ULL); if (req1) { ret = !wait_for_completion_timeout(&req1->r_safe_completion, @@ -2119,7 +2178,7 @@ static void __kick_flushing_caps(struct ceph_mds_client *mdsc, inode, cap, cf->tid, ceph_cap_string(cf->caps)); ci->i_ceph_flags |= CEPH_I_NODELAY; ret = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, - __ceph_caps_used(ci), + false, __ceph_caps_used(ci), __ceph_caps_wanted(ci), cap->issued | cap->implemented, cf->caps, cf->tid, oldest_flush_tid); @@ -2479,6 +2538,27 @@ static void check_max_size(struct inode *inode, loff_t endoff) ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); } +int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, int *got) +{ + int ret, err = 0; + + BUG_ON(need & ~CEPH_CAP_FILE_RD); + BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)); + ret = ceph_pool_perm_check(ci, need); + if (ret < 0) + return ret; + + ret = try_get_cap_refs(ci, need, want, 0, true, got, &err); + if (ret) { + if (err == -EAGAIN) { + ret = 0; + } else if (err < 0) { + ret = err; + } + } + return ret; +} + /* * Wait for caps, and take cap references. If we can't get a WR cap * due to a small max_size, make sure we check_max_size (and possibly @@ -2507,9 +2587,15 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, if (err < 0) ret = err; } else { - ret = wait_event_interruptible(ci->i_cap_wq, - try_get_cap_refs(ci, need, want, endoff, - true, &_got, &err)); + DEFINE_WAIT_FUNC(wait, woken_wake_function); + add_wait_queue(&ci->i_cap_wq, &wait); + + while (!try_get_cap_refs(ci, need, want, endoff, + true, &_got, &err)) + wait_woken(&wait, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT); + + remove_wait_queue(&ci->i_cap_wq, &wait); + if (err == -EAGAIN) continue; if (err < 0) @@ -3570,6 +3656,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, cap->cap_id = le64_to_cpu(h->cap_id); cap->mseq = mseq; cap->seq = seq; + cap->issue_seq = seq; spin_lock(&session->s_cap_lock); list_add_tail(&cap->session_caps, &session->s_cap_releases); diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 159fc8f1a6a0..045d30d26624 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -454,71 +454,60 @@ enum { * only return a short read to the caller if we hit EOF. */ static int striped_read(struct inode *inode, - u64 off, u64 len, + u64 pos, u64 len, struct page **pages, int num_pages, - int *checkeof) + int page_align, int *checkeof) { struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_inode_info *ci = ceph_inode(inode); - u64 pos, this_len, left; + u64 this_len; loff_t i_size; - int page_align, pages_left; - int read, ret; - struct page **page_pos; + int page_idx; + int ret, read = 0; bool hit_stripe, was_short; /* * we may need to do multiple reads. not atomic, unfortunately. */ - pos = off; - left = len; - page_pos = pages; - pages_left = num_pages; - read = 0; - more: - page_align = pos & ~PAGE_MASK; - this_len = left; + this_len = len; + page_idx = (page_align + read) >> PAGE_SHIFT; ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode), &ci->i_layout, pos, &this_len, - ci->i_truncate_seq, - ci->i_truncate_size, - page_pos, pages_left, page_align); + ci->i_truncate_seq, ci->i_truncate_size, + pages + page_idx, num_pages - page_idx, + ((page_align + read) & ~PAGE_MASK)); if (ret == -ENOENT) ret = 0; - hit_stripe = this_len < left; + hit_stripe = this_len < len; was_short = ret >= 0 && ret < this_len; - dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read, + dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, len, read, ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : ""); i_size = i_size_read(inode); if (ret >= 0) { - int didpages; if (was_short && (pos + ret < i_size)) { int zlen = min(this_len - ret, i_size - pos - ret); - int zoff = (off & ~PAGE_MASK) + read + ret; + int zoff = page_align + read + ret; dout(" zero gap %llu to %llu\n", - pos + ret, pos + ret + zlen); + pos + ret, pos + ret + zlen); ceph_zero_page_vector_range(zoff, zlen, pages); ret += zlen; } - didpages = (page_align + ret) >> PAGE_SHIFT; + read += ret; pos += ret; - read = pos - off; - left -= ret; - page_pos += didpages; - pages_left -= didpages; + len -= ret; /* hit stripe and need continue*/ - if (left && hit_stripe && pos < i_size) + if (len && hit_stripe && pos < i_size) goto more; } if (read > 0) { ret = read; /* did we bounce off eof? */ - if (pos + left > i_size) + if (pos + len > i_size) *checkeof = CHECK_EOF; } @@ -532,15 +521,16 @@ more: * * If the read spans object boundary, just do multiple reads. */ -static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i, - int *checkeof) +static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, + int *checkeof) { struct file *file = iocb->ki_filp; struct inode *inode = file_inode(file); struct page **pages; u64 off = iocb->ki_pos; - int num_pages, ret; - size_t len = iov_iter_count(i); + int num_pages; + ssize_t ret; + size_t len = iov_iter_count(to); dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len, @@ -559,35 +549,56 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i, if (ret < 0) return ret; - num_pages = calc_pages_for(off, len); - pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); - if (IS_ERR(pages)) - return PTR_ERR(pages); - ret = striped_read(inode, off, len, pages, - num_pages, checkeof); - if (ret > 0) { - int l, k = 0; - size_t left = ret; - - while (left) { - size_t page_off = off & ~PAGE_MASK; - size_t copy = min_t(size_t, left, - PAGE_SIZE - page_off); - l = copy_page_to_iter(pages[k++], page_off, copy, i); - off += l; - left -= l; - if (l < copy) - break; + if (unlikely(to->type & ITER_PIPE)) { + size_t page_off; + ret = iov_iter_get_pages_alloc(to, &pages, len, + &page_off); + if (ret <= 0) + return -ENOMEM; + num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE); + + ret = striped_read(inode, off, ret, pages, num_pages, + page_off, checkeof); + if (ret > 0) { + iov_iter_advance(to, ret); + off += ret; + } else { + iov_iter_advance(to, 0); + } + ceph_put_page_vector(pages, num_pages, false); + } else { + num_pages = calc_pages_for(off, len); + pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + ret = striped_read(inode, off, len, pages, num_pages, + (off & ~PAGE_MASK), checkeof); + if (ret > 0) { + int l, k = 0; + size_t left = ret; + + while (left) { + size_t page_off = off & ~PAGE_MASK; + size_t copy = min_t(size_t, left, + PAGE_SIZE - page_off); + l = copy_page_to_iter(pages[k++], page_off, + copy, to); + off += l; + left -= l; + if (l < copy) + break; + } } + ceph_release_page_vector(pages, num_pages); } - ceph_release_page_vector(pages, num_pages); if (off > iocb->ki_pos) { ret = off - iocb->ki_pos; iocb->ki_pos = off; } - dout("sync_read result %d\n", ret); + dout("sync_read result %zd\n", ret); return ret; } @@ -849,7 +860,7 @@ void ceph_sync_write_wait(struct inode *inode) dout("sync_write_wait on tid %llu (until %llu)\n", req->r_tid, last_tid); - wait_for_completion(&req->r_safe_completion); + wait_for_completion(&req->r_done_completion); ceph_osdc_put_request(req); spin_lock(&ci->i_unsafe_lock); @@ -902,7 +913,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, pos >> PAGE_SHIFT, (pos + count) >> PAGE_SHIFT); if (ret2 < 0) - dout("invalidate_inode_pages2_range returned %d\n", ret); + dout("invalidate_inode_pages2_range returned %d\n", ret2); flags = CEPH_OSD_FLAG_ORDERSNAP | CEPH_OSD_FLAG_ONDISK | @@ -1245,8 +1256,9 @@ again: dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n", inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, ceph_cap_string(got)); - + current->journal_info = filp; ret = generic_file_read_iter(iocb, to); + current->journal_info = NULL; } dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n", inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret); @@ -1766,6 +1778,7 @@ const struct file_operations ceph_file_fops = { .fsync = ceph_fsync, .lock = ceph_lock, .flock = ceph_flock, + .splice_read = generic_file_splice_read, .splice_write = iter_file_splice_write, .unlocked_ioctl = ceph_ioctl, .compat_ioctl = ceph_ioctl, diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 815acd1a56d4..4f49253387a0 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -2100,17 +2100,26 @@ static int __do_request(struct ceph_mds_client *mdsc, err = -EIO; goto finish; } + if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { + if (mdsc->mdsmap_err) { + err = mdsc->mdsmap_err; + dout("do_request mdsmap err %d\n", err); + goto finish; + } + if (!(mdsc->fsc->mount_options->flags & + CEPH_MOUNT_OPT_MOUNTWAIT) && + !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) { + err = -ENOENT; + pr_info("probably no mds server is up\n"); + goto finish; + } + } put_request_session(req); mds = __choose_mds(mdsc, req); if (mds < 0 || ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { - if (mdsc->mdsmap_err) { - err = mdsc->mdsmap_err; - dout("do_request mdsmap err %d\n", err); - goto finish; - } dout("do_request no mds or not active, waiting for map\n"); list_add(&req->r_wait, &mdsc->waiting_for_map); goto out; @@ -3943,13 +3952,13 @@ static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, } -static int verify_authorizer_reply(struct ceph_connection *con, int len) +static int verify_authorizer_reply(struct ceph_connection *con) { struct ceph_mds_session *s = con->private; struct ceph_mds_client *mdsc = s->s_mdsc; struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; - return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer, len); + return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer); } static int invalidate_authorizer(struct ceph_connection *con) diff --git a/fs/ceph/mdsmap.c b/fs/ceph/mdsmap.c index 8c3591a7fbae..5454e2327a5f 100644 --- a/fs/ceph/mdsmap.c +++ b/fs/ceph/mdsmap.c @@ -42,6 +42,60 @@ int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m) return i; } +#define __decode_and_drop_type(p, end, type, bad) \ + do { \ + if (*p + sizeof(type) > end) \ + goto bad; \ + *p += sizeof(type); \ + } while (0) + +#define __decode_and_drop_set(p, end, type, bad) \ + do { \ + u32 n; \ + size_t need; \ + ceph_decode_32_safe(p, end, n, bad); \ + need = sizeof(type) * n; \ + ceph_decode_need(p, end, need, bad); \ + *p += need; \ + } while (0) + +#define __decode_and_drop_map(p, end, ktype, vtype, bad) \ + do { \ + u32 n; \ + size_t need; \ + ceph_decode_32_safe(p, end, n, bad); \ + need = (sizeof(ktype) + sizeof(vtype)) * n; \ + ceph_decode_need(p, end, need, bad); \ + *p += need; \ + } while (0) + + +static int __decode_and_drop_compat_set(void **p, void* end) +{ + int i; + /* compat, ro_compat, incompat*/ + for (i = 0; i < 3; i++) { + u32 n; + ceph_decode_need(p, end, sizeof(u64) + sizeof(u32), bad); + /* mask */ + *p += sizeof(u64); + /* names (map<u64, string>) */ + n = ceph_decode_32(p); + while (n-- > 0) { + u32 len; + ceph_decode_need(p, end, sizeof(u64) + sizeof(u32), + bad); + *p += sizeof(u64); + len = ceph_decode_32(p); + ceph_decode_need(p, end, len, bad); + *p += len; + } + } + return 0; +bad: + return -1; +} + /* * Decode an MDS map * @@ -55,6 +109,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end) int i, j, n; int err = -EINVAL; u8 mdsmap_v, mdsmap_cv; + u16 mdsmap_ev; m = kzalloc(sizeof(*m), GFP_NOFS); if (m == NULL) @@ -83,7 +138,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end) m->m_info = kcalloc(m->m_max_mds, sizeof(*m->m_info), GFP_NOFS); if (m->m_info == NULL) - goto badmem; + goto nomem; /* pick out active nodes from mds_info (state > 0) */ n = ceph_decode_32(p); @@ -166,7 +221,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end) info->export_targets = kcalloc(num_export_targets, sizeof(u32), GFP_NOFS); if (info->export_targets == NULL) - goto badmem; + goto nomem; for (j = 0; j < num_export_targets; j++) info->export_targets[j] = ceph_decode_32(&pexport_targets); @@ -180,24 +235,104 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end) m->m_num_data_pg_pools = n; m->m_data_pg_pools = kcalloc(n, sizeof(u64), GFP_NOFS); if (!m->m_data_pg_pools) - goto badmem; + goto nomem; ceph_decode_need(p, end, sizeof(u64)*(n+1), bad); for (i = 0; i < n; i++) m->m_data_pg_pools[i] = ceph_decode_64(p); m->m_cas_pg_pool = ceph_decode_64(p); + m->m_enabled = m->m_epoch > 1; + + mdsmap_ev = 1; + if (mdsmap_v >= 2) { + ceph_decode_16_safe(p, end, mdsmap_ev, bad_ext); + } + if (mdsmap_ev >= 3) { + if (__decode_and_drop_compat_set(p, end) < 0) + goto bad_ext; + } + /* metadata_pool */ + if (mdsmap_ev < 5) { + __decode_and_drop_type(p, end, u32, bad_ext); + } else { + __decode_and_drop_type(p, end, u64, bad_ext); + } - /* ok, we don't care about the rest. */ + /* created + modified + tableserver */ + __decode_and_drop_type(p, end, struct ceph_timespec, bad_ext); + __decode_and_drop_type(p, end, struct ceph_timespec, bad_ext); + __decode_and_drop_type(p, end, u32, bad_ext); + + /* in */ + { + int num_laggy = 0; + ceph_decode_32_safe(p, end, n, bad_ext); + ceph_decode_need(p, end, sizeof(u32) * n, bad_ext); + + for (i = 0; i < n; i++) { + s32 mds = ceph_decode_32(p); + if (mds >= 0 && mds < m->m_max_mds) { + if (m->m_info[mds].laggy) + num_laggy++; + } + } + m->m_num_laggy = num_laggy; + } + + /* inc */ + __decode_and_drop_map(p, end, u32, u32, bad_ext); + /* up */ + __decode_and_drop_map(p, end, u32, u64, bad_ext); + /* failed */ + __decode_and_drop_set(p, end, u32, bad_ext); + /* stopped */ + __decode_and_drop_set(p, end, u32, bad_ext); + + if (mdsmap_ev >= 4) { + /* last_failure_osd_epoch */ + __decode_and_drop_type(p, end, u32, bad_ext); + } + if (mdsmap_ev >= 6) { + /* ever_allowed_snaps */ + __decode_and_drop_type(p, end, u8, bad_ext); + /* explicitly_allowed_snaps */ + __decode_and_drop_type(p, end, u8, bad_ext); + } + if (mdsmap_ev >= 7) { + /* inline_data_enabled */ + __decode_and_drop_type(p, end, u8, bad_ext); + } + if (mdsmap_ev >= 8) { + u32 name_len; + /* enabled */ + ceph_decode_8_safe(p, end, m->m_enabled, bad_ext); + ceph_decode_32_safe(p, end, name_len, bad_ext); + ceph_decode_need(p, end, name_len, bad_ext); + *p += name_len; + } + /* damaged */ + if (mdsmap_ev >= 9) { + size_t need; + ceph_decode_32_safe(p, end, n, bad_ext); + need = sizeof(u32) * n; + ceph_decode_need(p, end, need, bad_ext); + *p += need; + m->m_damaged = n > 0; + } else { + m->m_damaged = false; + } +bad_ext: *p = end; dout("mdsmap_decode success epoch %u\n", m->m_epoch); return m; - -badmem: +nomem: err = -ENOMEM; + goto out_err; bad: pr_err("corrupt mdsmap\n"); print_hex_dump(KERN_DEBUG, "mdsmap: ", DUMP_PREFIX_OFFSET, 16, 1, start, end - start, true); +out_err: ceph_mdsmap_destroy(m); return ERR_PTR(err); } @@ -212,3 +347,19 @@ void ceph_mdsmap_destroy(struct ceph_mdsmap *m) kfree(m->m_data_pg_pools); kfree(m); } + +bool ceph_mdsmap_is_cluster_available(struct ceph_mdsmap *m) +{ + int i, nr_active = 0; + if (!m->m_enabled) + return false; + if (m->m_damaged) + return false; + if (m->m_num_laggy > 0) + return false; + for (i = 0; i < m->m_max_mds; i++) { + if (m->m_info[i].state == CEPH_MDS_STATE_ACTIVE) + nr_active++; + } + return nr_active > 0; +} diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index 9ff5219d849e..8f8b41c2ef0f 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -593,6 +593,8 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci, capsnap->atime = inode->i_atime; capsnap->ctime = inode->i_ctime; capsnap->time_warp_seq = ci->i_time_warp_seq; + capsnap->truncate_size = ci->i_truncate_size; + capsnap->truncate_seq = ci->i_truncate_seq; if (capsnap->dirty_pages) { dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu " "still has %d dirty pages\n", inode, capsnap, diff --git a/fs/ceph/super.c b/fs/ceph/super.c index f2f76696ddca..6bd20d707bfd 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -137,6 +137,8 @@ enum { Opt_nofscache, Opt_poolperm, Opt_nopoolperm, + Opt_require_active_mds, + Opt_norequire_active_mds, #ifdef CONFIG_CEPH_FS_POSIX_ACL Opt_acl, #endif @@ -171,6 +173,8 @@ static match_table_t fsopt_tokens = { {Opt_nofscache, "nofsc"}, {Opt_poolperm, "poolperm"}, {Opt_nopoolperm, "nopoolperm"}, + {Opt_require_active_mds, "require_active_mds"}, + {Opt_norequire_active_mds, "norequire_active_mds"}, #ifdef CONFIG_CEPH_FS_POSIX_ACL {Opt_acl, "acl"}, #endif @@ -287,6 +291,12 @@ static int parse_fsopt_token(char *c, void *private) case Opt_nopoolperm: fsopt->flags |= CEPH_MOUNT_OPT_NOPOOLPERM; break; + case Opt_require_active_mds: + fsopt->flags &= ~CEPH_MOUNT_OPT_MOUNTWAIT; + break; + case Opt_norequire_active_mds: + fsopt->flags |= CEPH_MOUNT_OPT_MOUNTWAIT; + break; #ifdef CONFIG_CEPH_FS_POSIX_ACL case Opt_acl: fsopt->sb_flags |= MS_POSIXACL; diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 931687f71a7c..3373b61faefd 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -36,6 +36,7 @@ #define CEPH_MOUNT_OPT_DCACHE (1<<9) /* use dcache for readdir etc */ #define CEPH_MOUNT_OPT_FSCACHE (1<<10) /* use fscache */ #define CEPH_MOUNT_OPT_NOPOOLPERM (1<<11) /* no pool permission check */ +#define CEPH_MOUNT_OPT_MOUNTWAIT (1<<12) /* mount waits if no mds is up */ #define CEPH_MOUNT_OPT_DEFAULT CEPH_MOUNT_OPT_DCACHE @@ -180,6 +181,8 @@ struct ceph_cap_snap { u64 size; struct timespec mtime, atime, ctime; u64 time_warp_seq; + u64 truncate_size; + u32 truncate_seq; int writing; /* a sync write is still in progress */ int dirty_pages; /* dirty pages awaiting writeback */ bool inline_data; @@ -905,6 +908,8 @@ extern int ceph_encode_dentry_release(void **p, struct dentry *dn, extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, loff_t endoff, int *got, struct page **pinned_page); +extern int ceph_try_get_caps(struct ceph_inode_info *ci, + int need, int want, int *got); /* for counting open files by mode */ extern void __ceph_get_fmode(struct ceph_inode_info *ci, int mode); |