nariのエンジニアリング備忘録

SRE/Devops/AWS/自動化/IaC/Terraform/Go/DDD など

AWS DynamoDB exportとAthena CreateTable/ExecuteQueryを定期的に実行するLambdaでお手軽データ解析基盤を手に入れる

はじめに

お久しぶりです。都内でエンジニアをしている nariです。 年末年始なので今年やったことをつらつら備忘録としてためておきたいと思います。 (これ一個で終わっちゃうかも。ご容赦)

作った動機

ある案件でDynamoDBに対してある程度複雑なクエリで抽出したい要件がでてきました。 しかし

  • そもそも本番Data Storeに解析クエリを投げるのはよくない
  • DQL(DynamoDB Query Language)の表現(unix timeの扱いetc)が貧弱/毎回解析用にIndexをはるのがつらい

ので、お手軽解析環境をStream/Batchの同期タスクをお守りすることなく手に入れたい思いから作成しました。 今回は、ちょうど今年リリースされたしたDynamo export to s3のという神機能を使って作成していきます。

全体像

f:id:st5818129:20201231152615p:plain

サンプルコードは以下

github.com

サンプルコード解説

Terraform側

DynamoDB table/export先のS3 bucket/Athena database/Athena execute result store用のS3 bucketを作成する

# ./terraform/dynamodb.tf

resource "aws_dynamodb_table" "sample" {
  name         = "sample_dynamodb_table"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "id"

  attribute {
    name = "id"
    type = "S"
  }

  attribute {
    name = "name"
    type = "S"
  }

  point_in_time_recovery {
    enabled = true //①
  }
}
  • ① export to s3の機能を使うのにPITRが必須なのでtrueにしておきます
#./terraform/export_s3.tf

resource "aws_s3_bucket" "sample_export" {
  bucket        = "sample-export"
  acl           = "private"
  force_destroy = "false"

  versioning {
    enabled = true
  }

  lifecycle_rule {
    enabled = true
    expiration {
      days = 90 //②
    }
    noncurrent_version_expiration {
      days = 1
    }
  }


  server_side_encryption_configuration {
    rule {
      apply_server_side_encryption_by_default {
        kms_master_key_id = "arn:aws:kms:ap-northeast-1:99999999999999:key/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" //③
        sse_algorithm     = "aws:kms"
      }
    }
  }
}


# enforce public access block on bucket
# reapply on next tf apply if disabled
resource "aws_s3_bucket_public_access_block" "sample_export" {
  bucket = aws_s3_bucket.sample_export.bucket

  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}
  • ② 個人情報を含む場合、こういったDataLake移行のレイヤーでも、Dataのライフサイクルを最初から意識して解析側やステークホルダーと握って設定するのが吉です。また、Query Resultを保存する S3 bucketにも同様のサイクルや後述する暗号化を適応する必要があることにも注意してください。
  • ③ 暗号化も各会社の基準によるところですが、重要度が高いものに関してはmanaged kms keyで暗号化するのが賢明だと思います。(内部からの漏洩も対策しておく)

Serverless Framework側

DynamoDB exportとAthena CreateTable/ExecuteQueryを実行するLambdaとCloudWatch Eventを作成する

#./conf/sample.yml

sample_dynamodb_table:
  cron:
    rate: rate(30 days)  //④
    enabled: true
  dynamo:
    arn: "arn:aws:dynamodb:ap-northeast-1:99999999999:table/xxxxxxxxxxxxxxxxxxxxxxxx"
  s3:
    name: "sample-bucket"
  kms:
    arn: "arn:aws:kms:ap-northeast-1:99999999999999:key/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
  athena:
    enabled: true
    database_name: "sample-db"
    query_result_output_bucket_name: "sample-result-bucket"
    table_name: "sample-table"
    table_schema: "struct<id:struct<s:string>,name:struct<s:string>>" //⑤
  • ④ ap-northeast-1で1GBあたり$0.114 per GB (20201231時点)なので、頻度はお財布と相談してください
  • Dynamo table export データがこのようにネストされた構造をもっているので、Athena(Glue)で表現しようとするとこれだけ複雑になってします。こちらはid,nameをattributeにもつ場合の例。どのtableもこの形式なので参考にしてみてください
# ./src/module/export_handler.go

func (s *exportHandler) Run() error {
    sess := session.Must(session.NewSession())
    dynamoCli, err := NewDynamoCli(sess)
    if err != nil {
        log.Error(err)
        return err
    }
    exportID, err := dynamoCli.ExportToS3(s.Event.DynamoTableArn, s.Event.S3BucketName, s.Event.KmsArn)
    if err != nil {
        log.Error(err)
        return err
    }
    if s.Event.AthenaEnabled {
        athenaCli, err := NewAthenaCli(sess)
        if err != nil {
            log.Error(err)
            return err
        }
        newLocation := fmt.Sprintf("s3://%s/AWSDynamoDB/%s/data/", s.Event.S3BucketName, exportID)
        outputLocation := fmt.Sprintf("s3://%s/output/", s.Event.AthenaQueryResultOutputBucketName)
        //⑥ 
        if err := athenaCli.CreateTableIfNotExists(s.Event.AthenaDatabaseName, s.Event.AthenaTableName, s.Event.AthenaTableSchema, newLocation, outputLocation); err != nil {
            log.Error(err)
            return err
        }
        if err := athenaCli.ChangeLocation(s.Event.AthenaDatabaseName, s.Event.AthenaTableName, newLocation, outputLocation); err != nil {
            log.Error(err)
            return err
        }
    }
    return nil
}
  • ⑥ 初回はCreateTableをして、初回移行は新しくexportしたobject pathでtable locationを変更しにいきます。これによって、同じtable名で、ある程度の鮮度のデータを解析できるようになります。

Athenaでの引き方

  • 以下のようなsqlでコンソールから引けるようになりました。redashとつなげていつでも解析できるようにすることもできます。
SELECT 
    item.id.s as id,
    item.name.s as name
FROM 
    "sample_db"."sample_table" 

終わりに

これで、実行間隔分の反映ラグ(1日おきなら、最大1日分の反映ラグが発生してします)が発生するものの、それくらいのデータ鮮度を許容できる場合簡単にDynamoDBをSQLで解析できるようになりました。 皆さんもDynamoDBに検証/解析用のちょっと複雑なクエリを投げたくなったらこういう感じでさくっとAthenaで解析する方向にシフトしてみてください。

参考文献