How to Create a Kinesis Data Stream

Scenario: Monitoring a Social Media Platform in Real-Time

Jeeva-AWSLabsJourney
7 min read · Sep 17, 2024

Let’s build a hands-on project where we simulate a social media platform monitoring system that ingests real-time user activity data, analyzes it, and stores the processed results for business intelligence.

The reference architecture includes an EC2 instance as the data producer, but in this walkthrough we use a local machine to generate random activity data instead.

Step 1: Setting Up the Kinesis Data Stream

  1. Navigate to the Kinesis Console:
  • Sign in to the AWS Management Console and search for “Kinesis.”
  • Click Create Data Stream.

  2. Define the Stream:
  • Stream name: SocialMediaActivityStream
  • Shards: Start with one shard for simplicity (this determines the throughput of the stream).
  • Click Create Stream.

  3. Stream Details:
  • Once the stream is active, you will see details such as the stream name, ARN, and number of shards.
  • The stream will be able to handle ingestion from multiple data sources (producers).
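
If you prefer to create the stream programmatically rather than through the console, a minimal boto3 sketch (assuming the same stream name, a single shard, and the us-east-1 region) could look like this:

import boto3

# Create the Kinesis client (use your own region)
kinesis = boto3.client('kinesis', region_name='us-east-1')

# Create a stream with a single shard, matching the console steps above
kinesis.create_stream(StreamName='SocialMediaActivityStream', ShardCount=1)

# Wait until the stream becomes ACTIVE before sending data to it
kinesis.get_waiter('stream_exists').wait(StreamName='SocialMediaActivityStream')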

Step 2: Ingest Data into the Stream (Producer)

To simulate real-time social media activity, we can write a Python script using the boto3 SDK to send user activity data into the Kinesis stream.

Python Code:

import boto3
import json
import time
import random

# Create Kinesis client
kinesis = boto3.client('kinesis', region_name='us-east-1')

stream_name = 'SocialMediaActivityStream'

# Sample data of user activity
activities = ["login", "post", "comment", "like", "logout"]
user_ids = range(1, 1001) # Simulate 1000 users

while True:
    # Simulate a random user activity
    activity = {
        'user_id': random.choice(user_ids),
        'action': random.choice(activities),
        'timestamp': int(time.time())
    }

    # Put the record into the Kinesis stream
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(activity),
        PartitionKey=str(activity['user_id'])  # Use user_id as the partition key
    )
    print(f"Sent: {activity}")
    time.sleep(0.5)  # Wait half a second before sending the next record

This script sends simulated social media user activities to the Kinesis Data Stream: each record represents a random user performing a random action, and a new record is ingested every half second.

Note that the producer script runs on your local machine. I have used VS Code for reference, but you can use any text editor you prefer.


Next, we need to process this data in real time. AWS Lambda can serve as a serverless consumer for the stream.

Step 3: Create or Update an IAM Role for Lambda

Your Lambda function needs an IAM role with the necessary permissions to read from the Kinesis Data Stream.

Step-by-Step:

  1. Navigate to IAM Console:
  • Sign in to the AWS Management Console.
  • Go to the IAM console.

  2. Create a New Role (if not already created):
  • Click on Roles in the sidebar.
  • Click Create role.
  • Choose Lambda as the trusted entity.
  • Click Next: Permissions.

  3. Attach Policies:

  • Attach the AmazonKinesisReadOnlyAccess managed policy to grant read access to Kinesis streams.
  • You may also want to attach AWSLambdaBasicExecutionRole for basic Lambda execution permissions.
  • For more granular control, you can create a custom policy if needed.

Example Custom Policy:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:DescribeStream",
        "kinesis:ListStreams"
      ],
      "Resource": "arn:aws:kinesis:region:account-id:stream/stream-name"
    }
  ]
}
  • Replace region, account-id, and stream-name with your actual AWS region, account ID, and Kinesis stream name.
  • Example: "Resource": "arn:aws:kinesis:us-east-1:008971888:stream/SocialMediaActivityStream"

  4. Review and Create Role:
  • Name your role (e.g., LambdaKinesisRole).
  • Review the role and click Create role.
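
For reference, choosing Lambda as the trusted entity gives the role a trust policy along these lines, which you can verify on the role’s Trust relationships tab:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "Service": "lambda.amazonaws.com" },
      "Action": "sts:AssumeRole"
    }
  ]
}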

Step 4: Consuming Data from the Stream (Consumer)

  1. Create a Lambda Function:
  • Navigate to the AWS Lambda Console.
  • Click Create Function and select the Author from Scratch option.
  • Name it ProcessSocialMediaActivity, and choose Python as the runtime.

2. Add Kinesis as a Trigger:

  • In the Lambda function configuration, add a trigger by choosing Kinesis.
  • Select the SocialMediaActivityStream you created earlier.
  • This setup ensures that the Lambda function processes every record ingested into the stream.

3. After creating the Lambda function, attach the role you created earlier so the function has permission to read from Kinesis (and to reach Firehose, if needed).

4. Lambda Function Code:

  • Here’s a basic function that prints the activity data for processing:
import base64
import json

def lambda_handler(event, context):
    # Ensure the 'Records' key exists in the event
    if 'Records' in event:
        for record in event['Records']:
            # Kinesis data is base64-encoded, so decode it
            payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')

            # Parse the decoded payload as JSON
            data = json.loads(payload)

            # Process the data (e.g., log it to CloudWatch)
            print(f"Processed: {data}")

        return 'Processed all records'
    else:
        print("No records found in the event")
        return 'No records to process'

Test the Stream: Start the Python producer script and monitor the Lambda function’s logs to see real-time processing of the social media activity.
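
If you prefer to attach the Kinesis trigger from code rather than through the console, a minimal boto3 sketch might look like the following (the stream ARN is a placeholder for your own):

import boto3

lambda_client = boto3.client('lambda', region_name='us-east-1')

# Map the Kinesis stream to the Lambda function (replace the placeholder ARN)
lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:kinesis:us-east-1:<account-id>:stream/SocialMediaActivityStream',
    FunctionName='ProcessSocialMediaActivity',
    StartingPosition='LATEST',  # Only process records that arrive after the mapping is created
    BatchSize=100               # Up to 100 records per invocation
)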

Step 5: Store Processed Data in S3 using Kinesis Firehose

Kinesis Firehose simplifies the delivery of real-time streaming data to storage services like Amazon S3.

  1. Create a Firehose Delivery Stream:
  • Navigate to the Kinesis Firehose Console.
  • Click Create Delivery Stream.
  • Choose the source as Kinesis Data Stream and select SocialMediaActivityStream.
  • For the destination, select Amazon S3 and specify a bucket (e.g., social-media-logs).
  • Enable Data Transformation if you want to format the data before storing it.

2. Start Streaming:

  • Now, Kinesis Firehose will automatically deliver data from the data stream to the S3 bucket, storing real-time logs of user activities.
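
The console handles this wiring for you, but here is a rough boto3 sketch of the same setup (the delivery stream name is my own choice for this example, and the account ID and role are placeholders for a role that lets Firehose read the stream and write to S3):

import boto3

firehose = boto3.client('firehose', region_name='us-east-1')

firehose.create_delivery_stream(
    DeliveryStreamName='SocialMediaActivityToS3',  # Hypothetical name for this example
    DeliveryStreamType='KinesisStreamAsSource',    # Read from the existing data stream
    KinesisStreamSourceConfiguration={
        'KinesisStreamARN': 'arn:aws:kinesis:us-east-1:<account-id>:stream/SocialMediaActivityStream',
        'RoleARN': 'arn:aws:iam::<account-id>:role/<firehose-role>'
    },
    ExtendedS3DestinationConfiguration={
        'BucketARN': 'arn:aws:s3:::social-media-logs',  # Destination bucket from this step
        'RoleARN': 'arn:aws:iam::<account-id>:role/<firehose-role>'
    }
)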

Step 6: Validate the Kinesis Data Stream

Check Stream Status

  1. AWS Console: Go to the Kinesis Data Streams console.
  • Check the stream’s status to ensure it is Active.
  • Review the Shards section to see if they are in good health and no errors are reported.
  2. Metrics:
  • Go to the Monitoring tab in the Kinesis Data Streams console.
  • Check metrics such as Incoming Records, Incoming Bytes, and Read Provisioned Throughput Exceeded to ensure data is being ingested and there are no issues with throughput.

After records have been loaded into the data stream, you should be able to retrieve and inspect them for a recent time window, for example the last 30 minutes to one hour.
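
To check the same ingestion metrics from code, a small boto3 sketch against CloudWatch (using the standard AWS/Kinesis namespace and the IncomingRecords metric) could look like this:

import boto3
from datetime import datetime, timedelta

cloudwatch = boto3.client('cloudwatch', region_name='us-east-1')

# Sum of records ingested into the stream over the last hour, in 5-minute buckets
response = cloudwatch.get_metric_statistics(
    Namespace='AWS/Kinesis',
    MetricName='IncomingRecords',
    Dimensions=[{'Name': 'StreamName', 'Value': 'SocialMediaActivityStream'}],
    StartTime=datetime.utcnow() - timedelta(hours=1),
    EndTime=datetime.utcnow(),
    Period=300,
    Statistics=['Sum']
)

for point in sorted(response['Datapoints'], key=lambda p: p['Timestamp']):
    print(point['Timestamp'], point['Sum'])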

Validate Data Ingestion

  1. Use the AWS CLI to Read Records:
  • Use the AWS CLI to get records from the stream. This will help confirm that data is being written to the stream. First request a shard iterator, then pass it to get-records:
aws kinesis get-shard-iterator --stream-name SocialMediaActivityStream --shard-id <shard-id> --shard-iterator-type TRIM_HORIZON
aws kinesis get-records --shard-iterator <shard-iterator>
  • Replace <shard-id> with your actual shard ID and <shard-iterator> with the ShardIterator value returned by the first command.

Check the Python Producer Script:

  • Ensure that the Python script is running without errors.
  • Monitor the output of the script to verify it’s sending data to the stream.

Use AWS CloudWatch Logs:

  • Check CloudWatch Logs for your Lambda function or any custom logging you have set up in your producer script.
  • Ensure logs indicate that records are being sent and received by the stream.

Step 7: Validate Kinesis Firehose

Check Delivery Stream Status

  1. AWS Console: Go to the Kinesis Firehose console.
  • Verify that the delivery stream’s status is Active.
  • Check the Destination configuration to ensure it is set up correctly (e.g., Amazon S3 bucket).
  2. Metrics:
  • Review metrics under the Monitoring tab for your Firehose delivery stream.
  • Check metrics such as Incoming Records and Delivery Errors to ensure data is being delivered without issues.

Validate Data Delivery

  1. Check S3 Bucket:
  • Go to the Amazon S3 console.
  • Verify that data is being written to the specified bucket.
  • Check the files and their contents to ensure they match the expected format and data.

2. CloudWatch Logs:

  • Check CloudWatch Logs for Kinesis Firehose to see if there are any errors or delivery issues.
  • Look for logs that indicate successful data delivery.
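
As a quick programmatic check that objects are arriving (assuming the social-media-logs bucket from Step 5), a small boto3 sketch could look like this:

import boto3

s3 = boto3.client('s3', region_name='us-east-1')

# List delivered objects (by default, Firehose writes keys under date-based prefixes)
response = s3.list_objects_v2(Bucket='social-media-logs')

for obj in response.get('Contents', []):
    print(obj['Key'], obj['Size'], obj['LastModified'])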

End-to-End Verification

1. Verify Data Flow:

Ensure that data flows from the producer (Python script) to the Kinesis Data Stream, is processed by the Lambda function (if used), and finally delivered to the destination (S3 bucket) via Kinesis Firehose.

2. Test End-to-End:

  • Run a test to simulate data production and verify that it appears in your destination (e.g., S3).
  • Use the producer script to send test records and confirm they are processed and delivered as expected.

3. Review Error Handling:

  • Check for any error handling mechanisms in place. Ensure that any errors in data ingestion or processing are properly logged and monitored.

Example Validation Steps

  1. Producer Validation:
  • Check the output of your Python script to confirm that records are being sent to the Kinesis Data Stream.
  • Verify the script logs to ensure no errors are reported.

2. Stream Validation:

  • Use the AWS CLI or AWS SDK to get records from the stream and verify their content.

3. Firehose Validation:

  • Confirm that records are being written to the S3 bucket or other destinations configured in Firehose.
  • Inspect the data files in S3 to ensure the records are correctly formatted and complete.
