Timer with Step Functions and AWS Lambda (.NET 8)

Imagine you are building an online auction application on AWS. A key feature is a timer that counts down to the end of the auction. The timer needs to be precise, reliable, and scalable. This post shows how to set one up with AWS Step Functions and AWS Lambda.

How It Works

Figure: Architecture diagram of a simple timer with Step Functions and Lambda functions

We will use one AWS Lambda function to handle requests for new timers, AWS Step Functions to track each timer, and a second AWS Lambda function to run an action when the timer expires.

Timer Scheduler Lambda

This AWS Lambda function creates new timers. It receives a request to start a timer and creates a state machine for it. The main input it needs is RunAt, the UTC time at which the action should run.
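
As a rough illustration, a request to this Lambda might look like the following. The property names match the Input class in the code below; the timestamp and the auctionId field are hypothetical values chosen only for this example.

```json
{
  "RunAt": "2030-01-01T12:00:00Z",
  "Payload": "{\"auctionId\":\"demo-123\"}"
}
```

Note that Payload is declared as a string, so a JSON document has to be passed in escaped form rather than as a nested object.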

using Amazon.Lambda.Core;
using Amazon.StepFunctions;
using Amazon.StepFunctions.Model;
using System.Text.Json;
using System.Text.Json.Nodes;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace TimerLambda;

public class Function
{
    private readonly IAmazonStepFunctions _stepFunctionsClient;
    private readonly string? _targetLambdaArn;
    private readonly int _targetLambdaCallBeforeSec;
    private readonly string? _stateMachineRoleArn;
    private readonly string _stateMachineNamePrefix;

    public Function()
    {
        _stepFunctionsClient = new AmazonStepFunctionsClient();
        _targetLambdaArn = Environment.GetEnvironmentVariable("TARGET_LAMBDA_ARN");
        // Seconds to invoke the target Lambda ahead of RunAt; falls back to 0 when unset or invalid.
        _targetLambdaCallBeforeSec = int.TryParse(Environment.GetEnvironmentVariable("TARGET_LAMBDA_CALL_BEFORE_SEC"), out int result) ? result : 0;
        _stateMachineNamePrefix = Environment.GetEnvironmentVariable("STATE_MACHINE_NAME_PREFIX") ?? "TimerStateMachine";
        _stateMachineRoleArn = Environment.GetEnvironmentVariable("STATE_MACHINE_ROLE_ARN");
    }

    public async Task<object> FunctionHandler(Input input, ILambdaContext context)
    {
        context.Logger.LogLine($"FunctionHandler: {JsonSerializer.Serialize(input)}");

        var inputObject = input;
        if (inputObject == null)
        {
            throw new ArgumentException("Input cannot be null");
        }

        // RunAt is expected to be a UTC time.
        var waitTime = inputObject.RunAt.Subtract(DateTime.UtcNow);
        context.Logger.LogLine($"WaitTime: {waitTime}");
        if (waitTime < TimeSpan.Zero)
        {
            throw new ArgumentException("RunAt time cannot be in the past");
        }

        if (string.IsNullOrEmpty(_targetLambdaArn))
        {
            throw new ArgumentException("TARGET_LAMBDA_ARN environment variable not found");
        }

        if (string.IsNullOrEmpty(_stateMachineRoleArn))
        {
            throw new ArgumentException("STATE_MACHINE_ROLE_ARN environment variable not found");
        }

        // Each timer gets its own state machine with a unique name.
        var createStateMachineRequest = new CreateStateMachineRequest
        {
            Name = $"{_stateMachineNamePrefix}-{Guid.NewGuid()}",
            Definition = GetStateMachineDefinition(inputObject.RunAt, _targetLambdaArn, _targetLambdaCallBeforeSec, inputObject.Payload ?? "{}"),
            RoleArn = _stateMachineRoleArn
        };

        context.Logger.LogLine($"CreateStateMachineRequest: {JsonSerializer.Serialize(createStateMachineRequest)}");

        var createStateMachineResult = await _stepFunctionsClient.CreateStateMachineAsync(createStateMachineRequest);
        var startExecutionRequest = new StartExecutionRequest
        {
            StateMachineArn = createStateMachineResult.StateMachineArn
        };

        return await _stepFunctionsClient.StartExecutionAsync(startExecutionRequest);
    }

    private static string GetStateMachineDefinition(DateTime runAtUtc, string targetLambdaArn, int targetLambdaCallBeforeSec, string payloadJson)
    {
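        // Quote the literal 'T' and 'Z' and format with the invariant culture so the
        // timestamps are always ISO 8601, regardless of the runtime's current culture.
        const string timestampFormat = "yyyy-MM-dd'T'HH:mm:ss'Z'";
        var culture = System.Globalization.CultureInfo.InvariantCulture;
        string waitUntil = runAtUtc.Subtract(TimeSpan.FromSeconds(targetLambdaCallBeforeSec)).ToString(timestampFormat, culture);
        string exactRunAt = runAtUtc.ToString(timestampFormat, culture);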
        var stateMachineDefinition = new JsonObject
        {
            ["Comment"] = "Timer Definition",
            ["StartAt"] = "Wait",
            ["States"] = new JsonObject
            {
                ["Wait"] = new JsonObject
                {
                    ["Type"] = "Wait",
                    ["Next"] = "Lambda Invoke",
                    ["Timestamp"] = waitUntil
                },
                ["Lambda Invoke"] = new JsonObject
                {
                    ["Type"] = "Task",
                    ["Resource"] = "arn:aws:states:::lambda:invoke",
                    ["OutputPath"] = "$.Payload",
                    ["Parameters"] = new JsonObject
                    {
                        ["FunctionName"] = targetLambdaArn,
                        ["Payload"] = new JsonObject
                        {
                            ["RunAt"] = exactRunAt,
                            ["Payload"] = payloadJson
                        }
                    },
                    ["End"] = true
                }
            }
        };

        return stateMachineDefinition.ToJsonString();
    }

    public sealed class Input
    {
        public DateTime RunAt { get; set; }
        public string? Payload { get; set; }
    }
}

In the constructor, we create the AWS Step Functions client and read the configuration from environment variables:

  • TARGET_LAMBDA_ARN: The AWS Lambda function to invoke when the timer expires.
  • TARGET_LAMBDA_CALL_BEFORE_SEC: How many seconds before RunAt the target Lambda is invoked, to absorb its cold start (defaults to 0 if unset).
  • STATE_MACHINE_NAME_PREFIX: A prefix used to name each timer's state machine.
  • STATE_MACHINE_ROLE_ARN: The role that allows the state machine to invoke the target Lambda.

The FunctionHandler method does the main work. It validates the request (the input must not be null, RunAt must not be in the past, and the target ARN must be configured), creates a new state machine, and starts an execution of it.

The GetStateMachineDefinition method builds the state machine definition. It has two states: Wait (pauses until RunAt minus TARGET_LAMBDA_CALL_BEFORE_SEC) and Lambda Invoke (invokes the target AWS Lambda function, passing it RunAt and the payload).
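
To make the two states concrete, here is a sketch of the definition that GetStateMachineDefinition would produce, assuming a RunAt of 2030-01-01T12:00:00Z, a TARGET_LAMBDA_CALL_BEFORE_SEC of 10, and a placeholder function ARN. The account ID, region, and payload contents are hypothetical.

```json
{
  "Comment": "Timer Definition",
  "StartAt": "Wait",
  "States": {
    "Wait": {
      "Type": "Wait",
      "Next": "Lambda Invoke",
      "Timestamp": "2030-01-01T11:59:50Z"
    },
    "Lambda Invoke": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:ExecutorLambda",
        "Payload": {
          "RunAt": "2030-01-01T12:00:00Z",
          "Payload": "{\"auctionId\":\"demo-123\"}"
        }
      },
      "End": true
    }
  }
}
```

The Wait state ends ten seconds before RunAt, and the object under Parameters.Payload is what the target Lambda receives as its input, so it can still see the exact RunAt time.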

Callback Executor Lambda

This Lambda function runs when the timer fires. Because the state machine may invoke it a few seconds early, it first waits until the exact RunAt time. It can then do things such as send a message, update a database, call another Lambda function, or call a REST API.

using Amazon.Lambda.Core;
using System.Text.Json.Nodes;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace ExecutorLambda;

public class Function
{
    public async Task FunctionHandler(JsonObject input, ILambdaContext context)
    {
        context.Logger.LogLine($"Entry Time: {DateTime.UtcNow}");
        context.Logger.LogLine($"Input: {input}");

        var runAt = input.TryGetPropertyValue("RunAt", out var runAtValue) && runAtValue is not null
            ? runAtValue.GetValue<DateTime>()
            : DateTime.MinValue;
        TimeSpan delay = runAt - DateTime.UtcNow;

        // The function may be invoked slightly early to absorb a cold start.
        // A zero or negative delay means RunAt has already passed, so there is nothing to wait for.
        if (delay > TimeSpan.Zero)
        {
            // Wait out the remaining time without blocking a thread
            await Task.Delay(delay);
        }

        context.Logger.LogLine($"Run Time: {DateTime.UtcNow}");

        // Here you can do anything you want with the input
        // For example, you can call another Lambda function
        // or call a REST API
        // or call a database
    }
}

Using CDK

We will use the AWS CDK to provision everything we need automatically:

using Amazon.CDK;
using Amazon.CDK.AWS.Lambda;
using System.Collections.Generic;
using Constructs;
using Amazon.CDK.AWS.IAM;

namespace Cdk
{
    public class TimerCdkStack : Stack
    {
        internal TimerCdkStack(Construct scope, string id, IStackProps props = null) : base(scope, id, props)
        {
            var stateMachineNamePrefix = new CfnParameter(this, "StateMachineNamePrefix", new CfnParameterProps
            {
                Type = "String",
                Description = "The prefix for the state machine name",
                Default = "TimerStateMachine"
            });

            var targetLambdaBeforeSec = new CfnParameter(this, "TargetLambdaBeforeSec", new CfnParameterProps
            {
                Type = "Number",
                Description = "The number of seconds before the exact time the target lambda is called to handle cold start",
                Default = 10
            });

            #region Executor Callback Lambda

            // Extend this role to allow the Executor Lambda to call other resources
            var executorLambdaRole = new Role(this, "ExecutorLambdaRole", new RoleProps
            {
                AssumedBy = new ServicePrincipal("lambda.amazonaws.com"),
                ManagedPolicies =
                [
                    ManagedPolicy.FromAwsManagedPolicyName("service-role/AWSLambdaBasicExecutionRole")
                ]
            });

            var executorLambda = new Function(this, "ExecutorLambda", new FunctionProps
            {
                Runtime = Runtime.DOTNET_8,
                Handler = "ExecutorLambda::ExecutorLambda.Function::FunctionHandler",
                Code = Code.FromAsset("../src/ExecutorLambda/bin/Release/net8.0/linux-arm64/publish/"),
                Architecture = Architecture.ARM_64,
                MemorySize = 256,
                Timeout = Duration.Minutes(2),
                Role = executorLambdaRole
            });

            #endregion

            #region Timer Lambda

            var stateMachineRole = new Role(this, "StateMachineRole", new RoleProps
            {
                AssumedBy = new ServicePrincipal("states.amazonaws.com"),
                ManagedPolicies =
                [
                    ManagedPolicy.FromAwsManagedPolicyName("service-role/AWSLambdaBasicExecutionRole")
                ],
                InlinePolicies = new Dictionary<string, PolicyDocument>
                {
                    ["StepFunctionPolicy"] = new PolicyDocument(new PolicyDocumentProps
                    {
                        Statements = new[]
                        {
                            new PolicyStatement(new PolicyStatementProps
                            {
                                Actions = new[] { "lambda:InvokeFunction" },
                                Resources = executorLambda.ResourceArnsForGrantInvoke
                            })
                        }
                    })
                }
            });

            var timerLambdaRole = new Role(this, "TimerLambdaRole", new RoleProps
            {
                AssumedBy = new ServicePrincipal("lambda.amazonaws.com"),
                ManagedPolicies = new IManagedPolicy[]
                {
                    ManagedPolicy.FromAwsManagedPolicyName("service-role/AWSLambdaBasicExecutionRole")
                },
                InlinePolicies = new Dictionary<string, PolicyDocument>
                {
                    ["StepFunctionPolicy"] = new PolicyDocument(new PolicyDocumentProps
                    {
                        Statements =
                        [
                            new PolicyStatement(new PolicyStatementProps
                            {
                                Actions = new[] { "states:CreateStateMachine", "states:StartExecution" },
                                Resources = new[] { "*" }
                            }),
                            new PolicyStatement(new PolicyStatementProps
                            {
                                Actions = new[] { "iam:PassRole" },
                                Resources = new[] { stateMachineRole.RoleArn }
                            })
                        ]
                    })
                }
            });

            _ = new Function(this, "TimerLambda", new FunctionProps
            {
                Runtime = Runtime.DOTNET_8,
                Handler = "TimerLambda::TimerLambda.Function::FunctionHandler",
                Code = Code.FromAsset("../src/TimerLambda/bin/Release/net8.0/linux-arm64/publish/"),
                Architecture = Architecture.ARM_64,
                MemorySize = 256,
                Timeout = Duration.Minutes(2),
                Environment = new Dictionary<string, string>
                {
                    ["TARGET_LAMBDA_ARN"] = executorLambda.FunctionArn,
                    ["TARGET_LAMBDA_CALL_BEFORE_SEC"] = targetLambdaBeforeSec.ValueAsString,
                    ["STATE_MACHINE_ROLE_ARN"] = stateMachineRole.RoleArn,
                    ["STATE_MACHINE_NAME_PREFIX"] = stateMachineNamePrefix.ValueAsString,
                },
                Role = timerLambdaRole,
            });

            #endregion
        }
    }
}

ℹ️ Note: we chose arm64 (the AWS Graviton2 processor) for our AWS Lambda functions because it is typically cheaper and faster than x86-64. More details are available here.

How to Deploy

You can deploy this solution with the following commands:

cd src
dotnet publish -c Release -r linux-arm64

cd ../cdk/src
cdk deploy --parameters StateMachineNamePrefix=MyTimerStateMachine