S3オブジェクトストレージMail.ru Cloud Solutionsのイベント駆動型Webhookベースのアプリケーションの例



Rube Goldbergコーヒーマシン



イベント駆動型アーキテクチャは、必要なときにだけ使用されるため、使用されるリソースのコスト効率を向上させます。これを実装する方法には多くのオプションがあり、ワーカーアプリケーションとして追加のクラウドエンティティを作成しません。そして今日は、FaaSについてではなく、Webhookについて話します。 Object Storage Webhookでイベントを処理するチュートリアルの例を紹介します。



オブジェクトストレージとWebhookについて一言。オブジェクトストレージを使用すると、S3または別のAPI(実装に応じて)経由でHTTP / HTTPS経由でアクセスできるオブジェクトとして、クラウドにデータを保存できます。 Webhookは通常、カスタムHTTPコールバックです。それらは通常、リポジトリへのコード送信やブログに投稿されたコメントなどのイベントによってトリガーされます。イベントが発生すると、オリジンサイトはWebhookに指定されたURLにHTTPリクエストを送信します。その結果、あるサイトのイベントで別のサイト(wiki)のアクションをトリガーできます。ソースサイトがObject Storageの場合、イベントはそのコンテンツの変更です。



このような自動化を使用できる単純なケースの例:



  1. . « », .
  2. , , .
  3. ( , , , ).
  4. , , Kubernetes, , .


例として、AWSオブジェクトストレージのwebhookを使用してMail.ru Cloud Solutions(MCS)オブジェクトストレージバケットの変更が同期されるときに、タスク1のバリアントを作成します。実際にロードされたケースでは、キューにwebhookを登録することで非同期作業を提供する必要がありますが、教育的なタスクでは、これなしで実装を行います。



仕事のスキーム



通信プロトコルについては、MCSのS3 Webhookガイドで詳しく説明しています。作業計画には次の要素があります。



  • S3側に配置され、Webnhookが発生したときにHTTPリクエストを発行する発行サービス
  • HTTP公開サービスからのリクエストをリッスンし、適切なアクションを実行するWebhook受信サーバーサーバーは任意の言語で記述できます。この例では、サーバーをGoで記述します。


S3 APIでのWebhook実装の特徴は、公開サービスでのWebhook受付サーバーの登録です。特に、Webhook受信サーバーは、公開サービスメッセージのサブスクリプションを確認する必要があります(他のWebhook実装では、通常、サブスクリプションを確認する必要はありません)。



したがって、Webhook受信サーバーは2つの主要な操作をサポートする必要があります。



  • 登録の確認のための出版サービスからの要求に応答し、
  • 着信イベントを処理します。


Webhookを受信するためのサーバーのインストール



Webhook受信サーバーを実行するには、Linuxサーバーが必要です。この記事では、例として、MCSにデプロイする仮想インスタンスを使用します。



必要なソフトウェアをインストールし、Webhookサーバーを起動します。



ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install git
Reading package lists... Done
Building dependency tree
Reading state information... Done
The following packages were automatically installed and are no longer required:
  bc dns-root-data dnsmasq-base ebtables landscape-common liblxc-common 
liblxc1 libuv1 lxcfs lxd lxd-client python3-attr python3-automat 
python3-click python3-constantly python3-hyperlink
  python3-incremental python3-pam python3-pyasn1-modules 
python3-service-identity python3-twisted python3-twisted-bin 
python3-zope.interface uidmap xdelta3
Use 'sudo apt autoremove' to remove them.
Suggested packages:
  git-daemon-run | git-daemon-sysvinit git-doc git-el git-email git-gui 
gitk gitweb git-cvs git-mediawiki git-svn
The following NEW packages will be installed:
  git
0 upgraded, 1 newly installed, 0 to remove and 46 not upgraded.
Need to get 3915 kB of archives.
After this operation, 32.3 MB of additional disk space will be used.
Get:1 http://MS1.clouds.archive.ubuntu.com/ubuntu bionic-updates/main 
amd64 git amd64 1:2.17.1-1ubuntu0.7 [3915 kB]
Fetched 3915 kB in 1s (5639 kB/s)
Selecting previously unselected package git.
(Reading database ... 53932 files and directories currently installed.)
Preparing to unpack .../git_1%3a2.17.1-1ubuntu0.7_amd64.deb ...
Unpacking git (1:2.17.1-1ubuntu0.7) ...
Setting up git (1:2.17.1-1ubuntu0.7) ...


フォルダーをwebhook受信サーバーで複製します。



ubuntu@ubuntu-basic-1-2-10gb:~$ git clone
https://github.com/RomanenkoDenys/s3-webhook.git
Cloning into 's3-webhook'...
remote: Enumerating objects: 48, done.
remote: Counting objects: 100% (48/48), done.
remote: Compressing objects: 100% (27/27), done.
remote: Total 114 (delta 20), reused 45 (delta 18), pack-reused 66
Receiving objects: 100% (114/114), 23.77 MiB | 20.25 MiB/s, done.
Resolving deltas: 100% (49/49), done.


サーバーを起動しましょう:



ubuntu@ubuntu-basic-1-2-10gb:~$ cd s3-webhook/
ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80


パブリッシングサービスのサブスクライブ



サーバーを登録して、APIまたはWebインターフェースを介してWebhookを受信できます。簡単にするために、Webインターフェイスから登録します。



  1. コントロールルームのバケットセクション移動します。
  2. Webhookをセットアップするバケツに行き、ギアをクリックします。






[Webhook]タブに移動して、[Add]をクリックします。





フィールドに入力します







。ID-Webhookの名前。



イベント-送信するイベント。ファイルの操作(追加および削除)時に発生するすべてのイベントの転送を設定しました。



URL-Webhook受信サーバーのアドレス。



フィルタープレフィックス/サフィックスは、名前が特定のルールに一致するオブジェクトに対してのみWebhookを生成できるフィルターです。たとえば、拡張子が.pngのファイルのみをWebhookで機能させるには、フィルターサフィックスに「png」書き込みます



現在、Webhook受信サーバーへのアクセスは、ポート80と443のみがサポートされています。



[ フック追加]をクリックすると、次のようになります









ログでWebhookを受信するサーバーは、フック登録プロセスの進行状況を示します。



ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80
2020/06/15 12:01:14 [POST] incoming HTTP request from 
95.163.216.92:42530
2020/06/15 12:01:14 Got timestamp: 2020-06-15T15:01:13+03:00 TopicArn: 
mcs5259999770|myfiles-ash|s3:ObjectCreated:*,s3:ObjectRemoved:* Token: 
E2itMqAMUVVZc51pUhFWSp13DoxezvRxkUh5P7LEuk1dEe9y URL: 
http://89.208.199.220/webhook
2020/06/15 12:01:14 Generate responce signature: 
3754ce36636f80dfd606c5254d64ecb2fd8d555c27962b70b4f759f32c76b66d


登録は終了しました。次のセクションでは、サーバーがWebhookを受信するためのアルゴリズムを詳しく見ていきます。



Webhookを受信するサーバーの説明



この例では、サーバーはGoで記述されています。その仕事の基本原則を分析しましょう。



package main

// Generate hmac_sha256_hex
func HmacSha256hex(message string, secret string) string {
}

// Generate hmac_sha256
func HmacSha256(message string, secret string) string {
}

// Send subscription confirmation
func SubscriptionConfirmation(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Send subscription confirmation
func GotRecords(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Liveness probe
func Ping(w http.ResponseWriter, req *http.Request) {
    // log request
    log.Printf("[%s] incoming HTTP Ping request from %s\n", req.Method, req.RemoteAddr)
    fmt.Fprintf(w, "Pong\n")
}

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {
}

func main() {

    // get command line args
    bindPort := flag.Int("port", 80, "number between 1-65535")
    bindAddr := flag.String("address", "", "ip address in dot format")
    flag.StringVar(&actionScript, "script", "", "external script to execute")
    flag.Parse()

    http.HandleFunc("/ping", Ping)
    http.HandleFunc("/webhook", Webhook)

log.Fatal(http.ListenAndServe(*bindAddr+":"+strconv.Itoa(*bindPort), nil))
}


主な機能を考えてみましょう:



  • Ping()は、活性プローブの最も単純な実装であるURL / pingで応答するルートです。
  • Webhook()-メインルート、URL / Webhookハンドラー:

    • パブリッシングサービスでの登録を確認します(SubscriptionConfirmation関数への移行)。
    • 着信Webhookを処理します(Gotrecords関数)。
  • HmacSha256およびHmacSha256hex関数は、HMAC-SHA256およびHMAC-SHA256暗号化アルゴリズムの実装であり、署名減算用の16進数の文字列として出力されます。
  • mainはmain関数であり、コマンドラインパラメータを処理し、URLハンドラを登録します。


サーバーが受け入れるコマンドラインパラメータ:



  • -portは、サーバーが待機するポートです。
  • -addressは、サーバーが待機するIPアドレスです。
  • -scriptは、入ってくるすべてのフックで呼び出される外部プログラムです。


関数のいくつかを詳しく見てみましょう。



//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {

    // Read body
    body, err := ioutil.ReadAll(req.Body)
    defer req.Body.Close()
    if err != nil {
        http.Error(w, err.Error(), 500)
        return
    }

    // log request
    log.Printf("[%s] incoming HTTP request from %s\n", req.Method, req.RemoteAddr)
    // check if we got subscription confirmation request
    if strings.Contains(string(body), 
"\"Type\":\"SubscriptionConfirmation\"") {
        SubscriptionConfirmation(w, req, body)
    } else {
        GotRecords(w, req, body)
    }

}


この関数は、何が来たかを決定します-登録の確認またはWebhookの要求。ドキュメントからわかるように、登録確認の場合、次のJson構造がPostリクエストに含まれます。



POST http://test.com HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation
content-type: application/json

{
    "Timestamp":"2019-12-26T19:29:12+03:00",
    "Type":"SubscriptionConfirmation",
    "Message":"You have chosen to subscribe to the topic $topic. To confirm the subscription you need to response with calculated signature",
    "TopicArn":"mcs2883541269|bucketA|s3:ObjectCreated:Put",
    "SignatureVersion":1,
    "Token":«RPE5UuG94rGgBH6kHXN9FUPugFxj1hs2aUQc99btJp3E49tA»
}


このリクエストには答える必要があります:



content-type: application/json

{"signature":«ea3fce4bb15c6de4fec365d36bcebbc34ccddf54616d5ca12e1972f82b6d37af»}


署名は次のように計算されます。



signature = hmac_sha256(url, hmac_sha256(TopicArn, 
hmac_sha256(Timestamp, Token)))


Webhookが到着すると、Postリクエストの構造は次のようになります。



POST <url> HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation

{ "Records":
    [
        {
            "s3": {
                "object": {
                    "eTag":"aed563ecafb4bcc5654c597a421547b2",
                    "sequencer":1577453615,
                    "key":"some-file-to-bucket",
                    "size":100
                },
            "configurationId":"1",
            "bucket": {
                "name": "bucketA",
                "ownerIdentity": {
                    "principalId":"mcs2883541269"}
                },
                "s3SchemaVersion":"1.0"
            },
            "eventVersion":"1.0",
            "requestParameters":{
                "sourceIPAddress":"185.6.245.156"
            },
            "userIdentity": {
                "principalId":"2407013e-cbc1-415f-9102-16fb9bd6946b"
            },
            "eventName":"s3:ObjectCreated:Put",
            "awsRegion":"ru-msk",
            "eventSource":"aws:s3",
            "responseElements": {
                "x-amz-request-id":"VGJR5rtJ"
            }
        }
    ]
}


したがって、リクエストによっては、データの処理方法を理解する必要があります。レコード"Type":"SubscriptionConfirmation"は、サブスクリプションを確認するリクエストに含まれ、Webhookには含まれていないため、インジケータとして選択しましたPOST要求にこのレコードが存在するかどうかに基づいて、さらにプログラムが実行されると、関数SubscriptionConfirmationまたは関数に入りますGotRecords



SubscriptionConfirmation関数は詳細には考慮しませんドキュメントに記載されている原則に従って実装されていますこの関数のソースコードは、プロジェクトのgitリポジトリで確認できます



GotRecords関数は、着信要求を解析し、各Recordオブジェクトについて、次のパラメーターを使用して(-scriptパラメーターで名前が渡された)外部スクリプトを呼び出します。



  • バケット名
  • オブジェクトキー
  • 行為:

    • コピー-元のリクエストの場合EventName = ObjectCreated | PutObject | PutObjectCopy
    • 削除-元のリクエストの場合EventName = ObjectRemoved | DeleteObject


したがって、上記のようPostリクエストを含むフックが到着し、-script = script.shパラメータが到着すると、スクリプトは次のように呼び出されます。



script.sh  bucketA some-file-to-bucket copy


このWebhook受信サーバーは完全な本番ソリューションではなく、可能な実装の単純化された例であることを理解する必要があります。



仕事の例



MCSのメインバケットのファイルをAWSのバックアップバケットに同期しましょう。メインバケットはmyfiles-ashと呼ばれ、バックアップはmyfiles-backupです(AWSでのバケットの構成はこの記事の範囲外です)。したがって、ファイルがメインバケットに配置されると、そのコピーはバックアップに表示され、メインのファイルから削除されると、バックアップから削除されます。



MCSクラウドストレージとAWSクラウドストレージの両方に互換性があるawscliユーティリティを使用してバケットを操作します。



ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install awscli
Reading package lists... Done
Building dependency tree
Reading state information... Done
After this operation, 34.4 MB of additional disk space will be used.
Unpacking awscli (1.14.44-1ubuntu1) ...
Setting up awscli (1.14.44-1ubuntu1) ...


S3 MCS APIへのアクセスを設定しましょう:



ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile mcs
AWS Access Key ID [None]: hdywEPtuuJTExxxxxxxxxxxxxx
AWS Secret Access Key [None]: hDz3SgxKwXoxxxxxxxxxxxxxxxxxx
Default region name [None]:
Default output format [None]:


AWS S3 APIへのアクセスを設定しましょう:



ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile aws
AWS Access Key ID [None]: AKIAJXXXXXXXXXXXX
AWS Secret Access Key [None]: dfuerphOLQwu0CreP5Z8l5fuXXXXXXXXXXXXXXXX
Default region name [None]:
Default output format [None]:


アクセス



確認しましょう:AWSへ:



ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile aws
2020-07-06 08:44:11 myfiles-backup


MCSの場合、コマンドの実行中に--endpoint-urlを追加します。



ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile mcs --endpoint-url 
https://hb.bizmrg.com
2020-02-04 06:38:05 databasebackups-0cdaaa6402d4424e9676c75a720afa85
2020-05-27 10:08:33 myfiles-ash


アクセス。



今度は、着信フックを処理するためのスクリプトを作成して、s3_backup_mcs_aws.shと名付けましょう



#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"

SOURCE_BUCKET="${1}"
SOURCE_FILE="${2}"
ACTION="${3}"

SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"

case ${ACTION} in
    "copy")
    ${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
    ${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
    rm ${TEMP}
    ;;

    "delete")
    ${AWSCLI_AWS} rm ${TARGET}
    ;;

    *)
    echo "Usage: ${0} sourcebucket sourcefile copy/delete"
    exit 1
    ;;
esac


サーバーを起動します。



ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80 -
script scripts/s3_backup_mcs_aws.sh


それがどのように機能するかチェックしてください。MCSインターフェースウェブバケットmyfilesという-灰にtest.txtというファイルを追加します。コンソールのログで、リクエストがWebhookサーバーに対して行われたことがわかります。



2020/07/06 09:43:08 [POST] incoming HTTP request from 
95.163.216.92:56612
download: s3://myfiles-ash/test.txt to ../../../tmp/myfiles-ash/test.txt
upload: ../../../tmp/myfiles-ash/test.txt to 
s3://myfiles-backup/test.txt


AWSのmyfiles-backupバケットの内容を確認してみましょう。



ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
2020-07-06 09:43:10       1104 test.txt


次に、Webインターフェースを使用して、myfiles-ashバケットからファイルを削除します。



サーバーログ:



2020/07/06 09:44:46 [POST] incoming HTTP request from 
95.163.216.92:58224
delete: s3://myfiles-backup/test.txt


バケットの内容:



ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
ubuntu@ubuntu-basic-1-2-10gb:~$


ファイルを削除し、問題を解決しました。



結論とToDo



この記事で使用するすべてのコードは私のリポジトリにありますスクリプトの例と、Webhookを登録するためのシグネチャのカウントの例もあります。



このコードは、アクティビティでS3 Webhookを使用する方法の例にすぎません。冒頭で述べたように、このようなサーバーを運用環境で使用する場合は、少なくとも非同期作業用にサーバーを書き直す必要があります。着信Webhookをキュー(RabbitMQまたはNATS)に登録し、そこからワーカーアプリケーションで逆アセンブルして処理します。そうしないと、Webhookが大量に到着するため、タスクを実行するためのサーバーリソースが不足する可能性があります。キューの存在により、サーバーとワーカーを分散させるだけでなく、障害が発生した場合の繰り返しタスクに関する問題を解決できます。ロギングをより詳細で標準化されたものに変更することも望ましいです。



幸運を!



トピックについてもっと読む:






All Articles