Skip to content

Core API Reference

ParallelBatchProcessor

async_batch_llm.ParallelBatchProcessor

ParallelBatchProcessor(max_workers: int | None = None, post_processor: PostProcessorFunc[TOutput, TContext] | None = None, timeout_per_item: float | None = None, rate_limit_cooldown: float | None = None, config: ProcessorConfig | None = None, error_classifier: ErrorClassifier | None = None, rate_limit_strategy: RateLimitStrategy | None = None, middlewares: list[Middleware[TInput, TOutput, TContext]] | None = None, observers: list[ProcessorObserver] | None = None, progress_callback: ProgressCallbackFunc | None = None)

Bases: BatchProcessor[TInput, TOutput, TContext], Generic[TInput, TOutput, TContext]

Batch processor that executes items in parallel as individual agent calls.

This refactored version uses:

- Pluggable error classification (provider-agnostic)
- Pluggable rate limit strategies
- Middleware pipeline for extensibility
- Observer pattern for monitoring
- Configuration objects for easier setup

Initialize the parallel batch processor.

Parameters:

Name Type Description Default
max_workers int | None

Maximum concurrent workers (deprecated, use config)

None
post_processor PostProcessorFunc[TOutput, TContext] | None

Optional async function called after each successful item

None
timeout_per_item float | None

Timeout per item in seconds (deprecated, use config)

None
rate_limit_cooldown float | None

Cooldown duration (deprecated, use config)

None
config ProcessorConfig | None

Processor configuration object (recommended)

None
error_classifier ErrorClassifier | None

Strategy for classifying errors (default: DefaultErrorClassifier)

None
rate_limit_strategy RateLimitStrategy | None

Strategy for handling rate limits

None
middlewares list[Middleware[TInput, TOutput, TContext]] | None

List of middleware to apply

None
observers list[ProcessorObserver] | None

List of observers for events

None
progress_callback ProgressCallbackFunc | None

Optional callback(completed, total, current_item_id) for progress updates

None
Source code in src/async_batch_llm/parallel.py
def __init__(
    self,
    max_workers: int | None = None,
    post_processor: PostProcessorFunc[TOutput, TContext] | None = None,
    timeout_per_item: float | None = None,
    rate_limit_cooldown: float | None = None,  # Deprecated, use config
    # New parameters
    config: ProcessorConfig | None = None,
    error_classifier: ErrorClassifier | None = None,
    rate_limit_strategy: RateLimitStrategy | None = None,
    middlewares: list[Middleware[TInput, TOutput, TContext]] | None = None,
    observers: list[ProcessorObserver] | None = None,
    progress_callback: "ProgressCallbackFunc | None" = None,
):
    """
    Initialize the parallel batch processor.

    Args:
        max_workers: Maximum concurrent workers (deprecated, use config)
        post_processor: Optional async function called after each successful item
        timeout_per_item: Timeout per item in seconds (deprecated, use config)
        rate_limit_cooldown: Cooldown duration (deprecated, use config)
        config: Processor configuration object (recommended)
        error_classifier: Strategy for classifying errors (default: DefaultErrorClassifier)
        rate_limit_strategy: Strategy for handling rate limits
        middlewares: List of middleware to apply
        observers: List of observers for events
        progress_callback: Optional callback(completed, total, current_item_id) for progress updates

    Raises:
        ValueError: If the effective configuration fails ``config.validate()``.

    Note:
        When both ``config`` and a deprecated legacy parameter are supplied,
        the legacy value wins and the caller's config object is mutated in
        place (see the override branch below).
    """
    import warnings

    # Emit deprecation warnings for legacy parameters
    if max_workers is not None:
        warnings.warn(
            "The 'max_workers' parameter is deprecated. "
            "Use ProcessorConfig(max_workers=...) instead.",
            DeprecationWarning,
            stacklevel=2,
        )
    if timeout_per_item is not None:
        warnings.warn(
            "The 'timeout_per_item' parameter is deprecated. "
            "Use ProcessorConfig(timeout_per_item=...) instead.",
            DeprecationWarning,
            stacklevel=2,
        )
    if rate_limit_cooldown is not None:
        warnings.warn(
            "The 'rate_limit_cooldown' parameter is deprecated. "
            "Use ProcessorConfig(rate_limit=RateLimitConfig(cooldown_seconds=...)) instead.",
            DeprecationWarning,
            stacklevel=2,
        )

    # Handle backward compatibility: build a config from legacy parameters
    # (or their documented fallbacks) when no config object was given.
    if config is None:
        from .core import RateLimitConfig

        config = ProcessorConfig(
            max_workers=max_workers or 5,
            timeout_per_item=timeout_per_item or 120.0,
            rate_limit=RateLimitConfig(cooldown_seconds=rate_limit_cooldown or 300.0),
        )
    else:
        # Override config with explicit parameters if provided.
        # NOTE(review): this mutates the caller-supplied config in place —
        # a shared ProcessorConfig instance would be affected for all users.
        if max_workers is not None:
            config.max_workers = max_workers
        if timeout_per_item is not None:
            config.timeout_per_item = timeout_per_item
        if rate_limit_cooldown is not None:
            config.rate_limit.cooldown_seconds = rate_limit_cooldown

    # Re-validate after any legacy overrides; raises ValueError on bad values.
    config.validate()

    super().__init__(
        config.max_workers,
        post_processor,
        max_queue_size=config.max_queue_size,
        progress_callback=progress_callback,
        progress_callback_timeout=config.progress_callback_timeout,
    )
    self.config = config

    # Set up strategies (fall back to defaults when not injected).
    self.error_classifier = error_classifier or DefaultErrorClassifier()
    self.rate_limit_strategy = rate_limit_strategy or ExponentialBackoffStrategy(
        initial_cooldown=config.rate_limit.cooldown_seconds,
        backoff_multiplier=config.rate_limit.backoff_multiplier,
        slow_start_items=config.rate_limit.slow_start_items,
        slow_start_initial_delay=config.rate_limit.slow_start_initial_delay,
        slow_start_final_delay=config.rate_limit.slow_start_final_delay,
    )

    # Set up middleware and observers
    self.middlewares = middlewares or []
    self.observers = observers or []

    # Event + middleware dispatch. Delegates observer emits and the
    # middleware chain (before/after/on_error) to a stateless helper.
    self._events: EventDispatcher[TInput, TOutput, TContext] = EventDispatcher(
        observers=self.observers, middlewares=self.middlewares
    )

    # Rate-limit coordination (extracted in v0.7.0).
    self._rate_limit_coord = RateLimitCoordinator(
        rate_limit_strategy=self.rate_limit_strategy,
        events=self._events,
    )
    # Back-compat aliases — existing private methods and some tests
    # reach into these attributes directly.
    self._rate_limit_event = self._rate_limit_coord._rate_limit_event
    self._current_generation_event = self._rate_limit_coord._current_generation_event
    self._rate_limit_lock = self._rate_limit_coord._lock

    # Thread safety locks (stats + results remain processor-owned).
    self._stats_lock = asyncio.Lock()
    self._results_lock = asyncio.Lock()

    # Strategy lifecycle management (v0.2.0, extracted in v0.7.0).
    # Tracks prepared strategies via a WeakSet so sharing one instance
    # across work items invokes prepare() exactly once.
    self._strategy_lifecycle: StrategyLifecycle[TOutput] = StrategyLifecycle()
    # Back-compat aliases used by existing private methods and tests.
    self._prepared_strategies = self._strategy_lifecycle._prepared
    self._strategy_lock = self._strategy_lifecycle._lock

    # Proactive rate limiting (prevents hitting rate limits)
    if config.max_requests_per_minute:
        from aiolimiter import AsyncLimiter

        # aiolimiter doesn't have explicit burst_size - it uses max_rate as burst capacity
        # To support burst_size, we'd need to use max_rate + burst_size
        # For now, we use max_rate directly (no additional burst)
        self._proactive_rate_limiter: AsyncLimiter | None = AsyncLimiter(
            max_rate=config.max_requests_per_minute,
            time_period=60,  # per minute
        )
    else:
        self._proactive_rate_limiter = None

    # Centralized token-usage extraction across all exception shapes.
    self._token_extractor = TokenExtractor()

__aexit__ async

__aexit__(exc_type, exc_val, exc_tb)

Context manager exit - ensures cleanup of strategies and resources.

Calls cleanup() on all prepared strategies, then delegates to parent cleanup.

Parameters:

Name Type Description Default
exc_type

Exception type (if any exception occurred)

required
exc_val

Exception value (if any exception occurred)

required
exc_tb

Exception traceback (if any exception occurred)

required

Returns:

Type Description

False to indicate exceptions should not be suppressed

Source code in src/async_batch_llm/parallel.py
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """
    Exit the async context manager, releasing strategies and resources.

    Runs cleanup() on every prepared strategy first, then hands off to the
    parent class cleanup (workers and queue).

    Args:
        exc_type: Type of the in-flight exception, if any.
        exc_val: The in-flight exception instance, if any.
        exc_tb: Traceback of the in-flight exception, if any.

    Returns:
        False, so any exception raised inside the context propagates.
    """
    # Strategy teardown must complete before workers/queue are torn down.
    await self._cleanup_strategies()
    # Parent cleanup handles worker tasks and the work queue.
    await self.cleanup()
    # Never suppress the caller's exception.
    return False

get_stats async

get_stats() -> dict

Get processor statistics (thread-safe).

Returns:

Type Description
dict

Dictionary containing processing statistics including:

  • processed: Number of items processed
  • succeeded: Number of successful items
  • failed: Number of failed items
  • rate_limit_count: Number of rate limit errors encountered
  • error_counts: Dictionary of error types and their counts
  • total: Total number of items queued
  • start_time: Timestamp when processing started
Source code in src/async_batch_llm/parallel.py
async def get_stats(self) -> dict:
    """
    Return a snapshot of processor statistics (thread-safe).

    The internal stats dict is copied while holding the stats lock, so the
    returned dictionary is safe to mutate and cannot race with workers.

    Returns:
        Dictionary containing processing statistics including:
        - processed: Number of items processed
        - succeeded: Number of successful items
        - failed: Number of failed items
        - rate_limit_count: Number of rate limit errors encountered
        - error_counts: Dictionary of error types and their counts
        - total: Total number of items queued
        - start_time: Timestamp when processing started
    """
    async with self._stats_lock:
        snapshot = dict(self._stats)
    return snapshot

shutdown async

shutdown()

Clean up resources: flush observers and cancel pending tasks.

Source code in src/async_batch_llm/parallel.py
async def shutdown(self):
    """Clean up resources: flush observers and cancel pending tasks.

    Runs cleanup() on all prepared strategies first, then delegates to the
    parent cleanup (workers and queue). Mirrors __aexit__ for callers that
    manage the processor's lifetime manually rather than via ``async with``.
    """
    await self._cleanup_strategies()
    await self.cleanup()

LLMWorkItem

async_batch_llm.LLMWorkItem dataclass

LLMWorkItem(item_id: str, strategy: LLMCallStrategy[TOutput], prompt: str = '', context: TContext | None = None)

Bases: Generic[TInput, TOutput, TContext]

Represents a single work item to be processed by an LLM strategy.

Attributes:

Name Type Description
item_id str

Unique identifier for this work item

strategy LLMCallStrategy[TOutput]

LLM call strategy that encapsulates how to make the LLM call

prompt str

The prompt/input to pass to the LLM

context TContext | None

Optional context data passed through to results/post-processor

__post_init__

__post_init__()

Validate work item fields.

Source code in src/async_batch_llm/base.py
def __post_init__(self):
    """Validate work item fields after dataclass construction.

    Raises:
        ValueError: If item_id is not a non-empty, non-whitespace string,
            or if strategy is None.
        TypeError: If prompt is not a string.
    """
    # item_id must be a string and must carry visible content.
    if not isinstance(self.item_id, str) or not self.item_id:
        raise ValueError(
            f"item_id must be a non-empty string (got {type(self.item_id).__name__}: {repr(self.item_id)}). "
            f"Provide a unique string identifier for this work item."
        )
    if self.item_id.strip() == "":
        raise ValueError(
            f"item_id cannot be whitespace only (got {repr(self.item_id)}). "
            f"Provide a non-whitespace string identifier."
        )
    # A strategy is mandatory — it encapsulates how the LLM call is made.
    if self.strategy is None:
        raise ValueError(
            "strategy must not be None. "
            "Pass an LLMCallStrategy instance (e.g., PydanticAIStrategy, GeminiStrategy, "
            "or your custom subclass)."
        )
    if not isinstance(self.prompt, str):
        raise TypeError(
            f"prompt must be a string (got {type(self.prompt).__name__}: {repr(self.prompt)[:80]}). "
            f"If you need to pass structured data, serialize it to a string first."
        )

WorkItemResult

async_batch_llm.WorkItemResult dataclass

WorkItemResult(item_id: str, success: bool, output: TOutput | None = None, error: str | None = None, context: TContext | None = None, token_usage: TokenUsage = (lambda: {'input_tokens': 0, 'output_tokens': 0, 'total_tokens': 0})(), gemini_safety_ratings: dict[str, str] | None = None)

Bases: Generic[TOutput, TContext]

Result of processing a single work item.

Attributes:

Name Type Description
item_id str

ID of the work item

success bool

Whether processing succeeded

output TOutput | None

Agent output if successful, None if failed

error str | None

Error message if failed, None if successful

context TContext | None

Context data from the work item

token_usage TokenUsage

Token usage stats (input_tokens, output_tokens, total_tokens)

gemini_safety_ratings dict[str, str] | None

Gemini API safety ratings if available

ProcessorConfig

async_batch_llm.ProcessorConfig dataclass

ProcessorConfig(max_workers: int = 5, timeout_per_item: float = 120.0, post_processor_timeout: float = 90.0, retry: RetryConfig = RetryConfig(), rate_limit: RateLimitConfig = RateLimitConfig(), max_requests_per_minute: float | None = None, progress_interval: int = 10, progress_callback_timeout: float | None = 5.0, enable_detailed_logging: bool = False, max_queue_size: int = 0, dry_run: bool = False)

Complete configuration for batch processor.

__post_init__

__post_init__() -> None

Validate configuration on construction.

Source code in src/async_batch_llm/core/config.py
def __post_init__(self) -> None:
    """Validate configuration on construction.

    Delegates to validate() so a freshly built ProcessorConfig is always
    consistent; raises ValueError on invalid values.
    """
    self.validate()

validate

validate() -> None

Validate complete configuration.

Source code in src/async_batch_llm/core/config.py
def validate(self) -> None:
    """Validate complete configuration.

    Checks scalar fields first (raising ValueError for hard errors), then
    the nested retry/rate-limit configs, then logs warnings for cross-field
    combinations that are legal but likely misconfigured. Check order
    matters: the first failing field determines which error surfaces.

    Raises:
        ValueError: If any field is outside its allowed range (including
            failures raised by the nested retry/rate_limit validators).
    """
    if self.max_workers < 1:
        raise ValueError(
            f"max_workers must be >= 1 (got {self.max_workers}). "
            f"Set config.max_workers to a positive integer (typical: 5-20)."
        )
    if self.timeout_per_item <= 0:
        raise ValueError(
            f"timeout_per_item must be > 0 (got {self.timeout_per_item}). "
            f"Set config.timeout_per_item to a positive number in seconds (typical: 60-300)."
        )
    if self.post_processor_timeout <= 0:
        raise ValueError(
            f"post_processor_timeout must be > 0 (got {self.post_processor_timeout}). "
            f"Set config.post_processor_timeout to a positive number in seconds (typical: 30-120)."
        )
    if self.progress_interval < 1:
        raise ValueError(
            f"progress_interval must be >= 1 (got {self.progress_interval}). "
            f"Set config.progress_interval to a positive integer."
        )
    # None disables the callback timeout entirely, so only reject <= 0.
    if self.progress_callback_timeout is not None and self.progress_callback_timeout <= 0:
        raise ValueError(
            f"progress_callback_timeout must be > 0 (got {self.progress_callback_timeout}). "
            f"Set config.progress_callback_timeout to None to disable or a positive number of seconds."
        )
    # 0 means "unlimited queue", so it is a valid value here.
    if self.max_queue_size < 0:
        raise ValueError(
            f"max_queue_size must be >= 0 (got {self.max_queue_size}). "
            f"Set config.max_queue_size to 0 for unlimited, or a positive number to limit queue size."
        )
    if self.max_requests_per_minute is not None and self.max_requests_per_minute <= 0:
        raise ValueError(
            f"max_requests_per_minute must be > 0 or None (got {self.max_requests_per_minute}). "
            f"Set config.max_requests_per_minute to None to disable proactive rate limiting, "
            f"or a positive number (typical: 10-500 requests/minute)."
        )

    # Validate nested configs first
    self.retry.validate()
    self.rate_limit.validate()

    # Cross-field validations
    if self.max_queue_size > 0 and self.max_queue_size < self.max_workers:
        logger.warning(
            f"max_queue_size ({self.max_queue_size}) is less than max_workers ({self.max_workers}). "
            f"This may cause workers to starve waiting for work. "
            f"Consider setting max_queue_size >= max_workers or 0 for unlimited."
        )

    if self.timeout_per_item < self.retry.initial_wait:
        logger.warning(
            f"timeout_per_item ({self.timeout_per_item}s) is less than "
            f"retry.initial_wait ({self.retry.initial_wait}s). "
            f"This means the timeout may occur before the first retry delay completes. "
            f"Consider increasing timeout_per_item or decreasing retry.initial_wait."
        )

    # Calculate maximum possible retry wait time (sum of capped
    # exponential-backoff delays across all retry attempts).
    max_total_retry_wait = 0.0
    for attempt in range(self.retry.max_attempts - 1):  # -1 because first attempt has no wait
        wait_time = min(
            self.retry.initial_wait * (self.retry.exponential_base**attempt),
            self.retry.max_wait,
        )
        max_total_retry_wait += wait_time

    # Warn when the per-item timeout covers less than half of the worst-case
    # cumulative retry delay — retries would routinely be cut short.
    if max_total_retry_wait > 0 and self.timeout_per_item < max_total_retry_wait * 0.5:
        jitter_note = (
            " (with jitter, actual delays will be 50-100% of this)" if self.retry.jitter else ""
        )
        logger.warning(
            f"timeout_per_item ({self.timeout_per_item}s) may be too short for retry strategy. "
            f"With {self.retry.max_attempts} attempts, retry delays could total up to "
            f"{max_total_retry_wait:.1f}s{jitter_note}. "
            f"Consider increasing timeout_per_item to at least {max_total_retry_wait * 2:.1f}s."
        )

    # Validate proactive rate limit vs workers
    if self.max_requests_per_minute is not None:
        requests_per_second = self.max_requests_per_minute / 60.0
        if requests_per_second < self.max_workers:
            logger.warning(
                f"max_requests_per_minute ({self.max_requests_per_minute}) is less than "
                f"max_workers ({self.max_workers}). "
                f"At {requests_per_second:.2f} requests/second with {self.max_workers} workers, "
                f"workers may frequently wait for rate limit tokens. "
                f"Consider reducing max_workers to {int(requests_per_second)} or increasing "
                f"max_requests_per_minute."
            )

BatchResult

async_batch_llm.BatchResult dataclass

BatchResult(results: list[WorkItemResult[TOutput, TContext]], total_items: int = 0, succeeded: int = 0, failed: int = 0, total_input_tokens: int = 0, total_output_tokens: int = 0, total_cached_tokens: int = 0)

Bases: Generic[TOutput, TContext]

Result of processing a batch of work items.

Attributes:

Name Type Description
results list[WorkItemResult[TOutput, TContext]]

List of individual work item results

total_items int

Total number of items in the batch

succeeded int

Number of successful items

failed int

Number of failed items

total_input_tokens int

Sum of input tokens across all items

total_output_tokens int

Sum of output tokens across all items

total_cached_tokens int

Sum of cached input tokens across all items (v0.2.0)

__post_init__

__post_init__()

Calculate summary statistics from results.

Source code in src/async_batch_llm/base.py
def __post_init__(self):
    """Derive aggregate statistics from the per-item results list.

    Recomputes total_items, succeeded/failed counts, and token totals
    (including cached input tokens, v0.2.0) from ``self.results``,
    overwriting whatever values were passed to the constructor.
    """
    count = len(self.results)
    wins = 0
    input_total = 0
    output_total = 0
    cached_total = 0
    # Single pass over results: count successes and sum token usage.
    for result in self.results:
        if result.success:
            wins += 1
        usage = result.token_usage
        input_total += usage.get("input_tokens", 0)
        output_total += usage.get("output_tokens", 0)
        cached_total += usage.get("cached_input_tokens", 0)
    self.total_items = count
    self.succeeded = wins
    self.failed = count - wins
    self.total_input_tokens = input_total
    self.total_output_tokens = output_total
    self.total_cached_tokens = cached_total

cache_hit_rate

cache_hit_rate() -> float

Calculate cache hit rate as percentage of input tokens that were cached.

Returns:

Type Description
float

Percentage (0.0 to 100.0) of input tokens served from cache

Source code in src/async_batch_llm/base.py
def cache_hit_rate(self) -> float:
    """
    Percentage of input tokens that were served from the provider cache.

    Returns:
        A value from 0.0 to 100.0; 0.0 when no input tokens were recorded
        (avoids division by zero).
    """
    total = self.total_input_tokens
    if not total:
        return 0.0
    return (self.total_cached_tokens / total) * 100.0

effective_input_tokens

effective_input_tokens() -> int

Calculate effective input tokens (actual cost after caching).

Gemini charges 10% of the normal price for cached tokens.

Returns:

Type Description
int

Effective number of input tokens billed

Source code in src/async_batch_llm/base.py
def effective_input_tokens(self) -> int:
    """
    Effective input tokens billed after applying the cache discount.

    Gemini charges 10% of the normal price for cached tokens, so each
    cached token removes 90% of its cost from the billed total.

    Returns:
        Number of input tokens effectively billed.
    """
    # 90% discount on cached tokens, truncated to whole tokens.
    savings = int(self.total_cached_tokens * 0.9)
    return self.total_input_tokens - savings