Recently, for work reasons, I spent some time reading the source code of the open-source tool Filebeat and unexpectedly found two bugs, both of which are reproducible.
I originally intended to fix them and send a pull request upstream, but the Filebeat GitHub repository currently has around 1.4k open issues, so my two would probably never get looked at, and I dropped the idea.
Instead I am writing this post as a record. Below are the two bugs. I tested against the fairly old version 7.13.2; as of this writing the latest Filebeat release is 8.0.0, and both bugs are still present there.
1. Filebeat records an incorrect file offset when it drops an over-long line
Filebeat version tested: filebeat-7.13.2-darwin-x86_64
Bug description
Filebeat defines a maximum length for each log line; once a line exceeds this length, the line is dropped. However, when Filebeat drops such a line, it does not add the line's length to the read offset of the log file, so the persisted read progress ends up smaller than it should be.
How to reproduce
See the filebeat.yml in the same directory; the key settings are:
max_bytes: 200
read_buffer_size: 16384
This means Filebeat reads 16 KB from the file at a time, and any line longer than 200 bytes should be dropped. When Filebeat actually applies the check it multiplies max_bytes by 4 to allow for encodings such as UTF-32, so in practice lines longer than 800 bytes are dropped.
We construct a log file bigline.txt (also in the same directory). Its first line is longer than 800 bytes and should be dropped; the second line is a normal one and should be emitted as usual.
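For reference, here is a throwaway Go sketch that generates a similar test file. The file name and the exact sizes are my own assumptions; anything longer than 800 bytes on the first line will do.

// gen_bigline.go: throwaway helper to build the test input, not part of Filebeat.
package main

import (
	"os"
	"strings"
)

func main() {
	f, err := os.Create("bigline.log")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// First line: ~3000 bytes, well over the effective 800-byte limit, so the
	// line reader should drop it.
	f.WriteString(strings.Repeat("x", 3000) + "\n")
	// Second line: short, should be shipped as a normal event.
	f.WriteString("456789\n")
}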
Run filebeat -c filebeat.yml, and the output is:
Exceeded 800 max bytes in line limit, skipped 2986 bytes
{"@timestamp":"2021-10-14T08:37:01.054Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.0.0"},"message":"456789","input":{"type":"log"},"log":{"offset":0,"file":{"path":"/Users/kyle/Desktop/bigline.log"}}}
Notice that the event's offset is 0, which is wrong: it does not account for the length of the dropped first line.
Now look at the registry file where Filebeat persists its read progress, log.json:
{"op":"set","id":3}
{"k":"filebeat::logs::native::12937144398-16777220","v":{"id":"native::12937144398-16777220","source":"/Users/kyle/Desktop/bigline.log","timestamp":[2061641314080,1634200622],"ttl":-1,"FileStateOS":{"inode":12937144398,"device":16777220},"prev_id":"","offset":7,"type":"log","identifier_name":"native"}}
Likewise, the recorded read offset is 7, which again leaves out the length of the dropped line.
As a result, when Filebeat is started again it resumes reading from offset 7, which is still inside the dropped first line, and the over-long-line warning shows up again:
Exceeded 800 max bytes in line limit, skipped 2979 bytes line
{"@timestamp":"2021-10-14T08:42:22.503Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.0.0"},"log":{"offset":7,"file":{"path":"/Users/kyle/Desktop/bigline.log"}},"message":"456789","input":{"type":"log"}}
And the second line, 456789, is read and emitted a second time. The arithmetic lines up: the second run skips 2979 bytes, exactly the 2986 bytes of the first run minus the 7 bytes ("456789" plus a newline) recorded in the registry, so every restart re-processes the tail of the dropped line and duplicates the second line.
The buggy code
The logic that drops over-long lines is in libbeat/reader/readfile/line.go. In both branches below, the reader advances past the skipped bytes in its input buffer but never adds them to r.byteCount, the counter the fix further down updates, so the dropped bytes never show up in the size the reader reports or in the persisted offset.
// func (r *LineReader) advance() error
	// If max bytes limit per line is set, then drop the lines that are longer
	if r.maxBytes != 0 {
		// If newLine is found, drop the lines longer than maxBytes
		for idx != -1 && idx > r.maxBytes {
			r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, idx)
			err = r.inBuffer.Advance(idx + len(r.nl))
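			// BUG: the bytes skipped here are never added to r.byteCount, so the
			// dropped line is not reflected in the size the reader reports.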
			r.inBuffer.Reset()
			r.inOffset = 0
			idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
		}

		// If newLine is not found and the incoming data buffer exceeded max bytes limit, then skip until the next newLine
		if idx == -1 && r.inBuffer.Len() > r.maxBytes {
			skipped, err := r.skipUntilNewLine()
			if err != nil {
				r.logger.Error("Error skipping until new line, err:", err)
				return err
			}
			r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, skipped)
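			// BUG: same problem here, `skipped` is not added to r.byteCount either.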
			idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
		}
	}
The fix
@@ -122,7 +122,9 @@ func (r *LineReader) advance() error {
 		// If newLine is found, drop the lines longer than maxBytes
 		for idx != -1 && idx > r.maxBytes {
 			logrus.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, idx)
-			err = r.inBuffer.Advance(idx + len(lineTerminatorCharacter))
+			skipped := idx + len(lineTerminatorCharacter)
+			err = r.inBuffer.Advance(skipped)
+			r.byteCount += skipped
 			if err != nil {
 				return err
 			}
@@ -139,6 +141,7 @@ func (r *LineReader) advance() error {
 				return err
 			}
 			logrus.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, skipped)
+			r.byteCount += skipped
 			idx = r.inBuffer.IndexFrom(r.inOffset, lineTerminatorCharacter)
 		}
 	}
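To make the accounting concrete, here is a small standalone Go sketch of the behavior the patch restores. It deliberately does not touch Filebeat's internal LineReader API; it simply reads a file line by line, drops lines longer than maxBytes, and still counts the dropped bytes toward the offset it reports.

// offset_demo.go: standalone illustration of the offset accounting, not Filebeat code.
package main

import (
	"bufio"
	"fmt"
	"os"
)

const maxBytes = 800 // effective per-line limit, matching the repro above

func main() {
	f, err := os.Open("bigline.log")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	offset := 0 // bytes consumed from the file so far
	scanner := bufio.NewScanner(f)
	scanner.Buffer(make([]byte, 64*1024), 64*1024)
	for scanner.Scan() {
		line := scanner.Text()
		consumed := len(line) + 1 // +1 for the trailing '\n' (assumes every line ends with one)
		if len(line) > maxBytes {
			// A dropped line must still advance the offset, otherwise a restart
			// resumes reading from inside this line, as shown above.
			fmt.Printf("dropped %d bytes, offset advances to %d\n", consumed, offset+consumed)
		} else {
			fmt.Printf("emit %q at offset %d\n", line, offset)
		}
		offset += consumed
	}
	if err := scanner.Err(); err != nil {
		panic(err)
	}
}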
2. Filebeat wrongly ACKs a batch on ErrBreakerOpen and loses messages
Filebeat version tested: filebeat-7.13.2-darwin-x86_64
Bug description
When Filebeat cannot publish to Kafka, for example because the username or password is wrong, the Kafka output reports an ErrBreakerOpen error and Filebeat sleeps for a while before retrying.
When a batch is published and every message in it fails with ErrBreakerOpen, the batch is treated as successfully sent and goes through the ACK path.
Messages that were never actually delivered thus get ACKed as if they had been; in other words, they are lost.
How to reproduce
See the filebeat.yml in the same directory; the key settings are:
queue:
  mem:
    # keep batches small so the bug is easier to reproduce
    flush.min_events: 2
output.kafka:
  # deliberately set to a wrong username
  username: 'xxx'
Run filebeat -c filebeat.yml, wait a moment, and then check the registry where Filebeat records its ACKs:
cat registry/filebeat/log.json
{"op":"set","id":1}
{"k":"filebeat::logs::native::12943379684-16777220","v":{"source":"/Users/kyle/projects/futu/dp_logpipe/logpipe_collector/logpipe_sdk.log","offset":0,"FileStateOS":{"inode":12943379684,"device":16777220},"identifier_name":"native","id":"native::12943379684-16777220","timestamp":[2062295754080,1644482072],"ttl":-1,"type":"log","prev_id":""}}
{"op":"set","id":2}
{"k":"filebeat::logs::native::12943379684-16777220","v":{"prev_id":"","offset":2703,"timestamp":[2062176572080,1644482082],"ttl":-1,"type":"log","FileStateOS":{"device":16777220,"inode":12943379684},"id":"native::12943379684-16777220","source":"/Users/kyle/projects/futu/dp_logpipe/logpipe_collector/logpipe_sdk.log","identifier_name":"native"}}
The second entry wrongly ACKs up to "offset":2703. With a wrong username and password the publish should keep failing, yet it was treated as a success and ACKed.
If you then correct the Kafka credentials and restart Filebeat, the earlier messages are never resent: they were marked as successfully delivered, so they are not read again.
The buggy code
The problematic code is in libbeat/outputs/kafka/client.go:
func (r *msgRef) fail(msg *message, err error) {
	switch err {
	case sarama.ErrInvalidMessage:
		r.client.log.Errorf("Kafka (topic=%v): dropping invalid message", msg.topic)
		r.client.observer.Dropped(1)
	case sarama.ErrMessageSizeTooLarge, sarama.ErrInvalidMessageSize:
		r.client.log.Errorf("Kafka (topic=%v): dropping too large message of size %v.",
			msg.topic,
			len(msg.key)+len(msg.value))
		r.client.observer.Dropped(1)
	case breaker.ErrBreakerOpen:
		// Add this message to the failed list, but don't overwrite r.err since
		// all the breaker error means is "there were a lot of other errors".
		r.failed = append(r.failed, msg.data)
	default:
		r.failed = append(r.failed, msg.data)
		if r.err == nil {
			// Don't overwrite an existing error. This way at the end of the batch
			// we report the first error that we saw, rather than the last one.
			r.err = err
		}
	}
	r.dec()
}
In the case breaker.ErrBreakerOpen branch, r.err is never set. As a result, when the ACK decision is made, the batch is mistaken for a successful one:
func (r *msgRef) dec() {
	i := atomic.AddInt32(&r.count, -1)
	if i > 0 {
		return
	}
	r.client.log.Debug("finished kafka batch")
	stats := r.client.observer
	// r.err decides whether the batch gets ACKed. When every message in the batch
	// failed with ErrBreakerOpen, the buggy code above never sets r.err, so err is
	// nil here and the batch is ACKed anyway.
	err := r.err
	if err != nil {
		failed := len(r.failed)
		success := r.total - failed
		r.batch.RetryEvents(r.failed)
		stats.Failed(failed)
		if success > 0 {
			stats.Acked(success)
		}
		r.client.log.Debugf("Kafka publish failed with: %+v", err)
	} else {
		r.batch.ACK()
		stats.Acked(r.total)
	}
}
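To make the decision concrete, here is a minimal standalone Go sketch with simplified types of my own (not Filebeat's actual msgRef): a batch is ACKed only when no error was recorded for it, so recording the first error even for ErrBreakerOpen, as the fix below does, turns the wrong ACK into a retry.

// ack_decision.go: standalone illustration of the batch ACK decision, not Filebeat code.
package main

import (
	"errors"
	"fmt"
)

// stand-in for breaker.ErrBreakerOpen
var errBreakerOpen = errors.New("circuit breaker is open")

type batchRef struct {
	total  int
	failed int
	err    error
}

// fail mirrors the fixed msgRef.fail: queue the message for retry and record
// the first error seen, including the breaker-open case.
func (r *batchRef) fail(err error) {
	r.failed++
	if r.err == nil {
		r.err = err
	}
}

// done mirrors msgRef.dec: with err == nil the whole batch is ACKed,
// otherwise the failed messages are retried.
func (r *batchRef) done() {
	if r.err != nil {
		fmt.Printf("retry %d of %d messages (first error: %v)\n", r.failed, r.total, r.err)
		return
	}
	fmt.Printf("ACK %d messages\n", r.total)
}

func main() {
	r := &batchRef{total: 2}
	// Every message in the batch fails with the breaker-open error, e.g. because
	// the Kafka credentials are wrong.
	r.fail(errBreakerOpen)
	r.fail(errBreakerOpen)
	// With the fix this prints a retry; if fail() skipped recording the error, as
	// the buggy code did for ErrBreakerOpen, r.err would stay nil and the batch
	// would be ACKed even though nothing was delivered.
	r.done()
}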
The fix
	case breaker.ErrBreakerOpen:
		// Add this message to the failed list, but don't overwrite r.err since
		// all the breaker error means is "there were a lot of other errors".
		r.failed = append(r.failed, msg.data)
		// add the following three lines
		if r.err == nil {
			r.err = err
		}
	default:
		r.failed = append(r.failed, msg.data)
		if r.err == nil {
			// Don't overwrite an existing error. This way at the end of the batch
			// we report the first error that we saw, rather than the last one.
			r.err = err
}