summaryrefslogtreecommitdiffstats
path: root/fs/ceph
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph')
-rw-r--r--fs/ceph/caps.c93
-rw-r--r--fs/ceph/debugfs.c40
-rw-r--r--fs/ceph/export.c356
-rw-r--r--fs/ceph/file.c2
-rw-r--r--fs/ceph/inode.c85
-rw-r--r--fs/ceph/locks.c13
-rw-r--r--fs/ceph/mds_client.c205
-rw-r--r--fs/ceph/mds_client.h33
-rw-r--r--fs/ceph/mdsmap.c2
-rw-r--r--fs/ceph/quota.c177
-rw-r--r--fs/ceph/super.c7
-rw-r--r--fs/ceph/super.h2
12 files changed, 751 insertions, 264 deletions
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 36a8dc699448..72f8e1311392 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -892,8 +892,8 @@ int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch)
int have = ci->i_snap_caps;
if ((have & mask) == mask) {
- dout("__ceph_caps_issued_mask %p snap issued %s"
- " (mask %s)\n", &ci->vfs_inode,
+ dout("__ceph_caps_issued_mask ino 0x%lx snap issued %s"
+ " (mask %s)\n", ci->vfs_inode.i_ino,
ceph_cap_string(have),
ceph_cap_string(mask));
return 1;
@@ -904,8 +904,8 @@ int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch)
if (!__cap_is_valid(cap))
continue;
if ((cap->issued & mask) == mask) {
- dout("__ceph_caps_issued_mask %p cap %p issued %s"
- " (mask %s)\n", &ci->vfs_inode, cap,
+ dout("__ceph_caps_issued_mask ino 0x%lx cap %p issued %s"
+ " (mask %s)\n", ci->vfs_inode.i_ino, cap,
ceph_cap_string(cap->issued),
ceph_cap_string(mask));
if (touch)
@@ -916,8 +916,8 @@ int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch)
/* does a combination of caps satisfy mask? */
have |= cap->issued;
if ((have & mask) == mask) {
- dout("__ceph_caps_issued_mask %p combo issued %s"
- " (mask %s)\n", &ci->vfs_inode,
+ dout("__ceph_caps_issued_mask ino 0x%lx combo issued %s"
+ " (mask %s)\n", ci->vfs_inode.i_ino,
ceph_cap_string(cap->issued),
ceph_cap_string(mask));
if (touch) {
@@ -2257,8 +2257,6 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
if (datasync)
goto out;
- inode_lock(inode);
-
dirty = try_flush_caps(inode, &flush_tid);
dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
@@ -2273,7 +2271,6 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
ret = wait_event_interruptible(ci->i_cap_wq,
caps_are_flushed(inode, flush_tid));
}
- inode_unlock(inode);
out:
dout("fsync %p%s result=%d\n", inode, datasync ? " datasync" : "", ret);
return ret;
@@ -2528,9 +2525,14 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got,
* to (when applicable), and check against max_size here as well.
* Note that caller is responsible for ensuring max_size increases are
* requested from the MDS.
+ *
+ * Returns 0 if caps were not able to be acquired (yet), a 1 if they were,
+ * or a negative error code.
+ *
+ * FIXME: how does a 0 return differ from -EAGAIN?
*/
static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
- loff_t endoff, bool nonblock, int *got, int *err)
+ loff_t endoff, bool nonblock, int *got)
{
struct inode *inode = &ci->vfs_inode;
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
@@ -2550,8 +2552,7 @@ again:
if ((file_wanted & need) != need) {
dout("try_get_cap_refs need %s file_wanted %s, EBADF\n",
ceph_cap_string(need), ceph_cap_string(file_wanted));
- *err = -EBADF;
- ret = 1;
+ ret = -EBADF;
goto out_unlock;
}
@@ -2572,10 +2573,8 @@ again:
if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) {
dout("get_cap_refs %p endoff %llu > maxsize %llu\n",
inode, endoff, ci->i_max_size);
- if (endoff > ci->i_requested_max_size) {
- *err = -EAGAIN;
- ret = 1;
- }
+ if (endoff > ci->i_requested_max_size)
+ ret = -EAGAIN;
goto out_unlock;
}
/*
@@ -2610,8 +2609,7 @@ again:
* task isn't in TASK_RUNNING state
*/
if (nonblock) {
- *err = -EAGAIN;
- ret = 1;
+ ret = -EAGAIN;
goto out_unlock;
}
@@ -2640,8 +2638,7 @@ again:
if (session_readonly) {
dout("get_cap_refs %p needed %s but mds%d readonly\n",
inode, ceph_cap_string(need), ci->i_auth_cap->mds);
- *err = -EROFS;
- ret = 1;
+ ret = -EROFS;
goto out_unlock;
}
@@ -2650,16 +2647,14 @@ again:
if (READ_ONCE(mdsc->fsc->mount_state) ==
CEPH_MOUNT_SHUTDOWN) {
dout("get_cap_refs %p forced umount\n", inode);
- *err = -EIO;
- ret = 1;
+ ret = -EIO;
goto out_unlock;
}
mds_wanted = __ceph_caps_mds_wanted(ci, false);
if (need & ~(mds_wanted & need)) {
dout("get_cap_refs %p caps were dropped"
" (session killed?)\n", inode);
- *err = -ESTALE;
- ret = 1;
+ ret = -ESTALE;
goto out_unlock;
}
if (!(file_wanted & ~mds_wanted))
@@ -2710,7 +2705,7 @@ static void check_max_size(struct inode *inode, loff_t endoff)
int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want,
bool nonblock, int *got)
{
- int ret, err = 0;
+ int ret;
BUG_ON(need & ~CEPH_CAP_FILE_RD);
BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO|CEPH_CAP_FILE_SHARED));
@@ -2718,15 +2713,8 @@ int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want,
if (ret < 0)
return ret;
- ret = try_get_cap_refs(ci, need, want, 0, nonblock, got, &err);
- if (ret) {
- if (err == -EAGAIN) {
- ret = 0;
- } else if (err < 0) {
- ret = err;
- }
- }
- return ret;
+ ret = try_get_cap_refs(ci, need, want, 0, nonblock, got);
+ return ret == -EAGAIN ? 0 : ret;
}
/*
@@ -2737,7 +2725,7 @@ int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want,
int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
loff_t endoff, int *got, struct page **pinned_page)
{
- int _got, ret, err = 0;
+ int _got, ret;
ret = ceph_pool_perm_check(ci, need);
if (ret < 0)
@@ -2747,21 +2735,19 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
if (endoff > 0)
check_max_size(&ci->vfs_inode, endoff);
- err = 0;
_got = 0;
ret = try_get_cap_refs(ci, need, want, endoff,
- false, &_got, &err);
- if (ret) {
- if (err == -EAGAIN)
- continue;
- if (err < 0)
- ret = err;
- } else {
+ false, &_got);
+ if (ret == -EAGAIN) {
+ continue;
+ } else if (!ret) {
+ int err;
+
DEFINE_WAIT_FUNC(wait, woken_wake_function);
add_wait_queue(&ci->i_cap_wq, &wait);
- while (!try_get_cap_refs(ci, need, want, endoff,
- true, &_got, &err)) {
+ while (!(err = try_get_cap_refs(ci, need, want, endoff,
+ true, &_got))) {
if (signal_pending(current)) {
ret = -ERESTARTSYS;
break;
@@ -2770,19 +2756,14 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
}
remove_wait_queue(&ci->i_cap_wq, &wait);
-
if (err == -EAGAIN)
continue;
- if (err < 0)
- ret = err;
}
- if (ret < 0) {
- if (err == -ESTALE) {
- /* session was killed, try renew caps */
- ret = ceph_renew_caps(&ci->vfs_inode);
- if (ret == 0)
- continue;
- }
+ if (ret == -ESTALE) {
+ /* session was killed, try renew caps */
+ ret = ceph_renew_caps(&ci->vfs_inode);
+ if (ret == 0)
+ continue;
return ret;
}
@@ -4099,7 +4080,7 @@ void ceph_put_fmode(struct ceph_inode_info *ci, int fmode)
}
/*
- * For a soon-to-be unlinked file, drop the AUTH_RDCACHE caps. If it
+ * For a soon-to-be unlinked file, drop the LINK caps. If it
* looks like the link count will hit 0, drop any other caps (other
* than PIN) we don't specifically want (due to the file still being
* open).
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index 98365e74cb4a..b3fc5fe26a1a 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -37,7 +37,7 @@ static int mdsmap_show(struct seq_file *s, void *p)
struct ceph_entity_addr *addr = &mdsmap->m_info[i].addr;
int state = mdsmap->m_info[i].state;
seq_printf(s, "\tmds%d\t%s\t(%s)\n", i,
- ceph_pr_addr(&addr->in_addr),
+ ceph_pr_addr(addr),
ceph_mds_state_name(state));
}
return 0;
@@ -88,7 +88,7 @@ static int mdsc_show(struct seq_file *s, void *p)
req->r_dentry,
path ? path : "");
spin_unlock(&req->r_dentry->d_lock);
- kfree(path);
+ ceph_mdsc_free_path(path, pathlen);
} else if (req->r_path1) {
seq_printf(s, " #%llx/%s", req->r_ino1.ino,
req->r_path1);
@@ -108,7 +108,7 @@ static int mdsc_show(struct seq_file *s, void *p)
req->r_old_dentry,
path ? path : "");
spin_unlock(&req->r_old_dentry->d_lock);
- kfree(path);
+ ceph_mdsc_free_path(path, pathlen);
} else if (req->r_path2 && req->r_op != CEPH_MDS_OP_SYMLINK) {
if (req->r_ino2.ino)
seq_printf(s, " #%llx/%s", req->r_ino2.ino,
@@ -124,18 +124,48 @@ static int mdsc_show(struct seq_file *s, void *p)
return 0;
}
+static int caps_show_cb(struct inode *inode, struct ceph_cap *cap, void *p)
+{
+ struct seq_file *s = p;
+
+ seq_printf(s, "0x%-17lx%-17s%-17s\n", inode->i_ino,
+ ceph_cap_string(cap->issued),
+ ceph_cap_string(cap->implemented));
+ return 0;
+}
+
static int caps_show(struct seq_file *s, void *p)
{
struct ceph_fs_client *fsc = s->private;
- int total, avail, used, reserved, min;
+ struct ceph_mds_client *mdsc = fsc->mdsc;
+ int total, avail, used, reserved, min, i;
ceph_reservation_status(fsc, &total, &avail, &used, &reserved, &min);
seq_printf(s, "total\t\t%d\n"
"avail\t\t%d\n"
"used\t\t%d\n"
"reserved\t%d\n"
- "min\t%d\n",
+ "min\t\t%d\n\n",
total, avail, used, reserved, min);
+ seq_printf(s, "ino issued implemented\n");
+ seq_printf(s, "-----------------------------------------------\n");
+
+ mutex_lock(&mdsc->mutex);
+ for (i = 0; i < mdsc->max_sessions; i++) {
+ struct ceph_mds_session *session;
+
+ session = __ceph_lookup_mds_session(mdsc, i);
+ if (!session)
+ continue;
+ mutex_unlock(&mdsc->mutex);
+ mutex_lock(&session->s_mutex);
+ ceph_iterate_session_caps(session, caps_show_cb, s);
+ mutex_unlock(&session->s_mutex);
+ ceph_put_mds_session(session);
+ mutex_lock(&mdsc->mutex);
+ }
+ mutex_unlock(&mdsc->mutex);
+
return 0;
}
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index 3c59ad180ef0..d3ef7ee429ec 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -22,18 +22,77 @@ struct ceph_nfs_confh {
u64 ino, parent_ino;
} __attribute__ ((packed));
+/*
+ * fh for snapped inode
+ */
+struct ceph_nfs_snapfh {
+ u64 ino;
+ u64 snapid;
+ u64 parent_ino;
+ u32 hash;
+} __attribute__ ((packed));
+
+static int ceph_encode_snapfh(struct inode *inode, u32 *rawfh, int *max_len,
+ struct inode *parent_inode)
+{
+ const static int snap_handle_length =
+ sizeof(struct ceph_nfs_snapfh) >> 2;
+ struct ceph_nfs_snapfh *sfh = (void *)rawfh;
+ u64 snapid = ceph_snap(inode);
+ int ret;
+ bool no_parent = true;
+
+ if (*max_len < snap_handle_length) {
+ *max_len = snap_handle_length;
+ ret = FILEID_INVALID;
+ goto out;
+ }
+
+ ret = -EINVAL;
+ if (snapid != CEPH_SNAPDIR) {
+ struct inode *dir;
+ struct dentry *dentry = d_find_alias(inode);
+ if (!dentry)
+ goto out;
+
+ rcu_read_lock();
+ dir = d_inode_rcu(dentry->d_parent);
+ if (ceph_snap(dir) != CEPH_SNAPDIR) {
+ sfh->parent_ino = ceph_ino(dir);
+ sfh->hash = ceph_dentry_hash(dir, dentry);
+ no_parent = false;
+ }
+ rcu_read_unlock();
+ dput(dentry);
+ }
+
+ if (no_parent) {
+ if (!S_ISDIR(inode->i_mode))
+ goto out;
+ sfh->parent_ino = sfh->ino;
+ sfh->hash = 0;
+ }
+ sfh->ino = ceph_ino(inode);
+ sfh->snapid = snapid;
+
+ *max_len = snap_handle_length;
+ ret = FILEID_BTRFS_WITH_PARENT;
+out:
+ dout("encode_snapfh %llx.%llx ret=%d\n", ceph_vinop(inode), ret);
+ return ret;
+}
+
static int ceph_encode_fh(struct inode *inode, u32 *rawfh, int *max_len,
struct inode *parent_inode)
{
+ const static int handle_length =
+ sizeof(struct ceph_nfs_fh) >> 2;
+ const static int connected_handle_length =
+ sizeof(struct ceph_nfs_confh) >> 2;
int type;
- struct ceph_nfs_fh *fh = (void *)rawfh;
- struct ceph_nfs_confh *cfh = (void *)rawfh;
- int connected_handle_length = sizeof(*cfh)/4;
- int handle_length = sizeof(*fh)/4;
- /* don't re-export snaps */
if (ceph_snap(inode) != CEPH_NOSNAP)
- return -EINVAL;
+ return ceph_encode_snapfh(inode, rawfh, max_len, parent_inode);
if (parent_inode && (*max_len < connected_handle_length)) {
*max_len = connected_handle_length;
@@ -44,6 +103,7 @@ static int ceph_encode_fh(struct inode *inode, u32 *rawfh, int *max_len,
}
if (parent_inode) {
+ struct ceph_nfs_confh *cfh = (void *)rawfh;
dout("encode_fh %llx with parent %llx\n",
ceph_ino(inode), ceph_ino(parent_inode));
cfh->ino = ceph_ino(inode);
@@ -51,6 +111,7 @@ static int ceph_encode_fh(struct inode *inode, u32 *rawfh, int *max_len,
*max_len = connected_handle_length;
type = FILEID_INO32_GEN_PARENT;
} else {
+ struct ceph_nfs_fh *fh = (void *)rawfh;
dout("encode_fh %llx\n", ceph_ino(inode));
fh->ino = ceph_ino(inode);
*max_len = handle_length;
@@ -59,7 +120,7 @@ static int ceph_encode_fh(struct inode *inode, u32 *rawfh, int *max_len,
return type;
}
-static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino)
+static struct inode *__lookup_inode(struct super_block *sb, u64 ino)
{
struct ceph_mds_client *mdsc = ceph_sb_to_client(sb)->mdsc;
struct inode *inode;
@@ -81,7 +142,7 @@ static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino)
mask = CEPH_STAT_CAP_INODE;
if (ceph_security_xattr_wanted(d_inode(sb->s_root)))
mask |= CEPH_CAP_XATTR_SHARED;
- req->r_args.getattr.mask = cpu_to_le32(mask);
+ req->r_args.lookupino.mask = cpu_to_le32(mask);
req->r_ino1 = vino;
req->r_num_caps = 1;
@@ -91,16 +152,114 @@ static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino)
ihold(inode);
ceph_mdsc_put_request(req);
if (!inode)
- return ERR_PTR(-ESTALE);
- if (inode->i_nlink == 0) {
- iput(inode);
- return ERR_PTR(-ESTALE);
- }
+ return err < 0 ? ERR_PTR(err) : ERR_PTR(-ESTALE);
}
+ return inode;
+}
+
+struct inode *ceph_lookup_inode(struct super_block *sb, u64 ino)
+{
+ struct inode *inode = __lookup_inode(sb, ino);
+ if (IS_ERR(inode))
+ return inode;
+ if (inode->i_nlink == 0) {
+ iput(inode);
+ return ERR_PTR(-ESTALE);
+ }
+ return inode;
+}
+static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino)
+{
+ struct inode *inode = __lookup_inode(sb, ino);
+ if (IS_ERR(inode))
+ return ERR_CAST(inode);
+ if (inode->i_nlink == 0) {
+ iput(inode);
+ return ERR_PTR(-ESTALE);
+ }
return d_obtain_alias(inode);
}
+static struct dentry *__snapfh_to_dentry(struct super_block *sb,
+ struct ceph_nfs_snapfh *sfh,
+ bool want_parent)
+{
+ struct ceph_mds_client *mdsc = ceph_sb_to_client(sb)->mdsc;
+ struct ceph_mds_request *req;
+ struct inode *inode;
+ struct ceph_vino vino;
+ int mask;
+ int err;
+ bool unlinked = false;
+
+ if (want_parent) {
+ vino.ino = sfh->parent_ino;
+ if (sfh->snapid == CEPH_SNAPDIR)
+ vino.snap = CEPH_NOSNAP;
+ else if (sfh->ino == sfh->parent_ino)
+ vino.snap = CEPH_SNAPDIR;
+ else
+ vino.snap = sfh->snapid;
+ } else {
+ vino.ino = sfh->ino;
+ vino.snap = sfh->snapid;
+ }
+ inode = ceph_find_inode(sb, vino);
+ if (inode)
+ return d_obtain_alias(inode);
+
+ req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPINO,
+ USE_ANY_MDS);
+ if (IS_ERR(req))
+ return ERR_CAST(req);
+
+ mask = CEPH_STAT_CAP_INODE;
+ if (ceph_security_xattr_wanted(d_inode(sb->s_root)))
+ mask |= CEPH_CAP_XATTR_SHARED;
+ req->r_args.lookupino.mask = cpu_to_le32(mask);
+ if (vino.snap < CEPH_NOSNAP) {
+ req->r_args.lookupino.snapid = cpu_to_le64(vino.snap);
+ if (!want_parent && sfh->ino != sfh->parent_ino) {
+ req->r_args.lookupino.parent =
+ cpu_to_le64(sfh->parent_ino);
+ req->r_args.lookupino.hash =
+ cpu_to_le32(sfh->hash);
+ }
+ }
+
+ req->r_ino1 = vino;
+ req->r_num_caps = 1;
+ err = ceph_mdsc_do_request(mdsc, NULL, req);
+ inode = req->r_target_inode;
+ if (inode) {
+ if (vino.snap == CEPH_SNAPDIR) {
+ if (inode->i_nlink == 0)
+ unlinked = true;
+ inode = ceph_get_snapdir(inode);
+ } else if (ceph_snap(inode) == vino.snap) {
+ ihold(inode);
+ } else {
+ /* mds does not support lookup snapped inode */
+ err = -EOPNOTSUPP;
+ inode = NULL;
+ }
+ }
+ ceph_mdsc_put_request(req);
+
+ if (want_parent) {
+ dout("snapfh_to_parent %llx.%llx\n err=%d\n",
+ vino.ino, vino.snap, err);
+ } else {
+ dout("snapfh_to_dentry %llx.%llx parent %llx hash %x err=%d",
+ vino.ino, vino.snap, sfh->parent_ino, sfh->hash, err);
+ }
+ if (!inode)
+ return ERR_PTR(-ESTALE);
+ /* see comments in ceph_get_parent() */
+ return unlinked ? d_obtain_root(inode) : d_obtain_alias(inode);
+}
+
/*
* convert regular fh to dentry
*/
@@ -110,6 +269,11 @@ static struct dentry *ceph_fh_to_dentry(struct super_block *sb,
{
struct ceph_nfs_fh *fh = (void *)fid->raw;
+ if (fh_type == FILEID_BTRFS_WITH_PARENT) {
+ struct ceph_nfs_snapfh *sfh = (void *)fid->raw;
+ return __snapfh_to_dentry(sb, sfh, false);
+ }
+
if (fh_type != FILEID_INO32_GEN &&
fh_type != FILEID_INO32_GEN_PARENT)
return NULL;
@@ -163,13 +327,49 @@ static struct dentry *__get_parent(struct super_block *sb,
static struct dentry *ceph_get_parent(struct dentry *child)
{
- /* don't re-export snaps */
- if (ceph_snap(d_inode(child)) != CEPH_NOSNAP)
- return ERR_PTR(-EINVAL);
-
- dout("get_parent %p ino %llx.%llx\n",
- child, ceph_vinop(d_inode(child)));
- return __get_parent(child->d_sb, child, 0);
+ struct inode *inode = d_inode(child);
+ struct dentry *dn;
+
+ if (ceph_snap(inode) != CEPH_NOSNAP) {
+ struct inode* dir;
+ bool unlinked = false;
+ /* do not support non-directory */
+ if (!d_is_dir(child)) {
+ dn = ERR_PTR(-EINVAL);
+ goto out;
+ }
+ dir = __lookup_inode(inode->i_sb, ceph_ino(inode));
+ if (IS_ERR(dir)) {
+ dn = ERR_CAST(dir);
+ goto out;
+ }
+ /* There can be multiple paths to access snapped inode.
+ * For simplicity, treat snapdir of head inode as parent */
+ if (ceph_snap(inode) != CEPH_SNAPDIR) {
+ struct inode *snapdir = ceph_get_snapdir(dir);
+ if (dir->i_nlink == 0)
+ unlinked = true;
+ iput(dir);
+ if (IS_ERR(snapdir)) {
+ dn = ERR_CAST(snapdir);
+ goto out;
+ }
+ dir = snapdir;
+ }
+ /* If directory has already been deleted, futher get_parent
+ * will fail. Do not mark snapdir dentry as disconnected,
+ * this prevent exportfs from doing futher get_parent. */
+ if (unlinked)
+ dn = d_obtain_root(dir);
+ else
+ dn = d_obtain_alias(dir);
+ } else {
+ dn = __get_parent(child->d_sb, child, 0);
+ }
+out:
+ dout("get_parent %p ino %llx.%llx err=%ld\n",
+ child, ceph_vinop(inode), (IS_ERR(dn) ? PTR_ERR(dn) : 0));
+ return dn;
}
/*
@@ -182,6 +382,11 @@ static struct dentry *ceph_fh_to_parent(struct super_block *sb,
struct ceph_nfs_confh *cfh = (void *)fid->raw;
struct dentry *dentry;
+ if (fh_type == FILEID_BTRFS_WITH_PARENT) {
+ struct ceph_nfs_snapfh *sfh = (void *)fid->raw;
+ return __snapfh_to_dentry(sb, sfh, true);
+ }
+
if (fh_type != FILEID_INO32_GEN_PARENT)
return NULL;
if (fh_len < sizeof(*cfh) / 4)
@@ -194,14 +399,115 @@ static struct dentry *ceph_fh_to_parent(struct super_block *sb,
return dentry;
}
+static int __get_snap_name(struct dentry *parent, char *name,
+ struct dentry *child)
+{
+ struct inode *inode = d_inode(child);
+ struct inode *dir = d_inode(parent);
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+ struct ceph_mds_request *req = NULL;
+ char *last_name = NULL;
+ unsigned next_offset = 2;
+ int err = -EINVAL;
+
+ if (ceph_ino(inode) != ceph_ino(dir))
+ goto out;
+ if (ceph_snap(inode) == CEPH_SNAPDIR) {
+ if (ceph_snap(dir) == CEPH_NOSNAP) {
+ strcpy(name, fsc->mount_options->snapdir_name);
+ err = 0;
+ }
+ goto out;
+ }
+ if (ceph_snap(dir) != CEPH_SNAPDIR)
+ goto out;
+
+ while (1) {
+ struct ceph_mds_reply_info_parsed *rinfo;
+ struct ceph_mds_reply_dir_entry *rde;
+ int i;
+
+ req = ceph_mdsc_create_request(fsc->mdsc, CEPH_MDS_OP_LSSNAP,
+ USE_AUTH_MDS);
+ if (IS_ERR(req)) {
+ err = PTR_ERR(req);
+ req = NULL;
+ goto out;
+ }
+ err = ceph_alloc_readdir_reply_buffer(req, inode);
+ if (err)
+ goto out;
+
+ req->r_direct_mode = USE_AUTH_MDS;
+ req->r_readdir_offset = next_offset;
+ req->r_args.readdir.flags =
+ cpu_to_le16(CEPH_READDIR_REPLY_BITFLAGS);
+ if (last_name) {
+ req->r_path2 = last_name;
+ last_name = NULL;
+ }
+
+ req->r_inode = dir;
+ ihold(dir);
+ req->r_dentry = dget(parent);
+
+ inode_lock(dir);
+ err = ceph_mdsc_do_request(fsc->mdsc, NULL, req);
+ inode_unlock(dir);
+
+ if (err < 0)
+ goto out;
+
+ rinfo = &req->r_reply_info;
+ for (i = 0; i < rinfo->dir_nr; i++) {
+ rde = rinfo->dir_entries + i;
+ BUG_ON(!rde->inode.in);
+ if (ceph_snap(inode) ==
+ le64_to_cpu(rde->inode.in->snapid)) {
+ memcpy(name, rde->name, rde->name_len);
+ name[rde->name_len] = '\0';
+ err = 0;
+ goto out;
+ }
+ }
+
+ if (rinfo->dir_end)
+ break;
+
+ BUG_ON(rinfo->dir_nr <= 0);
+ rde = rinfo->dir_entries + (rinfo->dir_nr - 1);
+ next_offset += rinfo->dir_nr;
+ last_name = kstrndup(rde->name, rde->name_len, GFP_KERNEL);
+ if (!last_name) {
+ err = -ENOMEM;
+ goto out;
+ }
+
+ ceph_mdsc_put_request(req);
+ req = NULL;
+ }
+ err = -ENOENT;
+out:
+ if (req)
+ ceph_mdsc_put_request(req);
+ kfree(last_name);
+ dout("get_snap_name %p ino %llx.%llx err=%d\n",
+ child, ceph_vinop(inode), err);
+ return err;
+}
+
static int ceph_get_name(struct dentry *parent, char *name,
struct dentry *child)
{
struct ceph_mds_client *mdsc;
struct ceph_mds_request *req;
+ struct inode *inode = d_inode(child);
int err;
- mdsc = ceph_inode_to_client(d_inode(child))->mdsc;
+ if (ceph_snap(inode) != CEPH_NOSNAP)
+ return __get_snap_name(parent, name, child);
+
+ mdsc = ceph_inode_to_client(inode)->mdsc;
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPNAME,
USE_ANY_MDS);
if (IS_ERR(req))
@@ -209,8 +515,8 @@ static int ceph_get_name(struct dentry *parent, char *name,
inode_lock(d_inode(parent));
- req->r_inode = d_inode(child);
- ihold(d_inode(child));
+ req->r_inode = inode;
+ ihold(inode);
req->r_ino2 = ceph_vino(d_inode(parent));
req->r_parent = d_inode(parent);
set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
@@ -224,10 +530,10 @@ static int ceph_get_name(struct dentry *parent, char *name,
memcpy(name, rinfo->dname, rinfo->dname_len);
name[rinfo->dname_len] = 0;
dout("get_name %p ino %llx.%llx name %s\n",
- child, ceph_vinop(d_inode(child)), name);
+ child, ceph_vinop(inode), name);
} else {
dout("get_name %p ino %llx.%llx err %d\n",
- child, ceph_vinop(d_inode(child)), err);
+ child, ceph_vinop(inode), err);
}
ceph_mdsc_put_request(req);
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 84725b53ac21..305daf043eb0 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -929,7 +929,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
dout("sync_direct_%s on file %p %lld~%u snapc %p seq %lld\n",
(write ? "write" : "read"), file, pos, (unsigned)count,
- snapc, snapc->seq);
+ snapc, snapc ? snapc->seq : 0);
ret = filemap_write_and_wait_range(inode->i_mapping,
pos, pos + count - 1);
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 35dae6d5493a..f85355bf49c4 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -2266,43 +2266,72 @@ int ceph_permission(struct inode *inode, int mask)
return err;
}
+/* Craft a mask of needed caps given a set of requested statx attrs. */
+static int statx_to_caps(u32 want)
+{
+ int mask = 0;
+
+ if (want & (STATX_MODE|STATX_UID|STATX_GID|STATX_CTIME))
+ mask |= CEPH_CAP_AUTH_SHARED;
+
+ if (want & (STATX_NLINK|STATX_CTIME))
+ mask |= CEPH_CAP_LINK_SHARED;
+
+ if (want & (STATX_ATIME|STATX_MTIME|STATX_CTIME|STATX_SIZE|
+ STATX_BLOCKS))
+ mask |= CEPH_CAP_FILE_SHARED;
+
+ if (want & (STATX_CTIME))
+ mask |= CEPH_CAP_XATTR_SHARED;
+
+ return mask;
+}
+
/*
- * Get all attributes. Hopefully somedata we'll have a statlite()
- * and can limit the fields we require to be accurate.
+ * Get all the attributes. If we have sufficient caps for the requested attrs,
+ * then we can avoid talking to the MDS at all.
*/
int ceph_getattr(const struct path *path, struct kstat *stat,
u32 request_mask, unsigned int flags)
{
struct inode *inode = d_inode(path->dentry);
struct ceph_inode_info *ci = ceph_inode(inode);
- int err;
+ int err = 0;
- err = ceph_do_getattr(inode, CEPH_STAT_CAP_INODE_ALL, false);
- if (!err) {
- generic_fillattr(inode, stat);
- stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino);
- if (ceph_snap(inode) == CEPH_NOSNAP)
- stat->dev = inode->i_sb->s_dev;
+ /* Skip the getattr altogether if we're asked not to sync */
+ if (!(flags & AT_STATX_DONT_SYNC)) {
+ err = ceph_do_getattr(inode, statx_to_caps(request_mask),
+ flags & AT_STATX_FORCE_SYNC);
+ if (err)
+ return err;
+ }
+
+ generic_fillattr(inode, stat);
+ stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino);
+ if (ceph_snap(inode) == CEPH_NOSNAP)
+ stat->dev = inode->i_sb->s_dev;
+ else
+ stat->dev = ci->i_snapid_map ? ci->i_snapid_map->dev : 0;
+
+ if (S_ISDIR(inode->i_mode)) {
+ if (ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb),
+ RBYTES))
+ stat->size = ci->i_rbytes;
else
- stat->dev = ci->i_snapid_map ? ci->i_snapid_map->dev : 0;
-
- if (S_ISDIR(inode->i_mode)) {
- if (ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb),
- RBYTES))
- stat->size = ci->i_rbytes;
- else
- stat->size = ci->i_files + ci->i_subdirs;
- stat->blocks = 0;
- stat->blksize = 65536;
- /*
- * Some applications rely on the number of st_nlink
- * value on directories to be either 0 (if unlinked)
- * or 2 + number of subdirectories.
- */
- if (stat->nlink == 1)
- /* '.' + '..' + subdirs */
- stat->nlink = 1 + 1 + ci->i_subdirs;
- }
+ stat->size = ci->i_files + ci->i_subdirs;
+ stat->blocks = 0;
+ stat->blksize = 65536;
+ /*
+ * Some applications rely on the number of st_nlink
+ * value on directories to be either 0 (if unlinked)
+ * or 2 + number of subdirectories.
+ */
+ if (stat->nlink == 1)
+ /* '.' + '..' + subdirs */
+ stat->nlink = 1 + 1 + ci->i_subdirs;
}
+
+ /* Mask off any higher bits (e.g. btime) until we have support */
+ stat->result_mask = request_mask & STATX_BASIC_STATS;
return err;
}
diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c
index 9dae2ec7e1fa..ac9b53b89365 100644
--- a/fs/ceph/locks.c
+++ b/fs/ceph/locks.c
@@ -237,15 +237,6 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
spin_lock(&ci->i_ceph_lock);
if (ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) {
err = -EIO;
- } else if (op == CEPH_MDS_OP_SETFILELOCK) {
- /*
- * increasing i_filelock_ref closes race window between
- * handling request reply and adding file_lock struct to
- * inode. Otherwise, i_auth_cap may get trimmed in the
- * window. Caller function will decrease the counter.
- */
- fl->fl_ops = &ceph_fl_lock_ops;
- atomic_inc(&ci->i_filelock_ref);
}
spin_unlock(&ci->i_ceph_lock);
if (err < 0) {
@@ -299,10 +290,6 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
spin_lock(&ci->i_ceph_lock);
if (ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) {
err = -EIO;
- } else {
- /* see comment in ceph_lock */
- fl->fl_ops = &ceph_fl_lock_ops;
- atomic_inc(&ci->i_filelock_ref);
}
spin_unlock(&ci->i_ceph_lock);
if (err < 0) {
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 9049c2a3e972..959b1bf7c327 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -550,15 +550,9 @@ void ceph_put_mds_session(struct ceph_mds_session *s)
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
int mds)
{
- struct ceph_mds_session *session;
-
if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
return NULL;
- session = mdsc->sessions[mds];
- dout("lookup_mds_session %p %d\n", session,
- refcount_read(&session->s_ref));
- get_session(session);
- return session;
+ return get_session(mdsc->sessions[mds]);
}
static bool __have_session(struct ceph_mds_client *mdsc, int mds)
@@ -1284,9 +1278,9 @@ static void cleanup_session_requests(struct ceph_mds_client *mdsc,
*
* Caller must hold session s_mutex.
*/
-static int iterate_session_caps(struct ceph_mds_session *session,
- int (*cb)(struct inode *, struct ceph_cap *,
- void *), void *arg)
+int ceph_iterate_session_caps(struct ceph_mds_session *session,
+ int (*cb)(struct inode *, struct ceph_cap *,
+ void *), void *arg)
{
struct list_head *p;
struct ceph_cap *cap;
@@ -1451,7 +1445,7 @@ static void remove_session_caps(struct ceph_mds_session *session)
LIST_HEAD(dispose);
dout("remove_session_caps on %p\n", session);
- iterate_session_caps(session, remove_session_caps_cb, fsc);
+ ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
wake_up_all(&fsc->mdsc->cap_flushing_wq);
@@ -1534,8 +1528,8 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
{
dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
- iterate_session_caps(session, wake_up_session_cb,
- (void *)(unsigned long)ev);
+ ceph_iterate_session_caps(session, wake_up_session_cb,
+ (void *)(unsigned long)ev);
}
/*
@@ -1768,7 +1762,7 @@ int ceph_trim_caps(struct ceph_mds_client *mdsc,
session->s_mds, session->s_nr_caps, max_caps, trim_caps);
if (trim_caps > 0) {
session->s_trim_caps = trim_caps;
- iterate_session_caps(session, trim_caps_cb, session);
+ ceph_iterate_session_caps(session, trim_caps_cb, session);
dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
session->s_mds, session->s_nr_caps, max_caps,
trim_caps - session->s_trim_caps);
@@ -1861,7 +1855,8 @@ again:
num_cap_releases--;
head = msg->front.iov_base;
- le32_add_cpu(&head->num, 1);
+ put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
+ &head->num);
item = msg->front.iov_base + msg->front.iov_len;
item->ino = cpu_to_le64(cap->cap_ino);
item->cap_id = cpu_to_le64(cap->cap_id);
@@ -2089,43 +2084,29 @@ static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
* Encode hidden .snap dirs as a double /, i.e.
* foo/.snap/bar -> foo//bar
*/
-char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
+char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
int stop_on_nosnap)
{
struct dentry *temp;
char *path;
- int len, pos;
+ int pos;
unsigned seq;
+ u64 base;
if (!dentry)
return ERR_PTR(-EINVAL);
-retry:
- len = 0;
- seq = read_seqbegin(&rename_lock);
- rcu_read_lock();
- for (temp = dentry; !IS_ROOT(temp);) {
- struct inode *inode = d_inode(temp);
- if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
- len++; /* slash only */
- else if (stop_on_nosnap && inode &&
- ceph_snap(inode) == CEPH_NOSNAP)
- break;
- else
- len += 1 + temp->d_name.len;
- temp = temp->d_parent;
- }
- rcu_read_unlock();
- if (len)
- len--; /* no leading '/' */
-
- path = kmalloc(len+1, GFP_NOFS);
+ path = __getname();
if (!path)
return ERR_PTR(-ENOMEM);
- pos = len;
- path[pos] = 0; /* trailing null */
+retry:
+ pos = PATH_MAX - 1;
+ path[pos] = '\0';
+
+ seq = read_seqbegin(&rename_lock);
rcu_read_lock();
- for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
+ temp = dentry;
+ for (;;) {
struct inode *inode;
spin_lock(&temp->d_lock);
@@ -2143,83 +2124,54 @@ retry:
spin_unlock(&temp->d_lock);
break;
}
- strncpy(path + pos, temp->d_name.name,
- temp->d_name.len);
+ memcpy(path + pos, temp->d_name.name, temp->d_name.len);
}
spin_unlock(&temp->d_lock);
- if (pos)
- path[--pos] = '/';
temp = temp->d_parent;
+
+ /* Are we at the root? */
+ if (IS_ROOT(temp))
+ break;
+
+ /* Are we out of buffer? */
+ if (--pos < 0)
+ break;
+
+ path[pos] = '/';
}
+ base = ceph_ino(d_inode(temp));
rcu_read_unlock();
- if (pos != 0 || read_seqretry(&rename_lock, seq)) {
+ if (pos < 0 || read_seqretry(&rename_lock, seq)) {
pr_err("build_path did not end path lookup where "
- "expected, namelen is %d, pos is %d\n", len, pos);
+ "expected, pos is %d\n", pos);
/* presumably this is only possible if racing with a
rename of one of the parent directories (we can not
lock the dentries above us to prevent this, but
retrying should be harmless) */
- kfree(path);
goto retry;
}
- *base = ceph_ino(d_inode(temp));
- *plen = len;
+ *pbase = base;
+ *plen = PATH_MAX - 1 - pos;
dout("build_path on %p %d built %llx '%.*s'\n",
- dentry, d_count(dentry), *base, len, path);
- return path;
-}
-
-/* Duplicate the dentry->d_name.name safely */
-static int clone_dentry_name(struct dentry *dentry, const char **ppath,
- int *ppathlen)
-{
- u32 len;
- char *name;
-
-retry:
- len = READ_ONCE(dentry->d_name.len);
- name = kmalloc(len + 1, GFP_NOFS);
- if (!name)
- return -ENOMEM;
-
- spin_lock(&dentry->d_lock);
- if (dentry->d_name.len != len) {
- spin_unlock(&dentry->d_lock);
- kfree(name);
- goto retry;
- }
- memcpy(name, dentry->d_name.name, len);
- spin_unlock(&dentry->d_lock);
-
- name[len] = '\0';
- *ppath = name;
- *ppathlen = len;
- return 0;
+ dentry, d_count(dentry), base, *plen, path + pos);
+ return path + pos;
}
static int build_dentry_path(struct dentry *dentry, struct inode *dir,
const char **ppath, int *ppathlen, u64 *pino,
bool *pfreepath, bool parent_locked)
{
- int ret;
char *path;
rcu_read_lock();
if (!dir)
dir = d_inode_rcu(dentry->d_parent);
- if (dir && ceph_snap(dir) == CEPH_NOSNAP) {
+ if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
*pino = ceph_ino(dir);
rcu_read_unlock();
- if (parent_locked) {
- *ppath = dentry->d_name.name;
- *ppathlen = dentry->d_name.len;
- } else {
- ret = clone_dentry_name(dentry, ppath, ppathlen);
- if (ret)
- return ret;
- *pfreepath = true;
- }
+ *ppath = dentry->d_name.name;
+ *ppathlen = dentry->d_name.len;
return 0;
}
rcu_read_unlock();
@@ -2331,9 +2283,9 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
(!!req->r_inode_drop + !!req->r_dentry_drop +
!!req->r_old_inode_drop + !!req->r_old_dentry_drop);
if (req->r_dentry_drop)
- len += req->r_dentry->d_name.len;
+ len += pathlen1;
if (req->r_old_dentry_drop)
- len += req->r_old_dentry->d_name.len;
+ len += pathlen2;
msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
if (!msg) {
@@ -2410,10 +2362,10 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
out_free2:
if (freepath2)
- kfree((char *)path2);
+ ceph_mdsc_free_path((char *)path2, pathlen2);
out_free1:
if (freepath1)
- kfree((char *)path1);
+ ceph_mdsc_free_path((char *)path1, pathlen1);
out:
return msg;
}
@@ -2427,8 +2379,7 @@ static void complete_request(struct ceph_mds_client *mdsc,
{
if (req->r_callback)
req->r_callback(mdsc, req);
- else
- complete_all(&req->r_completion);
+ complete_all(&req->r_completion);
}
/*
@@ -2670,28 +2621,11 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds)
}
}
-void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
+int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
struct ceph_mds_request *req)
{
- dout("submit_request on %p\n", req);
- mutex_lock(&mdsc->mutex);
- __register_request(mdsc, req, NULL);
- __do_request(mdsc, req);
- mutex_unlock(&mdsc->mutex);
-}
-
-/*
- * Synchrously perform an mds request. Take care of all of the
- * session setup, forwarding, retry details.
- */
-int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
- struct inode *dir,
- struct ceph_mds_request *req)
-{
int err;
- dout("do_request on %p\n", req);
-
/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
if (req->r_inode)
ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
@@ -2701,18 +2635,21 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
CEPH_CAP_PIN);
- /* issue */
+ dout("submit_request on %p for inode %p\n", req, dir);
mutex_lock(&mdsc->mutex);
__register_request(mdsc, req, dir);
__do_request(mdsc, req);
+ err = req->r_err;
+ mutex_unlock(&mdsc->mutex);
+ return err;
+}
- if (req->r_err) {
- err = req->r_err;
- goto out;
- }
+static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
+ struct ceph_mds_request *req)
+{
+ int err;
/* wait */
- mutex_unlock(&mdsc->mutex);
dout("do_request waiting\n");
if (!req->r_timeout && req->r_wait_for_completion) {
err = req->r_wait_for_completion(mdsc, req);
@@ -2753,8 +2690,26 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
err = req->r_err;
}
-out:
mutex_unlock(&mdsc->mutex);
+ return err;
+}
+
+/*
+ * Synchrously perform an mds request. Take care of all of the
+ * session setup, forwarding, retry details.
+ */
+int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
+ struct inode *dir,
+ struct ceph_mds_request *req)
+{
+ int err;
+
+ dout("do_request on %p\n", req);
+
+ /* issue */
+ err = ceph_mdsc_submit_request(mdsc, dir, req);
+ if (!err)
+ err = ceph_mdsc_wait_request(mdsc, req);
dout("do_request %p done, result %d\n", req, err);
return err;
}
@@ -3485,7 +3440,7 @@ out_freeflocks:
ceph_pagelist_encode_string(pagelist, path, pathlen);
ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
out_freepath:
- kfree(path);
+ ceph_mdsc_free_path(path, pathlen);
}
out_err:
@@ -3642,7 +3597,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
recon_state.msg_version = 2;
}
/* trsaverse this session's caps */
- err = iterate_session_caps(session, encode_caps_cb, &recon_state);
+ err = ceph_iterate_session_caps(session, encode_caps_cb, &recon_state);
spin_lock(&session->s_cap_lock);
session->s_cap_reconnect = 0;
@@ -4125,6 +4080,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
mdsc->max_sessions = 0;
mdsc->stopping = 0;
atomic64_set(&mdsc->quotarealms_count, 0);
+ mdsc->quotarealms_inodes = RB_ROOT;
+ mutex_init(&mdsc->quotarealms_inodes_mutex);
mdsc->last_snap_seq = 0;
init_rwsem(&mdsc->snap_rwsem);
mdsc->snap_realms = RB_ROOT;
@@ -4216,6 +4173,8 @@ void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
* their inode/dcache refs
*/
ceph_msgr_flush();
+
+ ceph_cleanup_quotarealms_inodes(mdsc);
}
/*
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 50385a481fdb..a83f28bc2387 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -326,6 +326,18 @@ struct ceph_snapid_map {
};
/*
+ * node for list of quotarealm inodes that are not visible from the filesystem
+ * mountpoint, but required to handle, e.g. quotas.
+ */
+struct ceph_quotarealm_inode {
+ struct rb_node node;
+ u64 ino;
+ unsigned long timeout; /* last time a lookup failed for this inode */
+ struct mutex mutex;
+ struct inode *inode;
+};
+
+/*
* mds client state
*/
struct ceph_mds_client {
@@ -344,6 +356,12 @@ struct ceph_mds_client {
int stopping; /* true if shutting down */
atomic64_t quotarealms_count; /* # realms with quota */
+ /*
+ * We keep a list of inodes we don't see in the mountpoint but that we
+ * need to track quota realms.
+ */
+ struct rb_root quotarealms_inodes;
+ struct mutex quotarealms_inodes_mutex;
/*
* snap_rwsem will cover cap linkage into snaprealms, and
@@ -447,8 +465,9 @@ extern int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
struct inode *dir);
extern struct ceph_mds_request *
ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode);
-extern void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
- struct ceph_mds_request *req);
+extern int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
+ struct inode *dir,
+ struct ceph_mds_request *req);
extern int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
struct inode *dir,
struct ceph_mds_request *req);
@@ -468,8 +487,18 @@ extern void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session);
extern void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc);
extern void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr);
+extern int ceph_iterate_session_caps(struct ceph_mds_session *session,
+ int (*cb)(struct inode *,
+ struct ceph_cap *, void *),
+ void *arg);
extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc);
+static inline void ceph_mdsc_free_path(char *path, int len)
+{
+ if (path)
+ __putname(path - (PATH_MAX - 1 - len));
+}
+
extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
int stop_on_nosnap);
diff --git a/fs/ceph/mdsmap.c b/fs/ceph/mdsmap.c
index 1a2c5d390f7f..701b4fb0fb5a 100644
--- a/fs/ceph/mdsmap.c
+++ b/fs/ceph/mdsmap.c
@@ -205,7 +205,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
dout("mdsmap_decode %d/%d %lld mds%d.%d %s %s\n",
i+1, n, global_id, mds, inc,
- ceph_pr_addr(&addr.in_addr),
+ ceph_pr_addr(&addr),
ceph_mds_state_name(state));
if (mds < 0 || state <= 0)
diff --git a/fs/ceph/quota.c b/fs/ceph/quota.c
index 9455d3aef0c3..c4522212872c 100644
--- a/fs/ceph/quota.c
+++ b/fs/ceph/quota.c
@@ -22,7 +22,16 @@ void ceph_adjust_quota_realms_count(struct inode *inode, bool inc)
static inline bool ceph_has_realms_with_quotas(struct inode *inode)
{
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
- return atomic64_read(&mdsc->quotarealms_count) > 0;
+ struct super_block *sb = mdsc->fsc->sb;
+
+ if (atomic64_read(&mdsc->quotarealms_count) > 0)
+ return true;
+ /* if root is the real CephFS root, we don't have quota realms */
+ if (sb->s_root->d_inode &&
+ (sb->s_root->d_inode->i_ino == CEPH_INO_ROOT))
+ return false;
+ /* otherwise, we can't know for sure */
+ return true;
}
void ceph_handle_quota(struct ceph_mds_client *mdsc,
@@ -68,6 +77,108 @@ void ceph_handle_quota(struct ceph_mds_client *mdsc,
iput(inode);
}
+static struct ceph_quotarealm_inode *
+find_quotarealm_inode(struct ceph_mds_client *mdsc, u64 ino)
+{
+ struct ceph_quotarealm_inode *qri = NULL;
+ struct rb_node **node, *parent = NULL;
+
+ mutex_lock(&mdsc->quotarealms_inodes_mutex);
+ node = &(mdsc->quotarealms_inodes.rb_node);
+ while (*node) {
+ parent = *node;
+ qri = container_of(*node, struct ceph_quotarealm_inode, node);
+
+ if (ino < qri->ino)
+ node = &((*node)->rb_left);
+ else if (ino > qri->ino)
+ node = &((*node)->rb_right);
+ else
+ break;
+ }
+ if (!qri || (qri->ino != ino)) {
+ /* Not found, create a new one and insert it */
+ qri = kmalloc(sizeof(*qri), GFP_KERNEL);
+ if (qri) {
+ qri->ino = ino;
+ qri->inode = NULL;
+ qri->timeout = 0;
+ mutex_init(&qri->mutex);
+ rb_link_node(&qri->node, parent, node);
+ rb_insert_color(&qri->node, &mdsc->quotarealms_inodes);
+ } else
+ pr_warn("Failed to alloc quotarealms_inode\n");
+ }
+ mutex_unlock(&mdsc->quotarealms_inodes_mutex);
+
+ return qri;
+}
+
+/*
+ * This function will try to lookup a realm inode which isn't visible in the
+ * filesystem mountpoint. A list of these kind of inodes (not visible) is
+ * maintained in the mdsc and freed only when the filesystem is umounted.
+ *
+ * Note that these inodes are kept in this list even if the lookup fails, which
+ * allows to prevent useless lookup requests.
+ */
+static struct inode *lookup_quotarealm_inode(struct ceph_mds_client *mdsc,
+ struct super_block *sb,
+ struct ceph_snap_realm *realm)
+{
+ struct ceph_quotarealm_inode *qri;
+ struct inode *in;
+
+ qri = find_quotarealm_inode(mdsc, realm->ino);
+ if (!qri)
+ return NULL;
+
+ mutex_lock(&qri->mutex);
+ if (qri->inode) {
+ /* A request has already returned the inode */
+ mutex_unlock(&qri->mutex);
+ return qri->inode;
+ }
+ /* Check if this inode lookup has failed recently */
+ if (qri->timeout &&
+ time_before_eq(jiffies, qri->timeout)) {
+ mutex_unlock(&qri->mutex);
+ return NULL;
+ }
+ in = ceph_lookup_inode(sb, realm->ino);
+ if (IS_ERR(in)) {
+ pr_warn("Can't lookup inode %llx (err: %ld)\n",
+ realm->ino, PTR_ERR(in));
+ qri->timeout = jiffies + msecs_to_jiffies(60 * 1000); /* XXX */
+ } else {
+ qri->timeout = 0;
+ qri->inode = in;
+ }
+ mutex_unlock(&qri->mutex);
+
+ return in;
+}
+
+void ceph_cleanup_quotarealms_inodes(struct ceph_mds_client *mdsc)
+{
+ struct ceph_quotarealm_inode *qri;
+ struct rb_node *node;
+
+ /*
+ * It should now be safe to clean quotarealms_inode tree without holding
+ * mdsc->quotarealms_inodes_mutex...
+ */
+ mutex_lock(&mdsc->quotarealms_inodes_mutex);
+ while (!RB_EMPTY_ROOT(&mdsc->quotarealms_inodes)) {
+ node = rb_first(&mdsc->quotarealms_inodes);
+ qri = rb_entry(node, struct ceph_quotarealm_inode, node);
+ rb_erase(node, &mdsc->quotarealms_inodes);
+ iput(qri->inode);
+ kfree(qri);
+ }
+ mutex_unlock(&mdsc->quotarealms_inodes_mutex);
+}
+
/*
* This function walks through the snaprealm for an inode and returns the
* ceph_snap_realm for the first snaprealm that has quotas set (either max_files
@@ -76,9 +187,15 @@ void ceph_handle_quota(struct ceph_mds_client *mdsc,
*
* Note that the caller is responsible for calling ceph_put_snap_realm() on the
* returned realm.
+ *
+ * Callers of this function need to hold mdsc->snap_rwsem. However, if there's
+ * a need to do an inode lookup, this rwsem will be temporarily dropped. Hence
+ * the 'retry' argument: if rwsem needs to be dropped and 'retry' is 'false'
+ * this function will return -EAGAIN; otherwise, the snaprealms walk-through
+ * will be restarted.
*/
static struct ceph_snap_realm *get_quota_realm(struct ceph_mds_client *mdsc,
- struct inode *inode)
+ struct inode *inode, bool retry)
{
struct ceph_inode_info *ci = NULL;
struct ceph_snap_realm *realm, *next;
@@ -88,6 +205,7 @@ static struct ceph_snap_realm *get_quota_realm(struct ceph_mds_client *mdsc,
if (ceph_snap(inode) != CEPH_NOSNAP)
return NULL;
+restart:
realm = ceph_inode(inode)->i_snap_realm;
if (realm)
ceph_get_snap_realm(mdsc, realm);
@@ -95,11 +213,25 @@ static struct ceph_snap_realm *get_quota_realm(struct ceph_mds_client *mdsc,
pr_err_ratelimited("get_quota_realm: ino (%llx.%llx) "
"null i_snap_realm\n", ceph_vinop(inode));
while (realm) {
+ bool has_inode;
+
spin_lock(&realm->inodes_with_caps_lock);
- in = realm->inode ? igrab(realm->inode) : NULL;
+ has_inode = realm->inode;
+ in = has_inode ? igrab(realm->inode) : NULL;
spin_unlock(&realm->inodes_with_caps_lock);
- if (!in)
+ if (has_inode && !in)
break;
+ if (!in) {
+ up_read(&mdsc->snap_rwsem);
+ in = lookup_quotarealm_inode(mdsc, inode->i_sb, realm);
+ down_read(&mdsc->snap_rwsem);
+ if (IS_ERR_OR_NULL(in))
+ break;
+ ceph_put_snap_realm(mdsc, realm);
+ if (!retry)
+ return ERR_PTR(-EAGAIN);
+ goto restart;
+ }
ci = ceph_inode(in);
has_quota = __ceph_has_any_quota(ci);
@@ -125,9 +257,22 @@ bool ceph_quota_is_same_realm(struct inode *old, struct inode *new)
struct ceph_snap_realm *old_realm, *new_realm;
bool is_same;
+restart:
+ /*
+ * We need to lookup 2 quota realms atomically, i.e. with snap_rwsem.
+ * However, get_quota_realm may drop it temporarily. By setting the
+ * 'retry' parameter to 'false', we'll get -EAGAIN if the rwsem was
+ * dropped and we can then restart the whole operation.
+ */
down_read(&mdsc->snap_rwsem);
- old_realm = get_quota_realm(mdsc, old);
- new_realm = get_quota_realm(mdsc, new);
+ old_realm = get_quota_realm(mdsc, old, true);
+ new_realm = get_quota_realm(mdsc, new, false);
+ if (PTR_ERR(new_realm) == -EAGAIN) {
+ up_read(&mdsc->snap_rwsem);
+ if (old_realm)
+ ceph_put_snap_realm(mdsc, old_realm);
+ goto restart;
+ }
is_same = (old_realm == new_realm);
up_read(&mdsc->snap_rwsem);
@@ -166,6 +311,7 @@ static bool check_quota_exceeded(struct inode *inode, enum quota_check_op op,
return false;
down_read(&mdsc->snap_rwsem);
+restart:
realm = ceph_inode(inode)->i_snap_realm;
if (realm)
ceph_get_snap_realm(mdsc, realm);
@@ -173,12 +319,23 @@ static bool check_quota_exceeded(struct inode *inode, enum quota_check_op op,
pr_err_ratelimited("check_quota_exceeded: ino (%llx.%llx) "
"null i_snap_realm\n", ceph_vinop(inode));
while (realm) {
+ bool has_inode;
+
spin_lock(&realm->inodes_with_caps_lock);
- in = realm->inode ? igrab(realm->inode) : NULL;
+ has_inode = realm->inode;
+ in = has_inode ? igrab(realm->inode) : NULL;
spin_unlock(&realm->inodes_with_caps_lock);
- if (!in)
+ if (has_inode && !in)
break;
-
+ if (!in) {
+ up_read(&mdsc->snap_rwsem);
+ in = lookup_quotarealm_inode(mdsc, inode->i_sb, realm);
+ down_read(&mdsc->snap_rwsem);
+ if (IS_ERR_OR_NULL(in))
+ break;
+ ceph_put_snap_realm(mdsc, realm);
+ goto restart;
+ }
ci = ceph_inode(in);
spin_lock(&ci->i_ceph_lock);
if (op == QUOTA_CHECK_MAX_FILES_OP) {
@@ -314,7 +471,7 @@ bool ceph_quota_update_statfs(struct ceph_fs_client *fsc, struct kstatfs *buf)
bool is_updated = false;
down_read(&mdsc->snap_rwsem);
- realm = get_quota_realm(mdsc, d_inode(fsc->sb->s_root));
+ realm = get_quota_realm(mdsc, d_inode(fsc->sb->s_root), true);
up_read(&mdsc->snap_rwsem);
if (!realm)
return false;
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 285edda4fc3b..c864b44c8341 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -845,6 +845,12 @@ static void ceph_umount_begin(struct super_block *sb)
return;
}
+static int ceph_remount(struct super_block *sb, int *flags, char *data)
+{
+ sync_filesystem(sb);
+ return 0;
+}
+
static const struct super_operations ceph_super_ops = {
.alloc_inode = ceph_alloc_inode,
.destroy_inode = ceph_destroy_inode,
@@ -853,6 +859,7 @@ static const struct super_operations ceph_super_ops = {
.drop_inode = ceph_drop_inode,
.sync_fs = ceph_sync_fs,
.put_super = ceph_put_super,
+ .remount_fs = ceph_remount,
.show_options = ceph_show_options,
.statfs = ceph_statfs,
.umount_begin = ceph_umount_begin,
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index c5b4a05905c0..6edab9a750f8 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -1083,6 +1083,7 @@ extern long ceph_ioctl(struct file *file, unsigned int cmd, unsigned long arg);
/* export.c */
extern const struct export_operations ceph_export_ops;
+struct inode *ceph_lookup_inode(struct super_block *sb, u64 ino);
/* locks.c */
extern __init void ceph_flock_init(void);
@@ -1133,5 +1134,6 @@ extern bool ceph_quota_is_max_bytes_approaching(struct inode *inode,
loff_t newlen);
extern bool ceph_quota_update_statfs(struct ceph_fs_client *fsc,
struct kstatfs *buf);
+extern void ceph_cleanup_quotarealms_inodes(struct ceph_mds_client *mdsc);
#endif /* _FS_CEPH_SUPER_H */