Amazon Kinesis Data StreamsにTwitter検索データを送信する
2021/08/09
目次
Kinesis Data Streamsの作成
ストリーム名とシャード数を決定するだけです。
シャードは
「各シャードは最大で 1 MiB/秒および 1000 レコード/秒を取得し、最大 2 MiB/秒を出力します。」
ということですので、1レコード1kbなさそうなレコードを最大100件送信するのでシャードは1つで足りそうです。
シャードエスティメーターで確認しても問題なさそうです。
作成されました。
シャードの数、タグ以外の設定は、サーバー側の暗号化、データ保持期間、シャードレベルメトリクスでした。
デフォルトの24時間、暗号化なし、シャードレベルの追加メトリクスなしにしました。
AWS CLI を使用した基本的な Kinesis Data Stream オペレーションの実行の手順でレコードの送信、受信をやってみました。
送信
1 2 3 4 5 6 |
$ aws kinesis put-record --stream-name Tweet --partition-key 123 --data testdata { "ShardId": "shardId-000000000000", "SequenceNumber": "49620429506074472818275695973080444976867703729602691074" } |
受信
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
$ aws kinesis get-records --shard-iterator $(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Tweet --query 'ShardIterator') { "Records": [ { "SequenceNumber": "49620429506074472818275695973080444976867703729602691074", "ApproximateArrivalTimestamp": "2021-07-24T09:18:19.003000+00:00", "Data": "testdata", "PartitionKey": "123" } ], "NextShardIterator": "AAAAAAAAAAEE7PmjeZZuA9VoWDyAORnsK9ZmBAlCWRcJOtVKQh7wvlX7AnNli52adZZeuy6pk09sDDCmwopXBCXkdobIPLj642Il3RgV25muolgT4dmP37ob7Qa0baH2EParQtmbEcdBmVZddONbqUXvfoVS13Y2o9btm3JoYb0Y5KL/paZl6nZAmm5ruDYz03mJnFX+iqeO8M4SML2ZNYUApx3uB/QjcGi3uJgEDjSNAnQyVYv55w==", "MillisBehindLatest": 0 } |
shard-iteratorを入れ子コマンドで取得しました。
テストで作ったレコードが残っているとツイッターのデータが書き込めなかったので、一回ストリームを削除して作り直しました。
IAMロールの設定
AWS Lambdaから送信するので、専用のIAMロールを設定しました。
- KinesisPutRecords
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords", "kinesis:PutRecord" ], "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/Tweet" }, { "Effect": "Allow", "Action": "secretsmanager:GetSecretValue", "Resource": "arn:aws:secretsmanager:us-east-1:123456789012:secret:LiveDelivery/TwitterAPIKey-xxxxx" }, { "Effect": "Allow", "Action": [ "ssm:PutParameter", "ssm:GetParameter" ], "Resource": "arn:aws:ssm:us-east-1:123456789012:parameter/twitter-since-id" }, { "Effect": "Allow", "Action": [ "xray:PutTraceSegments", "xray:PutTelemetryRecords" ], "Resource": [ "*" ] } ] } |
kinesis:PutRecordsを使うことにしました。
必要ないですが、PutRecordも許可しています。
TwitterのキーをSecretsManagerから取得するポリシーとParameterStoreでsince_idを管理しているのでそのポリシーも設定しています。
他には、AWS管理ポリシーのAWSLambdaBasicExecutionRoleを設定しています。
Lambda関数
Githubで公開しています。
yamamanx/TweetSearchAnalyze
取得したTweetをput_recordでKinesis Data Streamingに送信しています。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
def kinesis_put_records(tweets): try: records = [] for tweet in tweets['statuses']: payload = { 'id': tweet['id_str'], 'text': tweet['text'], 'source': tweet['source'], 'user_name': tweet['user']['name'], 'user_id': tweet['user']['id'], 'user_screen_name': tweet['user']['screen_name'], 'user_followers_count': tweet['user']['followers_count'], 'geo': tweet['geo'] } logger.debug(payload) record = { 'Data': json.dumps(payload), 'PartitionKey': tweet['id_str'] } records.append(record) logger.info(len(records)) if len(records) > 0: kinesis_client = boto3.client('kinesis') response = kinesis_client.put_records( Records=records, StreamName='Tweet' ) logger.info(response) except Exception as e: raise e |
クエリーでOR検索が意図したように動作しなかったので、キーワードを配列にしてキーワードの数だけ検索を実行しています。
その他のコードは以下のブログをご確認ください。
- AWS Lambda(Python)からTwitterに投稿する
- Lambda関数からAWS Systems Managerパラメータストアの値を取得して更新する
- Twitterでツイート検索するAPIを試してみる
Lambda関数はバージョンを作って、エイリアスprodに紐づけて、EventBridgeで5分おき実行にしました。
最後までお読みいただきましてありがとうございました!
「AWS認定資格試験テキスト&問題集 AWS認定ソリューションアーキテクト - プロフェッショナル 改訂第2版」という本を書きました。
![](https://www.sbcr.jp/wp-content/uploads/2023/01/9784815617929-1-407x596.jpg)
「AWS認定資格試験テキスト AWS認定クラウドプラクティショナー 改訂第3版」という本を書きました。
![](https://www.sbcr.jp/wp-content/uploads/2024/01/9784815625382-3-420x596.jpg)
「ポケットスタディ AWS認定 デベロッパーアソシエイト [DVA-C02対応] 」という本を書きました。
![](https://www.shuwasystem.co.jp//images/book/637791.jpg)
「要点整理から攻略するAWS認定ソリューションアーキテクト-アソシエイト」という本を書きました。
![](https://book.mynavi.jp/files/topics/135344_ext_06_0.jpg?v=1673514682)
「AWSではじめるLinux入門ガイド」という本を書きました。
![](https://www.yamamanx.com/wp-content/uploads/2023/12/81Rp5O9We6L._SY522_.jpg)
![@yamamanx](https://www.yamamanx.com/wp-content/plugins/lazy-load/images/1x1.trans.gif)
開発ベンダー5年、ユーザ企業システム部門通算9年、ITインストラクター5年目でプロトタイプビルダーもやりだしたSoftware Engineerです。
質問はコメントかSNSなどからお気軽にどうぞ。
出来る限りなるべく答えます。
このブログの内容/発言の一切は個人の見解であり、所属する組織とは関係ありません。
このブログは経験したことなどの共有を目的としており、手順や結果などを保証するものではありません。
ご参考にされる際は、読者様自身のご判断にてご対応をお願いいたします。
また、勉強会やイベントのレポートは自分が気になったことをメモしたり、聞いて思ったことを書いていますので、登壇者の意見や発表内容ではありません。
ad
ad
関連記事
-
-
VPC新コンソールの日本語UIでルートテーブル編集時のエラー(2021/6/10)が発生したのでフィードバックを送った
VPCの新コンソールがリリースされていたので使って作業してましたところ、こんなエ …
-
-
AWS Cost Explorerの設定で「EC2リソースの推奨事項を受け取る」を有効にしました
「EC2リソースの推奨事項を受け取る」という機能がAWS Cost Explor …
-
-
CloudWatch Internet Monitor(プレビュー)を試しました
Amazon CloudWatch Internet Monitor プレビュー …
-
-
テキストをAmazon PollyでMP3に変換してS3に格納(AWS Lambda Python)
Google Calendar Twilio ReminderのテキストをAma …
-
-
AWS Firewall Managerを設定して結果を確認
CloudFront対応のポリシーとして作成したかったので、Globalを選択し …
-
-
AWS Lambda(Python)からTwitterに投稿する
「GoogleフォームからAPI Gatewayで作成したREST APIにPO …
-
-
「AWS認定資格試験テキスト AWS認定クラウドプラクティショナー」執筆裏話
今日2019/4/20発売となりました「AWS認定資格試験テキスト AWS認定ク …
-
-
Cloud9でSAMローカルテスト
せっかくテストするので、Amazon CloudSearchからAmazon E …
-
-
EC2:RunInstances APIにリクエストしてEC2インスタンスを起動(署名バージョン4、Postman)
AWSのAPIリクエストってHTTPでもよかったですよね?って思って、確認のため …
-
-
AWS Organizationsでアカウントを50作って指定したOUに移動するスクリプト
AWS Organizationsでアカウントを50個作る必要がありましたので、 …