顔認識 (OpenCV DNN) とツイート (Tweepy) をサーバーレス環境 (GCP) で定期実行

Twitterのホームタイムラインを取得して, 顔認識とツイートを定期的に実行する仕組みをサーバーレス環境 (GCP) に実装しました.
(意訳:推しの画像をリツイートしてみんなに見てもらいたかった@pedigree_C)

推しの方々からいいねをもらったりもらわなかったり, 評判がなかなか良いです(?)

全体のソースコードと詳細は https://github.com/camberbridge/dft-gcp-serverless にありますのでよろしければご覧ください.


アーキテクチャ

顔認識&ツイート定期実行アーキテクチャ

何と言っても特筆すべきは Cloud Functions の柔軟さです.

関数が何らかの原因でクラッシュしても, 「失敗時に再試行」の設定をしておけば最大7日間 (2021年現在), 関数が正常に完了するまで繰り返し呼び出されます (タイムアウト時間は設定可能).

また, リクエストの多さによってインスタンス数が自動でスケールします. もちろんこの時の上限数は設定可能です.
今回は, 毎分の小さな定期実行なのでスケーリングしなくても間に合うだろうと思っていましたが, 画像付きツイートが多いと (顔認識があるので) 関数の処理に時間がかかって次のリクエスト処理に間に合わないことが多々ありました.

そんなときに Cloud Functions は自動でスケールアウトして, 逆に画像付きツイートが少ないとスケールインしてくれて, 非常に便利でした.


サーバーレス環境 (GCP)

DBにCloud Datastore

Twitterのホームタイムラインを定期的に取得する際, 前回取得したTweet IDよりも新しいものだけを取得するようにしたかったので, 最新のTweet IDを保持しておく必要がありました.
そこで, (DBを用意するまでもない構成ですが) 環境をステートレスにしたかったこともあり, ドキュメントDBの Cloud Datastore を用いました$^1$.

Tweet IDを書き込むときはデータを増やす理由がないので同じエンティティを更新し続けるようにしました.
ベストプラクティス的には, 「1つのエンティティをあまり頻繁に (1秒に1回以上) 更新しないこと」とありますが, ここでは毎分の更新なので良しとします$^2$.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from google.cloud import datastore

datastore_client = datastore.Client()
# エンティティの種類
kind = "Twitter_home"
# エンティティの名前/ID
name = "latest_tweet_id"
entity_key = datastore_client.key(kind, name)

def put_latest_tweet_id(tweet_id):
entity = datastore.Entity(key=entity_key)
entity["id"] = tweet_id

# エンティティ保存
datastore_client.put(entity)

def get_latest_tweet_id():
# エンティティ取得
entity = datastore_client.get(key=entity_key)

return entity["id"]

※ローカル環境から Datastore にアクセスする (上のコードを実行する) 場合はサービスアカウントを発行する必要があります$^3$

プログラムの実行環境にCloud Functions

イベント単位の実行に適したサーバーレスコンピューティングリソースの Cloud Functions を用いました.
基本的にはGCPコンソールで作成と設定が可能ですが, 顔認識に必要なOpenCVのDNNモデル等のファイルをコンソールからアップロードできません.

そこで, ソースコードやアセットをzip化したファイルを gsutilCloud Storage にアップロードしておき, その後の gcloud による Functions デプロイ時に, オブジェクトURLを指定してあげます$^4$.

1
2
3
4
5
6
7
8
9
10
11
12
$ zip -r [FILE_NAME].zip *
$ gsutil mb -p [GCP_PROJECT_ID] -l US-CENTRAL1 gs://[BUCKET_NAME]
$ gsutil cp [FILE_NAME].zip gs://[BUCKET_NAME]
$ gcloud functions deploy [FUNCTION_NAME] \
--source=gs://[BUCKET_NAME]/[FILE_NAME].zip \
--stage-bucket=[BUCKET_NAME] \
--trigger-topic=[Pub/Sub_TOPIC_NAME] \
--memory=256MB \
--runtime=python37 \
--region=us-central1 \
--project=[GCP_PROJECT_ID] \
--entry-point=main

定期実行のトリガーにCloud Scheduler x Pub/Sub

cronと同じ要領で使える Cloud Scheduler から毎分 Pub/Sub トピックをパブリッシュして Cloud Functions をトリガーさせました.
トピックは gcloud コマンドでデプロイする時に作成 (Pub/Sub_TOPIC_NAME の部分) しており, 受けの Functions にも設定しているので, あとは出す側の Cloud Scheduler の設定をします.
やり方は, GCPコンソールでリージョンやcron, トピックを指定するだけです$^5$.

その他

デプロイ時やプログラム実行時の成功/失敗など, ログ監視に Cloud Logging が大変役に立ちます.
また, エラー確認には, Cloud Error Reporting がすごく便利です. Twitter APIにレート制限があることをすっかり忘れていて, 実際に Error Reporting からメールが届いたおかげで気づけました (後述の wait_on_rate_limit を設定したきっかけ).


Twitter API (Tweepy)

レート制限の対処

無料アカウントの場合, ホームタイムライン取得リクエストの制限回数は15分に15回 (2021年現在) です$^6$.
結構シビアですが, Tweepyにはレート制限が補充されるのを自動的に待つ (エラーハンドリングせずに放置できる!) 便利なパラメータ wait_on_rate_limit があるのでそれを使います$^7$.

1
2
3
4
5
import tweepy

auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
api = tweepy.API(auth, wait_on_rate_limit=True)

ホームタイムライン取得

今回の仕組みは定期実行させるので, 毎度同じツイートを取得する可能性があります.
そこで, これまた便利なパラメータ since_id を使い, あるIDよりも新しいツイートを取得するようにします.$^7$

1
timeline = api.home_timeline(since_id=TWEET_ID, count=N)

ツイート前処理とMedia URLの取得

推しが投稿するツイートだけを取得したいので, 前処理として”RT @”を含むツイートを除外します.
ちなみにただのリツイートはホームタイムライン取得のレスポンスには入ってきません.

1
2
3
4
5
6
7
for status in timeline:
# リツイートを除外
if not "RT @" in status.text[0:4]:
if "media" in status.entities:
for media in status.extended_entities["media"]:
# Media URL取得
media_url = media["media_url"]

引用リツイート

元ツイートのURLをTweet本文に貼るだけで引用リツイートになります.
そしてアプリでリツイートした時と同様に, きちんと引用されたユーザに通知されます.


顔認識 (OpenCV DNN)

Web画像の読み込み

OpenCVでいろいろと画像処理をするため, Media URLから画像を一時ファイルとしてダウンロードしました.
このとき画像は /tmp ディレクトリに保存することがポイントです.
これは, Cloud Functions のファイルシステムで書き込み可能な場所 (一時ファイルの保存先として使用できる場所) は /tmp ディレクトリだけとなっているからです$^8$.

そして画像ファイルをOpenCVで読み込み, 処理の高速化のためあらかじめサイズを半分にしておきました.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import numpy as np
import cv2
import tempfile
import requests

def imread_web(url, flags=cv2.IMREAD_COLOR, dtype=np.uint8):
try:
res = requests.get(url)
# URLをダウンロードして一時ファイルとして保存
with tempfile.NamedTemporaryFile(dir="/tmp/") as fp:
fp.write(res.content)
fp.file.seek(0)

n = np.fromfile(fp.name, dtype)
img = cv2.imdecode(n, flags)
img_h, img_w = img.shape[:2]
img_hf_w = img_w // 2
img_hf_h = img_h // 2
halfimg = cv2.resize(img, (img_hf_w, img_hf_h))
return halfimg
except Exception as e:
return None

OpenCV DNNに入力する画像を正方形にリサイズ

画像をOpenCV DNNに入力する前に, 画像を正方形にリサイズしておく必要があります.
OpenCV DNNの顔認識では最初にBlobオブジェクトを blobFromImage() で作るのですが, このときの処理が同じ幅と高さの画像を要するからです$^9$.

1
2
3
img_size = 600
blob = cv2.dnn.blobFromImage(cv2.resize(img, (img_size, img_size)), 1.0,
(img_size, img_size), (104.0, 177.0, 123.0))

顔認識の実行

検出結果それぞれには信頼度があるので, それをよしなに使います.
いろいろと試行錯誤した結果, 0.9よりも大きければ顔と判定することにしました. 今のところ仕組みは顔写真付きツイートだけに反応できているのでいい感じです.

# 事前学習済み DNN caffe model
# 参考: https://github.com/spmallick/learnopencv/tree/master/FaceDetectionComparison/models
PROTOTXT_PATH = "./deploy.prototxt"
WEIGHTS_PATH = "./res10_300x300_ssd_iter_140000_fp16.caffemodel"
# 信頼度閾値
CONFIDENCE = 0.9
face_num = 0

# モデルのロード
net = cv2.dnn.readNetFromCaffe(PROTOTXT_PATH, WEIGHTS_PATH)
net.setInput(blob)
detections = net.forward()

for i in range(0, detections.shape[2]):
    confidence = detections[0, 0, i, 2]
    # 顔検出
    if confidence > CONFIDENCE:
        face_num += 1

参考文献

[1] “Datastore mode Client Libraries”. https://cloud.google.com/datastore/docs/reference/libraries

[2] “ベストプラクティス - エンティティの更新”. https://cloud.google.com/datastore/docs/best-practices#updates_to_an_entity

[3] “Datastore mode Client Libraries - Setting up authenticaiton”. https://cloud.google.com/datastore/docs/reference/libraries#setting_up_authentication

[4] “Google Cloud Pub/Sub トリガー - 関数のデプロイ”. https://cloud.google.com/functions/docs/calling/pubsub#deploying_your_function

[5] “Pub/Sub を使用して Cloud ファンクションをトリガーする - Cloud Schedulerジョブを作成する”. https://cloud.google.com/scheduler/docs/tut-pub-sub#create_a_job

[6] “Get Tweet timelines - Resource Information”. https://developer.twitter.com/en/docs/twitter-api/v1/tweets/timelines/api-reference/get-statuses-home_timeline

[7] “Tweepy API Reference”. https://docs.tweepy.org/en/latest/api.html

[8] “Cloud Functions 実行環境 - ファイルシステム”. https://cloud.google.com/functions/docs/concepts/exec#file_system

[9] “OpenCV Deep Neural Network module”. https://docs.opencv.org/master/d6/d0f/group__dnn.html