diff options
-rw-r--r-- | fs/ceph/ioctl.c | 8 | ||||
-rw-r--r-- | fs/ceph/xattr.c | 8 | ||||
-rw-r--r-- | include/linux/ceph/osd_client.h | 18 | ||||
-rw-r--r-- | net/ceph/debugfs.c | 34 | ||||
-rw-r--r-- | net/ceph/osd_client.c | 1164 |
5 files changed, 602 insertions, 630 deletions
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index 1831ad6cf066..be6b1657b1af 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -193,12 +193,12 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) if (copy_from_user(&dl, arg, sizeof(dl))) return -EFAULT; - down_read(&osdc->map_sem); + down_read(&osdc->lock); r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, len, &dl.object_no, &dl.object_offset, &olen); if (r < 0) { - up_read(&osdc->map_sem); + up_read(&osdc->lock); return -EIO; } dl.file_offset -= dl.object_offset; @@ -217,7 +217,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) r = ceph_object_locator_to_pg(osdc->osdmap, &oid, &oloc, &pgid); if (r < 0) { - up_read(&osdc->map_sem); + up_read(&osdc->lock); return r; } @@ -230,7 +230,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) } else { memset(&dl.osd_addr, 0, sizeof(dl.osd_addr)); } - up_read(&osdc->map_sem); + up_read(&osdc->lock); /* send result back to user */ if (copy_to_user(arg, &dl, sizeof(dl))) diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index 9410abdef3ce..5afabc4bf4c7 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -75,7 +75,7 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val, char buf[128]; dout("ceph_vxattrcb_layout %p\n", &ci->vfs_inode); - down_read(&osdc->map_sem); + down_read(&osdc->lock); pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool); if (pool_name) { size_t len = strlen(pool_name); @@ -107,7 +107,7 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val, ret = -ERANGE; } } - up_read(&osdc->map_sem); + up_read(&osdc->lock); return ret; } @@ -141,13 +141,13 @@ static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci, s64 pool = ceph_file_layout_pg_pool(ci->i_layout); const char *pool_name; - down_read(&osdc->map_sem); + down_read(&osdc->lock); pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool); if (pool_name) ret = snprintf(val, size, "%s", pool_name); else ret = snprintf(val, size, "%lld", (unsigned long long)pool); - up_read(&osdc->map_sem); + up_read(&osdc->lock); return ret; } diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 486d681694c4..342f22f1f040 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -33,12 +33,13 @@ struct ceph_osd { int o_incarnation; struct rb_node o_node; struct ceph_connection o_con; - struct list_head o_requests; + struct rb_root o_requests; struct list_head o_linger_requests; struct list_head o_osd_lru; struct ceph_auth_handshake o_auth; unsigned long lru_ttl; struct list_head o_keepalive_item; + struct mutex lock; }; #define CEPH_OSD_SLAB_OPS 2 @@ -144,8 +145,6 @@ struct ceph_osd_request_target { struct ceph_osd_request { u64 r_tid; /* unique for this client */ struct rb_node r_node; - struct list_head r_req_lru_item; - struct list_head r_osd_item; struct list_head r_linger_item; struct list_head r_linger_osd_item; struct ceph_osd *r_osd; @@ -219,19 +218,16 @@ struct ceph_osd_client { struct ceph_client *client; struct ceph_osdmap *osdmap; /* current map */ - struct rw_semaphore map_sem; + struct rw_semaphore lock; - struct mutex request_mutex; struct rb_root osds; /* osds */ struct list_head osd_lru; /* idle osds */ spinlock_t osd_lru_lock; - u64 last_tid; /* tid of last request */ - struct rb_root requests; /* pending requests */ - struct list_head req_lru; /* in-flight lru */ - struct list_head req_unsent; /* unsent/need-resend queue */ - struct list_head req_notarget; /* map to no osd */ struct list_head req_linger; /* lingering requests */ - int num_requests; + struct ceph_osd homeless_osd; + atomic64_t last_tid; /* tid of last request */ + atomic_t num_requests; + atomic_t num_homeless; struct delayed_work timeout_work; struct delayed_work osds_timeout_work; #ifdef CONFIG_DEBUG_FS diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index 6d3ff713edeb..61dbd9de4650 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -182,21 +182,39 @@ static void dump_request(struct seq_file *s, struct ceph_osd_request *req) seq_putc(s, '\n'); } +static void dump_requests(struct seq_file *s, struct ceph_osd *osd) +{ + struct rb_node *n; + + mutex_lock(&osd->lock); + for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) { + struct ceph_osd_request *req = + rb_entry(n, struct ceph_osd_request, r_node); + + dump_request(s, req); + } + + mutex_unlock(&osd->lock); +} + static int osdc_show(struct seq_file *s, void *pp) { struct ceph_client *client = s->private; struct ceph_osd_client *osdc = &client->osdc; - struct rb_node *p; - - mutex_lock(&osdc->request_mutex); - for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { - struct ceph_osd_request *req; + struct rb_node *n; - req = rb_entry(p, struct ceph_osd_request, r_node); + down_read(&osdc->lock); + seq_printf(s, "REQUESTS %d homeless %d\n", + atomic_read(&osdc->num_requests), + atomic_read(&osdc->num_homeless)); + for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { + struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); - dump_request(s, req); + dump_requests(s, osd); } - mutex_unlock(&osdc->request_mutex); + dump_requests(s, &osdc->homeless_osd); + + up_read(&osdc->lock); return 0; } diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index d1c8e06f1261..4c856c87b1a9 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -25,16 +25,6 @@ static struct kmem_cache *ceph_osd_request_cache; static const struct ceph_connection_operations osd_con_ops; -static void __send_queued(struct ceph_osd_client *osdc); -static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd); -static void __register_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req); -static void __unregister_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req); -static void __unregister_linger_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req); -static void __enqueue_request(struct ceph_osd_request *req); - /* * Implement client access to distributed object storage cluster. * @@ -53,6 +43,43 @@ static void __enqueue_request(struct ceph_osd_request *req); * channel with an OSD is reset. */ +static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req); +static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req); + +#if 1 +static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem) +{ + bool wrlocked = true; + + if (unlikely(down_read_trylock(sem))) { + wrlocked = false; + up_read(sem); + } + + return wrlocked; +} +static inline void verify_osdc_locked(struct ceph_osd_client *osdc) +{ + WARN_ON(!rwsem_is_locked(&osdc->lock)); +} +static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) +{ + WARN_ON(!rwsem_is_wrlocked(&osdc->lock)); +} +static inline void verify_osd_locked(struct ceph_osd *osd) +{ + struct ceph_osd_client *osdc = osd->o_osdc; + + WARN_ON(!(mutex_is_locked(&osd->lock) && + rwsem_is_locked(&osdc->lock)) && + !rwsem_is_wrlocked(&osdc->lock)); +} +#else +static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { } +static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { } +static inline void verify_osd_locked(struct ceph_osd *osd) { } +#endif + /* * calculate the mapping of a file extent onto an object, and fill out the * request accordingly. shorten extent as necessary if it crosses an @@ -336,18 +363,14 @@ static void ceph_osdc_release_request(struct kref *kref) dout("%s %p (r_request %p r_reply %p)\n", __func__, req, req->r_request, req->r_reply); WARN_ON(!RB_EMPTY_NODE(&req->r_node)); - WARN_ON(!list_empty(&req->r_req_lru_item)); - WARN_ON(!list_empty(&req->r_osd_item)); WARN_ON(!list_empty(&req->r_linger_item)); WARN_ON(!list_empty(&req->r_linger_osd_item)); WARN_ON(req->r_osd); if (req->r_request) ceph_msg_put(req->r_request); - if (req->r_reply) { - ceph_msg_revoke_incoming(req->r_reply); + if (req->r_reply) ceph_msg_put(req->r_reply); - } for (which = 0; which < req->r_num_ops; which++) osd_req_op_data_release(req, which); @@ -418,8 +441,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, INIT_LIST_HEAD(&req->r_unsafe_item); INIT_LIST_HEAD(&req->r_linger_item); INIT_LIST_HEAD(&req->r_linger_osd_item); - INIT_LIST_HEAD(&req->r_req_lru_item); - INIT_LIST_HEAD(&req->r_osd_item); target_init(&req->r_t); @@ -869,141 +890,11 @@ static bool osd_homeless(struct ceph_osd *osd) return osd->o_osd == CEPH_HOMELESS_OSD; } -static struct ceph_osd_request * -__lookup_request_ge(struct ceph_osd_client *osdc, - u64 tid) -{ - struct ceph_osd_request *req; - struct rb_node *n = osdc->requests.rb_node; - - while (n) { - req = rb_entry(n, struct ceph_osd_request, r_node); - if (tid < req->r_tid) { - if (!n->rb_left) - return req; - n = n->rb_left; - } else if (tid > req->r_tid) { - n = n->rb_right; - } else { - return req; - } - } - return NULL; -} - -static void __kick_linger_request(struct ceph_osd_request *req) -{ - struct ceph_osd_client *osdc = req->r_osdc; - struct ceph_osd *osd = req->r_osd; - - /* - * Linger requests need to be resent with a new tid to avoid - * the dup op detection logic on the OSDs. Achieve this with - * a re-register dance instead of open-coding. - */ - ceph_osdc_get_request(req); - if (!list_empty(&req->r_linger_item)) - __unregister_linger_request(osdc, req); - else - __unregister_request(osdc, req); - __register_request(osdc, req); - ceph_osdc_put_request(req); - - /* - * Unless request has been registered as both normal and - * lingering, __unregister{,_linger}_request clears r_osd. - * However, here we need to preserve r_osd to make sure we - * requeue on the same OSD. - */ - WARN_ON(req->r_osd || !osd); - req->r_osd = osd; - - dout("%s requeueing %p tid %llu\n", __func__, req, req->r_tid); - __enqueue_request(req); -} - -/* - * Resubmit requests pending on the given osd. - */ -static void __kick_osd_requests(struct ceph_osd_client *osdc, - struct ceph_osd *osd) -{ - struct ceph_osd_request *req, *nreq; - LIST_HEAD(resend); - LIST_HEAD(resend_linger); - int err; - - dout("%s osd%d\n", __func__, osd->o_osd); - err = __reset_osd(osdc, osd); - if (err) - return; - - /* - * Build up a list of requests to resend by traversing the - * osd's list of requests. Requests for a given object are - * sent in tid order, and that is also the order they're - * kept on this list. Therefore all requests that are in - * flight will be found first, followed by all requests that - * have not yet been sent. And to resend requests while - * preserving this order we will want to put any sent - * requests back on the front of the osd client's unsent - * list. - * - * So we build a separate ordered list of already-sent - * requests for the affected osd and splice it onto the - * front of the osd client's unsent list. Once we've seen a - * request that has not yet been sent we're done. Those - * requests are already sitting right where they belong. - */ - list_for_each_entry(req, &osd->o_requests, r_osd_item) { - if (!req->r_sent) - break; - - if (!req->r_linger) { - dout("%s requeueing %p tid %llu\n", __func__, req, - req->r_tid); - list_move_tail(&req->r_req_lru_item, &resend); - req->r_flags |= CEPH_OSD_FLAG_RETRY; - } else { - list_move_tail(&req->r_req_lru_item, &resend_linger); - } - } - list_splice(&resend, &osdc->req_unsent); - - /* - * Both registered and not yet registered linger requests are - * enqueued with a new tid on the same OSD. We add/move them - * to req_unsent/o_requests at the end to keep things in tid - * order. - */ - list_for_each_entry_safe(req, nreq, &osd->o_linger_requests, - r_linger_osd_item) { - WARN_ON(!list_empty(&req->r_req_lru_item)); - __kick_linger_request(req); - } - - list_for_each_entry_safe(req, nreq, &resend_linger, r_req_lru_item) - __kick_linger_request(req); -} - -/* - * If the osd connection drops, we need to resubmit all requests. - */ -static void osd_reset(struct ceph_connection *con) +static bool osd_registered(struct ceph_osd *osd) { - struct ceph_osd *osd = con->private; - struct ceph_osd_client *osdc; + verify_osdc_locked(osd->o_osdc); - if (!osd) - return; - dout("osd_reset osd%d\n", osd->o_osd); - osdc = osd->o_osdc; - down_read(&osdc->map_sem); - mutex_lock(&osdc->request_mutex); - __kick_osd_requests(osdc, osd); - __send_queued(osdc); - mutex_unlock(&osdc->request_mutex); - up_read(&osdc->map_sem); + return !RB_EMPTY_NODE(&osd->o_node); } /* @@ -1013,17 +904,18 @@ static void osd_init(struct ceph_osd *osd) { atomic_set(&osd->o_ref, 1); RB_CLEAR_NODE(&osd->o_node); - INIT_LIST_HEAD(&osd->o_requests); + osd->o_requests = RB_ROOT; INIT_LIST_HEAD(&osd->o_linger_requests); INIT_LIST_HEAD(&osd->o_osd_lru); INIT_LIST_HEAD(&osd->o_keepalive_item); osd->o_incarnation = 1; + mutex_init(&osd->lock); } static void osd_cleanup(struct ceph_osd *osd) { WARN_ON(!RB_EMPTY_NODE(&osd->o_node)); - WARN_ON(!list_empty(&osd->o_requests)); + WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests)); WARN_ON(!list_empty(&osd->o_linger_requests)); WARN_ON(!list_empty(&osd->o_osd_lru)); WARN_ON(!list_empty(&osd->o_keepalive_item)); @@ -1077,30 +969,6 @@ static void put_osd(struct ceph_osd *osd) DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node) -/* - * remove an osd from our map - */ -static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) -{ - dout("%s %p osd%d\n", __func__, osd, osd->o_osd); - WARN_ON(!list_empty(&osd->o_requests)); - WARN_ON(!list_empty(&osd->o_linger_requests)); - - list_del_init(&osd->o_osd_lru); - erase_osd(&osdc->osds, osd); -} - -static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) -{ - dout("%s %p osd%d\n", __func__, osd, osd->o_osd); - - if (!RB_EMPTY_NODE(&osd->o_node)) { - ceph_con_close(&osd->o_con); - __remove_osd(osdc, osd); - put_osd(osd); - } -} - static void __move_osd_to_lru(struct ceph_osd *osd) { struct ceph_osd_client *osdc = osd->o_osdc; @@ -1117,7 +985,7 @@ static void __move_osd_to_lru(struct ceph_osd *osd) static void maybe_move_osd_to_lru(struct ceph_osd *osd) { - if (list_empty(&osd->o_requests) && + if (RB_EMPTY_ROOT(&osd->o_requests) && list_empty(&osd->o_linger_requests)) __move_osd_to_lru(osd); } @@ -1135,29 +1003,63 @@ static void __remove_osd_from_lru(struct ceph_osd *osd) } /* + * Close the connection and assign any leftover requests to the + * homeless session. + */ +static void close_osd(struct ceph_osd *osd) +{ + struct ceph_osd_client *osdc = osd->o_osdc; + struct rb_node *n; + + verify_osdc_wrlocked(osdc); + dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd); + + ceph_con_close(&osd->o_con); + + for (n = rb_first(&osd->o_requests); n; ) { + struct ceph_osd_request *req = + rb_entry(n, struct ceph_osd_request, r_node); + + n = rb_next(n); /* unlink_request() */ + + dout(" reassigning req %p tid %llu\n", req, req->r_tid); + unlink_request(osd, req); + link_request(&osdc->homeless_osd, req); + } + + __remove_osd_from_lru(osd); + erase_osd(&osdc->osds, osd); + put_osd(osd); +} + +/* * reset osd connect */ -static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) +static int reopen_osd(struct ceph_osd *osd) { struct ceph_entity_addr *peer_addr; - dout("__reset_osd %p osd%d\n", osd, osd->o_osd); - if (list_empty(&osd->o_requests) && + dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd); + + if (RB_EMPTY_ROOT(&osd->o_requests) && list_empty(&osd->o_linger_requests)) { - remove_osd(osdc, osd); + close_osd(osd); return -ENODEV; } - peer_addr = &osdc->osdmap->osd_addr[osd->o_osd]; + peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd]; if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) && !ceph_con_opened(&osd->o_con)) { - struct ceph_osd_request *req; + struct rb_node *n; dout("osd addr hasn't changed and connection never opened, " "letting msgr retry\n"); /* touch each r_stamp for handle_timeout()'s benfit */ - list_for_each_entry(req, &osd->o_requests, r_osd_item) + for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) { + struct ceph_osd_request *req = + rb_entry(n, struct ceph_osd_request, r_node); req->r_stamp = jiffies; + } return -EAGAIN; } @@ -1169,73 +1071,84 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) return 0; } -/* - * Register request, assign tid. If this is the first request, set up - * the timeout event. - */ -static void __register_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req) -{ - req->r_tid = ++osdc->last_tid; - req->r_request->hdr.tid = cpu_to_le64(req->r_tid); - dout("__register_request %p tid %lld\n", req, req->r_tid); - insert_request(&osdc->requests, req); - ceph_osdc_get_request(req); - osdc->num_requests++; -} - -/* - * called under osdc->request_mutex - */ -static void __unregister_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req) +static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o, + bool wrlocked) { - if (RB_EMPTY_NODE(&req->r_node)) { - dout("__unregister_request %p tid %lld not registered\n", - req, req->r_tid); - return; - } + struct ceph_osd *osd; - dout("__unregister_request %p tid %lld\n", req, req->r_tid); - erase_request(&osdc->requests, req); - osdc->num_requests--; + if (wrlocked) + verify_osdc_wrlocked(osdc); + else + verify_osdc_locked(osdc); - if (req->r_osd) { - /* make sure the original request isn't in flight. */ - ceph_msg_revoke(req->r_request); + if (o != CEPH_HOMELESS_OSD) + osd = lookup_osd(&osdc->osds, o); + else + osd = &osdc->homeless_osd; + if (!osd) { + if (!wrlocked) + return ERR_PTR(-EAGAIN); - list_del_init(&req->r_osd_item); - maybe_move_osd_to_lru(req->r_osd); - if (list_empty(&req->r_linger_osd_item)) - req->r_osd = NULL; + osd = create_osd(osdc, o); + insert_osd(&osdc->osds, osd); + ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, + &osdc->osdmap->osd_addr[osd->o_osd]); } - list_del_init(&req->r_req_lru_item); - ceph_osdc_put_request(req); + dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd); + return osd; } /* - * Cancel a previously queued request message + * Create request <-> OSD session relation. + * + * @req has to be assigned a tid, @osd may be homeless. */ -static void __cancel_request(struct ceph_osd_request *req) +static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req) { - if (req->r_sent && req->r_osd) { - ceph_msg_revoke(req->r_request); - req->r_sent = 0; - } + verify_osd_locked(osd); + WARN_ON(!req->r_tid || req->r_osd); + dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd, + req, req->r_tid); + + if (!osd_homeless(osd)) + __remove_osd_from_lru(osd); + else + atomic_inc(&osd->o_osdc->num_homeless); + + get_osd(osd); + insert_request(&osd->o_requests, req); + req->r_osd = osd; } -static void __register_linger_request(struct ceph_osd_client *osdc, +static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req) +{ + verify_osd_locked(osd); + WARN_ON(req->r_osd != osd); + dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd, + req, req->r_tid); + + req->r_osd = NULL; + erase_request(&osd->o_requests, req); + put_osd(osd); + + if (!osd_homeless(osd)) + maybe_move_osd_to_lru(osd); + else + atomic_dec(&osd->o_osdc->num_homeless); +} + +static void __register_linger_request(struct ceph_osd *osd, struct ceph_osd_request *req) { dout("%s %p tid %llu\n", __func__, req, req->r_tid); WARN_ON(!req->r_linger); ceph_osdc_get_request(req); - list_add_tail(&req->r_linger_item, &osdc->req_linger); - if (req->r_osd) - list_add_tail(&req->r_linger_osd_item, - &req->r_osd->o_linger_requests); + list_add_tail(&req->r_linger_item, &osd->o_osdc->req_linger); + list_add_tail(&req->r_linger_osd_item, &osd->o_linger_requests); + __remove_osd_from_lru(osd); + req->r_osd = osd; } static void __unregister_linger_request(struct ceph_osd_client *osdc, @@ -1255,7 +1168,7 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc, if (req->r_osd) { list_del_init(&req->r_linger_osd_item); maybe_move_osd_to_lru(req->r_osd); - if (list_empty(&req->r_osd_item)) + if (RB_EMPTY_ROOT(&req->r_osd->o_requests)) req->r_osd = NULL; } ceph_osdc_put_request(req); @@ -1291,11 +1204,20 @@ static bool have_pool_full(struct ceph_osd_client *osdc) return false; } +static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id) +{ + struct ceph_pg_pool_info *pi; + + pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id); + if (!pi) + return false; + + return __pool_full(pi); +} + /* * Returns whether a request should be blocked from being sent * based on the current osdmap and osd_client settings. - * - * Caller should hold map_sem for read. */ static bool target_should_be_paused(struct ceph_osd_client *osdc, const struct ceph_osd_request_target *t, @@ -1421,87 +1343,6 @@ out: return ct_res; } -static void __enqueue_request(struct ceph_osd_request *req) -{ - struct ceph_osd_client *osdc = req->r_osdc; - - dout("%s %p tid %llu to osd%d\n", __func__, req, req->r_tid, - req->r_osd ? req->r_osd->o_osd : -1); - - if (req->r_osd) { - __remove_osd_from_lru(req->r_osd); - list_add_tail(&req->r_osd_item, &req->r_osd->o_requests); - list_move_tail(&req->r_req_lru_item, &osdc->req_unsent); - } else { - list_move_tail(&req->r_req_lru_item, &osdc->req_notarget); - } -} - -/* - * Pick an osd (the first 'up' osd in the pg), allocate the osd struct - * (as needed), and set the request r_osd appropriately. If there is - * no up osd, set r_osd to NULL. Move the request to the appropriate list - * (unsent, homeless) or leave on in-flight lru. - * - * Return 0 if unchanged, 1 if changed, or negative on error. - * - * Caller should hold map_sem for read and request_mutex. - */ -static int __map_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req, int force_resend) -{ - enum calc_target_result ct_res; - int err; - - dout("map_request %p tid %lld\n", req, req->r_tid); - - ct_res = calc_target(osdc, &req->r_t, NULL, force_resend); - switch (ct_res) { - case CALC_TARGET_POOL_DNE: - list_move(&req->r_req_lru_item, &osdc->req_notarget); - return -EIO; - case CALC_TARGET_NO_ACTION: - return 0; /* no change */ - default: - BUG_ON(ct_res != CALC_TARGET_NEED_RESEND); - } - - dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n", - req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed, req->r_t.osd, - req->r_osd ? req->r_osd->o_osd : -1); - - if (req->r_osd) { - __cancel_request(req); - list_del_init(&req->r_osd_item); - list_del_init(&req->r_linger_osd_item); - req->r_osd = NULL; - } - - req->r_osd = lookup_osd(&osdc->osds, req->r_t.osd); - if (!req->r_osd && req->r_t.osd >= 0) { - err = -ENOMEM; - req->r_osd = create_osd(osdc, req->r_t.osd); - if (!req->r_osd) { - list_move(&req->r_req_lru_item, &osdc->req_notarget); - goto out; - } - - dout("map_request osd %p is osd%d\n", req->r_osd, - req->r_osd->o_osd); - insert_osd(&osdc->osds, req->r_osd); - - ceph_con_open(&req->r_osd->o_con, - CEPH_ENTITY_TYPE_OSD, req->r_osd->o_osd, - &osdc->osdmap->osd_addr[req->r_osd->o_osd]); - } - - __enqueue_request(req); - err = 1; /* osd or pg changed */ - -out: - return err; -} - static void setup_request_data(struct ceph_osd_request *req, struct ceph_msg *msg) { @@ -1648,8 +1489,16 @@ static void send_request(struct ceph_osd_request *req) { struct ceph_osd *osd = req->r_osd; + verify_osd_locked(osd); WARN_ON(osd->o_osd != req->r_t.osd); + /* + * We may have a previously queued request message hanging + * around. Cancel it to avoid corrupting the msgr. + */ + if (req->r_sent) + ceph_msg_revoke(req->r_request); + req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR; if (req->r_attempts) req->r_flags |= CEPH_OSD_FLAG_RETRY; @@ -1671,24 +1520,11 @@ static void send_request(struct ceph_osd_request *req) ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request)); } -/* - * Send any requests in the queue (req_unsent). - */ -static void __send_queued(struct ceph_osd_client *osdc) -{ - struct ceph_osd_request *req, *tmp; - - dout("__send_queued\n"); - list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) { - list_move_tail(&req->r_req_lru_item, &osdc->req_lru); - send_request(req); - } -} - static void maybe_request_map(struct ceph_osd_client *osdc) { bool continuous = false; + verify_osdc_locked(osdc); WARN_ON(!osdc->osdmap->epoch); if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) || @@ -1705,38 +1541,121 @@ static void maybe_request_map(struct ceph_osd_client *osdc) ceph_monc_renew_subs(&osdc->client->monc); } -/* - * Caller should hold map_sem for read and request_mutex. - */ -static int __ceph_osdc_start_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req, - bool nofail) +static void __submit_request(struct ceph_osd_request *req, bool wrlocked) { - int rc; + struct ceph_osd_client *osdc = req->r_osdc; + struct ceph_osd *osd; + bool need_send = false; + bool promoted = false; - __register_request(osdc, req); - req->r_sent = 0; - req->r_got_reply = 0; - rc = __map_request(osdc, req, 0); - if (rc < 0) { - if (nofail) { - dout("osdc_start_request failed map, " - " will retry %lld\n", req->r_tid); - rc = 0; - } else { - __unregister_request(osdc, req); - } - return rc; + WARN_ON(req->r_tid || req->r_got_reply); + dout("%s req %p wrlocked %d\n", __func__, req, wrlocked); + +again: + calc_target(osdc, &req->r_t, &req->r_last_force_resend, false); + osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked); + if (IS_ERR(osd)) { + WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked); + goto promote; } - if (req->r_osd == NULL) { - dout("send_request %p no up osds in pg\n", req); - ceph_monc_request_next_osdmap(&osdc->client->monc); + if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && + ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR)) { + dout("req %p pausewr\n", req); + req->r_t.paused = true; + maybe_request_map(osdc); + } else if ((req->r_flags & CEPH_OSD_FLAG_READ) && + ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD)) { + dout("req %p pauserd\n", req); + req->r_t.paused = true; + maybe_request_map(osdc); + } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && + !(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY | + CEPH_OSD_FLAG_FULL_FORCE)) && + (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) || + pool_full(osdc, req->r_t.base_oloc.pool))) { + dout("req %p full/pool_full\n", req); + pr_warn_ratelimited("FULL or reached pool quota\n"); + req->r_t.paused = true; + maybe_request_map(osdc); + } else if (!osd_homeless(osd)) { + need_send = true; } else { - __send_queued(osdc); + maybe_request_map(osdc); } - return 0; + mutex_lock(&osd->lock); + /* + * Assign the tid atomically with send_request() to protect + * multiple writes to the same object from racing with each + * other, resulting in out of order ops on the OSDs. + */ + req->r_tid = atomic64_inc_return(&osdc->last_tid); + link_request(osd, req); + if (need_send) + send_request(req); + mutex_unlock(&osd->lock); + + if (promoted) + downgrade_write(&osdc->lock); + return; + +promote: + up_read(&osdc->lock); + down_write(&osdc->lock); + wrlocked = true; + promoted = true; + goto again; +} + +static void account_request(struct ceph_osd_request *req) +{ + unsigned int mask = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK; + + if (req->r_flags & CEPH_OSD_FLAG_READ) { + WARN_ON(req->r_flags & mask); + req->r_flags |= CEPH_OSD_FLAG_ACK; + } else if (req->r_flags & CEPH_OSD_FLAG_WRITE) + WARN_ON(!(req->r_flags & mask)); + else + WARN_ON(1); + + WARN_ON(req->r_unsafe_callback && (req->r_flags & mask) != mask); + atomic_inc(&req->r_osdc->num_requests); +} + +static void submit_request(struct ceph_osd_request *req, bool wrlocked) +{ + ceph_osdc_get_request(req); + account_request(req); + __submit_request(req, wrlocked); +} + +static void __finish_request(struct ceph_osd_request *req) +{ + struct ceph_osd_client *osdc = req->r_osdc; + struct ceph_osd *osd = req->r_osd; + + verify_osd_locked(osd); + dout("%s req %p tid %llu\n", __func__, req, req->r_tid); + + unlink_request(osd, req); + atomic_dec(&osdc->num_requests); + + /* + * If an OSD has failed or returned and a request has been sent + * twice, it's possible to get a reply and end up here while the + * request message is queued for delivery. We will ignore the + * reply, so not a big deal, but better to try and catch it. + */ + ceph_msg_revoke(req->r_request); + ceph_msg_revoke_incoming(req->r_reply); +} + +static void finish_request(struct ceph_osd_request *req) +{ + __finish_request(req); + ceph_osdc_put_request(req); } static void __complete_request(struct ceph_osd_request *req) @@ -1747,6 +1666,13 @@ static void __complete_request(struct ceph_osd_request *req) complete_all(&req->r_completion); } +static void cancel_request(struct ceph_osd_request *req) +{ + dout("%s req %p tid %llu\n", __func__, req, req->r_tid); + + finish_request(req); +} + /* * Timeout callback, called every N seconds. When 1 or more OSD * requests has been active for more than N seconds, we send a keepalive @@ -1758,44 +1684,49 @@ static void handle_timeout(struct work_struct *work) struct ceph_osd_client *osdc = container_of(work, struct ceph_osd_client, timeout_work.work); struct ceph_options *opts = osdc->client->options; - struct ceph_osd_request *req; - struct ceph_osd *osd; - struct list_head slow_osds; - dout("timeout\n"); - down_read(&osdc->map_sem); - - ceph_monc_request_next_osdmap(&osdc->client->monc); + unsigned long cutoff = jiffies - opts->osd_keepalive_timeout; + LIST_HEAD(slow_osds); + struct rb_node *n, *p; - mutex_lock(&osdc->request_mutex); + dout("%s osdc %p\n", __func__, osdc); + down_write(&osdc->lock); /* * ping osds that are a bit slow. this ensures that if there * is a break in the TCP connection we will notice, and reopen * a connection with that osd (from the fault callback). */ - INIT_LIST_HEAD(&slow_osds); - list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) { - if (time_before(jiffies, - req->r_stamp + opts->osd_keepalive_timeout)) - break; + for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { + struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); + bool found = false; + + for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) { + struct ceph_osd_request *req = + rb_entry(p, struct ceph_osd_request, r_node); + + if (time_before(req->r_stamp, cutoff)) { + dout(" req %p tid %llu on osd%d is laggy\n", + req, req->r_tid, osd->o_osd); + found = true; + } + } - osd = req->r_osd; - BUG_ON(!osd); - dout(" tid %llu is slow, will send keepalive on osd%d\n", - req->r_tid, osd->o_osd); - list_move_tail(&osd->o_keepalive_item, &slow_osds); + if (found) + list_move_tail(&osd->o_keepalive_item, &slow_osds); } + + if (atomic_read(&osdc->num_homeless) || !list_empty(&slow_osds)) + maybe_request_map(osdc); + while (!list_empty(&slow_osds)) { - osd = list_entry(slow_osds.next, struct ceph_osd, - o_keepalive_item); + struct ceph_osd *osd = list_first_entry(&slow_osds, + struct ceph_osd, + o_keepalive_item); list_del_init(&osd->o_keepalive_item); ceph_con_keepalive(&osd->o_con); } - __send_queued(osdc); - mutex_unlock(&osdc->request_mutex); - up_read(&osdc->map_sem); - + up_write(&osdc->lock); schedule_delayed_work(&osdc->timeout_work, osdc->client->options->osd_keepalive_timeout); } @@ -1809,18 +1740,17 @@ static void handle_osds_timeout(struct work_struct *work) struct ceph_osd *osd, *nosd; dout("%s osdc %p\n", __func__, osdc); - down_read(&osdc->map_sem); - mutex_lock(&osdc->request_mutex); - + down_write(&osdc->lock); list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) { if (time_before(jiffies, osd->lru_ttl)) break; - remove_osd(osdc, osd); + WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests)); + WARN_ON(!list_empty(&osd->o_linger_requests)); + close_osd(osd); } - mutex_unlock(&osdc->request_mutex); - up_read(&osdc->map_sem); + up_write(&osdc->lock); schedule_delayed_work(&osdc->osds_timeout_work, round_jiffies_relative(delay)); } @@ -2045,8 +1975,9 @@ static bool done_request(const struct ceph_osd_request *req, * when we get the safe reply r_unsafe_cb(false), r_cb/r_completion, * r_safe_completion r_safe_completion */ -static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) +static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) { + struct ceph_osd_client *osdc = osd->o_osdc; struct ceph_osd_request *req; struct MOSDOpReply m; u64 tid = le64_to_cpu(msg->hdr.tid); @@ -2057,14 +1988,19 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) dout("%s msg %p tid %llu\n", __func__, msg, tid); - down_read(&osdc->map_sem); - mutex_lock(&osdc->request_mutex); - req = lookup_request(&osdc->requests, tid); + down_read(&osdc->lock); + if (!osd_registered(osd)) { + dout("%s osd%d unknown\n", __func__, osd->o_osd); + goto out_unlock_osdc; + } + WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num)); + + mutex_lock(&osd->lock); + req = lookup_request(&osd->o_requests, tid); if (!req) { - dout("%s no tid %llu\n", __func__, tid); - goto out_unlock; + dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid); + goto out_unlock_session; } - ceph_osdc_get_request(req); ret = decode_MOSDOpReply(msg, &m); if (ret) { @@ -2083,7 +2019,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) dout("req %p tid %llu retry_attempt %d != %d, ignoring\n", req, req->r_tid, m.retry_attempt, req->r_attempts - 1); - goto out_put; + goto out_unlock_session; } } else { WARN_ON(1); /* MOSDOpReply v4 is assumed */ @@ -2092,22 +2028,14 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) if (!ceph_oloc_empty(&m.redirect.oloc)) { dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid, m.redirect.oloc.pool); - __unregister_request(osdc, req); + unlink_request(osd, req); + mutex_unlock(&osd->lock); ceph_oloc_copy(&req->r_t.target_oloc, &m.redirect.oloc); - - /* - * Start redirect requests with nofail=true. If - * mapping fails, request will end up on the notarget - * list, waiting for the new osdmap (which can take - * a while), even though the original request mapped - * successfully. In the future we might want to follow - * original request's nofail setting here. - */ - ret = __ceph_osdc_start_request(osdc, req, true); - BUG_ON(ret); - - goto out_put; + req->r_flags |= CEPH_OSD_FLAG_REDIRECTED; + req->r_tid = 0; + __submit_request(req, false); + goto out_unlock_osdc; } if (m.num_ops != req->r_num_ops) { @@ -2137,19 +2065,19 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) req->r_got_reply = true; } else if (!(m.flags & CEPH_OSD_FLAG_ONDISK)) { dout("req %p tid %llu dup ack\n", req, req->r_tid); - goto out_put; + goto out_unlock_session; } if (done_request(req, &m)) { - __unregister_request(osdc, req); + __finish_request(req); if (req->r_linger) { WARN_ON(req->r_unsafe_callback); - __register_linger_request(osdc, req); + __register_linger_request(osd, req); } } - mutex_unlock(&osdc->request_mutex); - up_read(&osdc->map_sem); + mutex_unlock(&osd->lock); + up_read(&osdc->lock); if (done_request(req, &m)) { if (already_acked && req->r_unsafe_callback) { @@ -2175,14 +2103,13 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) fail_request: req->r_result = -EIO; - __unregister_request(osdc, req); + __finish_request(req); __complete_request(req); complete_all(&req->r_safe_completion); -out_put: - ceph_osdc_put_request(req); -out_unlock: - mutex_unlock(&osdc->request_mutex); - up_read(&osdc->map_sem); +out_unlock_session: + mutex_unlock(&osd->lock); +out_unlock_osdc: + up_read(&osdc->lock); } static void set_pool_was_full(struct ceph_osd_client *osdc) @@ -2197,126 +2124,66 @@ static void set_pool_was_full(struct ceph_osd_client *osdc) } } -static void reset_changed_osds(struct ceph_osd_client *osdc) +static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id) { - struct rb_node *p, *n; + struct ceph_pg_pool_info *pi; - dout("%s %p\n", __func__, osdc); - for (p = rb_first(&osdc->osds); p; p = n) { - struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node); + pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id); + if (!pi) + return false; - n = rb_next(p); - if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) || - memcmp(&osd->o_con.peer_addr, - ceph_osd_addr(osdc->osdmap, - osd->o_osd), - sizeof(struct ceph_entity_addr)) != 0) - __reset_osd(osdc, osd); - } + return pi->was_full && !__pool_full(pi); } /* - * Requeue requests whose mapping to an OSD has changed. If requests map to - * no osd, request a new map. - * - * Caller should hold map_sem for read. + * Requeue requests whose mapping to an OSD has changed. */ -static void kick_requests(struct ceph_osd_client *osdc, bool force_resend, - bool force_resend_writes) +static void scan_requests(struct ceph_osd *osd, + bool force_resend, + bool cleared_full, + bool check_pool_cleared_full, + struct rb_root *need_resend, + struct list_head *need_resend_linger) { - struct ceph_osd_request *req, *nreq; - struct rb_node *p; - int needmap = 0; - int err; - bool force_resend_req; - - dout("kick_requests %s %s\n", force_resend ? " (force resend)" : "", - force_resend_writes ? " (force resend writes)" : ""); - mutex_lock(&osdc->request_mutex); - for (p = rb_first(&osdc->requests); p; ) { - req = rb_entry(p, struct ceph_osd_request, r_node); - p = rb_next(p); - - /* - * For linger requests that have not yet been - * registered, move them to the linger list; they'll - * be sent to the osd in the loop below. Unregister - * the request before re-registering it as a linger - * request to ensure the __map_request() below - * will decide it needs to be sent. - */ - if (req->r_linger && list_empty(&req->r_linger_item)) { - dout("%p tid %llu restart on osd%d\n", - req, req->r_tid, - req->r_osd ? req->r_osd->o_osd : -1); - ceph_osdc_get_request(req); - __unregister_request(osdc, req); - __register_linger_request(osdc, req); - ceph_osdc_put_request(req); - continue; - } - - force_resend_req = force_resend || - (force_resend_writes && - req->r_flags & CEPH_OSD_FLAG_WRITE); - err = __map_request(osdc, req, force_resend_req); - if (err < 0) - continue; /* error */ - if (req->r_osd == NULL) { - dout("%p tid %llu maps to no osd\n", req, req->r_tid); - needmap++; /* request a newer map */ - } else if (err > 0) { - if (!req->r_linger) { - dout("%p tid %llu requeued on osd%d\n", req, - req->r_tid, - req->r_osd ? req->r_osd->o_osd : -1); - req->r_flags |= CEPH_OSD_FLAG_RETRY; - } - } - } - - list_for_each_entry_safe(req, nreq, &osdc->req_linger, - r_linger_item) { - dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); - - err = __map_request(osdc, req, - force_resend || force_resend_writes); - dout("__map_request returned %d\n", err); - if (err < 0) - continue; /* hrm! */ - if (req->r_osd == NULL || err > 0) { - if (req->r_osd == NULL) { - dout("lingering %p tid %llu maps to no osd\n", - req, req->r_tid); - /* - * A homeless lingering request makes - * no sense, as it's job is to keep - * a particular OSD connection open. - * Request a newer map and kick the - * request, knowing that it won't be - * resent until we actually get a map - * that can tell us where to send it. - */ - needmap++; - } - - dout("kicking lingering %p tid %llu osd%d\n", req, - req->r_tid, req->r_osd ? req->r_osd->o_osd : -1); - __register_request(osdc, req); - __unregister_linger_request(osdc, req); + struct ceph_osd_client *osdc = osd->o_osdc; + struct rb_node *n; + bool force_resend_writes; + + for (n = rb_first(&osd->o_requests); n; ) { + struct ceph_osd_request *req = + rb_entry(n, struct ceph_osd_request, r_node); + enum calc_target_result ct_res; + + n = rb_next(n); /* unlink_request() */ + + dout("%s req %p tid %llu\n", __func__, req, req->r_tid); + ct_res = calc_target(osdc, &req->r_t, + &req->r_last_force_resend, false); + switch (ct_res) { + case CALC_TARGET_NO_ACTION: + force_resend_writes = cleared_full || + (check_pool_cleared_full && + pool_cleared_full(osdc, req->r_t.base_oloc.pool)); + if (!force_resend && + (!(req->r_flags & CEPH_OSD_FLAG_WRITE) || + !force_resend_writes)) + break; + + /* fall through */ + case CALC_TARGET_NEED_RESEND: + unlink_request(osd, req); + insert_request(need_resend, req); + break; + case CALC_TARGET_POOL_DNE: + break; } } - reset_changed_osds(osdc); - mutex_unlock(&osdc->request_mutex); - - if (needmap) { - dout("%d requests for down osds, need new map\n", needmap); - ceph_monc_request_next_osdmap(&osdc->client->monc); - } } static int handle_one_map(struct ceph_osd_client *osdc, - void *p, void *end, bool incremental) + void *p, void *end, bool incremental, + struct rb_root *need_resend, + struct list_head *need_resend_linger) { struct ceph_osdmap *newmap; struct rb_node *n; @@ -2362,11 +2229,51 @@ static int handle_one_map(struct ceph_osd_client *osdc, } was_full &= !ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL); - kick_requests(osdc, skipped_map, was_full); + scan_requests(&osdc->homeless_osd, skipped_map, was_full, true, + need_resend, need_resend_linger); + + for (n = rb_first(&osdc->osds); n; ) { + struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); + + n = rb_next(n); /* close_osd() */ + + scan_requests(osd, skipped_map, was_full, true, need_resend, + need_resend_linger); + if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) || + memcmp(&osd->o_con.peer_addr, + ceph_osd_addr(osdc->osdmap, osd->o_osd), + sizeof(struct ceph_entity_addr))) + close_osd(osd); + } return 0; } +static void kick_requests(struct ceph_osd_client *osdc, + struct rb_root *need_resend, + struct list_head *need_resend_linger) +{ + struct rb_node *n; + + for (n = rb_first(need_resend); n; ) { + struct ceph_osd_request *req = + rb_entry(n, struct ceph_osd_request, r_node); + struct ceph_osd *osd; + + n = rb_next(n); + erase_request(need_resend, req); /* before link_request() */ + + WARN_ON(req->r_osd); + calc_target(osdc, &req->r_t, NULL, false); + osd = lookup_create_osd(osdc, req->r_t.osd, true); + link_request(osd, req); + if (!req->r_linger) { + if (!osd_homeless(osd) && !req->r_t.paused) + send_request(req); + } + } +} + /* * Process updated osd map. * @@ -2381,13 +2288,15 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) u32 nr_maps, maplen; u32 epoch; struct ceph_fsid fsid; + struct rb_root need_resend = RB_ROOT; + LIST_HEAD(need_resend_linger); bool handled_incremental = false; bool was_pauserd, was_pausewr; bool pauserd, pausewr; int err; dout("%s have %u\n", __func__, osdc->osdmap->epoch); - down_write(&osdc->map_sem); + down_write(&osdc->lock); /* verify fsid */ ceph_decode_need(&p, end, sizeof(fsid), bad); @@ -2412,7 +2321,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) osdc->osdmap->epoch + 1 == epoch) { dout("applying incremental map %u len %d\n", epoch, maplen); - err = handle_one_map(osdc, p, p + maplen, true); + err = handle_one_map(osdc, p, p + maplen, true, + &need_resend, &need_resend_linger); if (err) goto bad; handled_incremental = true; @@ -2443,7 +2353,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) osdc->osdmap->epoch); } else { dout("taking full map %u len %d\n", epoch, maplen); - err = handle_one_map(osdc, p, p + maplen, false); + err = handle_one_map(osdc, p, p + maplen, false, + &need_resend, &need_resend_linger); if (err) goto bad; } @@ -2464,20 +2375,60 @@ done: if (was_pauserd || was_pausewr || pauserd || pausewr) maybe_request_map(osdc); - mutex_lock(&osdc->request_mutex); - __send_queued(osdc); - mutex_unlock(&osdc->request_mutex); + kick_requests(osdc, &need_resend, &need_resend_linger); ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP, osdc->osdmap->epoch); - up_write(&osdc->map_sem); + up_write(&osdc->lock); wake_up_all(&osdc->client->auth_wq); return; bad: pr_err("osdc handle_map corrupt msg\n"); ceph_msg_dump(msg); - up_write(&osdc->map_sem); + up_write(&osdc->lock); +} + +/* + * Resubmit requests pending on the given osd. + */ +static void kick_osd_requests(struct ceph_osd *osd) +{ + struct rb_node *n; + + for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) { + struct ceph_osd_request *req = + rb_entry(n, struct ceph_osd_request, r_node); + + if (!req->r_linger) { + if (!req->r_t.paused) + send_request(req); + } + } +} + +/* + * If the osd connection drops, we need to resubmit all requests. + */ +static void osd_fault(struct ceph_connection *con) +{ + struct ceph_osd *osd = con->private; + struct ceph_osd_client *osdc = osd->o_osdc; + + dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd); + + down_write(&osdc->lock); + if (!osd_registered(osd)) { + dout("%s osd%d unknown\n", __func__, osd->o_osd); + goto out_unlock; + } + + if (!reopen_osd(osd)) + kick_osd_requests(osd); + maybe_request_map(osdc); + +out_unlock: + up_write(&osdc->lock); } /* @@ -2680,17 +2631,11 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req, bool nofail) { - int rc; - - down_read(&osdc->map_sem); - mutex_lock(&osdc->request_mutex); - - rc = __ceph_osdc_start_request(osdc, req, nofail); + down_read(&osdc->lock); + submit_request(req, false); + up_read(&osdc->lock); - mutex_unlock(&osdc->request_mutex); - up_read(&osdc->map_sem); - - return rc; + return 0; } EXPORT_SYMBOL(ceph_osdc_start_request); @@ -2703,13 +2648,12 @@ void ceph_osdc_cancel_request(struct ceph_osd_request *req) { struct ceph_osd_client *osdc = req->r_osdc; - mutex_lock(&osdc->request_mutex); + down_write(&osdc->lock); if (req->r_linger) __unregister_linger_request(osdc, req); - __unregister_request(osdc, req); - mutex_unlock(&osdc->request_mutex); - - dout("%s %p tid %llu canceled\n", __func__, req, req->r_tid); + if (req->r_osd) + cancel_request(req); + up_write(&osdc->lock); } EXPORT_SYMBOL(ceph_osdc_cancel_request); @@ -2744,32 +2688,40 @@ EXPORT_SYMBOL(ceph_osdc_wait_request); */ void ceph_osdc_sync(struct ceph_osd_client *osdc) { - struct ceph_osd_request *req; - u64 last_tid, next_tid = 0; + struct rb_node *n, *p; + u64 last_tid = atomic64_read(&osdc->last_tid); - mutex_lock(&osdc->request_mutex); - last_tid = osdc->last_tid; - while (1) { - req = __lookup_request_ge(osdc, next_tid); - if (!req) - break; - if (req->r_tid > last_tid) - break; +again: + down_read(&osdc->lock); + for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { + struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); + + mutex_lock(&osd->lock); + for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) { + struct ceph_osd_request *req = + rb_entry(p, struct ceph_osd_request, r_node); + + if (req->r_tid > last_tid) + break; + + if (!(req->r_flags & CEPH_OSD_FLAG_WRITE)) + continue; - next_tid = req->r_tid + 1; - if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0) - continue; + ceph_osdc_get_request(req); + mutex_unlock(&osd->lock); + up_read(&osdc->lock); + dout("%s waiting on req %p tid %llu last_tid %llu\n", + __func__, req, req->r_tid, last_tid); + wait_for_completion(&req->r_safe_completion); + ceph_osdc_put_request(req); + goto again; + } - ceph_osdc_get_request(req); - mutex_unlock(&osdc->request_mutex); - dout("sync waiting on tid %llu (last is %llu)\n", - req->r_tid, last_tid); - wait_for_completion(&req->r_safe_completion); - mutex_lock(&osdc->request_mutex); - ceph_osdc_put_request(req); + mutex_unlock(&osd->lock); } - mutex_unlock(&osdc->request_mutex); - dout("sync done (thru tid %llu)\n", last_tid); + + up_read(&osdc->lock); + dout("%s done last_tid %llu\n", __func__, last_tid); } EXPORT_SYMBOL(ceph_osdc_sync); @@ -2793,18 +2745,14 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) dout("init\n"); osdc->client = client; - init_rwsem(&osdc->map_sem); - mutex_init(&osdc->request_mutex); - osdc->last_tid = 0; + init_rwsem(&osdc->lock); osdc->osds = RB_ROOT; INIT_LIST_HEAD(&osdc->osd_lru); spin_lock_init(&osdc->osd_lru_lock); - osdc->requests = RB_ROOT; - INIT_LIST_HEAD(&osdc->req_lru); - INIT_LIST_HEAD(&osdc->req_unsent); - INIT_LIST_HEAD(&osdc->req_notarget); INIT_LIST_HEAD(&osdc->req_linger); - osdc->num_requests = 0; + osd_init(&osdc->homeless_osd); + osdc->homeless_osd.o_osdc = osdc; + osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD; INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); spin_lock_init(&osdc->event_lock); @@ -2861,13 +2809,19 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc) cancel_delayed_work_sync(&osdc->timeout_work); cancel_delayed_work_sync(&osdc->osds_timeout_work); - mutex_lock(&osdc->request_mutex); + down_write(&osdc->lock); while (!RB_EMPTY_ROOT(&osdc->osds)) { struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds), struct ceph_osd, o_node); - remove_osd(osdc, osd); + close_osd(osd); } - mutex_unlock(&osdc->request_mutex); + up_write(&osdc->lock); + WARN_ON(atomic_read(&osdc->homeless_osd.o_ref) != 1); + osd_cleanup(&osdc->homeless_osd); + + WARN_ON(!list_empty(&osdc->osd_lru)); + WARN_ON(atomic_read(&osdc->num_requests)); + WARN_ON(atomic_read(&osdc->num_homeless)); ceph_osdmap_destroy(osdc->osdmap); mempool_destroy(osdc->req_mempool); @@ -2982,19 +2936,15 @@ EXPORT_SYMBOL(ceph_osdc_cleanup); static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) { struct ceph_osd *osd = con->private; - struct ceph_osd_client *osdc; + struct ceph_osd_client *osdc = osd->o_osdc; int type = le16_to_cpu(msg->hdr.type); - if (!osd) - goto out; - osdc = osd->o_osdc; - switch (type) { case CEPH_MSG_OSD_MAP: ceph_osdc_handle_map(osdc, msg); break; case CEPH_MSG_OSD_OPREPLY: - handle_reply(osdc, msg); + handle_reply(osd, msg); break; case CEPH_MSG_WATCH_NOTIFY: handle_watch_notify(osdc, msg); @@ -3004,7 +2954,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) pr_err("received unknown message type %d %s\n", type, ceph_msg_type_name(type)); } -out: + ceph_msg_put(msg); } @@ -3019,21 +2969,27 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, { struct ceph_osd *osd = con->private; struct ceph_osd_client *osdc = osd->o_osdc; - struct ceph_msg *m; + struct ceph_msg *m = NULL; struct ceph_osd_request *req; int front_len = le32_to_cpu(hdr->front_len); int data_len = le32_to_cpu(hdr->data_len); - u64 tid; + u64 tid = le64_to_cpu(hdr->tid); - tid = le64_to_cpu(hdr->tid); - mutex_lock(&osdc->request_mutex); - req = lookup_request(&osdc->requests, tid); + down_read(&osdc->lock); + if (!osd_registered(osd)) { + dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd); + *skip = 1; + goto out_unlock_osdc; + } + WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num)); + + mutex_lock(&osd->lock); + req = lookup_request(&osd->o_requests, tid); if (!req) { dout("%s osd%d tid %llu unknown, skipping\n", __func__, osd->o_osd, tid); - m = NULL; *skip = 1; - goto out; + goto out_unlock_session; } ceph_msg_revoke_incoming(req->r_reply); @@ -3045,7 +3001,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS, false); if (!m) - goto out; + goto out_unlock_session; ceph_msg_put(req->r_reply); req->r_reply = m; } @@ -3056,14 +3012,16 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, req->r_reply->data_length); m = NULL; *skip = 1; - goto out; + goto out_unlock_session; } m = ceph_msg_get(req->r_reply); dout("get_reply tid %lld %p\n", tid, m); -out: - mutex_unlock(&osdc->request_mutex); +out_unlock_session: + mutex_unlock(&osd->lock); +out_unlock_osdc: + up_read(&osdc->lock); return m; } @@ -3083,8 +3041,8 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con, case CEPH_MSG_OSD_OPREPLY: return get_reply(con, hdr, skip); default: - pr_info("alloc_msg unexpected msg type %d from osd%d\n", type, - osd->o_osd); + pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__, + osd->o_osd, type); *skip = 1; return NULL; } @@ -3188,5 +3146,5 @@ static const struct ceph_connection_operations osd_con_ops = { .alloc_msg = alloc_msg, .sign_message = osd_sign_message, .check_message_signature = osd_check_message_signature, - .fault = osd_reset, + .fault = osd_fault, }; |