mirror of
https://github.com/koverstreet/bcachefs-tools.git
synced 2025-02-02 00:00:03 +03:00
482 lines
12 KiB
Plaintext
482 lines
12 KiB
Plaintext
#!/usr/bin/python3

# SPDX-License-Identifier: GPL-2.0-or-later
# Copyright (C) 2023-2024 Oracle.  All rights reserved.
#
# Author: Darrick J. Wong <djwong@kernel.org>

# Run bcachefsck in parallel, but avoid thrashing.

import subprocess
|
||
|
import json
|
||
|
import threading
|
||
|
import time
|
||
|
import sys
|
||
|
import os
|
||
|
import argparse
|
||
|
import signal
|
||
|
import dbus
|
||
|
from io import TextIOWrapper
|
||
|
from pathlib import Path
|
||
|
from datetime import timedelta
|
||
|
from datetime import datetime
|
||
|
from datetime import timezone
|
||
|
|
||
|
# Exit status of the whole run; every finished scrub ORs its result in.
retcode = 0
# Set when a termination signal (or KeyboardInterrupt) is seen; workers
# check it before starting or continuing a scrub.
terminate = False
# Verbose progress messages; switched on by the --debug option.
debug = False
|
||
|
|
||
|
def DEVNULL():
    '''Return /dev/null in subprocess writable format.

    Prefers the subprocess.DEVNULL sentinel.  If that name cannot be
    imported, falls back to a freshly opened binary write handle on
    os.devnull.'''
    try:
        from subprocess import DEVNULL as devnull
    except ImportError:
        return open(os.devnull, 'wb')
    return devnull
|
||
|
|
||
|
def find_mounts():
    '''Map mountpoints to physical disks.

    Runs lsblk with JSON output and walks the device tree it reports.
    Returns a dict mapping each bcachefs mountpoint to the set of kernel
    device names of the top-level block devices it was found under.  An
    empty dict is returned if lsblk exits with a nonzero status.'''
    def find_bcachefs_mounts(bdev, fs, lastdisk):
        '''Attach all lastdisk to each fs found under bdev.'''
        if bdev['fstype'] == 'bcachefs' and bdev['mountpoint'] is not None:
            # A mountpoint can be seen under several devices, so always
            # merge into the existing set.  (set.add() of the list that
            # split() returns would raise TypeError.)
            disks = fs.setdefault(bdev['mountpoint'], set())
            disks.update(lastdisk.split(':'))
        for child in bdev.get('children') or ():
            find_bcachefs_mounts(child, fs, lastdisk)

    fs = {}
    cmd = ['lsblk', '-o', 'NAME,KNAME,TYPE,FSTYPE,MOUNTPOINT', '-J']
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    # communicate() drains the pipe while waiting, so a large device list
    # cannot block lsblk on a full pipe, and the pipe is closed afterwards.
    raw, _ = proc.communicate()
    if proc.returncode != 0:
        return fs

    # sys.stdout.encoding can be None when stdout is not a text stream.
    bdevdata = json.loads(raw.decode(sys.stdout.encoding or 'utf-8'))

    # The lsblk output had better be in disks-then-partitions order
    for bdev in bdevdata['blockdevices']:
        find_bcachefs_mounts(bdev, fs, bdev['kname'])

    return fs
|
||
|
|
||
|
def backtick(cmd):
    '''Generator function that yields lines of a program's stdout.

    cmd is the argument list handed to subprocess.Popen.  Each line is
    decoded as UTF-8 and yielded with surrounding whitespace stripped.
    The child is reaped and its pipe closed once the generator finishes
    or is closed.'''
    # The context manager closes the pipe and waits for the child, so no
    # zombie process or open descriptor is left behind.
    with subprocess.Popen(cmd, stdout=subprocess.PIPE) as proc:
        for line in TextIOWrapper(proc.stdout, encoding="utf-8"):
            yield line.strip()
|
||
|
|
||
|
def remove_killfunc(killfuncs, fn):
    '''Ensure fn is not in killfuncs.

    Removes fn from the killfuncs collection; it is not an error if fn
    is already absent.'''
    try:
        killfuncs.remove(fn)
    except (KeyError, ValueError):
        # KeyError from a set, ValueError from a list: fn was not there,
        # which is exactly the state we want.
        pass
|
||
|
|
||
|
class scrub_control(object):
    '''Control object for bcachefsck.

    Base class: start() and stop() must be provided by subclasses.'''
    def __init__(self):
        pass

    def start(self):
        '''Start scrub and wait for it to complete.  Returns -1 if the
        service was not started, 0 if it succeeded, or 1 if it
        failed.'''
        # An explicit raise survives "python -O", unlike an assert.
        raise NotImplementedError

    def stop(self):
        '''Stop scrub.'''
        raise NotImplementedError
|
||
|
|
||
|
class scrub_subprocess(scrub_control):
    '''Control object for bcachefsck subprocesses.'''
    def __init__(self, mnt):
        '''Build the "bcachefs fsck" command line for mountpoint mnt.'''
        cmd = ['bcachefs', 'fsck']
        # NOTE(review): '@bcachefsck_args@' looks like a build-time
        # substitution placeholder for extra arguments -- confirm.
        cmd += '@bcachefsck_args@'.split()
        cmd += [mnt]
        self.cmdline = cmd
        # Running Popen object while start() is waiting, otherwise None.
        self.proc = None

    def start(self):
        '''Start bcachefsck and wait for it to complete.  Returns -1 if
        the service was not started, 0 if it succeeded, or 1 if it
        failed.'''
        if debug:
            print('run ', ' '.join(self.cmdline))

        try:
            proc = subprocess.Popen(self.cmdline)
            # Publish the handle so stop() can terminate the child.
            self.proc = proc
            proc.wait()
        except Exception:
            return -1
        finally:
            # Never leave a stale handle behind, even on failure.
            self.proc = None

        return proc.returncode

    def stop(self):
        '''Stop bcachefsck.'''
        if debug:
            print('kill ', ' '.join(self.cmdline))
        # Take a snapshot: start() runs in another thread and may reset
        # self.proc to None between the test and the call.
        proc = self.proc
        if proc is not None:
            proc.terminate()
|
||
|
|
||
|
def run_subprocess(mnt, killfuncs):
    '''Run a killable program.  Returns program retcode or -1 if we can't
    start it.

    While the program runs, its stop method is registered in killfuncs
    so that it can be terminated from another thread.'''
    try:
        p = scrub_subprocess(mnt)
        killfuncs.add(p.stop)
        try:
            return p.start()
        finally:
            # Deregister even if start() raised.
            remove_killfunc(killfuncs, p.stop)
    except Exception:
        return -1
|
||
|
|
||
|
# systemd doesn't like unit instance names with slashes in them, so it
# replaces them with dashes when it invokes the service.  Filesystem paths
# need a special --path argument so that dashes do not get mangled.
def path_to_serviceunit(path):
    '''Convert a pathname into a systemd service unit name.

    Runs systemd-escape against the bcachefsck@.service template and
    returns the first line it prints, stripped of whitespace, or None if
    it printed nothing.'''
    svcname = 'bcachefsck@.service'
    cmd = ['systemd-escape', '--template', svcname, '--path', path]

    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    # communicate() reads the output, waits for the child and closes the
    # pipe in one step.
    output, _ = proc.communicate()
    if not output:
        return None

    first_line = output.partition(b'\n')[0]
    # sys.stdout.encoding can be None when stdout is not a text stream.
    return first_line.decode(sys.stdout.encoding or 'utf-8').strip()
|
||
|
|
||
|
def fibonacci(max_ret):
    '''Yield the Fibonacci sequence 1, 1, 2, 3, 5, ... up to and
    including max_ret.  Nothing is yielded if max_ret is less than 1.'''
    if max_ret < 1:
        return

    prev, cur = 1, 1
    yield prev

    while cur <= max_ret:
        yield cur
        prev, cur = cur, prev + cur
|
||
|
|
||
|
class scrub_service(scrub_control):
    '''Control object for bcachefsck systemd service.'''
    def __init__(self, mnt):
        '''Resolve the unit name for mountpoint mnt and bind to it.'''
        self.unitname = path_to_serviceunit(mnt)
        # dbus interfaces for the unit; (re)created by bind().
        self.prop = None
        self.unit = None
        self.bind()

    def bind(self):
        '''Bind to the dbus proxy object for this service.'''
        sysbus = dbus.SystemBus()
        systemd1 = sysbus.get_object('org.freedesktop.systemd1',
                                     '/org/freedesktop/systemd1')
        manager = dbus.Interface(systemd1,
                                 'org.freedesktop.systemd1.Manager')
        path = manager.LoadUnit(self.unitname)

        svc_obj = sysbus.get_object('org.freedesktop.systemd1', path)
        self.prop = dbus.Interface(svc_obj,
                                   'org.freedesktop.DBus.Properties')
        self.unit = dbus.Interface(svc_obj,
                                   'org.freedesktop.systemd1.Unit')

    def __dbusrun(self, lambda_fn):
        '''Call the lambda function to execute something on dbus.  dbus
        exceptions result in retries with Fibonacci backoff, and the
        bindings will be rebuilt every time.

        Returns whatever lambda_fn returns.  Once the retries are used
        up, the last exception raised by lambda_fn is re-raised.'''
        fatal_ex = None

        for i in fibonacci(30):
            try:
                return lambda_fn()
            except dbus.exceptions.DBusException as e:
                if debug:
                    print(e)
                fatal_ex = e
                time.sleep(i)
                try:
                    self.bind()
                except dbus.exceptions.DBusException as bind_ex:
                    # A failed rebind must not abort the retry loop; the
                    # next round backs off longer and binds again.
                    if debug:
                        print(bind_ex)
        raise fatal_ex

    def state(self):
        '''Retrieve the active state for a systemd service.  As of
        systemd 249, this is supposed to be one of the following:
        "active", "reloading", "inactive", "failed", "activating",
        or "deactivating".  These strings are not localized.

        Any error while querying is reported as "failed".'''
        def get_active_state():
            # Looks up self.prop at call time so that a rebind done by
            # the retry loop is picked up.
            return self.prop.Get('org.freedesktop.systemd1.Unit',
                                 'ActiveState')

        try:
            return self.__dbusrun(get_active_state)
        except Exception as e:
            if debug:
                print(e, file=sys.stderr)
            return 'failed'

    def wait(self, interval=1):
        '''Wait until the service finishes.

        Polls the unit state every interval seconds.  Returns 1 if the
        final state is "failed", otherwise 0.'''
        # Use a poll/sleep loop to wait for the service to finish.
        # Avoid adding a dependency on python3 glib, which is required
        # to use an event loop to receive a dbus signal.
        s = self.state()
        while s not in ['failed', 'inactive']:
            if debug:
                print('waiting %s %s' % (self.unitname, s))
            time.sleep(interval)
            s = self.state()
        if debug:
            print('waited %s %s' % (self.unitname, s))
        if s == 'failed':
            return 1
        return 0

    def start(self):
        '''Start the service and wait for it to complete.  Returns -1
        if the service was not started, 0 if it succeeded, or 1 if it
        failed.'''
        if debug:
            print('starting %s' % self.unitname)

        try:
            self.__dbusrun(lambda: self.unit.Start('replace'))
            return self.wait()
        except Exception as e:
            print(e, file=sys.stderr)
            return -1

    def stop(self):
        '''Stop the service.

        Returns the result of waiting for the unit to finish, or -1 if
        the stop request could not be made.'''
        if debug:
            print('stopping %s' % self.unitname)

        try:
            self.__dbusrun(lambda: self.unit.Stop('replace'))
            return self.wait()
        except Exception as e:
            print(e, file=sys.stderr)
            return -1
|
||
|
|
||
|
def run_service(mnt, killfuncs):
    '''Run scrub as a service.

    Returns -1 if the service control object could not be set up,
    otherwise the result of starting the service.  While the service
    runs, its stop method is registered in killfuncs.'''
    try:
        svc = scrub_service(mnt)
    except Exception:
        return -1

    killfuncs.add(svc.stop)
    try:
        return svc.start()
    finally:
        # Deregister even if start() raised.
        remove_killfunc(killfuncs, svc.stop)
|
||
|
|
||
|
def run_scrub(mnt, cond, running_devs, mntdevs, killfuncs):
    '''Run a scrub process.

    mnt is the mountpoint to scrub and mntdevs the set of devices it
    occupies.  When the scrub is over (or was never started), mntdevs is
    removed from the shared running_devs set and cond is notified.  The
    scrub result is ORed into the global retcode.'''
    global retcode, terminate

    print("Scrubbing %s..." % mnt)
    sys.stdout.flush()

    try:
        if terminate:
            return

        # Run per-mount systemd bcachefsck service only if we ourselves
        # are running as a systemd service.
        if 'SERVICE_MODE' in os.environ:
            ret = run_service(mnt, killfuncs)
            if ret == 0 or ret == 1:
                print("Scrubbing %s done, (err=%d)" % (mnt, ret))
                sys.stdout.flush()
                # Several workers update retcode; serialize on cond.
                with cond:
                    retcode |= ret
                return

            if terminate:
                return

        # Invoke bcachefsck manually if we're running in the foreground.
        # We also permit this if we're running as a cronjob where
        # systemd services are unavailable.
        ret = run_subprocess(mnt, killfuncs)
        if ret >= 0:
            print("Scrubbing %s done, (err=%d)" % (mnt, ret))
            sys.stdout.flush()
            with cond:
                retcode |= ret
            return

        if terminate:
            return

        print("Unable to start scrub tool.")
        sys.stdout.flush()
    finally:
        # Release our devices and wake the scheduler under the lock.
        with cond:
            running_devs.difference_update(mntdevs)
            cond.notify()
|
||
|
|
||
|
def signal_scrubs(signum, cond):
    '''Handle termination signals by killing bcachefsck children.

    Sets the global terminate flag and notifies cond so that a thread
    waiting on it can act on the flag.'''
    global terminate

    if debug:
        print('Signal handler called with signal', signum)
        sys.stdout.flush()

    terminate = True
    with cond:
        cond.notify()
|
||
|
|
||
|
def wait_for_termination(cond, killfuncs, timeout=1.0):
    '''Wait for a child thread to terminate.  Returns True if we should
    abort the program, False otherwise.

    Waits on cond for at most timeout seconds (None waits indefinitely).
    If termination has been requested, every function in killfuncs is
    removed from the set and called before returning True.'''
    global terminate

    if debug:
        print('waiting for threads to terminate')
        sys.stdout.flush()

    with cond:
        try:
            # A notify sent before we got here is not remembered by the
            # condition.  Skip the wait if termination is already
            # requested, and bound it so that a missed wakeup only
            # delays the caller instead of hanging it.
            if not terminate:
                cond.wait(timeout)
        except KeyboardInterrupt:
            terminate = True

    if not terminate:
        return False

    print("Terminating...")
    sys.stdout.flush()
    while killfuncs:
        try:
            fn = killfuncs.pop()
        except KeyError:
            # A worker removed the last entry in the meantime.
            break
        fn()
    return True
|
||
|
|
||
|
def scan_interval(string):
    '''Convert a textual scan interval argument into a time delta.

    The argument is a number followed by one of the unit suffixes y, q,
    mo, w, d, h, m or s; the number may be fractional.  A string without
    a recognized suffix must be an integer number of seconds.  Raises
    ValueError if the numeric part cannot be parsed.'''
    # (suffix, converter) pairs, tried in order.
    units = (
        ('y', lambda n: timedelta(seconds=31556952) * n),
        ('q', lambda n: timedelta(days=90 * n)),
        ('mo', lambda n: timedelta(days=30 * n)),
        ('w', lambda n: timedelta(weeks=n)),
        ('d', lambda n: timedelta(days=n)),
        ('h', lambda n: timedelta(hours=n)),
        ('m', lambda n: timedelta(minutes=n)),
        ('s', lambda n: timedelta(seconds=n)),
    )
    for suffix, convert in units:
        if string.endswith(suffix):
            return convert(float(string[:-len(suffix)]))
    return timedelta(seconds=int(string))
|
||
|
|
||
|
def utcnow():
    '''Create a representation of the time right now, in UTC.

    Returns a timezone-aware datetime whose tzinfo is timezone.utc.'''
    # datetime.utcnow() is deprecated; now(tz) yields the aware value
    # directly.
    return datetime.now(timezone.utc)
|
||
|
|
||
|
def main():
    '''Find mounts, schedule bcachefsck runs.

    Starts one worker thread per mounted bcachefs filesystem, never
    running two scrubs that share a device at the same time, then exits
    the process with the accumulated status.'''
    global retcode, terminate, debug

    parser = argparse.ArgumentParser(
        description="Scrub all mounted bcachefs filesystems.")
    parser.add_argument("--debug", help="Enabling debugging messages.",
                        action="store_true")
    args = parser.parse_args()

    if args.debug:
        debug = True

    fs = find_mounts()

    # Schedule scrub jobs...
    running_devs = set()
    killfuncs = set()
    cond = threading.Condition()
    workers = []

    def thr(mnt, devs):
        '''Start a worker thread that scrubs mnt.'''
        a = (mnt, cond, running_devs, devs, killfuncs)
        worker = threading.Thread(target=run_scrub, args=a)
        workers.append(worker)
        worker.start()

    signal.signal(signal.SIGINT, lambda s, f: signal_scrubs(s, cond))
    signal.signal(signal.SIGTERM, lambda s, f: signal_scrubs(s, cond))

    while fs:
        # Nothing running at all: start any one filesystem.
        if not running_devs:
            mnt, devs = fs.popitem()
            running_devs.update(devs)
            thr(mnt, devs)

        # Start every other filesystem whose devices are all idle.
        poppers = set()
        for mnt, devs in fs.items():
            if running_devs.isdisjoint(devs):
                running_devs.update(devs)
                poppers.add(mnt)
                thr(mnt, devs)
        for p in poppers:
            fs.pop(p)

        # Wait for one thread to finish
        if wait_for_termination(cond, killfuncs):
            break

    # Wait for the rest of the threads to finish
    while killfuncs:
        wait_for_termination(cond, killfuncs)

    # A worker that has not registered its kill function yet is invisible
    # to the loop above.  Join them all so that every result has been
    # folded into retcode before it is used below.
    for worker in workers:
        worker.join()

    # If we're being run as a service, the return code must fit the LSB
    # init script action error guidelines, which is to say that we compress
    # all errors to 1 ("generic or unspecified error", LSB 5.0 section
    # 22.2) and hope the admin will scan the log for what actually
    # happened.
    #
    # We have to sleep 2 seconds here because journald uses the pid to
    # connect our log messages to the systemd service.  This is critical
    # for capturing all the log messages if the scrub fails, because the
    # fail service uses the service name to gather log messages for the
    # error report.
    if 'SERVICE_MODE' in os.environ:
        time.sleep(2)
        if retcode != 0:
            retcode = 1

    sys.exit(retcode)
|
||
|
|
||
|
# Only run the scheduler when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|