Amazon Kinesis Data Streams、Amazon Data Firehose（旧称 Kinesis Data Firehose）、Amazon S3、AWS Lambda を組み合わせ、大量のイベントデータをリアルタイムで収集・加工・保存するストリーミングパイプラインを構築する CloudFormation テンプレートです。
以下のテンプレートを streaming-architecture.yaml という名前で保存し、AWS CLI で CloudFormation スタックをデプロイする場合は、次のコマンドを実行します（テンプレートが IAM ロールを作成するため、CAPABILITY_IAM の指定が必要です）。
aws cloudformation create-stack \
  --stack-name streaming-architecture-stack \
  --template-body file://streaming-architecture.yaml \
  --parameters ParameterKey=Environment,ParameterValue=dev \
  --capabilities CAPABILITY_IAM
# Template header: format version and a human-readable stack description.
AWSTemplateFormatVersion: "2010-09-09"
Description: >-
  Streaming Architecture with Kinesis Data Streams, Firehose, Lambda, and S3
Parameters:
  # Deployment stage. Referenced by resource tags, the KMS alias name and
  # the Lambda ENVIRONMENT variable.
  Environment:
    Description: Environment name
    Type: String
    AllowedValues: [dev, prod]
    Default: dev
Resources:
  # KMS key used for server-side encryption of the Kinesis data stream.
  StreamEncryptionKey:
    Type: AWS::KMS::Key
    Properties:
      Description: KMS key for Kinesis stream encryption
      # Enable automatic rotation of the key material.
      EnableKeyRotation: true
      KeyPolicy:
        Version: '2012-10-17'
        Statement:
          # Delegate access control for this key to IAM policies in the account.
          - Sid: Enable IAM User Permissions
            Effect: Allow
            Principal:
              AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root'
            Action: 'kms:*'
            Resource: '*'
          - Sid: Allow Kinesis to use the key
            Effect: Allow
            Principal:
              Service: kinesis.amazonaws.com
            Action:
              - 'kms:Decrypt'
              - 'kms:GenerateDataKey'
            Resource: '*'

  StreamEncryptionKeyAlias:
    Type: AWS::KMS::Alias
    Properties:
      AliasName: !Sub 'alias/kinesis-stream-${Environment}'
      TargetKeyId: !Ref StreamEncryptionKey

  # Kinesis Data Stream (on-demand capacity, KMS-encrypted, 24h retention).
  DataStream:
    Type: AWS::Kinesis::Stream
    Properties:
      RetentionPeriodHours: 24
      StreamModeDetails:
        StreamMode: ON_DEMAND
      StreamEncryption:
        EncryptionType: KMS
        KeyId: !Ref StreamEncryptionKey
      Tags:
        - Key: Environment
          Value: !Ref Environment

  # S3 bucket that receives the Firehose output (the data lake).
  DataLakeBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
            BucketKeyEnabled: true
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      VersioningConfiguration:
        Status: Enabled
      LifecycleConfiguration:
        Rules:
          - Id: TransitionToIA
            Status: Enabled
            Transitions:
              - TransitionInDays: 30
                StorageClass: STANDARD_IA
              - TransitionInDays: 90
                StorageClass: GLACIER
          # Firehose writes via multipart uploads. Clean up parts from
          # uploads that never completed so they do not accrue storage cost.
          - Id: AbortIncompleteMultipartUploads
            Status: Enabled
            AbortIncompleteMultipartUpload:
              DaysAfterInitiation: 7
      Tags:
        - Key: Environment
          Value: !Ref Environment

  # CloudWatch Logs destination for Firehose delivery errors.
  FirehoseLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      RetentionInDays: 7

  FirehoseLogStream:
    Type: AWS::Logs::LogStream
    Properties:
      LogGroupName: !Ref FirehoseLogGroup

  # Role assumed by Firehose to read the stream, invoke the transform
  # Lambda, write to S3 and emit error logs.
  FirehoseRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: 'sts:AssumeRole'
      Policies:
        - PolicyName: FirehosePolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - 's3:AbortMultipartUpload'
                  - 's3:GetBucketLocation'
                  - 's3:GetObject'
                  - 's3:ListBucket'
                  - 's3:ListBucketMultipartUploads'
                  - 's3:PutObject'
                Resource:
                  - !GetAtt DataLakeBucket.Arn
                  - !Sub '${DataLakeBucket.Arn}/*'
              - Effect: Allow
                Action:
                  - 'kinesis:DescribeStream'
                  - 'kinesis:GetShardIterator'
                  - 'kinesis:GetRecords'
                  - 'kinesis:ListShards'
                Resource: !GetAtt DataStream.Arn
              # Decrypt stream records, only when called through Kinesis
              # for this specific stream.
              - Effect: Allow
                Action:
                  - 'kms:Decrypt'
                  - 'kms:GenerateDataKey'
                Resource: !GetAtt StreamEncryptionKey.Arn
                Condition:
                  StringEquals:
                    'kms:ViaService': !Sub 'kinesis.${AWS::Region}.amazonaws.com'
                  StringLike:
                    'kms:EncryptionContext:aws:kinesis:arn': !GetAtt DataStream.Arn
              - Effect: Allow
                Action:
                  - 'logs:PutLogEvents'
                Resource: !GetAtt FirehoseLogGroup.Arn
              # Cover both the unqualified function ARN and qualified
              # (version/alias) ARNs.
              - Effect: Allow
                Action:
                  - 'lambda:InvokeFunction'
                  - 'lambda:GetFunctionConfiguration'
                Resource:
                  - !GetAtt ProcessorFunction.Arn
                  - !Sub '${ProcessorFunction.Arn}:*'

  # Execution role for the transform Lambda. The function is invoked
  # synchronously by Firehose with the records in the event payload. It
  # never reads Kinesis or decrypts with the stream key itself, so it only
  # needs basic CloudWatch Logs permissions.
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: 'sts:AssumeRole'
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'

  # Firehose data-transformation Lambda.
  ProcessorFunction:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.12
      Handler: index.lambda_handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Timeout: 60
      MemorySize: 256
      Environment:
        Variables:
          ENVIRONMENT: !Ref Environment
      Code:
        ZipFile: |
          import base64
          import json
          import logging

          logger = logging.getLogger()
          logger.setLevel(logging.INFO)


          def lambda_handler(event, context):
              """Firehose transformation handler.

              Each incoming record is base64-decoded, parsed as a JSON object,
              tagged with "processed": true and re-encoded as one line of
              newline-delimited JSON. The trailing newline lets Athena/Glue
              split the records that Firehose concatenates into one S3 object.

              A record that is not valid base64/UTF-8 JSON, or is not a JSON
              object, is returned as ProcessingFailed with its original data.
              Firehose then routes just that record to the error prefix
              instead of failing the whole batch.
              """
              output = []
              for record in event['records']:
                  try:
                      data = json.loads(base64.b64decode(record['data']))
                      if not isinstance(data, dict):
                          raise ValueError('record is not a JSON object')
                      data['processed'] = True
                      line = json.dumps(data) + '\n'
                      output.append({
                          'recordId': record['recordId'],
                          'result': 'Ok',
                          'data': base64.b64encode(line.encode('utf-8')).decode('ascii'),
                      })
                  except ValueError as exc:
                      # binascii.Error, JSONDecodeError and UnicodeDecodeError
                      # are all ValueError subclasses.
                      logger.warning('Failed to transform record %s: %s',
                                     record['recordId'], exc)
                      output.append({
                          'recordId': record['recordId'],
                          'result': 'ProcessingFailed',
                          'data': record['data'],
                      })
              return {'records': output}
      Tags:
        - Key: Environment
          Value: !Ref Environment

  # Explicit log group for the Lambda, so retention is managed by the stack.
  ProcessorFunctionLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '/aws/lambda/${ProcessorFunction}'
      RetentionInDays: 7

  # Firehose delivery stream: Kinesis -> Lambda transform -> S3 (GZIP,
  # Hive-style date partitions).
  DeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    # Create the Lambda log group before Firehose can invoke the function.
    # Otherwise Lambda may auto-create /aws/lambda/<name> first, and
    # ProcessorFunctionLogGroup then fails with "already exists".
    DependsOn: ProcessorFunctionLogGroup
    Properties:
      DeliveryStreamType: KinesisStreamAsSource
      KinesisStreamSourceConfiguration:
        KinesisStreamARN: !GetAtt DataStream.Arn
        RoleARN: !GetAtt FirehoseRole.Arn
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt DataLakeBucket.Arn
        RoleARN: !GetAtt FirehoseRole.Arn
        Prefix: 'data/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/'
        ErrorOutputPrefix: 'errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/!{firehose:error-output-type}/'
        BufferingHints:
          SizeInMBs: 128
          IntervalInSeconds: 300
        CompressionFormat: GZIP
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: Lambda
              Parameters:
                - ParameterName: LambdaArn
                  ParameterValue: !GetAtt ProcessorFunction.Arn
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Ref FirehoseLogGroup
          LogStreamName: !Ref FirehoseLogStream
      Tags:
        - Key: Environment
          Value: !Ref Environment

  # CloudWatch alarms.
  # NOTE(review): no AlarmActions are configured, so these alarms change
  # state but notify no one. Attach an SNS topic if alerts are expected.
  StreamIteratorAgeAlarm:
    Type: AWS::CloudWatch::Alarm
    Properties:
      AlarmDescription: Alert when stream iterator age is too high
      MetricName: GetRecords.IteratorAgeMilliseconds
      Namespace: AWS/Kinesis
      Statistic: Maximum
      Period: 300
      EvaluationPeriods: 1
      Threshold: 60000
      ComparisonOperator: GreaterThanThreshold
      # Stay OK instead of INSUFFICIENT_DATA while the stream is idle.
      TreatMissingData: notBreaching
      Dimensions:
        - Name: StreamName
          Value: !Ref DataStream

  LambdaErrorAlarm:
    Type: AWS::CloudWatch::Alarm
    Properties:
      AlarmDescription: Alert when Lambda function errors occur
      MetricName: Errors
      Namespace: AWS/Lambda
      Statistic: Sum
      Period: 300
      EvaluationPeriods: 1
      Threshold: 5
      ComparisonOperator: GreaterThanThreshold
      # No invocations means no Errors datapoints; treat that as healthy.
      TreatMissingData: notBreaching
      Dimensions:
        - Name: FunctionName
          Value: !Ref ProcessorFunction
Outputs:
  DataStreamName:
    Description: Name of the Kinesis Data Stream
    Value: !Ref DataStream
    Export:
      Name: !Sub '${AWS::StackName}-DataStreamName'
  DataStreamArn:
    Description: ARN of the Kinesis Data Stream
    Value: !GetAtt DataStream.Arn
    Export:
      Name: !Sub '${AWS::StackName}-DataStreamArn'
  # Producers writing to the KMS-encrypted stream need kms:GenerateDataKey
  # on this key, so export its ARN for their IAM policies.
  StreamEncryptionKeyArn:
    Description: 'ARN of the KMS key that encrypts the Kinesis Data Stream'
    Value: !GetAtt StreamEncryptionKey.Arn
    Export:
      Name: !Sub '${AWS::StackName}-StreamEncryptionKeyArn'
  DataLakeBucketName:
    Description: Name of the S3 Data Lake bucket
    Value: !Ref DataLakeBucket
    Export:
      Name: !Sub '${AWS::StackName}-DataLakeBucketName'
  DeliveryStreamName:
    Description: Name of the Kinesis Firehose Delivery Stream
    Value: !Ref DeliveryStream
    Export:
      Name: !Sub '${AWS::StackName}-DeliveryStreamName'
  ProcessorFunctionArn:
    Description: ARN of the Lambda processor function
    Value: !GetAtt ProcessorFunction.Arn
    Export:
      Name: !Sub '${AWS::StackName}-ProcessorFunctionArn'