Large language models are powerful, but calling them from real applications is surprisingly fragile. Network blips, rate limits (HTTP 429), timeouts, and transient service errors are the norm, not the exception. When an LLM call sits in the middle of a multi-step business process, a single failure can lose work that is expensive to recompute and annoying to retry by hand.
This post walks through two small but complete samples that pair Temporal with Azure OpenAI to make AI calls durable, retryable, and observable, without scattering retry loops and error handling across your code. The source code is in a public Git repo.
The first is a single-step translation agent; the second is a multi-step workflow that classifies a question and routes it to one of two specialized agents.
Why Temporal for AI calls?
Temporal is a durable execution engine. You write ordinary code, and Temporal guarantees it runs to completion — even across process crashes, deploys, and infrastructure failures. It does this by separating two concepts:
- Workflows — deterministic orchestration code that describes what should happen and in what order. Temporal records every step in an event history so a workflow can be replayed and resumed exactly where it left off.
- Activities — the side-effecting work (HTTP calls, database writes, calling Azure OpenAI). Activities can fail and are automatically retried according to a configurable policy.
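To make the split concrete, here is a toy model of replay in plain Python. It is not how Temporal is implemented and uses none of its APIs; `ToyHistory`, `run_activity`, and `translate_workflow` are hypothetical names. It only illustrates the idea that a recorded activity result is reused on replay instead of repeating the side effect.

```python
from typing import Callable


class ToyHistory:
    """Hypothetical in-memory stand-in for a workflow's event history."""

    def __init__(self) -> None:
        self.results: list[str] = []  # recorded activity results, in order
        self.cursor = 0               # next recorded result to replay

    def run_activity(self, fn: Callable[[], str]) -> str:
        if self.cursor < len(self.results):
            # Replay: reuse the recorded result, do not call fn again.
            result = self.results[self.cursor]
        else:
            # First execution: do the real work and record the outcome.
            result = fn()
            self.results.append(result)
        self.cursor += 1
        return result


calls = {"render": 0, "model": 0}  # counts real side effects


def render_prompt() -> str:
    calls["render"] += 1
    return "Translate the input to Japanese."


def call_model() -> str:
    calls["model"] += 1
    return "<translated text>"


def translate_workflow(history: ToyHistory) -> str:
    history.cursor = 0  # every (re)run starts from the top of the history
    history.run_activity(render_prompt)
    return history.run_activity(call_model)


history = ToyHistory()
first = translate_workflow(history)     # executes both activities
replayed = translate_workflow(history)  # e.g. after a worker restart: replays
```

The second call returns the same value without touching the model again, which is the property that makes an expensive LLM call safe to sit in the middle of a longer process.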
What we built
- SimpleWorkflow — a single-purpose translation agent. It renders a system prompt instructing the model to translate the user's input to Japanese, then calls Azure OpenAI. One linear path, two activities.
- RoutingWorkflow — a multi-step pipeline. It first classifies the user's question, then routes it to one of two specialized agents (HR or FAQ) over the A2A protocol. This shows how Temporal coordinates branching logic and calls to external services.
temporal_openai/
  worker.py                    # long-running worker, polls the task queue
  start_workflow.py            # client that starts the SimpleWorkflow
  start_routing_workflow.py    # client that starts the RoutingWorkflow
  workflows/
    simple_workflow.py         # the simple, linear workflow
    routing_workflow.py        # the multi-step classify-then-route workflow
  activities/
    prompts.py                 # renders a Jinja2 prompt template
    openai_responses.py        # calls Azure OpenAI
    a2a_client.py              # calls an A2A agent and returns its reply text
  agents/
    server.py                  # shared Azure OpenAI-backed A2A server
    hr_agent.py                # HR agent entrypoint (port 9101)
    faq_agent.py               # FAQ agent entrypoint (port 9102)
  prompts/
    simple.jinja2              # "translate the input to Japanese" prompt
    classify.jinja2            # "is this HR or FAQ?" classifier prompt
    hr_agent.jinja2            # HR agent persona
    faq_agent.jinja2           # FAQ agent persona
Part 1 — The simple workflow
SimpleWorkflow
├─ Activity: render_prompt (simple.jinja2 → "translate to Japanese" instructions)
└─ Activity: openai.create (Azure OpenAI Responses API → Japanese translation)
The workflow
The workflow is pure orchestration. It calls two activities in sequence and returns the model's translated text. Notice there is no retry code, no try/except — Temporal handles that.
from datetime import timedelta

from temporalio import workflow


@workflow.defn
class SimpleWorkflow:
    @workflow.run
    async def run(self, input: str) -> str:
        # Step 1: render the system prompt (file I/O, so it runs as an activity).
        system_instructions = await workflow.execute_activity(
            prompts.render_prompt,
            prompts.RenderPromptRequest(template="simple.jinja2"),
            start_to_close_timeout=timedelta(seconds=30),
        )
        # Step 2: send the prompt and the user's input to Azure OpenAI.
        result = await workflow.execute_activity(
            openai_responses.create,
            openai_responses.OpenAIResponsesRequest(
                instructions=system_instructions,
                input=input,
            ),
            start_to_close_timeout=timedelta(seconds=30),
        )
        return result.output_text

A key detail: workflow code must be deterministic, and Temporal's Python sandbox re-imports modules for every workflow run. Modules that are not deterministic or are expensive to load, such as the activity modules here, are imported inside workflow.unsafe.imports_passed_through() so the sandbox passes them through instead of re-executing them:

with workflow.unsafe.imports_passed_through():
    from temporal_openai.activities import openai_responses, prompts

The Azure OpenAI activity
The activity is where the real I/O happens. We use the async Azure OpenAI client with Entra ID (passwordless) authentication via DefaultAzureCredential — no API keys in the code or environment.
class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", extra="ignore")

    azure_openai_endpoint: str
    azure_openai_api_version: str
    azure_openai_deployed_model_name: str


@activity.defn
async def create(request: OpenAIResponsesRequest) -> Response:
    settings = Settings()
    # Entra ID token provider: no API key is read from code or environment.
    token_provider = get_bearer_token_provider(
        DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default"
    )
    client = AsyncAzureOpenAI(
        azure_endpoint=settings.azure_openai_endpoint,
        api_version=settings.azure_openai_api_version,
        azure_ad_token_provider=token_provider,
        max_retries=0,  # Temporal owns retries, not the SDK
    )
    resp = await client.responses.create(
        model=settings.azure_openai_deployed_model_name,
        instructions=request.instructions,
        input=request.input,
        timeout=15,  # seconds; shorter than the activity's 30 s start-to-close timeout
    )
    return resp

Two things worth calling out:
- max_retries=0: we deliberately turn off the SDK's own retries and let Temporal own the retry policy, so retry attempts are tracked by Temporal and follow one consistent backoff strategy.
- Configuration is loaded with pydantic-settings. Because BaseSettings reads only real environment variables by default, we explicitly set env_file=".env" so local .env values are picked up.
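For a sense of what "one consistent backoff strategy" looks like, a retry policy of this kind is described by an initial interval, a backoff coefficient, and a maximum interval. The helper below is a stdlib-only sketch of the resulting wait times. `backoff_schedule` is a hypothetical name, and the default values (1 s initial interval, coefficient 2.0, 100 s cap) are assumed to mirror Temporal's documented defaults, so check them against the SDK version you use.

```python
def backoff_schedule(
    retries: int,
    initial_interval: float = 1.0,
    backoff_coefficient: float = 2.0,
    maximum_interval: float = 100.0,
) -> list[float]:
    """Seconds to wait before each retry (retry 1, retry 2, ...)."""
    delays = []
    for n in range(retries):
        # Exponential growth, capped so waits never exceed maximum_interval.
        delay = initial_interval * (backoff_coefficient ** n)
        delays.append(min(delay, maximum_interval))
    return delays


schedule = backoff_schedule(9)
print(schedule)
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 100.0, 100.0]
```

In the Python SDK the same knobs are fields on `temporalio.common.RetryPolicy`, which can be passed to `workflow.execute_activity(...)` through its `retry_policy` argument when the defaults do not fit, for example to cap the number of attempts on a rate-limited deployment.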
Prompts as a separate activity
Rendering the prompt template is its own activity. This keeps file I/O out of the deterministic workflow path and makes prompts easy to evolve independently of orchestration logic.
@activity.defn
async def render_prompt(request: RenderPromptRequest) -> str:
    template = _env.get_template(request.template)
    return template.render(**request.context)
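The request type and the `_env` object are not shown in this post. As a rough, stdlib-only sketch of the shape they might take: the dataclass below gives `context` an empty-dict default, which would explain why the workflow can pass only `template=...`. Here `string.Template` stands in for Jinja2 and the `TEMPLATES` dict stands in for files under prompts/. Both are assumptions for illustration, not the sample's actual code.

```python
from dataclasses import dataclass, field
from string import Template

# Stand-in for the prompts/ directory (hypothetical contents).
TEMPLATES = {
    "simple": "Translate the user's input to $language.",
}


@dataclass
class RenderPromptRequest:
    template: str
    # default_factory avoids sharing one mutable dict across requests
    context: dict[str, str] = field(default_factory=dict)


def render_prompt(request: RenderPromptRequest) -> str:
    # safe_substitute leaves unknown placeholders untouched instead of raising
    return Template(TEMPLATES[request.template]).safe_substitute(**request.context)


rendered = render_prompt(RenderPromptRequest("simple", {"language": "Japanese"}))
bare = render_prompt(RenderPromptRequest("simple"))
```

Keeping the request a small, serializable value is what lets it cross the workflow/activity boundary cleanly; the template engine itself stays inside the activity.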
Passing rich types across the boundary
Activities exchange structured data (dataclasses and pydantic models), so the client and worker are configured with Temporal's pydantic data converter:
client = await Client.connect(
    "localhost:7233",
    data_converter=pydantic_data_converter,
)

Starting a run
The client starts the workflow against a named task queue. We use a fixed workflow ID plus a conflict policy so re-running the sample cleanly replaces any stuck or in-flight execution instead of erroring out:
result = await client.execute_workflow(
    SimpleWorkflow.run,
    "Good morning, what is the weather like today?",
    id="my-workflow-id",
    task_queue="simple-python-task-queue",
    id_conflict_policy=WorkflowIDConflictPolicy.TERMINATE_EXISTING,
)
print(f"Result: {result}")

Here the input text is "Good morning, what is the weather like today?", and the printed result is its Japanese translation. The model never answers the question; it only translates it.
Part 2 — The multi-step routing workflow
The simple workflow is a straight line. Real applications branch: you often need to look at the input, decide what to do, and hand the work to a specialized component. RoutingWorkflow does exactly that:
RoutingWorkflow
├─ Activity: render_prompt (classify.jinja2 → classifier instructions)
├─ Activity: openai.create (Azure OpenAI → "HR" or "FAQ")
│     │
│     └─ workflow decides which agent to call
│
└─ Activity: a2a_client.call_agent
      ├─ if HR  → HR agent on http://localhost:9101 (A2A)
      └─ if FAQ → FAQ agent on http://localhost:9102 (A2A)

Classify, branch, route
The workflow runs three activities, with a plain `if` in between to pick the agent. The branching lives in the workflow, so the decision is recorded in the event history just like every other step.
@workflow.defn
class RoutingWorkflow:
    @workflow.run
    async def run(self, input: str) -> str:
        # 1. Classify the user's input as an HR or FAQ question.
        classify_instructions = await workflow.execute_activity(
            prompts.render_prompt,
            prompts.RenderPromptRequest(template="classify.jinja2"),
            start_to_close_timeout=timedelta(seconds=30),
        )
        classification = await workflow.execute_activity(
            openai_responses.create,
            openai_responses.OpenAIResponsesRequest(
                instructions=classify_instructions,
                input=input,
            ),
            start_to_close_timeout=timedelta(seconds=30),
        )
        agent = "HR" if "HR" in classification.output_text.upper() else "FAQ"

        # 2. Forward the question to the chosen agent over A2A.
        response = await workflow.execute_activity(
            a2a_client.call_agent,
            a2a_client.A2ACallRequest(agent=agent, message=input),
            start_to_close_timeout=timedelta(seconds=60),
        )
        return f"[routed to {agent}] {response}"

The agents are separate services
The HR and FAQ agents are standalone HTTP servers, not part of the Temporal worker. Each is an Azure OpenAI-backed A2A agent with its own persona prompt, running on its own port (HR on 9101, FAQ on 9102). Keeping them separate means each agent can be developed, scaled, and deployed independently — the workflow only needs to know how to reach them.
The a2a_client.call_agent activity is the bridge. It resolves the agent name to a URL, opens an A2A client, streams the request to the agent, and returns the reply as a plain string. One subtlety worth highlighting: the A2A client's default read timeout is short (~5s), but an agent backed by an LLM can take longer to answer. We raise the timeout so the streamed response isn't cut off mid-flight:
client = await create_client(
    url,
    client_config=ClientConfig(
        httpx_client=httpx.AsyncClient(timeout=httpx.Timeout(60.0)),
    ),
)

Because the activity returns a simple str, the protocol-specific objects never cross the Temporal boundary, and the workflow stays clean and serializable.
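One detail from the routing step is worth a second look. The workflow picks the agent with a substring test, so any classifier reply that merely contains the letters "HR" (the word "three", for example) would be routed to the HR agent. A stricter parser, together with the name-to-URL lookup the activity performs, could look like the sketch below. `pick_agent`, `resolve_agent_url`, and `AGENT_URLS` are hypothetical names; only the two ports come from the sample.

```python
import re

AGENT_URLS = {
    "HR": "http://localhost:9101",
    "FAQ": "http://localhost:9102",
}


def pick_agent(classifier_output: str, default: str = "FAQ") -> str:
    """Return the first known label that appears as a whole word."""
    for word in re.findall(r"[A-Za-z]+", classifier_output.upper()):
        if word in AGENT_URLS:
            return word
    return default  # unknown or empty reply: fall back instead of guessing


def resolve_agent_url(agent: str) -> str:
    """Map an agent name to its base URL, rejecting anything unexpected."""
    try:
        return AGENT_URLS[agent]
    except KeyError:
        raise ValueError(f"unknown agent: {agent!r}") from None


# The substring test misfires on "three"; the whole-word test does not.
naive = "HR" if "HR" in "There are three options".upper() else "FAQ"
strict = pick_agent("There are three options")
```

Both helpers are pure functions with no I/O, so a parser like `pick_agent` could run inside workflow code without affecting determinism, while the URL lookup belongs in the activity.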
Running it
The two workflows have different prerequisites. The simple workflow needs three processes; the routing workflow needs five because it also talks to the two agent servers. The repo ships a Taskfile.yml, so each piece is a one-line task command. Run each in its own terminal.
Common to both: server + worker
1. Temporal dev server — provides the service and Web UI.
task start-temporal-server # → temporal server start-dev
The server listens on localhost:7233 and the Web UI is at http://localhost:8233
2. The worker — a long-running process that polls simple-python-task-queue and executes the workflow and activity tasks. It registers both workflows and all activities, so the same worker serves both samples.
task run-worker # → uv run python -m temporal_openai.worker
The worker is the part that's easy to forget. Without it, a workflow is created but its tasks are never picked up — the event history stops at WorkflowTaskScheduled and the run sits in Running forever. Whenever you change activity or workflow code, restart the worker so it reloads.
Running the simple workflow
With the server and worker up, start a run:
task start-workflow # → uv run python -m temporal_openai.start_workflow
That's it — the client prints Result: ... once the workflow completes.
Running the routing workflow
The routing workflow additionally needs the two agent servers. With the server and worker already running, start each agent in its own terminal before kicking off the workflow, because the a2a_client.call_agent activity makes live HTTP calls to them:
1. HR agent — Azure OpenAI agent on port `9101`.
task run-hr-agent # → uv run python -m temporal_openai.agents.hr_agent
2. FAQ agent — Azure OpenAI agent on port `9102`.
task run-faq-agent # → uv run python -m temporal_openai.agents.faq_agent
3. Start the routing workflow with a question. Pass the text as an argument to see the routing in action:
# Classified as HR → routed to the HR agent
uv run python -m temporal_openai.start_routing_workflow \
"How many vacation days do I have left this year?"
# → Result: [routed to HR] ...
# Classified as FAQ → routed to the FAQ agent
uv run python -m temporal_openai.start_routing_workflow \
"What time does the office open?"
# → Result: [routed to FAQ] ...
(task start-routing-workflow runs the same client with a default question.)
A quick note on which command restarts what: change an agent (anything under agents/ or its prompt) and restart that agent; change a workflow or activity and restart the worker; the Temporal server and the agent servers can keep running across test runs. Mismatched task-queue names or a missing worker are the two most common reasons a run appears to hang.
Inspecting runs in the Web UI
Once the dev server is running, open the Temporal Web UI at http://localhost:8233. This is where Temporal's observability really pays off.
From the Workflows list you can click into any run (for example, my-workflow-id) to see:
- The overall status (Running, Completed, Failed, Terminated) and the final result returned by the workflow.
- The full event history — every WorkflowTaskScheduled, ActivityTaskScheduled, ActivityTaskStarted, and ActivityTaskCompleted event, in order. This is the same history you can inspect from the CLI with temporal workflow show --workflow-id my-workflow-id.
- The input and output of each activity, including the rendered prompt and the Azure OpenAI response.
- Retries and failures: if an activity fails (say, with a 429 from Azure OpenAI), the run shows it as a pending activity with its current attempt number and most recent error while Temporal retries it, so you can see what is failing and how often.
Tip: change the UI port with temporal server start-dev --ui-port 8080 if 8233 is already in use. For Temporal Cloud, the UI lives at https://cloud.temporal.io instead.
Getting workflow data as JSON
The Web UI is great for browsing, but sometimes you want the raw data — for debugging, automation, or feeding into another tool. Temporal makes the full event history and metadata available as JSON.
From the CLI
Add -o json (or --output json) to most temporal workflow commands:
# Full event history as JSON
temporal workflow show --workflow-id my-workflow-id -o json
# Summary/metadata (status, type, timestamps, ...) as JSON
temporal workflow describe --workflow-id my-workflow-id -o json
# List runs as JSON
temporal workflow list --query "WorkflowId = 'my-workflow-id'" -o json
Pipe into jq to extract just what you need — for example, the completion event:
temporal workflow show --workflow-id my-workflow-id -o json \
| jq '.events[] | select(.eventType == "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED")'
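If jq is not available, the same filtering is a few lines of Python over the exported JSON. The sketch below assumes the export has the shape the jq filter above relies on: a top-level `events` array whose items carry `eventId` and `eventType`. The `SAMPLE` document is hand-written and heavily abridged for illustration, not real output, and the helper names are hypothetical.

```python
import json

# Hand-written, abridged stand-in for `temporal workflow show ... -o json`.
SAMPLE = """
{"events": [
  {"eventId": "1", "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED"},
  {"eventId": "2", "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED"}
]}
"""


def events_of_type(history: dict, event_type: str) -> list[dict]:
    """Return every event whose eventType matches exactly."""
    return [e for e in history.get("events", []) if e.get("eventType") == event_type]


def looks_stuck(history: dict) -> bool:
    """True if the last event is a scheduled workflow task nobody picked up."""
    events = history.get("events", [])
    return bool(events) and events[-1].get("eventType") == "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED"


history = json.loads(SAMPLE)
completed = events_of_type(history, "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED")
stuck = looks_stuck(history)
```

`looks_stuck` is only a hint: a healthy run also ends in a scheduled task for a moment before the worker picks it up, so it is most useful when the state persists across several checks.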
From the Python SDK
The client can fetch the result, metadata, and full history programmatically:
handle = client.get_workflow_handle("my-workflow-id")
# Final result (already decoded into your return type)
result = await handle.result()
# Metadata: status, run id, task queue, timestamps, ...
desc = await handle.describe()
# Full event history as JSON
history = await handle.fetch_history()
print(history.to_json())

From the Web UI
On any workflow run page there's a Download option that exports the same event history JSON you'd get from temporal workflow show -o json.
Lessons learned
A few things that tripped us up, in case they save you time:
- No worker = stuck workflow. If a run never advances past WorkflowTaskScheduled, the worker isn't running or is polling the wrong task queue. The task queue name must match exactly between the worker and the client.
- BaseSettings doesn't read .env by default. You must set env_file=".env" in the model config, otherwise required fields fail validation even when a .env file exists.
- Let Temporal own retries. Disable the OpenAI SDK's internal retries (max_retries=0) so all retry behavior lives in one place and is visible in Temporal rather than hidden inside the SDK.
- Keep workflows deterministic. Push all I/O (file reads, HTTP, model calls) into activities, and use workflow.unsafe.imports_passed_through() for modules the sandbox shouldn't re-execute.
- Mind the A2A client timeout. The default read timeout is short (~5s), which an LLM-backed agent will routinely exceed. Pass a custom httpx client with a longer timeout, or the streamed response gets cut off mid-flight.
- Restart the right process. Change an agent and restart that agent; change a workflow or activity and restart the worker. The Temporal server and agent servers can stay up across runs.
Downsides and when not to use Temporal
Temporal buys durability, but it isn't free. Weigh these tradeoffs before reaching for it.
You have to run (or rent) a server
Temporal is a client-server system. Your workflows and activities run in worker processes (your code), but they connect to a separate Temporal Service that owns durability, scheduling, retries, and history. You can't run workflows without it. Your options:
- Local dev — temporal server start-dev bundles everything into one binary (what these samples use). Convenient, but not meant for production.
- Self-hosted — run the cluster yourself (frontend, history, matching, and worker services) backed by a database. You own upgrades, scaling, monitoring, and backups.
- Temporal Cloud — a managed SaaS that removes the operational burden but adds cost and a vendor dependency.
Where the data lives
The Temporal Service is effectively stateless; the database you provide is the source of truth. Temporal keeps two logical stores:
- Persistence store — workflow event histories, task queues, timers, and mutable state. Supported backends are PostgreSQL, MySQL, or Cassandra. The start-dev server uses an embedded SQLite file (or in-memory) instead.
- Visibility store — powers the list and search queries in the UI and CLI. This is either the same SQL database or Elasticsearch/OpenSearch for richer search at scale.
One consequence worth flagging: activity inputs/outputs and workflow state are stored in that history as payloads. Sensitive data ends up in the database unless you plug in a custom data converter that encrypts payloads.
The other tradeoffs
- Operational overhead: a production-grade self-hosted cluster plus its database is real work; many teams end up paying for Temporal Cloud.
- Determinism constraints: workflow code must be deterministic. No direct I/O, and no random, datetime.now(), or non-deterministic libraries in the workflow path. It's a learning curve and a common source of bugs.
- Versioning complexity: because workflows replay their history, changing workflow code for in-flight runs requires patching/versioning to avoid non-determinism errors.
- Latency and cost: every step is persisted, adding latency and database load versus a plain function call. Not ideal for ultra-low-latency hot paths.
- History limits: long-running or high-event workflows can hit history size limits; you manage this with continue-as-new and small payloads.
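The determinism constraint is easier to see with a toy replay check. The sketch below is plain Python, not Temporal's replayer, and `replay_matches` and the two workflow functions are hypothetical names. Each "workflow" emits a sequence of commands, and replay is only safe if re-running the code yields the same sequence that was recorded the first time.

```python
import random
from typing import Callable

Workflow = Callable[[str], list[str]]


def deterministic_workflow(question: str) -> list[str]:
    # The branch depends only on the input, so every run agrees.
    agent = "HR" if "vacation" in question.lower() else "FAQ"
    return ["render_prompt", "classify", f"call_agent:{agent}"]


def make_flaky_workflow(rng: random.Random) -> Workflow:
    def flaky_workflow(question: str) -> list[str]:
        # The branch depends on a random draw, so runs can disagree.
        agent = "HR" if rng.random() < 0.5 else "FAQ"
        return ["render_prompt", "classify", f"call_agent:{agent}"]

    return flaky_workflow


def replay_matches(workflow: Workflow, question: str, runs: int = 50) -> bool:
    recorded = workflow(question)  # the "original" execution
    # Every later run must reproduce the recorded command sequence.
    return all(workflow(question) == recorded for _ in range(runs))


question = "How many vacation days do I have left?"
stable = replay_matches(deterministic_workflow, question)
unstable = replay_matches(make_flaky_workflow(random.Random(0)), question)
```

When workflow code genuinely needs a timestamp or a random value, the Python SDK offers replay-safe helpers such as `workflow.now()` and `workflow.random()`; anything else non-deterministic belongs in an activity.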
For the AI-calls use case here, the payoff (automatic retries, durability, observability) usually outweighs these costs. But for a single, short LLM call with no orchestration around it, Temporal is likely overkill — a plain retry wrapper may serve you better.
Security considerations
These samples are tuned for local development, so a few things need hardening before they face real traffic. The list below moves from what the samples already do well to what you must add for production.
Secrets and authentication
The Azure OpenAI activity uses Entra ID (passwordless) authentication via DefaultAzureCredential, so no API keys live in code or .env. That is the recommended posture — in production, back the credential with a managed identity and grant it only the `Cognitive Services OpenAI User` role on the specific resource. The .env file holds non-secret config (endpoint, API version, model name) only; keep it out of source control regardless.
Data at rest — the Temporal database
This is the consideration unique to Temporal. Workflow inputs, outputs, and activity payloads are persisted in the event history. In these samples that means the user's text, the translation, the classification, and the agent's reply all land in the database in plain JSON. Anyone with database or Web UI access can read them.
If prompts or responses can contain PII or sensitive business data, add a payload codec to the data converter that encrypts payloads before they leave your process, so Temporal only ever stores ciphertext. A Codec Server can then decode them for authorized users of the Web UI and CLI. On a self-hosted cluster, also enable encryption at rest on the underlying database.
Data in transit
- Worker ↔ Temporal Service: the samples connect to localhost:7233 in plaintext. In production, enable mTLS between workers and the cluster. Temporal Cloud always requires TLS, with clients authenticating by client certificate or API key.
- Worker ↔ Azure OpenAI: already HTTPS.
- Worker ↔ A2A agents: the routing sample's activity calls http://localhost:9101 and http://localhost:9102 over plain HTTP with no auth. In production, put the agents behind HTTPS with authentication (the A2A spec supports auth schemes) or on a private network. As written, anyone who can reach those ports can invoke the agents and spend your Azure OpenAI quota.
The agent servers as an attack surface
The HR and FAQ agents are standalone HTTP services that forward user input straight to an LLM. Treat them like any public API: add authentication, authorization, rate limiting, input size limits, and network isolation.
Because user input flows into the model, they are also exposed to prompt injection. If you later give agents tools (file access, databases, outbound HTTP), injected instructions could trigger unintended actions. Constrain the tools available, validate model output, and never let raw model output drive privileged operations unchecked.
Access control and logging
- The Temporal Web UI (:8233) exposes full histories, including payloads, and is unauthenticated in dev. Lock it down with Temporal Cloud RBAC or put a self-hosted UI behind SSO.
- Use separate namespaces to isolate tenants and environments rather than sharing one.
- Avoid logging full prompts and responses to stdout or application logs if they are sensitive — that can leak the very data an encrypted converter protects.
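One low-effort way to keep payloads out of logs is to log a fingerprint rather than the text. The helper below is a sketch with hypothetical names (`fingerprint` and the logger name); it records the length and a short SHA-256 prefix, which is enough to correlate log lines without printing the content.

```python
import hashlib
import logging

logger = logging.getLogger("temporal_openai.sample")  # hypothetical logger name


def fingerprint(text: str) -> str:
    """Describe a prompt or response without revealing its content."""
    digest = hashlib.sha256(text.encode("utf-8")).hexdigest()[:12]
    return f"<{len(text)} chars, sha256:{digest}>"


prompt = "How many vacation days do I have left this year?"
summary = fingerprint(prompt)
logger.info("calling model with input %s", summary)
```

A hash of short or guessable text can still be brute-forced, so treat this as a way to reduce exposure in logs, not as anonymization.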
Dependencies
The stack pulls in temporalio, openai, a2a-sdk, and others. Pin versions and scan them (for example with pip-audit or Dependabot). The A2A protocol is young, so track its releases for security-relevant changes.
For production, a practical priority order is: keep managed identity (already done), encrypt Temporal payloads, enable mTLS to the cluster, put HTTPS and auth on the A2A agents, and lock down the Web UI.
Wrapping up
With a few small files we turned a fragile, one-shot LLM call into a durable workflow that retries automatically, survives crashes, and is fully observable in the Temporal Web UI. The `RoutingWorkflow` then showed how the same building blocks compose into a multi-step pipeline: classify the input, branch on the result, and route to one of two specialized agents over A2A — with every step recorded in the event history.
Both samples are available in this repository. Start the Temporal dev server and worker, then either run `task start-workflow` for the simple case, or bring up the two agents and run the routing workflow to watch each step appear in the UI.

