Skip to content

guidellm.data.preprocessors

GenerativeColumnMapper

Bases: DataDependentPreprocessor

Source code in src/guidellm/data/preprocessors/mappers.py
@PreprocessorRegistry.register("generative_column_mapper")
class GenerativeColumnMapper(DataDependentPreprocessor):
    """
    Map raw dataset columns onto logical generative column types.

    The mapper inspects the column names of the datasets given to
    :meth:`setup_data`, resolves which concrete column backs each logical
    column type (``text_column``, ``prefix_column``, ...) and each turn, and
    then, when called on a row, regroups the row values by turn and column
    type.
    """

    # Candidate column names tried for each logical column type when no
    # explicit mappings are supplied.
    defaults: ClassVar[dict[str, list[str]]] = {
        "prompt_tokens_count_column": ["prompt_tokens_count", "input_tokens_count"],
        "output_tokens_count_column": [
            "output_tokens_count",
            "completion_tokens_count",
        ],
        "prefix_column": [
            "system_prompt",
            "system",
            "prefix",
        ],
        "text_column": [
            "prompt",
            "instruction",
            "question",
            "input",
            "context",
            "content",
            "conversation",
            "turn",
            "text",
        ],
        "image_column": [
            "image",
            "picture",
            "photo",
            "img",
        ],
        "video_column": [
            "video",
            "clip",
            "movie",
            "footage",
            "mp4",
            "mov",
            "avi",
        ],
        "audio_column": [
            "audio",
            "sound",
            "voice",
            "speech",
            "wav",
            "mp3",
        ],
        "tools_column": [
            "tools",
            "functions",
            "tool_definitions",
        ],
        "tool_response_column": [
            "tool_response",
            "tool_result",
            "tool_output",
        ],
        "relative_timestamp_column": ["relative_timestamp"],
    }
    # Regex template matched against one column name per line. ``{name}`` is
    # replaced with an alternation of (escaped) candidate names; the pattern
    # also accepts an optional plural suffix (``s``/``es``) and an optional
    # ``-N`` / ``_N`` turn suffix captured as ``turn``.
    column_name_pattern: str = (
        r"^(?P<full_name>(?P<match_name>({name})(es|s)?)([-_](?P<turn>\d+))?)$"
    )

    @staticmethod
    def _filter_for_dataset(names: list[str], *dataset_names: str) -> list[str]:
        """
        Keep only the candidate names that apply to the given dataset.

        A name containing a dot is split once into ``dataset.column``; it is
        kept (as ``column``) only when the dataset part equals one of
        ``dataset_names``. Names without a dot apply to every dataset.

        :param names: Candidate column names, optionally dataset-qualified.
        :param dataset_names: Identifiers (index and/or name, as strings) of
            the dataset currently being inspected.
        :return: The unqualified column names applicable to that dataset, in
            their original order.
        """
        filtered_names: list[str] = []
        for name in names:
            if "." in name:
                dataset_part, column_part = name.split(".", 1)
                if dataset_part in dataset_names:
                    filtered_names.append(column_part)
            else:
                filtered_names.append(name)

        return filtered_names

    @staticmethod
    def _extract_turn_columns(
        turn_pattern: str, columns_str: str
    ) -> list[tuple[int, str]]:
        """
        Collect every column that is a turn variant of an already chosen name.

        :param turn_pattern: :attr:`column_name_pattern` formatted with the
            chosen base column name.
        :param columns_str: All dataset column names joined with newlines.
        :return: ``(turn_index, column_name)`` pairs in match order. The turn
            index comes from the numeric suffix when present; otherwise the
            number of matches seen so far is used.
        """
        # Now find all columns that match a variant of the base name
        turn_matches = re.finditer(turn_pattern, columns_str, re.M | re.I)

        turn_columns: list[tuple[int, str]] = []
        turn_count = 0
        for match in turn_matches:
            column_name = match.group("full_name")
            if not column_name:
                continue

            turn_str = match.group("turn")
            turn = int(turn_str) if turn_str is not None else turn_count
            turn_columns.append((turn, column_name))
            turn_count += 1

        return turn_columns

    @classmethod
    def datasets_mappings(
        cls,
        datasets: list[Dataset | IterableDataset],
        input_mappings: dict[str, str | list[str]] | None = None,
    ) -> dict[DatasetColumnKey, list[DatasetColumnValue]]:
        """
        Resolve column mappings across one or more datasets.

        For each dataset, matches actual column names against the requested
        mapping names (or :attr:`defaults`) using regex patterns that account
        for pluralisation and turn suffixes (e.g. ``prompt-0``, ``prompt-1``).

        :param datasets: The loaded datasets to inspect for column names.
        :param input_mappings: Optional explicit column mappings. When ``None``,
            :attr:`defaults` is used. Values may be a single name or a list of
            candidate names in priority order.
        :return: A dict keyed by ``(column_type, turn_index)`` whose values are
            lists of ``(dataset_index, column_name)`` pairs indicating where
            each logical column can be found. Categories with no matching
            columns are silently omitted from the result.
        """
        mappings: dict[DatasetColumnKey, list[DatasetColumnValue]] = defaultdict(list)
        input_map: dict[str, list[str]] = cls.defaults
        if input_mappings:
            # Normalise single names to one-element lists.
            input_map = {
                k: v if isinstance(v, list) else [v] for k, v in input_mappings.items()
            }

        for index, dataset in enumerate(datasets):
            # Dataset-qualified names may reference the dataset either by its
            # position or by its declared name.
            dataset_name = (
                dataset.info.dataset_name
                if dataset.info and dataset.info.dataset_name
                else index
            )
            # Fall back to peeking at the first row when column names are not
            # reported by the dataset.
            dataset_columns = dataset.column_names or list(next(iter(dataset)).keys())
            dataset_columns_str = "\n".join(dataset_columns)

            for column_type, names in input_map.items():
                filtered_names = cls._filter_for_dataset(
                    names, str(index), str(dataset_name)
                )
                if not filtered_names:
                    continue

                column_pattern = cls.column_name_pattern.format(
                    name="|".join(re.escape(n) for n in filtered_names)
                )
                # Find the first matching column name
                # NOTE(review): re.search returns the leftmost matching line,
                # so the winner is the first matching column in dataset order,
                # not necessarily the first candidate in ``filtered_names``.
                base_match = re.search(column_pattern, dataset_columns_str, re.M | re.I)
                if not base_match:
                    continue

                # The matched text is literal column text, so it must be
                # escaped before being embedded in a pattern again; otherwise
                # names containing regex metacharacters (``.``, ``+``, ``(``,
                # ...) would fail to match themselves or raise re.error.
                turn_pattern = cls.column_name_pattern.format(
                    name=re.escape(base_match.group("match_name")),
                )
                turn_columns = cls._extract_turn_columns(
                    turn_pattern,
                    dataset_columns_str,
                )

                typed_column_type = cast("GenerativeDatasetColumnType", column_type)
                for turn, column_name in sorted(turn_columns):
                    mappings[(typed_column_type, turn)].append((index, column_name))

        return mappings

    def __init__(
        self,
        config: GenerativeColumnMapperArgs,
    ):
        """
        :param config: Preprocessor arguments; ``config.column_mappings``
            supplies the explicit mappings (or ``None`` to use
            :attr:`defaults`).
        """
        self.input_mappings = config.column_mappings
        # Resolved lazily by ``setup_data``. It must be assigned here (not
        # merely annotated) so ``__call__`` can detect the "not set up" state
        # instead of failing with AttributeError.
        self.datasets_column_mappings: (
            dict[DatasetColumnKey, list[DatasetColumnValue]] | None
        ) = None

    def __call__(self, items: list[dict[str, Any]]) -> list[dict[str, list[Any]]]:
        """
        Regroup one row per dataset into per-turn dicts keyed by column type.

        :param items: One entry per dataset, indexed by dataset position; each
            entry exposes the current row under its ``"dataset"`` key.
        :return: One dict per turn (empty turns dropped) mapping column type to
            the list of values gathered for that turn.
        :raises ValueError: If :meth:`setup_data` has not been called yet.
        """
        if self.datasets_column_mappings is None:
            raise ValueError("DefaultGenerativeColumnMapper not setup with data.")

        mapped: list[dict[str, Any]] = []

        for (column_type, turn), column_mappings in sorted(
            self.datasets_column_mappings.items()
        ):
            # Ensure the mapped list has enough turns for this turn
            # Should never need to happen
            while len(mapped) <= turn:
                mapped.append(defaultdict(list))

            for (
                dataset_index,
                dataset_column,
            ) in column_mappings:
                mapped[turn][column_type].append(
                    items[dataset_index]["dataset"][dataset_column]
                )

        # Turn indices may be sparse; drop the padding entries left empty.
        return [dict(m) for m in mapped if len(m) > 0]

    def setup_data(
        self,
        datasets: list[DatasetType],
    ):
        """
        Resolve and store the column mappings for the given datasets.

        :param datasets: The datasets whose columns should be mapped.
        :raises ValueError: If no requested column matches any dataset column.
        """
        self.datasets_column_mappings = self.datasets_mappings(
            datasets, self.input_mappings
        )

        if not self.datasets_column_mappings:
            raise ValueError(
                "GenerativeColumnMapper found no matching columns. "
                f"Requested mappings: {self.input_mappings or 'default mappings'}. "
                "Every row will produce an empty result."
            )

datasets_mappings(datasets, input_mappings=None) classmethod

Resolve column mappings across one or more datasets.

For each dataset, matches the actual column names against the requested mapping names (or the class-level `defaults` when none are given) using regex patterns that account for pluralisation and turn suffixes (e.g. `prompt-0`, `prompt-1`).

Parameters:

Name Type Description Default
datasets list[Dataset | IterableDataset]

The loaded datasets to inspect for column names.

required
input_mappings dict[str, str | list[str]] | None

Optional explicit column mappings. When `None`, the class-level `defaults` are used. Values may be a single name or a list of candidate names in priority order.

None

Returns:

Type Description
dict[DatasetColumnKey, list[DatasetColumnValue]]

A dict keyed by (column_type, turn_index) whose values are lists of (dataset_index, column_name) pairs indicating where each logical column can be found. Categories with no matching columns are silently omitted from the result.

Source code in src/guidellm/data/preprocessors/mappers.py
@classmethod
def datasets_mappings(
    cls,
    datasets: list[Dataset | IterableDataset],
    input_mappings: dict[str, str | list[str]] | None = None,
) -> dict[DatasetColumnKey, list[DatasetColumnValue]]:
    """
    Resolve column mappings across one or more datasets.

    For each dataset, matches actual column names against the requested
    mapping names (or :attr:`defaults`) using regex patterns that account
    for pluralisation and turn suffixes (e.g. ``prompt-0``, ``prompt-1``).

    :param datasets: The loaded datasets to inspect for column names.
    :param input_mappings: Optional explicit column mappings. When ``None``,
        :attr:`defaults` is used. Values may be a single name or a list of
        candidate names in priority order.
    :return: A dict keyed by ``(column_type, turn_index)`` whose values are
        lists of ``(dataset_index, column_name)`` pairs indicating where
        each logical column can be found. Categories with no matching
        columns are silently omitted from the result.
    """
    mappings: dict[DatasetColumnKey, list[DatasetColumnValue]] = defaultdict(list)
    input_map: dict[str, list[str]] = cls.defaults
    if input_mappings:
        # Normalise single names to one-element lists.
        input_map = {
            k: v if isinstance(v, list) else [v] for k, v in input_mappings.items()
        }

    for index, dataset in enumerate(datasets):
        # Dataset-qualified names may reference the dataset either by its
        # position or by its declared name.
        dataset_name = (
            dataset.info.dataset_name
            if dataset.info and dataset.info.dataset_name
            else index
        )
        # Fall back to peeking at the first row when column names are not
        # reported by the dataset.
        dataset_columns = dataset.column_names or list(next(iter(dataset)).keys())
        dataset_columns_str = "\n".join(dataset_columns)

        for column_type, names in input_map.items():
            filtered_names = cls._filter_for_dataset(
                names, str(index), str(dataset_name)
            )
            if not filtered_names:
                continue

            column_pattern = cls.column_name_pattern.format(
                name="|".join(re.escape(n) for n in filtered_names)
            )
            # Find the first matching column name
            # NOTE(review): re.search returns the leftmost matching line, so
            # the winner is the first matching column in dataset order, not
            # necessarily the first candidate in ``filtered_names``.
            base_match = re.search(column_pattern, dataset_columns_str, re.M | re.I)
            if not base_match:
                continue

            # The matched text is literal column text, so it must be escaped
            # before being embedded in a pattern again; otherwise names
            # containing regex metacharacters (``.``, ``+``, ``(``, ...) would
            # fail to match themselves or raise re.error.
            turn_pattern = cls.column_name_pattern.format(
                name=re.escape(base_match.group("match_name")),
            )
            turn_columns = cls._extract_turn_columns(
                turn_pattern,
                dataset_columns_str,
            )

            typed_column_type = cast("GenerativeDatasetColumnType", column_type)
            for turn, column_name in sorted(turn_columns):
                mappings[(typed_column_type, turn)].append((index, column_name))

    return mappings

GenerativeColumnMapperArgs

Bases: DataPreprocessorArgs

Model for generative column mapper preprocessor arguments.

Source code in src/guidellm/data/preprocessors/mappers.py
# Registered with DataPreprocessorArgs under two kind names, so this one
# argument model is associated with both "generative_column_mapper" and
# "pooling_column_mapper".
@DataPreprocessorArgs.register(
    [
        "generative_column_mapper",
        "pooling_column_mapper",
    ]
)
class GenerativeColumnMapperArgs(DataPreprocessorArgs):
    """Model for generative column mapper preprocessor arguments."""

    # Type identifier for this argument model; accepts either registered kind
    # and defaults to "generative_column_mapper".
    kind: Literal["generative_column_mapper", "pooling_column_mapper"] = Field(
        default="generative_column_mapper",
        description="Type identifier for the generative column mapper preprocessor.",
    )
    # Optional mapping from a logical column type (for example
    # "prompt_tokens_count_column") to one dataset column name or a list of
    # candidate names. Defaults to None; GenerativeColumnMapper passes this
    # value to datasets_mappings, which falls back to its built-in `defaults`
    # when the value is empty or None.
    column_mappings: dict[str, str | list[str]] | None = Field(
        default=None,
        description="Mappings for the column names.",
        examples=[
            {
                "prompt_tokens_count_column": [
                    "prompt_tokens_count",
                    "input_tokens_count",
                ],
                "output_tokens_count_column": [
                    "output_tokens_count",
                    "completion_tokens_count",
                ],
            }
        ],
    )

MediaEncoderArgs

Bases: DataPreprocessorArgs

Model for media encoder preprocessor arguments.

Source code in src/guidellm/data/preprocessors/encoders.py
# Registered with DataPreprocessorArgs under the "encode_media" kind.
@DataPreprocessorArgs.register("encode_media")
class MediaEncoderArgs(DataPreprocessorArgs):
    """Model for media encoder preprocessor arguments."""

    # Type identifier for this argument model; fixed to "encode_media".
    kind: Literal["encode_media"] = Field(
        default="encode_media",
        description="Type identifier for the media encoder preprocessor.",
    )
    # The three *_kwargs fields below each default to a fresh empty dict
    # (default_factory, so instances never share one mutable default).
    # NOTE(review): presumably forwarded to the matching audio/image/video
    # encoding routine -- the consumer is not visible here; confirm against
    # the media encoder preprocessor.
    audio_kwargs: dict[str, Any] = Field(
        default_factory=dict,
        description="Keyword arguments for audio encoding.",
        examples=[{"format": "mp3"}],
    )
    image_kwargs: dict[str, Any] = Field(
        default_factory=dict,
        description="Keyword arguments for image encoding.",
        examples=[{"format": "jpg"}],
    )
    video_kwargs: dict[str, Any] = Field(
        default_factory=dict,
        description="Keyword arguments for video encoding.",
        examples=[{"format": "mp4"}],
    )

PreprocessorRegistry

Bases: RegistryMixin[type[DatasetPreprocessor] | type[DataDependentPreprocessor]]

Source code in src/guidellm/data/preprocessors/preprocessor.py
class PreprocessorRegistry(
    RegistryMixin[type[DatasetPreprocessor] | type[DataDependentPreprocessor]]
):
    """Registry of preprocessor classes, looked up by their ``kind`` name."""

    @classmethod
    def create(cls, config: DataPreprocessorArgs) -> DatasetPreprocessor:
        """
        Factory method to create a DatasetPreprocessor instance based on configuration.

        :param config: A DataPreprocessorArgs object containing the configuration.
            Its ``kind`` selects the registered class, and the whole object is
            passed to that class as its single positional argument.
        :return: The constructed preprocessor instance.
        :raises ValueError: If no preprocessor is registered under ``config.kind``.
        """
        kind = config.kind
        preprocessor_cls = cls.get_registered_object(kind)

        if preprocessor_cls is None:
            available = list(cls.registry.keys()) if cls.registry else []
            # The trailing space keeps the two sentences from running together.
            raise ValueError(
                f"DatasetPreprocessor type '{kind}' is not registered. "
                f"Available types: {available}"
            )

        return preprocessor_cls(config)

create(config) classmethod

Factory method to create a DatasetPreprocessor instance based on configuration.

Parameters:

Name Type Description Default
config DataPreprocessorArgs

A DataPreprocessorArgs object containing the configuration.

required
Source code in src/guidellm/data/preprocessors/preprocessor.py
@classmethod
def create(cls, config: DataPreprocessorArgs) -> DatasetPreprocessor:
    """
    Factory method to create a DatasetPreprocessor instance based on configuration.

    :param config: A DataPreprocessorArgs object containing the configuration.
        Its ``kind`` selects the registered class, and the whole object is
        passed to that class as its single positional argument.
    :return: The constructed preprocessor instance.
    :raises ValueError: If no preprocessor is registered under ``config.kind``.
    """
    kind = config.kind
    preprocessor_cls = cls.get_registered_object(kind)

    if preprocessor_cls is None:
        available = list(cls.registry.keys()) if cls.registry else []
        # The trailing space keeps the two sentences from running together.
        raise ValueError(
            f"DatasetPreprocessor type '{kind}' is not registered. "
            f"Available types: {available}"
        )

    return preprocessor_cls(config)

ToolCallingMessageExtractor

Bases: DatasetPreprocessor

Extract user prompts, system prompts, and tool responses from messages.

Many tool calling datasets (e.g. madroid/glaive-function-calling-openai) store conversations as a messages column containing an array of {"role": ..., "content": ...} dicts. This preprocessor replaces the text_column value with the extracted user content, populates prefix_column with the system prompt when present, and populates tool_response_column with role: "tool" response content.

Usage::

guidellm benchmark run \
    --data madroid/glaive-function-calling-openai \
    --data-column-mapper \
        '{"text_column": "messages", "tools_column": "tools"}' \
    --data-preprocessors tool_calling_message_extractor,encode_media
Source code in src/guidellm/data/preprocessors/tool_calling.py
@PreprocessorRegistry.register("tool_calling_message_extractor")
class ToolCallingMessageExtractor(DatasetPreprocessor):
    """Extract user prompts, system prompts, and tool responses from messages.

    Many tool calling datasets (e.g. ``madroid/glaive-function-calling-openai``)
    store conversations as a ``messages`` column containing an array of
    ``{"role": ..., "content": ...}`` dicts.  This preprocessor replaces the
    ``text_column`` value with the extracted user content, populates
    ``prefix_column`` with the system prompt when present, and populates
    ``tool_response_column`` with ``role: "tool"`` response content.

    Usage::

        guidellm benchmark run \\
            --data madroid/glaive-function-calling-openai \\
            --data-column-mapper \\
                '{"text_column": "messages", "tools_column": "tools"}' \\
            --data-preprocessors tool_calling_message_extractor,encode_media
    """

    def __init__(
        self,
        config: "ToolCallingMessageExtractorArgs | None" = None,
        **_: Any,
    ) -> None:
        """
        :param config: Optional preprocessor arguments. Accepted positionally
            so the class can be built by ``PreprocessorRegistry.create``, which
            calls ``preprocessor_cls(config)``; the extractor has no tunable
            options, so the value is only stored.
        :param _: Extra keyword arguments, accepted and ignored.
        """
        self.config = config

    def __call__(  # noqa: C901
        self, items: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """
        Rewrite each item's ``text_column`` from message lists to plain text.

        Items whose ``text_column`` is missing, empty, or not a list are left
        untouched. The items are modified in place.

        :param items: Mapped items keyed by column type.
        :return: The same ``items`` list that was passed in.
        """
        for item in items:
            text_values = item.get("text_column")
            if not text_values or not isinstance(text_values, list):
                continue

            new_texts: list[str] = []
            prefixes: list[str] = []
            tool_responses: list[str] = []

            for value in text_values:
                if isinstance(value, list):
                    # A list value is treated as a conversation; the helper
                    # yields its user, system and tool parts separately.
                    user_parts, system_parts, tool_parts = _extract_from_messages(value)
                    if user_parts:
                        new_texts.append(" ".join(user_parts))
                    if system_parts:
                        prefixes.append(" ".join(system_parts))
                    tool_responses.extend(tool_parts)
                elif isinstance(value, str):
                    # Already plain text: pass through unchanged.
                    new_texts.append(value)

            # Only overwrite or extend columns when something was extracted,
            # so values produced by earlier preprocessors are preserved.
            if new_texts:
                item["text_column"] = new_texts
            if prefixes:
                item.setdefault("prefix_column", []).extend(prefixes)
            if tool_responses:
                item.setdefault("tool_response_column", []).extend(tool_responses)

        return items

ToolCallingMessageExtractorArgs

Bases: DataPreprocessorArgs

Model for tool calling message extractor preprocessor arguments.

Source code in src/guidellm/data/preprocessors/tool_calling.py
# Registered with DataPreprocessorArgs under the
# "tool_calling_message_extractor" kind.
@DataPreprocessorArgs.register("tool_calling_message_extractor")
class ToolCallingMessageExtractorArgs(DataPreprocessorArgs):
    """Model for tool calling message extractor preprocessor arguments."""

    # Type identifier for this argument model; fixed to
    # "tool_calling_message_extractor". No other fields are declared here.
    kind: Literal["tool_calling_message_extractor"] = Field(
        default="tool_calling_message_extractor",
        description="Type identifier for the preprocessor.",
    )

TurnPivot

Bases: DatasetPreprocessor

Swaps the turn and batch dimensions in a multi-turn dataset.

Example::

    # Input: 2 turns, each with 2 batches
    turns = [
        {"prompt": ["P1a", "P1b"], "output_tokens_count": [11, 12]},  # Turn 1
        {"prompt": ["P2a", "P2b"], "output_tokens_count": [21, 22]},  # Turn 2
    ]

preprocessor = TurnPivot()
turns = preprocessor(turns)

# Resulting turns:
# turns[0] = {"prompt": ["P1a", "P2a"], "output_tokens_count": [11, 21]}
# turns[1] = {"prompt": ["P1b", "P2b"], "output_tokens_count": [12, 22]}
Source code in src/guidellm/data/preprocessors/turn_pivot.py
@PreprocessorRegistry.register("turn_pivot")
class TurnPivot(DatasetPreprocessor):
    """
    Transpose a multi-turn dataset so that batch entries become turns.

    The value at position ``i`` of every column in every input turn is
    collected into output entry ``i``, keeping the input turn order.

    Example:
    ::
        # Input: 2 turns, each with 2 batches
        turns = [
            {"prompt": ["P1a", "P1b"], "output_tokens_count": [11, 12]},  # Turn 1
            {"prompt": ["P2a", "P2b"], "output_tokens_count": [21, 22]},  # Turn 2
        ]

        preprocessor = TurnPivot()
        turns = preprocessor(turns)

        # Resulting turns:
        # turns[0] = {"prompt": ["P1a", "P2a"], "output_tokens_count": [11, 21]}
        # turns[1] = {"prompt": ["P1b", "P2b"], "output_tokens_count": [12, 22]}
    """

    def __init__(
        self,
        config: TurnPivotArgs,
    ) -> None:
        """
        :param config: Preprocessor arguments, stored on the instance.
        """
        self.config = config

    def __call__(self, items: list[dict[str, list[Any]]]) -> list[dict[str, list[Any]]]:
        """
        Swap the turn and batch dimensions of ``items``.

        :param items: One dict per turn mapping column name to a list of
            per-batch values.
        :return: One dict per batch position mapping column name to the list
            of values gathered across the input turns.
        """
        pivoted: list[dict[str, list[Any]]] = []
        for turn in items:
            for column_name, values in turn.items():
                for batch_index, value in enumerate(values):
                    # Open a new output entry the first time this batch
                    # position is encountered.
                    if batch_index >= len(pivoted):
                        pivoted.append({})
                    pivoted[batch_index].setdefault(column_name, []).append(value)

        return pivoted

TurnPivotArgs

Bases: DataPreprocessorArgs

Model for turn pivot preprocessor arguments.

Source code in src/guidellm/data/preprocessors/turn_pivot.py
# NOTE(review): unlike the other argument models shown alongside it
# (GenerativeColumnMapperArgs, MediaEncoderArgs, ToolCallingMessageExtractorArgs),
# this class carries no @DataPreprocessorArgs.register(...) decorator --
# confirm whether the "turn_pivot" kind is intentionally left unregistered.
class TurnPivotArgs(DataPreprocessorArgs):
    """Model for turn pivot preprocessor arguments."""

    # Type identifier for this argument model; fixed to "turn_pivot". No other
    # fields are declared here.
    kind: Literal["turn_pivot"] = Field(
        default="turn_pivot",
        description="Type identifier for the turn pivot preprocessor.",
    )