Apache SparkからClickHouseへデータをリアルタイム格納する方法
Apache SparkからClickHouseへデータをリアルタイム格納する方法
本記事では、Apache SparkからApsaraDB for ClickHouse へリアルタイ ムでデータ連携する方法をご紹介します。

ClickHouseとは
ClickHouseは非集計データを含む大量のデータを安定的かつ継続しながら集計といったリアルタイム分析を支える列指向の分散型データベースサービスです。 トラフィック分析、広告およびマーケティング分析、行動分析、リアルタイム監視などのビジネスシナリオで幅広く 使用されています。
Apache Sparkとは
オープンソースのビッグデータと機械学習のための非常に高速な分散処理フレームワークです。 Apache SparkはE-MapReduceやDataLake Analytics、MaxComputeなどにて付帯しています。
少し前になりますが、Apache Sparkを含む、E-MapReduceについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。
https://www.slideshare.net/sbopsv/alibabacloudemapreduce-231725148
本記事では、Apache SparkからClickHouseへデータをリアルタイム格納してみます。構成図は次の通りです。

1.ClickHouseClientの準備
1-1.ClickHouseインスタンスを準備します
この手法は過去の記事でも記載していますが、再掲として記載します。
https://sbopsv.github.io/cloud-tech/usecase-ClickHouse/ACH_002_clickhouse-quick-start
1)まずはApsaraDB for ClickHouseインスタンスを作成します。
①VPCを作成


②ClickHouseインスタンスを作成
著者は以下のインスタンススペックでインスタンスを作成しています。
ClickHouse version:20.8.7.15 Edition:Single-replica Edition



2)ClickHouseの登録アカウントを作成
インスタンスをクリックし、左側にアカウント管理画面で、アカウントを作成します

3)ClickHouseクラスターにDMSで接続
①ClickHouseのインスタンスをクリックし、トップメニューの「Log On to Database」をクリックします

② DBアカウントとパスワードを入力し、ClickHouseへログイン

③DMS画面でClickHouseのインスタンスが表示されます

2. Apache Spark環境の準備
2-1.IntelliJ IDEAをインストールします。(具体的な説明は本記事では省略)
1)IntelliJ IDEAを起動します

2-2.IntelliJ IDEAでSBTプラグインをインストールします
1)下記リンクからSBTプラグインをダウンロードします

2)ダウンロードされたSBTzipファイルをIntelliJ IDEAのプラグインフォルダに置き、SBTプラグインをインストールします
① メニューバー [File] > [Settings] を開きます

②Pluginsを選択し、Install Plugin from Disk設定メニューをクリックします

③プラグインを選択します

④SBTプラグインが表示されるのを確認します

⑤IDEを再起動します

⑥SBTプラグインがインストールされます

2-3.SBTインストール
SBTインストール をします。この手順はWindows環境での実行となります。
著者はMacを持っていないため、お手数ですがネットなどの情報を参考に構築いただければ幸いです。
1)下記リンクからsbt-1.3.8.zipをローカルにダウンロードします
①sbt-1.3.8.zipをダウンロードします


②sbt-1.3.8.zipを解凍します

2)MyComputerプロパティから環境パスを設定
①SBT_HOMEを設定します

②SBT binパスを追加します

3)CMDを開き、下記コマンドでsbtを確認します
①CMDを開き、sbtを入力します
# sbt


②sbtバージョンを確認します


2-4.Intellij ideaでScalaプラグインをインストールします
1)下記リンクからscalaプラグインをインストールします
前提条件として、Intellij ideaが起動されていることが必須です
①Install to IntelliJ IDEA2020.1.1ボタンをクリックします

②Successと表示されます

③IDEAでscalaのインストール画面が表示されるのを確認します

④Scalaをインストールします

⑤インストール後、IDEAでFile‐New‐Projectプロジェクトの作成画面にScalaメニューが追加されます

3.Sparkプロジェクトの準備
3-1.Sparkプロジェクトを作成します
1)Sparkプロジェクトを作成します
① メニューバーで File > New > Project をクリックします

②Scalaを選択し、sbtを選択します

③プロジェクトフォルダを選択し、JDK、sbt、Scalaを設定します
JDK:1.8sbt:1.3.8Scala:2.12.0

④プロジェクトが作成されます


3-2.Sparkプロジェクトのディレクトリを準備します
1)ディレクトリ構造
./src./src/main./src/main/scala./src/main/scala/com./src/main/scala/com/spark./src/main/scala/com/spark/test./src/main/scala/com/spark/test/WriteToCk.scala./build.sbt./assembly.sbt./project/plugins.sbt
2)./src/main/scala/com/spark/test/WriteToCk.scalaを作成します




3)WriteToCk.scalaを編集します
①WriteToCk.scalaサンプル を次の通りに入力します
package com.spark.testimport java.utilimport java.util.Propertiesimport org.apache.spark.sql.execution.datasources.jdbc.JDBCOptionsimport org.apache.spark.SparkConfimport org.apache.spark.sql.{SaveMode, SparkSession}import org.apache.spark.storage.StorageLevelobject WriteToCk {val properties = new Properties()properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")properties.put("user", "<your-user-name>")properties.put("password", "<your-password>")properties.put("batchsize","100000")properties.put("socket_timeout","300000")properties.put("numPartitions","8")properties.put("rewriteBatchedStatements","true")val url = "jdbc:clickhouse://<you-url>:8123/default"val table = "<your-table-name>"def main(args: Array[String]): Unit = {val sc = new SparkConf()sc.set("spark.driver.memory", "1G")sc.set("spark.driver.cores", "4")sc.set("spark.executor.memory", "1G")sc.set("spark.executor.cores", "2")val session = SparkSession.builder().master("local[*]").config(sc).appName("write-to-ck").getOrCreate()val df = session.read.format("csv").option("header", "true").option("sep", ",").option("inferSchema", "true").load("</your/path/to/test/data/a.txt>").selectExpr("Year","Quarter","Month").persist(StorageLevel.MEMORY_ONLY_SER_2)println(s"read done")df.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, properties)println(s"write done")df.unpersist(true)}}
パラメータ説明:
- your-user-name:ターゲットClickHouseクラスターで作成されたデータベースアカウント名
- password:データベースアカウント名に対応するパスワード
- your-url:ターゲットClickHouseクラスターアドレス(VPCエンドポイントで設定することをお勧めする)
- /your/path/to/test/data/a.txt:インポートするデータファイルのパス(ファイルアドレスとファイル名を含む)
- your-table-name:ClickHouseクラスターのターゲットテーブル名
②WriteToCk.scalaを編集します
package com.spark.testimport java.util.Propertiesimport org.apache.spark.SparkConfimport org.apache.spark.sql.execution.datasources.jdbc.JDBCOptionsimport org.apache.spark.sql.{SaveMode, SparkSession}import org.apache.spark.storage.StorageLevelobject WriteToCk {val properties = new Properties()properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")properties.put("user", "sbtest")properties.put("password", "Test1234")properties.put("batchsize","100000")properties.put("socket_timeout","300000")properties.put("numPartitions","8")properties.put("rewriteBatchedStatements","true")val url = "jdbc:clickhouse://cc-0iw4v4hezq9lw9333.ads.aliyuncs.com:8123/default"val table = "spark_table_distributed"def main(args: Array[String]): Unit = {val sc = new SparkConf()sc.set("spark.driver.memory", "1G")sc.set("spark.driver.cores", "4")sc.set("spark.executor.memory", "1G")sc.set("spark.executor.cores", "2")val session = SparkSession.builder().master("local[*]").config(sc).appName("write-to-ck").getOrCreate()val df = session.read.format("csv").option("header", "true").option("sep", ",").option("inferSchema", "true").load("oss://spark-clickhouse/data/access_log_csv.txt").select("*").persist(StorageLevel.MEMORY_ONLY_SER_2)println(s"read done")df.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, properties)println(s"write done")df.unpersist(true)}}
※今回の例はOSSに保存するtxtファイルをSparkで読み取るものです
※予め、access_log_csv.txt をOSS ("oss://spark-clickhouse/data/access_log_csv.txt") にアップロードしておきます

4)./build.sbt構成ファイルを編集して、依存関係を追加します
①build.sbtを下記のように編集します
lazy val sparkSettings = Seq(organization := "com.spark.test",version := "0.1",scalaVersion := "2.12.0",libraryDependencies ++= Seq("org.apache.spark" %% "spark-sql" % "3.0.0" % "provided","org.apache.spark" %% "spark-core" % "3.0.0" % "provided","com.alibaba" % "fastjson" % "1.2.4" % "provided","ru.yandex.clickhouse" % "clickhouse-jdbc" % "0.2.4"))lazy val root = (project in file(".")).settings(sparkSettings,name := "sparkdemo",mainClass in assembly := Some("com.spark.test.WriteToCk"),assemblyJarName in assembly := "nancy-spark-test-WriteToCk.jar",assemblyMergeStrategy in assembly := {case PathList("javax", "servlet", xs @ _*) => MergeStrategy.firstcase PathList("javax", "inject", xs @ _*) => MergeStrategy.firstcase PathList("javax", "activation", xs @ _*) => MergeStrategy.firstcase PathList("javax", "xml", xs @ _*) => MergeStrategy.firstcase PathList("org", "apache", xs @ _*) => MergeStrategy.firstcase PathList("org", "aopalliance", xs @ _*) => MergeStrategy.firstcase PathList("org", "ow2", xs @ _*) => MergeStrategy.firstcase PathList("net", "jpountz", xs @ _*) => MergeStrategy.firstcase PathList("com", "google", xs @ _*) => MergeStrategy.firstcase PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.firstcase PathList("com", "codahale", xs @ _*) => MergeStrategy.firstcase PathList("com", "yammer", xs @ _*) => MergeStrategy.firstcase PathList("com", "fasterxml", xs @ _*) => MergeStrategy.firstcase "about.html" => MergeStrategy.renamecase "META-INF/mailcap" => MergeStrategy.firstcase "META-INF/mimetypes.default" => MergeStrategy.firstcase "plugin.properties" => MergeStrategy.firstcase "git.properties" => MergeStrategy.firstcase "log4j.properties" => MergeStrategy.firstcase "module-info.class" => MergeStrategy.discardcase x =>val oldStrategy = (assemblyMergeStrategy in assembly).valueoldStrategy(x)})
※assemblyMergeStrategyでJarパッケージが重複するエラーを解決します
※./assembly.sbt-> sbt assemblyでパッケージする方法ではru.yandex.clickhouseの3rdパーティーを引用することができます。sbt packageで作成したJarパッケージではclickhouseの3rdパーティーには含まれません。

5)./assembly.sbtを編集します
※assemblyプラグインのインストール方法はassembly関連のコンフィグファイルを正しく設定し、sbt updateコマンドを実行します
①assembly.sbtを下記のように編集します
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.0.0")resolvers += Resolver.url("bintray-sbt-plugins", url("https://scala.jfrog.io/artifactory/sbt-plugin-releases/sbt-plugin-releases"))(Resolver.ivyStylePatterns)
※sbtとassemblyのバージョンにはご注意ください

6)./project/plugins.sbtを編集します
logLevel := Level.WarnaddSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.0.0")

7)sbt updateでsbt-assemblyをインストールします
# sbt update

8)プラグインを確認します
①sbt pluginsを入力します
# sbt plugins

②sbtassembly.AssemblyPluginがインストールされます

ここまでsbt-assemblyがインストール完了されたことを確認します
9)Spark依頼をインポートします
プロジェクトを正しくビルドするため、Spark依頼をインポートします
①下記リンクからspark-3.1.2-bin-hadoop3.2.tgzをダウンロードします


②spark-3.1.2-bin-hadoop3.2.tgzを解凍し、プロジェクトの右クリックメニューから依頼Jarを追加します

③Libraries‐NewProjectLibraryにJavaを選択します

④spark-3.1.2-bin-hadoop3.2のjarsフォルダを選択します

⑤「OK」をクリックします

⑥プロジェクトをBuildします

⑦プロジェクトが正しくビルドされたことを確認します

ここまで手順で問題なければプロジェクトのビルドが無事成功したと思います
3-3.Sparkプロジェクトをパッケージします
1)下記コマンドでsbtクリアします
sbt clean

2)下記コマンドでパッケージを実行します
sbt assembly

3)nancy-spark-test-WriteToCk.jarパッケージが生成されるのを確認します

4. Apache SparkからtxtファイルデータをClickHouseへ格納
4-1.OSSにファイルをアップロードします
1)OSSバケットを作成します
①OSSを選択します

②Bucketメニューを選択します

③Bucket作成をクリックします

④デフォルト設定でBucketを作成します

2)Step2-3で生成されたnancy-spark-test-WriteToCk.jarファイルをOSSにアップロードします
①フォルダ作成ボタンをクリックし、jarフォルダを作成します

②jarsフォルダに遷移し、Uploadメニューをクリックし、ファイルをアップロードします

3)txtファイルをOSSにアップロードします
①サンプルファイルを準備します
"id","user_name","age","city","access_url"1,tick,32,shanghai,http://xdbdsd.com/xgwgwe2,wangl,22,beijing,http://ghwbw.com/xgwgwe3,xiaoh,23,shenzhen,http://holko.com/xgwgwe4,jess,45,hangzhou,http://jopjop.com/xgwgwe5,jack,14,shanghai,http://wewsd.com/xgwgwe6,tomy,25,hangzhou,http://sbedr.com/xgwgwe7,lucy,45,shanghai,http://ghhwed.com/xgwgwe8,tengyin,26,shanghai,http://hewhe.com/xgwgwe9,cuos,27,shenzhen,http://yoiuj.com/xgwgwe10,wangsh,37,shanghai,http://hhou.com/xgwgwe

② Create Folderをクリックし、dataフォルダを作成します

③ dataフォルダに遷移し、Uploadメニューをクリックし、ファイルをアップロードします
※data保存パスはCodeに書いてあるファイルパスと同じです

4-2.ClickHouseでテーブルを作成します
1)DMSでClickhouseを接続します



2)ClickHouseでデフォルトDBにローカルテーブルを作成します
create table spark_table_local on cluster default (id UInt8,user_name String,age UInt16,city String,access_url String)engine = MergeTree()order by id;

3)ClickHouseでデフォルトDBに分散テーブルを作成します
create table spark_table_distributed on cluster default(id UInt8,user_name String,age UInt16,city String,access_url String)engine = Distributed(default, default, spark_table_local, id);

4-3.EMRのSparkタスクでデータをClickHouseにインポートします
4-3-1.EMRインスタンスを作成します
①コンソール画面でEMRを検索します

②日本リージョンを選択し、ClusterWizardをクリックします

③ClusterタイプでHadoopを選択します

④従量課金を選択し、ClickHouseと同じVPCを設定します

⑤Cluster基本情報を設定し、PublicIPをオンにします

⑥Cluster情報を確認します

⑦EMRClusterを作成完成します

4-3-2.EMRでプロジェクトを作成します
①EMRClusterを選択し、EMR情報画面を表示します

②「Data Platform」をクリックします

③「Create Project」をクリックし、プロジェクトを作成します

④Projectを設定します

⑤プロジェクトが作成されたことを確認します

4-3-2.EMRでSparkJobを作成します
①Projectをクリックし、Workflows画面を表示します

②EditJobをクリックし、Job作成メニューをクリックします

③sparkジョブが作成されました

4-3-4.EMRでSparkJobを実行します
1)jarファイルパスを自動的に入力します
①Enter an OSS pathメニューでOSSバケットを選択し、OSSREFを選択します

②nancy-spark-test-WriteToCk.jarを選択します

③コンソール画面でossref://spark-clickhouse/jars/nancy-spark-test-WriteToCk.jarを入力します

2)sparkジョブで下記コマンドを入力します
--class com.spark.test.WriteToCk --master yarn-client --driver-memory 7G --executor-memory 5G --executor-cores 1 --num-executors 32 ossref://spark-clickhouse/jars/nancy-spark-test-WriteToCk.jar
①「run」をクリックします

②ResourceGroupを設定します

③spark jobを実行します

④record画面でジョブ実行状態を確認します

4-3-5.DMSでClickHouseのデータを確認します
1)DMSでtxtデータをClickHouseにインポートすることを確認します
①分散テーブルを検索します
SELECT * FROM spark_table_distributed;SELECT COUNT(*) FROM spark_table_distributed;


②ローカルテーブルを検索します
SELECT * FROM spark_table_local;SELECT COUNT(*) FROM spark_table_local;


最後に
ここまで、Apache Spark- ApsaraDB for ClickHouseへデータ連携する方法を紹介しました。
ApsaraDB for ClickHouseはApache Sparkとスムーズに連携できるので、例えば、Spark StreamingやDataLake Analytics Serverless SparkなどからApsaraDB for ClickHouseへリアルタイムデータ連携しつつ、ClickHouseでリアルタイム可視化、といったソリューションを構築することもできます。
