# Core API Reference
## ParallelBatchProcessor

`async_batch_llm.ParallelBatchProcessor`
`ParallelBatchProcessor(max_workers: int | None = None, post_processor: PostProcessorFunc[TOutput, TContext] | None = None, timeout_per_item: float | None = None, rate_limit_cooldown: float | None = None, config: ProcessorConfig | None = None, error_classifier: ErrorClassifier | None = None, rate_limit_strategy: RateLimitStrategy | None = None, middlewares: list[Middleware[TInput, TOutput, TContext]] | None = None, observers: list[ProcessorObserver] | None = None, progress_callback: ProgressCallbackFunc | None = None)`
Bases: BatchProcessor[TInput, TOutput, TContext], Generic[TInput, TOutput, TContext]
Batch processor that executes items in parallel as individual agent calls.
This refactored version uses:

- Pluggable error classification (provider-agnostic)
- Pluggable rate limit strategies
- Middleware pipeline for extensibility
- Observer pattern for monitoring
- Configuration objects for easier setup
Initialize the parallel batch processor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `max_workers` | `int \| None` | Maximum concurrent workers (deprecated, use `config`) | `None` |
| `post_processor` | `PostProcessorFunc[TOutput, TContext] \| None` | Optional async function called after each successful item | `None` |
| `timeout_per_item` | `float \| None` | Timeout per item in seconds (deprecated, use `config`) | `None` |
| `rate_limit_cooldown` | `float \| None` | Cooldown duration (deprecated, use `config`) | `None` |
| `config` | `ProcessorConfig \| None` | Processor configuration object (recommended) | `None` |
| `error_classifier` | `ErrorClassifier \| None` | Strategy for classifying errors (default: `DefaultErrorClassifier`) | `None` |
| `rate_limit_strategy` | `RateLimitStrategy \| None` | Strategy for handling rate limits | `None` |
| `middlewares` | `list[Middleware[TInput, TOutput, TContext]] \| None` | List of middleware to apply | `None` |
| `observers` | `list[ProcessorObserver] \| None` | List of observers for events | `None` |
| `progress_callback` | `ProgressCallbackFunc \| None` | Optional `callback(completed, total, current_item_id)` for progress updates | `None` |
Source code in src/async_batch_llm/parallel.py
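The concurrency model described above (a bounded pool of workers, each item processed as an independent call with its own timeout) can be sketched with the standard library alone. This is an illustrative re-implementation of the pattern, not the library's internals; `process_batch` and `handler` are made-up names:

```python
import asyncio

async def process_batch(items, handler, max_workers=5, timeout_per_item=120.0):
    """Run handler over items with bounded concurrency and per-item timeouts."""
    semaphore = asyncio.Semaphore(max_workers)  # cap concurrent workers

    async def run_one(item):
        async with semaphore:
            try:
                output = await asyncio.wait_for(handler(item), timeout_per_item)
                return (item, True, output)
            except Exception as exc:  # timeout or handler failure
                return (item, False, str(exc))

    # Each item becomes an independent task; gather preserves input order.
    return await asyncio.gather(*(run_one(item) for item in items))

async def demo():
    async def handler(item):
        await asyncio.sleep(0)
        return item * 2

    return await process_batch([1, 2, 3], handler, max_workers=2)

results = asyncio.run(demo())
```

Note that a failed item produces a failure record rather than aborting the batch, mirroring how per-item errors are reported via results rather than raised.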
### `__aexit__` *(async)*
Context manager exit - ensures cleanup of strategies and resources.
Calls cleanup() on all prepared strategies, then delegates to parent cleanup.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `exc_type` | | Exception type (if any exception occurred) | *required* |
| `exc_val` | | Exception value (if any exception occurred) | *required* |
| `exc_tb` | | Exception traceback (if any exception occurred) | *required* |

Returns:

| Type | Description |
|---|---|
| `bool` | `False` to indicate exceptions should not be suppressed |
Source code in src/async_batch_llm/parallel.py
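The exit behavior described above (clean up each prepared strategy, then return `False` so exceptions propagate to the caller) follows the standard async context manager protocol. A minimal sketch with illustrative class names, assuming a `cleanup()` coroutine per strategy:

```python
import asyncio

class Processor:
    def __init__(self, strategies):
        self.strategies = strategies
        self.cleaned = []

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Clean up every prepared strategy, even if one cleanup fails.
        for strategy in self.strategies:
            try:
                await strategy.cleanup()
            except Exception:
                pass  # cleanup errors must not mask the caller's exception
            self.cleaned.append(strategy.name)
        return False  # never suppress the caller's exception

class Strategy:
    def __init__(self, name):
        self.name = name

    async def cleanup(self):
        await asyncio.sleep(0)

async def demo():
    async with Processor([Strategy("a"), Strategy("b")]) as p:
        pass
    return p.cleaned

cleaned = asyncio.run(demo())
```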
### `get_stats` *(async)*
Get processor statistics (thread-safe).
Returns:

| Type | Description |
|---|---|
| `dict` | Dictionary containing processing statistics |
Source code in src/async_batch_llm/parallel.py
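"Thread-safe" here means the counters are read and updated under a lock, so concurrent workers and callers of `get_stats` never observe a half-updated snapshot. A sketch of that pattern with illustrative field names:

```python
import asyncio

class StatsTracker:
    def __init__(self):
        self._lock = asyncio.Lock()
        self._stats = {"succeeded": 0, "failed": 0, "total_items": 0}

    async def record(self, success: bool):
        async with self._lock:  # serialize concurrent updates
            self._stats["total_items"] += 1
            self._stats["succeeded" if success else "failed"] += 1

    async def get_stats(self) -> dict:
        async with self._lock:
            return dict(self._stats)  # return a copy, not live mutable state

async def demo():
    tracker = StatsTracker()
    # i = 0 and 3 fail; 1, 2, 4, 5 succeed
    await asyncio.gather(*(tracker.record(i % 3 != 0) for i in range(6)))
    return await tracker.get_stats()

stats = asyncio.run(demo())
```

Returning a copy is the key detail: callers can inspect the snapshot freely without racing against in-flight workers.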
## LLMWorkItem

`async_batch_llm.LLMWorkItem` *(dataclass)*
`LLMWorkItem(item_id: str, strategy: LLMCallStrategy[TOutput], prompt: str = '', context: TContext | None = None)`
Bases: Generic[TInput, TOutput, TContext]
Represents a single work item to be processed by an LLM strategy.
Attributes:

| Name | Type | Description |
|---|---|---|
| `item_id` | `str` | Unique identifier for this work item |
| `strategy` | `LLMCallStrategy[TOutput]` | LLM call strategy that encapsulates how to make the LLM call |
| `prompt` | `str` | The prompt/input to pass to the LLM |
| `context` | `TContext \| None` | Optional context data passed through to results/post-processor |
### `__post_init__`
Validate work item fields.
Source code in src/async_batch_llm/base.py
## WorkItemResult

`async_batch_llm.WorkItemResult` *(dataclass)*
`WorkItemResult(item_id: str, success: bool, output: TOutput | None = None, error: str | None = None, context: TContext | None = None, token_usage: TokenUsage = (lambda: {'input_tokens': 0, 'output_tokens': 0, 'total_tokens': 0})(), gemini_safety_ratings: dict[str, str] | None = None)`
Bases: Generic[TOutput, TContext]
Result of processing a single work item.
Attributes:

| Name | Type | Description |
|---|---|---|
| `item_id` | `str` | ID of the work item |
| `success` | `bool` | Whether processing succeeded |
| `output` | `TOutput \| None` | Agent output if successful, `None` if failed |
| `error` | `str \| None` | Error message if failed, `None` if successful |
| `context` | `TContext \| None` | Context data from the work item |
| `token_usage` | `TokenUsage` | Token usage stats (`input_tokens`, `output_tokens`, `total_tokens`) |
| `gemini_safety_ratings` | `dict[str, str] \| None` | Gemini API safety ratings if available |
## ProcessorConfig

`async_batch_llm.ProcessorConfig` *(dataclass)*
`ProcessorConfig(max_workers: int = 5, timeout_per_item: float = 120.0, post_processor_timeout: float = 90.0, retry: RetryConfig = RetryConfig(), rate_limit: RateLimitConfig = RateLimitConfig(), max_requests_per_minute: float | None = None, progress_interval: int = 10, progress_callback_timeout: float | None = 5.0, enable_detailed_logging: bool = False, max_queue_size: int = 0, dry_run: bool = False)`
Complete configuration for batch processor.
### `__post_init__`

### `validate`

Validate complete configuration.
Source code in src/async_batch_llm/core/config.py
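The `validate` step can be sketched as a dataclass that rejects inconsistent settings at construction time. The specific checks below are illustrative assumptions, not the library's exact rules:

```python
from dataclasses import dataclass

@dataclass
class Config:
    max_workers: int = 5
    timeout_per_item: float = 120.0
    post_processor_timeout: float = 90.0

    def __post_init__(self):
        self.validate()

    def validate(self):
        # Catch misconfiguration up front, not halfway through a batch.
        if self.max_workers < 1:
            raise ValueError("max_workers must be >= 1")
        if self.timeout_per_item <= 0:
            raise ValueError("timeout_per_item must be positive")

cfg = Config(max_workers=10)
try:
    Config(max_workers=0)
    caught = False
except ValueError:
    caught = True
```

Calling `validate` from `__post_init__` means an invalid configuration object simply cannot be constructed, which is generally preferable to failing later inside the processor.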
## BatchResult

`async_batch_llm.BatchResult` *(dataclass)*
`BatchResult(results: list[WorkItemResult[TOutput, TContext]], total_items: int = 0, succeeded: int = 0, failed: int = 0, total_input_tokens: int = 0, total_output_tokens: int = 0, total_cached_tokens: int = 0)`
Bases: Generic[TOutput, TContext]
Result of processing a batch of work items.
Attributes:

| Name | Type | Description |
|---|---|---|
| `results` | `list[WorkItemResult[TOutput, TContext]]` | List of individual work item results |
| `total_items` | `int` | Total number of items in the batch |
| `succeeded` | `int` | Number of successful items |
| `failed` | `int` | Number of failed items |
| `total_input_tokens` | `int` | Sum of input tokens across all items |
| `total_output_tokens` | `int` | Sum of output tokens across all items |
| `total_cached_tokens` | `int` | Sum of cached input tokens across all items (v0.2.0) |
### `__post_init__`
Calculate summary statistics from results.
Source code in src/async_batch_llm/base.py
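Deriving the summary counters from the result list in `__post_init__` can be sketched like this (fields simplified to `(item_id, success)` pairs for illustration):

```python
from dataclasses import dataclass

@dataclass
class Batch:
    results: list
    total_items: int = 0
    succeeded: int = 0
    failed: int = 0

    def __post_init__(self):
        # Summary fields are derived, so callers only supply `results`
        # and the counters can never disagree with the list.
        self.total_items = len(self.results)
        self.succeeded = sum(1 for _, success in self.results if success)
        self.failed = self.total_items - self.succeeded

batch = Batch(results=[("a", True), ("b", False), ("c", True)])
```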
### `cache_hit_rate`
Calculate cache hit rate as percentage of input tokens that were cached.
Returns:

| Type | Description |
|---|---|
| `float` | Percentage (0.0 to 100.0) of input tokens served from cache |
Source code in src/async_batch_llm/base.py
### `effective_input_tokens`
Calculate effective input tokens (actual cost after caching).
Gemini charges 10% of the normal price for cached tokens.
Returns:

| Type | Description |
|---|---|
| `int` | Effective number of input tokens billed |
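Both cache metrics reduce to simple arithmetic over the batch token totals: hit rate is cached tokens over total input tokens, and effective tokens bill uncached tokens at full price plus cached tokens at 10% (per the Gemini pricing noted above). Illustrative standalone functions, assuming integer token counts:

```python
def cache_hit_rate(total_input_tokens: int, total_cached_tokens: int) -> float:
    """Percentage of input tokens that were served from cache."""
    if total_input_tokens == 0:
        return 0.0  # avoid division by zero on an empty batch
    return 100.0 * total_cached_tokens / total_input_tokens

def effective_input_tokens(total_input_tokens: int, total_cached_tokens: int) -> int:
    """Billed input tokens: uncached at full price, cached at 10%."""
    uncached = total_input_tokens - total_cached_tokens
    return round(uncached + 0.1 * total_cached_tokens)

# e.g. 10,000 input tokens, 2,500 of them cached:
rate = cache_hit_rate(10_000, 2_500)          # 25% hit rate
billed = effective_input_tokens(10_000, 2_500)  # 7,500 + 250 billed tokens
```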