Strategies API Reference

LLMCallStrategy

async_batch_llm.LLMCallStrategy

Bases: ABC, Generic[TOutput]

Abstract base class for LLM call strategies.

A strategy encapsulates how LLM calls are made, including:

- Resource initialization (caches, clients)
- Call execution with retries
- Resource cleanup

The framework calls:

1. prepare() once before any retries
2. execute() for each attempt (including retries)
3. cleanup() once after all attempts complete or fail
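To make the lifecycle concrete, here is a minimal sketch of a strategy driven through those three phases. The `EchoStrategy` class and the `TokenUsage` TypedDict are local stand-ins written for this illustration, not imports from `async_batch_llm`; only the method names and call order mirror the interface documented here.

```python
import asyncio
from typing import TypedDict


class TokenUsage(TypedDict, total=False):
    # Stand-in for async_batch_llm's TokenUsage dict.
    input_tokens: int
    output_tokens: int
    total_tokens: int


class EchoStrategy:
    """Hypothetical strategy illustrating the prepare/execute/cleanup lifecycle."""

    async def prepare(self) -> None:
        # Called once per work item, before any attempts.
        self.calls = 0

    async def execute(
        self, prompt: str, attempt: int, timeout: float
    ) -> tuple[str, TokenUsage]:
        # Called for each attempt (including retries).
        self.calls += 1
        return f"echo: {prompt}", {"input_tokens": 1, "output_tokens": 1, "total_tokens": 2}

    async def cleanup(self) -> None:
        # Called once after all attempts, on success or failure.
        pass


async def main() -> tuple[str, TokenUsage]:
    s = EchoStrategy()
    await s.prepare()  # 1. prepare() once
    try:
        out, usage = await s.execute("hi", attempt=1, timeout=30.0)  # 2. execute() per attempt
    finally:
        await s.cleanup()  # 3. cleanup() once, even on failure
    return out, usage


out, usage = asyncio.run(main())
```

In the real framework this loop (including retries between `prepare()` and `cleanup()`) is run for you; a strategy only implements the three methods.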

cleanup async

cleanup() -> None

Clean up resources after all retry attempts complete.

Called once per work item after processing finishes (success or failure).

Use this for:

- Closing connections/sessions
- Releasing locks
- Logging final metrics
- Deleting temporary files

Do NOT use this for:

- Deleting caches intended for reuse across runs
- Destructive cleanup that prevents resource reuse

Note on Caches (v0.2.0): For reusable resources like Gemini caches with TTLs, consider letting them expire naturally to enable cost savings across multiple pipeline runs. See GeminiCachedStrategy for an example.

Default: no-op

Source code in src/async_batch_llm/llm_strategies.py
async def cleanup(self) -> None:
    """
    Clean up resources after all retry attempts complete.

    Called once per work item after processing finishes (success or failure).

    **Use this for:**
    - Closing connections/sessions
    - Releasing locks
    - Logging final metrics
    - Deleting temporary files

    **Do NOT use this for:**
    - Deleting caches intended for reuse across runs
    - Destructive cleanup that prevents resource reuse

    **Note on Caches (v0.2.0):**
    For reusable resources like Gemini caches with TTLs, consider letting
    them expire naturally to enable cost savings across multiple pipeline
    runs. See `GeminiCachedStrategy` for an example.

    Default: no-op
    """
    pass

dry_run async

dry_run(prompt: str) -> tuple[TOutput, TokenUsage]

Return mock output for dry-run mode (testing without API calls).

Override this method to provide realistic mock data for testing. Default implementation returns placeholder values that may not match your output type.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `prompt` | `str` | The prompt that would have been sent to the LLM | *required* |

Returns:

| Type | Description |
| --- | --- |
| `tuple[TOutput, TokenUsage]` | Tuple of (mock_output, mock_token_usage) |

Default behavior:

- Returns the string "[DRY-RUN] Mock output" as output
- Returns mock token usage: 100 input, 50 output, 150 total

Source code in src/async_batch_llm/llm_strategies.py
async def dry_run(self, prompt: str) -> tuple[TOutput, TokenUsage]:
    """
    Return mock output for dry-run mode (testing without API calls).

    Override this method to provide realistic mock data for testing.
    Default implementation returns placeholder values that may not match
    your output type.

    Args:
        prompt: The prompt that would have been sent to the LLM

    Returns:
        Tuple of (mock_output, mock_token_usage)

    Default behavior:
    - Returns string "[DRY-RUN] Mock output" as output
    - Returns mock token usage: 100 input, 50 output, 150 total
    """
    mock_output: TOutput = f"[DRY-RUN] Mock output for prompt: {prompt[:50]}..."  # ty:ignore[invalid-assignment]
    mock_tokens: TokenUsage = {
        "input_tokens": 100,
        "output_tokens": 50,
        "total_tokens": 150,
    }
    return mock_output, mock_tokens
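Since the default placeholder string may not match your output type, a typed override is usually worthwhile. The sketch below is a hypothetical, self-contained example (the `SentimentStrategy` name and its dict output shape are invented for illustration) of a `dry_run` that returns a mock matching the strategy's real output type:

```python
import asyncio


class SentimentStrategy:
    """Hypothetical strategy whose real output type is a dict, not a string."""

    async def dry_run(self, prompt: str):
        # Return a mock that matches the real output type, so downstream
        # code can be exercised without making API calls.
        mock_output = {"label": "positive", "score": 0.99}
        mock_tokens = {
            "input_tokens": len(prompt.split()),  # rough word-count estimate
            "output_tokens": 10,
            "total_tokens": len(prompt.split()) + 10,
        }
        return mock_output, mock_tokens


output, tokens = asyncio.run(
    SentimentStrategy().dry_run("great product, would buy again")
)
```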

execute abstractmethod async

execute(prompt: str, attempt: int, timeout: float, state: RetryState | None = None) -> tuple[TOutput, TokenUsage]

Execute an LLM call for the given attempt.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `prompt` | `str` | The prompt to send to the LLM | *required* |
| `attempt` | `int` | Which retry attempt this is (1, 2, 3, ...) | *required* |
| `timeout` | `float` | Maximum time to wait for response (seconds) | *required* |
| `state` | `RetryState \| None` | Optional retry state that persists across attempts (v0.3.0) | `None` |

Returns:

| Type | Description |
| --- | --- |
| `tuple[TOutput, TokenUsage]` | Tuple of (output, token_usage), where token_usage is a TokenUsage dict with optional keys: input_tokens, output_tokens, total_tokens, cached_input_tokens |

Note (v0.3.0): The state parameter allows strategies to maintain state across retry attempts for multi-stage retry patterns. See RetryState documentation for examples.

Source code in src/async_batch_llm/llm_strategies.py
@abstractmethod
async def execute(
    self, prompt: str, attempt: int, timeout: float, state: "RetryState | None" = None
) -> tuple[TOutput, TokenUsage]:
    """
    Execute an LLM call for the given attempt.

    Args:
        prompt: The prompt to send to the LLM
        attempt: Which retry attempt this is (1, 2, 3, ...)
        timeout: Maximum time to wait for response (seconds)
        state: Optional retry state that persists across attempts (v0.3.0)

    Returns:
        Tuple of (output, token_usage)
        where token_usage is a TokenUsage dict with optional keys:
        input_tokens, output_tokens, total_tokens, cached_input_tokens

    Raises:
        Any exception to trigger retry (if retryable) or failure

    Note (v0.3.0):
        The state parameter allows strategies to maintain state across retry
        attempts for multi-stage retry patterns. See RetryState documentation
        for examples.
    """
    pass
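The multi-stage retry pattern mentioned in the note can be sketched as follows. Everything here is a self-contained illustration: the `RetryState` class is a minimal stand-in for the framework's key-value store (mimicking the `get`/`set` interface used in the docs), `EscalatingStrategy` is hypothetical, and the two-attempt loop stands in for the framework's own retry machinery.

```python
import asyncio


class RetryState:
    """Minimal stand-in for the framework's RetryState (get/set key-value store)."""

    def __init__(self):
        self._data = {}

    def get(self, key, default=None):
        return self._data.get(key, default)

    def set(self, key, value):
        self._data[key] = value


class EscalatingStrategy:
    """Hypothetical strategy that tightens the prompt after a validation failure."""

    async def execute(self, prompt, attempt, timeout, state=None):
        # On later attempts, consult state written by on_error to adjust behavior.
        if state and state.get("validation_failures", 0) > 0:
            prompt = "Respond with valid JSON only.\n" + prompt
        if attempt == 1:
            raise ValueError("invalid JSON")  # simulate a failed first attempt
        return prompt, {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}

    async def on_error(self, exception, attempt, state=None):
        if state and isinstance(exception, ValueError):
            state.set("validation_failures", state.get("validation_failures", 0) + 1)


async def main():
    s, state = EscalatingStrategy(), RetryState()
    for attempt in (1, 2):  # simplified retry loop; the framework does this for you
        try:
            return await s.execute("Summarize the report.", attempt, 30.0, state)
        except ValueError as e:
            await s.on_error(e, attempt, state)


output, tokens = asyncio.run(main())
```

Because `state` outlives individual attempts, the second attempt can see what the first attempt learned.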

on_error async

on_error(exception: Exception, attempt: int, state: RetryState | None = None) -> None

Handle errors that occur during execute().

Called by the framework when execute() raises an exception, before deciding whether to retry. This allows strategies to:

- Inspect the error type to adjust retry behavior
- Store error information for use in next attempt
- Modify prompts based on validation errors
- Track error patterns across attempts

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `exception` | `Exception` | The exception that was raised during execute() | *required* |
| `attempt` | `int` | Which attempt number failed (1, 2, 3, ...) | *required* |
| `state` | `RetryState \| None` | Optional retry state that persists across attempts (v0.3.0) | `None` |

Default: no-op

Example (v0.2.0):

async def on_error(self, exception: Exception, attempt: int) -> None:
    # Store last error for smart retry logic
    self.last_error = exception

    # Track validation errors vs network errors
    if isinstance(exception, ValidationError):
        self.should_escalate_model = True

Example (v0.3.0 with retry state):

async def on_error(
    self, exception: Exception, attempt: int, state: RetryState | None = None
) -> None:
    if state:
        # Track validation errors separately from other errors
        if isinstance(exception, ValidationError):
            count = state.get('validation_failures', 0) + 1
            state.set('validation_failures', count)
            # Save partial results for recovery
            if hasattr(exception, 'partial_data'):
                state.set('partial_data', exception.partial_data)

Source code in src/async_batch_llm/llm_strategies.py
async def on_error(
    self, exception: Exception, attempt: int, state: "RetryState | None" = None
) -> None:
    """
    Handle errors that occur during execute().

    Called by the framework when execute() raises an exception, before
    deciding whether to retry. This allows strategies to:
    - Inspect the error type to adjust retry behavior
    - Store error information for use in next attempt
    - Modify prompts based on validation errors
    - Track error patterns across attempts

    Args:
        exception: The exception that was raised during execute()
        attempt: Which attempt number failed (1, 2, 3, ...)
        state: Optional retry state that persists across attempts (v0.3.0)

    Default: no-op

    Example (v0.2.0):
        async def on_error(self, exception: Exception, attempt: int) -> None:
            # Store last error for smart retry logic
            self.last_error = exception

            # Track validation errors vs network errors
            if isinstance(exception, ValidationError):
                self.should_escalate_model = True

    Example (v0.3.0 with retry state):
        async def on_error(
            self, exception: Exception, attempt: int, state: RetryState | None = None
        ) -> None:
            if state:
                # Track validation errors separately from other errors
                if isinstance(exception, ValidationError):
                    count = state.get('validation_failures', 0) + 1
                    state.set('validation_failures', count)
                    # Save partial results for recovery
                    if hasattr(exception, 'partial_data'):
                        state.set('partial_data', exception.partial_data)
    """
    pass

prepare async

prepare() -> None

Initialize resources before making any LLM calls.

Called once per work item before any retry attempts. Use this to set up caches, initialize clients, etc.

Default: no-op

Source code in src/async_batch_llm/llm_strategies.py
async def prepare(self) -> None:
    """
    Initialize resources before making any LLM calls.

    Called once per work item before any retry attempts.
    Use this to set up caches, initialize clients, etc.

    Default: no-op
    """
    pass

PydanticAIStrategy

async_batch_llm.PydanticAIStrategy

PydanticAIStrategy(agent: Agent[None, TOutput])

Bases: LLMCallStrategy[TOutput]

Strategy for using PydanticAI agents.

This strategy wraps a PydanticAI agent, providing a clean interface for batch processing. The agent handles all model interaction, validation, and parsing.

Best for: structured output validated by Pydantic models, when you want PydanticAI to handle parsing and validation for you.

Initialize PydanticAI strategy.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `agent` | `Agent[None, TOutput]` | Configured PydanticAI agent | *required* |
Source code in src/async_batch_llm/llm_strategies.py
def __init__(self, agent: "Agent[None, TOutput]"):
    """
    Initialize PydanticAI strategy.

    Args:
        agent: Configured PydanticAI agent
    """
    if Agent is Any:
        raise ImportError(
            "pydantic-ai is required for PydanticAIStrategy. "
            "Install with: pip install 'async-batch-llm[pydantic-ai]'"
        )

    self.agent = agent

dry_run async

dry_run(prompt: str) -> tuple[TOutput, TokenUsage]

Return mock output based on agent's result_type for dry-run mode.

Source code in src/async_batch_llm/llm_strategies.py
async def dry_run(self, prompt: str) -> tuple[TOutput, TokenUsage]:
    """Return mock output based on agent's result_type for dry-run mode."""
    # Try to create a mock instance of the expected output type
    try:
        from pydantic import BaseModel

        result_type = self.agent.result_type  # ty:ignore[unresolved-attribute]

        # If result_type is a Pydantic model, try to create an instance
        if isinstance(result_type, type) and issubclass(result_type, BaseModel):
            # Use model_construct to create instance without validation
            # This allows creating instances even with required fields
            mock_output: TOutput = result_type.model_construct()
        else:
            # For non-Pydantic types, use base class default
            return await super().dry_run(prompt)

    except Exception:
        # If anything fails, fall back to base class default
        return await super().dry_run(prompt)

    # Return mock output with realistic token usage
    mock_tokens: TokenUsage = {
        "input_tokens": len(prompt.split()),  # Rough estimate
        "output_tokens": 50,
        "total_tokens": len(prompt.split()) + 50,
    }

    return mock_output, mock_tokens

execute async

execute(prompt: str, attempt: int, timeout: float, state: RetryState | None = None) -> tuple[TOutput, TokenUsage]

Execute PydanticAI agent call.

Note: timeout parameter is provided for information but timeout enforcement is handled by the framework wrapping this call in asyncio.wait_for().
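The note above can be sketched with plain asyncio: the framework cancels the strategy's coroutine from outside via asyncio.wait_for, independent of the `timeout` value passed into execute(). The code below is a self-contained illustration of that wrapping pattern, not framework code.

```python
import asyncio


async def slow_call() -> str:
    # Simulates an LLM call that takes longer than the allowed timeout.
    await asyncio.sleep(1.0)
    return "done"


async def main() -> str:
    try:
        # The framework wraps each execute() call like this; the timeout
        # argument the strategy receives is informational only.
        return await asyncio.wait_for(slow_call(), timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"


result = asyncio.run(main())
```

This is why a strategy does not need its own timeout handling: cancellation reaches it through the awaited call.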

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `prompt` | `str` | The prompt to send to the LLM | *required* |
| `attempt` | `int` | Which retry attempt this is (1, 2, 3, ...) | *required* |
| `timeout` | `float` | Maximum time to wait for response (seconds) | *required* |
| `state` | `RetryState \| None` | Optional retry state (v0.3.0, unused by this strategy) | `None` |
Source code in src/async_batch_llm/llm_strategies.py
async def execute(
    self, prompt: str, attempt: int, timeout: float, state: RetryState | None = None
) -> tuple[TOutput, TokenUsage]:
    """Execute PydanticAI agent call.

    Note: timeout parameter is provided for information but timeout enforcement
    is handled by the framework wrapping this call in asyncio.wait_for().

    Args:
        prompt: The prompt to send to the LLM
        attempt: Which retry attempt this is (1, 2, 3, ...)
        timeout: Maximum time to wait for response (seconds)
        state: Optional retry state (v0.3.0, unused by this strategy)
    """
    result = await self.agent.run(prompt)

    # Extract token usage FIRST (before accessing result.output which may fail validation)
    usage = result.usage()
    tokens: TokenUsage = {
        "input_tokens": usage.request_tokens if usage else 0,
        "output_tokens": usage.response_tokens if usage else 0,
        "total_tokens": usage.total_tokens if usage else 0,
    }

    # Access result.output (may raise validation errors)
    try:
        output = result.output
    except Exception as e:
        # Attach token usage to exception so framework can track it
        if not hasattr(e, "__dict__"):
            # For built-in exceptions without __dict__, wrap in TokenTrackingError
            wrapped = TokenTrackingError(str(e), token_usage=tokens)
            wrapped.__cause__ = e
            raise wrapped from e
        else:
            e.__dict__["_failed_token_usage"] = tokens
            raise

    return output, tokens

GeminiStrategy

async_batch_llm.GeminiStrategy

GeminiStrategy(model: LLMModel, response_parser: Callable[[LLMResponse], TOutput], *, temperature: float = 0.0)

Bases: LLMCallStrategy[TOutput]

Strategy for calling an LLM model and parsing the response.

Accepts an LLMModel (e.g., GeminiModel or GeminiCachedModel) and a response parser. The model handles the API call and token extraction; the strategy handles response parsing and lifecycle delegation.

For caching, use GeminiStrategy(model=GeminiCachedModel(...)).

v0.6.0: Accepts LLMModel instead of raw client + model string.

Example

model = GeminiModel("gemini-2.5-flash", client)
strategy = GeminiStrategy(model, response_parser=lambda r: r.text)

With caching:

cached_model = GeminiCachedModel("gemini-2.5-flash", client, cached_content=[...])
strategy = GeminiStrategy(cached_model, response_parser=lambda r: r.text)

Initialize strategy.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model` | `LLMModel` | An LLMModel instance (e.g., GeminiModel, GeminiCachedModel). | *required* |
| `response_parser` | `Callable[[LLMResponse], TOutput]` | Function to parse LLMResponse into TOutput. | *required* |
| `temperature` | `float` | Default sampling temperature (overridable by subclasses). | `0.0` |
Source code in src/async_batch_llm/llm_strategies.py
def __init__(
    self,
    model: "LLMModel",
    response_parser: Callable[[LLMResponse], TOutput],
    *,
    temperature: float = 0.0,
):
    """
    Initialize strategy.

    Args:
        model: An LLMModel instance (e.g., GeminiModel, GeminiCachedModel).
        response_parser: Function to parse LLMResponse into TOutput.
        temperature: Default sampling temperature (overridable by subclasses).
    """
    self.model = model
    self.response_parser = response_parser
    self.temperature = temperature

cleanup async

cleanup() -> None

Delegate to model.cleanup() if model has lifecycle.

Source code in src/async_batch_llm/llm_strategies.py
async def cleanup(self) -> None:
    """Delegate to model.cleanup() if model has lifecycle."""
    if isinstance(self.model, ManagedLLMModel):
        await self.model.cleanup()

execute async

execute(prompt: str, attempt: int, timeout: float, state: RetryState | None = None) -> tuple[TOutput, TokenUsage]

Execute LLM call via the model and parse the response.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `prompt` | `str` | The prompt to send to the LLM. | *required* |
| `attempt` | `int` | Which retry attempt this is (1, 2, 3, ...). | *required* |
| `timeout` | `float` | Maximum time for response (enforced by framework). | *required* |
| `state` | `RetryState \| None` | Optional retry state for cross-attempt persistence. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `tuple[TOutput, TokenUsage]` | Tuple of (parsed_output, token_usage). |

Source code in src/async_batch_llm/llm_strategies.py
async def execute(
    self, prompt: str, attempt: int, timeout: float, state: RetryState | None = None
) -> tuple[TOutput, TokenUsage]:
    """Execute LLM call via the model and parse the response.

    Args:
        prompt: The prompt to send to the LLM.
        attempt: Which retry attempt this is (1, 2, 3, ...).
        timeout: Maximum time for response (enforced by framework).
        state: Optional retry state for cross-attempt persistence.

    Returns:
        Tuple of (parsed_output, token_usage).
    """
    llm_response = await self.model.generate(prompt, temperature=self.temperature)

    try:
        output = self.response_parser(llm_response)
    except Exception as e:
        # Attach token usage to exception so framework can track it
        tokens = llm_response.token_usage
        if not hasattr(e, "__dict__"):
            wrapped = TokenTrackingError(str(e), token_usage=tokens)
            wrapped.__cause__ = e
            raise wrapped from e
        else:
            e.__dict__["_failed_token_usage"] = tokens
            raise

    return output, llm_response.token_usage

prepare async

prepare() -> None

Delegate to model.prepare() if model has lifecycle.

Source code in src/async_batch_llm/llm_strategies.py
async def prepare(self) -> None:
    """Delegate to model.prepare() if model has lifecycle."""
    if isinstance(self.model, ManagedLLMModel):
        await self.model.prepare()

Models

GeminiModel

async_batch_llm.GeminiModel

GeminiModel(model: str, client: Client, *, safety_settings: list[dict[str, Any]] | None = None, system_instruction: str | None = None)

LLM model backed by the Google Gemini API.

Wraps a genai.Client and model name, handling API calls, token extraction, and response normalization. Implements the LLMModel protocol.

Example

client = genai.Client(api_key="...")
model = GeminiModel("gemini-2.5-flash", client)
response = await model.generate("Hello!")
print(response.text, response.input_tokens)

Added in v0.6.0.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model` | `str` | Model name (e.g., "gemini-3.1-flash-lite-preview"). | *required* |
| `client` | `Client` | Initialized genai.Client. | *required* |
| `safety_settings` | `list[dict[str, Any]] \| None` | Default safety settings for all calls. | `None` |
| `system_instruction` | `str \| None` | Default system instruction (overridable per-call). | `None` |
Source code in src/async_batch_llm/models.py
def __init__(
    self,
    model: str,
    client: "genai.Client",
    *,
    safety_settings: list[dict[str, Any]] | None = None,
    system_instruction: str | None = None,
):
    """
    Args:
        model: Model name (e.g., "gemini-3.1-flash-lite-preview").
        client: Initialized genai.Client.
        safety_settings: Default safety settings for all calls.
        system_instruction: Default system instruction (overridable per-call).
    """
    if genai is None:
        raise ImportError(
            "google-genai is required for GeminiModel. "
            "Install with: pip install 'async-batch-llm[gemini]'"
        )

    self._model = model
    self._client = client
    self._safety_settings = safety_settings
    self._default_system_instruction = system_instruction

generate async

generate(prompt: str | list[Any], *, temperature: float = 0.0, system_instruction: str | None = None, config: dict[str, Any] | None = None) -> LLMResponse

Generate a response from Gemini.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `prompt` | `str \| list[Any]` | Text prompt or list of content parts (multimodal). | *required* |
| `temperature` | `float` | Sampling temperature. | `0.0` |
| `system_instruction` | `str \| None` | Override default system instruction. | `None` |
| `config` | `dict[str, Any] \| None` | Additional provider-specific config entries. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `LLMResponse` | Normalized LLMResponse. |

Source code in src/async_batch_llm/models.py
async def generate(
    self,
    prompt: str | list[Any],
    *,
    temperature: float = 0.0,
    system_instruction: str | None = None,
    config: dict[str, Any] | None = None,
) -> LLMResponse:
    """Generate a response from Gemini.

    Args:
        prompt: Text prompt or list of content parts (multimodal).
        temperature: Sampling temperature.
        system_instruction: Override default system instruction.
        config: Additional provider-specific config entries.

    Returns:
        Normalized LLMResponse.
    """
    # Build config dict
    call_config: dict[str, Any] = {"temperature": temperature}

    si = system_instruction or self._default_system_instruction
    if si is not None:
        call_config["system_instruction"] = si

    if self._safety_settings:
        call_config["safety_settings"] = self._safety_settings

    if config:
        call_config.update(config)

    # Make the API call (config is built as a dict; the SDK accepts this at runtime
    # even though the type stubs say GenerateContentConfig)
    response = await self._client.aio.models.generate_content(
        model=self._model,
        contents=prompt,
        config=call_config,  # type: ignore[arg-type]  # ty:ignore[invalid-argument-type]
    )

    # Extract tokens
    input_tokens, output_tokens, total_tokens, cached_tokens = _extract_tokens(response)

    # Extract text (may be None if safety-blocked)
    text = response.text
    if text is None:
        metadata = _extract_metadata(response)
        safety_info = ""
        if metadata and "safety_ratings" in metadata:
            safety_info = f" Safety ratings: {metadata['safety_ratings']}"
        raise ValueError(
            f"Empty response from model (likely blocked by safety filter).{safety_info}"
        )

    return LLMResponse(
        text=text,
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        total_tokens=total_tokens,
        cached_input_tokens=cached_tokens,
        metadata=_extract_metadata(response),
        raw=response,
    )

GeminiCachedModel

async_batch_llm.GeminiCachedModel

GeminiCachedModel(model: str, client: Client, cached_content: list[Content], *, cache_ttl_seconds: int = 3600, cache_renewal_buffer_seconds: int = 300, auto_renew: bool = True, cache_tags: dict[str, str] | None = None, safety_settings: list[dict[str, Any]] | None = None)

LLM model backed by Google Gemini with context caching.

Wraps a genai.Client with cache lifecycle management. Implements the ManagedLLMModel protocol: call prepare() before first use, cleanup() when done.

IMPORTANT — share one instance across work items. Create ONE GeminiCachedModel and reuse it across every LLMWorkItem that should share the cached context. Constructing a new instance per item defeats caching entirely and can cost 10× more. The framework calls prepare() exactly once per unique instance, so sharing is the intended lifecycle. See examples/example_gemini_cached.py for the pattern.

This provides 70-90% cost savings when shared correctly.
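The sharing requirement can be illustrated without the real API. In the stand-in below (written for this illustration only, not framework code), `prepare()` plays the role of the costly cache creation; because it is idempotent and the single instance is reused, the cache is built once no matter how many work items run against it.

```python
import asyncio


class SharedCacheModel:
    """Stand-in showing why one instance must be shared: prepare() builds the
    (expensive) cache once, and every work item then reuses it."""

    def __init__(self):
        self.cache_builds = 0
        self._prepared = False

    async def prepare(self):
        if self._prepared:  # idempotent, like GeminiCachedModel.prepare()
            return
        self.cache_builds += 1  # pretend this is the costly cache creation
        self._prepared = True

    async def generate(self, prompt):
        return f"cached-answer: {prompt}"


async def main():
    model = SharedCacheModel()  # ONE instance...
    await model.prepare()
    prompts = ["item-1", "item-2", "item-3"]
    results = [await model.generate(p) for p in prompts]  # ...reused by every item
    return model.cache_builds, results


builds, results = asyncio.run(main())
```

Constructing a fresh instance per item would bump `cache_builds` once per item, which is exactly the per-item cache creation (and cost) the warning above is about.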

Example

model = GeminiCachedModel(
    "gemini-2.5-flash",
    client,
    cached_content=[system_instruction, context_docs],
)
await model.prepare()  # finds or creates cache
response = await model.generate("Process this")
await model.cleanup()  # preserves cache for reuse

Added in v0.6.0.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model` | `str` | Model name (e.g., "gemini-2.5-flash"). | *required* |
| `client` | `Client` | Initialized genai.Client. | *required* |
| `cached_content` | `list[Content]` | Content to cache (system instructions, documents). | *required* |
| `cache_ttl_seconds` | `int` | Cache TTL in seconds. | `3600` (1 hour) |
| `cache_renewal_buffer_seconds` | `int` | Renew this many seconds before expiry. | `300` (5 minutes) |
| `auto_renew` | `bool` | Auto-renew expired caches in generate(). | `True` |
| `cache_tags` | `dict[str, str] \| None` | Tags for precise cache matching. | `None` |
| `safety_settings` | `list[dict[str, Any]] \| None` | Default safety settings for all calls. | `None` |
Source code in src/async_batch_llm/models.py
def __init__(
    self,
    model: str,
    client: "genai.Client",
    cached_content: list["Content"],
    *,
    cache_ttl_seconds: int = 3600,
    cache_renewal_buffer_seconds: int = 300,
    auto_renew: bool = True,
    cache_tags: dict[str, str] | None = None,
    safety_settings: list[dict[str, Any]] | None = None,
):
    """
    Args:
        model: Model name (e.g., "gemini-2.5-flash").
        client: Initialized genai.Client.
        cached_content: Content to cache (system instructions, documents).
        cache_ttl_seconds: Cache TTL in seconds (default: 3600 = 1 hour).
        cache_renewal_buffer_seconds: Renew this many seconds before expiry
            (default: 300 = 5 minutes).
        auto_renew: Auto-renew expired caches in generate() (default: True).
        cache_tags: Tags for precise cache matching.
        safety_settings: Default safety settings for all calls.
    """
    if genai is None:
        raise ImportError(
            "google-genai is required for GeminiCachedModel. "
            "Install with: pip install 'async-batch-llm[gemini]'"
        )

    if cache_renewal_buffer_seconds >= cache_ttl_seconds:
        raise ValueError(
            f"cache_renewal_buffer_seconds ({cache_renewal_buffer_seconds}) "
            f"must be less than cache_ttl_seconds ({cache_ttl_seconds})."
        )

    if 10 <= cache_ttl_seconds < 60:
        import warnings

        warnings.warn(
            f"cache_ttl_seconds ({cache_ttl_seconds}) is less than 60 seconds. "
            f"Very short TTLs defeat the purpose of caching. "
            f"Recommended minimum: 300 seconds (5 minutes).",
            UserWarning,
            stacklevel=2,
        )

    if cache_renewal_buffer_seconds < 60:
        import warnings

        warnings.warn(
            f"cache_renewal_buffer_seconds ({cache_renewal_buffer_seconds}) is less than "
            f"60 seconds. Small buffers risk renewing on every call if generation takes "
            f"longer than the buffer. Recommended minimum: 60 seconds.",
            UserWarning,
            stacklevel=2,
        )

    self._model = model
    self._client = client
    self._cached_content = cached_content
    self._cache_ttl_seconds = cache_ttl_seconds
    self._cache_renewal_buffer_seconds = cache_renewal_buffer_seconds
    self._auto_renew = auto_renew
    self._cache_tags = cache_tags or {}
    self._safety_settings = safety_settings

    self._cache: Any = None
    self._cache_created_at: float | None = None
    self._cache_lock: Any = None
    self._prepared = False

cache_name property

cache_name: str | None

The name of the active cache, or None.

cleanup async

cleanup() -> None

Preserve cache for reuse (does not delete). Idempotent.

Source code in src/async_batch_llm/models.py
async def cleanup(self) -> None:
    """Preserve cache for reuse (does not delete). Idempotent."""
    if self._cache:
        logger.info(
            f"Leaving cache active for reuse: {self._cache.name} "
            f"(TTL: {self._cache_ttl_seconds}s, will expire naturally)"
        )

delete_cache async

delete_cache() -> None

Explicitly delete the cache.

Safe to call concurrently: the cache lock serializes delete attempts so the provider API fires at most once, and late callers that arrive after the cache is cleared return silently.

Source code in src/async_batch_llm/models.py
async def delete_cache(self) -> None:
    """Explicitly delete the cache.

    Safe to call concurrently: the cache lock serializes delete attempts
    so the provider API fires at most once, and late callers that arrive
    after the cache is cleared return silently.
    """
    if self._cache is None:
        return

    import asyncio as _asyncio

    if getattr(self, "_cache_lock", None) is None:
        self._cache_lock = _asyncio.Lock()

    async with self._cache_lock:
        cache = self._cache
        if cache is None:
            # A concurrent caller already finished the delete.
            return

        # Capture the name up front so log messages don't depend on
        # self._cache still existing after concurrent callers clear it.
        cache_name = cache.name
        # Clear state BEFORE the API call so concurrent tasks that
        # re-enter see an empty cache and no-op.
        self._cache = None
        self._cache_created_at = None
        self._prepared = False

        try:
            await self._client.aio.caches.delete(name=cache_name)
            logger.info(f"Deleted Gemini cache: {cache_name}")
        except Exception as e:
            # Keep Exception (not BaseException) so KeyboardInterrupt still propagates;
            # cache-delete failures are best-effort — caches expire on their own.
            logger.warning(
                f"Failed to delete Gemini cache '{cache_name}': {e}. "
                "Cache may have already expired or been deleted.",
                exc_info=True,
            )

generate async

generate(prompt: str | list[Any], *, temperature: float = 0.0, system_instruction: str | None = None, config: dict[str, Any] | None = None) -> LLMResponse

Generate a response using the cached context.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `prompt` | `str \| list[Any]` | Text prompt or multimodal content parts. | *required* |
| `temperature` | `float` | Sampling temperature. | `0.0` |
| `system_instruction` | `str \| None` | Not supported with caching — raises ValueError. | `None` |
| `config` | `dict[str, Any] \| None` | Additional provider-specific config entries. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `LLMResponse` | Normalized LLMResponse. |

Source code in src/async_batch_llm/models.py
async def generate(
    self,
    prompt: str | list[Any],
    *,
    temperature: float = 0.0,
    system_instruction: str | None = None,
    config: dict[str, Any] | None = None,
) -> LLMResponse:
    """Generate a response using the cached context.

    Args:
        prompt: Text prompt or multimodal content parts.
        temperature: Sampling temperature.
        system_instruction: Not supported with caching — raises ValueError.
        config: Additional provider-specific config entries.

    Returns:
        Normalized LLMResponse.
    """
    if system_instruction is not None:
        raise ValueError(
            "system_instruction cannot be overridden per-call with cached models. "
            "The system instruction is baked into the cache at creation time."
        )

    # Auto-renew if expired
    if self._auto_renew and self._is_cache_expired():
        logger.info(
            "Cache expired or about to expire, renewing before API call "
            f"(age: {time.time() - (self._cache_created_at or 0):.0f}s, "
            f"renewal buffer: {self._cache_renewal_buffer_seconds}s)"
        )

        import asyncio

        if self._cache_lock is None:
            self._cache_lock = asyncio.Lock()

        async with self._cache_lock:
            if self._is_cache_expired():
                self._cache = None
                self._cache_created_at = None
                self._prepared = False
                await self._find_or_create_cache()
                self._prepared = True

    if self._cache is None:
        raise RuntimeError("Cache not initialized — call prepare() first")

    # Build config with cache reference
    call_config: dict[str, Any] = {
        "cached_content": self._cache.name,
        "temperature": temperature,
    }

    if self._safety_settings:
        call_config["safety_settings"] = self._safety_settings

    if config:
        call_config.update(config)

    response = await self._client.aio.models.generate_content(
        model=self._model,
        contents=prompt,
        config=call_config,  # type: ignore[arg-type]  # ty:ignore[invalid-argument-type]
    )

    input_tokens, output_tokens, total_tokens, cached_tokens = _extract_tokens(response)

    text = response.text
    if text is None:
        metadata = _extract_metadata(response)
        safety_info = ""
        if metadata and "safety_ratings" in metadata:
            safety_info = f" Safety ratings: {metadata['safety_ratings']}"
        raise ValueError(
            f"Empty response from model (likely blocked by safety filter).{safety_info}"
        )

    return LLMResponse(
        text=text,
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        total_tokens=total_tokens,
        cached_input_tokens=cached_tokens,
        metadata=_extract_metadata(response),
        raw=response,
    )

prepare async

prepare() -> None

Find or create the Gemini cache. Idempotent.

Source code in src/async_batch_llm/models.py
async def prepare(self) -> None:
    """Find or create the Gemini cache. Idempotent."""
    if self._prepared:
        return

    import asyncio

    if self._cache_lock is None:
        self._cache_lock = asyncio.Lock()

    async with self._cache_lock:
        if self._prepared:
            return
        await self._find_or_create_cache()
        self._prepared = True
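Both `prepare()` and the renewal path in `generate()` rely on the same double-checked locking pattern: check the flag, acquire the lock, then check again before doing the expensive work, so concurrent callers trigger cache creation exactly once. A minimal standalone sketch of that pattern (the `CacheHolder` class and its counter are illustrative, not part of the library):

```python
import asyncio


class CacheHolder:
    """Illustrative double-checked locking, mirroring prepare() above."""

    def __init__(self) -> None:
        self._prepared = False
        self._lock: asyncio.Lock | None = None
        self.create_calls = 0  # counts how often the expensive setup runs

    async def prepare(self) -> None:
        if self._prepared:          # fast path: no lock once initialized
            return
        if self._lock is None:      # lazy lock creation, as in the source
            self._lock = asyncio.Lock()
        async with self._lock:
            if self._prepared:      # re-check: another task may have won
                return
            self.create_calls += 1  # stands in for _find_or_create_cache()
            await asyncio.sleep(0)  # simulate awaited setup work
            self._prepared = True


async def main() -> int:
    holder = CacheHolder()
    # Ten concurrent callers; the setup body runs exactly once.
    await asyncio.gather(*(holder.prepare() for _ in range(10)))
    return holder.create_calls


print(asyncio.run(main()))  # → 1
```

Without the second check inside the lock, every caller queued on the lock would repeat the setup after the first one finished.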

Protocols

LLMModel

async_batch_llm.LLMModel

Bases: Protocol

Protocol for LLM model instances that can generate responses.

Implementations wrap a specific provider's client and model configuration, handling API calls and response normalization. Strategies call generate() without needing to know about provider-specific details.

Added in v0.6.0.

generate async

generate(prompt: str | list[Any], *, temperature: float = 0.0, system_instruction: str | None = None, config: dict[str, Any] | None = None) -> LLMResponse

Generate a response from the LLM.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `prompt` | `str \| list[Any]` | Text prompt, or list of content parts for multimodal input. | *required* |
| `temperature` | `float` | Sampling temperature (0.0 = deterministic). | `0.0` |
| `system_instruction` | `str \| None` | System instruction override (None = use default). | `None` |
| `config` | `dict[str, Any] \| None` | Provider-specific configuration (e.g., response_mime_type). | `None` |

Returns:

| Type | Description |
| --- | --- |
| `LLMResponse` | Normalized LLMResponse with text, token counts, and metadata. |

Source code in src/async_batch_llm/core/protocols.py
async def generate(
    self,
    prompt: str | list[Any],
    *,
    temperature: float = 0.0,
    system_instruction: str | None = None,
    config: dict[str, Any] | None = None,
) -> LLMResponse:
    """
    Generate a response from the LLM.

    Args:
        prompt: Text prompt, or list of content parts for multimodal input.
        temperature: Sampling temperature (0.0 = deterministic).
        system_instruction: System instruction override (None = use default).
        config: Provider-specific configuration (e.g., response_mime_type).

    Returns:
        Normalized LLMResponse with text, token counts, and metadata.
    """
    ...

ManagedLLMModel

async_batch_llm.ManagedLLMModel

Bases: LLMModel, Protocol

LLMModel with lifecycle management (e.g., caching).

Models that need one-time setup (creating a cache) or cleanup implement this protocol. The strategy delegates prepare/cleanup calls to the model.

Added in v0.6.0.

cleanup async

cleanup() -> None

Release resources. Must be idempotent.

Source code in src/async_batch_llm/core/protocols.py
async def cleanup(self) -> None:
    """Release resources. Must be idempotent."""
    ...

prepare async

prepare() -> None

Initialize resources (e.g., find or create a cache). Must be idempotent.

Source code in src/async_batch_llm/core/protocols.py
async def prepare(self) -> None:
    """Initialize resources (e.g., find or create a cache). Must be idempotent."""
    ...
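A strategy that accepts any `LLMModel` can detect the optional lifecycle hooks at runtime before delegating to them, which is what "the strategy delegates prepare/cleanup calls to the model" amounts to. A hedged sketch (`run_with_lifecycle` and the stub classes are illustrative, not library API):

```python
import asyncio


class PlainModel:
    """Implements only generate(); no lifecycle hooks."""

    async def generate(self, prompt: str) -> str:
        return f"echo: {prompt}"


class CachedModel(PlainModel):
    """Adds idempotent prepare()/cleanup(), as ManagedLLMModel requires."""

    def __init__(self) -> None:
        self.prepared = False

    async def prepare(self) -> None:
        self.prepared = True  # idempotent: safe to call repeatedly

    async def cleanup(self) -> None:
        self.prepared = False  # idempotent: safe to call repeatedly


async def run_with_lifecycle(model, prompt: str) -> str:
    """Call prepare/cleanup only if the model provides them."""
    if hasattr(model, "prepare"):
        await model.prepare()
    try:
        return await model.generate(prompt)
    finally:
        if hasattr(model, "cleanup"):
            await model.cleanup()


print(asyncio.run(run_with_lifecycle(CachedModel(), "hi")))  # → echo: hi
```

The `try`/`finally` mirrors the framework guarantee that cleanup runs once whether the call succeeds or fails.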

LLMResponse

async_batch_llm.LLMResponse dataclass

LLMResponse(text: str, input_tokens: int, output_tokens: int, total_tokens: int, cached_input_tokens: int = 0, metadata: dict[str, Any] | None = None, raw: Any = None)

Normalized response from any LLM provider.

Returned by LLMModel.generate(). Provides a provider-agnostic interface so strategies don't need to know about provider-specific response formats (Gemini, OpenAI, etc.).

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `text` | `str` | The response text content. |
| `input_tokens` | `int` | Number of input/prompt tokens. |
| `output_tokens` | `int` | Number of output/completion tokens. |
| `total_tokens` | `int` | Total tokens used. |
| `cached_input_tokens` | `int` | Input tokens served from cache (0 if no caching). |
| `metadata` | `dict[str, Any] \| None` | Provider-specific metadata (safety ratings, finish reason, etc.). |
| `raw` | `Any` | The raw provider response object, for edge cases. |
Added in v0.6.0.

token_usage property

token_usage: TokenUsage

Return token counts as a TokenUsage dict.