Installing Kafka
- Install Java 11 first; I usually go with AWS Corretto.
- Version 3.4.1, running in KRaft mode (no ZooKeeper).
wget https://downloads.apache.org/kafka/3.4.1/kafka_2.13-3.4.1.tgz
# If the official download is slow, the Aliyun mirror is much faster
wget http://mirrors.aliyun.com/apache/kafka/3.4.1/kafka_2.13-3.4.1.tgz
- Extract it: tar -zxvf kafka_2.13-3.4.1.tgz
- It's worth changing log.dirs in config/kraft/server.properties; it defaults to /tmp/, which does not survive a reboot.
cd kafka_2.13-3.4.1
./bin/kafka-storage.sh random-uuid
# prints a cluster id, e.g. P3Jr59YRTOmYrB8q7Hrdgw
./bin/kafka-storage.sh format -t P3Jr59YRTOmYrB8q7Hrdgw -c ./config/kraft/server.properties
# with less than 2 GB of RAM, lower -Xms/-Xmx in kafka-server-start.sh
./bin/kafka-server-start.sh ./config/kraft/server.properties
- If nothing goes wrong, the broker is now up; a quick smoke test follows this list.
- For a simple UI I recommend KFExplorer, available straight from the App Store: https://apps.apple.com/cn/app/kfexplorer/id1590781799
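A minimal smoke test (a sketch, assuming the broker listens on localhost:9092; it creates the log topic that Filebeat writes to later):
cd kafka_2.13-3.4.1
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic log --partitions 1 --replication-factor 1
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic log   # type a test line, then Ctrl-C
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic log --from-beginning   # the line should come back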
ClickHouse vs Loki
- After reading a fair amount of material, here is how the two compare:
- Loki:
  - Low CPU/memory footprint
  - Compressed log storage
  - The UI is Grafana
- ClickHouse:
  - Relatively heavy CPU/memory usage; usually deployed as a cluster, which needs the heavyweight ZooKeeper component
  - Also supports compressed storage, and it is configurable
- For my current needs ClickHouse is the better fit: it is a real database with aggregation queries, so when I occasionally want to know how often some endpoint was hit over a time window, I can just run a line of SQL; I'm not sure Loki can do that. An example follows this list.
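For instance, hourly hits for one endpoint (a sketch run through clickhouse-client, assuming the default.log table built later in this post, which has message and time columns):
clickhouse-client --query "
  SELECT toStartOfHour(time) AS hour, count() AS hits
  FROM default.log
  WHERE message LIKE '%/api/app/user/tempOrder/listVehicleOrder%'
    AND time >= now() - INTERVAL 1 DAY
  GROUP BY hour
  ORDER BY hour"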
Filebeat
- Written in Go, low resource usage, and it supports JavaScript scripts for processing the raw data.
- Installation
mkdir filebeat
cd filebeat
# the official download was slow (~80 KB/s; it took 10 minutes)
#wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.0.0-linux-x86_64.tar.gz
# switched to the Huawei mirror and to version 7.3.2
wget https://mirrors.huaweicloud.com/filebeat/7.3.2/filebeat-7.3.2-linux-x86_64.tar.gz
tar -zxvf filebeat-7.3.2-linux-x86_64.tar.gz
cd filebeat-7.3.2-linux-x86_64/
- Configuration: after a few days of tuning it ended up as below (the JS logic is quite handy for shaping the data).
Our application log lines look roughly like the one below; a bit of JS slices out the timestamp, level, traceId, className, and the message body:
2023-08-27 14:50:19.791 INFO - 82815[ restartedMain] [4aXj4DZogiYo] io.undertow.servlet : Initializing Spring embedded WebApplicationContext
max_procs: 1
queue.mem.events: 1024
queue.mem.flush.min_events: 960
filebeat.inputs:
  - type: log
    enabled: true
    ignore_older: 2h
    close_inactive: 30m # in production this can be 2m
    max_bytes: 20480
    tail_files: true
    paths:
      - /var/javaserver/log/app/app.log
    fields_under_root: true
    fields:
      server: dev
output.kafka:
  # initial brokers for reading cluster metadata
  enable: true
  hosts:
    - kafka.server:9092
  topic: log
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: lz4 # the Filebeat Kafka output does not support zstd yet
  max_message_bytes: 102400
  version: 2.0.0
#output.console:
#  enable: true
processors:
  - script:
      lang: javascript
      id: times
      tag: enable
      source: >
        function process(event) {
          var str = event.Get("message");
          // first two space-separated tokens: "2023-08-27 14:50:19.791"
          var time = str.split(" ").slice(0,2).join(" ");
          // tokens 3-4 are e.g. "INFO -"; join them and drop the dash
          var level = str.split(" ").slice(2,4).join("").replace("-","");
          // everything after the third ':' (the first two sit inside hh:mm:ss)
          var msg = str.split(":").slice(3).join(":").trim();
          // bracketed segments are "[ thread] [traceId]"; the second one is the traceId
          var traceIdMatchArr = str.match(/\[.*?\]/g);
          if (traceIdMatchArr && traceIdMatchArr.length > 1) {
            event.Put("traceId", traceIdMatchArr[1].replace(/[\[\]]/g, ""));
          } else {
            event.Put("traceId", "");
          }
          // first dotted identifier, e.g. " io.undertow.servlet"
          var classNameMatchArr = str.match(/ \.?\w+\.[\.\w]+/g);
          if (classNameMatchArr) {
            event.Put("className", classNameMatchArr[0]);
          } else {
            event.Put("className", "");
          }
          event.Put("time", time);
          event.Put("level", level);
          event.Put("msg", msg);
        }
  - timestamp:
      field: time
      timezone: Asia/Shanghai
      layouts:
        - '2006-01-02 15:04:05'
        - '2006-01-02 15:04:05.999'
      test:
        - '2019-06-22 16:33:51'
  - drop_fields: # drop fields we do not want downstream
      fields: ["message","log","input","host","agent","ecs","time"]
- The final output looks like this:
{"@timestamp":"2023-08-28T02:10:17.944Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.3.2","topic":"log"},"traceId":"QmcaZqZ1cau4","level":"INFO","server":"ldd_dev","className":" c.c.l.start.handler.TraceIdFilter","msg":" request start path:/api/app/user/tempOrder/listVehicleOrder"}
ClickHouse
- For installation just follow the official docs and install via apt; a sketch follows this list.
- Necessary configuration (search the config files for these):
  - Enable access from outside localhost
  - Set a password for the default user
  - Set the data directory
- UI
  - DBM: https://dbm.inke.io
  - DBeaver is also decent, but it is memory-hungry; not recommended on a Mac with little RAM
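A sketch of the apt route, paraphrased from the official docs (verify the key and repo lines against the current docs before running):
sudo apt-get install -y apt-transport-https ca-certificates curl gnupg
curl -fsSL https://packages.clickhouse.com/gpg/KEY.gpg | sudo gpg --dearmor -o /usr/share/keyrings/clickhouse-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packages.clickhouse.com/deb stable main" | sudo tee /etc/apt/sources.list.d/clickhouse.list
sudo apt-get update
sudo apt-get install -y clickhouse-server clickhouse-client   # the installer prompts for the default user's password
sudo service clickhouse-server start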
- Create the Kafka consumer table
create table default.log_kafka_queue(traceId String, level String, server String, className String, msg String)
engine = Kafka
SETTINGS kafka_broker_list = 'kafka.chebaidu.net:9092', kafka_topic_list = 'log', kafka_group_name = 'log_clickhouse', kafka_format = 'JSONEachRow', kafka_num_consumers = 1,kafka_skip_broken_messages=1
- Create the actual data table
create table default.log(time DateTime64(3, 'Asia/Shanghai'), traceId String, level String, server String, className String, msg String,KafkaPartition UInt8,KafkaOffset UInt64)
Engine = MergeTree PARTITION BY toYYYYMMDD(time) ORDER BY (KafkaPartition,KafkaOffset);
- Create a materialized view that writes into the log table
CREATE MATERIALIZED VIEW default.log_kafka_view TO default.log AS
SELECT traceId,level, server,className, msg,_partition as kafkaPartition,_offset as kafkaOffset,_timestamp as time
FROM default.log_kafka_queue
- A column turned out to be misnamed (the view's kafkaPartition/kafkaOffset don't match the table's KafkaPartition/KafkaOffset), so rebuild the tables:
DETACH TABLE default.log_kafka_view
drop table default.log
--
create table default.log(upTime DateTime64(3, 'Asia/Shanghai'), traceId String, level String, server String, className String, msg String,_partition UInt8,_offset UInt64)
Engine = MergeTree PARTITION BY toYYYYMMDD(upTime) ORDER BY (_partition,_offset)
--
CREATE MATERIALIZED VIEW default.log_view TO default.log AS
SELECT traceId,level, server,className, msg,_partition,_offset,_timestamp_ms as upTime
FROM default.log_kafka_queue
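With the view attached, rows should start flowing within seconds; a quick sanity check (a sketch, run wherever clickhouse-client can reach the server):
clickhouse-client --query "SELECT count(), max(upTime) FROM default.log"
clickhouse-client --query "SELECT * FROM default.log ORDER BY upTime DESC LIMIT 5" --format Vertical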
Displaying the logs
The popular option is Grafana, which leans toward charts and dashboards, but all I want is a simple log search and query system.
plogView
- I wrote a simple ClickHouse log query system myself.
- Tech stack: Java + the ClickHouse HTTP client + HTML + Pico.css + Alpine.js + Day.js
- Open-sourced on GitHub: https://github.com/sprite5/plogview_clickhouse
Pitfalls
- Kafka's advertised address defaults to localhost; to allow access from outside, set it to a public IP or domain name, and ideally give every client a hostname mapping (a sketch follows this list):
advertised.listeners=PLAINTEXT://kafka.chebaidu.net:9092
- Filebeat 7 appears to support every Kafka version from 0.10 up to the latest, not just up to 2.6 as the official docs state.
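A sketch of that hostname mapping (203.0.113.10 is a placeholder for the broker's public IP; run on every client machine, i.e. wherever Filebeat, ClickHouse, or a developer connects from):
echo "203.0.113.10 kafka.chebaidu.net" | sudo tee -a /etc/hosts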
8/31 update
- Filebeat config adjustments:
  - Added multiline merging for Java exception stack traces
  - The timestamp processor couldn't parse the time down to the second reliably, so it was dropped; the raw time string is used directly instead
  - The original message is kept, and a simpleMsg is sliced out alongside it
max_procs: 1
queue.mem.events: 1024
queue.mem.flush.min_events: 960
filebeat.inputs:
  - type: log
    enabled: true
    ignore_older: 2h
    close_inactive: 2h # in production this can be 2m
    max_bytes: 20480
    tail_files: true
    paths:
      - /var/javaserver/log/ludingding/app.log
    fields_under_root: true
    fields:
      server: ldd_dev
      prod: dev
    # merge any line not starting with a date into the previous line (Java stack traces)
    multiline.pattern: '^2\d{3}\-\d{2}\-\d{2}'
    multiline.negate: true
    multiline.match: after
output.kafka:
  # initial brokers for reading cluster metadata
  enable: true
  hosts:
    - kafka.chebaidu.net:9092
  topic: log
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 102400
  version: 2.0.0
#output.console:
#  enable: true
processors:
  - script:
      lang: javascript
      id: msgParse
      tag: enable
      source: >
        function process(event) {
          var str = event.Get("message");
          var time = str.split(" ").slice(0,2).join(" ");
          var level = str.split(" ").slice(2,4).join("").replace("-","");
          // keep the full message, but also cut a 75-char preview for list views
          var simpleMsg = str.split(":").slice(3).join(":").trim().substr(0,75);
          var traceIdMatchArr = str.match(/\[.*?\]/g);
          if (traceIdMatchArr && traceIdMatchArr.length > 1) {
            event.Put("traceId", traceIdMatchArr[1].replace(/[\[\]]/g, ""));
          } else {
            event.Put("traceId", "");
          }
          var classNameMatchArr = str.match(/ \.?\w+\.[\.\w]+/g);
          if (classNameMatchArr) {
            event.Put("className", classNameMatchArr[0]);
          } else {
            event.Put("className", "");
          }
          event.Put("time", time);
          event.Put("level", level);
          event.Put("simpleMsg", simpleMsg);
        }
  - drop_fields: # drop fields we do not want downstream
      fields: ["log","input","host","agent","ecs"]
- Corresponding ClickHouse changes:
-- create the Kafka consumer table
create table default.log_kafka(traceId String, level String, server String, className String, simpleMsg String,message String,prod String,time String)
engine = Kafka
SETTINGS kafka_broker_list = 'kafka.chebaidu.net:9092', kafka_topic_list = 'log', kafka_group_name = 'log_clickhouse', kafka_format = 'JSONEachRow', kafka_num_consumers = 1,kafka_skip_broken_messages=10,kafka_max_block_size = 1048576
-- create the log table
create table default.log(
  time DateTime64(3, 'Asia/Shanghai'),
  traceId String,
  level LowCardinality(String),
  server LowCardinality(String),
  prod LowCardinality(String),
  className String,
  simpleMsg String,
  message String,
  INDEX idx_traceId traceId TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 5
) Engine = MergeTree()
PARTITION BY toYYYYMMDD(time)
ORDER BY time
TTL toDate(time) + toIntervalDay(3)
-- view
CREATE MATERIALIZED VIEW default.log_view TO default.log AS
SELECT time,traceId,level,server,prod,className,simpleMsg,message
FROM default.log_kafka
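With the ngram index and daily partitioning, trace lookups stay cheap even as the table grows; a sketch (the traceId is the one from the sample log line above):
clickhouse-client --query "
  SELECT time, level, className, simpleMsg
  FROM default.log
  WHERE traceId = '4aXj4DZogiYo'
    AND time >= now() - INTERVAL 1 DAY
  ORDER BY time"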