diff --git a/.bcachefs_revision b/.bcachefs_revision index b1e0bf83..81c781d7 100644 --- a/.bcachefs_revision +++ b/.bcachefs_revision @@ -1 +1 @@ -ffad51ba45f8c0785bbb2e2903715d825a8eea9a +8ffb42b3d09418642680d23401a7a71d6ff87e3a diff --git a/libbcachefs/btree_gc.c b/libbcachefs/btree_gc.c index cd901654..73b947a4 100644 --- a/libbcachefs/btree_gc.c +++ b/libbcachefs/btree_gc.c @@ -781,7 +781,7 @@ static int bch2_gc_mark_key(struct btree_trans *trans, enum btree_id btree_id, if (initial) { BUG_ON(bch2_journal_seq_verify && - k->k->version.lo > journal_cur_seq(&c->journal)); + k->k->version.lo > atomic64_read(&c->journal.seq)); ret = bch2_check_fix_ptrs(c, btree_id, level, is_root, k); if (ret) diff --git a/libbcachefs/btree_update_interior.c b/libbcachefs/btree_update_interior.c index 63832fb9..e2cf0f58 100644 --- a/libbcachefs/btree_update_interior.c +++ b/libbcachefs/btree_update_interior.c @@ -557,8 +557,6 @@ static void btree_update_nodes_written(struct btree_update *as) if (ret) goto err; - BUG_ON(!journal_pin_active(&as->journal)); - /* * Wait for any in flight writes to finish before we free the old nodes * on disk: @@ -1047,10 +1045,6 @@ bch2_btree_update_start(struct btree_trans *trans, struct btree_path *path, goto err; } - bch2_journal_pin_add(&c->journal, - atomic64_read(&c->journal.seq), - &as->journal, NULL); - return as; err: bch2_btree_update_free(as); diff --git a/libbcachefs/journal.c b/libbcachefs/journal.c index 9cd1e11a..fb533ecc 100644 --- a/libbcachefs/journal.c +++ b/libbcachefs/journal.c @@ -20,18 +20,9 @@ #include -static u64 last_unwritten_seq(struct journal *j) -{ - union journal_res_state s = READ_ONCE(j->reservations); - - lockdep_assert_held(&j->lock); - - return journal_cur_seq(j) - ((s.idx - s.unwritten_idx) & JOURNAL_BUF_MASK); -} - static inline bool journal_seq_unwritten(struct journal *j, u64 seq) { - return seq >= last_unwritten_seq(j); + return seq > j->seq_ondisk; } static bool __journal_entry_is_open(union journal_res_state state) @@ -39,6 +30,11 @@ static bool __journal_entry_is_open(union journal_res_state state) return state.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL; } +static inline unsigned nr_unwritten_journal_entries(struct journal *j) +{ + return atomic64_read(&j->seq) - j->seq_ondisk; +} + static bool journal_entry_is_open(struct journal *j) { return __journal_entry_is_open(j->reservations); @@ -50,8 +46,6 @@ journal_seq_to_buf(struct journal *j, u64 seq) struct journal_buf *buf = NULL; EBUG_ON(seq > journal_cur_seq(j)); - EBUG_ON(seq == journal_cur_seq(j) && - j->reservations.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL); if (journal_seq_unwritten(j, seq)) { buf = j->buf + (seq & JOURNAL_BUF_MASK); @@ -69,54 +63,6 @@ static void journal_pin_list_init(struct journal_entry_pin_list *p, int count) p->devs.nr = 0; } -static void journal_pin_new_entry(struct journal *j) -{ - /* - * The fifo_push() needs to happen at the same time as j->seq is - * incremented for journal_last_seq() to be calculated correctly - */ - atomic64_inc(&j->seq); - journal_pin_list_init(fifo_push_ref(&j->pin), 1); -} - -static void bch2_journal_buf_init(struct journal *j) -{ - struct journal_buf *buf = journal_cur_buf(j); - - bkey_extent_init(&buf->key); - buf->noflush = false; - buf->must_flush = false; - buf->separate_flush = false; - - memset(buf->data, 0, sizeof(*buf->data)); - buf->data->seq = cpu_to_le64(journal_cur_seq(j)); - buf->data->u64s = 0; -} - -void bch2_journal_halt(struct journal *j) -{ - union journal_res_state old, new; - u64 v = 
atomic64_read(&j->reservations.counter); - - do { - old.v = new.v = v; - if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL) - return; - - new.cur_entry_offset = JOURNAL_ENTRY_ERROR_VAL; - } while ((v = atomic64_cmpxchg(&j->reservations.counter, - old.v, new.v)) != old.v); - - /* - * XXX: we're not using j->lock here because this can be called from - * interrupt context, this can race with journal_write_done() - */ - if (!j->err_seq) - j->err_seq = journal_cur_seq(j); - journal_wake(j); - closure_wake_up(&journal_cur_buf(j)->wait); -} - /* journal entry close/open: */ void __bch2_journal_buf_put(struct journal *j) @@ -132,7 +78,7 @@ void __bch2_journal_buf_put(struct journal *j) * We don't close a journal_buf until the next journal_buf is finished writing, * and can be opened again - this also initializes the next journal_buf: */ -static bool __journal_entry_close(struct journal *j) +static void __journal_entry_close(struct journal *j, unsigned closed_val) { struct bch_fs *c = container_of(j, struct bch_fs, journal); struct journal_buf *buf = journal_cur_buf(j); @@ -140,34 +86,24 @@ static bool __journal_entry_close(struct journal *j) u64 v = atomic64_read(&j->reservations.counter); unsigned sectors; + BUG_ON(closed_val != JOURNAL_ENTRY_CLOSED_VAL && + closed_val != JOURNAL_ENTRY_ERROR_VAL); + lockdep_assert_held(&j->lock); do { old.v = new.v = v; - if (old.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL) - return true; + new.cur_entry_offset = closed_val; - if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL) { - /* this entry will never be written: */ - closure_wake_up(&buf->wait); - return true; - } - - if (!test_bit(JOURNAL_NEED_WRITE, &j->flags)) { - set_bit(JOURNAL_NEED_WRITE, &j->flags); - j->need_write_time = local_clock(); - } - - new.cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL; - new.idx++; - - if (new.idx == new.unwritten_idx) - return false; - - BUG_ON(journal_state_count(new, new.idx)); + if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL || + old.cur_entry_offset == new.cur_entry_offset) + return; } while ((v = atomic64_cmpxchg(&j->reservations.counter, old.v, new.v)) != old.v); + if (!__journal_entry_is_open(old)) + return; + /* Close out old buffer: */ buf->data->u64s = cpu_to_le32(old.cur_entry_offset); @@ -197,36 +133,42 @@ static bool __journal_entry_close(struct journal *j) */ buf->last_seq = journal_last_seq(j); buf->data->last_seq = cpu_to_le64(buf->last_seq); + BUG_ON(buf->last_seq > le64_to_cpu(buf->data->seq)); __bch2_journal_pin_put(j, le64_to_cpu(buf->data->seq)); - /* Initialize new buffer: */ - journal_pin_new_entry(j); - - bch2_journal_buf_init(j); - cancel_delayed_work(&j->write_work); - clear_bit(JOURNAL_NEED_WRITE, &j->flags); bch2_journal_space_available(j); bch2_journal_buf_put(j, old.idx); - return true; +} + +void bch2_journal_halt(struct journal *j) +{ + spin_lock(&j->lock); + __journal_entry_close(j, JOURNAL_ENTRY_ERROR_VAL); + if (!j->err_seq) + j->err_seq = journal_cur_seq(j); + spin_unlock(&j->lock); } static bool journal_entry_want_write(struct journal *j) { - union journal_res_state s = READ_ONCE(j->reservations); - bool ret = false; + bool ret = !journal_entry_is_open(j) || + journal_cur_seq(j) == journal_last_unwritten_seq(j); - /* - * Don't close it yet if we already have a write in flight, but do set - * NEED_WRITE: - */ - if (s.idx != s.unwritten_idx) - set_bit(JOURNAL_NEED_WRITE, &j->flags); - else - ret = __journal_entry_close(j); + /* Don't close it yet if we already have a write in flight: */ + if (ret) + __journal_entry_close(j, 
JOURNAL_ENTRY_CLOSED_VAL); + else if (nr_unwritten_journal_entries(j)) { + struct journal_buf *buf = journal_cur_buf(j); + + if (!buf->flush_time) { + buf->flush_time = local_clock() ?: 1; + buf->expires = jiffies; + } + } return ret; } @@ -255,15 +197,15 @@ static bool journal_entry_close(struct journal *j) static int journal_entry_open(struct journal *j) { struct bch_fs *c = container_of(j, struct bch_fs, journal); - struct journal_buf *buf = journal_cur_buf(j); + struct journal_buf *buf = j->buf + + ((journal_cur_seq(j) + 1) & JOURNAL_BUF_MASK); union journal_res_state old, new; int u64s; u64 v; - BUG_ON(BCH_SB_CLEAN(c->disk_sb.sb)); - lockdep_assert_held(&j->lock); BUG_ON(journal_entry_is_open(j)); + BUG_ON(BCH_SB_CLEAN(c->disk_sb.sb)); if (j->blocked) return cur_entry_blocked; @@ -271,19 +213,53 @@ static int journal_entry_open(struct journal *j) if (j->cur_entry_error) return j->cur_entry_error; + if (bch2_journal_error(j)) + return cur_entry_insufficient_devices; /* -EROFS */ + + if (!fifo_free(&j->pin)) + return cur_entry_journal_pin_full; + + if (nr_unwritten_journal_entries(j) == ARRAY_SIZE(j->buf) - 1) + return cur_entry_max_in_flight; + BUG_ON(!j->cur_entry_sectors); + buf->expires = + (journal_cur_seq(j) == j->flushed_seq_ondisk + ? jiffies + : j->last_flush_write) + + msecs_to_jiffies(c->opts.journal_flush_delay); + buf->u64s_reserved = j->entry_u64s_reserved; buf->disk_sectors = j->cur_entry_sectors; buf->sectors = min(buf->disk_sectors, buf->buf_size >> 9); u64s = (int) (buf->sectors << 9) / sizeof(u64) - journal_entry_overhead(j); - u64s = clamp_t(int, u64s, 0, JOURNAL_ENTRY_CLOSED_VAL - 1); + u64s = clamp_t(int, u64s, 0, JOURNAL_ENTRY_CLOSED_VAL - 1); - if (u64s <= le32_to_cpu(buf->data->u64s)) + if (u64s <= 0) return cur_entry_journal_full; + /* + * The fifo_push() needs to happen at the same time as j->seq is + * incremented for journal_last_seq() to be calculated correctly + */ + atomic64_inc(&j->seq); + journal_pin_list_init(fifo_push_ref(&j->pin), 1); + + BUG_ON(j->buf + (journal_cur_seq(j) & JOURNAL_BUF_MASK) != buf); + + bkey_extent_init(&buf->key); + buf->noflush = false; + buf->must_flush = false; + buf->separate_flush = false; + buf->flush_time = 0; + + memset(buf->data, 0, sizeof(*buf->data)); + buf->data->seq = cpu_to_le64(journal_cur_seq(j)); + buf->data->u64s = 0; + /* * Must be set before marking the journal entry as open: */ @@ -293,14 +269,14 @@ static int journal_entry_open(struct journal *j) do { old.v = new.v = v; - if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL) - return cur_entry_insufficient_devices; + BUG_ON(old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL); - /* Handle any already added entries */ - new.cur_entry_offset = le32_to_cpu(buf->data->u64s); + new.idx++; + BUG_ON(journal_state_count(new, new.idx)); + BUG_ON(new.idx != (journal_cur_seq(j) & JOURNAL_BUF_MASK)); - EBUG_ON(journal_state_count(new, new.idx)); journal_state_inc(&new); + new.cur_entry_offset = 0; } while ((v = atomic64_cmpxchg(&j->reservations.counter, old.v, new.v)) != old.v); @@ -318,8 +294,7 @@ static int journal_entry_open(struct journal *j) static bool journal_quiesced(struct journal *j) { - union journal_res_state s = READ_ONCE(j->reservations); - bool ret = s.idx == s.unwritten_idx && !__journal_entry_is_open(s); + bool ret = atomic64_read(&j->seq) == j->seq_ondisk; if (!ret) journal_entry_close(j); @@ -334,8 +309,19 @@ static void journal_quiesce(struct journal *j) static void journal_write_work(struct work_struct *work) { struct journal *j = container_of(work, 
struct journal, write_work.work); + struct bch_fs *c = container_of(j, struct bch_fs, journal); + struct journal_buf *buf; + long delta; - journal_entry_close(j); + spin_lock(&j->lock); + buf = journal_cur_buf(j); + delta = buf->expires - jiffies; + + if (delta > 0) + mod_delayed_work(c->io_complete_wq, &j->write_work, delta); + else + __journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL); + spin_unlock(&j->lock); } static int __journal_res_get(struct journal *j, struct journal_res *res, @@ -385,18 +371,11 @@ retry: buf->buf_size < JOURNAL_ENTRY_SIZE_MAX) j->buf_size_want = max(j->buf_size_want, buf->buf_size << 1); - if (journal_entry_is_open(j) && - !__journal_entry_close(j)) { - /* - * We failed to get a reservation on the current open journal - * entry because it's full, and we can't close it because - * there's still a previous one in flight: - */ + __journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL); + ret = journal_entry_open(j); + + if (ret == cur_entry_max_in_flight) trace_journal_entry_full(c); - ret = cur_entry_blocked; - } else { - ret = journal_entry_open(j); - } unlock: if ((ret && ret != cur_entry_insufficient_devices) && !j->res_get_blocked_start) { @@ -413,7 +392,7 @@ unlock: if ((ret == cur_entry_journal_full || ret == cur_entry_journal_pin_full) && !can_discard && - j->reservations.idx == j->reservations.unwritten_idx && + !nr_unwritten_journal_entries(j) && (flags & JOURNAL_RES_GET_RESERVED)) { struct printbuf buf = PRINTBUF; @@ -528,7 +507,7 @@ void bch2_journal_entry_res_resize(struct journal *j, /* * Not enough room in current journal entry, have to flush it: */ - __journal_entry_close(j); + __journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL); } else { journal_cur_buf(j)->u64s_reserved += d; } @@ -573,12 +552,15 @@ int bch2_journal_flush_seq_async(struct journal *j, u64 seq, } /* if seq was written, but not flushed - flush a newer one instead */ - seq = max(seq, last_unwritten_seq(j)); + seq = max(seq, journal_last_unwritten_seq(j)); recheck_need_open: - if (seq == journal_cur_seq(j) && !journal_entry_is_open(j)) { + if (seq > journal_cur_seq(j)) { struct journal_res res = { 0 }; + if (journal_entry_is_open(j)) + __journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL); + spin_unlock(&j->lock); ret = bch2_journal_res_get(j, &res, jset_u64s(0), 0); @@ -588,7 +570,11 @@ recheck_need_open: seq = res.seq; buf = j->buf + (seq & JOURNAL_BUF_MASK); buf->must_flush = true; - set_bit(JOURNAL_NEED_WRITE, &j->flags); + + if (!buf->flush_time) { + buf->flush_time = local_clock() ?: 1; + buf->expires = jiffies; + } if (parent && !closure_wait(&buf->wait, parent)) BUG(); @@ -654,7 +640,11 @@ int bch2_journal_meta(struct journal *j) buf = j->buf + (res.seq & JOURNAL_BUF_MASK); buf->must_flush = true; - set_bit(JOURNAL_NEED_WRITE, &j->flags); + + if (!buf->flush_time) { + buf->flush_time = local_clock() ?: 1; + buf->expires = jiffies; + } bch2_journal_res_put(j, &res); @@ -667,42 +657,12 @@ int bch2_journal_meta(struct journal *j) */ void bch2_journal_flush_async(struct journal *j, struct closure *parent) { - u64 seq, journal_seq; - - spin_lock(&j->lock); - journal_seq = journal_cur_seq(j); - - if (journal_entry_is_open(j)) { - seq = journal_seq; - } else if (journal_seq) { - seq = journal_seq - 1; - } else { - spin_unlock(&j->lock); - return; - } - spin_unlock(&j->lock); - - bch2_journal_flush_seq_async(j, seq, parent); + bch2_journal_flush_seq_async(j, atomic64_read(&j->seq), parent); } int bch2_journal_flush(struct journal *j) { - u64 seq, journal_seq; - - spin_lock(&j->lock); - journal_seq 
= journal_cur_seq(j); - - if (journal_entry_is_open(j)) { - seq = journal_seq; - } else if (journal_seq) { - seq = journal_seq - 1; - } else { - spin_unlock(&j->lock); - return 0; - } - spin_unlock(&j->lock); - - return bch2_journal_flush_seq(j, seq); + return bch2_journal_flush_seq(j, atomic64_read(&j->seq)); } /* @@ -725,13 +685,13 @@ bool bch2_journal_noflush_seq(struct journal *j, u64 seq) if (seq <= c->journal.flushed_seq_ondisk) goto out; - for (unwritten_seq = last_unwritten_seq(j); + for (unwritten_seq = journal_last_unwritten_seq(j); unwritten_seq < seq; unwritten_seq++) { struct journal_buf *buf = journal_seq_to_buf(j, unwritten_seq); /* journal write is already in flight, and was a flush write: */ - if (unwritten_seq == last_unwritten_seq(j) && !buf->noflush) + if (unwritten_seq == journal_last_unwritten_seq(j) && !buf->noflush) goto out; buf->noflush = true; @@ -956,17 +916,16 @@ int bch2_dev_journal_alloc(struct bch_dev *ca) static bool bch2_journal_writing_to_device(struct journal *j, unsigned dev_idx) { - union journal_res_state state; bool ret = false; - unsigned i; + u64 seq; spin_lock(&j->lock); - state = READ_ONCE(j->reservations); - i = state.idx; + for (seq = journal_last_unwritten_seq(j); + seq <= journal_cur_seq(j) && !ret; + seq++) { + struct journal_buf *buf = journal_seq_to_buf(j, seq); - while (i != state.unwritten_idx) { - i = (i - 1) & JOURNAL_BUF_MASK; - if (bch2_bkey_has_device(bkey_i_to_s_c(&j->buf[i].key), dev_idx)) + if (bch2_bkey_has_device(bkey_i_to_s_c(&buf->key), dev_idx)) ret = true; } spin_unlock(&j->lock); @@ -995,8 +954,7 @@ void bch2_fs_journal_stop(struct journal *j) BUG_ON(!bch2_journal_error(j) && test_bit(JOURNAL_REPLAY_DONE, &j->flags) && - (journal_entry_is_open(j) || - j->last_empty_seq + 1 != journal_cur_seq(j))); + j->last_empty_seq != journal_cur_seq(j)); cancel_delayed_work_sync(&j->write_work); bch2_journal_reclaim_stop(j); @@ -1029,6 +987,7 @@ int bch2_fs_journal_start(struct journal *j, u64 cur_seq, j->replay_journal_seq_end = cur_seq; j->last_seq_ondisk = last_seq; j->flushed_seq_ondisk = cur_seq - 1; + j->seq_ondisk = cur_seq - 1; j->pin.front = last_seq; j->pin.back = cur_seq; atomic64_set(&j->seq, cur_seq - 1); @@ -1066,11 +1025,8 @@ int bch2_fs_journal_start(struct journal *j, u64 cur_seq, set_bit(JOURNAL_STARTED, &j->flags); j->last_flush_write = jiffies; - journal_pin_new_entry(j); - j->reservations.idx = j->reservations.unwritten_idx = journal_cur_seq(j); - - bch2_journal_buf_init(j); + j->reservations.unwritten_idx++; c->last_bucket_seq_cleanup = journal_cur_seq(j); @@ -1182,15 +1138,18 @@ void __bch2_journal_debug_to_text(struct printbuf *out, struct journal *j) union journal_res_state s; struct bch_dev *ca; unsigned long now = jiffies; + u64 seq; unsigned i; out->atomic++; + out->tabstops[0] = 24; rcu_read_lock(); s = READ_ONCE(j->reservations); - pr_buf(out, "active journal entries:\t%llu\n", fifo_used(&j->pin)); + pr_buf(out, "dirty journal entries:\t%llu\n", fifo_used(&j->pin)); pr_buf(out, "seq:\t\t\t%llu\n", journal_cur_seq(j)); + pr_buf(out, "seq_ondisk:\t\t%llu\n", j->seq_ondisk); pr_buf(out, "last_seq:\t\t%llu\n", journal_last_seq(j)); pr_buf(out, "last_seq_ondisk:\t%llu\n", j->last_seq_ondisk); pr_buf(out, "flushed_seq_ondisk:\t%llu\n", j->flushed_seq_ondisk); @@ -1209,30 +1168,49 @@ void __bch2_journal_debug_to_text(struct printbuf *out, struct journal *j) switch (s.cur_entry_offset) { case JOURNAL_ENTRY_ERROR_VAL: - pr_buf(out, "error\n"); + pr_buf(out, "error"); break; case JOURNAL_ENTRY_CLOSED_VAL: - 
pr_buf(out, "closed\n"); + pr_buf(out, "closed"); break; default: - pr_buf(out, "%u/%u\n", s.cur_entry_offset, j->cur_entry_u64s); + pr_buf(out, "%u/%u", s.cur_entry_offset, j->cur_entry_u64s); break; } - pr_buf(out, "current entry:\t\tidx %u refcount %u\n", s.idx, journal_state_count(s, s.idx)); + pr_newline(out); - i = s.idx; - while (i != s.unwritten_idx) { - i = (i - 1) & JOURNAL_BUF_MASK; + for (seq = journal_cur_seq(j); + seq >= journal_last_unwritten_seq(j); + --seq) { + i = seq & JOURNAL_BUF_MASK; - pr_buf(out, "unwritten entry:\tidx %u refcount %u sectors %u\n", - i, journal_state_count(s, i), j->buf[i].sectors); + pr_buf(out, "unwritten entry:"); + pr_tab(out); + pr_buf(out, "%llu", seq); + pr_newline(out); + pr_indent_push(out, 2); + + pr_buf(out, "refcount:"); + pr_tab(out); + pr_buf(out, "%u", journal_state_count(s, i)); + pr_newline(out); + + pr_buf(out, "sectors:"); + pr_tab(out); + pr_buf(out, "%u", j->buf[i].sectors); + pr_newline(out); + + pr_buf(out, "expires"); + pr_tab(out); + pr_buf(out, "%li jiffies", j->buf[i].expires - jiffies); + pr_newline(out); + + pr_indent_pop(out, 2); } pr_buf(out, - "need write:\t\t%i\n" "replay done:\t\t%i\n", - test_bit(JOURNAL_NEED_WRITE, &j->flags), test_bit(JOURNAL_REPLAY_DONE, &j->flags)); pr_buf(out, "space:\n"); diff --git a/libbcachefs/journal.h b/libbcachefs/journal.h index 0a3fb8a0..1bb0e00d 100644 --- a/libbcachefs/journal.h +++ b/libbcachefs/journal.h @@ -141,6 +141,11 @@ static inline u64 journal_cur_seq(struct journal *j) return j->pin.back - 1; } +static inline u64 journal_last_unwritten_seq(struct journal *j) +{ + return j->seq_ondisk + 1; +} + void bch2_journal_set_has_inum(struct journal *, u64, u64); static inline int journal_state_count(union journal_res_state s, int idx) @@ -261,9 +266,6 @@ static inline void bch2_journal_buf_put(struct journal *j, unsigned idx) .buf3_count = idx == 3, }).v, &j->reservations.counter); - EBUG_ON(((s.idx - idx) & 3) > - ((s.idx - s.unwritten_idx) & 3)); - if (!journal_state_count(s, idx) && idx == s.unwritten_idx) __bch2_journal_buf_put(j); } diff --git a/libbcachefs/journal_io.c b/libbcachefs/journal_io.c index 4380ebf5..fb24ca21 100644 --- a/libbcachefs/journal_io.c +++ b/libbcachefs/journal_io.c @@ -1332,7 +1332,7 @@ static void journal_buf_realloc(struct journal *j, struct journal_buf *buf) static inline struct journal_buf *journal_last_unwritten_buf(struct journal *j) { - return j->buf + j->reservations.unwritten_idx; + return j->buf + (journal_last_unwritten_seq(j) & JOURNAL_BUF_MASK); } static void journal_write_done(struct closure *cl) @@ -1369,8 +1369,6 @@ static void journal_write_done(struct closure *cl) journal_seq_pin(j, seq)->devs = w->devs_written; if (!err) { - j->seq_ondisk = seq; - if (!JSET_NO_FLUSH(w->data)) { j->flushed_seq_ondisk = seq; j->last_seq_ondisk = w->last_seq; @@ -1378,6 +1376,8 @@ static void journal_write_done(struct closure *cl) } else if (!j->err_seq || seq < j->err_seq) j->err_seq = seq; + j->seq_ondisk = seq; + /* * Updating last_seq_ondisk may let bch2_journal_reclaim_work() discard * more buckets: @@ -1393,7 +1393,7 @@ static void journal_write_done(struct closure *cl) v = atomic64_read(&j->reservations.counter); do { old.v = new.v = v; - BUG_ON(new.idx == new.unwritten_idx); + BUG_ON(journal_state_count(new, new.unwritten_idx)); new.unwritten_idx++; } while ((v = atomic64_cmpxchg(&j->reservations.counter, @@ -1404,13 +1404,24 @@ static void journal_write_done(struct closure *cl) closure_wake_up(&w->wait); journal_wake(j); - if 
(test_bit(JOURNAL_NEED_WRITE, &j->flags)) - mod_delayed_work(c->io_complete_wq, &j->write_work, 0); - spin_unlock(&j->lock); - - if (new.unwritten_idx != new.idx && - !journal_state_count(new, new.unwritten_idx)) + if (!journal_state_count(new, new.unwritten_idx) && + journal_last_unwritten_seq(j) <= journal_cur_seq(j)) { closure_call(&j->io, bch2_journal_write, c->io_complete_wq, NULL); + } else if (journal_last_unwritten_seq(j) == journal_cur_seq(j) && + new.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL) { + struct journal_buf *buf = journal_cur_buf(j); + long delta = buf->expires - jiffies; + + /* + * We don't close a journal entry to write it while there's + * previous entries still in flight - the current journal entry + * might want to be written now: + */ + + mod_delayed_work(c->io_complete_wq, &j->write_work, max(0L, delta)); + } + + spin_unlock(&j->lock); } static void journal_write_endio(struct bio *bio) @@ -1505,11 +1516,11 @@ void bch2_journal_write(struct closure *cl) j->write_start_time = local_clock(); spin_lock(&j->lock); - if (c->sb.features & (1ULL << BCH_FEATURE_journal_no_flush) && - (w->noflush || - (!w->must_flush && - (jiffies - j->last_flush_write) < msecs_to_jiffies(c->opts.journal_flush_delay) && - test_bit(JOURNAL_MAY_SKIP_FLUSH, &j->flags)))) { + if (bch2_journal_error(j) || + w->noflush || + (!w->must_flush && + (jiffies - j->last_flush_write) < msecs_to_jiffies(c->opts.journal_flush_delay) && + test_bit(JOURNAL_MAY_SKIP_FLUSH, &j->flags))) { w->noflush = true; SET_JSET_NO_FLUSH(jset, true); jset->last_seq = 0; diff --git a/libbcachefs/journal_reclaim.c b/libbcachefs/journal_reclaim.c index 3dca50f7..ec565edb 100644 --- a/libbcachefs/journal_reclaim.c +++ b/libbcachefs/journal_reclaim.c @@ -59,25 +59,13 @@ static void journal_set_remaining(struct journal *j, unsigned u64s_remaining) old.v, new.v)) != old.v); } -static inline unsigned get_unwritten_sectors(struct journal *j, unsigned *idx) -{ - unsigned sectors = 0; - - while (!sectors && *idx != j->reservations.idx) { - sectors = j->buf[*idx].sectors; - - *idx = (*idx + 1) & JOURNAL_BUF_MASK; - } - - return sectors; -} - static struct journal_space journal_dev_space_available(struct journal *j, struct bch_dev *ca, enum journal_space_from from) { struct journal_device *ja = &ca->journal; - unsigned sectors, buckets, unwritten, idx = j->reservations.unwritten_idx; + unsigned sectors, buckets, unwritten; + u64 seq; if (from == journal_space_total) return (struct journal_space) { @@ -92,7 +80,14 @@ journal_dev_space_available(struct journal *j, struct bch_dev *ca, * We that we don't allocate the space for a journal entry * until we write it out - thus, account for it here: */ - while ((unwritten = get_unwritten_sectors(j, &idx))) { + for (seq = journal_last_unwritten_seq(j); + seq <= journal_cur_seq(j); + seq++) { + unwritten = j->buf[seq & JOURNAL_BUF_MASK].sectors; + + if (!unwritten) + continue; + /* entry won't fit on this device, skip: */ if (unwritten > ca->mi.bucket_size) continue; @@ -214,8 +209,7 @@ void bch2_journal_space_available(struct journal *j) total = j->space[journal_space_total].total; if (!clean_ondisk && - j->reservations.idx == - j->reservations.unwritten_idx) { + journal_cur_seq(j) == j->seq_ondisk) { struct printbuf buf = PRINTBUF; __bch2_journal_debug_to_text(&buf, j); @@ -226,8 +220,6 @@ void bch2_journal_space_available(struct journal *j) ret = cur_entry_journal_stuck; } else if (!j->space[journal_space_discarded].next_entry) ret = cur_entry_journal_full; - else if (!fifo_free(&j->pin)) - 
ret = cur_entry_journal_pin_full; if ((j->space[journal_space_clean_ondisk].next_entry < j->space[journal_space_clean_ondisk].total) && @@ -369,9 +361,6 @@ static inline void __journal_pin_drop(struct journal *j, if (atomic_dec_and_test(&pin_list->count) && pin_list == &fifo_peek_front(&j->pin)) bch2_journal_reclaim_fast(j); - else if (fifo_used(&j->pin) == 1 && - atomic_read(&pin_list->count) == 1) - journal_wake(j); } void bch2_journal_pin_drop(struct journal *j, @@ -772,8 +761,7 @@ static int journal_flush_done(struct journal *j, u64 seq_to_flush, */ ret = !test_bit(JOURNAL_REPLAY_DONE, &j->flags) || journal_last_seq(j) > seq_to_flush || - (fifo_used(&j->pin) == 1 && - atomic_read(&fifo_peek_front(&j->pin).count) == 1); + !fifo_used(&j->pin); spin_unlock(&j->lock); mutex_unlock(&j->reclaim_lock); diff --git a/libbcachefs/journal_types.h b/libbcachefs/journal_types.h index cd66b738..6fd45819 100644 --- a/libbcachefs/journal_types.h +++ b/libbcachefs/journal_types.h @@ -25,6 +25,8 @@ struct journal_buf { struct closure_waitlist wait; u64 last_seq; /* copy of data->last_seq */ + long expires; + u64 flush_time; unsigned buf_size; /* size in bytes of @data */ unsigned sectors; /* maximum size for current entry */ @@ -139,16 +141,9 @@ enum journal_space_from { journal_space_nr, }; -/* - * JOURNAL_NEED_WRITE - current (pending) journal entry should be written ASAP, - * either because something's waiting on the write to complete or because it's - * been dirty too long and the timer's expired. - */ - enum { JOURNAL_REPLAY_DONE, JOURNAL_STARTED, - JOURNAL_NEED_WRITE, JOURNAL_MAY_GET_UNRESERVED, JOURNAL_MAY_SKIP_FLUSH, }; @@ -172,6 +167,7 @@ struct journal { enum { cur_entry_ok, cur_entry_blocked, + cur_entry_max_in_flight, cur_entry_journal_full, cur_entry_journal_pin_full, cur_entry_journal_stuck, @@ -263,7 +259,6 @@ struct journal { unsigned long last_flush_write; u64 res_get_blocked_start; - u64 need_write_time; u64 write_start_time; u64 nr_flush_writes; diff --git a/libbcachefs/super-io.c b/libbcachefs/super-io.c index 8580b6fd..1a70adae 100644 --- a/libbcachefs/super-io.c +++ b/libbcachefs/super-io.c @@ -1341,7 +1341,7 @@ void bch2_fs_mark_clean(struct bch_fs *c) } sb_clean->flags = 0; - sb_clean->journal_seq = cpu_to_le64(journal_cur_seq(&c->journal) - 1); + sb_clean->journal_seq = cpu_to_le64(atomic64_read(&c->journal.seq)); /* Trying to catch outstanding bug: */ BUG_ON(le64_to_cpu(sb_clean->journal_seq) > S64_MAX);
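
The central change in this patch is that journal buffers are no longer tracked through the idx/unwritten_idx ring pointers: whether an entry is still dirty is derived purely from sequence numbers (seq > j->seq_ondisk), the number of in-flight entries is a plain delta against j->seq, and a sequence number is mapped to its buffer with seq & JOURNAL_BUF_MASK. Below is a minimal standalone sketch of that accounting, assuming simplified, hypothetical types (sketch_journal, sketch_buf) in place of the real struct journal — it is illustrative only and not part of the patch.

/*
 * Illustrative sketch, not part of the patch: simplified stand-ins for
 * struct journal / struct journal_buf, showing the seq-based accounting
 * the patch switches to.
 */
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

#define SKETCH_BUF_NR	4			/* like ARRAY_SIZE(j->buf) */
#define SKETCH_BUF_MASK	(SKETCH_BUF_NR - 1)

struct sketch_buf {
	unsigned	sectors;
};

struct sketch_journal {
	uint64_t	seq;			/* last opened entry (j->seq) */
	uint64_t	seq_ondisk;		/* last entry whose write completed */
	struct sketch_buf buf[SKETCH_BUF_NR];
};

/* journal_seq_unwritten(): anything newer than seq_ondisk is still in flight. */
static bool sketch_seq_unwritten(const struct sketch_journal *j, uint64_t seq)
{
	return seq > j->seq_ondisk;
}

/* nr_unwritten_journal_entries(): just a sequence-number delta. */
static unsigned sketch_nr_unwritten(const struct sketch_journal *j)
{
	return j->seq - j->seq_ondisk;
}

/*
 * journal_seq_to_buf()/journal_last_unwritten_buf(): because at most
 * SKETCH_BUF_NR - 1 entries are ever unwritten at once, (seq & MASK) maps
 * each in-flight sequence number to a distinct ring slot.
 */
static struct sketch_buf *sketch_seq_to_buf(struct sketch_journal *j, uint64_t seq)
{
	assert(seq <= j->seq);

	return sketch_seq_unwritten(j, seq)
		? &j->buf[seq & SKETCH_BUF_MASK]
		: NULL;
}

int main(void)
{
	struct sketch_journal j = { .seq = 7, .seq_ondisk = 5 };

	/* seqs 6 and 7 are unwritten, occupying slots 6 & 3 = 2 and 7 & 3 = 3 */
	printf("unwritten entries: %u\n", sketch_nr_unwritten(&j));

	for (uint64_t seq = j.seq_ondisk; seq <= j.seq; seq++)
		printf("seq %llu: %s\n",
		       (unsigned long long) seq,
		       sketch_seq_to_buf(&j, seq) ? "in flight" : "on disk");

	return 0;
}

The same sequence arithmetic is what lets the reworked journal_write_work() and journal_write_done() re-arm the delayed work from the per-buffer expires/flush_time fields instead of the removed global JOURNAL_NEED_WRITE flag.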