diff options
Diffstat (limited to 'drivers/block/rbd.c')
-rw-r--r-- | drivers/block/rbd.c | 812 |
1 files changed, 796 insertions, 16 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index cb96fb19e8a7..7cda1cc60c2c 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -31,6 +31,7 @@ #include <linux/ceph/libceph.h> #include <linux/ceph/osd_client.h> #include <linux/ceph/mon_client.h> +#include <linux/ceph/cls_lock_client.h> #include <linux/ceph/decode.h> #include <linux/parser.h> #include <linux/bsearch.h> @@ -114,14 +115,17 @@ static int atomic_dec_return_safe(atomic_t *v) #define RBD_OBJ_PREFIX_LEN_MAX 64 +#define RBD_NOTIFY_TIMEOUT 5 /* seconds */ #define RBD_RETRY_DELAY msecs_to_jiffies(1000) /* Feature bits */ #define RBD_FEATURE_LAYERING (1<<0) #define RBD_FEATURE_STRIPINGV2 (1<<1) -#define RBD_FEATURES_ALL \ - (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2) +#define RBD_FEATURE_EXCLUSIVE_LOCK (1<<2) +#define RBD_FEATURES_ALL (RBD_FEATURE_LAYERING | \ + RBD_FEATURE_STRIPINGV2 | \ + RBD_FEATURE_EXCLUSIVE_LOCK) /* Features supported by this (client software) implementation. */ @@ -327,6 +331,18 @@ enum rbd_watch_state { RBD_WATCH_STATE_ERROR, }; +enum rbd_lock_state { + RBD_LOCK_STATE_UNLOCKED, + RBD_LOCK_STATE_LOCKED, + RBD_LOCK_STATE_RELEASING, +}; + +/* WatchNotify::ClientId */ +struct rbd_client_id { + u64 gid; + u64 handle; +}; + struct rbd_mapping { u64 size; u64 features; @@ -366,6 +382,15 @@ struct rbd_device { u64 watch_cookie; struct delayed_work watch_dwork; + struct rw_semaphore lock_rwsem; + enum rbd_lock_state lock_state; + struct rbd_client_id owner_cid; + struct work_struct acquired_lock_work; + struct work_struct released_lock_work; + struct delayed_work lock_dwork; + struct work_struct unlock_work; + wait_queue_head_t lock_waitq; + struct workqueue_struct *task_wq; struct rbd_spec *parent_spec; @@ -450,6 +475,29 @@ static int minor_to_rbd_dev_id(int minor) return minor >> RBD_SINGLE_MAJOR_PART_SHIFT; } +static bool rbd_is_lock_supported(struct rbd_device *rbd_dev) +{ + return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) && + rbd_dev->spec->snap_id == CEPH_NOSNAP && + !rbd_dev->mapping.read_only; +} + +static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev) +{ + return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED || + rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING; +} + +static bool rbd_is_lock_owner(struct rbd_device *rbd_dev) +{ + bool is_lock_owner; + + down_read(&rbd_dev->lock_rwsem); + is_lock_owner = __rbd_is_lock_owner(rbd_dev); + up_read(&rbd_dev->lock_rwsem); + return is_lock_owner; +} + static BUS_ATTR(add, S_IWUSR, NULL, rbd_add); static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove); static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major); @@ -3095,31 +3143,690 @@ out_err: obj_request_done_set(obj_request); } -static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, - u64 notifier_id, void *data, size_t data_len) +static const struct rbd_client_id rbd_empty_cid; + +static bool rbd_cid_equal(const struct rbd_client_id *lhs, + const struct rbd_client_id *rhs) +{ + return lhs->gid == rhs->gid && lhs->handle == rhs->handle; +} + +static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev) +{ + struct rbd_client_id cid; + + mutex_lock(&rbd_dev->watch_mutex); + cid.gid = ceph_client_gid(rbd_dev->rbd_client->client); + cid.handle = rbd_dev->watch_cookie; + mutex_unlock(&rbd_dev->watch_mutex); + return cid; +} + +/* + * lock_rwsem must be held for write + */ +static void rbd_set_owner_cid(struct rbd_device *rbd_dev, + const struct rbd_client_id *cid) +{ + dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev, + rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle, + cid->gid, cid->handle); + rbd_dev->owner_cid = *cid; /* struct */ +} + +static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf) +{ + mutex_lock(&rbd_dev->watch_mutex); + sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie); + mutex_unlock(&rbd_dev->watch_mutex); +} + +/* + * lock_rwsem must be held for write + */ +static int rbd_lock(struct rbd_device *rbd_dev) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct rbd_client_id cid = rbd_get_cid(rbd_dev); + char cookie[32]; + int ret; + + WARN_ON(__rbd_is_lock_owner(rbd_dev)); + + format_lock_cookie(rbd_dev, cookie); + ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, + RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie, + RBD_LOCK_TAG, "", 0); + if (ret) + return ret; + + rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED; + rbd_set_owner_cid(rbd_dev, &cid); + queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work); + return 0; +} + +/* + * lock_rwsem must be held for write + */ +static int rbd_unlock(struct rbd_device *rbd_dev) { - struct rbd_device *rbd_dev = arg; struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + char cookie[32]; int ret; - dout("%s rbd_dev %p cookie %llu notify_id %llu\n", __func__, rbd_dev, - cookie, notify_id); + WARN_ON(!__rbd_is_lock_owner(rbd_dev)); + + rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED; + + format_lock_cookie(rbd_dev, cookie); + ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, + RBD_LOCK_NAME, cookie); + if (ret && ret != -ENOENT) { + rbd_warn(rbd_dev, "cls_unlock failed: %d", ret); + return ret; + } + + rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); + queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work); + return 0; +} + +static int __rbd_notify_op_lock(struct rbd_device *rbd_dev, + enum rbd_notify_op notify_op, + struct page ***preply_pages, + size_t *preply_len) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct rbd_client_id cid = rbd_get_cid(rbd_dev); + int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN; + char buf[buf_size]; + void *p = buf; + + dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op); + + /* encode *LockPayload NotifyMessage (op + ClientId) */ + ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN); + ceph_encode_32(&p, notify_op); + ceph_encode_64(&p, cid.gid); + ceph_encode_64(&p, cid.handle); + + return ceph_osdc_notify(osdc, &rbd_dev->header_oid, + &rbd_dev->header_oloc, buf, buf_size, + RBD_NOTIFY_TIMEOUT, preply_pages, preply_len); +} + +static void rbd_notify_op_lock(struct rbd_device *rbd_dev, + enum rbd_notify_op notify_op) +{ + struct page **reply_pages; + size_t reply_len; + + __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len); + ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len)); +} + +static void rbd_notify_acquired_lock(struct work_struct *work) +{ + struct rbd_device *rbd_dev = container_of(work, struct rbd_device, + acquired_lock_work); + + rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK); +} + +static void rbd_notify_released_lock(struct work_struct *work) +{ + struct rbd_device *rbd_dev = container_of(work, struct rbd_device, + released_lock_work); + + rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK); +} + +static int rbd_request_lock(struct rbd_device *rbd_dev) +{ + struct page **reply_pages; + size_t reply_len; + bool lock_owner_responded = false; + int ret; + dout("%s rbd_dev %p\n", __func__, rbd_dev); + + ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK, + &reply_pages, &reply_len); + if (ret && ret != -ETIMEDOUT) { + rbd_warn(rbd_dev, "failed to request lock: %d", ret); + goto out; + } + + if (reply_len > 0 && reply_len <= PAGE_SIZE) { + void *p = page_address(reply_pages[0]); + void *const end = p + reply_len; + u32 n; + + ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */ + while (n--) { + u8 struct_v; + u32 len; + + ceph_decode_need(&p, end, 8 + 8, e_inval); + p += 8 + 8; /* skip gid and cookie */ + + ceph_decode_32_safe(&p, end, len, e_inval); + if (!len) + continue; + + if (lock_owner_responded) { + rbd_warn(rbd_dev, + "duplicate lock owners detected"); + ret = -EIO; + goto out; + } + + lock_owner_responded = true; + ret = ceph_start_decoding(&p, end, 1, "ResponseMessage", + &struct_v, &len); + if (ret) { + rbd_warn(rbd_dev, + "failed to decode ResponseMessage: %d", + ret); + goto e_inval; + } + + ret = ceph_decode_32(&p); + } + } + + if (!lock_owner_responded) { + rbd_warn(rbd_dev, "no lock owners detected"); + ret = -ETIMEDOUT; + } + +out: + ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len)); + return ret; + +e_inval: + ret = -EINVAL; + goto out; +} + +static void wake_requests(struct rbd_device *rbd_dev, bool wake_all) +{ + dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all); + + cancel_delayed_work(&rbd_dev->lock_dwork); + if (wake_all) + wake_up_all(&rbd_dev->lock_waitq); + else + wake_up(&rbd_dev->lock_waitq); +} + +static int get_lock_owner_info(struct rbd_device *rbd_dev, + struct ceph_locker **lockers, u32 *num_lockers) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + u8 lock_type; + char *lock_tag; + int ret; + + dout("%s rbd_dev %p\n", __func__, rbd_dev); + + ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid, + &rbd_dev->header_oloc, RBD_LOCK_NAME, + &lock_type, &lock_tag, lockers, num_lockers); + if (ret) + return ret; + + if (*num_lockers == 0) { + dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev); + goto out; + } + + if (strcmp(lock_tag, RBD_LOCK_TAG)) { + rbd_warn(rbd_dev, "locked by external mechanism, tag %s", + lock_tag); + ret = -EBUSY; + goto out; + } + + if (lock_type == CEPH_CLS_LOCK_SHARED) { + rbd_warn(rbd_dev, "shared lock type detected"); + ret = -EBUSY; + goto out; + } + + if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX, + strlen(RBD_LOCK_COOKIE_PREFIX))) { + rbd_warn(rbd_dev, "locked by external mechanism, cookie %s", + (*lockers)[0].id.cookie); + ret = -EBUSY; + goto out; + } + +out: + kfree(lock_tag); + return ret; +} + +static int find_watcher(struct rbd_device *rbd_dev, + const struct ceph_locker *locker) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct ceph_watch_item *watchers; + u32 num_watchers; + u64 cookie; + int i; + int ret; + + ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid, + &rbd_dev->header_oloc, &watchers, + &num_watchers); + if (ret) + return ret; + + sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie); + for (i = 0; i < num_watchers; i++) { + if (!memcmp(&watchers[i].addr, &locker->info.addr, + sizeof(locker->info.addr)) && + watchers[i].cookie == cookie) { + struct rbd_client_id cid = { + .gid = le64_to_cpu(watchers[i].name.num), + .handle = cookie, + }; + + dout("%s rbd_dev %p found cid %llu-%llu\n", __func__, + rbd_dev, cid.gid, cid.handle); + rbd_set_owner_cid(rbd_dev, &cid); + ret = 1; + goto out; + } + } + + dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev); + ret = 0; +out: + kfree(watchers); + return ret; +} + +/* + * lock_rwsem must be held for write + */ +static int rbd_try_lock(struct rbd_device *rbd_dev) +{ + struct ceph_client *client = rbd_dev->rbd_client->client; + struct ceph_locker *lockers; + u32 num_lockers; + int ret; + + for (;;) { + ret = rbd_lock(rbd_dev); + if (ret != -EBUSY) + return ret; + + /* determine if the current lock holder is still alive */ + ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers); + if (ret) + return ret; + + if (num_lockers == 0) + goto again; + + ret = find_watcher(rbd_dev, lockers); + if (ret) { + if (ret > 0) + ret = 0; /* have to request lock */ + goto out; + } + + rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock", + ENTITY_NAME(lockers[0].id.name)); + + ret = ceph_monc_blacklist_add(&client->monc, + &lockers[0].info.addr); + if (ret) { + rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d", + ENTITY_NAME(lockers[0].id.name), ret); + goto out; + } + + ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid, + &rbd_dev->header_oloc, RBD_LOCK_NAME, + lockers[0].id.cookie, + &lockers[0].id.name); + if (ret && ret != -ENOENT) + goto out; + +again: + ceph_free_lockers(lockers, num_lockers); + } + +out: + ceph_free_lockers(lockers, num_lockers); + return ret; +} + +/* + * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED + */ +static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev, + int *pret) +{ + enum rbd_lock_state lock_state; + + down_read(&rbd_dev->lock_rwsem); + dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, + rbd_dev->lock_state); + if (__rbd_is_lock_owner(rbd_dev)) { + lock_state = rbd_dev->lock_state; + up_read(&rbd_dev->lock_rwsem); + return lock_state; + } + + up_read(&rbd_dev->lock_rwsem); + down_write(&rbd_dev->lock_rwsem); + dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, + rbd_dev->lock_state); + if (!__rbd_is_lock_owner(rbd_dev)) { + *pret = rbd_try_lock(rbd_dev); + if (*pret) + rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret); + } + + lock_state = rbd_dev->lock_state; + up_write(&rbd_dev->lock_rwsem); + return lock_state; +} + +static void rbd_acquire_lock(struct work_struct *work) +{ + struct rbd_device *rbd_dev = container_of(to_delayed_work(work), + struct rbd_device, lock_dwork); + enum rbd_lock_state lock_state; + int ret; + + dout("%s rbd_dev %p\n", __func__, rbd_dev); +again: + lock_state = rbd_try_acquire_lock(rbd_dev, &ret); + if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) { + if (lock_state == RBD_LOCK_STATE_LOCKED) + wake_requests(rbd_dev, true); + dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__, + rbd_dev, lock_state, ret); + return; + } + + ret = rbd_request_lock(rbd_dev); + if (ret == -ETIMEDOUT) { + goto again; /* treat this as a dead client */ + } else if (ret < 0) { + rbd_warn(rbd_dev, "error requesting lock: %d", ret); + mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, + RBD_RETRY_DELAY); + } else { + /* + * lock owner acked, but resend if we don't see them + * release the lock + */ + dout("%s rbd_dev %p requeueing lock_dwork\n", __func__, + rbd_dev); + mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, + msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC)); + } +} + +/* + * lock_rwsem must be held for write + */ +static bool rbd_release_lock(struct rbd_device *rbd_dev) +{ + dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, + rbd_dev->lock_state); + if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED) + return false; + + rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING; + downgrade_write(&rbd_dev->lock_rwsem); /* - * Until adequate refresh error handling is in place, there is - * not much we can do here, except warn. + * Ensure that all in-flight IO is flushed. * - * See http://tracker.ceph.com/issues/5040 + * FIXME: ceph_osdc_sync() flushes the entire OSD client, which + * may be shared with other devices. */ - ret = rbd_dev_refresh(rbd_dev); - if (ret) - rbd_warn(rbd_dev, "refresh failed: %d", ret); + ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc); + up_read(&rbd_dev->lock_rwsem); + + down_write(&rbd_dev->lock_rwsem); + dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, + rbd_dev->lock_state); + if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING) + return false; + + if (!rbd_unlock(rbd_dev)) + /* + * Give others a chance to grab the lock - we would re-acquire + * almost immediately if we got new IO during ceph_osdc_sync() + * otherwise. We need to ack our own notifications, so this + * lock_dwork will be requeued from rbd_wait_state_locked() + * after wake_requests() in rbd_handle_released_lock(). + */ + cancel_delayed_work(&rbd_dev->lock_dwork); + + return true; +} + +static void rbd_release_lock_work(struct work_struct *work) +{ + struct rbd_device *rbd_dev = container_of(work, struct rbd_device, + unlock_work); + + down_write(&rbd_dev->lock_rwsem); + rbd_release_lock(rbd_dev); + up_write(&rbd_dev->lock_rwsem); +} + +static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v, + void **p) +{ + struct rbd_client_id cid = { 0 }; + + if (struct_v >= 2) { + cid.gid = ceph_decode_64(p); + cid.handle = ceph_decode_64(p); + } + + dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, + cid.handle); + if (!rbd_cid_equal(&cid, &rbd_empty_cid)) { + down_write(&rbd_dev->lock_rwsem); + if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) { + /* + * we already know that the remote client is + * the owner + */ + up_write(&rbd_dev->lock_rwsem); + return; + } + + rbd_set_owner_cid(rbd_dev, &cid); + downgrade_write(&rbd_dev->lock_rwsem); + } else { + down_read(&rbd_dev->lock_rwsem); + } + + if (!__rbd_is_lock_owner(rbd_dev)) + wake_requests(rbd_dev, false); + up_read(&rbd_dev->lock_rwsem); +} + +static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v, + void **p) +{ + struct rbd_client_id cid = { 0 }; + + if (struct_v >= 2) { + cid.gid = ceph_decode_64(p); + cid.handle = ceph_decode_64(p); + } + + dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, + cid.handle); + if (!rbd_cid_equal(&cid, &rbd_empty_cid)) { + down_write(&rbd_dev->lock_rwsem); + if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) { + dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n", + __func__, rbd_dev, cid.gid, cid.handle, + rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle); + up_write(&rbd_dev->lock_rwsem); + return; + } + + rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); + downgrade_write(&rbd_dev->lock_rwsem); + } else { + down_read(&rbd_dev->lock_rwsem); + } + + if (!__rbd_is_lock_owner(rbd_dev)) + wake_requests(rbd_dev, false); + up_read(&rbd_dev->lock_rwsem); +} + +static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v, + void **p) +{ + struct rbd_client_id my_cid = rbd_get_cid(rbd_dev); + struct rbd_client_id cid = { 0 }; + bool need_to_send; + + if (struct_v >= 2) { + cid.gid = ceph_decode_64(p); + cid.handle = ceph_decode_64(p); + } + + dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, + cid.handle); + if (rbd_cid_equal(&cid, &my_cid)) + return false; + + down_read(&rbd_dev->lock_rwsem); + need_to_send = __rbd_is_lock_owner(rbd_dev); + if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) { + if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) { + dout("%s rbd_dev %p queueing unlock_work\n", __func__, + rbd_dev); + queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work); + } + } + up_read(&rbd_dev->lock_rwsem); + return need_to_send; +} + +static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev, + u64 notify_id, u64 cookie, s32 *result) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN; + char buf[buf_size]; + int ret; + + if (result) { + void *p = buf; + + /* encode ResponseMessage */ + ceph_start_encoding(&p, 1, 1, + buf_size - CEPH_ENCODING_START_BLK_LEN); + ceph_encode_32(&p, *result); + } else { + buf_size = 0; + } ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, notify_id, cookie, - NULL, 0); + buf, buf_size); if (ret) - rbd_warn(rbd_dev, "notify_ack ret %d", ret); + rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret); +} + +static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id, + u64 cookie) +{ + dout("%s rbd_dev %p\n", __func__, rbd_dev); + __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL); +} + +static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev, + u64 notify_id, u64 cookie, s32 result) +{ + dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result); + __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result); +} + +static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, + u64 notifier_id, void *data, size_t data_len) +{ + struct rbd_device *rbd_dev = arg; + void *p = data; + void *const end = p + data_len; + u8 struct_v; + u32 len; + u32 notify_op; + int ret; + + dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n", + __func__, rbd_dev, cookie, notify_id, data_len); + if (data_len) { + ret = ceph_start_decoding(&p, end, 1, "NotifyMessage", + &struct_v, &len); + if (ret) { + rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d", + ret); + return; + } + + notify_op = ceph_decode_32(&p); + } else { + /* legacy notification for header updates */ + notify_op = RBD_NOTIFY_OP_HEADER_UPDATE; + len = 0; + } + + dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op); + switch (notify_op) { + case RBD_NOTIFY_OP_ACQUIRED_LOCK: + rbd_handle_acquired_lock(rbd_dev, struct_v, &p); + rbd_acknowledge_notify(rbd_dev, notify_id, cookie); + break; + case RBD_NOTIFY_OP_RELEASED_LOCK: + rbd_handle_released_lock(rbd_dev, struct_v, &p); + rbd_acknowledge_notify(rbd_dev, notify_id, cookie); + break; + case RBD_NOTIFY_OP_REQUEST_LOCK: + if (rbd_handle_request_lock(rbd_dev, struct_v, &p)) + /* + * send ResponseMessage(0) back so the client + * can detect a missing owner + */ + rbd_acknowledge_notify_result(rbd_dev, notify_id, + cookie, 0); + else + rbd_acknowledge_notify(rbd_dev, notify_id, cookie); + break; + case RBD_NOTIFY_OP_HEADER_UPDATE: + ret = rbd_dev_refresh(rbd_dev); + if (ret) + rbd_warn(rbd_dev, "refresh failed: %d", ret); + + rbd_acknowledge_notify(rbd_dev, notify_id, cookie); + break; + default: + if (rbd_is_lock_owner(rbd_dev)) + rbd_acknowledge_notify_result(rbd_dev, notify_id, + cookie, -EOPNOTSUPP); + else + rbd_acknowledge_notify(rbd_dev, notify_id, cookie); + break; + } } static void __rbd_unregister_watch(struct rbd_device *rbd_dev); @@ -3130,6 +3837,10 @@ static void rbd_watch_errcb(void *arg, u64 cookie, int err) rbd_warn(rbd_dev, "encountered watch error: %d", err); + down_write(&rbd_dev->lock_rwsem); + rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); + up_write(&rbd_dev->lock_rwsem); + mutex_lock(&rbd_dev->watch_mutex); if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) { __rbd_unregister_watch(rbd_dev); @@ -3202,10 +3913,15 @@ static void cancel_tasks_sync(struct rbd_device *rbd_dev) dout("%s rbd_dev %p\n", __func__, rbd_dev); cancel_delayed_work_sync(&rbd_dev->watch_dwork); + cancel_work_sync(&rbd_dev->acquired_lock_work); + cancel_work_sync(&rbd_dev->released_lock_work); + cancel_delayed_work_sync(&rbd_dev->lock_dwork); + cancel_work_sync(&rbd_dev->unlock_work); } static void rbd_unregister_watch(struct rbd_device *rbd_dev) { + WARN_ON(waitqueue_active(&rbd_dev->lock_waitq)); cancel_tasks_sync(rbd_dev); mutex_lock(&rbd_dev->watch_mutex); @@ -3221,10 +3937,15 @@ static void rbd_reregister_watch(struct work_struct *work) { struct rbd_device *rbd_dev = container_of(to_delayed_work(work), struct rbd_device, watch_dwork); + bool was_lock_owner = false; int ret; dout("%s rbd_dev %p\n", __func__, rbd_dev); + down_write(&rbd_dev->lock_rwsem); + if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) + was_lock_owner = rbd_release_lock(rbd_dev); + mutex_lock(&rbd_dev->watch_mutex); if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) goto fail_unlock; @@ -3247,10 +3968,20 @@ static void rbd_reregister_watch(struct work_struct *work) if (ret) rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret); + if (was_lock_owner) { + ret = rbd_try_lock(rbd_dev); + if (ret) + rbd_warn(rbd_dev, "reregisteration lock failed: %d", + ret); + } + + up_write(&rbd_dev->lock_rwsem); + wake_requests(rbd_dev, true); return; fail_unlock: mutex_unlock(&rbd_dev->watch_mutex); + up_write(&rbd_dev->lock_rwsem); } /* @@ -3340,6 +4071,29 @@ out: return ret; } +/* + * lock_rwsem must be held for read + */ +static void rbd_wait_state_locked(struct rbd_device *rbd_dev) +{ + DEFINE_WAIT(wait); + + do { + /* + * Note the use of mod_delayed_work() in rbd_acquire_lock() + * and cancel_delayed_work() in wake_requests(). + */ + dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev); + queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); + prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait, + TASK_UNINTERRUPTIBLE); + up_read(&rbd_dev->lock_rwsem); + schedule(); + down_read(&rbd_dev->lock_rwsem); + } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED); + finish_wait(&rbd_dev->lock_waitq, &wait); +} + static void rbd_queue_workfn(struct work_struct *work) { struct request *rq = blk_mq_rq_from_pdu(work); @@ -3350,6 +4104,7 @@ static void rbd_queue_workfn(struct work_struct *work) u64 length = blk_rq_bytes(rq); enum obj_operation_type op_type; u64 mapping_size; + bool must_be_locked = false; int result; if (rq->cmd_type != REQ_TYPE_FS) { @@ -3411,6 +4166,7 @@ static void rbd_queue_workfn(struct work_struct *work) if (op_type != OBJ_OP_READ) { snapc = rbd_dev->header.snapc; ceph_get_snap_context(snapc); + must_be_locked = rbd_is_lock_supported(rbd_dev); } up_read(&rbd_dev->header_rwsem); @@ -3421,11 +4177,17 @@ static void rbd_queue_workfn(struct work_struct *work) goto err_rq; } + if (must_be_locked) { + down_read(&rbd_dev->lock_rwsem); + if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED) + rbd_wait_state_locked(rbd_dev); + } + img_request = rbd_img_request_create(rbd_dev, offset, length, op_type, snapc); if (!img_request) { result = -ENOMEM; - goto err_rq; + goto err_unlock; } img_request->rq = rq; snapc = NULL; /* img_request consumes a ref */ @@ -3443,10 +4205,15 @@ static void rbd_queue_workfn(struct work_struct *work) if (result) goto err_img_request; + if (must_be_locked) + up_read(&rbd_dev->lock_rwsem); return; err_img_request: rbd_img_request_put(img_request); +err_unlock: + if (must_be_locked) + up_read(&rbd_dev->lock_rwsem); err_rq: if (result) rbd_warn(rbd_dev, "%s %llx at %llx result %d", @@ -4020,6 +4787,7 @@ static void rbd_spec_free(struct kref *kref) static void rbd_dev_free(struct rbd_device *rbd_dev) { WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED); + WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED); ceph_oid_destroy(&rbd_dev->header_oid); ceph_oloc_destroy(&rbd_dev->header_oloc); @@ -4071,6 +4839,14 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc, rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED; INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch); + init_rwsem(&rbd_dev->lock_rwsem); + rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED; + INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock); + INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock); + INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock); + INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work); + init_waitqueue_head(&rbd_dev->lock_waitq); + rbd_dev->dev.bus = &rbd_bus_type; rbd_dev->dev.type = &rbd_device_type; rbd_dev->dev.parent = &rbd_root_dev; @@ -5553,6 +6329,10 @@ static ssize_t do_rbd_remove(struct bus_type *bus, if (ret < 0 || already) return ret; + down_write(&rbd_dev->lock_rwsem); + if (__rbd_is_lock_owner(rbd_dev)) + rbd_unlock(rbd_dev); + up_write(&rbd_dev->lock_rwsem); rbd_unregister_watch(rbd_dev); /* |