diff options
Diffstat (limited to 'drivers/block/drbd')
-rw-r--r-- | drivers/block/drbd/drbd_bitmap.c | 21 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_int.h | 149 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_main.c | 154 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_nl.c | 52 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_proc.c | 19 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_receiver.c | 689 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_req.c | 94 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_req.h | 1 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_strings.c | 2 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_worker.c | 230 | ||||
-rw-r--r-- | drivers/block/drbd/drbd_wrappers.h | 16 |
11 files changed, 917 insertions, 510 deletions
diff --git a/drivers/block/drbd/drbd_bitmap.c b/drivers/block/drbd/drbd_bitmap.c index 3390716898d5..e3f88d6e1412 100644 --- a/drivers/block/drbd/drbd_bitmap.c +++ b/drivers/block/drbd/drbd_bitmap.c @@ -84,6 +84,9 @@ struct drbd_bitmap { #define BM_MD_IO_ERROR 1 #define BM_P_VMALLOCED 2 +static int __bm_change_bits_to(struct drbd_conf *mdev, const unsigned long s, + unsigned long e, int val, const enum km_type km); + static int bm_is_locked(struct drbd_bitmap *b) { return test_bit(BM_LOCKED, &b->bm_flags); @@ -441,7 +444,7 @@ static void bm_memset(struct drbd_bitmap *b, size_t offset, int c, size_t len) * In case this is actually a resize, we copy the old bitmap into the new one. * Otherwise, the bitmap is initialized to all bits set. */ -int drbd_bm_resize(struct drbd_conf *mdev, sector_t capacity) +int drbd_bm_resize(struct drbd_conf *mdev, sector_t capacity, int set_new_bits) { struct drbd_bitmap *b = mdev->bitmap; unsigned long bits, words, owords, obits, *p_addr, *bm; @@ -516,7 +519,7 @@ int drbd_bm_resize(struct drbd_conf *mdev, sector_t capacity) obits = b->bm_bits; growing = bits > obits; - if (opages) + if (opages && growing && set_new_bits) bm_set_surplus(b); b->bm_pages = npages; @@ -526,8 +529,12 @@ int drbd_bm_resize(struct drbd_conf *mdev, sector_t capacity) b->bm_dev_capacity = capacity; if (growing) { - bm_memset(b, owords, 0xff, words-owords); - b->bm_set += bits - obits; + if (set_new_bits) { + bm_memset(b, owords, 0xff, words-owords); + b->bm_set += bits - obits; + } else + bm_memset(b, owords, 0x00, words-owords); + } if (want < have) { @@ -773,7 +780,7 @@ static void bm_page_io_async(struct drbd_conf *mdev, struct drbd_bitmap *b, int /* nothing to do, on disk == in memory */ # define bm_cpu_to_lel(x) ((void)0) # else -void bm_cpu_to_lel(struct drbd_bitmap *b) +static void bm_cpu_to_lel(struct drbd_bitmap *b) { /* need to cpu_to_lel all the pages ... * this may be optimized by using @@ -1015,7 +1022,7 @@ unsigned long _drbd_bm_find_next_zero(struct drbd_conf *mdev, unsigned long bm_f * wants bitnr, not sector. * expected to be called for only a few bits (e - s about BITS_PER_LONG). * Must hold bitmap lock already. */ -int __bm_change_bits_to(struct drbd_conf *mdev, const unsigned long s, +static int __bm_change_bits_to(struct drbd_conf *mdev, const unsigned long s, unsigned long e, int val, const enum km_type km) { struct drbd_bitmap *b = mdev->bitmap; @@ -1053,7 +1060,7 @@ int __bm_change_bits_to(struct drbd_conf *mdev, const unsigned long s, * for val != 0, we change 0 -> 1, return code positive * for val == 0, we change 1 -> 0, return code negative * wants bitnr, not sector */ -int bm_change_bits_to(struct drbd_conf *mdev, const unsigned long s, +static int bm_change_bits_to(struct drbd_conf *mdev, const unsigned long s, const unsigned long e, int val) { unsigned long flags; diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h index e5e86a781820..485ed8c7d623 100644 --- a/drivers/block/drbd/drbd_int.h +++ b/drivers/block/drbd/drbd_int.h @@ -132,6 +132,7 @@ enum { DRBD_FAULT_DT_RA = 6, /* data read ahead */ DRBD_FAULT_BM_ALLOC = 7, /* bitmap allocation */ DRBD_FAULT_AL_EE = 8, /* alloc ee */ + DRBD_FAULT_RECEIVE = 9, /* Changes some bytes upon receiving a [rs]data block */ DRBD_FAULT_MAX, }; @@ -208,8 +209,11 @@ enum drbd_packets { P_RS_IS_IN_SYNC = 0x22, /* meta socket */ P_SYNC_PARAM89 = 0x23, /* data socket, protocol version 89 replacement for P_SYNC_PARAM */ P_COMPRESSED_BITMAP = 0x24, /* compressed or otherwise encoded bitmap transfer */ + /* P_CKPT_FENCE_REQ = 0x25, * currently reserved for protocol D */ + /* P_CKPT_DISABLE_REQ = 0x26, * currently reserved for protocol D */ + P_DELAY_PROBE = 0x27, /* is used on BOTH sockets */ - P_MAX_CMD = 0x25, + P_MAX_CMD = 0x28, P_MAY_IGNORE = 0x100, /* Flag to test if (cmd > P_MAY_IGNORE) ... */ P_MAX_OPT_CMD = 0x101, @@ -264,6 +268,7 @@ static inline const char *cmdname(enum drbd_packets cmd) [P_CSUM_RS_REQUEST] = "CsumRSRequest", [P_RS_IS_IN_SYNC] = "CsumRSIsInSync", [P_COMPRESSED_BITMAP] = "CBitmap", + [P_DELAY_PROBE] = "DelayProbe", [P_MAX_CMD] = NULL, }; @@ -481,7 +486,8 @@ struct p_sizes { u64 u_size; /* user requested size */ u64 c_size; /* current exported size */ u32 max_segment_size; /* Maximal size of a BIO */ - u32 queue_order_type; + u16 queue_order_type; /* not yet implemented in DRBD*/ + u16 dds_flags; /* use enum dds_flags here. */ } __packed; struct p_state { @@ -538,6 +544,18 @@ struct p_compressed_bm { u8 code[0]; } __packed; +struct p_delay_probe { + struct p_header head; + u32 seq_num; /* sequence number to match the two probe packets */ + u32 offset; /* usecs the probe got sent after the reference time point */ +} __packed; + +struct delay_probe { + struct list_head list; + unsigned int seq_num; + struct timeval time; +}; + /* DCBP: Drbd Compressed Bitmap Packet ... */ static inline enum drbd_bitmap_code DCBP_get_code(struct p_compressed_bm *p) @@ -722,22 +740,6 @@ enum epoch_event { EV_CLEANUP = 32, /* used as flag */ }; -struct drbd_epoch_entry { - struct drbd_work w; - struct drbd_conf *mdev; - struct bio *private_bio; - struct hlist_node colision; - sector_t sector; - unsigned int size; - struct drbd_epoch *epoch; - - /* up to here, the struct layout is identical to drbd_request; - * we might be able to use that to our advantage... */ - - unsigned int flags; - u64 block_id; -}; - struct drbd_wq_barrier { struct drbd_work w; struct completion done; @@ -748,17 +750,49 @@ struct digest_info { void *digest; }; -/* ee flag bits */ +struct drbd_epoch_entry { + struct drbd_work w; + struct hlist_node colision; + struct drbd_epoch *epoch; + struct drbd_conf *mdev; + struct page *pages; + atomic_t pending_bios; + unsigned int size; + /* see comments on ee flag bits below */ + unsigned long flags; + sector_t sector; + u64 block_id; +}; + +/* ee flag bits. + * While corresponding bios are in flight, the only modification will be + * set_bit WAS_ERROR, which has to be atomic. + * If no bios are in flight yet, or all have been completed, + * non-atomic modification to ee->flags is ok. + */ enum { __EE_CALL_AL_COMPLETE_IO, - __EE_CONFLICT_PENDING, __EE_MAY_SET_IN_SYNC, + + /* This epoch entry closes an epoch using a barrier. + * On sucessful completion, the epoch is released, + * and the P_BARRIER_ACK send. */ __EE_IS_BARRIER, + + /* In case a barrier failed, + * we need to resubmit without the barrier flag. */ + __EE_RESUBMITTED, + + /* we may have several bios per epoch entry. + * if any of those fail, we set this flag atomically + * from the endio callback */ + __EE_WAS_ERROR, }; #define EE_CALL_AL_COMPLETE_IO (1<<__EE_CALL_AL_COMPLETE_IO) -#define EE_CONFLICT_PENDING (1<<__EE_CONFLICT_PENDING) #define EE_MAY_SET_IN_SYNC (1<<__EE_MAY_SET_IN_SYNC) #define EE_IS_BARRIER (1<<__EE_IS_BARRIER) +#define EE_RESUBMITTED (1<<__EE_RESUBMITTED) +#define EE_WAS_ERROR (1<<__EE_WAS_ERROR) /* global flag bits */ enum { @@ -908,9 +942,11 @@ struct drbd_conf { unsigned int ko_count; struct drbd_work resync_work, unplug_work, - md_sync_work; + md_sync_work, + delay_probe_work; struct timer_list resync_timer; struct timer_list md_sync_timer; + struct timer_list delay_probe_timer; /* Used after attach while negotiating new disk state. */ union drbd_state new_state_tmp; @@ -1026,6 +1062,12 @@ struct drbd_conf { u64 ed_uuid; /* UUID of the exposed data */ struct mutex state_mutex; char congestion_reason; /* Why we where congested... */ + struct list_head delay_probes; /* protected by peer_seq_lock */ + int data_delay; /* Delay of packets on the data-sock behind meta-sock */ + unsigned int delay_seq; /* To generate sequence numbers of delay probes */ + struct timeval dps_time; /* delay-probes-start-time */ + unsigned int dp_volume_last; /* send_cnt of last delay probe */ + int c_sync_rate; /* current resync rate after delay_probe magic */ }; static inline struct drbd_conf *minor_to_mdev(unsigned int minor) @@ -1081,6 +1123,11 @@ enum chg_state_flags { CS_ORDERED = CS_WAIT_COMPLETE + CS_SERIALIZE, }; +enum dds_flags { + DDSF_FORCED = 1, + DDSF_NO_RESYNC = 2, /* Do not run a resync for the new space */ +}; + extern void drbd_init_set_defaults(struct drbd_conf *mdev); extern int drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f, union drbd_state mask, union drbd_state val); @@ -1113,7 +1160,7 @@ extern int drbd_send_protocol(struct drbd_conf *mdev); extern int drbd_send_uuids(struct drbd_conf *mdev); extern int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev); extern int drbd_send_sync_uuid(struct drbd_conf *mdev, u64 val); -extern int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply); +extern int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags); extern int _drbd_send_state(struct drbd_conf *mdev); extern int drbd_send_state(struct drbd_conf *mdev); extern int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock, @@ -1311,7 +1358,7 @@ struct bm_extent { #define APP_R_HSIZE 15 extern int drbd_bm_init(struct drbd_conf *mdev); -extern int drbd_bm_resize(struct drbd_conf *mdev, sector_t sectors); +extern int drbd_bm_resize(struct drbd_conf *mdev, sector_t sectors, int set_new_bits); extern void drbd_bm_cleanup(struct drbd_conf *mdev); extern void drbd_bm_set_all(struct drbd_conf *mdev); extern void drbd_bm_clear_all(struct drbd_conf *mdev); @@ -1383,7 +1430,7 @@ extern void drbd_resume_io(struct drbd_conf *mdev); extern char *ppsize(char *buf, unsigned long long size); extern sector_t drbd_new_dev_size(struct drbd_conf *, struct drbd_backing_dev *, int); enum determine_dev_size { dev_size_error = -1, unchanged = 0, shrunk = 1, grew = 2 }; -extern enum determine_dev_size drbd_determin_dev_size(struct drbd_conf *, int force) __must_hold(local); +extern enum determine_dev_size drbd_determin_dev_size(struct drbd_conf *, enum dds_flags) __must_hold(local); extern void resync_after_online_grow(struct drbd_conf *); extern void drbd_setup_queue_param(struct drbd_conf *mdev, unsigned int) __must_hold(local); extern int drbd_set_role(struct drbd_conf *mdev, enum drbd_role new_role, @@ -1414,7 +1461,8 @@ static inline void ov_oos_print(struct drbd_conf *mdev) } -extern void drbd_csum(struct drbd_conf *, struct crypto_hash *, struct bio *, void *); +extern void drbd_csum_bio(struct drbd_conf *, struct crypto_hash *, struct bio *, void *); +extern void drbd_csum_ee(struct drbd_conf *, struct crypto_hash *, struct drbd_epoch_entry *, void *); /* worker callbacks */ extern int w_req_cancel_conflict(struct drbd_conf *, struct drbd_work *, int); extern int w_read_retry_remote(struct drbd_conf *, struct drbd_work *, int); @@ -1426,7 +1474,6 @@ extern int w_e_end_ov_req(struct drbd_conf *, struct drbd_work *, int); extern int w_ov_finished(struct drbd_conf *, struct drbd_work *, int); extern int w_resync_inactive(struct drbd_conf *, struct drbd_work *, int); extern int w_resume_next_sg(struct drbd_conf *, struct drbd_work *, int); -extern int w_io_error(struct drbd_conf *, struct drbd_work *, int); extern int w_send_write_hint(struct drbd_conf *, struct drbd_work *, int); extern int w_make_resync_request(struct drbd_conf *, struct drbd_work *, int); extern int w_send_dblock(struct drbd_conf *, struct drbd_work *, int); @@ -1438,6 +1485,8 @@ extern int w_e_reissue(struct drbd_conf *, struct drbd_work *, int); extern void resync_timer_fn(unsigned long data); /* drbd_receiver.c */ +extern int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, + const unsigned rw, const int fault_type); extern int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list); extern struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev, u64 id, @@ -1490,7 +1539,7 @@ static inline void drbd_tcp_nodelay(struct socket *sock) static inline void drbd_tcp_quickack(struct socket *sock) { - int __user val = 1; + int __user val = 2; (void) drbd_setsockopt(sock, SOL_TCP, TCP_QUICKACK, (char __user *)&val, sizeof(val)); } @@ -1593,6 +1642,41 @@ void drbd_bcast_ee(struct drbd_conf *mdev, * inline helper functions *************************/ +/* see also page_chain_add and friends in drbd_receiver.c */ +static inline struct page *page_chain_next(struct page *page) +{ + return (struct page *)page_private(page); +} +#define page_chain_for_each(page) \ + for (; page && ({ prefetch(page_chain_next(page)); 1; }); \ + page = page_chain_next(page)) +#define page_chain_for_each_safe(page, n) \ + for (; page && ({ n = page_chain_next(page); 1; }); page = n) + +static inline int drbd_bio_has_active_page(struct bio *bio) +{ + struct bio_vec *bvec; + int i; + + __bio_for_each_segment(bvec, bio, i, 0) { + if (page_count(bvec->bv_page) > 1) + return 1; + } + + return 0; +} + +static inline int drbd_ee_has_active_page(struct drbd_epoch_entry *e) +{ + struct page *page = e->pages; + page_chain_for_each(page) { + if (page_count(page) > 1) + return 1; + } + return 0; +} + + static inline void drbd_state_lock(struct drbd_conf *mdev) { wait_event(mdev->misc_wait, @@ -1641,7 +1725,7 @@ static inline void __drbd_chk_io_error_(struct drbd_conf *mdev, int forcedetach, switch (mdev->ldev->dc.on_io_error) { case EP_PASS_ON: if (!forcedetach) { - if (printk_ratelimit()) + if (__ratelimit(&drbd_ratelimit_state)) dev_err(DEV, "Local IO failed in %s." "Passing error on...\n", where); break; @@ -2138,7 +2222,7 @@ static inline int __inc_ap_bio_cond(struct drbd_conf *mdev) /* I'd like to use wait_event_lock_irq, * but I'm not sure when it got introduced, * and not sure when it has 3 or 4 arguments */ -static inline void inc_ap_bio(struct drbd_conf *mdev, int one_or_two) +static inline void inc_ap_bio(struct drbd_conf *mdev, int count) { /* compare with after_state_ch, * os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S */ @@ -2160,7 +2244,7 @@ static inline void inc_ap_bio(struct drbd_conf *mdev, int one_or_two) finish_wait(&mdev->misc_wait, &wait); spin_lock_irq(&mdev->req_lock); } - atomic_add(one_or_two, &mdev->ap_bio_cnt); + atomic_add(count, &mdev->ap_bio_cnt); spin_unlock_irq(&mdev->req_lock); } @@ -2251,7 +2335,8 @@ static inline void drbd_md_flush(struct drbd_conf *mdev) if (test_bit(MD_NO_BARRIER, &mdev->flags)) return; - r = blkdev_issue_flush(mdev->ldev->md_bdev, NULL); + r = blkdev_issue_flush(mdev->ldev->md_bdev, GFP_KERNEL, NULL, + BLKDEV_IFL_WAIT); if (r) { set_bit(MD_NO_BARRIER, &mdev->flags); dev_err(DEV, "meta data flush failed with status %d, disabling md-flushes\n", r); diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c index 93d1f9b469d4..6b077f93acc6 100644 --- a/drivers/block/drbd/drbd_main.c +++ b/drivers/block/drbd/drbd_main.c @@ -684,6 +684,9 @@ static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns) else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT) rv = SS_NO_REMOTE_DISK; + else if (ns.conn > C_CONNECTED && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE) + rv = SS_NO_UP_TO_DATE_DISK; + else if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S || ns.conn == C_SYNC_SOURCE || @@ -840,7 +843,12 @@ static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state break; case C_WF_BITMAP_S: case C_PAUSED_SYNC_S: - ns.pdsk = D_OUTDATED; + /* remap any consistent state to D_OUTDATED, + * but disallow "upgrade" of not even consistent states. + */ + ns.pdsk = + (D_DISKLESS < os.pdsk && os.pdsk < D_OUTDATED) + ? os.pdsk : D_OUTDATED; break; case C_SYNC_SOURCE: ns.pdsk = D_INCONSISTENT; @@ -1205,8 +1213,6 @@ static void after_state_ch(struct drbd_conf *mdev, union drbd_state os, && (ns.pdsk < D_INCONSISTENT || ns.pdsk == D_UNKNOWN || ns.pdsk == D_OUTDATED)) { - kfree(mdev->p_uuid); - mdev->p_uuid = NULL; if (get_ldev(mdev)) { if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) && mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) { @@ -1232,7 +1238,7 @@ static void after_state_ch(struct drbd_conf *mdev, union drbd_state os, os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) { kfree(mdev->p_uuid); /* We expect to receive up-to-date UUIDs soon. */ mdev->p_uuid = NULL; /* ...to not use the old ones in the mean time */ - drbd_send_sizes(mdev, 0); /* to start sync... */ + drbd_send_sizes(mdev, 0, 0); /* to start sync... */ drbd_send_uuids(mdev); drbd_send_state(mdev); } @@ -1755,7 +1761,7 @@ int drbd_send_sync_uuid(struct drbd_conf *mdev, u64 val) (struct p_header *)&p, sizeof(p)); } -int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply) +int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags) { struct p_sizes p; sector_t d_size, u_size; @@ -1767,7 +1773,6 @@ int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply) d_size = drbd_get_max_capacity(mdev->ldev); u_size = mdev->ldev->dc.disk_size; q_order_type = drbd_queue_order_type(mdev); - p.queue_order_type = cpu_to_be32(drbd_queue_order_type(mdev)); put_ldev(mdev); } else { d_size = 0; @@ -1779,7 +1784,8 @@ int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply) p.u_size = cpu_to_be64(u_size); p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev)); p.max_segment_size = cpu_to_be32(queue_max_segment_size(mdev->rq_queue)); - p.queue_order_type = cpu_to_be32(q_order_type); + p.queue_order_type = cpu_to_be16(q_order_type); + p.dds_flags = cpu_to_be16(flags); ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES, (struct p_header *)&p, sizeof(p)); @@ -2180,6 +2186,43 @@ int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size) return ok; } +static int drbd_send_delay_probe(struct drbd_conf *mdev, struct drbd_socket *ds) +{ + struct p_delay_probe dp; + int offset, ok = 0; + struct timeval now; + + mutex_lock(&ds->mutex); + if (likely(ds->socket)) { + do_gettimeofday(&now); + offset = now.tv_usec - mdev->dps_time.tv_usec + + (now.tv_sec - mdev->dps_time.tv_sec) * 1000000; + dp.seq_num = cpu_to_be32(mdev->delay_seq); + dp.offset = cpu_to_be32(offset); + + ok = _drbd_send_cmd(mdev, ds->socket, P_DELAY_PROBE, + (struct p_header *)&dp, sizeof(dp), 0); + } + mutex_unlock(&ds->mutex); + + return ok; +} + +static int drbd_send_delay_probes(struct drbd_conf *mdev) +{ + int ok; + + mdev->delay_seq++; + do_gettimeofday(&mdev->dps_time); + ok = drbd_send_delay_probe(mdev, &mdev->meta); + ok = ok && drbd_send_delay_probe(mdev, &mdev->data); + + mdev->dp_volume_last = mdev->send_cnt; + mod_timer(&mdev->delay_probe_timer, jiffies + mdev->sync_conf.dp_interval * HZ / 10); + + return ok; +} + /* called on sndtimeo * returns FALSE if we should retry, * TRUE if we think connection is dead @@ -2229,9 +2272,9 @@ static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket * * with page_count == 0 or PageSlab. */ static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page, - int offset, size_t size) + int offset, size_t size, unsigned msg_flags) { - int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, 0); + int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, msg_flags); kunmap(page); if (sent == size) mdev->send_cnt += size>>9; @@ -2239,7 +2282,7 @@ static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page, } static int _drbd_send_page(struct drbd_conf *mdev, struct page *page, - int offset, size_t size) + int offset, size_t size, unsigned msg_flags) { mm_segment_t oldfs = get_fs(); int sent, ok; @@ -2252,14 +2295,15 @@ static int _drbd_send_page(struct drbd_conf *mdev, struct page *page, * __page_cache_release a page that would actually still be referenced * by someone, leading to some obscure delayed Oops somewhere else. */ if (disable_sendpage || (page_count(page) < 1) || PageSlab(page)) - return _drbd_no_send_page(mdev, page, offset, size); + return _drbd_no_send_page(mdev, page, offset, size, msg_flags); + msg_flags |= MSG_NOSIGNAL; drbd_update_congested(mdev); set_fs(KERNEL_DS); do { sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page, offset, len, - MSG_NOSIGNAL); + msg_flags); if (sent == -EAGAIN) { if (we_should_drop_the_connection(mdev, mdev->data.socket)) @@ -2288,9 +2332,11 @@ static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio) { struct bio_vec *bvec; int i; + /* hint all but last page with MSG_MORE */ __bio_for_each_segment(bvec, bio, i, 0) { if (!_drbd_no_send_page(mdev, bvec->bv_page, - bvec->bv_offset, bvec->bv_len)) + bvec->bv_offset, bvec->bv_len, + i == bio->bi_vcnt -1 ? 0 : MSG_MORE)) return 0; } return 1; @@ -2300,15 +2346,56 @@ static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio) { struct bio_vec *bvec; int i; + /* hint all but last page with MSG_MORE */ __bio_for_each_segment(bvec, bio, i, 0) { if (!_drbd_send_page(mdev, bvec->bv_page, - bvec->bv_offset, bvec->bv_len)) + bvec->bv_offset, bvec->bv_len, + i == bio->bi_vcnt -1 ? 0 : MSG_MORE)) + return 0; + } + return 1; +} + +static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e) +{ + struct page *page = e->pages; + unsigned len = e->size; + /* hint all but last page with MSG_MORE */ + page_chain_for_each(page) { + unsigned l = min_t(unsigned, len, PAGE_SIZE); + if (!_drbd_send_page(mdev, page, 0, l, + page_chain_next(page) ? MSG_MORE : 0)) return 0; + len -= l; } + return 1; +} + +static void consider_delay_probes(struct drbd_conf *mdev) +{ + if (mdev->state.conn != C_SYNC_SOURCE || mdev->agreed_pro_version < 93) + return; + + if (mdev->dp_volume_last + mdev->sync_conf.dp_volume * 2 < mdev->send_cnt) + drbd_send_delay_probes(mdev); +} + +static int w_delay_probes(struct drbd_conf *mdev, struct drbd_work *w, int cancel) +{ + if (!cancel && mdev->state.conn == C_SYNC_SOURCE) + drbd_send_delay_probes(mdev); return 1; } +static void delay_probe_timer_fn(unsigned long data) +{ + struct drbd_conf *mdev = (struct drbd_conf *) data; + + if (list_empty(&mdev->delay_probe_work.list)) + drbd_queue_work(&mdev->data.work, &mdev->delay_probe_work); +} + /* Used to send write requests * R_PRIMARY -> Peer (P_DATA) */ @@ -2357,11 +2444,11 @@ int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req) p.dp_flags = cpu_to_be32(dp_flags); set_bit(UNPLUG_REMOTE, &mdev->flags); ok = (sizeof(p) == - drbd_send(mdev, mdev->data.socket, &p, sizeof(p), MSG_MORE)); + drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0)); if (ok && dgs) { dgb = mdev->int_dig_out; - drbd_csum(mdev, mdev->integrity_w_tfm, req->master_bio, dgb); - ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE); + drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, dgb); + ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, 0); } if (ok) { if (mdev->net_conf->wire_protocol == DRBD_PROT_A) @@ -2371,6 +2458,10 @@ int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req) } drbd_put_data_sock(mdev); + + if (ok) + consider_delay_probes(mdev); + return ok; } @@ -2406,16 +2497,20 @@ int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd, return 0; ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, - sizeof(p), MSG_MORE); + sizeof(p), dgs ? MSG_MORE : 0); if (ok && dgs) { dgb = mdev->int_dig_out; - drbd_csum(mdev, mdev->integrity_w_tfm, e->private_bio, dgb); - ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE); + drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb); + ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, 0); } if (ok) - ok = _drbd_send_zc_bio(mdev, e->private_bio); + ok = _drbd_send_zc_ee(mdev, e); drbd_put_data_sock(mdev); + + if (ok) + consider_delay_probes(mdev); + return ok; } @@ -2628,16 +2723,24 @@ void drbd_init_set_defaults(struct drbd_conf *mdev) INIT_LIST_HEAD(&mdev->unplug_work.list); INIT_LIST_HEAD(&mdev->md_sync_work.list); INIT_LIST_HEAD(&mdev->bm_io_work.w.list); + INIT_LIST_HEAD(&mdev->delay_probes); + INIT_LIST_HEAD(&mdev->delay_probe_work.list); + mdev->resync_work.cb = w_resync_inactive; mdev->unplug_work.cb = w_send_write_hint; mdev->md_sync_work.cb = w_md_sync; mdev->bm_io_work.w.cb = w_bitmap_io; + mdev->delay_probe_work.cb = w_delay_probes; init_timer(&mdev->resync_timer); init_timer(&mdev->md_sync_timer); + init_timer(&mdev->delay_probe_timer); mdev->resync_timer.function = resync_timer_fn; mdev->resync_timer.data = (unsigned long) mdev; mdev->md_sync_timer.function = md_sync_timer_fn; mdev->md_sync_timer.data = (unsigned long) mdev; + mdev->delay_probe_timer.function = delay_probe_timer_fn; + mdev->delay_probe_timer.data = (unsigned long) mdev; + init_waitqueue_head(&mdev->misc_wait); init_waitqueue_head(&mdev->state_wait); @@ -2680,7 +2783,7 @@ void drbd_mdev_cleanup(struct drbd_conf *mdev) drbd_set_my_capacity(mdev, 0); if (mdev->bitmap) { /* maybe never allocated. */ - drbd_bm_resize(mdev, 0); + drbd_bm_resize(mdev, 0, 1); drbd_bm_cleanup(mdev); } @@ -3129,7 +3232,7 @@ int __init drbd_init(void) if (err) goto Enomem; - drbd_proc = proc_create("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops); + drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL); if (!drbd_proc) { printk(KERN_ERR "drbd: unable to register proc file\n"); goto Enomem; @@ -3660,7 +3763,8 @@ _drbd_fault_str(unsigned int type) { [DRBD_FAULT_DT_RD] = "Data read", [DRBD_FAULT_DT_RA] = "Data read ahead", [DRBD_FAULT_BM_ALLOC] = "BM allocation", - [DRBD_FAULT_AL_EE] = "EE allocation" + [DRBD_FAULT_AL_EE] = "EE allocation", + [DRBD_FAULT_RECEIVE] = "receive data corruption", }; return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**"; @@ -3679,7 +3783,7 @@ _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type) if (ret) { fault_count++; - if (printk_ratelimit()) + if (__ratelimit(&drbd_ratelimit_state)) dev_warn(DEV, "***Simulating %s failure\n", _drbd_fault_str(type)); } diff --git a/drivers/block/drbd/drbd_nl.c b/drivers/block/drbd/drbd_nl.c index 6429d2b19e06..632e3245d1bb 100644 --- a/drivers/block/drbd/drbd_nl.c +++ b/drivers/block/drbd/drbd_nl.c @@ -510,7 +510,7 @@ void drbd_resume_io(struct drbd_conf *mdev) * Returns 0 on success, negative return values indicate errors. * You should call drbd_md_sync() after calling this function. */ -enum determine_dev_size drbd_determin_dev_size(struct drbd_conf *mdev, int force) __must_hold(local) +enum determine_dev_size drbd_determin_dev_size(struct drbd_conf *mdev, enum dds_flags flags) __must_hold(local) { sector_t prev_first_sect, prev_size; /* previous meta location */ sector_t la_size; @@ -541,12 +541,12 @@ enum determine_dev_size drbd_determin_dev_size(struct drbd_conf *mdev, int force /* TODO: should only be some assert here, not (re)init... */ drbd_md_set_sector_offsets(mdev, mdev->ldev); - size = drbd_new_dev_size(mdev, mdev->ldev, force); + size = drbd_new_dev_size(mdev, mdev->ldev, flags & DDSF_FORCED); if (drbd_get_capacity(mdev->this_bdev) != size || drbd_bm_capacity(mdev) != size) { int err; - err = drbd_bm_resize(mdev, size); + err = drbd_bm_resize(mdev, size, !(flags & DDSF_NO_RESYNC)); if (unlikely(err)) { /* currently there is only one error: ENOMEM! */ size = drbd_bm_capacity(mdev)>>1; @@ -704,9 +704,6 @@ void drbd_setup_queue_param(struct drbd_conf *mdev, unsigned int max_seg_s) __mu struct request_queue * const b = mdev->ldev->backing_bdev->bd_disk->queue; int max_segments = mdev->ldev->dc.max_bio_bvecs; - if (b->merge_bvec_fn && !mdev->ldev->dc.use_bmbv) - max_seg_s = PAGE_SIZE; - max_seg_s = min(queue_max_sectors(b) * queue_logical_block_size(b), max_seg_s); blk_queue_max_hw_sectors(q, max_seg_s >> 9); @@ -1199,13 +1196,12 @@ static int drbd_nl_net_conf(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nlp, } /* allocation not in the IO path, cqueue thread context */ - new_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL); + new_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL); if (!new_conf) { retcode = ERR_NOMEM; goto fail; } - memset(new_conf, 0, sizeof(struct net_conf)); new_conf->timeout = DRBD_TIMEOUT_DEF; new_conf->try_connect_int = DRBD_CONNECT_INT_DEF; new_conf->ping_int = DRBD_PING_INT_DEF; @@ -1477,8 +1473,8 @@ static int drbd_nl_resize(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nlp, { struct resize rs; int retcode = NO_ERROR; - int ldsc = 0; /* local disk size changed */ enum determine_dev_size dd; + enum dds_flags ddsf; memset(&rs, 0, sizeof(struct resize)); if (!resize_from_tags(mdev, nlp->tag_list, &rs)) { @@ -1502,13 +1498,17 @@ static int drbd_nl_resize(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nlp, goto fail; } - if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) { - mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev); - ldsc = 1; + if (rs.no_resync && mdev->agreed_pro_version < 93) { + retcode = ERR_NEED_APV_93; + goto fail; } + if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) + mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev); + mdev->ldev->dc.disk_size = (sector_t)rs.resize_size; - dd = drbd_determin_dev_size(mdev, rs.resize_force); + ddsf = (rs.resize_force ? DDSF_FORCED : 0) | (rs.no_resync ? DDSF_NO_RESYNC : 0); + dd = drbd_determin_dev_size(mdev, ddsf); drbd_md_sync(mdev); put_ldev(mdev); if (dd == dev_size_error) { @@ -1516,12 +1516,12 @@ static int drbd_nl_resize(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nlp, goto fail; } - if (mdev->state.conn == C_CONNECTED && (dd != unchanged || ldsc)) { + if (mdev->state.conn == C_CONNECTED) { if (dd == grew) set_bit(RESIZE_PENDING, &mdev->flags); drbd_send_uuids(mdev); - drbd_send_sizes(mdev, 1); + drbd_send_sizes(mdev, 1, ddsf); } fail: @@ -1551,6 +1551,10 @@ static int drbd_nl_syncer_conf(struct drbd_conf *mdev, struct drbd_nl_cfg_req *n sc.rate = DRBD_RATE_DEF; sc.after = DRBD_AFTER_DEF; sc.al_extents = DRBD_AL_EXTENTS_DEF; + sc.dp_volume = DRBD_DP_VOLUME_DEF; + sc.dp_interval = DRBD_DP_INTERVAL_DEF; + sc.throttle_th = DRBD_RS_THROTTLE_TH_DEF; + sc.hold_off_th = DRBD_RS_HOLD_OFF_TH_DEF; } else memcpy(&sc, &mdev->sync_conf, sizeof(struct syncer_conf)); @@ -2207,9 +2211,9 @@ void drbd_bcast_ee(struct drbd_conf *mdev, { struct cn_msg *cn_reply; struct drbd_nl_cfg_reply *reply; - struct bio_vec *bvec; unsigned short *tl; - int i; + struct page *page; + unsigned len; if (!e) return; @@ -2247,11 +2251,15 @@ void drbd_bcast_ee(struct drbd_conf *mdev, put_unaligned(T_ee_data, tl++); put_unaligned(e->size, tl++); - __bio_for_each_segment(bvec, e->private_bio, i, 0) { - void *d = kmap(bvec->bv_page); - memcpy(tl, d + bvec->bv_offset, bvec->bv_len); - kunmap(bvec->bv_page); - tl=(unsigned short*)((char*)tl + bvec->bv_len); + len = e->size; + page = e->pages; + page_chain_for_each(page) { + void *d = kmap_atomic(page, KM_USER0); + unsigned l = min_t(unsigned, len, PAGE_SIZE); + memcpy(tl, d, l); + kunmap_atomic(d, KM_USER0); + tl = (unsigned short*)((char*)tl + l); + len -= l; } put_unaligned(TT_END, tl++); /* Close the tag list */ diff --git a/drivers/block/drbd/drbd_proc.c b/drivers/block/drbd/drbd_proc.c index be3374b68460..d0f1767ea4c3 100644 --- a/drivers/block/drbd/drbd_proc.c +++ b/drivers/block/drbd/drbd_proc.c @@ -73,14 +73,21 @@ static void drbd_syncer_progress(struct drbd_conf *mdev, struct seq_file *seq) seq_printf(seq, "sync'ed:%3u.%u%% ", res / 10, res % 10); /* if more than 1 GB display in MB */ if (mdev->rs_total > 0x100000L) - seq_printf(seq, "(%lu/%lu)M\n\t", + seq_printf(seq, "(%lu/%lu)M", (unsigned long) Bit2KB(rs_left >> 10), (unsigned long) Bit2KB(mdev->rs_total >> 10)); else - seq_printf(seq, "(%lu/%lu)K\n\t", + seq_printf(seq, "(%lu/%lu)K", (unsigned long) Bit2KB(rs_left), (unsigned long) Bit2KB(mdev->rs_total)); + if (mdev->state.conn == C_SYNC_TARGET) + seq_printf(seq, " queue_delay: %d.%d ms\n\t", + mdev->data_delay / 1000, + (mdev->data_delay % 1000) / 100); + else if (mdev->state.conn == C_SYNC_SOURCE) + seq_printf(seq, " delay_probe: %u\n\t", mdev->delay_seq); + /* see drivers/md/md.c * We do not want to overflow, so the order of operands and * the * 100 / 100 trick are important. We do a +1 to be @@ -128,6 +135,14 @@ static void drbd_syncer_progress(struct drbd_conf *mdev, struct seq_file *seq) else seq_printf(seq, " (%ld)", dbdt); + if (mdev->state.conn == C_SYNC_TARGET) { + if (mdev->c_sync_rate > 1000) + seq_printf(seq, " want: %d,%03d", + mdev->c_sync_rate / 1000, mdev->c_sync_rate % 1000); + else + seq_printf(seq, " want: %d", mdev->c_sync_rate); + } + seq_printf(seq, " K/sec\n"); } diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c index 3f096e7959b4..dff48701b84d 100644 --- a/drivers/block/drbd/drbd_receiver.c +++ b/drivers/block/drbd/drbd_receiver.c @@ -42,7 +42,6 @@ #include <linux/unistd.h> #include <linux/vmalloc.h> #include <linux/random.h> -#include <linux/mm.h> #include <linux/string.h> #include <linux/scatterlist.h> #include "drbd_int.h" @@ -80,30 +79,128 @@ static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epo #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) -static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev) +/* + * some helper functions to deal with single linked page lists, + * page->private being our "next" pointer. + */ + +/* If at least n pages are linked at head, get n pages off. + * Otherwise, don't modify head, and return NULL. + * Locking is the responsibility of the caller. + */ +static struct page *page_chain_del(struct page **head, int n) +{ + struct page *page; + struct page *tmp; + + BUG_ON(!n); + BUG_ON(!head); + + page = *head; + + if (!page) + return NULL; + + while (page) { + tmp = page_chain_next(page); + if (--n == 0) + break; /* found sufficient pages */ + if (tmp == NULL) + /* insufficient pages, don't use any of them. */ + return NULL; + page = tmp; + } + + /* add end of list marker for the returned list */ + set_page_private(page, 0); + /* actual return value, and adjustment of head */ + page = *head; + *head = tmp; + return page; +} + +/* may be used outside of locks to find the tail of a (usually short) + * "private" page chain, before adding it back to a global chain head + * with page_chain_add() under a spinlock. */ +static struct page *page_chain_tail(struct page *page, int *len) +{ + struct page *tmp; + int i = 1; + while ((tmp = page_chain_next(page))) + ++i, page = tmp; + if (len) + *len = i; + return page; +} + +static int page_chain_free(struct page *page) +{ + struct page *tmp; + int i = 0; + page_chain_for_each_safe(page, tmp) { + put_page(page); + ++i; + } + return i; +} + +static void page_chain_add(struct page **head, + struct page *chain_first, struct page *chain_last) +{ +#if 1 + struct page *tmp; + tmp = page_chain_tail(chain_first, NULL); + BUG_ON(tmp != chain_last); +#endif + + /* add chain to head */ + set_page_private(chain_last, (unsigned long)*head); + *head = chain_first; +} + +static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number) { struct page *page = NULL; + struct page *tmp = NULL; + int i = 0; /* Yes, testing drbd_pp_vacant outside the lock is racy. * So what. It saves a spin_lock. */ - if (drbd_pp_vacant > 0) { + if (drbd_pp_vacant >= number) { spin_lock(&drbd_pp_lock); - page = drbd_pp_pool; - if (page) { - drbd_pp_pool = (struct page *)page_private(page); - set_page_private(page, 0); /* just to be polite */ - drbd_pp_vacant--; - } + page = page_chain_del(&drbd_pp_pool, number); + if (page) + drbd_pp_vacant -= number; spin_unlock(&drbd_pp_lock); + if (page) + return page; } + /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD * "criss-cross" setup, that might cause write-out on some other DRBD, * which in turn might block on the other node at this very place. */ - if (!page) - page = alloc_page(GFP_TRY); - if (page) - atomic_inc(&mdev->pp_in_use); - return page; + for (i = 0; i < number; i++) { + tmp = alloc_page(GFP_TRY); + if (!tmp) + break; + set_page_private(tmp, (unsigned long)page); + page = tmp; + } + + if (i == number) + return page; + + /* Not enough pages immediately available this time. + * No need to jump around here, drbd_pp_alloc will retry this + * function "soon". */ + if (page) { + tmp = page_chain_tail(page, NULL); + spin_lock(&drbd_pp_lock); + page_chain_add(&drbd_pp_pool, page, tmp); + drbd_pp_vacant += i; + spin_unlock(&drbd_pp_lock); + } + return NULL; } /* kick lower level device, if we have more than (arbitrary number) @@ -127,7 +224,7 @@ static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed list_for_each_safe(le, tle, &mdev->net_ee) { e = list_entry(le, struct drbd_epoch_entry, w.list); - if (drbd_bio_has_active_page(e->private_bio)) + if (drbd_ee_has_active_page(e)) break; list_move(le, to_be_freed); } @@ -148,32 +245,34 @@ static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev) } /** - * drbd_pp_alloc() - Returns a page, fails only if a signal comes in + * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled) * @mdev: DRBD device. - * @retry: whether or not to retry allocation forever (or until signalled) + * @number: number of pages requested + * @retry: whether to retry, if not enough pages are available right now + * + * Tries to allocate number pages, first from our own page pool, then from + * the kernel, unless this allocation would exceed the max_buffers setting. + * Possibly retry until DRBD frees sufficient pages somewhere else. * - * Tries to allocate a page, first from our own page pool, then from the - * kernel, unless this allocation would exceed the max_buffers setting. - * If @retry is non-zero, retry until DRBD frees a page somewhere else. + * Returns a page chain linked via page->private. */ -static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry) +static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry) { struct page *page = NULL; DEFINE_WAIT(wait); - if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) { - page = drbd_pp_first_page_or_try_alloc(mdev); - if (page) - return page; - } + /* Yes, we may run up to @number over max_buffers. If we + * follow it strictly, the admin will get it wrong anyways. */ + if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) + page = drbd_pp_first_pages_or_try_alloc(mdev, number); - for (;;) { + while (page == NULL) { prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE); drbd_kick_lo_and_reclaim_net(mdev); if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) { - page = drbd_pp_first_page_or_try_alloc(mdev); + page = drbd_pp_first_pages_or_try_alloc(mdev, number); if (page) break; } @@ -190,62 +289,32 @@ static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry) } finish_wait(&drbd_pp_wait, &wait); + if (page) + atomic_add(number, &mdev->pp_in_use); return page; } /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc. - * Is also used from inside an other spin_lock_irq(&mdev->req_lock) */ + * Is also used from inside an other spin_lock_irq(&mdev->req_lock); + * Either links the page chain back to the global pool, + * or returns all pages to the system. */ static void drbd_pp_free(struct drbd_conf *mdev, struct page *page) { - int free_it; - - spin_lock(&drbd_pp_lock); - if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) { - free_it = 1; - } else { - set_page_private(page, (unsigned long)drbd_pp_pool); - drbd_pp_pool = page; - drbd_pp_vacant++; - free_it = 0; - } - spin_unlock(&drbd_pp_lock); - - atomic_dec(&mdev->pp_in_use); - - if (free_it) - __free_page(page); - - wake_up(&drbd_pp_wait); -} - -static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio) -{ - struct page *p_to_be_freed = NULL; - struct page *page; - struct bio_vec *bvec; int i; - - spin_lock(&drbd_pp_lock); - __bio_for_each_segment(bvec, bio, i, 0) { - if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) { - set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed); - p_to_be_freed = bvec->bv_page; - } else { - set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool); - drbd_pp_pool = bvec->bv_page; - drbd_pp_vacant++; - } - } - spin_unlock(&drbd_pp_lock); - atomic_sub(bio->bi_vcnt, &mdev->pp_in_use); - - while (p_to_be_freed) { - page = p_to_be_freed; - p_to_be_freed = (struct page *)page_private(page); - set_page_private(page, 0); /* just to be polite */ - put_page(page); + if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) + i = page_chain_free(page); + else { + struct page *tmp; + tmp = page_chain_tail(page, &i); + spin_lock(&drbd_pp_lock); + page_chain_add(&drbd_pp_pool, page, tmp); + drbd_pp_vacant += i; + spin_unlock(&drbd_pp_lock); } - + atomic_sub(i, &mdev->pp_in_use); + i = atomic_read(&mdev->pp_in_use); + if (i < 0) + dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i); wake_up(&drbd_pp_wait); } @@ -270,11 +339,9 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev, unsigned int data_size, gfp_t gfp_mask) __must_hold(local) { - struct request_queue *q; struct drbd_epoch_entry *e; struct page *page; - struct bio *bio; - unsigned int ds; + unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT; if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE)) return NULL; @@ -286,84 +353,32 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev, return NULL; } - bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE)); - if (!bio) { - if (!(gfp_mask & __GFP_NOWARN)) - dev_err(DEV, "alloc_ee: Allocation of a bio failed\n"); - goto fail1; - } - - bio->bi_bdev = mdev->ldev->backing_bdev; - bio->bi_sector = sector; - - ds = data_size; - while (ds) { - page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT)); - if (!page) { - if (!(gfp_mask & __GFP_NOWARN)) - dev_err(DEV, "alloc_ee: Allocation of a page failed\n"); - goto fail2; - } - if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) { - drbd_pp_free(mdev, page); - dev_err(DEV, "alloc_ee: bio_add_page(s=%llu," - "data_size=%u,ds=%u) failed\n", - (unsigned long long)sector, data_size, ds); - - q = bdev_get_queue(bio->bi_bdev); - if (q->merge_bvec_fn) { - struct bvec_merge_data bvm = { - .bi_bdev = bio->bi_bdev, - .bi_sector = bio->bi_sector, - .bi_size = bio->bi_size, - .bi_rw = bio->bi_rw, - }; - int l = q->merge_bvec_fn(q, &bvm, - &bio->bi_io_vec[bio->bi_vcnt]); - dev_err(DEV, "merge_bvec_fn() = %d\n", l); - } - - /* dump more of the bio. */ - dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs); - dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt); - dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size); - dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments); - - goto fail2; - break; - } - ds -= min_t(int, ds, PAGE_SIZE); - } - - D_ASSERT(data_size == bio->bi_size); - - bio->bi_private = e; - e->mdev = mdev; - e->sector = sector; - e->size = bio->bi_size; + page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT)); + if (!page) + goto fail; - e->private_bio = bio; - e->block_id = id; INIT_HLIST_NODE(&e->colision); e->epoch = NULL; + e->mdev = mdev; + e->pages = page; + atomic_set(&e->pending_bios, 0); + e->size = data_size; e->flags = 0; + e->sector = sector; + e->sector = sector; + e->block_id = id; return e; - fail2: - drbd_pp_free_bio_pages(mdev, bio); - bio_put(bio); - fail1: + fail: mempool_free(e, drbd_ee_mempool); - return NULL; } void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e) { - struct bio *bio = e->private_bio; - drbd_pp_free_bio_pages(mdev, bio); - bio_put(bio); + drbd_pp_free(mdev, e->pages); + D_ASSERT(atomic_read(&e->pending_bios) == 0); D_ASSERT(hlist_unhashed(&e->colision)); mempool_free(e, drbd_ee_mempool); } @@ -555,6 +570,25 @@ static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size) return rv; } +/* quoting tcp(7): + * On individual connections, the socket buffer size must be set prior to the + * listen(2) or connect(2) calls in order to have it take effect. + * This is our wrapper to do so. + */ +static void drbd_setbufsize(struct socket *sock, unsigned int snd, + unsigned int rcv) +{ + /* open coded SO_SNDBUF, SO_RCVBUF */ + if (snd) { + sock->sk->sk_sndbuf = snd; + sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK; + } + if (rcv) { + sock->sk->sk_rcvbuf = rcv; + sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK; + } +} + static struct socket *drbd_try_connect(struct drbd_conf *mdev) { const char *what; @@ -576,6 +610,8 @@ static struct socket *drbd_try_connect(struct drbd_conf *mdev) sock->sk->sk_rcvtimeo = sock->sk->sk_sndtimeo = mdev->net_conf->try_connect_int*HZ; + drbd_setbufsize(sock, mdev->net_conf->sndbuf_size, + mdev->net_conf->rcvbuf_size); /* explicitly bind to the configured IP as source IP * for the outgoing connections. @@ -654,6 +690,8 @@ static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev) s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */ s_listen->sk->sk_rcvtimeo = timeo; s_listen->sk->sk_sndtimeo = timeo; + drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size, + mdev->net_conf->rcvbuf_size); what = "bind before listen"; err = s_listen->ops->bind(s_listen, @@ -840,16 +878,6 @@ retry: sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK; msock->sk->sk_priority = TC_PRIO_INTERACTIVE; - if (mdev->net_conf->sndbuf_size) { - sock->sk->sk_sndbuf = mdev->net_conf->sndbuf_size; - sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK; - } - - if (mdev->net_conf->rcvbuf_size) { - sock->sk->sk_rcvbuf = mdev->net_conf->rcvbuf_size; - sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK; - } - /* NOT YET ... * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10; * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT; @@ -902,7 +930,7 @@ retry: if (!drbd_send_protocol(mdev)) return -1; drbd_send_sync_param(mdev, &mdev->sync_conf); - drbd_send_sizes(mdev, 0); + drbd_send_sizes(mdev, 0, 0); drbd_send_uuids(mdev); drbd_send_state(mdev); clear_bit(USE_DEGR_WFC_T, &mdev->flags); @@ -946,7 +974,8 @@ static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct d int rv; if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) { - rv = blkdev_issue_flush(mdev->ldev->backing_bdev, NULL); + rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL, + NULL, BLKDEV_IFL_WAIT); if (rv) { dev_err(DEV, "local disk flush failed with status %d\n", rv); /* would rather check on EOPNOTSUPP, but that is not reliable. @@ -1120,6 +1149,90 @@ void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) } /** + * drbd_submit_ee() + * @mdev: DRBD device. + * @e: epoch entry + * @rw: flag field, see bio->bi_rw + */ +/* TODO allocate from our own bio_set. */ +int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, + const unsigned rw, const int fault_type) +{ + struct bio *bios = NULL; + struct bio *bio; + struct page *page = e->pages; + sector_t sector = e->sector; + unsigned ds = e->size; + unsigned n_bios = 0; + unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT; + + /* In most cases, we will only need one bio. But in case the lower + * level restrictions happen to be different at this offset on this + * side than those of the sending peer, we may need to submit the + * request in more than one bio. */ +next_bio: + bio = bio_alloc(GFP_NOIO, nr_pages); + if (!bio) { + dev_err(DEV, "submit_ee: Allocation of a bio failed\n"); + goto fail; + } + /* > e->sector, unless this is the first bio */ + bio->bi_sector = sector; + bio->bi_bdev = mdev->ldev->backing_bdev; + /* we special case some flags in the multi-bio case, see below + * (BIO_RW_UNPLUG, BIO_RW_BARRIER) */ + bio->bi_rw = rw; + bio->bi_private = e; + bio->bi_end_io = drbd_endio_sec; + + bio->bi_next = bios; + bios = bio; + ++n_bios; + + page_chain_for_each(page) { + unsigned len = min_t(unsigned, ds, PAGE_SIZE); + if (!bio_add_page(bio, page, len, 0)) { + /* a single page must always be possible! */ + BUG_ON(bio->bi_vcnt == 0); + goto next_bio; + } + ds -= len; + sector += len >> 9; + --nr_pages; + } + D_ASSERT(page == NULL); + D_ASSERT(ds == 0); + + atomic_set(&e->pending_bios, n_bios); + do { + bio = bios; + bios = bios->bi_next; + bio->bi_next = NULL; + + /* strip off BIO_RW_UNPLUG unless it is the last bio */ + if (bios) + bio->bi_rw &= ~(1<<BIO_RW_UNPLUG); + + drbd_generic_make_request(mdev, fault_type, bio); + + /* strip off BIO_RW_BARRIER, + * unless it is the first or last bio */ + if (bios && bios->bi_next) + bios->bi_rw &= ~(1<<BIO_RW_BARRIER); + } while (bios); + maybe_kick_lo(mdev); + return 0; + +fail: + while (bios) { + bio = bios; + bios = bios->bi_next; + bio_put(bio); + } + return -ENOMEM; +} + +/** * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set * @mdev: DRBD device. * @w: work object. @@ -1128,8 +1241,6 @@ void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local) { struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w; - struct bio *bio = e->private_bio; - /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place, (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch) so that we can finish that epoch in drbd_may_finish_epoch(). @@ -1143,33 +1254,17 @@ int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __relea if (previous_epoch(mdev, e->epoch)) dev_warn(DEV, "Write ordering was not enforced (one time event)\n"); - /* prepare bio for re-submit, - * re-init volatile members */ /* we still have a local reference, * get_ldev was done in receive_Data. */ - bio->bi_bdev = mdev->ldev->backing_bdev; - bio->bi_sector = e->sector; - bio->bi_size = e->size; - bio->bi_idx = 0; - - bio->bi_flags &= ~(BIO_POOL_MASK - 1); - bio->bi_flags |= 1 << BIO_UPTODATE; - - /* don't know whether this is necessary: */ - bio->bi_phys_segments = 0; - bio->bi_next = NULL; - - /* these should be unchanged: */ - /* bio->bi_end_io = drbd_endio_write_sec; */ - /* bio->bi_vcnt = whatever; */ e->w.cb = e_end_block; - - /* This is no longer a barrier request. */ - bio->bi_rw &= ~(1UL << BIO_RW_BARRIER); - - drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio); - + if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) { + /* drbd_submit_ee fails for one reason only: + * if was not able to allocate sufficient bios. + * requeue, try again later. */ + e->w.cb = w_e_reissue; + drbd_queue_work(&mdev->data.work, &e->w); + } return 1; } @@ -1261,13 +1356,13 @@ static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h) static struct drbd_epoch_entry * read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local) { + const sector_t capacity = drbd_get_capacity(mdev->this_bdev); struct drbd_epoch_entry *e; - struct bio_vec *bvec; struct page *page; - struct bio *bio; - int dgs, ds, i, rr; + int dgs, ds, rr; void *dig_in = mdev->int_dig_in; void *dig_vv = mdev->int_dig_vv; + unsigned long *data; dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ? crypto_hash_digestsize(mdev->integrity_r_tfm) : 0; @@ -1286,29 +1381,44 @@ read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __ ERR_IF(data_size & 0x1ff) return NULL; ERR_IF(data_size > DRBD_MAX_SEGMENT_SIZE) return NULL; + /* even though we trust out peer, + * we sometimes have to double check. */ + if (sector + (data_size>>9) > capacity) { + dev_err(DEV, "capacity: %llus < sector: %llus + size: %u\n", + (unsigned long long)capacity, + (unsigned long long)sector, data_size); + return NULL; + } + /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD * "criss-cross" setup, that might cause write-out on some other DRBD, * which in turn might block on the other node at this very place. */ e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO); if (!e) return NULL; - bio = e->private_bio; + ds = data_size; - bio_for_each_segment(bvec, bio, i) { - page = bvec->bv_page; - rr = drbd_recv(mdev, kmap(page), min_t(int, ds, PAGE_SIZE)); + page = e->pages; + page_chain_for_each(page) { + unsigned len = min_t(int, ds, PAGE_SIZE); + data = kmap(page); + rr = drbd_recv(mdev, data, len); + if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) { + dev_err(DEV, "Fault injection: Corrupting data on receive\n"); + data[0] = data[0] ^ (unsigned long)-1; + } kunmap(page); - if (rr != min_t(int, ds, PAGE_SIZE)) { + if (rr != len) { drbd_free_ee(mdev, e); dev_warn(DEV, "short read receiving data: read %d expected %d\n", - rr, min_t(int, ds, PAGE_SIZE)); + rr, len); return NULL; } ds -= rr; } if (dgs) { - drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv); + drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv); if (memcmp(dig_in, dig_vv, dgs)) { dev_err(DEV, "Digest integrity check FAILED.\n"); drbd_bcast_ee(mdev, "digest failed", @@ -1330,7 +1440,10 @@ static int drbd_drain_block(struct drbd_conf *mdev, int data_size) int rr, rv = 1; void *data; - page = drbd_pp_alloc(mdev, 1); + if (!data_size) + return TRUE; + + page = drbd_pp_alloc(mdev, 1, 1); data = kmap(page); while (data_size) { @@ -1394,7 +1507,7 @@ static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req, } if (dgs) { - drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv); + drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv); if (memcmp(dig_in, dig_vv, dgs)) { dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n"); return 0; @@ -1415,7 +1528,7 @@ static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int u D_ASSERT(hlist_unhashed(&e->colision)); - if (likely(drbd_bio_uptodate(e->private_bio))) { + if (likely((e->flags & EE_WAS_ERROR) == 0)) { drbd_set_in_sync(mdev, sector, e->size); ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e); } else { @@ -1434,30 +1547,28 @@ static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_si struct drbd_epoch_entry *e; e = read_in_block(mdev, ID_SYNCER, sector, data_size); - if (!e) { - put_ldev(mdev); - return FALSE; - } + if (!e) + goto fail; dec_rs_pending(mdev); - e->private_bio->bi_end_io = drbd_endio_write_sec; - e->private_bio->bi_rw = WRITE; - e->w.cb = e_end_resync_block; - inc_unacked(mdev); /* corresponding dec_unacked() in e_end_resync_block() * respective _drbd_clear_done_ee */ + e->w.cb = e_end_resync_block; + spin_lock_irq(&mdev->req_lock); list_add(&e->w.list, &mdev->sync_ee); spin_unlock_irq(&mdev->req_lock); - drbd_generic_make_request(mdev, DRBD_FAULT_RS_WR, e->private_bio); - /* accounting done in endio */ + if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0) + return TRUE; - maybe_kick_lo(mdev); - return TRUE; + drbd_free_ee(mdev, e); +fail: + put_ldev(mdev); + return FALSE; } static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h) @@ -1552,7 +1663,7 @@ static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel) } if (mdev->net_conf->wire_protocol == DRBD_PROT_C) { - if (likely(drbd_bio_uptodate(e->private_bio))) { + if (likely((e->flags & EE_WAS_ERROR) == 0)) { pcmd = (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn <= C_PAUSED_SYNC_T && e->flags & EE_MAY_SET_IN_SYNC) ? @@ -1698,7 +1809,6 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h) return FALSE; } - e->private_bio->bi_end_io = drbd_endio_write_sec; e->w.cb = e_end_block; spin_lock(&mdev->epoch_lock); @@ -1894,12 +2004,8 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h) drbd_al_begin_io(mdev, e->sector); } - e->private_bio->bi_rw = rw; - drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio); - /* accounting done in endio */ - - maybe_kick_lo(mdev); - return TRUE; + if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0) + return TRUE; out_interrupted: /* yes, the epoch_size now is imbalanced. @@ -1945,7 +2051,7 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h) "no local data.\n"); drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY : P_NEG_RS_DREPLY , p); - return TRUE; + return drbd_drain_block(mdev, h->length - brps); } /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD @@ -1957,9 +2063,6 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h) return FALSE; } - e->private_bio->bi_rw = READ; - e->private_bio->bi_end_io = drbd_endio_read_sec; - switch (h->command) { case P_DATA_REQUEST: e->w.cb = w_e_end_data_req; @@ -2053,10 +2156,8 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h) inc_unacked(mdev); - drbd_generic_make_request(mdev, fault_type, e->private_bio); - maybe_kick_lo(mdev); - - return TRUE; + if (drbd_submit_ee(mdev, e, READ, fault_type) == 0) + return TRUE; out_free_e: kfree(di); @@ -2473,6 +2574,9 @@ static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_rol hg > 0 ? "source" : "target"); } + if (abs(hg) == 100) + drbd_khelper(mdev, "initial-split-brain"); + if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) { int pcount = (mdev->state.role == R_PRIMARY) + (peer_role == R_PRIMARY); @@ -2518,7 +2622,7 @@ static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_rol * after an attempted attach on a diskless node. * We just refuse to attach -- well, we drop the "connection" * to that disk, in a way... */ - dev_alert(DEV, "Split-Brain detected, dropping connection!\n"); + dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n"); drbd_khelper(mdev, "split-brain"); return C_MASK; } @@ -2849,7 +2953,7 @@ static int receive_sizes(struct drbd_conf *mdev, struct p_header *h) unsigned int max_seg_s; sector_t p_size, p_usize, my_usize; int ldsc = 0; /* local disk size changed */ - enum drbd_conns nconn; + enum dds_flags ddsf; ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE; if (drbd_recv(mdev, h->payload, h->length) != h->length) @@ -2905,8 +3009,9 @@ static int receive_sizes(struct drbd_conf *mdev, struct p_header *h) } #undef min_not_zero + ddsf = be16_to_cpu(p->dds_flags); if (get_ldev(mdev)) { - dd = drbd_determin_dev_size(mdev, 0); + dd = drbd_determin_dev_size(mdev, ddsf); put_ldev(mdev); if (dd == dev_size_error) return FALSE; @@ -2916,33 +3021,21 @@ static int receive_sizes(struct drbd_conf *mdev, struct p_header *h) drbd_set_my_capacity(mdev, p_size); } - if (mdev->p_uuid && mdev->state.conn <= C_CONNECTED && get_ldev(mdev)) { - nconn = drbd_sync_handshake(mdev, - mdev->state.peer, mdev->state.pdsk); - put_ldev(mdev); - - if (nconn == C_MASK) { - drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); - return FALSE; - } - - if (drbd_request_state(mdev, NS(conn, nconn)) < SS_SUCCESS) { - drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); - return FALSE; - } - } - if (get_ldev(mdev)) { if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) { mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev); ldsc = 1; } - max_seg_s = be32_to_cpu(p->max_segment_size); + if (mdev->agreed_pro_version < 94) + max_seg_s = be32_to_cpu(p->max_segment_size); + else /* drbd 8.3.8 onwards */ + max_seg_s = DRBD_MAX_SEGMENT_SIZE; + if (max_seg_s != queue_max_segment_size(mdev->rq_queue)) drbd_setup_queue_param(mdev, max_seg_s); - drbd_setup_order_type(mdev, be32_to_cpu(p->queue_order_type)); + drbd_setup_order_type(mdev, be16_to_cpu(p->queue_order_type)); put_ldev(mdev); } @@ -2951,14 +3044,17 @@ static int receive_sizes(struct drbd_conf *mdev, struct p_header *h) drbd_get_capacity(mdev->this_bdev) || ldsc) { /* we have different sizes, probably peer * needs to know my new size... */ - drbd_send_sizes(mdev, 0); + drbd_send_sizes(mdev, 0, ddsf); } if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) || (dd == grew && mdev->state.conn == C_CONNECTED)) { if (mdev->state.pdsk >= D_INCONSISTENT && - mdev->state.disk >= D_INCONSISTENT) - resync_after_online_grow(mdev); - else + mdev->state.disk >= D_INCONSISTENT) { + if (ddsf & DDSF_NO_RESYNC) + dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n"); + else + resync_after_online_grow(mdev); + } else set_bit(RESYNC_AFTER_NEG, &mdev->flags); } } @@ -3490,6 +3586,92 @@ static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h) return TRUE; } +static void timeval_sub_us(struct timeval* tv, unsigned int us) +{ + tv->tv_sec -= us / 1000000; + us = us % 1000000; + if (tv->tv_usec > us) { + tv->tv_usec += 1000000; + tv->tv_sec--; + } + tv->tv_usec -= us; +} + +static void got_delay_probe(struct drbd_conf *mdev, int from, struct p_delay_probe *p) +{ + struct delay_probe *dp; + struct list_head *le; + struct timeval now; + int seq_num; + int offset; + int data_delay; + + seq_num = be32_to_cpu(p->seq_num); + offset = be32_to_cpu(p->offset); + + spin_lock(&mdev->peer_seq_lock); + if (!list_empty(&mdev->delay_probes)) { + if (from == USE_DATA_SOCKET) + le = mdev->delay_probes.next; + else + le = mdev->delay_probes.prev; + + dp = list_entry(le, struct delay_probe, list); + + if (dp->seq_num == seq_num) { + list_del(le); + spin_unlock(&mdev->peer_seq_lock); + do_gettimeofday(&now); + timeval_sub_us(&now, offset); + data_delay = + now.tv_usec - dp->time.tv_usec + + (now.tv_sec - dp->time.tv_sec) * 1000000; + + if (data_delay > 0) + mdev->data_delay = data_delay; + + kfree(dp); + return; + } + + if (dp->seq_num > seq_num) { + spin_unlock(&mdev->peer_seq_lock); + dev_warn(DEV, "Previous allocation failure of struct delay_probe?\n"); + return; /* Do not alloca a struct delay_probe.... */ + } + } + spin_unlock(&mdev->peer_seq_lock); + + dp = kmalloc(sizeof(struct delay_probe), GFP_NOIO); + if (!dp) { + dev_warn(DEV, "Failed to allocate a struct delay_probe, do not worry.\n"); + return; + } + + dp->seq_num = seq_num; + do_gettimeofday(&dp->time); + timeval_sub_us(&dp->time, offset); + + spin_lock(&mdev->peer_seq_lock); + if (from == USE_DATA_SOCKET) + list_add(&dp->list, &mdev->delay_probes); + else + list_add_tail(&dp->list, &mdev->delay_probes); + spin_unlock(&mdev->peer_seq_lock); +} + +static int receive_delay_probe(struct drbd_conf *mdev, struct p_header *h) +{ + struct p_delay_probe *p = (struct p_delay_probe *)h; + + ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE; + if (drbd_recv(mdev, h->payload, h->length) != h->length) + return FALSE; + + got_delay_probe(mdev, USE_DATA_SOCKET, p); + return TRUE; +} + typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *); static drbd_cmd_handler_f drbd_default_handler[] = { @@ -3513,6 +3695,7 @@ static drbd_cmd_handler_f drbd_default_handler[] = { [P_OV_REQUEST] = receive_DataRequest, [P_OV_REPLY] = receive_DataRequest, [P_CSUM_RS_REQUEST] = receive_DataRequest, + [P_DELAY_PROBE] = receive_delay_probe, /* anything missing from this table is in * the asender_tbl, see get_asender_cmd */ [P_MAX_CMD] = NULL, @@ -3739,7 +3922,7 @@ static void drbd_disconnect(struct drbd_conf *mdev) dev_info(DEV, "net_ee not empty, killed %u entries\n", i); i = atomic_read(&mdev->pp_in_use); if (i) - dev_info(DEV, "pp_in_use = %u, expected 0\n", i); + dev_info(DEV, "pp_in_use = %d, expected 0\n", i); D_ASSERT(list_empty(&mdev->read_ee)); D_ASSERT(list_empty(&mdev->active_ee)); @@ -4232,7 +4415,6 @@ static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h) sector = be64_to_cpu(p->sector); size = be32_to_cpu(p->blksize); - D_ASSERT(p->block_id == ID_SYNCER); update_peer_seq(mdev, be32_to_cpu(p->seq_num)); @@ -4290,6 +4472,14 @@ static int got_OVResult(struct drbd_conf *mdev, struct p_header *h) return TRUE; } +static int got_delay_probe_m(struct drbd_conf *mdev, struct p_header *h) +{ + struct p_delay_probe *p = (struct p_delay_probe *)h; + + got_delay_probe(mdev, USE_META_SOCKET, p); + return TRUE; +} + struct asender_cmd { size_t pkt_size; int (*process)(struct drbd_conf *mdev, struct p_header *h); @@ -4314,6 +4504,7 @@ static struct asender_cmd *get_asender_cmd(int cmd) [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck }, [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply }, [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync }, + [P_DELAY_PROBE] = { sizeof(struct p_delay_probe), got_delay_probe_m }, [P_MAX_CMD] = { 0, NULL }, }; if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL) diff --git a/drivers/block/drbd/drbd_req.c b/drivers/block/drbd/drbd_req.c index de81ab7b4627..654f1ef5cbb0 100644 --- a/drivers/block/drbd/drbd_req.c +++ b/drivers/block/drbd/drbd_req.c @@ -102,32 +102,7 @@ static void _req_is_done(struct drbd_conf *mdev, struct drbd_request *req, const } } - /* if it was a local io error, we want to notify our - * peer about that, and see if we need to - * detach the disk and stuff. - * to avoid allocating some special work - * struct, reuse the request. */ - - /* THINK - * why do we do this not when we detect the error, - * but delay it until it is "done", i.e. possibly - * until the next barrier ack? */ - - if (rw == WRITE && - ((s & RQ_LOCAL_MASK) && !(s & RQ_LOCAL_OK))) { - if (!(req->w.list.next == LIST_POISON1 || - list_empty(&req->w.list))) { - /* DEBUG ASSERT only; if this triggers, we - * probably corrupt the worker list here */ - dev_err(DEV, "req->w.list.next = %p\n", req->w.list.next); - dev_err(DEV, "req->w.list.prev = %p\n", req->w.list.prev); - } - req->w.cb = w_io_error; - drbd_queue_work(&mdev->data.work, &req->w); - /* drbd_req_free() is done in w_io_error */ - } else { - drbd_req_free(req); - } + drbd_req_free(req); } static void queue_barrier(struct drbd_conf *mdev) @@ -453,9 +428,6 @@ void __req_mod(struct drbd_request *req, enum drbd_req_event what, req->rq_state |= RQ_LOCAL_COMPLETED; req->rq_state &= ~RQ_LOCAL_PENDING; - dev_alert(DEV, "Local WRITE failed sec=%llus size=%u\n", - (unsigned long long)req->sector, req->size); - /* and now: check how to handle local io error. */ __drbd_chk_io_error(mdev, FALSE); _req_may_be_done(req, m); put_ldev(mdev); @@ -475,22 +447,21 @@ void __req_mod(struct drbd_request *req, enum drbd_req_event what, req->rq_state |= RQ_LOCAL_COMPLETED; req->rq_state &= ~RQ_LOCAL_PENDING; - dev_alert(DEV, "Local READ failed sec=%llus size=%u\n", - (unsigned long long)req->sector, req->size); - /* _req_mod(req,to_be_send); oops, recursion... */ D_ASSERT(!(req->rq_state & RQ_NET_MASK)); - req->rq_state |= RQ_NET_PENDING; - inc_ap_pending(mdev); __drbd_chk_io_error(mdev, FALSE); put_ldev(mdev); - /* NOTE: if we have no connection, - * or know the peer has no good data either, - * then we don't actually need to "queue_for_net_read", - * but we do so anyways, since the drbd_io_error() - * and the potential state change to "Diskless" - * needs to be done from process context */ + /* no point in retrying if there is no good remote data, + * or we have no connection. */ + if (mdev->state.pdsk != D_UP_TO_DATE) { + _req_may_be_done(req, m); + break; + } + + /* _req_mod(req,to_be_send); oops, recursion... */ + req->rq_state |= RQ_NET_PENDING; + inc_ap_pending(mdev); /* fall through: _req_mod(req,queue_for_net_read); */ case queue_for_net_read: @@ -600,6 +571,9 @@ void __req_mod(struct drbd_request *req, enum drbd_req_event what, _req_may_be_done(req, m); break; + case read_retry_remote_canceled: + req->rq_state &= ~RQ_NET_QUEUED; + /* fall through, in case we raced with drbd_disconnect */ case connection_lost_while_pending: /* transfer log cleanup after connection loss */ /* assert something? */ @@ -722,6 +696,7 @@ static int drbd_make_request_common(struct drbd_conf *mdev, struct bio *bio) struct drbd_request *req; int local, remote; int err = -EIO; + int ret = 0; /* allocate outside of all locks; */ req = drbd_req_new(mdev, bio); @@ -784,7 +759,7 @@ static int drbd_make_request_common(struct drbd_conf *mdev, struct bio *bio) (mdev->state.pdsk == D_INCONSISTENT && mdev->state.conn >= C_CONNECTED)); - if (!(local || remote)) { + if (!(local || remote) && !mdev->state.susp) { dev_err(DEV, "IO ERROR: neither local nor remote disk\n"); goto fail_free_complete; } @@ -810,6 +785,16 @@ allocate_barrier: /* GOOD, everything prepared, grab the spin_lock */ spin_lock_irq(&mdev->req_lock); + if (mdev->state.susp) { + /* If we got suspended, use the retry mechanism of + generic_make_request() to restart processing of this + bio. In the next call to drbd_make_request_26 + we sleep in inc_ap_bio() */ + ret = 1; + spin_unlock_irq(&mdev->req_lock); + goto fail_free_complete; + } + if (remote) { remote = (mdev->state.pdsk == D_UP_TO_DATE || (mdev->state.pdsk == D_INCONSISTENT && @@ -947,12 +932,14 @@ fail_and_free_req: req->private_bio = NULL; put_ldev(mdev); } - bio_endio(bio, err); + if (!ret) + bio_endio(bio, err); + drbd_req_free(req); dec_ap_bio(mdev); kfree(b); - return 0; + return ret; } /* helper function for drbd_make_request @@ -962,11 +949,6 @@ fail_and_free_req: */ static int drbd_fail_request_early(struct drbd_conf *mdev, int is_write) { - /* Unconfigured */ - if (mdev->state.conn == C_DISCONNECTING && - mdev->state.disk == D_DISKLESS) - return 1; - if (mdev->state.role != R_PRIMARY && (!allow_oos || is_write)) { if (__ratelimit(&drbd_ratelimit_state)) { @@ -1070,15 +1052,21 @@ int drbd_make_request_26(struct request_queue *q, struct bio *bio) /* we need to get a "reference count" (ap_bio_cnt) * to avoid races with the disconnect/reconnect/suspend code. - * In case we need to split the bio here, we need to get two references + * In case we need to split the bio here, we need to get three references * atomically, otherwise we might deadlock when trying to submit the * second one! */ - inc_ap_bio(mdev, 2); + inc_ap_bio(mdev, 3); D_ASSERT(e_enr == s_enr + 1); - drbd_make_request_common(mdev, &bp->bio1); - drbd_make_request_common(mdev, &bp->bio2); + while (drbd_make_request_common(mdev, &bp->bio1)) + inc_ap_bio(mdev, 1); + + while (drbd_make_request_common(mdev, &bp->bio2)) + inc_ap_bio(mdev, 1); + + dec_ap_bio(mdev); + bio_pair_release(bp); } return 0; @@ -1115,7 +1103,7 @@ int drbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bvm, struct } else if (limit && get_ldev(mdev)) { struct request_queue * const b = mdev->ldev->backing_bdev->bd_disk->queue; - if (b->merge_bvec_fn && mdev->ldev->dc.use_bmbv) { + if (b->merge_bvec_fn) { backing_limit = b->merge_bvec_fn(b, bvm, bvec); limit = min(limit, backing_limit); } diff --git a/drivers/block/drbd/drbd_req.h b/drivers/block/drbd/drbd_req.h index 16119d7056cc..02d575d24518 100644 --- a/drivers/block/drbd/drbd_req.h +++ b/drivers/block/drbd/drbd_req.h @@ -91,6 +91,7 @@ enum drbd_req_event { send_failed, handed_over_to_network, connection_lost_while_pending, + read_retry_remote_canceled, recv_acked_by_peer, write_acked_by_peer, write_acked_by_peer_and_sis, /* and set_in_sync */ diff --git a/drivers/block/drbd/drbd_strings.c b/drivers/block/drbd/drbd_strings.c index 76863e3f05be..85179e1fb50a 100644 --- a/drivers/block/drbd/drbd_strings.c +++ b/drivers/block/drbd/drbd_strings.c @@ -70,7 +70,7 @@ static const char *drbd_disk_s_names[] = { static const char *drbd_state_sw_errors[] = { [-SS_TWO_PRIMARIES] = "Multiple primaries not allowed by config", - [-SS_NO_UP_TO_DATE_DISK] = "Refusing to be Primary without at least one UpToDate disk", + [-SS_NO_UP_TO_DATE_DISK] = "Need access to UpToDate data", [-SS_NO_LOCAL_DISK] = "Can not resync without local disk", [-SS_NO_REMOTE_DISK] = "Can not resync without remote disk", [-SS_CONNECTED_OUTDATES] = "Refusing to be Outdated while Connected", diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c index d48a1dfd7b24..b623ceee2a4a 100644 --- a/drivers/block/drbd/drbd_worker.c +++ b/drivers/block/drbd/drbd_worker.c @@ -47,8 +47,7 @@ static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int ca /* defined here: drbd_md_io_complete - drbd_endio_write_sec - drbd_endio_read_sec + drbd_endio_sec drbd_endio_pri * more endio handlers: @@ -85,27 +84,10 @@ void drbd_md_io_complete(struct bio *bio, int error) /* reads on behalf of the partner, * "submitted" by the receiver */ -void drbd_endio_read_sec(struct bio *bio, int error) __releases(local) +void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local) { unsigned long flags = 0; - struct drbd_epoch_entry *e = NULL; - struct drbd_conf *mdev; - int uptodate = bio_flagged(bio, BIO_UPTODATE); - - e = bio->bi_private; - mdev = e->mdev; - - if (error) - dev_warn(DEV, "read: error=%d s=%llus\n", error, - (unsigned long long)e->sector); - if (!error && !uptodate) { - dev_warn(DEV, "read: setting error to -EIO s=%llus\n", - (unsigned long long)e->sector); - /* strange behavior of some lower level drivers... - * fail the request by clearing the uptodate flag, - * but do not return any error?! */ - error = -EIO; - } + struct drbd_conf *mdev = e->mdev; D_ASSERT(e->block_id != ID_VACANT); @@ -114,49 +96,38 @@ void drbd_endio_read_sec(struct bio *bio, int error) __releases(local) list_del(&e->w.list); if (list_empty(&mdev->read_ee)) wake_up(&mdev->ee_wait); + if (test_bit(__EE_WAS_ERROR, &e->flags)) + __drbd_chk_io_error(mdev, FALSE); spin_unlock_irqrestore(&mdev->req_lock, flags); - drbd_chk_io_error(mdev, error, FALSE); drbd_queue_work(&mdev->data.work, &e->w); put_ldev(mdev); } +static int is_failed_barrier(int ee_flags) +{ + return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED)) + == (EE_IS_BARRIER|EE_WAS_ERROR); +} + /* writes on behalf of the partner, or resync writes, - * "submitted" by the receiver. - */ -void drbd_endio_write_sec(struct bio *bio, int error) __releases(local) + * "submitted" by the receiver, final stage. */ +static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local) { unsigned long flags = 0; - struct drbd_epoch_entry *e = NULL; - struct drbd_conf *mdev; + struct drbd_conf *mdev = e->mdev; sector_t e_sector; int do_wake; int is_syncer_req; int do_al_complete_io; - int uptodate = bio_flagged(bio, BIO_UPTODATE); - int is_barrier = bio_rw_flagged(bio, BIO_RW_BARRIER); - - e = bio->bi_private; - mdev = e->mdev; - if (error) - dev_warn(DEV, "write: error=%d s=%llus\n", error, - (unsigned long long)e->sector); - if (!error && !uptodate) { - dev_warn(DEV, "write: setting error to -EIO s=%llus\n", - (unsigned long long)e->sector); - /* strange behavior of some lower level drivers... - * fail the request by clearing the uptodate flag, - * but do not return any error?! */ - error = -EIO; - } - - /* error == -ENOTSUPP would be a better test, - * alas it is not reliable */ - if (error && is_barrier && e->flags & EE_IS_BARRIER) { + /* if this is a failed barrier request, disable use of barriers, + * and schedule for resubmission */ + if (is_failed_barrier(e->flags)) { drbd_bump_write_ordering(mdev, WO_bdev_flush); spin_lock_irqsave(&mdev->req_lock, flags); list_del(&e->w.list); + e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED; e->w.cb = w_e_reissue; /* put_ldev actually happens below, once we come here again. */ __release(local); @@ -167,17 +138,16 @@ void drbd_endio_write_sec(struct bio *bio, int error) __releases(local) D_ASSERT(e->block_id != ID_VACANT); - spin_lock_irqsave(&mdev->req_lock, flags); - mdev->writ_cnt += e->size >> 9; - is_syncer_req = is_syncer_block_id(e->block_id); - /* after we moved e to done_ee, * we may no longer access it, * it may be freed/reused already! * (as soon as we release the req_lock) */ e_sector = e->sector; do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO; + is_syncer_req = is_syncer_block_id(e->block_id); + spin_lock_irqsave(&mdev->req_lock, flags); + mdev->writ_cnt += e->size >> 9; list_del(&e->w.list); /* has been on active_ee or sync_ee */ list_add_tail(&e->w.list, &mdev->done_ee); @@ -190,7 +160,7 @@ void drbd_endio_write_sec(struct bio *bio, int error) __releases(local) ? list_empty(&mdev->sync_ee) : list_empty(&mdev->active_ee); - if (error) + if (test_bit(__EE_WAS_ERROR, &e->flags)) __drbd_chk_io_error(mdev, FALSE); spin_unlock_irqrestore(&mdev->req_lock, flags); @@ -205,7 +175,42 @@ void drbd_endio_write_sec(struct bio *bio, int error) __releases(local) wake_asender(mdev); put_ldev(mdev); +} + +/* writes on behalf of the partner, or resync writes, + * "submitted" by the receiver. + */ +void drbd_endio_sec(struct bio *bio, int error) +{ + struct drbd_epoch_entry *e = bio->bi_private; + struct drbd_conf *mdev = e->mdev; + int uptodate = bio_flagged(bio, BIO_UPTODATE); + int is_write = bio_data_dir(bio) == WRITE; + + if (error) + dev_warn(DEV, "%s: error=%d s=%llus\n", + is_write ? "write" : "read", error, + (unsigned long long)e->sector); + if (!error && !uptodate) { + dev_warn(DEV, "%s: setting error to -EIO s=%llus\n", + is_write ? "write" : "read", + (unsigned long long)e->sector); + /* strange behavior of some lower level drivers... + * fail the request by clearing the uptodate flag, + * but do not return any error?! */ + error = -EIO; + } + + if (error) + set_bit(__EE_WAS_ERROR, &e->flags); + bio_put(bio); /* no need for the bio anymore */ + if (atomic_dec_and_test(&e->pending_bios)) { + if (is_write) + drbd_endio_write_sec_final(e); + else + drbd_endio_read_sec_final(e); + } } /* read, readA or write requests on R_PRIMARY coming from drbd_make_request @@ -219,9 +224,6 @@ void drbd_endio_pri(struct bio *bio, int error) enum drbd_req_event what; int uptodate = bio_flagged(bio, BIO_UPTODATE); - if (error) - dev_warn(DEV, "p %s: error=%d\n", - bio_data_dir(bio) == WRITE ? "write" : "read", error); if (!error && !uptodate) { dev_warn(DEV, "p %s: setting error to -EIO\n", bio_data_dir(bio) == WRITE ? "write" : "read"); @@ -252,20 +254,6 @@ void drbd_endio_pri(struct bio *bio, int error) complete_master_bio(mdev, &m); } -int w_io_error(struct drbd_conf *mdev, struct drbd_work *w, int cancel) -{ - struct drbd_request *req = container_of(w, struct drbd_request, w); - - /* NOTE: mdev->ldev can be NULL by the time we get here! */ - /* D_ASSERT(mdev->ldev->dc.on_io_error != EP_PASS_ON); */ - - /* the only way this callback is scheduled is from _req_may_be_done, - * when it is done and had a local write error, see comments there */ - drbd_req_free(req); - - return TRUE; -} - int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel) { struct drbd_request *req = container_of(w, struct drbd_request, w); @@ -275,12 +263,9 @@ int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel) * to give the disk the chance to relocate that block */ spin_lock_irq(&mdev->req_lock); - if (cancel || - mdev->state.conn < C_CONNECTED || - mdev->state.pdsk <= D_INCONSISTENT) { - _req_mod(req, send_canceled); + if (cancel || mdev->state.pdsk != D_UP_TO_DATE) { + _req_mod(req, read_retry_remote_canceled); spin_unlock_irq(&mdev->req_lock); - dev_alert(DEV, "WE ARE LOST. Local IO failure, no peer.\n"); return 1; } spin_unlock_irq(&mdev->req_lock); @@ -295,7 +280,34 @@ int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel) return 1; /* Simply ignore this! */ } -void drbd_csum(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest) +void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest) +{ + struct hash_desc desc; + struct scatterlist sg; + struct page *page = e->pages; + struct page *tmp; + unsigned len; + + desc.tfm = tfm; + desc.flags = 0; + + sg_init_table(&sg, 1); + crypto_hash_init(&desc); + + while ((tmp = page_chain_next(page))) { + /* all but the last page will be fully used */ + sg_set_page(&sg, page, PAGE_SIZE, 0); + crypto_hash_update(&desc, &sg, sg.length); + page = tmp; + } + /* and now the last, possibly only partially used page */ + len = e->size & (PAGE_SIZE - 1); + sg_set_page(&sg, page, len ?: PAGE_SIZE, 0); + crypto_hash_update(&desc, &sg, sg.length); + crypto_hash_final(&desc, digest); +} + +void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest) { struct hash_desc desc; struct scatterlist sg; @@ -329,11 +341,11 @@ static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel return 1; } - if (likely(drbd_bio_uptodate(e->private_bio))) { + if (likely((e->flags & EE_WAS_ERROR) == 0)) { digest_size = crypto_hash_digestsize(mdev->csums_tfm); digest = kmalloc(digest_size, GFP_NOIO); if (digest) { - drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest); + drbd_csum_ee(mdev, mdev->csums_tfm, e, digest); inc_rs_pending(mdev); ok = drbd_send_drequest_csum(mdev, @@ -369,23 +381,21 @@ static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size) /* GFP_TRY, because if there is no memory available right now, this may * be rescheduled for later. It is "only" background resync, after all. */ e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY); - if (!e) { - put_ldev(mdev); - return 2; - } + if (!e) + goto fail; spin_lock_irq(&mdev->req_lock); list_add(&e->w.list, &mdev->read_ee); spin_unlock_irq(&mdev->req_lock); - e->private_bio->bi_end_io = drbd_endio_read_sec; - e->private_bio->bi_rw = READ; e->w.cb = w_e_send_csum; + if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0) + return 1; - mdev->read_cnt += size >> 9; - drbd_generic_make_request(mdev, DRBD_FAULT_RS_RD, e->private_bio); - - return 1; + drbd_free_ee(mdev, e); +fail: + put_ldev(mdev); + return 2; } void resync_timer_fn(unsigned long data) @@ -414,13 +424,25 @@ void resync_timer_fn(unsigned long data) drbd_queue_work(&mdev->data.work, &mdev->resync_work); } +static int calc_resync_rate(struct drbd_conf *mdev) +{ + int d = mdev->data_delay / 1000; /* us -> ms */ + int td = mdev->sync_conf.throttle_th * 100; /* 0.1s -> ms */ + int hd = mdev->sync_conf.hold_off_th * 100; /* 0.1s -> ms */ + int cr = mdev->sync_conf.rate; + + return d <= td ? cr : + d >= hd ? 0 : + cr + (cr * (td - d) / (hd - td)); +} + int w_make_resync_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel) { unsigned long bit; sector_t sector; const sector_t capacity = drbd_get_capacity(mdev->this_bdev); - int max_segment_size = queue_max_segment_size(mdev->rq_queue); + int max_segment_size; int number, i, size, pe, mx; int align, queued, sndbuf; @@ -446,7 +468,13 @@ int w_make_resync_request(struct drbd_conf *mdev, return 1; } - number = SLEEP_TIME * mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ); + /* starting with drbd 8.3.8, we can handle multi-bio EEs, + * if it should be necessary */ + max_segment_size = mdev->agreed_pro_version < 94 ? + queue_max_segment_size(mdev->rq_queue) : DRBD_MAX_SEGMENT_SIZE; + + mdev->c_sync_rate = calc_resync_rate(mdev); + number = SLEEP_TIME * mdev->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ); pe = atomic_read(&mdev->rs_pending_cnt); mutex_lock(&mdev->data.mutex); @@ -509,12 +537,6 @@ next_sector: * * Additionally always align bigger requests, in order to * be prepared for all stripe sizes of software RAIDs. - * - * we _do_ care about the agreed-upon q->max_segment_size - * here, as splitting up the requests on the other side is more - * difficult. the consequence is, that on lvm and md and other - * "indirect" devices, this is dead code, since - * q->max_segment_size will be PAGE_SIZE. */ align = 1; for (;;) { @@ -806,7 +828,7 @@ out: /* helper */ static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e) { - if (drbd_bio_has_active_page(e->private_bio)) { + if (drbd_ee_has_active_page(e)) { /* This might happen if sendpage() has not finished */ spin_lock_irq(&mdev->req_lock); list_add_tail(&e->w.list, &mdev->net_ee); @@ -832,7 +854,7 @@ int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) return 1; } - if (likely(drbd_bio_uptodate(e->private_bio))) { + if (likely((e->flags & EE_WAS_ERROR) == 0)) { ok = drbd_send_block(mdev, P_DATA_REPLY, e); } else { if (__ratelimit(&drbd_ratelimit_state)) @@ -873,7 +895,7 @@ int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) put_ldev(mdev); } - if (likely(drbd_bio_uptodate(e->private_bio))) { + if (likely((e->flags & EE_WAS_ERROR) == 0)) { if (likely(mdev->state.pdsk >= D_INCONSISTENT)) { inc_rs_pending(mdev); ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e); @@ -921,7 +943,7 @@ int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) di = (struct digest_info *)(unsigned long)e->block_id; - if (likely(drbd_bio_uptodate(e->private_bio))) { + if (likely((e->flags & EE_WAS_ERROR) == 0)) { /* quick hack to try to avoid a race against reconfiguration. * a real fix would be much more involved, * introducing more locking mechanisms */ @@ -931,7 +953,7 @@ int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) digest = kmalloc(digest_size, GFP_NOIO); } if (digest) { - drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest); + drbd_csum_ee(mdev, mdev->csums_tfm, e, digest); eq = !memcmp(digest, di->digest, digest_size); kfree(digest); } @@ -973,14 +995,14 @@ int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) if (unlikely(cancel)) goto out; - if (unlikely(!drbd_bio_uptodate(e->private_bio))) + if (unlikely((e->flags & EE_WAS_ERROR) != 0)) goto out; digest_size = crypto_hash_digestsize(mdev->verify_tfm); /* FIXME if this allocation fails, online verify will not terminate! */ digest = kmalloc(digest_size, GFP_NOIO); if (digest) { - drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest); + drbd_csum_ee(mdev, mdev->verify_tfm, e, digest); inc_rs_pending(mdev); ok = drbd_send_drequest_csum(mdev, e->sector, e->size, digest, digest_size, P_OV_REPLY); @@ -1029,11 +1051,11 @@ int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel) di = (struct digest_info *)(unsigned long)e->block_id; - if (likely(drbd_bio_uptodate(e->private_bio))) { + if (likely((e->flags & EE_WAS_ERROR) == 0)) { digest_size = crypto_hash_digestsize(mdev->verify_tfm); digest = kmalloc(digest_size, GFP_NOIO); if (digest) { - drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest); + drbd_csum_ee(mdev, mdev->verify_tfm, e, digest); D_ASSERT(digest_size == di->digest_size); eq = !memcmp(digest, di->digest, digest_size); diff --git a/drivers/block/drbd/drbd_wrappers.h b/drivers/block/drbd/drbd_wrappers.h index f93fa111ce50..defdb5013ea3 100644 --- a/drivers/block/drbd/drbd_wrappers.h +++ b/drivers/block/drbd/drbd_wrappers.h @@ -18,23 +18,9 @@ static inline void drbd_set_my_capacity(struct drbd_conf *mdev, #define drbd_bio_uptodate(bio) bio_flagged(bio, BIO_UPTODATE) -static inline int drbd_bio_has_active_page(struct bio *bio) -{ - struct bio_vec *bvec; - int i; - - __bio_for_each_segment(bvec, bio, i, 0) { - if (page_count(bvec->bv_page) > 1) - return 1; - } - - return 0; -} - /* bi_end_io handlers */ extern void drbd_md_io_complete(struct bio *bio, int error); -extern void drbd_endio_read_sec(struct bio *bio, int error); -extern void drbd_endio_write_sec(struct bio *bio, int error); +extern void drbd_endio_sec(struct bio *bio, int error); extern void drbd_endio_pri(struct bio *bio, int error); /* |