Just show me the code
As always, if you don’t care about the post, I have uploaded the source code to my GitHub.

In today’s post I’ll show you how to implement the Async HTTP API pattern with Azure Durable Functions.

Also, most of the online examples and documentation about Durable Functions are written in C#, so I have decided to use Python instead.

To begin with, let’s briefly go over a few key concepts around Azure Durable Functions.

1. Azure Durable Functions

For those unaware of Azure Durable Functions, it is an extension of Azure Functions that lets you write stateful functions in a serverless compute environment.
The extension lets you define stateful workflows by writing orchestrator functions and stateful entities.

The 4 main concepts you should know about are:

  • Client functions

The client function is responsible for starting, stopping and monitoring the orchestrator functions.

  • Orchestrator functions

The orchestrator function is the heart of a Durable Functions solution: it is where you write your workflow in code. The workflow can consist of statements that call other functions, such as activity functions or other orchestrator functions, or that wait for external events to occur.

An orchestration is started by a client function, which in turn can be triggered by a message in a queue, an HTTP request, or any other trigger mechanism you are familiar with.

Every instance of an orchestration function will have an instance identifier, which can be auto-generated or user-generated.

  • Activity functions

Activity functions are the basic unit of work in a Durable Functions solution.
Each activity function executes a single task, and that task can be anything you want.

  • Entity functions

Entity functions define operations for reading and updating small pieces of state. We often refer to these stateful entities as durable entities.

Like orchestrator functions, entity functions are functions with a special trigger type, entity trigger. They can also be invoked from client functions or from orchestrator functions.

Unlike orchestrator functions, entity functions do not have any specific code constraints. Entity functions also manage state explicitly rather than implicitly representing state via control flow.

2. The problem I was trying to solve with Azure Durable Functions

In this section I’d like to talk a little bit about what I was trying to solve and why Durable Functions was a good fit.

One of my clients has an HTTP API that uses an Azure Function as its backend. The function takes a few parameters via the query string, runs a query against an Azure Storage Table and returns the result. It’s a really simple setup.

audit-api

The problem with this approach is that the query takes between 3 and 7 minutes to complete, so most of the time the HTTP function times out: 230 seconds is the maximum amount of time that an HTTP-triggered function can take to respond to a request, because of the idle timeout of the Azure Load Balancer.

For longer processing times, I needed to either use an async pattern or defer the actual work and return an immediate response.
Azure Durable Functions provides built-in support for the async HTTP API pattern.

3. Async HTTP API Pattern

The async HTTP API pattern addresses the problem of coordinating the state of long-running operations with external clients.

A common way to implement this pattern is by having an HTTP endpoint trigger the long-running action. Then, redirect the client to a status endpoint that the client polls to learn when the operation is finished.
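From the caller's side, the polling half of the pattern can be sketched as follows. This is a minimal illustration and not part of the original solution: `poll_until_done` and `fetch_status` are hypothetical names, and the sketch assumes the status endpoint returns a JSON object with a `status` field whose final values are `Completed`, `Failed` or `Terminated`.

```python
import time
from typing import Callable, Dict

# Assumption: these are the statuses after which the job will not change anymore.
TERMINAL_STATUSES = {"Completed", "Failed", "Terminated"}


def poll_until_done(
    fetch_status: Callable[[], Dict],
    interval_seconds: float = 5.0,
    timeout_seconds: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict:
    """Call fetch_status repeatedly until the job reaches a terminal status.

    fetch_status is a hypothetical callable that performs the HTTP GET against
    the status endpoint and returns the decoded JSON body as a dict.
    """
    waited = 0.0
    while True:
        status = fetch_status()
        if status.get("status") in TERMINAL_STATUSES:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f"Job still {status.get('status')!r} after {waited:.0f}s")
        sleep(interval_seconds)
        waited += interval_seconds
```

Passing the HTTP call in as a function keeps the loop easy to try out with canned responses before pointing it at a real endpoint.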

async-http-pattern

4. Scenario

Here is what we’re going to build:

  • The customer submits a job by calling an Azure Function that has an HTTP endpoint trigger. This is the Client Function.
  • The submitted job is going to be picked up by the Orchestrator Function and it will call the Activity Function.
  • The Activity Function is going to run the query against the Azure Storage Table and return the result.
  • There is going to be an extra Azure Function that queries the orchestrator function and retrieves the status and the result of a given job.

Here’s a diagram:

async-http-api

  • client-function: submits the job that needs to be executed.
  • get-status-function: it is used to retrieve the status and the result of a given job.
  • orchestrator-function: Unwraps the parameters from the submitted job and calls the activity function.
  • query-storage-account-activity-function: Runs a custom query in an Azure Storage Table.

5. Implementation

The components of the solution are described in the previous section. Now let’s start building them.

client-function

This is an HTTP-triggered function.

The function receives 2 parameters via the query string (they are needed to build the query against the Storage Table) and validates both of them.

Afterwards, it creates a DurableOrchestrationClient and starts a new orchestration instance using the start_new method.
The start_new method takes 3 parameters:

  • Name: The name of the orchestrator function to schedule.
  • InstanceId: (Optional) The unique ID of the instance. If you don’t specify this parameter, the method uses a random ID.
  • Input: Any JSON-serializable data that should be passed as the input to the orchestrator function.

And lastly, it builds the URL from where the client can retrieve the status and the result of the submitted job.

import azure.functions as func
import azure.durable_functions as df
import json
import dateutil.parser
from urllib.parse import urlparse

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:

    response = { }
    headers = { "Content-Type": "application/json" }

    start_date = req.params.get('s')
    end_date = req.params.get('e')
    
    if start_date:
        try:
            start_date = dateutil.parser.parse(start_date, dayfirst=True)
        except (ValueError, OverflowError):
            # dateutil raises ParserError (a ValueError subclass) or OverflowError on bad input
            response['error'] = "Invalid start date format."
            return func.HttpResponse(json.dumps(response), headers=headers, status_code=400)
    else:
        response['error'] = "Empty start date."
        return func.HttpResponse(json.dumps(response), headers=headers, status_code=400)

    if end_date:
        try:
            end_date = dateutil.parser.parse(end_date, dayfirst=True)
        except (ValueError, OverflowError):
            response['error'] = "Invalid end date format."
            return func.HttpResponse(json.dumps(response), headers=headers, status_code=400)
    else:
        response['error'] = "Empty end date."
        return func.HttpResponse(json.dumps(response), headers=headers, status_code=400)

    # The range must span at least one full day.
    delta = end_date - start_date
    if delta.days < 1:
        response['error'] = "Invalid date range."
        return func.HttpResponse(json.dumps(response), headers=headers, status_code=400)

    
    client = df.DurableOrchestrationClient(starter)

    parameters = {
        "start": start_date.strftime("%Y-%m-%d"),
        "end": end_date.strftime("%Y-%m-%d")
    }

    instance_id = await client.start_new('orchestrator-function', None, parameters)

    status_uri = build_api_url(urlparse(req.url).scheme, req.headers.get("host"), instance_id)
    response["statusUri"] = status_uri
    return func.HttpResponse(json.dumps(response), headers=headers, status_code=200 )

def build_api_url(scheme, host, instance_id):
    return f"{scheme}://{host}/api/status/{instance_id}"
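The validation rules in the function above can also be pulled out into a pure function, which makes them easy to unit test without the Functions host. The sketch below is only an illustration: `parse_date_range` and `ValidationError` are hypothetical names, and it assumes a fixed day-first `DD/MM/YYYY` format parsed with `datetime.strptime`, which is stricter than the `dateutil` parsing used in the post.

```python
from datetime import datetime
from typing import Optional, Tuple

DATE_FORMAT = "%d/%m/%Y"  # assumption: day-first dates such as 10/08/2021


class ValidationError(ValueError):
    """Carries the message that would go into the 400 response."""


def parse_date_range(start: Optional[str], end: Optional[str]) -> Tuple[datetime, datetime]:
    parsed = []
    for label, raw in (("start", start), ("end", end)):
        if not raw:
            raise ValidationError(f"Empty {label} date.")
        try:
            parsed.append(datetime.strptime(raw, DATE_FORMAT))
        except ValueError:
            raise ValidationError(f"Invalid {label} date format.") from None
    start_date, end_date = parsed
    # Same rule as the client function: the range must span at least one day.
    if (end_date - start_date).days < 1:
        raise ValidationError("Invalid date range.")
    return start_date, end_date
```

In the client function, a caught `ValidationError` would translate into the same 400 response as before.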

get-status-function

Is this function really needed?

Before discussing the implementation of this Azure Function, I think it’s important to talk a bit about whether this function is really needed.

Most of the examples you’ll find online don’t use a custom function to retrieve the status of the jobs. Instead, they call the create_check_status_response method in the client function.

That means that in the previous section I could have built the client function like this:

    ...
    client = df.DurableOrchestrationClient(starter)

    parameters = {
        "start": start_date.strftime("%Y-%m-%d"),
        "end": end_date.strftime("%Y-%m-%d")
    }

    instance_id = await client.start_new('orchestrator-function', None, parameters)
    return client.create_check_status_response(req, instance_id)

And that’s how the function would have responded:

{
  "id": "b78244c9e19f43e89e7b1578f711940d",
  "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/b78244c9e19f43e89e7b1578f711940d?taskHub=TestHubName&connection=Storage&code=V/xX4rraZaheGNwbMePOhhGtyHxGsgks4Q36WKBdS3WfiOjD5Sb5lw==",
  "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/b78244c9e19f43e89e7b1578f711940d/raiseEvent/{eventName}?taskHub=TestHubName&connection=Storage&code=V/xX4rraZaheGNwbMePOhhGtyHxGsgks4Q36WKBdS3WfiOjD5Sb5lw==",
  "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/b78244c9e19f43e89e7b1578f711940d/terminate?reason={text}&taskHub=TestHubName&connection=Storage&code=V/xX4rraZaheGNwbMePOhhGtyHxGsgks4Q36WKBdS3WfiOjD5Sb5lw==",
  "rewindPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/b78244c9e19f43e89e7b1578f711940d/rewind?reason={text}&taskHub=TestHubName&connection=Storage&code=V/xX4rraZaheGNwbMePOhhGtyHxGsgks4Q36WKBdS3WfiOjD5Sb5lw==",
  "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/b78244c9e19f43e89e7b1578f711940d?taskHub=TestHubName&connection=Storage&code=V/xX4rraZaheGNwbMePOhhGtyHxGsgks4Q36WKBdS3WfiOjD5Sb5lw==",
  "restartPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/b78244c9e19f43e89e7b1578f711940d/restart?taskHub=TestHubName&connection=Storage&code=V/xX4rraZaheGNwbMePOhhGtyHxGsgks4Q36WKBdS3WfiOjD5Sb5lw=="
}

As you can see, the create_check_status_response method returns a response that contains links to the various management operations that can be invoked on an orchestration instance.
These operations include querying the orchestration status, raising events and terminating the orchestration.

So why am I building an extra Azure Function to retrieve the status of a job instead of using the create_check_status_response method directly?

Because the general public shouldn’t be able to query the orchestration status endpoint directly.
The response payload includes the orchestration management endpoints together with their keys. By handing that out, you would allow clients not only to get the status of a running instance, but also to get its execution history, send external events to it or terminate it.

Basically, if you don’t want your clients messing around with the heart of the function, you need to expose another Azure Function that returns only the minimum information required.
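To make the concern concrete, the small sketch below walks over a payload shaped like the one shown earlier and lists which management URLs carry a key in their `code` query parameter. It is only an illustration: `operations_exposing_key` is a hypothetical helper and `EXAMPLE_KEY` is a placeholder, not a real key.

```python
from urllib.parse import parse_qs, urlparse


def operations_exposing_key(payload: dict) -> dict:
    """Map each management URL field to the key found in its `code` parameter."""
    exposed = {}
    for field, value in payload.items():
        if not isinstance(value, str) or "://" not in value:
            continue  # skip non-URL fields such as "id"
        codes = parse_qs(urlparse(value).query).get("code")
        if codes:
            exposed[field] = codes[0]
    return exposed


base = "http://localhost:7071/runtime/webhooks/durabletask/instances/abc"
suffix = "taskHub=TestHubName&connection=Storage&code=EXAMPLE_KEY"
payload = {
    "id": "abc",
    "statusQueryGetUri": f"{base}?{suffix}",
    "terminatePostUri": f"{base}/terminate?reason={{text}}&{suffix}",
}
print(operations_exposing_key(payload))
```

Every URL in the payload carries the same key, so anyone who can read the status URL can also call the terminate URL.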

Implementation

The get-status-function is an HTTP-triggered function.

This function needs the instance_id that the Client function generated when submitting the job to the orchestrator function.

To retrieve the status and the result of a submitted job you can use the get_status method, which looks up the state of the orchestration instance with the given ID.

The get_status method returns an object with the following properties (the Python SDK exposes them as snake_case attributes, for example instance_id, runtime_status and output):

  • Name: The name of the orchestrator function.
  • InstanceId: The instance ID of the orchestration (should be the same as the instanceId input).
  • CreatedTime: The time at which the orchestrator function started running.
  • LastUpdatedTime: The time at which the orchestration last checkpointed.
  • Input: The input of the function as a JSON value. This field isn’t populated if showInput is false.
  • CustomStatus: Custom orchestration status in JSON format.
  • Output: The output of the function as a JSON value (if the function has completed). If the orchestrator function failed, this property includes the failure details. If the orchestrator function was terminated, this property includes the reason for the termination (if any).
  • RuntimeStatus: One of the following values:
    • Pending: The instance has been scheduled but has not yet started running.
    • Running: The instance has started running.
    • Completed: The instance has completed normally.
    • ContinuedAsNew: The instance has restarted itself with a new history. This state is a transient state.
    • Failed: The instance failed with an error.
    • Terminated: The instance was stopped abruptly.
  • History: The execution history of the orchestration. This field is only populated if showHistory is set to true.

import azure.functions as func
import azure.durable_functions as df
import json


async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)

    instance_id = req.route_params["id"]

    response = await client.get_status(instance_id)

    if response.instance_id is None:
        return func.HttpResponse("Job not found", status_code=404)

    return func.HttpResponse(json.dumps({
        "id": response.instance_id,
        "status": response.runtime_status.value,
        "result": response.output
    }))
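The function above always answers with HTTP 200 once the job is found. One possible refinement, which is my own assumption and not part of the original solution, is to answer 202 Accepted while the job is still in progress so that clients can tell "not ready yet" apart from "done" by status code alone. The helper name `build_status_reply` is hypothetical.

```python
import json
from typing import Any, Optional, Tuple

# Runtime statuses from the list above that mean the job has not finished yet.
IN_PROGRESS_STATUSES = {"Pending", "Running", "ContinuedAsNew"}


def build_status_reply(
    instance_id: Optional[str], runtime_status: Optional[str], output: Any
) -> Tuple[int, str]:
    """Return the (HTTP status code, JSON body) pair for a job status lookup."""
    if instance_id is None:
        return 404, json.dumps({"error": "Job not found"})
    body = json.dumps({"id": instance_id, "status": runtime_status, "result": output})
    status_code = 202 if runtime_status in IN_PROGRESS_STATUSES else 200
    return status_code, body
```

The get-status function could then pass `response.instance_id`, `response.runtime_status.value` and `response.output` to this helper and build the `func.HttpResponse` from the returned pair.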

orchestrator-function

Nothing fancy here. The orchestrator function in this application is really simple because there is no need to orchestrate multiple activities or build a complex workflow.

The function does the following steps:

  • Retrieves the inputs sent by the Client Function (these parameters are needed to build the query against Azure Table Storage).
  • Calls the activity function, passing it the retrieved inputs.
  • Waits for the activity function to finish and returns the result.

import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    # Named "params" rather than "input" to avoid shadowing the Python builtin.
    params = context.get_input()
    result = yield context.call_activity('query-storage-account-activity-function', {'start': params['start'], 'end': params['end']})
    return result

main = df.Orchestrator.create(orchestrator_function)
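The `yield` in the orchestrator is what lets the framework pause the function until the activity result is available. The toy driver below illustrates that mechanism with plain Python generators. It is a deliberately simplified sketch and not how the Durable Functions runtime is implemented; `run_orchestrator` and `toy_orchestrator` are hypothetical names.

```python
from typing import Any, Callable, Dict, Generator, Tuple

# A toy orchestrator yields (activity_name, payload) and receives the result back.
Orchestrator = Generator[Tuple[str, Any], Any, Any]


def run_orchestrator(
    orchestrator: Orchestrator, activities: Dict[str, Callable[[Any], Any]]
) -> Any:
    """Drive a generator-based orchestrator to completion."""
    try:
        request = next(orchestrator)  # run until the first yield
        while True:
            name, payload = request
            result = activities[name](payload)  # execute the requested activity
            request = orchestrator.send(result)  # resume with the activity's result
    except StopIteration as finished:
        return finished.value  # the orchestrator's return value


def toy_orchestrator(params: Dict[str, str]) -> Orchestrator:
    count = yield ("query", {"start": params["start"], "end": params["end"]})
    return count
```

The orchestrator itself never runs the query: it only describes what should run next and gets the result handed back when it is resumed.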

query-storage-account-activity-function

This Activity Function is responsible for running the query against Azure Table Storage.

The function retrieves the Storage Table connection string from App Configuration through a Key Vault reference (https://docs.microsoft.com/en-us/azure/azure-app-configuration/overview), runs the query and returns the result.

As you can see, this function is a single unit of work and does not contain any reference to the durable-functions library.

import json
import os
from urllib.parse import urlparse

from azure.appconfiguration import AzureAppConfigurationClient
from azure.data.tables import TableClient
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient


def main(request: dict) -> int:
    start = request['start']
    end = request['end']

    table_conn_str = get_azure_table_connection_string()
    return run_query(start, end, table_conn_str)


def run_query(start, end, table_conn_str) -> int:
    client = TableClient.from_connection_string(conn_str=table_conn_str, table_name="audit")

    parameters = {
        "start": start,
        "end": end
    }

    query_filter = "PartitionKey ge @start and PartitionKey le @end and CertNumber ne ''"
    entities = client.query_entities(query_filter, parameters=parameters, select='identificationNumber')

    # Count the distinct identification numbers returned by the query.
    return len({entity['identificationNumber'] for entity in entities})


def get_azure_table_connection_string() -> str:
    credential = DefaultAzureCredential()

    app_config_base_url = os.getenv('AppConfigEndpoint')
    app_config_client = AzureAppConfigurationClient(base_url=app_config_base_url, credential=credential)

    # The setting is a Key Vault reference: a JSON document whose "uri" field
    # points at the secret (https://<vault>/secrets/<name>).
    setting = app_config_client.get_configuration_setting(key="storage-account-connection-string", label="async-http-api")
    secret_uri = urlparse(json.loads(setting.value)["uri"])
    vault_url = f"{secret_uri.scheme}://{secret_uri.netloc}"
    # Parse the URI with urlparse (not pathlib, which is OS-dependent) and take
    # the segment right after "secrets" as the secret name.
    secret_name = secret_uri.path.strip("/").split("/")[1]

    kv_client = SecretClient(vault_url, credential)
    return kv_client.get_secret(secret_name).value

6. Test

Everything is in place, so now let’s test it.

If I try to submit a new job with a few invalid parameters, it responds with an error:

curl "https://func-sa-table-durable-dev.azurewebsites.net/api/submit/query"
{"error": "Empty start date."}
curl "https://func-sa-table-durable-dev.azurewebsites.net/api/submit/query?s=10/08/2021&e=abcd"
{"error": "Invalid end date format."}
curl "https://func-sa-table-durable-dev.azurewebsites.net/api/submit/query?s=10/08/2021&e=05/08/2021"
{"error": "Invalid date range."}

If I try to submit a new job with valid parameters, the client function responds with the status endpoint.

curl "https://func-sa-table-durable-dev.azurewebsites.net/api/submit/query?s=10/10/2021&e=10/12/2021"
{"statusUri": "https://func-sa-table-durable-dev.azurewebsites.net/api/status/433ebcfe85ec4012abe94dcda2aa6b00"}

If I query the get-status function right away, it returns that the job is still being executed.

curl "https://func-sa-table-durable-dev.azurewebsites.net/api/status/433ebcfe85ec4012abe94dcda2aa6b00"
{"id": "433ebcfe85ec4012abe94dcda2aa6b00", "status": "Running", "result": null}

If I query the get-status function after a few minutes, the job has completed and we can see the result.

curl "https://func-sa-table-durable-dev.azurewebsites.net/api/status/433ebcfe85ec4012abe94dcda2aa6b00"
{"id": "433ebcfe85ec4012abe94dcda2aa6b00", "status": "Completed", "result": 119}

7. Deployment to Azure

I didn’t plan to write about how to deploy these functions to Azure, but it might be useful to someone.

Here’s how you can deploy them using:

  • Azure DevOps pipelines
  • GitHub Actions

Using Azure Pipelines

trigger: none

pool:
  vmImage: 'ubuntu-latest'

variables:
- name: azureSubscription
  value: 'cpons-demos-dev'
- name: functionAppName
  value: 'func-staccount-report-query-dev'

steps:
- task: UsePythonVersion@0
  inputs:
    versionSpec: '3.8'
  displayName: 'Use Python 3.8'

- script: |
    python -m pip install --upgrade pip
    pip install --target="./.python_packages/lib/site-packages" -r ./requirements.txt    
  displayName: 'Install dependencies'

- task: CopyFiles@2
  inputs:
    SourceFolder: '$(Build.SourcesDirectory)'
    Contents: '**'
    TargetFolder: '$(Build.ArtifactStagingDirectory)'

- task: ArchiveFiles@2
  inputs:
    rootFolderOrFile: '$(Build.ArtifactStagingDirectory)'
    includeRootFolder: false
    archiveType: 'zip'
    archiveFile: '$(Build.ArtifactStagingDirectory)/$(Build.BuildId).zip'
    replaceExistingArchive: true

- task: AzureFunctionApp@1
  inputs:
    azureSubscription: '$(azureSubscription)'
    appType: 'functionAppLinux'
    appName: '$(functionAppName)'
    package: '$(Build.ArtifactStagingDirectory)/$(Build.BuildId).zip'
    runtimeStack: 'PYTHON|3.8'

Using GitHub Actions

name: Deploy Durable Functions to Azure Function App

on:
  push:
    branches: [ main ]

  workflow_dispatch:

env:
  AZURE_FUNCTIONAPP_NAME: func-staccount-report-query-dev
  AZURE_FUNCTIONAPP_PACKAGE_PATH: '.'   
  PYTHON_VERSION: '3.8'
                
jobs:
  build-and-deploy:
    environment: dev
    runs-on: ubuntu-latest
    steps:
    - name: 'Checkout GitHub Action'
      uses: actions/checkout@master

    - name: Setup Python ${{ env.PYTHON_VERSION }} Environment
      uses: actions/setup-python@v1
      with:
        python-version: ${{ env.PYTHON_VERSION }}

    - name: 'Resolve Project Dependencies Using Pip'
      shell: bash
      run: |
        pushd './${{ env.AZURE_FUNCTIONAPP_PACKAGE_PATH }}'
        python -m pip install --upgrade pip
        pip install -r requirements.txt --target=".python_packages/lib/site-packages"
        popd        
    - name: 'Run Azure Functions Action'
      uses: Azure/functions-action@v1
      id: fa
      with:
        app-name: ${{ env.AZURE_FUNCTIONAPP_NAME }}
        package: ${{ env.AZURE_FUNCTIONAPP_PACKAGE_PATH }}
        publish-profile: ${{ secrets.SCM_CREDENTIALS }}