www.colben.cn/content/post/ch-kafka.md
2021-11-14 14:32:08 +08:00


---
title: "ClickHouse 表引擎之 Kafka"
date: 2020-10-08T10:42:00+08:00
lastmod: 2020-10-08T10:42:00+08:00
tags: []
categories: ["clickhouse"]
---
# Introduction to the Kafka Table Engine
- Connects to a Kafka cluster, subscribes to Kafka topics, and consumes messages
# Creating a Kafka Engine Table
- Declaration
```sql
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port, ...',
kafka_topic_list = 'topic1, topic2, ...',
kafka_group_name = 'consumer_group_name',
kafka_format = 'data_format',
[kafka_row_delimiter = 'delimiter_symbol',]
[kafka_schema = '',]
[kafka_num_consumers = N,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = N];
```
- kafka_broker_list: comma-separated list of Kafka broker addresses
- kafka_topic_list: comma-separated list of topics to subscribe to
- kafka_group_name: consumer group name; the engine creates a consumer group with this name
- kafka_format: message format, e.g. TSV, JSONEachRow, CSV
- kafka_row_delimiter: end-of-row delimiter, default '\0'
- kafka_schema: schema parameter, for formats that require one
- kafka_num_consumers: number of consumers, default 1
- kafka_skip_broken_messages: number of malformed messages allowed to be skipped, default 0
- kafka_commit_every_batch: Kafka commit frequency; default 0, i.e. commit only after a whole block has been fully written
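Putting these settings together, a minimal sketch of a Kafka engine table might look like this (the table name, columns, broker addresses, topic, and consumer group are all hypothetical):

```sql
CREATE TABLE kafka_queue (
    ts DateTime,
    level String,
    message String
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka1:9092,kafka2:9092',
    kafka_topic_list = 'app_logs',
    kafka_group_name = 'clickhouse_app_logs',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 2;
```

The column types must match the fields carried by the chosen kafka_format; with JSONEachRow, each message is one JSON object per row.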
# Other Kafka Table Engine Parameters
- stream_poll_timeout_ms: data is polled every 500 ms by default and written to a buffer
- The buffer is flushed when either condition is met:
- a full block (kafka_max_block_size, default 65536 rows) has been written
- stream_flush_interval_ms (default 7500 ms) has elapsed
- librdkafka settings in config.xml; see [https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
```xml
<kafka>
<auto_offset_reset>smallest</auto_offset_reset>
</kafka>
```
# Typical Usage of a Kafka Engine Table
- Create a Kafka engine table to act as the data pipe
- Create a MergeTree engine table for queries
- Create a materialized view to sync Kafka data into the MergeTree table
```sql
CREATE MATERIALIZED VIEW kafka_view TO mergetree_table
AS SELECT col1, col2, ... FROM kafka_table;
```
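The three steps above can be sketched end to end as follows (table names and columns are illustrative, not from the original post):

```sql
-- 1. Kafka engine table acting as the data pipe
CREATE TABLE kafka_table (
    ts DateTime,
    message String
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka1:9092',
    kafka_topic_list = 'logs',
    kafka_group_name = 'ch_logs',
    kafka_format = 'JSONEachRow';

-- 2. MergeTree table that queries actually run against
CREATE TABLE mergetree_table (
    ts DateTime,
    message String
) ENGINE = MergeTree()
ORDER BY ts;

-- 3. Materialized view that continuously moves rows from the pipe into MergeTree
CREATE MATERIALIZED VIEW kafka_view TO mergetree_table
AS SELECT ts, message FROM kafka_table;
```

Reading directly from the Kafka engine table consumes the messages, which is why queries go to the MergeTree table instead.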
- To stop data syncing, either drop the view or detach it
```sql
-- drop the view
DROP TABLE kafka_view;
-- detach the view
DETACH TABLE kafka_view;
```
- To resume data syncing, attach the view again
```sql
ATTACH MATERIALIZED VIEW kafka_view TO mergetree_table
AS SELECT col1, col2, ... FROM kafka_table;
```