From d9171b9345261e0d941d92fdda5672b5db67f968 Mon Sep 17 00:00:00 2001 From: Al Viro Date: Fri, 15 Apr 2016 03:33:13 -0400 Subject: parallel lookups machinery, part 4 (and last) If we *do* run into an in-lookup match, we need to wait for it to cease being in-lookup. Fortunately, we do have unused space in in-lookup dentries - d_lru is never looked at until it stops being in-lookup. So we can stash a pointer to wait_queue_head from stack frame of the caller of ->lookup(). Some precautions are needed while waiting, but it's not that hard - we do hold a reference to dentry we are waiting for, so it can't go away. If it's found to be in-lookup the wait_queue_head is still alive and will remain so at least while ->d_lock is held. Moreover, the condition we are waiting for becomes true at the same point where everything on that wq gets woken up, so we can just add ourselves to the queue once. d_alloc_parallel() gets a pointer to wait_queue_head_t from its caller; lookup_slow() adjusted, d_add_ci() taught to use d_alloc_parallel() if the dentry passed to it happens to be in-lookup one (i.e. if it's been called from the parallel lookup). That's pretty much it - all that remains is to switch ->i_mutex to rwsem and have lookup_slow() take it shared. Signed-off-by: Al Viro --- fs/dcache.c | 94 ++++++++++++++++++++++++++++++++++++++++++++++++------------- fs/namei.c | 3 +- 2 files changed, 76 insertions(+), 21 deletions(-) (limited to 'fs') diff --git a/fs/dcache.c b/fs/dcache.c index ea2de7c19b08..59fcffcbf096 100644 --- a/fs/dcache.c +++ b/fs/dcache.c @@ -1987,28 +1987,36 @@ EXPORT_SYMBOL(d_obtain_root); struct dentry *d_add_ci(struct dentry *dentry, struct inode *inode, struct qstr *name) { - struct dentry *found; - struct dentry *new; + struct dentry *found, *res; /* * First check if a dentry matching the name already exists, * if not go ahead and create it now. */ found = d_hash_and_lookup(dentry->d_parent, name); - if (!found) { - new = d_alloc(dentry->d_parent, name); - if (!new) { - found = ERR_PTR(-ENOMEM); - } else { - found = d_splice_alias(inode, new); - if (found) { - dput(new); - return found; - } - return new; + if (found) { + iput(inode); + return found; + } + if (d_in_lookup(dentry)) { + found = d_alloc_parallel(dentry->d_parent, name, + dentry->d_wait); + if (IS_ERR(found) || !d_in_lookup(found)) { + iput(inode); + return found; } + } else { + found = d_alloc(dentry->d_parent, name); + if (!found) { + iput(inode); + return ERR_PTR(-ENOMEM); + } + } + res = d_splice_alias(inode, found); + if (res) { + dput(found); + return res; } - iput(inode); return found; } EXPORT_SYMBOL(d_add_ci); @@ -2391,8 +2399,23 @@ static inline void end_dir_add(struct inode *dir, unsigned n) smp_store_release(&dir->i_dir_seq, n + 2); } +static void d_wait_lookup(struct dentry *dentry) +{ + if (d_in_lookup(dentry)) { + DECLARE_WAITQUEUE(wait, current); + add_wait_queue(dentry->d_wait, &wait); + do { + set_current_state(TASK_UNINTERRUPTIBLE); + spin_unlock(&dentry->d_lock); + schedule(); + spin_lock(&dentry->d_lock); + } while (d_in_lookup(dentry)); + } +} + struct dentry *d_alloc_parallel(struct dentry *parent, - const struct qstr *name) + const struct qstr *name, + wait_queue_head_t *wq) { unsigned int len = name->len; unsigned int hash = name->hash; @@ -2463,18 +2486,47 @@ retry: } dget(dentry); hlist_bl_unlock(b); - /* impossible until we actually enable parallel lookups */ - BUG(); - /* and this will be "wait for it to stop being in-lookup" */ - /* this one will be handled in the next commit */ + /* somebody is doing lookup for it right now; wait for it */ + spin_lock(&dentry->d_lock); + d_wait_lookup(dentry); + /* + * it's not in-lookup anymore; in principle we should repeat + * everything from dcache lookup, but it's likely to be what + * d_lookup() would've found anyway. If it is, just return it; + * otherwise we really have to repeat the whole thing. + */ + if (unlikely(dentry->d_name.hash != hash)) + goto mismatch; + if (unlikely(dentry->d_parent != parent)) + goto mismatch; + if (unlikely(d_unhashed(dentry))) + goto mismatch; + if (parent->d_flags & DCACHE_OP_COMPARE) { + int tlen = dentry->d_name.len; + const char *tname = dentry->d_name.name; + if (parent->d_op->d_compare(parent, dentry, tlen, tname, name)) + goto mismatch; + } else { + if (unlikely(dentry->d_name.len != len)) + goto mismatch; + if (unlikely(dentry_cmp(dentry, str, len))) + goto mismatch; + } + /* OK, it *is* a hashed match; return it */ + spin_unlock(&dentry->d_lock); dput(new); return dentry; } /* we can't take ->d_lock here; it's OK, though. */ new->d_flags |= DCACHE_PAR_LOOKUP; + new->d_wait = wq; hlist_bl_add_head_rcu(&new->d_u.d_in_lookup_hash, b); hlist_bl_unlock(b); return new; +mismatch: + spin_unlock(&dentry->d_lock); + dput(dentry); + goto retry; } EXPORT_SYMBOL(d_alloc_parallel); @@ -2485,9 +2537,11 @@ void __d_lookup_done(struct dentry *dentry) hlist_bl_lock(b); dentry->d_flags &= ~DCACHE_PAR_LOOKUP; __hlist_bl_del(&dentry->d_u.d_in_lookup_hash); + wake_up_all(dentry->d_wait); + dentry->d_wait = NULL; hlist_bl_unlock(b); INIT_HLIST_NODE(&dentry->d_u.d_alias); - /* more stuff will land here */ + INIT_LIST_HEAD(&dentry->d_lru); } EXPORT_SYMBOL(__d_lookup_done); diff --git a/fs/namei.c b/fs/namei.c index aa04320e1f37..7babb5e5f276 100644 --- a/fs/namei.c +++ b/fs/namei.c @@ -1605,13 +1605,14 @@ static struct dentry *lookup_slow(const struct qstr *name, { struct dentry *dentry = ERR_PTR(-ENOENT), *old; struct inode *inode = dir->d_inode; + DECLARE_WAIT_QUEUE_HEAD_ONSTACK(wq); inode_lock(inode); /* Don't go there if it's already dead */ if (unlikely(IS_DEADDIR(inode))) goto out; again: - dentry = d_alloc_parallel(dir, name); + dentry = d_alloc_parallel(dir, name, &wq); if (IS_ERR(dentry)) goto out; if (unlikely(!d_in_lookup(dentry))) { -- cgit v1.2.3