Skip to content

Commit 14e8ac4

Browse files
committed
TEST: TempFATFS allows testing Windows FS on POSIX
1 parent cb207f8 commit 14e8ac4

File tree

4 files changed

+83
-1
lines changed

4 files changed

+83
-1
lines changed

.travis.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ before_install:
3434
echo 'include_dirs = /usr/include:/usr/include/X11' >> $HOME/.numpy-site.cfg;
3535
fi
3636
install:
37+
- sudo apt-get install fusefat
3738
- conda update --yes conda
3839
- conda create -n testenv --yes pip python=$TRAVIS_PYTHON_VERSION
3940
- source activate testenv

nipype/testing/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from numpy.testing import *
3030

3131
from . import decorators as dec
32-
from .utils import skip_if_no_package, package_check
32+
from .utils import skip_if_no_package, package_check, TempFATFS
3333

3434
skipif = dec.skipif
3535

nipype/testing/tests/test_utils.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
2+
# vi: set ft=python sts=4 ts=4 sw=4 et:
3+
"""Test testing utilities
4+
"""
5+
6+
from nipype.testing.utils import TempFATFS
7+
from nose.tools import assert_true
8+
9+
10+
def test_tempfatfs():
11+
with TempFATFS() as tmpdir:
12+
yield assert_true, tmpdir is not None

nipype/testing/utils.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,12 @@
44
"""
55
__docformat__ = 'restructuredtext'
66

7+
import os
8+
import time
9+
import shutil
10+
import signal
11+
import subprocess
12+
from tempfile import mkdtemp
713
from ..utils.misc import package_check
814
from nose import SkipTest
915

@@ -19,3 +25,66 @@ def skip_if_no_package(*args, **kwargs):
1925
package_check(exc_failed_import=SkipTest,
2026
exc_failed_check=SkipTest,
2127
*args, **kwargs)
28+
29+
30+
class TempFATFS(object):
31+
def __init__(self, size_in_mbytes=8, delay=0.5):
32+
"""Temporary filesystem for testing non-POSIX filesystems on a POSIX
33+
system.
34+
35+
with TempFATFS() as fatdir:
36+
target = os.path.join(fatdir, 'target')
37+
copyfile(file1, target, copy=False)
38+
assert_false(os.path.islink(target))
39+
40+
Arguments
41+
---------
42+
size_in_mbytes : int
43+
Size (in MiB) of filesystem to create
44+
delay : float
45+
Time (in seconds) to wait for fusefat to start, stop
46+
"""
47+
self.delay = delay
48+
self.tmpdir = mkdtemp()
49+
self.dev_null = open(os.devnull, 'wb')
50+
51+
vfatfile = os.path.join(self.tmpdir, 'vfatblock')
52+
self.vfatmount = os.path.join(self.tmpdir, 'vfatmount')
53+
self.canary = os.path.join(self.vfatmount, '.canary')
54+
55+
with open(vfatfile, 'wb') as fobj:
56+
fobj.write(b'\x00' * (int(size_in_mbytes) << 20))
57+
os.mkdir(self.vfatmount)
58+
59+
mkfs_args = ['mkfs.vfat', vfatfile]
60+
mount_args = ['fusefat', '-o', 'rw+', '-f', vfatfile, self.vfatmount]
61+
62+
subprocess.check_call(args=mkfs_args, stdout=self.dev_null,
63+
stderr=self.dev_null)
64+
self.fusefat = subprocess.Popen(args=mount_args, stdout=self.dev_null,
65+
stderr=self.dev_null)
66+
time.sleep(self.delay)
67+
68+
if self.fusefat.poll() is not None:
69+
raise IOError("fatfuse terminated too soon")
70+
71+
open(self.canary, 'wb').close()
72+
73+
def __enter__(self):
74+
return self.vfatmount
75+
76+
def __exit__(self, exc_type, exc_val, exc_tb):
77+
if self.fusefat is not None:
78+
self.fusefat.send_signal(signal.SIGINT)
79+
80+
# Allow 1s to return without sending terminate
81+
for count in range(10):
82+
time.sleep(0.1)
83+
if self.fusefat.poll() is not None:
84+
break
85+
else:
86+
self.fusefat.terminate()
87+
time.sleep(self.delay)
88+
assert not os.path.exists(self.canary)
89+
self.dev_null.close()
90+
shutil.rmtree(self.tmpdir)

0 commit comments

Comments
 (0)