Amazon Kinesis Data StreamsにTwitter検索データを送信する
2021/08/09
目次
Kinesis Data Streamsの作成
ストリーム名とシャード数を決定するだけです。
シャードは
「各シャードは最大で 1 MiB/秒および 1000 レコード/秒を取得し、最大 2 MiB/秒を出力します。」
ということですので、1レコード1kbなさそうなレコードを最大100件送信するのでシャードは1つで足りそうです。
シャードエスティメーターで確認しても問題なさそうです。
作成されました。
シャードの数、タグ以外の設定は、サーバー側の暗号化、データ保持期間、シャードレベルメトリクスでした。
デフォルトの24時間、暗号化なし、シャードレベルの追加メトリクスなしにしました。
AWS CLI を使用した基本的な Kinesis Data Stream オペレーションの実行の手順でレコードの送信、受信をやってみました。
送信
1 2 3 4 5 6 |
$ aws kinesis put-record --stream-name Tweet --partition-key 123 --data testdata { "ShardId": "shardId-000000000000", "SequenceNumber": "49620429506074472818275695973080444976867703729602691074" } |
受信
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
$ aws kinesis get-records --shard-iterator $(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Tweet --query 'ShardIterator') { "Records": [ { "SequenceNumber": "49620429506074472818275695973080444976867703729602691074", "ApproximateArrivalTimestamp": "2021-07-24T09:18:19.003000+00:00", "Data": "testdata", "PartitionKey": "123" } ], "NextShardIterator": "AAAAAAAAAAEE7PmjeZZuA9VoWDyAORnsK9ZmBAlCWRcJOtVKQh7wvlX7AnNli52adZZeuy6pk09sDDCmwopXBCXkdobIPLj642Il3RgV25muolgT4dmP37ob7Qa0baH2EParQtmbEcdBmVZddONbqUXvfoVS13Y2o9btm3JoYb0Y5KL/paZl6nZAmm5ruDYz03mJnFX+iqeO8M4SML2ZNYUApx3uB/QjcGi3uJgEDjSNAnQyVYv55w==", "MillisBehindLatest": 0 } |
shard-iteratorを入れ子コマンドで取得しました。
テストで作ったレコードが残っているとツイッターのデータが書き込めなかったので、一回ストリームを削除して作り直しました。
IAMロールの設定
AWS Lambdaから送信するので、専用のIAMロールを設定しました。
- KinesisPutRecords
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords", "kinesis:PutRecord" ], "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/Tweet" }, { "Effect": "Allow", "Action": "secretsmanager:GetSecretValue", "Resource": "arn:aws:secretsmanager:us-east-1:123456789012:secret:LiveDelivery/TwitterAPIKey-xxxxx" }, { "Effect": "Allow", "Action": [ "ssm:PutParameter", "ssm:GetParameter" ], "Resource": "arn:aws:ssm:us-east-1:123456789012:parameter/twitter-since-id" }, { "Effect": "Allow", "Action": [ "xray:PutTraceSegments", "xray:PutTelemetryRecords" ], "Resource": [ "*" ] } ] } |
kinesis:PutRecordsを使うことにしました。
必要ないですが、PutRecordも許可しています。
TwitterのキーをSecretsManagerから取得するポリシーとParameterStoreでsince_idを管理しているのでそのポリシーも設定しています。
他には、AWS管理ポリシーのAWSLambdaBasicExecutionRoleを設定しています。
Lambda関数
Githubで公開しています。
yamamanx/TweetSearchAnalyze
取得したTweetをput_recordでKinesis Data Streamingに送信しています。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
def kinesis_put_records(tweets): try: records = [] for tweet in tweets['statuses']: payload = { 'id': tweet['id_str'], 'text': tweet['text'], 'source': tweet['source'], 'user_name': tweet['user']['name'], 'user_id': tweet['user']['id'], 'user_screen_name': tweet['user']['screen_name'], 'user_followers_count': tweet['user']['followers_count'], 'geo': tweet['geo'] } logger.debug(payload) record = { 'Data': json.dumps(payload), 'PartitionKey': tweet['id_str'] } records.append(record) logger.info(len(records)) if len(records) > 0: kinesis_client = boto3.client('kinesis') response = kinesis_client.put_records( Records=records, StreamName='Tweet' ) logger.info(response) except Exception as e: raise e |
クエリーでOR検索が意図したように動作しなかったので、キーワードを配列にしてキーワードの数だけ検索を実行しています。
その他のコードは以下のブログをご確認ください。
- AWS Lambda(Python)からTwitterに投稿する
- Lambda関数からAWS Systems Managerパラメータストアの値を取得して更新する
- Twitterでツイート検索するAPIを試してみる
Lambda関数はバージョンを作って、エイリアスprodに紐づけて、EventBridgeで5分おき実行にしました。
最後までお読みいただきましてありがとうございました!
【PR】 「AWS認定試験対策 AWS クラウドプラクティショナー」という本を書きました。
【PR】 「AWSではじめるLinux入門ガイド」という本を書きました。
【PR】 「ポケットスタディ AWS認定 デベロッパーアソシエイト」という本を書きました。
【PR】 「AWS認定資格試験テキスト&問題集 AWS認定ソリューションアーキテクト - プロフェッショナル」という本を書きました。

開発ベンダー5年、ユーザ企業システム部門通算9年、ITインストラクター5年目でプロトタイプビルダーもやりだしたSoftware Engineerです。
質問はコメントかSNSなどからお気軽にどうぞ。
出来る限りなるべく答えます。
このブログの内容/発言の一切は個人の見解であり、所属する組織とは関係ありません。
このブログは経験したことなどの共有を目的としており、手順や結果などを保証するものではありません。
ご参考にされる際は、読者様自身のご判断にてご対応をお願いいたします。
また、勉強会やイベントのレポートは自分が気になったことをメモしたり、聞いて思ったことを書いていますので、登壇者の意見や発表内容ではありません。
ad
ad
関連記事
-
-
Amazon SNSサブスクリプションフィルターで優先度別のSQSキューにサブスクライブする
EC2のコンシューマーアプリケーションは優先度の高いキューのメッセージを先に処理 …
-
-
「関西AWSスタートアップ勉強会」に行ってきました
第2回 関西スタートアップAWS勉強会に行ってきました。 akippa 拠点数コ …
-
-
AWS Toolkit for EclipseからLambda関数を直接作成できずにMavenでパッケージ化して作成
AWS Toolkit for EclipseからLambda関数を直接作成 チ …
-
-
AWS Client VPNを設定しました
ユーザーガイドのクライアント VPN の開始方法に沿ってやりました。 AWSクラ …
-
-
RDSリザーブドDBインスタンスを購入しました
リザーブドインスタンス推奨事項を確認したで確認した結果、購入したほうがよさそうで …
-
-
Amazon Data Lifecycle Manager(DLM)が東京リージョンで使えるようになったのでLambdaでAMI自動取得から乗り換えた
EBSのスナップショットを自動で作成してくれるAmazon Data Lifec …
-
-
EC2 Amazon Linux 2 にAmazon LinuxからWordPressを移行
このブログを新しいインスタンスに移行することにしました。 2015年5月にAma …
-
-
AWS Toolkit for Eclipseをセットアップ(2021年版)
AWS Toolkit for Eclipseをセットアップするで6年前に書いて …
-
-
kintone webhookからAWS API Gateway – Lambdaを実行しレコードの値を渡す
2017年2月のアップデートでkintoneにWebhook機能がリリースされま …
-
-
SCPが影響しないサービスにリンクされたロールにEC2が引き受けるIAMロールは含まれないことを確認
ドキュメントで確認 サービスコントロールポリシーのユーザーガイドには、「SCPは …