Skip to content

API Reference

aiogzip exposes its supported public API from the top-level package:

  • AsyncGzipBinaryFile
  • AsyncGzipTextFile
  • AsyncGzipFile

Implementation internals live in aiogzip._common, aiogzip._binary, and aiogzip._text. Treat those modules as private and unstable unless symbols are explicitly re-exported by aiogzip.

aiogzip

Async gzip file reader/writer public API.

AsyncGzipBinaryFile

AsyncGzipBinaryFile(filename: Union[str, bytes, Path, None], mode: str = 'rb', chunk_size: int = DEFAULT_CHUNK_SIZE, compresslevel: int = 6, mtime: Optional[Union[int, float]] = None, original_filename: Optional[Union[str, bytes]] = None, fileobj: Optional[WithAsyncReadWrite] = None, closefd: Optional[bool] = None)

An asynchronous gzip file reader/writer for binary data.

This class provides async gzip compression/decompression for binary data, making it a drop-in replacement for gzip.open() in binary mode.

Features:

  • Full compatibility with the gzip.open() file format
  • Binary mode only (no text encoding/decoding)
  • Async context manager support
  • Configurable chunk size for performance tuning

Basic Usage

Write binary data

async with AsyncGzipBinaryFile("data.gz", "wb") as f: await f.write(b"Hello, World!")

Read binary data

async with AsyncGzipBinaryFile("data.gz", "rb") as f: data = await f.read() # Returns bytes

Interoperability with gzip.open() — files created by AsyncGzipBinaryFile can be read back with gzip.open():

    async with AsyncGzipBinaryFile("data.gz", "wb") as f:
        await f.write(b"data")

with gzip.open("data.gz", "rb") as f:
    data = f.read()  # Works perfectly!
Source code in src/aiogzip/_binary.py
def __init__(
    self,
    filename: Union[str, bytes, Path, None],
    mode: str = "rb",
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    compresslevel: int = 6,
    mtime: Optional[Union[int, float]] = None,
    original_filename: Optional[Union[str, bytes]] = None,
    fileobj: Optional[WithAsyncReadWrite] = None,
    closefd: Optional[bool] = None,
) -> None:
    """Initialize an async gzip binary file wrapper.

    Args:
        filename: Path to the gzip file, or None when ``fileobj`` is given.
        mode: Binary file mode ('r', 'w', 'a', 'x', optionally with '+');
            't' is rejected.
        chunk_size: Chunk size used when reading from the underlying file.
        compresslevel: zlib compression level (validated only in write modes).
        mtime: Modification time to embed in the gzip header (write modes).
        original_filename: Filename to embed in the gzip header (write modes).
        fileobj: Optional pre-opened async file object used instead of
            opening ``filename``.
        closefd: Whether close() should close the underlying file. Defaults
            to True when the file is opened here, False when ``fileobj``
            is supplied.

    Raises:
        ValueError: If the mode includes 't' or is otherwise invalid, or if
            filename/chunk_size/compresslevel validation fails.
    """
    # Validate inputs using shared validation functions
    _validate_filename(filename, fileobj)
    _validate_chunk_size(chunk_size)

    # Validate mode and derive file characteristics
    mode_op, saw_b, saw_t, plus = _parse_mode_tokens(mode)
    if saw_t:
        raise ValueError("Binary mode cannot include text ('t')")
    if mode_op not in {"r", "w", "a", "x"}:
        raise ValueError(f"Invalid mode '{mode}'.")

    self._filename = filename
    self._mode = mode
    self._mode_op = mode_op
    self._mode_plus = plus
    self._writing_mode = mode_op in {"w", "a", "x"}
    # Compression level only matters when writing, so validate it lazily.
    if self._writing_mode:
        _validate_compresslevel(compresslevel)
    self._chunk_size = chunk_size
    self._compresslevel = compresslevel
    self._header_mtime = _normalize_mtime(mtime)
    self._header_filename_override = _validate_original_filename(original_filename)
    self._external_file = fileobj
    # Default policy: close files we open ourselves, leave caller-supplied
    # file objects open.
    self._closefd = closefd if closefd is not None else fileobj is None

    # Determine the underlying file mode based on gzip mode
    file_mode_suffix = "b"
    self._file_mode = f"{mode_op}{file_mode_suffix}"
    if plus:
        self._file_mode += "+"

    self._file: Any = None
    self._engine: ZlibEngine = None
    self._buffer = bytearray()  # Use bytearray for efficient buffer growth
    self._buffer_offset: int = 0  # Offset to the start of valid data in _buffer
    self._is_closed: bool = False
    self._eof: bool = False
    self._owns_file: bool = False
    self._crc: int = 0  # Running CRC32 of uncompressed data (for the trailer)
    self._input_size: int = 0  # Total uncompressed bytes written (for the trailer)
    self._position: int = 0  # Current position in the uncompressed stream
    self._mtime: Optional[int] = None  # mtime parsed back from the gzip header
    self._header_probe_buffer = bytearray()
    self._compressed_cache = bytearray()
    self._replay_offset: Optional[int] = None
    self._cache_rewindable_reads: bool = False

closed property

closed: bool

Return True when this file has been closed.

mtime property

mtime: Optional[int]

Return the gzip member mtime after the header has been read.

name property

name: Union[str, bytes, Path, None]

Return the name of the file.

This property provides compatibility with the standard file API. Returns the filename passed to the constructor, or falls back to the underlying file object's name attribute when available.

Returns:

Type Description
Union[str, bytes, Path, None]

The filename as str, bytes, or Path, or None if no name is available.

__aenter__ async

__aenter__() -> AsyncGzipBinaryFile

Enter the async context manager and initialize resources.

Source code in src/aiogzip/_binary.py
async def __aenter__(self) -> "AsyncGzipBinaryFile":
    """Enter the async context manager and initialize resources.

    Opens the underlying file (unless an external fileobj was supplied),
    then prepares either a compressor (write modes, immediately emitting
    the gzip header) or a decompressor (read mode). On any failure the
    partially-acquired resources are cleaned up and the error re-raised.

    Raises:
        ValueError: If neither a filename nor a fileobj is available.
    """
    try:
        if self._external_file is not None:
            # Caller supplied an already-open async file; we never own it.
            self._file = self._external_file
            self._owns_file = False
        else:
            if self._filename is None:
                raise ValueError(
                    "Filename must be provided when fileobj is not given"
                )
            self._file = await aiofiles.open(  # type: ignore
                self._filename, self._file_mode
            )
            self._owns_file = True

        # Initialize compression/decompression engine based on mode
        if self._writing_mode:
            # Negative wbits -> raw deflate stream; the gzip header and
            # trailer are written manually so mtime/filename can be set.
            self._engine = zlib.compressobj(
                level=self._compresslevel, wbits=-zlib.MAX_WBITS
            )
            header = _build_gzip_header(
                _derive_header_filename(
                    self._header_filename_override, self._filename
                ),
                self._header_mtime,
                self._compresslevel,
            )
            await self._file.write(header)
            self._crc = 0
            self._input_size = 0
        else:  # read mode
            self._engine = zlib.decompressobj(wbits=GZIP_WBITS)
            self._position = 0
            self._mtime = None
            self._header_probe_buffer.clear()
            self._compressed_cache.clear()
            self._replay_offset = None
            # Non-seekable sources need their reads cached so a rewind can
            # replay them later.
            seek_method = getattr(self._file, "seek", None)
            self._cache_rewindable_reads = not callable(seek_method)

        return self
    except Exception:
        # Roll back any partially-acquired state before propagating.
        await self._cleanup_failed_enter()
        raise

__aexit__ async

__aexit__(exc_type: Optional[type], exc_val: Optional[BaseException], exc_tb: Optional[Any]) -> None

Exit the context manager, flushing and closing the file.

Source code in src/aiogzip/_binary.py
async def __aexit__(
    self,
    exc_type: Optional[type],
    exc_val: Optional[BaseException],
    exc_tb: Optional[Any],
) -> None:
    """Leave the async context by delegating all cleanup to close().

    Exceptions raised inside the context are never suppressed.
    """
    # close() is idempotent, so a repeated exit is harmless.
    await self.close()

__aiter__

__aiter__() -> AsyncGzipBinaryFile

Make AsyncGzipBinaryFile iterable over newline-delimited chunks.

Source code in src/aiogzip/_binary.py
def __aiter__(self) -> "AsyncGzipBinaryFile":
    """Support ``async for`` by acting as our own line iterator."""
    return self

__anext__ async

__anext__() -> bytes

Return the next line from the binary stream.

Source code in src/aiogzip/_binary.py
async def __anext__(self) -> bytes:
    """Yield the next newline-delimited chunk, ending iteration at EOF."""
    # A closed file terminates iteration rather than raising ValueError.
    if self._is_closed:
        raise StopAsyncIteration
    next_line = await self.readline()
    if not next_line:
        raise StopAsyncIteration
    return next_line

close async

close() -> None

Flushes any remaining compressed data and closes the file.

Source code in src/aiogzip/_binary.py
async def close(self) -> None:
    """Flush remaining compressed data, write the gzip trailer, and close.

    Safe to call multiple times; subsequent calls are no-ops. The
    underlying file is closed only when this object owns it or
    ``closefd`` is true. Its ``close()`` may be sync or async.

    Note: the original wrapped the body in ``try: ... except Exception:
    raise`` — a no-op (no handling, no finally) — which has been removed;
    exceptions propagate identically.
    """
    if self._is_closed:
        return

    # Mark as closed immediately to prevent concurrent close attempts
    self._is_closed = True

    if self._writing_mode and self._file is not None:
        # Flush the compressor to emit any pending deflate data, then
        # append the gzip trailer (CRC32 + uncompressed size).
        remaining_data = self._engine.flush()
        if remaining_data:
            await self._file.write(remaining_data)
        trailer = _build_gzip_trailer(self._crc, self._input_size)
        await self._file.write(trailer)

    if self._file is not None and (self._owns_file or self._closefd):
        # Close only if we own it or closefd=True; support both sync and
        # async close() implementations on the underlying file.
        close_method = getattr(self._file, "close", None)
        if callable(close_method):
            result = close_method()
            if hasattr(result, "__await__"):
                await result

detach

detach() -> Any

Detach is unsupported to mirror gzip.GzipFile behavior.

Source code in src/aiogzip/_binary.py
def detach(self) -> Any:
    """Always raise: detaching the raw stream is unsupported (gzip parity)."""
    raise io.UnsupportedOperation("detach")

fileno

fileno() -> int

Return the underlying file descriptor number.

Source code in src/aiogzip/_binary.py
def fileno(self) -> int:
    """Return the underlying file descriptor number.

    Raises:
        ValueError: If the file has not been opened yet.
        io.UnsupportedOperation: If the underlying file has no ``fileno()``
            or its ``fileno()`` is asynchronous.
    """
    if self._file is None:
        raise ValueError("File not opened. Use async context manager.")
    fileno_method = getattr(self._file, "fileno", None)
    if fileno_method is None:
        raise io.UnsupportedOperation("fileno() not supported by underlying file")
    result = fileno_method()
    if hasattr(result, "__await__"):
        # Dispose of the awaitable before raising so we don't leak an
        # un-awaited coroutine (mirrors isatty()'s handling).
        close_method = getattr(result, "close", None)
        if callable(close_method):
            close_method()
        raise io.UnsupportedOperation(
            "fileno() is not awaitable in underlying file"
        )
    return int(result)

flush async

flush() -> None

Flush any buffered compressed data to the file.

In write/append mode, this forces any buffered compressed data to be written to the underlying file. Note that this does NOT write the gzip trailer - use close() for that.

In read mode, this is a no-op for compatibility with the file API.

Examples:

    async with AsyncGzipBinaryFile("file.gz", "wb") as f:
        await f.write(b"Hello")
        await f.flush()  # Ensure data is written
        await f.write(b" World")

Source code in src/aiogzip/_binary.py
async def flush(self) -> None:
    """
    Flush any buffered compressed data to the file.

    In write/append mode, this forces any buffered compressed data to be
    written to the underlying file. Note that this does NOT write the gzip
    trailer - use close() for that.

    In read mode, this is a no-op for compatibility with the file API.

    Raises:
        ValueError: If the file is already closed.
        OSError: If compression or the underlying write/flush fails.

    Examples:
        async with AsyncGzipBinaryFile("file.gz", "wb") as f:
            await f.write(b"Hello")
            await f.flush()  # Ensure data is written
            await f.write(b" World")
    """
    if self._is_closed:
        raise ValueError("I/O operation on closed file.")

    if self._writing_mode and self._file is not None:
        # Flush any buffered compressed data (but not the final trailer)
        # Using Z_SYNC_FLUSH allows us to flush without ending the stream
        try:
            flushed_data = self._engine.flush(zlib.Z_SYNC_FLUSH)
            if flushed_data:
                await self._file.write(flushed_data)

            # Also flush the underlying file if it has a flush method
            # (it may be sync or async; await only when awaitable).
            flush_method = getattr(self._file, "flush", None)
            if callable(flush_method):
                result = flush_method()
                if hasattr(result, "__await__"):
                    await result
        except zlib.error as e:
            raise OSError(f"Error flushing compressed data: {e}") from e
        except OSError:
            # Propagate I/O errors unchanged.
            raise
        except Exception as e:
            raise OSError(f"Unexpected error during flush: {e}") from e

isatty

isatty() -> bool

Return True if the underlying stream is interactive.

Source code in src/aiogzip/_binary.py
def isatty(self) -> bool:
    """Report whether the underlying stream is attached to a terminal."""
    underlying = self._file
    if underlying is None:
        return False
    probe = getattr(underlying, "isatty", None)
    if not callable(probe):
        return False
    outcome = probe()
    if not hasattr(outcome, "__await__"):
        return bool(outcome)
    # An async isatty() cannot be awaited from sync code; dispose of the
    # coroutine to avoid an un-awaited warning and report non-interactive.
    closer = getattr(outcome, "close", None)
    if callable(closer):
        closer()
    return False

peek async

peek(size: int = -1) -> bytes

Return up to size bytes without advancing the read position.

Source code in src/aiogzip/_binary.py
async def peek(self, size: int = -1) -> bytes:
    """Return up to ``size`` bytes without advancing the read position.

    Args:
        size: Maximum number of bytes to return; None or a non-positive
            value means "whatever is buffered, or try to fetch one byte".

    Raises:
        OSError: If the file is not open for reading.
        ValueError: If the file is closed or was never opened.
    """
    if self._mode_op != "r":
        raise OSError("File not open for reading")
    if self._is_closed:
        # Consistent with read()/read1()/readline(): closed files reject I/O.
        raise ValueError("I/O operation on closed file.")
    if self._file is None:
        raise ValueError("File not opened. Use async context manager.")
    available = len(self._buffer) - self._buffer_offset
    target = size
    if target is None or target <= 0:
        target = available if available > 0 else 1
    # Refill until enough bytes are buffered or the stream is exhausted.
    while available < target and not self._eof:
        await self._fill_buffer()
        available = len(self._buffer) - self._buffer_offset
        if available == 0 and self._eof:
            break
    # Slice without touching _buffer_offset so the position is unchanged.
    end = self._buffer_offset + min(target, available)
    return bytes(self._buffer[self._buffer_offset : end])

raw

raw() -> Any

Expose the underlying file object for advanced integrations.

Source code in src/aiogzip/_binary.py
def raw(self) -> Any:
    """Return the wrapped file object (None before the context is entered)."""
    return self._file

read async

read(size: int = -1) -> bytes

Reads and decompresses binary data from the file.

Parameters:

Name Type Description Default
size int

Number of bytes to read (-1 for all remaining data)

-1

Returns:

Type Description
bytes

bytes

Examples:

    async with AsyncGzipBinaryFile("file.gz", "rb") as f:
        data = await f.read()       # Returns all bytes
        partial = await f.read(100)  # Returns first 100 bytes

Source code in src/aiogzip/_binary.py
async def read(self, size: int = -1) -> bytes:
    """
    Reads and decompresses binary data from the file.

    Args:
        size: Number of bytes to read (-1 for all remaining data)

    Returns:
        bytes

    Raises:
        OSError: If the file is not open for reading.
        ValueError: If the file is closed or was never opened.

    Examples:
        async with AsyncGzipBinaryFile("file.gz", "rb") as f:
            data = await f.read()  # Returns bytes
            partial = await f.read(100)  # Returns first 100 bytes
    """
    if self._mode_op != "r":
        raise OSError("File not open for reading")
    if self._is_closed:
        raise ValueError("I/O operation on closed file.")
    if self._file is None:
        raise ValueError("File not opened. Use async context manager.")

    # Normalize: None and any negative value both mean "read everything".
    if size is None:
        size = -1
    if size < 0:
        size = -1

    # If size is -1, read all data in chunks to avoid memory issues
    if size == -1:
        # Return buffered data + read remaining (no recursion)
        chunks = []
        total_read = 0
        if self._buffer_offset < len(self._buffer):
            chunk = bytes(self._buffer[self._buffer_offset :])
            chunks.append(chunk)
            total_read += len(chunk)

        del self._buffer[:]  # Clear while retaining capacity
        self._buffer_offset = 0

        while not self._eof:
            await self._fill_buffer()
            if self._buffer:
                chunk = bytes(self._buffer)
                chunks.append(chunk)
                total_read += len(chunk)
                del self._buffer[:]  # Clear while retaining capacity

        data = b"".join(chunks)
        self._position += total_read
        return data
    else:
        # Otherwise, read until the buffer has enough data to satisfy the request.
        while (len(self._buffer) - self._buffer_offset) < size and not self._eof:
            # If buffer has too much garbage at the front, compact it
            if self._buffer_offset > self.BUFFER_COMPACTION_THRESHOLD:
                del self._buffer[: self._buffer_offset]
                self._buffer_offset = 0

            await self._fill_buffer()

        # Determine how much we can actually read
        # (may be less than requested when EOF was reached).
        available = len(self._buffer) - self._buffer_offset
        actual_read_size = min(size, available)

        data_to_return = bytes(
            self._buffer[
                self._buffer_offset : self._buffer_offset + actual_read_size
            ]
        )
        self._buffer_offset += actual_read_size
        self._position += actual_read_size

        # If we consumed everything, reset to keep buffer clean
        if self._buffer_offset >= len(self._buffer):
            del self._buffer[:]
            self._buffer_offset = 0

        return data_to_return

read1 async

read1(size: int = -1) -> bytes

Read up to size bytes from the buffer without looping.

Source code in src/aiogzip/_binary.py
async def read1(self, size: int = -1) -> bytes:
    """Read up to ``size`` bytes using at most one buffer refill.

    Unlike read(), this never loops: at most a single internal fill is
    attempted, so fewer bytes than requested may be returned.

    Args:
        size: Maximum bytes to return; None or a negative value means
            "whatever is currently available".

    Raises:
        OSError: If the file is not open for reading.
        ValueError: If the file is closed or was never opened.
    """
    if self._mode_op != "r":
        raise OSError("File not open for reading")
    if self._is_closed:
        raise ValueError("I/O operation on closed file.")
    if self._file is None:
        raise ValueError("File not opened. Use async context manager.")

    # Normalize once; from here on size is always an int.
    if size is None:
        size = -1
    if size == 0:
        return b""

    available = len(self._buffer) - self._buffer_offset
    if available <= 0 and not self._eof:
        # Single refill attempt — this is what distinguishes read1() from read().
        await self._fill_buffer()
        available = len(self._buffer) - self._buffer_offset

    # size was normalized above, so only its sign matters here (the original
    # re-tested "size is None", which was unreachable dead code).
    if size < 0:
        actual_read_size = available
    else:
        actual_read_size = min(size, available)

    data_to_return = bytes(
        self._buffer[self._buffer_offset : self._buffer_offset + actual_read_size]
    )
    self._buffer_offset += actual_read_size
    self._position += actual_read_size

    # Drop the buffer entirely once fully consumed to keep it small.
    if self._buffer_offset >= len(self._buffer):
        del self._buffer[:]
        self._buffer_offset = 0

    return data_to_return

readinto async

readinto(b: Union[bytearray, memoryview]) -> int

Read bytes directly into a pre-allocated, writable buffer.

Source code in src/aiogzip/_binary.py
async def readinto(self, b: Union[bytearray, memoryview]) -> int:
    """Fill ``b`` with decompressed bytes; return how many were stored."""
    if self._mode_op != "r":
        raise OSError("File not open for reading")
    if self._file is None:
        raise ValueError("File not opened. Use async context manager.")
    target = memoryview(b)
    if target.readonly:
        raise TypeError("readinto() argument must be writable")
    payload = await self.read(len(target))
    count = len(payload)
    target[:count] = payload
    return count

readinto1 async

readinto1(b: Union[bytearray, memoryview]) -> int

Read directly into the buffer without looping.

Source code in src/aiogzip/_binary.py
async def readinto1(self, b: Union[bytearray, memoryview]) -> int:
    """Fill ``b`` via a single read1() call; return the byte count."""
    if self._mode_op != "r":
        raise OSError("File not open for reading")
    if self._file is None:
        raise ValueError("File not opened. Use async context manager.")
    target = memoryview(b)
    if target.readonly:
        raise TypeError("readinto() argument must be writable")
    payload = await self.read1(len(target))
    count = len(payload)
    target[:count] = payload
    return count

readline async

readline(limit: int = -1) -> bytes

Read and return one line from the binary stream.

Source code in src/aiogzip/_binary.py
async def readline(self, limit: int = -1) -> bytes:
    """Read and return one line from the binary stream.

    The returned line includes its trailing newline when present; at EOF
    a partial final line (or b"") is returned.

    Args:
        limit: Maximum number of bytes to return (-1 means unlimited).

    Raises:
        OSError: If the file is not open for reading.
        ValueError: If the file is closed or was never opened.
    """
    if self._mode_op != "r":
        raise OSError("File not open for reading")
    if self._is_closed:
        raise ValueError("I/O operation on closed file.")
    if self._file is None:
        raise ValueError("File not opened. Use async context manager.")
    if limit is None:
        limit = -1
    if limit == 0:
        return b""

    chunks: List[bytes] = []
    total = 0
    while True:
        # Refill the buffer once all buffered bytes have been consumed.
        if self._buffer_offset >= len(self._buffer):
            if self._eof:
                break
            await self._fill_buffer()
            if self._buffer_offset >= len(self._buffer) and self._eof:
                break
            if self._buffer_offset >= len(self._buffer):
                continue

        # Take bytes up to the next newline (inclusive) or the buffer end,
        # clamped by the remaining limit.
        start = self._buffer_offset
        end = len(self._buffer)
        newline_index = self._buffer.find(b"\n", start)
        if newline_index != -1:
            end = newline_index + 1
        if limit != -1:
            remaining = limit - total
            if remaining <= 0:
                break
            end = min(end, start + remaining)

        if end <= start:
            break

        chunk = bytes(self._buffer[start:end])
        chunks.append(chunk)
        consumed = end - start
        self._buffer_offset = end
        self._position += consumed
        total += consumed

        # Reset the buffer once fully drained to keep it from growing.
        if self._buffer_offset >= len(self._buffer):
            del self._buffer[:]
            self._buffer_offset = 0

        # Stop once a complete newline was captured or the limit was hit.
        if (newline_index != -1 and end == newline_index + 1) or (
            limit != -1 and total >= limit
        ):
            break

    return b"".join(chunks)

readlines async

readlines(hint: int = -1) -> List[bytes]

Read and return a list of lines from the binary stream.

Source code in src/aiogzip/_binary.py
async def readlines(self, hint: int = -1) -> List[bytes]:
    """Collect lines via readline() until EOF or the size hint is reached."""
    if self._mode_op != "r":
        raise OSError("File not open for reading")
    if self._is_closed:
        raise ValueError("I/O operation on closed file.")

    collected: List[bytes] = []
    consumed = 0
    # Keep pulling lines until EOF (empty bytes) or the hint is satisfied.
    while line := await self.readline():
        collected.append(line)
        consumed += len(line)
        if hint > 0 and consumed >= hint:
            break
    return collected

seek async

seek(offset: int, whence: int = os.SEEK_SET) -> int

Move to a new file position, mirroring gzip.GzipFile semantics.

Source code in src/aiogzip/_binary.py
async def seek(self, offset: int, whence: int = os.SEEK_SET) -> int:
    """Move to a new file position, mirroring gzip.GzipFile semantics.

    Write modes support only forward seeks (implemented by writing zero
    bytes). Read mode supports SEEK_SET/SEEK_CUR/SEEK_END; a backward
    seek rewinds and re-reads the stream from the start, and SEEK_END
    first drains the stream to discover its uncompressed length.

    Args:
        offset: Target offset relative to ``whence``.
        whence: os.SEEK_SET, os.SEEK_CUR, or os.SEEK_END (read mode only).

    Returns:
        The new position in the uncompressed stream.

    Raises:
        ValueError: If the file is not open or ``whence`` is invalid.
        OSError: On an unsupported negative/backward seek.
    """
    if self._file is None:
        raise ValueError("File not opened. Use async context manager.")
    if self._writing_mode:
        if whence == os.SEEK_CUR:
            target = self._position + offset
        elif whence == os.SEEK_SET:
            target = offset
        else:
            raise ValueError("Seek from end not supported in write mode")
        if target < self._position:
            raise OSError("Negative seek in write mode")
        # Forward seek while writing: emit zero-filled padding in chunks.
        count = target - self._position
        if count > 0:
            zero_chunk = b"\x00" * min(1024, count)
            remaining = count
            while remaining > 0:
                chunk = (
                    zero_chunk
                    if remaining >= len(zero_chunk)
                    else zero_chunk[:remaining]
                )
                await self.write(chunk)
                remaining -= len(chunk)
        return self._position

    if whence == os.SEEK_SET:
        target = offset
    elif whence == os.SEEK_CUR:
        target = self._position + offset
    elif whence == os.SEEK_END:
        # Uncompressed length is unknown until fully decoded: drain to EOF,
        # then clamp the target into [0, end-of-stream].
        while not self._eof:
            await self._fill_buffer()
            buffered = len(self._buffer) - self._buffer_offset
            if buffered > 0:
                self._buffer_offset = len(self._buffer)
                self._position += buffered
                del self._buffer[:]
                self._buffer_offset = 0
        target = self._position + offset
        if target < 0:
            target = 0
        elif target > self._position:
            target = self._position
    else:
        raise ValueError("Invalid whence value")

    if target < 0:
        raise OSError("Negative seek in read mode")

    if target < self._position:
        # Backward seek: restart decompression from the beginning.
        await self._rewind_reader()

    # Skip forward by decompressing and discarding the gap.
    await self._consume_bytes(target - self._position)
    return self._position

tell async

tell() -> int

Return the current uncompressed file position.

Source code in src/aiogzip/_binary.py
async def tell(self) -> int:
    """Report the current position within the uncompressed stream."""
    return self._position

truncate

truncate(size: Optional[int] = None) -> int

Truncation is unsupported for gzip-compressed streams.

Source code in src/aiogzip/_binary.py
def truncate(self, size: Optional[int] = None) -> int:
    """Always raise: a gzip stream cannot be truncated in place."""
    raise io.UnsupportedOperation("truncate")

write async

write(data: Union[bytes, bytearray, memoryview]) -> int

Compresses and writes binary data to the file.

Parameters:

Name Type Description Default
data Union[bytes, bytearray, memoryview]

Bytes to write

required

Examples:

async with AsyncGzipBinaryFile("file.gz", "wb") as f: await f.write(b"Hello, World!") # Bytes input

Source code in src/aiogzip/_binary.py
async def write(self, data: Union[bytes, bytearray, memoryview]) -> int:
    """
    Compresses and writes binary data to the file.

    Args:
        data: Bytes to write

    Returns:
        The number of uncompressed bytes consumed from ``data``.

    Raises:
        OSError: If the file is not open for writing or compression fails.
        ValueError: If the file is closed or was never opened.

    Examples:
        async with AsyncGzipBinaryFile("file.gz", "wb") as f:
            await f.write(b"Hello, World!")  # Bytes input
    """
    if not self._writing_mode:
        raise OSError("File not open for writing")
    if self._is_closed:
        raise ValueError("I/O operation on closed file.")
    if self._file is None:
        raise ValueError("File not opened. Use async context manager.")

    buffer = self._coerce_byteslike(data)
    # Maintain running CRC32 and size for the trailer written at close().
    self._crc = zlib.crc32(buffer, self._crc)
    self._input_size += len(buffer)
    self._position = self._input_size

    try:
        compressed = self._engine.compress(buffer)
        # The compressor may buffer internally and emit nothing yet.
        if compressed:
            await self._file.write(compressed)
    except zlib.error as e:
        raise OSError(f"Error compressing data: {e}") from e
    except OSError:
        # Re-raise I/O errors as-is
        raise
    except Exception as e:
        raise OSError(f"Unexpected error during compression: {e}") from e

    return len(buffer)

writelines async

writelines(lines: Iterable[bytes]) -> None

Write a sequence of bytes-like lines to the binary stream.

Source code in src/aiogzip/_binary.py
async def writelines(self, lines: Iterable[bytes]) -> None:
    """Write every bytes-like item in ``lines``, in order, via write()."""
    if not self._writing_mode:
        raise OSError("File not open for writing")
    if self._is_closed:
        raise ValueError("I/O operation on closed file.")

    for chunk in lines:
        await self.write(chunk)

AsyncGzipTextFile

AsyncGzipTextFile(filename: Union[str, bytes, Path, None], mode: str = 'rt', chunk_size: int = AsyncGzipBinaryFile.DEFAULT_CHUNK_SIZE, encoding: Optional[str] = 'utf-8', errors: Optional[str] = 'strict', newline: Union[str, None] = None, compresslevel: int = 6, mtime: Optional[Union[int, float]] = None, original_filename: Optional[Union[str, bytes]] = None, fileobj: Optional[WithAsyncReadWrite] = None, closefd: Optional[bool] = None)

An asynchronous gzip file reader/writer for text data.

This class wraps AsyncGzipBinaryFile and provides text mode operations with proper UTF-8 handling for multi-byte characters.

Features:

  • Full compatibility with the gzip.open() file format
  • Text mode with automatic encoding/decoding
  • Proper handling of multi-byte UTF-8 characters
  • Line-by-line iteration support
  • Async context manager support

Basic Usage

Write text data

async with AsyncGzipTextFile("data.gz", "wt") as f: await f.write("Hello, World!") # String input

Read text data

async with AsyncGzipTextFile("data.gz", "rt") as f: text = await f.read() # Returns string

Line-by-line iteration

async with AsyncGzipTextFile("data.gz", "rt") as f: async for line in f: print(line.strip())

Source code in src/aiogzip/_text.py
def __init__(
    self,
    filename: Union[str, bytes, Path, None],
    mode: str = "rt",
    chunk_size: int = AsyncGzipBinaryFile.DEFAULT_CHUNK_SIZE,
    encoding: Optional[str] = "utf-8",
    errors: Optional[str] = "strict",
    newline: Union[str, None] = None,
    compresslevel: int = 6,
    mtime: Optional[Union[int, float]] = None,
    original_filename: Optional[Union[str, bytes]] = None,
    fileobj: Optional[WithAsyncReadWrite] = None,
    closefd: Optional[bool] = None,
) -> None:
    """Initialize an async gzip text file wrapper.

    Args:
        filename: Path to the gzip file, or None when ``fileobj`` is given.
        mode: Text mode ('r', 'w', 'a', 'x', optionally with '+'); 'b' is
            rejected.
        chunk_size: Read chunk size for the underlying binary file.
        encoding: Text encoding; None falls back to "utf-8".
        errors: Codec error handler; None falls back to "strict".
        newline: Universal-newline control; must be one of
            None, "", "\\n", "\\r", "\\r\\n".
        compresslevel: zlib compression level (validated in write modes).
        mtime: Modification time for the gzip header (write modes).
        original_filename: Filename to embed in the gzip header (write modes).
        fileobj: Optional pre-opened async file object.
        closefd: Whether close() should close the underlying file; defaults
            to True when opening by name, False for a supplied fileobj.

    Raises:
        ValueError: On invalid mode, empty encoding, or illegal newline.
    """
    # Validate inputs using shared validation functions
    _validate_filename(filename, fileobj)
    _validate_chunk_size(chunk_size)

    # Validate text-specific parameters
    if encoding is None:
        encoding = "utf-8"
    if not encoding:
        raise ValueError("Encoding cannot be empty")
    if errors is None:
        errors = "strict"
    if newline not in {None, "", "\n", "\r", "\r\n"}:
        raise ValueError(f"illegal newline value: {newline}")

    mode_op, saw_b, saw_t, plus = _parse_mode_tokens(mode)
    if saw_b:
        raise ValueError("Text mode cannot include binary ('b')")
    if mode_op not in {"r", "w", "a", "x"}:
        raise ValueError(f"Invalid mode '{mode}'.")

    self._filename = filename
    self._mode = mode
    self._mode_op = mode_op
    self._mode_plus = plus
    self._writing_mode = mode_op in {"w", "a", "x"}
    # Compression level only matters when writing, so validate it lazily.
    if self._writing_mode:
        _validate_compresslevel(compresslevel)
    self._chunk_size = chunk_size
    self._encoding = encoding
    self._errors = errors
    self._newline = newline
    self._compresslevel = compresslevel
    self._header_mtime = _normalize_mtime(mtime)
    self._header_filename_override = _validate_original_filename(original_filename)
    self._external_file = fileobj
    # Default policy: close files we open ourselves, leave caller-supplied
    # file objects open.
    self._closefd = closefd if closefd is not None else fileobj is None

    # Determine the underlying binary file mode
    self._binary_mode = f"{mode_op}b"
    if plus:
        self._binary_mode += "+"

    self._binary_file: Optional[AsyncGzipBinaryFile] = None
    self._is_closed: bool = False

    # Decoder and buffer state
    # Incremental decoder handles multi-byte characters split across chunks.
    self._decoder = codecs.getincrementaldecoder(self._encoding)(
        errors=self._errors
    )
    self._text_buffer: str = ""  # Backing store for buffered decoded text
    self._text_buffer_offset: int = 0  # Start of unread text within _text_buffer
    self._trailing_cr: bool = False  # Track if last decoded chunk ended with \r
    self._seen_newline_types: int = 0  # Newline styles observed while reading
    self._cookie_nonce: int = secrets.randbits(64)  # presumably ties tell() cookies to this instance — confirm in _text.py
    initial_decoder_state = self._decoder.getstate()
    # Snapshot of decoder/buffer state at the start of the current buffer,
    # captured here so it can be restored later.
    self._buffer_origin_offset: int = 0
    self._buffer_origin_decoder_state: Tuple[Any, int] = initial_decoder_state
    self._buffer_origin_trailing_cr: bool = False
    self._buffer_origin_seen_newline_types: int = 0
buffer property

buffer: AsyncGzipBinaryFile

Expose the underlying binary gzip stream.

closed property

closed: bool

Return True when this file has been closed.

encoding property

encoding: str

Return the configured text encoding.

errors property

errors: str

Return the configured text error handler.

name property

name: Union[str, bytes, Path, None]

Return the name of the file.

This property provides compatibility with the standard file API. Returns the filename passed to the constructor, or falls back to the underlying file object's name attribute when available.

Returns:

Type Description
Union[str, bytes, Path, None]

The filename as str, bytes, or Path, or None if no name is available.

newlines property

newlines: Optional[Union[str, Tuple[str, ...]]]

Return newline types observed while reading, like TextIOWrapper.

__aenter__ async

__aenter__() -> AsyncGzipTextFile

Enter the async context manager and initialize resources.

Source code in src/aiogzip/_text.py
async def __aenter__(self) -> "AsyncGzipTextFile":
    """Enter the async context manager and initialize resources.

    Constructs and opens the underlying AsyncGzipBinaryFile. If opening
    fails, the partially-opened binary file is closed (best-effort) and
    the original exception is re-raised.
    """
    # Normalize Path-like filenames for the binary layer.
    filename = os.fspath(self._filename) if self._filename is not None else None
    self._binary_file = AsyncGzipBinaryFile(
        filename=filename,
        mode=self._binary_mode,
        chunk_size=self._chunk_size,
        compresslevel=self._compresslevel,
        mtime=self._header_mtime,
        original_filename=self._header_filename_override,
        fileobj=self._external_file,
        closefd=self._closefd,
    )
    try:
        await self._binary_file.__aenter__()
    except Exception:
        # Best-effort cleanup; the original error takes precedence.
        try:
            await self._binary_file.close()
        except Exception:
            pass
        self._binary_file = None
        raise
    return self

__aexit__ async

__aexit__(exc_type: Optional[type], exc_val: Optional[BaseException], exc_tb: Optional[Any]) -> None

Exit the context manager, flushing and closing the file.

Source code in src/aiogzip/_text.py
async def __aexit__(
    self,
    exc_type: Optional[type],
    exc_val: Optional[BaseException],
    exc_tb: Optional[Any],
) -> None:
    """Exit the context manager, flushing and closing the file."""
    await self.close()

__aiter__

__aiter__() -> AsyncGzipTextFile

Make AsyncGzipTextFile iterable for line-by-line reading.

Source code in src/aiogzip/_text.py
def __aiter__(self) -> "AsyncGzipTextFile":
    """Make AsyncGzipTextFile iterable for line-by-line reading."""
    return self

__anext__ async

__anext__() -> str

Return the next line from the file.

Source code in src/aiogzip/_text.py
async def __anext__(self) -> str:
    """Return the next line from the file."""
    if self._is_closed:
        raise StopAsyncIteration

    # Read until we get a complete line
    while True:
        # Try to get a line from our buffer using newline-aware search
        buffered = self._buffered_text()
        pos, length = self._get_line_terminator_pos(buffered)
        if pos != -1:
            # Found a line terminator
            line = self._consume_buffer(pos + length)
            return line

        # Read more data
        has_more = await self._read_chunk_and_decode()
        if not has_more:
            # EOF
            if self._buffered_text_len() > 0:
                result = self._consume_buffer(self._buffered_text_len())
                return result  # Last line without newline
            else:
                raise StopAsyncIteration

close async

close() -> None

Closes the file.

Source code in src/aiogzip/_text.py
async def close(self) -> None:
    """
    Close the file, finalizing decoder state and the underlying stream.

    Safe to call multiple times; subsequent calls are no-ops. Any error
    raised while finalizing the decoder (e.g. an incomplete multi-byte
    character at EOF) still propagates, but the underlying binary file
    is closed regardless, so its resources are never leaked.
    """
    if self._is_closed:
        return

    # Mark as closed immediately to prevent concurrent close attempts.
    self._is_closed = True

    try:
        if not self._writing_mode:
            # Flush the decoder so any buffered bytes are processed;
            # an incomplete multi-byte sequence at EOF surfaces here.
            self._decoder.decode(b"", final=True)
    finally:
        # Always release the underlying binary file, even when the
        # decoder flush above raised (previously it would leak).
        if self._binary_file is not None:
            await self._binary_file.close()

flush async

flush() -> None

Flush any buffered data to the file.

In write/append mode, this forces any buffered text to be encoded and written to the underlying binary file.

In read mode, this is a no-op for compatibility with the file API.

Examples:

async with AsyncGzipTextFile("file.gz", "wt") as f:
    await f.write("Hello")
    await f.flush()  # Ensure data is written
    await f.write(" World")

Source code in src/aiogzip/_text.py
async def flush(self) -> None:
    """
    Flush any buffered data to the file.

    In write/append mode this forces buffered text to be encoded and
    handed to the underlying binary file; in read mode it is a no-op,
    kept for compatibility with the standard file API.

    Raises:
        ValueError: If the file has already been closed.
    """
    if self._is_closed:
        raise ValueError("I/O operation on closed file.")

    binary = self._binary_file
    if binary is not None:
        await binary.flush()

read async

read(size: int = -1) -> str

Reads and decodes text data from the file.

Parameters:

Name Type Description Default
size int

Number of characters to read (-1 for all remaining data)

-1

Returns:

Type Description
str

str

Examples:

async with AsyncGzipTextFile("file.gz", "rt") as f:
    text = await f.read()        # Returns the whole remaining file as a string
    partial = await f.read(100)  # Returns the next 100 characters as a string

Source code in src/aiogzip/_text.py
async def read(self, size: int = -1) -> str:
    """
    Read and decode up to *size* characters from the file.

    Args:
        size: Number of characters to read; -1 (or None, or any negative
            value) means read everything remaining.

    Returns:
        The decoded text as ``str`` (empty string when nothing is read).

    Raises:
        OSError: If the file is not open for reading.
        ValueError: If the file is closed or was never opened.
    """
    if self._mode_op != "r":
        raise OSError("File not open for reading")
    if self._is_closed:
        raise ValueError("I/O operation on closed file.")
    if self._binary_file is None:
        raise ValueError("File not opened. Use async context manager.")

    # Normalize: None and any negative value mean "read everything".
    if size is None or size < 0:
        size = -1

    # read(0) returns "" without draining the buffer.
    if size == 0:
        return ""

    if size == -1:
        # Unbounded read: drain the buffer, then decode chunks to EOF,
        # accumulating pieces in a list for an efficient final join.
        pieces = []
        pending = self._buffered_text_len()
        if pending > 0:
            pieces.append(self._consume_buffer(pending))
        while await self._read_chunk_and_decode():
            pending = self._buffered_text_len()
            if pending > 0:
                pieces.append(self._consume_buffer(pending))
        return "".join(pieces)

    # Bounded read: decode until the buffer holds enough characters
    # or the stream is exhausted, then hand back the requested slice.
    while self._buffered_text_len() < size and await self._read_chunk_and_decode():
        pass
    return self._consume_buffer(size)

readline async

readline(limit: int = -1) -> str

Read and return one line from the file.

A line is defined as text ending with a newline character ('\n'). If the file ends without a newline, the last line is returned without one.

Parameters:

Name Type Description Default
limit int

Maximum number of characters to return. Stops at newline, EOF, or once the limit is reached (matching TextIOBase semantics).

-1

Returns:

Name Type Description
str str

The next line from the file, including the newline if present. Returns empty string at EOF.

Examples:

async with AsyncGzipTextFile("file.gz", "rt") as f:
    line = await f.readline()  # Read one line
    while line:
        print(line.rstrip())
        line = await f.readline()

Source code in src/aiogzip/_text.py
async def readline(self, limit: int = -1) -> str:
    """
    Read and return one line from the file.

    A line is text ending with a newline; if the file ends without one,
    the final line is returned without a terminator.

    Args:
        limit: Maximum number of characters to return. Reading stops at
            a newline, at EOF, or once the limit is reached (matching
            TextIOBase semantics). -1 means no limit.

    Returns:
        str: The next line, including its newline if present; an empty
        string at EOF.

    Raises:
        ValueError: If the file is closed.
        OSError: If the file is not open for reading.
    """
    if self._is_closed:
        raise ValueError("I/O operation on closed file.")
    if self._mode_op != "r":
        raise OSError("File not open for reading")

    if limit is None:
        limit = -1
    if limit == 0:
        return ""

    while True:
        text = self._buffered_text()
        idx, term_len = self._get_line_terminator_pos(text)

        if idx != -1:
            # Terminator located; honor the character limit if one was set.
            line_end = idx + term_len
            if limit != -1 and line_end > limit:
                return self._consume_buffer(limit)
            return self._consume_buffer(line_end)

        # No terminator yet, but a set limit may already be satisfiable.
        if limit != -1 and self._buffered_text_len() >= limit:
            return self._consume_buffer(limit)

        # Pull more data; at EOF hand back whatever remains.
        if not await self._read_chunk_and_decode():
            remaining = self._buffered_text_len()
            if remaining == 0:
                return ""
            if limit != -1 and remaining > limit:
                return self._consume_buffer(limit)
            return self._consume_buffer(remaining)

readlines async

readlines(hint: int = -1) -> List[str]

Read and return a list of lines from the file.

Parameters:

Name Type Description Default
hint int

Optional size hint. If given and greater than 0, lines totaling approximately hint bytes are read (counted before decoding). The actual number of bytes read may be more or less than hint. If hint is -1 or not given, all remaining lines are read.

-1

Returns:

Type Description
List[str]

List[str]: A list of lines from the file, each including any trailing

List[str]

newline character.

Examples:

async with AsyncGzipTextFile("file.gz", "rt") as f: lines = await f.readlines() # Read all lines for line in lines: print(line.rstrip())

With size hint

async with AsyncGzipTextFile("file.gz", "rt") as f: lines = await f.readlines(1024) # Read ~1KB of lines

Source code in src/aiogzip/_text.py
async def readlines(self, hint: int = -1) -> List[str]:
    """
    Read and return a list of lines from the file.

    Args:
        hint: Optional size hint. If greater than 0, reading stops once
            the total length of accumulated lines (in decoded characters)
            reaches the hint; the amount read may exceed it by up to one
            line. If hint is -1, None, or any non-positive value, all
            remaining lines are read. (None is accepted for consistency
            with read() and readline().)

    Returns:
        List[str]: Lines from the file, each including any trailing
        newline character.

    Raises:
        ValueError: If the file is closed.
        OSError: If the file is not open for reading.

    Examples:
        async with AsyncGzipTextFile("file.gz", "rt") as f:
            lines = await f.readlines()  # Read all lines
            for line in lines:
                print(line.rstrip())

        # With size hint
        async with AsyncGzipTextFile("file.gz", "rt") as f:
            lines = await f.readlines(1024)  # Read ~1K chars of lines
    """
    if self._is_closed:
        raise ValueError("I/O operation on closed file.")
    if self._mode_op != "r":
        raise OSError("File not open for reading")

    # Normalize hint=None to "no limit", matching read(size=None) and
    # readline(limit=None); previously None raised TypeError on `hint > 0`.
    if hint is None:
        hint = -1

    lines: List[str] = []
    total_size = 0

    while True:
        line = await self.readline()
        if not line:
            break
        lines.append(line)
        total_size += len(line)
        if hint > 0 and total_size >= hint:
            break

    return lines

write async

write(data: str) -> int

Encodes and writes text data to the file.

Parameters:

Name Type Description Default
data str

String to write

required

Examples:

async with AsyncGzipTextFile("file.gz", "wt") as f: await f.write("Hello, World!") # String input

Source code in src/aiogzip/_text.py
async def write(self, data: str) -> int:
    """
    Encode and write text data to the file.

    Newlines in *data* are translated according to the configured
    ``newline`` setting before encoding, mirroring Python's text-mode
    I/O semantics.

    Args:
        data: String to write.

    Returns:
        The number of characters written (the length of ``data``,
        counted before newline translation).

    Raises:
        OSError: If the file is not open for writing.
        ValueError: If the file is closed or was never opened.
        TypeError: If ``data`` is not a string.

    Examples:
        async with AsyncGzipTextFile("file.gz", "wt") as f:
            await f.write("Hello, World!")  # String input
    """
    if not self._writing_mode:
        raise OSError("File not open for writing")
    if self._is_closed:
        raise ValueError("I/O operation on closed file.")
    if self._binary_file is None:
        raise ValueError("File not opened. Use async context manager.")

    if not isinstance(data, str):
        # Report the actual offending type (previously hardcoded "bytes"
        # regardless of input), matching TextIOWrapper's message.
        raise TypeError(f"write() argument must be str, not {type(data).__name__}")

    # Translate newlines according to Python's text I/O semantics.
    text_to_encode = data
    if self._newline is None:
        # newline=None: translate "\n" to the platform line separator.
        text_to_encode = text_to_encode.replace("\n", os.linesep)
    elif self._newline in ("\n", "\r", "\r\n"):
        text_to_encode = text_to_encode.replace("\n", self._newline)
    # newline == "" (or any other value): write the text untranslated.

    # Encode string to bytes and hand it to the binary layer.
    encoded_data = text_to_encode.encode(self._encoding, errors=self._errors)
    await self._binary_file.write(encoded_data)
    return len(data)

writelines async

writelines(lines: Iterable[str]) -> None

Write a list of lines to the file.

Note that newlines are not added automatically; each string in the iterable should include its own line terminator if desired.

Parameters:

Name Type Description Default
lines Iterable[str]

An iterable of strings to write.

required

Examples:

async with AsyncGzipTextFile("file.gz", "wt") as f: await f.writelines(["line1\n", "line2\n", "line3\n"])

From a generator

async with AsyncGzipTextFile("file.gz", "wt") as f: await f.writelines(f"{i}\n" for i in range(100))

Source code in src/aiogzip/_text.py
async def writelines(self, lines: Iterable[str]) -> None:
    """
    Write each string from *lines* to the file, in order.

    No newline characters are appended automatically; include a line
    terminator in each string if one is wanted.

    Args:
        lines: An iterable of strings to write.

    Raises:
        OSError: If the file is not open for writing.
        ValueError: If the file is closed.
    """
    if not self._writing_mode:
        raise OSError("File not open for writing")
    if self._is_closed:
        raise ValueError("I/O operation on closed file.")

    # Delegate to write() so each item gets identical validation
    # and newline translation.
    for item in lines:
        await self.write(item)

WithAsyncRead

Bases: Protocol

Protocol for async file-like objects that can be read.

WithAsyncReadWrite

Bases: Protocol

Protocol for async file-like objects that can be read and written.

WithAsyncWrite

Bases: Protocol

Protocol for async file-like objects that can be written.

AsyncGzipFile

AsyncGzipFile(filename: Union[str, bytes, Path, None], mode: str = 'rb', **kwargs: Any) -> Union[AsyncGzipBinaryFile, AsyncGzipTextFile]

Factory function that returns the appropriate AsyncGzip class based on mode.

This provides backward compatibility with the original AsyncGzipFile interface while using the new separated binary and text file classes.

Parameters:

Name Type Description Default
filename Union[str, bytes, Path, None]

Path to the file

required
mode str

File mode ('rb', 'wb', 'rt', 'wt', etc.)

'rb'
**kwargs Any

Additional arguments passed to the appropriate class

{}

Returns:

Type Description
Union[AsyncGzipBinaryFile, AsyncGzipTextFile]

AsyncGzipBinaryFile for binary modes ('rb', 'wb', 'ab')

Union[AsyncGzipBinaryFile, AsyncGzipTextFile]

AsyncGzipTextFile for text modes ('rt', 'wt', 'at')

Source code in src/aiogzip/__init__.py
def AsyncGzipFile(
    filename: Union[str, bytes, Path, None], mode: str = "rb", **kwargs: Any
) -> Union[AsyncGzipBinaryFile, AsyncGzipTextFile]:
    """
    Dispatch to the text or binary async gzip class based on *mode*.

    Kept for backward compatibility with the original AsyncGzipFile
    interface: text modes (containing 't') produce AsyncGzipTextFile,
    everything else produces AsyncGzipBinaryFile.

    Args:
        filename: Path to the file.
        mode: File mode ('rb', 'wb', 'rt', 'wt', etc.).
        **kwargs: Extra arguments forwarded to the selected class.

    Returns:
        AsyncGzipTextFile for text modes ('rt', 'wt', 'at');
        AsyncGzipBinaryFile for binary modes ('rb', 'wb', 'ab').

    Raises:
        TypeError: If *mode* is not a string.
        ValueError: If a text-only argument is set in binary mode.
    """
    if not isinstance(mode, str):
        raise TypeError("mode must be a string")

    if "t" in mode:
        return AsyncGzipTextFile(filename, mode, **kwargs)

    # Binary mode: text-only options are rejected when set to a real
    # value, and silently dropped when passed explicitly as None.
    text_only = ("encoding", "errors", "newline")
    for arg_name in text_only:
        if kwargs.get(arg_name) is not None:
            raise ValueError(f"Argument '{arg_name}' not supported in binary mode")
    filtered = {key: value for key, value in kwargs.items() if key not in text_only}
    return AsyncGzipBinaryFile(filename, mode, **filtered)