
Make load scheduler configurable #255

Closed
@nicoulaj

Description


I have several projects where the distribution of test runtimes is quite scattered, e.g.:

  • 1000 tests of 10ms
  • 100 tests of 1 minute

The current load scheduler falls short in this case, as it often ends up sending a batch of slow tests to the same worker.
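To illustrate the effect, here is a toy simulation. It is not xdist's actual algorithm: it assumes each idle worker simply takes the next `chunk_size` tests from a shared list, that there are 4 workers, and that the fast tests are collected before the slow ones. The name `simulate_makespan` and all parameters are made up for this sketch.

```python
import heapq


def simulate_makespan(durations_ms, num_workers, chunk_size):
    """Return the total wall time (ms) when each idle worker takes the
    next `chunk_size` tests from a shared pending list (toy model)."""
    # Min-heap of (time at which the worker becomes free, worker id).
    workers = [(0, worker_id) for worker_id in range(num_workers)]
    heapq.heapify(workers)
    position = 0
    makespan = 0
    while position < len(durations_ms):
        # The worker that becomes free first gets the next chunk.
        free_at, worker_id = heapq.heappop(workers)
        chunk = durations_ms[position:position + chunk_size]
        position += len(chunk)
        finished_at = free_at + sum(chunk)
        makespan = max(makespan, finished_at)
        heapq.heappush(workers, (finished_at, worker_id))
    return makespan


# 1000 tests of 10 ms followed by 100 tests of 1 minute (ordering is assumed).
durations_ms = [10] * 1000 + [60_000] * 100

batched = simulate_makespan(durations_ms, num_workers=4, chunk_size=50)
one_at_a_time = simulate_makespan(durations_ms, num_workers=4, chunk_size=1)
print(batched, one_at_a_time)
```

In this model the batched run takes about twice as long, because two workers each receive 50 slow tests while the other two sit idle.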

As a workaround, I use a fork of LoadScheduling with a fixed queue size. I set it to the minimum value of 2, so each worker only has one test waiting in its queue at any time:

from itertools import cycle

from xdist.scheduler import LoadScheduling


class FixedLocalQueueLoadScheduling(LoadScheduling):  # no cover
    """
    A fork of the default pytest-xdist load scheduler that uses a fixed size for each worker's local queue.
    """

    def __init__(self, config, log=None, queue_size=2):
        super().__init__(config, log)
        if queue_size < 2:
            raise ValueError('Queue size must be at least 2')
        self.queue_size = queue_size

    def check_schedule(self, node, duration=0):
        if node.shutting_down:
            return

        if self.pending:
            node_pending = self.node2pending[node]
            if len(node_pending) < self.queue_size:
                num_send = self.queue_size - len(node_pending)
                self._send_tests(node, num_send)
        self.log("num items waiting for node:", len(self.pending))

    def schedule(self):
        assert self.collection_is_completed

        # Initial distribution already happened, reschedule on all nodes
        if self.collection is not None:
            for node in self.nodes:
                self.check_schedule(node)
            return

        # All nodes must have collected the same tests; abort otherwise
        if not self._check_nodes_have_same_collection():
            self.log('**Different tests collected, aborting run**')
            return

        # Collections are identical, create the index of pending items.
        self.collection = list(self.node2collection.values())[0]
        self.pending[:] = range(len(self.collection))
        if not self.collection:
            return

        # Send an initial batch of up to queue_size tests per node. If
        # there are fewer tests than that, send them all so that we can
        # send shutdown signals and get all nodes working.
        initial_batch = min(len(self.pending), self.queue_size * len(self.nodes))

        # distribute tests round-robin up to the batch size
        # (or until we run out)
        nodes = cycle(self.nodes)
        for i in range(initial_batch):
            self._send_tests(next(nodes), 1)

        if not self.pending:
            # initial distribution sent all tests, start node shutdown
            for node in self.nodes:
                node.shutdown()
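The refill rule in `check_schedule` can be tried out without xdist. The sketch below is a simplified stand-in, not part of the scheduler: `top_up` is a hypothetical helper, and plain lists replace `self.pending` and `self.node2pending[node]`.

```python
def top_up(pending, node_queue, queue_size=2):
    """Move tests from `pending` to `node_queue` until the node holds
    `queue_size` items or `pending` runs out. Returns the tests sent."""
    if queue_size < 2:
        raise ValueError('Queue size must be at least 2')
    # Never send a negative number of tests if the queue is already full.
    num_send = max(0, queue_size - len(node_queue))
    batch = pending[:num_send]
    del pending[:num_send]
    node_queue.extend(batch)
    return batch


pending = [0, 1, 2, 3, 4]   # indices of tests not yet sent to any node
node_queue = [7]            # this node currently holds a single test
sent = top_up(pending, node_queue)
print(sent, node_queue, pending)
```

With `queue_size=2`, a node that holds one test receives exactly one more, which keeps slow tests from piling up on a single worker.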

It would be nice to have at least one of these proposals implemented in xdist:

  1. Integrate this scheduler (or an even simpler version where queue_size=2)
  2. Make LoadScheduling configurable, so that users can provide initial_batch_size / items_per_node_min / items_per_node_max
  3. When sending a batch of tests to a node, shuffle them as is done for the initial batch
  4. Maybe lower the default settings for initial_batch_size / items_per_node_min / items_per_node_max a bit
