LogServiceとHologres間のリアルタイムETLをする方法
LogServiceとHologres間のリアルタイムETLをする方法
本記事では、Hologres で、Log ServiceとHologres間のリアルタイムETLをする方法をご紹介します。
Hologresとは
Hologres はリアルタイムのインタラクティブ分析サービスです。高い同時実行性と低いレイテンシーでTB、PBクラスのデータの移動や分析を短時間で処理できます。PostgreSQL11と互換性があり、データを多次元で分析し、ビジネスインサイトを素早くキャッチすることができます。
少し前になりますが、Hologresについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。
Log ServiceとHologres間のリアルタイムETLについて
このガイドラインでは、Log ServiceとHologres間のリアルタイムETLサービスを段階的に作成します。今回、Alibaba Cloud LogServiceとHologres、Object Storage Service (OSS)、Realtime Compute for Apache Flink(フルマネージド型Apache Flink)を使用します。
このチュートリアルについて
対象者:
本ガイドラインは、以下のような方を対象としています
- Alibaba Cloud、Hologres、LogService、OSS(Object Storage Service)、Realtime Compute for Apache Flink(フルマネージド型Apache Flink)に関する基本的な知識を持っている
前提条件:
- Alibaba Cloud のアカウントを所持している
- Hologres、LogService、OSS(Object Storage Service)Realtime Compute for Apache Flink(フルマネージド型Apache Flink)が使用可能な状態になっている
- 使用するHologres、LogService、OSS(Object Storage Service)Realtime Compute for Apache Flink(フルマネージド型Apache Flink)は同一Region配下にある
- 少なくとも1つのHologresインスタンス、1つのOSS(Object Storage Service)bucket、Realtime Compute for Apache Flink(フルマネージド型Apache Flink)の2CUを持っている。
Hologresのnon-partitioned tableでの作業
Hologresのnon-partitioned tableを使ったリアルタイムETLサービスの手順は以下の通りです。
LogService(SLS)の準備
RealtimeCompute(フルマネージド型Apache Flink)では、LogServiceプロジェクトの logstore をソーステーブルとして動作します。
Web Tacking
を有効にした新規プロジェクトとlogstoreを作成します。
LogService(SLS)では、複数の方法でデータのインポートをサポートしています。
例としてWebTrackingを挙げます。
https://www.alibabacloud.com/cloud-tech/doc-detail/31752.htm
curl --request GET 'http://${project}.${host}/logstores/${logstore}/track? APIVersion=0.6.0&key1=val1&key2=val2'
続いて、Pythonスクリプトを使って、関連テストレコードを出力するようにします。
import randomimport requestsimport uuidproject = "<your project name>"endpoint = "<your region endpoint>"logstore = "<your logstore name>"topic = "<your topic name>"url = 'http://{0}.{1}/logstores/{2}/track?APIVersion=0.6.0&__topic__={3}&id={4}&name={5}&rating={6}'for i in range(<your record counts>):res = requests.get(url.format(project, endpoint, logstore, topic, uuid.uuid1(), uuid.uuid1(), str(random.randint(1, 5))))print(res)
対象となるLogStoreを入力し、インデックス属性を有効にします。
今回は id
、name
、rating
の3つのインデックスがあります。
データ生成スクリプトを実行すると、コンソールに関連するログが表示されます。
Hologresの準備
HologresのHoloWebにて、book_sls
テーブル を結果テーブルとして作成します。
テーブルレコードを確認し、Flinkサービスの実行前に空になっていることを確認します。
Realtime Compute for Apache Flink(フルマネージド型Apache Flink)の準備
Realtime Compute for Apache Flink(フルマネージド型Apache Flink)のコンソールで「draft editor」ページに移動し、新しいストリームSQLタスクを作成します。
schema
セクションに移動して、HologresとLog Serviceテーブル用のcreateステートメントを生成します。
Taskでテーブルスキーマ、接続情報、Insertステートメント(SQL)を更新し確認します。
詳細は、LogService Source Table および Hologres Result Table を参照してください。
https://www.alibabacloud.com/cloud-tech/doc-detail/176690.htm
https://www.alibabacloud.com/cloud-tech/doc-detail/178795.htm
CREATE TEMPORARY TABLE `sls_table_demo` (id string,`name` string,rating string,`__topic__` STRING METADATA VIRTUAL,`__source__` STRING METADATA VIRTUAL,`__timestamp__` BIGINT METADATA VIRTUAL) with ('connector' = 'sls','endpoint' = '<your log service endpoint>','project' = '<your log service project name>','logstore' = '<your log service logstore name>');CREATE TEMPORARY TABLE `hologres_table_demo` (id varchar,`name` varchar,rating varchar,sls_topic varchar,sls_source varchar,sls_timestamp bigint) with ('connector' = 'hologres','dbname' = '<your Hologres database name>','tablename' = '<your Hologres table name>','username' = '<your access key id>','password' = '<your access key secret>','endpoint' = '<your Hologres instance access info>');BEGIN STATEMENT SET;INSERT INTO hologres_table_demoSELECTid, `name`, rating, `__topic__` as sls_topic, `__source__` as sls_source, `__timestamp__` as sls_timestampFROM sls_table_demo;End;
Flinkサービスとの連携
準備したStreamingSQLタスクを公開してから、ジョブを開始します。
Deployments
ページで公開されたタスクを確認し、タスクを開始します。
もし、以下のようなSqlParseException
エラーが発生した場合は、Insertステートメント(SQL)文のカラムマッピングが原因です。
ソーステーブルと結果テーブルのカラム名が一致しているか確認してください。同一でない場合は、Insert SQL文の中でエイリアスを使って更新してください。
Failed to create the job graph for the job: 5fe48ce699684c55a5f702cbcae2485f (message = 36:1-36:5, Translating the JobGraph for this deployment failed before. Please delete the JobGraph before requesting a new translation.Error message: Failed to translate a SQL Script into a JobGraph.Cause: Could not parse SQL: Encountered "BEGIN" at line 36, column 1.Was expecting one of:<EOF>"LIKE" ...";" ....StackTrace:org.apache.flink.table.api.SqlParserException: Could not parse SQL: Encountered "BEGIN" at line 36, column 1.Was expecting one of:<EOF>"LIKE" ...";" ...at com.ververica.platform.sql.environment.VvpSqlParser.parse(VvpSqlParser.java:80)at com.ververica.platform.sql.entrypoint.SqlJobEntrypoint.executeInClassLoader(SqlJobEntrypoint.java:85)at com.ververica.platform.sql.entrypoint.SqlJobEntrypoint.lambda$execute$0(SqlJobEntrypoint.java:67)at com.ververica.platform.sql.classloader.ClassLoaderWrapper.execute(ClassLoaderWrapper.java:12)at com.ververica.platform.sql.entrypoint.SqlJobEntrypoint.execute(SqlJobEntrypoint.java:67)at com.ververica.platform.sql.entrypoint.SqlJobEntrypoint.main(SqlJobEntrypoint.java:54)at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.base/java.lang.reflect.Method.invoke(Method.java:566)at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158)at org.apache.flink.client.deployment.application.ClassPathJobGraphRetriever.retrieveJobGraph(ClassPathJobGraphRetriever.java:84)at org.apache.flink.client.deployment.application.JobGraphTranslator.translateJobGraph(JobGraphTranslator.java:87)at org.apache.flink.client.deployment.application.JobGraphTranslationTask.run(JobGraphTranslationTask.java:89)at org.apache.flink.client.deployment.application.JobGraphTranslationTask.run(JobGraphTranslationTask.java:77)at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)at java.base/java.lang.Thread.run(Thread.java:984)Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "BEGIN" at line 36, column 1.Was expecting one of:<EOF>"LIKE" ...";" ...at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:446)at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:209)at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)at org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:195)at com.ververica.platform.sql.environment.VvpSqlParser.parse(VvpSqlParser.java:77)... 19 moreCaused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "BEGIN" at line 36, column 1.Was expecting one of:<EOF>"LIKE" ...";" ...at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:40152)at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:39963)at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtList(FlinkSqlParserImpl.java:3346)at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtList(FlinkSqlParserImpl.java:261)at org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:193)... 20 more)
もし、LogService(SLS)やRAMによって拒否されたことを示す LogException
が出る場合は、認証許可が原因です。
RAMコンソールで他のプロダクトサービスにアクセスするためにRealtime Compute for Apache Flink(フルマネージド型Apache Flink)に使用されるAliyunStreamAsiDefaultRole
という名前の既存のロールを見つけ、必要な関連ポリシーを追加する必要があります。
LogService(SLS)のアクセスポリシー (AliyunLogReadOnlyAccess
) を AliyunStreamAsiDefaultRole
に追加する例として、次の手順を実行します。
すべてがうまくいけば、ジョブは実行ステータスになり、データはHologresのnon-partitioned tableにリアルタイムで転送されます。
ここまで、LogService(SLS)とHologresのnon-partitioned table間のリアルタイムETLを実現しました。
Hologresのpartitioned tableでの作業
上記はnon-partitioned tableベースでの作業になりますが、今度はHologresのpartitioned tableを使ったリアルタイムETLサービスを構築します。手順は以下の通りです。
Book対するユーザー評価を収集し、LogService(SLS)に送信するシステムがあるとします。
データにはbook_idのみが含まれ、book_nameなどの詳細情報はCSVファイルとしてOSS(Object Storage Service)のバケットで管理されます。
Hologresテーブルはbook_idでパーティショニングされており、各Bookに基づいて迅速な統計を行うことができます。 データがHologresへリアルタイム転送されると、book_idに基づいてブックの詳細情報が同時に結合されます。
LogService(SLS)の準備
LogStoreを新規で作成します。このLogStoreはパーティション化されたテーブルの情報に一致する、インデックス属性情報を持ちます。
まずは上記のステップと同様にWebTrackingが有効になっていることを確認します。
新しいインデックス属性に基づいてテストレコードを生成するPythonスクリプトを更新します。
import randomimport requestsimport uuidproject = "<your project name>"endpoint = "<your region endpoint>"logstore = "<your logstore name>"topic = "<your topic name>"url = 'http://{0}.{1}/logstores/{2}/track?APIVersion=0.6.0&__topic__={3}&id={4}&user_id={5}&rating={6}'for i in range(<your record counts>):res = requests.get(url.format(project, endpoint, logstore, topic, str(random.randint(1, 9)), uuid.uuid1(), str(random.randint(1, 5))))print(res)
Hologresの準備
Hologresでpartition tableを新規作成します。
OSS(Object Storage Service) の準備
OSS(Object Storage Service)のバケットに次のようなBook情報の入ったcsvファイルをアップロードします。
1,嫌われる勇気2,ノルウェイの森3,海辺のカフカ4,色彩を持たない多崎つくると彼の巡礼の年5,容疑者Ⅹの献身6,人間失格7,こころ8,天声人語9,幸せになる勇気
Realtime Compute for Apache Flink(フルマネージド型Apache Flink)の準備
Realtime Compute for Apache Flink(フルマネージド型Apache Flink)コンソールのdraft editor
ページに移動し、別のストリームSQLタスクを作成します。
schema
セクションに移動して、再びHologresとLogService(SLS)テーブル用のcreateステートメントを生成します。
HologresとLogService(SLS)テーブルの他に、もう一つfilesystem
というテーブルを追加しなければなりません。これは、OSS(Object Storage Service)のバケットにあるCSVファイルを元にしたFileSystemディメンションテーブルに使用されます。
Hologresの設定情報の中で、partitionrouter
はパーティションテーブルにデータを書き込むかどうかを指定し、createparttable
は存在しないパーティションテーブルを自動的に作成して、そこにパーティション値に基づいてデータを書き込むかどうかを指定します。両方とも「true」に設定すると、スクリプトは対象のテーブルをパーティショニングされたものと認識し、値(ここではbook_id
)に基づいて対象のパーティションテーブルを自動的に作成します。
詳細は、LogService Source Table および Hologres Result Table 、FileSystem Dimension Tableを参照してください。
CREATE TEMPORARY TABLE `sls_table_demo_p` (id string,`user_id` string,rating string,`__topic__` STRING METADATA VIRTUAL,`__source__` STRING METADATA VIRTUAL,`__timestamp__` BIGINT METADATA VIRTUAL) with ('connector' = 'sls','endpoint' = '<your log service endpoint>','project' = '<your log service project name>','logstore' = '<your log service logstore name>');CREATE TEMPORARY TABLE `hologres_table_demo_p` (id varchar,`user_id` varchar,`name` varchar,rating varchar,sls_topic varchar,sls_source varchar,sls_timestamp bigint) with ('connector' = 'hologres','dbname' = '<your Hologres database name>','tablename' = '<your Hologres table name>','username' = '<your access key id>','password' = '<your access key secret>','endpoint' = '<your Hologres instance access info>','partitionrouter' = 'true','createparttable' = 'true');CREATE TEMPORARY TABLE `oss_table_p` (id varchar,`name` varchar) with ('connector' = 'filesystem','path' = '<your csv path in OSS bucket>','format' = 'csv');BEGIN STATEMENT SET;INSERT INTO hologres_table_demo_pSELECTs.id, s.`user_id`, o.`name`, s.rating, s.`__topic__` as sls_topic, s.`__source__` as sls_source, s.`__timestamp__` as sls_timestampFROM sls_table_demo_p AS sJOIN oss_table_p FOR SYSTEM_TIME AS OF proctime() AS oON s.id = o.id;End;
Flinkサービスとの連携
パーティショニングされたテーブルのために、新しいcreate stream SQLタスクを開始します。
ジョブを開始し、LogStoreにテストレコードを作成してから、実行結果を確認します。
以上で、LogService(SLS)とOSS、Hologresのpartitioned table間のリアルタイムETLを実現しました。
最後に
ここまで、LogService(SLS)とOSS、Realtime Compute for Apache Flink(フルマネージド型Apache Flink)を使ってHologres(partition付きテーブル、Non-partition)のリアルタイムETLをする方法を紹介しました。
この方法を生かすことで、LogServiceからリアルタイムデータ収集した後に、Hologresでリアルタイム可視化を実現することができます。