Skip to content

RF: Clean-up the BaseInterface run() function using context #3347

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 51 additions & 127 deletions nipype/interfaces/base/core.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,31 @@
# -*- coding: utf-8 -*-
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
"""
Nipype interfaces core
......................


Defines the ``Interface`` API and the body of the
most basic interfaces.
The I/O specifications corresponding to these base
interfaces are found in the ``specs`` module.

"""
from copy import deepcopy
from datetime import datetime as dt
import os
import platform
import subprocess as sp
import shlex
import sys
import simplejson as json
from dateutil.parser import parse as parseutc
from traits.trait_errors import TraitError

from ... import config, logging, LooseVersion
from ...utils.provenance import write_provenance
from ...utils.misc import str2bool, rgetcwd
from ...utils.filemanip import split_filename, which, get_dependencies, canonicalize_env
from ...utils.misc import str2bool
from ...utils.filemanip import (
canonicalize_env,
get_dependencies,
indirectory,
split_filename,
which,
)
from ...utils.subprocess import run_command

from ...external.due import due
Expand All @@ -39,7 +38,12 @@
MpiCommandLineInputSpec,
get_filecopy_info,
)
from .support import Bunch, InterfaceResult, NipypeInterfaceError, format_help
from .support import (
RuntimeContext,
InterfaceResult,
NipypeInterfaceError,
format_help,
)

iflogger = logging.getLogger("nipype.interface")

Expand All @@ -63,8 +67,15 @@ class Interface(object):

"""

input_spec = None # A traited input specification
output_spec = None # A traited output specification
input_spec = None
"""
The specification of the input, defined by a :py:class:`~traits.has_traits.HasTraits` class.
"""
output_spec = None
"""
The specification of the output, defined by a :py:class:`~traits.has_traits.HasTraits` class.
"""

_can_resume = False # See property below
_always_run = False # See property below

Expand Down Expand Up @@ -365,131 +376,44 @@ def run(self, cwd=None, ignore_exception=None, **inputs):
if successful, results

"""
from ...utils.profiler import ResourceMonitor

# if ignore_exception is not provided, taking self.ignore_exception
if ignore_exception is None:
ignore_exception = self.ignore_exception

# Tear-up: get current and prev directories
syscwd = rgetcwd(error=False) # Recover when wd does not exist
if cwd is None:
cwd = syscwd

os.chdir(cwd) # Change to the interface wd
rtc = RuntimeContext(
resource_monitor=config.resource_monitor and self.resource_monitor,
ignore_exception=ignore_exception
if ignore_exception is not None
else self.ignore_exception,
)

enable_rm = config.resource_monitor and self.resource_monitor
self.inputs.trait_set(**inputs)
with indirectory(cwd or os.getcwd()):
self.inputs.trait_set(**inputs)
self._check_mandatory_inputs()
self._check_version_requirements(self.inputs)
interface = self.__class__
self._duecredit_cite()

# initialize provenance tracking
store_provenance = str2bool(
config.get("execution", "write_provenance", "false")
)
env = deepcopy(dict(os.environ))
if self._redirect_x:
env["DISPLAY"] = config.get_display()

runtime = Bunch(
cwd=cwd,
prevcwd=syscwd,
returncode=None,
duration=None,
environ=env,
startTime=dt.isoformat(dt.utcnow()),
endTime=None,
platform=platform.platform(),
hostname=platform.node(),
version=self.version,
)
runtime_attrs = set(runtime.dictcopy())

mon_sp = None
if enable_rm:
mon_freq = float(config.get("execution", "resource_monitor_frequency", 1))
proc_pid = os.getpid()
iflogger.debug(
"Creating a ResourceMonitor on a %s interface, PID=%d.",
self.__class__.__name__,
proc_pid,
)
mon_sp = ResourceMonitor(proc_pid, freq=mon_freq)
mon_sp.start()
with rtc(self, cwd=cwd, redirect_x=self._redirect_x) as runtime:

# Grab inputs now, as they should not change during execution
inputs = self.inputs.get_traitsfree()
outputs = None

try:
# Grab inputs now, as they should not change during execution
inputs = self.inputs.get_traitsfree()
outputs = None
# Run interface
runtime = self._pre_run_hook(runtime)
runtime = self._run_interface(runtime)
runtime = self._post_run_hook(runtime)
# Collect outputs
outputs = self.aggregate_outputs(runtime)
except Exception as e:
import traceback

# Retrieve the maximum info fast
runtime.traceback = traceback.format_exc()
# Gather up the exception arguments and append nipype info.
exc_args = e.args if getattr(e, "args") else tuple()
exc_args += (
"An exception of type %s occurred while running interface %s."
% (type(e).__name__, self.__class__.__name__),
)
if config.get("logging", "interface_level", "info").lower() == "debug":
exc_args += ("Inputs: %s" % str(self.inputs),)

runtime.traceback_args = ("\n".join(["%s" % arg for arg in exc_args]),)

if not ignore_exception:
raise
finally:
if runtime is None or runtime_attrs - set(runtime.dictcopy()):
raise RuntimeError(
"{} interface failed to return valid "
"runtime object".format(interface.__class__.__name__)
)
# This needs to be done always
runtime.endTime = dt.isoformat(dt.utcnow())
timediff = parseutc(runtime.endTime) - parseutc(runtime.startTime)
runtime.duration = (
timediff.days * 86400 + timediff.seconds + timediff.microseconds / 1e6
)
results = InterfaceResult(
interface, runtime, inputs=inputs, outputs=outputs, provenance=None
)

# Add provenance (if required)
if store_provenance:
# Provenance will only throw a warning if something went wrong
results.provenance = write_provenance(results)

# Make sure runtime profiler is shut down
if enable_rm:
import numpy as np

mon_sp.stop()

runtime.mem_peak_gb = None
runtime.cpu_percent = None

# Read .prof file in and set runtime values
vals = np.loadtxt(mon_sp.fname, delimiter=",")
if vals.size:
vals = np.atleast_2d(vals)
runtime.mem_peak_gb = vals[:, 2].max() / 1024
runtime.cpu_percent = vals[:, 1].max()

runtime.prof_dict = {
"time": vals[:, 0].tolist(),
"cpus": vals[:, 1].tolist(),
"rss_GiB": (vals[:, 2] / 1024).tolist(),
"vms_GiB": (vals[:, 3] / 1024).tolist(),
}
os.chdir(syscwd)
results = InterfaceResult(
self.__class__,
rtc.runtime,
inputs=inputs,
outputs=outputs,
provenance=None,
)

# Add provenance (if required)
if str2bool(config.get("execution", "write_provenance", "false")):
# Provenance will only throw a warning if something went wrong
results.provenance = write_provenance(results)

self._duecredit_cite()

return results

Expand Down
98 changes: 96 additions & 2 deletions nipype/interfaces/base/support.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,113 @@

"""
import os
from contextlib import AbstractContextManager
from copy import deepcopy
from textwrap import wrap
import re
from datetime import datetime as dt
from dateutil.parser import parse as parseutc
import platform

from ... import logging
from ...utils.misc import is_container
from ... import logging, config
from ...utils.misc import is_container, rgetcwd
from ...utils.filemanip import md5, hash_infile

iflogger = logging.getLogger("nipype.interface")

HELP_LINEWIDTH = 70


class RuntimeContext(AbstractContextManager):
"""A context manager to run NiPype interfaces."""

__slots__ = ("_runtime", "_resmon", "_ignore_exc")

def __init__(self, resource_monitor=False, ignore_exception=False):
"""Initialize the context manager object."""
self._ignore_exc = ignore_exception
_proc_pid = os.getpid()
if resource_monitor:
from ...utils.profiler import ResourceMonitor
else:
from ...utils.profiler import ResourceMonitorMock as ResourceMonitor

self._resmon = ResourceMonitor(
_proc_pid,
freq=float(config.get("execution", "resource_monitor_frequency", 1)),
)

def __call__(self, interface, cwd=None, redirect_x=False):
"""Generate a new runtime object."""
# Tear-up: get current and prev directories
_syscwd = rgetcwd(error=False) # Recover when wd does not exist
if cwd is None:
cwd = _syscwd

self._runtime = Bunch(
cwd=str(cwd),
duration=None,
endTime=None,
environ=deepcopy(dict(os.environ)),
hostname=platform.node(),
interface=interface.__class__.__name__,
platform=platform.platform(),
prevcwd=str(_syscwd),
redirect_x=redirect_x,
resmon=self._resmon.fname or "off",
returncode=None,
startTime=None,
version=interface.version,
)
return self

def __enter__(self):
"""Tear-up the execution of an interface."""
if self._runtime.redirect_x:
self._runtime.environ["DISPLAY"] = config.get_display()

self._runtime.startTime = dt.isoformat(dt.utcnow())
self._resmon.start()
# TODO: Perhaps clean-up path and ensure it exists?
os.chdir(self._runtime.cwd)
return self._runtime

def __exit__(self, exc_type, exc_value, exc_tb):
"""Tear-down interface execution."""
self._runtime.endTime = dt.isoformat(dt.utcnow())
timediff = parseutc(self._runtime.endTime) - parseutc(self._runtime.startTime)
self._runtime.duration = (
timediff.days * 86400 + timediff.seconds + timediff.microseconds / 1e6
)
# Collect monitored data
for k, v in self._resmon.stop():
setattr(self._runtime, k, v)

os.chdir(self._runtime.prevcwd)

if exc_type is not None or exc_value is not None or exc_tb is not None:
import traceback

# Retrieve the maximum info fast
self._runtime.traceback = "".join(
traceback.format_exception(exc_type, exc_value, exc_tb)
)
# Gather up the exception arguments and append nipype info.
exc_args = exc_value.args if getattr(exc_value, "args") else tuple()
exc_args += (
f"An exception of type {exc_type.__name__} occurred while "
f"running interface {self._runtime.interface}.",
)
self._runtime.traceback_args = ("\n".join([f"{arg}" for arg in exc_args]),)

if self._ignore_exc:
return True

@property
def runtime(self):
return self._runtime


class NipypeInterfaceError(Exception):
"""Custom error for interfaces"""

Expand Down
Loading