diff options
Diffstat (limited to 'fs/ceph')
-rw-r--r-- | fs/ceph/caps.c | 93 | ||||
-rw-r--r-- | fs/ceph/debugfs.c | 40 | ||||
-rw-r--r-- | fs/ceph/export.c | 356 | ||||
-rw-r--r-- | fs/ceph/file.c | 2 | ||||
-rw-r--r-- | fs/ceph/inode.c | 85 | ||||
-rw-r--r-- | fs/ceph/locks.c | 13 | ||||
-rw-r--r-- | fs/ceph/mds_client.c | 205 | ||||
-rw-r--r-- | fs/ceph/mds_client.h | 33 | ||||
-rw-r--r-- | fs/ceph/mdsmap.c | 2 | ||||
-rw-r--r-- | fs/ceph/quota.c | 177 | ||||
-rw-r--r-- | fs/ceph/super.c | 7 | ||||
-rw-r--r-- | fs/ceph/super.h | 2 |
12 files changed, 751 insertions, 264 deletions
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 36a8dc699448..72f8e1311392 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -892,8 +892,8 @@ int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch) int have = ci->i_snap_caps; if ((have & mask) == mask) { - dout("__ceph_caps_issued_mask %p snap issued %s" - " (mask %s)\n", &ci->vfs_inode, + dout("__ceph_caps_issued_mask ino 0x%lx snap issued %s" + " (mask %s)\n", ci->vfs_inode.i_ino, ceph_cap_string(have), ceph_cap_string(mask)); return 1; @@ -904,8 +904,8 @@ int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch) if (!__cap_is_valid(cap)) continue; if ((cap->issued & mask) == mask) { - dout("__ceph_caps_issued_mask %p cap %p issued %s" - " (mask %s)\n", &ci->vfs_inode, cap, + dout("__ceph_caps_issued_mask ino 0x%lx cap %p issued %s" + " (mask %s)\n", ci->vfs_inode.i_ino, cap, ceph_cap_string(cap->issued), ceph_cap_string(mask)); if (touch) @@ -916,8 +916,8 @@ int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch) /* does a combination of caps satisfy mask? */ have |= cap->issued; if ((have & mask) == mask) { - dout("__ceph_caps_issued_mask %p combo issued %s" - " (mask %s)\n", &ci->vfs_inode, + dout("__ceph_caps_issued_mask ino 0x%lx combo issued %s" + " (mask %s)\n", ci->vfs_inode.i_ino, ceph_cap_string(cap->issued), ceph_cap_string(mask)); if (touch) { @@ -2257,8 +2257,6 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) if (datasync) goto out; - inode_lock(inode); - dirty = try_flush_caps(inode, &flush_tid); dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); @@ -2273,7 +2271,6 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) ret = wait_event_interruptible(ci->i_cap_wq, caps_are_flushed(inode, flush_tid)); } - inode_unlock(inode); out: dout("fsync %p%s result=%d\n", inode, datasync ? " datasync" : "", ret); return ret; @@ -2528,9 +2525,14 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got, * to (when applicable), and check against max_size here as well. * Note that caller is responsible for ensuring max_size increases are * requested from the MDS. + * + * Returns 0 if caps were not able to be acquired (yet), a 1 if they were, + * or a negative error code. + * + * FIXME: how does a 0 return differ from -EAGAIN? */ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, - loff_t endoff, bool nonblock, int *got, int *err) + loff_t endoff, bool nonblock, int *got) { struct inode *inode = &ci->vfs_inode; struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; @@ -2550,8 +2552,7 @@ again: if ((file_wanted & need) != need) { dout("try_get_cap_refs need %s file_wanted %s, EBADF\n", ceph_cap_string(need), ceph_cap_string(file_wanted)); - *err = -EBADF; - ret = 1; + ret = -EBADF; goto out_unlock; } @@ -2572,10 +2573,8 @@ again: if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) { dout("get_cap_refs %p endoff %llu > maxsize %llu\n", inode, endoff, ci->i_max_size); - if (endoff > ci->i_requested_max_size) { - *err = -EAGAIN; - ret = 1; - } + if (endoff > ci->i_requested_max_size) + ret = -EAGAIN; goto out_unlock; } /* @@ -2610,8 +2609,7 @@ again: * task isn't in TASK_RUNNING state */ if (nonblock) { - *err = -EAGAIN; - ret = 1; + ret = -EAGAIN; goto out_unlock; } @@ -2640,8 +2638,7 @@ again: if (session_readonly) { dout("get_cap_refs %p needed %s but mds%d readonly\n", inode, ceph_cap_string(need), ci->i_auth_cap->mds); - *err = -EROFS; - ret = 1; + ret = -EROFS; goto out_unlock; } @@ -2650,16 +2647,14 @@ again: if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { dout("get_cap_refs %p forced umount\n", inode); - *err = -EIO; - ret = 1; + ret = -EIO; goto out_unlock; } mds_wanted = __ceph_caps_mds_wanted(ci, false); if (need & ~(mds_wanted & need)) { dout("get_cap_refs %p caps were dropped" " (session killed?)\n", inode); - *err = -ESTALE; - ret = 1; + ret = -ESTALE; goto out_unlock; } if (!(file_wanted & ~mds_wanted)) @@ -2710,7 +2705,7 @@ static void check_max_size(struct inode *inode, loff_t endoff) int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, bool nonblock, int *got) { - int ret, err = 0; + int ret; BUG_ON(need & ~CEPH_CAP_FILE_RD); BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO|CEPH_CAP_FILE_SHARED)); @@ -2718,15 +2713,8 @@ int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, if (ret < 0) return ret; - ret = try_get_cap_refs(ci, need, want, 0, nonblock, got, &err); - if (ret) { - if (err == -EAGAIN) { - ret = 0; - } else if (err < 0) { - ret = err; - } - } - return ret; + ret = try_get_cap_refs(ci, need, want, 0, nonblock, got); + return ret == -EAGAIN ? 0 : ret; } /* @@ -2737,7 +2725,7 @@ int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, loff_t endoff, int *got, struct page **pinned_page) { - int _got, ret, err = 0; + int _got, ret; ret = ceph_pool_perm_check(ci, need); if (ret < 0) @@ -2747,21 +2735,19 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, if (endoff > 0) check_max_size(&ci->vfs_inode, endoff); - err = 0; _got = 0; ret = try_get_cap_refs(ci, need, want, endoff, - false, &_got, &err); - if (ret) { - if (err == -EAGAIN) - continue; - if (err < 0) - ret = err; - } else { + false, &_got); + if (ret == -EAGAIN) { + continue; + } else if (!ret) { + int err; + DEFINE_WAIT_FUNC(wait, woken_wake_function); add_wait_queue(&ci->i_cap_wq, &wait); - while (!try_get_cap_refs(ci, need, want, endoff, - true, &_got, &err)) { + while (!(err = try_get_cap_refs(ci, need, want, endoff, + true, &_got))) { if (signal_pending(current)) { ret = -ERESTARTSYS; break; @@ -2770,19 +2756,14 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, } remove_wait_queue(&ci->i_cap_wq, &wait); - if (err == -EAGAIN) continue; - if (err < 0) - ret = err; } - if (ret < 0) { - if (err == -ESTALE) { - /* session was killed, try renew caps */ - ret = ceph_renew_caps(&ci->vfs_inode); - if (ret == 0) - continue; - } + if (ret == -ESTALE) { + /* session was killed, try renew caps */ + ret = ceph_renew_caps(&ci->vfs_inode); + if (ret == 0) + continue; return ret; } @@ -4099,7 +4080,7 @@ void ceph_put_fmode(struct ceph_inode_info *ci, int fmode) } /* - * For a soon-to-be unlinked file, drop the AUTH_RDCACHE caps. If it + * For a soon-to-be unlinked file, drop the LINK caps. If it * looks like the link count will hit 0, drop any other caps (other * than PIN) we don't specifically want (due to the file still being * open). diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c index 98365e74cb4a..b3fc5fe26a1a 100644 --- a/fs/ceph/debugfs.c +++ b/fs/ceph/debugfs.c @@ -37,7 +37,7 @@ static int mdsmap_show(struct seq_file *s, void *p) struct ceph_entity_addr *addr = &mdsmap->m_info[i].addr; int state = mdsmap->m_info[i].state; seq_printf(s, "\tmds%d\t%s\t(%s)\n", i, - ceph_pr_addr(&addr->in_addr), + ceph_pr_addr(addr), ceph_mds_state_name(state)); } return 0; @@ -88,7 +88,7 @@ static int mdsc_show(struct seq_file *s, void *p) req->r_dentry, path ? path : ""); spin_unlock(&req->r_dentry->d_lock); - kfree(path); + ceph_mdsc_free_path(path, pathlen); } else if (req->r_path1) { seq_printf(s, " #%llx/%s", req->r_ino1.ino, req->r_path1); @@ -108,7 +108,7 @@ static int mdsc_show(struct seq_file *s, void *p) req->r_old_dentry, path ? path : ""); spin_unlock(&req->r_old_dentry->d_lock); - kfree(path); + ceph_mdsc_free_path(path, pathlen); } else if (req->r_path2 && req->r_op != CEPH_MDS_OP_SYMLINK) { if (req->r_ino2.ino) seq_printf(s, " #%llx/%s", req->r_ino2.ino, @@ -124,18 +124,48 @@ static int mdsc_show(struct seq_file *s, void *p) return 0; } +static int caps_show_cb(struct inode *inode, struct ceph_cap *cap, void *p) +{ + struct seq_file *s = p; + + seq_printf(s, "0x%-17lx%-17s%-17s\n", inode->i_ino, + ceph_cap_string(cap->issued), + ceph_cap_string(cap->implemented)); + return 0; +} + static int caps_show(struct seq_file *s, void *p) { struct ceph_fs_client *fsc = s->private; - int total, avail, used, reserved, min; + struct ceph_mds_client *mdsc = fsc->mdsc; + int total, avail, used, reserved, min, i; ceph_reservation_status(fsc, &total, &avail, &used, &reserved, &min); seq_printf(s, "total\t\t%d\n" "avail\t\t%d\n" "used\t\t%d\n" "reserved\t%d\n" - "min\t%d\n", + "min\t\t%d\n\n", total, avail, used, reserved, min); + seq_printf(s, "ino issued implemented\n"); + seq_printf(s, "-----------------------------------------------\n"); + + mutex_lock(&mdsc->mutex); + for (i = 0; i < mdsc->max_sessions; i++) { + struct ceph_mds_session *session; + + session = __ceph_lookup_mds_session(mdsc, i); + if (!session) + continue; + mutex_unlock(&mdsc->mutex); + mutex_lock(&session->s_mutex); + ceph_iterate_session_caps(session, caps_show_cb, s); + mutex_unlock(&session->s_mutex); + ceph_put_mds_session(session); + mutex_lock(&mdsc->mutex); + } + mutex_unlock(&mdsc->mutex); + return 0; } diff --git a/fs/ceph/export.c b/fs/ceph/export.c index 3c59ad180ef0..d3ef7ee429ec 100644 --- a/fs/ceph/export.c +++ b/fs/ceph/export.c @@ -22,18 +22,77 @@ struct ceph_nfs_confh { u64 ino, parent_ino; } __attribute__ ((packed)); +/* + * fh for snapped inode + */ +struct ceph_nfs_snapfh { + u64 ino; + u64 snapid; + u64 parent_ino; + u32 hash; +} __attribute__ ((packed)); + +static int ceph_encode_snapfh(struct inode *inode, u32 *rawfh, int *max_len, + struct inode *parent_inode) +{ + const static int snap_handle_length = + sizeof(struct ceph_nfs_snapfh) >> 2; + struct ceph_nfs_snapfh *sfh = (void *)rawfh; + u64 snapid = ceph_snap(inode); + int ret; + bool no_parent = true; + + if (*max_len < snap_handle_length) { + *max_len = snap_handle_length; + ret = FILEID_INVALID; + goto out; + } + + ret = -EINVAL; + if (snapid != CEPH_SNAPDIR) { + struct inode *dir; + struct dentry *dentry = d_find_alias(inode); + if (!dentry) + goto out; + + rcu_read_lock(); + dir = d_inode_rcu(dentry->d_parent); + if (ceph_snap(dir) != CEPH_SNAPDIR) { + sfh->parent_ino = ceph_ino(dir); + sfh->hash = ceph_dentry_hash(dir, dentry); + no_parent = false; + } + rcu_read_unlock(); + dput(dentry); + } + + if (no_parent) { + if (!S_ISDIR(inode->i_mode)) + goto out; + sfh->parent_ino = sfh->ino; + sfh->hash = 0; + } + sfh->ino = ceph_ino(inode); + sfh->snapid = snapid; + + *max_len = snap_handle_length; + ret = FILEID_BTRFS_WITH_PARENT; +out: + dout("encode_snapfh %llx.%llx ret=%d\n", ceph_vinop(inode), ret); + return ret; +} + static int ceph_encode_fh(struct inode *inode, u32 *rawfh, int *max_len, struct inode *parent_inode) { + const static int handle_length = + sizeof(struct ceph_nfs_fh) >> 2; + const static int connected_handle_length = + sizeof(struct ceph_nfs_confh) >> 2; int type; - struct ceph_nfs_fh *fh = (void *)rawfh; - struct ceph_nfs_confh *cfh = (void *)rawfh; - int connected_handle_length = sizeof(*cfh)/4; - int handle_length = sizeof(*fh)/4; - /* don't re-export snaps */ if (ceph_snap(inode) != CEPH_NOSNAP) - return -EINVAL; + return ceph_encode_snapfh(inode, rawfh, max_len, parent_inode); if (parent_inode && (*max_len < connected_handle_length)) { *max_len = connected_handle_length; @@ -44,6 +103,7 @@ static int ceph_encode_fh(struct inode *inode, u32 *rawfh, int *max_len, } if (parent_inode) { + struct ceph_nfs_confh *cfh = (void *)rawfh; dout("encode_fh %llx with parent %llx\n", ceph_ino(inode), ceph_ino(parent_inode)); cfh->ino = ceph_ino(inode); @@ -51,6 +111,7 @@ static int ceph_encode_fh(struct inode *inode, u32 *rawfh, int *max_len, *max_len = connected_handle_length; type = FILEID_INO32_GEN_PARENT; } else { + struct ceph_nfs_fh *fh = (void *)rawfh; dout("encode_fh %llx\n", ceph_ino(inode)); fh->ino = ceph_ino(inode); *max_len = handle_length; @@ -59,7 +120,7 @@ static int ceph_encode_fh(struct inode *inode, u32 *rawfh, int *max_len, return type; } -static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino) +static struct inode *__lookup_inode(struct super_block *sb, u64 ino) { struct ceph_mds_client *mdsc = ceph_sb_to_client(sb)->mdsc; struct inode *inode; @@ -81,7 +142,7 @@ static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino) mask = CEPH_STAT_CAP_INODE; if (ceph_security_xattr_wanted(d_inode(sb->s_root))) mask |= CEPH_CAP_XATTR_SHARED; - req->r_args.getattr.mask = cpu_to_le32(mask); + req->r_args.lookupino.mask = cpu_to_le32(mask); req->r_ino1 = vino; req->r_num_caps = 1; @@ -91,16 +152,114 @@ static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino) ihold(inode); ceph_mdsc_put_request(req); if (!inode) - return ERR_PTR(-ESTALE); - if (inode->i_nlink == 0) { - iput(inode); - return ERR_PTR(-ESTALE); - } + return err < 0 ? ERR_PTR(err) : ERR_PTR(-ESTALE); } + return inode; +} + +struct inode *ceph_lookup_inode(struct super_block *sb, u64 ino) +{ + struct inode *inode = __lookup_inode(sb, ino); + if (IS_ERR(inode)) + return inode; + if (inode->i_nlink == 0) { + iput(inode); + return ERR_PTR(-ESTALE); + } + return inode; +} +static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino) +{ + struct inode *inode = __lookup_inode(sb, ino); + if (IS_ERR(inode)) + return ERR_CAST(inode); + if (inode->i_nlink == 0) { + iput(inode); + return ERR_PTR(-ESTALE); + } return d_obtain_alias(inode); } +static struct dentry *__snapfh_to_dentry(struct super_block *sb, + struct ceph_nfs_snapfh *sfh, + bool want_parent) +{ + struct ceph_mds_client *mdsc = ceph_sb_to_client(sb)->mdsc; + struct ceph_mds_request *req; + struct inode *inode; + struct ceph_vino vino; + int mask; + int err; + bool unlinked = false; + + if (want_parent) { + vino.ino = sfh->parent_ino; + if (sfh->snapid == CEPH_SNAPDIR) + vino.snap = CEPH_NOSNAP; + else if (sfh->ino == sfh->parent_ino) + vino.snap = CEPH_SNAPDIR; + else + vino.snap = sfh->snapid; + } else { + vino.ino = sfh->ino; + vino.snap = sfh->snapid; + } + inode = ceph_find_inode(sb, vino); + if (inode) + return d_obtain_alias(inode); + + req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPINO, + USE_ANY_MDS); + if (IS_ERR(req)) + return ERR_CAST(req); + + mask = CEPH_STAT_CAP_INODE; + if (ceph_security_xattr_wanted(d_inode(sb->s_root))) + mask |= CEPH_CAP_XATTR_SHARED; + req->r_args.lookupino.mask = cpu_to_le32(mask); + if (vino.snap < CEPH_NOSNAP) { + req->r_args.lookupino.snapid = cpu_to_le64(vino.snap); + if (!want_parent && sfh->ino != sfh->parent_ino) { + req->r_args.lookupino.parent = + cpu_to_le64(sfh->parent_ino); + req->r_args.lookupino.hash = + cpu_to_le32(sfh->hash); + } + } + + req->r_ino1 = vino; + req->r_num_caps = 1; + err = ceph_mdsc_do_request(mdsc, NULL, req); + inode = req->r_target_inode; + if (inode) { + if (vino.snap == CEPH_SNAPDIR) { + if (inode->i_nlink == 0) + unlinked = true; + inode = ceph_get_snapdir(inode); + } else if (ceph_snap(inode) == vino.snap) { + ihold(inode); + } else { + /* mds does not support lookup snapped inode */ + err = -EOPNOTSUPP; + inode = NULL; + } + } + ceph_mdsc_put_request(req); + + if (want_parent) { + dout("snapfh_to_parent %llx.%llx\n err=%d\n", + vino.ino, vino.snap, err); + } else { + dout("snapfh_to_dentry %llx.%llx parent %llx hash %x err=%d", + vino.ino, vino.snap, sfh->parent_ino, sfh->hash, err); + } + if (!inode) + return ERR_PTR(-ESTALE); + /* see comments in ceph_get_parent() */ + return unlinked ? d_obtain_root(inode) : d_obtain_alias(inode); +} + /* * convert regular fh to dentry */ @@ -110,6 +269,11 @@ static struct dentry *ceph_fh_to_dentry(struct super_block *sb, { struct ceph_nfs_fh *fh = (void *)fid->raw; + if (fh_type == FILEID_BTRFS_WITH_PARENT) { + struct ceph_nfs_snapfh *sfh = (void *)fid->raw; + return __snapfh_to_dentry(sb, sfh, false); + } + if (fh_type != FILEID_INO32_GEN && fh_type != FILEID_INO32_GEN_PARENT) return NULL; @@ -163,13 +327,49 @@ static struct dentry *__get_parent(struct super_block *sb, static struct dentry *ceph_get_parent(struct dentry *child) { - /* don't re-export snaps */ - if (ceph_snap(d_inode(child)) != CEPH_NOSNAP) - return ERR_PTR(-EINVAL); - - dout("get_parent %p ino %llx.%llx\n", - child, ceph_vinop(d_inode(child))); - return __get_parent(child->d_sb, child, 0); + struct inode *inode = d_inode(child); + struct dentry *dn; + + if (ceph_snap(inode) != CEPH_NOSNAP) { + struct inode* dir; + bool unlinked = false; + /* do not support non-directory */ + if (!d_is_dir(child)) { + dn = ERR_PTR(-EINVAL); + goto out; + } + dir = __lookup_inode(inode->i_sb, ceph_ino(inode)); + if (IS_ERR(dir)) { + dn = ERR_CAST(dir); + goto out; + } + /* There can be multiple paths to access snapped inode. + * For simplicity, treat snapdir of head inode as parent */ + if (ceph_snap(inode) != CEPH_SNAPDIR) { + struct inode *snapdir = ceph_get_snapdir(dir); + if (dir->i_nlink == 0) + unlinked = true; + iput(dir); + if (IS_ERR(snapdir)) { + dn = ERR_CAST(snapdir); + goto out; + } + dir = snapdir; + } + /* If directory has already been deleted, futher get_parent + * will fail. Do not mark snapdir dentry as disconnected, + * this prevent exportfs from doing futher get_parent. */ + if (unlinked) + dn = d_obtain_root(dir); + else + dn = d_obtain_alias(dir); + } else { + dn = __get_parent(child->d_sb, child, 0); + } +out: + dout("get_parent %p ino %llx.%llx err=%ld\n", + child, ceph_vinop(inode), (IS_ERR(dn) ? PTR_ERR(dn) : 0)); + return dn; } /* @@ -182,6 +382,11 @@ static struct dentry *ceph_fh_to_parent(struct super_block *sb, struct ceph_nfs_confh *cfh = (void *)fid->raw; struct dentry *dentry; + if (fh_type == FILEID_BTRFS_WITH_PARENT) { + struct ceph_nfs_snapfh *sfh = (void *)fid->raw; + return __snapfh_to_dentry(sb, sfh, true); + } + if (fh_type != FILEID_INO32_GEN_PARENT) return NULL; if (fh_len < sizeof(*cfh) / 4) @@ -194,14 +399,115 @@ static struct dentry *ceph_fh_to_parent(struct super_block *sb, return dentry; } +static int __get_snap_name(struct dentry *parent, char *name, + struct dentry *child) +{ + struct inode *inode = d_inode(child); + struct inode *dir = d_inode(parent); + struct ceph_fs_client *fsc = ceph_inode_to_client(inode); + struct ceph_mds_request *req = NULL; + char *last_name = NULL; + unsigned next_offset = 2; + int err = -EINVAL; + + if (ceph_ino(inode) != ceph_ino(dir)) + goto out; + if (ceph_snap(inode) == CEPH_SNAPDIR) { + if (ceph_snap(dir) == CEPH_NOSNAP) { + strcpy(name, fsc->mount_options->snapdir_name); + err = 0; + } + goto out; + } + if (ceph_snap(dir) != CEPH_SNAPDIR) + goto out; + + while (1) { + struct ceph_mds_reply_info_parsed *rinfo; + struct ceph_mds_reply_dir_entry *rde; + int i; + + req = ceph_mdsc_create_request(fsc->mdsc, CEPH_MDS_OP_LSSNAP, + USE_AUTH_MDS); + if (IS_ERR(req)) { + err = PTR_ERR(req); + req = NULL; + goto out; + } + err = ceph_alloc_readdir_reply_buffer(req, inode); + if (err) + goto out; + + req->r_direct_mode = USE_AUTH_MDS; + req->r_readdir_offset = next_offset; + req->r_args.readdir.flags = + cpu_to_le16(CEPH_READDIR_REPLY_BITFLAGS); + if (last_name) { + req->r_path2 = last_name; + last_name = NULL; + } + + req->r_inode = dir; + ihold(dir); + req->r_dentry = dget(parent); + + inode_lock(dir); + err = ceph_mdsc_do_request(fsc->mdsc, NULL, req); + inode_unlock(dir); + + if (err < 0) + goto out; + + rinfo = &req->r_reply_info; + for (i = 0; i < rinfo->dir_nr; i++) { + rde = rinfo->dir_entries + i; + BUG_ON(!rde->inode.in); + if (ceph_snap(inode) == + le64_to_cpu(rde->inode.in->snapid)) { + memcpy(name, rde->name, rde->name_len); + name[rde->name_len] = '\0'; + err = 0; + goto out; + } + } + + if (rinfo->dir_end) + break; + + BUG_ON(rinfo->dir_nr <= 0); + rde = rinfo->dir_entries + (rinfo->dir_nr - 1); + next_offset += rinfo->dir_nr; + last_name = kstrndup(rde->name, rde->name_len, GFP_KERNEL); + if (!last_name) { + err = -ENOMEM; + goto out; + } + + ceph_mdsc_put_request(req); + req = NULL; + } + err = -ENOENT; +out: + if (req) + ceph_mdsc_put_request(req); + kfree(last_name); + dout("get_snap_name %p ino %llx.%llx err=%d\n", + child, ceph_vinop(inode), err); + return err; +} + static int ceph_get_name(struct dentry *parent, char *name, struct dentry *child) { struct ceph_mds_client *mdsc; struct ceph_mds_request *req; + struct inode *inode = d_inode(child); int err; - mdsc = ceph_inode_to_client(d_inode(child))->mdsc; + if (ceph_snap(inode) != CEPH_NOSNAP) + return __get_snap_name(parent, name, child); + + mdsc = ceph_inode_to_client(inode)->mdsc; req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPNAME, USE_ANY_MDS); if (IS_ERR(req)) @@ -209,8 +515,8 @@ static int ceph_get_name(struct dentry *parent, char *name, inode_lock(d_inode(parent)); - req->r_inode = d_inode(child); - ihold(d_inode(child)); + req->r_inode = inode; + ihold(inode); req->r_ino2 = ceph_vino(d_inode(parent)); req->r_parent = d_inode(parent); set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); @@ -224,10 +530,10 @@ static int ceph_get_name(struct dentry *parent, char *name, memcpy(name, rinfo->dname, rinfo->dname_len); name[rinfo->dname_len] = 0; dout("get_name %p ino %llx.%llx name %s\n", - child, ceph_vinop(d_inode(child)), name); + child, ceph_vinop(inode), name); } else { dout("get_name %p ino %llx.%llx err %d\n", - child, ceph_vinop(d_inode(child)), err); + child, ceph_vinop(inode), err); } ceph_mdsc_put_request(req); diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 84725b53ac21..305daf043eb0 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -929,7 +929,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, dout("sync_direct_%s on file %p %lld~%u snapc %p seq %lld\n", (write ? "write" : "read"), file, pos, (unsigned)count, - snapc, snapc->seq); + snapc, snapc ? snapc->seq : 0); ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count - 1); diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 35dae6d5493a..f85355bf49c4 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -2266,43 +2266,72 @@ int ceph_permission(struct inode *inode, int mask) return err; } +/* Craft a mask of needed caps given a set of requested statx attrs. */ +static int statx_to_caps(u32 want) +{ + int mask = 0; + + if (want & (STATX_MODE|STATX_UID|STATX_GID|STATX_CTIME)) + mask |= CEPH_CAP_AUTH_SHARED; + + if (want & (STATX_NLINK|STATX_CTIME)) + mask |= CEPH_CAP_LINK_SHARED; + + if (want & (STATX_ATIME|STATX_MTIME|STATX_CTIME|STATX_SIZE| + STATX_BLOCKS)) + mask |= CEPH_CAP_FILE_SHARED; + + if (want & (STATX_CTIME)) + mask |= CEPH_CAP_XATTR_SHARED; + + return mask; +} + /* - * Get all attributes. Hopefully somedata we'll have a statlite() - * and can limit the fields we require to be accurate. + * Get all the attributes. If we have sufficient caps for the requested attrs, + * then we can avoid talking to the MDS at all. */ int ceph_getattr(const struct path *path, struct kstat *stat, u32 request_mask, unsigned int flags) { struct inode *inode = d_inode(path->dentry); struct ceph_inode_info *ci = ceph_inode(inode); - int err; + int err = 0; - err = ceph_do_getattr(inode, CEPH_STAT_CAP_INODE_ALL, false); - if (!err) { - generic_fillattr(inode, stat); - stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino); - if (ceph_snap(inode) == CEPH_NOSNAP) - stat->dev = inode->i_sb->s_dev; + /* Skip the getattr altogether if we're asked not to sync */ + if (!(flags & AT_STATX_DONT_SYNC)) { + err = ceph_do_getattr(inode, statx_to_caps(request_mask), + flags & AT_STATX_FORCE_SYNC); + if (err) + return err; + } + + generic_fillattr(inode, stat); + stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino); + if (ceph_snap(inode) == CEPH_NOSNAP) + stat->dev = inode->i_sb->s_dev; + else + stat->dev = ci->i_snapid_map ? ci->i_snapid_map->dev : 0; + + if (S_ISDIR(inode->i_mode)) { + if (ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb), + RBYTES)) + stat->size = ci->i_rbytes; else - stat->dev = ci->i_snapid_map ? ci->i_snapid_map->dev : 0; - - if (S_ISDIR(inode->i_mode)) { - if (ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb), - RBYTES)) - stat->size = ci->i_rbytes; - else - stat->size = ci->i_files + ci->i_subdirs; - stat->blocks = 0; - stat->blksize = 65536; - /* - * Some applications rely on the number of st_nlink - * value on directories to be either 0 (if unlinked) - * or 2 + number of subdirectories. - */ - if (stat->nlink == 1) - /* '.' + '..' + subdirs */ - stat->nlink = 1 + 1 + ci->i_subdirs; - } + stat->size = ci->i_files + ci->i_subdirs; + stat->blocks = 0; + stat->blksize = 65536; + /* + * Some applications rely on the number of st_nlink + * value on directories to be either 0 (if unlinked) + * or 2 + number of subdirectories. + */ + if (stat->nlink == 1) + /* '.' + '..' + subdirs */ + stat->nlink = 1 + 1 + ci->i_subdirs; } + + /* Mask off any higher bits (e.g. btime) until we have support */ + stat->result_mask = request_mask & STATX_BASIC_STATS; return err; } diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c index 9dae2ec7e1fa..ac9b53b89365 100644 --- a/fs/ceph/locks.c +++ b/fs/ceph/locks.c @@ -237,15 +237,6 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl) spin_lock(&ci->i_ceph_lock); if (ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) { err = -EIO; - } else if (op == CEPH_MDS_OP_SETFILELOCK) { - /* - * increasing i_filelock_ref closes race window between - * handling request reply and adding file_lock struct to - * inode. Otherwise, i_auth_cap may get trimmed in the - * window. Caller function will decrease the counter. - */ - fl->fl_ops = &ceph_fl_lock_ops; - atomic_inc(&ci->i_filelock_ref); } spin_unlock(&ci->i_ceph_lock); if (err < 0) { @@ -299,10 +290,6 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl) spin_lock(&ci->i_ceph_lock); if (ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) { err = -EIO; - } else { - /* see comment in ceph_lock */ - fl->fl_ops = &ceph_fl_lock_ops; - atomic_inc(&ci->i_filelock_ref); } spin_unlock(&ci->i_ceph_lock); if (err < 0) { diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 9049c2a3e972..959b1bf7c327 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -550,15 +550,9 @@ void ceph_put_mds_session(struct ceph_mds_session *s) struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, int mds) { - struct ceph_mds_session *session; - if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) return NULL; - session = mdsc->sessions[mds]; - dout("lookup_mds_session %p %d\n", session, - refcount_read(&session->s_ref)); - get_session(session); - return session; + return get_session(mdsc->sessions[mds]); } static bool __have_session(struct ceph_mds_client *mdsc, int mds) @@ -1284,9 +1278,9 @@ static void cleanup_session_requests(struct ceph_mds_client *mdsc, * * Caller must hold session s_mutex. */ -static int iterate_session_caps(struct ceph_mds_session *session, - int (*cb)(struct inode *, struct ceph_cap *, - void *), void *arg) +int ceph_iterate_session_caps(struct ceph_mds_session *session, + int (*cb)(struct inode *, struct ceph_cap *, + void *), void *arg) { struct list_head *p; struct ceph_cap *cap; @@ -1451,7 +1445,7 @@ static void remove_session_caps(struct ceph_mds_session *session) LIST_HEAD(dispose); dout("remove_session_caps on %p\n", session); - iterate_session_caps(session, remove_session_caps_cb, fsc); + ceph_iterate_session_caps(session, remove_session_caps_cb, fsc); wake_up_all(&fsc->mdsc->cap_flushing_wq); @@ -1534,8 +1528,8 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, static void wake_up_session_caps(struct ceph_mds_session *session, int ev) { dout("wake_up_session_caps %p mds%d\n", session, session->s_mds); - iterate_session_caps(session, wake_up_session_cb, - (void *)(unsigned long)ev); + ceph_iterate_session_caps(session, wake_up_session_cb, + (void *)(unsigned long)ev); } /* @@ -1768,7 +1762,7 @@ int ceph_trim_caps(struct ceph_mds_client *mdsc, session->s_mds, session->s_nr_caps, max_caps, trim_caps); if (trim_caps > 0) { session->s_trim_caps = trim_caps; - iterate_session_caps(session, trim_caps_cb, session); + ceph_iterate_session_caps(session, trim_caps_cb, session); dout("trim_caps mds%d done: %d / %d, trimmed %d\n", session->s_mds, session->s_nr_caps, max_caps, trim_caps - session->s_trim_caps); @@ -1861,7 +1855,8 @@ again: num_cap_releases--; head = msg->front.iov_base; - le32_add_cpu(&head->num, 1); + put_unaligned_le32(get_unaligned_le32(&head->num) + 1, + &head->num); item = msg->front.iov_base + msg->front.iov_len; item->ino = cpu_to_le64(cap->cap_ino); item->cap_id = cpu_to_le64(cap->cap_id); @@ -2089,43 +2084,29 @@ static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) * Encode hidden .snap dirs as a double /, i.e. * foo/.snap/bar -> foo//bar */ -char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base, +char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase, int stop_on_nosnap) { struct dentry *temp; char *path; - int len, pos; + int pos; unsigned seq; + u64 base; if (!dentry) return ERR_PTR(-EINVAL); -retry: - len = 0; - seq = read_seqbegin(&rename_lock); - rcu_read_lock(); - for (temp = dentry; !IS_ROOT(temp);) { - struct inode *inode = d_inode(temp); - if (inode && ceph_snap(inode) == CEPH_SNAPDIR) - len++; /* slash only */ - else if (stop_on_nosnap && inode && - ceph_snap(inode) == CEPH_NOSNAP) - break; - else - len += 1 + temp->d_name.len; - temp = temp->d_parent; - } - rcu_read_unlock(); - if (len) - len--; /* no leading '/' */ - - path = kmalloc(len+1, GFP_NOFS); + path = __getname(); if (!path) return ERR_PTR(-ENOMEM); - pos = len; - path[pos] = 0; /* trailing null */ +retry: + pos = PATH_MAX - 1; + path[pos] = '\0'; + + seq = read_seqbegin(&rename_lock); rcu_read_lock(); - for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) { + temp = dentry; + for (;;) { struct inode *inode; spin_lock(&temp->d_lock); @@ -2143,83 +2124,54 @@ retry: spin_unlock(&temp->d_lock); break; } - strncpy(path + pos, temp->d_name.name, - temp->d_name.len); + memcpy(path + pos, temp->d_name.name, temp->d_name.len); } spin_unlock(&temp->d_lock); - if (pos) - path[--pos] = '/'; temp = temp->d_parent; + + /* Are we at the root? */ + if (IS_ROOT(temp)) + break; + + /* Are we out of buffer? */ + if (--pos < 0) + break; + + path[pos] = '/'; } + base = ceph_ino(d_inode(temp)); rcu_read_unlock(); - if (pos != 0 || read_seqretry(&rename_lock, seq)) { + if (pos < 0 || read_seqretry(&rename_lock, seq)) { pr_err("build_path did not end path lookup where " - "expected, namelen is %d, pos is %d\n", len, pos); + "expected, pos is %d\n", pos); /* presumably this is only possible if racing with a rename of one of the parent directories (we can not lock the dentries above us to prevent this, but retrying should be harmless) */ - kfree(path); goto retry; } - *base = ceph_ino(d_inode(temp)); - *plen = len; + *pbase = base; + *plen = PATH_MAX - 1 - pos; dout("build_path on %p %d built %llx '%.*s'\n", - dentry, d_count(dentry), *base, len, path); - return path; -} - -/* Duplicate the dentry->d_name.name safely */ -static int clone_dentry_name(struct dentry *dentry, const char **ppath, - int *ppathlen) -{ - u32 len; - char *name; - -retry: - len = READ_ONCE(dentry->d_name.len); - name = kmalloc(len + 1, GFP_NOFS); - if (!name) - return -ENOMEM; - - spin_lock(&dentry->d_lock); - if (dentry->d_name.len != len) { - spin_unlock(&dentry->d_lock); - kfree(name); - goto retry; - } - memcpy(name, dentry->d_name.name, len); - spin_unlock(&dentry->d_lock); - - name[len] = '\0'; - *ppath = name; - *ppathlen = len; - return 0; + dentry, d_count(dentry), base, *plen, path + pos); + return path + pos; } static int build_dentry_path(struct dentry *dentry, struct inode *dir, const char **ppath, int *ppathlen, u64 *pino, bool *pfreepath, bool parent_locked) { - int ret; char *path; rcu_read_lock(); if (!dir) dir = d_inode_rcu(dentry->d_parent); - if (dir && ceph_snap(dir) == CEPH_NOSNAP) { + if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) { *pino = ceph_ino(dir); rcu_read_unlock(); - if (parent_locked) { - *ppath = dentry->d_name.name; - *ppathlen = dentry->d_name.len; - } else { - ret = clone_dentry_name(dentry, ppath, ppathlen); - if (ret) - return ret; - *pfreepath = true; - } + *ppath = dentry->d_name.name; + *ppathlen = dentry->d_name.len; return 0; } rcu_read_unlock(); @@ -2331,9 +2283,9 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, (!!req->r_inode_drop + !!req->r_dentry_drop + !!req->r_old_inode_drop + !!req->r_old_dentry_drop); if (req->r_dentry_drop) - len += req->r_dentry->d_name.len; + len += pathlen1; if (req->r_old_dentry_drop) - len += req->r_old_dentry->d_name.len; + len += pathlen2; msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); if (!msg) { @@ -2410,10 +2362,10 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, out_free2: if (freepath2) - kfree((char *)path2); + ceph_mdsc_free_path((char *)path2, pathlen2); out_free1: if (freepath1) - kfree((char *)path1); + ceph_mdsc_free_path((char *)path1, pathlen1); out: return msg; } @@ -2427,8 +2379,7 @@ static void complete_request(struct ceph_mds_client *mdsc, { if (req->r_callback) req->r_callback(mdsc, req); - else - complete_all(&req->r_completion); + complete_all(&req->r_completion); } /* @@ -2670,28 +2621,11 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds) } } -void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, +int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, struct ceph_mds_request *req) { - dout("submit_request on %p\n", req); - mutex_lock(&mdsc->mutex); - __register_request(mdsc, req, NULL); - __do_request(mdsc, req); - mutex_unlock(&mdsc->mutex); -} - -/* - * Synchrously perform an mds request. Take care of all of the - * session setup, forwarding, retry details. - */ -int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, - struct inode *dir, - struct ceph_mds_request *req) -{ int err; - dout("do_request on %p\n", req); - /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */ if (req->r_inode) ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); @@ -2701,18 +2635,21 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), CEPH_CAP_PIN); - /* issue */ + dout("submit_request on %p for inode %p\n", req, dir); mutex_lock(&mdsc->mutex); __register_request(mdsc, req, dir); __do_request(mdsc, req); + err = req->r_err; + mutex_unlock(&mdsc->mutex); + return err; +} - if (req->r_err) { - err = req->r_err; - goto out; - } +static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc, + struct ceph_mds_request *req) +{ + int err; /* wait */ - mutex_unlock(&mdsc->mutex); dout("do_request waiting\n"); if (!req->r_timeout && req->r_wait_for_completion) { err = req->r_wait_for_completion(mdsc, req); @@ -2753,8 +2690,26 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, err = req->r_err; } -out: mutex_unlock(&mdsc->mutex); + return err; +} + +/* + * Synchrously perform an mds request. Take care of all of the + * session setup, forwarding, retry details. + */ +int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, + struct inode *dir, + struct ceph_mds_request *req) +{ + int err; + + dout("do_request on %p\n", req); + + /* issue */ + err = ceph_mdsc_submit_request(mdsc, dir, req); + if (!err) + err = ceph_mdsc_wait_request(mdsc, req); dout("do_request %p done, result %d\n", req, err); return err; } @@ -3485,7 +3440,7 @@ out_freeflocks: ceph_pagelist_encode_string(pagelist, path, pathlen); ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); out_freepath: - kfree(path); + ceph_mdsc_free_path(path, pathlen); } out_err: @@ -3642,7 +3597,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, recon_state.msg_version = 2; } /* trsaverse this session's caps */ - err = iterate_session_caps(session, encode_caps_cb, &recon_state); + err = ceph_iterate_session_caps(session, encode_caps_cb, &recon_state); spin_lock(&session->s_cap_lock); session->s_cap_reconnect = 0; @@ -4125,6 +4080,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) mdsc->max_sessions = 0; mdsc->stopping = 0; atomic64_set(&mdsc->quotarealms_count, 0); + mdsc->quotarealms_inodes = RB_ROOT; + mutex_init(&mdsc->quotarealms_inodes_mutex); mdsc->last_snap_seq = 0; init_rwsem(&mdsc->snap_rwsem); mdsc->snap_realms = RB_ROOT; @@ -4216,6 +4173,8 @@ void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) * their inode/dcache refs */ ceph_msgr_flush(); + + ceph_cleanup_quotarealms_inodes(mdsc); } /* diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 50385a481fdb..a83f28bc2387 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -326,6 +326,18 @@ struct ceph_snapid_map { }; /* + * node for list of quotarealm inodes that are not visible from the filesystem + * mountpoint, but required to handle, e.g. quotas. + */ +struct ceph_quotarealm_inode { + struct rb_node node; + u64 ino; + unsigned long timeout; /* last time a lookup failed for this inode */ + struct mutex mutex; + struct inode *inode; +}; + +/* * mds client state */ struct ceph_mds_client { @@ -344,6 +356,12 @@ struct ceph_mds_client { int stopping; /* true if shutting down */ atomic64_t quotarealms_count; /* # realms with quota */ + /* + * We keep a list of inodes we don't see in the mountpoint but that we + * need to track quota realms. + */ + struct rb_root quotarealms_inodes; + struct mutex quotarealms_inodes_mutex; /* * snap_rwsem will cover cap linkage into snaprealms, and @@ -447,8 +465,9 @@ extern int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, struct inode *dir); extern struct ceph_mds_request * ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode); -extern void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, - struct ceph_mds_request *req); +extern int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, + struct inode *dir, + struct ceph_mds_request *req); extern int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, struct inode *dir, struct ceph_mds_request *req); @@ -468,8 +487,18 @@ extern void ceph_flush_cap_releases(struct ceph_mds_client *mdsc, struct ceph_mds_session *session); extern void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc); extern void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr); +extern int ceph_iterate_session_caps(struct ceph_mds_session *session, + int (*cb)(struct inode *, + struct ceph_cap *, void *), + void *arg); extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc); +static inline void ceph_mdsc_free_path(char *path, int len) +{ + if (path) + __putname(path - (PATH_MAX - 1 - len)); +} + extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base, int stop_on_nosnap); diff --git a/fs/ceph/mdsmap.c b/fs/ceph/mdsmap.c index 1a2c5d390f7f..701b4fb0fb5a 100644 --- a/fs/ceph/mdsmap.c +++ b/fs/ceph/mdsmap.c @@ -205,7 +205,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end) dout("mdsmap_decode %d/%d %lld mds%d.%d %s %s\n", i+1, n, global_id, mds, inc, - ceph_pr_addr(&addr.in_addr), + ceph_pr_addr(&addr), ceph_mds_state_name(state)); if (mds < 0 || state <= 0) diff --git a/fs/ceph/quota.c b/fs/ceph/quota.c index 9455d3aef0c3..c4522212872c 100644 --- a/fs/ceph/quota.c +++ b/fs/ceph/quota.c @@ -22,7 +22,16 @@ void ceph_adjust_quota_realms_count(struct inode *inode, bool inc) static inline bool ceph_has_realms_with_quotas(struct inode *inode) { struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; - return atomic64_read(&mdsc->quotarealms_count) > 0; + struct super_block *sb = mdsc->fsc->sb; + + if (atomic64_read(&mdsc->quotarealms_count) > 0) + return true; + /* if root is the real CephFS root, we don't have quota realms */ + if (sb->s_root->d_inode && + (sb->s_root->d_inode->i_ino == CEPH_INO_ROOT)) + return false; + /* otherwise, we can't know for sure */ + return true; } void ceph_handle_quota(struct ceph_mds_client *mdsc, @@ -68,6 +77,108 @@ void ceph_handle_quota(struct ceph_mds_client *mdsc, iput(inode); } +static struct ceph_quotarealm_inode * +find_quotarealm_inode(struct ceph_mds_client *mdsc, u64 ino) +{ + struct ceph_quotarealm_inode *qri = NULL; + struct rb_node **node, *parent = NULL; + + mutex_lock(&mdsc->quotarealms_inodes_mutex); + node = &(mdsc->quotarealms_inodes.rb_node); + while (*node) { + parent = *node; + qri = container_of(*node, struct ceph_quotarealm_inode, node); + + if (ino < qri->ino) + node = &((*node)->rb_left); + else if (ino > qri->ino) + node = &((*node)->rb_right); + else + break; + } + if (!qri || (qri->ino != ino)) { + /* Not found, create a new one and insert it */ + qri = kmalloc(sizeof(*qri), GFP_KERNEL); + if (qri) { + qri->ino = ino; + qri->inode = NULL; + qri->timeout = 0; + mutex_init(&qri->mutex); + rb_link_node(&qri->node, parent, node); + rb_insert_color(&qri->node, &mdsc->quotarealms_inodes); + } else + pr_warn("Failed to alloc quotarealms_inode\n"); + } + mutex_unlock(&mdsc->quotarealms_inodes_mutex); + + return qri; +} + +/* + * This function will try to lookup a realm inode which isn't visible in the + * filesystem mountpoint. A list of these kind of inodes (not visible) is + * maintained in the mdsc and freed only when the filesystem is umounted. + * + * Note that these inodes are kept in this list even if the lookup fails, which + * allows to prevent useless lookup requests. + */ +static struct inode *lookup_quotarealm_inode(struct ceph_mds_client *mdsc, + struct super_block *sb, + struct ceph_snap_realm *realm) +{ + struct ceph_quotarealm_inode *qri; + struct inode *in; + + qri = find_quotarealm_inode(mdsc, realm->ino); + if (!qri) + return NULL; + + mutex_lock(&qri->mutex); + if (qri->inode) { + /* A request has already returned the inode */ + mutex_unlock(&qri->mutex); + return qri->inode; + } + /* Check if this inode lookup has failed recently */ + if (qri->timeout && + time_before_eq(jiffies, qri->timeout)) { + mutex_unlock(&qri->mutex); + return NULL; + } + in = ceph_lookup_inode(sb, realm->ino); + if (IS_ERR(in)) { + pr_warn("Can't lookup inode %llx (err: %ld)\n", + realm->ino, PTR_ERR(in)); + qri->timeout = jiffies + msecs_to_jiffies(60 * 1000); /* XXX */ + } else { + qri->timeout = 0; + qri->inode = in; + } + mutex_unlock(&qri->mutex); + + return in; +} + +void ceph_cleanup_quotarealms_inodes(struct ceph_mds_client *mdsc) +{ + struct ceph_quotarealm_inode *qri; + struct rb_node *node; + + /* + * It should now be safe to clean quotarealms_inode tree without holding + * mdsc->quotarealms_inodes_mutex... + */ + mutex_lock(&mdsc->quotarealms_inodes_mutex); + while (!RB_EMPTY_ROOT(&mdsc->quotarealms_inodes)) { + node = rb_first(&mdsc->quotarealms_inodes); + qri = rb_entry(node, struct ceph_quotarealm_inode, node); + rb_erase(node, &mdsc->quotarealms_inodes); + iput(qri->inode); + kfree(qri); + } + mutex_unlock(&mdsc->quotarealms_inodes_mutex); +} + /* * This function walks through the snaprealm for an inode and returns the * ceph_snap_realm for the first snaprealm that has quotas set (either max_files @@ -76,9 +187,15 @@ void ceph_handle_quota(struct ceph_mds_client *mdsc, * * Note that the caller is responsible for calling ceph_put_snap_realm() on the * returned realm. + * + * Callers of this function need to hold mdsc->snap_rwsem. However, if there's + * a need to do an inode lookup, this rwsem will be temporarily dropped. Hence + * the 'retry' argument: if rwsem needs to be dropped and 'retry' is 'false' + * this function will return -EAGAIN; otherwise, the snaprealms walk-through + * will be restarted. */ static struct ceph_snap_realm *get_quota_realm(struct ceph_mds_client *mdsc, - struct inode *inode) + struct inode *inode, bool retry) { struct ceph_inode_info *ci = NULL; struct ceph_snap_realm *realm, *next; @@ -88,6 +205,7 @@ static struct ceph_snap_realm *get_quota_realm(struct ceph_mds_client *mdsc, if (ceph_snap(inode) != CEPH_NOSNAP) return NULL; +restart: realm = ceph_inode(inode)->i_snap_realm; if (realm) ceph_get_snap_realm(mdsc, realm); @@ -95,11 +213,25 @@ static struct ceph_snap_realm *get_quota_realm(struct ceph_mds_client *mdsc, pr_err_ratelimited("get_quota_realm: ino (%llx.%llx) " "null i_snap_realm\n", ceph_vinop(inode)); while (realm) { + bool has_inode; + spin_lock(&realm->inodes_with_caps_lock); - in = realm->inode ? igrab(realm->inode) : NULL; + has_inode = realm->inode; + in = has_inode ? igrab(realm->inode) : NULL; spin_unlock(&realm->inodes_with_caps_lock); - if (!in) + if (has_inode && !in) break; + if (!in) { + up_read(&mdsc->snap_rwsem); + in = lookup_quotarealm_inode(mdsc, inode->i_sb, realm); + down_read(&mdsc->snap_rwsem); + if (IS_ERR_OR_NULL(in)) + break; + ceph_put_snap_realm(mdsc, realm); + if (!retry) + return ERR_PTR(-EAGAIN); + goto restart; + } ci = ceph_inode(in); has_quota = __ceph_has_any_quota(ci); @@ -125,9 +257,22 @@ bool ceph_quota_is_same_realm(struct inode *old, struct inode *new) struct ceph_snap_realm *old_realm, *new_realm; bool is_same; +restart: + /* + * We need to lookup 2 quota realms atomically, i.e. with snap_rwsem. + * However, get_quota_realm may drop it temporarily. By setting the + * 'retry' parameter to 'false', we'll get -EAGAIN if the rwsem was + * dropped and we can then restart the whole operation. + */ down_read(&mdsc->snap_rwsem); - old_realm = get_quota_realm(mdsc, old); - new_realm = get_quota_realm(mdsc, new); + old_realm = get_quota_realm(mdsc, old, true); + new_realm = get_quota_realm(mdsc, new, false); + if (PTR_ERR(new_realm) == -EAGAIN) { + up_read(&mdsc->snap_rwsem); + if (old_realm) + ceph_put_snap_realm(mdsc, old_realm); + goto restart; + } is_same = (old_realm == new_realm); up_read(&mdsc->snap_rwsem); @@ -166,6 +311,7 @@ static bool check_quota_exceeded(struct inode *inode, enum quota_check_op op, return false; down_read(&mdsc->snap_rwsem); +restart: realm = ceph_inode(inode)->i_snap_realm; if (realm) ceph_get_snap_realm(mdsc, realm); @@ -173,12 +319,23 @@ static bool check_quota_exceeded(struct inode *inode, enum quota_check_op op, pr_err_ratelimited("check_quota_exceeded: ino (%llx.%llx) " "null i_snap_realm\n", ceph_vinop(inode)); while (realm) { + bool has_inode; + spin_lock(&realm->inodes_with_caps_lock); - in = realm->inode ? igrab(realm->inode) : NULL; + has_inode = realm->inode; + in = has_inode ? igrab(realm->inode) : NULL; spin_unlock(&realm->inodes_with_caps_lock); - if (!in) + if (has_inode && !in) break; - + if (!in) { + up_read(&mdsc->snap_rwsem); + in = lookup_quotarealm_inode(mdsc, inode->i_sb, realm); + down_read(&mdsc->snap_rwsem); + if (IS_ERR_OR_NULL(in)) + break; + ceph_put_snap_realm(mdsc, realm); + goto restart; + } ci = ceph_inode(in); spin_lock(&ci->i_ceph_lock); if (op == QUOTA_CHECK_MAX_FILES_OP) { @@ -314,7 +471,7 @@ bool ceph_quota_update_statfs(struct ceph_fs_client *fsc, struct kstatfs *buf) bool is_updated = false; down_read(&mdsc->snap_rwsem); - realm = get_quota_realm(mdsc, d_inode(fsc->sb->s_root)); + realm = get_quota_realm(mdsc, d_inode(fsc->sb->s_root), true); up_read(&mdsc->snap_rwsem); if (!realm) return false; diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 285edda4fc3b..c864b44c8341 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -845,6 +845,12 @@ static void ceph_umount_begin(struct super_block *sb) return; } +static int ceph_remount(struct super_block *sb, int *flags, char *data) +{ + sync_filesystem(sb); + return 0; +} + static const struct super_operations ceph_super_ops = { .alloc_inode = ceph_alloc_inode, .destroy_inode = ceph_destroy_inode, @@ -853,6 +859,7 @@ static const struct super_operations ceph_super_ops = { .drop_inode = ceph_drop_inode, .sync_fs = ceph_sync_fs, .put_super = ceph_put_super, + .remount_fs = ceph_remount, .show_options = ceph_show_options, .statfs = ceph_statfs, .umount_begin = ceph_umount_begin, diff --git a/fs/ceph/super.h b/fs/ceph/super.h index c5b4a05905c0..6edab9a750f8 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -1083,6 +1083,7 @@ extern long ceph_ioctl(struct file *file, unsigned int cmd, unsigned long arg); /* export.c */ extern const struct export_operations ceph_export_ops; +struct inode *ceph_lookup_inode(struct super_block *sb, u64 ino); /* locks.c */ extern __init void ceph_flock_init(void); @@ -1133,5 +1134,6 @@ extern bool ceph_quota_is_max_bytes_approaching(struct inode *inode, loff_t newlen); extern bool ceph_quota_update_statfs(struct ceph_fs_client *fsc, struct kstatfs *buf); +extern void ceph_cleanup_quotarealms_inodes(struct ceph_mds_client *mdsc); #endif /* _FS_CEPH_SUPER_H */ |