Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 47 additions & 4 deletions pyiceberg/avro/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io
import json
import os
import threading
from collections.abc import Callable
from dataclasses import dataclass
from enum import Enum
Expand All @@ -31,6 +32,8 @@
TypeVar,
)

from cachetools import LRUCache

from pyiceberg.avro.codecs import AVRO_CODEC_KEY, CODEC_MAPPING_ICEBERG_TO_AVRO, KNOWN_CODECS
from pyiceberg.avro.codecs.codec import Codec
from pyiceberg.avro.decoder import BinaryDecoder, new_decoder
Expand Down Expand Up @@ -68,6 +71,48 @@
_SCHEMA_KEY = "avro.schema"


# Cache for Avro-to-Iceberg schema conversion, keyed by raw schema JSON string.
# Manifests of the same type share the same Avro schema, so this avoids
# redundant JSON parsing and schema conversion on every manifest open.
_schema_cache: LRUCache[str, Schema] = LRUCache(maxsize=32)
_schema_cache_lock = threading.Lock()

# Cache for resolved reader trees, keyed by object identity of (file_schema,
# read_schema, read_types, read_enums). Reader objects are stateless — read()
# takes a decoder and returns decoded data without mutating self, so sharing
# cached readers across threads and calls is safe.
_reader_cache: LRUCache[tuple[int, ...], Reader] = LRUCache(maxsize=32)
_reader_cache_lock = threading.Lock()


def _cached_avro_to_iceberg(avro_schema_string: str) -> Schema:
    """Convert an Avro schema JSON string to an Iceberg Schema, with caching.

    Args:
        avro_schema_string: The raw Avro schema JSON taken from the file header.

    Returns:
        The converted Iceberg Schema, served from the module-level LRU cache
        when the same JSON string has been seen before.
    """
    with _schema_cache_lock:
        try:
            return _schema_cache[avro_schema_string]
        except KeyError:
            pass
    # Cache miss: run the (potentially expensive) JSON parse and conversion
    # outside the lock so concurrent callers are not serialized. A duplicate
    # conversion for the same string is harmless — both produce equal schemas
    # and the last writer simply wins.
    converted = AvroSchemaConversion().avro_to_iceberg(json.loads(avro_schema_string))
    with _schema_cache_lock:
        _schema_cache[avro_schema_string] = converted
    return converted


def _cached_resolve_reader(
    file_schema: Schema,
    read_schema: Schema,
    read_types: dict[int, Callable[..., StructProtocol]],
    read_enums: dict[int, Callable[..., Enum]],
) -> Reader:
    """Resolve a reader tree for the given schema pair, with caching.

    The cache is keyed by the ``id()`` of the four inputs. An ``id()`` is only
    unique while the object is alive — CPython reuses addresses after garbage
    collection — so each cache entry also stores strong references to the key
    objects. While an entry lives, its objects cannot be collected, which
    guarantees no new object can be allocated with one of the cached ids and
    silently hit the wrong entry.

    Args:
        file_schema: The Iceberg schema the file was written with.
        read_schema: The Iceberg schema to project the records onto.
        read_types: Callables producing struct instances, keyed by field ID.
        read_enums: Callables producing enum values, keyed by field ID.

    Returns:
        The resolved (and possibly cached) Reader tree.
    """
    key = (id(file_schema), id(read_schema), id(read_types), id(read_enums))
    with _reader_cache_lock:
        entry = _reader_cache.get(key)
        if entry is not None:
            # Entry layout: (file_schema, read_schema, read_types, read_enums, reader).
            return entry[4]
    # Resolve outside the lock; duplicate resolution on a race is benign.
    reader = resolve_reader(file_schema, read_schema, read_types, read_enums)
    with _reader_cache_lock:
        # Store the key objects alongside the reader to pin their ids (see docstring).
        _reader_cache[key] = (file_schema, read_schema, read_types, read_enums, reader)
    return reader


class AvroFileHeader(Record):
@property
def magic(self) -> bytes:
Expand Down Expand Up @@ -97,9 +142,7 @@ def compression_codec(self) -> type[Codec] | None:

def get_schema(self) -> Schema:
    """Return the Iceberg schema declared in this Avro file header.

    Returns:
        The Iceberg Schema converted (via the module-level cache) from the
        ``avro.schema`` metadata entry.

    Raises:
        ValueError: If the header metadata carries no ``avro.schema`` key.
    """
    # NOTE(review): the scraped diff interleaved the removed inline-conversion
    # lines with the added cached call; this is the resolved post-change body.
    if _SCHEMA_KEY in self.meta:
        return _cached_avro_to_iceberg(self.meta[_SCHEMA_KEY])
    else:
        raise ValueError("No schema found in Avro file headers")

Expand Down Expand Up @@ -178,7 +221,7 @@ def __enter__(self) -> AvroFile[D]:
if not self.read_schema:
self.read_schema = self.schema

self.reader = resolve_reader(self.schema, self.read_schema, self.read_types, self.read_enums)
self.reader = _cached_resolve_reader(self.schema, self.read_schema, self.read_types, self.read_enums)

return self

Expand Down
31 changes: 21 additions & 10 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
RecursiveDict,
TableVersion,
)
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.config import Config, merge_config
from pyiceberg.utils.properties import property_as_bool
from pyiceberg.view import View
Expand All @@ -90,6 +91,7 @@
MANIFEST_LIST = "manifest list"
PREVIOUS_METADATA = "previous metadata"
METADATA = "metadata"
DATA_FILE = "data"
URI = "uri"
LOCATION = "location"
EXTERNAL_TABLE = "EXTERNAL_TABLE"
Expand Down Expand Up @@ -284,7 +286,7 @@ def list_catalogs() -> list[str]:


def delete_files(io: FileIO, files_to_delete: set[str], file_type: str) -> None:
    """Delete files in parallel.

    Log warnings if failing to delete any file.

    Args:
        io: The FileIO used to delete the object.
        files_to_delete: A set of file paths to be deleted.
        file_type: The type of the file.
    """

    def _delete_file(file: str) -> None:
        # Best-effort deletion: a failed delete is logged, never raised,
        # so one bad path does not abort the rest of the batch.
        try:
            io.delete(file)
        except OSError:
            logger.warning(f"Failed to delete {file_type} file {file}", exc_info=logger.isEnabledFor(logging.DEBUG))

    executor = ExecutorFactory.get_or_create()
    # list(...) drains the iterator so every deletion (and its logging) completes
    # before returning; executor.map alone is lazy.
    list(executor.map(_delete_file, files_to_delete))


def delete_data_files(io: FileIO, manifests_to_delete: list[ManifestFile]) -> None:
    """Delete data files linked to given manifests.

    Deduplicates manifests by path before reading entries, since the same manifest
    appears across multiple snapshots' manifest lists. Deletes data files in parallel.

    Log warnings if failing to delete any file.

    Args:
        io: The FileIO used to delete the object.
        manifests_to_delete: A list of manifest contains paths of data files to be deleted.
    """
    # First occurrence of each manifest path wins; later duplicates are ignored.
    unique_manifests: dict[str, ManifestFile] = {}
    for manifest_file in manifests_to_delete:
        unique_manifests.setdefault(manifest_file.manifest_path, manifest_file)

    # Collect all unique data file paths (a set dedupes paths shared across manifests).
    data_file_paths: set[str] = set()
    for manifest_file in unique_manifests.values():
        # discard_deleted=False: entries already marked deleted still reference
        # physical files that must be removed.
        for entry in manifest_file.fetch_manifest_entry(io, discard_deleted=False):
            data_file_paths.add(entry.data_file.file_path)

    # Delete in parallel
    delete_files(io, data_file_paths, DATA_FILE)


def _import_catalog(name: str, catalog_impl: str, properties: Properties) -> Catalog | None:
Expand Down
Loading