www.colben.cn/content/post/ch-kafka.md
2021-11-14 15:52:46 +08:00

2.4 KiB
Raw Blame History

title, date, lastmod, tags, categories
title date lastmod tags categories
ClickHouse 表引擎之 Kafka 2020-10-08T10:42:00+08:00 2020-10-08T10:42:00+08:00
clickhouse

Kafka 表引擎简介

  • 对接 Kafka 系统,订阅 Kafka 主题,接收消息

创建 Kafka 引擎表

  • 声明

    ENGINE = Kafka()
    SETTINGS
        kafka_broker_list = 'host:port, ...',
        kafka_topic_list = 'topic1, topic2, ...',
        kafka_group_name = 'consumer_group_name',
        kafka_format = 'data_format',
        [kafka_row_delimiter = 'delimiter_symbol',]
        [kafka_schema = '',]
        [kafka_num_consumers = N,]
        [kafka_skip_broken_messages = N,]
        [kafka_commit_every_batch =N];
    
  • kafka_broker_list: kafka 节点地址列表,用逗号分隔

  • kafka_topic_list: 订阅的主题列表,用逗号分隔

  • kafka_group_name: 消费组名称,引擎会依据此名称创建消费组

  • kafka_format: 消息格式,如 TSV、JSONEachRow、CSV 等

  • kafka_row_delimiter: 一行数据的结束符,默认 '\0'

  • kafka_schema: kafka schema 参数

  • kafka_num_consumers: 消费者数量,默认 1

  • kafka_skip_broken_messages: 允许跳过的错误消息数量默认0

  • kafka_commit_every_batch: kafka commit 频率,默认 0即整个 Block 完全写入后才 commit

Kafka 表引擎其他参数

  • stream_poll_timeout_ms: 默认每 500ms 消费一次数据,写入缓存

  • 刷新缓存触发条件:

    • 一个数据块(kafka_max_block_size默认 65536)写入完成
    • 等待 7500 毫秒(stream_flush_interval_ms)
  • config.xml 中的 librdkafka 配置,参考 https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

    <kafka>
        <auto_offset_reset>smallest</auto_offset_reset>
    </kafka>
    

Kafka 引擎表一般用法

  • 创建 Kafka 引擎表,充当数据管道

  • 创建 MergeTree 引擎表,用于查询

  • 创建物化视图,同步 kafka 数据到 MergeTree 引擎表

    CREATE MATERIALIZED VIEW kafka_view TO mergetree_table
        AS SELECT col1, col2, ... FROM kafka_table;
    
  • 要停止数据同步,可以删除视图,也可以卸载视图

    -- 删除
    DROP TABLE kafka_view;
    -- 卸载
    DETACH TABLE kafka_view;
    
  • 恢复数据同步,装载视图

    ATTACH MATERIALIZED VIEW kafka_view TO mergetree_table
        AS SELECT col1, col2, ... FROM kafka_table;