Amazon Kinesis Data StreamsにTwitter検索データを送信する
2021/08/09
目次
Kinesis Data Streamsの作成
ストリーム名とシャード数を決定するだけです。
シャードは
「各シャードは最大で 1 MiB/秒および 1000 レコード/秒を取得し、最大 2 MiB/秒を出力します。」
ということですので、1レコード1kbなさそうなレコードを最大100件送信するのでシャードは1つで足りそうです。
シャードエスティメーターで確認しても問題なさそうです。
作成されました。
シャードの数、タグ以外の設定は、サーバー側の暗号化、データ保持期間、シャードレベルメトリクスでした。
デフォルトの24時間、暗号化なし、シャードレベルの追加メトリクスなしにしました。
AWS CLI を使用した基本的な Kinesis Data Stream オペレーションの実行の手順でレコードの送信、受信をやってみました。
送信
1 2 3 4 5 6 |
$ aws kinesis put-record --stream-name Tweet --partition-key 123 --data testdata { "ShardId": "shardId-000000000000", "SequenceNumber": "49620429506074472818275695973080444976867703729602691074" } |
受信
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
$ aws kinesis get-records --shard-iterator $(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Tweet --query 'ShardIterator') { "Records": [ { "SequenceNumber": "49620429506074472818275695973080444976867703729602691074", "ApproximateArrivalTimestamp": "2021-07-24T09:18:19.003000+00:00", "Data": "testdata", "PartitionKey": "123" } ], "NextShardIterator": "AAAAAAAAAAEE7PmjeZZuA9VoWDyAORnsK9ZmBAlCWRcJOtVKQh7wvlX7AnNli52adZZeuy6pk09sDDCmwopXBCXkdobIPLj642Il3RgV25muolgT4dmP37ob7Qa0baH2EParQtmbEcdBmVZddONbqUXvfoVS13Y2o9btm3JoYb0Y5KL/paZl6nZAmm5ruDYz03mJnFX+iqeO8M4SML2ZNYUApx3uB/QjcGi3uJgEDjSNAnQyVYv55w==", "MillisBehindLatest": 0 } |
shard-iteratorを入れ子コマンドで取得しました。
テストで作ったレコードが残っているとツイッターのデータが書き込めなかったので、一回ストリームを削除して作り直しました。
IAMロールの設定
AWS Lambdaから送信するので、専用のIAMロールを設定しました。
- KinesisPutRecords
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords", "kinesis:PutRecord" ], "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/Tweet" }, { "Effect": "Allow", "Action": "secretsmanager:GetSecretValue", "Resource": "arn:aws:secretsmanager:us-east-1:123456789012:secret:LiveDelivery/TwitterAPIKey-xxxxx" }, { "Effect": "Allow", "Action": [ "ssm:PutParameter", "ssm:GetParameter" ], "Resource": "arn:aws:ssm:us-east-1:123456789012:parameter/twitter-since-id" }, { "Effect": "Allow", "Action": [ "xray:PutTraceSegments", "xray:PutTelemetryRecords" ], "Resource": [ "*" ] } ] } |
kinesis:PutRecordsを使うことにしました。
必要ないですが、PutRecordも許可しています。
TwitterのキーをSecretsManagerから取得するポリシーとParameterStoreでsince_idを管理しているのでそのポリシーも設定しています。
他には、AWS管理ポリシーのAWSLambdaBasicExecutionRoleを設定しています。
Lambda関数
Githubで公開しています。
yamamanx/TweetSearchAnalyze
取得したTweetをput_recordでKinesis Data Streamingに送信しています。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
def kinesis_put_records(tweets): try: records = [] for tweet in tweets['statuses']: payload = { 'id': tweet['id_str'], 'text': tweet['text'], 'source': tweet['source'], 'user_name': tweet['user']['name'], 'user_id': tweet['user']['id'], 'user_screen_name': tweet['user']['screen_name'], 'user_followers_count': tweet['user']['followers_count'], 'geo': tweet['geo'] } logger.debug(payload) record = { 'Data': json.dumps(payload), 'PartitionKey': tweet['id_str'] } records.append(record) logger.info(len(records)) if len(records) > 0: kinesis_client = boto3.client('kinesis') response = kinesis_client.put_records( Records=records, StreamName='Tweet' ) logger.info(response) except Exception as e: raise e |
クエリーでOR検索が意図したように動作しなかったので、キーワードを配列にしてキーワードの数だけ検索を実行しています。
その他のコードは以下のブログをご確認ください。
- AWS Lambda(Python)からTwitterに投稿する
- Lambda関数からAWS Systems Managerパラメータストアの値を取得して更新する
- Twitterでツイート検索するAPIを試してみる
Lambda関数はバージョンを作って、エイリアスprodに紐づけて、EventBridgeで5分おき実行にしました。
最後までお読みいただきましてありがとうございました!
「AWS認定資格試験テキスト&問題集 AWS認定ソリューションアーキテクト - プロフェッショナル 改訂第2版」という本を書きました。
「AWS認定資格試験テキスト AWS認定クラウドプラクティショナー 改訂第3版」という本を書きました。
「ポケットスタディ AWS認定 デベロッパーアソシエイト [DVA-C02対応] 」という本を書きました。
「要点整理から攻略するAWS認定ソリューションアーキテクト-アソシエイト」という本を書きました。
「AWSではじめるLinux入門ガイド」という本を書きました。
開発ベンダー5年、ユーザ企業システム部門通算9年、ITインストラクター5年目でプロトタイプビルダーもやりだしたSoftware Engineerです。
質問はコメントかSNSなどからお気軽にどうぞ。
出来る限りなるべく答えます。
このブログの内容/発言の一切は個人の見解であり、所属する組織とは関係ありません。
このブログは経験したことなどの共有を目的としており、手順や結果などを保証するものではありません。
ご参考にされる際は、読者様自身のご判断にてご対応をお願いいたします。
また、勉強会やイベントのレポートは自分が気になったことをメモしたり、聞いて思ったことを書いていますので、登壇者の意見や発表内容ではありません。
ad
ad
関連記事
-
S3バケットポリシーですべてDenyにしてしまったらルートユーザーの出番
特定のConditionsを指定して、それ以外はすべて拒否するS3バケットポリシ …
-
AWS Certificate Manager(ACM)メール検証をDNS検証の証明書に差し替えました
ブログの証明書 このブログの証明書の有効期限があと1週間です。 証明書はAWS …
-
AD Connectorを作成してシームレスにドメイン参加する
VPN接続先のADで管理されているドメインにEC2 Windowsインスタンスか …
-
CloudTrailイベントのコストしか発生していないリージョンのコスト発生源を調査しました
調査のきっかけ ふと検証用AWSアカウントのCostExplorerを見てました …
-
AWS Trusted Advisorの2023/11/17発表のAPI
2023/11/17に発表されたAWS Trusted Advisor の新しい …
-
AWS CodeStarで静的webサイトのテンプレートプロジェクトを作成する
執筆環境の検討中です。 CodeCommitは使うつもりで、コミットしたときにE …
-
Amazon InspectorによるLambda関数の脆弱性検出結果を確認しました
AWS re:Invent 2022期間内に発表されましたAmazon Insp …
-
JAWS FESTA 2019 Sapporo 参加&当日スタッフ&企業サポーターで!
2019年のJAWS FESTA は札幌です! 今回もありがたいことに、所属して …
-
執筆環境(PyCharm, CodeCommit, CodePipeline, S3, Lambda, 署名付きURL)
2018年から、年に1回ぐらい商業本の執筆をさせていただいております。 2020 …
-
Amazon Rekognitionでイベント参加者の顔写真を解析して似ている人ランキングをその場で作る
2017/9/21に開催されたAWS Cloud Roadshow 2017 大 …