diff options
Diffstat (limited to 'fs/xfs/xfs_buf.c')
-rw-r--r-- | fs/xfs/xfs_buf.c | 236 |
1 files changed, 166 insertions, 70 deletions
diff --git a/fs/xfs/xfs_buf.c b/fs/xfs/xfs_buf.c index e71cfbd5acb3..32fc5401a756 100644 --- a/fs/xfs/xfs_buf.c +++ b/fs/xfs/xfs_buf.c @@ -80,6 +80,47 @@ xfs_buf_vmap_len( } /* + * Bump the I/O in flight count on the buftarg if we haven't yet done so for + * this buffer. The count is incremented once per buffer (per hold cycle) + * because the corresponding decrement is deferred to buffer release. Buffers + * can undergo I/O multiple times in a hold-release cycle and per buffer I/O + * tracking adds unnecessary overhead. This is used for sychronization purposes + * with unmount (see xfs_wait_buftarg()), so all we really need is a count of + * in-flight buffers. + * + * Buffers that are never released (e.g., superblock, iclog buffers) must set + * the XBF_NO_IOACCT flag before I/O submission. Otherwise, the buftarg count + * never reaches zero and unmount hangs indefinitely. + */ +static inline void +xfs_buf_ioacct_inc( + struct xfs_buf *bp) +{ + if (bp->b_flags & (XBF_NO_IOACCT|_XBF_IN_FLIGHT)) + return; + + ASSERT(bp->b_flags & XBF_ASYNC); + bp->b_flags |= _XBF_IN_FLIGHT; + percpu_counter_inc(&bp->b_target->bt_io_count); +} + +/* + * Clear the in-flight state on a buffer about to be released to the LRU or + * freed and unaccount from the buftarg. + */ +static inline void +xfs_buf_ioacct_dec( + struct xfs_buf *bp) +{ + if (!(bp->b_flags & _XBF_IN_FLIGHT)) + return; + + ASSERT(bp->b_flags & XBF_ASYNC); + bp->b_flags &= ~_XBF_IN_FLIGHT; + percpu_counter_dec(&bp->b_target->bt_io_count); +} + +/* * When we mark a buffer stale, we remove the buffer from the LRU and clear the * b_lru_ref count so that the buffer is freed immediately when the buffer * reference count falls to zero. If the buffer is already on the LRU, we need @@ -102,6 +143,14 @@ xfs_buf_stale( */ bp->b_flags &= ~_XBF_DELWRI_Q; + /* + * Once the buffer is marked stale and unlocked, a subsequent lookup + * could reset b_flags. There is no guarantee that the buffer is + * unaccounted (released to LRU) before that occurs. Drop in-flight + * status now to preserve accounting consistency. + */ + xfs_buf_ioacct_dec(bp); + spin_lock(&bp->b_lock); atomic_set(&bp->b_lru_ref, 0); if (!(bp->b_state & XFS_BSTATE_DISPOSE) && @@ -815,7 +864,8 @@ xfs_buf_get_uncached( struct xfs_buf *bp; DEFINE_SINGLE_BUF_MAP(map, XFS_BUF_DADDR_NULL, numblks); - bp = _xfs_buf_alloc(target, &map, 1, 0); + /* flags might contain irrelevant bits, pass only what we care about */ + bp = _xfs_buf_alloc(target, &map, 1, flags & XBF_NO_IOACCT); if (unlikely(bp == NULL)) goto fail; @@ -866,63 +916,85 @@ xfs_buf_hold( } /* - * Releases a hold on the specified buffer. If the - * the hold count is 1, calls xfs_buf_free. + * Release a hold on the specified buffer. If the hold count is 1, the buffer is + * placed on LRU or freed (depending on b_lru_ref). */ void xfs_buf_rele( xfs_buf_t *bp) { struct xfs_perag *pag = bp->b_pag; + bool release; + bool freebuf = false; trace_xfs_buf_rele(bp, _RET_IP_); if (!pag) { ASSERT(list_empty(&bp->b_lru)); ASSERT(RB_EMPTY_NODE(&bp->b_rbnode)); - if (atomic_dec_and_test(&bp->b_hold)) + if (atomic_dec_and_test(&bp->b_hold)) { + xfs_buf_ioacct_dec(bp); xfs_buf_free(bp); + } return; } ASSERT(!RB_EMPTY_NODE(&bp->b_rbnode)); ASSERT(atomic_read(&bp->b_hold) > 0); - if (atomic_dec_and_lock(&bp->b_hold, &pag->pag_buf_lock)) { - spin_lock(&bp->b_lock); - if (!(bp->b_flags & XBF_STALE) && atomic_read(&bp->b_lru_ref)) { - /* - * If the buffer is added to the LRU take a new - * reference to the buffer for the LRU and clear the - * (now stale) dispose list state flag - */ - if (list_lru_add(&bp->b_target->bt_lru, &bp->b_lru)) { - bp->b_state &= ~XFS_BSTATE_DISPOSE; - atomic_inc(&bp->b_hold); - } - spin_unlock(&bp->b_lock); - spin_unlock(&pag->pag_buf_lock); - } else { - /* - * most of the time buffers will already be removed from - * the LRU, so optimise that case by checking for the - * XFS_BSTATE_DISPOSE flag indicating the last list the - * buffer was on was the disposal list - */ - if (!(bp->b_state & XFS_BSTATE_DISPOSE)) { - list_lru_del(&bp->b_target->bt_lru, &bp->b_lru); - } else { - ASSERT(list_empty(&bp->b_lru)); - } - spin_unlock(&bp->b_lock); - ASSERT(!(bp->b_flags & _XBF_DELWRI_Q)); - rb_erase(&bp->b_rbnode, &pag->pag_buf_tree); - spin_unlock(&pag->pag_buf_lock); - xfs_perag_put(pag); - xfs_buf_free(bp); + release = atomic_dec_and_lock(&bp->b_hold, &pag->pag_buf_lock); + spin_lock(&bp->b_lock); + if (!release) { + /* + * Drop the in-flight state if the buffer is already on the LRU + * and it holds the only reference. This is racy because we + * haven't acquired the pag lock, but the use of _XBF_IN_FLIGHT + * ensures the decrement occurs only once per-buf. + */ + if ((atomic_read(&bp->b_hold) == 1) && !list_empty(&bp->b_lru)) + xfs_buf_ioacct_dec(bp); + goto out_unlock; + } + + /* the last reference has been dropped ... */ + xfs_buf_ioacct_dec(bp); + if (!(bp->b_flags & XBF_STALE) && atomic_read(&bp->b_lru_ref)) { + /* + * If the buffer is added to the LRU take a new reference to the + * buffer for the LRU and clear the (now stale) dispose list + * state flag + */ + if (list_lru_add(&bp->b_target->bt_lru, &bp->b_lru)) { + bp->b_state &= ~XFS_BSTATE_DISPOSE; + atomic_inc(&bp->b_hold); + } + spin_unlock(&pag->pag_buf_lock); + } else { + /* + * most of the time buffers will already be removed from the + * LRU, so optimise that case by checking for the + * XFS_BSTATE_DISPOSE flag indicating the last list the buffer + * was on was the disposal list + */ + if (!(bp->b_state & XFS_BSTATE_DISPOSE)) { + list_lru_del(&bp->b_target->bt_lru, &bp->b_lru); + } else { + ASSERT(list_empty(&bp->b_lru)); } + + ASSERT(!(bp->b_flags & _XBF_DELWRI_Q)); + rb_erase(&bp->b_rbnode, &pag->pag_buf_tree); + spin_unlock(&pag->pag_buf_lock); + xfs_perag_put(pag); + freebuf = true; } + +out_unlock: + spin_unlock(&bp->b_lock); + + if (freebuf) + xfs_buf_free(bp); } @@ -944,10 +1016,12 @@ xfs_buf_trylock( int locked; locked = down_trylock(&bp->b_sema) == 0; - if (locked) + if (locked) { XB_SET_OWNER(bp); - - trace_xfs_buf_trylock(bp, _RET_IP_); + trace_xfs_buf_trylock(bp, _RET_IP_); + } else { + trace_xfs_buf_trylock_fail(bp, _RET_IP_); + } return locked; } @@ -1339,6 +1413,7 @@ xfs_buf_submit( * xfs_buf_ioend too early. */ atomic_set(&bp->b_io_remaining, 1); + xfs_buf_ioacct_inc(bp); _xfs_buf_ioapply(bp); /* @@ -1524,13 +1599,19 @@ xfs_wait_buftarg( int loop = 0; /* - * We need to flush the buffer workqueue to ensure that all IO - * completion processing is 100% done. Just waiting on buffer locks is - * not sufficient for async IO as the reference count held over IO is - * not released until after the buffer lock is dropped. Hence we need to - * ensure here that all reference counts have been dropped before we - * start walking the LRU list. + * First wait on the buftarg I/O count for all in-flight buffers to be + * released. This is critical as new buffers do not make the LRU until + * they are released. + * + * Next, flush the buffer workqueue to ensure all completion processing + * has finished. Just waiting on buffer locks is not sufficient for + * async IO as the reference count held over IO is not released until + * after the buffer lock is dropped. Hence we need to ensure here that + * all reference counts have been dropped before we start walking the + * LRU list. */ + while (percpu_counter_sum(&btp->bt_io_count)) + delay(100); drain_workqueue(btp->bt_mount->m_buf_workqueue); /* loop until there is nothing left on the lru list. */ @@ -1627,6 +1708,8 @@ xfs_free_buftarg( struct xfs_buftarg *btp) { unregister_shrinker(&btp->bt_shrinker); + ASSERT(percpu_counter_sum(&btp->bt_io_count) == 0); + percpu_counter_destroy(&btp->bt_io_count); list_lru_destroy(&btp->bt_lru); if (mp->m_flags & XFS_MOUNT_BARRIER) @@ -1691,6 +1774,9 @@ xfs_alloc_buftarg( if (list_lru_init(&btp->bt_lru)) goto error; + if (percpu_counter_init(&btp->bt_io_count, 0, GFP_KERNEL)) + goto error; + btp->bt_shrinker.count_objects = xfs_buftarg_shrink_count; btp->bt_shrinker.scan_objects = xfs_buftarg_shrink_scan; btp->bt_shrinker.seeks = DEFAULT_SEEKS; @@ -1774,18 +1860,33 @@ xfs_buf_cmp( return 0; } +/* + * submit buffers for write. + * + * When we have a large buffer list, we do not want to hold all the buffers + * locked while we block on the request queue waiting for IO dispatch. To avoid + * this problem, we lock and submit buffers in groups of 50, thereby minimising + * the lock hold times for lists which may contain thousands of objects. + * + * To do this, we sort the buffer list before we walk the list to lock and + * submit buffers, and we plug and unplug around each group of buffers we + * submit. + */ static int -__xfs_buf_delwri_submit( +xfs_buf_delwri_submit_buffers( struct list_head *buffer_list, - struct list_head *io_list, - bool wait) + struct list_head *wait_list) { - struct blk_plug plug; struct xfs_buf *bp, *n; + LIST_HEAD (submit_list); int pinned = 0; + struct blk_plug plug; + list_sort(NULL, buffer_list, xfs_buf_cmp); + + blk_start_plug(&plug); list_for_each_entry_safe(bp, n, buffer_list, b_list) { - if (!wait) { + if (!wait_list) { if (xfs_buf_ispinned(bp)) { pinned++; continue; @@ -1808,25 +1909,21 @@ __xfs_buf_delwri_submit( continue; } - list_move_tail(&bp->b_list, io_list); trace_xfs_buf_delwri_split(bp, _RET_IP_); - } - - list_sort(NULL, io_list, xfs_buf_cmp); - - blk_start_plug(&plug); - list_for_each_entry_safe(bp, n, io_list, b_list) { - bp->b_flags &= ~(_XBF_DELWRI_Q | XBF_ASYNC | XBF_WRITE_FAIL); - bp->b_flags |= XBF_WRITE | XBF_ASYNC; /* - * we do all Io submission async. This means if we need to wait - * for IO completion we need to take an extra reference so the - * buffer is still valid on the other side. + * We do all IO submission async. This means if we need + * to wait for IO completion we need to take an extra + * reference so the buffer is still valid on the other + * side. We need to move the buffer onto the io_list + * at this point so the caller can still access it. */ - if (wait) + bp->b_flags &= ~(_XBF_DELWRI_Q | XBF_WRITE_FAIL); + bp->b_flags |= XBF_WRITE | XBF_ASYNC; + if (wait_list) { xfs_buf_hold(bp); - else + list_move_tail(&bp->b_list, wait_list); + } else list_del_init(&bp->b_list); xfs_buf_submit(bp); @@ -1849,8 +1946,7 @@ int xfs_buf_delwri_submit_nowait( struct list_head *buffer_list) { - LIST_HEAD (io_list); - return __xfs_buf_delwri_submit(buffer_list, &io_list, false); + return xfs_buf_delwri_submit_buffers(buffer_list, NULL); } /* @@ -1865,15 +1961,15 @@ int xfs_buf_delwri_submit( struct list_head *buffer_list) { - LIST_HEAD (io_list); + LIST_HEAD (wait_list); int error = 0, error2; struct xfs_buf *bp; - __xfs_buf_delwri_submit(buffer_list, &io_list, true); + xfs_buf_delwri_submit_buffers(buffer_list, &wait_list); /* Wait for IO to complete. */ - while (!list_empty(&io_list)) { - bp = list_first_entry(&io_list, struct xfs_buf, b_list); + while (!list_empty(&wait_list)) { + bp = list_first_entry(&wait_list, struct xfs_buf, b_list); list_del_init(&bp->b_list); |