Apache kafka(Message Queue for Apache Kafka)からClickHouseへデータ連携する方法
Apache kafka(Message Queue for Apache Kafka)からClickHouseへデータ連携する方法
本記事では、Apache KafkaからApsaraDB for ClickHouse へリアルタイ ムでデータ連携する方法をご紹介します。
構成図は次の通りです。
ClickHouseとは
ClickHouseは非集計データを含む大量のデータを安定的かつ継続しながら集計といったリアルタイム分析を支える列指向の分散型データベースサービスです。 トラフィック分析、広告およびマーケティング分析、行動分析、リアルタイム監視などのビジネスシナリオで幅広く 使用されています。
Apache kafka(Message Queue for Apache Kafka)とは
Apache kafkaはスケーラビリティに優れた分散メッセージングシステムです。
これは元々Apacheオープンソースとして展開されていましたが、これをAlibabaによりフルマネージド型サービスとして登場したのがMessage Queue for Apache Kafkaです。
1.ClickHouseの準備
1-1.ClickHouseインスタンスを準備します
この手法は過去の記事でも記載していますが、再掲として記載します。
https://sbopsv.github.io/cloud-tech/usecase-ClickHouse/ACH_002_clickhouse-quick-start
1)まずはApsaraDB for ClickHouseインスタンスを作成します。
①VPCを作成
②ClickHouseインスタンスを作成
著者は以下のインスタンススペックでインスタンスを作成しています。
ClickHouse version:20.8.7.15 Edition:Single-replica Edition
2)ClickHouseの登録アカウントを作成
インスタンスをクリックし、左側にアカウント管理画面で、アカウントを作成します
3)ClickHouseクラスターにDMSで接続
①ClickHouseのインスタンスをクリックし、トップメニューの「Log On to Database」をクリックします
② DBアカウントとパスワードを入力し、ClickHouseへログイン
③DMS画面でClickHouseのインスタンスが表示されます
2.Apache kafka(Message Queue for Apache Kafka)の準備
2-1.Kafkaインスタンスを作成します
1)AlibabaCloudのサイトをログインし、Message Queue for Apache Kafka を選択します
2)Kafkaインスタンスを作成します
3)Kafkaをデプロイします
4)Topicを作成します
①Kafka詳細画面またはTopic画面に、Topic作成をクリックします
②Topic情報を入力し、Topicを作成します
③Topicを作成しました
5)Consumer Groupを作成します
①Consumer Group画面にConsumer Group作成をクリックします
②Consumer Group情報を入力し、Consumer Groupを作成します
③Consumer Groupを作成しました
2-2.IntelliJ IDEAを使ってJava SDKでKafkaデータを作成します
1)JavaProjectを作成します
①IntelliJ IDEAを開き、ファイルメニューからプロジェクトをクリックします
②Mavenを選択します
③プロジェクト名を設定します
④プロジェクト名とパスを設定します(事前にフォルダを作成する必要があります)
⑤プロジェクトを作成します
⑥Project Encodingを設定します
2)Kafkaデータを生成します
①下記Java依存関係ライブラリをpom.xmlに追加します
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.11.0.3</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.6</version></dependency><dependency><groupId>commons-cli</groupId><artifactId>commons-cli</artifactId><version>1.4</version></dependency>
②log4j.propertiesコンフィグファイルを作成します
# Licensed to the Apache Software Foundation (ASF) under one or more# contributor license agreements. See the NOTICE file distributed with# this work for additional information regarding copyright ownership.# The ASF licenses this file to You under the Apache License, Version 2.0# (the "License"); you may not use this file except in compliance with# the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.log4j.rootLogger=INFO, STDOUTlog4j.appender.STDOUT=org.apache.log4j.ConsoleAppenderlog4j.appender.STDOUT.layout=org.apache.log4j.PatternLayoutlog4j.appender.STDOUT.layout.ConversionPattern=[%d] %p %m (%c)%n
③kafka.propertiesを作成します
## アクセスポイントを設定することで、コンソール画面の詳細画面にデフォルトアクセスポイントが表示されますbootstrap.servers=172.16.0.84:9092,172.16.0.83:9092,172.16.0.82:9092## Topicを設定することで、コンソール画面にTopicが作成されますtopic=topic_ck_new## Consumer Groupを設定することで、コンソール画面にConsumer Groupが作成されますgroup.id=group_ck
④JavaKafkaConfigurer.javaを作成します
import java.util.Properties;public class JavaKafkaConfigurer {private static Properties properties;public synchronized static Properties getKafkaProperties() {if (null != properties) {return properties;}//kafka.propertiesの内容を取得Properties kafkaProperties = new Properties();try {kafkaProperties.load(KafkaProducerDemo.class.getClassLoader().getResourceAsStream("kafka.properties"));} catch (Exception e) {//ファイルをロード失敗しました、Quite可能e.printStackTrace();}properties = kafkaProperties;return kafkaProperties;}}
⑤KafkaProducerDemo.javaを作成します
import java.util.ArrayList;import java.util.List;import java.util.Properties;import java.util.concurrent.Future;import java.util.concurrent.TimeUnit;import org.apache.kafka.clients.CommonClientConfigs;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;public class KafkaProducerDemo {public static void main(String args[]) {//kafka.propertiesをロードしますProperties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();Properties props = new Properties();//アクセスポイントを設定、コンソール画面に該当Topicのアクセスポイントを取得しますprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));//Kafkaメッセージのシリアル化方法。props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//リクエストの最大待機時間。props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);//内部クライアントの再試行回数を設定します。props.put(ProducerConfig.RETRIES_CONFIG, 5);//クライアントの内部再試行間隔を設定します。props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);//Producerオブジェクトを作成します,このオブジェクトはスレッドセーフであることに注意してください。一般的に、プロセス内の1つのProducerオブジェクトで十分です。//パフォーマンスを向上させたい場合は、さらにいくつかのオブジェクトを作成できますが、多すぎないようにします。できれば5つ以下にします。KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);//Kafkaメッセージを作成する。String topic = kafkaProperties.getProperty("topic"); //メッセージのTopic、コンソール画面で作成後、入力しますString value = "this is the message's value"; //メッセージ内容。try {//Futureオブジェクトをバッチ取得することでスピードアップできる。バッチ量は大きく設定しないことをご注意くださいList<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);for (int i =0; i < 100; i++) {//メッセージを発信、Futureオブジェクトを取得しますString msgBody = "{'index':" + i + ", 'content': '" + value + ": " + i + "'}"; //jsonastringProducerRecord<String, String> kafkaMessage = new ProducerRecord<String, String>(topic, msgBody);//jasonasstring//ProducerRecord<String, String> kafkaMessage = new ProducerRecord<String, String>(topic, value + ": " + i);Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);futures.add(metadataFuture);}producer.flush();for (Future<RecordMetadata> future: futures) {//Futureオブジェクトの結果を同時に取得しますtry {RecordMetadata recordMetadata = future.get();System.out.println("Produce ok:" + recordMetadata.toString());} catch (Throwable t) {t.printStackTrace();}}} catch (Exception e) {//クライアント内部リトライ後、再度発信失敗、このエラーを対応する必要がありますSystem.out.println("error occurred");e.printStackTrace();}}}
⑥プロジェクトをコンパイルし、KafkaConsumerDemo.javaを実行します
import java.util.ArrayList;import java.util.List;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.producer.ProducerConfig;public class KafkaConsumerDemo {public static void main(String args[]) {//kafka.propertiesをロードしますProperties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();Properties props = new Properties();//アクセスポイントを設定、コンソール画面に該当Topicのアクセスポイントを取得しますprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));//2つのポーリング間の最大許容間隔。//コンシューマーがこの値を超えるとハートビートを返さない、サーバーはコンシューマーが非ライブ状態であると判断し、サーバーはコンシューマーをコンシューマーグループから削除してリバランスをトリガーします。デフォルトは30秒です。props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);//毎回ポーリングの最大数。//この値は大きく設定しないように、ポーリングのデータが多すぎると次のポーリングの前に消費できない場合、SLBがトリガーされ、フリーズが発生します。props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);//メッセージを逆シリアル化する方法props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//コンシューマーインスタンスが属するコンシューマーグループは、コンソールアで作成後入力します。//同じグループに属するコンシューマーインスタンスは、コンシューマーメッセージをロードしますprops.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));//メッセージオブジェクトを構築します。つまり、消費インスタンスを生成しますKafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);//コンシューマーグループがサブスクライブするトピックを設定する。複数のトピックをサブスクライブできます。//GROUP_ID_CONFIGが同じである場合は、サブスクライブされたトピックを同じに設定することをお勧めします。List<String> subscribedTopics = new ArrayList<String>();//複数のトピックをサブスクライブする必要がある場合は、ここに追加してください。//各トピックは、予めコンソールで作成する必要がありますString topicStr = kafkaProperties.getProperty("topic");String[] topics = topicStr.split(",");for (String topic: topics) {subscribedTopics.add(topic.trim());}consumer.subscribe(subscribedTopics);//リサイクルでメッセージを消費しますwhile (true){try {ConsumerRecords<String, String> records = consumer.poll(1000);//次のポーリングの前にこのデータが消費される必要があり、合計時間はSESSION_TIMEOUT_MS_CONFIGを超えてはなりません。//メッセージを消費するために単独なスレッドプールを開くことをお勧めします。結果を非同期で返すことfor (ConsumerRecord<String, String> record : records) {System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));}} catch (Exception e) {try {Thread.sleep(1000);} catch (Throwable ignore) {}e.printStackTrace();}}}}
⑦プロジェクトをコンパイルし、KafkaProducerDemo.javaを実行します
⑧プロジェクトをビルドします
⑧下記コマンドでJarパッケージを作成します
mvn package
2-3.Linuxでkafkaデータを生成します
①linuxをログインし、jarファイルをLinuxにアップロードします
② jarを実行し、Kafkaデータを生成します
# java -jar kafkaclickhouse-1.0-SNAPSHOT-jar-with-dependencies.jar
③Kafkaコンソール画面でデータを確認します
④JarパッケージでKafkaデータを生成します
# java -classpath kafkaclickhouse-1.0-SNAPSHOT-jar-with-dependencies.jar com.test.kafka.KafkaProducerDemo
⑤JarパッケージでKafkaデータを消費します
# java -classpath kafkaclickhouse-1.0-SNAPSHOT-jar-with-dependencies.jar com.test.kafka.KafkaConsumerDemo
3.KafkaデータをClickHouseにインポート
3-1.ClickHouseでテーブルを作成します
1)DMSでClickhouseへ接続します
2)データベースを作成します
create database if not exists kafka_clickhouse_demo ON CLUSTER default;
3)ClickHouseでKafkaコンシューマーテーブルを作成します。ここではkafka_clickhouse_demoデータベースを使用します。
create table kafka_src_table(`message` String)ENGINE = Kafka()SETTINGS kafka_broker_list = '172.16.0.84:9092,172.16.0.83:9092,172.16.0.82:9092',kafka_topic_list = 'topic_ck_new',kafka_group_name = 'group_ck',kafka_format = 'JSONAsString';
※kafka_formatおよびパラメータがKafkaのデータソースと一致する必要があります
※Kafkaコンシューマーテーブル消費テーブルを結果テーブルとして直接使用することはできません。Kafka消費テーブルは、Kafkaデータを消費するためにのみ使用されます。そのため、kafka側で実際にすべてのデータを保存しているわけではありません。
4)ClickHouseでローカルテーブルを作成します
CREATE TABLE kafka_table_local(`message` String)ENGINE = MergeTree()ORDER BY message;
5)Clickhouseの分散テーブルを作成します
CREATE TABLE kafka_table_distributed ON CLUSTER default AS kafka_clickhouse_demo.kafka_table_localENGINE = Distributed(default, kafka_clickhouse_demo,kafka_table_local, rand());
6)ClickHouseでMATERIALIZED VIEWテーブルを作成します
CREATE MATERIALIZED VIEW source_mv TO kafka_table_distributed ASSELECT`message`FROM kafka_src_table;
③テーブルを表示します
show tables;
3-2.kafkaでデータを生成します
1)下記コマンドでKafkaデータを生成します
# java -jar kafkaclickhouse-1.0-SNAPSHOT-jar-with-dependencies.jar
または
# java -classpath kafkaclickhouse-1.0-SNAPSHOT-jar-with-dependencies.jar com.test.kafka.KafkaProducerDemo
2)Kafkaコンソール画面でデータを確認します
3-3.Clickhouseでデータをインポートします
1)kafka_src_tableを確認します
SELECT * FROM `kafka_src_table` LIMIT 20;SELECT COUNT( *) FROM kafka_src_table;
2)kafka_table_localを確認します
SELECT * FROM `kafka_table_local` LIMIT 20;SELECT COUNT( *) FROM kafka_table_local;
3)kafka_table_distributedを確認します
SELECT * FROM `kafka_table_distributed` LIMIT 20;SELECT COUNT( *) FROM kafka_table_distributed;
4)source_mvを確認します
SELECT * FROM `source_mv` LIMIT 20;SELECT COUNT( *) FROM source_mv;
最後に
ここまで、Apache kafka(Message Queue for Apache Kafka)- ClickHouseのデータ連携方法を紹介しました。
ApsaraDB for ClickHouseは Apache kafka とスムーズに連携できるので、Apache kafka(Message Queue for Apache Kafka)もしくはApache kafka - ClickHouse といったリアルタイムデータ分析ソリューションとして仕上げることもできます。