🐉 AWS CDK 101 – 🏇 Using batched dynamodb stream to delete item on another dynamodb table

🔰 Beginners new to AWS CDK, please do look at my previous articles one after the other in this series.

In case you missed my previous article, you can find it with the links below.

🔁 Original previous post at 🔗 Dev Post

🔁 Reposted previous post at 🔗 dev to @aravindvcyber

In this article, let us add a new async batch integration to our messages dynamodb table, which will help us delete the processed records from the staging dynamodb table.



Benefits of this approach 💦

  • In this approach, we clear the staging table data asynchronously, using the dynamodb stream to directly invoke a lambda.
  • So we can use this in systems where we may not want to do the scavenging synchronously inside a compute-intensive workload, saving compute by avoiding the wait time for this I/O.
  • Again, some may argue that I am making a separate lambda call elsewhere anyway, but that can be regarded as a good decoupling strategy as well.
  • However, invocation charges and compute still apply there. Yet we need to note one more thing: the stream records can be read in batches by our lambda, and the default batch size is 100, so up to 100 records are handled in one invocation.
  • At the same time, the handler can do a batch request to delete the data from dynamodb in a single request.
  • So not only can you read a batch of stream records, but you can also perform a batch delete, which is fast and reduces the number of write requests sent to the dynamodb table when you have such throttled async integrations.
  • Maybe I will write a separate article about it later; for now, note that the approach is not limited to the 1:1 delete operation shown here.
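The batch delete mentioned in the list above could look roughly like the sketch below. It only builds the request payloads, so it runs without AWS access. The function name `buildBatchDeletes` and the `tableName` parameter are hypothetical and not part of the original stack. The one firm constraint is that `BatchWriteItem` accepts at most 25 put or delete requests per call, hence the chunking.

```typescript
// Minimal shapes for the parts of a DynamoDB stream event used here.
type AttributeValue = { S?: string; N?: string };
type Key = Record<string, AttributeValue>;
interface StreamRecord {
  eventName: "INSERT" | "MODIFY" | "REMOVE";
  dynamodb: { Keys: Key };
}

// BatchWriteItem accepts at most 25 put/delete requests per call.
const MAX_BATCH_WRITE_ITEMS = 25;

// Builds BatchWriteItem inputs that delete the key of every INSERT record.
// `tableName` is a hypothetical parameter holding the staging table name.
function buildBatchDeletes(records: StreamRecord[], tableName: string) {
  const deletes = records
    .filter((record) => record.eventName === "INSERT")
    .map((record) => ({ DeleteRequest: { Key: record.dynamodb.Keys } }));

  const batches: { RequestItems: Record<string, typeof deletes> }[] = [];
  for (let i = 0; i < deletes.length; i += MAX_BATCH_WRITE_ITEMS) {
    batches.push({
      RequestItems: { [tableName]: deletes.slice(i, i + MAX_BATCH_WRITE_ITEMS) },
    });
  }
  return batches;
}
```

Each element of the returned array could then be passed to `dynamo.batchWriteItem(batch).promise()` with the AWS SDK v2 client. A real handler should also retry anything returned in `UnprocessedItems`.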




Planning 🐞

Here we will be making use of dynamodb streams to trigger the deleteItem action by invoking a new lambda handler function, which achieves our objective.



Building 💮

We need new lambda function code with a fresh handler that receives the dynamodb stream event object and processes it as shown below.

Create a new lambda function in the file lambda/message-stream.ts.

Here you may find that we are targeting the event name INSERT; likewise, we can have finer control over our desired outcome during these stream invocations as shown below.



New handler function logic 🥙:

exports.created = async function (event: any) {
  console.log("Received stream:", JSON.stringify(event, undefined, 2));
  const results: any[] = [];
  await Promise.all(
    event.Records.map(async (Record: any) => {
      console.log(JSON.stringify(Record, undefined, 2));
      // Only newly inserted items trigger a delete on the staging table.
      if (Record.eventName === "INSERT") {
        results.push(await deleteDbItem(Record.dynamodb.Keys));
      }
    })
  );
  results.forEach((res) => console.log(res));
};
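To illustrate the finer control over event names mentioned above, here is a small sketch that picks the keys of records matching any set of event names. The types and the helper name `keysFor` are assumptions made for illustration, not part of the original handler.

```typescript
// Minimal shapes for the parts of a DynamoDB stream event used here.
type AttributeValue = { S?: string; N?: string };
type Key = Record<string, AttributeValue>;
type EventName = "INSERT" | "MODIFY" | "REMOVE";
interface StreamRecord {
  eventName: EventName;
  dynamodb: { Keys: Key };
}
interface StreamEvent {
  Records: StreamRecord[];
}

// Returns the keys of all records whose event name is in `wanted`.
// `keysFor` is a hypothetical helper name.
function keysFor(event: StreamEvent, wanted: EventName[]): Key[] {
  return event.Records
    .filter((record) => wanted.indexOf(record.eventName) !== -1)
    .map((record) => record.dynamodb.Keys);
}
```

For example, `keysFor(event, ["INSERT", "MODIFY"])` would select both new and updated items, while the handler in this article only reacts to `INSERT`.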



Helper function dynamodb deleteItem 💐

Simple helper function to perform deleteItem on a dynamodb table.

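import { DynamoDB } from "aws-sdk";
import { DeleteItemInput } from "aws-sdk/clients/dynamodb";

// Assumed setup: a low-level AWS SDK v2 client, if not already defined in this file.
const dynamo = new DynamoDB();

const deleteDbItem: any = async (keys: any) => {
  console.log("Deleting: ", { keys });
  const deleteData: DeleteItemInput = {
    // The table name expression was lost in the source; STAGING_TABLE_NAME is an assumed variable name.
    TableName: process.env.STAGING_TABLE_NAME || "",
    Key: keys,
    ReturnConsumedCapacity: "TOTAL",
  };
  console.log("deleteItem: ", JSON.stringify(deleteData, undefined, 2));
  return await dynamo.deleteItem(deleteData).promise();
};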




Minor changes to the dynamodb table definition 🍊

I have highlighted the necessary change: we need to enable dynamodb stream generation for our table.

Most importantly, I have requested both the new and old images, which will have all the necessary data; however, we are not going to use all of that data. This is only for demonstration purposes.

const messages = new dynamodb.Table(this, "MessagesTable", {
      tableName: process.env.messagesTable,
      sortKey: { name: "createdAt", type: dynamodb.AttributeType.NUMBER },
      partitionKey: { name: "messageId", type: dynamodb.AttributeType.STRING },
      encryption: dynamodb.TableEncryption.AWS_MANAGED,
      readCapacity: 5,
      writeCapacity: 5,
      // New property added below
      stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES
});



Defining the new lambda 🍇

Here we will use the code block from above to provision our lambda, as shown below, inside our master stack.

const messageStreamFunc = new lambda.Function(this, "messageStreamFunc", {
      runtime: lambda.Runtime.NODEJS_14_X,
      code: lambda.Code.fromAsset("lambda"),
      handler: "message-stream.created",
      logRetention: logs.RetentionDays.ONE_MONTH,
      tracing: Tracing.ACTIVE,
      layers: [nodejsUtils,nodejsXray],
      // The environment entries were lost in the source; the handler needs the staging table name here.
      environment: {},
});
messageStreamFunc.applyRemovalPolicy(RemovalPolicy.DESTROY);




Grant permission for the handler to write to the table 💨

Also, our handler must have sufficient access to delete from the other dynamodb table, stgMessages.

stgMessages.grantWriteData(messageStreamFunc);




Adding Event source to the lambda function 🌽

It is now time to connect the handler function to the dynamodb event source as follows.

messageStreamFunc.addEventSource(
    new DynamoEventSource(messages, {
    startingPosition: lambda.StartingPosition.LATEST,
    })
)
Or you can increase the batch size and the batching window (in seconds) for long polling as follows.

messageStreamFunc.addEventSource(new DynamoEventSource(messages, {
      startingPosition: lambda.StartingPosition.LATEST,
      batchSize: 100,
      maxBatchingWindow: Duration.seconds(60)
}))



Sample dynamodb stream object 🥣

I have shared below the dynamodb stream object used as the payload to invoke our handler lambda.


{
  "Records": [
    {
      "eventID": "5c9aa5395f970324e088a32578ee0a66",
      "eventName": "INSERT",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "ap-south-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1652355068,
        "Keys": {
          "createdAt": {
            "N": "1652355039000"
          },
          "messageId": {
            "S": "47d97141-01e7-42a1-b6b3-0c59a6a3827e"
          }
        },
        "NewImage": {
          "createdAt": {
            "N": "1652355039000"
          },
          "messageId": {
            "S": "47d97141-01e7-42a1-b6b3-0c59a6a3827e"
          },
          "event": {
            "S": "{\n    \"message\": {\"new\": \"A secret message\"}\n}"
          }
        },
        "SequenceNumber": "20927100000000035334874026",
        "SizeBytes": 173,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:ap-south-1:8888888:table/MessagesTable/stream/2022-05-*****"
    }
  ]
}
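The `Keys` in the payload above use the DynamoDB attribute-value format (`S` for strings, `N` for numbers sent as strings). The low-level `deleteItem` call accepts them as they are, but for logging or comparison it can help to turn them into plain values. Below is a deliberately simplified sketch that handles only `S` and `N`; the helper name `plainKey` is hypothetical.

```typescript
type AttributeValue = { S?: string; N?: string };

// Converts string (S) and number (N) attributes to plain values.
// Other attribute types are intentionally not handled in this sketch.
function plainKey(
  keys: Record<string, AttributeValue>
): Record<string, string | number> {
  const out: Record<string, string | number> = {};
  for (const name of Object.keys(keys)) {
    const value = keys[name];
    if (value.S !== undefined) {
      out[name] = value.S;
    } else if (value.N !== undefined) {
      out[name] = Number(value.N);
    } else {
      throw new Error(`Unsupported attribute type for ${name}`);
    }
  }
  return out;
}

// Keys taken from the sample stream record above.
const sampleKeys = {
  createdAt: { N: "1652355039000" },
  messageId: { S: "47d97141-01e7-42a1-b6b3-0c59a6a3827e" },
};
console.log(plainKey(sampleKeys));
```

For full coverage of all attribute types, the AWS SDK v2 ships `DynamoDB.Converter.unmarshall`, which is the better choice in a real handler.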



Console log during execution 🍿

Finally, after execution, we can find the above JSON payload that we received in the event object, which is then used to delete from our staging table. You can find the results below in the CloudWatch logs.


In the next article, we will demonstrate how to use a similar approach to delete an object from S3 that we have previously created.

We will be adding more connections to our stack and making it more usable in the upcoming articles by creating new constructs, so do consider following and subscribing to my newsletter.

⏭ We have our next article in serverless, do check it out

🎉 Thanks for supporting! 🙏

It would be great if you would like to ☕ Buy Me a Coffee, to help boost my efforts.


🔁 Original post at 🔗 Dev Post

🔁 Reposted at 🔗 dev to @aravindvcyber
