Skip to content

Commit 3a7218a

Browse files
committed
move around functions
1 parent 2c56c58 commit 3a7218a

File tree

2 files changed

+47
-51
lines changed

2 files changed

+47
-51
lines changed

adaptive/runner.py

Lines changed: 46 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
except ImportError:
5151
from typing_extensions import Literal
5252

53-
5453
try:
5554
import ipyparallel
5655
from ipyparallel.client.asyncresult import AsyncResult
@@ -101,52 +100,6 @@
101100
_default_executor = loky.get_reusable_executor
102101

103102

104-
# -- Internal executor-related, things
105-
106-
107-
def _ensure_executor(
108-
executor: ExecutorTypes | None,
109-
) -> concurrent.Executor:
110-
if executor is None:
111-
executor = concurrent.ProcessPoolExecutor()
112-
113-
if isinstance(executor, concurrent.Executor):
114-
return executor
115-
elif with_ipyparallel and isinstance(executor, ipyparallel.Client):
116-
return executor.executor()
117-
elif with_distributed and isinstance(executor, distributed.Client):
118-
return executor.get_executor()
119-
else:
120-
raise TypeError(
121-
# TODO: check if this is correct. Isn't MPI,loky supported?
122-
"Only a concurrent.futures.Executor, distributed.Client,"
123-
" or ipyparallel.Client can be used."
124-
)
125-
126-
127-
def _get_ncores(
128-
ex: (ExecutorTypes),
129-
) -> int:
130-
"""Return the maximum number of cores that an executor can use."""
131-
if with_ipyparallel and isinstance(ex, ipyparallel.client.view.ViewExecutor):
132-
return len(ex.view)
133-
elif isinstance(
134-
ex, (concurrent.ProcessPoolExecutor, concurrent.ThreadPoolExecutor)
135-
):
136-
return ex._max_workers # not public API!
137-
elif isinstance(ex, loky.reusable_executor._ReusablePoolExecutor):
138-
return ex._max_workers # not public API!
139-
elif isinstance(ex, SequentialExecutor):
140-
return 1
141-
elif with_distributed and isinstance(ex, distributed.cfexecutor.ClientExecutor):
142-
return sum(n for n in ex._client.ncores().values())
143-
elif with_mpi4py and isinstance(ex, mpi4py.futures.MPIPoolExecutor):
144-
ex.bootup() # wait until all workers are up and running
145-
return ex._pool.size # not public API!
146-
else:
147-
raise TypeError(f"Cannot get number of cores for {ex.__class__}")
148-
149-
150103
class BaseRunner(metaclass=abc.ABCMeta):
151104
r"""Base class for runners that use `concurrent.futures.Executor`\'s.
152105
@@ -1016,9 +969,6 @@ def stop_after(*, seconds=0, minutes=0, hours=0) -> Callable[[BaseLearner], bool
1016969
return lambda _: time.time() > stop_time
1017970

1018971

1019-
# -- Internal executor-related, things
1020-
1021-
1022972
class _TimeGoal:
1023973
def __init__(self, dt: timedelta | datetime | int | float):
1024974
self.dt = dt if isinstance(dt, (timedelta, datetime)) else timedelta(seconds=dt)
@@ -1142,3 +1092,49 @@ def _goal(
11421092
duration=duration_goal,
11431093
allow_running_forever=allow_running_forever,
11441094
)
1095+
1096+
1097+
# -- Internal executor-related, things
1098+
1099+
1100+
def _ensure_executor(
1101+
executor: ExecutorTypes | None,
1102+
) -> concurrent.Executor:
1103+
if executor is None:
1104+
executor = concurrent.ProcessPoolExecutor()
1105+
1106+
if isinstance(executor, concurrent.Executor):
1107+
return executor
1108+
elif with_ipyparallel and isinstance(executor, ipyparallel.Client):
1109+
return executor.executor()
1110+
elif with_distributed and isinstance(executor, distributed.Client):
1111+
return executor.get_executor()
1112+
else:
1113+
raise TypeError(
1114+
# TODO: check if this is correct. Isn't MPI,loky supported?
1115+
"Only a concurrent.futures.Executor, distributed.Client,"
1116+
" or ipyparallel.Client can be used."
1117+
)
1118+
1119+
1120+
def _get_ncores(
1121+
ex: (ExecutorTypes),
1122+
) -> int:
1123+
"""Return the maximum number of cores that an executor can use."""
1124+
if with_ipyparallel and isinstance(ex, ipyparallel.client.view.ViewExecutor):
1125+
return len(ex.view)
1126+
elif isinstance(
1127+
ex, (concurrent.ProcessPoolExecutor, concurrent.ThreadPoolExecutor)
1128+
):
1129+
return ex._max_workers # not public API!
1130+
elif isinstance(ex, loky.reusable_executor._ReusablePoolExecutor):
1131+
return ex._max_workers # not public API!
1132+
elif isinstance(ex, SequentialExecutor):
1133+
return 1
1134+
elif with_distributed and isinstance(ex, distributed.cfexecutor.ClientExecutor):
1135+
return sum(n for n in ex._client.ncores().values())
1136+
elif with_mpi4py and isinstance(ex, mpi4py.futures.MPIPoolExecutor):
1137+
ex.bootup() # wait until all workers are up and running
1138+
return ex._pool.size # not public API!
1139+
else:
1140+
raise TypeError(f"Cannot get number of cores for {ex.__class__}")

example-notebook.ipynb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2000,7 +2000,7 @@
20002000
}
20012001
],
20022002
"source": [
2003-
"type(ex.submit(lambda x: x, 1))"
2003+
"type(ex.submiddt(lambda x: x, 1))"
20042004
]
20052005
},
20062006
{

0 commit comments

Comments
 (0)