Cloud & Networking

Integrate applications with AWS services: Amazon DynamoDB & Amazon Kinesis [Tutorial]

17 min read

AWS provides hybrid capabilities for networking, storage, database, application development, and management tools for secure and seamless integration. In today’s tutorial, we will integrate applications with the two popular AWS services namely Amazon DynamoDB and Amazon Kinesis.

Amazon DynamoDB is a fast, fully managed, highly available, and scalable NoSQL database service from AWS. DynamoDB uses key-value and document store data models. Amazon Kinesis is used to collect real-time data to process and analyze it.

This article is an excerpt from a book ‘Expert AWS Development’ written by Atul V. Mistry. By the end of this tutorial, you will know how to integrate applications with the relative AWS services and best practices.

Amazon DynamoDB

The Amazon DynamoDB service falls under the Database category. It is a fast NoSQL database service from Amazon. It is highly durable as it will replicate data across three distinct geographical facilities in AWS regions. It’s great for web, mobile, gaming, and IoT applications.

DynamoDB will take care of software patching, hardware provisioning, cluster scaling, setup, configuration, and replication. You can create a database table and store and retrieve any amount and variety of data. It will delete expired data automatically from the table. It will help to reduce the usage storage and cost of storing data which is no longer needed.

Amazon DynamoDB Accelerator (DAX) is a highly available, fully managed, and in-memory cache. For millions of requests per second, it reduces the response time from milliseconds to microseconds.

DynamoDB is allowed to store up to 400 KB of large text and binary objects. It uses SSD storage to provide high I/O performance.

Integrating DynamoDB into an application

The following diagram provides a high-level overview of integration between your application and DynamoDB:

Please perform the following steps to understand this integration:

  1. Your application in your programming language which is using an AWS SDK.
  2. DynamoDB can work with one or more programmatic interfaces provided by AWS SDK.
  3. From your programming language, AWS SDK will construct an HTTP or HTTPS request with a DynamoDB low-level API.
  4. The AWS SDK will send a request to the DynamoDB endpoint.
  5. DynamoDB will process the request and send the response back to the AWS SDK. If the request is executed successfully, it will return HTTP 200 (OK) response code. If the request is not successful, it will return HTTP error code and error message.
  6. The AWS SDK will process the response and send the result back to the application.

The AWS SDK provides three kinds of interfaces to connect with DynamoDB. These interfaces are as follows:

  • Low-level interface
  • Document interface
  • Object persistence (high-level) interface

Let’s explore all three interfaces. The following diagram is the Movies table, which is created in DynamoDB and used in all our examples:

Low-level interface

AWS SDK programming languages provide low-level interfaces for DynamoDB. These SDKs provide methods that are similar to low-level DynamoDB API requests.

The following example uses the Java language for the low-level interface of AWS SDKs. Here you can use Eclipse IDE for the example.

In this Java program, we request getItem from the Movies table, pass the movie name as an attribute, and print the movie release year:

  1. Let’s create the MovieLowLevelExample file. We have to import a few classes to work with the DynamoDB.

AmazonDynamoDBClient is used to create the DynamoDB client instance. AttributeValue is used to construct the data. In AttributeValue, name is datatype and value is data:

    • GetItemRequest is the input of GetItem
    • GetItemResult is the output of GetItem
  1. The following code will create the dynamoDB client instance. You have to assign the credentials and region to this instance:
Static AmazonDynamoDBClient dynamoDB;
  1. In the code, we have created HashMap, passing the value parameter as AttributeValue().withS(). It contains actual data and withS is the attribute of String:
String tableName = "Movies";

HashMap<String, AttributeValue> key = new HashMap<String, AttributeValue>();
key.put(“name”, new AttributeValue().withS(“Airplane”));

  1. GetItemRequest will create a request object, passing the table name and key as a parameter. It is the input of GetItem:
GetItemRequest request = new GetItemRequest()
       .withTableName(tableName).withKey(key);
  1. GetItemResult will create the result object. It is the output of getItem where we are passing request as an input:
GetItemResult result = dynamoDB.getItem(request);
  1. It will check the getItem null condition. If getItem is not null then create the object for AttributeValue. It will get the year from the result object and create an instance for yearObj. It will print the year value from yearObj:
if (result.getItem() != null) {
   AttributeValue yearObj = result.getItem().get("year");
   System.out.println("The movie Released in " + yearObj.getN());
} else {
System.out.println("No matching movie was found");
}

Document interface

This interface enables you to do Create, Read, Update, and Delete (CRUD) operations on tables and indexes. The datatype will be implied with data from this interface and you do not need to specify it.

The AWS SDKs for Java, Node.js, JavaScript, and .NET provides support for document interfaces.

The following example uses the Java language for the document interface in AWS SDKs. Here you can use the Eclipse IDE for the example.

In this Java program, we will create a table object from the Movies table, pass the movie name as attribute, and print the movie release year.

We have to import a few classes. DynamoDB is the entry point to use this library in your class. GetItemOutcomeis is used to get items from the DynamoDB table. Table is used to get table details:

static AmazonDynamoDB client;

The preceding code will create the client instance. You have to assign the credentials and region to this instance:

String tableName = "Movies";
DynamoDB docClient = new DynamoDB(client);
Table movieTable = docClient.getTable(tableName);

DynamoDB will create the instance of docClient by passing the client instance. It is the entry point for the document interface library. This docClient instance will get the table details by passing the tableName and assign it to the movieTable instance:

GetItemOutcome outcome = movieTable.getItemOutcome("name","Airplane");
int yearObj = outcome.getItem().getInt("year");
System.out.println("The movie was released in " + yearObj);

GetItemOutcome will create an outcome instance from movieTable by passing the name as key and movie name as parameter. It will retrieve the item year from the outcome object and store it into the yearObj object and print it:

Object persistence (high-level) interface

In the object persistence interface, you will not perform any CRUD operations directly on the data; instead, you have to create objects which represent DynamoDB tables and indexes and perform operations on those objects. It will allow you to write object-centric code and not database-centric code.

The AWS SDKs for Java and .NET provide support for the object persistence interface.

Let’s create a DynamoDBMapper object in AWS SDK for Java. It will represent data in the Movies table. This is the MovieObjectMapper.java class. Here you can use the Eclipse IDE for the example.

You need to import a few classes for annotations. DynamoDBAttribute is applied to the getter method. If it will apply to the class field then its getter and setter method must be declared in the same class. The DynamoDBHashKey annotation marks property as the hash key for the modeled class. The DynamoDBTable annotation marks DynamoDB as the table name:

@DynamoDBTable(tableName="Movies")

It specifies the table name:

@DynamoDBHashKey(attributeName="name")
public String getName() { return name;}
public void setName(String name) {this.name = name;}
@DynamoDBAttribute(attributeName = "year")
public int getYear() { return year; }
public void setYear(int year) { this.year = year; }

In the preceding code, DynamoDBHashKey has been defined as the hash key for the name attribute and its getter and setter methods. DynamoDBAttribute specifies the column name and its getter and setter methods.

Now create MovieObjectPersistenceExample.java to retrieve the movie year:

static AmazonDynamoDB client;

The preceding code will create the client instance. You have to assign the credentials and region to this instance. You need to import DynamoDBMapper, which will be used to fetch the year from the Movies table:

DynamoDBMapper mapper = new DynamoDBMapper(client);
MovieObjectMapper movieObjectMapper = new MovieObjectMapper();
movieObjectMapper.setName("Airplane");

The mapper object will be created from DynamoDBMapper by passing the client.

The movieObjectMapper object will be created from the POJO class, which we created earlier. In this object, set the movie name as the parameter:

MovieObjectMapper result = mapper.load(movieObjectMapper);
if (result != null) {
System.out.println("The song was released in "+ result.getYear());
}

Create the result object by calling DynamoDBMapper object’s load method. If the result is not null then it will print the year from the result’s getYear() method.

DynamoDB low-level API

This API is a protocol-level interface which will convert every HTTP or HTTPS request into the correct format with a valid digital signature. It uses JavaScript Object Notation (JSON) as a transfer protocol. AWS SDK will construct requests on your behalf and it will help you concentrate on the application/business logic.

The AWS SDK will send a request in JSON format to DynamoDB and DynamoDB will respond in JSON format back to the AWS SDK API. DynamoDB will not persist data in JSON format.

Troubleshooting in Amazon DynamoDB

The following are common problems and their solutions:

  • If error logging is not enabled then enable it and check error log messages.
  • Verify whether the DynamoDB table exists or not.
  • Verify the IAM role specified for DynamoDB and its access permissions.
  • AWS SDKs take care of propagating errors to your application for appropriate actions. Like Java programs, you should write a try-catch block to handle the error or exception.
  • If you are not using an AWS SDK then you need to parse the content of low-level responses from DynamoDB.
  • A few exceptions are as follows:
    • AmazonServiceException: Client request sent to DynamoDB but DynamoDB was unable to process it and returned an error response
    • AmazonClientException: Client is unable to get a response or parse the response from service
    • ResourceNotFoundException: Requested table doesn’t exist or is in CREATING state

Now let’s move on to Amazon Kinesis, which will help to collect and process real-time streaming data.

Amazon Kinesis

The Amazon Kinesis service is under the Analytics product category. This is a fully managed, real-time, highly scalable service. You can easily send data to other AWS services such as Amazon DynamoDB, AmazaonS3, and Amazon Redshift.

You can ingest real-time data such as application logs, website clickstream data, IoT data, and social stream data into Amazon Kinesis. You can process and analyze data when it comes and responds immediately instead of waiting to collect all data before the process begins.

Now, let’s explore an example of using Kinesis streams and Kinesis Firehose using AWS SDK API for Java.

Amazon Kinesis streams

In this example, we will create the stream if it does not exist and then we will put the records into the stream. Here you can use Eclipse IDE for the example.

You need to import a few classes. AmazonKinesis and AmazonKinesisClientBuilder are used to create the Kinesis clients. CreateStreamRequest will help to create the stream. DescribeStreamRequest will describe the stream request. PutRecordRequest will put the request into the stream and PutRecordResult will print the resulting record. ResourceNotFoundException will throw an exception when the stream does not exist. StreamDescription will provide the stream description:

Static AmazonKinesis kinesisClient;

kinesisClient is the instance of AmazonKinesis. You have to assign the credentials and region to this instance:

final String streamName = "MyExampleStream";
final Integer streamSize = 1;
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest().withStreamName(streamName);

Here you are creating an instance of describeStreamRequest. For that, you will pass the streamNameas parameter to the withStreamName() method:

StreamDescription streamDescription = kinesisClient.describeStream(describeStreamRequest).getStreamDescription();

It will create an instance of streamDescription. You can get information such as the stream name, stream status, and shards from this instance:

CreateStreamRequest createStreamRequest = new CreateStreamRequest();
createStreamRequest.setStreamName(streamName);
createStreamRequest.setShardCount(streamSize);
kinesisClient.createStream(createStreamRequest);

The createStreamRequest instance will help to create a stream request. You can set the stream name, shard count, and SDK request timeout. In the createStream method, you will pass the createStreamRequest:

long createTime = System.currentTimeMillis();
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName(streamName);
putRecordRequest.setData(ByteBuffer.wrap(String.format("testData-%d", createTime).getBytes()));
putRecordRequest.setPartitionKey(String.format("partitionKey-%d", createTime));

Here we are creating a record request and putting it into the stream. We are setting the data and PartitionKey for the instance. It will create the records:

PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest);

It will create the record from the putRecord method and pass putRecordRequest as a parameter:

System.out.printf("Success : Partition key "%s", ShardID "%s" and SequenceNumber "%s".n",
putRecordRequest.getPartitionKey(), putRecordResult.getShardId(), putRecordResult.getSequenceNumber());

It will print the output on the console as follows:

Troubleshooting tips for Kinesis streams

The following are common problems and their solutions:

  • Unauthorized KMS master key permission error:
    • Without authorized permission on the master key, when a producer or consumer application tries to writes or reads an encrypted stream
    • Provide access permission to an application using Key policies in AWS KMS or IAM policies with AWS KMS
  • Sometimes producer becomes writing slower.
    • Service limits exceeded:

Check whether the producer is throwing throughput exceptions from the service, and validate what API operations are being throttled.

You can also check Amazon Kinesis Streams limits because of different limits based on the call. If calls are not an issue, check you have selected a partition key that allows distributing put operations evenly across all shards, and that you don’t have a particular partition key that’s bumping into the service limits when the rest are not. This requires you to measure peak throughput and the number of shards in your stream.

    • Producer optimization:

It has either a large producer or small producer. A large producer is running from an EC2 instance or on-premises while a small producer is running from the web client, mobile app, or IoT device. Customers can use different strategies for latency. Kinesis Produce Library or multiple threads are useful while writing for buffer/micro-batch records, PutRecords for multi-record operation, PutRecord for single-record operation.

  • Shard iterator expires unexpectedly:

The shard iterator expires because its GetRecord methods have not been called for more than 5 minutes, or you have performed a restart of your consumer application.

The shard iterator expires immediately before you use it. This might indicate that the DynamoDB table used by Kinesis does not have enough capacity to store the data. It might happen if you have a large number of shards. Increase the write capacity assigned to the shard table to solve this.

  • Consumer application is reading at a slower rate:

The following are common reasons for read throughput being slower than expected:

    • Total reads for multiple consumer applications exceed per-shard limits. In the Kinesis stream, increase the number of shards.
    • Maximum number of GetRecords per call may have been configured with a low limit value.
    • The logic inside the processRecords call may be taking longer for a number of possible reasons; the logic may be CPU-intensive, bottlenecked on synchronization, or I/O blocking.

We have covered Amazon Kinesis streams. Now, we will cover Kinesis Firehose.

Amazon Kinesis Firehose

Amazon Kinesis Firehose is a fully managed, highly available and durable service to load real-time streaming data easily into AWS services such as Amazon S3, Amazon Redshift, or Amazon Elasticsearch. It replicates your data synchronously at three different facilities. It will automatically scale as per throughput data. You can compress your data into different formats and also encrypt it before loading.

AWS SDK for Java, Node.js, Python, .NET, and Ruby can be used to send data to a Kinesis Firehose stream using the Kinesis Firehose API.

The Kinesis Firehose API provides two operations to send data to the Kinesis Firehose delivery stream:

  • PutRecord: In one call, it will send one record
  • PutRecordBatch: In one call, it will send multiple data records

Let’s explore an example using PutRecord. In this example, the MyFirehoseStream stream has been created. Here you can use Eclipse IDE for the example.

You need to import a few classes such as AmazonKinesisFirehoseClient, which will help to create the client for accessing Firehose. PutRecordRequest and PutRecordResult will help to put the stream record request and its result:

private static AmazonKinesisFirehoseClient client;

AmazonKinesisFirehoseClient will create the instance firehoseClient. You have to assign the credentials and region to this instance:

String data = "My Kinesis Firehose data";
String myFirehoseStream = "MyFirehoseStream";
Record record = new Record();
record.setData(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)));

As mentioned earlier, myFirehoseStream has already been created.

A record in the delivery stream is a unit of data. In the setData method, we are passing a data blob. It is base-64 encoded. Before sending a request to the AWS service, Java will perform base-64 encoding on this field.

A returned ByteBuffer is mutable. If you change the content of this byte buffer then it will reflect to all objects that have a reference to it. It’s always best practice to call ByteBuffer.duplicate() or ByteBuffer.asReadOnlyBuffer() before reading from the buffer or using it.

Now you have to mention the name of the delivery stream and the data records you want to create the PutRecordRequest instance:

PutRecordRequest putRecordRequest = new PutRecordRequest()
                .withDeliveryStreamName(myFirehoseStream)
                .withRecord(record);
putRecordRequest.setRecord(record);
PutRecordResult putRecordResult = client.putRecord(putRecordRequest);
System.out.println("Put Request Record ID: " + putRecordResult.getRecordId());

putRecordResult will write a single record into the delivery stream by passing the putRecordRequest and get the result and print the RecordID:

PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest().withDeliveryStreamName("MyFirehoseStream")
                      .withRecords(getBatchRecords());

You have to mention the name of the delivery stream and the data records you want to create the PutRecordBatchRequest instance. The getBatchRecord method has been created to pass multiple records as mentioned in the next step:

JSONObject jsonObject = new JSONObject();
jsonObject.put("userid", "userid_1");
jsonObject.put("password", "password1");
Record record = new Record().withData(ByteBuffer.wrap(jsonObject.toString().getBytes()));
records.add(record);

In the getBatchRecord method, you will create the jsonObject and put data into this jsonObject . You will pass jsonObject to create the record. These records add to a list of records and return it:

PutRecordBatchResult putRecordBatchResult = client.putRecordBatch(putRecordBatchRequest);
for(int i=0;i<putRecordBatchResult.getRequestResponses().size();i++){
   System.out.println("Put Batch Request Record ID :"+i+": " +  putRecordBatchResult.getRequestResponses().get(i).getRecordId());
}

putRecordBatchResult will write multiple records into the delivery stream by passing the putRecordBatchRequest, get the result, and print the RecordID. You will see the output like the following screen:

Troubleshooting tips for Kinesis Firehose

Sometimes data is not delivered at specified destinations. The following are steps to solve common issues while working with Kinesis Firehose:

  • Data not delivered to Amazon S3:
    • If error logging is not enabled then enable it and check error log messages for delivery failure.
    • Verify that the S3 bucket mentioned in the Kinesis Firehose delivery stream exists.
    • Verify whether data transformation with Lambda is enabled, the Lambda function mentioned in your delivery stream exists, and Kinesis Firehose has attempted to invoke the Lambda function.
    • Verify whether the IAM role specified in the delivery stream has given proper access to the S3 bucket and Lambda function or not.
    • Verify your Kinesis Firehose metrics to check whether the data was sent to the Kinesis Firehose delivery stream successfully.
  • Data not delivered to Amazon Redshift/Elasticsearch:
    • For Amazon Redshift and Elasticsearch, verify the points mentioned in Data not delivered to Amazon S3, including the IAM role, configuration, and public access.
  • For CloudWatch and IoT, delivery stream not available as target:
    • Some AWS services can only send messages and events to a Kinesis Firehose delivery stream which is in the same region. Verify that your Kinesis Firehose delivery stream is located in the same region as your other services.

We completed implementations, examples, and best practices for Amazon DynamoDB and Amazon Kinesis AWS services using AWS SDK.

If you found this post useful, do check out the book ‘Expert AWS Development’ to learn application integration with other AWS services like Amazon Lambda, Amazon SQS, and Amazon SWF.

Read Next

A serverless online store on AWS could save you money. Build one.

Why is AWS the preferred cloud platform for developers working with big data?

Verizon chooses Amazon Web Services(AWS) as its preferred cloud provider

Natasha Mathur

Tech writer at the Packt Hub. Dreamer, book nerd, lover of scented candles, karaoke, and Gilmore Girls.

View Comments

Share
Published by
Natasha Mathur

Recent Posts

Top life hacks for prepping for your IT certification exam

I remember deciding to pursue my first IT certification, the CompTIA A+. I had signed…

3 years ago

Learn Transformers for Natural Language Processing with Denis Rothman

Key takeaways The transformer architecture has proved to be revolutionary in outperforming the classical RNN…

3 years ago

Learning Essential Linux Commands for Navigating the Shell Effectively

Once we learn how to deploy an Ubuntu server, how to manage users, and how…

3 years ago

Clean Coding in Python with Mariano Anaya

Key-takeaways:   Clean code isn’t just a nice thing to have or a luxury in software projects; it's a necessity. If we…

3 years ago

Exploring Forms in Angular – types, benefits and differences   

While developing a web application, or setting dynamic pages and meta tags we need to deal with…

3 years ago

Gain Practical Expertise with the Latest Edition of Software Architecture with C# 9 and .NET 5

Software architecture is one of the most discussed topics in the software industry today, and…

3 years ago