Timer with Step Functions and AWS Lambda (.NET8)
Imagine you are developing an online auction application in AWS. Obviously, the core functionality will be a timer that counts down the time until the end of the bidding. This timer needs to be accurate, reliable, and scalable. Let’s consider a possible implementation of such a timer using AWS Step Functions and AWS Lambda.
Architecture
We implement AWS Lambda to handle timer creation requests, AWS Step Functions to manage the state of the timers, and another AWS Lambda to execute the action after the timer expires.
Timer Scheduler Lambda
This AWS Lambda is responsible for creating new timers. It receives requests for timer creation and creates a State Machine based on them. The primary parameter is RunAt
, which represents the exact time of action.
using Amazon.Lambda.Core;
using Amazon.StepFunctions;
using Amazon.StepFunctions.Model;
using System.Text.Json;
using System.Text.Json.Nodes;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace TimerLambda;
public class Function
{
private readonly IAmazonStepFunctions _stepFunctionsClient;
private readonly string? _targetLambdaArn;
private readonly int _targetLambdaCallBeforeSec;
private readonly string? _statMachineRoleArn;
private readonly string _stateMachineNamePrefix;
public Function()
{
_stepFunctionsClient = new AmazonStepFunctionsClient();
_targetLambdaArn = Environment.GetEnvironmentVariable("TARGET_LAMBDA_ARN");
_targetLambdaCallBeforeSec = int.TryParse(Environment.GetEnvironmentVariable("TARGET_LAMBDA_CALL_BEFORE_SEC"), out int result) ? result : 0;
_stateMachineNamePrefix = Environment.GetEnvironmentVariable("STATE_MACHINE_NAME_PREFIX") ?? "TimerStateMachine";
_statMachineRoleArn = Environment.GetEnvironmentVariable("STATE_MACHINE_ROLE_ARN");
}
public async Task<object> FunctionHandler(Input input, ILambdaContext context)
{
context.Logger.LogLine($"FunctionHandler: {JsonSerializer.Serialize(input)}");
var inputObject = input;
if (inputObject == null)
{
throw new ArgumentException("Input cannot be null");
}
var waitTime = inputObject.RunAt.Subtract(DateTime.UtcNow);
context.Logger.LogLine($"WaitTime: {waitTime}");
if (waitTime < TimeSpan.Zero)
{
throw new ArgumentException("RunAt time cannot be in the past");
}
if (string.IsNullOrEmpty(_targetLambdaArn))
{
throw new ArgumentException("TARGET_LAMBDA_ARN environment variable not found");
}
var createStateMachineRequest = new CreateStateMachineRequest
{
Name = $"{_stateMachineNamePrefix}-{Guid.NewGuid()}",
Definition = GetStateMachineDefinition(inputObject.RunAt, _targetLambdaArn, _targetLambdaCallBeforeSec, inputObject.Payload ?? "{}"),
RoleArn = _statMachineRoleArn
};
context.Logger.LogLine($"CreateStateMachineRequest: {JsonSerializer.Serialize(createStateMachineRequest)}");
var createStateMachineResult = await _stepFunctionsClient.CreateStateMachineAsync(createStateMachineRequest);
var startExecutionRequest = new StartExecutionRequest
{
StateMachineArn = createStateMachineResult.StateMachineArn
};
return await _stepFunctionsClient.StartExecutionAsync(startExecutionRequest);
}
private static string GetStateMachineDefinition(DateTime runAtUtc, string targetLambdaArn, int targetLambdaCallBeforeSec, string payloadJson)
{
string waitUntil = runAtUtc.Subtract(TimeSpan.FromSeconds(targetLambdaCallBeforeSec)).ToString("yyyy-MM-ddTHH:mm:ssZ");
string exactRunAt = runAtUtc.ToString("yyyy-MM-ddTHH:mm:ssZ");
var stateMachineDefinition = new JsonObject
{
["Comment"] = "Timer Definition",
["StartAt"] = "Wait",
["States"] = new JsonObject
{
["Wait"] = new JsonObject
{
["Type"] = "Wait",
["Next"] = "Lambda Invoke",
["Timestamp"] = waitUntil
},
["Lambda Invoke"] = new JsonObject
{
["Type"] = "Task",
["Resource"] = "arn:aws:states:::lambda:invoke",
["OutputPath"] = "$.Payload",
["Parameters"] = new JsonObject
{
["FunctionName"] = targetLambdaArn,
["Payload"] = new JsonObject
{
["RunAt"] = exactRunAt,
["Payload"] = payloadJson
}
},
["End"] = true
}
}
};
return stateMachineDefinition.ToJsonString();
}
public sealed class Input
{
public DateTime RunAt { get; set; }
public string? Payload { get; set; }
}
}
In the constructor, we create a client to interact with AWS Step Functions and also read environment variables used for service configuration:
TARGET_LAMBDA_ARN
contains the AWS Lambda ARN that will be invoked after the timer expires.STATE_MACHINE_NAME_PREFIX
contains the prefix for the state machine name that will be created for each timer. With this prefix and a unique identifier, we can ensure the uniqueness of the state machine name.STATE_MACHINE_ROLE_ARN
contains the ARN of the role that will be used to create the state machine. This role must have permission to invoke the target Lambda.
The main work is located in the FunctionHandler
method, which receives a request to create a timer, validates it, and creates a new state machine for each of them.
The GetStateMachineDefinition
method generates the definition of the state machine for each timer. This definition contains two states: Wait
and Lambda Invoke
. The first state handles waiting until the timer expiration time, and the second state handles invoking the final AWS Lambda. In the AWS console, it will appear as follows:
Callback Executor Lambda
Let’s consider a possible implementation approach for the Executor Lambda, which is invoked after the timer expires. This lambda can perform various actions such as sending a message, updating a database, invoking another lambda, or calling a REST API.
using Amazon.Lambda.Core;
using System.Text.Json.Nodes;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace ExecutorLambda;
public class Function
{
public void FunctionHandler(JsonObject input, ILambdaContext context)
{
context.Logger.LogLine($"Entry Time: {DateTime.UtcNow}");
context.Logger.LogLine($"Input: {input}");
var runAt = input.TryGetPropertyValue("RunAt", out var runAtValue) ? runAtValue!.GetValue<DateTime>() : DateTime.MinValue;
TimeSpan delay = runAt - DateTime.UtcNow;
// If the delay is negative, the time has already passed
if (delay.TotalMilliseconds > 0)
{
// Wait for the delay to run at the exact time
Task.Delay(delay).Wait();
context.Logger.LogLine($"Run Time: {DateTime.UtcNow}");
// Here you can do anything you want with the input
// For example, you can call another Lambda function
// or call a REST API
// or call a database
}
}
}
CDK
Let’s leverage CDK to describe and automate the creation of all the necessary resources we need:
using Amazon.CDK;
using Amazon.CDK.AWS.Lambda;
using System.Collections.Generic;
using Constructs;
using Amazon.CDK.AWS.IAM;
namespace Cdk
{
public class TimerCdkStack : Stack
{
internal TimerCdkStack(Construct scope, string id, IStackProps props = null) : base(scope, id, props)
{
var stateMachineNamePrefix = new CfnParameter(this, "StateMachineNamePrefix", new CfnParameterProps
{
Type = "String",
Description = "The prefix for the state machine name",
Default = "TimerStateMachine"
});
var targetLambdaBeforeSec = new CfnParameter(this, "TargetLambdaBeforeSec", new CfnParameterProps
{
Type = "Number",
Description = "The number of seconds before the exact time the target lambda is called to handle cold start",
Default = 10
});
#region Executor Callback Lambda
// Extend this role to allow the Executor Lambda to call other resources
var executorLambdaRole = new Role(this, "ExecutorLambdaRole", new RoleProps
{
AssumedBy = new ServicePrincipal("lambda.amazonaws.com"),
ManagedPolicies =
[
ManagedPolicy.FromAwsManagedPolicyName("service-role/AWSLambdaBasicExecutionRole")
]
});
var executorLambda = new Function(this, "ExecutorLambda", new FunctionProps
{
Runtime = Runtime.DOTNET_8,
Handler = "ExecutorLambda::ExecutorLambda.Function::FunctionHandler",
Code = Code.FromAsset("../src/ExecutorLambda/bin/Release/net8.0/linux-arm64/publish/"),
Architecture = Architecture.ARM_64,
MemorySize = 256,
Timeout = Duration.Minutes(2),
Role = executorLambdaRole
});
#endregion
#region Timer Lambda
var stateMachineRole = new Role(this, "StateMachineRole", new RoleProps
{
AssumedBy = new ServicePrincipal("states.amazonaws.com"),
ManagedPolicies =
[
ManagedPolicy.FromAwsManagedPolicyName("service-role/AWSLambdaBasicExecutionRole")
],
InlinePolicies = new Dictionary<string, PolicyDocument>
{
["StepFunctionPolicy"] = new PolicyDocument(new PolicyDocumentProps
{
Statements = new[]
{
new PolicyStatement(new PolicyStatementProps
{
Actions = new[] { "lambda:InvokeFunction" },
Resources = executorLambda.ResourceArnsForGrantInvoke
})
}
})
}
});
var timerLambdaRole = new Role(this, "TimerLambdaRole", new RoleProps
{
AssumedBy = new ServicePrincipal("lambda.amazonaws.com"),
ManagedPolicies = new IManagedPolicy[]
{
ManagedPolicy.FromAwsManagedPolicyName("service-role/AWSLambdaBasicExecutionRole")
},
InlinePolicies = new Dictionary<string, PolicyDocument>
{
["StepFunctionPolicy"] = new PolicyDocument(new PolicyDocumentProps
{
Statements =
[
new PolicyStatement(new PolicyStatementProps
{
Actions = new[] { "states:CreateStateMachine", "states:StartExecution" },
Resources = new[] { "*" }
}),
new PolicyStatement(new PolicyStatementProps
{
Actions = new[] { "iam:PassRole" },
Resources = new[] { stateMachineRole.RoleArn }
})
]
})
}
});
_ = new Function(this, "TimerLambda", new FunctionProps
{
Runtime = Runtime.DOTNET_8,
Handler = "TimerLambda::TimerLambda.Function::FunctionHandler",
Code = Code.FromAsset("../src/TimerLambda/bin/Release/net8.0/linux-arm64/publish/"),
Architecture = Architecture.ARM_64,
MemorySize = 256,
Timeout = Duration.Minutes(2),
Environment = new Dictionary<string, string>
{
["TARGET_LAMBDA_ARN"] = executorLambda.FunctionArn,
["TARGET_LAMBDA_CALL_BEFORE_SEC"] = targetLambdaBeforeSec.ValueAsString,
["STATE_MACHINE_ROLE_ARN"] = stateMachineRole.RoleArn,
["STATE_MACHINE_NAME_PREFIX"] = stateMachineNamePrefix.ValueAsString,
},
Role = timerLambdaRole,
});
#endregion
}
}
}
How to deploy
You can deploy this solution using the following commands:
cd src
dotnet publish -c Release -r linux-arm64
cd ../cdk/src
cdk deploy --parameters StateMachineNamePrefix=MyTimerStateMachine