Timer with Step Functions and AWS Lambda (.NET8)
Imagine you’re building an online auction app on AWS. A key feature is a timer that counts down to the end of bidding. This timer needs to be accurate, reliable, and scalable. Here’s how you can set it up using AWS Step Functions and AWS Lambda.
How It Works
We’ll use AWS Lambda to handle requests for new timers, AWS Step Functions to keep track of the timers, and another AWS Lambda to do something when the timer ends.
Timer Scheduler Lambda
This AWS Lambda creates new timers. It receives requests to start a timer and sets up a State Machine for each one. The main thing it needs is RunAt, which is when the action should happen.
using Amazon.Lambda.Core;
using Amazon.StepFunctions;
using Amazon.StepFunctions.Model;
using System.Text.Json;
using System.Text.Json.Nodes;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace TimerLambda;
/// <summary>
/// Timer scheduler Lambda. For every request it creates a dedicated Step Functions state machine
/// that waits until shortly before <see cref="Input.RunAt"/> and then invokes the target Lambda,
/// and immediately starts an execution of that state machine.
/// </summary>
public class Function
{
    // Layout used for the Wait state's "Timestamp" and for the RunAt value forwarded to the target.
    // "T" and "Z" are literal characters here, so the value being formatted must already be UTC.
    private const string TimestampFormat = "yyyy-MM-ddTHH:mm:ssZ";

    private readonly IAmazonStepFunctions _stepFunctionsClient;
    private readonly string? _targetLambdaArn;
    private readonly int _targetLambdaCallBeforeSec;
    private readonly string? _stateMachineRoleArn;
    private readonly string _stateMachineNamePrefix;

    /// <summary>
    /// Creates the Step Functions client and reads the configuration from environment variables.
    /// TARGET_LAMBDA_CALL_BEFORE_SEC falls back to 0 and STATE_MACHINE_NAME_PREFIX to "TimerStateMachine".
    /// </summary>
    public Function()
    {
        _stepFunctionsClient = new AmazonStepFunctionsClient();
        _targetLambdaArn = Environment.GetEnvironmentVariable("TARGET_LAMBDA_ARN");

        // Parse with the invariant culture so the setting is read the same way on every host.
        _targetLambdaCallBeforeSec = int.TryParse(
            Environment.GetEnvironmentVariable("TARGET_LAMBDA_CALL_BEFORE_SEC"),
            System.Globalization.NumberStyles.Integer,
            System.Globalization.CultureInfo.InvariantCulture,
            out int result)
            ? result
            : 0;

        _stateMachineNamePrefix = Environment.GetEnvironmentVariable("STATE_MACHINE_NAME_PREFIX") ?? "TimerStateMachine";
        _stateMachineRoleArn = Environment.GetEnvironmentVariable("STATE_MACHINE_ROLE_ARN");
    }

    /// <summary>
    /// Validates the request, creates a state machine for the timer and starts its execution.
    /// </summary>
    /// <param name="input">The timer request: when to fire and an optional payload string.</param>
    /// <param name="context">Lambda context, used for logging.</param>
    /// <returns>The response of the StartExecution call.</returns>
    /// <exception cref="ArgumentException">
    /// The input is null, RunAt is in the past, or a required environment variable is missing.
    /// </exception>
    public async Task<object> FunctionHandler(Input input, ILambdaContext context)
    {
        context.Logger.LogLine($"FunctionHandler: {JsonSerializer.Serialize(input)}");
        if (input == null)
        {
            throw new ArgumentException("Input cannot be null");
        }

        // Normalize first: the comparison with UtcNow and the "Z"-suffixed timestamps are only
        // correct for a UTC value.
        DateTime runAtUtc = NormalizeToUtc(input.RunAt);

        var waitTime = runAtUtc.Subtract(DateTime.UtcNow);
        context.Logger.LogLine($"WaitTime: {waitTime}");
        if (waitTime < TimeSpan.Zero)
        {
            throw new ArgumentException("RunAt time cannot be in the past");
        }

        if (string.IsNullOrEmpty(_targetLambdaArn))
        {
            throw new ArgumentException("TARGET_LAMBDA_ARN environment variable not found");
        }

        // Fail fast with a clear message instead of letting the service reject a request without a role.
        if (string.IsNullOrEmpty(_stateMachineRoleArn))
        {
            throw new ArgumentException("STATE_MACHINE_ROLE_ARN environment variable not found");
        }

        var createStateMachineRequest = new CreateStateMachineRequest
        {
            // A fresh GUID per request keeps the state machine names unique.
            Name = $"{_stateMachineNamePrefix}-{Guid.NewGuid()}",
            Definition = GetStateMachineDefinition(runAtUtc, _targetLambdaArn, _targetLambdaCallBeforeSec, input.Payload ?? "{}"),
            RoleArn = _stateMachineRoleArn
        };
        context.Logger.LogLine($"CreateStateMachineRequest: {JsonSerializer.Serialize(createStateMachineRequest)}");

        var createStateMachineResult = await _stepFunctionsClient.CreateStateMachineAsync(createStateMachineRequest);
        var startExecutionRequest = new StartExecutionRequest
        {
            StateMachineArn = createStateMachineResult.StateMachineArn
        };
        return await _stepFunctionsClient.StartExecutionAsync(startExecutionRequest);
    }

    /// <summary>
    /// Returns <paramref name="value"/> as a UTC <see cref="DateTime"/>. Local values are converted;
    /// values without a kind are taken to be UTC already, which matches how they were used before.
    /// </summary>
    private static DateTime NormalizeToUtc(DateTime value) => value.Kind switch
    {
        DateTimeKind.Utc => value,
        DateTimeKind.Local => value.ToUniversalTime(),
        _ => DateTime.SpecifyKind(value, DateTimeKind.Utc),
    };

    /// <summary>
    /// Builds the state machine definition JSON: a "Wait" state followed by a "Lambda Invoke" task.
    /// </summary>
    /// <param name="runAtUtc">The moment the timer should fire, in UTC.</param>
    /// <param name="targetLambdaArn">ARN of the Lambda function the task state invokes.</param>
    /// <param name="targetLambdaCallBeforeSec">Seconds subtracted from <paramref name="runAtUtc"/> to get the wake-up time.</param>
    /// <param name="payloadJson">Caller payload, forwarded to the target as a string value.</param>
    /// <returns>The definition serialized as a JSON string.</returns>
    private static string GetStateMachineDefinition(DateTime runAtUtc, string targetLambdaArn, int targetLambdaCallBeforeSec, string payloadJson)
    {
        // Invariant culture: ":" in a custom format string is the culture's time separator.
        var invariant = System.Globalization.CultureInfo.InvariantCulture;
        string waitUntil = runAtUtc.Subtract(TimeSpan.FromSeconds(targetLambdaCallBeforeSec)).ToString(TimestampFormat, invariant);
        string exactRunAt = runAtUtc.ToString(TimestampFormat, invariant);

        var stateMachineDefinition = new JsonObject
        {
            ["Comment"] = "Timer Definition",
            ["StartAt"] = "Wait",
            ["States"] = new JsonObject
            {
                ["Wait"] = new JsonObject
                {
                    ["Type"] = "Wait",
                    ["Next"] = "Lambda Invoke",
                    ["Timestamp"] = waitUntil
                },
                ["Lambda Invoke"] = new JsonObject
                {
                    ["Type"] = "Task",
                    ["Resource"] = "arn:aws:states:::lambda:invoke",
                    ["OutputPath"] = "$.Payload",
                    ["Parameters"] = new JsonObject
                    {
                        ["FunctionName"] = targetLambdaArn,
                        ["Payload"] = new JsonObject
                        {
                            // The exact fire time travels with the payload so the target can see it.
                            ["RunAt"] = exactRunAt,
                            ["Payload"] = payloadJson
                        }
                    },
                    ["End"] = true
                }
            }
        };
        return stateMachineDefinition.ToJsonString();
    }

    /// <summary>Request accepted by <see cref="FunctionHandler"/>.</summary>
    public sealed class Input
    {
        /// <summary>When the timer should fire.</summary>
        public DateTime RunAt { get; set; }

        /// <summary>Optional payload forwarded to the target Lambda; "{}" is used when it is null.</summary>
        public string? Payload { get; set; }
    }
}
In the setup, we connect to AWS Step Functions and read some settings:
- TARGET_LAMBDA_ARN: The AWS Lambda to call when the timer ends.
- TARGET_LAMBDA_CALL_BEFORE_SEC: How many seconds before RunAt the target Lambda is invoked, to leave room for a cold start (defaults to 0).
- STATE_MACHINE_NAME_PREFIX: A prefix for naming each timer’s state machine.
- STATE_MACHINE_ROLE_ARN: The role that lets the state machine call the target Lambda.
The FunctionHandler method is where the action happens. It checks the request, makes sure it’s valid, and sets up a new state machine.
The GetStateMachineDefinition method creates the state machine’s plan. It has two parts: Wait (waits until the timer ends) and Lambda Invoke (calls the final AWS Lambda).
Callback Executor Lambda
This Lambda runs after the timer ends. It can do things like send a message, update a database, call another Lambda, or hit a REST API.
using Amazon.Lambda.Core;
using System.Text.Json.Nodes;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace ExecutorLambda;
/// <summary>
/// Callback executor Lambda. It receives a JSON object carrying a "RunAt" timestamp, waits until
/// that moment if it has not arrived yet, and then runs the callback action.
/// </summary>
public class Function
{
    /// <summary>
    /// Handles one timer callback.
    /// </summary>
    /// <param name="input">The invocation payload; its "RunAt" property is the intended run time.</param>
    /// <param name="context">Lambda context, used for logging.</param>
    /// <exception cref="ArgumentNullException"><paramref name="input"/> is null.</exception>
    public void FunctionHandler(JsonObject input, ILambdaContext context)
    {
        context.Logger.LogLine($"Entry Time: {DateTime.UtcNow}");
        context.Logger.LogLine($"Input: {input}");
        ArgumentNullException.ThrowIfNull(input);

        // A missing or JSON-null RunAt falls back to MinValue, i.e. "run immediately".
        DateTime runAt = input.TryGetPropertyValue("RunAt", out var runAtValue) && runAtValue is not null
            ? runAtValue.GetValue<DateTime>()
            : DateTime.MinValue;

        // The subtraction below ignores DateTime.Kind, so bring local values to UTC first.
        if (runAt.Kind == DateTimeKind.Local)
        {
            runAt = runAt.ToUniversalTime();
        }

        TimeSpan delay = runAt - DateTime.UtcNow;

        // Only wait when RunAt is still ahead; a non-positive delay means the time has already passed.
        if (delay > TimeSpan.Zero)
        {
            // Blocking wait: the handler signature is synchronous and is kept that way for compatibility.
            Task.Delay(delay).Wait();
        }

        // The callback runs whether or not a wait was needed, so a late invocation still fires.
        context.Logger.LogLine($"Run Time: {DateTime.UtcNow}");
        // Here you can do anything you want with the input
        // For example, you can call another Lambda function
        // or call a REST API
        // or call a database
    }
}
Using CDK
We’ll use CDK to set up everything we need automatically:
using Amazon.CDK;
using Amazon.CDK.AWS.Lambda;
using System.Collections.Generic;
using Constructs;
using Amazon.CDK.AWS.IAM;
namespace Cdk
{
    /// <summary>
    /// CDK stack for the timer solution: the executor (callback) Lambda, the role assumed by the
    /// per-timer state machines, and the timer Lambda that creates and starts those state machines.
    /// </summary>
    public class TimerCdkStack : Stack
    {
        /// <summary>
        /// Defines the stack's parameters, roles and functions.
        /// </summary>
        /// <param name="scope">Parent construct.</param>
        /// <param name="id">Construct id of the stack.</param>
        /// <param name="props">Optional stack properties.</param>
        internal TimerCdkStack(Construct scope, string id, IStackProps props = null) : base(scope, id, props)
        {
            var stateMachineNamePrefix = new CfnParameter(this, "StateMachineNamePrefix", new CfnParameterProps
            {
                Type = "String",
                Description = "The prefix for the state machine name",
                Default = "TimerStateMachine"
            });

            var targetLambdaBeforeSec = new CfnParameter(this, "TargetLambdaBeforeSec", new CfnParameterProps
            {
                Type = "Number",
                Description = "The number of seconds before the exact time the target lambda is called to handle cold start",
                Default = 10
            });

            #region Executor Callback Lambda

            // Extend this role to allow the Executor Lambda to call other resources
            var executorLambdaRole = new Role(this, "ExecutorLambdaRole", new RoleProps
            {
                AssumedBy = new ServicePrincipal("lambda.amazonaws.com"),
                ManagedPolicies =
                [
                    ManagedPolicy.FromAwsManagedPolicyName("service-role/AWSLambdaBasicExecutionRole")
                ]
            });

            var executorLambda = new Function(this, "ExecutorLambda", new FunctionProps
            {
                Runtime = Runtime.DOTNET_8,
                Handler = "ExecutorLambda::ExecutorLambda.Function::FunctionHandler",
                Code = Code.FromAsset("../src/ExecutorLambda/bin/Release/net8.0/linux-arm64/publish/"),
                Architecture = Architecture.ARM_64,
                MemorySize = 256,
                Timeout = Duration.Minutes(2),
                Role = executorLambdaRole
            });

            #endregion

            #region Timer Lambda

            // Role assumed by Step Functions. It only needs to invoke the executor Lambda, so no
            // managed policy is attached: AWSLambdaBasicExecutionRole is a logging policy meant for
            // Lambda execution roles and grants nothing this role uses.
            var stateMachineRole = new Role(this, "StateMachineRole", new RoleProps
            {
                AssumedBy = new ServicePrincipal("states.amazonaws.com"),
                InlinePolicies = new Dictionary<string, PolicyDocument>
                {
                    ["StepFunctionPolicy"] = new PolicyDocument(new PolicyDocumentProps
                    {
                        Statements =
                        [
                            new PolicyStatement(new PolicyStatementProps
                            {
                                Actions = ["lambda:InvokeFunction"],
                                Resources = executorLambda.ResourceArnsForGrantInvoke
                            })
                        ]
                    })
                }
            });

            // Role of the timer Lambda: basic logging, creating/starting state machines, and passing
            // the state machine role to Step Functions.
            var timerLambdaRole = new Role(this, "TimerLambdaRole", new RoleProps
            {
                AssumedBy = new ServicePrincipal("lambda.amazonaws.com"),
                ManagedPolicies =
                [
                    ManagedPolicy.FromAwsManagedPolicyName("service-role/AWSLambdaBasicExecutionRole")
                ],
                InlinePolicies = new Dictionary<string, PolicyDocument>
                {
                    ["StepFunctionPolicy"] = new PolicyDocument(new PolicyDocumentProps
                    {
                        Statements =
                        [
                            new PolicyStatement(new PolicyStatementProps
                            {
                                Actions = ["states:CreateStateMachine", "states:StartExecution"],
                                // NOTE(review): "*" is broad; consider scoping to state machines
                                // whose name starts with the configured prefix.
                                Resources = ["*"]
                            }),
                            new PolicyStatement(new PolicyStatementProps
                            {
                                Actions = ["iam:PassRole"],
                                Resources = [stateMachineRole.RoleArn]
                            })
                        ]
                    })
                }
            });

            _ = new Function(this, "TimerLambda", new FunctionProps
            {
                Runtime = Runtime.DOTNET_8,
                Handler = "TimerLambda::TimerLambda.Function::FunctionHandler",
                Code = Code.FromAsset("../src/TimerLambda/bin/Release/net8.0/linux-arm64/publish/"),
                Architecture = Architecture.ARM_64,
                MemorySize = 256,
                Timeout = Duration.Minutes(2),
                // Settings the timer Lambda reads from its environment.
                Environment = new Dictionary<string, string>
                {
                    ["TARGET_LAMBDA_ARN"] = executorLambda.FunctionArn,
                    ["TARGET_LAMBDA_CALL_BEFORE_SEC"] = targetLambdaBeforeSec.ValueAsString,
                    ["STATE_MACHINE_ROLE_ARN"] = stateMachineRole.RoleArn,
                    ["STATE_MACHINE_NAME_PREFIX"] = stateMachineNamePrefix.ValueAsString,
                },
                Role = timerLambdaRole,
            });

            #endregion
        }
    }
}
How to deploy
You can deploy this solution using the following commands:
cd src
dotnet publish -c Release -r linux-arm64
cd ../cdk/src
cdk deploy --parameters StateMachineNamePrefix=MyTimerStateMachine