Strategies API Reference
LLMCallStrategy
async_batch_llm.LLMCallStrategy
Bases: ABC, Generic[TOutput]
Abstract base class for LLM call strategies.
A strategy encapsulates how LLM calls are made, including:

- Resource initialization (caches, clients)
- Call execution with retries
- Resource cleanup

The framework calls:

1. prepare() once before any retries
2. execute() for each attempt (including retries)
3. cleanup() once after all attempts complete or fail
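The lifecycle above can be sketched with a toy strategy and a simplified retry driver. These are illustrative stand-ins, not the real LLMCallStrategy base class or the framework's actual retry loop:

```python
import asyncio

class ToyStrategy:
    """Stand-in illustrating the prepare/execute/cleanup lifecycle."""

    def __init__(self):
        self.calls = []

    async def prepare(self):
        self.calls.append("prepare")

    async def execute(self, prompt, attempt, timeout):
        self.calls.append(f"execute:{attempt}")
        if attempt < 2:
            raise RuntimeError("transient failure")  # force one retry
        return "ok", {"input_tokens": 1, "output_tokens": 1, "total_tokens": 2}

    async def cleanup(self):
        self.calls.append("cleanup")

async def run_with_retries(strategy, prompt, max_attempts=3, timeout=30.0):
    # Mirrors the documented order: prepare once, execute per attempt,
    # cleanup once regardless of success or failure.
    await strategy.prepare()
    try:
        for attempt in range(1, max_attempts + 1):
            try:
                return await strategy.execute(prompt, attempt, timeout)
            except Exception:
                if attempt == max_attempts:
                    raise
    finally:
        await strategy.cleanup()

strategy = ToyStrategy()
output, usage = asyncio.run(run_with_retries(strategy, "hello"))
print(strategy.calls)  # ['prepare', 'execute:1', 'execute:2', 'cleanup']
```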
cleanup
async
Clean up resources after all retry attempts complete.
Called once per work item after processing finishes (success or failure).
Use this for:

- Closing connections/sessions
- Releasing locks
- Logging final metrics
- Deleting temporary files

Do NOT use this for:

- Deleting caches intended for reuse across runs
- Destructive cleanup that prevents resource reuse
Note on Caches (v0.2.0):
For reusable resources like Gemini caches with TTLs, consider letting
them expire naturally to enable cost savings across multiple pipeline
runs. See GeminiCachedStrategy for an example.
Default: no-op
Source code in src/async_batch_llm/llm_strategies.py
dry_run
async
Return mock output for dry-run mode (testing without API calls).
Override this method to provide realistic mock data for testing. Default implementation returns placeholder values that may not match your output type.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| prompt | str | The prompt that would have been sent to the LLM | required |

Returns:

| Type | Description |
|---|---|
| tuple[TOutput, TokenUsage] | Tuple of (mock_output, mock_token_usage) |
Default behavior:

- Returns string "[DRY-RUN] Mock output" as output
- Returns mock token usage: 100 input, 50 output, 150 total
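An override can return mock data shaped like your real output type. This is a sketch on a stand-in class (the real method lives on LLMCallStrategy); the token counts simply mirror the documented defaults:

```python
import asyncio

class MyStrategy:
    """Stand-in showing a dry_run override returning realistic mock data."""

    async def dry_run(self, prompt: str):
        # Shape the mock like your real output type instead of relying on
        # the default "[DRY-RUN] Mock output" placeholder string.
        mock_output = {"summary": "mock summary", "score": 0.0}
        mock_usage = {"input_tokens": 100, "output_tokens": 50, "total_tokens": 150}
        return mock_output, mock_usage

output, usage = asyncio.run(MyStrategy().dry_run("any prompt"))
```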
execute
abstractmethod
async
execute(prompt: str, attempt: int, timeout: float, state: RetryState | None = None) -> tuple[TOutput, TokenUsage]
Execute an LLM call for the given attempt.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| prompt | str | The prompt to send to the LLM | required |
| attempt | int | Which retry attempt this is (1, 2, 3, ...) | required |
| timeout | float | Maximum time to wait for response (seconds) | required |
| state | RetryState \| None | Optional retry state that persists across attempts (v0.3.0) | None |

Returns:

| Type | Description |
|---|---|
| tuple[TOutput, TokenUsage] | Tuple of (output, token_usage), where token_usage is a TokenUsage dict with optional keys: input_tokens, output_tokens, total_tokens, cached_input_tokens |
Note (v0.3.0): The state parameter allows strategies to maintain state across retry attempts for multi-stage retry patterns. See RetryState documentation for examples.
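One way execute() can use the state parameter is to escalate to a stronger model after earlier attempts recorded failures. The RetryState stand-in below only mimics the get/set interface shown in this document, and the model names are purely hypothetical:

```python
import asyncio

class ToyRetryState:
    """Stand-in with the get/set interface the docs show for RetryState."""
    def __init__(self):
        self._data = {}
    def get(self, key, default=None):
        return self._data.get(key, default)
    def set(self, key, value):
        self._data[key] = value

async def execute(prompt, attempt, timeout, state=None):
    # Hypothetical multi-stage retry: switch to a stronger model once
    # a previous attempt has recorded a validation failure.
    model = "cheap-model"
    if state and state.get("validation_failures", 0) >= 1:
        model = "strong-model"
    return f"{model}:{prompt}", {"input_tokens": 1, "output_tokens": 1, "total_tokens": 2}

state = ToyRetryState()
first, _ = asyncio.run(execute("p", 1, 30.0, state))
state.set("validation_failures", 1)   # as on_error might do after a failure
second, _ = asyncio.run(execute("p", 2, 30.0, state))
print(first, second)  # cheap-model:p strong-model:p
```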
on_error
async
Handle errors that occur during execute().
Called by the framework when execute() raises an exception, before deciding whether to retry. This allows strategies to:

- Inspect the error type to adjust retry behavior
- Store error information for use in the next attempt
- Modify prompts based on validation errors
- Track error patterns across attempts
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| exception | Exception | The exception that was raised during execute() | required |
| attempt | int | Which attempt number failed (1, 2, 3, ...) | required |
| state | RetryState \| None | Optional retry state that persists across attempts (v0.3.0) | None |
Default: no-op
Example (v0.2.0):

```python
async def on_error(self, exception: Exception, attempt: int) -> None:
    # Store last error for smart retry logic
    self.last_error = exception

    # Track validation errors vs network errors
    if isinstance(exception, ValidationError):
        self.should_escalate_model = True
```
Example (v0.3.0 with retry state):

```python
async def on_error(
    self, exception: Exception, attempt: int, state: RetryState | None = None
) -> None:
    if state:
        # Track validation errors separately from other errors
        if isinstance(exception, ValidationError):
            count = state.get('validation_failures', 0) + 1
            state.set('validation_failures', count)
        # Save partial results for recovery
        if hasattr(exception, 'partial_data'):
            state.set('partial_data', exception.partial_data)
```
prepare
async
Initialize resources before making any LLM calls.
Called once per work item before any retry attempts. Use this to set up caches, initialize clients, etc.
Default: no-op
PydanticAIStrategy
async_batch_llm.PydanticAIStrategy
Bases: LLMCallStrategy[TOutput]
Strategy for using PydanticAI agents.
This strategy wraps a PydanticAI agent, providing a clean interface for batch processing. The agent handles all model interaction, validation, and parsing.
Best for: Structured output with Pydantic models, using PydanticAI's features.
Initialize PydanticAI strategy.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| agent | Agent[None, TOutput] | Configured PydanticAI agent | required |
dry_run
async
Return mock output based on agent's result_type for dry-run mode.
execute
async
execute(prompt: str, attempt: int, timeout: float, state: RetryState | None = None) -> tuple[TOutput, TokenUsage]
Execute PydanticAI agent call.
Note: timeout parameter is provided for information but timeout enforcement is handled by the framework wrapping this call in asyncio.wait_for().
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| prompt | str | The prompt to send to the LLM | required |
| attempt | int | Which retry attempt this is (1, 2, 3, ...) | required |
| timeout | float | Maximum time to wait for response (seconds) | required |
| state | RetryState \| None | Optional retry state (v0.3.0, unused by this strategy) | None |
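The framework-side timeout enforcement described above can be sketched as follows; slow_execute and framework_call are illustrative stand-ins for a strategy's execute() and the framework's wrapper:

```python
import asyncio

async def slow_execute(prompt, attempt, timeout):
    # A strategy's execute() receives timeout for information only;
    # it does not have to enforce the deadline itself.
    await asyncio.sleep(0.2)
    return "done", {}

async def framework_call(execute, prompt, attempt, timeout):
    # The framework wraps the strategy call in asyncio.wait_for,
    # so an overrun surfaces here as a timeout.
    return await asyncio.wait_for(execute(prompt, attempt, timeout), timeout)

try:
    asyncio.run(framework_call(slow_execute, "p", 1, 0.05))
    timed_out = False
except asyncio.TimeoutError:
    timed_out = True
```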
GeminiStrategy
async_batch_llm.GeminiStrategy
GeminiStrategy(model: LLMModel, response_parser: Callable[[LLMResponse], TOutput], *, temperature: float = 0.0)
Bases: LLMCallStrategy[TOutput]
Strategy for calling an LLM model and parsing the response.
Accepts an LLMModel (e.g., GeminiModel or GeminiCachedModel) and a response parser. The model handles the API call and token extraction; the strategy handles response parsing and lifecycle delegation.
For caching, use GeminiStrategy(model=GeminiCachedModel(...)).
v0.6.0: Accepts LLMModel instead of raw client + model string.
Example

```python
model = GeminiModel("gemini-2.5-flash", client)
strategy = GeminiStrategy(model, response_parser=lambda r: r.text)
```

With caching:

```python
cached_model = GeminiCachedModel("gemini-2.5-flash", client, cached_content=[...])
strategy = GeminiStrategy(cached_model, response_parser=lambda r: r.text)
```
Initialize strategy.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| model | LLMModel | An LLMModel instance (e.g., GeminiModel, GeminiCachedModel). | required |
| response_parser | Callable[[LLMResponse], TOutput] | Function to parse LLMResponse into TOutput. | required |
| temperature | float | Default sampling temperature (overridable by subclasses). | 0.0 |
cleanup
async
execute
async
execute(prompt: str, attempt: int, timeout: float, state: RetryState | None = None) -> tuple[TOutput, TokenUsage]
Execute LLM call via the model and parse the response.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| prompt | str | The prompt to send to the LLM. | required |
| attempt | int | Which retry attempt this is (1, 2, 3, ...). | required |
| timeout | float | Maximum time for response (enforced by framework). | required |
| state | RetryState \| None | Optional retry state for cross-attempt persistence. | None |

Returns:

| Type | Description |
|---|---|
| tuple[TOutput, TokenUsage] | Tuple of (parsed_output, token_usage). |
prepare
async
Models
GeminiModel
async_batch_llm.GeminiModel
GeminiModel(model: str, client: Client, *, safety_settings: list[dict[str, Any]] | None = None, system_instruction: str | None = None)
LLM model backed by the Google Gemini API.
Wraps a genai.Client and model name, handling API calls, token extraction, and response normalization. Implements the LLMModel protocol.
Example
```python
client = genai.Client(api_key="...")
model = GeminiModel("gemini-2.5-flash", client)
response = await model.generate("Hello!")
print(response.text, response.input_tokens)
```
Added in v0.6.0.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| model | str | Model name (e.g., "gemini-3.1-flash-lite-preview"). | required |
| client | Client | Initialized genai.Client. | required |
| safety_settings | list[dict[str, Any]] \| None | Default safety settings for all calls. | None |
| system_instruction | str \| None | Default system instruction (overridable per-call). | None |
Source code in src/async_batch_llm/models.py
generate
async
generate(prompt: str | list[Any], *, temperature: float = 0.0, system_instruction: str | None = None, config: dict[str, Any] | None = None) -> LLMResponse
Generate a response from Gemini.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| prompt | str \| list[Any] | Text prompt or list of content parts (multimodal). | required |
| temperature | float | Sampling temperature. | 0.0 |
| system_instruction | str \| None | Override default system instruction. | None |
| config | dict[str, Any] \| None | Additional provider-specific config entries. | None |

Returns:

| Type | Description |
|---|---|
| LLMResponse | Normalized LLMResponse. |
GeminiCachedModel
async_batch_llm.GeminiCachedModel
GeminiCachedModel(model: str, client: Client, cached_content: list[Content], *, cache_ttl_seconds: int = 3600, cache_renewal_buffer_seconds: int = 300, auto_renew: bool = True, cache_tags: dict[str, str] | None = None, safety_settings: list[dict[str, Any]] | None = None)
LLM model backed by Google Gemini with context caching.
Wraps a genai.Client with cache lifecycle management. Implements the ManagedLLMModel protocol: call prepare() before first use, cleanup() when done.
IMPORTANT — share one instance across work items. Create ONE GeminiCachedModel and reuse it across every LLMWorkItem that should share the cached context. Constructing a new instance per item defeats caching entirely and can cost 10× more. The framework calls prepare() exactly once per unique instance, so sharing is the intended lifecycle. See examples/example_gemini_cached.py for the pattern.
This provides 70-90% cost savings when shared correctly.
Example
```python
model = GeminiCachedModel(
    "gemini-2.5-flash",
    client,
    cached_content=[system_instruction, context_docs],
)
await model.prepare()  # finds or creates cache
response = await model.generate("Process this")
await model.cleanup()  # preserves cache for reuse
```
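The cost effect of sharing one instance versus creating one per work item can be simulated with a toy stand-in that only counts cache creations (not the real class):

```python
import asyncio

class ToyCachedModel:
    """Stand-in: prepare() is idempotent, so one shared instance
    creates the (expensive) cache exactly once."""
    cache_creations = 0

    def __init__(self):
        self._cache = None

    async def prepare(self):
        if self._cache is None:             # idempotent: find-or-create
            ToyCachedModel.cache_creations += 1
            self._cache = "cache-handle"

async def process_items(n_items, shared):
    if shared:
        model = ToyCachedModel()
        models = [model] * n_items          # ONE instance for all work items
    else:
        models = [ToyCachedModel() for _ in range(n_items)]  # anti-pattern
    for m in models:
        await m.prepare()

ToyCachedModel.cache_creations = 0
asyncio.run(process_items(10, shared=True))
shared_creations = ToyCachedModel.cache_creations    # 1 cache for 10 items

ToyCachedModel.cache_creations = 0
asyncio.run(process_items(10, shared=False))
per_item_creations = ToyCachedModel.cache_creations  # 10 caches, no savings
```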
Added in v0.6.0.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| model | str | Model name (e.g., "gemini-2.5-flash"). | required |
| client | Client | Initialized genai.Client. | required |
| cached_content | list[Content] | Content to cache (system instructions, documents). | required |
| cache_ttl_seconds | int | Cache TTL in seconds (default: 3600 = 1 hour). | 3600 |
| cache_renewal_buffer_seconds | int | Renew this many seconds before expiry (default: 300 = 5 minutes). | 300 |
| auto_renew | bool | Auto-renew expired caches in generate() (default: True). | True |
| cache_tags | dict[str, str] \| None | Tags for precise cache matching. | None |
| safety_settings | list[dict[str, Any]] \| None | Default safety settings for all calls. | None |
cleanup
async
Preserve cache for reuse (does not delete). Idempotent.
delete_cache
async
Explicitly delete the cache.
Safe to call concurrently: the cache lock serializes delete attempts so the provider API fires at most once, and late callers that arrive after the cache is cleared return silently.
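The concurrency guarantee described above amounts to a lock-guarded delete. A minimal sketch with asyncio primitives (a toy stand-in, not the library's implementation):

```python
import asyncio

class ToyCache:
    """Stand-in for the documented delete_cache() concurrency behavior."""
    def __init__(self):
        self._cache = "cache-handle"
        self._lock = asyncio.Lock()
        self.provider_deletes = 0

    async def delete_cache(self):
        async with self._lock:              # serializes concurrent deleters
            if self._cache is None:
                return                      # late callers return silently
            self.provider_deletes += 1      # provider API fires at most once
            self._cache = None

async def main():
    cache = ToyCache()
    # Five concurrent deleters; only the first reaches the provider API.
    await asyncio.gather(*(cache.delete_cache() for _ in range(5)))
    return cache.provider_deletes

deletes = asyncio.run(main())  # 1
```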
generate
async
generate(prompt: str | list[Any], *, temperature: float = 0.0, system_instruction: str | None = None, config: dict[str, Any] | None = None) -> LLMResponse
Generate a response using the cached context.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| prompt | str \| list[Any] | Text prompt or multimodal content parts. | required |
| temperature | float | Sampling temperature. | 0.0 |
| system_instruction | str \| None | Not supported with caching; raises ValueError. | None |
| config | dict[str, Any] \| None | Additional provider-specific config entries. | None |

Returns:

| Type | Description |
|---|---|
| LLMResponse | Normalized LLMResponse. |
prepare
async
Find or create the Gemini cache. Idempotent.
Protocols
LLMModel
async_batch_llm.LLMModel
Bases: Protocol
Protocol for LLM model instances that can generate responses.
Implementations wrap a specific provider's client and model configuration, handling API calls and response normalization. Strategies call generate() without needing to know about provider-specific details.
Added in v0.6.0.
generate
async
generate(prompt: str | list[Any], *, temperature: float = 0.0, system_instruction: str | None = None, config: dict[str, Any] | None = None) -> LLMResponse
Generate a response from the LLM.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| prompt | str \| list[Any] | Text prompt, or list of content parts for multimodal input. | required |
| temperature | float | Sampling temperature (0.0 = deterministic). | 0.0 |
| system_instruction | str \| None | System instruction override (None = use default). | None |
| config | dict[str, Any] \| None | Provider-specific configuration (e.g., response_mime_type). | None |

Returns:

| Type | Description |
|---|---|
| LLMResponse | Normalized LLMResponse with text, token counts, and metadata. |
Source code in src/async_batch_llm/core/protocols.py
ManagedLLMModel
async_batch_llm.ManagedLLMModel
Bases: LLMModel, Protocol
LLMModel with lifecycle management (e.g., caching).
Models that need one-time setup (creating a cache) or cleanup implement this protocol. The strategy delegates prepare/cleanup calls to the model.
Added in v0.6.0.
cleanup
async
LLMResponse
async_batch_llm.LLMResponse
dataclass
LLMResponse(text: str, input_tokens: int, output_tokens: int, total_tokens: int, cached_input_tokens: int = 0, metadata: dict[str, Any] | None = None, raw: Any = None)
Normalized response from any LLM provider.
Returned by LLMModel.generate(). Provides a provider-agnostic interface so strategies don't need to know about Gemini, OpenAI, etc. response formats.
Attributes:

| Name | Type | Description |
|---|---|---|
| text | str | The response text content. |
| input_tokens | int | Number of input/prompt tokens. |
| output_tokens | int | Number of output/completion tokens. |
| total_tokens | int | Total tokens used. |
| cached_input_tokens | int | Input tokens served from cache (0 if no caching). |
| metadata | dict[str, Any] \| None | Provider-specific metadata (safety ratings, finish reason, etc.). |
| raw | Any | The raw provider response object, for edge cases. |
Added in v0.6.0.