Two Bugs I Found in Filebeat

Recently, for work, I dug into the source code of Filebeat and unexpectedly found two bugs, both reproducible.

I had wanted to fix them and send a pull request upstream, but a look at Filebeat's GitHub repository showed around 1.4k open issues already; my two would probably go unnoticed, so I gave up on that.

Instead I'm writing this post as a record. Both bugs are described below. I tested against a fairly old version, 7.13.2; as of this writing the latest Filebeat release is 8.0.0, and both bugs are still present there.

1. Filebeat records an inaccurate file offset when it drops an over-long line

Filebeat version tested: filebeat-7.13.2-darwin-x86_64

Bug description

Filebeat defines a maximum length per log line; any line exceeding it is dropped. But when Filebeat drops such a line, it does not add the line's length to the file-read offset, so the persisted read progress falls short of the real position in the file.

How to reproduce

See the filebeat.yml in the same directory; the key settings are:

  max_bytes: 200
  read_buffer_size: 16384

This means Filebeat reads 16 KB from the file at a time, and lines longer than 200 bytes are dropped. When enforcing the limit, Filebeat actually multiplies max_bytes by 4 to allow for encodings such as UTF-32, so in practice it is lines longer than 800 bytes that get dropped.

I constructed a log file, bigline.txt (see the same directory). Its first line is longer than 800 bytes and should be dropped; its second line is normal and should be emitted.
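For reference, here is a minimal Go sketch that produces such a file. The exact bytes of my original bigline.txt aren't reproduced in this post, so the sizes below are illustrative assumptions:

package main

import (
	"os"
	"strings"
)

func main() {
	// First line: far beyond the 800-byte effective limit, should be dropped.
	long := strings.Repeat("0123456789", 300) // 3000 bytes
	// Second line: short, should be shipped normally.
	content := long + "\n" + "456789\n"
	if err := os.WriteFile("bigline.txt", []byte(content), 0o644); err != nil {
		panic(err)
	}
}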

Run filebeat -c filebeat.yml, and it outputs:

Exceeded 800 max bytes in line limit, skipped 2986 bytes 

{"@timestamp":"2021-10-14T08:37:01.054Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.0.0"},"message":"456789","input":{"type":"log"},"log":{"offset":0,"file":{"path":"/Users/kyle/Desktop/bigline.log"}}}

Notice that the offset here starts at 0. That is wrong: it doesn't include the length of the dropped first line.

Next, look at the registry file where Filebeat persists its read progress, log.json:

{"op":"set","id":3}
{"k":"filebeat::logs::native::12937144398-16777220","v":{"id":"native::12937144398-16777220","source":"/Users/kyle/Desktop/bigline.log","timestamp":[2061641314080,1634200622],"ttl":-1,"FileStateOS":{"inode":12937144398,"device":16777220},"prev_id":"","offset":7,"type":"log","identifier_name":"native"}}

Likewise, the recorded read progress is 7, i.e. only the second line's bytes; the dropped line's length was never counted.
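If you'd rather not eyeball the JSON, here is a small throwaway Go helper (my own, not part of Filebeat) that prints the source and offset of each state entry in log.json:

package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"os"
)

// entry matches the "set" records in registry/filebeat/log.json.
type entry struct {
	K string `json:"k"`
	V struct {
		Source string `json:"source"`
		Offset int64  `json:"offset"`
	} `json:"v"`
}

func main() {
	f, err := os.Open("registry/filebeat/log.json")
	if err != nil {
		panic(err)
	}
	defer f.Close()
	sc := bufio.NewScanner(f)
	for sc.Scan() {
		var e entry
		// Lines alternate between {"op":...} markers and state entries;
		// only the latter carry a non-empty key.
		if json.Unmarshal(sc.Bytes(), &e) == nil && e.K != "" {
			fmt.Printf("%s -> offset %d\n", e.V.Source, e.V.Offset)
		}
	}
}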

So on the next start, Filebeat resumes reading from offset 7, hits the over-long line again, and reports:

Exceeded 800 max bytes in line limit, skipped 2979 bytes line

{"@timestamp":"2021-10-14T08:42:22.503Z","@metadata":{"beat":"filebeat","type":"_doc","version":"8.0.0"},"log":{"offset":7,"file":{"path":"/Users/kyle/Desktop/bigline.log"}},"message":"456789","input":{"type":"log"}}

and the second line's content, 456789, gets read and shipped a second time.

The buggy code

The logic that drops over-long lines is in libbeat/reader/readfile/line.go:

// Excerpt from func (r *LineReader) advance():

// If max bytes limit per line is set, then drop the lines that are longer
if r.maxBytes != 0 {
    // If newLine is found, drop the lines longer than maxBytes
    for idx != -1 && idx > r.maxBytes {
        r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, idx)
        // The skipped bytes are consumed from the input buffer here,
        // but never added to the reader's running byte count.
        err = r.inBuffer.Advance(idx + len(r.nl))
        if err != nil {
            return err
        }
        r.inBuffer.Reset()
        r.inOffset = 0
        idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
    }

    // If newLine is not found and the incoming data buffer exceeded max bytes limit, then skip until the next newLine
    if idx == -1 && r.inBuffer.Len() > r.maxBytes {
        skipped, err := r.skipUntilNewLine()
        if err != nil {
            r.logger.Error("Error skipping until new line, err:", err)
            return err
        }
        // Same problem: the "skipped" bytes are dropped without being counted.
        r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, skipped)
        idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
    }
}
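In both branches, inBuffer.Advance() and skipUntilNewLine() discard the over-long bytes, but, judging by the patch below, the reader's r.byteCount field is what ultimately drives the offset written to the registry, and it is never incremented for the dropped data.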

The fix

@@ -122,7 +122,9 @@ func (r *LineReader) advance() error {
			// If newLine is found, drop the lines longer than maxBytes
			for idx != -1 && idx > r.maxBytes {
				r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, idx)
-				err = r.inBuffer.Advance(idx + len(r.nl))
+				skipped := idx + len(r.nl)
+				err = r.inBuffer.Advance(skipped)
+				r.byteCount += skipped
				if err != nil {
					return err
				}
@@ -139,6 +141,7 @@ func (r *LineReader) advance() error {
					return err
				}
				r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, skipped)
+				r.byteCount += skipped
				idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
			}
		}
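To make the accounting concrete, here is a standalone Go sketch (a simulation, not Filebeat code) of how the offset should advance over a file like the repro: one over-long line followed by 456789. The line sizes are assumed:

package main

import (
	"bytes"
	"fmt"
	"strings"
)

const maxBytes = 800 // effective limit: max_bytes (200) * 4

func main() {
	long := strings.Repeat("x", 2985) // assumed size for the over-long line
	data := []byte(long + "\n" + "456789\n")

	offset := 0 // what the registry should record
	buf := data
	for len(buf) > 0 {
		idx := bytes.IndexByte(buf, '\n')
		if idx < 0 {
			break
		}
		lineLen := idx + 1 // line plus its newline
		if idx > maxBytes {
			// Dropped line: Filebeat skips these bytes. The bug is that it
			// forgets to add lineLen to the offset; the fix keeps counting.
			fmt.Printf("dropped %d bytes\n", lineLen)
		} else {
			fmt.Printf("emitted %q at offset %d\n", buf[:idx], offset)
		}
		offset += lineLen
		buf = buf[lineLen:]
	}
	fmt.Printf("final offset: %d (this is what the registry should store)\n", offset)
}

With the skipped bytes counted, the second line is emitted at offset 2986 rather than 0, and the stored offset lands past the end of the consumed data, so a restart does not re-read anything.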

2. Filebeat wrongly ACKs batches on ErrBreakerOpen errors, losing messages

Filebeat version tested: filebeat-7.13.2-darwin-x86_64

Bug description

When Filebeat cannot reach Kafka, e.g. because the username/password is wrong, the output's circuit breaker returns an ErrBreakerOpen error, and Filebeat sleeps for a while before continuing.

When a batch is published and every message in it fails with ErrBreakerOpen, the whole batch is treated as successfully sent and goes through the ACK logic.

As a result, messages that were never delivered get ACKed as if they had been. In other words, they are lost.

How to reproduce

See the filebeat.yml in the same directory; the key settings are:

queue:
  mem:
    # keep batches small so the bug is easier to reproduce
    flush.min_events: 2

output.kafka:
  # deliberately wrong username
  username: 'xxx'

Run filebeat -c filebeat.yml, wait a moment, and then look at Filebeat's ACK state in the registry:

cat registry/filebeat/log.json
{"op":"set","id":1}
{"k":"filebeat::logs::native::12943379684-16777220","v":{"source":"/Users/kyle/projects/futu/dp_logpipe/logpipe_collector/logpipe_sdk.log","offset":0,"FileStateOS":{"inode":12943379684,"device":16777220},"identifier_name":"native","id":"native::12943379684-16777220","timestamp":[2062295754080,1644482072],"ttl":-1,"type":"log","prev_id":""}}
{"op":"set","id":2}
{"k":"filebeat::logs::native::12943379684-16777220","v":{"prev_id":"","offset":2703,"timestamp":[2062176572080,1644482082],"ttl":-1,"type":"log","FileStateOS":{"device":16777220,"inode":12943379684},"id":"native::12943379684-16777220","source":"/Users/kyle/projects/futu/dp_logpipe/logpipe_collector/logpipe_sdk.log","identifier_name":"native"}}

The second entry wrongly ACKs up to "offset":2703. With bad credentials every send should keep failing, yet the batch was treated as delivered and ACKed.

If you now correct the Kafka credentials and restart Filebeat, the earlier messages are never re-sent: they were recorded as successfully delivered, so they will never be read again.

The buggy code

The problematic code is in libbeat/outputs/kafka/client.go:

func (r *msgRef) fail(msg *message, err error) {
    switch err {
    case sarama.ErrInvalidMessage:
        r.client.log.Errorf("Kafka (topic=%v): dropping invalid message", msg.topic)
        r.client.observer.Dropped(1)

    case sarama.ErrMessageSizeTooLarge, sarama.ErrInvalidMessageSize:
        r.client.log.Errorf("Kafka (topic=%v): dropping too large message of size %v.",
            msg.topic,
            len(msg.key)+len(msg.value))
        r.client.observer.Dropped(1)

    case breaker.ErrBreakerOpen:
        // Add this message to the failed list, but don't overwrite r.err since
        // all the breaker error means is "there were a lot of other errors".
        // Bug: if every message in the batch hits ErrBreakerOpen, r.err stays
        // nil, and dec() below treats the batch as successfully sent.
        r.failed = append(r.failed, msg.data)

    default:
        r.failed = append(r.failed, msg.data)
        if r.err == nil {
            // Don't overwrite an existing error. This way at the end of the batch
            // we report the first error that we saw, rather than the last one.
            r.err = err
        }
    }
    r.dec()
}

In the case breaker.ErrBreakerOpen branch, r.err is never set, so the ACK check later mistakes the batch for a success:

func (r *msgRef) dec() {
    i := atomic.AddInt32(&r.count, -1)
    if i > 0 {
        return
    }

    r.client.log.Debug("finished kafka batch")
    stats := r.client.observer
    // Whether to ACK hinges on r.err being nil. Because of the bug above,
    // when every message in a batch fails with ErrBreakerOpen, r.err is
    // never set, so err == nil here and the batch gets ACKed.
    err := r.err
    if err != nil {
        failed := len(r.failed)
        success := r.total - failed
        r.batch.RetryEvents(r.failed)

        stats.Failed(failed)
        if success > 0 {
            stats.Acked(success)
        }

        r.client.log.Debugf("Kafka publish failed with: %+v", err)
    } else {
        r.batch.ACK()
        stats.Acked(r.total)
    }
}
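To make the failure mode concrete, here is a small self-contained Go sketch (a simulation, not Filebeat code; the names mirror the excerpts above) of a 2-message batch in which both messages fail with a breaker-open error:

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errBreakerOpen = errors.New("circuit breaker is open")

// msgRef mirrors the fields from the Filebeat excerpt that matter here.
type msgRef struct {
	count  int32
	total  int
	failed int
	err    error
}

func (r *msgRef) fail(err error) {
	switch err {
	case errBreakerOpen:
		// Buggy behavior: record the failure but leave r.err nil.
		r.failed++
	default:
		r.failed++
		if r.err == nil {
			r.err = err
		}
	}
	r.dec()
}

func (r *msgRef) dec() {
	if atomic.AddInt32(&r.count, -1) > 0 {
		return
	}
	if r.err != nil {
		fmt.Printf("retrying %d failed messages\n", r.failed)
	} else {
		// With the bug, we land here even though every message failed.
		fmt.Printf("ACKed %d messages\n", r.total)
	}
}

func main() {
	r := &msgRef{count: 2, total: 2}
	r.fail(errBreakerOpen)
	r.fail(errBreakerOpen) // prints "ACKed 2 messages" -- the lost batch
}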

The fix

    case breaker.ErrBreakerOpen:
        // Add this message to the failed list, but don't overwrite r.err since
        // all the breaker error means is "there were a lot of other errors".
        r.failed = append(r.failed, msg.data)
        // fix: add the following three lines, so that an all-breaker-open
        // batch carries an error into dec() instead of looking successful
        if r.err == nil {
            r.err = err
        }

    default:
        r.failed = append(r.failed, msg.data)
        if r.err == nil {
            // Don't overwrite an existing error. This way at the end of the batch
            // we report the first error that we saw, rather than the last one.
            r.err = err
        }
    }
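With this change, a batch in which every message fails with ErrBreakerOpen reaches dec() with a non-nil r.err, so the batch takes the RetryEvents path instead of being ACKed, and the messages are re-sent once the connection recovers.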