Commit 9317425

Merge pull request #1201 from pearsonlab/s3io

Added support for grabbing and putting data on S3.

2 parents: 315b2d0 + a526121

3 files changed, +444 -2 lines changed

circle.yml

Lines changed: 2 additions & 1 deletion

@@ -9,7 +9,8 @@ dependencies:
   override:
     - pip install --upgrade pip
     - pip install -e .
-    - pip install matplotlib sphinx ipython
+    - pip install matplotlib sphinx ipython boto
+    - gem install fakes3
     - if [[ ! -d ~/examples/data ]]; then wget "http://tcpdiag.dl.sourceforge.net/project/nipy/nipype/nipype-0.2/nipype-tutorial.tar.bz2"; tar jxvf nipype-tutorial.tar.bz2; mkdir ~/examples; mv nipype-tutorial/* ~/examples/; fi
     # we download this manually because CircleCI does not cache apt
     - if [[ ! -d ~/examples/feeds ]]; then wget "http://fsl.fmrib.ox.ac.uk/fsldownloads/fsl-5.0.8-feeds.tar.gz"; tar zxvf fsl-5.0.8-feeds.tar.gz; mv feeds ~/examples/; fi
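The two new dependencies back the S3 interfaces added below: boto is the client library the code imports, and fakes3 provides a local S3 stand-in so tests can run without real AWS credentials. As a rough sketch of how a test might drive it (the storage root and process handling here are assumptions, not necessarily what nipype's test suite does):

import os
import subprocess

# Assumed fakes3 invocation: serve a local directory as a fake S3 endpoint.
# Port 4567 matches the port S3DataSink connects to when testing=True.
fakes3_root = os.path.expanduser('~/fakes3')   # hypothetical storage root
server = subprocess.Popen(['fakes3', '-r', fakes3_root, '-p', '4567'],
                          stdout=open(os.devnull, 'wb'))
# ... exercise S3DataSink(testing=True) against localhost:4567 ...
server.kill()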

nipype/interfaces/io.py

Lines changed: 314 additions & 0 deletions

@@ -45,6 +45,12 @@
 except:
     pass
 
+try:
+    import boto
+    from boto.s3.connection import S3Connection, OrdinaryCallingFormat
+except:
+    pass
+
 from nipype.interfaces.base import (TraitedSpec, traits, File, Directory,
                                     BaseInterface, InputMultiPath, isdefined,
                                     OutputMultiPath, DynamicTraitedSpec,
@@ -371,6 +377,314 @@ def _list_outputs(self):
         return outputs
 
 
+class S3DataSinkInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec):
+    testing = traits.Bool(False, usedefault=True,
+                          desc='Flag for using local fakes3 server.'
+                               ' (for testing purposes only)')
+    anon = traits.Bool(False, usedefault=True,
+                       desc='Use anonymous connection to s3')
+    bucket = traits.Str(mandatory=True,
+                        desc='Amazon S3 bucket where your data is stored')
+    bucket_path = traits.Str('', usedefault=True,
+                             desc='Location within your bucket to store '
+                                  'data.')
+    base_directory = Directory(
+        desc='Path to the base directory for storing data.')
+    container = traits.Str(
+        desc='Folder within base directory in which to store output')
+    parameterization = traits.Bool(True, usedefault=True,
+                                   desc='store output in parametrized structure')
+    strip_dir = Directory(desc='path to strip out of filename')
+    substitutions = InputMultiPath(traits.Tuple(traits.Str, traits.Str),
+                                   desc=('List of 2-tuples reflecting string '
+                                         'to substitute and string to replace '
+                                         'it with'))
+    regexp_substitutions = InputMultiPath(traits.Tuple(traits.Str, traits.Str),
+                                          desc=('List of 2-tuples reflecting a pair '
+                                                'of a Python regexp pattern and a '
+                                                'replacement string. Invoked after '
+                                                'string `substitutions`'))
+
+    _outputs = traits.Dict(traits.Str, value={}, usedefault=True)
+    remove_dest_dir = traits.Bool(False, usedefault=True,
+                                  desc='remove dest directory when copying dirs')
+
+    def __setattr__(self, key, value):
+        if key not in self.copyable_trait_names():
+            if not isdefined(value):
+                super(S3DataSinkInputSpec, self).__setattr__(key, value)
+            self._outputs[key] = value
+        else:
+            if key in self._outputs:
+                self._outputs[key] = value
+            super(S3DataSinkInputSpec, self).__setattr__(key, value)
+
+
+class S3DataSink(DataSink):
+    """ Works exactly like DataSink, except the specified files will
+        also be uploaded to Amazon S3 storage in the specified bucket
+        and location. 'bucket_path' is the s3 analog for
+        'base_directory'.
+
+    """
+    input_spec = S3DataSinkInputSpec
+
+    def _list_outputs(self):
+        """Execute this module.
+        """
+        outputs = super(S3DataSink, self)._list_outputs()
+
+        self.localtos3(outputs['out_file'])
+
+        return outputs
+
+    def localtos3(self, paths):
+        if self.inputs.testing:
+            conn = S3Connection(anon=True, is_secure=False, port=4567,
+                                host='localhost',
+                                calling_format=OrdinaryCallingFormat())
+
+        else:
+            conn = S3Connection(anon=self.inputs.anon)
+        bkt = conn.get_bucket(self.inputs.bucket)
+        s3paths = []
+
+        for path in paths:
+            # convert local path to s3 path
+            bd_index = path.find(self.inputs.base_directory)
+            if bd_index != -1:  # base_directory is in path, maintain directory structure
+                s3path = path[bd_index + len(self.inputs.base_directory):]  # cut out base directory
+                if s3path[0] == os.path.sep:
+                    s3path = s3path[1:]
+            else:  # base_directory isn't in path, simply place all files in bucket_path folder
+                s3path = os.path.split(path)[1]  # take filename from path
+            s3path = os.path.join(self.inputs.bucket_path, s3path)
+            if s3path[-1] == os.path.sep:
+                s3path = s3path[:-1]
+            s3paths.append(s3path)
+
+            k = boto.s3.key.Key(bkt)
+            k.key = s3path
+            k.set_contents_from_filename(path)
+
+        return s3paths
+
+
+class S3DataGrabberInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec):
+    anon = traits.Bool(False, usedefault=True,
+                       desc='Use anonymous connection to s3')
+    region = traits.Str('us-east-1', usedefault=True,
+                        desc='Region of s3 bucket')
+    bucket = traits.Str(mandatory=True,
+                        desc='Amazon S3 bucket where your data is stored')
+    bucket_path = traits.Str('', usedefault=True,
+                             desc='Location within your bucket for subject data.')
+    local_directory = Directory(exists=True,
+                                desc='Path to the local directory for subject data to be downloaded '
+                                     'and accessed. Should be on HDFS for Spark jobs.')
+    raise_on_empty = traits.Bool(True, usedefault=True,
+                                 desc='Generate exception if list is empty for a given field')
+    sort_filelist = traits.Bool(mandatory=True,
+                                desc='Sort the filelist that matches the template')
+    template = traits.Str(mandatory=True,
+                          desc='Layout used to get files. Relative to bucket_path if defined. '
+                               'Uses regex rather than glob style formatting.')
+    template_args = traits.Dict(key_trait=traits.Str,
+                                value_trait=traits.List(traits.List),
+                                desc='Information to plug into template')
+
+
+class S3DataGrabber(IOBase):
+    """ Generic datagrabber module that wraps around glob in an
+        intelligent way for neuroimaging tasks to grab files from
+        Amazon S3
+
+        Works exactly like DataGrabber, except you must specify an
+        S3 "bucket" and "bucket_path" to search for your data and a
+        "local_directory" to store the data. "local_directory"
+        should be a location on HDFS for Spark jobs. Additionally,
+        "template" uses regex style formatting, rather than the
+        glob-style found in the original DataGrabber.
+
+    """
+    input_spec = S3DataGrabberInputSpec
+    output_spec = DynamicTraitedSpec
+    _always_run = True
+
+    def __init__(self, infields=None, outfields=None, **kwargs):
+        """
+        Parameters
+        ----------
+        infields : list of str
+            Indicates the input fields to be dynamically created
+
+        outfields: list of str
+            Indicates output fields to be dynamically created
+
+        See class examples for usage
+
+        """
+        if not outfields:
+            outfields = ['outfiles']
+        super(S3DataGrabber, self).__init__(**kwargs)
+        undefined_traits = {}
+        # used for mandatory inputs check
+        self._infields = infields
+        self._outfields = outfields
+        if infields:
+            for key in infields:
+                self.inputs.add_trait(key, traits.Any)
+                undefined_traits[key] = Undefined
+        # add ability to insert field specific templates
+        self.inputs.add_trait('field_template',
+                              traits.Dict(traits.Enum(outfields),
+                                          desc="arguments that fit into template"))
+        undefined_traits['field_template'] = Undefined
+        if not isdefined(self.inputs.template_args):
+            self.inputs.template_args = {}
+        for key in outfields:
+            if key not in self.inputs.template_args:
+                if infields:
+                    self.inputs.template_args[key] = [infields]
+                else:
+                    self.inputs.template_args[key] = []
+
+        self.inputs.trait_set(trait_change_notify=False, **undefined_traits)
+
+    def _add_output_traits(self, base):
+        """
+        S3 specific: Downloads relevant files to a local folder specified
+
+        Using traits.Any instead of OutputMultiPath till add_trait bug
+        is fixed.
+        """
+        return add_traits(base, self.inputs.template_args.keys())
+
+    def _list_outputs(self):
+        # infields are mandatory, however I could not figure out how to set 'mandatory' flag dynamically
+        # hence manual check
+        if self._infields:
+            for key in self._infields:
+                value = getattr(self.inputs, key)
+                if not isdefined(value):
+                    msg = "%s requires a value for input '%s' because it was listed in 'infields'" % \
+                        (self.__class__.__name__, key)
+                    raise ValueError(msg)
+
+        outputs = {}
+        # get list of all files in s3 bucket
+        conn = boto.connect_s3(anon=self.inputs.anon)
+        bkt = conn.get_bucket(self.inputs.bucket)
+        bkt_files = list(k.key for k in bkt.list())
+
+        # keys are outfields, args are template args for the outfield
+        for key, args in self.inputs.template_args.items():
+            outputs[key] = []
+            template = self.inputs.template
+            if hasattr(self.inputs, 'field_template') and \
+                    isdefined(self.inputs.field_template) and \
+                    key in self.inputs.field_template:
+                template = self.inputs.field_template[key]  # template override for multiple outfields
+            if isdefined(self.inputs.bucket_path):
+                template = os.path.join(self.inputs.bucket_path, template)
+            if not args:
+                filelist = []
+                for fname in bkt_files:
+                    if re.match(template, fname):
+                        filelist.append(fname)
+                if len(filelist) == 0:
+                    msg = 'Output key: %s Template: %s returned no files' % (
+                        key, template)
+                    if self.inputs.raise_on_empty:
+                        raise IOError(msg)
+                    else:
+                        warn(msg)
+                else:
+                    if self.inputs.sort_filelist:
+                        filelist = human_order_sorted(filelist)
+                    outputs[key] = list_to_filename(filelist)
+            for argnum, arglist in enumerate(args):
+                maxlen = 1
+                for arg in arglist:
+                    if isinstance(arg, six.string_types) and hasattr(self.inputs, arg):
+                        arg = getattr(self.inputs, arg)
+                    if isinstance(arg, list):
+                        if (maxlen > 1) and (len(arg) != maxlen):
+                            raise ValueError('incompatible number of arguments for %s' % key)
+                        if len(arg) > maxlen:
+                            maxlen = len(arg)
+                outfiles = []
+                for i in range(maxlen):
+                    argtuple = []
+                    for arg in arglist:
+                        if isinstance(arg, six.string_types) and hasattr(self.inputs, arg):
+                            arg = getattr(self.inputs, arg)
+                        if isinstance(arg, list):
+                            argtuple.append(arg[i])
+                        else:
+                            argtuple.append(arg)
+                    filledtemplate = template
+                    if argtuple:
+                        try:
+                            filledtemplate = template % tuple(argtuple)
+                        except TypeError as e:
+                            raise TypeError(e.message + ": Template %s failed to convert with args %s" % (template, str(tuple(argtuple))))
+                    outfiles = []
+                    for fname in bkt_files:
+                        if re.match(filledtemplate, fname):
+                            outfiles.append(fname)
+                    if len(outfiles) == 0:
+                        msg = 'Output key: %s Template: %s returned no files' % (key, filledtemplate)
+                        if self.inputs.raise_on_empty:
+                            raise IOError(msg)
+                        else:
+                            warn(msg)
+                        outputs[key].append(None)
+                    else:
+                        if self.inputs.sort_filelist:
+                            outfiles = human_order_sorted(outfiles)
+                        outputs[key].append(list_to_filename(outfiles))
+            if any([val is None for val in outputs[key]]):
+                outputs[key] = []
+            if len(outputs[key]) == 0:
+                outputs[key] = None
+            elif len(outputs[key]) == 1:
+                outputs[key] = outputs[key][0]
+        # Outputs are currently stored as locations on S3.
+        # We must convert to the local location specified
+        # and download the files.
+        for key in outputs:
+            if type(outputs[key]) == list:
+                paths = outputs[key]
+                for i in range(len(paths)):
+                    path = paths[i]
+                    outputs[key][i] = self.s3tolocal(path, bkt)
+            elif type(outputs[key]) == str:
+                outputs[key] = self.s3tolocal(outputs[key], bkt)
+
+        return outputs
+
+    # Takes an s3 address and downloads the file to a local
+    # directory, returning the local path.
+    def s3tolocal(self, s3path, bkt):
+        # path formatting
+        if not os.path.split(self.inputs.local_directory)[1] == '':
+            self.inputs.local_directory += '/'
+        if not os.path.split(self.inputs.bucket_path)[1] == '':
+            self.inputs.bucket_path += '/'
+        if self.inputs.template[0] == '/':
+            self.inputs.template = self.inputs.template[1:]
+
+        localpath = s3path.replace(self.inputs.bucket_path, self.inputs.local_directory)
+        localdir = os.path.split(localpath)[0]
+        if not os.path.exists(localdir):
+            os.makedirs(localdir)
+        k = boto.s3.key.Key(bkt)
+        k.key = s3path
+        k.get_contents_to_filename(localpath)
+        return localpath
+
+
 class DataGrabberInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec):
     base_directory = Directory(exists=True,
                                desc='Path to the base directory consisting of subject data.')
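For orientation, a hedged usage sketch of the two interfaces added above. The bucket name, prefixes, and paths are hypothetical, and the sketch assumes boto's usual credential discovery (environment variables or ~/.boto) when anon is False:

from nipype.interfaces.io import S3DataGrabber, S3DataSink

# Hypothetical grab: fetch one subject's functional runs from S3.
# 'template' is a regex matched against keys under bucket_path,
# not a glob as in the original DataGrabber.
grabber = S3DataGrabber(infields=['subject_id'], outfields=['func'])
grabber.inputs.bucket = 'my-neuro-data'           # hypothetical bucket
grabber.inputs.bucket_path = 'study1/'
grabber.inputs.local_directory = '/tmp/study1'    # must already exist
grabber.inputs.template = r'sub%03d/func/.*\.nii\.gz'
grabber.inputs.template_args = {'func': [['subject_id']]}
grabber.inputs.sort_filelist = True
grabber.inputs.subject_id = 7
res = grabber.run()   # matches are downloaded; outputs are local paths

# Hypothetical sink: copy results locally as DataSink does, then mirror
# them to S3, with bucket_path as the S3 analog of base_directory.
sink = S3DataSink()   # or S3DataSink(testing=True) against a local fakes3
sink.inputs.base_directory = '/tmp/derivatives'
sink.inputs.bucket = 'my-neuro-data'
sink.inputs.bucket_path = 'derivatives/'
sink.inputs.preproc = res.outputs.func            # dynamic DataSink field
sink.run()            # uploads happen in localtos3() after the local copy

Note that S3DataGrabber lists every key in the bucket and filters with re.match, so enumeration cost grows with bucket size; that is a property of the implementation above, not of this sketch.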
