From 963b61eb041e8850807d95f8d7a4c6a454c45000 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 6 Oct 2009 11:31:12 -0700 Subject: ceph: snapshot management Ceph snapshots rely on client cooperation in determining which operations apply to which snapshots, and appropriately flushing snapshotted data and metadata back to the OSD and MDS clusters. Because snapshots apply to subtrees of the file hierarchy and can be created at any time, there is a fair bit of bookkeeping required to make this work. Portions of the hierarchy that belong to the same set of snapshots are described by a single 'snap realm.' A 'snap context' describes the set of snapshots that exist for a given file or directory. Signed-off-by: Sage Weil --- fs/ceph/snap.c | 897 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 897 insertions(+) create mode 100644 fs/ceph/snap.c (limited to 'fs/ceph/snap.c') diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c new file mode 100644 index 000000000000..2e3cb40b7e48 --- /dev/null +++ b/fs/ceph/snap.c @@ -0,0 +1,897 @@ +#include "ceph_debug.h" + +#include +#include + +#include "super.h" +#include "decode.h" + +/* + * Snapshots in ceph are driven in large part by cooperation from the + * client. In contrast to local file systems or file servers that + * implement snapshots at a single point in the system, ceph's + * distributed access to storage requires clients to help decide + * whether a write logically occurs before or after a recently created + * snapshot. + * + * This provides a perfect instantanous client-wide snapshot. Between + * clients, however, snapshots may appear to be applied at slightly + * different points in time, depending on delays in delivering the + * snapshot notification. + * + * Snapshots are _not_ file system-wide. Instead, each snapshot + * applies to the subdirectory nested beneath some directory. This + * effectively divides the hierarchy into multiple "realms," where all + * of the files contained by each realm share the same set of + * snapshots. An individual realm's snap set contains snapshots + * explicitly created on that realm, as well as any snaps in its + * parent's snap set _after_ the point at which the parent became it's + * parent (due to, say, a rename). Similarly, snaps from prior parents + * during the time intervals during which they were the parent are included. + * + * The client is spared most of this detail, fortunately... it must only + * maintains a hierarchy of realms reflecting the current parent/child + * realm relationship, and for each realm has an explicit list of snaps + * inherited from prior parents. + * + * A snap_realm struct is maintained for realms containing every inode + * with an open cap in the system. (The needed snap realm information is + * provided by the MDS whenever a cap is issued, i.e., on open.) A 'seq' + * version number is used to ensure that as realm parameters change (new + * snapshot, new parent, etc.) the client's realm hierarchy is updated. + * + * The realm hierarchy drives the generation of a 'snap context' for each + * realm, which simply lists the resulting set of snaps for the realm. This + * is attached to any writes sent to OSDs. + */ +/* + * Unfortunately error handling is a bit mixed here. If we get a snap + * update, but don't have enough memory to update our realm hierarchy, + * it's not clear what we can do about it (besides complaining to the + * console). + */ + + +/* + * increase ref count for the realm + * + * caller must hold snap_rwsem for write. + */ +void ceph_get_snap_realm(struct ceph_mds_client *mdsc, + struct ceph_snap_realm *realm) +{ + dout("get_realm %p %d -> %d\n", realm, + atomic_read(&realm->nref), atomic_read(&realm->nref)+1); + /* + * since we _only_ increment realm refs or empty the empty + * list with snap_rwsem held, adjusting the empty list here is + * safe. we do need to protect against concurrent empty list + * additions, however. + */ + if (atomic_read(&realm->nref) == 0) { + spin_lock(&mdsc->snap_empty_lock); + list_del_init(&realm->empty_item); + spin_unlock(&mdsc->snap_empty_lock); + } + + atomic_inc(&realm->nref); +} + +/* + * create and get the realm rooted at @ino and bump its ref count. + * + * caller must hold snap_rwsem for write. + */ +static struct ceph_snap_realm *ceph_create_snap_realm( + struct ceph_mds_client *mdsc, + u64 ino) +{ + struct ceph_snap_realm *realm; + + realm = kzalloc(sizeof(*realm), GFP_NOFS); + if (!realm) + return ERR_PTR(-ENOMEM); + + radix_tree_insert(&mdsc->snap_realms, ino, realm); + + atomic_set(&realm->nref, 0); /* tree does not take a ref */ + realm->ino = ino; + INIT_LIST_HEAD(&realm->children); + INIT_LIST_HEAD(&realm->child_item); + INIT_LIST_HEAD(&realm->empty_item); + INIT_LIST_HEAD(&realm->inodes_with_caps); + spin_lock_init(&realm->inodes_with_caps_lock); + dout("create_snap_realm %llx %p\n", realm->ino, realm); + return realm; +} + +/* + * find and get (if found) the realm rooted at @ino and bump its ref count. + * + * caller must hold snap_rwsem for write. + */ +struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc, + u64 ino) +{ + struct ceph_snap_realm *realm; + + realm = radix_tree_lookup(&mdsc->snap_realms, ino); + if (realm) + dout("lookup_snap_realm %llx %p\n", realm->ino, realm); + return realm; +} + +static void __put_snap_realm(struct ceph_mds_client *mdsc, + struct ceph_snap_realm *realm); + +/* + * called with snap_rwsem (write) + */ +static void __destroy_snap_realm(struct ceph_mds_client *mdsc, + struct ceph_snap_realm *realm) +{ + dout("__destroy_snap_realm %p %llx\n", realm, realm->ino); + + radix_tree_delete(&mdsc->snap_realms, realm->ino); + + if (realm->parent) { + list_del_init(&realm->child_item); + __put_snap_realm(mdsc, realm->parent); + } + + kfree(realm->prior_parent_snaps); + kfree(realm->snaps); + ceph_put_snap_context(realm->cached_context); + kfree(realm); +} + +/* + * caller holds snap_rwsem (write) + */ +static void __put_snap_realm(struct ceph_mds_client *mdsc, + struct ceph_snap_realm *realm) +{ + dout("__put_snap_realm %llx %p %d -> %d\n", realm->ino, realm, + atomic_read(&realm->nref), atomic_read(&realm->nref)-1); + if (atomic_dec_and_test(&realm->nref)) + __destroy_snap_realm(mdsc, realm); +} + +/* + * caller needn't hold any locks + */ +void ceph_put_snap_realm(struct ceph_mds_client *mdsc, + struct ceph_snap_realm *realm) +{ + dout("put_snap_realm %llx %p %d -> %d\n", realm->ino, realm, + atomic_read(&realm->nref), atomic_read(&realm->nref)-1); + if (!atomic_dec_and_test(&realm->nref)) + return; + + if (down_write_trylock(&mdsc->snap_rwsem)) { + __destroy_snap_realm(mdsc, realm); + up_write(&mdsc->snap_rwsem); + } else { + spin_lock(&mdsc->snap_empty_lock); + list_add(&mdsc->snap_empty, &realm->empty_item); + spin_unlock(&mdsc->snap_empty_lock); + } +} + +/* + * Clean up any realms whose ref counts have dropped to zero. Note + * that this does not include realms who were created but not yet + * used. + * + * Called under snap_rwsem (write) + */ +static void __cleanup_empty_realms(struct ceph_mds_client *mdsc) +{ + struct ceph_snap_realm *realm; + + spin_lock(&mdsc->snap_empty_lock); + while (!list_empty(&mdsc->snap_empty)) { + realm = list_first_entry(&mdsc->snap_empty, + struct ceph_snap_realm, empty_item); + list_del(&realm->empty_item); + spin_unlock(&mdsc->snap_empty_lock); + __destroy_snap_realm(mdsc, realm); + spin_lock(&mdsc->snap_empty_lock); + } + spin_unlock(&mdsc->snap_empty_lock); +} + +void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc) +{ + down_write(&mdsc->snap_rwsem); + __cleanup_empty_realms(mdsc); + up_write(&mdsc->snap_rwsem); +} + +/* + * adjust the parent realm of a given @realm. adjust child list, and parent + * pointers, and ref counts appropriately. + * + * return true if parent was changed, 0 if unchanged, <0 on error. + * + * caller must hold snap_rwsem for write. + */ +static int adjust_snap_realm_parent(struct ceph_mds_client *mdsc, + struct ceph_snap_realm *realm, + u64 parentino) +{ + struct ceph_snap_realm *parent; + + if (realm->parent_ino == parentino) + return 0; + + parent = ceph_lookup_snap_realm(mdsc, parentino); + if (IS_ERR(parent)) + return PTR_ERR(parent); + if (!parent) { + parent = ceph_create_snap_realm(mdsc, parentino); + if (IS_ERR(parent)) + return PTR_ERR(parent); + } + dout("adjust_snap_realm_parent %llx %p: %llx %p -> %llx %p\n", + realm->ino, realm, realm->parent_ino, realm->parent, + parentino, parent); + if (realm->parent) { + list_del_init(&realm->child_item); + ceph_put_snap_realm(mdsc, realm->parent); + } + realm->parent_ino = parentino; + realm->parent = parent; + ceph_get_snap_realm(mdsc, parent); + list_add(&realm->child_item, &parent->children); + return 1; +} + + +static int cmpu64_rev(const void *a, const void *b) +{ + if (*(u64 *)a < *(u64 *)b) + return 1; + if (*(u64 *)a > *(u64 *)b) + return -1; + return 0; +} + +/* + * build the snap context for a given realm. + */ +static int build_snap_context(struct ceph_snap_realm *realm) +{ + struct ceph_snap_realm *parent = realm->parent; + struct ceph_snap_context *snapc; + int err = 0; + int i; + int num = realm->num_prior_parent_snaps + realm->num_snaps; + + /* + * build parent context, if it hasn't been built. + * conservatively estimate that all parent snaps might be + * included by us. + */ + if (parent) { + if (!parent->cached_context) { + err = build_snap_context(parent); + if (err) + goto fail; + } + num += parent->cached_context->num_snaps; + } + + /* do i actually need to update? not if my context seq + matches realm seq, and my parents' does to. (this works + because we rebuild_snap_realms() works _downward_ in + hierarchy after each update.) */ + if (realm->cached_context && + realm->cached_context->seq <= realm->seq && + (!parent || + realm->cached_context->seq <= parent->cached_context->seq)) { + dout("build_snap_context %llx %p: %p seq %lld (%d snaps)" + " (unchanged)\n", + realm->ino, realm, realm->cached_context, + realm->cached_context->seq, + realm->cached_context->num_snaps); + return 0; + } + + /* alloc new snap context */ + err = -ENOMEM; + if (num > ULONG_MAX / sizeof(u64) - sizeof(*snapc)) + goto fail; + snapc = kzalloc(sizeof(*snapc) + num*sizeof(u64), GFP_NOFS); + if (!snapc) + goto fail; + atomic_set(&snapc->nref, 1); + + /* build (reverse sorted) snap vector */ + num = 0; + snapc->seq = realm->seq; + if (parent) { + /* include any of parent's snaps occuring _after_ my + parent became my parent */ + for (i = 0; i < parent->cached_context->num_snaps; i++) + if (parent->cached_context->snaps[i] >= + realm->parent_since) + snapc->snaps[num++] = + parent->cached_context->snaps[i]; + if (parent->cached_context->seq > snapc->seq) + snapc->seq = parent->cached_context->seq; + } + memcpy(snapc->snaps + num, realm->snaps, + sizeof(u64)*realm->num_snaps); + num += realm->num_snaps; + memcpy(snapc->snaps + num, realm->prior_parent_snaps, + sizeof(u64)*realm->num_prior_parent_snaps); + num += realm->num_prior_parent_snaps; + + sort(snapc->snaps, num, sizeof(u64), cmpu64_rev, NULL); + snapc->num_snaps = num; + dout("build_snap_context %llx %p: %p seq %lld (%d snaps)\n", + realm->ino, realm, snapc, snapc->seq, snapc->num_snaps); + + if (realm->cached_context) + ceph_put_snap_context(realm->cached_context); + realm->cached_context = snapc; + return 0; + +fail: + /* + * if we fail, clear old (incorrect) cached_context... hopefully + * we'll have better luck building it later + */ + if (realm->cached_context) { + ceph_put_snap_context(realm->cached_context); + realm->cached_context = NULL; + } + pr_err("build_snap_context %llx %p fail %d\n", realm->ino, + realm, err); + return err; +} + +/* + * rebuild snap context for the given realm and all of its children. + */ +static void rebuild_snap_realms(struct ceph_snap_realm *realm) +{ + struct ceph_snap_realm *child; + + dout("rebuild_snap_realms %llx %p\n", realm->ino, realm); + build_snap_context(realm); + + list_for_each_entry(child, &realm->children, child_item) + rebuild_snap_realms(child); +} + + +/* + * helper to allocate and decode an array of snapids. free prior + * instance, if any. + */ +static int dup_array(u64 **dst, __le64 *src, int num) +{ + int i; + + kfree(*dst); + if (num) { + *dst = kcalloc(num, sizeof(u64), GFP_NOFS); + if (!*dst) + return -ENOMEM; + for (i = 0; i < num; i++) + (*dst)[i] = get_unaligned_le64(src + i); + } else { + *dst = NULL; + } + return 0; +} + + +/* + * When a snapshot is applied, the size/mtime inode metadata is queued + * in a ceph_cap_snap (one for each snapshot) until writeback + * completes and the metadata can be flushed back to the MDS. + * + * However, if a (sync) write is currently in-progress when we apply + * the snapshot, we have to wait until the write succeeds or fails + * (and a final size/mtime is known). In this case the + * cap_snap->writing = 1, and is said to be "pending." When the write + * finishes, we __ceph_finish_cap_snap(). + * + * Caller must hold snap_rwsem for read (i.e., the realm topology won't + * change). + */ +void ceph_queue_cap_snap(struct ceph_inode_info *ci, + struct ceph_snap_context *snapc) +{ + struct inode *inode = &ci->vfs_inode; + struct ceph_cap_snap *capsnap; + int used; + + capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS); + if (!capsnap) { + pr_err("ENOMEM allocating ceph_cap_snap on %p\n", inode); + return; + } + + spin_lock(&inode->i_lock); + used = __ceph_caps_used(ci); + if (__ceph_have_pending_cap_snap(ci)) { + /* there is no point in queuing multiple "pending" cap_snaps, + as no new writes are allowed to start when pending, so any + writes in progress now were started before the previous + cap_snap. lucky us. */ + dout("queue_cap_snap %p snapc %p seq %llu used %d" + " already pending\n", inode, snapc, snapc->seq, used); + kfree(capsnap); + } else if (ci->i_wrbuffer_ref_head || (used & CEPH_CAP_FILE_WR)) { + igrab(inode); + + atomic_set(&capsnap->nref, 1); + capsnap->ci = ci; + INIT_LIST_HEAD(&capsnap->ci_item); + INIT_LIST_HEAD(&capsnap->flushing_item); + + capsnap->follows = snapc->seq - 1; + capsnap->context = ceph_get_snap_context(snapc); + capsnap->issued = __ceph_caps_issued(ci, NULL); + capsnap->dirty = __ceph_caps_dirty(ci); + + capsnap->mode = inode->i_mode; + capsnap->uid = inode->i_uid; + capsnap->gid = inode->i_gid; + + /* fixme? */ + capsnap->xattr_blob = NULL; + capsnap->xattr_len = 0; + + /* dirty page count moved from _head to this cap_snap; + all subsequent writes page dirties occur _after_ this + snapshot. */ + capsnap->dirty_pages = ci->i_wrbuffer_ref_head; + ci->i_wrbuffer_ref_head = 0; + ceph_put_snap_context(ci->i_head_snapc); + ci->i_head_snapc = NULL; + list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps); + + if (used & CEPH_CAP_FILE_WR) { + dout("queue_cap_snap %p cap_snap %p snapc %p" + " seq %llu used WR, now pending\n", inode, + capsnap, snapc, snapc->seq); + capsnap->writing = 1; + } else { + /* note mtime, size NOW. */ + __ceph_finish_cap_snap(ci, capsnap); + } + } else { + dout("queue_cap_snap %p nothing dirty|writing\n", inode); + kfree(capsnap); + } + + spin_unlock(&inode->i_lock); +} + +/* + * Finalize the size, mtime for a cap_snap.. that is, settle on final values + * to be used for the snapshot, to be flushed back to the mds. + * + * If capsnap can now be flushed, add to snap_flush list, and return 1. + * + * Caller must hold i_lock. + */ +int __ceph_finish_cap_snap(struct ceph_inode_info *ci, + struct ceph_cap_snap *capsnap) +{ + struct inode *inode = &ci->vfs_inode; + struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc; + + BUG_ON(capsnap->writing); + capsnap->size = inode->i_size; + capsnap->mtime = inode->i_mtime; + capsnap->atime = inode->i_atime; + capsnap->ctime = inode->i_ctime; + capsnap->time_warp_seq = ci->i_time_warp_seq; + if (capsnap->dirty_pages) { + dout("finish_cap_snap %p cap_snap %p snapc %p %llu s=%llu " + "still has %d dirty pages\n", inode, capsnap, + capsnap->context, capsnap->context->seq, + capsnap->size, capsnap->dirty_pages); + return 0; + } + dout("finish_cap_snap %p cap_snap %p snapc %p %llu s=%llu clean\n", + inode, capsnap, capsnap->context, + capsnap->context->seq, capsnap->size); + + spin_lock(&mdsc->snap_flush_lock); + list_add_tail(&ci->i_snap_flush_item, &mdsc->snap_flush_list); + spin_unlock(&mdsc->snap_flush_lock); + return 1; /* caller may want to ceph_flush_snaps */ +} + + +/* + * Parse and apply a snapblob "snap trace" from the MDS. This specifies + * the snap realm parameters from a given realm and all of its ancestors, + * up to the root. + * + * Caller must hold snap_rwsem for write. + */ +int ceph_update_snap_trace(struct ceph_mds_client *mdsc, + void *p, void *e, bool deletion) +{ + struct ceph_mds_snap_realm *ri; /* encoded */ + __le64 *snaps; /* encoded */ + __le64 *prior_parent_snaps; /* encoded */ + struct ceph_snap_realm *realm; + int invalidate = 0; + int err = -ENOMEM; + + dout("update_snap_trace deletion=%d\n", deletion); +more: + ceph_decode_need(&p, e, sizeof(*ri), bad); + ri = p; + p += sizeof(*ri); + ceph_decode_need(&p, e, sizeof(u64)*(le32_to_cpu(ri->num_snaps) + + le32_to_cpu(ri->num_prior_parent_snaps)), bad); + snaps = p; + p += sizeof(u64) * le32_to_cpu(ri->num_snaps); + prior_parent_snaps = p; + p += sizeof(u64) * le32_to_cpu(ri->num_prior_parent_snaps); + + realm = ceph_lookup_snap_realm(mdsc, le64_to_cpu(ri->ino)); + if (IS_ERR(realm)) { + err = PTR_ERR(realm); + goto fail; + } + if (!realm) { + realm = ceph_create_snap_realm(mdsc, le64_to_cpu(ri->ino)); + if (IS_ERR(realm)) { + err = PTR_ERR(realm); + goto fail; + } + } + + if (le64_to_cpu(ri->seq) > realm->seq) { + dout("update_snap_trace updating %llx %p %lld -> %lld\n", + realm->ino, realm, realm->seq, le64_to_cpu(ri->seq)); + /* + * if the realm seq has changed, queue a cap_snap for every + * inode with open caps. we do this _before_ we update + * the realm info so that we prepare for writeback under the + * _previous_ snap context. + * + * ...unless it's a snap deletion! + */ + if (!deletion) { + struct ceph_inode_info *ci; + struct inode *lastinode = NULL; + + spin_lock(&realm->inodes_with_caps_lock); + list_for_each_entry(ci, &realm->inodes_with_caps, + i_snap_realm_item) { + struct inode *inode = igrab(&ci->vfs_inode); + if (!inode) + continue; + spin_unlock(&realm->inodes_with_caps_lock); + if (lastinode) + iput(lastinode); + lastinode = inode; + ceph_queue_cap_snap(ci, realm->cached_context); + spin_lock(&realm->inodes_with_caps_lock); + } + spin_unlock(&realm->inodes_with_caps_lock); + if (lastinode) + iput(lastinode); + dout("update_snap_trace cap_snaps queued\n"); + } + + } else { + dout("update_snap_trace %llx %p seq %lld unchanged\n", + realm->ino, realm, realm->seq); + } + + /* ensure the parent is correct */ + err = adjust_snap_realm_parent(mdsc, realm, le64_to_cpu(ri->parent)); + if (err < 0) + goto fail; + invalidate += err; + + if (le64_to_cpu(ri->seq) > realm->seq) { + /* update realm parameters, snap lists */ + realm->seq = le64_to_cpu(ri->seq); + realm->created = le64_to_cpu(ri->created); + realm->parent_since = le64_to_cpu(ri->parent_since); + + realm->num_snaps = le32_to_cpu(ri->num_snaps); + err = dup_array(&realm->snaps, snaps, realm->num_snaps); + if (err < 0) + goto fail; + + realm->num_prior_parent_snaps = + le32_to_cpu(ri->num_prior_parent_snaps); + err = dup_array(&realm->prior_parent_snaps, prior_parent_snaps, + realm->num_prior_parent_snaps); + if (err < 0) + goto fail; + + invalidate = 1; + } else if (!realm->cached_context) { + invalidate = 1; + } + + dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino, + realm, invalidate, p, e); + + if (p < e) + goto more; + + /* invalidate when we reach the _end_ (root) of the trace */ + if (invalidate) + rebuild_snap_realms(realm); + + __cleanup_empty_realms(mdsc); + return 0; + +bad: + err = -EINVAL; +fail: + pr_err("update_snap_trace error %d\n", err); + return err; +} + + +/* + * Send any cap_snaps that are queued for flush. Try to carry + * s_mutex across multiple snap flushes to avoid locking overhead. + * + * Caller holds no locks. + */ +static void flush_snaps(struct ceph_mds_client *mdsc) +{ + struct ceph_inode_info *ci; + struct inode *inode; + struct ceph_mds_session *session = NULL; + + dout("flush_snaps\n"); + spin_lock(&mdsc->snap_flush_lock); + while (!list_empty(&mdsc->snap_flush_list)) { + ci = list_first_entry(&mdsc->snap_flush_list, + struct ceph_inode_info, i_snap_flush_item); + inode = &ci->vfs_inode; + igrab(inode); + spin_unlock(&mdsc->snap_flush_lock); + spin_lock(&inode->i_lock); + __ceph_flush_snaps(ci, &session); + spin_unlock(&inode->i_lock); + iput(inode); + spin_lock(&mdsc->snap_flush_lock); + } + spin_unlock(&mdsc->snap_flush_lock); + + if (session) { + mutex_unlock(&session->s_mutex); + ceph_put_mds_session(session); + } + dout("flush_snaps done\n"); +} + + +/* + * Handle a snap notification from the MDS. + * + * This can take two basic forms: the simplest is just a snap creation + * or deletion notification on an existing realm. This should update the + * realm and its children. + * + * The more difficult case is realm creation, due to snap creation at a + * new point in the file hierarchy, or due to a rename that moves a file or + * directory into another realm. + */ +void ceph_handle_snap(struct ceph_mds_client *mdsc, + struct ceph_msg *msg) +{ + struct super_block *sb = mdsc->client->sb; + struct ceph_mds_session *session; + int mds; + u64 split; + int op; + int trace_len; + struct ceph_snap_realm *realm = NULL; + void *p = msg->front.iov_base; + void *e = p + msg->front.iov_len; + struct ceph_mds_snap_head *h; + int num_split_inos, num_split_realms; + __le64 *split_inos = NULL, *split_realms = NULL; + int i; + int locked_rwsem = 0; + + if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS) + return; + mds = le64_to_cpu(msg->hdr.src.name.num); + + /* decode */ + if (msg->front.iov_len < sizeof(*h)) + goto bad; + h = p; + op = le32_to_cpu(h->op); + split = le64_to_cpu(h->split); /* non-zero if we are splitting an + * existing realm */ + num_split_inos = le32_to_cpu(h->num_split_inos); + num_split_realms = le32_to_cpu(h->num_split_realms); + trace_len = le32_to_cpu(h->trace_len); + p += sizeof(*h); + + dout("handle_snap from mds%d op %s split %llx tracelen %d\n", mds, + ceph_snap_op_name(op), split, trace_len); + + /* find session */ + mutex_lock(&mdsc->mutex); + session = __ceph_lookup_mds_session(mdsc, mds); + mutex_unlock(&mdsc->mutex); + if (!session) { + dout("WTF, got snap but no session for mds%d\n", mds); + return; + } + + mutex_lock(&session->s_mutex); + session->s_seq++; + mutex_unlock(&session->s_mutex); + + down_write(&mdsc->snap_rwsem); + locked_rwsem = 1; + + if (op == CEPH_SNAP_OP_SPLIT) { + struct ceph_mds_snap_realm *ri; + + /* + * A "split" breaks part of an existing realm off into + * a new realm. The MDS provides a list of inodes + * (with caps) and child realms that belong to the new + * child. + */ + split_inos = p; + p += sizeof(u64) * num_split_inos; + split_realms = p; + p += sizeof(u64) * num_split_realms; + ceph_decode_need(&p, e, sizeof(*ri), bad); + /* we will peek at realm info here, but will _not_ + * advance p, as the realm update will occur below in + * ceph_update_snap_trace. */ + ri = p; + + realm = ceph_lookup_snap_realm(mdsc, split); + if (IS_ERR(realm)) + goto out; + if (!realm) { + realm = ceph_create_snap_realm(mdsc, split); + if (IS_ERR(realm)) + goto out; + } + ceph_get_snap_realm(mdsc, realm); + + dout("splitting snap_realm %llx %p\n", realm->ino, realm); + for (i = 0; i < num_split_inos; i++) { + struct ceph_vino vino = { + .ino = le64_to_cpu(split_inos[i]), + .snap = CEPH_NOSNAP, + }; + struct inode *inode = ceph_find_inode(sb, vino); + struct ceph_inode_info *ci; + + if (!inode) + continue; + ci = ceph_inode(inode); + + spin_lock(&inode->i_lock); + if (!ci->i_snap_realm) + goto skip_inode; + /* + * If this inode belongs to a realm that was + * created after our new realm, we experienced + * a race (due to another split notifications + * arriving from a different MDS). So skip + * this inode. + */ + if (ci->i_snap_realm->created > + le64_to_cpu(ri->created)) { + dout(" leaving %p in newer realm %llx %p\n", + inode, ci->i_snap_realm->ino, + ci->i_snap_realm); + goto skip_inode; + } + dout(" will move %p to split realm %llx %p\n", + inode, realm->ino, realm); + /* + * Remove the inode from the realm's inode + * list, but don't add it to the new realm + * yet. We don't want the cap_snap to be + * queued (again) by ceph_update_snap_trace() + * below. Queue it _now_, under the old context. + */ + list_del_init(&ci->i_snap_realm_item); + spin_unlock(&inode->i_lock); + + ceph_queue_cap_snap(ci, + ci->i_snap_realm->cached_context); + + iput(inode); + continue; + +skip_inode: + spin_unlock(&inode->i_lock); + iput(inode); + } + + /* we may have taken some of the old realm's children. */ + for (i = 0; i < num_split_realms; i++) { + struct ceph_snap_realm *child = + ceph_lookup_snap_realm(mdsc, + le64_to_cpu(split_realms[i])); + if (IS_ERR(child)) + continue; + if (!child) + continue; + adjust_snap_realm_parent(mdsc, child, realm->ino); + } + } + + /* + * update using the provided snap trace. if we are deleting a + * snap, we can avoid queueing cap_snaps. + */ + ceph_update_snap_trace(mdsc, p, e, + op == CEPH_SNAP_OP_DESTROY); + + if (op == CEPH_SNAP_OP_SPLIT) { + /* + * ok, _now_ add the inodes into the new realm. + */ + for (i = 0; i < num_split_inos; i++) { + struct ceph_vino vino = { + .ino = le64_to_cpu(split_inos[i]), + .snap = CEPH_NOSNAP, + }; + struct inode *inode = ceph_find_inode(sb, vino); + struct ceph_inode_info *ci; + + if (!inode) + continue; + ci = ceph_inode(inode); + spin_lock(&inode->i_lock); + if (!ci->i_snap_realm) + goto split_skip_inode; + ceph_put_snap_realm(mdsc, ci->i_snap_realm); + spin_lock(&realm->inodes_with_caps_lock); + list_add(&ci->i_snap_realm_item, + &realm->inodes_with_caps); + ci->i_snap_realm = realm; + spin_unlock(&realm->inodes_with_caps_lock); + ceph_get_snap_realm(mdsc, realm); +split_skip_inode: + spin_unlock(&inode->i_lock); + iput(inode); + } + + /* we took a reference when we created the realm, above */ + ceph_put_snap_realm(mdsc, realm); + } + + __cleanup_empty_realms(mdsc); + + up_write(&mdsc->snap_rwsem); + + flush_snaps(mdsc); + return; + +bad: + pr_err("corrupt snap message from mds%d\n", mds); +out: + if (locked_rwsem) + up_write(&mdsc->snap_rwsem); + return; +} + + + -- cgit v1.2.3 From 75eb3592811028e5b01835126483d115532a3aa1 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Sat, 21 Nov 2009 13:08:14 -0800 Subject: ceph: remove useless IS_ERR checks ceph_lookup_snap_realm either returns a valid pointer or NULL; there is no need to check IS_ERR(result). Reported-by: Julia Lawall Signed-off-by: Sage Weil --- fs/ceph/snap.c | 10 ---------- 1 file changed, 10 deletions(-) (limited to 'fs/ceph/snap.c') diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index 2e3cb40b7e48..52f46a1208f5 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -226,8 +226,6 @@ static int adjust_snap_realm_parent(struct ceph_mds_client *mdsc, return 0; parent = ceph_lookup_snap_realm(mdsc, parentino); - if (IS_ERR(parent)) - return PTR_ERR(parent); if (!parent) { parent = ceph_create_snap_realm(mdsc, parentino); if (IS_ERR(parent)) @@ -541,10 +539,6 @@ more: p += sizeof(u64) * le32_to_cpu(ri->num_prior_parent_snaps); realm = ceph_lookup_snap_realm(mdsc, le64_to_cpu(ri->ino)); - if (IS_ERR(realm)) { - err = PTR_ERR(realm); - goto fail; - } if (!realm) { realm = ceph_create_snap_realm(mdsc, le64_to_cpu(ri->ino)); if (IS_ERR(realm)) { @@ -762,8 +756,6 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, ri = p; realm = ceph_lookup_snap_realm(mdsc, split); - if (IS_ERR(realm)) - goto out; if (!realm) { realm = ceph_create_snap_realm(mdsc, split); if (IS_ERR(realm)) @@ -829,8 +821,6 @@ skip_inode: struct ceph_snap_realm *child = ceph_lookup_snap_realm(mdsc, le64_to_cpu(split_realms[i])); - if (IS_ERR(child)) - continue; if (!child) continue; adjust_snap_realm_parent(mdsc, child, realm->ino); -- cgit v1.2.3 From 9ec7cab14e6de732d4e7c355fe67c5810c32c758 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 14 Dec 2009 15:13:47 -0800 Subject: ceph: hex dump corrupt server data to KERN_DEBUG Also, print fsid using standard format, NOT hex dump. Signed-off-by: Sage Weil --- fs/ceph/caps.c | 1 + fs/ceph/mds_client.c | 4 ++++ fs/ceph/mdsmap.c | 4 ++++ fs/ceph/messenger.c | 20 ++++++++++++++++++++ fs/ceph/messenger.h | 2 ++ fs/ceph/mon_client.c | 2 ++ fs/ceph/osd_client.c | 2 ++ fs/ceph/osdmap.c | 3 +++ fs/ceph/snap.c | 1 + fs/ceph/super.c | 9 ++------- 10 files changed, 41 insertions(+), 7 deletions(-) (limited to 'fs/ceph/snap.c') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 9b9ce143ac1f..dfb509f53542 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -2681,6 +2681,7 @@ done: bad: pr_err("ceph_handle_caps: corrupt message\n"); + ceph_msg_dump(msg); return; } diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 739093f281d0..29a93fe35f85 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -1650,6 +1650,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) return; if (msg->front.iov_len < sizeof(*head)) { pr_err("mdsc_handle_reply got corrupt (short) reply\n"); + ceph_msg_dump(msg); return; } @@ -1740,6 +1741,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) mutex_lock(&session->s_mutex); if (err < 0) { pr_err("mdsc_handle_reply got corrupt reply mds%d\n", mds); + ceph_msg_dump(msg); goto out_err; } @@ -1929,6 +1931,7 @@ static void handle_session(struct ceph_mds_session *session, bad: pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds, (int)msg->front.iov_len); + ceph_msg_dump(msg); return; } @@ -2394,6 +2397,7 @@ out: bad: pr_err("corrupt lease message\n"); + ceph_msg_dump(msg); } void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, diff --git a/fs/ceph/mdsmap.c b/fs/ceph/mdsmap.c index cad8d25861e5..c4c498e6dfef 100644 --- a/fs/ceph/mdsmap.c +++ b/fs/ceph/mdsmap.c @@ -49,6 +49,7 @@ int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m) struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end) { struct ceph_mdsmap *m; + const void *start = *p; int i, j, n; int err = -EINVAL; u16 version; @@ -154,6 +155,9 @@ badmem: err = -ENOMEM; bad: pr_err("corrupt mdsmap\n"); + print_hex_dump(KERN_DEBUG, "mdsmap: ", + DUMP_PREFIX_OFFSET, 16, 1, + start, end - start, true); ceph_mdsmap_destroy(m); return ERR_PTR(-EINVAL); } diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c index d5eef76a253c..b10f88c56706 100644 --- a/fs/ceph/messenger.c +++ b/fs/ceph/messenger.c @@ -2115,3 +2115,23 @@ void ceph_msg_last_put(struct kref *kref) else ceph_msg_kfree(m); } + +void ceph_msg_dump(struct ceph_msg *msg) +{ + pr_debug("msg_dump %p (front_max %d nr_pages %d)\n", msg, + msg->front_max, msg->nr_pages); + print_hex_dump(KERN_DEBUG, "header: ", + DUMP_PREFIX_OFFSET, 16, 1, + &msg->hdr, sizeof(msg->hdr), true); + print_hex_dump(KERN_DEBUG, " front: ", + DUMP_PREFIX_OFFSET, 16, 1, + msg->front.iov_base, msg->front.iov_len, true); + if (msg->middle) + print_hex_dump(KERN_DEBUG, "middle: ", + DUMP_PREFIX_OFFSET, 16, 1, + msg->middle->vec.iov_base, + msg->middle->vec.iov_len, true); + print_hex_dump(KERN_DEBUG, "footer: ", + DUMP_PREFIX_OFFSET, 16, 1, + &msg->footer, sizeof(msg->footer), true); +} diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h index eff5cb5197fc..e04c214b4f6f 100644 --- a/fs/ceph/messenger.h +++ b/fs/ceph/messenger.h @@ -254,4 +254,6 @@ static inline void ceph_msg_put(struct ceph_msg *msg) kref_put(&msg->kref, ceph_msg_last_put); } +extern void ceph_msg_dump(struct ceph_msg *msg); + #endif diff --git a/fs/ceph/mon_client.c b/fs/ceph/mon_client.c index a76da5e6dbdd..775a9c029c51 100644 --- a/fs/ceph/mon_client.c +++ b/fs/ceph/mon_client.c @@ -242,6 +242,7 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc, return; bad: pr_err("got corrupt subscribe-ack msg\n"); + ceph_msg_dump(msg); } /* @@ -364,6 +365,7 @@ static void handle_statfs_reply(struct ceph_mon_client *monc, bad: pr_err("corrupt statfs reply, no tid\n"); + ceph_msg_dump(msg); } /* diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c index 63482ef3de01..4bfe880d53c8 100644 --- a/fs/ceph/osd_client.c +++ b/fs/ceph/osd_client.c @@ -773,6 +773,7 @@ bad: pr_err("corrupt osd_op_reply got %d %d expected %d\n", (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len), (int)sizeof(*rhead)); + ceph_msg_dump(msg); } @@ -964,6 +965,7 @@ done: bad: pr_err("osdc handle_map corrupt msg\n"); + ceph_msg_dump(msg); up_write(&osdc->map_sem); return; } diff --git a/fs/ceph/osdmap.c b/fs/ceph/osdmap.c index be5318aa7714..8c8ffe5ef7d4 100644 --- a/fs/ceph/osdmap.c +++ b/fs/ceph/osdmap.c @@ -726,6 +726,9 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, bad: pr_err("corrupt inc osdmap epoch %d off %d (%p of %p-%p)\n", epoch, (int)(*p - start), *p, start, end); + print_hex_dump(KERN_DEBUG, "osdmap: ", + DUMP_PREFIX_OFFSET, 16, 1, + start, end - start, true); if (newcrush) crush_destroy(newcrush); return ERR_PTR(err); diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index 52f46a1208f5..dcf18d92130a 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -877,6 +877,7 @@ split_skip_inode: bad: pr_err("corrupt snap message from mds%d\n", mds); + ceph_msg_dump(msg); out: if (locked_rwsem) up_write(&mdsc->snap_rwsem); diff --git a/fs/ceph/super.c b/fs/ceph/super.c index a828943296c5..6d02a166f8ff 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -602,13 +602,8 @@ int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid) { if (client->have_fsid) { if (ceph_fsid_compare(&client->fsid, fsid)) { - print_hex_dump(KERN_ERR, "this fsid: ", - DUMP_PREFIX_NONE, 16, 1, - (void *)fsid, 16, 0); - print_hex_dump(KERN_ERR, " old fsid: ", - DUMP_PREFIX_NONE, 16, 1, - (void *)&client->fsid, 16, 0); - pr_err("fsid mismatch\n"); + pr_err("bad fsid, had " FSID_FORMAT " got " FSID_FORMAT, + PR_FSID(&client->fsid), PR_FSID(fsid)); return -1; } } else { -- cgit v1.2.3 From a105f00cf17d711e876b3dc67e15f9a89b7de5a3 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 15 Feb 2010 14:37:55 -0800 Subject: ceph: use rbtree for snap_realms Switch from radix tree to rbtree for snap realms. This is much more appropriate given that realm keys are few and far between. Signed-off-by: Sage Weil --- fs/ceph/mds_client.c | 16 +++++----------- fs/ceph/mds_client.h | 3 +-- fs/ceph/snap.c | 51 ++++++++++++++++++++++++++++++++++++++++----------- fs/ceph/super.h | 2 ++ 4 files changed, 48 insertions(+), 24 deletions(-) (limited to 'fs/ceph/snap.c') diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 81840d6b68a4..02834cecc3a0 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -2097,9 +2097,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds) { struct ceph_mds_session *session = NULL; struct ceph_msg *reply; + struct rb_node *p; int err; - int got; - u64 next_snap_ino = 0; struct ceph_pagelist *pagelist; pr_info("reconnect to recovering mds%d\n", mds); @@ -2155,14 +2154,10 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds) * parent for all of our realms. If the mds has any newer info, * it will tell us. */ - next_snap_ino = 0; - while (1) { - struct ceph_snap_realm *realm; + for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { + struct ceph_snap_realm *realm = + rb_entry(p, struct ceph_snap_realm, node); struct ceph_mds_snaprealm_reconnect sr_rec; - got = radix_tree_gang_lookup(&mdsc->snap_realms, - (void **)&realm, next_snap_ino, 1); - if (!got) - break; dout(" adding snap realm %llx seq %lld parent %llx\n", realm->ino, realm->seq, realm->parent_ino); @@ -2172,7 +2167,6 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds) err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); if (err) goto fail; - next_snap_ino = realm->ino + 1; } send: @@ -2603,7 +2597,7 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client) mdsc->max_sessions = 0; mdsc->stopping = 0; init_rwsem(&mdsc->snap_rwsem); - INIT_RADIX_TREE(&mdsc->snap_realms, GFP_NOFS); + mdsc->snap_realms = RB_ROOT; INIT_LIST_HEAD(&mdsc->snap_empty); spin_lock_init(&mdsc->snap_empty_lock); mdsc->last_tid = 0; diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 98f09cd06006..9d6b90173879 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -5,7 +5,6 @@ #include #include #include -#include #include #include @@ -246,7 +245,7 @@ struct ceph_mds_client { * should be destroyed. */ struct rw_semaphore snap_rwsem; - struct radix_tree_root snap_realms; + struct rb_root snap_realms; struct list_head snap_empty; spinlock_t snap_empty_lock; /* protect snap_empty */ diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index dcf18d92130a..49d0c4c59d81 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -1,6 +1,5 @@ #include "ceph_debug.h" -#include #include #include "super.h" @@ -77,6 +76,28 @@ void ceph_get_snap_realm(struct ceph_mds_client *mdsc, atomic_inc(&realm->nref); } +static void __insert_snap_realm(struct rb_root *root, + struct ceph_snap_realm *new) +{ + struct rb_node **p = &root->rb_node; + struct rb_node *parent = NULL; + struct ceph_snap_realm *r = NULL; + + while (*p) { + parent = *p; + r = rb_entry(parent, struct ceph_snap_realm, node); + if (new->ino < r->ino) + p = &(*p)->rb_left; + else if (new->ino > r->ino) + p = &(*p)->rb_right; + else + BUG(); + } + + rb_link_node(&new->node, parent, p); + rb_insert_color(&new->node, root); +} + /* * create and get the realm rooted at @ino and bump its ref count. * @@ -92,8 +113,6 @@ static struct ceph_snap_realm *ceph_create_snap_realm( if (!realm) return ERR_PTR(-ENOMEM); - radix_tree_insert(&mdsc->snap_realms, ino, realm); - atomic_set(&realm->nref, 0); /* tree does not take a ref */ realm->ino = ino; INIT_LIST_HEAD(&realm->children); @@ -101,24 +120,34 @@ static struct ceph_snap_realm *ceph_create_snap_realm( INIT_LIST_HEAD(&realm->empty_item); INIT_LIST_HEAD(&realm->inodes_with_caps); spin_lock_init(&realm->inodes_with_caps_lock); + __insert_snap_realm(&mdsc->snap_realms, realm); dout("create_snap_realm %llx %p\n", realm->ino, realm); return realm; } /* - * find and get (if found) the realm rooted at @ino and bump its ref count. + * lookup the realm rooted at @ino. * * caller must hold snap_rwsem for write. */ struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc, u64 ino) { - struct ceph_snap_realm *realm; - - realm = radix_tree_lookup(&mdsc->snap_realms, ino); - if (realm) - dout("lookup_snap_realm %llx %p\n", realm->ino, realm); - return realm; + struct rb_node *n = mdsc->snap_realms.rb_node; + struct ceph_snap_realm *r; + + while (n) { + r = rb_entry(n, struct ceph_snap_realm, node); + if (ino < r->ino) + n = n->rb_left; + else if (ino > r->ino) + n = n->rb_right; + else { + dout("lookup_snap_realm %llx %p\n", r->ino, r); + return r; + } + } + return NULL; } static void __put_snap_realm(struct ceph_mds_client *mdsc, @@ -132,7 +161,7 @@ static void __destroy_snap_realm(struct ceph_mds_client *mdsc, { dout("__destroy_snap_realm %p %llx\n", realm, realm->ino); - radix_tree_delete(&mdsc->snap_realms, realm->ino); + rb_erase(&realm->node, &mdsc->snap_realms); if (realm->parent) { list_del_init(&realm->child_item); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index b2adfccbab98..1f3928785e12 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -656,6 +656,8 @@ static inline void ceph_put_snap_context(struct ceph_snap_context *sc) struct ceph_snap_realm { u64 ino; atomic_t nref; + struct rb_node node; + u64 created, seq; u64 parent_ino; u64 parent_since; /* snapid when our current parent became so */ -- cgit v1.2.3 From 2600d2dd5085ab6fb09540226138a60055abf335 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 22 Feb 2010 15:12:16 -0800 Subject: ceph: drop messages on unregistered mds sessions; cleanup Verify the mds session is currently registered before handling incoming messages. Clean up message handlers to pull mds out of session->s_mds instead of less trustworthy src field. Clean up con_{get,put} debug output. Signed-off-by: Sage Weil --- fs/ceph/caps.c | 2 +- fs/ceph/mds_client.c | 85 ++++++++++++++++++++++++++-------------------------- fs/ceph/snap.c | 17 ++--------- fs/ceph/super.h | 1 + 4 files changed, 46 insertions(+), 59 deletions(-) (limited to 'fs/ceph/snap.c') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index b6154ffe70df..bb846164addc 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -2600,7 +2600,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, struct inode *inode; struct ceph_cap *cap; struct ceph_mds_caps *h; - int mds = le64_to_cpu(msg->hdr.src.name.num); + int mds = session->s_mds; int op; u32 seq; struct ceph_vino vino; diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 124c0c17a14a..4d00ea2af000 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -309,6 +309,15 @@ static bool __have_session(struct ceph_mds_client *mdsc, int mds) return mdsc->sessions[mds]; } +static int __verify_registered_session(struct ceph_mds_client *mdsc, + struct ceph_mds_session *s) +{ + if (s->s_mds >= mdsc->max_sessions || + mdsc->sessions[s->s_mds] != s) + return -ENOENT; + return 0; +} + /* * create+register a new session for given mds. * called under mdsc->mutex. @@ -382,10 +391,11 @@ fail_realloc: /* * called under mdsc->mutex */ -static void unregister_session(struct ceph_mds_client *mdsc, +static void __unregister_session(struct ceph_mds_client *mdsc, struct ceph_mds_session *s) { - dout("unregister_session mds%d %p\n", s->s_mds, s); + dout("__unregister_session mds%d %p\n", s->s_mds, s); + BUG_ON(mdsc->sessions[s->s_mds] != s); mdsc->sessions[s->s_mds] = NULL; ceph_con_close(&s->s_con); ceph_put_mds_session(s); @@ -1740,10 +1750,8 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ u64 tid; int err, result; - int mds; + int mds = session->s_mds; - if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS) - return; if (msg->front.iov_len < sizeof(*head)) { pr_err("mdsc_handle_reply got corrupt (short) reply\n"); ceph_msg_dump(msg); @@ -1760,7 +1768,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) return; } dout("handle_reply %p\n", req); - mds = le64_to_cpu(msg->hdr.src.name.num); /* correct session? */ if (!req->r_session && req->r_session != session) { @@ -1884,7 +1891,9 @@ out: /* * handle mds notification that our request has been forwarded. */ -static void handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg) +static void handle_forward(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session, + struct ceph_msg *msg) { struct ceph_mds_request *req; u64 tid; @@ -1894,11 +1903,7 @@ static void handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg) int err = -EINVAL; void *p = msg->front.iov_base; void *end = p + msg->front.iov_len; - int from_mds, state; - - if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS) - goto bad; - from_mds = le64_to_cpu(msg->hdr.src.name.num); + int state; ceph_decode_need(&p, end, sizeof(u64)+2*sizeof(u32), bad); tid = ceph_decode_64(&p); @@ -1915,6 +1920,9 @@ static void handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg) goto out; /* dup reply? */ } + if (next_mds >= mdsc->max_sessions) + goto out; + state = mdsc->sessions[next_mds]->s_state; if (fwd_seq <= req->r_num_fwd) { dout("forward %llu to mds%d - old seq %d <= %d\n", @@ -1945,14 +1953,10 @@ static void handle_session(struct ceph_mds_session *session, struct ceph_mds_client *mdsc = session->s_mdsc; u32 op; u64 seq; - int mds; + int mds = session->s_mds; struct ceph_mds_session_head *h = msg->front.iov_base; int wake = 0; - if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS) - return; - mds = le64_to_cpu(msg->hdr.src.name.num); - /* decode */ if (msg->front.iov_len != sizeof(*h)) goto bad; @@ -1960,6 +1964,8 @@ static void handle_session(struct ceph_mds_session *session, seq = le64_to_cpu(h->seq); mutex_lock(&mdsc->mutex); + if (op == CEPH_SESSION_CLOSE) + __unregister_session(mdsc, session); /* FIXME: this ttl calculation is generous */ session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; mutex_unlock(&mdsc->mutex); @@ -1990,7 +1996,6 @@ static void handle_session(struct ceph_mds_session *session, break; case CEPH_SESSION_CLOSE: - unregister_session(mdsc, session); remove_session_caps(session); wake = 1; /* for good measure */ complete(&mdsc->session_close_waiters); @@ -2269,7 +2274,7 @@ static void check_new_map(struct ceph_mds_client *mdsc, /* the session never opened, just close it * out now */ __wake_requests(mdsc, &s->s_waiting); - unregister_session(mdsc, s); + __unregister_session(mdsc, s); } else { /* just close it */ mutex_unlock(&mdsc->mutex); @@ -2329,24 +2334,22 @@ void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) di->lease_session = NULL; } -static void handle_lease(struct ceph_mds_client *mdsc, struct ceph_msg *msg) +static void handle_lease(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session, + struct ceph_msg *msg) { struct super_block *sb = mdsc->client->sb; struct inode *inode; - struct ceph_mds_session *session; struct ceph_inode_info *ci; struct dentry *parent, *dentry; struct ceph_dentry_info *di; - int mds; + int mds = session->s_mds; struct ceph_mds_lease *h = msg->front.iov_base; struct ceph_vino vino; int mask; struct qstr dname; int release = 0; - if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS) - return; - mds = le64_to_cpu(msg->hdr.src.name.num); dout("handle_lease from mds%d\n", mds); /* decode */ @@ -2360,15 +2363,6 @@ static void handle_lease(struct ceph_mds_client *mdsc, struct ceph_msg *msg) if (dname.len != get_unaligned_le32(h+1)) goto bad; - /* find session */ - mutex_lock(&mdsc->mutex); - session = __ceph_lookup_mds_session(mdsc, mds); - mutex_unlock(&mdsc->mutex); - if (!session) { - pr_err("handle_lease got lease but no session mds%d\n", mds); - return; - } - mutex_lock(&session->s_mutex); session->s_seq++; @@ -2437,7 +2431,6 @@ release: out: iput(inode); mutex_unlock(&session->s_mutex); - ceph_put_mds_session(session); return; bad: @@ -2794,7 +2787,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) for (i = 0; i < mdsc->max_sessions; i++) { if (mdsc->sessions[i]) { session = get_session(mdsc->sessions[i]); - unregister_session(mdsc, session); + __unregister_session(mdsc, session); mutex_unlock(&mdsc->mutex); mutex_lock(&session->s_mutex); remove_session_caps(session); @@ -2891,8 +2884,7 @@ static struct ceph_connection *con_get(struct ceph_connection *con) struct ceph_mds_session *s = con->private; if (get_session(s)) { - dout("mdsc con_get %p %d -> %d\n", s, - atomic_read(&s->s_ref) - 1, atomic_read(&s->s_ref)); + dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref)); return con; } dout("mdsc con_get %p FAIL\n", s); @@ -2903,9 +2895,8 @@ static void con_put(struct ceph_connection *con) { struct ceph_mds_session *s = con->private; - dout("mdsc con_put %p %d -> %d\n", s, atomic_read(&s->s_ref), - atomic_read(&s->s_ref) - 1); ceph_put_mds_session(s); + dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref)); } /* @@ -2926,6 +2917,13 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) struct ceph_mds_client *mdsc = s->s_mdsc; int type = le16_to_cpu(msg->hdr.type); + mutex_lock(&mdsc->mutex); + if (__verify_registered_session(mdsc, s) < 0) { + mutex_unlock(&mdsc->mutex); + goto out; + } + mutex_unlock(&mdsc->mutex); + switch (type) { case CEPH_MSG_MDS_MAP: ceph_mdsc_handle_map(mdsc, msg); @@ -2937,22 +2935,23 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) handle_reply(s, msg); break; case CEPH_MSG_CLIENT_REQUEST_FORWARD: - handle_forward(mdsc, msg); + handle_forward(mdsc, s, msg); break; case CEPH_MSG_CLIENT_CAPS: ceph_handle_caps(s, msg); break; case CEPH_MSG_CLIENT_SNAP: - ceph_handle_snap(mdsc, msg); + ceph_handle_snap(mdsc, s, msg); break; case CEPH_MSG_CLIENT_LEASE: - handle_lease(mdsc, msg); + handle_lease(mdsc, s, msg); break; default: pr_err("received unknown message type %d %s\n", type, ceph_msg_type_name(type)); } +out: ceph_msg_put(msg); } diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index 49d0c4c59d81..bf2a5f3846a4 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -713,11 +713,11 @@ static void flush_snaps(struct ceph_mds_client *mdsc) * directory into another realm. */ void ceph_handle_snap(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session, struct ceph_msg *msg) { struct super_block *sb = mdsc->client->sb; - struct ceph_mds_session *session; - int mds; + int mds = session->s_mds; u64 split; int op; int trace_len; @@ -730,10 +730,6 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, int i; int locked_rwsem = 0; - if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS) - return; - mds = le64_to_cpu(msg->hdr.src.name.num); - /* decode */ if (msg->front.iov_len < sizeof(*h)) goto bad; @@ -749,15 +745,6 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, dout("handle_snap from mds%d op %s split %llx tracelen %d\n", mds, ceph_snap_op_name(op), split, trace_len); - /* find session */ - mutex_lock(&mdsc->mutex); - session = __ceph_lookup_mds_session(mdsc, mds); - mutex_unlock(&mdsc->mutex); - if (!session) { - dout("WTF, got snap but no session for mds%d\n", mds); - return; - } - mutex_lock(&session->s_mutex); session->s_seq++; mutex_unlock(&session->s_mutex); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 384f0e2e7c68..ff7aaa32736c 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -707,6 +707,7 @@ extern void ceph_put_snap_realm(struct ceph_mds_client *mdsc, extern int ceph_update_snap_trace(struct ceph_mds_client *m, void *p, void *e, bool deletion); extern void ceph_handle_snap(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session, struct ceph_msg *msg); extern void ceph_queue_cap_snap(struct ceph_inode_info *ci, struct ceph_snap_context *snapc); -- cgit v1.2.3