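At work I needed to customize the open-source GitHub project clickhouse-sinker, so this post records the overall flow of the clickhouse-sinker source code.
A PR has been submitted: https://github.com/yixy/clickhouse_sinker/pull/3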
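Environment setup
go version go1.21.6 darwin/arm64
IDE: GoLand
Install the project's third-party dependencies
Run the demo end to end
Use Docker to bring up the input and output nodes
Input stream node
ZooKeeper + Kafka
Output persistence node
ClickHouse
Table schema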
    CREATE TABLE IF NOT EXISTS test_auto_schema
    (
        `day`    Date DEFAULT toDate(cntime),
        `cntime` DateTime,
        `name`   String,
        `value`  Float64
    )
    ENGINE = MergeTree
    PARTITION BY day
    ORDER BY (cntime, name);
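clickhouse-sinker converter (the project itself)
Build the binary, then run it with the local config file (--local-cfg-file is an argument of the program, not of go build):
    go build github.com/housepower/clickhouse_sinker/cmd/clickhouse_sinker
    ./clickhouse_sinker --local-cfg-file docker/test_auto_schema.json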
Task JSON
    {
      "clickhouse": {
        "hosts": [
          [
            "127.0.0.1"
          ]
        ],
        "port": 9000,
        "db": "default",
        "username": "",
        "password": "",
        "retryTimes": 0
      },
      "kafka": {
        "brokers": "127.0.0.1:9092",
        "version": "2.5.0"
      },
      "task": {
        "name": "test_auto_schema",
        "topic": "topic1",
        "consumerGroup": "test_auto_schema",
        "earliest": true,
        "parser": "json",
        "autoSchema": true,
        "tableName": "test_auto_schema",
        "excludeColumns": [
          "day"
        ],
        "bufferSize": 50000
      },
      "logLevel": "debug",
      "rateLimit": 1,
      "RateBurst": 1
    }
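Before starting the sinker it can help to confirm that the task file is valid JSON and that the task section holds the expected values. The following is a minimal sketch using only the standard library. The taskConfig and sinkerConfig types and the decodeTask helper are hypothetical and cover only a few fields; they are not the structs clickhouse_sinker itself uses.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// taskConfig is a hypothetical, trimmed-down mirror of the "task" section.
// It is an illustration only, not the config struct used by clickhouse_sinker.
type taskConfig struct {
	Name           string   `json:"name"`
	Topic          string   `json:"topic"`
	ConsumerGroup  string   `json:"consumerGroup"`
	Parser         string   `json:"parser"`
	AutoSchema     bool     `json:"autoSchema"`
	TableName      string   `json:"tableName"`
	ExcludeColumns []string `json:"excludeColumns"`
	BufferSize     int      `json:"bufferSize"`
}

// sinkerConfig (hypothetical) wraps only the part of the file this sketch reads.
type sinkerConfig struct {
	Task taskConfig `json:"task"`
}

// decodeTask parses a config document and returns its task section.
func decodeTask(raw []byte) (taskConfig, error) {
	var cfg sinkerConfig
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return taskConfig{}, err
	}
	return cfg.Task, nil
}

func main() {
	raw := []byte(`{"task": {"name": "test_auto_schema", "topic": "topic1",
		"consumerGroup": "test_auto_schema", "parser": "json", "autoSchema": true,
		"tableName": "test_auto_schema", "excludeColumns": ["day"], "bufferSize": 50000}}`)
	task, err := decodeTask(raw)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("topic=%s table=%s exclude=%v\n", task.Topic, task.TableName, task.ExcludeColumns)
}
```

Unknown keys such as "clickhouse" or "kafka" are simply ignored by json.Unmarshal here, so the same helper can be pointed at the full file.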
How execution reaches the time-format conversion function: the printed call stack
    parser.parseInLocation (parser.go:192) github.com/housepower/clickhouse_sinker/parser
    parser.(*Pool).ParseDateTime (parser.go:149) github.com/housepower/clickhouse_sinker/parser
    parser.(*FastjsonMetric).GetDateTime (fastjson.go:129) github.com/housepower/clickhouse_sinker/parser
    model.GetValueByType (value.go:83) github.com/housepower/clickhouse_sinker/model
    model.MetricToRow (message.go:205) github.com/housepower/clickhouse_sinker/model
    task.(*Service).put.func1 (task.go:294) github.com/housepower/clickhouse_sinker/task
    util.(*WorkerPool).wokerFunc (workerpool.go:59) github.com/housepower/clickhouse_sinker/util
    util.(*WorkerPool).start.func1 (workerpool.go:79) github.com/housepower/clickhouse_sinker/util
    runtime.goexit (asm_arm64.s:1197) runtime
     - Async Stack Trace
    util.(*WorkerPool).start (workerpool.go:79) github.com/housepower/clickhouse_sinker/util
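The trace shows that the start method of WorkerPool launches the worker function that runs the conversion for a task.
After a record is consumed from Kafka, execution enters the task's put.func1, which converts the Kafka message string into a model object matching the ClickHouse table.
The fields of the model object are obtained by reading the table definition directly from ClickHouse.
The code then iterates over those fields and extracts each field value, for example through model.GetValueByType.
When the field is of DateTime type, execution enters parser.parseInLocation, which parses the string into a Go date-time value according to the layout strings.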
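The flow described above (decode one message, walk the table columns, convert each value according to the column type) can be sketched with the standard library alone. This is only an illustration under stated assumptions: the column type and the valueByType and metricToRow helpers are hypothetical stand-ins, the columns are hard-coded instead of being read from ClickHouse, and encoding/json replaces the fastjson parser the project uses.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// column is a hypothetical description of one table column.
// In the real project the columns come from the ClickHouse table definition.
type column struct {
	Name string
	Type string // "DateTime", "String" or "Float64" in this sketch
}

// valueByType extracts one field from the decoded message and converts it
// according to the column type, falling back to the zero value on mismatch.
func valueByType(msg map[string]interface{}, col column) interface{} {
	raw := msg[col.Name]
	switch col.Type {
	case "DateTime":
		s, _ := raw.(string)
		t, err := time.ParseInLocation("2006-01-02 15:04:05", s, time.UTC)
		if err != nil {
			return time.Time{}
		}
		return t
	case "Float64":
		f, _ := raw.(float64) // encoding/json decodes JSON numbers as float64
		return f
	default:
		s, _ := raw.(string)
		return s
	}
}

// metricToRow turns one JSON message into a row ordered like cols.
func metricToRow(message []byte, cols []column) ([]interface{}, error) {
	var msg map[string]interface{}
	if err := json.Unmarshal(message, &msg); err != nil {
		return nil, err
	}
	row := make([]interface{}, 0, len(cols))
	for _, col := range cols {
		row = append(row, valueByType(msg, col))
	}
	return row, nil
}

func main() {
	cols := []column{{"cntime", "DateTime"}, {"name", "String"}, {"value", "Float64"}}
	row, err := metricToRow([]byte(`{"cntime": "2024-01-23 01:05:00", "name": "name1", "value": 1}`), cols)
	if err != nil {
		fmt.Println("parse failed:", err)
		return
	}
	fmt.Println(row...)
}
```

The sketch accepts a single date-time layout; the project instead tries a list of layouts, which is why an unlisted format needs a new entry.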
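Custom date-time formats
If an incoming date-time string does not match any of the standard layouts, you need to add a layout yourself.
Add it to the Layouts slice. Every layout must spell out Go's reference time, that is, either
(1) a date-time = 2006-01-02 15:04:05 (January 2, 2006, 15:04:05)
or
(2) a date = 2006-01-02 (January 2, 2006)
For example, to add 20060102150405:
    // self time format
    "20060102150405",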
    Layouts = []string{
        "2006-01-02T15:04:05Z07:00",
        "2006-01-02T15:04:05Z0700",
        "2006-01-02T15:04:05",
        "2006-01-02 15:04:05Z07:00",
        "2006-01-02 15:04:05Z0700",
        "2006-01-02 15:04:05",
        "Mon Jan _2 15:04:05 2006",
        "Mon Jan _2 15:04:05 MST 2006",
        "Mon Jan 02 15:04:05 -0700 2006",
        "02 Jan 06 15:04 MST",
        "02 Jan 06 15:04 -0700",
        "Monday, 02-Jan-06 15:04:05 MST",
        "Mon, 02 Jan 2006 15:04:05 MST",
        "Mon, 02 Jan 2006 15:04:05 -0700",
        "Mon Jan 02 15:04:05 MST 2006",
        "Mon 02 Jan 2006 03:04:05 PM MST",
        "Jan 02, 2006 15:04:05Z07:00",
        "Jan 02, 2006 15:04:05Z0700",
        "Jan 02, 2006 15:04:05",
        "02/Jan/2006 15:04:05 Z07:00",
        "02/Jan/2006 15:04:05 Z0700",
        "02/Jan/2006 15:04:05",
        // self time format
        "20060102150405",
        "2006-01-02",
        "02/01/2006",
        "02/Jan/2006",
        "Jan 02, 2006",
        "Mon Jan 02, 2006",
    }
Result
Insert a record into Kafka
    echo '{"cntime" : "20240123010500", "name" : "name1", "value" : 1}' | kaf -b '127.0.0.1:9092' produce topic1
Once clickhouse-sinker runs through the logic above
Log output
    {"level":"debug","ts":"2024-01-23T01:34:46.389+0800","msg":"wrote Fetch v11","broker":"1","bytes_written":52,"write_wait":0.000059,"time_to_write":0.000019542,"err":null}
Data in ClickHouse

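Support for converting strings to int64 and float64
With the JSON parser, the GetInt64 and GetFloat64 methods in fastjson.go are called.
If the value is a string such as "23", the original parsing logic first checks whether the value's JSON type is numeric (int64/float64).
Because "23" is a string, it fails that check and is always converted to the default 0.
Changed to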
    // Read the raw bytes of the JSON string. GetStringBytes returns nil
    // when the key is missing or the value is not a JSON string.
    strValue := c.value.Get(key).GetStringBytes()
    if strValue == nil {
        if nullable {
            return nil
        }
        return 0.0
    }
    // Parse the string content, e.g. "23", as a float64.
    floatVal, err := strconv.ParseFloat(string(strValue), 64)
    if err != nil {
        if nullable {
            return nil
        }
        return 0.0
    }
    return floatVal
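The snippet above covers the float64 case. An int64 counterpart could follow the same pattern; the sketch below is an assumption about what that might look like, not the code from the PR. The stringToInt64 helper is hypothetical and takes the raw string bytes directly so that it runs without fastjson.

```go
package main

import (
	"fmt"
	"strconv"
)

// stringToInt64 is a hypothetical helper mirroring the float64 change:
// it parses the bytes of a JSON string such as "23" as a base-10 int64.
// On a missing or unparsable value it returns nil for nullable columns
// and int64(0) otherwise.
func stringToInt64(raw []byte, nullable bool) interface{} {
	if raw == nil {
		if nullable {
			return nil
		}
		return int64(0)
	}
	v, err := strconv.ParseInt(string(raw), 10, 64)
	if err != nil {
		if nullable {
			return nil
		}
		return int64(0)
	}
	return v
}

func main() {
	fmt.Println(stringToInt64([]byte("23"), false))  // parsed value
	fmt.Println(stringToInt64([]byte("abc"), false)) // default for bad input
	fmt.Println(stringToInt64(nil, true))            // nil for a nullable column
}
```

Note that strconv.ParseInt rejects values such as "23.5"; whether those should be truncated or treated as invalid is a design choice left open here.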