author | Linus Torvalds <torvalds@linux-foundation.org> | 2020-10-15 00:05:38 +0200
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2020-10-15 00:05:38 +0200
commit | 4815519ed0af833884ce9c288183bf1ae3cb9caa (patch)
tree | 04ab0cdbe903165d7286811da64156dc2a523f73 /drivers/md/dm.c
parent | Merge tag 'for-linus-5.10-1' of git://github.com/cminyard/linux-ipmi (diff)
parent | dm: fix request-based DM to not bounce through indirect dm_submit_bio (diff)
Merge tag 'for-5.10/dm-changes' of git://git.kernel.org/pub/scm/linux/kernel/git/device-mapper/linux-dm
Pull device mapper updates from Mike Snitzer:
- Improve DM core's bio splitting to use blk_max_size_offset(). Also
fix bio splitting for bios that were deferred to the worker thread
due to a DM device being suspended. (A standalone sketch of the new
splitting arithmetic follows this list.)
- Remove DM core's special handling of NVMe devices now that block core
has internalized efficiencies drivers previously needed to be
concerned about (via now removed direct_make_request).
- Fix request-based DM to not bounce through indirect dm_submit_bio;
instead have block core make a direct call to blk_mq_submit_bio().
(See the fops excerpt after this list.)
- Various DM core cleanups to simplify and improve code.
- Update DM crypt to not use drivers that set
CRYPTO_ALG_ALLOCATES_MEMORY. (A short illustration of the crypto API
mask involved follows this list.)
- Fix DM raid's raid1 and raid10 discard limits for the purposes of
linux-stable. But then remove DM raid's discard limits settings now
that MD raid can efficiently handle large discards.
- A couple small cleanups across various targets.
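To make the splitting change concrete: a target's ti->max_io_len is now reflected into the queue's chunk_sectors limit, and blk_max_size_offset() reports how many sectors fit before the next chunk boundary (also capped by max_sectors). The standalone C sketch below only models that arithmetic; the names are invented and it assumes a power-of-two chunk size, as the 5.10-era helper did, so treat it as an illustration rather than the kernel's actual helpers (see max_io_len() in the diff further down for the real code):

	#include <stdint.h>
	#include <stdio.h>

	typedef uint64_t sector_t;

	/* Stand-ins for q->limits.max_sectors / q->limits.chunk_sectors. */
	struct queue_limits {
		unsigned int max_sectors;   /* hard cap on any single I/O, in sectors */
		unsigned int chunk_sectors; /* boundary an I/O must not straddle; 0 = none */
	};

	/*
	 * Model of blk_max_size_offset(): sectors left until the next chunk
	 * boundary, further capped by max_sectors.  Assumes chunk_sectors is
	 * a power of two.
	 */
	static unsigned int max_size_at_offset(const struct queue_limits *lim,
					       sector_t offset)
	{
		unsigned int left;

		if (!lim->chunk_sectors)
			return lim->max_sectors;
		left = lim->chunk_sectors - (offset & (lim->chunk_sectors - 1));
		return left < lim->max_sectors ? left : lim->max_sectors;
	}

	/* Model of the new max_io_len(): clamp to the target end, then to the chunk. */
	static sector_t model_max_io_len(sector_t target_offset, sector_t target_len,
					 const struct queue_limits *lim)
	{
		sector_t len = target_len - target_offset;
		unsigned int max_len = max_size_at_offset(lim, target_offset);

		return len < max_len ? len : max_len;
	}

	int main(void)
	{
		struct queue_limits lim = { .max_sectors = 256, .chunk_sectors = 128 };

		/* 8 sectors into a 128-sector chunk: only 120 sectors fit before the boundary. */
		printf("%llu\n", (unsigned long long)model_max_io_len(8, 1 << 20, &lim));
		return 0;
	}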
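For the request-based change, the mechanism is visible near the bottom of the diff: request-based mapped devices now switch their gendisk to a dedicated block_device_operations table that defines no ->submit_bio method, so block core's submit_bio_noacct() hands the bio straight to blk_mq_submit_bio() rather than bouncing through dm_submit_bio(). The excerpt below is taken from the diff; the first comment is editorial:

	/* From the diff below: fops installed once dm_setup_md_queue() sees DM_TYPE_REQUEST_BASED.
	 * There is deliberately no .submit_bio here, so submission falls through to blk-mq. */
	static const struct block_device_operations dm_rq_blk_dops = {
		.open		= dm_blk_open,
		.release	= dm_blk_close,
		.ioctl		= dm_blk_ioctl,
		.getgeo		= dm_blk_getgeo,
		.pr_ops		= &dm_pr_ops,
		.owner		= THIS_MODULE
	};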
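The DM crypt change lives in drivers/md/dm-crypt.c and is not part of the dm.c diff shown here. The underlying mechanism is the crypto API's mask argument: passing CRYPTO_ALG_ALLOCATES_MEMORY as the mask rejects implementations that carry that flag, i.e. ones that may allocate memory per request, which dm-crypt avoids because it can sit on the memory-reclaim/writeback path. A minimal, hypothetical sketch of such an allocation; the helper and variable names are illustrative, not taken from dm-crypt:

	#include <crypto/skcipher.h>
	#include <linux/crypto.h>

	/* Illustrative helper, not from dm-crypt; "ciphermode" is e.g. "xts(aes)". */
	static struct crypto_skcipher *alloc_cipher_nomem(const char *ciphermode)
	{
		/* mask = CRYPTO_ALG_ALLOCATES_MEMORY excludes drivers carrying that flag */
		return crypto_alloc_skcipher(ciphermode, 0, CRYPTO_ALG_ALLOCATES_MEMORY);
	}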
* tag 'for-5.10/dm-changes' of git://git.kernel.org/pub/scm/linux/kernel/git/device-mapper/linux-dm:
dm: fix request-based DM to not bounce through indirect dm_submit_bio
dm: remove special-casing of bio-based immutable singleton target on NVMe
dm: export dm_copy_name_and_uuid
dm: fix comment in __dm_suspend()
dm: fold dm_process_bio() into dm_submit_bio()
dm: fix missing imposition of queue_limits from dm_wq_work() thread
dm snap persistent: simplify area_io()
dm thin metadata: Remove unused local variable when create thin and snap
dm raid: remove unnecessary discard limits for raid10
dm raid: fix discard limits for raid1 and raid10
dm crypt: don't use drivers that have CRYPTO_ALG_ALLOCATES_MEMORY
dm: use dm_table_get_device_name() where appropriate in targets
dm table: make 'struct dm_table' definition accessible to all of DM core
dm: eliminate need for start_io_acct() forward declaration
dm: simplify __process_abnormal_io()
dm: push use of on-stack flush_bio down to __send_empty_flush()
dm: optimize max_io_len() by inlining max_io_len_target_boundary()
dm: push md->immutable_target optimization down to __process_bio()
dm: change max_io_len() to use blk_max_size_offset()
dm table: stack 'chunk_sectors' limit to account for target-specific splitting
Diffstat (limited to 'drivers/md/dm.c')
-rw-r--r-- | drivers/md/dm.c | 404
1 file changed, 132 insertions, 272 deletions
diff --git a/drivers/md/dm.c b/drivers/md/dm.c
index cd2b3526c07b..c18fc2548518 100644
--- a/drivers/md/dm.c
+++ b/drivers/md/dm.c
@@ -422,21 +422,6 @@ static void do_deferred_remove(struct work_struct *w)
 	dm_deferred_remove();
 }

-sector_t dm_get_size(struct mapped_device *md)
-{
-	return get_capacity(md->disk);
-}
-
-struct request_queue *dm_get_md_queue(struct mapped_device *md)
-{
-	return md->queue;
-}
-
-struct dm_stats *dm_get_stats(struct mapped_device *md)
-{
-	return &md->stats;
-}
-
 static int dm_blk_getgeo(struct block_device *bdev, struct hd_geometry *geo)
 {
 	struct mapped_device *md = bdev->bd_disk->private_data;
@@ -591,7 +576,44 @@ out:
 	return r;
 }

-static void start_io_acct(struct dm_io *io);
+u64 dm_start_time_ns_from_clone(struct bio *bio)
+{
+	struct dm_target_io *tio = container_of(bio, struct dm_target_io, clone);
+	struct dm_io *io = tio->io;
+
+	return jiffies_to_nsecs(io->start_time);
+}
+EXPORT_SYMBOL_GPL(dm_start_time_ns_from_clone);
+
+static void start_io_acct(struct dm_io *io)
+{
+	struct mapped_device *md = io->md;
+	struct bio *bio = io->orig_bio;
+
+	io->start_time = bio_start_io_acct(bio);
+	if (unlikely(dm_stats_used(&md->stats)))
+		dm_stats_account_io(&md->stats, bio_data_dir(bio),
+				    bio->bi_iter.bi_sector, bio_sectors(bio),
+				    false, 0, &io->stats_aux);
+}
+
+static void end_io_acct(struct dm_io *io)
+{
+	struct mapped_device *md = io->md;
+	struct bio *bio = io->orig_bio;
+	unsigned long duration = jiffies - io->start_time;
+
+	bio_end_io_acct(bio, io->start_time);
+
+	if (unlikely(dm_stats_used(&md->stats)))
+		dm_stats_account_io(&md->stats, bio_data_dir(bio),
+				    bio->bi_iter.bi_sector, bio_sectors(bio),
+				    true, duration, &io->stats_aux);
+
+	/* nudge anyone waiting on suspend queue */
+	if (unlikely(wq_has_sleeper(&md->wait)))
+		wake_up(&md->wait);
+}

 static struct dm_io *alloc_io(struct mapped_device *md, struct bio *bio)
 {
@@ -657,45 +679,6 @@ static void free_tio(struct dm_target_io *tio)
 	bio_put(&tio->clone);
 }

-u64 dm_start_time_ns_from_clone(struct bio *bio)
-{
-	struct dm_target_io *tio = container_of(bio, struct dm_target_io, clone);
-	struct dm_io *io = tio->io;
-
-	return jiffies_to_nsecs(io->start_time);
-}
-EXPORT_SYMBOL_GPL(dm_start_time_ns_from_clone);
-
-static void start_io_acct(struct dm_io *io)
-{
-	struct mapped_device *md = io->md;
-	struct bio *bio = io->orig_bio;
-
-	io->start_time = bio_start_io_acct(bio);
-	if (unlikely(dm_stats_used(&md->stats)))
-		dm_stats_account_io(&md->stats, bio_data_dir(bio),
-				    bio->bi_iter.bi_sector, bio_sectors(bio),
-				    false, 0, &io->stats_aux);
-}
-
-static void end_io_acct(struct dm_io *io)
-{
-	struct mapped_device *md = io->md;
-	struct bio *bio = io->orig_bio;
-	unsigned long duration = jiffies - io->start_time;
-
-	bio_end_io_acct(bio, io->start_time);
-
-	if (unlikely(dm_stats_used(&md->stats)))
-		dm_stats_account_io(&md->stats, bio_data_dir(bio),
-				    bio->bi_iter.bi_sector, bio_sectors(bio),
-				    true, duration, &io->stats_aux);
-
-	/* nudge anyone waiting on suspend queue */
-	if (unlikely(wq_has_sleeper(&md->wait)))
-		wake_up(&md->wait);
-}
-
 /*
  * Add the bio to the list of deferred io.
  */
@@ -992,7 +975,7 @@ static void clone_endio(struct bio *bio)
 	dm_endio_fn endio = tio->ti->type->end_io;
 	struct bio *orig_bio = io->orig_bio;

-	if (unlikely(error == BLK_STS_TARGET) && md->type != DM_TYPE_NVME_BIO_BASED) {
+	if (unlikely(error == BLK_STS_TARGET)) {
 		if (bio_op(bio) == REQ_OP_DISCARD &&
 		    !bio->bi_disk->queue->limits.max_discard_sectors)
 			disable_discard(md);
@@ -1041,32 +1024,28 @@ static void clone_endio(struct bio *bio)
  * Return maximum size of I/O possible at the supplied sector up to the current
  * target boundary.
  */
-static sector_t max_io_len_target_boundary(sector_t sector, struct dm_target *ti)
+static inline sector_t max_io_len_target_boundary(struct dm_target *ti,
+						  sector_t target_offset)
 {
-	sector_t target_offset = dm_target_offset(ti, sector);
-
 	return ti->len - target_offset;
 }

-static sector_t max_io_len(sector_t sector, struct dm_target *ti)
+static sector_t max_io_len(struct dm_target *ti, sector_t sector)
 {
-	sector_t len = max_io_len_target_boundary(sector, ti);
-	sector_t offset, max_len;
+	sector_t target_offset = dm_target_offset(ti, sector);
+	sector_t len = max_io_len_target_boundary(ti, target_offset);
+	sector_t max_len;

 	/*
 	 * Does the target need to split even further?
+	 * - q->limits.chunk_sectors reflects ti->max_io_len so
+	 *   blk_max_size_offset() provides required splitting.
+	 * - blk_max_size_offset() also respects q->limits.max_sectors
 	 */
-	if (ti->max_io_len) {
-		offset = dm_target_offset(ti, sector);
-		if (unlikely(ti->max_io_len & (ti->max_io_len - 1)))
-			max_len = sector_div(offset, ti->max_io_len);
-		else
-			max_len = offset & (ti->max_io_len - 1);
-		max_len = ti->max_io_len - max_len;
-
-		if (len > max_len)
-			len = max_len;
-	}
+	max_len = blk_max_size_offset(ti->table->md->queue,
+				      target_offset);
+	if (len > max_len)
+		len = max_len;

 	return len;
 }
@@ -1119,7 +1098,7 @@ static long dm_dax_direct_access(struct dax_device *dax_dev, pgoff_t pgoff,
 		goto out;
 	if (!ti->type->direct_access)
 		goto out;
-	len = max_io_len(sector, ti) / PAGE_SECTORS;
+	len = max_io_len(ti, sector) / PAGE_SECTORS;
 	if (len < 1)
 		goto out;
 	nr_pages = min(len, nr_pages);
@@ -1431,6 +1410,17 @@ static int __send_empty_flush(struct clone_info *ci)
 {
 	unsigned target_nr = 0;
 	struct dm_target *ti;
+	struct bio flush_bio;
+
+	/*
+	 * Use an on-stack bio for this, it's safe since we don't
+	 * need to reference it after submit. It's just used as
+	 * the basis for the clone(s).
+	 */
+	bio_init(&flush_bio, NULL, 0);
+	flush_bio.bi_opf = REQ_OP_WRITE | REQ_PREFLUSH | REQ_SYNC;
+	ci->bio = &flush_bio;
+	ci->sector_count = 0;

 	/*
 	 * Empty flush uses a statically initialized bio, as the base for
@@ -1444,6 +1434,8 @@ static int __send_empty_flush(struct clone_info *ci)
 	BUG_ON(bio_has_data(ci->bio));
 	while ((ti = dm_table_get_target(ci->map, target_nr++)))
 		__send_duplicate_bios(ci, ti, ti->num_flush_bios, NULL);
+
+	bio_uninit(ci->bio);
 	return 0;
 }

@@ -1466,28 +1458,6 @@ static int __clone_and_map_data_bio(struct clone_info *ci, struct dm_target *ti,
 	return 0;
 }

-typedef unsigned (*get_num_bios_fn)(struct dm_target *ti);
-
-static unsigned get_num_discard_bios(struct dm_target *ti)
-{
-	return ti->num_discard_bios;
-}
-
-static unsigned get_num_secure_erase_bios(struct dm_target *ti)
-{
-	return ti->num_secure_erase_bios;
-}
-
-static unsigned get_num_write_same_bios(struct dm_target *ti)
-{
-	return ti->num_write_same_bios;
-}
-
-static unsigned get_num_write_zeroes_bios(struct dm_target *ti)
-{
-	return ti->num_write_zeroes_bios;
-}
-
 static int __send_changing_extent_only(struct clone_info *ci, struct dm_target *ti,
 				       unsigned num_bios)
 {
@@ -1502,7 +1472,8 @@ static int __send_changing_extent_only(struct clone_info *ci, struct dm_target *
 	if (!num_bios)
 		return -EOPNOTSUPP;

-	len = min((sector_t)ci->sector_count, max_io_len_target_boundary(ci->sector, ti));
+	len = min_t(sector_t, ci->sector_count,
+		    max_io_len_target_boundary(ti, dm_target_offset(ti, ci->sector)));

 	__send_duplicate_bios(ci, ti, num_bios, &len);

@@ -1512,26 +1483,6 @@ static int __send_changing_extent_only(struct clone_info *ci, struct dm_target *
 	return 0;
 }

-static int __send_discard(struct clone_info *ci, struct dm_target *ti)
-{
-	return __send_changing_extent_only(ci, ti, get_num_discard_bios(ti));
-}
-
-static int __send_secure_erase(struct clone_info *ci, struct dm_target *ti)
-{
-	return __send_changing_extent_only(ci, ti, get_num_secure_erase_bios(ti));
-}
-
-static int __send_write_same(struct clone_info *ci, struct dm_target *ti)
-{
-	return __send_changing_extent_only(ci, ti, get_num_write_same_bios(ti));
-}
-
-static int __send_write_zeroes(struct clone_info *ci, struct dm_target *ti)
-{
-	return __send_changing_extent_only(ci, ti, get_num_write_zeroes_bios(ti));
-}
-
 static bool is_abnormal_io(struct bio *bio)
 {
 	bool r = false;
@@ -1552,18 +1503,26 @@ static bool __process_abnormal_io(struct clone_info *ci, struct dm_target *ti,
 				  int *result)
 {
 	struct bio *bio = ci->bio;
+	unsigned num_bios = 0;

-	if (bio_op(bio) == REQ_OP_DISCARD)
-		*result = __send_discard(ci, ti);
-	else if (bio_op(bio) == REQ_OP_SECURE_ERASE)
-		*result = __send_secure_erase(ci, ti);
-	else if (bio_op(bio) == REQ_OP_WRITE_SAME)
-		*result = __send_write_same(ci, ti);
-	else if (bio_op(bio) == REQ_OP_WRITE_ZEROES)
-		*result = __send_write_zeroes(ci, ti);
-	else
+	switch (bio_op(bio)) {
+	case REQ_OP_DISCARD:
+		num_bios = ti->num_discard_bios;
+		break;
+	case REQ_OP_SECURE_ERASE:
+		num_bios = ti->num_secure_erase_bios;
+		break;
+	case REQ_OP_WRITE_SAME:
+		num_bios = ti->num_write_same_bios;
+		break;
+	case REQ_OP_WRITE_ZEROES:
+		num_bios = ti->num_write_zeroes_bios;
+		break;
+	default:
 		return false;
+	}

+	*result = __send_changing_extent_only(ci, ti, num_bios);
 	return true;
 }

@@ -1583,7 +1542,7 @@ static int __split_and_process_non_flush(struct clone_info *ci)
 	if (__process_abnormal_io(ci, ti, &r))
 		return r;

-	len = min_t(sector_t, max_io_len(ci->sector, ti), ci->sector_count);
+	len = min_t(sector_t, max_io_len(ti, ci->sector), ci->sector_count);

 	r = __clone_and_map_data_bio(ci, ti, ci->sector, &len);
 	if (r < 0)
@@ -1619,19 +1578,7 @@ static blk_qc_t __split_and_process_bio(struct mapped_device *md,
 	init_clone_info(&ci, md, map, bio);

 	if (bio->bi_opf & REQ_PREFLUSH) {
-		struct bio flush_bio;
-
-		/*
-		 * Use an on-stack bio for this, it's safe since we don't
-		 * need to reference it after submit. It's just used as
-		 * the basis for the clone(s).
-		 */
-		bio_init(&flush_bio, NULL, 0);
-		flush_bio.bi_opf = REQ_OP_WRITE | REQ_PREFLUSH | REQ_SYNC;
-		ci.bio = &flush_bio;
-		ci.sector_count = 0;
 		error = __send_empty_flush(&ci);
-		bio_uninit(ci.bio);
 		/* dec_pending submits any data associated with flush */
 	} else if (op_is_zone_mgmt(bio_op(bio))) {
 		ci.bio = bio;
@@ -1680,88 +1627,6 @@ static blk_qc_t __split_and_process_bio(struct mapped_device *md,
 	return ret;
 }

-/*
- * Optimized variant of __split_and_process_bio that leverages the
- * fact that targets that use it do _not_ have a need to split bios.
- */
-static blk_qc_t __process_bio(struct mapped_device *md, struct dm_table *map,
-			      struct bio *bio, struct dm_target *ti)
-{
-	struct clone_info ci;
-	blk_qc_t ret = BLK_QC_T_NONE;
-	int error = 0;
-
-	init_clone_info(&ci, md, map, bio);
-
-	if (bio->bi_opf & REQ_PREFLUSH) {
-		struct bio flush_bio;
-
-		/*
-		 * Use an on-stack bio for this, it's safe since we don't
-		 * need to reference it after submit. It's just used as
-		 * the basis for the clone(s).
-		 */
-		bio_init(&flush_bio, NULL, 0);
-		flush_bio.bi_opf = REQ_OP_WRITE | REQ_PREFLUSH | REQ_SYNC;
-		ci.bio = &flush_bio;
-		ci.sector_count = 0;
-		error = __send_empty_flush(&ci);
-		bio_uninit(ci.bio);
-		/* dec_pending submits any data associated with flush */
-	} else {
-		struct dm_target_io *tio;
-
-		ci.bio = bio;
-		ci.sector_count = bio_sectors(bio);
-		if (__process_abnormal_io(&ci, ti, &error))
-			goto out;
-
-		tio = alloc_tio(&ci, ti, 0, GFP_NOIO);
-		ret = __clone_and_map_simple_bio(&ci, tio, NULL);
-	}
-out:
-	/* drop the extra reference count */
-	dec_pending(ci.io, errno_to_blk_status(error));
-	return ret;
-}
-
-static blk_qc_t dm_process_bio(struct mapped_device *md,
-			       struct dm_table *map, struct bio *bio)
-{
-	blk_qc_t ret = BLK_QC_T_NONE;
-	struct dm_target *ti = md->immutable_target;
-
-	if (unlikely(!map)) {
-		bio_io_error(bio);
-		return ret;
-	}
-
-	if (!ti) {
-		ti = dm_table_find_target(map, bio->bi_iter.bi_sector);
-		if (unlikely(!ti)) {
-			bio_io_error(bio);
-			return ret;
-		}
-	}
-
-	/*
-	 * If in ->submit_bio we need to use blk_queue_split(), otherwise
-	 * queue_limits for abnormal requests (e.g. discard, writesame, etc)
-	 * won't be imposed.
-	 * If called from dm_wq_work() for deferred bio processing, bio
-	 * was already handled by following code with previous ->submit_bio.
-	 */
-	if (current->bio_list) {
-		if (is_abnormal_io(bio))
-			blk_queue_split(&bio);
-		/* regular IO is split by __split_and_process_bio */
-	}
-
-	if (dm_get_md_type(md) == DM_TYPE_NVME_BIO_BASED)
-		return __process_bio(md, map, bio, ti);
-	return __split_and_process_bio(md, map, bio);
-}
-
 static blk_qc_t dm_submit_bio(struct bio *bio)
 {
 	struct mapped_device *md = bio->bi_disk->private_data;
@@ -1769,35 +1634,34 @@ static blk_qc_t dm_submit_bio(struct bio *bio)
 	int srcu_idx;
 	struct dm_table *map;

-	if (dm_get_md_type(md) == DM_TYPE_REQUEST_BASED) {
-		/*
-		 * We are called with a live reference on q_usage_counter, but
-		 * that one will be released as soon as we return.  Grab an
-		 * extra one as blk_mq_submit_bio expects to be able to consume
-		 * a reference (which lives until the request is freed in case a
-		 * request is allocated).
-		 */
-		percpu_ref_get(&bio->bi_disk->queue->q_usage_counter);
-		return blk_mq_submit_bio(bio);
-	}
-
 	map = dm_get_live_table(md, &srcu_idx);
+	if (unlikely(!map)) {
+		DMERR_LIMIT("%s: mapping table unavailable, erroring io",
+			    dm_device_name(md));
+		bio_io_error(bio);
+		goto out;
+	}

-	/* if we're suspended, we have to queue this io for later */
+	/* If suspended, queue this IO for later */
 	if (unlikely(test_bit(DMF_BLOCK_IO_FOR_SUSPEND, &md->flags))) {
-		dm_put_live_table(md, srcu_idx);
-
 		if (bio->bi_opf & REQ_NOWAIT)
 			bio_wouldblock_error(bio);
-		else if (!(bio->bi_opf & REQ_RAHEAD))
-			queue_io(md, bio);
-		else
+		else if (bio->bi_opf & REQ_RAHEAD)
 			bio_io_error(bio);
-		return ret;
+		else
+			queue_io(md, bio);
+		goto out;
 	}

-	ret = dm_process_bio(md, map, bio);
+	/*
+	 * Use blk_queue_split() for abnormal IO (e.g. discard, writesame, etc)
+	 * otherwise associated queue_limits won't be imposed.
+	 */
+	if (is_abnormal_io(bio))
+		blk_queue_split(&bio);

+	ret = __split_and_process_bio(md, map, bio);
+out:
 	dm_put_live_table(md, srcu_idx);
 	return ret;
 }
@@ -1852,6 +1716,7 @@ static int next_free_minor(int *minor)
 }

 static const struct block_device_operations dm_blk_dops;
+static const struct block_device_operations dm_rq_blk_dops;
 static const struct dax_operations dm_dax_ops;

 static void dm_wq_work(struct work_struct *work);
@@ -2121,12 +1986,10 @@ static struct dm_table *__bind(struct mapped_device *md, struct dm_table *t,
 	if (request_based)
 		dm_stop_queue(q);

-	if (request_based || md->type == DM_TYPE_NVME_BIO_BASED) {
+	if (request_based) {
 		/*
-		 * Leverage the fact that request-based DM targets and
-		 * NVMe bio based targets are immutable singletons
-		 * - used to optimize both dm_request_fn and dm_mq_queue_rq;
-		 * and __process_bio.
+		 * Leverage the fact that request-based DM targets are
+		 * immutable singletons - used to optimize dm_mq_queue_rq.
 		 */
 		md->immutable_target = dm_table_get_immutable_target(t);
 	}
@@ -2240,15 +2103,15 @@ int dm_setup_md_queue(struct mapped_device *md, struct dm_table *t)

 	switch (type) {
 	case DM_TYPE_REQUEST_BASED:
+		md->disk->fops = &dm_rq_blk_dops;
 		r = dm_mq_init_request_queue(md, t);
 		if (r) {
-			DMERR("Cannot initialize queue for request-based dm-mq mapped device");
+			DMERR("Cannot initialize queue for request-based dm mapped device");
 			return r;
 		}
 		break;
 	case DM_TYPE_BIO_BASED:
 	case DM_TYPE_DAX_BIO_BASED:
-	case DM_TYPE_NVME_BIO_BASED:
 		break;
 	case DM_TYPE_NONE:
 		WARN_ON_ONCE(true);
@@ -2453,29 +2316,19 @@ static int dm_wait_for_completion(struct mapped_device *md, long task_state)
  */
 static void dm_wq_work(struct work_struct *work)
 {
-	struct mapped_device *md = container_of(work, struct mapped_device,
-						work);
-	struct bio *c;
-	int srcu_idx;
-	struct dm_table *map;
-
-	map = dm_get_live_table(md, &srcu_idx);
+	struct mapped_device *md = container_of(work, struct mapped_device, work);
+	struct bio *bio;

 	while (!test_bit(DMF_BLOCK_IO_FOR_SUSPEND, &md->flags)) {
 		spin_lock_irq(&md->deferred_lock);
-		c = bio_list_pop(&md->deferred);
+		bio = bio_list_pop(&md->deferred);
 		spin_unlock_irq(&md->deferred_lock);

-		if (!c)
+		if (!bio)
 			break;

-		if (dm_request_based(md))
-			(void) submit_bio_noacct(c);
-		else
-			(void) dm_process_bio(md, map, c);
+		submit_bio_noacct(bio);
 	}
-
-	dm_put_live_table(md, srcu_idx);
 }

 static void dm_queue_flush(struct mapped_device *md)
@@ -2612,13 +2465,12 @@ static int __dm_suspend(struct mapped_device *md, struct dm_table *map,
 	/*
 	 * Here we must make sure that no processes are submitting requests
 	 * to target drivers i.e. no one may be executing
-	 * __split_and_process_bio. This is called from dm_request and
-	 * dm_wq_work.
+	 * __split_and_process_bio from dm_submit_bio.
 	 *
-	 * To get all processes out of __split_and_process_bio in dm_request,
+	 * To get all processes out of __split_and_process_bio in dm_submit_bio,
 	 * we take the write lock. To prevent any process from reentering
-	 * __split_and_process_bio from dm_request and quiesce the thread
-	 * (dm_wq_work), we set BMF_BLOCK_IO_FOR_SUSPEND and call
+	 * __split_and_process_bio from dm_submit_bio and quiesce the thread
+	 * (dm_wq_work), we set DMF_BLOCK_IO_FOR_SUSPEND and call
 	 * flush_workqueue(md->wq).
 	 */
 	set_bit(DMF_BLOCK_IO_FOR_SUSPEND, &md->flags);
@@ -2986,19 +2838,19 @@ int dm_test_deferred_remove_flag(struct mapped_device *md)

 int dm_suspended(struct dm_target *ti)
 {
-	return dm_suspended_md(dm_table_get_md(ti->table));
+	return dm_suspended_md(ti->table->md);
 }
 EXPORT_SYMBOL_GPL(dm_suspended);

 int dm_post_suspending(struct dm_target *ti)
 {
-	return dm_post_suspending_md(dm_table_get_md(ti->table));
+	return dm_post_suspending_md(ti->table->md);
 }
 EXPORT_SYMBOL_GPL(dm_post_suspending);

 int dm_noflush_suspending(struct dm_target *ti)
 {
-	return __noflush_suspending(dm_table_get_md(ti->table));
+	return __noflush_suspending(ti->table->md);
 }
 EXPORT_SYMBOL_GPL(dm_noflush_suspending);

@@ -3017,7 +2869,6 @@ struct dm_md_mempools *dm_alloc_md_mempools(struct mapped_device *md, enum dm_qu
 	switch (type) {
 	case DM_TYPE_BIO_BASED:
 	case DM_TYPE_DAX_BIO_BASED:
-	case DM_TYPE_NVME_BIO_BASED:
 		pool_size = max(dm_get_reserved_bio_based_ios(), min_pool_size);
 		front_pad = roundup(per_io_data_size, __alignof__(struct dm_target_io)) + offsetof(struct dm_target_io, clone);
 		io_front_pad = roundup(front_pad, __alignof__(struct dm_io)) + offsetof(struct dm_io, tio);
@@ -3235,6 +3086,15 @@ static const struct block_device_operations dm_blk_dops = {
 	.owner = THIS_MODULE
 };

+static const struct block_device_operations dm_rq_blk_dops = {
+	.open = dm_blk_open,
+	.release = dm_blk_close,
+	.ioctl = dm_blk_ioctl,
+	.getgeo = dm_blk_getgeo,
+	.pr_ops = &dm_pr_ops,
+	.owner = THIS_MODULE
+};
+
 static const struct dax_operations dm_dax_ops = {
 	.direct_access = dm_dax_direct_access,
 	.dax_supported = dm_dax_supported,