Simple Workflow with Azure Durable Functions

Image by https://www.pexels.com/@divinetechygirl/

In this blog, we look into Azure Durable Functions. It is a good fit for implementing a simple workflow in which we need to track the state of activities.

Setup

Azure Durable Functions is an extension of Azure Functions and runs inside a Function App. Hence, we just need to create a Function App in our resource group.

Simple Workflow

For this blog, we came up with a simple use case.

An HTTP interface receives a list of image URLs. This interface calls a durable function, which in turn makes multiple calls to Azure Face Recognition Service (one call per image). From the results returned by Azure Face Recognition Service, we get a list of happy-emotion ratings and then find the maximum and minimum values. (See this blog on how Azure Face Recognition Service works.)

Implementation

From the diagram above, we need:

  1. A Durable Function HTTP Starter that takes an HTTP request.
  2. An Orchestrator to fan out the calls to the Detect Face Durable Function Activity.
  3. The Detect Face Durable Function Activity.

There are a few things we need to familiarize ourselves with:

  1. fan out and fan in
  2. different kinds of activities

The source code is at git. We will focus on the 3 main pieces of code.
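
Before looking at the Durable Functions code, the fan out and fan in idea mentioned above can be illustrated in plain Python. This is only a rough analogy using a thread pool, not the Durable Functions API; `fake_detect` is a hypothetical stand-in for one slow, independent call per image, and the file names are placeholders.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_detect(url: str) -> list:
    """Hypothetical stand-in for a slow, independent call (one per image)."""
    return [len(url) % 10 / 10]


def fan_out_fan_in(urls: list) -> list:
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Fan out: submit one independent call per URL.
        futures = [pool.submit(fake_detect, url) for url in urls]
        # Fan in: wait for every call and collect the results in input order.
        return [future.result() for future in futures]


results = fan_out_fan_in(["a.jpg", "bb.jpg", "ccc.jpg"])
print(results)
```

The orchestrator further down follows the same shape: it builds a list of tasks (fan out) and then waits for all of them at once (fan in).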

Durable Function HTTP Starter

import json
import logging

import azure.functions as func
import azure.durable_functions as df


async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    fn_name = req.route_params["functionName"]

    if fn_name == "Orchestrator":
        urls = json.loads(req.get_body())
        instance_id = await client.start_new(fn_name, None, {"urls": urls})
    else:
        instance_id = await client.start_new(fn_name, None, None)
        
    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    return client.create_check_status_response(req, instance_id)

Our Orchestrator durable function is started when we send an HTTP POST request to https://<my-app>.azurewebsites.net/api/orchestrators/Orchestrator
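
As a minimal sketch of what such a request could look like from Python, the snippet below builds the POST request without sending it. Since the starter parses the body with `json.loads` and wraps it as `{"urls": ...}`, the body is a bare JSON list of image URLs. The helper name `build_start_request`, the local host address, and the example image URLs are assumptions for illustration.

```python
import json
import urllib.request


def build_start_request(base_url: str, image_urls: list) -> urllib.request.Request:
    """Build (but do not send) the POST request that starts the Orchestrator."""
    # The starter wraps the parsed body as {"urls": ...}, so we send a bare list.
    body = json.dumps(image_urls).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/orchestrators/Orchestrator",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_start_request(
    "http://localhost:7071",  # assumed address of a locally running Functions host
    ["https://example.com/face1.jpg", "https://example.com/face2.jpg"],  # placeholders
)
# To actually send it: urllib.request.urlopen(request)
```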

Orchestrator Durable Function

import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    input_context = context.get_input()

    # Fan out: schedule one DetectFace activity per image URL.
    tasks = [context.call_activity('DetectFace', url)
             for url in input_context.get('urls')]

    # Fan in: wait until every activity has completed.
    results = yield context.task_all(tasks)

    # Flatten the per-image lists, skipping images with no detected face (None).
    flatten_list = [score for scores in results if scores is not None
                    for score in scores]

    # No face in any image: avoid calling min()/max() on an empty list.
    if not flatten_list:
        return {"min_val": None, "max_val": None}

    return {
        "min_val": min(flatten_list),
        "max_val": max(flatten_list)
    }

main = df.Orchestrator.create(orchestrator_function)

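Here, we create one task per image URL and then yield context.task_all(tasks), which runs the DetectFace calls in parallel (fan out) and resumes the orchestrator once all of them have finished (fan in). The results are then flattened into a single list, skipping images in which no face was detected, and we take the minimum and maximum of that list.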
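
To make the aggregation step concrete, here is the same flatten-then-min/max logic as a standalone function that can be tried without the Durable Functions runtime. The name `summarize_happiness` and the sample values are made up for illustration, and returning `None` when no face was found at all is an assumption about the desired behaviour.

```python
from typing import List, Optional


def summarize_happiness(results: List[Optional[List[float]]]) -> dict:
    """Flatten per-image score lists and return the min and max score."""
    scores = [s for per_image in results if per_image is not None for s in per_image]
    if not scores:
        # Assumption: report None when no face was found in any image.
        return {"min_val": None, "max_val": None}
    return {"min_val": min(scores), "max_val": max(scores)}


# Hypothetical activity results: two faces, no face, one face.
sample = [[0.9, 0.1], None, [0.5]]
summary = summarize_happiness(sample)
print(summary)
```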

DetectFace Function

import os
from typing import List, Optional

from azure.cognitiveservices.vision.face import FaceClient
from msrest.authentication import CognitiveServicesCredentials

KEY = os.environ["AZURE_FACE_REC_KEY"]
ENDPOINT = os.environ["AZURE_FACE_REC_ENDPOINT"]


def main(url: str) -> Optional[List[float]]:
    face_client = FaceClient(ENDPOINT, CognitiveServicesCredentials(KEY))

    # Ask the Face service for the emotion attributes of every face in the image.
    detected_faces = face_client.face.detect_with_url(
        url=url,
        return_face_attributes=["emotion"])

    if detected_faces:
        # One happiness score (0.0 to 1.0) per detected face.
        return [face.face_attributes.emotion.happiness for face in detected_faces]

    # No face detected in this image.
    return None

Here, we simply call Azure Face Recognition Service and return one happiness rating per detected face, or None if no face is detected in the image.
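
The shape of the return value can be tried out without calling the service. The sketch below repeats the extraction from DetectFace on hypothetical stand-in objects; `fake_face` only mimics the `face_attributes.emotion.happiness` attribute path used above and is not part of the Face SDK.

```python
from types import SimpleNamespace


def happiness_scores(detected_faces):
    """Same extraction as in DetectFace: a list of scores, or None if no faces."""
    if detected_faces:
        return [face.face_attributes.emotion.happiness for face in detected_faces]
    return None


def fake_face(happiness: float):
    # Hypothetical stand-in that only mimics the attribute path used above.
    return SimpleNamespace(
        face_attributes=SimpleNamespace(emotion=SimpleNamespace(happiness=happiness)))


two_faces = happiness_scores([fake_face(0.98), fake_face(0.02)])
no_faces = happiness_scores([])
print(two_faces, no_faces)
```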

Observation

With VS Code, I was able to run the functions in my local environment.

I made an HTTP POST request with Postman, passing 3 image URLs, and got:
{
    "id": "5346f527c9004e54a0cfdb0f2a988f5d",
    "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/5346f527c9004e54a0cfdb0f2a988f5d?taskHub=TestHubName&connection=Storage&code=3bXmVZuGANyvw9oFao1KRG4vnKdqu6YVXFVI8znt58ntkxz8OWMNbQ==",
    "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/5346f527c9004e54a0cfdb0f2a988f5d/raiseEvent/{eventName}?taskHub=TestHubName&connection=Storage&code=3bXmVZuGANyvw9oFao1KRG4vnKdqu6YVXFVI8znt58ntkxz8OWMNbQ==",
    "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/5346f527c9004e54a0cfdb0f2a988f5d/terminate?reason={text}&taskHub=TestHubName&connection=Storage&code=3bXmVZuGANyvw9oFao1KRG4vnKdqu6YVXFVI8znt58ntkxz8OWMNbQ==",
    "rewindPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/5346f527c9004e54a0cfdb0f2a988f5d/rewind?reason={text}&taskHub=TestHubName&connection=Storage&code=3bXmVZuGANyvw9oFao1KRG4vnKdqu6YVXFVI8znt58ntkxz8OWMNbQ==",
    "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/5346f527c9004e54a0cfdb0f2a988f5d?taskHub=TestHubName&connection=Storage&code=3bXmVZuGANyvw9oFao1KRG4vnKdqu6YVXFVI8znt58ntkxz8OWMNbQ==",
    "restartPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/5346f527c9004e54a0cfdb0f2a988f5d/restart?taskHub=TestHubName&connection=Storage&code=3bXmVZuGANyvw9oFao1KRG4vnKdqu6YVXFVI8znt58ntkxz8OWMNbQ==",
    "suspendPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/5346f527c9004e54a0cfdb0f2a988f5d/suspend?reason={text}&taskHub=TestHubName&connection=Storage&code=3bXmVZuGANyvw9oFao1KRG4vnKdqu6YVXFVI8znt58ntkxz8OWMNbQ==",
    "resumePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/5346f527c9004e54a0cfdb0f2a988f5d/resume?reason={text}&taskHub=TestHubName&connection=Storage&code=3bXmVZuGANyvw9oFao1KRG4vnKdqu6YVXFVI8znt58ntkxz8OWMNbQ=="
}

This response has an HTTP status code of 202, which means that the request has been accepted but the work has not necessarily finished yet. After waiting a few seconds, I pointed my browser to the statusQueryGetUri URL and got:

{
    "name": "Orchestrator",
    "instanceId": "5346f527c9004e54a0cfdb0f2a988f5d",
    "runtimeStatus": "Completed",
    "input": "{\"urls\": [\"https://raw.githubusercontent.com/Microsoft/Cognitive-Face-Windows/master/Data/detection1.jpg\", \"https://www.nist.gov/sites/default/files/styles/960_x_960_limit/public/images/2020/07/27/DoubleRow.png?itok=5CfcABWm\", \"https://images.newscientist.com/wp-content/uploads/2022/02/14174128/PRI_223554170.jpg?width=800\"]}",
    "customStatus": null,
    "output": {
        "min_val": 0.0,
        "max_val": 1.0
    },
    "createdTime": "2022-11-27T19:36:14Z",
    "lastUpdatedTime": "2022-11-27T19:36:18Z"
}

The orchestration took 4 seconds (from createdTime to lastUpdatedTime) to make 3 calls to Azure Face Recognition Service. IMO, this is impressively fast.
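
Instead of waiting and refreshing the browser, the status URL can also be polled from code. Below is a minimal sketch, assuming the statusQueryGetUri returns JSON with a runtimeStatus field as in the output above. The helper names, the polling interval, and the attempt limit are arbitrary choices for illustration; `fetch` is injectable so the loop can be tried without a running Functions host.

```python
import json
import time
import urllib.request
from typing import Callable

# Statuses after which the orchestration will not change any more.
TERMINAL_STATES = {"Completed", "Failed", "Terminated"}


def http_get_json(url: str) -> dict:
    """Fetch a URL and parse the response body as JSON."""
    with urllib.request.urlopen(url) as response:
        return json.loads(response.read())


def wait_for_result(status_url: str,
                    fetch: Callable[[str], dict] = http_get_json,
                    interval_s: float = 1.0,
                    max_attempts: int = 30) -> dict:
    """Poll the status URL until the orchestration reaches a terminal state."""
    for _ in range(max_attempts):
        status = fetch(status_url)
        if status.get("runtimeStatus") in TERMINAL_STATES:
            return status
        time.sleep(interval_s)
    raise TimeoutError(f"No terminal status after {max_attempts} attempts")
```

A call such as `wait_for_result(start_response["statusQueryGetUri"])["output"]` would then return the min_val and max_val dictionary once the run has completed, where `start_response` stands for the parsed 202 response.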

Summary

With Azure Durable Functions, it is easy to implement a simple workflow, and the code shared above is short and simple.

