Added unit tests and travis integration

This commit is contained in:
Nikolaus Rath 2017-06-20 13:53:55 -07:00
parent b66ecb9c3a
commit 5f4619bac3
15 changed files with 698 additions and 6 deletions

3
.gitignore vendored
View File

@ -6,8 +6,6 @@
# NOTE! Please use 'git ls-files -i --exclude-standard'
# command after changing this file, to see if there are
# any tracked files which get ignored after the change.
.*
!.gitignore
*.o
*.lo
*.la
@ -35,3 +33,4 @@ sshfs.1
/.pc
/patches
/m4
.deps/

20
.travis.yml Normal file
View File

@ -0,0 +1,20 @@
sudo: required
dist: trusty
language:
- c
addons:
apt:
sources:
- ubuntu-toolchain-r-test
packages:
- valgrind
- clang
- gcc
- gcc-6
- fuse
- libfuse2
- libfuse-dev
install: test/travis-install.sh
script: test/travis-build.sh

View File

@ -1,6 +1,7 @@
Unreleased Changes
------------------
* Added unit tests
* Documented limited hardlink support.
* Added support for building with Meson.
* Added support for more SSH options.

View File

@ -48,12 +48,10 @@ be available from your operating system's package manager).
To build and install, we recommend to use Meson_ (version 0.38 or
newer) and Ninja_. After extracting the sshfs tarball, create a
(temporary) build directory and run Meson and Ninja::
(temporary) build directory and run Meson::
$ md build; cd build
$ meson ..
$ ninja
$ sudo ninja install
Normally, the default build options will work fine. If you
nevertheless want to adjust them, you can do so with the *mesonconf*
@ -61,13 +59,20 @@ command::
$ mesonconf # list options
$ mesonconf -D strip=true # set an option
$ ninja # rebuild
To build, test and install SSHFS, you then use Ninja (running the
tests requires the `py.test`_ Python module)::
$ ninja
$ python3 -m pytest test/ # optional, but recommended
$ sudo ninja install
.. _libfuse: http://github.com/libfuse/libfuse
.. _OSXFUSE: https://osxfuse.github.io/
.. _Glib: https://developer.gnome.org/glib/stable/
.. _Meson: http://mesonbuild.com/
.. _Ninja: https://ninja-build.org/
.. _`py.test`: http://www.pytest.org/
Alternate Installation
----------------------

View File

@ -59,3 +59,6 @@ executable('sshfs', sshfs_sources,
# This is a little ugly. Is there a better way to tell Meson that the
# manpage is in the build directory?
install_man(join_paths(meson.current_build_dir(), 'sshfs.1'))
subdir('test')

1
test/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
__pycache__/

89
test/conftest.py Normal file
View File

@ -0,0 +1,89 @@
import sys
import pytest
import time
import re
# If a test fails, wait a moment before retrieving the captured
# stdout/stderr. When using a server process, this makes sure that we capture
# any potential output of the server that comes *after* a test has failed. For
# example, if a request handler raises an exception, the server first signals an
# error to FUSE (causing the test to fail), and then logs the exception. Without
# the extra delay, the exception will go into nowhere.
@pytest.mark.hookwrapper
def pytest_pyfunc_call(pyfuncitem):
outcome = yield
failed = outcome.excinfo is not None
if failed:
time.sleep(1)
@pytest.fixture()
def pass_capfd(request, capfd):
'''Provide capfd object to UnitTest instances'''
request.instance.capfd = capfd
def check_test_output(capfd):
(stdout, stderr) = capfd.readouterr()
# Write back what we've read (so that it will still be printed.
sys.stdout.write(stdout)
sys.stderr.write(stderr)
# Strip out false positives
for (pattern, flags, count) in capfd.false_positives:
cp = re.compile(pattern, flags)
(stdout, cnt) = cp.subn('', stdout, count=count)
if count == 0 or count - cnt > 0:
stderr = cp.sub('', stderr, count=count - cnt)
patterns = [ r'\b{}\b'.format(x) for x in
('exception', 'error', 'warning', 'fatal', 'traceback',
'fault', 'crash(?:ed)?', 'abort(?:ed)',
'uninitiali[zs]ed') ]
patterns += ['^==[0-9]+== ']
for pattern in patterns:
cp = re.compile(pattern, re.IGNORECASE | re.MULTILINE)
hit = cp.search(stderr)
if hit:
raise AssertionError('Suspicious output to stderr (matched "%s")' % hit.group(0))
hit = cp.search(stdout)
if hit:
raise AssertionError('Suspicious output to stdout (matched "%s")' % hit.group(0))
def register_output(self, pattern, count=1, flags=re.MULTILINE):
'''Register *pattern* as false positive for output checking
This prevents the test from failing because the output otherwise
appears suspicious.
'''
self.false_positives.append((pattern, flags, count))
# This is a terrible hack that allows us to access the fixtures from the
# pytest_runtest_call hook. Among a lot of other hidden assumptions, it probably
# relies on tests running sequential (i.e., don't dare to use e.g. the xdist
# plugin)
current_capfd = None
@pytest.yield_fixture(autouse=True)
def save_cap_fixtures(request, capfd):
global current_capfd
capfd.false_positives = []
# Monkeypatch in a function to register false positives
type(capfd).register_output = register_output
if request.config.getoption('capture') == 'no':
capfd = None
current_capfd = capfd
bak = current_capfd
yield
# Try to catch problems with this hack (e.g. when running tests
# simultaneously)
assert bak is current_capfd
current_capfd = None
@pytest.hookimpl(trylast=True)
def pytest_runtest_call(item):
capfd = current_capfd
if capfd is not None:
check_test_output(capfd)

11
test/lsan_suppress.txt Normal file
View File

@ -0,0 +1,11 @@
# Suppression file for address sanitizer.
# There are some leaks in command line option parsing. They should be
# fixed at some point, but are harmless since the consume just a small,
# constant amount of memory and do not grow.
leak:fuse_opt_parse
# Leaks in fusermount3 are harmless as well (it's a short-lived
# process) - but patches are welcome!
leak:fusermount.c

11
test/meson.build Normal file
View File

@ -0,0 +1,11 @@
test_scripts = [ 'conftest.py', 'pytest.ini', 'test_sshfs.py',
'util.py' ]
custom_target('test_scripts', input: test_scripts,
output: test_scripts, build_by_default: true,
command: ['cp', '-fPu', '--preserve=mode',
'@INPUT@', meson.current_build_dir() ])
# Provide something helpful when running 'ninja test'
wrong_cmd = executable('wrong_command', 'wrong_command.c',
install: false)
test('wrong_cmd', wrong_cmd)

2
test/pytest.ini Normal file
View File

@ -0,0 +1,2 @@
[pytest]
addopts = --verbose --assert=rewrite --tb=native -x -r a

372
test/test_sshfs.py Executable file
View File

@ -0,0 +1,372 @@
#!/usr/bin/env python3
if __name__ == '__main__':
import pytest
import sys
sys.exit(pytest.main([__file__] + sys.argv[1:]))
import subprocess
import os
import sys
import pytest
import stat
import shutil
import filecmp
import errno
from tempfile import NamedTemporaryFile
from util import (wait_for_mount, umount, cleanup, base_cmdline,
basename, fuse_test_marker, safe_sleep)
from os.path import join as pjoin
TEST_FILE = __file__
pytestmark = fuse_test_marker()
with open(TEST_FILE, 'rb') as fh:
TEST_DATA = fh.read()
def name_generator(__ctr=[0]):
__ctr[0] += 1
return 'testfile_%d' % __ctr[0]
@pytest.mark.parametrize("debug", (False, True))
@pytest.mark.parametrize("cache_timeout", (0, 1))
def test_sshfs(tmpdir, debug, cache_timeout, capfd):
# Avoid false positives from debug messages
#if debug:
# capfd.register_output(r'^ unique: [0-9]+, error: -[0-9]+ .+$',
# count=0)
# Test if we can ssh into localhost without password
try:
res = subprocess.call(['ssh', '-o', 'KbdInteractiveAuthentication=no',
'-o', 'ChallengeResponseAuthentication=no',
'-o', 'PasswordAuthentication=no',
'localhost', '--', 'true'], stdin=subprocess.DEVNULL,
timeout=10)
except subprocess.TimeoutExpired:
res = 1
if res != 0:
pytest.fail('Unable to ssh into localhost without password prompt.')
mnt_dir = str(tmpdir.mkdir('mnt'))
src_dir = str(tmpdir.mkdir('src'))
cmdline = base_cmdline + [ pjoin(basename, 'sshfs'),
'-f', 'localhost:' + src_dir, mnt_dir ]
if debug:
cmdline += [ '-o', 'sshfs_debug' ]
# SSHFS Cache
if cache_timeout == 0:
cmdline += [ '-o', 'cache=no' ]
else:
cmdline += [ '-o', 'cache_timeout=%d' % cache_timeout ]
# FUSE Cache
cmdline += [ '-o', 'entry_timeout=0',
'-o', 'attr_timeout=0' ]
mount_process = subprocess.Popen(cmdline)
try:
wait_for_mount(mount_process, mnt_dir)
tst_statvfs(mnt_dir)
tst_readdir(src_dir, mnt_dir)
tst_open_read(src_dir, mnt_dir)
tst_open_write(src_dir, mnt_dir)
tst_create(mnt_dir)
tst_passthrough(src_dir, mnt_dir, cache_timeout)
tst_mkdir(mnt_dir)
tst_rmdir(src_dir, mnt_dir, cache_timeout)
tst_unlink(src_dir, mnt_dir, cache_timeout)
tst_symlink(mnt_dir)
if os.getuid() == 0:
tst_chown(mnt_dir)
# SSHFS only supports one second resolution when setting
# file timestamps.
tst_utimens(mnt_dir, tol=1)
tst_link(mnt_dir)
tst_truncate_path(mnt_dir)
tst_truncate_fd(mnt_dir)
tst_open_unlink(mnt_dir)
except:
cleanup(mnt_dir)
raise
else:
umount(mount_process, mnt_dir)
def tst_unlink(src_dir, mnt_dir, cache_timeout):
name = name_generator()
fullname = mnt_dir + "/" + name
with open(pjoin(src_dir, name), 'wb') as fh:
fh.write(b'hello')
if cache_timeout:
safe_sleep(cache_timeout+1)
assert name in os.listdir(mnt_dir)
os.unlink(fullname)
with pytest.raises(OSError) as exc_info:
os.stat(fullname)
assert exc_info.value.errno == errno.ENOENT
assert name not in os.listdir(mnt_dir)
assert name not in os.listdir(src_dir)
def tst_mkdir(mnt_dir):
dirname = name_generator()
fullname = mnt_dir + "/" + dirname
os.mkdir(fullname)
fstat = os.stat(fullname)
assert stat.S_ISDIR(fstat.st_mode)
assert os.listdir(fullname) == []
assert fstat.st_nlink in (1,2)
assert dirname in os.listdir(mnt_dir)
def tst_rmdir(src_dir, mnt_dir, cache_timeout):
name = name_generator()
fullname = mnt_dir + "/" + name
os.mkdir(pjoin(src_dir, name))
if cache_timeout:
safe_sleep(cache_timeout+1)
assert name in os.listdir(mnt_dir)
os.rmdir(fullname)
with pytest.raises(OSError) as exc_info:
os.stat(fullname)
assert exc_info.value.errno == errno.ENOENT
assert name not in os.listdir(mnt_dir)
assert name not in os.listdir(src_dir)
def tst_symlink(mnt_dir):
linkname = name_generator()
fullname = mnt_dir + "/" + linkname
os.symlink("/imaginary/dest", fullname)
fstat = os.lstat(fullname)
assert stat.S_ISLNK(fstat.st_mode)
assert os.readlink(fullname) == "/imaginary/dest"
assert fstat.st_nlink == 1
assert linkname in os.listdir(mnt_dir)
def tst_create(mnt_dir):
name = name_generator()
fullname = pjoin(mnt_dir, name)
with pytest.raises(OSError) as exc_info:
os.stat(fullname)
assert exc_info.value.errno == errno.ENOENT
assert name not in os.listdir(mnt_dir)
fd = os.open(fullname, os.O_CREAT | os.O_RDWR)
os.close(fd)
assert name in os.listdir(mnt_dir)
fstat = os.lstat(fullname)
assert stat.S_ISREG(fstat.st_mode)
assert fstat.st_nlink == 1
assert fstat.st_size == 0
def tst_chown(mnt_dir):
filename = pjoin(mnt_dir, name_generator())
os.mkdir(filename)
fstat = os.lstat(filename)
uid = fstat.st_uid
gid = fstat.st_gid
uid_new = uid + 1
os.chown(filename, uid_new, -1)
fstat = os.lstat(filename)
assert fstat.st_uid == uid_new
assert fstat.st_gid == gid
gid_new = gid + 1
os.chown(filename, -1, gid_new)
fstat = os.lstat(filename)
assert fstat.st_uid == uid_new
assert fstat.st_gid == gid_new
def tst_open_read(src_dir, mnt_dir):
name = name_generator()
with open(pjoin(src_dir, name), 'wb') as fh_out, \
open(TEST_FILE, 'rb') as fh_in:
shutil.copyfileobj(fh_in, fh_out)
assert filecmp.cmp(pjoin(mnt_dir, name), TEST_FILE, False)
def tst_open_write(src_dir, mnt_dir):
name = name_generator()
fd = os.open(pjoin(src_dir, name),
os.O_CREAT | os.O_RDWR)
os.close(fd)
fullname = pjoin(mnt_dir, name)
with open(fullname, 'wb') as fh_out, \
open(TEST_FILE, 'rb') as fh_in:
shutil.copyfileobj(fh_in, fh_out)
assert filecmp.cmp(fullname, TEST_FILE, False)
def tst_open_unlink(mnt_dir):
name = pjoin(mnt_dir, name_generator())
data1 = b'foo'
data2 = b'bar'
fullname = pjoin(mnt_dir, name)
with open(fullname, 'wb+', buffering=0) as fh:
fh.write(data1)
os.unlink(fullname)
with pytest.raises(OSError) as exc_info:
os.stat(fullname)
assert exc_info.value.errno == errno.ENOENT
assert name not in os.listdir(mnt_dir)
fh.write(data2)
fh.seek(0)
assert fh.read() == data1+data2
def tst_statvfs(mnt_dir):
os.statvfs(mnt_dir)
def tst_link(mnt_dir):
name1 = pjoin(mnt_dir, name_generator())
name2 = pjoin(mnt_dir, name_generator())
shutil.copyfile(TEST_FILE, name1)
assert filecmp.cmp(name1, TEST_FILE, False)
fstat1 = os.lstat(name1)
assert fstat1.st_nlink == 1
os.link(name1, name2)
fstat1 = os.lstat(name1)
fstat2 = os.lstat(name2)
for attr in ('st_mode', 'st_dev', 'st_uid', 'st_gid',
'st_size', 'st_atime', 'st_mtime', 'st_ctime'):
assert getattr(fstat1, attr) == getattr(fstat2, attr)
assert os.path.basename(name2) in os.listdir(mnt_dir)
assert filecmp.cmp(name1, name2, False)
os.unlink(name2)
assert os.path.basename(name2) not in os.listdir(mnt_dir)
with pytest.raises(FileNotFoundError):
os.lstat(name2)
os.unlink(name1)
def tst_readdir(src_dir, mnt_dir):
newdir = name_generator()
src_newdir = pjoin(src_dir, newdir)
mnt_newdir = pjoin(mnt_dir, newdir)
file_ = src_newdir + "/" + name_generator()
subdir = src_newdir + "/" + name_generator()
subfile = subdir + "/" + name_generator()
os.mkdir(src_newdir)
shutil.copyfile(TEST_FILE, file_)
os.mkdir(subdir)
shutil.copyfile(TEST_FILE, subfile)
listdir_is = os.listdir(mnt_newdir)
listdir_is.sort()
listdir_should = [ os.path.basename(file_), os.path.basename(subdir) ]
listdir_should.sort()
assert listdir_is == listdir_should
os.unlink(file_)
os.unlink(subfile)
os.rmdir(subdir)
os.rmdir(src_newdir)
def tst_truncate_path(mnt_dir):
assert len(TEST_DATA) > 1024
filename = pjoin(mnt_dir, name_generator())
with open(filename, 'wb') as fh:
fh.write(TEST_DATA)
fstat = os.stat(filename)
size = fstat.st_size
assert size == len(TEST_DATA)
# Add zeros at the end
os.truncate(filename, size + 1024)
assert os.stat(filename).st_size == size + 1024
with open(filename, 'rb') as fh:
assert fh.read(size) == TEST_DATA
assert fh.read(1025) == b'\0' * 1024
# Truncate data
os.truncate(filename, size - 1024)
assert os.stat(filename).st_size == size - 1024
with open(filename, 'rb') as fh:
assert fh.read(size) == TEST_DATA[:size-1024]
os.unlink(filename)
def tst_truncate_fd(mnt_dir):
assert len(TEST_DATA) > 1024
with NamedTemporaryFile('w+b', 0, dir=mnt_dir) as fh:
fd = fh.fileno()
fh.write(TEST_DATA)
fstat = os.fstat(fd)
size = fstat.st_size
assert size == len(TEST_DATA)
# Add zeros at the end
os.ftruncate(fd, size + 1024)
assert os.fstat(fd).st_size == size + 1024
fh.seek(0)
assert fh.read(size) == TEST_DATA
assert fh.read(1025) == b'\0' * 1024
# Truncate data
os.ftruncate(fd, size - 1024)
assert os.fstat(fd).st_size == size - 1024
fh.seek(0)
assert fh.read(size) == TEST_DATA[:size-1024]
def tst_utimens(mnt_dir, tol=0):
filename = pjoin(mnt_dir, name_generator())
os.mkdir(filename)
fstat = os.lstat(filename)
atime = fstat.st_atime + 42.28
mtime = fstat.st_mtime - 42.23
if sys.version_info < (3,3):
os.utime(filename, (atime, mtime))
else:
atime_ns = fstat.st_atime_ns + int(42.28*1e9)
mtime_ns = fstat.st_mtime_ns - int(42.23*1e9)
os.utime(filename, None, ns=(atime_ns, mtime_ns))
fstat = os.lstat(filename)
assert abs(fstat.st_atime - atime) < tol
assert abs(fstat.st_mtime - mtime) < tol
if sys.version_info >= (3,3):
assert abs(fstat.st_atime_ns - atime_ns) < tol*1e9
assert abs(fstat.st_mtime_ns - mtime_ns) < tol*1e9
def tst_passthrough(src_dir, mnt_dir, cache_timeout):
name = name_generator()
src_name = pjoin(src_dir, name)
mnt_name = pjoin(src_dir, name)
assert name not in os.listdir(src_dir)
assert name not in os.listdir(mnt_dir)
with open(src_name, 'w') as fh:
fh.write('Hello, world')
assert name in os.listdir(src_dir)
if cache_timeout:
safe_sleep(cache_timeout+1)
assert name in os.listdir(mnt_dir)
assert os.stat(src_name) == os.stat(mnt_name)
name = name_generator()
src_name = pjoin(src_dir, name)
mnt_name = pjoin(src_dir, name)
assert name not in os.listdir(src_dir)
assert name not in os.listdir(mnt_dir)
with open(mnt_name, 'w') as fh:
fh.write('Hello, world')
assert name in os.listdir(src_dir)
if cache_timeout:
safe_sleep(cache_timeout+1)
assert name in os.listdir(mnt_dir)
assert os.stat(src_name) == os.stat(mnt_name)

48
test/travis-build.sh Executable file
View File

@ -0,0 +1,48 @@
#!/bin/bash
set -e
# Disable leak checking for now, there are some issues (or false positives)
# that we still need to fix
export ASAN_OPTIONS="detect_leaks=0"
export LSAN_OPTIONS="suppressions=$(pwd)/test/lsan_suppress.txt"
export CC
TEST_CMD="python3 -m pytest --maxfail=99 test/"
# Standard build with Valgrind
for CC in gcc gcc-6 clang; do
mkdir build-${CC}; cd build-${CC}
if [ ${CC} == 'gcc-6' ]; then
build_opts='-D b_lundef=false'
else
build_opts=''
fi
meson -D werror=true ${build_opts} ../
ninja
TEST_WITH_VALGRIND=true ${TEST_CMD}
cd ..
done
(cd build-$CC; sudo ninja install)
# Sanitized build
CC=clang
for san in undefined address; do
mkdir build-${san}; cd build-${san}
# b_lundef=false is required to work around clang
# bug, cf. https://groups.google.com/forum/#!topic/mesonbuild/tgEdAXIIdC4
meson -D b_sanitize=${san} -D b_lundef=false -D werror=true ..
ninja
${TEST_CMD}
cd ..
done
# Autotools build
CC=gcc
autoreconf -i
./configure
make
${TEST_CMD}
sudo make install

21
test/travis-install.sh Executable file
View File

@ -0,0 +1,21 @@
#!/bin/sh
set -e
sudo ln -svf $(which python3) /usr/bin/python3
sudo python3 -m pip install pytest meson
wget https://github.com/ninja-build/ninja/releases/download/v1.7.2/ninja-linux.zip
unzip ninja-linux.zip
chmod 755 ninja
sudo chown root:root ninja
sudo mv -fv ninja /usr/local/bin
valgrind --version
ninja --version
meson --version
# Setup ssh
ssh-keygen -b 768 -t rsa -f ~/.ssh/id_rsa -P ''
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
chmod 600 ~/.ssh/authorized_keys
ssh -o "StrictHostKeyChecking=no" localhost echo "SSH connection succeeded"

100
test/util.py Normal file
View File

@ -0,0 +1,100 @@
#!/usr/bin/env python3
import subprocess
import pytest
import os
import stat
import time
from os.path import join as pjoin
basename = pjoin(os.path.dirname(__file__), '..')
def wait_for_mount(mount_process, mnt_dir,
test_fn=os.path.ismount):
elapsed = 0
while elapsed < 30:
if test_fn(mnt_dir):
return True
if mount_process.poll() is not None:
pytest.fail('file system process terminated prematurely')
time.sleep(0.1)
elapsed += 0.1
pytest.fail("mountpoint failed to come up")
def cleanup(mnt_dir):
subprocess.call(['fusermount', '-z', '-u', mnt_dir],
stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT)
def umount(mount_process, mnt_dir):
subprocess.check_call(['fusermount', '-z', '-u', mnt_dir ])
assert not os.path.ismount(mnt_dir)
# Give mount process a little while to terminate. Popen.wait(timeout)
# was only added in 3.3...
elapsed = 0
while elapsed < 30:
code = mount_process.poll()
if code is not None:
if code == 0:
return
pytest.fail('file system process terminated with code %s' % (code,))
time.sleep(0.1)
elapsed += 0.1
pytest.fail('mount process did not terminate')
def safe_sleep(secs):
'''Like time.sleep(), but sleep for at least *secs*
`time.sleep` may sleep less than the given period if a signal is
received. This function ensures that we sleep for at least the
desired time.
'''
now = time.time()
end = now + secs
while now < end:
time.sleep(end - now)
now = time.time()
def fuse_test_marker():
'''Return a pytest.marker that indicates FUSE availability
If system/user/environment does not support FUSE, return
a `pytest.mark.skip` object with more details. If FUSE is
supported, return `pytest.mark.uses_fuse()`.
'''
skip = lambda x: pytest.mark.skip(reason=x)
with subprocess.Popen(['which', 'fusermount'], stdout=subprocess.PIPE,
universal_newlines=True) as which:
fusermount_path = which.communicate()[0].strip()
if not fusermount_path or which.returncode != 0:
return skip("Can't find fusermount executable")
if not os.path.exists('/dev/fuse'):
return skip("FUSE kernel module does not seem to be loaded")
if os.getuid() == 0:
return pytest.mark.uses_fuse()
mode = os.stat(fusermount_path).st_mode
if mode & stat.S_ISUID == 0:
return skip('fusermount executable not setuid, and we are not root.')
try:
fd = os.open('/dev/fuse', os.O_RDWR)
except OSError as exc:
return skip('Unable to open /dev/fuse: %s' % exc.strerror)
else:
os.close(fd)
return pytest.mark.uses_fuse()
# Use valgrind if requested
if os.environ.get('TEST_WITH_VALGRIND', 'no').lower().strip() \
not in ('no', 'false', '0'):
base_cmdline = [ 'valgrind', '-q', '--' ]
else:
base_cmdline = []

9
test/wrong_command.c Normal file
View File

@ -0,0 +1,9 @@
#include <stdio.h>
int main(void) {
fprintf(stderr, "\x1B[31m\e[1m"
"This is not the command you are looking for.\n"
"You probably want to run 'python3 -m pytest test/' instead"
"\e[0m\n");
return 1;
}