From c5093fb53d75dab22255973ec0a52834f1d3e203 Mon Sep 17 00:00:00 2001
From: Oscar Esteban
Date: Tue, 22 Jun 2021 08:54:29 +0200
Subject: [PATCH 1/2] ENH: Clean-up the BaseInterface ``run()`` function using context

Python contexts seem the most appropriate pattern to follow.
---
Note (review): as originally posted, ``RuntimeContext.__exit__`` iterated the
dict returned by ``ResourceMonitor.stop()`` directly. Iterating a dict yields
its keys only, so unpacking into ``k, v`` raised ``ValueError`` whenever the
real resource monitor was enabled (the mock returns an empty dict and hid the
problem). The loop now iterates ``.items()``. The amendment is line-for-line,
so hunk sizes and the diffstat below are unchanged; the ``index`` blob ids for
support.py predate the amendment.

 nipype/interfaces/base/core.py    | 172 ++++++++----------------------
 nipype/interfaces/base/support.py |  98 ++++++++++++++++-
 nipype/utils/profiler.py          |  40 ++++++-
 3 files changed, 179 insertions(+), 131 deletions(-)

diff --git a/nipype/interfaces/base/core.py b/nipype/interfaces/base/core.py
index 57e889da9b..f23e11e7bd 100644
--- a/nipype/interfaces/base/core.py
+++ b/nipype/interfaces/base/core.py
@@ -1,31 +1,24 @@
-# -*- coding: utf-8 -*-
 # emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
 # vi: set ft=python sts=4 ts=4 sw=4 et:
 """
 Nipype interfaces core
 ......................
 
-
 Defines the ``Interface`` API and the body of the
 most basic interfaces.
 The I/O specifications corresponding to these base
 interfaces are found in the ``specs`` module.
 
 """
-from copy import deepcopy
-from datetime import datetime as dt
 import os
-import platform
 import subprocess as sp
 import shlex
-import sys
 import simplejson as json
-from dateutil.parser import parse as parseutc
 from traits.trait_errors import TraitError
 
 from ... import config, logging, LooseVersion
 from ...utils.provenance import write_provenance
-from ...utils.misc import str2bool, rgetcwd
+from ...utils.misc import str2bool
 from ...utils.filemanip import split_filename, which, get_dependencies, canonicalize_env
 from ...utils.subprocess import run_command
 
@@ -39,7 +32,12 @@
     MpiCommandLineInputSpec,
     get_filecopy_info,
 )
-from .support import Bunch, InterfaceResult, NipypeInterfaceError, format_help
+from .support import (
+    RuntimeContext,
+    InterfaceResult,
+    NipypeInterfaceError,
+    format_help,
+)
 
 iflogger = logging.getLogger("nipype.interface")
 
@@ -63,8 +61,15 @@ class Interface(object):
 
     """
 
-    input_spec = None  # A traited input specification
-    output_spec = None  # A traited output specification
+    input_spec = None
+    """
+    The specification of the input, defined by a :py:class:`~traits.has_traits.HasTraits` class.
+    """
+    output_spec = None
+    """
+    The specification of the output, defined by a :py:class:`~traits.has_traits.HasTraits` class.
+    """
+
     _can_resume = False  # See property below
     _always_run = False  # See property below
 
@@ -365,131 +370,42 @@ def run(self, cwd=None, ignore_exception=None, **inputs):
             if successful, results
 
         """
-        from ...utils.profiler import ResourceMonitor
-
-        # if ignore_exception is not provided, taking self.ignore_exception
-        if ignore_exception is None:
-            ignore_exception = self.ignore_exception
-
-        # Tear-up: get current and prev directories
-        syscwd = rgetcwd(error=False)  # Recover when wd does not exist
-        if cwd is None:
-            cwd = syscwd
-
-        os.chdir(cwd)  # Change to the interface wd
-
-        enable_rm = config.resource_monitor and self.resource_monitor
-        self.inputs.trait_set(**inputs)
-        self._check_mandatory_inputs()
-        self._check_version_requirements(self.inputs)
-        interface = self.__class__
-        self._duecredit_cite()
-
-        # initialize provenance tracking
-        store_provenance = str2bool(
-            config.get("execution", "write_provenance", "false")
-        )
-        env = deepcopy(dict(os.environ))
-        if self._redirect_x:
-            env["DISPLAY"] = config.get_display()
-
-        runtime = Bunch(
-            cwd=cwd,
-            prevcwd=syscwd,
-            returncode=None,
-            duration=None,
-            environ=env,
-            startTime=dt.isoformat(dt.utcnow()),
-            endTime=None,
-            platform=platform.platform(),
-            hostname=platform.node(),
-            version=self.version,
+        rtc = RuntimeContext(
+            resource_monitor=config.resource_monitor and self.resource_monitor,
+            ignore_exception=ignore_exception
+            if ignore_exception is not None
+            else self.ignore_exception,
         )
-        runtime_attrs = set(runtime.dictcopy())
-
-        mon_sp = None
-        if enable_rm:
-            mon_freq = float(config.get("execution", "resource_monitor_frequency", 1))
-            proc_pid = os.getpid()
-            iflogger.debug(
-                "Creating a ResourceMonitor on a %s interface, PID=%d.",
-                self.__class__.__name__,
-                proc_pid,
-            )
-            mon_sp = ResourceMonitor(proc_pid, freq=mon_freq)
-            mon_sp.start()
 
-        # Grab inputs now, as they should not change during execution
-        inputs = self.inputs.get_traitsfree()
-        outputs = None
+        with rtc(self, cwd=cwd, redirect_x=self._redirect_x) as runtime:
+            self.inputs.trait_set(**inputs)
+            self._check_mandatory_inputs()
+            self._check_version_requirements(self.inputs)
 
-        try:
+            # Grab inputs now, as they should not change during execution
+            inputs = self.inputs.get_traitsfree()
+            outputs = None
+            # Run interface
             runtime = self._pre_run_hook(runtime)
             runtime = self._run_interface(runtime)
             runtime = self._post_run_hook(runtime)
+            # Collect outputs
             outputs = self.aggregate_outputs(runtime)
-        except Exception as e:
-            import traceback
-
-            # Retrieve the maximum info fast
-            runtime.traceback = traceback.format_exc()
-            # Gather up the exception arguments and append nipype info.
-            exc_args = e.args if getattr(e, "args") else tuple()
-            exc_args += (
-                "An exception of type %s occurred while running interface %s."
-                % (type(e).__name__, self.__class__.__name__),
-            )
-            if config.get("logging", "interface_level", "info").lower() == "debug":
-                exc_args += ("Inputs: %s" % str(self.inputs),)
-
-            runtime.traceback_args = ("\n".join(["%s" % arg for arg in exc_args]),)
-
-            if not ignore_exception:
-                raise
-        finally:
-            if runtime is None or runtime_attrs - set(runtime.dictcopy()):
-                raise RuntimeError(
-                    "{} interface failed to return valid "
-                    "runtime object".format(interface.__class__.__name__)
-                )
-            # This needs to be done always
-            runtime.endTime = dt.isoformat(dt.utcnow())
-            timediff = parseutc(runtime.endTime) - parseutc(runtime.startTime)
-            runtime.duration = (
-                timediff.days * 86400 + timediff.seconds + timediff.microseconds / 1e6
-            )
-            results = InterfaceResult(
-                interface, runtime, inputs=inputs, outputs=outputs, provenance=None
-            )
 
-            # Add provenance (if required)
-            if store_provenance:
-                # Provenance will only throw a warning if something went wrong
-                results.provenance = write_provenance(results)
-
-            # Make sure runtime profiler is shut down
-            if enable_rm:
-                import numpy as np
-
-                mon_sp.stop()
-
-                runtime.mem_peak_gb = None
-                runtime.cpu_percent = None
-
-                # Read .prof file in and set runtime values
-                vals = np.loadtxt(mon_sp.fname, delimiter=",")
-                if vals.size:
-                    vals = np.atleast_2d(vals)
-                    runtime.mem_peak_gb = vals[:, 2].max() / 1024
-                    runtime.cpu_percent = vals[:, 1].max()
-
-                    runtime.prof_dict = {
-                        "time": vals[:, 0].tolist(),
-                        "cpus": vals[:, 1].tolist(),
-                        "rss_GiB": (vals[:, 2] / 1024).tolist(),
-                        "vms_GiB": (vals[:, 3] / 1024).tolist(),
-                    }
-            os.chdir(syscwd)
+        results = InterfaceResult(
+            self.__class__,
+            rtc.runtime,
+            inputs=inputs,
+            outputs=outputs,
+            provenance=None,
+        )
+
+        # Add provenance (if required)
+        if str2bool(config.get("execution", "write_provenance", "false")):
+            # Provenance will only throw a warning if something went wrong
+            results.provenance = write_provenance(results)
+
+        self._duecredit_cite()
 
         return results
 
diff --git a/nipype/interfaces/base/support.py b/nipype/interfaces/base/support.py
index 88359354fd..4b01769e75 100644
--- a/nipype/interfaces/base/support.py
+++ b/nipype/interfaces/base/support.py
@@ -8,12 +8,16 @@
 
 """
 import os
+from contextlib import AbstractContextManager
 from copy import deepcopy
 from textwrap import wrap
 import re
+from datetime import datetime as dt
+from dateutil.parser import parse as parseutc
+import platform
 
-from ... import logging
-from ...utils.misc import is_container
+from ... import logging, config
+from ...utils.misc import is_container, rgetcwd
 from ...utils.filemanip import md5, hash_infile
 
 iflogger = logging.getLogger("nipype.interface")
@@ -21,6 +25,96 @@
 HELP_LINEWIDTH = 70
 
 
+class RuntimeContext(AbstractContextManager):
+    """A context manager to run NiPype interfaces."""
+
+    __slots__ = ("_runtime", "_resmon", "_ignore_exc")
+
+    def __init__(self, resource_monitor=False, ignore_exception=False):
+        """Initialize the context manager object."""
+        self._ignore_exc = ignore_exception
+        _proc_pid = os.getpid()
+        if resource_monitor:
+            from ...utils.profiler import ResourceMonitor
+        else:
+            from ...utils.profiler import ResourceMonitorMock as ResourceMonitor
+
+        self._resmon = ResourceMonitor(
+            _proc_pid,
+            freq=float(config.get("execution", "resource_monitor_frequency", 1)),
+        )
+
+    def __call__(self, interface, cwd=None, redirect_x=False):
+        """Generate a new runtime object."""
+        # Tear-up: get current and prev directories
+        _syscwd = rgetcwd(error=False)  # Recover when wd does not exist
+        if cwd is None:
+            cwd = _syscwd
+
+        self._runtime = Bunch(
+            cwd=str(cwd),
+            duration=None,
+            endTime=None,
+            environ=deepcopy(dict(os.environ)),
+            hostname=platform.node(),
+            interface=interface.__class__.__name__,
+            platform=platform.platform(),
+            prevcwd=str(_syscwd),
+            redirect_x=redirect_x,
+            resmon=self._resmon.fname or "off",
+            returncode=None,
+            startTime=None,
+            version=interface.version,
+        )
+        return self
+
+    def __enter__(self):
+        """Tear-up the execution of an interface."""
+        if self._runtime.redirect_x:
+            self._runtime.environ["DISPLAY"] = config.get_display()
+
+        self._runtime.startTime = dt.isoformat(dt.utcnow())
+        self._resmon.start()
+        # TODO: Perhaps clean-up path and ensure it exists?
+        os.chdir(self._runtime.cwd)
+        return self._runtime
+
+    def __exit__(self, exc_type, exc_value, exc_tb):
+        """Tear-down interface execution."""
+        self._runtime.endTime = dt.isoformat(dt.utcnow())
+        timediff = parseutc(self._runtime.endTime) - parseutc(self._runtime.startTime)
+        self._runtime.duration = (
+            timediff.days * 86400 + timediff.seconds + timediff.microseconds / 1e6
+        )
+        # Collect monitored data
+        for k, v in self._resmon.stop().items():
+            setattr(self._runtime, k, v)
+
+        os.chdir(self._runtime.prevcwd)
+
+        if exc_type is not None or exc_value is not None or exc_tb is not None:
+            import traceback
+
+            # Retrieve the maximum info fast
+            self._runtime.traceback = "".join(
+                traceback.format_exception(exc_type, exc_value, exc_tb)
+            )
+            # Gather up the exception arguments and append nipype info.
+            exc_args = exc_value.args if getattr(exc_value, "args") else tuple()
+            exc_args += (
+                f"An exception of type {exc_type.__name__} occurred while "
+                f"running interface {self._runtime.interface}.",
+            )
+            self._runtime.traceback_args = ("\n".join([f"{arg}" for arg in exc_args]),)
+
+        if self._ignore_exc:
+            return True
+
+    @property
+    def runtime(self):
+        return self._runtime
+
+
 class NipypeInterfaceError(Exception):
     """Custom error for interfaces"""
 
diff --git a/nipype/utils/profiler.py b/nipype/utils/profiler.py
index 20e024693f..9ed380eb04 100644
--- a/nipype/utils/profiler.py
+++ b/nipype/utils/profiler.py
@@ -5,6 +5,7 @@
 Utilities to keep track of performance
 """
 import os
+import numpy as np
 import threading
 from time import time
 
@@ -23,6 +24,24 @@
 _MB = 1024.0 ** 2
 
 
+class ResourceMonitorMock:
+    """A mock class to use when the monitor is disabled."""
+
+    @property
+    def fname(self):
+        """Get/set the internal filename"""
+        return None
+
+    def __init__(self, pid, freq=5, fname=None, python=True):
+        pass
+
+    def start(self):
+        pass
+
+    def stop(self):
+        return {}
+
+
 class ResourceMonitor(threading.Thread):
     """
     A ``Thread`` to monitor a specific PID with a certain frequence
@@ -57,7 +76,7 @@ def fname(self):
         return self._fname
 
     def stop(self):
-        """Stop monitoring"""
+        """Stop monitoring."""
         if not self._event.is_set():
             self._event.set()
             self.join()
@@ -65,6 +84,25 @@ def stop(self):
             self._logfile.flush()
             self._logfile.close()
 
+        retval = {
+            "mem_peak_gb": None,
+            "cpu_percent": None,
+        }
+
+        # Read .prof file in and set runtime values
+        vals = np.loadtxt(self._fname, delimiter=",")
+        if vals.size:
+            vals = np.atleast_2d(vals)
+            retval["mem_peak_gb"] = vals[:, 2].max() / 1024
+            retval["cpu_percent"] = vals[:, 1].max()
+            retval["prof_dict"] = {
+                "time": vals[:, 0].tolist(),
+                "cpus": vals[:, 1].tolist(),
+                "rss_GiB": (vals[:, 2] / 1024).tolist(),
+                "vms_GiB": (vals[:, 3] / 1024).tolist(),
+            }
+        return retval
+
     def _sample(self, cpu_interval=None):
         cpu = 0.0
         rss = 0.0

From 24f2cbced7cb27364ca4c8725954f8ec53d16fac Mon Sep 17 00:00:00 2001
From: Oscar Esteban
Date: Wed, 21 Jul 2021 16:01:31 +0200
Subject: [PATCH 2/2] enh: roll back to a more constant behavior with errors

---
 nipype/interfaces/base/core.py | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/nipype/interfaces/base/core.py b/nipype/interfaces/base/core.py
index f23e11e7bd..7eb81b2c40 100644
--- a/nipype/interfaces/base/core.py
+++ b/nipype/interfaces/base/core.py
@@ -19,7 +19,13 @@
 from ... import config, logging, LooseVersion
 from ...utils.provenance import write_provenance
 from ...utils.misc import str2bool
-from ...utils.filemanip import split_filename, which, get_dependencies, canonicalize_env
+from ...utils.filemanip import (
+    canonicalize_env,
+    get_dependencies,
+    indirectory,
+    split_filename,
+    which,
+)
 from ...utils.subprocess import run_command
 
 from ...external.due import due
@@ -377,10 +383,12 @@ def run(self, cwd=None, ignore_exception=None, **inputs):
             else self.ignore_exception,
         )
 
-        with rtc(self, cwd=cwd, redirect_x=self._redirect_x) as runtime:
+        with indirectory(cwd or os.getcwd()):
             self.inputs.trait_set(**inputs)
-            self._check_mandatory_inputs()
-            self._check_version_requirements(self.inputs)
+        self._check_mandatory_inputs()
+        self._check_version_requirements(self.inputs)
+
+        with rtc(self, cwd=cwd, redirect_x=self._redirect_x) as runtime:
 
             # Grab inputs now, as they should not change during execution
             inputs = self.inputs.get_traitsfree()