Skip to content

Examples

This guide provides practical examples for using aiogzip in various scenarios.

CSV Processing with aiocsv

aiogzip pairs perfectly with aiocsv for efficient, asynchronous CSV processing.

Reading a CSV File

import asyncio
import aiocsv
from aiogzip import AsyncGzipTextFile

async def read_csv():
    async with AsyncGzipTextFile("data.csv.gz", "rt", encoding="utf-8", newline="") as f:
        # Use AsyncDictReader for dictionary-based access
        async for row in aiocsv.AsyncDictReader(f):
            print(f"Name: {row['name']}, Age: {row['age']}")

asyncio.run(read_csv())

Writing a CSV File

import asyncio
import aiocsv
from aiogzip import AsyncGzipTextFile

async def write_csv():
    data = [
        {"name": "Alice", "age": 30, "city": "New York"},
        {"name": "Bob", "age": 25, "city": "Los Angeles"},
    ]

    async with AsyncGzipTextFile("output.csv.gz", "wt", encoding="utf-8", newline="") as f:
        writer = aiocsv.AsyncDictWriter(f, fieldnames=["name", "age", "city"])
        await writer.writeheader()
        for row in data:
            await writer.writerow(row)

asyncio.run(write_csv())

JSON Lines (JSONL) Processing

Processing large compressed JSONL files is a common task in data engineering.

Reading JSONL

import asyncio
import json
from aiogzip import AsyncGzipTextFile

async def process_jsonl():
    async with AsyncGzipTextFile("logs.jsonl.gz", "rt") as f:
        async for line in f:
            try:
                record = json.loads(line)
                if record.get("level") == "ERROR":
                    print(f"Error found: {record['message']}")
            except json.JSONDecodeError:
                print(f"Skipping invalid JSON: {line[:50]}...")

asyncio.run(process_jsonl())

Reading JSONL Efficiently

If your data is standard UTF-8 JSONL with \n line endings, configure the text reader explicitly:

import asyncio
import json
from aiogzip import AsyncGzipTextFile

async def process_jsonl_fast():
    async with AsyncGzipTextFile(
        "logs.jsonl.gz",
        "rt",
        newline="\n",
        chunk_size=512 * 1024,
    ) as f:
        async for line in f:
            record = json.loads(line)
            # Process record...

asyncio.run(process_jsonl_fast())

Why this helps:

  • newline="\n" skips universal-newline handling that JSONL does not need.
  • A larger chunk_size reduces async read overhead during line iteration.

This is a good default for large gzipped JSONL files. If memory pressure matters more than throughput, lower the chunk size.

Concurrent File Processing

One of the biggest advantages of aiogzip is the ability to process multiple files concurrently without blocking the event loop.

import asyncio
from aiogzip import AsyncGzipTextFile

async def process_file(filename):
    print(f"Starting {filename}...")
    line_count = 0
    async with AsyncGzipTextFile(filename, "rt") as f:
        async for _ in f:
            line_count += 1
    print(f"Finished {filename}: {line_count} lines")
    return line_count

async def main():
    files = ["data1.gz", "data2.gz", "data3.gz"]

    # Create tasks for all files
    tasks = [process_file(f) for f in files]

    # Run them concurrently
    results = await asyncio.gather(*tasks)

    print(f"Total lines processed: {sum(results)}")

# To run this example, ensure the files exist
# asyncio.run(main())

Error Handling

aiogzip raises standard OSError (or subclasses) for I/O issues, ensuring consistency with Python's built-in file handling.

import asyncio
from aiogzip import AsyncGzipFile

async def safe_read():
    try:
        async with AsyncGzipFile("non_existent.gz", "rb") as f:
            await f.read()
    except FileNotFoundError:
        print("File not found!")
    except OSError as e:
        print(f"An I/O error occurred: {e}")

asyncio.run(safe_read())

Reading Untrusted Files Safely

When reading gzip data from untrusted sources, cap decompressed output to avoid expanding a small compressed file into unbounded memory usage:

import asyncio
from aiogzip import AsyncGzipFile

async def read_untrusted(path):
    async with AsyncGzipFile(path, "rb", max_decompressed_size=100 * 1024 * 1024) as f:
        return await f.read()

asyncio.run(read_untrusted("upload.gz"))

If the input is a non-seekable stream, aiogzip may keep compressed bytes so backward seeks can replay the stream. The cache defaults to 128 MiB; reduce max_rewind_cache_size for memory-sensitive pipelines, or use forward-only reads and leave backward seeking disabled once the cap is exceeded.