test improvements

This commit is contained in:
h4sh 2023-09-23 23:05:21 +10:00
parent e5f4fcaad7
commit 47a580dd77
4 changed files with 247 additions and 139 deletions

View File

@ -3,24 +3,28 @@ import pytest
import time
import re
# If a test fails, wait a moment before retrieving the captured
# stdout/stderr. When using a server process, this makes sure that we capture
# any potential output of the server that comes *after* a test has failed. For
# example, if a request handler raises an exception, the server first signals an
# error to FUSE (causing the test to fail), and then logs the exception. Without
# the extra delay, the exception will go into nowhere.
@pytest.mark.hookwrapper
# If a test fails, wait a moment before retrieving the captured stdout/stderr.
# When using a server process, this makes sure that we capture any potential
# output of the server that comes *after* a test has failed. For example, if a
# request handler raises an exception, the server first signals an error to
# FUSE (causing the test to fail), and then logs the exception. Without the
# extra delay, the exception will go into nowhere.
@pytest.hookimpl(hookwrapper=True)
def pytest_pyfunc_call(pyfuncitem):
outcome = yield
failed = outcome.excinfo is not None
if failed:
time.sleep(1)
@pytest.fixture()
def pass_capfd(request, capfd):
'''Provide capfd object to UnitTest instances'''
"""Provide capfd object to UnitTest instances"""
request.instance.capfd = capfd
def check_test_output(capfd):
(stdout, stderr) = capfd.readouterr()
@ -31,39 +35,57 @@ def check_test_output(capfd):
# Strip out false positives
for (pattern, flags, count) in capfd.false_positives:
cp = re.compile(pattern, flags)
(stdout, cnt) = cp.subn('', stdout, count=count)
(stdout, cnt) = cp.subn("", stdout, count=count)
if count == 0 or count - cnt > 0:
stderr = cp.sub('', stderr, count=count - cnt)
stderr = cp.sub("", stderr, count=count - cnt)
patterns = [ r'\b{}\b'.format(x) for x in
('exception', 'error', 'warning', 'fatal', 'traceback',
'fault', 'crash(?:ed)?', 'abort(?:ed)',
'uninitiali[zs]ed') ]
patterns += ['^==[0-9]+== ']
patterns = [
r"\b{}\b".format(x)
for x in (
"exception",
"error",
"warning",
"fatal",
"traceback",
"fault",
"crash(?:ed)?",
"abort(?:ed)",
"uninitiali[zs]ed",
)
]
patterns += ["^==[0-9]+== "]
for pattern in patterns:
cp = re.compile(pattern, re.IGNORECASE | re.MULTILINE)
hit = cp.search(stderr)
if hit:
raise AssertionError('Suspicious output to stderr (matched "%s")' % hit.group(0))
raise AssertionError(
'Suspicious output to stderr (matched "%s")' % hit.group(0)
)
hit = cp.search(stdout)
if hit:
raise AssertionError('Suspicious output to stdout (matched "%s")' % hit.group(0))
raise AssertionError(
'Suspicious output to stdout (matched "%s")' % hit.group(0)
)
def register_output(self, pattern, count=1, flags=re.MULTILINE):
'''Register *pattern* as false positive for output checking
"""Register *pattern* as false positive for output checking
This prevents the test from failing because the output otherwise
appears suspicious.
'''
"""
self.false_positives.append((pattern, flags, count))
# This is a terrible hack that allows us to access the fixtures from the
# pytest_runtest_call hook. Among a lot of other hidden assumptions, it probably
# relies on tests running sequential (i.e., don't dare to use e.g. the xdist
# plugin)
current_capfd = None
@pytest.yield_fixture(autouse=True)
@pytest.fixture(autouse=True)
def save_cap_fixtures(request, capfd):
global current_capfd
capfd.false_positives = []
@ -71,7 +93,7 @@ def save_cap_fixtures(request, capfd):
# Monkeypatch in a function to register false positives
type(capfd).register_output = register_output
if request.config.getoption('capture') == 'no':
if request.config.getoption("capture") == "no":
capfd = None
current_capfd = capfd
bak = current_capfd
@ -82,6 +104,7 @@ def save_cap_fixtures(request, capfd):
assert bak is current_capfd
current_capfd = None
@pytest.hookimpl(trylast=True)
def pytest_runtest_call(item):
capfd = current_capfd

View File

@ -1,2 +1,4 @@
[pytest]
addopts = --verbose --assert=rewrite --tb=native -x -r a
markers = uses_fuse: Mark to indicate that FUSE is available on the system running the test

View File

@ -1,8 +1,9 @@
#!/usr/bin/env python3
if __name__ == '__main__':
if __name__ == "__main__":
import pytest
import sys
sys.exit(pytest.main([__file__] + sys.argv[1:]))
import subprocess
@ -13,31 +14,60 @@ import stat
import shutil
import filecmp
import errno
from contextlib import contextmanager
from tempfile import NamedTemporaryFile
from util import (wait_for_mount, umount, cleanup, base_cmdline,
basename, fuse_test_marker, safe_sleep)
from util import (
wait_for_mount,
umount,
cleanup,
base_cmdline,
basename,
fuse_test_marker,
safe_sleep,
os_create,
os_open,
)
from os.path import join as pjoin
TEST_FILE = __file__
pytestmark = fuse_test_marker()
with open(TEST_FILE, 'rb') as fh:
with open(TEST_FILE, "rb") as fh:
TEST_DATA = fh.read()
def name_generator(__ctr=[0]):
__ctr[0] += 1
return 'testfile_%d' % __ctr[0]
@pytest.mark.parametrize("debug", (False, True))
@pytest.mark.parametrize("cache_timeout", (0,1))
@pytest.mark.parametrize("sync_rd", (True, False))
@pytest.mark.parametrize("multiconn", (True,False))
def test_sshfs(tmpdir, debug, cache_timeout, sync_rd, multiconn, capfd):
def name_generator(__ctr=[0]) -> str:
"""Generate a fresh filename on each call"""
__ctr[0] += 1
return f"testfile_{__ctr[0]}"
@pytest.mark.parametrize(
"debug",
[pytest.param(False, id="debug=false"), pytest.param(True, id="debug=true")],
)
@pytest.mark.parametrize(
"cache_timeout",
[pytest.param(0, id="cache_timeout=0"), pytest.param(1, id="cache_timeout=1")],
)
@pytest.mark.parametrize(
"sync_rd",
[pytest.param(True, id="sync_rd=true"), pytest.param(False, id="sync_rd=false")],
)
@pytest.mark.parametrize(
"multiconn",
[
pytest.param(True, id="multiconn=true"),
pytest.param(False, id="multiconn=false"),
],
)
def test_sshfs(
tmpdir, debug: bool, cache_timeout: int, sync_rd: bool, multiconn: bool, capfd
) -> None:
# Avoid false positives from debug messages
#if debug:
# if debug:
# capfd.register_output(r'^ unique: [0-9]+, error: -[0-9]+ .+$',
# count=0)
@ -46,45 +76,60 @@ def test_sshfs(tmpdir, debug, cache_timeout, sync_rd, multiconn, capfd):
# Test if we can ssh into localhost without password
try:
res = subprocess.call(['ssh', '-o', 'KbdInteractiveAuthentication=no',
'-o', 'ChallengeResponseAuthentication=no',
'-o', 'PasswordAuthentication=no',
'localhost', '--', 'true'], stdin=subprocess.DEVNULL,
timeout=10)
res = subprocess.call(
[
"ssh",
"-o",
"StrictHostKeyChecking=no",
"-o",
"KbdInteractiveAuthentication=no",
"-o",
"ChallengeResponseAuthentication=no",
"-o",
"PasswordAuthentication=no",
"localhost",
"--",
"true",
],
stdin=subprocess.DEVNULL,
timeout=10,
)
except subprocess.TimeoutExpired:
res = 1
if res != 0:
pytest.fail('Unable to ssh into localhost without password prompt.')
pytest.fail("Unable to ssh into localhost without password prompt.")
mnt_dir = str(tmpdir.mkdir('mnt'))
src_dir = str(tmpdir.mkdir('src'))
mnt_dir = str(tmpdir.mkdir("mnt"))
src_dir = str(tmpdir.mkdir("src"))
cmdline = base_cmdline + [ pjoin(basename, 'sshfs'),
'-f', 'localhost:' + src_dir, mnt_dir ]
cmdline = base_cmdline + [
pjoin(basename, "sshfs"),
"-f",
f"localhost:{src_dir}",
mnt_dir,
]
if debug:
cmdline += [ '-o', 'sshfs_debug' ]
cmdline += ["-o", "sshfs_debug"]
if sync_rd:
cmdline += [ '-o', 'sync_readdir' ]
cmdline += ["-o", "sync_readdir"]
# SSHFS Cache
if cache_timeout == 0:
cmdline += [ '-o', 'dir_cache=no' ]
cmdline += ["-o", "dir_cache=no"]
else:
cmdline += [ '-o', 'dcache_timeout=%d' % cache_timeout,
'-o', 'dir_cache=yes' ]
cmdline += ["-o", f"dcache_timeout={cache_timeout}", "-o", "dir_cache=yes"]
# FUSE Cache
cmdline += [ '-o', 'entry_timeout=0',
'-o', 'attr_timeout=0' ]
cmdline += ["-o", "entry_timeout=0", "-o", "attr_timeout=0"]
if multiconn:
cmdline += [ '-o', 'max_conns=3' ]
cmdline += ["-o", "max_conns=3"]
new_env = dict(os.environ) # copy, don't modify
new_env = dict(os.environ) # copy, don't modify
# Abort on warnings from glib
new_env['G_DEBUG'] = 'fatal-warnings'
new_env["G_DEBUG"] = "fatal-warnings"
mount_process = subprocess.Popen(cmdline, env=new_env)
try:
@ -114,30 +159,20 @@ def test_sshfs(tmpdir, debug, cache_timeout, sync_rd, multiconn, capfd):
tst_truncate_path(mnt_dir)
tst_truncate_fd(mnt_dir)
tst_open_unlink(mnt_dir)
except:
except Exception as exc:
cleanup(mount_process, mnt_dir)
raise
raise exc
else:
umount(mount_process, mnt_dir)
@contextmanager
def os_open(name, flags):
fd = os.open(name, flags)
try:
yield fd
finally:
os.close(fd)
def os_create(name):
os.close(os.open(name, os.O_CREAT | os.O_RDWR))
def tst_unlink(src_dir, mnt_dir, cache_timeout):
name = name_generator()
fullname = mnt_dir + "/" + name
with open(pjoin(src_dir, name), 'wb') as fh:
fh.write(b'hello')
with open(pjoin(src_dir, name), "wb") as fh:
fh.write(b"hello")
if cache_timeout:
safe_sleep(cache_timeout+1)
safe_sleep(cache_timeout + 1)
assert name in os.listdir(mnt_dir)
os.unlink(fullname)
with pytest.raises(OSError) as exc_info:
@ -146,22 +181,24 @@ def tst_unlink(src_dir, mnt_dir, cache_timeout):
assert name not in os.listdir(mnt_dir)
assert name not in os.listdir(src_dir)
def tst_mkdir(mnt_dir):
dirname = name_generator()
fullname = mnt_dir + "/" + dirname
os.mkdir(fullname)
fstat = os.stat(fullname)
assert stat.S_ISDIR(fstat.st_mode)
assert os.listdir(fullname) == []
assert fstat.st_nlink in (1,2)
assert os.listdir(fullname) == []
assert fstat.st_nlink in (1, 2)
assert dirname in os.listdir(mnt_dir)
def tst_rmdir(src_dir, mnt_dir, cache_timeout):
name = name_generator()
fullname = mnt_dir + "/" + name
os.mkdir(pjoin(src_dir, name))
if cache_timeout:
safe_sleep(cache_timeout+1)
safe_sleep(cache_timeout + 1)
assert name in os.listdir(mnt_dir)
os.rmdir(fullname)
with pytest.raises(OSError) as exc_info:
@ -170,6 +207,7 @@ def tst_rmdir(src_dir, mnt_dir, cache_timeout):
assert name not in os.listdir(mnt_dir)
assert name not in os.listdir(src_dir)
def tst_symlink(mnt_dir):
linkname = name_generator()
fullname = mnt_dir + "/" + linkname
@ -180,6 +218,7 @@ def tst_symlink(mnt_dir):
assert fstat.st_nlink == 1
assert linkname in os.listdir(mnt_dir)
def tst_create(mnt_dir):
name = name_generator()
fullname = pjoin(mnt_dir, name)
@ -197,6 +236,7 @@ def tst_create(mnt_dir):
assert fstat.st_nlink == 1
assert fstat.st_size == 0
def tst_chown(mnt_dir):
filename = pjoin(mnt_dir, name_generator())
os.mkdir(filename)
@ -216,37 +256,38 @@ def tst_chown(mnt_dir):
assert fstat.st_uid == uid_new
assert fstat.st_gid == gid_new
def tst_open_read(src_dir, mnt_dir):
name = name_generator()
with open(pjoin(src_dir, name), 'wb') as fh_out, \
open(TEST_FILE, 'rb') as fh_in:
with open(pjoin(src_dir, name), "wb") as fh_out, open(TEST_FILE, "rb") as fh_in:
shutil.copyfileobj(fh_in, fh_out)
assert filecmp.cmp(pjoin(mnt_dir, name), TEST_FILE, False)
def tst_open_write(src_dir, mnt_dir):
name = name_generator()
fd = os.open(pjoin(src_dir, name),
os.O_CREAT | os.O_RDWR)
fd = os.open(pjoin(src_dir, name), os.O_CREAT | os.O_RDWR)
os.close(fd)
fullname = pjoin(mnt_dir, name)
with open(fullname, 'wb') as fh_out, \
open(TEST_FILE, 'rb') as fh_in:
with open(fullname, "wb") as fh_out, open(TEST_FILE, "rb") as fh_in:
shutil.copyfileobj(fh_in, fh_out)
assert filecmp.cmp(fullname, TEST_FILE, False)
def tst_append(src_dir, mnt_dir):
name = name_generator()
os_create(pjoin(src_dir, name))
fullname = pjoin(mnt_dir, name)
with os_open(fullname, os.O_WRONLY) as fd:
os.write(fd, b'foo\n')
with os_open(fullname, os.O_WRONLY|os.O_APPEND) as fd:
os.write(fd, b'bar\n')
os.write(fd, b"foo\n")
with os_open(fullname, os.O_WRONLY | os.O_APPEND) as fd:
os.write(fd, b"bar\n")
with open(fullname, "rb") as fh:
assert fh.read() == b"foo\nbar\n"
with open(fullname, 'rb') as fh:
assert fh.read() == b'foo\nbar\n'
def tst_seek(src_dir, mnt_dir):
name = name_generator()
@ -254,20 +295,21 @@ def tst_seek(src_dir, mnt_dir):
fullname = pjoin(mnt_dir, name)
with os_open(fullname, os.O_WRONLY) as fd:
os.lseek(fd, 1, os.SEEK_SET)
os.write(fd, b'foobar\n')
os.write(fd, b"foobar\n")
with os_open(fullname, os.O_WRONLY) as fd:
os.lseek(fd, 4, os.SEEK_SET)
os.write(fd, b'com')
os.write(fd, b"com")
with open(fullname, "rb") as fh:
assert fh.read() == b"\0foocom\n"
with open(fullname, 'rb') as fh:
assert fh.read() == b'\0foocom\n'
def tst_open_unlink(mnt_dir):
name = pjoin(mnt_dir, name_generator())
data1 = b'foo'
data2 = b'bar'
data1 = b"foo"
data2 = b"bar"
fullname = pjoin(mnt_dir, name)
with open(fullname, 'wb+', buffering=0) as fh:
with open(fullname, "wb+", buffering=0) as fh:
fh.write(data1)
os.unlink(fullname)
with pytest.raises(OSError) as exc_info:
@ -276,11 +318,13 @@ def tst_open_unlink(mnt_dir):
assert name not in os.listdir(mnt_dir)
fh.write(data2)
fh.seek(0)
assert fh.read() == data1+data2
assert fh.read() == data1 + data2
def tst_statvfs(mnt_dir):
os.statvfs(mnt_dir)
def tst_link(mnt_dir, cache_timeout):
name1 = pjoin(mnt_dir, name_generator())
name2 = pjoin(mnt_dir, name_generator())
@ -302,8 +346,16 @@ def tst_link(mnt_dir, cache_timeout):
fstat1 = os.lstat(name1)
fstat2 = os.lstat(name2)
for attr in ('st_mode', 'st_dev', 'st_uid', 'st_gid',
'st_size', 'st_atime', 'st_mtime', 'st_ctime'):
for attr in (
"st_mode",
"st_dev",
"st_uid",
"st_gid",
"st_size",
"st_atime",
"st_mtime",
"st_ctime",
):
assert getattr(fstat1, attr) == getattr(fstat2, attr)
assert os.path.basename(name2) in os.listdir(mnt_dir)
assert filecmp.cmp(name1, name2, False)
@ -316,6 +368,7 @@ def tst_link(mnt_dir, cache_timeout):
os.unlink(name1)
def tst_readdir(src_dir, mnt_dir):
newdir = name_generator()
src_newdir = pjoin(src_dir, newdir)
@ -331,7 +384,7 @@ def tst_readdir(src_dir, mnt_dir):
listdir_is = os.listdir(mnt_newdir)
listdir_is.sort()
listdir_should = [ os.path.basename(file_), os.path.basename(subdir) ]
listdir_should = [os.path.basename(file_), os.path.basename(subdir)]
listdir_should.sort()
assert listdir_is == listdir_should
@ -340,11 +393,12 @@ def tst_readdir(src_dir, mnt_dir):
os.rmdir(subdir)
os.rmdir(src_newdir)
def tst_truncate_path(mnt_dir):
assert len(TEST_DATA) > 1024
filename = pjoin(mnt_dir, name_generator())
with open(filename, 'wb') as fh:
with open(filename, "wb") as fh:
fh.write(TEST_DATA)
fstat = os.stat(filename)
@ -354,21 +408,22 @@ def tst_truncate_path(mnt_dir):
# Add zeros at the end
os.truncate(filename, size + 1024)
assert os.stat(filename).st_size == size + 1024
with open(filename, 'rb') as fh:
with open(filename, "rb") as fh:
assert fh.read(size) == TEST_DATA
assert fh.read(1025) == b'\0' * 1024
assert fh.read(1025) == b"\0" * 1024
# Truncate data
os.truncate(filename, size - 1024)
assert os.stat(filename).st_size == size - 1024
with open(filename, 'rb') as fh:
assert fh.read(size) == TEST_DATA[:size-1024]
with open(filename, "rb") as fh:
assert fh.read(size) == TEST_DATA[: size - 1024]
os.unlink(filename)
def tst_truncate_fd(mnt_dir):
assert len(TEST_DATA) > 1024
with NamedTemporaryFile('w+b', 0, dir=mnt_dir) as fh:
with NamedTemporaryFile("w+b", 0, dir=mnt_dir) as fh:
fd = fh.fileno()
fh.write(TEST_DATA)
fstat = os.fstat(fd)
@ -380,13 +435,14 @@ def tst_truncate_fd(mnt_dir):
assert os.fstat(fd).st_size == size + 1024
fh.seek(0)
assert fh.read(size) == TEST_DATA
assert fh.read(1025) == b'\0' * 1024
assert fh.read(1025) == b"\0" * 1024
# Truncate data
os.ftruncate(fd, size - 1024)
assert os.fstat(fd).st_size == size - 1024
fh.seek(0)
assert fh.read(size) == TEST_DATA[:size-1024]
assert fh.read(size) == TEST_DATA[: size - 1024]
def tst_utimens(mnt_dir, tol=0):
filename = pjoin(mnt_dir, name_generator())
@ -395,20 +451,21 @@ def tst_utimens(mnt_dir, tol=0):
atime = fstat.st_atime + 42.28
mtime = fstat.st_mtime - 42.23
if sys.version_info < (3,3):
if sys.version_info < (3, 3):
os.utime(filename, (atime, mtime))
else:
atime_ns = fstat.st_atime_ns + int(42.28*1e9)
mtime_ns = fstat.st_mtime_ns - int(42.23*1e9)
atime_ns = fstat.st_atime_ns + int(42.28 * 1e9)
mtime_ns = fstat.st_mtime_ns - int(42.23 * 1e9)
os.utime(filename, None, ns=(atime_ns, mtime_ns))
fstat = os.lstat(filename)
assert abs(fstat.st_atime - atime) < tol
assert abs(fstat.st_mtime - mtime) < tol
if sys.version_info >= (3,3):
assert abs(fstat.st_atime_ns - atime_ns) < tol*1e9
assert abs(fstat.st_mtime_ns - mtime_ns) < tol*1e9
if sys.version_info >= (3, 3):
assert abs(fstat.st_atime_ns - atime_ns) < tol * 1e9
assert abs(fstat.st_mtime_ns - mtime_ns) < tol * 1e9
def tst_utimens_now(mnt_dir):
fullname = pjoin(mnt_dir, name_generator())
@ -422,17 +479,18 @@ def tst_utimens_now(mnt_dir):
assert fstat.st_atime != 0
assert fstat.st_mtime != 0
def tst_passthrough(src_dir, mnt_dir, cache_timeout):
name = name_generator()
src_name = pjoin(src_dir, name)
mnt_name = pjoin(src_dir, name)
assert name not in os.listdir(src_dir)
assert name not in os.listdir(mnt_dir)
with open(src_name, 'w') as fh:
fh.write('Hello, world')
with open(src_name, "w") as fh:
fh.write("Hello, world")
assert name in os.listdir(src_dir)
if cache_timeout:
safe_sleep(cache_timeout+1)
safe_sleep(cache_timeout + 1)
assert name in os.listdir(mnt_dir)
assert os.stat(src_name) == os.stat(mnt_name)
@ -441,10 +499,10 @@ def tst_passthrough(src_dir, mnt_dir, cache_timeout):
mnt_name = pjoin(src_dir, name)
assert name not in os.listdir(src_dir)
assert name not in os.listdir(mnt_dir)
with open(mnt_name, 'w') as fh:
fh.write('Hello, world')
with open(mnt_name, "w") as fh:
fh.write("Hello, world")
assert name in os.listdir(src_dir)
if cache_timeout:
safe_sleep(cache_timeout+1)
safe_sleep(cache_timeout + 1)
assert name in os.listdir(mnt_dir)
assert os.stat(src_name) == os.stat(mnt_name)

View File

@ -5,25 +5,42 @@ import os
import stat
import time
from os.path import join as pjoin
from contextlib import contextmanager
basename = pjoin(os.path.dirname(__file__), '..')
basename = pjoin(os.path.dirname(__file__), "..")
def wait_for_mount(mount_process, mnt_dir,
test_fn=os.path.ismount):
def os_create(name):
os.close(os.open(name, os.O_CREAT | os.O_RDWR))
@contextmanager
def os_open(name, flags):
fd = os.open(name, flags)
try:
yield fd
finally:
os.close(fd)
def wait_for_mount(mount_process, mnt_dir, test_fn=os.path.ismount):
elapsed = 0
while elapsed < 30:
if test_fn(mnt_dir):
return True
if mount_process.poll() is not None:
pytest.fail('file system process terminated prematurely')
pytest.fail("file system process terminated prematurely")
time.sleep(0.1)
elapsed += 0.1
pytest.fail("mountpoint failed to come up")
def cleanup(mount_process, mnt_dir):
subprocess.call(['fusermount', '-z', '-u', mnt_dir],
stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT)
subprocess.call(
["fusermount", "-z", "-u", mnt_dir],
stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT,
)
mount_process.terminate()
try:
mount_process.wait(1)
@ -32,7 +49,7 @@ def cleanup(mount_process, mnt_dir):
def umount(mount_process, mnt_dir):
subprocess.check_call(['fusermount3', '-z', '-u', mnt_dir ])
subprocess.check_call(["fusermount3", "-z", "-u", mnt_dir])
assert not os.path.ismount(mnt_dir)
# Give mount process a little while to terminate. Popen.wait(timeout)
@ -43,18 +60,19 @@ def umount(mount_process, mnt_dir):
if code is not None:
if code == 0:
return
pytest.fail('file system process terminated with code %s' % (code,))
pytest.fail(f"file system process terminated with code {code}")
time.sleep(0.1)
elapsed += 0.1
pytest.fail('mount process did not terminate')
pytest.fail("mount process did not terminate")
def safe_sleep(secs):
'''Like time.sleep(), but sleep for at least *secs*
"""Like time.sleep(), but sleep for at least *secs*
`time.sleep` may sleep less than the given period if a signal is
received. This function ensures that we sleep for at least the
desired time.
'''
"""
now = time.time()
end = now + secs
@ -62,24 +80,27 @@ def safe_sleep(secs):
time.sleep(end - now)
now = time.time()
def fuse_test_marker():
'''Return a pytest.marker that indicates FUSE availability
"""Return a pytest.marker that indicates FUSE availability
If system/user/environment does not support FUSE, return
a `pytest.mark.skip` object with more details. If FUSE is
supported, return `pytest.mark.uses_fuse()`.
'''
"""
skip = lambda x: pytest.mark.skip(reason=x)
def skip(reason: str):
return pytest.mark.skip(reason=reason)
with subprocess.Popen(['which', 'fusermount'], stdout=subprocess.PIPE,
universal_newlines=True) as which:
with subprocess.Popen(
["which", "fusermount"], stdout=subprocess.PIPE, universal_newlines=True
) as which:
fusermount_path = which.communicate()[0].strip()
if not fusermount_path or which.returncode != 0:
return skip("Can't find fusermount executable")
if not os.path.exists('/dev/fuse'):
if not os.path.exists("/dev/fuse"):
return skip("FUSE kernel module does not seem to be loaded")
if os.getuid() == 0:
@ -87,20 +108,24 @@ def fuse_test_marker():
mode = os.stat(fusermount_path).st_mode
if mode & stat.S_ISUID == 0:
return skip('fusermount executable not setuid, and we are not root.')
return skip("fusermount executable not setuid, and we are not root.")
try:
fd = os.open('/dev/fuse', os.O_RDWR)
fd = os.open("/dev/fuse", os.O_RDWR)
except OSError as exc:
return skip('Unable to open /dev/fuse: %s' % exc.strerror)
return skip(f"Unable to open /dev/fuse: {exc.strerror}")
else:
os.close(fd)
return pytest.mark.uses_fuse()
# Use valgrind if requested
if os.environ.get('TEST_WITH_VALGRIND', 'no').lower().strip() \
not in ('no', 'false', '0'):
base_cmdline = [ 'valgrind', '-q', '--' ]
if os.environ.get("TEST_WITH_VALGRIND", "no").lower().strip() not in (
"no",
"false",
"0",
):
base_cmdline = ["valgrind", "-q", "--"]
else:
base_cmdline = []