Temporizador con Step Functions y AWS Lambda (.NET8)

Temporizador con Step Functions y AWS Lambda (.NET8)

Amazon Web Services Microsoft .NET AWS Lambda AWS Step Functions

Imagina que estás desarrollando una aplicación de subastas en línea en AWS. Obviamente, la funcionalidad principal será un temporizador que cuenta el tiempo hasta el final de la subasta. Este temporizador debe ser preciso, confiable y escalable. Consideremos una posible implementación de dicho temporizador utilizando AWS Step Functions y AWS Lambda.

Arquitectura

Diagrama de arquitectura simple del temporizador con Step Functions y Lambdas de AWS
Diagrama de arquitectura simple del temporizador con Step Functions y Lambdas de AWS

Implementamos AWS Lambda para manejar las solicitudes de creación de temporizadores, AWS Step Functions para administrar el estado de los temporizadores y otra AWS Lambda para ejecutar la acción después de que expire el temporizador.

Lambda de programación de temporizador

Esta AWS Lambda es responsable de crear nuevos temporizadores. Recibe solicitudes de creación de temporizadores y crea una Máquina de Estados basada en ellas. El parámetro principal es RunAt, que representa el momento exacto de la acción.

using Amazon.Lambda.Core;
using Amazon.StepFunctions;
using Amazon.StepFunctions.Model;
using System.Text.Json;
using System.Text.Json.Nodes;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace TimerLambda;

public class Function
{
    private readonly IAmazonStepFunctions _stepFunctionsClient;
    private readonly string? _targetLambdaArn;
    private readonly int _targetLambdaCallBeforeSec;
    private readonly string? _statMachineRoleArn;
    private readonly string _stateMachineNamePrefix;

    public Function()
    {
        _stepFunctionsClient = new AmazonStepFunctionsClient();
        _targetLambdaArn = Environment.GetEnvironmentVariable("TARGET_LAMBDA_ARN");
        _targetLambdaCallBeforeSec = int.TryParse(Environment.GetEnvironmentVariable("TARGET_LAMBDA_CALL_BEFORE_SEC"), out int result) ? result : 0;
        _stateMachineNamePrefix = Environment.GetEnvironmentVariable("STATE_MACHINE_NAME_PREFIX") ?? "TimerStateMachine";
        _statMachineRoleArn = Environment.GetEnvironmentVariable("STATE_MACHINE_ROLE_ARN");
    }

    public async Task<object> FunctionHandler(Input input, ILambdaContext context)
    {
        context.Logger.LogLine($"FunctionHandler: {JsonSerializer.Serialize(input)}");

        var inputObject = input;
        if (inputObject == null)
        {
            throw new ArgumentException("Input cannot be null");
        }

        var waitTime = inputObject.RunAt.Subtract(DateTime.UtcNow);
        context.Logger.LogLine($"WaitTime: {waitTime}");
        if (waitTime < TimeSpan.Zero)
        {
            throw new ArgumentException("RunAt time cannot be in the past");
        }

        if (string.IsNullOrEmpty(_targetLambdaArn))
        {
            throw new ArgumentException("TARGET_LAMBDA_ARN environment variable not found");
        }

        var createStateMachineRequest = new CreateStateMachineRequest
        {
            Name = $"{_stateMachineNamePrefix}-{Guid.NewGuid()}",
            Definition = GetStateMachineDefinition(inputObject.RunAt, _targetLambdaArn, _targetLambdaCallBeforeSec, inputObject.Payload ?? "{}"),
            RoleArn = _statMachineRoleArn
        };

        context.Logger.LogLine($"CreateStateMachineRequest: {JsonSerializer.Serialize(createStateMachineRequest)}");

        var createStateMachineResult = await _stepFunctionsClient.CreateStateMachineAsync(createStateMachineRequest);
        var startExecutionRequest = new StartExecutionRequest
        {
            StateMachineArn = createStateMachineResult.StateMachineArn
        };

        return await _stepFunctionsClient.StartExecutionAsync(startExecutionRequest);
    }

    private static string GetStateMachineDefinition(DateTime runAtUtc, string targetLambdaArn, int targetLambdaCallBeforeSec, string payloadJson)
    {
        string waitUntil = runAtUtc.Subtract(TimeSpan.FromSeconds(targetLambdaCallBeforeSec)).ToString("yyyy-MM-ddTHH:mm:ssZ");
        string exactRunAt = runAtUtc.ToString("yyyy-MM-ddTHH:mm:ssZ");
        var stateMachineDefinition = new JsonObject
        {
            ["Comment"] = "Timer Definition",
            ["StartAt"] = "Wait",
            ["States"] = new JsonObject
            {
                ["Wait"] = new JsonObject
                {
                    ["Type"] = "Wait",
                    ["Next"] = "Lambda Invoke",
                    ["Timestamp"] = waitUntil
                },
                ["Lambda Invoke"] = new JsonObject
                {
                    ["Type"] = "Task",
                    ["Resource"] = "arn:aws:states:::lambda:invoke",
                    ["OutputPath"] = "$.Payload",
                    ["Parameters"] = new JsonObject
                    {
                        ["FunctionName"] = targetLambdaArn,
                        ["Payload"] = new JsonObject
                        {
                            ["RunAt"] = exactRunAt,
                            ["Payload"] = payloadJson
                        }
                    },
                    ["End"] = true
                }
            }
        };

        return stateMachineDefinition.ToJsonString();
    }

    public sealed class Input
    {
        public DateTime RunAt { get; set; }
        public string? Payload { get; set; }
    }
}

En el constructor, creamos un cliente para interactuar con AWS Step Functions y también leemos variables de entorno utilizadas para la configuración del servicio:

  • TARGET_LAMBDA_ARN contiene el ARN de AWS Lambda que se invocará después de que expire el temporizador.
  • STATE_MACHINE_NAME_PREFIX contiene el prefijo para el nombre de la máquina de estados que se creará para cada temporizador. Con este prefijo y un identificador único, podemos garantizar la unicidad del nombre de la máquina de estados.
  • STATE_MACHINE_ROLE_ARN contiene el ARN del rol que se utilizará para crear la máquina de estados. Este rol debe tener permisos para invocar la Lambda objetivo.

El trabajo principal se encuentra en el método FunctionHandler, que recibe una solicitud para crear un temporizador, la valida y crea una nueva máquina de estados para cada uno de ellos.

El método GetStateMachineDefinition genera la definición de la máquina de estados para cada temporizador. Esta definición contiene dos estados: Wait y Lambda Invoke. El primer estado se encarga de esperar hasta que expire el temporizador, y el segundo estado se encarga de invocar la Lambda final. En la consola de AWS, se verá de la siguiente manera:

Ejemplo de máquina de estados
Diagrama de arquitectura simple del temporizador con Step Functions y Lambdas de AWS

Lambda Ejecutor de Callback

Consideremos un enfoque de implementación posible para el Lambda Ejecutor, que se invoca después de que el temporizador expire. Este lambda puede realizar diversas acciones, como enviar un mensaje, actualizar una base de datos, invocar otro lambda o llamar a una API REST.

using Amazon.Lambda.Core;
using System.Text.Json.Nodes;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace ExecutorLambda;

public class Function
{
    public void FunctionHandler(JsonObject input, ILambdaContext context)
    {
        context.Logger.LogLine($"Entry Time: {DateTime.UtcNow}");
        context.Logger.LogLine($"Input: {input}");

        var runAt = input.TryGetPropertyValue("RunAt", out var runAtValue) ? runAtValue!.GetValue<DateTime>() : DateTime.MinValue;
        TimeSpan delay = runAt - DateTime.UtcNow;

        // If the delay is negative, the time has already passed
        if (delay.TotalMilliseconds > 0)
        {
            // Wait for the delay to run at the exact time
            Task.Delay(delay).Wait();
            context.Logger.LogLine($"Run Time: {DateTime.UtcNow}");

            // Here you can do anything you want with the input
            // For example, you can call another Lambda function
            // or call a REST API
            // or call a database
        }
    }
}

CDK

Aprovechemos CDK para describir y automatizar la creación de todos los recursos necesarios que necesitamos:

using Amazon.CDK;
using Amazon.CDK.AWS.Lambda;
using System.Collections.Generic;
using Constructs;
using Amazon.CDK.AWS.IAM;

namespace Cdk
{
    public class TimerCdkStack : Stack
    {
        internal TimerCdkStack(Construct scope, string id, IStackProps props = null) : base(scope, id, props)
        {
            var stateMachineNamePrefix = new CfnParameter(this, "StateMachineNamePrefix", new CfnParameterProps
            {
                Type = "String",
                Description = "The prefix for the state machine name",
                Default = "TimerStateMachine"
            });

            var targetLambdaBeforeSec = new CfnParameter(this, "TargetLambdaBeforeSec", new CfnParameterProps
            {
                Type = "Number",
                Description = "The number of seconds before the exact time the target lambda is called to handle cold start",
                Default = 10
            });

            #region Executor Callback Lambda

            // Extend this role to allow the Executor Lambda to call other resources
            var executorLambdaRole = new Role(this, "ExecutorLambdaRole", new RoleProps
            {
                AssumedBy = new ServicePrincipal("lambda.amazonaws.com"),
                ManagedPolicies =
                [
                    ManagedPolicy.FromAwsManagedPolicyName("service-role/AWSLambdaBasicExecutionRole")
                ]
            });

            var executorLambda = new Function(this, "ExecutorLambda", new FunctionProps
            {
                Runtime = Runtime.DOTNET_8,
                Handler = "ExecutorLambda::ExecutorLambda.Function::FunctionHandler",
                Code = Code.FromAsset("../src/ExecutorLambda/bin/Release/net8.0/linux-arm64/publish/"),
                Architecture = Architecture.ARM_64,
                MemorySize = 256,
                Timeout = Duration.Minutes(2),
                Role = executorLambdaRole
            });

            #endregion

            #region Timer Lambda

            var stateMachineRole = new Role(this, "StateMachineRole", new RoleProps
            {
                AssumedBy = new ServicePrincipal("states.amazonaws.com"),
                ManagedPolicies =
                [
                    ManagedPolicy.FromAwsManagedPolicyName("service-role/AWSLambdaBasicExecutionRole")
                ],
                InlinePolicies = new Dictionary<string, PolicyDocument>
                {
                    ["StepFunctionPolicy"] = new PolicyDocument(new PolicyDocumentProps
                    {
                        Statements = new[]
                        {
                            new PolicyStatement(new PolicyStatementProps
                            {
                                Actions = new[] { "lambda:InvokeFunction" },
                                Resources = executorLambda.ResourceArnsForGrantInvoke
                            })
                        }
                    })
                }
            });

            var timerLambdaRole = new Role(this, "TimerLambdaRole", new RoleProps
            {
                AssumedBy = new ServicePrincipal("lambda.amazonaws.com"),
                ManagedPolicies = new IManagedPolicy[]
                {
                    ManagedPolicy.FromAwsManagedPolicyName("service-role/AWSLambdaBasicExecutionRole")
                },
                InlinePolicies = new Dictionary<string, PolicyDocument>
                {
                    ["StepFunctionPolicy"] = new PolicyDocument(new PolicyDocumentProps
                    {
                        Statements =
                        [
                            new PolicyStatement(new PolicyStatementProps
                            {
                                Actions = new[] { "states:CreateStateMachine", "states:StartExecution" },
                                Resources = new[] { "*" }
                            }),
                            new PolicyStatement(new PolicyStatementProps
                            {
                                Actions = new[] { "iam:PassRole" },
                                Resources = new[] { stateMachineRole.RoleArn }
                            })
                        ]
                    })
                }
            });

            _ = new Function(this, "TimerLambda", new FunctionProps
            {
                Runtime = Runtime.DOTNET_8,
                Handler = "TimerLambda::TimerLambda.Function::FunctionHandler",
                Code = Code.FromAsset("../src/TimerLambda/bin/Release/net8.0/linux-arm64/publish/"),
                Architecture = Architecture.ARM_64,
                MemorySize = 256,
                Timeout = Duration.Minutes(2),
                Environment = new Dictionary<string, string>
                {
                    ["TARGET_LAMBDA_ARN"] = executorLambda.FunctionArn,
                    ["TARGET_LAMBDA_CALL_BEFORE_SEC"] = targetLambdaBeforeSec.ValueAsString,
                    ["STATE_MACHINE_ROLE_ARN"] = stateMachineRole.RoleArn,
                    ["STATE_MACHINE_NAME_PREFIX"] = stateMachineNamePrefix.ValueAsString,
                },
                Role = timerLambdaRole,
            });

            #endregion
        }
    }
}
ℹ️
Por favor, ten en cuenta que elegimos arm64 (procesador AWS Graviton2) para nuestras Lambdas de AWS porque son más económicas y rápidas en comparación con x86-64. Más detalles disponibles aquí.

Cómo desplegar

Puedes desplegar esta solución utilizando los siguientes comandos:

cd src
dotnet publish -c Release -r linux-arm64

cd ../cdk/src
cdk deploy --parameters StateMachineNamePrefix=MyTimerStateMachine