Amazon Kinesis Data StreamsにTwitter検索データを送信する
2021/08/09
目次
Kinesis Data Streamsの作成
ストリーム名とシャード数を決定するだけです。
シャードは
「各シャードは最大で 1 MiB/秒および 1000 レコード/秒を取得し、最大 2 MiB/秒を出力します。」
ということですので、1レコード1kbなさそうなレコードを最大100件送信するのでシャードは1つで足りそうです。
シャードエスティメーターで確認しても問題なさそうです。
作成されました。
シャードの数、タグ以外の設定は、サーバー側の暗号化、データ保持期間、シャードレベルメトリクスでした。
デフォルトの24時間、暗号化なし、シャードレベルの追加メトリクスなしにしました。
AWS CLI を使用した基本的な Kinesis Data Stream オペレーションの実行の手順でレコードの送信、受信をやってみました。
送信
1 2 3 4 5 6 |
$ aws kinesis put-record --stream-name Tweet --partition-key 123 --data testdata { "ShardId": "shardId-000000000000", "SequenceNumber": "49620429506074472818275695973080444976867703729602691074" } |
受信
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
$ aws kinesis get-records --shard-iterator $(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Tweet --query 'ShardIterator') { "Records": [ { "SequenceNumber": "49620429506074472818275695973080444976867703729602691074", "ApproximateArrivalTimestamp": "2021-07-24T09:18:19.003000+00:00", "Data": "testdata", "PartitionKey": "123" } ], "NextShardIterator": "AAAAAAAAAAEE7PmjeZZuA9VoWDyAORnsK9ZmBAlCWRcJOtVKQh7wvlX7AnNli52adZZeuy6pk09sDDCmwopXBCXkdobIPLj642Il3RgV25muolgT4dmP37ob7Qa0baH2EParQtmbEcdBmVZddONbqUXvfoVS13Y2o9btm3JoYb0Y5KL/paZl6nZAmm5ruDYz03mJnFX+iqeO8M4SML2ZNYUApx3uB/QjcGi3uJgEDjSNAnQyVYv55w==", "MillisBehindLatest": 0 } |
shard-iteratorを入れ子コマンドで取得しました。
テストで作ったレコードが残っているとツイッターのデータが書き込めなかったので、一回ストリームを削除して作り直しました。
IAMロールの設定
AWS Lambdaから送信するので、専用のIAMロールを設定しました。
- KinesisPutRecords
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords", "kinesis:PutRecord" ], "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/Tweet" }, { "Effect": "Allow", "Action": "secretsmanager:GetSecretValue", "Resource": "arn:aws:secretsmanager:us-east-1:123456789012:secret:LiveDelivery/TwitterAPIKey-xxxxx" }, { "Effect": "Allow", "Action": [ "ssm:PutParameter", "ssm:GetParameter" ], "Resource": "arn:aws:ssm:us-east-1:123456789012:parameter/twitter-since-id" }, { "Effect": "Allow", "Action": [ "xray:PutTraceSegments", "xray:PutTelemetryRecords" ], "Resource": [ "*" ] } ] } |
kinesis:PutRecordsを使うことにしました。
必要ないですが、PutRecordも許可しています。
TwitterのキーをSecretsManagerから取得するポリシーとParameterStoreでsince_idを管理しているのでそのポリシーも設定しています。
他には、AWS管理ポリシーのAWSLambdaBasicExecutionRoleを設定しています。
Lambda関数
Githubで公開しています。
yamamanx/TweetSearchAnalyze
取得したTweetをput_recordでKinesis Data Streamingに送信しています。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
def kinesis_put_records(tweets): try: records = [] for tweet in tweets['statuses']: payload = { 'id': tweet['id_str'], 'text': tweet['text'], 'source': tweet['source'], 'user_name': tweet['user']['name'], 'user_id': tweet['user']['id'], 'user_screen_name': tweet['user']['screen_name'], 'user_followers_count': tweet['user']['followers_count'], 'geo': tweet['geo'] } logger.debug(payload) record = { 'Data': json.dumps(payload), 'PartitionKey': tweet['id_str'] } records.append(record) logger.info(len(records)) if len(records) > 0: kinesis_client = boto3.client('kinesis') response = kinesis_client.put_records( Records=records, StreamName='Tweet' ) logger.info(response) except Exception as e: raise e |
クエリーでOR検索が意図したように動作しなかったので、キーワードを配列にしてキーワードの数だけ検索を実行しています。
その他のコードは以下のブログをご確認ください。
- AWS Lambda(Python)からTwitterに投稿する
- Lambda関数からAWS Systems Managerパラメータストアの値を取得して更新する
- Twitterでツイート検索するAPIを試してみる
Lambda関数はバージョンを作って、エイリアスprodに紐づけて、EventBridgeで5分おき実行にしました。
最後までお読みいただきましてありがとうございました!
「AWS認定資格試験テキスト&問題集 AWS認定ソリューションアーキテクト - プロフェッショナル 改訂第2版」という本を書きました。
「AWS認定資格試験テキスト AWS認定クラウドプラクティショナー 改訂第3版」という本を書きました。
「ポケットスタディ AWS認定 デベロッパーアソシエイト [DVA-C02対応] 」という本を書きました。
「要点整理から攻略するAWS認定ソリューションアーキテクト-アソシエイト」という本を書きました。
「AWSではじめるLinux入門ガイド」という本を書きました。
開発ベンダー5年、ユーザ企業システム部門通算9年、ITインストラクター5年目でプロトタイプビルダーもやりだしたSoftware Engineerです。
質問はコメントかSNSなどからお気軽にどうぞ。
出来る限りなるべく答えます。
このブログの内容/発言の一切は個人の見解であり、所属する組織とは関係ありません。
このブログは経験したことなどの共有を目的としており、手順や結果などを保証するものではありません。
ご参考にされる際は、読者様自身のご判断にてご対応をお願いいたします。
また、勉強会やイベントのレポートは自分が気になったことをメモしたり、聞いて思ったことを書いていますので、登壇者の意見や発表内容ではありません。
ad
ad
関連記事
-
AWS Transit Gatewayピアリング接続確認
AWS Transit Gatewayのピアリング接続を使用して、異なるリージョ …
-
AWS IAMのMFA「エンティティは既に存在しています」に対応しました
エンティティは既に存在しています MFA Device entity at th …
-
Lucidchart AWSアカウントからインポート機能で自動作図
SNSでLucidchartというサービスが話題になってました。 AWSの環境を …
-
Amazon Location Service入門ワークショップ-マップの操作
関連記事 Amazon Location Service入門ワークショップの前提 …
-
このブログをAWS大阪リージョンへ移行しました
大阪リージョン爆誕!! 朝起きて寝ぼけながらリージョンを見てると。 「大阪リージ …
-
AWS Systems Manager AutomationでEC2の自動停止
Systems Manager Automationがない時代に、Lambdaを …
-
AWS Cloud9でJavaサンプルを実行する
リモートで共有開発ができるCloud9便利ですね。 Cloud9でJavaのサン …
-
AlexaにAWSの最新Feedを読み上げてもらう(Lambda Python)
年末にAmazon Echo Dotを購入しましたので、練習がてらAlexaスキ …
-
Rocket.ChatにAPIで投稿するテスト(Postman)
トレーニング期間中で一時利用するチャットが欲しいなあと思い、Rocket.Cha …
-
Backlogの課題チケット更新内容をMicrosoft Teamsに通知する(AWS Lambda Python)
BacklogにSlack連携が追加されました。 ですが、私の所属している会社で …