ElasticsearchからMaxComputeへ連携する方法
ElasticsearchからMaxComputeへ連携する方法
本記事では、Elasticsearch からMaxComputeへ連携する方法について説明します。
前書き
MaxCompute (旧プロダクト名 ODPS) は、大規模データウェアハウジングのためのフルマネージドかつマルチテナント形式のデータ処理プラットフォームです。さまざまなデータインポートソリューションと分散計算モデルにより、大規模データの効率的な照会、運用コストの削減、データセキュリティを実現します。
少し前になりますが、MaxComputeについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。
今回はMaxComputeからAlibaba Cloud Elasticsearchへデータを連携します。構成図で、こんな感じです。
共通作業(MaxCompute全体で共通事項)
RAM ユーザー作成&権限付与
もしMaxComputeを操作するユーザがRAMユーザーの場合は以下を実施してください。
RAMより対象のユーザーを選定します。ユーザーが無い場合は新規作成します。 このときにAccessKey IDとAccessKey Secretをメモとして残してください。AccessKey IDとAccessKey SecretはDataWorks DataIntegrationの処理に必要となります。
対象のユーザーには権限ロールとしてAliyunDataWorksFullAccessをアタッチします。 これはDataWorksを操作するためのFull権限です。
DataWorks側にてユーザーごとに読み取り専用や一部プロジェクト・テーブルなどのきめ細かい権限付与ができますが、ここでは割愛します。
今回は、RDSからMaxComputeへ連携する方法なので、AliyunRDSFullAccessもアタッチします。
Workspace作成
MaxComputeを操作するためにはワークスペースおよびプロジェクトが必要なので新たに作成します。DataWorksコンソールから 「Create Project」を選択し、起動します。
Modeは「Basic Mode(基本モード)」「と「Standard Mode(標準モード)」の2種類があります。ここは「Basic Mode(基本モード)」として選定します。
続けて、MaxCompute を選定します。料金は初めて操作するなら Pay-As-You-Go(使った分だけ課金) が良いと思います。
MaxComputeに関する必要な情報を設定し、Workspaceを作成します。
同期タスクをサブミットする方法(基本モード(basic mode))
作業の途中で 同期タスクをサブミットする旨のアクションが発生しますが、こちらの手順を参考にいただければ幸いです。
DataWorks DataStdioで、操作が終わったら [Commit to Production Environment] をクリックし開発環境から本番環境へ直接コミットします。
同期タスクをサブミットする方法(標準モード(standard mode))
DataWorks DataStdioの右側にあるProperitiesをクリックします。
プロパティRerunを設定して、[Use Root Node]ボタンをクリックします。
開発環境にサブミットします。
あとは開発環境から本番環境にデプロイします。
Maxcomputeのデータをクエリする方法
DataWorks DataStdioのAd-Hoc Query画面に入って、[ODPS SQL]のノードを作成します。
SQLクエリ文を作成したら、上書き保存してから、Run SQLボタンを押します。
その後、SQLクエリの実行コストらお金が出ますが、ここも考慮のうえ、Run、で実行します。
実行結果としてレコード数が無事表示されます。
(事前準備)MaxCompute Tableの準備
DataWorks DataIntegrationから、新規オフライン同期タスクをクリックし、DataStdio画面へ遷移します。
DataStdio画面にて、「Create Node」らダイアログが表示されますが、ここではクローズします。
Workspace Tables画面に入って、テーブルを作成します。
DDL Statementボタンをクリックして、MySQL Tableに対応するDDL Statementを入力します。
CREATE TABLE IF NOT EXISTS odps_to_es (`create_time` string COMMENT '',`category` string COMMENT '',`brand` string COMMENT '',`buyer_id` string COMMENT '',`trans_num` bigint COMMENT '',`trans_amount` double COMMENT '',`click_cnt` bigint COMMENT '')PARTITIONED BY (pt bigint);
Display Nameを入力し、テーブルをコミットします。その後はテーブルが作成されてることが確認できます。
※標準モードプロジェクトの場合は本番環境にもコミットする必要があります。
このテーブルはまだ何も入っていない状態なので、データを追加します。インポートボタンをクリックします。
パーティションが存在するかどうかを確認します。
txtファイルをアップロードします。
データのインポートが無事成功したら、Ad-Hocクエリなり、DataMapなりでレコードが確認できます。
これでMaxCompute側の設定は完了です。次はElasticsearchの準備を進めます。
(事前準備)Elasticsearchの準備
Elasticsearchコンソールに入って、クラスターを作成してから、詳細画面に入ります。
Auto Indexingを有効に設定します。
Auto Indexingが有効になると、このようなステータスになります。
Public Networksを有効にして、EndPointをメモします。
Public Networksのホワイトリストを変更します。
DataworksのデフォルトリソースグループのIPをPublic Networksアクセスのホワイトリストに追加します。
東京リジョンなら以下の通りです。
100.105.55.0/24,11.192.147.0/24,11.192.148.0/24,11.192.149.0/24,100.64.0.0/10,47.91.12.0/24,47.91.13.0/24,47.91.9.0/24,11.199.250.0/24,47.91.27.0/24,11.59.59.0/24,47.245.51.128/26,47.245.51.192/26,47.91.0.128/26,47.91.0.192/26
IPがホワイトリストに追加されたらOKです。
Kibanaのコンソールに入ります。
Kibanaにログインし、DevToolからIndexを作成します。
PUT odps_index?include_type_name=false{"mappings": {"properties": {"category": {"type": "text"},"brand": {"type": "text"},"buyer_id": {"type": "text"},"trans_num": {"type": "integer"},"trans_amount": {"type": "double"},"click_cnt": {"type": "integer"}}}}
DataWorks DataIntegrationに切り替えて、、データソースにElasticsearchを追加します。
データソースとしてElasticsearchのPublic Endpoint、ユーザー名、パスワードなどの情報を入力し、接続テストを実行します。
接続テストで問題なければ、Completeボタンをクリックすることで、Elasticsearchのデータソースが追加されます。
この準備が終わり次第、データを同期してみます。データ同期にはGUIモードとスクリプトモードの2つのパターンがあります。まずはGUIモードで同期します。
スクリプトモードはtemplateな扱いができるため、後日この作業の自動化したい場合、活用できればと思います。
MaxCompute TableをElasticsearchへ移行(GUIモード)
STEP1: workflow作成
DataWorks DataIntegrationから、新規オフライン同期タスクをクリックし、DataStdio画面へ遷移します。
DataStdio画面にて、「Create Node」らダイアログが表示されますが、ここではクローズします。
DataStdio画面にてWorkflowを作成します。
STEP2: DI 同期タスクを作成
同期タスクを作成します。
ソースをODPSに選択して、テーブルを選択します。そのあとはPreviewボタンをクリックします。
Previewでデータが表示されます。
ターゲットをElasticsearchに選択します。
Elasticsearchの場合、IndexとIndex Typeが必須なので入力し、Advanced Settingsをクリックします。
Advanced Settings画面にて、Auto Mappingを有効に設定します。 (Elasticsearchがver7.xの場合は必要です)
設定後はTarget Field ボタンをクリックすることで編集します。
odpsのデータに対応するフィールドを入力します。
{"name":"create_time","type":“id"}{"name": "category","type": "text"}{"name": "brand","type": "text"}{"name": "buyer_id","type": "text"}{"name": "trans_num","type": "integer"}{"name": "trans_amount","type": "double"}{"name": "click_cnt","type": "integer"}
STEP3: DI 同期タスクを実行
タスクを保存して、実行します。
タスクが成功すると、Logが表示されます。
今度はElasticsearch - kibana画面に遷移し、データが届いてるかを確認します。
KibanaコンソールのDevToolより、データを検索します。
POST /odps_index/_search?pretty{"query": { "match_all": {} }}
その結果、GUIモードでMaxComputeのデータをElasticsearchへ同期したことが確認できました。
MaxCompute TableをElasticsearchへ移行(スクリプトモード)
STEP1: workflow作成
DataWorks DataIntegrationから、新規オフライン同期タスクをクリックし、DataStdio画面へ遷移します。
DataStdio画面にて、「Create Node」らダイアログが表示されますが、ここではクローズします。
DataStdio画面にてWorkflowを作成します。
STEP2: DI 同期タスクを作成
同期タスクを作成します。
ソースをODPSに選択して、テーブルを選択します。そのあとはPreviewボタンをクリックします。
Previewでデータが表示されます。
ターゲットをElasticsearchに選択します。
Elasticsearchの場合、IndexとIndex Typeが必須なので入力し、Advanced Settingsをクリックします。
Advanced Settings画面にて、Auto Mappingを有効に設定します。 (Elasticsearchがver7.xの場合は必要です)
設定後はTarget Field ボタンをクリックすることで編集します。
odpsのデータに対応するフィールドを入力します。
{"name":"create_time","type":“id"}{"name": "category","type": "text"}{"name": "brand","type": "text"}{"name": "buyer_id","type": "text"}{"name": "trans_num","type": "integer"}{"name": "trans_amount","type": "double"}{"name": "click_cnt","type": "integer"}
STEP3: スクリプトモードにスイッチ
Switch to Code Editorボタンをクリックし、スクリプトモードにスイッチします。
するとスクリプトが表示されます。これは先述、GUIモードで選択した設定が自動でスクリプトに反映されます。
STEP4: DI 同期タスクを実行
スクリプト(タスク)を実行します。 スクリプト(タスク)が成功すると、タスクとしてLogが表示されます。
あとは上記通り、Elasticsearch - kibanaでcheck、可視化できます。
最後に
本記事では、MaxComputeからElasticsearchへ連携する方法を簡単に説明しました。
MaxComputeのデータをElasticsearch - kibanaダッシュボードでリアルタイム可視化したい場合、参考に頂ければ幸いです。