Timer with Step Functions and AWS Lambda (.NET8)

Imagine you’re building an online auction app on AWS. A key feature is a timer that counts down to the end of bidding. This timer needs to be accurate, reliable, and scalable. Here’s how you can set it up using AWS Step Functions and AWS Lambda.

How It Works

Figure: Simple timer architecture with Step Functions and Lambdas.

We’ll use one AWS Lambda to accept requests for new timers, AWS Step Functions to wait until each timer is due, and a second AWS Lambda to run an action when the timer ends.

Timer Scheduler Lambda

This AWS Lambda creates new timers. For each request it creates a dedicated state machine and starts an execution of it. The only required input is RunAt, the UTC time at which the action should happen. An optional Payload string is passed through to the callback.

using Amazon.Lambda.Core;
using Amazon.StepFunctions;
using Amazon.StepFunctions.Model;
using System.Text.Json;
using System.Text.Json.Nodes;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace TimerLambda;

public class Function
{
    private readonly IAmazonStepFunctions _stepFunctionsClient;
    private readonly string? _targetLambdaArn;
    private readonly int _targetLambdaCallBeforeSec;
    private readonly string? _statMachineRoleArn;
    private readonly string _stateMachineNamePrefix;

    public Function()
    {
        _stepFunctionsClient = new AmazonStepFunctionsClient();
        _targetLambdaArn = Environment.GetEnvironmentVariable("TARGET_LAMBDA_ARN");
        _targetLambdaCallBeforeSec = int.TryParse(Environment.GetEnvironmentVariable("TARGET_LAMBDA_CALL_BEFORE_SEC"), out int result) ? result : 0;
        _stateMachineNamePrefix = Environment.GetEnvironmentVariable("STATE_MACHINE_NAME_PREFIX") ?? "TimerStateMachine";
        _statMachineRoleArn = Environment.GetEnvironmentVariable("STATE_MACHINE_ROLE_ARN");
    }

    public async Task<object> FunctionHandler(Input input, ILambdaContext context)
    {
        context.Logger.LogLine($"FunctionHandler: {JsonSerializer.Serialize(input)}");

        var inputObject = input;
        if (inputObject == null)
        {
            throw new ArgumentException("Input cannot be null");
        }

        var waitTime = inputObject.RunAt.Subtract(DateTime.UtcNow);
        context.Logger.LogLine($"WaitTime: {waitTime}");
        if (waitTime < TimeSpan.Zero)
        {
            throw new ArgumentException("RunAt time cannot be in the past");
        }

        if (string.IsNullOrEmpty(_targetLambdaArn))
        {
            throw new ArgumentException("TARGET_LAMBDA_ARN environment variable not found");
        }

        var createStateMachineRequest = new CreateStateMachineRequest
        {
            Name = $"{_stateMachineNamePrefix}-{Guid.NewGuid()}",
            Definition = GetStateMachineDefinition(inputObject.RunAt, _targetLambdaArn, _targetLambdaCallBeforeSec, inputObject.Payload ?? "{}"),
            RoleArn = _statMachineRoleArn
        };

        context.Logger.LogLine($"CreateStateMachineRequest: {JsonSerializer.Serialize(createStateMachineRequest)}");

        var createStateMachineResult = await _stepFunctionsClient.CreateStateMachineAsync(createStateMachineRequest);
        var startExecutionRequest = new StartExecutionRequest
        {
            StateMachineArn = createStateMachineResult.StateMachineArn
        };

        return await _stepFunctionsClient.StartExecutionAsync(startExecutionRequest);
    }

    private static string GetStateMachineDefinition(DateTime runAtUtc, string targetLambdaArn, int targetLambdaCallBeforeSec, string payloadJson)
    {
        string waitUntil = runAtUtc.Subtract(TimeSpan.FromSeconds(targetLambdaCallBeforeSec)).ToString("yyyy-MM-ddTHH:mm:ssZ");
        string exactRunAt = runAtUtc.ToString("yyyy-MM-ddTHH:mm:ssZ");
        var stateMachineDefinition = new JsonObject
        {
            ["Comment"] = "Timer Definition",
            ["StartAt"] = "Wait",
            ["States"] = new JsonObject
            {
                ["Wait"] = new JsonObject
                {
                    ["Type"] = "Wait",
                    ["Next"] = "Lambda Invoke",
                    ["Timestamp"] = waitUntil
                },
                ["Lambda Invoke"] = new JsonObject
                {
                    ["Type"] = "Task",
                    ["Resource"] = "arn:aws:states:::lambda:invoke",
                    ["OutputPath"] = "$.Payload",
                    ["Parameters"] = new JsonObject
                    {
                        ["FunctionName"] = targetLambdaArn,
                        ["Payload"] = new JsonObject
                        {
                            ["RunAt"] = exactRunAt,
                            ["Payload"] = payloadJson
                        }
                    },
                    ["End"] = true
                }
            }
        };

        return stateMachineDefinition.ToJsonString();
    }

    public sealed class Input
    {
        public DateTime RunAt { get; set; }
        public string? Payload { get; set; }
    }
}

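In the constructor, we create the AWS Step Functions client and read the settings from environment variables:

  • TARGET_LAMBDA_ARN: The ARN of the AWS Lambda to call when the timer ends.
  • TARGET_LAMBDA_CALL_BEFORE_SEC: How many seconds before RunAt the target Lambda is invoked, which leaves room for a cold start. Defaults to 0 if missing or not a number.
  • STATE_MACHINE_NAME_PREFIX: A prefix for naming each timer’s state machine. Defaults to TimerStateMachine.
  • STATE_MACHINE_ROLE_ARN: The role that lets the state machine call the target Lambda.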
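
Because each state machine name is built as the prefix, a hyphen, and a GUID, a long prefix can push the name over the service limit. The sketch below shows one way to check this up front. TimerNames is a hypothetical helper that is not part of the function above, and the 80-character limit is an assumption based on the Step Functions naming rules, so verify it against the current documentation.

```csharp
using System;

// Usage: the default prefix (17 chars) + "-" + GUID (36 chars) stays well under the limit.
Console.WriteLine(TimerNames.Create("TimerStateMachine"));

// Hypothetical helper, not part of the original Timer Scheduler Lambda.
public static class TimerNames
{
    // Assumption: Step Functions allows state machine names of at most 80 characters.
    public const int MaxNameLength = 80;

    public static string Create(string prefix)
    {
        // Same naming scheme as the handler: "<prefix>-<guid>".
        var name = $"{prefix}-{Guid.NewGuid()}";
        if (name.Length > MaxNameLength)
        {
            throw new ArgumentException(
                $"Prefix is too long: the name would have {name.Length} characters (max {MaxNameLength}).",
                nameof(prefix));
        }
        return name;
    }
}
```

Failing early with a clear message is only one option. Letting CreateStateMachineAsync reject the name works too, but the error then surfaces later and is harder to trace back to the prefix setting.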

The FunctionHandler method does the work. It rejects a missing input or a RunAt in the past, checks that TARGET_LAMBDA_ARN is set, creates a new state machine, and starts an execution of it.
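
For illustration, a request to the Timer Scheduler Lambda could look like the event below. The field names come from the Input class; the date and the auctionId value are made up. Note that Payload is declared as a string, so any JSON inside it has to be escaped.

```json
{
  "RunAt": "2030-01-01T12:00:00Z",
  "Payload": "{\"auctionId\":\"auction-123\"}"
}
```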

The GetStateMachineDefinition method builds the state machine’s definition. It has two states: Wait, which pauses until TARGET_LAMBDA_CALL_BEFORE_SEC seconds before RunAt, and Lambda Invoke, which calls the target AWS Lambda and passes it RunAt and Payload.
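
To make the two states concrete, here is roughly what the generated definition looks like for a RunAt of 2030-01-01T12:00:00Z, a call-before value of 10 seconds, and no payload. It is pretty-printed for readability (the method itself emits compact JSON), and the function ARN, including its region and account ID, is a placeholder.

```json
{
  "Comment": "Timer Definition",
  "StartAt": "Wait",
  "States": {
    "Wait": {
      "Type": "Wait",
      "Next": "Lambda Invoke",
      "Timestamp": "2030-01-01T11:59:50Z"
    },
    "Lambda Invoke": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:ExecutorLambda",
        "Payload": {
          "RunAt": "2030-01-01T12:00:00Z",
          "Payload": "{}"
        }
      },
      "End": true
    }
  }
}
```

The Wait timestamp is ten seconds earlier than RunAt, while the inner RunAt keeps the exact target time so the callback can wait out the remaining seconds itself.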

Callback Executor Lambda

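This Lambda runs when the timer ends. Because the state machine may invoke it a few seconds early, it first waits until RunAt and then performs the action, such as sending a message, updating a database, calling another Lambda, or hitting a REST API.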

using Amazon.Lambda.Core;
using System.Text.Json.Nodes;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace ExecutorLambda;

public class Function
{
    public async Task FunctionHandler(JsonObject input, ILambdaContext context)
    {
        context.Logger.LogLine($"Entry Time: {DateTime.UtcNow}");
        context.Logger.LogLine($"Input: {input}");

        // RunAt is the exact UTC time at which the action should fire.
        // If it is missing, DateTime.MinValue makes the action run immediately.
        var runAt = input.TryGetPropertyValue("RunAt", out var runAtValue) ? runAtValue!.GetValue<DateTime>() : DateTime.MinValue;
        TimeSpan delay = runAt - DateTime.UtcNow;

        // A positive delay means the function was invoked early (to absorb a cold start).
        // A zero or negative delay means RunAt has already passed, so we continue without waiting.
        if (delay > TimeSpan.Zero)
        {
            // Wait asynchronously so the action runs at the exact time
            await Task.Delay(delay);
        }

        context.Logger.LogLine($"Run Time: {DateTime.UtcNow}");

        // Here you can do anything you want with the input
        // For example, you can call another Lambda function
        // or call a REST API
        // or call a database
    }
}
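
One detail is easy to miss: the scheduler stores Payload as a string, so the callback receives JSON inside a JSON string and has to parse it a second time. The sketch below shows one way to do that. PayloadReader is a hypothetical helper, and the auctionId field is an invented example value rather than anything defined by the original code.

```csharp
using System;
using System.Text.Json.Nodes;

// Hypothetical event, shaped like what the "Lambda Invoke" state sends to the callback.
var input = JsonNode.Parse("""
{
  "RunAt": "2030-01-01T12:00:00Z",
  "Payload": "{\"auctionId\":\"auction-123\"}"
}
""")!.AsObject();

JsonObject? details = PayloadReader.Read(input);
Console.WriteLine(details?["auctionId"]);

// Hypothetical helper, not part of the original Callback Executor Lambda.
public static class PayloadReader
{
    // Returns the caller's payload as a JsonObject,
    // or null if it is absent, not a string, or not a JSON object.
    public static JsonObject? Read(JsonObject input)
    {
        if (!input.TryGetPropertyValue("Payload", out var node) || node is not JsonValue value)
        {
            return null;
        }

        // The payload arrives as a JSON string, so it needs a second parse.
        if (!value.TryGetValue<string>(out var text) || string.IsNullOrWhiteSpace(text))
        {
            return null;
        }

        // JsonNode.Parse throws a JsonException on malformed JSON; callers may want to catch it.
        return JsonNode.Parse(text) as JsonObject;
    }
}
```

An alternative design would be to embed the payload as a nested JSON object when building the state machine definition, which would remove the second parse at the cost of changing the scheduler.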

Using CDK

We’ll use the AWS CDK to provision both Lambdas and the IAM roles they need:

using Amazon.CDK;
using Amazon.CDK.AWS.Lambda;
using System.Collections.Generic;
using Constructs;
using Amazon.CDK.AWS.IAM;

namespace Cdk
{
    public class TimerCdkStack : Stack
    {
        internal TimerCdkStack(Construct scope, string id, IStackProps props = null) : base(scope, id, props)
        {
            var stateMachineNamePrefix = new CfnParameter(this, "StateMachineNamePrefix", new CfnParameterProps
            {
                Type = "String",
                Description = "The prefix for the state machine name",
                Default = "TimerStateMachine"
            });

            var targetLambdaBeforeSec = new CfnParameter(this, "TargetLambdaBeforeSec", new CfnParameterProps
            {
                Type = "Number",
                Description = "The number of seconds before the exact time the target lambda is called to handle cold start",
                Default = 10
            });

            #region Executor Callback Lambda

            // Extend this role to allow the Executor Lambda to call other resources
            var executorLambdaRole = new Role(this, "ExecutorLambdaRole", new RoleProps
            {
                AssumedBy = new ServicePrincipal("lambda.amazonaws.com"),
                ManagedPolicies =
                [
                    ManagedPolicy.FromAwsManagedPolicyName("service-role/AWSLambdaBasicExecutionRole")
                ]
            });

            var executorLambda = new Function(this, "ExecutorLambda", new FunctionProps
            {
                Runtime = Runtime.DOTNET_8,
                Handler = "ExecutorLambda::ExecutorLambda.Function::FunctionHandler",
                Code = Code.FromAsset("../src/ExecutorLambda/bin/Release/net8.0/linux-arm64/publish/"),
                Architecture = Architecture.ARM_64,
                MemorySize = 256,
                Timeout = Duration.Minutes(2),
                Role = executorLambdaRole
            });

            #endregion

            #region Timer Lambda

            var stateMachineRole = new Role(this, "StateMachineRole", new RoleProps
            {
                AssumedBy = new ServicePrincipal("states.amazonaws.com"),
                ManagedPolicies =
                [
                    ManagedPolicy.FromAwsManagedPolicyName("service-role/AWSLambdaBasicExecutionRole")
                ],
                InlinePolicies = new Dictionary<string, PolicyDocument>
                {
                    ["StepFunctionPolicy"] = new PolicyDocument(new PolicyDocumentProps
                    {
                        Statements = new[]
                        {
                            new PolicyStatement(new PolicyStatementProps
                            {
                                Actions = new[] { "lambda:InvokeFunction" },
                                Resources = executorLambda.ResourceArnsForGrantInvoke
                            })
                        }
                    })
                }
            });

            var timerLambdaRole = new Role(this, "TimerLambdaRole", new RoleProps
            {
                AssumedBy = new ServicePrincipal("lambda.amazonaws.com"),
                ManagedPolicies = new IManagedPolicy[]
                {
                    ManagedPolicy.FromAwsManagedPolicyName("service-role/AWSLambdaBasicExecutionRole")
                },
                InlinePolicies = new Dictionary<string, PolicyDocument>
                {
                    ["StepFunctionPolicy"] = new PolicyDocument(new PolicyDocumentProps
                    {
                        Statements =
                        [
                            new PolicyStatement(new PolicyStatementProps
                            {
                                Actions = new[] { "states:CreateStateMachine", "states:StartExecution" },
                                Resources = new[] { "*" }
                            }),
                            new PolicyStatement(new PolicyStatementProps
                            {
                                Actions = new[] { "iam:PassRole" },
                                Resources = new[] { stateMachineRole.RoleArn }
                            })
                        ]
                    })
                }
            });

            _ = new Function(this, "TimerLambda", new FunctionProps
            {
                Runtime = Runtime.DOTNET_8,
                Handler = "TimerLambda::TimerLambda.Function::FunctionHandler",
                Code = Code.FromAsset("../src/TimerLambda/bin/Release/net8.0/linux-arm64/publish/"),
                Architecture = Architecture.ARM_64,
                MemorySize = 256,
                Timeout = Duration.Minutes(2),
                Environment = new Dictionary<string, string>
                {
                    ["TARGET_LAMBDA_ARN"] = executorLambda.FunctionArn,
                    ["TARGET_LAMBDA_CALL_BEFORE_SEC"] = targetLambdaBeforeSec.ValueAsString,
                    ["STATE_MACHINE_ROLE_ARN"] = stateMachineRole.RoleArn,
                    ["STATE_MACHINE_NAME_PREFIX"] = stateMachineNamePrefix.ValueAsString,
                },
                Role = timerLambdaRole,
            });

            #endregion
        }
    }
}
Note: we chose arm64 (AWS Graviton2 processor) for our AWS Lambdas because arm64 functions are billed at a lower rate than x86-64 ones and often run as fast or faster.

How to deploy

You can deploy this solution using the following commands:

cd src
dotnet publish -c Release -r linux-arm64

cd ../cdk/src
cdk deploy --parameters StateMachineNamePrefix=MyTimerStateMachine