Skip to content

guidellm.scheduler

Scheduler subsystem for orchestrating benchmark workloads and managing worker processes.

This module provides the core scheduling infrastructure for guidellm, including strategies for controlling request timing patterns (synchronous, asynchronous, constant rate, Poisson), constraints for limiting benchmark execution (duration, error rates, request counts), and distributed execution through worker processes. The scheduler coordinates between backend interfaces, manages benchmark state transitions, and handles multi-turn request sequences with customizable timing strategies and resource constraints.

BackendT = TypeVar('BackendT', bound=BackendInterface) module-attribute

Generic backend interface type for request processing

DatasetIterT = TypeAliasType('DatasetIterT', Iterable[Iterable[RequestT]], type_params=(RequestT,)) module-attribute

Output of the data loader: an iterable of batches, where each batch is an iterable of requests (`RequestT`).

HistoryT = TypeAliasType('HistoryT', list[tuple[RequestT, ResponseT | None]], type_params=(RequestT, ResponseT)) module-attribute

Record of the (request, response) pairs exchanged so far in a conversation; a response may be `None`.

RequestDataT = TypeAliasType('RequestDataT', tuple[RequestT, RequestInfo], type_params=(RequestT,)) module-attribute

A request paired with its `RequestInfo`, which carries external metadata and scheduling configuration.

RequestT = TypeVar('RequestT') module-attribute

Generic request object type for scheduler processing

ResponseT = TypeVar('ResponseT') module-attribute

Generic response object type returned by backend processing

StrategyT = TypeVar('StrategyT', bound=SchedulingStrategy) module-attribute

Type variable bound to SchedulingStrategy for generic strategy operations

AsyncConstantStrategy

Bases: SchedulingStrategy

Constant-rate scheduling for predictable load patterns.

Schedules requests at a fixed rate distributed evenly across worker processes, providing predictable timing behavior for steady-state load simulation and consistent system performance measurement. Requests arrive at uniform intervals.

Source code in src/guidellm/scheduler/strategies.py
@SchedulingStrategy.register("constant")
class AsyncConstantStrategy(SchedulingStrategy):
    """
    Fixed-rate scheduling strategy that produces evenly spaced request arrivals.

    Each request's start time is derived from the index returned by
    ``next_request_index()`` and the start time from
    ``get_processes_start_time()``. It never depends on when earlier requests
    finished. An optional linear rampup grows the effective rate from 0 to
    ``rate`` over ``rampup_duration`` seconds.
    """

    type_: Literal["constant"] = "constant"  # type: ignore[assignment]
    rate: float = Field(
        description="Request scheduling rate in requests per second",
        gt=0,
    )
    max_concurrency: PositiveInt | None = Field(
        default=None,
        description="Maximum number of concurrent requests to schedule",
    )
    rampup_duration: NonNegativeFloat = Field(
        default=0.0,
        description=(
            "Duration in seconds to linearly ramp up from 0 to target rate "
            "at the beginning of each strategy run"
        ),
    )

    def __str__(self) -> str:
        """
        :return: ``constant@`` followed by the rate formatted to two decimals
        """
        rate_label = f"{self.rate:.2f}"
        return "constant@" + rate_label

    @property
    def processes_limit(self) -> PositiveInt | None:
        """
        :return: The configured ``max_concurrency``, or None when no cap is set
        """
        cap = self.max_concurrency
        return cap

    @property
    def requests_limit(self) -> PositiveInt | None:
        """
        :return: The configured ``max_concurrency``, or None when no cap is set
        """
        cap = self.max_concurrency
        return cap

    async def next_request_time(self, worker_index: PositiveInt) -> float:
        """
        Compute the start time of the next request on a constant-rate schedule.

        Without rampup, the n-th request is placed at ``start + n / rate``.
        With a positive ``rampup_duration``, the instantaneous rate grows
        linearly from 0 to ``rate`` across the rampup window. After the window,
        requests are spaced ``1 / rate`` apart.

        :param worker_index: Ignored; the constant schedule does not depend on
            which worker asks
        :return: Absolute scheduled start time for the request
        """
        del worker_index  # the schedule is independent of the calling worker

        request_index = self.next_request_index()
        origin = await self.get_processes_start_time()

        use_rampup = self.rampup_duration > 0
        if not use_rampup:
            return origin + request_index / self.rate

        # Under a linear rampup the cumulative request count at time t is
        # n(t) = rate * t**2 / (2 * rampup_duration). Evaluating at
        # t = rampup_duration gives the number of requests inside the window.
        rampup_requests = self.rate * self.rampup_duration / 2.0

        if request_index == 1:
            # the very first request goes out at the start time
            return origin
        if request_index <= rampup_requests:
            # inside the rampup: invert n(t) to recover t for this index
            return origin + math.sqrt(
                2.0 * request_index * self.rampup_duration / self.rate
            )
        # after the rampup: fixed 1 / rate spacing from the end of the window
        return origin + (
            self.rampup_duration + (request_index - rampup_requests) / self.rate
        )

    def request_completed(self, request_info: RequestInfo):
        """
        Completion hook; the constant schedule ignores completions.

        :param request_info: Metadata of the finished request (ignored)
        """
        del request_info  # timing never reacts to completions here

processes_limit property

Returns:

Type Description
PositiveInt | None

Max concurrency if set, otherwise None for unlimited

requests_limit property

Returns:

Type Description
PositiveInt | None

Max concurrency if set, otherwise None for unlimited

__str__()

Returns:

Type Description
str

String identifier with rate value

Source code in src/guidellm/scheduler/strategies.py
def __str__(self) -> str:
    """
    :return: ``constant@`` followed by the rate formatted to two decimals
    """
    return "constant@{:.2f}".format(self.rate)

next_request_time(worker_index) async

Calculate next request time at fixed intervals with optional linear rampup.

Schedules requests at uniform intervals determined by the configured rate, independent of request completion times. If rampup_duration is set, the rate increases linearly from 0 to the target rate during the rampup period, then continues at the constant rate.

Parameters:

Name Type Description Default
worker_index PositiveInt

Unused for constant strategy

required

Returns:

Type Description
float

Start time plus interval based on request index and rampup configuration

Source code in src/guidellm/scheduler/strategies.py
async def next_request_time(self, worker_index: PositiveInt) -> float:
    """
    Compute the start time of the next request on a constant-rate schedule.

    Times depend only on the index from ``self.next_request_index()`` and the
    start time from ``self.get_processes_start_time()``, never on when earlier
    requests completed. Without rampup, the n-th request is placed at
    ``start + n / rate``. With a positive ``rampup_duration``, the
    instantaneous rate grows linearly from 0 to ``rate`` across the rampup
    window. After the window, requests are spaced ``1 / rate`` apart.

    :param worker_index: Ignored; the constant schedule does not depend on
        which worker asks
    :return: Absolute scheduled start time for the request
    """
    del worker_index  # the schedule is independent of the calling worker

    request_index = self.next_request_index()
    origin = await self.get_processes_start_time()

    use_rampup = self.rampup_duration > 0
    if not use_rampup:
        return origin + request_index / self.rate

    # Under a linear rampup the cumulative request count at time t is
    # n(t) = rate * t**2 / (2 * rampup_duration). Evaluating at
    # t = rampup_duration gives the number of requests inside the window.
    rampup_requests = self.rate * self.rampup_duration / 2.0

    if request_index == 1:
        # the very first request goes out at the start time
        return origin
    if request_index <= rampup_requests:
        # inside the rampup: invert n(t) to recover t for this index
        return origin + math.sqrt(
            2.0 * request_index * self.rampup_duration / self.rate
        )
    # after the rampup: fixed 1 / rate spacing from the end of the window
    return origin + (
        self.rampup_duration + (request_index - rampup_requests) / self.rate
    )

request_completed(request_info)

Handle request completion (no-op for constant strategy).

Parameters:

Name Type Description Default
request_info RequestInfo

Completed request metadata (unused)

required
Source code in src/guidellm/scheduler/strategies.py
def request_completed(self, request_info: RequestInfo):
    """
    Completion hook; the constant-rate schedule ignores completions.

    :param request_info: Metadata of the finished request (ignored)
    """
    del request_info  # timing never reacts to completions for this strategy

AsyncPoissonStrategy

Bases: SchedulingStrategy

Poisson-distributed scheduling for realistic load simulation.

Schedules requests following a Poisson process with exponentially distributed inter-arrival times, providing realistic simulation of user behavior and network traffic patterns. Request arrivals have random variance around the target rate.

Source code in src/guidellm/scheduler/strategies.py
@SchedulingStrategy.register("poisson")
class AsyncPoissonStrategy(SchedulingStrategy):
    """
    Poisson-process scheduling with exponentially distributed inter-arrivals.

    Each call draws an exponential delay with mean ``1 / rate`` and adds it to
    a shared running offset. The resulting arrival times vary randomly around
    the target rate. Draws come from ``random.Random(random_seed)`` so that runs
    are reproducible.
    """

    type_: Literal["poisson"] = "poisson"  # type: ignore[assignment]
    rate: float = Field(
        description="Request scheduling rate in requests per second",
        gt=0,
    )
    max_concurrency: PositiveInt | None = Field(
        default=None,
        description="Maximum number of concurrent requests to schedule",
    )
    random_seed: int = Field(
        default=42,
        description="Random seed for Poisson distribution reproducibility",
    )

    # RNG created lazily on the first next_request_time call.
    _random: random.Random | None = PrivateAttr(None)
    # Shared double holding the running arrival time. It is created in
    # init_processes_timings and reset to the start time in init_processes_start.
    _offset: Synchronized[float] | None = PrivateAttr(None)

    def __str__(self) -> str:
        """
        :return: ``poisson@`` followed by the rate formatted to two decimals
        """
        return "poisson@{:.2f}".format(self.rate)

    @property
    def processes_limit(self) -> PositiveInt | None:
        """
        :return: The configured ``max_concurrency``, or None when no cap is set
        """
        cap = self.max_concurrency
        return cap

    @property
    def requests_limit(self) -> PositiveInt | None:
        """
        :return: The configured ``max_concurrency``, or None when no cap is set
        """
        cap = self.max_concurrency
        return cap

    def init_processes_timings(
        self,
        worker_count: PositiveInt,
        max_concurrency: PositiveInt,
        mp_context: BaseContext,
    ):
        """
        Allocate the shared arrival offset, then run the base initialization.

        :param worker_count: Number of worker processes to coordinate
        :param max_concurrency: Maximum number of concurrent requests allowed
        :param mp_context: Multiprocessing context used to allocate shared state
        """
        shared_offset = mp_context.Value("d", -1.0)
        self._offset = shared_offset
        # The base implementation must run last so that it does not set its
        # Event before the offset exists.
        super().init_processes_timings(worker_count, max_concurrency, mp_context)

    def init_processes_start(self, start_time: float):
        """
        Record the start time and seed the shared offset with it.

        :param start_time: Unix timestamp when request processing should begin
        :raises RuntimeError: If init_processes_timings has not created the offset
        """
        # NOTE(review): this calls ThroughputStrategy's implementation directly,
        # although this class derives from SchedulingStrategy, which bypasses
        # the MRO. super().init_processes_start(start_time) may be what was
        # intended; confirm against ThroughputStrategy.
        ThroughputStrategy.init_processes_start(self, start_time)

        offset = self._offset
        if offset is None:
            raise RuntimeError(
                "_offset is None in init_processes_start; was "
                "init_processes_timings not called?"
            )
        with offset.get_lock():
            offset.value = start_time

    async def next_request_time(self, worker_index: PositiveInt) -> float:
        """
        Advance the shared offset by one exponential delay and return it.

        :param worker_index: Ignored by the Poisson schedule
        :return: Absolute arrival time of the next request
        :raises RuntimeError: If init_processes_timings has not created the offset
        """
        del worker_index  # arrivals do not depend on the calling worker
        await self.get_processes_start_time()  # ensure offset is initialized

        # NOTE(review): if this object is copied into several worker processes,
        # each copy seeds with the same random_seed and draws an identical delay
        # sequence. Confirm that this correlation is acceptable.
        rng = self._random
        if rng is None:
            rng = self._random = random.Random(self.random_seed)
        delay = rng.expovariate(self.rate)

        offset = self._offset
        if offset is None:
            raise RuntimeError(
                "_offset is None in next_request_time; was "
                "init_processes_timings not called?"
            )
        # read-modify-write under the shared lock so concurrent callers each
        # receive a distinct, increasing arrival time
        with offset.get_lock():
            offset.value += delay
            return offset.value

    def request_completed(self, request_info: RequestInfo):
        """
        Completion hook; the Poisson schedule ignores completions.

        :param request_info: Metadata of the finished request (ignored)
        """
        del request_info  # arrivals never react to completions here

processes_limit property

Returns:

Type Description
PositiveInt | None

Max concurrency if set, otherwise None for unlimited

requests_limit property

Returns:

Type Description
PositiveInt | None

Max concurrency if set, otherwise None for unlimited

__str__()

Returns:

Type Description
str

String identifier with rate value

Source code in src/guidellm/scheduler/strategies.py
def __str__(self) -> str:
    """
    :return: ``poisson@`` followed by the rate formatted to two decimals
    """
    formatted_rate = format(self.rate, ".2f")
    return "poisson@" + formatted_rate

init_processes_start(start_time)

Initialize the offset time for Poisson timing calculations.

Sets the initial timing offset from which exponentially distributed intervals are calculated.

Parameters:

Name Type Description Default
start_time float

Unix timestamp when request processing should begin

required
Source code in src/guidellm/scheduler/strategies.py
def init_processes_start(self, start_time: float):
    """
    Record the start time and seed the shared Poisson offset with it.

    Later exponential delays accumulate on top of this offset.

    :param start_time: Unix timestamp when request processing should begin
    :raises RuntimeError: If init_processes_timings has not created the offset
    """
    # NOTE(review): this calls ThroughputStrategy's implementation directly,
    # although AsyncPoissonStrategy derives from SchedulingStrategy, which
    # bypasses the MRO. super().init_processes_start(start_time) may be what
    # was intended; confirm against ThroughputStrategy.
    ThroughputStrategy.init_processes_start(self, start_time)

    offset = self._offset
    if offset is None:
        raise RuntimeError(
            "_offset is None in init_processes_start; was "
            "init_processes_timings not called?"
        )
    with offset.get_lock():
        offset.value = start_time

init_processes_timings(worker_count, max_concurrency, mp_context)

Initialize Poisson-specific timing state.

Sets up shared offset value for coordinating exponentially distributed request timing across worker processes.

Parameters:

Name Type Description Default
worker_count PositiveInt

Number of worker processes to coordinate

required
max_concurrency PositiveInt

Maximum number of concurrent requests allowed

required
Source code in src/guidellm/scheduler/strategies.py
def init_processes_timings(
    self,
    worker_count: PositiveInt,
    max_concurrency: PositiveInt,
    mp_context: BaseContext,
):
    """
    Allocate the shared Poisson offset, then run the base initialization.

    The offset is a shared double created with a -1.0 placeholder;
    init_processes_start later overwrites it with the real start time.

    :param worker_count: Number of worker processes to coordinate
    :param max_concurrency: Maximum number of concurrent requests allowed
    :param mp_context: Multiprocessing context used to allocate shared state
    """
    shared_offset = mp_context.Value("d", -1.0)
    self._offset = shared_offset
    # The base implementation must run last so that it does not set its Event
    # before the offset exists.
    super().init_processes_timings(worker_count, max_concurrency, mp_context)

next_request_time(worker_index) async

Calculate next request time using exponential distribution.

Generates inter-arrival times following exponential distribution, accumulating delays to produce Poisson-distributed request arrivals.

Parameters:

Name Type Description Default
worker_index PositiveInt

Unused for Poisson strategy

required

Returns:

Type Description
float

Next arrival time based on Poisson process

Source code in src/guidellm/scheduler/strategies.py
async def next_request_time(self, worker_index: PositiveInt) -> float:
    """
    Advance the shared offset by one exponential delay and return it.

    Delays come from ``random.Random(self.random_seed)``, which is created on
    first use and cached on ``self._random``. Summing exponential
    inter-arrival times yields Poisson-distributed arrivals.

    :param worker_index: Ignored by the Poisson schedule
    :return: Absolute arrival time of the next request
    :raises RuntimeError: If init_processes_timings has not created the offset
    """
    del worker_index  # arrivals do not depend on the calling worker
    await self.get_processes_start_time()  # ensure offset is initialized

    rng = self._random
    if rng is None:
        rng = self._random = random.Random(self.random_seed)
    delay = rng.expovariate(self.rate)

    offset = self._offset
    if offset is None:
        raise RuntimeError(
            "_offset is None in next_request_time; was "
            "init_processes_timings not called?"
        )
    # read-modify-write under the shared lock so concurrent callers each
    # receive a distinct, increasing arrival time
    with offset.get_lock():
        offset.value += delay
        return offset.value

request_completed(request_info)

Handle request completion (no-op for Poisson strategy).

Parameters:

Name Type Description Default
request_info RequestInfo

Completed request metadata (unused)

required
Source code in src/guidellm/scheduler/strategies.py
def request_completed(self, request_info: RequestInfo):
    """
    Completion hook; the Poisson schedule ignores completions.

    :param request_info: Metadata of the finished request (ignored)
    """
    del request_info  # arrivals never react to completions for this strategy

BackendInterface

Bases: Protocol, Generic[RequestT, ResponseT]

Protocol defining the interface for request processing backends.

Establishes the contract for backend implementations that process requests within the scheduler system. Backends manage initialization, validation, processing, and shutdown lifecycle. All properties must be pickleable before process_startup is called for multi-process environments.

Example:

    class CustomBackend(BackendInterface):
        @property
        def processes_limit(self) -> int:
            return 4

        async def resolve(self, request, request_info, history=None):
            yield response, updated_request_info
Source code in src/guidellm/scheduler/schemas.py
class BackendInterface(Protocol, Generic[RequestT, ResponseT]):
    """
    Structural interface that request-processing backends must satisfy.

    Backends own an initialization, validation, processing and shutdown
    lifecycle within the scheduler system. All properties must be pickleable
    before ``process_startup`` is called, so that the backend can be used in
    multi-process environments.

    Example:
    ::
        class CustomBackend(BackendInterface):
            @property
            def processes_limit(self) -> int:
                return 4

            async def resolve(self, request, request_info, history=None):
                yield response, updated_request_info
    """

    @property
    def processes_limit(self) -> int | None:
        """
        :return: Maximum number of worker processes the backend supports,
            or None if unlimited
        """

    @property
    def requests_limit(self) -> int | None:
        """
        :return: Maximum number of concurrent requests the backend supports,
            or None if unlimited
        """

    @property
    def info(self) -> dict[str, Any]:
        """
        :return: Backend metadata, including model initialization and
            configuration details
        """

    async def process_startup(self) -> None:
        """
        Perform backend initialization and startup procedures.

        Properties must already be pickleable before this is called (see the
        class docstring).

        :raises Exception: Implementation-specific exceptions for startup failures
        """

    async def validate(self) -> None:
        """
        Validate backend configuration and operational status.

        :raises Exception: Implementation-specific exceptions for validation failures
        """

    async def process_shutdown(self) -> None:
        """
        Perform backend cleanup and shutdown procedures.

        :raises Exception: Implementation-specific exceptions for shutdown failures
        """

    # NOTE(review): this is declared ``async def`` with no ``yield``, so type
    # checkers read it as a coroutine that *returns* an AsyncIterator. The
    # example above implements it as an async generator. mypy recommends
    # declaring such protocol members as plain ``def ... -> AsyncIterator[...]``;
    # confirm against the project's type-check configuration.
    async def resolve(
        self,
        request: RequestT,
        request_info: RequestInfo,
        history: HistoryT[RequestT, ResponseT] | None = None,
    ) -> AsyncIterator[tuple[ResponseT | None, RequestInfo]]:
        """
        Process a request and yield incremental response updates.

        :param request: The request object to process
        :param request_info: Scheduling metadata and timing information
        :param history: Prior (request, response) pairs for multi-turn
            conversations, or None for a single-turn request
        :yield: Tuples of (response, updated_request_info), one per response
            chunk. The response may be None for intermediate updates
            (e.g., first token arrival).
        :raises Exception: Implementation-specific exceptions for processing failures
        """

info property

Returns:

Type Description
dict[str, Any]

Backend metadata including model initialization and configuration

processes_limit property

Returns:

Type Description
int | None

Maximum worker processes supported, or None if unlimited

requests_limit property

Returns:

Type Description
int | None

Maximum concurrent requests supported, or None if unlimited

process_shutdown() async

Perform backend cleanup and shutdown procedures.

Raises:

Type Description
Exception

Implementation-specific exceptions for shutdown failures

Source code in src/guidellm/scheduler/schemas.py
async def process_shutdown(self) -> None:
    """
    Perform backend cleanup and shutdown procedures.

    This is the final stage of the backend lifecycle (initialization,
    validation, processing, shutdown).

    :raises Exception: Implementation-specific exceptions for shutdown failures
    """

process_startup() async

Perform backend initialization and startup procedures.

Raises:

Type Description
Exception

Implementation-specific exceptions for startup failures

Source code in src/guidellm/scheduler/schemas.py
async def process_startup(self) -> None:
    """
    Perform backend initialization and startup procedures.

    All backend properties must already be pickleable when this is called,
    so that the backend can be used in multi-process environments.

    :raises Exception: Implementation-specific exceptions for startup failures
    """

resolve(request, request_info, history=None) async

Process a request and yield incremental response updates.

Yields: tuples of (response, updated_request_info), one per response chunk. The response may be None for intermediate updates (e.g., first token arrival).

Parameters:

Name Type Description Default
request RequestT

The request object to process

required
request_info RequestInfo

Scheduling metadata and timing information

required
history HistoryT[RequestT, ResponseT] | None

Conversation history for multi-turn requests

None

Raises:

Type Description
Exception

Implementation-specific exceptions for processing failures

Source code in src/guidellm/scheduler/schemas.py
async def resolve(
    self,
    request: RequestT,
    request_info: RequestInfo,
    history: HistoryT[RequestT, ResponseT] | None = None,
) -> AsyncIterator[tuple[ResponseT | None, RequestInfo]]:
    """
    Process a request and yield incremental response updates.

    :param request: The request object to process
    :param request_info: Scheduling metadata and timing information
    :param history: Prior (request, response) pairs for multi-turn
        conversations, or None for a single-turn request
    :yield: Tuples of (response, updated_request_info), one per response chunk.
        The response may be None for intermediate updates (e.g., first token
        arrival).
    :raises Exception: Implementation-specific exceptions for processing failures
    """
    # NOTE(review): declared ``async def`` without ``yield``; type checkers see
    # a coroutine returning an AsyncIterator rather than an async generator.
    # Confirm whether a plain ``def`` signature was intended for the protocol.

validate() async

Validate backend configuration and operational status.

Raises:

Type Description
Exception

Implementation-specific exceptions for validation failures

Source code in src/guidellm/scheduler/schemas.py
async def validate(self) -> None:
    """
    Validate backend configuration and operational status.

    This is the validation stage of the backend lifecycle, between
    initialization and request processing.

    :raises Exception: Implementation-specific exceptions for validation failures
    """

ConcurrentStrategy

Bases: SchedulingStrategy

Parallel request processing with fixed concurrency limits.

Enables concurrent request processing up to a specified number of streams, providing balanced throughput while maintaining predictable resource usage. Requests are distributed across streams with completion-based timing coordination.

Source code in src/guidellm/scheduler/strategies.py
@SchedulingStrategy.register("concurrent")
class ConcurrentStrategy(SchedulingStrategy):
    """
    Fixed-concurrency scheduling with up to ``streams`` requests in flight.

    ``streams`` caps both the number of worker processes and the number of
    concurrent requests. An optional rampup staggers the first requests. After
    that, the next request is scheduled at the completion time of the last
    request this instance saw finish.
    """

    type_: Literal["concurrent"] = "concurrent"  # type: ignore[assignment]
    streams: PositiveInt = Field(
        description="Number of concurrent streams for scheduling requests",
    )
    rampup_duration: NonNegativeFloat = Field(
        default=0.0,
        description=(
            "Duration in seconds to spread initial requests up to max_concurrency "
            "at the beginning of each strategy run"
        ),
    )

    # Completion time of the most recent finished request recorded by
    # request_completed; None until one has been recorded.
    _process_last_request_time: float | None = PrivateAttr(None)

    def __str__(self) -> str:
        """
        :return: ``concurrent@`` followed by the stream count
        """
        return "concurrent@{}".format(self.streams)

    @property
    def processes_limit(self) -> PositiveInt:
        """
        :return: The stream count, used as the worker-process cap
        """
        stream_count = self.streams
        return stream_count

    @property
    def requests_limit(self) -> PositiveInt:
        """
        :return: The stream count, used as the concurrent-request cap
        """
        stream_count = self.streams
        return stream_count

    async def next_request_time(self, worker_index: PositiveInt) -> float:
        """
        Pick the next start time: a staggered rampup slot, otherwise the last
        recorded completion time, otherwise the overall start time.

        :param worker_index: Ignored; staggering is based on the request index
        :return: Absolute scheduled start time for the request
        """
        del worker_index  # staggering uses the global request index only

        request_index = self.next_request_index()
        origin = await self.get_processes_start_time()

        # NOTE(review): AsyncConstantStrategy treats index 1 as the first
        # request. If that also holds here, indices 1..streams-1 get the slots
        # rampup*(i/streams) and index ``streams`` falls through to
        # ``origin``. Confirm whether ``(request_index - 1)`` was intended.
        staggering = self.rampup_duration > 0 and request_index < self.streams
        if staggering:
            # spread the first wave linearly across the rampup window
            fraction = request_index / self.streams
            return origin + self.rampup_duration * fraction

        last_completed = self._process_last_request_time
        return origin if last_completed is None else last_completed

    def request_completed(self, request_info: RequestInfo):
        """
        Remember when a request finished, so the next one can start then.

        :param request_info: Metadata of the finished request; only
            ``completed_at`` is read, and None values are ignored
        """
        finished_at = request_info.completed_at
        if finished_at is None:
            return
        self._process_last_request_time = finished_at

processes_limit property

Returns:

Type Description
PositiveInt

Number of streams as maximum worker processes

requests_limit property

Returns:

Type Description
PositiveInt

Number of streams as maximum concurrent requests

__str__()

Returns:

Type Description
str

String identifier with stream count

Source code in src/guidellm/scheduler/strategies.py
def __str__(self) -> str:
    """
    :return: ``concurrent@`` followed by the stream count
    """
    return "concurrent@{}".format(self.streams)

next_request_time(worker_index) async

Calculate next request time with stream-based distribution.

Initial requests are staggered across streams during rampup, subsequent requests scheduled after previous completion within each stream.

Parameters:

Name Type Description Default
worker_index PositiveInt

Worker process index for distributing initial requests

required

Returns:

Type Description
float

Time of last completion or staggered start time if first request

Source code in src/guidellm/scheduler/strategies.py
async def next_request_time(self, worker_index: PositiveInt) -> float:
    """
    Pick the next start time for the concurrent schedule.

    While the request index is below ``streams`` and a rampup is configured,
    the request gets a linearly staggered slot inside the rampup window.
    Otherwise it starts at the last recorded completion time, or at the
    overall start time if no completion has been recorded yet.

    :param worker_index: Ignored; staggering is based on the request index
    :return: Absolute scheduled start time for the request
    """
    del worker_index  # staggering uses the request index only

    request_index = self.next_request_index()
    origin = await self.get_processes_start_time()

    # NOTE(review): AsyncConstantStrategy treats index 1 as the first request.
    # If that also holds here, index ``streams`` falls through to ``origin``
    # while index 1 is delayed by rampup/streams. Confirm whether
    # ``(request_index - 1)`` was intended.
    staggering = self.rampup_duration > 0 and request_index < self.streams
    if staggering:
        fraction = request_index / self.streams
        return origin + self.rampup_duration * fraction

    last_completed = self._process_last_request_time
    return origin if last_completed is None else last_completed

request_completed(request_info)

Update timing state with completed request information.

Tracks completion time to schedule next request in the same stream.

Parameters:

Name Type Description Default
request_info RequestInfo

Completed request metadata including timing

required
Source code in src/guidellm/scheduler/strategies.py
def request_completed(self, request_info: RequestInfo):
    """
    Remember when a request finished, so the next one can be scheduled then.

    :param request_info: Metadata of the finished request; only
        ``completed_at`` is read, and None values are ignored
    """
    finished_at = request_info.completed_at
    if finished_at is None:
        return
    self._process_last_request_time = finished_at

Constraint

Bases: Protocol

Protocol for constraint evaluation functions that control scheduler behavior.

Defines the interface that all constraint implementations must follow. Constraints are callable objects that evaluate scheduler state and request information to determine whether processing should continue or stop. The protocol enables type checking and runtime validation of constraint implementations while allowing flexible implementation approaches (functions, classes, closures).

Example:

    def my_constraint(
        state: SchedulerState, request: RequestInfo
    ) -> SchedulerUpdateAction:
        if state.processing_requests > 100:
            return SchedulerUpdateAction(request_queuing="stop")
        return SchedulerUpdateAction(request_queuing="continue")

Source code in src/guidellm/scheduler/constraints/constraint.py
@runtime_checkable
class Constraint(Protocol):
    """
    Protocol for constraint evaluation functions that control scheduler behavior.

    Defines the interface that all constraint implementations must follow.
    Constraints are callables that inspect the scheduler state and a request's
    information, and decide whether processing should continue or stop. The
    protocol is decorated with ``runtime_checkable``, so ``isinstance`` checks
    work at runtime. Implementations may be functions, classes, or closures.

    Example:
    ::
        def my_constraint(
            state: SchedulerState, request: RequestInfo
        ) -> SchedulerUpdateAction:
            if state.processing_requests > 100:
                return SchedulerUpdateAction(request_queuing="stop")
            return SchedulerUpdateAction(request_queuing="continue")
    """

    def __call__(
        self, state: SchedulerState, request: RequestInfo
    ) -> SchedulerUpdateAction:
        """
        Evaluate the constraint against scheduler state and request information.

        :param state: Current scheduler state with metrics and timing information
        :param request: Information and metadata for an individual request
        :return: Action indicating whether scheduler operations should continue
            or stop
        """

__call__(state, request)

Evaluate constraint against scheduler state and request information.

Parameters:

Name Type Description Default
state SchedulerState

Current scheduler state with metrics and timing information

required
request RequestInfo

Individual request information and metadata

required

Returns:

Type Description
SchedulerUpdateAction

Action indicating whether to continue or stop scheduler operations

Source code in src/guidellm/scheduler/constraints/constraint.py
def __call__(self, state: SchedulerState, request: RequestInfo) -> SchedulerUpdateAction:
    """
    Decide how the scheduler should proceed given its current state.

    This is the protocol's signature stub; implementations provide the logic.

    :param state: Snapshot of the scheduler's metrics and timing
    :param request: Metadata for the request being evaluated
    :return: The action telling the scheduler whether to continue or stop
    """

ConstraintArgs

Bases: PydanticClassRegistryMixin['ConstraintArgs']

Base class for constraint configuration arguments.

Uses PydanticClassRegistryMixin to enable polymorphic deserialization based on the kind field. Each registered subclass represents a specific constraint type with its own parameters.

Attributes:

Name Type Description
schema_discriminator str

Field name for polymorphic deserialization

Source code in src/guidellm/scheduler/constraints/args.py
class ConstraintArgs(PydanticClassRegistryMixin["ConstraintArgs"]):
    """
    Common base for the argument models of every constraint type.

    Subclasses register themselves with ``PydanticClassRegistryMixin`` so that
    validating a payload against this base picks the concrete subclass named
    by its ``kind`` value.

    :cvar schema_discriminator: Name of the field used to pick the subclass
    """

    # Unknown fields are rejected; bytes round-trip through JSON as base64.
    model_config = ConfigDict(
        extra="forbid",
        serialize_by_alias=True,
        ser_json_bytes="base64",
        val_json_bytes="base64",
    )

    schema_discriminator: ClassVar[str] = "kind"

    @classmethod
    def __pydantic_schema_base_type__(cls) -> type[ConstraintArgs]:
        """
        Name the root type of the polymorphic validation hierarchy.

        :return: ``ConstraintArgs`` itself, whichever subclass asks
        """
        return cls if cls.__name__ == "ConstraintArgs" else ConstraintArgs

    kind: str = Field(
        description="Constraint type discriminator for polymorphic serialization",
    )

    @property
    def constraint_key(self) -> str:
        """
        Key under which this constraint is stored in the constraints dict.

        Equal to ``kind`` unless a subclass overrides it, which it should do
        when its factory registry key is not the same as its ``kind``.

        :return: Registry key for this constraint type
        """
        key = self.kind
        return key

constraint_key property

The key to use when inserting into the constraints dict.

Defaults to kind, but subclasses may override if the factory registry key differs from the args kind.

Returns:

Type Description
str

Registry key for this constraint type

__pydantic_schema_base_type__() classmethod

Return base type for polymorphic validation hierarchy.

Returns:

Type Description
type[ConstraintArgs]

Base ConstraintArgs class for schema validation

Source code in src/guidellm/scheduler/constraints/args.py
@classmethod
def __pydantic_schema_base_type__(cls) -> type[ConstraintArgs]:
    """
    Name the root type of the polymorphic validation hierarchy.

    :return: ``cls`` when it is named ``ConstraintArgs``, otherwise the
        module-level ``ConstraintArgs`` class
    """
    return cls if cls.__name__ == "ConstraintArgs" else ConstraintArgs

ConstraintInitializer

Bases: Protocol

Protocol for constraint initializer factory functions that create constraints.

Defines the interface for factory objects that create constraint instances from configuration parameters. Constraint initializers enable dynamic constraint creation and configuration, supporting both simple boolean flags and complex parameter dictionaries. The protocol allows type checking while maintaining flexibility for different initialization patterns.

Example:

    class MaxRequestsInitializer:
        def __init__(self, max_requests: int):
            self.max_requests = max_requests

        def create_constraint(self) -> Constraint:
            def evaluate(state, request):
                if state.total_requests >= self.max_requests:
                    return SchedulerUpdateAction(request_queuing="stop")
                return SchedulerUpdateAction(request_queuing="continue")
            return evaluate
Source code in src/guidellm/scheduler/constraints/constraint.py
@runtime_checkable
class ConstraintInitializer(Protocol):
    """
    Structural type for factories that build ``Constraint`` callables.

    Any object with a ``create_constraint(**kwargs)`` method satisfies this
    protocol. Because it is ``@runtime_checkable``,
    ``isinstance(obj, ConstraintInitializer)`` checks for that method at runtime.

    Example:
    ::
        class MaxRequestsInitializer:
            def __init__(self, max_requests: int):
                self.max_requests = max_requests

            def create_constraint(self) -> Constraint:
                def evaluate(state, request):
                    if state.total_requests >= self.max_requests:
                        return SchedulerUpdateAction(request_queuing="stop")
                    return SchedulerUpdateAction(request_queuing="continue")
                return evaluate
    """

    def create_constraint(self, **kwargs) -> Constraint:
        """
        Build a constraint from keyword configuration.

        :param kwargs: Configuration values used to build the constraint
        :return: The resulting constraint callable
        """

create_constraint(**kwargs)

Create a constraint instance from configuration parameters.

Parameters:

Name Type Description Default
kwargs

Configuration parameters for constraint creation

{}

Returns:

Type Description
Constraint

Configured constraint evaluation function

Source code in src/guidellm/scheduler/constraints/constraint.py
def create_constraint(self, **kwargs) -> Constraint:
    """
    Build a constraint from keyword configuration.

    This is the protocol's signature stub; concrete initializers supply the
    implementation.

    :param kwargs: Configuration values used to build the constraint
    :return: The resulting constraint callable
    """

ConstraintsInitializerFactory

Bases: RegistryMixin[ConstraintInitializer]

Registry factory for creating and managing constraint initializers.

Provides centralized access to registered constraint types with support for creating constraints from ConstraintArgs instances or pre-configured initializer instances. Handles constraint resolution and type validation for the scheduler constraint system.

Example:

    from guidellm.scheduler import ConstraintsInitializerFactory

    # Register new constraint type
    @ConstraintsInitializerFactory.register("new_constraint")
    class NewConstraint:
        def create_constraint(self, **kwargs) -> Constraint:
            return lambda state, request: SchedulerUpdateAction()

    # Create and use constraint
    args = NewConstraintArgs(kind="new_constraint")
    initializer = ConstraintsInitializerFactory.create(args)
    constraint = initializer.create_constraint()
Source code in src/guidellm/scheduler/constraints/factory.py
class ConstraintsInitializerFactory(RegistryMixin[ConstraintInitializer]):
    """
    Registry factory for creating and managing constraint initializers.

    Provides centralized access to registered constraint types with support for
    creating constraints from ``ConstraintArgs`` instances or pre-configured
    initializer instances. Handles constraint resolution and type validation
    for the scheduler constraint system.

    Example:
    ::
        from guidellm.scheduler import ConstraintsInitializerFactory

        # Register new constraint type
        @ConstraintsInitializerFactory.register("new_constraint")
        class NewConstraint:
            def create_constraint(self, **kwargs) -> Constraint:
                return lambda state, request: SchedulerUpdateAction()

        # Create and use constraint
        args = NewConstraintArgs(kind="new_constraint")
        initializer = ConstraintsInitializerFactory.create(args)
        constraint = initializer.create_constraint()
    """

    @classmethod
    def create(cls, args: ConstraintArgs) -> ConstraintInitializer:
        """
        Create a constraint initializer from a ``ConstraintArgs`` instance.

        The registry is searched by ``args.kind`` first. If that key is not
        registered, ``args.constraint_key`` is tried next. ``ConstraintArgs``
        documents ``constraint_key`` as the registry key for subclasses whose
        factory key differs from their ``kind``.

        :param args: Validated constraint arguments with kind discriminator
        :return: Configured constraint initializer instance, constructed as
            ``initializer_class(args=args)``
        :raises ValueError: If neither ``args.kind`` nor ``args.constraint_key``
            is registered in the factory
        """
        registry = cls.registry or {}

        lookup_key = args.kind
        if lookup_key not in registry:
            # getattr keeps duck-typed args without constraint_key on the
            # original ValueError path instead of raising AttributeError.
            lookup_key = getattr(args, "constraint_key", args.kind)
        if lookup_key not in registry:
            raise ValueError(f"Unknown constraint discriminator: {args.kind}")

        initializer_class = registry[lookup_key]
        return initializer_class(args=args)  # type: ignore[operator]

    @classmethod
    def deserialize(
        cls, initializer_dict: dict[str, Any]
    ) -> SerializableConstraintInitializer | UnserializableConstraintInitializer:
        """
        Deserialize constraint initializer from dictionary format.

        The ``type_`` entry selects the target. ``"unserializable"`` maps to
        ``UnserializableConstraintInitializer``; any other value is looked up in
        the registry. Registered classes exposing ``model_validate`` are
        validated from the dict; others are called with the dict as kwargs.

        :param initializer_dict: Dictionary representation of constraint initializer
        :return: Reconstructed constraint initializer instance
        :raises ValueError: If constraint type is unknown or cannot be deserialized
        """
        if initializer_dict.get("type_") == "unserializable":
            return UnserializableConstraintInitializer.model_validate(initializer_dict)

        if (
            cls.registry is not None
            and initializer_dict.get("type_")
            and initializer_dict["type_"] in cls.registry
        ):
            initializer_class = cls.registry[initializer_dict["type_"]]
            if hasattr(initializer_class, "model_validate"):
                return initializer_class.model_validate(initializer_dict)  # type: ignore[return-value]
            return initializer_class(**initializer_dict)  # type: ignore[return-value,operator]

        raise ValueError(
            f"Cannot deserialize unknown constraint initializer: "
            f"{initializer_dict.get('type_', 'unknown')}"
        )

    @classmethod
    def resolve(
        cls,
        initializers: dict[
            str,
            Constraint | ConstraintInitializer,
        ],
    ) -> dict[str, Constraint]:
        """
        Resolve constraint initializers to callable constraints.

        :param initializers: Dictionary mapping constraint keys to specifications.
            Values must be Constraint instances or ConstraintInitializer instances.
        :return: Dictionary mapping constraint keys to callable functions
        :raises TypeError: If a value is not a supported type
        """
        constraints = {}

        for key, val in initializers.items():
            # NOTE(review): both protocols are runtime_checkable, so isinstance
            # only checks for the method names. An object defining both __call__
            # and create_constraint (e.g. MaxDurationConstraint) matches the
            # Constraint branch first and is used as-is, without
            # create_constraint being called. Confirm this is intended.
            if isinstance(val, Constraint):
                constraints[key] = val
            elif isinstance(val, ConstraintInitializer):
                constraints[key] = val.create_constraint()
            else:
                raise TypeError(
                    f"Constraint '{key}' has unsupported value type "
                    f"{type(val).__name__}. Expected a Constraint instance or "
                    f"ConstraintInitializer instance."
                )

        return constraints

create(args) classmethod

Create a constraint initializer from a ConstraintArgs instance.

Parameters:

Name Type Description Default
args ConstraintArgs

Validated constraint arguments with kind discriminator

required

Returns:

Type Description
ConstraintInitializer

Configured constraint initializer instance

Raises:

Type Description
ValueError

If args.kind is not registered in the factory

Source code in src/guidellm/scheduler/constraints/factory.py
@classmethod
def create(cls, args: ConstraintArgs) -> ConstraintInitializer:
    """
    Create a constraint initializer from a ``ConstraintArgs`` instance.

    The registry is searched by ``args.kind`` first. If that key is not
    registered, ``args.constraint_key`` is tried next. ``ConstraintArgs``
    documents ``constraint_key`` as the registry key for subclasses whose
    factory key differs from their ``kind``.

    :param args: Validated constraint arguments with kind discriminator
    :return: Configured constraint initializer instance, constructed as
        ``initializer_class(args=args)``
    :raises ValueError: If neither ``args.kind`` nor ``args.constraint_key``
        is registered in the factory
    """
    registry = cls.registry or {}

    lookup_key = args.kind
    if lookup_key not in registry:
        # getattr keeps duck-typed args without constraint_key on the
        # original ValueError path instead of raising AttributeError.
        lookup_key = getattr(args, "constraint_key", args.kind)
    if lookup_key not in registry:
        raise ValueError(f"Unknown constraint discriminator: {args.kind}")

    initializer_class = registry[lookup_key]
    return initializer_class(args=args)  # type: ignore[operator]

deserialize(initializer_dict) classmethod

Deserialize constraint initializer from dictionary format.

Parameters:

Name Type Description Default
initializer_dict dict[str, Any]

Dictionary representation of constraint initializer

required

Returns:

Type Description
SerializableConstraintInitializer | UnserializableConstraintInitializer

Reconstructed constraint initializer instance

Raises:

Type Description
ValueError

If constraint type is unknown or cannot be deserialized

Source code in src/guidellm/scheduler/constraints/factory.py
@classmethod
def deserialize(
    cls, initializer_dict: dict[str, Any]
) -> SerializableConstraintInitializer | UnserializableConstraintInitializer:
    """
    Rebuild a constraint initializer from its dictionary form.

    The ``type_`` entry picks the target class. ``"unserializable"`` goes to
    ``UnserializableConstraintInitializer``. Otherwise the registered class is
    used, via ``model_validate`` when it has one, or called with the dict as
    keyword arguments when it does not.

    :param initializer_dict: Serialized initializer, including its ``type_``
    :return: The reconstructed initializer
    :raises ValueError: If ``type_`` is missing, empty or not registered
    """
    type_name = initializer_dict.get("type_")

    if type_name == "unserializable":
        return UnserializableConstraintInitializer.model_validate(initializer_dict)

    is_known = (
        cls.registry is not None and bool(type_name) and type_name in cls.registry
    )
    if not is_known:
        raise ValueError(
            f"Cannot deserialize unknown constraint initializer: "
            f"{initializer_dict.get('type_', 'unknown')}"
        )

    target = cls.registry[type_name]  # type: ignore[index]
    if not hasattr(target, "model_validate"):
        return target(**initializer_dict)  # type: ignore[return-value,operator]
    return target.model_validate(initializer_dict)  # type: ignore[return-value]

resolve(initializers) classmethod

Resolve constraint initializers to callable constraints.

Parameters:

Name Type Description Default
initializers dict[str, Constraint | ConstraintInitializer]

Dictionary mapping constraint keys to specifications. Values must be Constraint instances or ConstraintInitializer instances.

required

Returns:

Type Description
dict[str, Constraint]

Dictionary mapping constraint keys to callable functions

Raises:

Type Description
TypeError

If a value is not a supported type

Source code in src/guidellm/scheduler/constraints/factory.py
@classmethod
def resolve(
    cls,
    initializers: dict[
        str,
        Constraint | ConstraintInitializer,
    ],
) -> dict[str, Constraint]:
    """
    Turn a mapping of constraint specs into ready-to-call constraints.

    Values that already satisfy ``Constraint`` are kept as they are. Values
    that satisfy ``ConstraintInitializer`` are replaced by the result of their
    ``create_constraint()``.

    :param initializers: Constraint key -> Constraint or ConstraintInitializer
    :return: Constraint key -> callable constraint
    :raises TypeError: If a value satisfies neither protocol
    """
    resolved = {}

    for name, spec in initializers.items():
        # NOTE(review): both protocols are runtime_checkable, so isinstance only
        # checks for method names. An object defining both __call__ and
        # create_constraint takes this first branch and is used without
        # create_constraint being called. Confirm this is intended.
        if isinstance(spec, Constraint):
            resolved[name] = spec
            continue
        if isinstance(spec, ConstraintInitializer):
            resolved[name] = spec.create_constraint()
            continue
        raise TypeError(
            f"Constraint '{name}' has unsupported value type "
            f"{type(spec).__name__}. Expected a Constraint instance or "
            f"ConstraintInitializer instance."
        )

    return resolved

Environment

Bases: ABC, Generic[RequestT, ResponseT], InfoMixin

Abstract interface for coordinating scheduler execution across distributed nodes.

Defines the protocol for managing distributed scheduler execution including parameter synchronization, timing coordination, state updates, error propagation, and result aggregation. Implementations handle distributed coordination complexity while providing a unified interface for scheduler orchestration.

Source code in src/guidellm/scheduler/environments.py
class Environment(ABC, Generic[RequestT, ResponseT], InfoMixin):
    """
    Abstract base for the environment a scheduler run executes in.

    A concrete environment decides how a run is coordinated, possibly across
    several nodes. It determines which requests this node handles, when
    processing starts, how per-request results and errors are propagated, and
    how remote results are gathered when the run finishes.
    """

    @abstractmethod
    async def sync_run_params(
        self,
        requests: DatasetIterT[RequestT],
        strategy: SchedulingStrategy,
        constraints: dict[str, Constraint],
    ) -> tuple[DatasetIterT[RequestT], SchedulingStrategy, dict[str, Constraint]]:
        """
        Agree on run parameters across nodes and narrow them to this node.

        :param requests: All requests for the run, across every node
        :param strategy: Scheduling strategy to execute with
        :param constraints: Constraints to enforce while running
        :return: ``(local_requests, strategy, constraints)`` for this node
        :raises Exception: If the nodes cannot be brought into agreement
        """
        ...

    @abstractmethod
    async def sync_run_start(self) -> float:
        """
        Agree on a common start time for every node.

        :return: Unix timestamp at which processing should begin
        :raises Exception: If start-up synchronization fails
        """
        ...

    @abstractmethod
    async def update_run_iteration(
        self,
        response: ResponseT | None,
        request: RequestT,
        request_info: RequestInfo,
        state: SchedulerState,
    ):
        """
        Feed the outcome of one processed request into the environment.

        :param response: Backend response, or ``None`` if there was none
        :param request: The request that was processed
        :param request_info: Processing metadata, including timings
        :param state: Scheduler state after this request
        :raises Exception: If the update fails or signals a critical error
        """
        ...

    @abstractmethod
    async def sync_run_error(self, err: list[Exception] | Exception):
        """
        Report one or more run errors to every active node.

        :param err: A single exception or a list of exceptions
        """
        ...

    @abstractmethod
    async def sync_run_end(
        self,
    ) -> AsyncIterator[tuple[ResponseT | None, RequestT, RequestInfo, SchedulerState]]:
        """
        Wrap up the run and stream back results gathered from other nodes.

        :return: Async iterator of ``(response, request, request_info, state)``
            tuples from remote nodes; expected to be empty when not distributed
        :raises Exception: Any error raised while the run was executing
        """
        # The yield turns this stub into an async generator, matching overrides.
        yield None  # type: ignore[misc]

sync_run_end() abstractmethod async

Finalize execution and aggregate results from all nodes.

Returns:

Type Description
AsyncIterator[tuple[ResponseT | None, RequestT, RequestInfo, SchedulerState]]

Iterator of (response, request, request_info, state) tuples from remote nodes in distributed environments, empty for non-distributed

Raises:

Type Description
Exception

Any errors that occurred during execution

Source code in src/guidellm/scheduler/environments.py
@abstractmethod
async def sync_run_end(
    self,
) -> AsyncIterator[tuple[ResponseT | None, RequestT, RequestInfo, SchedulerState]]:
    """
    Wrap up the run and stream back results gathered from other nodes.

    :return: Async iterator of ``(response, request, request_info, state)``
        tuples from remote nodes; expected to be empty when not distributed
    :raises Exception: Any error raised while the run was executing
    """
    # The yield turns this stub into an async generator, matching overrides.
    yield None  # type: ignore[misc]

sync_run_error(err) abstractmethod async

Handle and propagate errors across all active nodes.

Parameters:

Name Type Description Default
err list[Exception] | Exception

The exception(s) that occurred during execution

required
Source code in src/guidellm/scheduler/environments.py
@abstractmethod
async def sync_run_error(
    self,
    err: list[Exception] | Exception,
):
    """
    Report one or more run errors to every active node.

    :param err: A single exception or a list of exceptions
    """
    ...

sync_run_params(requests, strategy, constraints) abstractmethod async

Synchronize execution parameters across nodes and resolve local scope.

Parameters:

Name Type Description Default
requests DatasetIterT[RequestT]

Complete set of requests to process across all nodes

required
strategy SchedulingStrategy

Scheduling strategy to apply during execution

required
constraints dict[str, Constraint]

Runtime constraints to enforce during execution

required

Returns:

Type Description
tuple[DatasetIterT[RequestT], SchedulingStrategy, dict[str, Constraint]]

Tuple of (local_requests, strategy, constraints) for this node

Raises:

Type Description
Exception

If parameter synchronization fails or nodes inconsistent

Source code in src/guidellm/scheduler/environments.py
@abstractmethod
async def sync_run_params(
    self,
    requests: DatasetIterT[RequestT],
    strategy: SchedulingStrategy,
    constraints: dict[str, Constraint],
) -> tuple[DatasetIterT[RequestT], SchedulingStrategy, dict[str, Constraint]]:
    """
    Agree on run parameters across nodes and narrow them to this node.

    :param requests: All requests for the run, across every node
    :param strategy: Scheduling strategy to execute with
    :param constraints: Constraints to enforce while running
    :return: ``(local_requests, strategy, constraints)`` for this node
    :raises Exception: If the nodes cannot be brought into agreement
    """
    ...

sync_run_start() abstractmethod async

Coordinate synchronized start time across all nodes.

Returns:

Type Description
float

Unix timestamp when all nodes should begin processing

Raises:

Type Description
Exception

If startup synchronization fails across nodes

Source code in src/guidellm/scheduler/environments.py
@abstractmethod
async def sync_run_start(
    self,
) -> float:
    """
    Agree on a common start time for every node.

    :return: Unix timestamp at which processing should begin
    :raises Exception: If start-up synchronization fails
    """
    ...

update_run_iteration(response, request, request_info, state) abstractmethod async

Update environment state with completed request iteration results.

Parameters:

Name Type Description Default
response ResponseT | None

Response generated for the request, if successful

required
request RequestT

The processed request

required
request_info RequestInfo

Metadata about request processing including timings

required
state SchedulerState

Current scheduler state with metrics and progress

required

Raises:

Type Description
Exception

If state update fails or indicates critical errors

Source code in src/guidellm/scheduler/environments.py
@abstractmethod
async def update_run_iteration(
    self, response: ResponseT | None, request: RequestT,
    request_info: RequestInfo, state: SchedulerState,
):
    """
    Feed the outcome of one processed request into the environment.

    :param response: Backend response, or ``None`` if there was none
    :param request: The request that was processed
    :param request_info: Processing metadata, including timings
    :param state: Scheduler state after this request
    :raises Exception: If the update fails or signals a critical error
    """
    ...

MaxDurationConstraint

Bases: PydanticConstraintInitializer

Constraint that limits execution based on maximum time duration.

Stops both request queuing and processing when the elapsed time since scheduler start exceeds the maximum duration. Provides progress tracking based on remaining time and completion fraction.

Source code in src/guidellm/scheduler/constraints/request.py
@ConstraintsInitializerFactory.register("max_duration")
class MaxDurationConstraint(PydanticConstraintInitializer):
    """
    Stops the scheduler once a wall-clock time budget has been used up.

    Elapsed time is measured from ``state.start_requests_time``, falling back
    to ``state.start_time`` when that is falsy. Once the elapsed time reaches
    the budget, both request queuing and local request processing are told to
    stop. Progress is reported as remaining and total duration.

    ``args.seconds`` is either a single number or a list. With a list, the
    entry at ``current_index`` is used (negative values map to the first entry,
    and indices past the end reuse the last one). ``current_index`` advances on
    each ``create_constraint`` call.
    """

    type_: Literal["max_duration"] = "max_duration"  # type: ignore[assignment]
    args: MaxDurationConstraintArgs = Field(
        description="Configuration arguments for max duration constraint",
    )
    current_index: int = Field(default=-1, description="Current index in duration list")

    def create_constraint(self, **_kwargs) -> Constraint:
        """
        Advance the duration-list index and hand out a copy as the constraint.

        :param _kwargs: Ignored
        :return: A ``model_copy`` of this initializer carrying the updated
            ``current_index``
        """
        self.current_index += 1
        snapshot = self.model_copy()
        return cast("Constraint", snapshot)

    def __call__(
        self, state: SchedulerState, request_info: RequestInfo
    ) -> SchedulerUpdateAction:
        """
        Compare elapsed run time against the configured duration budget.

        :param state: Scheduler state providing the start timestamps
        :param request_info: Not used by this constraint
        :return: Stop actions once the budget is reached, together with timing
            metadata and progress information
        """
        _ = request_info  # part of the Constraint call signature, unused here

        seconds = self.args.seconds
        if isinstance(seconds, int | float):
            max_duration = seconds
        else:
            slot = min(max(0, self.current_index), len(seconds) - 1)
            max_duration = seconds[slot]

        start_time = state.start_requests_time or state.start_time
        now = time.time()
        elapsed = now - start_time
        exceeded = elapsed >= max_duration
        # Clamp into [0, max_duration]; the upper bound matters if elapsed < 0.
        remaining = min(max(0.0, max_duration - elapsed), max_duration)
        stop_time = start_time + max_duration if exceeded else None

        return SchedulerUpdateAction(
            request_queuing="stop" if exceeded else "continue",
            request_processing="stop_local" if exceeded else "continue",
            metadata={
                "max_duration": max_duration,
                "elapsed_time": elapsed,
                "duration_exceeded": exceeded,
                "start_time": start_time,
                "current_time": now,
                "stop_time": stop_time,
            },
            progress=SchedulerProgress(
                remaining_duration=remaining,
                total_duration=max_duration,
                stop_time=stop_time,
            ),
        )

__call__(state, request_info)

Evaluate constraint against current scheduler state and elapsed time.

Parameters:

Name Type Description Default
state SchedulerState

Current scheduler state with start time

required
request_info RequestInfo

Individual request information (unused)

required

Returns:

Type Description
SchedulerUpdateAction

Action indicating whether to continue or stop operations

Source code in src/guidellm/scheduler/constraints/request.py
def __call__(
    self, state: SchedulerState, request_info: RequestInfo
) -> SchedulerUpdateAction:
    """
    Compare elapsed run time against the configured duration budget.

    :param state: Scheduler state; ``start_requests_time`` (or ``start_time``
        when that is falsy) marks the start of the timed window
    :param request_info: Not used by this constraint
    :return: Stop actions for queuing and local processing once the budget is
        reached, plus timing metadata and progress information
    """
    _ = request_info  # part of the Constraint call signature, unused here

    seconds = self.args.seconds
    if isinstance(seconds, int | float):
        max_duration = seconds
    else:
        # Negative indices map to the first entry; indices past the end reuse
        # the last entry of the duration list.
        slot = min(max(0, self.current_index), len(seconds) - 1)
        max_duration = seconds[slot]

    start_time = state.start_requests_time or state.start_time
    now = time.time()
    elapsed = now - start_time
    exceeded = elapsed >= max_duration
    # Clamp into [0, max_duration]; the upper bound matters if elapsed < 0.
    remaining = min(max(0.0, max_duration - elapsed), max_duration)
    stop_time = start_time + max_duration if exceeded else None

    return SchedulerUpdateAction(
        request_queuing="stop" if exceeded else "continue",
        request_processing="stop_local" if exceeded else "continue",
        metadata={
            "max_duration": max_duration,
            "elapsed_time": elapsed,
            "duration_exceeded": exceeded,
            "start_time": start_time,
            "current_time": now,
            "stop_time": stop_time,
        },
        progress=SchedulerProgress(
            remaining_duration=remaining,
            total_duration=max_duration,
            stop_time=stop_time,
        ),
    )

create_constraint(**_kwargs)

Advance the duration-list index and return a copy of this instance as the constraint.

Parameters:

Name Type Description Default
kwargs

Additional keyword arguments (unused)

{}

Returns:

Type Description
Constraint

A copy of this instance, carrying the advanced index, used as the constraint

Source code in src/guidellm/scheduler/constraints/request.py
def create_constraint(self, **_kwargs) -> Constraint:
    """
    Advance the duration-list index and hand out a copy as the constraint.

    The index is incremented on ``self`` before copying, so the copy and the
    initializer both hold the new value. Later calls advance it again.

    :param _kwargs: Ignored
    :return: A ``model_copy`` of this initializer carrying the updated
        ``current_index``
    """
    self.current_index += 1
    snapshot = self.model_copy()
    return cast("Constraint", snapshot)

MaxDurationConstraintArgs

Bases: ConstraintArgs

Arguments for maximum duration constraint.

Limits benchmark execution time per strategy.

Attributes:

Name Type Description
kind Literal['max_duration']

Always "max_duration"

Source code in src/guidellm/scheduler/constraints/request.py
@ConstraintArgs.register("max_duration")
class MaxDurationConstraintArgs(ConstraintArgs):
    """
    Configuration for the maximum-duration constraint.

    Caps how long each strategy's benchmark is allowed to run.

    :cvar kind: Discriminator, fixed to "max_duration"
    """

    # Discriminator used by ConstraintArgs to select this subclass.
    kind: Literal["max_duration"] = Field(
        "max_duration",
        description="Constraint type discriminator",
    )
    # Required: a single positive number or a per-strategy list of them.
    seconds: PositiveNumOrList = Field(
        description="Maximum duration in seconds before stopping execution",
    )

MaxErrorRateConstraint

Bases: PydanticConstraintInitializer

Constraint that limits execution based on sliding window error rate.

Tracks error status of recent requests in a sliding window and stops all processing when the error rate reaches or exceeds the threshold. Only applies the constraint after processing enough requests to fill the minimum window size for statistical significance.

Source code in src/guidellm/scheduler/constraints/error.py
@ConstraintsInitializerFactory.register("max_error_rate")
class MaxErrorRateConstraint(PydanticConstraintInitializer):
    """
    Constraint that limits execution based on sliding window error rate.

    Tracks error status of recent requests in a sliding window and stops all
    processing when the error rate reaches or exceeds the threshold. Only
    applies the constraint after at least ``args.window`` requests have been
    processed, so the rate is computed over a full window.
    """

    type_: Literal["max_error_rate"] = "max_error_rate"  # type: ignore[assignment]
    args: MaxErrorRateConstraintArgs = Field(
        description="Configuration arguments for max error rate constraint",
    )
    # One flag per finished request (True = errored), oldest first; trimmed to
    # ``args.window`` entries in ``__call__``.
    error_window: list[bool] = Field(
        default_factory=list,
        description="Sliding window tracking error status of recent requests",
    )
    # Selects the entry of a list-valued ``args.rate``; advanced by
    # ``create_constraint`` and clamped to the list bounds in ``__call__``.
    current_index: int = Field(
        default=-1, description="Current index in the error window"
    )

    def create_constraint(self, **_kwargs) -> Constraint:
        """
        Create a new instance of MaxErrorRateConstraint (due to stateful window).

        Increments this initializer's ``current_index`` so successive calls
        select successive entries of a list-valued ``args.rate``, then returns
        a copy that owns a fresh, empty ``error_window``.

        :param _kwargs: Additional keyword arguments (unused)
        :return: New instance of the constraint
        """
        self.current_index += 1

        # ``model_copy`` is shallow by default: without the update, every copy
        # would share this initializer's ``error_window`` list, so error history
        # from one constraint instance would leak into the next.
        return cast("Constraint", self.model_copy(update={"error_window": []}))

    def __call__(
        self, state: SchedulerState, request_info: RequestInfo
    ) -> SchedulerUpdateAction:
        """
        Evaluate constraint against sliding window error rate.

        :param state: Current scheduler state with request counts
        :param request_info: Individual request with completion status; its
            ``completed_at`` is used as the stop time when the limit is hit
        :return: Action indicating whether to continue or stop operations
        """
        current_index = max(0, self.current_index)
        # A list of rates is indexed per strategy, reusing the last entry once
        # the index runs past the end.
        max_error_rate = (
            self.args.rate
            if isinstance(self.args.rate, int | float)
            else self.args.rate[min(current_index, len(self.args.rate) - 1)]
        )

        # Only finished requests contribute to the window.
        if request_info.status in ("completed", "errored", "cancelled"):
            self.error_window.append(request_info.status == "errored")
            if len(self.error_window) > self.args.window:
                self.error_window.pop(0)

        error_count = sum(self.error_window)
        window_requests = len(self.error_window)
        error_rate = (
            error_count / float(window_requests) if window_requests > 0 else 0.0
        )
        # Don't trip on a partially filled window.
        exceeded_min_processed = state.processed_requests >= self.args.window
        exceeded_error_rate = error_rate >= max_error_rate
        exceeded = exceeded_min_processed and exceeded_error_rate
        stop_time = None if not exceeded else request_info.completed_at or time.time()

        return SchedulerUpdateAction(
            request_queuing="stop" if exceeded else "continue",
            request_processing="stop_all" if exceeded else "continue",
            metadata={
                "max_error_rate": max_error_rate,
                "window_size": self.args.window,
                "error_count": error_count,
                "processed_count": state.processed_requests,
                "current_window_size": len(self.error_window),
                "current_error_rate": error_rate,
                "exceeded_min_processed": exceeded_min_processed,
                "exceeded_error_rate": exceeded_error_rate,
                "exceeded": exceeded,
                "stop_time": stop_time,
            },
        )

__call__(state, request_info)

Evaluate constraint against sliding window error rate.

Parameters:

Name Type Description Default
state SchedulerState

Current scheduler state with request counts

required
request_info RequestInfo

Individual request with completion status

required

Returns:

Type Description
SchedulerUpdateAction

Action indicating whether to continue or stop operations

Source code in src/guidellm/scheduler/constraints/error.py
def __call__(
    self, state: SchedulerState, request_info: RequestInfo
) -> SchedulerUpdateAction:
    """
    Decide whether to keep running based on the sliding-window error rate.

    Each finished request (completed, errored, or cancelled) pushes a flag onto
    the window, which is trimmed from the front to ``args.window`` entries.
    Once at least ``args.window`` requests have been processed, a window error
    rate at or above the configured limit stops queuing and all processing.

    :param state: Current scheduler state with request counts
    :param request_info: Individual request with completion status
    :return: Action indicating whether to continue or stop operations
    """
    rates = self.args.rate
    if isinstance(rates, int | float):
        max_error_rate = rates
    else:
        # Per-strategy list: clamp the index into range, reusing the last entry.
        max_error_rate = rates[min(max(0, self.current_index), len(rates) - 1)]

    window = self.error_window
    if request_info.status in ["completed", "errored", "cancelled"]:
        window.append(request_info.status == "errored")
        if len(window) > self.args.window:
            window.pop(0)

    error_count = sum(window)
    window_requests = len(window)
    if window_requests > 0:
        error_rate = error_count / float(window_requests)
    else:
        error_rate = 0.0

    exceeded_min_processed = state.processed_requests >= self.args.window
    exceeded_error_rate = error_rate >= max_error_rate
    exceeded = exceeded_min_processed and exceeded_error_rate

    if exceeded:
        stop_time = request_info.completed_at or time.time()
        queuing, processing = "stop", "stop_all"
    else:
        stop_time = None
        queuing, processing = "continue", "continue"

    details = {
        "max_error_rate": max_error_rate,
        "window_size": self.args.window,
        "error_count": error_count,
        "processed_count": state.processed_requests,
        "current_window_size": window_requests,
        "current_error_rate": error_rate,
        "exceeded_min_processed": exceeded_min_processed,
        "exceeded_error_rate": exceeded_error_rate,
        "exceeded": exceeded,
        "stop_time": stop_time,
    }
    return SchedulerUpdateAction(
        request_queuing=queuing,
        request_processing=processing,
        metadata=details,
    )

create_constraint(**_kwargs)

Create a new instance of MaxErrorRateConstraint (due to stateful window).

Parameters:

Name Type Description Default
kwargs

Additional keyword arguments (unused)

required

Returns:

Type Description
Constraint

New instance of the constraint

Source code in src/guidellm/scheduler/constraints/error.py
def create_constraint(self, **_kwargs) -> Constraint:
    """
    Create a new instance of MaxErrorRateConstraint (due to stateful window).

    Increments this initializer's ``current_index`` so successive calls select
    successive entries of a list-valued ``args.rate``, then returns a copy that
    owns a fresh, empty ``error_window``.

    :param _kwargs: Additional keyword arguments (unused)
    :return: New instance of the constraint
    """
    self.current_index += 1

    # ``model_copy`` is shallow by default: without the update, every copy
    # would share this initializer's ``error_window`` list, so error history
    # from one constraint instance would leak into the next.
    return cast("Constraint", self.model_copy(update={"error_window": []}))

MaxErrorRateConstraintArgs

Bases: ConstraintArgs

Arguments for maximum error rate constraint (sliding window).

Stops execution when the windowed error rate reaches or exceeds the threshold.

Attributes:

Name Type Description
kind Literal['max_error_rate']

Always "max_error_rate"

Source code in src/guidellm/scheduler/constraints/error.py
@ConstraintArgs.register("max_error_rate")
class MaxErrorRateConstraintArgs(ConstraintArgs):
    """
    Configuration for the sliding-window error rate constraint.

    Execution halts when the error rate measured over the most recent window
    of finished requests reaches or exceeds the configured threshold.

    :cvar kind: Discriminator, fixed to "max_error_rate"
    """

    # Discriminator used by ConstraintArgs to select this subclass.
    kind: Literal["max_error_rate"] = Field(
        "max_error_rate",
        description="Constraint type discriminator",
    )
    # Required: a single rate or a per-strategy list of rates.
    rate: ErrorRateOrList = Field(
        description="Maximum error rate (0.0 to 1.0) before stopping execution",
    )
    # Defaults to the global settings value when not provided.
    window: int | float = Field(
        description="Size of sliding window for calculating error rate",
        gt=0,
        default_factory=lambda: settings.constraint_error_window_size,
    )

MaxErrorsConstraint

Bases: PydanticConstraintInitializer

Constraint that limits execution based on absolute error count.

Stops both request queuing and all request processing when the total number of errored requests reaches the maximum threshold. Uses global error tracking across all requests for immediate constraint evaluation.

Source code in src/guidellm/scheduler/constraints/error.py
@ConstraintsInitializerFactory.register("max_errors")
class MaxErrorsConstraint(PydanticConstraintInitializer):
    """
    Constraint that limits execution based on absolute error count.

    Stops both request queuing and all request processing when the total number
    of errored requests reaches the maximum threshold. Uses global error tracking
    across all requests for immediate constraint evaluation.
    """

    type_: Literal["max_errors"] = "max_errors"  # type: ignore[assignment]
    args: MaxErrorsConstraintArgs = Field(
        description="Configuration arguments for max errors constraint",
    )
    # Selects the entry of a list-valued ``args.count``; advanced by
    # ``create_constraint`` and clamped to the list bounds in ``__call__``.
    current_index: int = Field(default=-1, description="Current index in error list")

    def create_constraint(self, **_kwargs) -> Constraint:
        """
        Advance the per-strategy index and return a copy of this initializer.

        Increments ``current_index`` on this initializer so successive calls
        select successive entries of a list-valued ``args.count``, then returns
        a shallow ``model_copy`` of it (not ``self``).

        :param _kwargs: Additional keyword arguments (unused)
        :return: Shallow copy of this instance, used as the constraint
        """
        self.current_index += 1

        return cast("Constraint", self.model_copy())

    def __call__(
        self, state: SchedulerState, request_info: RequestInfo
    ) -> SchedulerUpdateAction:
        """
        Evaluate constraint against current error count.

        :param state: Current scheduler state with error counts
        :param request_info: Request being reported; its ``completed_at`` is
            used as the stop time once the limit is reached (falling back to
            the current time)
        :return: Action indicating whether to continue or stop operations
        """
        current_index = max(0, self.current_index)
        # A list of counts is indexed per strategy, reusing the last entry once
        # the index runs past the end.
        max_errors = (
            self.args.count
            if isinstance(self.args.count, int | float)
            else self.args.count[min(current_index, len(self.args.count) - 1)]
        )
        errors_exceeded = state.errored_requests >= max_errors
        stop_time = (
            None if not errors_exceeded else request_info.completed_at or time.time()
        )

        return SchedulerUpdateAction(
            request_queuing="stop" if errors_exceeded else "continue",
            request_processing="stop_all" if errors_exceeded else "continue",
            metadata={
                "max_errors": max_errors,
                "errors_exceeded": errors_exceeded,
                "current_errors": state.errored_requests,
                "stop_time": stop_time,
            },
            progress=SchedulerProgress(stop_time=stop_time),
        )

__call__(state, request_info)

Evaluate constraint against current error count.

Parameters:

Name Type Description Default
state SchedulerState

Current scheduler state with error counts

required
request_info RequestInfo

Request being reported; its completed_at is used as the stop time once the limit is reached

required

Returns:

Type Description
SchedulerUpdateAction

Action indicating whether to continue or stop operations

Source code in src/guidellm/scheduler/constraints/error.py
def __call__(
    self, state: SchedulerState, request_info: RequestInfo
) -> SchedulerUpdateAction:
    """
    Evaluate constraint against current error count.

    :param state: Current scheduler state with error counts
    :param request_info: Request being reported; its ``completed_at`` is used
        as the stop time once the limit is reached (falling back to the
        current time)
    :return: Action indicating whether to continue or stop operations
    """
    current_index = max(0, self.current_index)
    # A list of counts is indexed per strategy, reusing the last entry once the
    # index runs past the end.
    max_errors = (
        self.args.count
        if isinstance(self.args.count, int | float)
        else self.args.count[min(current_index, len(self.args.count) - 1)]
    )
    errors_exceeded = state.errored_requests >= max_errors
    stop_time = (
        None if not errors_exceeded else request_info.completed_at or time.time()
    )

    return SchedulerUpdateAction(
        request_queuing="stop" if errors_exceeded else "continue",
        request_processing="stop_all" if errors_exceeded else "continue",
        metadata={
            "max_errors": max_errors,
            "errors_exceeded": errors_exceeded,
            "current_errors": state.errored_requests,
            "stop_time": stop_time,
        },
        progress=SchedulerProgress(stop_time=stop_time),
    )

create_constraint(**_kwargs)

Increment this initializer's strategy index and return a shallow copy of it as the constraint instance.

Parameters:

Name Type Description Default
kwargs

Additional keyword arguments (unused)

required

Returns:

Type Description
Constraint

Shallow copy of this instance, used as the constraint

Source code in src/guidellm/scheduler/constraints/error.py
def create_constraint(self, **_kwargs) -> Constraint:
    """
    Advance the per-strategy index and return a copy of this initializer.

    Increments ``current_index`` on this initializer so successive calls select
    successive entries of a list-valued ``args.count``, then returns a shallow
    ``model_copy`` of it (not ``self``).

    :param _kwargs: Additional keyword arguments (unused)
    :return: Shallow copy of this instance, used as the constraint
    """
    self.current_index += 1

    return cast("Constraint", self.model_copy())

MaxErrorsConstraintArgs

Bases: ConstraintArgs

Arguments for maximum error count constraint.

Stops execution when total errors reach the threshold.

Attributes:

Name Type Description
kind Literal['max_errors']

Always "max_errors"

Source code in src/guidellm/scheduler/constraints/error.py
@ConstraintArgs.register("max_errors")
class MaxErrorsConstraintArgs(ConstraintArgs):
    """
    Configuration for the absolute error-count constraint.

    Execution halts once the number of errored requests hits the configured
    threshold.

    :cvar kind: Discriminator, fixed to "max_errors"
    """

    # Discriminator used by ConstraintArgs to select this subclass.
    kind: Literal["max_errors"] = Field(
        "max_errors",
        description="Constraint type discriminator",
    )
    # Required: a single positive count or a per-strategy list of counts.
    count: PositiveNumOrList = Field(
        description="Maximum number of errors before stopping execution",
    )

MaxGlobalErrorRateConstraint

Bases: PydanticConstraintInitializer

Constraint that limits execution based on global error rate.

Calculates error rate across all processed requests and stops all processing when the rate reaches or exceeds the threshold. Only applies the constraint after processing the minimum number of requests to ensure statistical significance for global error rate calculations.

Source code in src/guidellm/scheduler/constraints/error.py
@ConstraintsInitializerFactory.register("max_global_error_rate")
class MaxGlobalErrorRateConstraint(PydanticConstraintInitializer):
    """
    Constraint that limits execution based on global error rate.

    Calculates error rate across all processed requests and stops all processing
    when the rate reaches or exceeds the threshold. Only applies the constraint
    after processing ``args.minimum`` requests (or immediately when it is None)
    to ensure statistical significance for global error rate calculations.
    """

    type_: Literal["max_global_error_rate"] = "max_global_error_rate"  # type: ignore[assignment]
    args: MaxGlobalErrorRateConstraintArgs = Field(
        description="Configuration arguments for max global error rate constraint",
    )
    # Selects the entry of a list-valued ``args.rate``; advanced by
    # ``create_constraint`` and clamped to the list bounds in ``__call__``.
    current_index: int = Field(
        default=-1, description="Current index for list-based max_error_rate values"
    )

    def create_constraint(self, **_kwargs) -> Constraint:
        """
        Advance the per-strategy index and return a copy of this initializer.

        Increments ``current_index`` on this initializer so successive calls
        select successive entries of a list-valued ``args.rate``, then returns
        a shallow ``model_copy`` of it (not ``self``).

        :param _kwargs: Additional keyword arguments (unused)
        :return: Shallow copy of this instance, used as the constraint
        """
        self.current_index += 1

        return cast("Constraint", self.model_copy())

    def __call__(
        self, state: SchedulerState, request_info: RequestInfo
    ) -> SchedulerUpdateAction:
        """
        Evaluate constraint against global error rate.

        :param state: Current scheduler state with global request and error counts
        :param request_info: Request being reported; its ``completed_at`` is
            used as the stop time once the limit is reached (falling back to
            the current time)
        :return: Action indicating whether to continue or stop operations
        """
        current_index = max(0, self.current_index)
        # A list of rates is indexed per strategy, reusing the last entry once
        # the index runs past the end.
        max_error_rate = (
            self.args.rate
            if isinstance(self.args.rate, int | float)
            else self.args.rate[min(current_index, len(self.args.rate) - 1)]
        )

        # A None minimum means the rate check applies from the first request.
        exceeded_min_processed = (
            self.args.minimum is None or state.processed_requests >= self.args.minimum
        )
        error_rate = (
            state.errored_requests / float(state.processed_requests)
            if state.processed_requests > 0
            else 0.0
        )
        exceeded_error_rate = error_rate >= max_error_rate
        exceeded = exceeded_min_processed and exceeded_error_rate
        stop_time = None if not exceeded else request_info.completed_at or time.time()

        return SchedulerUpdateAction(
            request_queuing="stop" if exceeded else "continue",
            request_processing="stop_all" if exceeded else "continue",
            metadata={
                "max_error_rate": max_error_rate,
                "min_processed": self.args.minimum,
                "processed_requests": state.processed_requests,
                "errored_requests": state.errored_requests,
                "error_rate": error_rate,
                "exceeded_min_processed": exceeded_min_processed,
                "exceeded_error_rate": exceeded_error_rate,
                "exceeded": exceeded,
                "stop_time": stop_time,
            },
            progress=SchedulerProgress(stop_time=stop_time),
        )

__call__(state, request_info)

Evaluate constraint against global error rate.

Parameters:

Name Type Description Default
state SchedulerState

Current scheduler state with global request and error counts

required
request_info RequestInfo

Request being reported; its completed_at is used as the stop time once the limit is reached

required

Returns:

Type Description
SchedulerUpdateAction

Action indicating whether to continue or stop operations

Source code in src/guidellm/scheduler/constraints/error.py
def __call__(
    self, state: SchedulerState, request_info: RequestInfo
) -> SchedulerUpdateAction:
    """
    Evaluate constraint against global error rate.

    :param state: Current scheduler state with global request and error counts
    :param request_info: Request being reported; its ``completed_at`` is used
        as the stop time once the limit is reached (falling back to the
        current time)
    :return: Action indicating whether to continue or stop operations
    """
    current_index = max(0, self.current_index)
    # A list of rates is indexed per strategy, reusing the last entry once the
    # index runs past the end.
    max_error_rate = (
        self.args.rate
        if isinstance(self.args.rate, int | float)
        else self.args.rate[min(current_index, len(self.args.rate) - 1)]
    )

    # A None minimum means the rate check applies from the first request.
    exceeded_min_processed = (
        self.args.minimum is None or state.processed_requests >= self.args.minimum
    )
    error_rate = (
        state.errored_requests / float(state.processed_requests)
        if state.processed_requests > 0
        else 0.0
    )
    exceeded_error_rate = error_rate >= max_error_rate
    exceeded = exceeded_min_processed and exceeded_error_rate
    stop_time = None if not exceeded else request_info.completed_at or time.time()

    return SchedulerUpdateAction(
        request_queuing="stop" if exceeded else "continue",
        request_processing="stop_all" if exceeded else "continue",
        metadata={
            "max_error_rate": max_error_rate,
            "min_processed": self.args.minimum,
            "processed_requests": state.processed_requests,
            "errored_requests": state.errored_requests,
            "error_rate": error_rate,
            "exceeded_min_processed": exceeded_min_processed,
            "exceeded_error_rate": exceeded_error_rate,
            "exceeded": exceeded,
            "stop_time": stop_time,
        },
        progress=SchedulerProgress(stop_time=stop_time),
    )

create_constraint(**_kwargs)

Increment this initializer's strategy index and return a shallow copy of it as the constraint instance.

Parameters:

Name Type Description Default
kwargs

Additional keyword arguments (unused)

required

Returns:

Type Description
Constraint

Shallow copy of this instance, used as the constraint

Source code in src/guidellm/scheduler/constraints/error.py
def create_constraint(self, **_kwargs) -> Constraint:
    """
    Advance the per-strategy index and return a copy of this initializer.

    Increments ``current_index`` on this initializer so successive calls select
    successive entries of a list-valued ``args.rate``, then returns a shallow
    ``model_copy`` of it (not ``self``).

    :param _kwargs: Additional keyword arguments (unused)
    :return: Shallow copy of this instance, used as the constraint
    """
    self.current_index += 1

    return cast("Constraint", self.model_copy())

MaxGlobalErrorRateConstraintArgs

Bases: ConstraintArgs

Arguments for maximum global error rate constraint.

Stops execution when the overall error rate across all requests reaches or exceeds the threshold. Only applies after at least `minimum` requests have been processed (or from the first request when `minimum` is None).

Attributes:

Name Type Description
kind Literal['max_global_error_rate']

Always "max_global_error_rate"

Source code in src/guidellm/scheduler/constraints/error.py
@ConstraintArgs.register("max_global_error_rate")
class MaxGlobalErrorRateConstraintArgs(ConstraintArgs):
    """
    Arguments for maximum global error rate constraint.

    Stops execution when the overall error rate across all processed requests
    reaches or exceeds the threshold. Only applies once at least ``minimum``
    requests have been processed; a ``minimum`` of None applies it from the
    first request.

    :cvar kind: Always "max_global_error_rate"
    """

    # Discriminator used by ConstraintArgs to select this subclass.
    kind: Literal["max_global_error_rate"] = Field(
        default="max_global_error_rate",
        description="Constraint type discriminator",
    )
    # Required: a single rate or a per-strategy list of rates.
    rate: ErrorRateOrList = Field(
        description="Maximum global error rate (0.0 to 1.0) before stopping",
    )
    # Defaults to the global settings value; None disables the minimum.
    minimum: int | float | None = Field(
        default_factory=lambda: settings.constraint_error_min_processed,
        gt=0,
        description="Minimum requests processed before applying error rate constraint",
    )

MaxNumberConstraint

Bases: PydanticConstraintInitializer

Constraint that limits execution based on maximum request counts.

Stops request queuing when created requests reach the limit and stops local request processing when processed requests reach the limit. Provides progress tracking based on remaining requests and completion fraction.

Source code in src/guidellm/scheduler/constraints/request.py
@ConstraintsInitializerFactory.register("max_requests")
class MaxNumberConstraint(PydanticConstraintInitializer):
    """
    Constraint that limits execution based on maximum request counts.

    Stops request queuing when created requests reach the limit and stops local
    request processing when processed requests reach the limit. Provides progress
    tracking based on remaining requests and completion fraction.
    """

    type_: Literal["max_requests"] = "max_requests"  # type: ignore[assignment]
    args: MaxRequestsConstraintArgs = Field(
        description="Configuration arguments for max request count constraint",
    )
    # Selects the entry of a list-valued ``args.count``; advanced by
    # ``create_constraint`` and clamped to the list bounds in ``__call__``.
    current_index: int = Field(
        default=-1, description="Current index for list-based max_num values"
    )

    def create_constraint(self, **_kwargs) -> Constraint:
        """
        Advance the per-strategy index and return a copy of this initializer.

        Increments ``current_index`` on this initializer so successive calls
        select successive entries of a list-valued ``args.count``, then returns
        a shallow ``model_copy`` of it (not ``self``).

        :param _kwargs: Additional keyword arguments (unused)
        :return: Shallow copy of this instance, used as the constraint
        """
        self.current_index += 1

        return cast("Constraint", self.model_copy())

    def __call__(
        self, state: SchedulerState, request_info: RequestInfo
    ) -> SchedulerUpdateAction:
        """
        Evaluate constraint against current scheduler state and request count.

        :param state: Current scheduler state with request counts
        :param request_info: Request being reported; its ``completed_at`` is
            used as the stop time once no requests remain (falling back to the
            current time)
        :return: Action indicating whether to continue or stop operations
        """
        current_index = max(0, self.current_index)
        # A list of counts is indexed per strategy, reusing the last entry once
        # the index runs past the end.
        max_num = (
            self.args.count
            if isinstance(self.args.count, int | float)
            else self.args.count[min(current_index, len(self.args.count) - 1)]
        )

        # Queuing stops on created requests; processing stops on processed ones.
        create_exceeded = state.created_requests >= max_num
        processed_exceeded = state.processed_requests >= max_num
        remaining_requests = min(max(0, max_num - state.processed_requests), max_num)
        stop_time = (
            None if remaining_requests > 0 else request_info.completed_at or time.time()
        )

        return SchedulerUpdateAction(
            request_queuing="stop" if create_exceeded else "continue",
            request_processing="stop_local" if processed_exceeded else "continue",
            metadata={
                "max_requests": max_num,
                "create_exceeded": create_exceeded,
                "processed_exceeded": processed_exceeded,
                "created_requests": state.created_requests,
                "processed_requests": state.processed_requests,
                "remaining_requests": remaining_requests,
                "stop_time": stop_time,
            },
            progress=SchedulerProgress(
                remaining_requests=remaining_requests,
                total_requests=max_num,
                stop_time=stop_time,
            ),
        )

__call__(state, request_info)

Evaluate constraint against current scheduler state and request count.

Parameters:

Name Type Description Default
state SchedulerState

Current scheduler state with request counts

required
request_info RequestInfo

Request being reported; its completed_at is used as the stop time once no requests remain

required

Returns:

Type Description
SchedulerUpdateAction

Action indicating whether to continue or stop operations

Source code in src/guidellm/scheduler/constraints/request.py
def __call__(
    self, state: SchedulerState, request_info: RequestInfo
) -> SchedulerUpdateAction:
    """
    Evaluate constraint against current scheduler state and request count.

    :param state: Current scheduler state with request counts
    :param request_info: Request being reported; its ``completed_at`` is used
        as the stop time once no requests remain (falling back to the current
        time)
    :return: Action indicating whether to continue or stop operations
    """
    current_index = max(0, self.current_index)
    # A list of counts is indexed per strategy, reusing the last entry once the
    # index runs past the end.
    max_num = (
        self.args.count
        if isinstance(self.args.count, int | float)
        else self.args.count[min(current_index, len(self.args.count) - 1)]
    )

    # Queuing stops on created requests; processing stops on processed ones.
    create_exceeded = state.created_requests >= max_num
    processed_exceeded = state.processed_requests >= max_num
    remaining_requests = min(max(0, max_num - state.processed_requests), max_num)
    stop_time = (
        None if remaining_requests > 0 else request_info.completed_at or time.time()
    )

    return SchedulerUpdateAction(
        request_queuing="stop" if create_exceeded else "continue",
        request_processing="stop_local" if processed_exceeded else "continue",
        metadata={
            "max_requests": max_num,
            "create_exceeded": create_exceeded,
            "processed_exceeded": processed_exceeded,
            "created_requests": state.created_requests,
            "processed_requests": state.processed_requests,
            "remaining_requests": remaining_requests,
            "stop_time": stop_time,
        },
        progress=SchedulerProgress(
            remaining_requests=remaining_requests,
            total_requests=max_num,
            stop_time=stop_time,
        ),
    )

create_constraint(**_kwargs)

Increment this initializer's strategy index and return a shallow copy of it as the constraint instance.

Parameters:

Name Type Description Default
kwargs

Additional keyword arguments (unused)

required

Returns:

Type Description
Constraint

Shallow copy of this instance, used as the constraint

Source code in src/guidellm/scheduler/constraints/request.py
def create_constraint(self, **_kwargs) -> Constraint:
    """
    Advance the per-strategy index and return a copy of this initializer.

    Increments ``current_index`` on this initializer so successive calls select
    successive entries of a list-valued ``args.count``, then returns a shallow
    ``model_copy`` of it (not ``self``).

    :param _kwargs: Additional keyword arguments (unused)
    :return: Shallow copy of this instance, used as the constraint
    """
    self.current_index += 1

    return cast("Constraint", self.model_copy())

MaxRequestsConstraintArgs

Bases: ConstraintArgs

Arguments for maximum request count constraint.

Limits the number of requests processed per strategy.

Attributes:

Name Type Description
kind Literal['max_requests']

Always "max_requests"

Source code in src/guidellm/scheduler/constraints/request.py
@ConstraintArgs.register("max_requests")
class MaxRequestsConstraintArgs(ConstraintArgs):
    """
    Configuration for the maximum request-count constraint.

    Caps how many requests each strategy may process.

    :cvar kind: Discriminator, fixed to "max_requests"
    """

    # Discriminator used by ConstraintArgs to select this subclass.
    kind: Literal["max_requests"] = Field(
        "max_requests",
        description="Constraint type discriminator",
    )
    # Required: a single positive count or a per-strategy list of counts.
    count: PositiveNumOrList = Field(
        description="Maximum number of requests before stopping execution",
    )

NonDistributedEnvironment

Bases: Environment[RequestT, ResponseT]

Single-node scheduler execution environment with minimal coordination overhead.

Implements the Environment interface with no-op synchronization for local testing, development, and single-machine benchmarking. All synchronization methods return immediately without distributed coordination logic.

Example: :: from guidellm.scheduler import ( MaxNumberConstraint, MaxRequestsConstraintArgs, NonDistributedEnvironment, RequestInfo, SchedulerState, SynchronousStrategy, )

env = NonDistributedEnvironment()
requests = [f"req_{ind}" for ind in range(5)]
strategy = SynchronousStrategy()
args = MaxRequestsConstraintArgs(count=5)
constraints = {"max_requests": MaxNumberConstraint(args=args)}
state = SchedulerState()

local_req, local_strat, local_const = await env.sync_run_params(
    requests, strategy, constraints
)
start_time = await env.sync_run_start()
for req in local_req:
    state.processed_requests += 1
    await env.update_run_iteration(f"resp_{req}", req, RequestInfo(), state)
async for nonlocal_req in env.sync_run_end():
    state.processed_requests += 1
Source code in src/guidellm/scheduler/environments.py
class NonDistributedEnvironment(Environment[RequestT, ResponseT]):
    """
    Single-node scheduler execution environment with minimal coordination overhead.

    Implements the Environment interface with no-op synchronization for local testing,
    development, and single-machine benchmarking. All synchronization methods return
    immediately without distributed coordination logic. Errors reported through
    ``sync_run_error`` are buffered and raised from ``sync_run_end``.

    Example:
    ::
        from guidellm.scheduler import (
            MaxNumberConstraint,
            MaxRequestsConstraintArgs,
            NonDistributedEnvironment,
            RequestInfo,
            SchedulerState,
            SynchronousStrategy,
        )

        env = NonDistributedEnvironment()
        requests = [f"req_{ind}" for ind in range(5)]
        strategy = SynchronousStrategy()
        args = MaxRequestsConstraintArgs(count=5)
        constraints = {"max_requests": MaxNumberConstraint(args=args)}
        state = SchedulerState()

        local_req, local_strat, local_const = await env.sync_run_params(
            requests, strategy, constraints
        )
        start_time = await env.sync_run_start()
        for req in local_req:
            state.processed_requests += 1
            await env.update_run_iteration(f"resp_{req}", req, RequestInfo(), state)
        async for nonlocal_req in env.sync_run_end():
            state.processed_requests += 1
    """

    def __init__(self):
        """
        Initialize single-node environment with empty error storage.
        """
        self.run_errors: list[Exception] = []

    async def sync_run_params(
        self,
        requests: DatasetIterT[RequestT],
        strategy: SchedulingStrategy,
        constraints: dict[str, Constraint],
    ) -> tuple[
        DatasetIterT[RequestT],
        SchedulingStrategy,
        dict[str, Constraint],
    ]:
        """
        Return parameters unchanged for single-node execution.

        :param requests: Requests to process locally
        :param strategy: Scheduling strategy to apply during execution
        :param constraints: Runtime constraints to enforce during execution
        :return: Original (requests, strategy, constraints) tuple unchanged
        """
        return requests, strategy, constraints

    async def sync_run_start(self) -> float:
        """
        Return current time plus configured delay for single-node startup.

        :return: Unix timestamp when execution should begin
        """
        return time.time() + settings.scheduler_start_delay_non_distributed

    async def update_run_iteration(
        self,
        response: ResponseT | None,
        request: RequestT,
        request_info: RequestInfo,
        state: SchedulerState,
    ):
        """
        No-op for single-node execution with no distributed state synchronization.

        :param response: Response generated for the request, if successful
        :param request: The processed request
        :param request_info: Metadata about request processing including timings
        :param state: Current scheduler state with metrics and progress
        """

    async def sync_run_error(self, err: Exception | list[Exception]):
        """
        Store error for later propagation during run finalization.

        :param err: The exception(s) that occurred during execution
        """
        err = [err] if not isinstance(err, list) else err
        self.run_errors.extend(err)

    async def sync_run_end(
        self,
    ) -> AsyncIterator[
        tuple[
            ResponseT | None,
            RequestT,
            RequestInfo,
            SchedulerState,
        ]
    ]:
        """
        Finalize single-node execution and propagate any stored errors.

        Stored errors are consumed when they are raised, so reusing this environment
        for another run does not re-raise errors from a previous run.

        :return: Empty iterator as there are no remote nodes
        :raises Exception: The single error stored via sync_run_error, re-raised as-is
        :raises RuntimeError: If several errors were stored; the first stored error
            is attached as the ``__cause__``
        """
        # Take ownership of the buffered errors and reset the buffer so a later
        # run on the same environment starts clean.
        errors, self.run_errors = self.run_errors, []

        if len(errors) == 1:
            raise errors[0]
        if errors:
            raise RuntimeError(
                f"Errors occurred during execution: {errors}"
            ) from errors[0]

        if False:
            # Unreachable yield: its presence makes this an async generator
            yield None  # type: ignore[misc]

__init__()

Initialize single-node environment with empty error storage.

Source code in src/guidellm/scheduler/environments.py
def __init__(self):
    """
    Create a single-node environment with no recorded errors.
    """
    pending_errors: list[Exception] = []
    self.run_errors = pending_errors

sync_run_end() async

Finalize single-node execution and propagate any stored errors.

Returns:

Type Description
AsyncIterator[tuple[ResponseT | None, RequestT, RequestInfo, SchedulerState]]

Empty iterator as there are no remote nodes

Raises:

Type Description
Exception

Any error stored during execution via sync_run_error

Source code in src/guidellm/scheduler/environments.py
async def sync_run_end(
    self,
) -> AsyncIterator[
    tuple[
        ResponseT | None,
        RequestT,
        RequestInfo,
        SchedulerState,
    ]
]:
    """
    Finalize single-node execution and propagate any stored errors.

    Stored errors are consumed when they are raised, so reusing this environment
    for another run does not re-raise errors from a previous run.

    :return: Empty iterator as there are no remote nodes
    :raises Exception: The single error stored via sync_run_error, re-raised as-is
    :raises RuntimeError: If several errors were stored; the first stored error
        is attached as the ``__cause__``
    """
    # Take ownership of the buffered errors and reset the buffer so a later
    # run on the same environment starts clean.
    errors, self.run_errors = self.run_errors, []

    if len(errors) == 1:
        raise errors[0]
    if errors:
        raise RuntimeError(
            f"Errors occurred during execution: {errors}"
        ) from errors[0]

    if False:
        # Unreachable yield: its presence makes this an async generator
        yield None  # type: ignore[misc]

sync_run_error(err) async

Store error for later propagation during run finalization.

Parameters:

Name Type Description Default
err Exception | list[Exception]

The exception(s) that occurred during execution

required
Source code in src/guidellm/scheduler/environments.py
async def sync_run_error(self, err: Exception | list[Exception]):
    """
    Store error for later propagation during run finalization.

    :param err: The exception(s) that occurred during execution
    """
    err = [err] if not isinstance(err, list) else err
    self.run_errors.extend(err)

sync_run_params(requests, strategy, constraints) async

Return parameters unchanged for single-node execution.

Parameters:

Name Type Description Default
requests DatasetIterT[RequestT]

Requests to process locally

required
strategy SchedulingStrategy

Scheduling strategy to apply during execution

required
constraints dict[str, Constraint]

Runtime constraints to enforce during execution

required

Returns:

Type Description
tuple[DatasetIterT[RequestT], SchedulingStrategy, dict[str, Constraint]]

Original (requests, strategy, constraints) tuple unchanged

Source code in src/guidellm/scheduler/environments.py
async def sync_run_params(
    self,
    requests: DatasetIterT[RequestT],
    strategy: SchedulingStrategy,
    constraints: dict[str, Constraint],
) -> tuple[
    DatasetIterT[RequestT],
    SchedulingStrategy,
    dict[str, Constraint],
]:
    """
    Pass the run parameters through untouched.

    A single node has nothing to partition or distribute, so the local view of
    the run is exactly what the caller supplied.

    :param requests: Requests to process locally
    :param strategy: Scheduling strategy to apply during execution
    :param constraints: Runtime constraints to enforce during execution
    :return: The same (requests, strategy, constraints) objects, as a tuple
    """
    local_params = (requests, strategy, constraints)
    return local_params

sync_run_start() async

Return current time plus configured delay for single-node startup.

Returns:

Type Description
float

Unix timestamp when execution should begin

Source code in src/guidellm/scheduler/environments.py
async def sync_run_start(self) -> float:
    """
    Compute the start timestamp for a single-node run.

    :return: Unix timestamp (now plus the configured non-distributed start delay)
        at which execution should begin
    """
    now = time.time()
    delay = settings.scheduler_start_delay_non_distributed
    return now + delay

update_run_iteration(response, request, request_info, state) async

No-op for single-node execution with no distributed state synchronization.

Parameters:

Name Type Description Default
response ResponseT | None

Response generated for the request, if successful

required
request RequestT

The processed request

required
request_info RequestInfo

Metadata about request processing including timings

required
state SchedulerState

Current scheduler state with metrics and progress

required
Source code in src/guidellm/scheduler/environments.py
async def update_run_iteration(
    self,
    response: ResponseT | None,
    request: RequestT,
    request_info: RequestInfo,
    state: SchedulerState,
):
    """
    Accept a per-request progress update and ignore it.

    A single-node run has no peers to inform, so nothing is synchronized here.

    :param response: Response generated for the request, if successful
    :param request: The processed request
    :param request_info: Metadata about request processing including timings
    :param state: Current scheduler state with metrics and progress
    """
    return None

OverSaturationConstraint

Bases: Constraint

Constraint that detects and stops execution when over-saturation is detected.

This constraint implements the Over-Saturation Detection (OSD) algorithm to identify when a model becomes over-saturated (response rate doesn't keep up with request rate). In "enforce" mode, a detection stops both request queuing and processing of existing requests. In "monitor" mode, the detection is only reported.

The constraint maintains internal state for tracking concurrent requests and time-to-first-token (TTFT) metrics, using statistical slope detection to identify performance degradation patterns.

Source code in src/guidellm/scheduler/constraints/saturation.py
class OverSaturationConstraint(Constraint):
    """
    Constraint that detects and stops execution when over-saturation is detected.

    This constraint implements the Over-Saturation Detection (OSD) algorithm to
    identify when a model becomes over-saturated (response rate doesn't keep up with
    request rate). In ``"enforce"`` mode a detection stops both request queuing and
    processing of existing requests; in ``"monitor"`` mode the detection is only
    reported through the returned action's metadata.

    The constraint maintains internal state for tracking concurrent requests and
    time-to-first-token (TTFT) metrics, using statistical slope detection to identify
    performance degradation patterns.
    """

    def __init__(  # noqa: PLR0913
        self,
        minimum_duration: float = 30.0,
        minimum_ttft: float = 2.5,
        maximum_window_seconds: float = 120.0,
        moe_threshold: float = 2.0,
        maximum_window_ratio: float = 0.75,
        minimum_window_size: int = 5,
        confidence: float = 0.95,
        eps: float = 1e-12,
        mode: Literal["enforce", "monitor"] = "enforce",
    ) -> None:
        """
        Initialize the over-saturation constraint.

        Creates a new constraint instance with specified detection parameters.
        The constraint will track concurrent requests and TTFT metrics, using
        statistical slope detection to identify when the model becomes
        over-saturated. All parameters have sensible defaults suitable for
        most benchmarking scenarios.

        :param minimum_duration: Minimum seconds before checking for over-saturation
            (default: 30.0)
        :param minimum_ttft: Minimum TTFT threshold in seconds for violation counting
            (default: 2.5)
        :param maximum_window_seconds: Maximum time window in seconds for data retention
            (default: 120.0)
        :param moe_threshold: Margin of error threshold for slope detection
            (default: 2.0)
        :param maximum_window_ratio: Maximum window size as ratio of total requests
            (default: 0.75)
        :param minimum_window_size: Minimum data points required for slope estimation
            (default: 5)
        :param confidence: Statistical confidence level for t-distribution (0-1)
            (default: 0.95)
        :param eps: Epsilon for numerical stability in calculations
            (default: 1e-12)
        :param mode: Whether to stop when over-saturation is detected, or only monitor
            (default: "enforce")
        """
        self.minimum_duration = minimum_duration
        self.minimum_ttft = minimum_ttft
        self.maximum_window_seconds = maximum_window_seconds
        self.maximum_window_ratio = maximum_window_ratio
        self.minimum_window_size = minimum_window_size
        self.moe_threshold = moe_threshold
        self.confidence = confidence
        self.eps = eps
        self.mode = mode
        self.reset()

    @property
    def info(self) -> dict[str, Any]:
        """
        Get the constraint's configuration parameters.

        :return: Dictionary of configuration parameters keyed by name and tagged
            with ``type_ = "over_saturation"`` (``eps`` is not included).
        """

        return {
            "type_": "over_saturation",
            "minimum_duration": self.minimum_duration,
            "minimum_ttft": self.minimum_ttft,
            "maximum_window_seconds": self.maximum_window_seconds,
            "maximum_window_ratio": self.maximum_window_ratio,
            "minimum_window_size": self.minimum_window_size,
            "moe_threshold": self.moe_threshold,
            "confidence": self.confidence,
            "mode": self.mode,
        }

    def reset(self) -> None:
        """
        Reset all internal state to initial values.

        Clears all tracked requests, resets counters, and reinitializes slope
        checkers. Useful for reusing constraint instances across multiple
        benchmark runs or resetting state after configuration changes.
        """
        self.duration = 0.0
        self.started_requests: list[dict[str, Any]] = []
        self.finished_requests: list[dict[str, Any]] = []
        self.ttft_violations_counter = 0
        self.total_finished_ever = 0
        self.total_started_ever = 0
        self._ttft_reported_request_ids: set[str] = set()
        self.concurrent_slope_checker = SlopeChecker(
            moe_threshold=self.moe_threshold, confidence=self.confidence, eps=self.eps
        )
        self.ttft_slope_checker = SlopeChecker(
            moe_threshold=self.moe_threshold, confidence=self.confidence, eps=self.eps
        )

    def _add_finished(self, request: dict[str, Any]) -> None:
        """
        Add a finished request to tracking.

        Entries whose ``ttft`` is None are ignored entirely (not counted, not stored).

        :param request: Dictionary containing request data with 'ttft' and
            'duration' keys.
        """
        ttft = request["ttft"]
        duration = request["duration"]
        if ttft is not None:
            self.total_finished_ever += 1
            self.finished_requests.append(request)
            if ttft > self.minimum_ttft:
                self.ttft_violations_counter += 1
            self.ttft_slope_checker.add_data_point(duration, ttft)

    def _remove_finished(self, request: dict[str, Any]) -> None:
        """
        Remove the oldest finished request from tracking.

        :param request: The head entry of ``finished_requests``. The list's first
            element is always the one deleted, so callers must pass that entry to
            keep the list and the TTFT slope checker consistent.
        """
        del self.finished_requests[0]
        ttft = request["ttft"]
        duration = request["duration"]
        if ttft > self.minimum_ttft:
            self.ttft_violations_counter -= 1
        self.ttft_slope_checker.remove_data_point(duration, ttft)

    def _add_started(self, request: dict[str, Any]) -> None:
        """
        Add a started request to tracking.

        Entries whose ``concurrent_requests`` is None are ignored entirely.

        :param request: Dictionary containing request data with
            'concurrent_requests' and 'duration' keys.
        """
        concurrent = request["concurrent_requests"]
        duration = request["duration"]
        if concurrent is not None:
            self.total_started_ever += 1
            self.started_requests.append(request)
            self.concurrent_slope_checker.add_data_point(duration, concurrent)

    def _remove_started(self, request: dict[str, Any]) -> None:
        """
        Remove the oldest started request from tracking.

        :param request: The head entry of ``started_requests``. The list's first
            element is always the one deleted, so callers must pass that entry to
            keep the list and the concurrency slope checker consistent.
        """
        del self.started_requests[0]
        concurrent = request["concurrent_requests"]
        duration = request["duration"]
        self.concurrent_slope_checker.remove_data_point(duration, concurrent)

    def _update_duration(self, duration: float) -> None:
        """
        Update duration and prune old data points.

        Updates the current duration and removes data points that exceed the maximum
        window size (by ratio or time) to maintain bounded memory usage.

        :param duration: Current duration in seconds since benchmark start.
        """
        self.duration = duration

        # Keep at most `maximum_window_ratio` of every finished request ever seen,
        # discarding the oldest entries first.
        maximum_finished_window_size = int(
            self.total_finished_ever * self.maximum_window_ratio
        )
        while len(self.finished_requests) > maximum_finished_window_size:
            self._remove_finished(self.finished_requests[0])

        # Drop finished entries recorded more than `maximum_window_seconds` ago.
        while (
            self.finished_requests
            and duration - self.finished_requests[0]["duration"]
            > self.maximum_window_seconds
        ):
            self._remove_finished(self.finished_requests[0])

        # Same ratio-based pruning for started requests.
        maximum_started_window_size = int(
            self.total_started_ever * self.maximum_window_ratio
        )
        while len(self.started_requests) > maximum_started_window_size:
            self._remove_started(self.started_requests[0])

        # Same time-based pruning for started requests.
        while (
            self.started_requests
            and duration - self.started_requests[0]["duration"]
            > self.maximum_window_seconds
        ):
            self._remove_started(self.started_requests[0])

    def _check_alert(self) -> bool:
        """
        Check if over-saturation is currently detected.

        Detection requires all of the following:

        - at least ``minimum_duration`` seconds have elapsed;
        - the TTFT violation count is at least half of the TTFT checker's sample
          count (more samples than ``2 * violations`` suppresses the alert);
        - at least ``minimum_window_size`` concurrency samples (capped by elapsed
          seconds) are available;
        - the concurrency slope check passes, and so does the TTFT slope check
          whenever at least ``minimum_window_size`` TTFT samples are available.

        :return: True if over-saturation is detected, False otherwise.
        """
        # Use duration as the maximum n value since requests from the
        # same second are highly correlated, this is simple and good enough
        # given that the MOE has a custom threshold anyway.
        concurrent_n = min(self.duration, self.concurrent_slope_checker.n)
        ttft_n = min(self.duration, self.ttft_slope_checker.n)

        if (
            (self.duration < self.minimum_duration)
            or (self.ttft_slope_checker.n > self.ttft_violations_counter * 2)
            # NOTE(review): compares elapsed seconds against the TTFT threshold;
            # with the defaults it is subsumed by the minimum_duration check.
            or (self.duration < self.minimum_ttft)
            or (concurrent_n < self.minimum_window_size)
        ):
            return False

        is_concurrent_slope_positive = self.concurrent_slope_checker.check_slope(
            concurrent_n
        )

        # Too few TTFT samples to estimate a slope: rely on concurrency alone.
        if ttft_n < self.minimum_window_size:
            return is_concurrent_slope_positive

        is_ttft_slope_positive = self.ttft_slope_checker.check_slope(ttft_n)

        return is_concurrent_slope_positive and is_ttft_slope_positive

    def __call__(
        self, state: SchedulerState, request_info: RequestInfo
    ) -> SchedulerUpdateAction:
        """
        Evaluate constraint against current scheduler state.

        :param state: Current scheduler state.
        :param request_info: Individual request information.
        :return: Action indicating whether to continue or stop operations. Both
            queuing and processing are stopped only when over-saturation is
            detected and ``mode == "enforce"``; detector statistics are always
            attached as metadata.
        """
        duration = time.time() - state.start_time

        if request_info.status == "in_progress":
            concurrent_requests = state.processing_requests
            self._add_started(
                {"concurrent_requests": concurrent_requests, "duration": duration}
            )
        elif request_info.status in ("first_token", "completed"):
            # Record TTFT once per request: from the first of these updates
            # that carries both timing fields.
            if (
                request_info.request_id not in self._ttft_reported_request_ids
                and request_info.timings
                and request_info.timings.first_token_iteration
                and request_info.timings.request_start
            ):
                self._ttft_reported_request_ids.add(request_info.request_id)
                ttft = (
                    request_info.timings.first_token_iteration
                    - request_info.timings.request_start
                )
                self._add_finished({"ttft": ttft, "duration": duration})

        self._update_duration(duration)
        is_over_saturated = self._check_alert()

        ttft_slope = self.ttft_slope_checker.slope
        ttft_slope_moe = self.ttft_slope_checker.margin_of_error
        ttft_n = self.ttft_slope_checker.n
        ttft_violations = self.ttft_violations_counter
        concurrent_slope = self.concurrent_slope_checker.slope
        concurrent_slope_moe = self.concurrent_slope_checker.margin_of_error
        concurrent_n = self.concurrent_slope_checker.n

        should_stop = is_over_saturated and self.mode == "enforce"
        return SchedulerUpdateAction(
            request_queuing="stop" if should_stop else "continue",
            request_processing="stop_all" if should_stop else "continue",
            metadata={
                "ttft_slope": ttft_slope,
                "ttft_slope_moe": ttft_slope_moe,
                "ttft_n": ttft_n,
                "ttft_violations": ttft_violations,
                "concurrent_slope": concurrent_slope,
                "concurrent_slope_moe": concurrent_slope_moe,
                "concurrent_n": concurrent_n,
                "is_over_saturated": is_over_saturated,
            },
        )

info property

Get the constraint's current configuration parameters.

Returns:

Type Description
dict[str, Any]

Dictionary containing configuration parameters.

__call__(state, request_info)

Evaluate constraint against current scheduler state.

Parameters:

Name Type Description Default
state SchedulerState

Current scheduler state.

required
request_info RequestInfo

Individual request information.

required

Returns:

Type Description
SchedulerUpdateAction

Action indicating whether to continue or stop operations.

Source code in src/guidellm/scheduler/constraints/saturation.py
def __call__(
    self, state: SchedulerState, request_info: RequestInfo
) -> SchedulerUpdateAction:
    """
    Feed one request update into the detector and decide whether to keep going.

    :param state: Current scheduler state.
    :param request_info: Individual request information.
    :return: Action that stops queuing and all processing only when
        over-saturation is detected in "enforce" mode, with the detector's
        statistics attached as metadata.
    """
    elapsed = time.time() - state.start_time
    status = request_info.status

    if status == "in_progress":
        self._add_started(
            {"concurrent_requests": state.processing_requests, "duration": elapsed}
        )
    elif status in ("first_token", "completed"):
        request_id = request_info.request_id
        timings = request_info.timings
        already_reported = request_id in self._ttft_reported_request_ids
        # Each request contributes at most one TTFT sample.
        if (
            not already_reported
            and timings
            and timings.first_token_iteration
            and timings.request_start
        ):
            self._ttft_reported_request_ids.add(request_id)
            first_token_delay = timings.first_token_iteration - timings.request_start
            self._add_finished({"ttft": first_token_delay, "duration": elapsed})

    self._update_duration(elapsed)
    over_saturated = self._check_alert()

    ttft_checker = self.ttft_slope_checker
    concurrent_checker = self.concurrent_slope_checker
    metadata = {
        "ttft_slope": ttft_checker.slope,
        "ttft_slope_moe": ttft_checker.margin_of_error,
        "ttft_n": ttft_checker.n,
        "ttft_violations": self.ttft_violations_counter,
        "concurrent_slope": concurrent_checker.slope,
        "concurrent_slope_moe": concurrent_checker.margin_of_error,
        "concurrent_n": concurrent_checker.n,
        "is_over_saturated": over_saturated,
    }

    halt = over_saturated and self.mode == "enforce"
    if halt:
        queuing, processing = "stop", "stop_all"
    else:
        queuing, processing = "continue", "continue"
    return SchedulerUpdateAction(
        request_queuing=queuing,
        request_processing=processing,
        metadata=metadata,
    )

__init__(minimum_duration=30.0, minimum_ttft=2.5, maximum_window_seconds=120.0, moe_threshold=2.0, maximum_window_ratio=0.75, minimum_window_size=5, confidence=0.95, eps=1e-12, mode='enforce')

Initialize the over-saturation constraint.

Creates a new constraint instance with specified detection parameters. The constraint will track concurrent requests and TTFT metrics, using statistical slope detection to identify when the model becomes over-saturated. All parameters have sensible defaults suitable for most benchmarking scenarios.

Parameters:

Name Type Description Default
minimum_duration float

Minimum seconds before checking for over-saturation (default: 30.0)

30.0
minimum_ttft float

Minimum TTFT threshold in seconds for violation counting (default: 2.5)

2.5
maximum_window_seconds float

Maximum time window in seconds for data retention (default: 120.0)

120.0
moe_threshold float

Margin of error threshold for slope detection (default: 2.0)

2.0
maximum_window_ratio float

Maximum window size as ratio of total requests (default: 0.75)

0.75
minimum_window_size int

Minimum data points required for slope estimation (default: 5)

5
confidence float

Statistical confidence level for t-distribution (0-1) (default: 0.95)

0.95
eps float

Epsilon for numerical stability in calculations (default: 1e-12)

1e-12
mode Literal['enforce', 'monitor']

Whether to stop when over-saturation is detected, or only monitor (default: "enforce")

'enforce'
Source code in src/guidellm/scheduler/constraints/saturation.py
def __init__(  # noqa: PLR0913
    self,
    minimum_duration: float = 30.0,
    minimum_ttft: float = 2.5,
    maximum_window_seconds: float = 120.0,
    moe_threshold: float = 2.0,
    maximum_window_ratio: float = 0.75,
    minimum_window_size: int = 5,
    confidence: float = 0.95,
    eps: float = 1e-12,
    mode: Literal["enforce", "monitor"] = "enforce",
) -> None:
    """
    Initialize the over-saturation constraint.

    Creates a new constraint instance with specified detection parameters.
    The constraint will track concurrent requests and TTFT metrics, using
    statistical slope detection to identify when the model becomes
    over-saturated. All parameters have sensible defaults suitable for
    most benchmarking scenarios. Internal tracking state is initialized by
    calling ``reset()`` after the parameters are stored.

    :param minimum_duration: Minimum seconds before checking for over-saturation
        (default: 30.0)
    :param minimum_ttft: Minimum TTFT threshold in seconds for violation counting
        (default: 2.5)
    :param maximum_window_seconds: Maximum time window in seconds for data retention
        (default: 120.0)
    :param moe_threshold: Margin of error threshold for slope detection
        (default: 2.0)
    :param maximum_window_ratio: Maximum window size as ratio of total requests
        (default: 0.75)
    :param minimum_window_size: Minimum data points required for slope estimation
        (default: 5)
    :param confidence: Statistical confidence level for t-distribution (0-1)
        (default: 0.95)
    :param eps: Epsilon for numerical stability in calculations
        (default: 1e-12)
    :param mode: Whether to stop when over-saturation is detected, or only monitor
        (default: "enforce")
    """
    self.minimum_duration = minimum_duration
    self.minimum_ttft = minimum_ttft
    self.maximum_window_seconds = maximum_window_seconds
    self.maximum_window_ratio = maximum_window_ratio
    self.minimum_window_size = minimum_window_size
    self.moe_threshold = moe_threshold
    self.confidence = confidence
    self.eps = eps
    self.mode = mode
    # reset() builds the slope checkers from the parameters set above.
    self.reset()

reset()

Reset all internal state to initial values.

Clears all tracked requests, resets counters, and reinitializes slope checkers. Useful for reusing constraint instances across multiple benchmark runs or resetting state after configuration changes.

Source code in src/guidellm/scheduler/constraints/saturation.py
def reset(self) -> None:
    """
    Return the constraint to a pristine state.

    Empties the started/finished request windows, zeroes every counter, forgets
    which requests already reported a TTFT, and builds fresh slope checkers from
    the configured ``moe_threshold``, ``confidence`` and ``eps``. Handy when one
    instance is reused across benchmark runs.
    """

    def build_checker():
        return SlopeChecker(
            moe_threshold=self.moe_threshold, confidence=self.confidence, eps=self.eps
        )

    self.duration = 0.0
    self.started_requests: list[dict[str, Any]] = []
    self.finished_requests: list[dict[str, Any]] = []
    self.total_started_ever = 0
    self.total_finished_ever = 0
    self.ttft_violations_counter = 0
    self._ttft_reported_request_ids: set[str] = set()
    self.concurrent_slope_checker = build_checker()
    self.ttft_slope_checker = build_checker()

OverSaturationConstraintArgs

Bases: ConstraintArgs

Arguments for over-saturation detection constraint.

Detects when a model becomes over-saturated using statistical slope analysis of concurrent requests and time-to-first-token metrics.

Attributes:

Name Type Description
kind Literal['over_saturation']

Always "over_saturation"

Source code in src/guidellm/scheduler/constraints/saturation.py
@ConstraintArgs.register("over_saturation")
class OverSaturationConstraintArgs(ConstraintArgs):
    """
    Configuration for the over-saturation detection constraint.

    Over-saturation is inferred from statistical slope analysis of the number of
    concurrent requests and of time-to-first-token measurements.

    :cvar kind: Discriminator for this constraint type; always "over_saturation"
    :cvar mode: "enforce" stops the benchmark on detection, "monitor" only reports
    :cvar min_seconds: Seconds to wait before over-saturation checks begin
    :cvar max_window_seconds: Upper bound on the checking window, in seconds
    :cvar moe_threshold: Margin-of-error threshold used by slope detection
    :cvar minimum_ttft: TTFT above which a request counts as a violation
    :cvar maximum_window_ratio: Window size cap as a fraction of total requests
    :cvar minimum_window_size: Data points needed before a slope is estimated
    :cvar confidence: Confidence level for the t-distribution
    """

    kind: Literal["over_saturation"] = Field(
        "over_saturation",
        description="Constraint type discriminator",
    )
    mode: Literal["enforce", "monitor"] = Field(
        "enforce",
        description=(
            "Whether to stop the benchmark if over-saturation is detected. "
            "Set to `enforce` to stop the benchmark if over-saturation is "
            "detected, and `monitor` to only report over-saturation."
        ),
    )
    min_seconds: int | float = Field(
        30.0,
        ge=0,
        description="Minimum seconds before checking for over-saturation",
    )
    max_window_seconds: int | float = Field(
        120.0,
        ge=0,
        description="Maximum over-saturation checking window size in seconds",
    )
    moe_threshold: float = Field(
        2.0,
        ge=0,
        description="Margin of error threshold for slope detection",
    )
    minimum_ttft: float = Field(
        2.5,
        ge=0,
        description="Minimum TTFT threshold for violation counting",
    )
    maximum_window_ratio: float = Field(
        0.75,
        ge=0,
        le=1.0,
        description="Maximum window size as ratio of total requests",
    )
    minimum_window_size: int = Field(
        5,
        ge=0,
        description="Minimum data points required for slope estimation",
    )
    confidence: float = Field(
        0.95,
        ge=0,
        le=1.0,
        description="Statistical confidence level for t-distribution",
    )

    @property
    def constraint_key(self) -> str:
        """
        Key under which the resulting constraint is registered.

        :return: Always "over_saturation"
        """
        key = "over_saturation"
        return key

OverSaturationConstraintInitializer

Bases: PydanticConstraintInitializer

Factory for creating OverSaturationConstraint instances from configuration.

Stores an OverSaturationConstraintArgs instance and delegates to OverSaturationConstraint in create_constraint().

Example: ::

from guidellm.scheduler.constraints import (
    OverSaturationConstraintArgs,
    OverSaturationConstraintInitializer,
)

args = OverSaturationConstraintArgs(mode="enforce", min_seconds=60.0)
initializer = OverSaturationConstraintInitializer(args=args)
constraint = initializer.create_constraint()
Source code in src/guidellm/scheduler/constraints/saturation.py
@ConstraintsInitializerFactory.register("over_saturation")
class OverSaturationConstraintInitializer(PydanticConstraintInitializer):
    """
    Factory for creating OverSaturationConstraint instances from configuration.

    Stores an ``OverSaturationConstraintArgs`` instance and delegates to
    ``OverSaturationConstraint`` in ``create_constraint()``.

    Example:
    ::

        from guidellm.scheduler.constraints import (
            OverSaturationConstraintArgs,
            OverSaturationConstraintInitializer,
        )

        args = OverSaturationConstraintArgs(mode="enforce", min_seconds=60.0)
        initializer = OverSaturationConstraintInitializer(args=args)
        constraint = initializer.create_constraint()
    """

    type_: Literal["over_saturation"] = "over_saturation"  # type: ignore[assignment]
    args: OverSaturationConstraintArgs = Field(
        default_factory=OverSaturationConstraintArgs,
        description="Configuration arguments for over-saturation detection",
    )

    def create_constraint(self, **_kwargs) -> Constraint:
        """
        Create an OverSaturationConstraint instance from stored args.

        Field names differ between the args model and the constraint:
        ``min_seconds`` maps to ``minimum_duration`` and ``max_window_seconds``
        maps to ``maximum_window_seconds``. All other fields are passed through
        under the same name.

        :param _kwargs: Additional keyword arguments (unused)
        :return: Configured OverSaturationConstraint instance ready for use
        """
        return OverSaturationConstraint(
            minimum_duration=self.args.min_seconds,
            minimum_ttft=self.args.minimum_ttft,
            maximum_window_seconds=self.args.max_window_seconds,
            moe_threshold=self.args.moe_threshold,
            maximum_window_ratio=self.args.maximum_window_ratio,
            minimum_window_size=self.args.minimum_window_size,
            confidence=self.args.confidence,
            mode=self.args.mode,
        )

create_constraint(**_kwargs)

Create an OverSaturationConstraint instance from stored args.

Parameters:

Name Type Description Default
_kwargs

Additional keyword arguments (unused)

{}

Returns:

Type Description
Constraint

Configured OverSaturationConstraint instance ready for use

Source code in src/guidellm/scheduler/constraints/saturation.py
def create_constraint(self, **_kwargs) -> Constraint:
    """
    Build the over-saturation constraint described by ``self.args``.

    Each field of the stored args is forwarded to the matching
    ``OverSaturationConstraint`` keyword. ``min_seconds`` becomes
    ``minimum_duration`` and ``max_window_seconds`` becomes
    ``maximum_window_seconds``; every other name is passed through unchanged.

    :param _kwargs: Accepted for interface compatibility and ignored
    :return: A new ``OverSaturationConstraint`` configured from ``self.args``
    """
    cfg = self.args
    return OverSaturationConstraint(
        mode=cfg.mode,
        minimum_duration=cfg.min_seconds,
        maximum_window_seconds=cfg.max_window_seconds,
        minimum_ttft=cfg.minimum_ttft,
        moe_threshold=cfg.moe_threshold,
        maximum_window_ratio=cfg.maximum_window_ratio,
        minimum_window_size=cfg.minimum_window_size,
        confidence=cfg.confidence,
    )

PydanticConstraintInitializer

Bases: StandardBaseModel, ABC, InfoMixin

Abstract base for Pydantic-based constraint initializers.

Provides standardized serialization, validation, and metadata handling for constraint initializers using Pydantic models. Subclasses implement specific constraint creation logic while inheriting validation and persistence support. Integrates with the constraint factory system for dynamic instantiation and configuration management.

Example:

    @ConstraintsInitializerFactory.register("max_duration")
    class MaxDurationConstraintInitializer(PydanticConstraintInitializer):
        type_: str = "max_duration"
        max_seconds: float = Field(description="Maximum duration in seconds")

        def create_constraint(self, **kwargs) -> Constraint:
            def evaluate(state, request):
                if time.time() - state.start_time > self.max_seconds:
                    return SchedulerUpdateAction(request_queuing="stop")
                return SchedulerUpdateAction(request_queuing="continue")
            return evaluate

Attributes:

Name Type Description
type_ str

Type identifier for the constraint initializer

Source code in src/guidellm/scheduler/constraints/constraint.py
class PydanticConstraintInitializer(StandardBaseModel, ABC, InfoMixin):
    """
    Abstract base for Pydantic-based constraint initializers.

    Provides standardized serialization, validation, and metadata handling for
    constraint initializers using Pydantic models. Subclasses implement specific
    constraint creation logic while inheriting validation and persistence support.
    Integrates with the constraint factory system for dynamic instantiation and
    configuration management.

    Example:
    ::

        @ConstraintsInitializerFactory.register("max_duration")
        class MaxDurationConstraintInitializer(PydanticConstraintInitializer):
            type_: str = "max_duration"
            max_seconds: float = Field(description="Maximum duration in seconds")

            def create_constraint(self, **kwargs) -> Constraint:
                def evaluate(state, request):
                    if time.time() - state.start_time > self.max_seconds:
                        return SchedulerUpdateAction(request_queuing="stop")
                    return SchedulerUpdateAction(request_queuing="continue")
                return evaluate

    :cvar type_: Type identifier for the constraint initializer
    """

    type_: str = Field(description="Type identifier for the constraint initializer")

    @property
    def info(self) -> dict[str, Any]:
        """
        Extract serializable information from this constraint initializer.

        :return: The dictionary produced by ``self.model_dump()``
        """
        return self.model_dump()

    @abstractmethod
    def create_constraint(self, **kwargs) -> Constraint:
        """
        Create a constraint instance.

        Must be implemented by subclasses to return their specific constraint type
        with appropriate configuration and validation. The returned constraint should
        be ready for evaluation against scheduler state and requests.

        :param kwargs: Additional keyword arguments (usually unused)
        :return: Configured constraint instance
        :raises NotImplementedError: Always, when the base implementation is reached
            (e.g. via ``super().create_constraint()``); subclasses must override it
        """
        # The body previously was ``...`` and silently returned None, contradicting
        # the documented NotImplementedError contract.
        raise NotImplementedError(
            f"{type(self).__name__} must implement create_constraint()"
        )

info property

Extract serializable information from this constraint initializer.

Returns:

Type Description
dict[str, Any]

Dictionary containing constraint configuration and metadata

create_constraint(**kwargs) abstractmethod

Create a constraint instance.

Must be implemented by subclasses to return their specific constraint type with appropriate configuration and validation. The returned constraint should be ready for evaluation against scheduler state and requests.

Parameters:

Name Type Description Default
kwargs

Additional keyword arguments (usually unused)

{}

Returns:

Type Description
Constraint

Configured constraint instance

Raises:

Type Description
NotImplementedError

Must be implemented by subclasses

Source code in src/guidellm/scheduler/constraints/constraint.py
@abstractmethod
def create_constraint(self, **kwargs) -> Constraint:
    """
    Create a constraint instance.

    Must be implemented by subclasses to return their specific constraint type
    with appropriate configuration and validation. The returned constraint should
    be ready for evaluation against scheduler state and requests.

    :param kwargs: Additional keyword arguments (usually unused)
    :return: Configured constraint instance
    :raises NotImplementedError: Always, when the base implementation is reached
        (e.g. via ``super().create_constraint()``); subclasses must override it
    """
    # The body previously was ``...`` and silently returned None, contradicting
    # the documented NotImplementedError contract.
    raise NotImplementedError(
        f"{type(self).__name__} must implement create_constraint()"
    )

Scheduler

Bases: Generic[RequestT, ResponseT], ThreadSafeSingletonMixin

Thread-safe singleton scheduler for distributed benchmarking workload coordination.

Orchestrates request processing across worker processes with distributed timing coordination, constraint enforcement, and result aggregation. Abstracts the complexity of multi-process coordination, environment synchronization, and resource management while providing a unified interface for executing benchmarking operations. Implements singleton pattern to ensure consistent execution state.

Example:

from guidellm.scheduler import Scheduler
from guidellm.scheduler import NonDistributedEnvironment, SynchronousStrategy

scheduler = Scheduler()
async for response, request, info, state in scheduler.run(
    requests=request_list,
    backend=backend,
    strategy=SynchronousStrategy(),
    env=NonDistributedEnvironment(),
    max_requests=1000
):
    print(f"Processed: {request}")
Source code in src/guidellm/scheduler/scheduler.py
class Scheduler(
    Generic[RequestT, ResponseT],
    ThreadSafeSingletonMixin,
):
    """
    Singleton entry point that coordinates a benchmarking run across workers.

    A shared instance (per ``ThreadSafeSingletonMixin``) resolves the requested
    constraints, synchronizes run parameters and the start time with an
    environment, drives a ``WorkerProcessGroup`` for the local node, and streams
    every request update back to the caller. Worker shutdown always happens,
    whether the run completes or fails.

    Example:
    ::

        from guidellm.scheduler import Scheduler
        from guidellm.scheduler import NonDistributedEnvironment, SynchronousStrategy

        scheduler = Scheduler()
        async for response, request, info, state in scheduler.run(
            requests=request_list,
            backend=backend,
            strategy=SynchronousStrategy(),
            env=NonDistributedEnvironment(),
            max_requests=1000
        ):
            print(f"Processed: {request}")
    """

    async def run(
        self,
        requests: DatasetIterT[RequestT],
        backend: BackendInterface[RequestT, ResponseT],
        strategy: SchedulingStrategy,
        env: Environment[RequestT, ResponseT] | None,
        **constraints: Constraint | ConstraintInitializer,
    ) -> AsyncIterator[
        tuple[
            ResponseT | None,
            RequestT,
            RequestInfo,
            SchedulerState,
        ]
    ]:
        """
        Run a benchmark and stream every request update as it happens.

        Holds ``self.thread_lock`` for the whole run. The steps are:

        - Resolve the constraint inputs through ``ConstraintsInitializerFactory``.
        - Let the environment localize the requests, strategy, and constraints
          for this node.
        - Create a ``WorkerProcessGroup``, start it at the environment-synchronized
          start time, and drain its updates. Each update is reported to the
          environment before being yielded.
        - Always shut the workers down. Any local failure or shutdown error is
          passed to ``env.sync_run_error``.
        - Yield the updates produced by ``env.sync_run_end()``.

        :param requests: Request collection to process, supporting single requests or
            multi-turn sequences with optional inter-request delays
        :param backend: Backend that turns requests into responses
        :param strategy: Scheduling strategy controlling request timing and distribution
        :param env: Coordination environment; a new ``NonDistributedEnvironment`` is
            created when None
        :param constraints: Runtime constraints (max_requests, max_duration,
            max_error_rate, etc.) given as primitives, dictionaries, or constraint
            instances
        :yields: ``(response, request, request_info, scheduler_state)`` tuples. Each
            request generates three ordered updates: queued, in_progress, and
            completed | errored | cancelled
        :raises Exception: Errors from setup, the workers, or the environment are
            re-raised after being reported and after the workers have shut down
        """
        with self.thread_lock:
            run_env = env
            if run_env is None:
                run_env = NonDistributedEnvironment[RequestT, ResponseT]()

            group: WorkerProcessGroup[RequestT, ResponseT] | None = None

            # Every failure (local or remote) is reported to the environment and
            # re-raised, but only after the worker processes are shut down.
            try:
                resolved = ConstraintsInitializerFactory.resolve(constraints)
                (
                    node_requests,
                    node_strategy,
                    node_constraints,
                ) = await run_env.sync_run_params(requests, strategy, resolved)

                group = WorkerProcessGroup[RequestT, ResponseT](
                    requests=node_requests,
                    backend=backend,
                    strategy=node_strategy,
                    **node_constraints,
                )
                await group.create_processes()
                # Start only once the environment supplies the agreed start time.
                start_at = await run_env.sync_run_start()
                await group.start(start_at)

                async for resp, req, info, state in group.request_updates():
                    await run_env.update_run_iteration(resp, req, info, state)
                    yield resp, req, info, state
            except Exception as run_error:  # noqa: BLE001
                await run_env.sync_run_error(run_error)
                raise
            finally:
                if group is not None:
                    shutdown_error = await group.shutdown()  # type: ignore[misc]
                    if shutdown_error is not None:
                        await run_env.sync_run_error(shutdown_error)

            # The environment's end-of-run stream raises any pending errors and
            # yields the remaining updates for aggregation on the primary node.
            async for resp, req, info, state in run_env.sync_run_end():
                yield resp, req, info, state

run(requests, backend, strategy, env, **constraints) async

Execute distributed request processing with coordinated timing and constraints.

Orchestrates the complete benchmarking workflow across worker processes with environment synchronization, constraint enforcement, and error handling. Manages resource lifecycle from initialization through cleanup while yielding real-time processing updates for monitoring and aggregation.

Yields: Request updates as (response, request, request_info, scheduler_state) tuples. Each request generates three ordered updates: queued, in_progress, and then completed, errored, or cancelled.

Parameters:

Name Type Description Default
requests DatasetIterT[RequestT]

Request collection to process, supporting single requests or multi-turn sequences with optional inter-request delays

required
backend BackendInterface[RequestT, ResponseT]

Backend interface for request processing and response generation

required
strategy SchedulingStrategy

Scheduling strategy controlling request timing and distribution

required
env Environment[RequestT, ResponseT] | None

Environment interface for distributed coordination and synchronization. Defaults to NonDistributedEnvironment if None

required
constraints Constraint | ConstraintInitializer

Runtime constraints for execution control (max_requests, max_duration, max_error_rate, etc.) as primitives, dictionaries, or constraint instances

{}

Raises:

Type Description
Exception

Worker process errors, environment synchronization failures, or constraint evaluation errors are propagated after cleanup

Source code in src/guidellm/scheduler/scheduler.py
async def run(
    self,
    requests: DatasetIterT[RequestT],
    backend: BackendInterface[RequestT, ResponseT],
    strategy: SchedulingStrategy,
    env: Environment[RequestT, ResponseT] | None,
    **constraints: Constraint | ConstraintInitializer,
) -> AsyncIterator[
    tuple[
        ResponseT | None,
        RequestT,
        RequestInfo,
        SchedulerState,
    ]
]:
    """
    Run a benchmark and stream every request update as it happens.

    Holds ``self.thread_lock`` for the whole run. The steps are:

    - Resolve the constraint inputs through ``ConstraintsInitializerFactory``.
    - Let the environment localize the requests, strategy, and constraints for
      this node.
    - Create a ``WorkerProcessGroup``, start it at the environment-synchronized
      start time, and drain its updates. Each update is reported to the
      environment before being yielded.
    - Always shut the workers down. Any local failure or shutdown error is passed
      to ``env.sync_run_error``.
    - Yield the updates produced by ``env.sync_run_end()``.

    :param requests: Request collection to process, supporting single requests or
        multi-turn sequences with optional inter-request delays
    :param backend: Backend that turns requests into responses
    :param strategy: Scheduling strategy controlling request timing and distribution
    :param env: Coordination environment; a new ``NonDistributedEnvironment`` is
        created when None
    :param constraints: Runtime constraints (max_requests, max_duration,
        max_error_rate, etc.) given as primitives, dictionaries, or constraint
        instances
    :yields: ``(response, request, request_info, scheduler_state)`` tuples. Each
        request generates three ordered updates: queued, in_progress, and
        completed | errored | cancelled
    :raises Exception: Errors from setup, the workers, or the environment are
        re-raised after being reported and after the workers have shut down
    """
    with self.thread_lock:
        run_env = env
        if run_env is None:
            run_env = NonDistributedEnvironment[RequestT, ResponseT]()

        group: WorkerProcessGroup[RequestT, ResponseT] | None = None

        # Every failure (local or remote) is reported to the environment and
        # re-raised, but only after the worker processes are shut down.
        try:
            resolved = ConstraintsInitializerFactory.resolve(constraints)
            (
                node_requests,
                node_strategy,
                node_constraints,
            ) = await run_env.sync_run_params(requests, strategy, resolved)

            group = WorkerProcessGroup[RequestT, ResponseT](
                requests=node_requests,
                backend=backend,
                strategy=node_strategy,
                **node_constraints,
            )
            await group.create_processes()
            # Start only once the environment supplies the agreed start time.
            start_at = await run_env.sync_run_start()
            await group.start(start_at)

            async for resp, req, info, state in group.request_updates():
                await run_env.update_run_iteration(resp, req, info, state)
                yield resp, req, info, state
        except Exception as run_error:  # noqa: BLE001
            await run_env.sync_run_error(run_error)
            raise
        finally:
            if group is not None:
                shutdown_error = await group.shutdown()  # type: ignore[misc]
                if shutdown_error is not None:
                    await run_env.sync_run_error(shutdown_error)

        # The environment's end-of-run stream raises any pending errors and
        # yields the remaining updates for aggregation on the primary node.
        async for resp, req, info, state in run_env.sync_run_end():
            yield resp, req, info, state

SchedulerMessagingPydanticRegistry

Bases: RegistryMixin[RegistryObjT]

Registry for Pydantic types used in scheduler inter-process messaging.

Enables generic interface for defining Pydantic class types used for communication between distributed scheduler components and worker processes.

Source code in src/guidellm/scheduler/schemas.py
class SchedulerMessagingPydanticRegistry(RegistryMixin[RegistryObjT]):
    """
    Registry of the Pydantic model classes exchanged as messages.

    Lets distributed scheduler components and worker processes share a single,
    generic lookup of the Pydantic types they use for communication. It adds no
    behavior of its own; everything comes from ``RegistryMixin``.
    """

SchedulerProgress

Bases: StandardBaseModel

Progress tracking data for scheduler operations.

Provides estimates for remaining work in scheduler operations, including fraction complete, request counts, and duration. Used by constraints and monitoring systems to track execution progress and make termination decisions.

Source code in src/guidellm/scheduler/schemas.py
class SchedulerProgress(StandardBaseModel):
    """
    Progress tracking data for scheduler operations.

    Provides estimates for remaining work in scheduler operations, including
    fraction complete, request counts, and duration. Used by constraints and
    monitoring systems to track execution progress and make termination decisions.
    """

    remaining_requests: float | None = Field(
        description="Estimated number of remaining requests to process", default=None
    )
    total_requests: float | None = Field(
        description="Total number of requests to process", default=None
    )
    remaining_duration: float | None = Field(
        description="Estimated remaining duration in seconds", default=None
    )
    total_duration: float | None = Field(
        description="Total duration in seconds to process for", default=None
    )
    stop_time: float | None = Field(
        description="The timestamp the processing stopped at", default=None
    )

    @property
    def remaining_fraction(self) -> float | None:
        """
        Overall remaining fraction, combining request- and duration-based estimates.

        :return: The smaller of ``remaining_requests_fraction`` and
            ``remaining_duration_fraction`` when both are known, whichever one is
            known otherwise, or None if neither can be computed
        """
        fraction: float | None = None

        if (requests_fraction := self.remaining_requests_fraction) is not None:
            fraction = requests_fraction

        if (duration_fraction := self.remaining_duration_fraction) is not None:
            fraction = (
                duration_fraction
                if fraction is None
                else min(fraction, duration_fraction)
            )

        return fraction

    @property
    def remaining_requests_fraction(self) -> float | None:
        """
        :return: ``remaining_requests / total_requests``, or None when either value
            is missing or ``total_requests`` is not positive
        """
        return (
            self.remaining_requests / float(self.total_requests)
            if self.remaining_requests is not None
            and self.total_requests is not None
            and self.total_requests > 0
            else None
        )

    @property
    def remaining_duration_fraction(self) -> float | None:
        """
        :return: ``remaining_duration / total_duration``, or None when either value
            is missing or ``total_duration`` is not positive
        """
        return (
            self.remaining_duration / float(self.total_duration)
            if self.remaining_duration is not None
            and self.total_duration is not None
            and self.total_duration > 0
            else None
        )

    def combine(self, other: SchedulerProgress) -> SchedulerProgress:
        """
        Merge another progress instance into this one, in place.

        The request estimate and the duration estimate are each copied from
        ``other`` only when ``other`` reports a strictly lower remaining fraction,
        meaning it is further along. Each remaining/total pair is copied together
        so the two values stay consistent. ``stop_time`` takes the earlier of the
        two known values.

        ``self`` is mutated and returned; no new instance is created.

        :param other: Another progress instance to combine with
        :return: This same instance, updated with the combined estimates
        """
        if (other_req_fraction := other.remaining_requests_fraction) is not None and (
            (cur_req_fraction := self.remaining_requests_fraction) is None
            or other_req_fraction < cur_req_fraction
        ):
            # Only update if the other is more advanced (lower fraction)
            self.remaining_requests = other.remaining_requests
            self.total_requests = other.total_requests

        if (other_dur_fraction := other.remaining_duration_fraction) is not None and (
            (cur_dur_fraction := self.remaining_duration_fraction) is None
            or other_dur_fraction < cur_dur_fraction
        ):
            # Only update if the other is more advanced (lower fraction)
            self.remaining_duration = other.remaining_duration
            self.total_duration = other.total_duration

        if other.stop_time is not None and (
            self.stop_time is None or other.stop_time < self.stop_time
        ):
            # Only update if the other has an earlier stop time
            self.stop_time = other.stop_time

        return self

remaining_duration_fraction property

Returns:

Type Description
float | None

Estimated fraction of remaining duration, if known

remaining_fraction property

Returns:

Type Description
float | None

Estimated fraction of remaining progress, if known

remaining_requests_fraction property

Returns:

Type Description
float | None

Estimated fraction of remaining requests, if known

combine(other)

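Merge another progress instance into this one in place. The request and duration estimates are each taken from whichever instance is further along (lower remaining fraction), and the earlier stop time is kept.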

Parameters:

Name Type Description Default
other SchedulerProgress

Another progress instance to combine with

required

Returns:

Type Description
SchedulerProgress

This same instance (self), updated in place with the combined estimates

Source code in src/guidellm/scheduler/schemas.py
def combine(self, other: SchedulerProgress) -> SchedulerProgress:
    """
    Merge another progress instance into this one, in place.

    The request estimate and the duration estimate are each copied from ``other``
    only when ``other`` reports a strictly lower remaining fraction, meaning it is
    further along. Each remaining/total pair is copied together so the two values
    stay consistent. ``stop_time`` takes the earlier of the two known values.

    ``self`` is mutated and returned; no new instance is created.

    :param other: Another progress instance to combine with
    :return: This same instance, updated with the combined estimates
    """
    if (other_req_fraction := other.remaining_requests_fraction) is not None and (
        (cur_req_fraction := self.remaining_requests_fraction) is None
        or other_req_fraction < cur_req_fraction
    ):
        # Only update if the other is more advanced (lower fraction)
        self.remaining_requests = other.remaining_requests
        self.total_requests = other.total_requests

    if (other_dur_fraction := other.remaining_duration_fraction) is not None and (
        (cur_dur_fraction := self.remaining_duration_fraction) is None
        or other_dur_fraction < cur_dur_fraction
    ):
        # Only update if the other is more advanced (lower fraction)
        self.remaining_duration = other.remaining_duration
        self.total_duration = other.total_duration

    if other.stop_time is not None and (
        self.stop_time is None or other.stop_time < self.stop_time
    ):
        # Only update if the other has an earlier stop time
        self.stop_time = other.stop_time

    return self

SchedulerState

Bases: StandardBaseModel

Comprehensive state tracking for scheduler execution.

Tracks scheduler execution progress, request counts, timing information, and constraint enforcement. Central to scheduler coordination, providing real-time metrics for monitoring and decision-making across distributed worker processes.

Example:

state = SchedulerState(node_id=0, num_processes=4)
state.created_requests += 1
state.queued_requests += 1
completion_rate = state.processed_requests / state.created_requests

Source code in src/guidellm/scheduler/schemas.py
class SchedulerState(StandardBaseModel):
    """
    Running record of a scheduler execution.

    Captures which node and how many worker processes are involved, the lifecycle
    timestamps, which constraints stopped queuing or processing, overall progress,
    and counters for requests in each stage. Shared across worker processes so
    monitoring and constraint logic can make decisions from live metrics.

    Example:
    ::

        state = SchedulerState(node_id=0, num_processes=4)
        state.created_requests += 1
        state.queued_requests += 1
        completion_rate = state.processed_requests / state.created_requests
    """

    # Identity of this scheduler node and its worker count (both default to -1).
    node_id: int = Field(
        default=-1, description="Unique identifier for this scheduler node"
    )
    num_processes: int = Field(
        default=-1, description="Number of worker processes in this scheduler"
    )

    # Lifecycle timestamps; only start_time is populated at construction.
    start_time: float = Field(
        default_factory=time.time,
        description="Unix timestamp when the scheduler started",
    )
    end_time: float | None = Field(
        description="Unix timestamp when the scheduler stopped", default=None
    )
    start_requests_time: float | None = Field(
        description="Unix timestamp of the first sent request", default=None
    )
    end_requests_time: float | None = Field(
        description="Unix timestamp of the last finalized request", default=None
    )
    end_queuing_time: float | None = Field(
        description="Unix timestamp when request queuing stopped", default=None
    )
    end_queuing_constraints: dict[str, SchedulerUpdateAction] = Field(
        description="Constraints that triggered queuing termination",
        default_factory=dict,
    )
    end_processing_time: float | None = Field(
        description="Unix timestamp when request processing stopped", default=None
    )
    end_processing_constraints: dict[str, SchedulerUpdateAction] = Field(
        description="Constraints that triggered processing termination",
        default_factory=dict,
    )
    scheduler_constraints: dict[str, SchedulerUpdateAction] = Field(
        description="Latest state from all constraints applied during scheduler run",
        default_factory=dict,
    )

    progress: SchedulerProgress = Field(
        description="Overall progress information for the scheduler run",
        default_factory=lambda: SchedulerProgress(),
    )

    # Request counters, all starting at zero.
    created_requests: int = Field(
        description="Total number of requests created", default=0
    )
    queued_requests: int = Field(
        description="Total number of requests queued for processing", default=0
    )
    pending_requests: int = Field(
        description="Number of requests pending processing within a worker",
        default=0,
    )
    processing_requests: int = Field(
        description="Number of requests currently being processed", default=0
    )
    processed_requests: int = Field(
        description="Number of requests that completed processing", default=0
    )
    successful_requests: int = Field(
        description="Number of requests that completed successfully", default=0
    )
    errored_requests: int = Field(
        description="Number of requests that failed with errors", default=0
    )
    cancelled_requests: int = Field(
        description="Number of requests that were cancelled", default=0
    )

SchedulerUpdateAction

Bases: StandardBaseModel

Control directives for scheduler behavior and operations.

Encapsulates control signals for scheduler operations including request queuing and processing directives. Used by constraints to communicate termination conditions and progress to scheduler components.

Example:

action = SchedulerUpdateAction(
    request_queuing="stop",
    request_processing="continue",
    metadata={"reason": "max_requests_reached"},
)

Source code in src/guidellm/scheduler/schemas.py
class SchedulerUpdateAction(StandardBaseModel):
    """
    Control directives for scheduler behavior and operations.

    Encapsulates control signals for scheduler operations including request
    queuing and processing directives. Used by constraints to communicate
    termination conditions and progress to scheduler components.

    Every field has a default, so ``SchedulerUpdateAction()`` means "continue
    queuing and continue processing" with empty metadata and a fresh
    ``SchedulerProgress``.

    Example:
    ::

        action = SchedulerUpdateAction(
            request_queuing="stop",
            request_processing="continue",
            metadata={"reason": "max_requests_reached"}
        )
    """

    # Whether new requests should keep being queued ("continue") or not ("stop").
    request_queuing: Literal["continue", "stop"] = Field(
        default="continue", description="Action to take for request queuing operations"
    )
    # NOTE(review): "stop_local" vs "stop_all" presumably distinguish stopping
    # processing locally versus everywhere -- confirm against the scheduler code.
    request_processing: Literal["continue", "stop_local", "stop_all"] = Field(
        default="continue",
        description="Action to take for request processing operations",
    )
    # Free-form context; default_factory gives each instance its own dict.
    metadata: dict[str, Any] = Field(
        default_factory=dict,
        description="Additional context and data for the scheduler action",
    )
    # The lambda defers the SchedulerProgress name lookup until an instance is
    # created and gives every action its own progress object.
    progress: SchedulerProgress = Field(
        default_factory=lambda: SchedulerProgress(),
        description="Progress information for the scheduler action",
    )

SchedulingStrategy

Bases: PydanticClassRegistryMixin['SchedulingStrategy'], InfoMixin

Base class for scheduling strategies controlling request processing patterns.

Defines the interface for strategies that combine timing implementations with process and concurrency constraints to enable various benchmark scenarios. Strategies manage request timing, worker process coordination, and concurrency limits across distributed execution environments.

Attributes:

Name Type Description
schema_discriminator str

Field name used for polymorphic deserialization

Source code in src/guidellm/scheduler/strategies.py
class SchedulingStrategy(PydanticClassRegistryMixin["SchedulingStrategy"], InfoMixin):
    """
    Base class for scheduling strategies controlling request processing patterns.

    Defines the interface for strategies that combine timing implementations with
    process and concurrency constraints to enable various benchmark scenarios.
    Strategies manage request timing, worker process coordination, and concurrency
    limits across distributed execution environments.

    Shared multi-process state is created by ``init_processes_timings``; the start
    time is then published with ``init_processes_start`` and read (after waiting
    for it) through ``get_processes_start_time``.

    :cvar schema_discriminator: Field name used for polymorphic deserialization
    """

    schema_discriminator: ClassVar[str] = "type_"

    @classmethod
    def __pydantic_schema_base_type__(cls) -> type[SchedulingStrategy]:
        # The base class resolves to itself; any subclass reports
        # SchedulingStrategy as its schema base type.
        if cls.__name__ == "SchedulingStrategy":
            return cls

        return SchedulingStrategy

    type_: Literal["strategy"] = Field(
        description="Scheduling strategy type identifier for polymorphic dispatch",
    )
    worker_count: PositiveInt | None = Field(
        default=None,
        description="Number of worker processes to use for this strategy",
    )
    max_concurrency: PositiveInt | None = Field(
        default=None,
        description="Maximum number of concurrent requests to allow",
    )

    # Cross-process primitives, created by init_processes_timings.
    _processes_init_event: synchronize.Event | None = PrivateAttr(None)
    _processes_request_index: Synchronized[int] | None = PrivateAttr(None)
    _processes_start_time: Synchronized[float] | None = PrivateAttr(None)
    # Per-instance copy of the shared start time, filled after the first wait.
    _cached_processes_start_time: float | None = PrivateAttr(None)

    @property
    def processes_limit(self) -> PositiveInt | None:
        """
        Get the maximum number of worker processes supported by this strategy.

        :return: Maximum number of worker processes, None if unlimited
        """
        return None

    @property
    def requests_limit(self) -> PositiveInt | None:
        """
        Get the maximum number of concurrent requests supported by this strategy.

        :return: Maximum number of concurrent requests, None if unlimited
        """
        return None

    def init_processes_timings(
        self,
        worker_count: PositiveInt,
        max_concurrency: PositiveInt,
        mp_context: BaseContext,
    ):
        """
        Initialize shared timing state for multi-process coordination.

        Sets up synchronized counters and locks for coordinating request timing
        across distributed worker processes.

        :param worker_count: Number of worker processes to coordinate
        :param max_concurrency: Maximum number of concurrent requests allowed
        :param mp_context: Multiprocessing context used to create the shared
            event and synchronized values
        """
        self.worker_count = worker_count
        self.max_concurrency = max_concurrency

        self._processes_init_event = mp_context.Event()
        # Shared request counter (C int), pre-incremented by next_request_index.
        self._processes_request_index = mp_context.Value("i", 0)
        # Placeholder until init_processes_start publishes the real start time.
        self._processes_start_time = mp_context.Value("d", -1.0)

    def init_processes_start(self, start_time: float):
        """
        Set the synchronized start time for all worker processes.

        Updates shared state with the benchmark start time to coordinate request
        scheduling across all workers.

        :param start_time: Unix timestamp when request processing should begin
        :raises RuntimeError: If called before init_processes_timings
        """
        if self._processes_init_event is None:
            raise RuntimeError(
                "SchedulingStrategy init_processes_start called before "
                "init_processes_timings"
            )
        if self._processes_start_time is None:
            raise RuntimeError(
                "_processes_init_event is not None but _processes_start_time is None"
            )

        # Write the value before setting the event so waiters never observe the
        # placeholder once the event is set.
        with self._processes_start_time.get_lock():
            self._processes_start_time.value = start_time
            self._processes_init_event.set()

    async def get_processes_start_time(self) -> float:
        """
        Get the synchronized start time, waiting if not yet set.

        Blocks until the main process sets the start time via init_processes_start,
        enabling synchronized request scheduling across all workers. The value is
        cached on first read.

        :return: Unix timestamp when request processing began
        :raises RuntimeError: If called before init_processes_timings
        """
        if self._processes_init_event is None:
            raise RuntimeError(
                "SchedulingStrategy get_processes_start_time called before "
                "init_processes_timings"
            )
        if self._processes_start_time is None:
            raise RuntimeError(
                "_processes_init_event is not None but _processes_start_time is None"
            )

        if self._cached_processes_start_time is None:
            # Event.wait blocks, so run it in a thread to keep the event loop free.
            await asyncio.to_thread(self._processes_init_event.wait)
            self._cached_processes_start_time = self._processes_start_time.value

        return self._cached_processes_start_time

    def next_request_index(self) -> PositiveInt:
        """
        Get the next sequential request index across all worker processes.

        Thread-safe counter providing globally unique indices for request timing
        calculations in distributed environments. The counter is pre-incremented,
        so with its initial value of 0 the first index returned is 1.

        :return: Globally unique request index for timing calculations
        :raises RuntimeError: If called before init_processes_timings
        """
        if self._processes_request_index is None:
            raise RuntimeError(
                "SchedulingStrategy next_request_index called before "
                "init_processes_timings"
            )

        with self._processes_request_index.get_lock():
            self._processes_request_index.value += 1
            return self._processes_request_index.value

    @abstractmethod
    async def next_request_time(self, worker_index: NonNegativeInt) -> float:
        """
        Calculate the scheduled start time for the next request.

        Strategy-specific implementation determining when requests should be
        processed based on timing patterns and worker distribution.

        :param worker_index: Worker process index for distributing request timing
        :return: Unix timestamp when the request should be processed
        """

    @abstractmethod
    def request_completed(self, request_info: RequestInfo):
        """
        Handle request completion and update internal timing state.

        Strategy-specific handling of completed requests to maintain timing
        coordination and schedule subsequent requests.

        :param request_info: Completed request metadata including timing details
            and completion status
        """

    async def resolve_dequeued_target_start(
        self,
        worker_index: NonNegativeInt,
        provisional_start: float,
        settings: RequestSettings,
    ) -> float:
        """
        Resolve scheduled start time after dequeue using per-request settings.

        Default returns ``provisional_start`` unchanged. Strategies with
        enqueue-bound timing metadata can override to reinterpret settings.

        :param worker_index: Worker process index handling the request
        :param provisional_start: Start time from the worker's scheduling slot
        :param settings: Per-request scheduling metadata attached at enqueue
        :return: Unix timestamp when the request should begin processing
        """
        _ = (worker_index, settings)
        return provisional_start

    def requeue_delay(self) -> float:
        """
        Calculate delay before requeuing a conversation.

        Default implementation returns zero delay. Strategies can override
        to implement custom requeue timing logic.

        :return: Delay in seconds before the conversation should be requeued.
        """
        return 0.0

processes_limit property

Get the maximum number of worker processes supported by this strategy.

Returns:

Type Description
PositiveInt | None

Maximum number of worker processes, None if unlimited

requests_limit property

Get the maximum number of concurrent requests supported by this strategy.

Returns:

Type Description
PositiveInt | None

Maximum number of concurrent requests, None if unlimited

get_processes_start_time() async

Get the synchronized start time, waiting if not yet set.

Blocks until the main process sets the start time via init_processes_start, enabling synchronized request scheduling across all workers.

Returns:

Type Description
float

Unix timestamp when request processing began

Raises:

Type Description
RuntimeError

If called before init_processes_timings

Source code in src/guidellm/scheduler/strategies.py
async def get_processes_start_time(self) -> float:
    """
    Get the synchronized start time, waiting if not yet set.

    Blocks until the main process sets the start time via init_processes_start,
    enabling synchronized request scheduling across all workers. The value is
    cached on first read, so later calls return it without waiting.

    :return: Unix timestamp when request processing began
    :raises RuntimeError: If called before init_processes_timings
    """
    if self._processes_init_event is None:
        raise RuntimeError(
            "SchedulingStrategy get_processes_start_time called before "
            "init_processes_timings"
        )
    if self._processes_start_time is None:
        raise RuntimeError(
            "_processes_init_event is not None but _processes_start_time is None"
        )

    if self._cached_processes_start_time is None:
        # Event.wait blocks, so run it in a thread to keep the event loop free.
        await asyncio.to_thread(self._processes_init_event.wait)
        self._cached_processes_start_time = self._processes_start_time.value

    return self._cached_processes_start_time

init_processes_start(start_time)

Set the synchronized start time for all worker processes.

Updates shared state with the benchmark start time to coordinate request scheduling across all workers.

Parameters:

Name Type Description Default
start_time float

Unix timestamp when request processing should begin

required

Raises:

Type Description
RuntimeError

If called before init_processes_timings

Source code in src/guidellm/scheduler/strategies.py
def init_processes_start(self, start_time: float):
    """
    Set the synchronized start time for all worker processes.

    Updates shared state with the benchmark start time to coordinate request
    scheduling across all workers.

    :param start_time: Unix timestamp when request processing should begin
    :raises RuntimeError: If called before init_processes_timings
    """
    if self._processes_init_event is None:
        raise RuntimeError(
            "SchedulingStrategy init_processes_start called before "
            "init_processes_timings"
        )
    if self._processes_start_time is None:
        raise RuntimeError(
            "_processes_init_event is not None but _processes_start_time is None"
        )

    # Write the value before setting the event so that waiters never observe
    # the placeholder once the event is set.
    with self._processes_start_time.get_lock():
        self._processes_start_time.value = start_time
        self._processes_init_event.set()

init_processes_timings(worker_count, max_concurrency, mp_context)

Initialize shared timing state for multi-process coordination.

Sets up synchronized counters and locks for coordinating request timing across distributed worker processes.

Parameters:

Name Type Description Default
worker_count PositiveInt

Number of worker processes to coordinate

required
max_concurrency PositiveInt

Maximum number of concurrent requests allowed

required
Source code in src/guidellm/scheduler/strategies.py
def init_processes_timings(
    self,
    worker_count: PositiveInt,
    max_concurrency: PositiveInt,
    mp_context: BaseContext,
):
    """
    Record worker/concurrency limits and create the cross-process timing state.

    :param worker_count: How many worker processes will share this strategy
    :param max_concurrency: Upper bound on simultaneously processed requests
    :param mp_context: Multiprocessing context whose ``Event`` and ``Value``
        factories build the shared primitives
    """
    self.worker_count, self.max_concurrency = worker_count, max_concurrency

    # Set once the start time has been published.
    self._processes_init_event = mp_context.Event()
    # Shared C-int request counter, starting from zero.
    self._processes_request_index = mp_context.Value("i", 0)
    # C-double start time; -1.0 is a placeholder until it is published.
    self._processes_start_time = mp_context.Value("d", -1.0)

next_request_index()

Get the next sequential request index across all worker processes.

Thread-safe counter providing globally unique indices for request timing calculations in distributed environments.

Returns:

Type Description
PositiveInt

Globally unique request index for timing calculations

Raises:

Type Description
RuntimeError

If called before init_processes_timings

Source code in src/guidellm/scheduler/strategies.py
def next_request_index(self) -> PositiveInt:
    """
    Claim the next value of the request counter shared by all workers.

    The counter is incremented and read while holding its own lock, so callers
    going through this method never receive the same index. Because the counter
    is pre-incremented, a counter initialised to 0 yields 1 on the first call.

    :return: The newly claimed request index
    :raises RuntimeError: If init_processes_timings has not been called yet
    """
    counter = self._processes_request_index
    if counter is None:
        raise RuntimeError(
            "SchedulingStrategy next_request_index called before "
            "init_processes_timings"
        )

    with counter.get_lock():
        counter.value += 1
        claimed = counter.value
    return claimed

next_request_time(worker_index) abstractmethod async

Calculate the scheduled start time for the next request.

Strategy-specific implementation determining when requests should be processed based on timing patterns and worker distribution.

Parameters:

Name Type Description Default
worker_index NonNegativeInt

Worker process index for distributing request timing

required

Returns:

Type Description
float

Unix timestamp when the request should be processed

Source code in src/guidellm/scheduler/strategies.py
@abstractmethod
async def next_request_time(self, worker_index: NonNegativeInt) -> float:
    """
    Calculate the scheduled start time for the next request.

    Strategy-specific implementation determining when requests should be
    processed based on timing patterns and worker distribution. This base
    declaration has no body beyond its docstring; concrete strategies must
    override it.

    :param worker_index: Worker process index for distributing request timing
    :return: Unix timestamp when the request should be processed
    """

request_completed(request_info) abstractmethod

Handle request completion and update internal timing state.

Strategy-specific handling of completed requests to maintain timing coordination and schedule subsequent requests.

Parameters:

Name Type Description Default
request_info RequestInfo

Completed request metadata including timing details and completion status

required
Source code in src/guidellm/scheduler/strategies.py
@abstractmethod
def request_completed(self, request_info: RequestInfo):
    """
    Handle request completion and update internal timing state.

    Strategy-specific handling of completed requests to maintain timing
    coordination and schedule subsequent requests. This base declaration has
    no body beyond its docstring; concrete strategies must override it.

    :param request_info: Completed request metadata including timing details
        and completion status
    """

requeue_delay()

Calculate delay before requeuing a conversation.

Default implementation returns zero delay. Strategies can override to implement custom requeue timing logic.

Returns:

Type Description
float

Delay in seconds before the conversation should be requeued.

Source code in src/guidellm/scheduler/strategies.py
def requeue_delay(self) -> float:
    """
    Seconds to wait before a conversation is placed back on the queue.

    The base strategy never delays a requeue; subclasses may override this
    hook to add their own pacing.

    :return: Always ``0.0`` for the base implementation.
    """
    no_delay = 0.0
    return no_delay

resolve_dequeued_target_start(worker_index, provisional_start, settings) async

Resolve scheduled start time after dequeue using per-request settings.

Default returns provisional_start unchanged. Strategies with enqueue-bound timing metadata can override to reinterpret settings.

Parameters:

Name Type Description Default
worker_index NonNegativeInt

Worker process index handling the request

required
provisional_start float

Start time from the worker's scheduling slot

required
settings RequestSettings

Per-request scheduling metadata attached at enqueue

required

Returns:

Type Description
float

Unix timestamp when the request should begin processing

Source code in src/guidellm/scheduler/strategies.py
async def resolve_dequeued_target_start(
    self,
    worker_index: NonNegativeInt,
    provisional_start: float,
    settings: RequestSettings,
) -> float:
    """
    Finalize a dequeued request's start time from its per-request settings.

    The base hook is a pass-through: neither the worker index nor the settings
    influence the result. Strategies that attach timing metadata at enqueue
    time can override this to reinterpret it.

    :param worker_index: Index of the worker process that dequeued the request
    :param provisional_start: Start time derived from the worker's schedule slot
    :param settings: Scheduling metadata attached to the request at enqueue
    :return: ``provisional_start``, unchanged
    """
    del worker_index, settings  # not needed by the default behaviour
    resolved = provisional_start
    return resolved

SerializableConstraintInitializer

Bases: Protocol

Protocol for serializable constraint initializers supporting persistence.

Extends ConstraintInitializer with serialization capabilities, enabling constraint configurations to be saved, loaded, and transmitted. Serializable initializers support validation, model-based configuration, and dictionary-based serialization for integration with configuration systems and persistence layers.

Example:

    class SerializableInitializer:
        @classmethod
        def model_validate(cls, data: dict) -> ConstraintInitializer:
            return cls(**data)

        def model_dump(self) -> dict[str, Any]:
            return {"type_": "max_requests", "max_requests": self.max_requests}

        def create_constraint(self) -> Constraint:
            ...  # create constraint
Source code in src/guidellm/scheduler/constraints/constraint.py
@runtime_checkable
class SerializableConstraintInitializer(Protocol):
    """
    Protocol for serializable constraint initializers supporting persistence.

    Extends ConstraintInitializer with serialization capabilities, enabling constraint
    configurations to be saved, loaded, and transmitted. Serializable initializers
    support validation, model-based configuration, and dictionary-based serialization
    for integration with configuration systems and persistence layers.

    Because the protocol is ``runtime_checkable``, ``isinstance`` checks verify only
    that ``model_validate``, ``model_dump`` and ``create_constraint`` exist; their
    signatures are not checked.

    Example:
    ::

        class SerializableInitializer:
            @classmethod
            def model_validate(cls, data: dict) -> ConstraintInitializer:
                return cls(**data)

            def model_dump(self) -> dict[str, Any]:
                return {"type_": "max_requests", "max_requests": self.max_requests}

            def create_constraint(self) -> Constraint:
                ...  # create constraint
    """

    @classmethod
    def model_validate(cls, **kwargs) -> ConstraintInitializer:
        """
        Create validated constraint initializer from configuration.

        NOTE(review): declared with ``**kwargs`` here, while the class example
        (and pydantic's ``BaseModel.model_validate``) take a single positional
        mapping -- confirm which call form callers rely on.

        :param kwargs: Configuration values for initializer creation
        :return: Validated constraint initializer instance
        """

    def model_dump(self) -> dict[str, Any]:
        """
        Serialize constraint initializer to dictionary format.

        :return: Dictionary representation of constraint initializer
        """

    def create_constraint(self, **kwargs) -> Constraint:
        """
        Create constraint instance from this initializer.

        :param kwargs: Additional configuration parameters
        :return: Configured constraint evaluation function
        """

create_constraint(**kwargs)

Create constraint instance from this initializer.

Parameters:

Name Type Description Default
kwargs

Additional configuration parameters

{}

Returns:

Type Description
Constraint

Configured constraint evaluation function

Source code in src/guidellm/scheduler/constraints/constraint.py
def create_constraint(self, **kwargs) -> Constraint:
    """
    Create constraint instance from this initializer.

    Protocol stub: this declaration has no body beyond its docstring;
    implementers supply the actual construction logic.

    :param kwargs: Additional configuration parameters
    :return: Configured constraint evaluation function
    """

model_dump()

Serialize constraint initializer to dictionary format.

Returns:

Type Description
dict[str, Any]

Dictionary representation of constraint initializer

Source code in src/guidellm/scheduler/constraints/constraint.py
def model_dump(self) -> dict[str, Any]:
    """
    Serialize constraint initializer to dictionary format.

    Protocol stub: this declaration has no body beyond its docstring;
    implementers provide the serialization.

    :return: Dictionary representation of constraint initializer
    """

model_validate(**kwargs) classmethod

Create validated constraint initializer from configuration.

Parameters:

Name Type Description Default
kwargs

Configuration dictionary for initializer creation

{}

Returns:

Type Description
ConstraintInitializer

Validated constraint initializer instance

Source code in src/guidellm/scheduler/constraints/constraint.py
@classmethod
def model_validate(cls, **kwargs) -> ConstraintInitializer:
    """
    Create validated constraint initializer from configuration.

    Protocol stub: this declaration has no body beyond its docstring.
    NOTE(review): declared with ``**kwargs`` while the protocol's example
    passes a single positional dict -- confirm the expected call form.

    :param kwargs: Configuration values for initializer creation
    :return: Validated constraint initializer instance
    """

SynchronousStrategy

Bases: SchedulingStrategy

Sequential request processing with strict single-request-at-a-time execution.

Processes requests one at a time in strict sequential order, providing predictable timing behavior ideal for measuring maximum sequential throughput and ensuring complete request isolation. Each request completes before the next begins.

Source code in src/guidellm/scheduler/strategies.py
@SchedulingStrategy.register("synchronous")
class SynchronousStrategy(SchedulingStrategy):
    """
    Strategy that issues requests strictly one after another.

    It reports a limit of one process and one concurrent request. Each request
    is scheduled at the completion time of the previous one, or at the shared
    start time when nothing has completed yet. This isolates requests and
    measures sequential throughput.
    """

    type_: Literal["synchronous"] = "synchronous"  # type: ignore[assignment]
    _process_last_request_time: float | None = PrivateAttr(None)

    def __str__(self) -> str:
        """
        :return: The fixed label for this strategy
        """
        label = "synchronous"
        return label

    @property
    def processes_limit(self) -> PositiveInt:
        """
        :return: 1, since only a single worker process is allowed
        """
        single_process = 1
        return single_process

    @property
    def requests_limit(self) -> PositiveInt:
        """
        :return: 1, since only a single in-flight request is allowed
        """
        single_request = 1
        return single_request

    async def next_request_time(self, worker_index: NonNegativeInt) -> float:
        """
        Schedule the next request right when the previous one finished.

        :param worker_index: Ignored; there is only one worker
        :return: Last recorded completion time, or the shared start time if no
            completion has been recorded yet
        """
        del worker_index  # irrelevant with a single worker

        last_completed = self._process_last_request_time
        if last_completed is None:
            return await self.get_processes_start_time()
        return last_completed

    def request_completed(self, request_info: RequestInfo):
        """
        Remember when a request finished so the next one can follow it.

        :param request_info: Metadata of the finished request; ignored when it
            carries no completion timestamp
        """
        finished_at = request_info.completed_at
        if finished_at is None:
            return
        self._process_last_request_time = finished_at

processes_limit property

Returns:

Type Description
PositiveInt

Always 1 to enforce single-process constraint

requests_limit property

Returns:

Type Description
PositiveInt

Always 1 to enforce single-request constraint

__str__()

Returns:

Type Description
str

String identifier for synchronous strategy

Source code in src/guidellm/scheduler/strategies.py
def __str__(self) -> str:
    """
    :return: The fixed label used for the synchronous strategy
    """
    label = "synchronous"
    return label

next_request_time(worker_index) async

Calculate next request time based on previous completion.

Parameters:

Name Type Description Default
worker_index NonNegativeInt

Unused for synchronous strategy

required

Returns:

Type Description
float

Time of last completion or start time if first request

Source code in src/guidellm/scheduler/strategies.py
async def next_request_time(self, worker_index: NonNegativeInt) -> float:
    """
    Schedule the next request right when the previous one finished.

    :param worker_index: Ignored; the synchronous strategy has a single worker
    :return: Last recorded completion time, or the shared start time when no
        completion has been recorded yet
    """
    del worker_index  # irrelevant with a single worker

    last_completed = self._process_last_request_time
    if last_completed is None:
        return await self.get_processes_start_time()
    return last_completed

request_completed(request_info)

Update timing state with completed request information.

Parameters:

Name Type Description Default
request_info RequestInfo

Completed request metadata including timing

required
Source code in src/guidellm/scheduler/strategies.py
def request_completed(self, request_info: RequestInfo):
    """
    Remember when a request finished so the next one can be scheduled after it.

    :param request_info: Metadata of the finished request; ignored when it has
        no completion timestamp
    """
    finished_at = request_info.completed_at
    if finished_at is None:
        return
    self._process_last_request_time = finished_at

ThroughputStrategy

Bases: SchedulingStrategy

Maximum throughput scheduling with optional concurrency limits.

Schedules requests to maximize system throughput by allowing unlimited concurrent processing with optional constraints. Supports startup ramping to gradually distribute initial requests for controlled system ramp-up.

Source code in src/guidellm/scheduler/strategies.py
@SchedulingStrategy.register("throughput")
class ThroughputStrategy(SchedulingStrategy):
    """
    Strategy that pushes requests as fast as allowed to maximize throughput.

    Without ``max_concurrency`` there is no process or request limit. With both
    ``max_concurrency`` and a positive ``rampup_duration``, the first requests
    are spread linearly over the ramp-up window. Every later request is
    scheduled at the end of that window.
    """

    type_: Literal["throughput"] = "throughput"  # type: ignore[assignment]
    max_concurrency: PositiveInt | None = Field(
        default=None,
        description="Maximum number of concurrent requests to schedule",
    )
    rampup_duration: NonNegativeFloat = Field(
        default=0.0,
        description=(
            "Duration in seconds to spread initial requests up to max_concurrency "
            "at the beginning of each strategy run"
        ),
    )

    def __str__(self) -> str:
        """
        :return: ``throughput@<limit>``, using ``unlimited`` when no limit is set
        """
        limit = self.max_concurrency or "unlimited"
        return f"throughput@{limit}"

    @property
    def processes_limit(self) -> PositiveInt | None:
        """
        :return: The configured max concurrency, or None when unlimited
        """
        limit = self.max_concurrency
        return limit

    @property
    def requests_limit(self) -> PositiveInt | None:
        """
        :return: The configured max concurrency, or None when unlimited
        """
        limit = self.max_concurrency
        return limit

    async def next_request_time(self, worker_index: int) -> float:
        """
        Compute the start time for the next request, applying ramp-up if enabled.

        When ramping, the request with shared index ``k`` is delayed by
        ``rampup_duration * k / max_concurrency``, capped at ``rampup_duration``.

        :param worker_index: Ignored by the throughput strategy
        :return: The shared start time, plus the ramp-up delay when ramping
        """
        del worker_index  # scheduling does not depend on the worker
        start_time = await self.get_processes_start_time()

        ramping = self.max_concurrency is not None and self.rampup_duration > 0
        if not ramping:
            return start_time

        index = self.next_request_index()
        fraction = min(index / float(self.max_concurrency), 1.0)
        return start_time + self.rampup_duration * fraction

    def request_completed(self, request_info: RequestInfo):
        """
        No-op: completions do not affect throughput scheduling.

        :param request_info: Finished request metadata (ignored)
        """
        del request_info  # completion has no effect on this strategy

processes_limit property

Returns:

Type Description
PositiveInt | None

Max concurrency if set, otherwise None for unlimited

requests_limit property

Returns:

Type Description
PositiveInt | None

Max concurrency if set, otherwise None for unlimited

__str__()

Returns:

Type Description
str

String identifier for throughput strategy

Source code in src/guidellm/scheduler/strategies.py
def __str__(self) -> str:
    """
    :return: ``throughput@<limit>``, with ``unlimited`` when no limit is set
    """
    limit = self.max_concurrency or "unlimited"
    return f"throughput@{limit}"

next_request_time(worker_index) async

Calculate next request time with optional startup ramping.

Spreads initial requests linearly during rampup period, then schedules all subsequent requests immediately.

Parameters:

Name Type Description Default
worker_index int

Unused for throughput strategy

required

Returns:

Type Description
float

Immediate start or ramped start time during startup period

Source code in src/guidellm/scheduler/strategies.py
async def next_request_time(self, worker_index: int) -> float:
    """
    Compute when the next request may start, applying optional ramp-up.

    With both a concurrency cap and a positive ramp-up duration, the first
    ``max_concurrency`` requests are staggered linearly across
    ``rampup_duration`` seconds and every later request is placed at the end of
    the ramp. Otherwise every request is placed at the process start time.

    :param worker_index: Ignored by the throughput strategy
    :return: Absolute timestamp at which the request may start
    """
    del worker_index  # timing does not vary per worker for throughput
    base_time = await self.get_processes_start_time()

    ramping = self.max_concurrency is not None and self.rampup_duration > 0
    if not ramping:
        return base_time

    request_number = self.next_request_index()
    if request_number >= self.max_concurrency:
        # Past the ramp: everything else lands at the end of the ramp window
        return base_time + self.rampup_duration

    fraction = request_number / float(self.max_concurrency)
    return base_time + self.rampup_duration * fraction

request_completed(request_info)

Handle request completion (no-op for throughput strategy).

Parameters:

Name Type Description Default
request_info RequestInfo

Completed request metadata (unused)

required
Source code in src/guidellm/scheduler/strategies.py
def request_completed(self, request_info: RequestInfo):
    """
    Acknowledge a finished request without updating any state.

    The throughput strategy derives request times only from the process start
    time and the optional ramp-up, so completions carry nothing it needs.

    :param request_info: Metadata of the finished request (ignored)
    """
    del request_info  # nothing to record for throughput scheduling

TraceReplayStrategy

Bases: SchedulingStrategy

Replay scheduling from a trace of timestamps.

Each request carries a relative_timestamp in RequestSettings from the dataset finalizer. next_request_time schedules dequeue immediately at benchmark start; resolve_dequeued_target_start applies the trace offset via start_time + time_scale * relative_timestamp, reproducing inter-arrival timing under multiprocessing.

Source code in src/guidellm/scheduler/strategies.py
@SchedulingStrategy.register("trace")
class TraceReplayStrategy(SchedulingStrategy):
    """
    Scheduling strategy that replays request arrival times recorded in a trace.

    The dataset finalizer attaches a ``relative_timestamp`` to each request's
    ``RequestSettings``. Workers may dequeue as soon as the benchmark starts
    (``next_request_time``); the actual start of each request is resolved after
    dequeue by ``resolve_dequeued_target_start`` as
    ``start_time + time_scale * relative_timestamp``, which keeps the trace's
    inter-arrival gaps intact when requests are spread across processes.
    """

    type_: Literal["trace"] = "trace"  # type: ignore[assignment]
    # Multiplier applied to every relative timestamp; must be strictly positive
    time_scale: float = Field(
        default=1.0,
        gt=0,
        description="Scale factor applied to relative timestamps from the dataset",
    )

    def __str__(self) -> str:
        """
        :return: Identifier of the form ``trace@<time_scale>`` with the scale
            rendered to two decimal places
        """
        return "trace@" + format(self.time_scale, ".2f")

    @property
    def processes_limit(self) -> PositiveInt | None:
        """
        :return: Always None; trace replay imposes no process limit
        """
        return None

    @property
    def requests_limit(self) -> PositiveInt | None:
        """
        :return: Always None; trace replay imposes no request limit
        """
        return None

    async def next_request_time(self, worker_index: NonNegativeInt) -> float:
        """
        Allow every dequeue at the process start time.

        Per-request trace offsets are applied afterwards by
        ``resolve_dequeued_target_start``.

        :param worker_index: Ignored; timing does not depend on the worker
        :return: The value reported by ``get_processes_start_time``
        """
        del worker_index
        return await self.get_processes_start_time()

    async def resolve_dequeued_target_start(
        self,
        worker_index: NonNegativeInt,
        provisional_start: float,
        settings: RequestSettings,
    ) -> float:
        """
        Translate a dequeued request's trace offset into an absolute start time.

        :param worker_index: Ignored
        :param provisional_start: Ignored; the trace offset replaces it
        :param settings: Request settings carrying ``relative_timestamp``
        :return: ``start_time + time_scale * relative_timestamp``, or the bare
            start time when the request has no relative timestamp
        """
        del worker_index, provisional_start
        offset = settings.relative_timestamp
        start_time = await self.get_processes_start_time()
        if offset is None:
            return start_time
        return start_time + self.time_scale * offset

    def request_completed(self, request_info: RequestInfo):
        """
        Acknowledge a finished request; trace replay keeps no completion state.

        :param request_info: Metadata of the finished request (ignored)
        """
        del request_info

UnserializableConstraintInitializer

Bases: PydanticConstraintInitializer

Placeholder for constraints that cannot be serialized or executed.

Represents constraint initializers that failed serialization or contain non-serializable components. Cannot be executed and raises errors when invoked to prevent runtime failures from invalid constraint state. Used by the factory system to preserve constraint information even when full serialization is not possible.

Example:

    # Created automatically by factory when serialization fails
    unserializable = UnserializableConstraintInitializer(
        orig_info={"type_": "custom", "data": non_serializable_object}
    )

    # Attempting to use it raises RuntimeError
    constraint = unserializable.create_constraint()  # Raises RuntimeError

Attributes:

Name Type Description
type_ Literal['unserializable']

Always "unserializable" to identify placeholder constraints

orig_info dict[str, Any]

Original constraint information before serialization failure

Source code in src/guidellm/scheduler/constraints/constraint.py
class UnserializableConstraintInitializer(PydanticConstraintInitializer):
    """
    Stand-in for a constraint initializer that could not be serialized.

    The factory system uses this placeholder to keep the original constraint
    information (``orig_info``) when full serialization is impossible. The
    placeholder itself is inert. Building a constraint from it, or invoking it
    directly, raises ``RuntimeError`` rather than running with invalid state.

    Example:
    ::
        # Created automatically by factory when serialization fails
        unserializable = UnserializableConstraintInitializer(
            orig_info={"type_": "custom", "data": non_serializable_object}
        )

        # Attempting to use it raises RuntimeError
        constraint = unserializable.create_constraint()  # Raises RuntimeError

    :cvar type_: Fixed to "unserializable" to mark placeholder constraints
    :cvar orig_info: Constraint information captured before serialization failed
    """

    type_: Literal["unserializable"] = "unserializable"  # type: ignore[assignment]
    # Whatever could be recovered about the original constraint; empty by default
    orig_info: dict[str, Any] = Field(
        default_factory=dict,
        description="Original constraint information before serialization failure",
    )

    def create_constraint(self, **_kwargs) -> Constraint:
        """
        Refuse to build a constraint from this placeholder.

        :param _kwargs: Additional keyword arguments (ignored)
        :raises RuntimeError: Always, since an unserializable constraint cannot
            be executed
        """
        error_text = (
            "Cannot create constraint from unserializable constraint instance. "
            "This constraint cannot be serialized and therefore cannot be executed."
        )
        raise RuntimeError(error_text)

    def __call__(
        self, state: SchedulerState, request: RequestInfo
    ) -> SchedulerUpdateAction:
        """
        Refuse invocation, since a placeholder constraint has no executable logic.

        :param state: Scheduler state snapshot (ignored)
        :param request: Request metadata (ignored)
        :raises RuntimeError: Unconditionally, on every call
        """
        del state, request  # a placeholder never inspects its inputs
        error_text = (
            "Cannot invoke unserializable constraint instance. "
            "This constraint was not properly serialized and cannot be executed."
        )
        raise RuntimeError(error_text)

__call__(state, request)

Raise error since unserializable constraints cannot be invoked.

Parameters:

Name Type Description Default
state SchedulerState

Current scheduler state (unused)

required
request RequestInfo

Individual request information (unused)

required

Raises:

Type Description
RuntimeError

Always raised for unserializable constraints

Source code in src/guidellm/scheduler/constraints/constraint.py
def __call__(
    self, state: SchedulerState, request: RequestInfo
) -> SchedulerUpdateAction:
    """
    Refuse invocation, since a placeholder constraint has no executable logic.

    :param state: Scheduler state snapshot (ignored)
    :param request: Request metadata (ignored)
    :raises RuntimeError: Unconditionally, on every call
    """
    del state, request  # a placeholder never inspects its inputs
    error_text = (
        "Cannot invoke unserializable constraint instance. "
        "This constraint was not properly serialized and cannot be executed."
    )
    raise RuntimeError(error_text)

create_constraint(**_kwargs)

Raise error for unserializable constraint creation attempt.

Parameters:

Name Type Description Default
_kwargs

Additional keyword arguments (unused)

required

Raises:

Type Description
RuntimeError

Always raised since unserializable constraints cannot be executed

Source code in src/guidellm/scheduler/constraints/constraint.py
def create_constraint(self, **_kwargs) -> Constraint:
    """
    Refuse to build a constraint from this placeholder.

    :param _kwargs: Additional keyword arguments (ignored)
    :raises RuntimeError: Always, since an unserializable constraint cannot be
        executed
    """
    error_text = (
        "Cannot create constraint from unserializable constraint instance. "
        "This constraint cannot be serialized and therefore cannot be executed."
    )
    raise RuntimeError(error_text)

WorkerProcess

Bases: Generic[RequestT, ResponseT]

Worker process for distributed request execution in the scheduler system.

Manages complete request lifecycle including queue consumption, backend processing, timing strategy application, and status publication. Coordinates with other workers through synchronization primitives while maintaining concurrency limits and handling graceful shutdown scenarios including errors and cancellations.

Example:

    worker = WorkerProcess(
        worker_index=0,
        messaging=messaging_interface,
        backend=backend_instance,
        strategy=timing_strategy,
        async_limit=10,
        fut_scheduling_time_limit=5.0,
        startup_barrier=barrier,
        requests_generated_event=generated_event,
        constraint_reached_event=constraint_event,
        shutdown_event=shutdown,
        error_event=error,
    )
    worker.run()

Source code in src/guidellm/scheduler/worker.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
class WorkerProcess(Generic[RequestT, ResponseT]):
    """
    Worker process for distributed request execution in the scheduler system.

    Manages complete request lifecycle including queue consumption, backend processing,
    timing strategy application, and status publication. Coordinates with other workers
    through synchronization primitives while maintaining concurrency limits and handling
    graceful shutdown scenarios including errors and cancellations.

    Example:
    ::
        worker = WorkerProcess(
            worker_index=0,
            messaging=messaging_interface,
            backend=backend_instance,
            strategy=timing_strategy,
            async_limit=10,
            fut_scheduling_time_limit=5.0,
            startup_barrier=barrier,
            requests_generated_event=generated_event,
            constraint_reached_event=constraint_event,
            shutdown_event=shutdown,
            error_event=error,
        )
        worker.run()
    """

    def __init__(
        self,
        worker_index: int,
        messaging: InterProcessMessaging[
            tuple[ResponseT | None, RequestT, RequestInfo],
            ConversationT[RequestT],
        ],
        backend: BackendInterface[RequestT, ResponseT],
        strategy: SchedulingStrategy,
        async_limit: int,
        fut_scheduling_time_limit: float,
        startup_barrier: ProcessingBarrier,
        requests_generated_event: ProcessingEvent,
        constraint_reached_event: ProcessingEvent,
        shutdown_event: ProcessingEvent,
        error_event: ProcessingEvent,
    ):
        """
        Initialize worker process instance.

        :param worker_index: Unique identifier for this worker within the process group
        :param messaging: Inter-process messaging interface for request coordination
        :param backend: Backend interface for processing requests
        :param strategy: Scheduling strategy for determining request timing
        :param async_limit: Maximum concurrent requests this worker can process
        :param fut_scheduling_time_limit: Maximum time in seconds to schedule requests
            into the future
        :param startup_barrier: Synchronization barrier for coordinated startup
        :param requests_generated_event: Event signaling request generation completion
        :param constraint_reached_event: Event signaling processing constraint reached
        :param shutdown_event: Event signaling graceful shutdown request
        :param error_event: Event signaling error conditions across processes
        """
        self.worker_index = worker_index
        self.messaging = messaging
        self.backend = backend
        self.strategy = strategy
        self.async_limit = async_limit
        self.fut_scheduling_time_limit = fut_scheduling_time_limit
        self.startup_barrier = startup_barrier
        self.requests_generated_event = requests_generated_event
        self.constraint_reached_event = constraint_reached_event
        self.shutdown_event = shutdown_event
        self.error_event = error_event

        # Internal states
        self.startup_completed = False
        self.backend_started = False
        self.messaging_started = False
        # Multi-turn conversations waiting for their next turn: each entry pairs
        # the turns already completed (history) with the turns still to run.
        self.turns_queue: list[
            tuple[HistoryT[RequestT, ResponseT], ConversationT[RequestT]]
        ] = []

    def run(self):
        """
        Main entry point for worker process execution.

        Initializes asyncio event loop with optional uvloop optimization and executes
        worker async operations. Handles event loop cleanup and error propagation.

        :raises RuntimeError: If worker encounters unrecoverable error during execution
        """
        try:
            if HAS_UVLOOP:
                asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
            asyncio.run(self.run_async())
        except Exception as err:
            self.error_event.set()
            raise RuntimeError(
                f"Worker process {self.messaging.worker_index} encountered an "
                f"error: {err}"
            ) from err

    async def run_async(self):
        """
        Execute main asynchronous worker process logic.

        Orchestrates concurrent execution of request processing and shutdown monitoring.
        Handles task cleanup, error propagation, and cancellation coordination when any
        task completes or encounters an error.

        :raises RuntimeError: If worker tasks encounter unrecoverable errors
        :raises asyncio.CancelledError: If worker process was cancelled
        """
        stop_task = asyncio.create_task(self._stop_monitor())
        request_proc_task = asyncio.create_task(self._process_requests())
        caller_cancelled = False

        try:
            await asyncio.wait(
                [stop_task, request_proc_task],
                return_when=asyncio.FIRST_COMPLETED,
            )
        except asyncio.CancelledError:
            caller_cancelled = True

        stop_task.cancel()
        request_proc_task.cancel()

        try:
            # Ensure all child tasks cancel correctly
            await asyncio.wait(
                [stop_task, request_proc_task], return_when=asyncio.ALL_COMPLETED
            )
        except asyncio.CancelledError:
            caller_cancelled = True

        # Report the first real error, preferring the request-processing task.
        # Tasks that were cancelled, or that are not done yet (possible when the
        # caller cancelled us during the wait above), are skipped because
        # Task.exception() raises on them.
        task_err: BaseException | None = None
        for task in (request_proc_task, stop_task):
            if task.done() and not task.cancelled():
                task_err = task.exception()
                if task_err is not None:
                    break

        if task_err is not None:
            raise RuntimeError(
                f"Worker process {self.messaging.worker_index} encountered an "
                f"error: {task_err}"
            ) from task_err

        if caller_cancelled:
            raise asyncio.CancelledError("Worker process was cancelled")

    async def _stop_monitor(
        self,
    ) -> None:
        """
        Monitor shutdown and error events for worker termination.

        Returns normally when the shutdown event fires.

        :raises RuntimeError: If the worker process received an error signal
        """
        exit_key = await wait_for_sync_objects(
            {
                "error_event": self.error_event,
                "shutdown_event": self.shutdown_event,
            },
            poll_interval=self.messaging.poll_interval,
        )

        if exit_key == "error_event":
            raise RuntimeError(
                f"Worker process {self.messaging.worker_index} received error signal."
            )

    async def _process_requests(self):
        """
        Manage request processing lifecycle from startup to shutdown.

        Coordinates startup synchronization, processes requests until constraints are
        reached, then cancels pending requests until shutdown or error occurs.
        """
        try:
            # 1. Start up synchronization (backend, messaging, and other processes)
            # 2. Messaging startup, receive requests until requests_generated event
            await self._processing_startup()

            # 3. Run process requests loop until constraint_reached event
            processing_task = asyncio.create_task(self._process_requests_loop())
            await wait_for_sync_event(
                self.constraint_reached_event,
                poll_interval=self.messaging.poll_interval,
            )
            processing_task.cancel()

            # 4. Cancel pending requests until proc canceled (manual, shutdown, error)
            await self._cancel_requests_loop()
        finally:
            # 5. On cancel, shut down event, error event, or internal error:
            #    attempt to shut down this worker cleanly (stop backend and messaging)
            await self._processing_shutdown()

    async def _processing_startup(self):
        """Initialize backend, messaging, and synchronize with other workers."""
        # Get backend ready
        await self.backend.process_startup()
        self.backend_started = True
        await self.backend.validate()

        # Get messaging system ready
        await self.messaging.start(
            receive_stop_criteria=[self.requests_generated_event]
        )
        self.messaging_started = True

        # Wait for all processes to be ready
        await wait_for_sync_barrier(
            self.startup_barrier,
            poll_interval=self.messaging.poll_interval,
        )

        self.startup_completed = True

    async def _processing_shutdown(self):
        """Stop the backend and messaging if they were started, then reset state."""
        if self.backend_started:
            await self.backend.process_shutdown()
            self.backend_started = False

        if self.messaging_started:
            await self.messaging.stop()
            self.messaging_started = False

        self.startup_completed = False

    async def _process_requests_loop(self):
        """
        Process requests continuously until cancelled with concurrency limits.

        Schedules and processes requests according to the timing strategy while
        maintaining the configured concurrency limit through semaphore coordination.
        Conversations with remaining turns are requeued after the strategy's
        requeue delay.

        :raises asyncio.CancelledError: After cancelling and awaiting all in-flight
            request and requeue tasks
        """
        async_semaphore = asyncio.Semaphore(self.async_limit)
        pending_tasks: set[asyncio.Task] = set()

        def _task_done(task: asyncio.Task[ProcessRequestT[RequestT, ResponseT]]):
            pending_tasks.discard(task)
            async_semaphore.release()

            if not task.cancelled():
                if exception := task.exception():
                    # NOTE(review): an exception raised from a done-callback goes
                    # to the event loop's exception handler, not into this loop.
                    raise exception

                history, conversation, _ = task.result()
                if conversation:
                    requeue_task = asyncio.create_task(
                        self._wait_then_requeue(
                            history,
                            conversation,
                            self.strategy.requeue_delay(),
                        )
                    )
                    pending_tasks.add(requeue_task)
                    # Drop finished requeue tasks so the set does not grow without
                    # bound over long multi-turn runs.
                    requeue_task.add_done_callback(pending_tasks.discard)

        try:
            # Main loop; loop until canceled
            while True:
                await async_semaphore.acquire()
                request_time = await self.strategy.next_request_time(
                    worker_index=self.worker_index
                )

                if (
                    time_until := request_time - time.time()
                ) >= self.fut_scheduling_time_limit:
                    # Don't dequeue too far ahead of the target start time
                    await asyncio.sleep(time_until - self.fut_scheduling_time_limit)

                request_task = asyncio.create_task(
                    self._process_next_request(target_start=request_time)
                )
                pending_tasks.add(request_task)
                request_task.add_done_callback(_task_done)
        except asyncio.CancelledError:
            # Snapshot: done-callbacks may mutate the set while we wind down
            in_flight = list(pending_tasks)
            for task in in_flight:
                task.cancel()
            await asyncio.gather(*in_flight, return_exceptions=True)

            raise

    async def _cancel_requests_loop(self):
        """Cancel all remaining queued requests until worker process terminates."""
        while True:
            try:
                _, conversation = (
                    self.turns_queue.pop(0)
                    if self.turns_queue
                    else (
                        None,
                        await self.messaging.get(timeout=self.messaging.poll_interval),
                    )
                )
            except asyncio.TimeoutError:
                continue

            for request, request_info in conversation:
                request_info.scheduler_node_id = self._scheduler_node_id()
                request_info.error = "Request was cancelled"
                request_info.timings.resolve_end = time.time()
                self._send_update("cancelled", None, request, request_info)

    async def _process_next_request(  # noqa: C901
        self, target_start: float
    ) -> ProcessRequestT[RequestT, ResponseT]:
        """
        Process a single request from queue to completion.

        Retrieves the next turn (from the local multi-turn queue or the messaging
        queue), applies the timing strategy, processes it through the backend, and
        publishes status updates throughout the lifecycle. The strategy's
        ``request_completed`` hook runs for every dequeued request, whether it
        completed, errored, or was cancelled. Non-cancellation exceptions are
        recorded on the request as an "errored" update rather than re-raised.

        :param target_start: Unix timestamp when request should begin processing
        :return: Tuple of (history, with this turn appended on success; remaining
            turns of the conversation, emptied on premature exit; info of the
            processed request, or None if nothing was dequeued)
        :raises asyncio.CancelledError: Re-raised after publishing cancellation
            updates for this request and its remaining turns
        """
        conversation: ConversationT[RequestT] = []
        history: HistoryT[RequestT, ResponseT] = []
        request: RequestT | None = None
        request_info: RequestInfo | None = None
        response: ResponseT | None = None
        premature_exit: bool = False

        try:
            # Pull request from the queue, update state, and send "pending" update
            history, conversation = await self._dequeue_next_conversation(target_start)
            request, request_info = conversation.pop(0)

            effective_target_start = await self.strategy.resolve_dequeued_target_start(
                self.worker_index,
                target_start,
                request_info.settings,
            )
            if effective_target_start != target_start:
                # Re-publish "pending" so observers see the adjusted target start
                request_info.timings.targeted_start = effective_target_start
                self._send_update("pending", None, request, request_info)

            # Schedule the request and send "in_progress" update
            await self._schedule_request(request, request_info, effective_target_start)

            async for resp, info in self.backend.resolve(  # type: ignore[attr-defined]
                request, request_info, history or None
            ):
                request_info = info
                if request_info is None:
                    raise RuntimeError("Received invalid request info from backend")

                if (
                    resp is None
                    and request_info.timings.first_token_iteration is not None
                ):
                    self._send_update("first_token", None, request, request_info)

                response = resp

            # Complete the request
            request_info.timings.resolve_end = time.time()
            self._send_update("completed", response, request, request_info)

            # Record Turn. request_info is deliberately kept (not cleared) so the
            # strategy is notified of this completion in ``finally``.
            history.append((request, response))
        except asyncio.CancelledError:
            premature_exit = True
            # Handle cancellation
            if request is not None and request_info is not None:
                request_info.error = "Request was cancelled"
                request_info.timings.resolve_end = time.time()
                self._send_update("cancelled", response, request, request_info)
            raise
        except Exception as exc:  # noqa: BLE001
            premature_exit = True
            if request is not None and request_info is not None:
                request_info.error = repr(exc)
                request_info.traceback = traceback.format_exc()
                request_info.timings.resolve_end = time.time()
                self._send_update("errored", response, request, request_info)
                logger.opt(exception=True).debug(
                    f"Backend exception for request {request_info.request_id}"
                )
        finally:
            if request_info is not None:
                self.strategy.request_completed(request_info)
            if premature_exit and conversation:
                # Remaining turns can't run without this one; cancel them all.
                # Separate names keep request/request_info pointing at this turn.
                for pending_request, pending_info in conversation:
                    pending_info.error = "Request was cancelled"
                    pending_info.timings.resolve_end = time.time()
                    self._send_update("cancelled", None, pending_request, pending_info)
                # Clear conversation on premature exit
                conversation = []

        return history, conversation, request_info

    async def _dequeue_next_conversation(
        self, target_start: float
    ) -> tuple[HistoryT[RequestT, ResponseT], ConversationT[RequestT]]:
        """
        Take the next conversation and publish a "pending" update for its first turn.

        Locally requeued multi-turn conversations are served before new ones from
        the messaging queue.

        :param target_start: Unix timestamp recorded as the turn's targeted start
        :return: Tuple of (history so far, conversation turns still to run)
        :raises RuntimeError: If the first turn's request or request info is None
        """
        history, conversation = (
            self.turns_queue.pop(0)
            if self.turns_queue
            else ([], await self.messaging.get())
        )
        request, request_info = conversation[0]
        dequeued_time = time.time()  # Ensure accurate dequeue timing
        if request is None or request_info is None:
            raise RuntimeError("Received invalid request or request info")

        request_info.timings.dequeued = dequeued_time
        request_info.scheduler_node_id = self._scheduler_node_id()
        request_info.timings.targeted_start = target_start
        self._send_update("pending", None, request, request_info)
        return history, conversation

    async def _wait_then_requeue(
        self,
        history: HistoryT[RequestT, ResponseT],
        conversation: ConversationT[RequestT],
        requeue_delay: float,
    ):
        """
        Sleep for ``requeue_delay`` seconds, then append the conversation to the
        local turns queue.

        :param history: Turns completed so far
        :param conversation: Turns still to run
        :param requeue_delay: Seconds to wait before requeueing (skipped if <= 0)
        """
        try:
            if requeue_delay > 0:
                await asyncio.sleep(requeue_delay)
        finally:
            # Always requeue so that if we were cancelled during sleep
            # the whole conversation can be cancelled properly later
            self.turns_queue.append((history, conversation))

    async def _schedule_request(
        self, request: RequestT, request_info: RequestInfo, target_start: float
    ):
        """
        Wait until ``target_start`` (if in the future), then mark the request
        "in_progress".

        :param request: Request being scheduled
        :param request_info: Metadata whose timings are updated in place
        :param target_start: Unix timestamp at which the request should start
        """
        request_info.timings.scheduled_at = request_info.timings.dequeued
        if target_start > (current_time := time.time()):
            await asyncio.sleep(target_start - current_time)
            # Adapt delay so that scheduled at reflects the sleep time
            request_info.timings.scheduled_at = target_start

        # Process the request with the backend
        request_info.timings.resolve_start = time.time()
        self._send_update("in_progress", None, request, request_info)

    def _scheduler_node_id(self) -> int:
        """
        Resolve the node id to record on requests handled by this worker.

        :return: The messaging worker index, or -1 when it is None. Index 0 is
            preserved; a plain ``or -1`` would wrongly turn it into -1.
        """
        node_index = self.messaging.worker_index
        return node_index if node_index is not None else -1

    def _send_update(
        self,
        new_status: Literal[
            "pending",
            "in_progress",
            "first_token",
            "completed",
            "errored",
            "cancelled",
        ],
        response: ResponseT | None,
        request: RequestT,
        request_info: RequestInfo,
    ):
        """
        Publish request status update through messaging system.

        Updates request status and publishes to messaging queue for coordinator
        consumption. Prevents duplicate status updates for the same state. If
        publishing fails, the caller's ``request_info.status`` is restored to its
        previous value so the update can be retried.

        :param new_status: New status for the request
        :param response: Response object if available, None otherwise
        :param request: Request object being processed
        :param request_info: Request metadata and timing information
        :raises Exception: If messaging system fails to publish the update
        """
        prev_status = request_info.status

        if new_status == prev_status:
            # already sent this update, don't send again
            return

        request_info.status = new_status
        try:
            # Non-terminal updates send a snapshot because request_info keeps
            # being mutated; terminal updates are the last, so no copy is needed.
            # A separate name keeps ``request_info`` bound to the caller's object
            # so the rollback below restores the right instance.
            published_info = (
                request_info.model_copy()
                if new_status not in {"completed", "errored", "cancelled"}
                else request_info  # last update, don't need to copy
            )
            self.messaging.put_sync(
                (response, request, published_info),
                timeout=-1,
            )
        except Exception:
            # Reset status to the one the function started with so calling logic
            # can retry after handling the error, if possible
            request_info.status = prev_status
            raise

__init__(worker_index, messaging, backend, strategy, async_limit, fut_scheduling_time_limit, startup_barrier, requests_generated_event, constraint_reached_event, shutdown_event, error_event)

Initialize worker process instance.

Parameters:

Name Type Description Default
worker_index int

Unique identifier for this worker within the process group

required
messaging InterProcessMessaging[tuple[ResponseT | None, RequestT, RequestInfo], ConversationT[RequestT]]

Inter-process messaging interface for request coordination

required
backend BackendInterface[RequestT, ResponseT]

Backend interface for processing requests

required
strategy SchedulingStrategy

Scheduling strategy for determining request timing

required
async_limit int

Maximum concurrent requests this worker can process

required
fut_scheduling_time_limit float

Maximum time in seconds to schedule requests into the future

required
startup_barrier Barrier

Synchronization barrier for coordinated startup

required
requests_generated_event Event

Event signaling request generation completion

required
constraint_reached_event Event

Event signaling processing constraint reached

required
shutdown_event Event

Event signaling graceful shutdown request

required
error_event Event

Event signaling error conditions across processes

required
Source code in src/guidellm/scheduler/worker.py
def __init__(
    self,
    worker_index: int,
    messaging: InterProcessMessaging[
        tuple[ResponseT | None, RequestT, RequestInfo],
        ConversationT[RequestT],
    ],
    backend: BackendInterface[RequestT, ResponseT],
    strategy: SchedulingStrategy,
    async_limit: int,
    fut_scheduling_time_limit: float,
    startup_barrier: ProcessingBarrier,
    requests_generated_event: ProcessingEvent,
    constraint_reached_event: ProcessingEvent,
    shutdown_event: ProcessingEvent,
    error_event: ProcessingEvent,
):
    """
    Store the configuration and shared primitives for one worker process.

    Only records the given collaborators and resets internal lifecycle state;
    nothing is started or connected here.

    :param worker_index: Unique identifier for this worker within the process group
    :param messaging: Inter-process messaging interface for request coordination
    :param backend: Backend interface for processing requests
    :param strategy: Scheduling strategy for determining request timing
    :param async_limit: Maximum concurrent requests this worker can process
    :param fut_scheduling_time_limit: Maximum time in seconds to schedule requests
        into the future
    :param startup_barrier: Synchronization barrier for coordinated startup
    :param requests_generated_event: Event signaling request generation completion
    :param constraint_reached_event: Event signaling processing constraint reached
    :param shutdown_event: Event signaling graceful shutdown request
    :param error_event: Event signaling error conditions across processes
    """
    # Identity plus the components that actually move and process requests
    self.worker_index = worker_index
    self.messaging = messaging
    self.backend = backend
    self.strategy = strategy

    # Concurrency and look-ahead scheduling limits for this worker
    self.async_limit = async_limit
    self.fut_scheduling_time_limit = fut_scheduling_time_limit

    # Synchronization primitives shared with the parent process group
    self.startup_barrier = startup_barrier
    self.requests_generated_event = requests_generated_event
    self.constraint_reached_event = constraint_reached_event
    self.shutdown_event = shutdown_event
    self.error_event = error_event

    # Lifecycle flags all begin cleared; pending conversation turns start empty
    self.startup_completed = self.backend_started = self.messaging_started = False
    self.turns_queue: list[
        tuple[HistoryT[RequestT, ResponseT], ConversationT[RequestT]]
    ] = []

run()

Main entry point for worker process execution.

Initializes asyncio event loop with optional uvloop optimization and executes worker async operations. Handles event loop cleanup and error propagation.

Raises:

Type Description
RuntimeError

If worker encounters unrecoverable error during execution

Source code in src/guidellm/scheduler/worker.py
def run(self):
    """
    Process entry point that drives the worker's async logic to completion.

    When uvloop is available its event-loop policy is installed first; then
    ``run_async`` is executed in a fresh loop via ``asyncio.run``. Any
    ``Exception`` escaping that call sets ``error_event`` (so other processes
    can observe the failure) and is re-raised wrapped in a ``RuntimeError``.

    :raises RuntimeError: If worker encounters unrecoverable error during execution
    """
    try:
        uvloop_available = HAS_UVLOOP
        if uvloop_available:
            asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
        worker_coro = self.run_async()
        asyncio.run(worker_coro)
    except Exception as exc:
        # Signal the failure to the rest of the group before formatting the error
        self.error_event.set()
        worker_id = self.messaging.worker_index
        message = f"Worker process {worker_id} encountered an error: {exc}"
        raise RuntimeError(message) from exc

run_async() async

Execute main asynchronous worker process logic.

Orchestrates concurrent execution of request processing and shutdown monitoring. Handles task cleanup, error propagation, and cancellation coordination when any task completes or encounters an error.

Raises:

Type Description
RuntimeError

If worker tasks encounter unrecoverable errors

asyncio.CancelledError

If worker process was cancelled

Source code in src/guidellm/scheduler/worker.py
async def run_async(self):
    """
    Execute main asynchronous worker process logic.

    Runs the stop monitor and the request-processing loop as concurrent tasks.
    As soon as either one finishes (or this coroutine itself is cancelled),
    both tasks are cancelled and awaited. An exception raised by either task
    is re-raised wrapped in RuntimeError; the request-processing task's error
    takes precedence over the stop monitor's. If no task failed but this
    coroutine was cancelled, the cancellation is propagated.

    :raises RuntimeError: If worker tasks encounter unrecoverable errors
    :raises asyncio.CancelledError: If worker process was cancelled
    """
    stop_task = asyncio.create_task(self._stop_monitor())
    request_proc_task = asyncio.create_task(self._process_requests())
    caller_cancelled = False

    try:
        await asyncio.wait(
            [stop_task, request_proc_task],
            return_when=asyncio.FIRST_COMPLETED,
        )
    except asyncio.CancelledError:
        caller_cancelled = True

    stop_task.cancel()
    request_proc_task.cancel()

    try:
        # Ensure all child tasks cancel correctly
        await asyncio.wait(
            [stop_task, request_proc_task], return_when=asyncio.ALL_COMPLETED
        )
    except asyncio.CancelledError:
        caller_cancelled = True

    # If the cleanup wait above was itself cancelled, a task may still be
    # pending; Task.exception() raises InvalidStateError on pending tasks, so
    # only finished, non-cancelled tasks are inspected. Both tasks are checked
    # so a stop-monitor error is not lost when request processing finished
    # normally in the same iteration.
    task_err: BaseException | None = None
    for task in (request_proc_task, stop_task):
        if (
            task.done()
            and not task.cancelled()
            and (err := task.exception()) is not None
        ):
            task_err = err
            break

    if task_err is not None:
        raise RuntimeError(
            f"Worker process {self.messaging.worker_index} encountered an "
            f"error: {task_err}"
        ) from task_err

    if caller_cancelled:
        raise asyncio.CancelledError("Worker process was cancelled")

WorkerProcessGroup

Bases: Generic[RequestT, ResponseT]

Orchestrates multiple worker processes for distributed request processing.

Manages process lifecycle, request distribution, response collection, and state synchronization across workers. Handles dynamic scaling, load balancing, and constraint evaluation with graceful shutdown coordination for high-throughput request processing workloads.

Example:

from guidellm.scheduler.worker_group import WorkerProcessGroup

group = WorkerProcessGroup(
    requests=request_iterable,
    backend=backend_instance,
    strategy=scheduling_strategy,
    max_time=time_constraint,  # constraints are passed as keyword arguments
)

await group.create_processes()
await group.start(time.time())

async for response, request, info, state in group.request_updates():
    if response is not None:
        # Process completed request
        handle_response(response)

await group.shutdown()
Source code in src/guidellm/scheduler/worker_group.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
class WorkerProcessGroup(Generic[RequestT, ResponseT]):
    """
    Orchestrates multiple worker processes for distributed request processing.

    Manages process lifecycle, request distribution, response collection, and state
    synchronization across workers. Handles dynamic scaling, load balancing, and
    constraint evaluation with graceful shutdown coordination for high-throughput
    request processing workloads.

    Example:
    ::
        from guidellm.scheduler.worker_group import WorkerProcessGroup

        group = WorkerProcessGroup(
            requests=request_iterable,
            backend=backend_instance,
            strategy=scheduling_strategy,
            max_time=time_constraint,  # constraints are keyword arguments
        )

        await group.create_processes()
        await group.start(time.time())

        async for response, request, info, state in group.request_updates():
            if response is not None:
                # Process completed request
                handle_response(response)

        await group.shutdown()
    """

    def __init__(
        self,
        requests: DatasetIterT[RequestT],
        backend: BackendInterface[RequestT, ResponseT],
        strategy: SchedulingStrategy,
        **constraints: Constraint,
    ):
        """
        Initialize a worker process group for distributed request processing.

        No processes or multiprocessing primitives are created here; those are
        set up by create_processes() and start().

        :param requests: Finite iterable of request batches to process sequentially
        :param backend: Backend interface for processing requests
        :param strategy: Scheduling strategy for request timing and distribution
        :param constraints: Named constraints for controlling execution behavior,
            supplied as keyword arguments (e.g. ``max_time=...``)
        """
        self.requests = iter(requests)
        self.backend = backend
        self.strategy = strategy
        self.constraints = constraints

        # Multiprocessing contexts and primitives, created in create_processes
        self.mp_context: BaseContext | None = None
        self.mp_manager: BaseManager | None = None
        self.processes: list[BaseProcess] | None = None
        self.startup_barrier: Barrier | None = None
        self.requests_generated_event: Event | None = None
        self.constraint_reached_event: Event | None = None
        self.shutdown_event: Event | None = None
        self.error_event: Event | None = None

        # Background health monitor, created in create_processes
        self._health_monitor_task: asyncio.Task | None = None
        self._worker_error_details: str | None = None

        # Scheduler and messaging state, created in start
        self.state: WorkerGroupState[RequestT, ResponseT] | None = None
        self.messaging: WorkGroupMessengerT[RequestT, ResponseT] | None = None

    async def create_processes(self):
        """
        Create and initialize worker processes for distributed request processing.

        Sets up multiprocessing infrastructure and worker processes based on
        strategy constraints, backend capabilities, and system configuration.
        Determines optimal process count and concurrency limits, then spawns
        worker processes with distributed request handling capabilities.

        :raises ValueError: If settings.mp_messaging_object is not one of
            "queue", "manager_queue", or "pipe"
        :raises RuntimeError: If process initialization or startup fails
        """
        # Processes limits and params
        max_conc: int
        if (
            requests_limit := min(
                self.strategy.requests_limit or math.inf,
                self.backend.requests_limit or math.inf,
            )
        ) != math.inf:
            max_conc = int(requests_limit)
        else:
            # If concurrency not specified, use settings
            max_conc = settings.max_concurrency
        if max_conc <= 0:
            raise RuntimeError("max_concurrency resolved to 0; increase limits/config")

        # Calculate number of processes, ensure we don't exceed the max concurrency,
        # or limits from the backend, strategy, or user settings
        num_processes: int = int(
            min(
                max_conc,
                self.strategy.processes_limit or math.inf,
                self.backend.processes_limit or math.inf,
                settings.max_worker_processes,
            )
        )
        if num_processes <= 0:
            raise RuntimeError("num_processes resolved to 0; increase limits/config")

        per_proc_max_conc = max_conc // num_processes
        max_pending_size = max(
            1, math.floor(max_conc * settings.mp_max_pending_buffer_percent)
        )
        per_proc_max_buffer_size = 1

        # Initialize multiprocessing components
        self.mp_context = get_context(settings.mp_context_type)
        self.mp_manager = self.mp_context.Manager()
        # +1 party: the parent process also waits on this barrier below
        self.startup_barrier = self.mp_context.Barrier(num_processes + 1)
        self.requests_generated_event = self.mp_context.Event()
        self.constraint_reached_event = self.mp_context.Event()
        self.shutdown_event = self.mp_context.Event()
        self.error_event = self.mp_context.Event()

        if settings.mp_messaging_object == "queue":
            self.messaging = InterProcessMessagingQueue(
                mp_context=self.mp_context,
                serialization=settings.mp_serialization,
                encoding=settings.mp_encoding,
                max_pending_size=max_pending_size,
                max_buffer_send_size=settings.mp_requests_send_buffer_size,
                poll_interval=settings.mp_poll_interval,
            )
        elif settings.mp_messaging_object == "manager_queue":
            self.messaging = InterProcessMessagingManagerQueue(
                manager=self.mp_manager,
                mp_context=self.mp_context,
                serialization=settings.mp_serialization,
                encoding=settings.mp_encoding,
                max_pending_size=max_pending_size,
                max_buffer_send_size=settings.mp_requests_send_buffer_size,
                poll_interval=settings.mp_poll_interval,
            )
        elif settings.mp_messaging_object == "pipe":
            self.messaging = InterProcessMessagingPipe(
                num_workers=num_processes,
                mp_context=self.mp_context,
                serialization=settings.mp_serialization,
                encoding=settings.mp_encoding,
                max_pending_size=max_pending_size,
                max_buffer_send_size=settings.mp_requests_send_buffer_size,
                poll_interval=settings.mp_poll_interval,
            )
        else:
            # Without this branch messaging stays None and the failure surfaces
            # later as an opaque AttributeError on create_worker_copy
            raise ValueError(
                "Unsupported settings.mp_messaging_object "
                f"{settings.mp_messaging_object!r}; expected one of "
                "'queue', 'manager_queue', or 'pipe'"
            )

        # Initialize worker processes
        self.processes = []
        self.strategy.init_processes_timings(
            worker_count=num_processes,
            max_concurrency=max_conc,
            mp_context=self.mp_context,
        )
        for rank in range(num_processes):
            # Distribute any remainder across the first N ranks
            async_limit = per_proc_max_conc + (
                1 if rank < (max_conc % num_processes) else 0
            )

            worker = WorkerProcess[RequestT, ResponseT](
                worker_index=rank,
                messaging=self.messaging.create_worker_copy(  # type: ignore[arg-type]
                    worker_index=rank,
                    max_buffer_send_size=None,
                    max_buffer_receive_size=per_proc_max_buffer_size,
                ),  # The non-group worker lacks the SchedulerState type. Type err.
                backend=self.backend,
                strategy=self.strategy,
                async_limit=async_limit,
                fut_scheduling_time_limit=0.0,
                startup_barrier=self.startup_barrier,
                requests_generated_event=self.requests_generated_event,
                constraint_reached_event=self.constraint_reached_event,
                shutdown_event=self.shutdown_event,
                error_event=self.error_event,
            )
            proc = self.mp_context.Process(target=worker.run, daemon=False)
            proc.start()
            self.processes.append(proc)

        self._health_monitor_task = asyncio.create_task(self._process_health_monitor())

        wait_key = await wait_for_sync_objects(
            {
                "startup_barrier": self.startup_barrier,
                "shutdown_event": self.shutdown_event,
                "error_event": self.error_event,
            },
            poll_interval=settings.mp_poll_interval,
        )

        if wait_key == "error_event":
            detail = self._worker_error_details or "error_event is set"
            raise RuntimeError(f"Worker process group startup failed: {detail}")

    async def _process_health_monitor(self):
        """Detect worker processes killed by OS signals (e.g. SIGSEGV, OOM)
        that bypass Python exception handling and never set error_event."""
        while self.processes:
            await asyncio.sleep(settings.mp_poll_interval)
            dead: list[str] = []
            killed_by_signal = False
            for proc in self.processes:
                if (
                    not proc.is_alive()
                    and proc.exitcode is not None
                    and proc.exitcode != 0
                ):
                    # Negative exit codes mean the process was ended by a signal
                    if proc.exitcode < 0:
                        killed_by_signal = True
                        exit_info = f"signal {-proc.exitcode}"
                    else:
                        exit_info = f"exit code {proc.exitcode}"
                    detail = (
                        f"Worker process {proc.pid} died unexpectedly ({exit_info})"
                    )
                    logger.error(detail)
                    dead.append(detail)

            if dead:
                message = "; ".join(dead)
                if killed_by_signal:
                    message += ". Check system logs for details"
                self._worker_error_details = message
                if self.error_event is not None:
                    self.error_event.set()
                return

    async def start(self, start_time: float):
        """
        Begin request processing at the specified start time.

        Initializes scheduler state and background tasks, then waits until the
        specified start time before beginning operations. Sets up inter-process
        communication and coordinates synchronized startup across all workers.

        :param start_time: Unix timestamp when processing should begin
        :raises RuntimeError: If workers encounter errors during startup or
            if create_processes() was not called first
        """
        if (
            not self.processes
            or not self.requests_generated_event
            or not self.constraint_reached_event
            or not self.shutdown_event
            or not self.error_event
            or not self.messaging
        ):
            raise RuntimeError("create_processes() must be called before start()")

        self.strategy.init_processes_start(start_time=start_time)
        stop_send_requests_event = threading.Event()
        send_requests_stopped_event = threading.Event()
        self.state = WorkerGroupState[RequestT, ResponseT](
            start_time=start_time,
            processes=self.processes,
            strategy=self.strategy,
            constraints=self.constraints,
            stop_send_requests_event=stop_send_requests_event,
            send_requests_stopped_event=send_requests_stopped_event,
            requests_generated_event=self.requests_generated_event,
            constraint_reached_event=self.constraint_reached_event,
            shutdown_event=self.shutdown_event,
            error_event=self.error_event,
            messaging=self.messaging,
        )
        await self.messaging.start(
            send_items=self.state.requests_generator(self.requests),
            receive_callback=self.state.received_callback,
            send_stopped_event=send_requests_stopped_event,
            send_stop_criteria=[stop_send_requests_event],
            receive_stop_criteria=[self.shutdown_event],
        )

        if (wait_time := start_time - time.time()) > 0:
            await asyncio.sleep(wait_time)
        if self.error_event.is_set():
            detail = (
                self._worker_error_details
                or "an error occurred in one of the worker processes"
            )
            raise RuntimeError(f"error_event is set in WorkerProcessGroup: {detail}")

    async def request_updates(
        self,
    ) -> AsyncIterator[
        tuple[
            ResponseT | None,
            RequestT,
            RequestInfo,
            SchedulerState,
        ]
    ]:
        """
        Yield request processing updates as they become available.

        Returns an async iterator of request updates including response, request,
        request scheduling info, and scheduler state. Updates occur on request queued,
        processing start, and completion. Response is None until processing completes.

        :return: Async iterator yielding (response, request, request_info, state)
            tuples where response is None until processing is complete
        :raises RuntimeError: If workers encounter unrecoverable errors
        """
        while True:
            if self.error_event.is_set():  # type: ignore[union-attr]
                logger.error("Error event set in WorkerProcessGroup")
                detail = (
                    self._worker_error_details
                    or "an error occurred in one of the worker processes"
                )
                raise RuntimeError(
                    f"error_event is set in WorkerProcessGroup: {detail}"
                )

            try:
                (
                    response,
                    request,
                    request_info,
                    scheduler_state,
                ) = await self.messaging.get(timeout=settings.mp_poll_interval)  # type: ignore[union-attr]

                yield response, request, request_info, scheduler_state
            except asyncio.TimeoutError:
                if self.shutdown_event.is_set():  # type: ignore[union-attr]
                    # Everything yielded, exit
                    break

    async def shutdown(self) -> list[Exception]:  # noqa: C901
        """
        Gracefully shut down the worker process group and clean up resources.

        Performs safe shutdown of worker processes, background tasks, and
        multiprocessing resources. Coordinates orderly termination across
        all workers and collects any exceptions encountered during shutdown.
        Workers still alive after the join timeout are terminated (and killed
        if they ignore termination) rather than left running.

        :return: List of exceptions encountered during shutdown, including
            workers that exited abnormally or had to be forcibly stopped;
            empty if no errors
        """
        exceptions: list[Exception] = []
        join_timeout = 5.0

        if self._health_monitor_task is not None:
            self._health_monitor_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._health_monitor_task
            self._health_monitor_task = None

        if self.shutdown_event is not None:
            self.shutdown_event.set()

        # Clear out start values
        if self.messaging is not None:
            try:
                await asyncio.wait_for(self.messaging.stop(), timeout=join_timeout)
            except Exception as err:  # noqa: BLE001
                exceptions.append(err)
        self.messaging = None
        self.state = None

        # Clear out create processes values
        if self.processes is not None:
            for proc in self.processes:
                try:
                    await asyncio.to_thread(proc.join, timeout=join_timeout)
                    if proc.is_alive():
                        # The worker did not honor shutdown_event in time. Workers
                        # are started with daemon=False, which multiprocessing
                        # joins at interpreter exit, so force it down here.
                        proc.terminate()
                        await asyncio.to_thread(proc.join, timeout=join_timeout)
                        if proc.is_alive():
                            proc.kill()
                            await asyncio.to_thread(proc.join, timeout=join_timeout)
                        exceptions.append(
                            RuntimeError(
                                f"Worker {proc.pid} did not exit within the "
                                "shutdown timeout and was terminated"
                            )
                        )
                    elif proc.exitcode is not None and proc.exitcode != 0:
                        exit_info = (
                            f"signal {-proc.exitcode}"
                            if proc.exitcode < 0
                            else f"exit code {proc.exitcode}"
                        )
                        exceptions.append(
                            RuntimeError(
                                f"Worker {proc.pid} exited abnormally ({exit_info})"
                            )
                        )
                except Exception as err:  # noqa: BLE001
                    exceptions.append(err)
        self.processes = None
        self.startup_barrier = None
        self.requests_generated_event = None
        self.constraint_reached_event = None
        self.shutdown_event = None
        self.error_event = None
        if self.mp_manager is not None:
            try:
                self.mp_manager.shutdown()
            except Exception as err:  # noqa: BLE001
                exceptions.append(err)
        self.mp_manager = None
        self.mp_context = None

        return exceptions

__init__(requests, backend, strategy, **constraints)

Initialize a worker process group for distributed request processing.

Parameters:

Name Type Description Default
requests DatasetIterT[RequestT]

Finite iterable of request batches to process sequentially

required
backend BackendInterface[RequestT, ResponseT]

Backend interface for processing requests

required
strategy SchedulingStrategy

Scheduling strategy for request timing and distribution

required
constraints Constraint

Named constraints for controlling execution behavior

{}
Source code in src/guidellm/scheduler/worker_group.py
def __init__(
    self,
    requests: DatasetIterT[RequestT],
    backend: BackendInterface[RequestT, ResponseT],
    strategy: SchedulingStrategy,
    **constraints: Constraint,
):
    """
    Record the inputs for a group of worker processes without starting anything.

    Multiprocessing primitives are filled in later by create_processes(), and
    scheduler/messaging state by start(); until then they are all None.

    :param requests: Finite iterable of request batches to process sequentially
    :param backend: Backend interface for processing requests
    :param strategy: Scheduling strategy for request timing and distribution
    :param constraints: Named constraints for controlling execution behavior,
        supplied as keyword arguments
    """
    request_iter = iter(requests)
    self.requests = request_iter
    self.backend = backend
    self.strategy = strategy
    self.constraints = constraints

    # Populated by create_processes(): process context, manager, and workers
    self.mp_context: BaseContext | None = None
    self.mp_manager: BaseManager | None = None
    self.processes: list[BaseProcess] | None = None

    # Populated by create_processes(): cross-process synchronization objects
    self.startup_barrier: Barrier | None = None
    self.requests_generated_event: Event | None = None
    self.constraint_reached_event: Event | None = None
    self.shutdown_event: Event | None = None
    self.error_event: Event | None = None

    # Populated by create_processes(): liveness watcher and its failure summary
    self._health_monitor_task: asyncio.Task | None = None
    self._worker_error_details: str | None = None

    # Populated by start(): shared scheduler state and the messaging channel
    self.state: WorkerGroupState[RequestT, ResponseT] | None = None
    self.messaging: WorkGroupMessengerT[RequestT, ResponseT] | None = None

create_processes() async

Create and initialize worker processes for distributed request processing.

Sets up multiprocessing infrastructure and worker processes based on strategy constraints, backend capabilities, and system configuration. Determines optimal process count and concurrency limits, then spawns worker processes with distributed request handling capabilities.

Raises:

Type Description
RuntimeError

If process initialization or startup fails

Source code in src/guidellm/scheduler/worker_group.py
async def create_processes(self):
    """
    Create and initialize worker processes for distributed request processing.

    Sets up multiprocessing infrastructure and worker processes based on
    strategy constraints, backend capabilities, and system configuration.
    Determines optimal process count and concurrency limits, then spawns
    worker processes with distributed request handling capabilities.

    :raises ValueError: If settings.mp_messaging_object is not one of
        "queue", "manager_queue", or "pipe"
    :raises RuntimeError: If process initialization or startup fails
    """
    # Processes limits and params
    max_conc: int
    if (
        requests_limit := min(
            self.strategy.requests_limit or math.inf,
            self.backend.requests_limit or math.inf,
        )
    ) != math.inf:
        max_conc = int(requests_limit)
    else:
        # If concurrency not specified, use settings
        max_conc = settings.max_concurrency
    if max_conc <= 0:
        raise RuntimeError("max_concurrency resolved to 0; increase limits/config")

    # Calculate number of processes, ensure we don't exceed the max concurrency,
    # or limits from the backend, strategy, or user settings
    num_processes: int = int(
        min(
            max_conc,
            self.strategy.processes_limit or math.inf,
            self.backend.processes_limit or math.inf,
            settings.max_worker_processes,
        )
    )
    if num_processes <= 0:
        raise RuntimeError("num_processes resolved to 0; increase limits/config")

    per_proc_max_conc = max_conc // num_processes
    max_pending_size = max(
        1, math.floor(max_conc * settings.mp_max_pending_buffer_percent)
    )
    per_proc_max_buffer_size = 1

    # Initialize multiprocessing components
    self.mp_context = get_context(settings.mp_context_type)
    self.mp_manager = self.mp_context.Manager()
    # +1 party: the parent process also waits on this barrier below
    self.startup_barrier = self.mp_context.Barrier(num_processes + 1)
    self.requests_generated_event = self.mp_context.Event()
    self.constraint_reached_event = self.mp_context.Event()
    self.shutdown_event = self.mp_context.Event()
    self.error_event = self.mp_context.Event()

    if settings.mp_messaging_object == "queue":
        self.messaging = InterProcessMessagingQueue(
            mp_context=self.mp_context,
            serialization=settings.mp_serialization,
            encoding=settings.mp_encoding,
            max_pending_size=max_pending_size,
            max_buffer_send_size=settings.mp_requests_send_buffer_size,
            poll_interval=settings.mp_poll_interval,
        )
    elif settings.mp_messaging_object == "manager_queue":
        self.messaging = InterProcessMessagingManagerQueue(
            manager=self.mp_manager,
            mp_context=self.mp_context,
            serialization=settings.mp_serialization,
            encoding=settings.mp_encoding,
            max_pending_size=max_pending_size,
            max_buffer_send_size=settings.mp_requests_send_buffer_size,
            poll_interval=settings.mp_poll_interval,
        )
    elif settings.mp_messaging_object == "pipe":
        self.messaging = InterProcessMessagingPipe(
            num_workers=num_processes,
            mp_context=self.mp_context,
            serialization=settings.mp_serialization,
            encoding=settings.mp_encoding,
            max_pending_size=max_pending_size,
            max_buffer_send_size=settings.mp_requests_send_buffer_size,
            poll_interval=settings.mp_poll_interval,
        )
    else:
        # Without this branch messaging stays None and the failure surfaces
        # later as an opaque AttributeError on create_worker_copy
        raise ValueError(
            "Unsupported settings.mp_messaging_object "
            f"{settings.mp_messaging_object!r}; expected one of "
            "'queue', 'manager_queue', or 'pipe'"
        )

    # Initialize worker processes
    self.processes = []
    self.strategy.init_processes_timings(
        worker_count=num_processes,
        max_concurrency=max_conc,
        mp_context=self.mp_context,
    )
    for rank in range(num_processes):
        # Distribute any remainder across the first N ranks
        async_limit = per_proc_max_conc + (
            1 if rank < (max_conc % num_processes) else 0
        )

        worker = WorkerProcess[RequestT, ResponseT](
            worker_index=rank,
            messaging=self.messaging.create_worker_copy(  # type: ignore[arg-type]
                worker_index=rank,
                max_buffer_send_size=None,
                max_buffer_receive_size=per_proc_max_buffer_size,
            ),  # The non-group worker lacks the SchedulerState type. Type err.
            backend=self.backend,
            strategy=self.strategy,
            async_limit=async_limit,
            fut_scheduling_time_limit=0.0,
            startup_barrier=self.startup_barrier,
            requests_generated_event=self.requests_generated_event,
            constraint_reached_event=self.constraint_reached_event,
            shutdown_event=self.shutdown_event,
            error_event=self.error_event,
        )
        proc = self.mp_context.Process(target=worker.run, daemon=False)
        proc.start()
        self.processes.append(proc)

    self._health_monitor_task = asyncio.create_task(self._process_health_monitor())

    wait_key = await wait_for_sync_objects(
        {
            "startup_barrier": self.startup_barrier,
            "shutdown_event": self.shutdown_event,
            "error_event": self.error_event,
        },
        poll_interval=settings.mp_poll_interval,
    )

    if wait_key == "error_event":
        detail = self._worker_error_details or "error_event is set"
        raise RuntimeError(f"Worker process group startup failed: {detail}")

request_updates() async

Yield request processing updates as they become available.

Returns an async iterator of request updates including response, request, request scheduling info, and scheduler state. Updates occur on request queued, processing start, and completion. Response is None until processing completes.

Returns:

Type Description
AsyncIterator[tuple[ResponseT | None, RequestT, RequestInfo, SchedulerState]]

Async iterator yielding (response, request, request_info, state) tuples where response is None until processing is complete

Raises:

Type Description
RuntimeError

If workers encounter unrecoverable errors

Source code in src/guidellm/scheduler/worker_group.py
async def request_updates(
    self,
) -> AsyncIterator[
    tuple[
        ResponseT | None,
        RequestT,
        RequestInfo,
        SchedulerState,
    ]
]:
    """
    Yield request processing updates as they become available.

    Returns an async iterator of request updates including response, request,
    request scheduling info, and scheduler state. Updates occur on request queued,
    processing start, and completion. Response is None until processing completes.

    The messaging channel is polled with a timeout of
    ``settings.mp_poll_interval`` seconds. The loop exits only when a poll
    times out while the shutdown event is set, so any updates that are still
    retrievable from messaging after shutdown is signalled are yielded first.

    :return: Async iterator yielding (response, request, request_info, state)
        tuples where response is None until processing is complete
    :raises RuntimeError: If workers encounter unrecoverable errors
    """
    while True:
        # Fail fast once the shared error event has been set.
        if self.error_event.is_set():  # type: ignore[union-attr]
            logger.error("Error event set in WorkerProcessGroup")
            detail = (
                self._worker_error_details
                or "an error occurred in one of the worker processes"
            )
            raise RuntimeError(
                f"error_event is set in WorkerProcessGroup: {detail}"
            )

        try:
            update = await self.messaging.get(timeout=settings.mp_poll_interval)  # type: ignore[union-attr]
        except asyncio.TimeoutError:
            # Nothing arrived within the poll interval. Once shutdown has been
            # signalled, an empty poll means everything has been yielded.
            if self.shutdown_event.is_set():  # type: ignore[union-attr]
                break
            continue

        # Unpack and yield outside the ``try`` so that an exception delivered
        # to this generator at the yield point is never mistaken for a poll
        # timeout and silently swallowed.
        response, request, request_info, scheduler_state = update
        yield response, request, request_info, scheduler_state

shutdown() async

Gracefully shut down the worker process group and clean up resources.

Performs safe shutdown of worker processes, background tasks, and multiprocessing resources. Coordinates orderly termination across all workers and collects any exceptions encountered during shutdown.

Returns:

- `list[Exception]`: list of exceptions encountered during shutdown; empty if no errors occurred.

Source code in src/guidellm/scheduler/worker_group.py
async def shutdown(self) -> list[Exception]:  # noqa: C901
    """
    Gracefully shut down the worker process group and clean up resources.

    Performs safe shutdown of worker processes, background tasks, and
    multiprocessing resources. Coordinates orderly termination across
    all workers and collects any exceptions encountered during shutdown.

    The steps run in this order:

    1. Cancel the health monitor task.
    2. Set the shutdown event.
    3. Stop messaging, with a 5 s timeout.
    4. Join each worker, with a 5 s timeout.
    5. Shut down the multiprocessing manager.

    A worker that is still alive after its join timeout is terminated (and
    killed if it still does not exit) and reported as an exception, rather
    than being left running.

    :return: List of exceptions encountered during shutdown; empty if no errors
    """
    exceptions: list[Exception] = []
    # Timeout in seconds applied to stopping messaging and to each worker join.
    step_timeout = 5.0

    # Cancel the health monitor and wait for the cancellation to complete.
    if self._health_monitor_task is not None:
        self._health_monitor_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._health_monitor_task
        self._health_monitor_task = None

    # Signal workers and request_updates() that the run is over.
    if self.shutdown_event is not None:
        self.shutdown_event.set()

    # Stop the messaging channel and drop per-run state set up by start().
    if self.messaging is not None:
        try:
            await asyncio.wait_for(self.messaging.stop(), timeout=step_timeout)
        except Exception as err:  # noqa: BLE001
            exceptions.append(err)
    self.messaging = None
    self.state = None

    # Reap worker processes and release resources from create_processes().
    if self.processes is not None:
        for proc in self.processes:
            try:
                await asyncio.to_thread(proc.join, timeout=step_timeout)
                if proc.is_alive():
                    # The worker did not exit after the shutdown event. Escalate
                    # so it is not left running; workers are non-daemon
                    # processes and would otherwise outlive this group.
                    proc.terminate()
                    await asyncio.to_thread(proc.join, timeout=step_timeout)
                    if proc.is_alive():
                        proc.kill()
                        await asyncio.to_thread(proc.join, timeout=step_timeout)
                    exceptions.append(
                        RuntimeError(
                            f"Worker {proc.pid} did not exit within "
                            f"{step_timeout}s of shutdown and was forcibly stopped"
                        )
                    )
                elif proc.exitcode is not None and proc.exitcode != 0:
                    # A negative exit code means the process died from a signal.
                    exit_info = (
                        f"signal {-proc.exitcode}"
                        if proc.exitcode < 0
                        else f"exit code {proc.exitcode}"
                    )
                    exceptions.append(
                        RuntimeError(
                            f"Worker {proc.pid} exited abnormally ({exit_info})"
                        )
                    )
            except Exception as err:  # noqa: BLE001
                exceptions.append(err)
    self.processes = None
    self.startup_barrier = None
    self.requests_generated_event = None
    self.constraint_reached_event = None
    self.shutdown_event = None
    self.error_event = None
    if self.mp_manager is not None:
        try:
            self.mp_manager.shutdown()
        except Exception as err:  # noqa: BLE001
            exceptions.append(err)
    self.mp_manager = None
    self.mp_context = None

    return exceptions

start(start_time) async

Begin request processing at the specified start time.

Initializes scheduler state and background tasks, then waits until the specified start time before beginning operations. Sets up inter-process communication and coordinates synchronized startup across all workers.

Parameters:

- `start_time` (`float`, required): Unix timestamp when processing should begin.

Raises:

- `RuntimeError`: if workers encounter errors during startup, or if create_processes() was not called first.

Source code in src/guidellm/scheduler/worker_group.py
async def start(self, start_time: float):
    """
    Begin request processing at the specified start time.

    First verifies that create_processes() has populated the worker process
    list, the shared synchronization events, and the messaging channel. Then
    it notifies the strategy of the start time, and builds a new
    WorkerGroupState for this run. It starts the messaging channel with the
    state's request generator and receive callback, sleeps until
    ``start_time`` if that is still in the future, and finally checks whether
    any worker has set the error event.

    :param start_time: Unix timestamp when processing should begin
    :raises RuntimeError: If workers encounter errors during startup or
        if create_processes() was not called first
    """
    prerequisites = (
        self.processes,
        self.requests_generated_event,
        self.constraint_reached_event,
        self.shutdown_event,
        self.error_event,
        self.messaging,
    )
    if not all(prerequisites):
        raise RuntimeError("create_processes() must be called before start()")

    self.strategy.init_processes_start(start_time=start_time)

    # Local events shared between the group state and the messaging send
    # side: one is passed as the send stop criterion, the other as the
    # "send stopped" signal.
    stop_sending = threading.Event()
    sending_stopped = threading.Event()
    group_state = WorkerGroupState[RequestT, ResponseT](
        start_time=start_time,
        processes=self.processes,
        strategy=self.strategy,
        constraints=self.constraints,
        stop_send_requests_event=stop_sending,
        send_requests_stopped_event=sending_stopped,
        requests_generated_event=self.requests_generated_event,
        constraint_reached_event=self.constraint_reached_event,
        shutdown_event=self.shutdown_event,
        error_event=self.error_event,
        messaging=self.messaging,
    )
    self.state = group_state

    await self.messaging.start(
        send_items=group_state.requests_generator(self.requests),
        receive_callback=group_state.received_callback,
        send_stopped_event=sending_stopped,
        send_stop_criteria=[stop_sending],
        receive_stop_criteria=[self.shutdown_event],
    )

    # Hold off until the requested start time (no wait if already past it).
    remaining = start_time - time.time()
    if remaining > 0:
        await asyncio.sleep(remaining)

    if self.error_event.is_set():
        reason = self._worker_error_details or (
            "an error occurred in one of the worker processes"
        )
        raise RuntimeError(f"error_event is set in WorkerProcessGroup: {reason}")