fix: Add retry logic for `KeyValueStore.set_value` and `Dataset.push_data` #1838

Mantisus wants to merge 3 commits into apify:master

Conversation
Pull request overview
This PR introduces a unified StorageWriteError and adds retry behavior for write operations (KeyValueStore.set_value and Dataset.push_data) to reduce the risk of data loss on transient storage failures (notably SQL/Redis).
Changes:

- Add `StorageWriteError` and update SQL/Redis/FS storage clients to raise it on write failures instead of swallowing errors.
- Add retry parameters (`max_attempts`, `wait_time_between_retries`) to `KeyValueStore.set_value` and `Dataset.push_data`.
- Add unit tests asserting that client-level write failures surface as `StorageWriteError`.
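To make the described change concrete, here is a minimal sketch of the retry pattern the PR adds. The parameter names `max_attempts` and `wait_time_between_retries` come from the PR description; the surrounding function and the `client_set_value` callable are hypothetical stand-ins, not the actual crawlee implementation:

```python
import asyncio
import logging
from datetime import timedelta

logger = logging.getLogger(__name__)


class StorageWriteError(Exception):
    """Raised by a storage client when a write operation fails."""


async def set_value_with_retries(
    client_set_value,  # async callable performing the underlying write
    key,
    value,
    *,
    max_attempts=3,
    wait_time_between_retries=timedelta(seconds=1),
):
    last_exception = None
    for attempt in range(1, max_attempts + 1):
        try:
            await client_set_value(key, value)
        except StorageWriteError as exc:
            last_exception = exc
            if attempt < max_attempts:
                await asyncio.sleep(wait_time_between_retries.total_seconds())
        else:
            return  # write succeeded, stop retrying
    # As reviewed, the PR logs a warning here; Copilot's suggestion below is to
    # `raise last_exception` instead so callers see the failure.
    logger.warning(
        f'Failed to set value for key "{key}" after {max_attempts} attempts '
        f'with error: {last_exception}'
    )
```

The same loop shape applies to `Dataset.push_data`; only the wrapped write call differs.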
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `src/crawlee/errors.py` | Introduces `StorageWriteError` for consistent write-failure signaling. |
| `src/crawlee/storages/_key_value_store.py` | Adds retry loop to `KeyValueStore.set_value` (currently logs instead of raising after retries). |
| `src/crawlee/storages/_dataset.py` | Adds retry loop to `Dataset.push_data` (currently logs instead of raising after retries). |
| `src/crawlee/storage_clients/_sql/_key_value_store_client.py` | Converts SQL write failures into `StorageWriteError` and avoids `with_simple_commit` swallow behavior. |
| `src/crawlee/storage_clients/_sql/_dataset_client.py` | Converts SQL write failures into `StorageWriteError` and avoids `with_simple_commit` swallow behavior. |
| `src/crawlee/storage_clients/_redis/_key_value_store_client.py` | Wraps Redis write failures as `StorageWriteError`. |
| `src/crawlee/storage_clients/_redis/_dataset_client.py` | Wraps Redis write failures as `StorageWriteError`. |
| `src/crawlee/storage_clients/_file_system/_key_value_store_client.py` | Wraps filesystem write failures as `StorageWriteError`. |
| `src/crawlee/storage_clients/_file_system/_dataset_client.py` | Wraps filesystem write failures as `StorageWriteError`. |
| `src/crawlee/storage_clients/_sql/_request_queue_client.py` | Adds an extra `requests_db` emptiness guard (currently redundant). |
| `tests/unit/storage_clients/_sql/test_sql_kvs_client.py` | Adds test ensuring SQL KVS write errors raise `StorageWriteError`. |
| `tests/unit/storage_clients/_sql/test_sql_dataset_client.py` | Adds test ensuring SQL Dataset write errors raise `StorageWriteError`. |
| `tests/unit/storage_clients/_redis/test_redis_kvs_client.py` | Adds test ensuring Redis KVS write errors raise `StorageWriteError`. |
| `tests/unit/storage_clients/_redis/test_redis_dataset_client.py` | Adds test ensuring Redis Dataset write errors raise `StorageWriteError`. |
| `tests/unit/storage_clients/_file_system/test_fs_kvs_client.py` | Adds test ensuring FS KVS write errors raise `StorageWriteError`. |
| `tests/unit/storage_clients/_file_system/test_fs_dataset_client.py` | Adds test ensuring FS Dataset write errors raise `StorageWriteError`. |
```python
logger.warning(
    f'Failed to set value for key "{key}" after {max_attempts} attempts '
    f'with error: {last_exception.cause}'
)
```
After exhausting retries, this method only logs a warning and then returns successfully. That will silently drop failed writes (similar to the original bug report) and callers will think the value was persisted. After the final attempt, re-raise the last caught StorageWriteError (or wrap/raise a new one) instead of returning.
Suggested change:

```python
)
raise last_exception
```
This is expected behavior. This PR allows users to configure retry settings, but propagating errors could cause issues.
```python
    await asyncio.sleep(wait_time_between_retries_seconds)
else:
    if last_exception:
        logger.warning(f'Failed to push data after {max_attempts} attempts with error: {last_exception.cause}')
```
After exhausting retries, this method only logs a warning and then returns successfully. That means push_data failures can be silently ignored and data can be lost while the caller believes the write succeeded. After the final attempt, re-raise the last StorageWriteError (or raise a new one) rather than returning.
Suggested change:

```python
logger.warning(f'Failed to push data after {max_attempts} attempts with error: {last_exception.cause}')
raise last_exception
```
This is expected behavior. This PR allows users to configure retry settings, but propagating errors could cause issues.
```python
if not requests_db:
    return None
```
This guard is redundant/unreachable because both the PostgreSQL/MySQL branch and the fallback branch already return early when requests_db is empty. Keeping it adds noise and suggests requests_db might be unset here when it shouldn't be.
Suggested change (remove the redundant guard):

```python
if not requests_db:
    return None
```
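For context, the redundancy Copilot describes can be illustrated with a simplified, hypothetical reconstruction of the control flow (the function and branch structure here are illustrative, not the actual `_request_queue_client.py` code):

```python
# Both dialect branches already return early when `requests_db` is empty,
# so a trailing `if not requests_db: return None` after them never fires.
def fetch_next_request(requests_db, dialect):
    if dialect in ('postgresql', 'mysql'):
        if not requests_db:
            return None          # early return, PostgreSQL/MySQL branch
        return requests_db[0]
    if not requests_db:
        return None              # early return, fallback branch
    return requests_db[0]
    # A further emptiness guard placed here would be unreachable.
```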
If an error occurs that triggers a rollback, this will prevent the failing
vdusek left a comment:
This should be thought through carefully, as it extends the public interface of the storage components. We want to keep the public interface as consistent as possible between the JS and Python implementations.
vdusek left a comment:
We just had a brief discussion about this with @janbuchar, and we think this should be handled at the storage client level rather than leaking into the storages (frontend). One reason is that the Apify API client used by ApifyStorageClient already has its own retry mechanism. Adding another layer here would duplicate that behavior. Therefore, each storage client should manage retries on its own.
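The alternative the reviewers propose could look roughly like this: the retry loop lives inside a concrete storage client's write method, so the public `KeyValueStore.set_value` signature stays unchanged and each client (SQL, Redis, Apify API) decides its own retry policy. This is a hypothetical sketch under that assumption; the class and parameter names are illustrative:

```python
import asyncio


class StorageWriteError(Exception):
    """Raised when a backend write fails."""


class RetryingKeyValueStoreClient:
    """Hypothetical storage client that retries its own writes internally."""

    def __init__(self, backend_set, max_attempts=3, wait_seconds=1.0):
        self._backend_set = backend_set  # async callable doing the real write
        self._max_attempts = max_attempts
        self._wait_seconds = wait_seconds

    async def set_value(self, key, value):
        for attempt in range(1, self._max_attempts + 1):
            try:
                await self._backend_set(key, value)
                return
            except StorageWriteError:
                if attempt == self._max_attempts:
                    raise  # retries exhausted: surface the failure to callers
                await asyncio.sleep(self._wait_seconds)
```

A client wrapping the Apify API would simply omit this loop, since the API client already retries, which avoids the duplicated retry layers the comment warns about.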
Description

Adds retry logic for write operations on `Dataset` and `KeyValueStore` (`push_data` / `set_value`). This is especially critical for `SqlStorageClient` and `RedisStorageClient` due to the risk of data loss on transient failures.

Issues