一覧に戻る
Analytics

リアルタイムデータストリーミングアーキテクチャ

Amazon Kinesis Data Streams、Kinesis Data Firehose、S3、Lambdaを組み合わせ、大量のイベントデータをリアルタイムで収集・加工・保存するパイプラインです。

構成要素 (AWS Services):

Kinesis Data Streams、Kinesis Data Firehose、S3、Lambda

アーキテクチャ図 (Architecture Diagram)

クリックで拡大表示
リアルタイムデータストリーミングアーキテクチャのアーキテクチャ図

AWS CLI でのデプロイ例

AWS CLIを使用してCloudFormationスタックをデプロイする場合は、以下のコマンドを実行します。

# Create the stack from the local template file. CAPABILITY_IAM acknowledges
# that the template creates IAM roles.
aws cloudformation create-stack \
  --stack-name streaming-architecture-stack \
  --template-body file://streaming-architecture.yaml \
  --capabilities CAPABILITY_IAM
streaming-architecture.yaml
DL
# Template format version; quoted so YAML keeps it a string instead of
# parsing it as a date.
AWSTemplateFormatVersion: '2010-09-09'
# Human-readable summary of what this stack provisions.
Description: 'Streaming Architecture with Kinesis Data Streams, Firehose, Lambda, and S3'

Parameters:
  # Deployment environment. The value is applied as the Environment tag on
  # the tagged resources, exported to the Lambda function as ENVIRONMENT, and
  # embedded in the KMS alias name.
  Environment:
    Description: Environment name
    Type: String
    AllowedValues: [dev, prod]
    Default: dev

Resources:
  # Customer-managed KMS key used for server-side encryption of the Kinesis
  # data stream.
  StreamEncryptionKey:
    Type: AWS::KMS::Key
    Properties:
      Description: KMS key for Kinesis stream encryption
      # Turn on automatic rotation of the key material. KMS keeps earlier key
      # material, so data encrypted before a rotation stays readable.
      EnableKeyRotation: true
      KeyPolicy:
        Version: '2012-10-17'
        Statement:
          # Delegate key administration to IAM policies in this account.
          # AWS::Partition keeps the ARN valid outside the standard "aws"
          # partition; in the standard partition it resolves to the same ARN
          # as before.
          - Sid: Enable IAM User Permissions
            Effect: Allow
            Principal:
              AWS: !Sub 'arn:${AWS::Partition}:iam::${AWS::AccountId}:root'
            Action: 'kms:*'
            Resource: '*'
          # Let the Kinesis service principal decrypt and generate data keys.
          - Sid: Allow Kinesis to use the key
            Effect: Allow
            Principal:
              Service: kinesis.amazonaws.com
            Action:
              - 'kms:Decrypt'
              - 'kms:GenerateDataKey'
            Resource: '*'

  # Friendly alias for the stream encryption key; the alias name carries the
  # environment as a suffix.
  StreamEncryptionKeyAlias:
    Type: AWS::KMS::Alias
    Properties:
      TargetKeyId:
        Ref: StreamEncryptionKey
      AliasName:
        Fn::Sub: alias/kinesis-stream-${Environment}

  # Kinesis data stream that ingests the events: on-demand capacity mode,
  # 24-hour retention, encrypted with the customer-managed KMS key.
  DataStream:
    Type: AWS::Kinesis::Stream
    Properties:
      StreamModeDetails:
        StreamMode: ON_DEMAND
      RetentionPeriodHours: 24
      StreamEncryption:
        KeyId:
          Ref: StreamEncryptionKey
        EncryptionType: KMS
      Tags:
        - Value: !Ref Environment
          Key: Environment

  # S3 bucket that stores the delivered data. Encrypted with SSE-S3,
  # versioned, and closed to every form of public access.
  DataLakeBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          # BucketKeyEnabled is intentionally not set: S3 Bucket Keys only
          # apply to SSE-KMS and do nothing with AES256 (SSE-S3).
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      VersioningConfiguration:
        Status: Enabled
      LifecycleConfiguration:
        Rules:
          # Move objects to cheaper storage classes as they age.
          - Id: TransitionToIA
            Status: Enabled
            Transitions:
              - TransitionInDays: 30
                StorageClass: STANDARD_IA
              - TransitionInDays: 90
                StorageClass: GLACIER
          # Remove the parts of multipart uploads that were never completed so
          # they do not accumulate as invisible, billed storage.
          - Id: AbortIncompleteMultipartUploads
            Status: Enabled
            AbortIncompleteMultipartUpload:
              DaysAfterInitiation: 7
      Tags:
        - Key: Environment
          Value: !Ref Environment

  # CloudWatch Logs group referenced by the delivery stream's logging options.
  FirehoseLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      # Keep log events for one week.
      RetentionInDays: 7

  # Log stream created inside the Firehose log group; the delivery stream's
  # CloudWatch logging options reference it by name.
  FirehoseLogStream:
    Type: AWS::Logs::LogStream
    Properties:
      LogGroupName:
        Ref: FirehoseLogGroup

  # Role assumed by Kinesis Data Firehose. The single inline policy covers
  # the bucket, the source stream, the stream's KMS key, the Firehose log
  # group, and the processor Lambda.
  FirehoseRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Action: 'sts:AssumeRole'
            Principal:
              Service: firehose.amazonaws.com
      Policies:
        - PolicyName: FirehosePolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              # Object and multipart-upload access on the data lake bucket.
              - Effect: Allow
                Resource:
                  - !GetAtt DataLakeBucket.Arn
                  - !Sub '${DataLakeBucket.Arn}/*'
                Action:
                  - 's3:AbortMultipartUpload'
                  - 's3:GetBucketLocation'
                  - 's3:GetObject'
                  - 's3:ListBucket'
                  - 's3:ListBucketMultipartUploads'
                  - 's3:PutObject'
              # Read access on the source Kinesis data stream.
              - Effect: Allow
                Resource: !GetAtt DataStream.Arn
                Action:
                  - 'kinesis:DescribeStream'
                  - 'kinesis:GetShardIterator'
                  - 'kinesis:GetRecords'
                  - 'kinesis:ListShards'
              # Use of the stream's KMS key, limited to requests made through
              # Kinesis in this region for this specific stream.
              - Effect: Allow
                Resource: !GetAtt StreamEncryptionKey.Arn
                Action:
                  - 'kms:Decrypt'
                  - 'kms:GenerateDataKey'
                Condition:
                  StringLike:
                    'kms:EncryptionContext:aws:kinesis:arn': !GetAtt DataStream.Arn
                  StringEquals:
                    'kms:ViaService': !Sub 'kinesis.${AWS::Region}.amazonaws.com'
              # Write log events to the Firehose log group.
              - Effect: Allow
                Resource: !GetAtt FirehoseLogGroup.Arn
                Action:
                  - 'logs:PutLogEvents'
              # Invoke and inspect the transformation Lambda.
              - Effect: Allow
                Resource: !GetAtt ProcessorFunction.Arn
                Action:
                  - 'lambda:InvokeFunction'
                  - 'lambda:GetFunctionConfiguration'

  # Execution role for the processor Lambda.
  #
  # The function is invoked by Firehose as a data-transformation step and
  # receives the records in the invocation payload, so it never calls Kinesis
  # or KMS itself. The inline Kinesis/KMS read policy was therefore removed
  # (least privilege); its kinesis:ListStreams grant was also ineffective
  # because that action cannot be scoped to a single stream ARN.
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: 'sts:AssumeRole'
      ManagedPolicyArns:
        # CloudWatch Logs permissions for the function's own log output.
        # AWS::Partition keeps the ARN valid outside the standard "aws"
        # partition.
        - !Sub 'arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'

  # Firehose data-transformation Lambda. For every record in the batch it
  # decodes the base64 payload, parses it as JSON, sets "processed": true and
  # returns the re-encoded document to Firehose.
  #
  # Records that cannot be decoded, or whose payload is not a JSON object,
  # are returned as ProcessingFailed instead of raising, so one bad record no
  # longer fails the whole batch. Successful records are newline-terminated
  # so the objects written to S3 contain one JSON document per line.
  ProcessorFunction:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.12
      Handler: index.lambda_handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Timeout: 60
      MemorySize: 256
      Environment:
        Variables:
          ENVIRONMENT: !Ref Environment
      Code:
        ZipFile: |
          """Firehose transformation handler that marks JSON records as processed."""
          import base64
          import json


          def _transform(record):
              """Return the Firehose response entry for a single input record."""
              try:
                  data = json.loads(base64.b64decode(record['data']))
                  if not isinstance(data, dict):
                      raise ValueError('payload is not a JSON object')
              except ValueError as exc:
                  # ValueError covers invalid base64, invalid UTF-8 and invalid
                  # JSON. Hand the original payload back marked as failed so
                  # only this record is rejected, not the whole batch.
                  print(f"record {record['recordId']} failed: {exc}")
                  return {
                      'recordId': record['recordId'],
                      'result': 'ProcessingFailed',
                      'data': record['data'],
                  }

              data['processed'] = True
              # Terminate each document with a newline; Firehose concatenates
              # records without any delimiter when it writes to S3.
              line = json.dumps(data) + '\n'
              return {
                  'recordId': record['recordId'],
                  'result': 'Ok',
                  'data': base64.b64encode(line.encode()).decode(),
              }


          def lambda_handler(event, context):
              """Transform every record of a Firehose invocation event."""
              return {'records': [_transform(r) for r in event['records']]}
      Tags:
        - Key: Environment
          Value: !Ref Environment

  # Log group for the processor Lambda, named after the function and declared
  # in the stack with a one-week retention.
  ProcessorFunctionLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      RetentionInDays: 7
      LogGroupName:
        Fn::Sub: /aws/lambda/${ProcessorFunction}

  # Firehose delivery stream: sourced from the Kinesis data stream, runs the
  # processor Lambda on the records, and writes GZIP-compressed objects to
  # the data lake bucket under year/month/day prefixes.
  DeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamType: KinesisStreamAsSource
      KinesisStreamSourceConfiguration:
        RoleARN: !GetAtt FirehoseRole.Arn
        KinesisStreamARN: !GetAtt DataStream.Arn
      ExtendedS3DestinationConfiguration:
        RoleARN: !GetAtt FirehoseRole.Arn
        BucketARN: !GetAtt DataLakeBucket.Arn
        CompressionFormat: GZIP
        BufferingHints:
          IntervalInSeconds: 300
          SizeInMBs: 128
        # Key prefixes for delivered data and for records that failed.
        Prefix: 'data/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/'
        ErrorOutputPrefix: 'errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/!{firehose:error-output-type}/'
        # Lambda-based transformation step.
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: Lambda
              Parameters:
                - ParameterValue: !GetAtt ProcessorFunction.Arn
                  ParameterName: LambdaArn
        # Delivery logs go to the dedicated Firehose log group and stream.
        CloudWatchLoggingOptions:
          Enabled: true
          LogStreamName: !Ref FirehoseLogStream
          LogGroupName: !Ref FirehoseLogGroup
      Tags:
        - Value: !Ref Environment
          Key: Environment

  # Alarm on the data stream's iterator age: fires when the maximum over one
  # 300-second period exceeds 60000 ms.
  StreamIteratorAgeAlarm:
    Type: AWS::CloudWatch::Alarm
    Properties:
      AlarmDescription: Alert when stream iterator age is too high
      Namespace: AWS/Kinesis
      MetricName: GetRecords.IteratorAgeMilliseconds
      Dimensions:
        - Value: !Ref DataStream
          Name: StreamName
      Statistic: Maximum
      Period: 300
      EvaluationPeriods: 1
      ComparisonOperator: GreaterThanThreshold
      Threshold: 60000

  # Alarm on the processor Lambda: fires when more than 5 errors are summed
  # over one 300-second period.
  LambdaErrorAlarm:
    Type: AWS::CloudWatch::Alarm
    Properties:
      AlarmDescription: Alert when Lambda function errors occur
      Namespace: AWS/Lambda
      MetricName: Errors
      Dimensions:
        - Value: !Ref ProcessorFunction
          Name: FunctionName
      Statistic: Sum
      Period: 300
      EvaluationPeriods: 1
      ComparisonOperator: GreaterThanThreshold
      Threshold: 5

# Stack outputs. Each one is also exported under a name prefixed with the
# stack name so other stacks can import it.
Outputs:
  # Kinesis data stream
  DataStreamName:
    Value: !Ref DataStream
    Description: Name of the Kinesis Data Stream
    Export:
      Name: !Sub '${AWS::StackName}-DataStreamName'

  DataStreamArn:
    Value: !GetAtt DataStream.Arn
    Description: ARN of the Kinesis Data Stream
    Export:
      Name: !Sub '${AWS::StackName}-DataStreamArn'

  # S3 data lake bucket
  DataLakeBucketName:
    Value: !Ref DataLakeBucket
    Description: Name of the S3 Data Lake bucket
    Export:
      Name: !Sub '${AWS::StackName}-DataLakeBucketName'

  # Firehose delivery stream
  DeliveryStreamName:
    Value: !Ref DeliveryStream
    Description: Name of the Kinesis Firehose Delivery Stream
    Export:
      Name: !Sub '${AWS::StackName}-DeliveryStreamName'

  # Processor Lambda
  ProcessorFunctionArn:
    Value: !GetAtt ProcessorFunction.Arn
    Description: ARN of the Lambda processor function
    Export:
      Name: !Sub '${AWS::StackName}-ProcessorFunctionArn'