Clickhouse Kafka Engine 使用

在使用ClickHouse的过程中,数据接入的方式有很多种,最近在尝试使用kafka的方式进行数据的入库,目前大概有两种方案:

  1. 内部kafka 引擎方式接入
  2. clickhouse_sinker

尝试第一种方式,既kafka引擎方式进行接入。

具体操作步骤如下:

    • 搭建kafka服务器 可以参照 spring cloud stream Kafka 示例 里,kafka搭建方式
    • 创建kafka 引擎的表
    • CREATE TABLE tkafka (
      timestamp UInt64,
      level String,
      message String
      ) ENGINE = Kafka SETTINGS kafka_broker_list = ‘192.168.1.198:9092’,
      kafka_topic_list = ‘test2’,
      kafka_group_name = ‘group1’,
      kafka_format = ‘JSONEachRow’,
      kafka_row_delimiter = ‘\n’,
      kafka_num_consumers = 1;

    • 创建一个结构表
    • CREATE TABLE daily (
      day Date,
      level String,
      total UInt64
      ) ENGINE = SummingMergeTree(day, (day, level), 8192);

    • 创建物化视图
    • CREATE MATERIALIZED VIEW consumer TO daily AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total FROM tkafka GROUP BY day, level;

整个过程就完毕了,其中需要消息发送主要是JSONEachRow,也就是JSON格式的数据,那么往topic 里面写入JSON数据即可。

bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test2
>{"timestamp":"1562209583","level":"2","message":"hello2"}

发送数据后需要关闭通道,不然无法查询到数据。

SELECT level, sum(total) FROM daily GROUP BY level;

问题解答

1. Cannot parse input: expected { before: \0: (at row 2)

问题出在引擎版本上,我使用的是19.3.4 版本。19.1 版本没有问题, 19.5.2.6 版本解决了此问题,也就是中间版本存在这个问题。

原因: 消息中数据之间的分割符号未指定,导致无法处理。

解决办法: 添加 kafka_row_delimiter = ‘\n’,也就是上文键标红的部分。

参考解决地址: https://github.com/yandex/ClickHouse/issues/4442

2. 消息发送后,数据无法查询。

原因:kafka 引擎默认消费根据条数与时间进行入库,不然肯定是没效率的。

解决办法:其中对应的参数有两个。 max_insert_block_size ,stream_flush_interval_ms。

这两个参数都是全局性的。

max_insert_block_size 默认值为: Default value: 1,048,576.

参考地址: https://clickhouse.yandex/docs/zh/operations/settings/settings/#settings-max_insert_block_size

<iframe id="aswift_2" style="border-width: 0px; border-style: initial; font-family: inherit; font-style: inherit; font-weight: inherit; margin: 0px; outline: 0px; padding: 0px; vertical-align: baseline; max-width: 100%; left: 0px; position: absolute; top: 0px; width: 700px; height: 175px;" name="aswift_2" frameborder="0" marginwidth="0" marginheight="0" scrolling="no" width="700" height="175"></iframe>

stream-flush-interval-ms 默认值为: The default value is 7500.

实战:

 注意发送后关闭通道

     严格的物化顺序,不允许中间删除再补

             user.xml文件中:有效

            <max_memory_usage>120000000000</max_memory_usage>  解决查询峰值问题,查询异常不稳定

            <max_insert_block_size>2048576</max_insert_block_size>  防止溢出

            <stream_flush_interval_ms>750</stream_flush_interval_ms>  解决快速看效果

可用脚本:

例子:

创建队列:

./kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic log_test1

发送消息:

./kafka-console-consumer.sh --bootstrap-server node01:9092  --topic log_test

接受消息:

sh bin/kafka-console-producer.sh --broker-list node01:9092 --topic log_test1

{"timestamp":"1562209583","level":"2","message":"hello2"}

CREATE TABLE default.tkafka (

timestamp UInt64,

level String,

message String

) ENGINE = Kafka SETTINGS kafka_broker_list = '192.168.202.135:9092,192.168.202.136:9092,192.168.202.185:9092',

kafka_topic_list = 'log_test55',

kafka_group_name = 'group1',

kafka_format = 'JSONEachRow',

kafka_row_delimiter = '\n',

kafka_num_consumers = 1;

CREATE TABLE default.daily (

day Date,

level String,

total UInt64

) ENGINE = SummingMergeTree(day, (day, level), 8192);

CREATE MATERIALIZED VIEW default.consumer TO daily AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total FROM default.tkafka GROUP BY day, level;

参考地址:

https://clickhouse.yandex/docs/zh/operations/settings/settings/#stream-flush-interval-ms

这个参数改小时影响整个数据库的,所以如果不好调整请采用方案2。clickhouse_sinker.

github 地址: https://github.com/housepower/clickhouse_sinker

参考:https://www.cqmaple.com/201907/clickhouse-kafka-engine.html

相关推荐