Journal

Writing down the things I learned. To share them with others and my future self.

08 May 2020

On Using Fluentd to Parse Elastic Common Schema from Kubernetes Pod Logs

Today I had to parse the JSON logs of our containers in Kubernetes. Our applications log in the Elastic Common Schema (ECS) format to STDOUT. This format is a JSON object with well-defined fields per log line. We use a fluentd daemonset to read the container logs from the nodes. The docker daemon saves the STDOUT and STDERR output of the containers to /var/log/containers on the nodes. This setup is very common in Kubernetes environments; the official fluentd-elasticsearch addon uses the same approach. In this article I will show the problems I encountered while parsing these logs.
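
For reference, an ECS log line from one of our applications looks roughly like this (an illustrative sketch; the exact fields depend on the ECS logging library in use):

{"@timestamp":"2019-01-01T11:11:11.111Z","message":"this is a log line","log":{"level":"info"},"ecs":{"version":"1.5.0"}}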

The fluentd input to read the container logs is:

  <source>
    @type tail
    @id in_tail_container_logs
    path "/var/log/containers/*.log"
    pos_file "/var/log/fluentd-containers.log.pos"
    tag "kubernetes.*"
    read_from_head true
    <parse>
      @type "json"
      time_format "%Y-%m-%dT%H:%M:%S.%NZ"
      time_type string
    </parse>
  </source>

As you can see in the above snippet, the input parses the container logs as JSON via the fluentd JSON parser. The format of the container log files is the docker default json-file. If a container prints the string this is a log line\n to STDOUT, the docker daemon saves the following line to the container log file:

{"log":"this is a log line\n","stream":"stdout","time":"2019-01-01T11:11:11.111111111Z"}

After passing through the fluentd input, the fluentd event has exactly three fields: log, stream, and the timestamp.
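
For the simple example above, the resulting fluentd event looks roughly like this (an illustrative rendering in the same style as the examples below):

{
  log => "this is a log line\n",
  stream => stdout,
  time => "2019-01-01T11:11:11.111111111Z",
}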

Since our containers log JSON to STDOUT, the lines in the container logs look like the following:

{"log":"{\"message\":\"this is a log line\", \"log\":{\"level\":\"info\"}}","stream":"stdout","time":"2019-01-01T11:11:11.111111111Z"}

This is where the problems start. We told the input to parse every line as JSON. The resulting fluentd event looks like this:

{
  log => "{"message":"this is a log line", "log":{"level":"info"}}",
  stream => stdout,
  time => "2019-01-01T11:11:11.111111111Z",
}

The fluentd JSON parser in the input does not parse the output of our application as JSON, since the docker daemon escaped it into the log string; it stays a plain string in the event. To solve this I used a parser filter with a JSON parser:

<filter **>
  @type parser
  key_name "log"
  reserve_data true
  reserve_time true
  hash_value_field "parsed"
  remove_key_name_field true
  replace_invalid_sequence true
  emit_invalid_record_to_error false
  <parse>
    @type "json"
  </parse>
</filter>

After our fluentd event has passed this filter, it looks like this:

{
  parsed => {
    message => "this is a log line",
    log => {
      level => "info"
    },
  },
  stream => stdout,
  time => "2019-01-01T11:11:11.111111111Z",
}

The parser removes the original log line from the event, since we set the option remove_key_name_field. We also tell the parser filter to keep the already existing data and the already parsed timestamp via the options reserve_data and reserve_time. Some applications in the cluster do not log in JSON format. Therefore, we set the option emit_invalid_record_to_error to false. Without that setting, every non-JSON log line would get the @ERROR label and require special treatment by other filters.
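
For example, a container that prints a plain-text line such as starting worker (a made-up example) keeps its original log field, because parsing fails and reserve_data keeps the record as it is:

{
  log => "starting worker\n",
  stream => stdout,
  time => "2019-01-01T11:11:11.111111111Z",
}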

The Elastic Common Schema does not have a field parsed. Moreover, the field message belongs at the top level of the object. To mutate records, fluentd has the record_transformer filter. This filter can set event fields to the values of other fields, but it cannot copy the whole content of the field parsed to the top level of the event. This was the hardest problem I encountered. After some googling I found the filter record_modifier. I applied the trick shown in the record_modifier README to implement complex logic in Ruby.
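
For comparison, copying fields one by one with record_transformer would look roughly like this (a sketch, assuming enable_ruby; every ECS field would have to be listed by hand):

<filter **>
  @type record_transformer
  enable_ruby true
  <record>
    message ${record["parsed"]["message"]}
  </record>
</filter>

That does not scale to arbitrarily nested ECS fields. The record_modifier filter below merges the whole parsed hash instead: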

<filter **>
  @type record_modifier
  remove_keys _dummy_,parsed
  <record>
    _dummy_ ${record.merge!(record["parsed"])}
  </record>
</filter>

We set the field _dummy_ to the result of the Ruby expression ${record.merge!(record["parsed"])}. The content of our fluentd event is the hash record. The function merge! merges the values of the field record["parsed"] into the event in place. The result of the merge is irrelevant; therefore, we drop the field _dummy_ after applying the filter. The filter also deletes the merged field parsed.
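
In plain Ruby, the embedded expression does roughly the following (an illustrative sketch with a made-up record):

record = { "parsed" => { "message" => "this is a log line", "log" => { "level" => "info" } }, "stream" => "stdout" }
record.merge!(record["parsed"])
# record now contains message and log at the top level;
# the parsed key is still present and is removed via remove_keys.

After applying that filter, our event looks like this: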

{
  message => "this is a log line",
  log => {
    level => "info"
  },
  stream => stdout,
  time => "2019-01-01T11:11:11.111111111Z",
}

This event complies with the Elastic Common Schema. The final fluentd configuration is:

  <source>
    @type tail
    @id in_tail_container_logs
    path "/var/log/containers/*.log"
    pos_file "/var/log/fluentd-containers.log.pos"
    tag "kubernetes.*"
    read_from_head true
    <parse>
      @type "json"
      time_format "%Y-%m-%dT%H:%M:%S.%NZ"
      time_type string
    </parse>
  </source>

  <filter **>
    @type parser
    key_name "log"
    reserve_data true
    reserve_time true
    hash_value_field "parsed"
    remove_key_name_field true
    replace_invalid_sequence true
    emit_invalid_record_to_error false
    <parse>
      @type "json"
    </parse>
  </filter>

  <filter **>
    @type record_modifier
    remove_keys _dummy_,parsed
    <record>
      _dummy_ ${record.merge!(record["parsed"])}
    </record>
  </filter>

  <match kubernetes.**>
    @type elasticsearch
    host localhost
    port 9200
    logstash_format true
  </match>