clickhouse-sinker Development Guide

At work I had to customize the open-source GitHub project clickhouse-sinker, so this post records the overall flow of the clickhouse-sinker source code.

PR submitted: https://github.com/yixy/clickhouse_sinker/pull/3

Setting up the environment

go version

go version go1.21.6 darwin/arm64

IDE: GoLand

Install the project's third-party dependencies and build it

make build

Running the whole pipeline end to end with the demo

Bring up the input and output nodes with Docker

docker-compose up

Input (stream) nodes

ZooKeeper + Kafka

Output (persistence) node

ClickHouse

Table schema

CREATE TABLE IF NOT EXISTS test_auto_schema
(
`day` Date DEFAULT toDate(cntime),
`cntime` DateTime,
`name` String,
`value` Float64
)
ENGINE = MergeTree
PARTITION BY day
ORDER BY (cntime, name);

clickhouse-sinker converter (this project)

go run github.com/housepower/clickhouse_sinker/cmd/clickhouse_sinker --local-cfg-file docker/test_auto_schema.json

Task configuration (JSON)

{
"clickhouse": {
"hosts": [
[
"127.0.0.1"
]
],
"port": 9000,
"db": "default",
"username": "",
"password": "",
"retryTimes": 0
},
"kafka": {
"brokers": "127.0.0.1:9092",
"version": "2.5.0"
},
"task": {
"name": "test_auto_schema",
"topic": "topic1",
"consumerGroup": "test_auto_schema",
"earliest": true,
"parser": "json",
"autoSchema": true,
"tableName": "test_auto_schema",
"excludeColumns": [
"day"
],
"bufferSize": 50000
},
"logLevel": "debug",
"rateLimit": 1,
"RateBurst": 1
}
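To check that a task file parses and that each field lands where you expect, the configuration can be decoded into plain Go structs. This is a sketch under assumptions: the struct types, loadConfig and its sanity checks are hypothetical, cover only the fields in the demo JSON above, and are not clickhouse_sinker's own config types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical structs mirroring only the fields of the demo task JSON.
type clickhouseConfig struct {
	Hosts      [][]string `json:"hosts"`
	Port       int        `json:"port"`
	DB         string     `json:"db"`
	Username   string     `json:"username"`
	Password   string     `json:"password"`
	RetryTimes int        `json:"retryTimes"`
}

type kafkaConfig struct {
	Brokers string `json:"brokers"`
	Version string `json:"version"`
}

type taskConfig struct {
	Name           string   `json:"name"`
	Topic          string   `json:"topic"`
	ConsumerGroup  string   `json:"consumerGroup"`
	Earliest       bool     `json:"earliest"`
	Parser         string   `json:"parser"`
	AutoSchema     bool     `json:"autoSchema"`
	TableName      string   `json:"tableName"`
	ExcludeColumns []string `json:"excludeColumns"`
	BufferSize     int      `json:"bufferSize"`
}

type sinkerConfig struct {
	Clickhouse clickhouseConfig `json:"clickhouse"`
	Kafka      kafkaConfig      `json:"kafka"`
	Task       taskConfig       `json:"task"`
	LogLevel   string           `json:"logLevel"`
	RateLimit  float64          `json:"rateLimit"`
	RateBurst  int              `json:"RateBurst"`
}

// loadConfig decodes the JSON and applies a few sanity checks chosen for this sketch.
func loadConfig(data []byte) (*sinkerConfig, error) {
	var cfg sinkerConfig
	if err := json.Unmarshal(data, &cfg); err != nil {
		return nil, err
	}
	if len(cfg.Clickhouse.Hosts) == 0 || cfg.Task.TableName == "" || cfg.Task.Topic == "" {
		return nil, fmt.Errorf("incomplete config: hosts, tableName and topic must be set")
	}
	return &cfg, nil
}

func main() {
	cfg, err := loadConfig([]byte(`{"clickhouse":{"hosts":[["127.0.0.1"]],"port":9000},
		"task":{"topic":"topic1","tableName":"test_auto_schema","excludeColumns":["day"]}}`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(cfg.Task.TableName, cfg.Task.ExcludeColumns)
}
```

encoding/json matches keys case-insensitively, so the mixed casing of rateLimit and RateBurst decodes either way.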

To see how execution reaches the time-format conversion function, print the call stack at that point:

parser.parseInLocation (parser.go:192) github.com/housepower/clickhouse_sinker/parser
parser.(*Pool).ParseDateTime (parser.go:149) github.com/housepower/clickhouse_sinker/parser
parser.(*FastjsonMetric).GetDateTime (fastjson.go:129) github.com/housepower/clickhouse_sinker/parser
model.GetValueByType (value.go:83) github.com/housepower/clickhouse_sinker/model
model.MetricToRow (message.go:205) github.com/housepower/clickhouse_sinker/model
task.(*Service).put.func1 (task.go:294) github.com/housepower/clickhouse_sinker/task
util.(*WorkerPool).wokerFunc (workerpool.go:59) github.com/housepower/clickhouse_sinker/util
util.(*WorkerPool).start.func1 (workerpool.go:79) github.com/housepower/clickhouse_sinker/util
runtime.goexit (asm_arm64.s:1197) runtime
- Async Stack Trace
util.(*WorkerPool).start (workerpool.go:79) github.com/housepower/clickhouse_sinker/util

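The trace shows that a goroutine launched by WorkerPool's start method runs the task's conversion function: wokerFunc executes the closure task.(*Service).put.func1.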

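After a message is consumed from Kafka, execution enters the task's put.func1 closure, which converts the Kafka message string into a row of the ClickHouse table (the model object, via model.MetricToRow).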

The fields of this model object are obtained directly by reading the table definition from ClickHouse.

It then iterates over the fields and parses each field's value, for example with model.GetValueByType.
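The flow described above (WorkerPool.start launching workers, put submitting one closure per message, MetricToRow walking the schema columns) can be modeled in a few lines. This is a simplified sketch: WorkerPool, Column, metricToRow and convertAll here are hypothetical stand-ins rather than the project's real types, and the per-type conversion done by model.GetValueByType is left out.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// Column is a hypothetical stand-in for one column of the ClickHouse table schema.
type Column struct {
	Name string
	Type string
}

// WorkerPool is a simplified, hypothetical model of util.WorkerPool:
// start launches goroutines that run submitted closures one by one.
type WorkerPool struct {
	jobs chan func()
	wg   sync.WaitGroup
}

func (p *WorkerPool) start(workers int) {
	p.jobs = make(chan func(), 16)
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for job := range p.jobs {
				job() // plays the role of wokerFunc running put.func1
			}
		}()
	}
}

// Submit queues a job for the workers.
func (p *WorkerPool) Submit(job func()) { p.jobs <- job }

// StopWait closes the queue and waits until every queued job has run.
func (p *WorkerPool) StopWait() {
	close(p.jobs)
	p.wg.Wait()
}

// metricToRow sketches the idea behind model.MetricToRow: walk the schema
// columns in order and fetch each value by column name.
func metricToRow(metric map[string]string, cols []Column) []string {
	row := make([]string, 0, len(cols))
	for _, c := range cols {
		row = append(row, metric[c.Name])
	}
	return row
}

// convertAll submits one job per message, like put submitting put.func1,
// and returns the rows sorted so the output is deterministic.
func convertAll(msgs []map[string]string, cols []Column, workers int) []string {
	var mu sync.Mutex
	var rows []string
	pool := &WorkerPool{}
	pool.start(workers)
	for _, m := range msgs {
		m := m // capture per iteration (needed before Go 1.22)
		pool.Submit(func() {
			row := strings.Join(metricToRow(m, cols), ",")
			mu.Lock()
			rows = append(rows, row)
			mu.Unlock()
		})
	}
	pool.StopWait()
	sort.Strings(rows)
	return rows
}

func main() {
	cols := []Column{{"cntime", "DateTime"}, {"name", "String"}, {"value", "Float64"}}
	msgs := []map[string]string{
		{"cntime": "20240123010500", "name": "name1", "value": "1"},
		{"cntime": "20240123010600", "name": "name2", "value": "2"},
	}
	fmt.Println(convertAll(msgs, cols, 2))
}
```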

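When parsing the value of a DateTime field, execution reaches parser.parseInLocation, which parses the string into a Go time value according to the layouts in the template list.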

Custom date-time formats

If an incoming date-time string does not match any of the standard layouts, you need to add a layout yourself.

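Add the layout to the Layouts array. A layout is written in terms of Go's reference time, so it must mean one of:

(1) date-time: 2006 (year) 01 (month) 02 (day) 15 (hour) 04 (minute) 05 (second)

or

(2) date: 2006 (year) 01 (month) 02 (day)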

For example, to accept values such as 20240123010500, add the layout 20060102150405:

    // custom layout: yyyyMMddHHmmss
    "20060102150405",
Layouts = []string{
//DateTime, RFC3339
"2006-01-02T15:04:05Z07:00", //time.RFC3339, `date --iso-8601=s` on Ubuntu 20.04
"2006-01-02T15:04:05Z0700", //`date --iso-8601=s` on CentOS 7.6
"2006-01-02T15:04:05",
//DateTime, ISO8601
"2006-01-02 15:04:05Z07:00", //`date --rfc-3339=s` output format
"2006-01-02 15:04:05Z0700",
"2006-01-02 15:04:05",
//DateTime, other layouts supported by golang
"Mon Jan _2 15:04:05 2006", //time.ANSIC
"Mon Jan _2 15:04:05 MST 2006", //time.UnixDate
"Mon Jan 02 15:04:05 -0700 2006", //time.RubyDate
"02 Jan 06 15:04 MST", //time.RFC822
"02 Jan 06 15:04 -0700", //time.RFC822Z
"Monday, 02-Jan-06 15:04:05 MST", //time.RFC850
"Mon, 02 Jan 2006 15:04:05 MST", //time.RFC1123
"Mon, 02 Jan 2006 15:04:05 -0700", //time.RFC1123Z
//DateTime, linux utils
"Mon Jan 02 15:04:05 MST 2006", // `date` on CentOS 7.6 default output format
"Mon 02 Jan 2006 03:04:05 PM MST", // `date` on Ubuntu 20.04 default output format
//DateTime, home-brewed
"Jan 02, 2006 15:04:05Z07:00",
"Jan 02, 2006 15:04:05Z0700",
"Jan 02, 2006 15:04:05",
"02/Jan/2006 15:04:05 Z07:00",
"02/Jan/2006 15:04:05 Z0700",
"02/Jan/2006 15:04:05",
// custom layout: yyyyMMddHHmmss
"20060102150405",
//Date
"2006-01-02",
"02/01/2006",
"02/Jan/2006",
"Jan 02, 2006",
"Mon Jan 02, 2006",
}
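Before adding an entry to Layouts, it helps to confirm that the layout really maps a sample string onto Go's reference time. The hypothetical helper layoutMatches below parses a sample and formats it back; a layout with month and day swapped, for instance, fails on 20240123010500 because 23 is not a valid month.

```go
package main

import (
	"fmt"
	"time"
)

// layoutMatches reports whether sample parses with layout and formats back to
// exactly the same string, i.e. every field of the sample lines up with the
// corresponding part of the reference time 2006-01-02 15:04:05.
func layoutMatches(layout, sample string) bool {
	t, err := time.Parse(layout, sample)
	if err != nil {
		return false
	}
	return t.Format(layout) == sample
}

func main() {
	sample := "20240123010500"
	for _, layout := range []string{"20060102150405", "20060201150405", "2006-01-02 15:04:05"} {
		fmt.Printf("%-20s %v\n", layout, layoutMatches(layout, sample))
	}
}
```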

Result

Produce a test message to Kafka (with the kaf CLI)

echo '{"cntime" : "20240123010500", "name" : "name1", "value" : 1}' | kaf -b '127.0.0.1:9092' produce topic1

With clickhouse-sinker running the logic above:

Log output

{"level":"debug","ts":"2024-01-23T01:34:46.389+0800","msg":"wrote Fetch v11","broker":"1","bytes_written":52,"write_wait":0.000059,"time_to_write":0.000019542,"err":null}

Data in ClickHouse

[Screenshot: the resulting data in ClickHouse]

Supporting string-to-Int64/Float64 conversion

When the json parser is used, values are read by the GetInt64 and GetFloat64 methods in fastjson.go.

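If the value is a string such as "23", the original logic first checks whether the value's JSON type is numeric (int64/float64).

Because "23" is a string, the check fails and the value always falls back to the default 0.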

The GetFloat64 logic is changed to:

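// Body of GetFloat64; needs imports "strconv" and "github.com/valyala/fastjson".
// Check the JSON type first so that genuine numbers keep working
// (GetStringBytes alone returns nil for a number, which would yield 0).
v := c.value.Get(key)
if v != nil {
	switch v.Type() {
	case fastjson.TypeNumber:
		// A real JSON number, e.g. 23: keep the original behavior
		if f, err := v.Float64(); err == nil {
			return f
		}
	case fastjson.TypeString:
		// A numeric string, e.g. "23": convert it to float64
		if f, err := strconv.ParseFloat(string(v.GetStringBytes()), 64); err == nil {
			return f
		}
	}
}
// Missing, null or unparsable: nil for Nullable columns, otherwise the default 0
if nullable {
	return nil
}
return 0.0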
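The same fallback can be exercised without the fastjson dependency. This stdlib-only sketch uses encoding/json with UseNumber in place of fastjson, and the helpers toFloat64, toInt64 and decode are hypothetical; toInt64 shows how the GetInt64 side could apply strconv.ParseInt in the same way.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
	"strings"
)

// toFloat64 mirrors the fallback logic: JSON numbers are used as-is, numeric
// strings such as "23" are parsed, anything else becomes nil or 0.
func toFloat64(v interface{}, nullable bool) interface{} {
	switch x := v.(type) {
	case json.Number:
		if f, err := x.Float64(); err == nil {
			return f
		}
	case string:
		if f, err := strconv.ParseFloat(x, 64); err == nil {
			return f
		}
	}
	if nullable {
		return nil
	}
	return 0.0
}

// toInt64 applies the same idea to Int64 columns using strconv.ParseInt.
func toInt64(v interface{}, nullable bool) interface{} {
	switch x := v.(type) {
	case json.Number:
		if i, err := x.Int64(); err == nil {
			return i
		}
	case string:
		if i, err := strconv.ParseInt(x, 10, 64); err == nil {
			return i
		}
	}
	if nullable {
		return nil
	}
	return int64(0)
}

// decode parses a JSON object, keeping numbers as json.Number instead of float64.
func decode(s string) (map[string]interface{}, error) {
	dec := json.NewDecoder(strings.NewReader(s))
	dec.UseNumber()
	var m map[string]interface{}
	err := dec.Decode(&m)
	return m, err
}

func main() {
	m, err := decode(`{"a": "23", "b": 23, "c": "abc", "d": null}`)
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, k := range []string{"a", "b", "c", "d"} {
		fmt.Println(k, toFloat64(m[k], false), toInt64(m[k], true))
	}
}
```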