Skip to content

Basic Usage Examples

Simple Batch Processing

Process multiple prompts in parallel:

import asyncio
from async_batch_llm import (
    ParallelBatchProcessor,
    LLMWorkItem,
    ProcessorConfig,
    PydanticAIStrategy,
)
from pydantic_ai import Agent

async def main():
    agent = Agent("gemini-2.5-flash", result_type=str)
    strategy = PydanticAIStrategy(agent=agent)

    config = ProcessorConfig(max_workers=5)

    async with ParallelBatchProcessor(config=config) as processor:
        prompts = [
            "What is Python?",
            "What is async/await?",
            "What is asyncio?"
        ]

        for i, prompt in enumerate(prompts):
            await processor.add_work(
                LLMWorkItem(
                    item_id=f"item_{i}",
                    strategy=strategy,
                    prompt=prompt
                )
            )

        result = await processor.process_all()

        for work_result in result.results:
            if work_result.success:
                print(f"{work_result.item_id}: {work_result.output}")
            else:
                print(f"{work_result.item_id}: Failed - {work_result.error}")

asyncio.run(main())

Structured Output

Use Pydantic models for validated output:

from pydantic import BaseModel

class CodeReview(BaseModel):
    issues: list[str]
    suggestions: list[str]
    rating: int

async def review_code():
    agent = Agent("gemini-2.5-flash", result_type=CodeReview)
    strategy = PydanticAIStrategy(agent=agent)

    config = ProcessorConfig(max_workers=3)

    async with ParallelBatchProcessor(config=config) as processor:
        code_snippets = ["def foo(): pass", "def bar(): return 42"]

        for snippet in code_snippets:
            await processor.add_work(
                LLMWorkItem(
                    item_id=snippet[:20],
                    strategy=strategy,
                    prompt=f"Review this code:\n{snippet}"
                )
            )

        result = await processor.process_all()

        for work_result in result.results:
            if work_result.success:
                review = work_result.output
                print(f"Rating: {review.rating}/10")
                print(f"Issues: {review.issues}")

Context Passing

Pass context through the processing pipeline:

from dataclasses import dataclass

@dataclass
class FileContext:
    filepath: str
    original_content: str

async def process_with_context():
    agent = Agent("gemini-2.5-flash", result_type=str)
    strategy = PydanticAIStrategy(agent=agent)

    config = ProcessorConfig(max_workers=5)

    async with ParallelBatchProcessor[str, str, FileContext](config=config) as processor:
        files = [
            ("file1.py", "content1"),
            ("file2.py", "content2"),
        ]

        for filepath, content in files:
            await processor.add_work(
                LLMWorkItem(
                    item_id=filepath,
                    strategy=strategy,
                    prompt=f"Summarize: {content}",
                    context=FileContext(filepath=filepath, original_content=content)
                )
            )

        result = await processor.process_all()

        for work_result in result.results:
            if work_result.success and work_result.context:
                print(f"File: {work_result.context.filepath}")
                print(f"Summary: {work_result.output}")

Post-Processing

Use post-processors to handle results as they complete:

async def save_result(result):
    """Called for each completed work item."""
    if result.success:
        # Save to database, file, etc.
        await save_to_db(result.item_id, result.output)
        print(f"Saved {result.item_id}")

async def process_with_post_processor():
    agent = Agent("gemini-2.5-flash", result_type=str)
    strategy = PydanticAIStrategy(agent=agent)

    config = ProcessorConfig(max_workers=5)

    async with ParallelBatchProcessor(
        config=config,
        post_processor=save_result  # Called for each result
    ) as processor:
        # Add work items...
        result = await processor.process_all()

Metrics Collection

Track metrics using observers:

from async_batch_llm.observers import MetricsObserver

async def process_with_metrics():
    metrics = MetricsObserver()

    agent = Agent("gemini-2.5-flash", result_type=str)
    strategy = PydanticAIStrategy(agent=agent)

    config = ProcessorConfig(max_workers=5)

    async with ParallelBatchProcessor(
        config=config,
        observers=[metrics]
    ) as processor:
        # Add work items...
        result = await processor.process_all()

        # Get collected metrics
        collected_metrics = await metrics.get_metrics()
        print(f"Items processed: {collected_metrics['items_processed']}")
        print(f"Succeeded: {collected_metrics['items_succeeded']}")
        print(f"Failed: {collected_metrics['items_failed']}")