Large payload storage - Python SDK

Experimental

External payload storage is an experimental feature. The API may change in future releases.

The Temporal Service enforces a ~2 MB per payload limit. When your Workflows or Activities handle data larger than this, you can offload payloads to external storage (such as S3) and pass a small reference token through the event history instead. This is sometimes called the claim check pattern.
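The claim check idea can be sketched without any Temporal APIs. The example below is a minimal illustration only: `blob_store`, `check_in`, and `check_out` are hypothetical names, and a plain dictionary stands in for an external store such as S3.

```python
import uuid

# Hypothetical in-memory stand-in for an external store such as S3.
blob_store: dict[str, bytes] = {}


def check_in(data: bytes) -> dict[str, str]:
    """Store the bytes externally and return a small claim that locates them."""
    key = f"payloads/{uuid.uuid4()}"
    blob_store[key] = data
    return {"key": key}


def check_out(claim: dict[str, str]) -> bytes:
    """Fetch the original bytes back using the claim."""
    return blob_store[claim["key"]]


large = b"x" * (3 * 1024 * 1024)  # 3 MiB, larger than the per-payload limit
claim = check_in(large)           # only this small claim would travel in history
restored = check_out(claim)
```

Only the claim, a few dozen bytes, would need to pass through the event history; the 3 MiB body stays in the external store.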

External storage sits at the end of the data pipeline, after both the Payload Converter and the Payload Codec:

User code → PayloadConverter → PayloadCodec → External Storage → Wire → Temporal Service

When a payload exceeds a configurable size threshold (default 256 KiB), the storage driver uploads it to your external store and replaces it with a lightweight reference. Payloads below the threshold stay inline in the event history. On the way back, reference payloads are retrieved from external storage before the codec decodes them.

Because external storage runs after the codec, payloads are already encrypted (if you use an encryption codec) before they're uploaded to your store.
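The ordering described above can be illustrated with a toy pipeline. This is a sketch under stated assumptions, not the SDK's implementation: `json` stands in for the Payload Converter, `zlib` compression stands in for a Payload Codec (an encryption codec would occupy the same position), and `to_wire`, `from_wire`, and `store` are hypothetical names. It also assumes the threshold is compared against the size after the codec has run.

```python
import json
import zlib

# Hypothetical in-memory stand-in for the external store.
store: dict[str, bytes] = {}


def to_wire(value, threshold: int = 256 * 1024) -> dict:
    converted = json.dumps(value).encode()  # converter stand-in
    encoded = zlib.compress(converted)      # codec stand-in
    if len(encoded) > threshold:
        # External storage runs last, so it only ever sees codec output.
        key = f"blob-{len(store)}"
        store[key] = encoded
        return {"ref": key}
    return {"inline": encoded}


def from_wire(wire: dict):
    # Reverse order: resolve the reference first, then decode.
    encoded = store[wire["ref"]] if "ref" in wire else wire["inline"]
    return json.loads(zlib.decompress(encoded))
```

Because the store only receives `encoded`, whatever transformation the codec applies is already in place by the time the bytes leave the process.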

Store and retrieve large payloads using external storage

To offload large payloads, implement a StorageDriver and configure it on your DataConverter. The driver needs a store() method to upload payloads and a retrieve() method to fetch them back.

Implement a storage driver

Extend the StorageDriver abstract class:

import uuid
from typing import Sequence

import boto3
from temporalio.api.common.v1 import Payload
from temporalio.converter import (
    StorageDriver,
    StorageDriverClaim,
    StorageDriverStoreContext,
    StorageDriverRetrieveContext,
)


class S3StorageDriver(StorageDriver):
    def __init__(self, bucket: str) -> None:
        self._bucket = bucket
        self._s3 = boto3.client("s3")

    def name(self) -> str:
        return "s3"

    async def store(self, context, payloads):
        # Upload payloads, return claims (see below)
        ...

    async def retrieve(self, context, claims):
        # Download payloads using claims (see below)
        ...

Store payloads

The store() method receives a sequence of payloads and must return exactly one StorageDriverClaim per payload. A claim is a set of string key-value pairs that the driver uses to locate the payload later, typically a storage key or URL.

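async def store(
    self,
    context: StorageDriverStoreContext,
    payloads: Sequence[Payload],
) -> list[StorageDriverClaim]:
    claims = []
    for payload in payloads:
        # One object per payload, stored under a random key
        key = f"payloads/{uuid.uuid4()}"
        # Note: boto3 calls are synchronous and block the event loop
        self._s3.put_object(
            Bucket=self._bucket,
            Key=key,
            Body=payload.SerializeToString(),
        )
        # The claim carries only what retrieve() needs to find the object
        claims.append(StorageDriverClaim(claim_data={"key": key}))
    return claims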

Retrieve payloads

The retrieve() method receives the claims that store() produced and must return the original payloads:

async def retrieve(
    self,
    context: StorageDriverRetrieveContext,
    claims: Sequence[StorageDriverClaim],
) -> list[Payload]:
    payloads = []
    for claim in claims:
        # Look up the object by the key that store() put in the claim
        response = self._s3.get_object(
            Bucket=self._bucket,
            Key=claim.claim_data["key"],
        )
        # Rebuild the Payload message from its serialized bytes
        payload = Payload()
        payload.ParseFromString(response["Body"].read())
        payloads.append(payload)
    return payloads
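The store() and retrieve() examples above call a synchronous client from inside async methods, which blocks the event loop for the duration of each request. One common way around this is to run the blocking calls in worker threads with `asyncio.to_thread`. The sketch below shows the technique in isolation: `SlowBlobClient`, `put_all`, and `get_all` are hypothetical names, and the client is an in-memory stand-in rather than boto3.

```python
import asyncio
import time


class SlowBlobClient:
    """Hypothetical blocking client standing in for a synchronous SDK."""

    def __init__(self) -> None:
        self._objects: dict[str, bytes] = {}

    def put_object(self, key: str, body: bytes) -> None:
        time.sleep(0.05)  # simulate network latency
        self._objects[key] = body

    def get_object(self, key: str) -> bytes:
        time.sleep(0.05)  # simulate network latency
        return self._objects[key]


async def put_all(client: SlowBlobClient, items: dict[str, bytes]) -> None:
    # Each blocking call runs in a worker thread; the uploads overlap.
    await asyncio.gather(
        *(asyncio.to_thread(client.put_object, k, v) for k, v in items.items())
    )


async def get_all(client: SlowBlobClient, keys: list[str]) -> list[bytes]:
    # gather() returns results in the same order as the awaitables passed in.
    return list(
        await asyncio.gather(
            *(asyncio.to_thread(client.get_object, k) for k in keys)
        )
    )
```

Preserving order matters here because a driver must return exactly one result per input, in matching positions.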

Configure external storage on the Data Converter

Pass an ExternalStorage instance to your DataConverter:

from temporalio.converter import DataConverter, ExternalStorage

converter = DataConverter(
    external_storage=ExternalStorage(
        drivers=[S3StorageDriver("my-bucket")],
        payload_size_threshold=256 * 1024,  # 256 KiB (default)
    ),
)

Use this converter when creating your Client and Worker:

from temporalio.client import Client

client = await Client.connect(
    "localhost:7233",
    data_converter=converter,
)

Adjust the size threshold

The payload_size_threshold controls which payloads get offloaded. Payloads smaller than this value stay inline in the event history.

ExternalStorage(
    drivers=[driver],
    payload_size_threshold=100 * 1024,  # 100 KiB
)

Set it to None to externalize all payloads regardless of size.
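The threshold rule can be written out as a small decision function. This is only an illustration of the behavior described above, not SDK code: `should_offload` is a hypothetical name, sizes are assumed to be in bytes, and treating a payload exactly at the threshold as inline is an assumption, since the text only specifies payloads that exceed it or fall below it.

```python
from typing import Optional


def should_offload(size_bytes: int, threshold: Optional[int]) -> bool:
    """Return True when a payload of this size would go to external storage."""
    if threshold is None:
        # No threshold: every payload is externalized.
        return True
    # Assumption: a payload exactly at the threshold stays inline.
    return size_bytes > threshold


threshold = 100 * 1024  # 100 KiB = 102,400 bytes
small = should_offload(50 * 1024, threshold)    # below the threshold
large = should_offload(150 * 1024, threshold)   # above the threshold
always = should_offload(1, None)                # threshold disabled
```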

Use multiple storage drivers

When you have multiple drivers (for example, hot and cold storage tiers), provide a driver_selector function that chooses which driver handles each payload:

hot_driver = S3StorageDriver("hot-bucket")
cold_driver = S3StorageDriver("cold-bucket")

ExternalStorage(
    drivers=[hot_driver, cold_driver],
    driver_selector=lambda context, payload: (
        cold_driver if payload.ByteSize() > 1_000_000 else hot_driver
    ),
    payload_size_threshold=100 * 1024,
)

Return None from the selector to keep a specific payload inline.
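When the selection logic grows beyond one condition, a named function is easier to read and test than a lambda. The sketch below assumes the selector receives `(context, payload)` as in the lambda above and may return None to keep a payload inline. `FakePayload`, `HOT`, `COLD`, and the 200 KiB cutoff are hypothetical stand-ins so the logic can run without the SDK; in real code the function would return driver instances.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakePayload:
    """Hypothetical stand-in exposing the same ByteSize() method as a Payload."""

    data: bytes

    def ByteSize(self) -> int:
        return len(self.data)


HOT, COLD = "hot-driver", "cold-driver"  # placeholders for driver instances


def select_driver(context, payload) -> Optional[str]:
    size = payload.ByteSize()
    if size < 200 * 1024:
        return None  # keep this payload inline
    if size > 1_000_000:
        return COLD  # very large payloads go to the cold tier
    return HOT
```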

Organize stored payloads by Workflow

Without additional context, stored payloads end up keyed by random identifiers, making cleanup and debugging difficult. The StorageDriverStoreContext includes a serialization_context field that provides the identity of the Workflow or Activity that owns the data:

async def store(
    self,
    context: StorageDriverStoreContext,
    payloads: Sequence[Payload],
) -> list[StorageDriverClaim]:
    claims = []
    prefix = "payloads"

    # Use workflow identity for the storage key when available
    sc = context.serialization_context
    if sc is not None:
        prefix = f"payloads/{sc.namespace}/{sc.workflow_id}"

    for payload in payloads:
        key = f"{prefix}/{uuid.uuid4()}"
        self._s3.put_object(
            Bucket=self._bucket,
            Key=key,
            Body=payload.SerializeToString(),
        )
        claims.append(StorageDriverClaim(claim_data={"key": key}))
    return claims

This groups stored objects by namespace and Workflow ID, so you can find, inspect, or clean up payloads for a specific Workflow Execution.
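With keys laid out this way, cleanup becomes a prefix operation. The following is a rough sketch rather than a recommended procedure: `delete_workflow_payloads` is a hypothetical helper, and `s3` is assumed to be a boto3 S3 client (or anything with the same `get_paginator` and `delete_objects` methods). Deleting objects that an event history still references would make those payloads unretrievable, so a helper like this only makes sense once the Workflow's history is no longer needed.

```python
def delete_workflow_payloads(s3, bucket: str, namespace: str, workflow_id: str) -> int:
    """Delete every stored payload under one Workflow's prefix; return the count."""
    # The trailing slash keeps "order-1" from also matching "order-10".
    prefix = f"payloads/{namespace}/{workflow_id}/"
    deleted = 0
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # "Contents" is absent from a page when nothing matches the prefix.
        keys = [{"Key": obj["Key"]} for obj in page.get("Contents", [])]
        if keys:
            # Each listing page holds at most 1,000 keys, which is also
            # the most that a single delete_objects request accepts.
            s3.delete_objects(Bucket=bucket, Delete={"Objects": keys})
            deleted += len(keys)
    return deleted
```

A call might look like `delete_workflow_payloads(boto3.client("s3"), "my-bucket", "default", "order-1")`, where the namespace and Workflow ID are illustrative values.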

View externally stored payloads in the UI

When payloads are offloaded, the Temporal Web UI shows reference tokens instead of actual data. To make the UI display the real payload content, run a Codec Server that knows how to retrieve payloads from your external store.

Use a gRPC proxy instead of a codec

Instead of implementing storage in each SDK client, you can run a centralized gRPC proxy between your Workers and the Temporal Service that intercepts and offloads large payloads transparently. The go.temporal.io/api/proxy package provides the building blocks for this approach.

If you use a gRPC proxy that alters payload sizes, disable the Worker's eager payload size validation so that it doesn't reject payloads the proxy will shrink before they reach the server:

from temporalio.worker import Worker

worker = Worker(
    client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activity],
    disable_payload_error_limit=True,
)