summaryrefslogtreecommitdiffstats
path: root/fs/bcachefs/journal.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/bcachefs/journal.c')
-rw-r--r--fs/bcachefs/journal.c1140
1 files changed, 1140 insertions, 0 deletions
diff --git a/fs/bcachefs/journal.c b/fs/bcachefs/journal.c
new file mode 100644
index 000000000000..697f601c2cdf
--- /dev/null
+++ b/fs/bcachefs/journal.c
@@ -0,0 +1,1140 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * bcachefs journalling code, for btree insertions
+ *
+ * Copyright 2012 Google, Inc.
+ */
+
+#include "bcachefs.h"
+#include "alloc.h"
+#include "bkey_methods.h"
+#include "btree_gc.h"
+#include "buckets.h"
+#include "journal.h"
+#include "journal_io.h"
+#include "journal_reclaim.h"
+#include "journal_seq_blacklist.h"
+#include "super-io.h"
+#include "trace.h"
+
+static bool journal_entry_is_open(struct journal *j)
+{
+ return j->reservations.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL;
+}
+
+void bch2_journal_buf_put_slowpath(struct journal *j, bool need_write_just_set)
+{
+ struct journal_buf *w = journal_prev_buf(j);
+
+ atomic_dec_bug(&journal_seq_pin(j, le64_to_cpu(w->data->seq))->count);
+
+ if (!need_write_just_set &&
+ test_bit(JOURNAL_NEED_WRITE, &j->flags))
+ bch2_time_stats_update(j->delay_time,
+ j->need_write_time);
+#if 0
+ closure_call(&j->io, bch2_journal_write, NULL, NULL);
+#else
+ /* Shut sparse up: */
+ closure_init(&j->io, NULL);
+ set_closure_fn(&j->io, bch2_journal_write, NULL);
+ bch2_journal_write(&j->io);
+#endif
+}
+
+static void journal_pin_new_entry(struct journal *j, int count)
+{
+ struct journal_entry_pin_list *p;
+
+ /*
+ * The fifo_push() needs to happen at the same time as j->seq is
+ * incremented for journal_last_seq() to be calculated correctly
+ */
+ atomic64_inc(&j->seq);
+ p = fifo_push_ref(&j->pin);
+
+ INIT_LIST_HEAD(&p->list);
+ INIT_LIST_HEAD(&p->flushed);
+ atomic_set(&p->count, count);
+ p->devs.nr = 0;
+}
+
+static void bch2_journal_buf_init(struct journal *j)
+{
+ struct journal_buf *buf = journal_cur_buf(j);
+
+ memset(buf->has_inode, 0, sizeof(buf->has_inode));
+
+ memset(buf->data, 0, sizeof(*buf->data));
+ buf->data->seq = cpu_to_le64(journal_cur_seq(j));
+ buf->data->u64s = 0;
+}
+
+static inline size_t journal_entry_u64s_reserve(struct journal_buf *buf)
+{
+ return BTREE_ID_NR * (JSET_KEYS_U64s + BKEY_EXTENT_U64s_MAX);
+}
+
+static inline bool journal_entry_empty(struct jset *j)
+{
+ struct jset_entry *i;
+
+ if (j->seq != j->last_seq)
+ return false;
+
+ vstruct_for_each(j, i)
+ if (i->type || i->u64s)
+ return false;
+ return true;
+}
+
+static enum {
+ JOURNAL_ENTRY_ERROR,
+ JOURNAL_ENTRY_INUSE,
+ JOURNAL_ENTRY_CLOSED,
+ JOURNAL_UNLOCKED,
+} journal_buf_switch(struct journal *j, bool need_write_just_set)
+{
+ struct bch_fs *c = container_of(j, struct bch_fs, journal);
+ struct journal_buf *buf;
+ union journal_res_state old, new;
+ u64 v = atomic64_read(&j->reservations.counter);
+
+ lockdep_assert_held(&j->lock);
+
+ do {
+ old.v = new.v = v;
+ if (old.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL)
+ return JOURNAL_ENTRY_CLOSED;
+
+ if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
+ return JOURNAL_ENTRY_ERROR;
+
+ if (new.prev_buf_unwritten)
+ return JOURNAL_ENTRY_INUSE;
+
+ /*
+ * avoid race between setting buf->data->u64s and
+ * journal_res_put starting write:
+ */
+ journal_state_inc(&new);
+
+ new.cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL;
+ new.idx++;
+ new.prev_buf_unwritten = 1;
+
+ BUG_ON(journal_state_count(new, new.idx));
+ } while ((v = atomic64_cmpxchg(&j->reservations.counter,
+ old.v, new.v)) != old.v);
+
+ clear_bit(JOURNAL_NEED_WRITE, &j->flags);
+
+ buf = &j->buf[old.idx];
+ buf->data->u64s = cpu_to_le32(old.cur_entry_offset);
+
+ j->prev_buf_sectors =
+ vstruct_blocks_plus(buf->data, c->block_bits,
+ journal_entry_u64s_reserve(buf)) *
+ c->opts.block_size;
+ BUG_ON(j->prev_buf_sectors > j->cur_buf_sectors);
+
+ bch2_journal_reclaim_fast(j);
+ /* XXX: why set this here, and not in bch2_journal_write()? */
+ buf->data->last_seq = cpu_to_le64(journal_last_seq(j));
+
+ if (journal_entry_empty(buf->data))
+ clear_bit(JOURNAL_NOT_EMPTY, &j->flags);
+ else
+ set_bit(JOURNAL_NOT_EMPTY, &j->flags);
+
+ journal_pin_new_entry(j, 1);
+
+ bch2_journal_buf_init(j);
+
+ cancel_delayed_work(&j->write_work);
+ spin_unlock(&j->lock);
+
+ if (c->bucket_journal_seq > 1 << 14) {
+ c->bucket_journal_seq = 0;
+ bch2_bucket_seq_cleanup(c);
+ }
+
+ c->bucket_journal_seq++;
+
+ /* ugh - might be called from __journal_res_get() under wait_event() */
+ __set_current_state(TASK_RUNNING);
+ bch2_journal_buf_put(j, old.idx, need_write_just_set);
+
+ return JOURNAL_UNLOCKED;
+}
+
+void bch2_journal_halt(struct journal *j)
+{
+ union journal_res_state old, new;
+ u64 v = atomic64_read(&j->reservations.counter);
+
+ do {
+ old.v = new.v = v;
+ if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
+ return;
+
+ new.cur_entry_offset = JOURNAL_ENTRY_ERROR_VAL;
+ } while ((v = atomic64_cmpxchg(&j->reservations.counter,
+ old.v, new.v)) != old.v);
+
+ journal_wake(j);
+ closure_wake_up(&journal_cur_buf(j)->wait);
+ closure_wake_up(&journal_prev_buf(j)->wait);
+}
+
+/*
+ * should _only_ called from journal_res_get() - when we actually want a
+ * journal reservation - journal entry is open means journal is dirty:
+ *
+ * returns:
+ * 1: success
+ * 0: journal currently full (must wait)
+ * -EROFS: insufficient rw devices
+ * -EIO: journal error
+ */
+static int journal_entry_open(struct journal *j)
+{
+ struct journal_buf *buf = journal_cur_buf(j);
+ union journal_res_state old, new;
+ ssize_t u64s;
+ int sectors;
+ u64 v;
+
+ lockdep_assert_held(&j->lock);
+ BUG_ON(journal_entry_is_open(j));
+
+ if (!fifo_free(&j->pin))
+ return 0;
+
+ sectors = bch2_journal_entry_sectors(j);
+ if (sectors <= 0)
+ return sectors;
+
+ buf->disk_sectors = sectors;
+
+ sectors = min_t(unsigned, sectors, buf->size >> 9);
+ j->cur_buf_sectors = sectors;
+
+ u64s = (sectors << 9) / sizeof(u64);
+
+ /* Subtract the journal header */
+ u64s -= sizeof(struct jset) / sizeof(u64);
+ /*
+ * Btree roots, prio pointers don't get added until right before we do
+ * the write:
+ */
+ u64s -= journal_entry_u64s_reserve(buf);
+ u64s = max_t(ssize_t, 0L, u64s);
+
+ BUG_ON(u64s >= JOURNAL_ENTRY_CLOSED_VAL);
+
+ if (u64s <= le32_to_cpu(buf->data->u64s))
+ return 0;
+
+ /*
+ * Must be set before marking the journal entry as open:
+ */
+ j->cur_entry_u64s = u64s;
+
+ v = atomic64_read(&j->reservations.counter);
+ do {
+ old.v = new.v = v;
+
+ if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL)
+ return -EIO;
+
+ /* Handle any already added entries */
+ new.cur_entry_offset = le32_to_cpu(buf->data->u64s);
+ } while ((v = atomic64_cmpxchg(&j->reservations.counter,
+ old.v, new.v)) != old.v);
+
+ if (j->res_get_blocked_start)
+ bch2_time_stats_update(j->blocked_time,
+ j->res_get_blocked_start);
+ j->res_get_blocked_start = 0;
+
+ mod_delayed_work(system_freezable_wq,
+ &j->write_work,
+ msecs_to_jiffies(j->write_delay_ms));
+ journal_wake(j);
+ return 1;
+}
+
+/*
+ * returns true if there's nothing to flush and no journal write still in flight
+ */
+static bool journal_flush_write(struct journal *j)
+{
+ bool ret;
+
+ spin_lock(&j->lock);
+ ret = !j->reservations.prev_buf_unwritten;
+
+ if (!journal_entry_is_open(j)) {
+ spin_unlock(&j->lock);
+ return ret;
+ }
+
+ set_bit(JOURNAL_NEED_WRITE, &j->flags);
+ if (journal_buf_switch(j, false) == JOURNAL_UNLOCKED)
+ ret = false;
+ else
+ spin_unlock(&j->lock);
+ return ret;
+}
+
+static void journal_write_work(struct work_struct *work)
+{
+ struct journal *j = container_of(work, struct journal, write_work.work);
+
+ journal_flush_write(j);
+}
+
+/*
+ * Given an inode number, if that inode number has data in the journal that
+ * hasn't yet been flushed, return the journal sequence number that needs to be
+ * flushed:
+ */
+u64 bch2_inode_journal_seq(struct journal *j, u64 inode)
+{
+ size_t h = hash_64(inode, ilog2(sizeof(j->buf[0].has_inode) * 8));
+ u64 seq = 0;
+
+ if (!test_bit(h, j->buf[0].has_inode) &&
+ !test_bit(h, j->buf[1].has_inode))
+ return 0;
+
+ spin_lock(&j->lock);
+ if (test_bit(h, journal_cur_buf(j)->has_inode))
+ seq = journal_cur_seq(j);
+ else if (test_bit(h, journal_prev_buf(j)->has_inode))
+ seq = journal_cur_seq(j) - 1;
+ spin_unlock(&j->lock);
+
+ return seq;
+}
+
+static int __journal_res_get(struct journal *j, struct journal_res *res,
+ unsigned u64s_min, unsigned u64s_max)
+{
+ struct bch_fs *c = container_of(j, struct bch_fs, journal);
+ struct journal_buf *buf;
+ int ret;
+retry:
+ ret = journal_res_get_fast(j, res, u64s_min, u64s_max);
+ if (ret)
+ return ret;
+
+ spin_lock(&j->lock);
+ /*
+ * Recheck after taking the lock, so we don't race with another thread
+ * that just did journal_entry_open() and call journal_entry_close()
+ * unnecessarily
+ */
+ ret = journal_res_get_fast(j, res, u64s_min, u64s_max);
+ if (ret) {
+ spin_unlock(&j->lock);
+ return 1;
+ }
+
+ /*
+ * If we couldn't get a reservation because the current buf filled up,
+ * and we had room for a bigger entry on disk, signal that we want to
+ * realloc the journal bufs:
+ */
+ buf = journal_cur_buf(j);
+ if (journal_entry_is_open(j) &&
+ buf->size >> 9 < buf->disk_sectors &&
+ buf->size < JOURNAL_ENTRY_SIZE_MAX)
+ j->buf_size_want = max(j->buf_size_want, buf->size << 1);
+
+ /*
+ * Close the current journal entry if necessary, then try to start a new
+ * one:
+ */
+ switch (journal_buf_switch(j, false)) {
+ case JOURNAL_ENTRY_ERROR:
+ spin_unlock(&j->lock);
+ return -EROFS;
+ case JOURNAL_ENTRY_INUSE:
+ /* haven't finished writing out the previous one: */
+ spin_unlock(&j->lock);
+ trace_journal_entry_full(c);
+ goto blocked;
+ case JOURNAL_ENTRY_CLOSED:
+ break;
+ case JOURNAL_UNLOCKED:
+ goto retry;
+ }
+
+ /* We now have a new, closed journal buf - see if we can open it: */
+ ret = journal_entry_open(j);
+ spin_unlock(&j->lock);
+
+ if (ret < 0)
+ return ret;
+ if (ret)
+ goto retry;
+
+ /* Journal's full, we have to wait */
+
+ /*
+ * Direct reclaim - can't rely on reclaim from work item
+ * due to freezing..
+ */
+ bch2_journal_reclaim_work(&j->reclaim_work.work);
+
+ trace_journal_full(c);
+blocked:
+ if (!j->res_get_blocked_start)
+ j->res_get_blocked_start = local_clock() ?: 1;
+ return 0;
+}
+
+/*
+ * Essentially the entry function to the journaling code. When bcachefs is doing
+ * a btree insert, it calls this function to get the current journal write.
+ * Journal write is the structure used set up journal writes. The calling
+ * function will then add its keys to the structure, queuing them for the next
+ * write.
+ *
+ * To ensure forward progress, the current task must not be holding any
+ * btree node write locks.
+ */
+int bch2_journal_res_get_slowpath(struct journal *j, struct journal_res *res,
+ unsigned u64s_min, unsigned u64s_max)
+{
+ int ret;
+
+ wait_event(j->wait,
+ (ret = __journal_res_get(j, res, u64s_min,
+ u64s_max)));
+ return ret < 0 ? ret : 0;
+}
+
+u64 bch2_journal_last_unwritten_seq(struct journal *j)
+{
+ u64 seq;
+
+ spin_lock(&j->lock);
+ seq = journal_cur_seq(j);
+ if (j->reservations.prev_buf_unwritten)
+ seq--;
+ spin_unlock(&j->lock);
+
+ return seq;
+}
+
+/**
+ * bch2_journal_open_seq_async - try to open a new journal entry if @seq isn't
+ * open yet, or wait if we cannot
+ *
+ * used by the btree interior update machinery, when it needs to write a new
+ * btree root - every journal entry contains the roots of all the btrees, so it
+ * doesn't need to bother with getting a journal reservation
+ */
+int bch2_journal_open_seq_async(struct journal *j, u64 seq, struct closure *parent)
+{
+ int ret;
+
+ spin_lock(&j->lock);
+ BUG_ON(seq > journal_cur_seq(j));
+
+ if (seq < journal_cur_seq(j) ||
+ journal_entry_is_open(j)) {
+ spin_unlock(&j->lock);
+ return 1;
+ }
+
+ ret = journal_entry_open(j);
+ if (!ret)
+ closure_wait(&j->async_wait, parent);
+ spin_unlock(&j->lock);
+
+ if (!ret)
+ bch2_journal_reclaim_work(&j->reclaim_work.work);
+
+ return ret;
+}
+
+/**
+ * bch2_journal_wait_on_seq - wait for a journal entry to be written
+ *
+ * does _not_ cause @seq to be written immediately - if there is no other
+ * activity to cause the relevant journal entry to be filled up or flushed it
+ * can wait for an arbitrary amount of time (up to @j->write_delay_ms, which is
+ * configurable).
+ */
+void bch2_journal_wait_on_seq(struct journal *j, u64 seq, struct closure *parent)
+{
+ spin_lock(&j->lock);
+
+ BUG_ON(seq > journal_cur_seq(j));
+
+ if (bch2_journal_error(j)) {
+ spin_unlock(&j->lock);
+ return;
+ }
+
+ if (seq == journal_cur_seq(j)) {
+ if (!closure_wait(&journal_cur_buf(j)->wait, parent))
+ BUG();
+ } else if (seq + 1 == journal_cur_seq(j) &&
+ j->reservations.prev_buf_unwritten) {
+ if (!closure_wait(&journal_prev_buf(j)->wait, parent))
+ BUG();
+
+ smp_mb();
+
+ /* check if raced with write completion (or failure) */
+ if (!j->reservations.prev_buf_unwritten ||
+ bch2_journal_error(j))
+ closure_wake_up(&journal_prev_buf(j)->wait);
+ }
+
+ spin_unlock(&j->lock);
+}
+
+/**
+ * bch2_journal_flush_seq_async - wait for a journal entry to be written
+ *
+ * like bch2_journal_wait_on_seq, except that it triggers a write immediately if
+ * necessary
+ */
+void bch2_journal_flush_seq_async(struct journal *j, u64 seq, struct closure *parent)
+{
+ struct journal_buf *buf;
+
+ spin_lock(&j->lock);
+
+ BUG_ON(seq > journal_cur_seq(j));
+
+ if (bch2_journal_error(j)) {
+ spin_unlock(&j->lock);
+ return;
+ }
+
+ if (seq == journal_cur_seq(j)) {
+ bool set_need_write = false;
+
+ buf = journal_cur_buf(j);
+
+ if (parent && !closure_wait(&buf->wait, parent))
+ BUG();
+
+ if (!test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags)) {
+ j->need_write_time = local_clock();
+ set_need_write = true;
+ }
+
+ switch (journal_buf_switch(j, set_need_write)) {
+ case JOURNAL_ENTRY_ERROR:
+ if (parent)
+ closure_wake_up(&buf->wait);
+ break;
+ case JOURNAL_ENTRY_CLOSED:
+ /*
+ * Journal entry hasn't been opened yet, but caller
+ * claims it has something
+ */
+ BUG();
+ case JOURNAL_ENTRY_INUSE:
+ break;
+ case JOURNAL_UNLOCKED:
+ return;
+ }
+ } else if (parent &&
+ seq + 1 == journal_cur_seq(j) &&
+ j->reservations.prev_buf_unwritten) {
+ buf = journal_prev_buf(j);
+
+ if (!closure_wait(&buf->wait, parent))
+ BUG();
+
+ smp_mb();
+
+ /* check if raced with write completion (or failure) */
+ if (!j->reservations.prev_buf_unwritten ||
+ bch2_journal_error(j))
+ closure_wake_up(&buf->wait);
+ }
+
+ spin_unlock(&j->lock);
+}
+
+static int journal_seq_flushed(struct journal *j, u64 seq)
+{
+ struct journal_buf *buf;
+ int ret = 1;
+
+ spin_lock(&j->lock);
+ BUG_ON(seq > journal_cur_seq(j));
+
+ if (seq == journal_cur_seq(j)) {
+ bool set_need_write = false;
+
+ ret = 0;
+
+ buf = journal_cur_buf(j);
+
+ if (!test_and_set_bit(JOURNAL_NEED_WRITE, &j->flags)) {
+ j->need_write_time = local_clock();
+ set_need_write = true;
+ }
+
+ switch (journal_buf_switch(j, set_need_write)) {
+ case JOURNAL_ENTRY_ERROR:
+ ret = -EIO;
+ break;
+ case JOURNAL_ENTRY_CLOSED:
+ /*
+ * Journal entry hasn't been opened yet, but caller
+ * claims it has something
+ */
+ BUG();
+ case JOURNAL_ENTRY_INUSE:
+ break;
+ case JOURNAL_UNLOCKED:
+ return 0;
+ }
+ } else if (seq + 1 == journal_cur_seq(j) &&
+ j->reservations.prev_buf_unwritten) {
+ ret = bch2_journal_error(j);
+ }
+
+ spin_unlock(&j->lock);
+
+ return ret;
+}
+
+int bch2_journal_flush_seq(struct journal *j, u64 seq)
+{
+ u64 start_time = local_clock();
+ int ret, ret2;
+
+ ret = wait_event_killable(j->wait, (ret2 = journal_seq_flushed(j, seq)));
+
+ bch2_time_stats_update(j->flush_seq_time, start_time);
+
+ return ret ?: ret2 < 0 ? ret2 : 0;
+}
+
+/**
+ * bch2_journal_meta_async - force a journal entry to be written
+ */
+void bch2_journal_meta_async(struct journal *j, struct closure *parent)
+{
+ struct journal_res res;
+ unsigned u64s = jset_u64s(0);
+
+ memset(&res, 0, sizeof(res));
+
+ bch2_journal_res_get(j, &res, u64s, u64s);
+ bch2_journal_res_put(j, &res);
+
+ bch2_journal_flush_seq_async(j, res.seq, parent);
+}
+
+int bch2_journal_meta(struct journal *j)
+{
+ struct journal_res res;
+ unsigned u64s = jset_u64s(0);
+ int ret;
+
+ memset(&res, 0, sizeof(res));
+
+ ret = bch2_journal_res_get(j, &res, u64s, u64s);
+ if (ret)
+ return ret;
+
+ bch2_journal_res_put(j, &res);
+
+ return bch2_journal_flush_seq(j, res.seq);
+}
+
+/*
+ * bch2_journal_flush_async - if there is an open journal entry, or a journal
+ * still being written, write it and wait for the write to complete
+ */
+void bch2_journal_flush_async(struct journal *j, struct closure *parent)
+{
+ u64 seq, journal_seq;
+
+ spin_lock(&j->lock);
+ journal_seq = journal_cur_seq(j);
+
+ if (journal_entry_is_open(j)) {
+ seq = journal_seq;
+ } else if (journal_seq) {
+ seq = journal_seq - 1;
+ } else {
+ spin_unlock(&j->lock);
+ return;
+ }
+ spin_unlock(&j->lock);
+
+ bch2_journal_flush_seq_async(j, seq, parent);
+}
+
+int bch2_journal_flush(struct journal *j)
+{
+ u64 seq, journal_seq;
+
+ spin_lock(&j->lock);
+ journal_seq = journal_cur_seq(j);
+
+ if (journal_entry_is_open(j)) {
+ seq = journal_seq;
+ } else if (journal_seq) {
+ seq = journal_seq - 1;
+ } else {
+ spin_unlock(&j->lock);
+ return 0;
+ }
+ spin_unlock(&j->lock);
+
+ return bch2_journal_flush_seq(j, seq);
+}
+
+/* allocate journal on a device: */
+
+static int __bch2_set_nr_journal_buckets(struct bch_dev *ca, unsigned nr,
+ bool new_fs, struct closure *cl)
+{
+ struct bch_fs *c = ca->fs;
+ struct journal_device *ja = &ca->journal;
+ struct bch_sb_field_journal *journal_buckets;
+ u64 *new_bucket_seq = NULL, *new_buckets = NULL;
+ int ret = 0;
+
+ /* don't handle reducing nr of buckets yet: */
+ if (nr <= ja->nr)
+ return 0;
+
+ ret = -ENOMEM;
+ new_buckets = kzalloc(nr * sizeof(u64), GFP_KERNEL);
+ new_bucket_seq = kzalloc(nr * sizeof(u64), GFP_KERNEL);
+ if (!new_buckets || !new_bucket_seq)
+ goto err;
+
+ journal_buckets = bch2_sb_resize_journal(&ca->disk_sb,
+ nr + sizeof(*journal_buckets) / sizeof(u64));
+ if (!journal_buckets)
+ goto err;
+
+ if (c)
+ spin_lock(&c->journal.lock);
+
+ memcpy(new_buckets, ja->buckets, ja->nr * sizeof(u64));
+ memcpy(new_bucket_seq, ja->bucket_seq, ja->nr * sizeof(u64));
+ swap(new_buckets, ja->buckets);
+ swap(new_bucket_seq, ja->bucket_seq);
+
+ if (c)
+ spin_unlock(&c->journal.lock);
+
+ while (ja->nr < nr) {
+ struct open_bucket *ob = NULL;
+ long bucket;
+
+ if (new_fs) {
+ percpu_down_read(&c->usage_lock);
+ bucket = bch2_bucket_alloc_new_fs(ca);
+ percpu_up_read(&c->usage_lock);
+
+ if (bucket < 0) {
+ ret = -ENOSPC;
+ goto err;
+ }
+ } else {
+ int ob_idx = bch2_bucket_alloc(c, ca, RESERVE_ALLOC, false, cl);
+ if (ob_idx < 0) {
+ ret = cl ? -EAGAIN : -ENOSPC;
+ goto err;
+ }
+
+ ob = c->open_buckets + ob_idx;
+ bucket = sector_to_bucket(ca, ob->ptr.offset);
+ }
+
+ if (c) {
+ percpu_down_read(&c->usage_lock);
+ spin_lock(&c->journal.lock);
+ }
+
+ __array_insert_item(ja->buckets, ja->nr, ja->last_idx);
+ __array_insert_item(ja->bucket_seq, ja->nr, ja->last_idx);
+ __array_insert_item(journal_buckets->buckets, ja->nr, ja->last_idx);
+
+ ja->buckets[ja->last_idx] = bucket;
+ ja->bucket_seq[ja->last_idx] = 0;
+ journal_buckets->buckets[ja->last_idx] = cpu_to_le64(bucket);
+
+ if (ja->last_idx < ja->nr) {
+ if (ja->cur_idx >= ja->last_idx)
+ ja->cur_idx++;
+ ja->last_idx++;
+ }
+ ja->nr++;
+
+ bch2_mark_metadata_bucket(c, ca, bucket, BCH_DATA_JOURNAL,
+ ca->mi.bucket_size,
+ gc_phase(GC_PHASE_SB),
+ new_fs
+ ? BCH_BUCKET_MARK_MAY_MAKE_UNAVAILABLE
+ : 0);
+
+ if (c) {
+ spin_unlock(&c->journal.lock);
+ percpu_up_read(&c->usage_lock);
+ }
+
+ if (!new_fs)
+ bch2_open_bucket_put(c, ob);
+ }
+
+ ret = 0;
+err:
+ kfree(new_bucket_seq);
+ kfree(new_buckets);
+
+ return ret;
+}
+
+/*
+ * Allocate more journal space at runtime - not currently making use if it, but
+ * the code works:
+ */
+int bch2_set_nr_journal_buckets(struct bch_fs *c, struct bch_dev *ca,
+ unsigned nr)
+{
+ struct journal_device *ja = &ca->journal;
+ struct closure cl;
+ unsigned current_nr;
+ int ret;
+
+ closure_init_stack(&cl);
+
+ do {
+ struct disk_reservation disk_res = { 0, 0 };
+
+ closure_sync(&cl);
+
+ mutex_lock(&c->sb_lock);
+ current_nr = ja->nr;
+
+ /*
+ * note: journal buckets aren't really counted as _sectors_ used yet, so
+ * we don't need the disk reservation to avoid the BUG_ON() in buckets.c
+ * when space used goes up without a reservation - but we do need the
+ * reservation to ensure we'll actually be able to allocate:
+ */
+
+ if (bch2_disk_reservation_get(c, &disk_res,
+ bucket_to_sector(ca, nr - ja->nr), 1, 0)) {
+ mutex_unlock(&c->sb_lock);
+ return -ENOSPC;
+ }
+
+ ret = __bch2_set_nr_journal_buckets(ca, nr, false, &cl);
+
+ bch2_disk_reservation_put(c, &disk_res);
+
+ if (ja->nr != current_nr)
+ bch2_write_super(c);
+ mutex_unlock(&c->sb_lock);
+ } while (ret == -EAGAIN);
+
+ return ret;
+}
+
+int bch2_dev_journal_alloc(struct bch_dev *ca)
+{
+ unsigned nr;
+
+ if (dynamic_fault("bcachefs:add:journal_alloc"))
+ return -ENOMEM;
+
+ /*
+ * clamp journal size to 1024 buckets or 512MB (in sectors), whichever
+ * is smaller:
+ */
+ nr = clamp_t(unsigned, ca->mi.nbuckets >> 8,
+ BCH_JOURNAL_BUCKETS_MIN,
+ min(1 << 10,
+ (1 << 20) / ca->mi.bucket_size));
+
+ return __bch2_set_nr_journal_buckets(ca, nr, true, NULL);
+}
+
+/* startup/shutdown: */
+
+static bool bch2_journal_writing_to_device(struct journal *j, unsigned dev_idx)
+{
+ union journal_res_state state;
+ struct journal_buf *w;
+ bool ret;
+
+ spin_lock(&j->lock);
+ state = READ_ONCE(j->reservations);
+ w = j->buf + !state.idx;
+
+ ret = state.prev_buf_unwritten &&
+ bch2_extent_has_device(bkey_i_to_s_c_extent(&w->key), dev_idx);
+ spin_unlock(&j->lock);
+
+ return ret;
+}
+
+void bch2_dev_journal_stop(struct journal *j, struct bch_dev *ca)
+{
+ spin_lock(&j->lock);
+ bch2_extent_drop_device(bkey_i_to_s_extent(&j->key), ca->dev_idx);
+ spin_unlock(&j->lock);
+
+ wait_event(j->wait, !bch2_journal_writing_to_device(j, ca->dev_idx));
+}
+
+void bch2_fs_journal_stop(struct journal *j)
+{
+ struct bch_fs *c = container_of(j, struct bch_fs, journal);
+
+ wait_event(j->wait, journal_flush_write(j));
+
+ /* do we need to write another journal entry? */
+ if (test_bit(JOURNAL_NOT_EMPTY, &j->flags) ||
+ c->btree_roots_dirty)
+ bch2_journal_meta(j);
+
+ BUG_ON(!bch2_journal_error(j) &&
+ test_bit(JOURNAL_NOT_EMPTY, &j->flags));
+
+ cancel_delayed_work_sync(&j->write_work);
+ cancel_delayed_work_sync(&j->reclaim_work);
+}
+
+void bch2_fs_journal_start(struct journal *j)
+{
+ struct journal_seq_blacklist *bl;
+ u64 blacklist = 0;
+
+ list_for_each_entry(bl, &j->seq_blacklist, list)
+ blacklist = max(blacklist, bl->end);
+
+ spin_lock(&j->lock);
+
+ set_bit(JOURNAL_STARTED, &j->flags);
+
+ while (journal_cur_seq(j) < blacklist)
+ journal_pin_new_entry(j, 0);
+
+ /*
+ * journal_buf_switch() only inits the next journal entry when it
+ * closes an open journal entry - the very first journal entry gets
+ * initialized here:
+ */
+ journal_pin_new_entry(j, 1);
+ bch2_journal_buf_init(j);
+
+ spin_unlock(&j->lock);
+
+ /*
+ * Adding entries to the next journal entry before allocating space on
+ * disk for the next journal entry - this is ok, because these entries
+ * only have to go down with the next journal entry we write:
+ */
+ bch2_journal_seq_blacklist_write(j);
+
+ queue_delayed_work(system_freezable_wq, &j->reclaim_work, 0);
+}
+
+/* init/exit: */
+
+void bch2_dev_journal_exit(struct bch_dev *ca)
+{
+ kfree(ca->journal.bio);
+ kfree(ca->journal.buckets);
+ kfree(ca->journal.bucket_seq);
+
+ ca->journal.bio = NULL;
+ ca->journal.buckets = NULL;
+ ca->journal.bucket_seq = NULL;
+}
+
+int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb)
+{
+ struct journal_device *ja = &ca->journal;
+ struct bch_sb_field_journal *journal_buckets =
+ bch2_sb_get_journal(sb);
+ unsigned i, nr_bvecs;
+
+ ja->nr = bch2_nr_journal_buckets(journal_buckets);
+
+ ja->bucket_seq = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
+ if (!ja->bucket_seq)
+ return -ENOMEM;
+
+ nr_bvecs = DIV_ROUND_UP(JOURNAL_ENTRY_SIZE_MAX, PAGE_SIZE);
+
+ ca->journal.bio = bio_kmalloc(nr_bvecs, GFP_KERNEL);
+ if (!ca->journal.bio)
+ return -ENOMEM;
+
+ bio_init(ca->journal.bio, NULL, ca->journal.bio->bi_inline_vecs, nr_bvecs, 0);
+
+ ja->buckets = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
+ if (!ja->buckets)
+ return -ENOMEM;
+
+ for (i = 0; i < ja->nr; i++)
+ ja->buckets[i] = le64_to_cpu(journal_buckets->buckets[i]);
+
+ return 0;
+}
+
+void bch2_fs_journal_exit(struct journal *j)
+{
+ kvpfree(j->buf[1].data, j->buf[1].size);
+ kvpfree(j->buf[0].data, j->buf[0].size);
+ free_fifo(&j->pin);
+}
+
+int bch2_fs_journal_init(struct journal *j)
+{
+ struct bch_fs *c = container_of(j, struct bch_fs, journal);
+ static struct lock_class_key res_key;
+ int ret = 0;
+
+ pr_verbose_init(c->opts, "");
+
+ spin_lock_init(&j->lock);
+ spin_lock_init(&j->err_lock);
+ init_waitqueue_head(&j->wait);
+ INIT_DELAYED_WORK(&j->write_work, journal_write_work);
+ INIT_DELAYED_WORK(&j->reclaim_work, bch2_journal_reclaim_work);
+ mutex_init(&j->blacklist_lock);
+ INIT_LIST_HEAD(&j->seq_blacklist);
+ mutex_init(&j->reclaim_lock);
+
+ lockdep_init_map(&j->res_map, "journal res", &res_key, 0);
+
+ j->buf[0].size = JOURNAL_ENTRY_SIZE_MIN;
+ j->buf[1].size = JOURNAL_ENTRY_SIZE_MIN;
+ j->write_delay_ms = 1000;
+ j->reclaim_delay_ms = 100;
+
+ bkey_extent_init(&j->key);
+
+ atomic64_set(&j->reservations.counter,
+ ((union journal_res_state)
+ { .cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL }).v);
+
+ if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) ||
+ !(j->buf[0].data = kvpmalloc(j->buf[0].size, GFP_KERNEL)) ||
+ !(j->buf[1].data = kvpmalloc(j->buf[1].size, GFP_KERNEL))) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ j->pin.front = j->pin.back = 1;
+out:
+ pr_verbose_init(c->opts, "ret %i", ret);
+ return ret;
+}
+
+/* debug: */
+
+ssize_t bch2_journal_print_debug(struct journal *j, char *buf)
+{
+ struct bch_fs *c = container_of(j, struct bch_fs, journal);
+ union journal_res_state *s = &j->reservations;
+ struct bch_dev *ca;
+ unsigned iter;
+ ssize_t ret = 0;
+
+ rcu_read_lock();
+ spin_lock(&j->lock);
+
+ ret += scnprintf(buf + ret, PAGE_SIZE - ret,
+ "active journal entries:\t%llu\n"
+ "seq:\t\t\t%llu\n"
+ "last_seq:\t\t%llu\n"
+ "last_seq_ondisk:\t%llu\n"
+ "reservation count:\t%u\n"
+ "reservation offset:\t%u\n"
+ "current entry u64s:\t%u\n"
+ "io in flight:\t\t%i\n"
+ "need write:\t\t%i\n"
+ "dirty:\t\t\t%i\n"
+ "replay done:\t\t%i\n",
+ fifo_used(&j->pin),
+ journal_cur_seq(j),
+ journal_last_seq(j),
+ j->last_seq_ondisk,
+ journal_state_count(*s, s->idx),
+ s->cur_entry_offset,
+ j->cur_entry_u64s,
+ s->prev_buf_unwritten,
+ test_bit(JOURNAL_NEED_WRITE, &j->flags),
+ journal_entry_is_open(j),
+ test_bit(JOURNAL_REPLAY_DONE, &j->flags));
+
+ for_each_member_device_rcu(ca, c, iter,
+ &c->rw_devs[BCH_DATA_JOURNAL]) {
+ struct journal_device *ja = &ca->journal;
+
+ if (!ja->nr)
+ continue;
+
+ ret += scnprintf(buf + ret, PAGE_SIZE - ret,
+ "dev %u:\n"
+ "\tnr\t\t%u\n"
+ "\tcur_idx\t\t%u (seq %llu)\n"
+ "\tlast_idx\t%u (seq %llu)\n",
+ iter, ja->nr,
+ ja->cur_idx, ja->bucket_seq[ja->cur_idx],
+ ja->last_idx, ja->bucket_seq[ja->last_idx]);
+ }
+
+ spin_unlock(&j->lock);
+ rcu_read_unlock();
+
+ return ret;
+}
+
+ssize_t bch2_journal_print_pins(struct journal *j, char *buf)
+{
+ struct journal_entry_pin_list *pin_list;
+ struct journal_entry_pin *pin;
+ ssize_t ret = 0;
+ u64 i;
+
+ spin_lock(&j->lock);
+ fifo_for_each_entry_ptr(pin_list, &j->pin, i) {
+ ret += scnprintf(buf + ret, PAGE_SIZE - ret,
+ "%llu: count %u\n",
+ i, atomic_read(&pin_list->count));
+
+ list_for_each_entry(pin, &pin_list->list, list)
+ ret += scnprintf(buf + ret, PAGE_SIZE - ret,
+ "\t%p %pf\n",
+ pin, pin->flush);
+
+ if (!list_empty(&pin_list->flushed))
+ ret += scnprintf(buf + ret, PAGE_SIZE - ret,
+ "flushed:\n");
+
+ list_for_each_entry(pin, &pin_list->flushed, list)
+ ret += scnprintf(buf + ret, PAGE_SIZE - ret,
+ "\t%p %pf\n",
+ pin, pin->flush);
+ }
+ spin_unlock(&j->lock);
+
+ return ret;
+}