Installing Kafka

  • Install Java 11 first; I usually go with AWS Corretto
  • Version 3.4.1, no ZooKeeper (KRaft mode)
wget https://downloads.apache.org/kafka/3.4.1/kafka_2.13-3.4.1.tgz
# if the official download is slow, the Aliyun mirror below is much faster
wget http://mirrors.aliyun.com/apache/kafka/3.4.1/kafka_2.13-3.4.1.tgz?spm=a2c6h.25603864.0.0.40943270kPi33z
  • Extract to kafka_2.13-3.4.1
  • Recommended: change log.dirs in config/kraft/server.properties; it defaults to somewhere under /tmp/
cd kafka_2.13-3.4.1
./bin/kafka-storage.sh random-uuid
# P3Jr59YRTOmYrB8q7Hrdgw 
./bin/kafka-storage.sh format -t P3Jr59YRTOmYrB8q7Hrdgw -c ./config/kraft/server.properties
# if the machine has less than 2 GB of memory, lower Xms/Xmx in kafka-server-start.sh
./bin/kafka-server-start.sh ./config/kraft/server.properties
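  • Quick smoke test, as a sketch: assuming the broker is on the default localhost:9092, create the log topic Filebeat writes to later (1 partition / 1 replica suits a single node) and list topics to confirm the broker is up
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic log --partitions 1 --replication-factor 1
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list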

ClickHouse vs. Loki

  • Having read through a lot of material, a few notes on each
  • Loki:
    • Low CPU/memory footprint
    • Compressed log storage
    • The UI is Grafana
  • ClickHouse:
    • Relatively heavy on CPU/memory; usually deployed as a cluster, which needs the heavyweight ZooKeeper component
    • Supports compressed storage, configurable
  • For my situation ClickHouse is the better fit: it is a database and supports aggregation queries, and occasionally I want to know how often some endpoint was hit during a time window, which is a one-line SQL query (sketched below); I'm not sure Loki can do that
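  • A minimal sketch of that kind of query, assuming the first default.log table created further down (with its time and msg columns); the endpoint path is the one from the sample output, the time window is made up
-- hypothetical example: hits on one endpoint during one day
SELECT count() AS hits
FROM default.log
WHERE time >= '2023-08-28 00:00:00' AND time < '2023-08-29 00:00:00'
  AND msg LIKE '%/api/app/user/tempOrder/listVehicleOrder%'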

Filebeat

  • Written in Go, light on resources, and supports JavaScript scripts for processing the raw data
  • Installation
mkdir filebeat
cd filebeat
# the download was slow, 80 KB/s, took 10 minutes
#wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.0.0-linux-x86_64.tar.gz
# switched to the Huawei mirror, version 7.3.2
wget https://mirrors.huaweicloud.com/filebeat/7.3.2/filebeat-7.3.2-linux-x86_64.tar.gz
tar -zxvf filebeat-7.3.2-linux-x86_64.tar.gz 
cd filebeat-7.3.2-linux-x86_64/
  • Configuration: after a few days of tuning it looks like the following (the JS logic is quite handy for massaging the data)

Our business logs look roughly like the line below; a bit of JS slices out the time, level, traceId, className, and message body:
2023-08-27 14:50:19.791 INFO - 82815[ restartedMain] [4aXj4DZogiYo] io.undertow.servlet : Initializing Spring embedded WebApplicationContext

max_procs: 1
queue.mem.events: 1024
queue.mem.flush.min_events: 960

filebeat.inputs:
- type: log
  enabled: true
  ignore_older: 2h
  close_inactive: 30m # can be 2m in production
  max_bytes: 20480
  tail_files: true
  paths:
    - /var/javaserver/log/app/app.log
  fields_under_root: true
  fields:
    server: dev

output.kafka:
  # initial brokers for reading cluster metadata
  enable: true
  hosts:
    - kafka.server:9092
  topic: log
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: lz4 # filebeat's Kafka output doesn't support zstd yet
  max_message_bytes: 102400
  version: 2.0.0

#output.console:
#  enable: true

processors:
  - script:
      lang: javascript
      id: times
      tag: enable
      source: >
        function process(event) {
            var str= event.Get("message");
            var time = str.split(" ").slice(0,2).join(" ");
            var level = str.split(" ").slice(2,4).join("").replace("-","");
            var msg = str.split(":").slice(3).join(":").replace("  "," ");
            var traceIdMatchArr = str.match(/\[.*?\]/g)
            if(traceIdMatchArr && traceIdMatchArr.length>1){
              event.Put("traceId",traceIdMatchArr[1].replace(/[\[\]]/g,""))
            }else{
              event.Put("traceId","")
            }
            var classNameMatchArr = str.match(/ \.?\w+\.[\.\w]+/g)
            if(classNameMatchArr){
              event.Put("className",classNameMatchArr[0])
            }else{
              event.Put("className","")
            }
            event.Put("time",time);
            event.Put("level",level);
            event.Put("msg",msg)
        }
  - timestamp:
      field: time
      timezone: Asia/Shanghai
      layouts:
        - '2006-01-02 15:04:05'
        - '2006-01-02 15:04:05.999'
      test:
        - '2019-06-22 16:33:51'
  - drop_fields:  # drop the fields we don't want
      fields: ["message","log","input","host","agent","ecs","time"]
  • The final output looks like this
{"@timestamp":"2023-08-28T02:10:17.944Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.3.2","topic":"log"},"traceId":"QmcaZqZ1cau4","level":"INFO","server":"ldd_dev","className":" c.c.l.start.handler.TraceIdFilter","msg":" request start path:/api/app/user/tempOrder/listVehicleOrder"}

ClickHouse

  • For installation, just follow the official docs and install via apt
  • Essential configuration (search for the details; sketched a few lines down):
    • Enable access from outside the host
    • Set a password for the default user
    • Configure the data directory
  • UI
    • DBM: https://dbm.inke.io
    • DBeaver is also decent, but it eats memory; not recommended on a Mac with little RAM
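  • Where those settings live, from memory (a sketch; paths are the stock Debian/Ubuntu package defaults, the password is a placeholder)
<!-- /etc/clickhouse-server/config.xml: listen on all interfaces, and the data directory -->
<listen_host>0.0.0.0</listen_host>
<path>/var/lib/clickhouse/</path>
<!-- /etc/clickhouse-server/users.xml: password for the default user -->
<password>your_password_here</password>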
  • Create the Kafka consumer table
create table default.log_kafka_queue(traceId String, level String, server String, className String, msg String) 
engine = Kafka 
SETTINGS kafka_broker_list = 'kafka.chebaidu.net:9092', kafka_topic_list = 'log', kafka_group_name = 'log_clickhouse', kafka_format = 'JSONEachRow', kafka_num_consumers = 1,kafka_skip_broken_messages=1
  • Create the actual data table
create table default.log(time DateTime64(3, 'Asia/Shanghai'), traceId String, level String, server String, className String, msg String,KafkaPartition UInt8,KafkaOffset UInt64) 
Engine = MergeTree PARTITION BY toYYYYMMDD(time) ORDER BY (KafkaPartition,KafkaOffset);
  • Create a materialized view that writes into the log table
CREATE MATERIALIZED VIEW default.log_kafka_view TO default.log AS
SELECT traceId,level, server,className, msg,_partition as KafkaPartition,_offset as KafkaOffset,_timestamp as time
FROM default.log_kafka_queue
  • A field turned out to be wrong, so rebuild the tables
DETACH TABLE  default.log_kafka_view
drop table default.log
--
create table default.log(upTime DateTime64(3, 'Asia/Shanghai'), traceId String, level String, server String, className String, msg String,_partition UInt8,_offset UInt64) 
Engine = MergeTree PARTITION BY toYYYYMMDD(upTime) ORDER BY (_partition,_offset)
--
CREATE MATERIALIZED VIEW default.log_view TO default.log AS
SELECT traceId,level, server,className, msg,_partition,_offset,_timestamp_ms as upTime
FROM default.log_kafka_queue
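  • A quick sanity check that rows are flowing from Kafka into the table (assuming the rebuilt schema above); both numbers should grow as logs arrive
SELECT count(), max(upTime) FROM default.log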

Displaying the logs

The popular choice is Grafana, which leans toward charts and dashboards, but all I want is a simple log search and query system.

plogVew

  • The broker's advertised address defaults to localhost; for access from outside, set it to a public IP or domain name, and ideally every client maps the hostname too (e.g. the /etc/hosts entry sketched below)
advertised.listeners=PLAINTEXT://kafka.chebaidu.net:9092
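  • The hostname mapping, as a sketch (the IP is a placeholder)
# /etc/hosts on every machine that talks to the broker
203.0.113.10  kafka.chebaidu.net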
  • Filebeat 7 seems to work with every Kafka version from 0.10 up to the latest, not just up to 2.6 as the official docs say

8/31 update

  • Filebeat config adjustments:
    • Added multiline merging for Java exception stack traces
    • The timestamp processor couldn't keep sub-second precision, so it was dropped in favor of passing the time field through as-is
    • Keep the original message while also slicing out a simpleMsg
max_procs: 1
queue.mem.events: 1024
queue.mem.flush.min_events: 960

filebeat.inputs:
- type: log
  enabled: true
  ignore_older: 2h
  close_inactive: 2h # can be 2m in production
  max_bytes: 20480
  tail_files: true
  paths:
    - /var/javaserver/log/ludingding/app.log
  fields_under_root: true
  fields:
    server: ldd_dev
    prod: dev
  multiline.pattern: '^2\d{3}\-\d{2}\-\d{2}'
  multiline.negate: true
  multiline.match: after

output.kafka:
  # initial brokers for reading cluster metadata
  enable: true
  hosts:
    - kafka.chebaidu.net:9092
  topic: log
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 102400
  version: 2.0.0

#output.console:
#  enable: true

processors:
  - script:
      lang: javascript
      id: msgParse
      tag: enable
      source: >
        function process(event) {
            var str= event.Get("message");
            var time = str.split(" ").slice(0,2).join(" ");
            var level = str.split(" ").slice(2,4).join("").replace("-","");
            var simpleMsg = str.split(":").slice(3).join(":").trim().substr(0,75);
            var traceIdMatchArr = str.match(/\[.*?\]/g)
            if(traceIdMatchArr && traceIdMatchArr.length>1){
              event.Put("traceId",traceIdMatchArr[1].replace(/[\[\]]/g,""))
            }else{
              event.Put("traceId","")
            }
            var classNameMatchArr = str.match(/ \.?\w+\.[\.\w]+/g)
            if(classNameMatchArr){
              event.Put("className",classNameMatchArr[0])
            }else{
              event.Put("className","")
            }
            event.Put("time",time);
            event.Put("level",level);
            event.Put("simpleMsg",simpleMsg);
        }
  - drop_fields:  # drop the fields we don't want
      fields: ["log","input","host","agent","ecs"]
  • Corresponding ClickHouse adjustments
-- create the Kafka table
create table default.log_kafka(traceId String, level String, server String, className String, simpleMsg String,message String,prod String,time String) 
engine = Kafka 
SETTINGS kafka_broker_list = 'kafka.chebaidu.net:9092', kafka_topic_list = 'log', kafka_group_name = 'log_clickhouse', kafka_format = 'JSONEachRow', kafka_num_consumers = 1,kafka_skip_broken_messages=10,kafka_max_block_size = 1048576

-- create the log table
create table default.log(
  time DateTime64(3, 'Asia/Shanghai'),
  traceId String,
  level LowCardinality(String),
  server LowCardinality(String),
  prod LowCardinality(String),
  className String,
  simpleMsg String,
  message String,
  INDEX idx_traceId traceId TYPE ngrambf_v1(3, 256, 2, 0) GRANULARITY 5
) Engine = MergeTree()
PARTITION BY toYYYYMMDD(time)
ORDER BY time
TTL toDate(time) + toIntervalDay(3)

-- view
CREATE MATERIALIZED VIEW default.log_view TO default.log AS
SELECT time,traceId,level,server,prod,className,simpleMsg,message
FROM default.log_kafka
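  • With everything wired up, the typical lookup is pulling one request's full trace by traceId (the id below is from the sample output earlier); the ngrambf_v1 index also helps with LIKE searches on traceId
-- fetch one request's full trace, oldest line first
SELECT time, level, className, simpleMsg
FROM default.log
WHERE traceId = 'QmcaZqZ1cau4'
ORDER BY time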