Image by https://www.pexels.com/@divinetechygirl/
In this blog, we look into Azure Durable Functions. It is an excellent fit for implementing a simple workflow in which we need to track the state of activities.
Setup
Durable Functions is an extension of Azure Functions and runs inside a Function App. Hence we just need to create a Function App in our resource group.
Simple Workflow
For this blog, we came up with a simple use case.
An HTTP interface receives a list of image URLs. This interface calls a durable function, which in turn makes multiple calls to Azure Face Recognition Service (one call per image). From the results returned by Azure Face Recognition Service, we get a list of happy emotion ratings and then find the maximum and minimum values. (See this blog on how Azure Face Recognition Service works.)
Implementation
Based on the workflow above, we need:
- A Durable Function HTTP Starter that takes an HTTP request.
- An Orchestrator that fans out the calls to the DetectFace activity.
- A DetectFace activity function that calls Azure Face Recognition Service.
There are a few things we need to familiarize ourselves with.
Durable Function HTTP Starter
import json
import logging

import azure.functions as func
import azure.durable_functions as df


async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    fn_name = req.route_params["functionName"]
    if fn_name == "Orchestrator":
        urls = json.loads(req.get_body())
        instance_id = await client.start_new(fn_name, None, {"urls": urls})
    else:
        instance_id = await client.start_new(fn_name, None, None)

    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    return client.create_check_status_response(req, instance_id)
Orchestrator Durable Function
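from functools import reduce

import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    input_context = context.get_input()

    # Fan out: schedule one DetectFace activity per image URL.
    tasks = []
    for x in input_context.get('urls'):
        tasks.append(context.call_activity('DetectFace', x))

    # Fan in: wait until every activity has completed.
    result = yield context.task_all(tasks)

    # Drop images without faces (None) and merge the per-image lists.
    # The initial [] keeps reduce from raising when no image had a face.
    flatten_list = reduce(lambda a, b: a + b,
                          filter(lambda x: x is not None, result), [])
    if not flatten_list:
        # No face in any image, so there is nothing to compare.
        return {"min_val": None, "max_val": None}

    return {
        "min_val": min(flatten_list),
        "max_val": max(flatten_list)
    }


main = df.Orchestrator.create(orchestrator_function)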
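Here, we create one task per image URL and then yield context.task_all(tasks), which runs the DetectFace calls in parallel and resumes the orchestrator once all of them have finished. The results are then flattened into a single list of happiness ratings, skipping images where no face was detected, and the minimum and maximum values are returned.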
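The aggregation step can be tried out without the Functions runtime. Below is a minimal sketch, assuming each activity result is either a list of happiness ratings or None; summarize_happiness is a hypothetical helper name that is not part of the code above, and returning None values for the "no faces at all" case is one possible choice rather than the only one.

```python
from itertools import chain
from typing import Dict, Iterable, List, Optional


def summarize_happiness(
        results: Iterable[Optional[List[float]]]) -> Dict[str, Optional[float]]:
    """Flatten per-image happiness lists and return their min and max.

    `results` mirrors what the orchestrator gets back from task_all:
    one entry per image, either a list of ratings or None.
    """
    # Skip images without faces, then merge the remaining lists into one.
    scores = list(chain.from_iterable(r for r in results if r is not None))
    if not scores:
        # Assumed behavior: report None values instead of raising.
        return {"min_val": None, "max_val": None}
    return {"min_val": min(scores), "max_val": max(scores)}


print(summarize_happiness([[0.0, 1.0], None, [0.4]]))
# {'min_val': 0.0, 'max_val': 1.0}
```

Keeping this logic in a plain function makes it easy to unit test the edge cases (no faces in one image, no faces in any image) separately from the orchestration itself.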
DetectFace Function
import os
from typing import List, Optional

from azure.cognitiveservices.vision.face import FaceClient
from msrest.authentication import CognitiveServicesCredentials

KEY = os.environ["AZURE_FACE_REC_KEY"]
ENDPOINT = os.environ["AZURE_FACE_REC_ENDPOINT"]


def main(url: str) -> Optional[List[float]]:
    face_client = FaceClient(ENDPOINT, CognitiveServicesCredentials(KEY))
    # Ask the service for the emotion attributes of every face in the image.
    detected_faces = face_client.face.detect_with_url(
        url=url, return_face_attributes=["emotion"])

    if detected_faces:
        # One happiness rating per detected face.
        return [face.face_attributes.emotion.happiness for face in detected_faces]
    # No face was found in this image.
    return None
Here we simply call Azure Face Recognition Service and return one happiness rating per detected face, or None when no face is found in the image.
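The extraction step can be checked locally without a key or endpoint. The following is a rough sketch, assuming only the attribute path used in the activity (face_attributes.emotion.happiness); happiness_scores and fake_face are hypothetical names, and the stand-in objects are not real SDK types.

```python
from types import SimpleNamespace
from typing import Iterable, List, Optional


def happiness_scores(detected_faces: Optional[Iterable]) -> Optional[List[float]]:
    """Return one happiness rating per face, or None when no face was found."""
    faces = list(detected_faces or [])
    if not faces:
        return None
    return [face.face_attributes.emotion.happiness for face in faces]


def fake_face(happiness: float) -> SimpleNamespace:
    """Stand-in object exposing only the attribute path the activity reads."""
    emotion = SimpleNamespace(happiness=happiness)
    return SimpleNamespace(face_attributes=SimpleNamespace(emotion=emotion))


print(happiness_scores([fake_face(0.9), fake_face(0.1)]))  # [0.9, 0.1]
print(happiness_scores([]))  # None
```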
Observation
With VS Code, I was able to run the functions in my local environment.
I made an HTTP/1.1 POST request from Postman with three image URLs.
{
  "id": "5346f527c9004e54a0cfdb0f2a988f5d",
  "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/5346f527c9004e54a0cfdb0f2a988f5d?taskHub=TestHubName&connection=Storage&code=3bXmVZuGANyvw9oFao1KRG4vnKdqu6YVXFVI8znt58ntkxz8OWMNbQ==",
  "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/5346f527c9004e54a0cfdb0f2a988f5d/raiseEvent/{eventName}?taskHub=TestHubName&connection=Storage&code=3bXmVZuGANyvw9oFao1KRG4vnKdqu6YVXFVI8znt58ntkxz8OWMNbQ==",
  "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/5346f527c9004e54a0cfdb0f2a988f5d/terminate?reason={text}&taskHub=TestHubName&connection=Storage&code=3bXmVZuGANyvw9oFao1KRG4vnKdqu6YVXFVI8znt58ntkxz8OWMNbQ==",
  "rewindPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/5346f527c9004e54a0cfdb0f2a988f5d/rewind?reason={text}&taskHub=TestHubName&connection=Storage&code=3bXmVZuGANyvw9oFao1KRG4vnKdqu6YVXFVI8znt58ntkxz8OWMNbQ==",
  "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/5346f527c9004e54a0cfdb0f2a988f5d?taskHub=TestHubName&connection=Storage&code=3bXmVZuGANyvw9oFao1KRG4vnKdqu6YVXFVI8znt58ntkxz8OWMNbQ==",
  "restartPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/5346f527c9004e54a0cfdb0f2a988f5d/restart?taskHub=TestHubName&connection=Storage&code=3bXmVZuGANyvw9oFao1KRG4vnKdqu6YVXFVI8znt58ntkxz8OWMNbQ==",
  "suspendPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/5346f527c9004e54a0cfdb0f2a988f5d/suspend?reason={text}&taskHub=TestHubName&connection=Storage&code=3bXmVZuGANyvw9oFao1KRG4vnKdqu6YVXFVI8znt58ntkxz8OWMNbQ==",
  "resumePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/5346f527c9004e54a0cfdb0f2a988f5d/resume?reason={text}&taskHub=TestHubName&connection=Storage&code=3bXmVZuGANyvw9oFao1KRG4vnKdqu6YVXFVI8znt58ntkxz8OWMNbQ=="
}
This response has an HTTP status code of 202 (Accepted), which means the request has been received and the orchestration has been scheduled. After waiting a few seconds, I pointed my browser to the statusQueryGetUri URL and got the following.
{
  "name": "Orchestrator",
  "instanceId": "5346f527c9004e54a0cfdb0f2a988f5d",
  "runtimeStatus": "Completed",
  "input": "{\"urls\": [\"https://raw.githubusercontent.com/Microsoft/Cognitive-Face-Windows/master/Data/detection1.jpg\", \"https://www.nist.gov/sites/default/files/styles/960_x_960_limit/public/images/2020/07/27/DoubleRow.png?itok=5CfcABWm\", \"https://images.newscientist.com/wp-content/uploads/2022/02/14174128/PRI_223554170.jpg?width=800\"]}",
  "customStatus": null,
  "output": {
    "min_val": 0.0,
    "max_val": 1.0
  },
  "createdTime": "2022-11-27T19:36:14Z",
  "lastUpdatedTime": "2022-11-27T19:36:18Z"
}
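The Postman and browser steps can also be scripted. Here is a minimal sketch using only the standard library, under a few assumptions: STARTER_URL uses the default route of the HTTP starter template (the actual route is not shown in this blog, so adjust it to your function.json), the request body is a plain JSON array of URLs as the starter expects, and the helper names are hypothetical.

```python
import json
import time
import urllib.request
from typing import Callable, List

# Assumption: default route of the Durable Functions HTTP starter template.
STARTER_URL = "http://localhost:7071/api/orchestrators/Orchestrator"

# Runtime statuses after which an orchestration instance no longer changes.
TERMINAL_STATUSES = {"Completed", "Failed", "Terminated", "Canceled"}


def build_start_request(urls: List[str],
                        starter_url: str = STARTER_URL) -> urllib.request.Request:
    """Build the POST request; the body is a JSON array of image URLs."""
    return urllib.request.Request(
        starter_url,
        data=json.dumps(urls).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def fetch_json(request) -> dict:
    """Send a Request object (or GET a URL string) and decode the JSON body."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)


def wait_for_result(status_url: str,
                    fetch: Callable[[str], dict] = fetch_json,
                    interval_seconds: float = 1.0,
                    max_attempts: int = 30) -> dict:
    """Poll statusQueryGetUri until the instance reaches a terminal status."""
    for _ in range(max_attempts):
        status = fetch(status_url)
        if status.get("runtimeStatus") in TERMINAL_STATUSES:
            return status
        time.sleep(interval_seconds)
    raise TimeoutError(f"No terminal status after {max_attempts} attempts")


def run(urls: List[str]) -> dict:
    """Start the orchestration and block until it finishes."""
    started = fetch_json(build_start_request(urls))
    return wait_for_result(started["statusQueryGetUri"])
```

With the Functions host running locally, calling run with the three image URLs would return the final status document, and its "output" field would hold the min_val and max_val values. Passing a custom fetch function to wait_for_result makes the polling loop testable without a running host.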
Summary
With Azure Durable Functions, it is easy to implement a simple workflow, and the code shared above stays short and simple.