I have several projects where test runtimes are very unevenly distributed, e.g.:
- 1000 tests taking 10 ms each
- 100 tests taking 1 minute each
The current load scheduler falls short in this case: it often ends up sending a batch of slow tests to the same worker.
As a workaround, I use a fork of `LoadScheduling` with a fixed local queue size. I run it with the minimum value of 2, so each worker holds only one queued test besides the one it is currently running:
```python
from itertools import cycle

from xdist.scheduler import LoadScheduling


class FixedLocalQueueLoadScheduling(LoadScheduling):  # no cover
    """
    A fork of pytest-xdist's default load scheduler that uses a fixed size
    for each worker's local queue.
    """

    def __init__(self, config, log=None, queue_size=2):
        super().__init__(config, log)
        # A worker holds the test it is running plus at least one more.
        if queue_size < 2:
            raise ValueError('Queue size must be at least 2')
        self.queue_size = queue_size

    def check_schedule(self, node, duration=0):
        if node.shutting_down:
            return

        if self.pending:
            # Top the node's local queue back up to exactly queue_size items.
            node_pending = self.node2pending[node]
            if len(node_pending) < self.queue_size:
                num_send = self.queue_size - len(node_pending)
                self._send_tests(node, num_send)
        else:
            # No tests left to hand out: let this node finish its queue and stop.
            node.shutdown()

        self.log("num items waiting for node:", len(self.pending))

    def schedule(self):
        assert self.collection_is_completed

        # Initial distribution already happened, reschedule on all nodes
        if self.collection is not None:
            for node in self.nodes:
                self.check_schedule(node)
            return

        # TODO: allow nodes to have different collections
        if not self._check_nodes_have_same_collection():
            self.log('**Different tests collected, aborting run**')
            return

        # Collections are identical, create the index of pending items.
        self.collection = list(self.node2collection.values())[0]
        self.pending[:] = range(len(self.collection))
        if not self.collection:
            return

        # Send a batch of tests to run. If we don't have at least two
        # tests per node, we have to send them all so that we can send
        # shutdown signals and get all nodes working.
        initial_batch = min(len(self.pending), self.queue_size * len(self.nodes))

        # distribute tests round-robin up to the batch size
        # (or until we run out)
        nodes = cycle(self.nodes)
        for _ in range(initial_batch):
            self._send_tests(next(nodes), 1)

        if not self.pending:
            # initial distribution sent all tests, start node shutdown
            for node in self.nodes:
                node.shutdown()
```
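To see why a small fixed queue helps with this kind of workload, here is a rough, self-contained toy simulation (pure Python, no xdist involved). It is not xdist's actual scheduling logic: `simulate` is a hypothetical helper, the fixed `items_per_node_min`/`items_per_node_max` values (2 and 100) are arbitrary stand-ins for the scheduler's heuristics rather than xdist's defaults, and it assumes the slow tests are collected last and contiguously (e.g. because they live in one module).

```python
import heapq
from collections import deque
from itertools import cycle


def simulate(durations, n_workers, items_per_node_min, items_per_node_max):
    """Return the makespan (in seconds) of a toy pull-based scheduler.

    Hypothetical model: a worker whose local queue drops below
    items_per_node_min is topped up to items_per_node_max with the next
    *consecutive* pending tests. min == max == queue_size mimics
    FixedLocalQueueLoadScheduling. Simplification: a worker starts a test
    as soon as it reaches the head of its queue.
    """
    pending = deque(range(len(durations)))
    queues = [deque() for _ in range(n_workers)]

    # Initial round-robin distribution of up to items_per_node_max per worker.
    workers = cycle(range(n_workers))
    for _ in range(min(len(pending), items_per_node_max * n_workers)):
        queues[next(workers)].append(pending.popleft())

    # Min-heap of (finish_time, worker) for the test each worker is running.
    events = [(durations[q[0]], w) for w, q in enumerate(queues) if q]
    heapq.heapify(events)
    makespan = 0.0
    while events:
        now, w = heapq.heappop(events)
        makespan = now  # events pop in time order, so the last one is the max
        queues[w].popleft()  # the running test finished
        if len(queues[w]) < items_per_node_min:
            while pending and len(queues[w]) < items_per_node_max:
                queues[w].append(pending.popleft())
        if queues[w]:
            heapq.heappush(events, (now + durations[queues[w][0]], w))
    return makespan


# Workload from the description; assumes the slow tests are collected last.
durations = [0.01] * 1000 + [60.0] * 100
batched = simulate(durations, 4, items_per_node_min=2, items_per_node_max=100)
fixed = simulate(durations, 4, items_per_node_min=2, items_per_node_max=2)
print(f"batched: {batched / 60:.1f} min, fixed queue of 2: {fixed / 60:.1f} min")
```

With these numbers, the batched configuration hands most of the slow tests to one worker during a single top-up, while the fixed queue of 2 spreads them roughly evenly over the four workers. The likely trade-off is more scheduler messages (about one per completed test), which could become noticeable for suites made mostly of very fast tests.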
It would be nice to have at least one of these proposals implemented in xdist:
- Integrate this scheduler (or an even simpler version with queue_size=2)
- Make `LoadScheduling` configurable, so that users can provide initial_batch_size / items_per_node_min / items_per_node_max
- When sending a batch of tests to a node, shuffle them as is done for the initial batch
- Maybe improve/reduce the default settings for initial_batch_size / items_per_node_min / items_per_node_max
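For the shuffling proposal, one possible reading is to pick a refill batch at random from the pending list instead of taking a consecutive slice. The sketch below is a hypothetical helper (`take_shuffled` is not an xdist API) that operates on a plain list of test indices, using the same workload as in the description.

```python
import random


def take_shuffled(pending, n, rng=random):
    """Remove n randomly chosen items from `pending` and return them.

    Hypothetical helper: one possible reading of "shuffle like for the
    initial batch". The returned batch keeps the original collection order.
    """
    n = min(n, len(pending))
    chosen = set(rng.sample(range(len(pending)), n))
    batch = [item for i, item in enumerate(pending) if i in chosen]
    # Keep the remaining items, still in collection order.
    pending[:] = [item for i, item in enumerate(pending) if i not in chosen]
    return batch


# Same workload as in the description: indices >= 1000 are the slow tests.
pending = list(range(1100))
batch = take_shuffled(pending, 99, rng=random.Random(0))
print(len(batch), "tests sent,", sum(i >= 1000 for i in batch), "of them slow")
```

A batch drawn this way gets roughly its proportional share of slow tests instead of a whole run of them. The likely cost is that consecutive tests sharing module- or class-scoped fixtures end up split across workers more often, so fixture setup may run more times.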