Fluentd might seem like a simple log shipper, but its real power lies in its ability to turn the free-form message text that journald captures into structured, searchable event data before it ever reaches Elasticsearch or an S3 bucket.
Let’s see it in action. Imagine you have a service running, and its logs are being captured by systemd’s journald. Here are some typical entries as journalctl displays them:
Oct 26 10:30:00 my-server systemd[1]: Started My Application Service.
Oct 26 10:30:01 my-server my-app[12345]: INFO: User 'admin' logged in from 192.168.1.10
Oct 26 10:30:02 my-server my-app[12345]: WARN: Disk usage approaching 80%
Oct 26 10:30:03 my-server systemd[1]: Stopping My Application Service...
Now, let’s configure Fluentd to collect these logs. We’ll use the in_systemd input plugin, which is designed to read directly from the journal.
First, ensure you have the fluent-plugin-systemd gem installed:
gem install fluent-plugin-systemd
Here’s a basic fluentd.conf that collects journald logs and forwards them to a local Elasticsearch instance (assuming it’s running on localhost:9200 and that the fluent-plugin-elasticsearch gem is installed as well):
<source>
  @type systemd
  @id in_systemd
  tag systemd.myhost
  <storage>
    @type local
    path /var/lib/fluentd/systemd.pos
  </storage>
</source>
<match systemd.**>
  @type elasticsearch
  @id out_elasticsearch
  host localhost
  port 9200
  logstash_format true
  logstash_prefix systemd
  include_tag_key true
  tag_key log_topic
  flush_interval 5s
</match>
When Fluentd starts with this configuration, the in_systemd plugin will begin tailing the journal. It persists the journal cursor under /var/lib/fluentd/systemd.pos, so after a restart it resumes from the last entry it processed instead of starting over.
The tag systemd.myhost directive assigns a tag to all incoming records. This tag is crucial for routing logs to the correct output plugin via the <match> directive.
The <match systemd.**> block tells Fluentd to send every record whose tag is systemd or starts with systemd. (such as systemd.myhost) to Elasticsearch. logstash_format true and logstash_prefix systemd make the output write to daily indices with a predictable naming convention (systemd-YYYY.MM.DD).
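The source above collects every entry in the journal. If only one service matters, fluent-plugin-systemd can narrow the stream at the source with its matches parameter. A minimal sketch, assuming the application runs as a unit named my-app.service (a hypothetical name; the journalctl output only shows the process name my-app), with an illustrative tag and storage path:

```conf
<source>
  @type systemd
  @id in_systemd_myapp
  # Illustrative tag; it is still picked up by <match systemd.**>.
  tag systemd.myapp
  # Read only entries written by this unit (unit name is an assumption).
  matches [{ "_SYSTEMD_UNIT": "my-app.service" }]
  <storage>
    @type local
    # Hypothetical path, kept separate from any other source's cursor.
    path /var/lib/fluentd/systemd-myapp.pos
  </storage>
</source>
```

Lines that systemd itself logs about the unit, such as Started My Application Service., are written by PID 1 and normally carry a different _SYSTEMD_UNIT value, so this match would leave them out.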
Now, let’s look at that my-app log line again: Oct 26 10:30:01 my-server my-app[12345]: INFO: User 'admin' logged in from 192.168.1.10.
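Without any further processing, in_systemd emits each journal entry as a record whose fields mirror the journal’s. The hostname, process name, and PID arrive as separate journal fields (such as _HOSTNAME, SYSLOG_IDENTIFIER, and _PID), but everything the application wrote, INFO: User 'admin' logged in from 192.168.1.10, lands in a single string field named MESSAGE. The log level, user, and IP address are buried in that string, so you cannot filter on them as fields in Elasticsearch.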
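The upper-case names and leading underscores are the journal’s own conventions. fluent-plugin-systemd provides an <entry> section to normalize them; a sketch, assuming you would rather have lower-case field names in Elasticsearch:

```conf
<source>
  @type systemd
  @id in_systemd
  tag systemd.myhost
  <storage>
    @type local
    path /var/lib/fluentd/systemd.pos
  </storage>
  <entry>
    # Drop the leading underscore: _HOSTNAME becomes HOSTNAME.
    fields_strip_underscores true
    # Lower-case every field name: MESSAGE becomes message.
    fields_lowercase true
  </entry>
</source>
```

With these options the raw text arrives as message rather than MESSAGE, so the key_name of any filter would have to change to match. The rest of this walkthrough keeps the default names.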
This is where Fluentd’s filtering and parsing capabilities shine. We can add a filter to parse the MESSAGE field and extract structured data. Let’s use the filter_parser plugin to break down that log line.
Here’s an updated fluentd.conf:
<source>
  @type systemd
  @id in_systemd
  tag systemd.myhost
  <storage>
    @type local
    path /var/lib/fluentd/systemd.pos
  </storage>
</source>
<filter systemd.**>
  @type parser
  @id filter_parser_systemd
  key_name MESSAGE
  reserve_data true
  <parse>
    @type regexp
    expression /^(?<level>[A-Z]+): (?<message>.*)$/
  </parse>
</filter>
<match systemd.**>
  @type elasticsearch
  @id out_elasticsearch
  host localhost
  port 9200
  logstash_format true
  logstash_prefix systemd
  include_tag_key true
  tag_key log_topic
  flush_interval 5s
</match>
In this enhanced configuration, the <filter systemd.**> block intercepts records whose tag matches systemd.** before they reach the output. The filter_parser with @type regexp applies a regular expression to the MESSAGE field. Keep in mind that MESSAGE holds only the text the application wrote (INFO: User 'admin' logged in from 192.168.1.10). The timestamp, hostname, process name, and PID that journalctl prints in front of it are separate journal fields, so the expression must not try to match them.
The expression captures:
level: The log level (e.g., INFO, WARN).
message: The actual log message content.
There is no time_key or time_format here because the message text contains no timestamp. Each event already carries the time journald recorded for the entry, and that is what gives you accurate time-based searching in Elasticsearch.
With this filter, the log entry Oct 26 10:30:01 my-server my-app[12345]: INFO: User 'admin' logged in from 192.168.1.10 becomes an Elasticsearch document with a level field ("INFO") and a message field ("User 'admin' logged in from 192.168.1.10"), alongside the journal’s own fields such as _HOSTNAME and _PID. You can now easily search for all logs where level is "INFO" or where message contains "192.168.1.10".
The reserve_data true directive is important here. Without it, the parsed fields would replace the record entirely. With it, the original fields, including the raw MESSAGE and the journal metadata, stay in the record alongside the newly parsed ones. This is useful if your regex doesn’t capture everything perfectly, or if you want to keep the raw message for context.
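reserve_data also decides what happens to lines that do not fit the pattern, such as systemd’s own Started My Application Service. message: the record passes through unchanged instead of being dropped. By default filter_parser additionally reports every such miss to Fluentd’s error stream, which can get noisy. A sketch of a replacement filter block that turns the reporting off, assuming MESSAGE follows a simple LEVEL: text pattern:

```conf
<filter systemd.**>
  @type parser
  @id filter_parser_systemd
  key_name MESSAGE
  # Keep the original fields; unparsed records pass through as they are.
  reserve_data true
  # Do not emit a "pattern not matched" error event for every miss.
  emit_invalid_record_to_error false
  <parse>
    @type regexp
    # Assumed format: an upper-case level, a colon, then free text.
    expression /^(?<level>[A-Z]+): (?<message>.*)$/
  </parse>
</filter>
```

Whether to silence these events is a judgment call: leaving them on is helpful while you are still tuning the expression.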
The core idea is to leverage Fluentd’s plugins to enrich your log data before it’s stored. You can chain multiple filters. For instance, you could add another filter_parser to parse the message field further for specific patterns like IP addresses or user IDs, turning unstructured text into distinct, queryable fields.
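As an illustration of chaining, here is a sketch of a second parser filter for the login line shown earlier. It assumes an earlier filter has already produced a message field, and the field names user and client_ip are made up for this example:

```conf
<filter systemd.**>
  @type parser
  @id filter_parser_login
  # Operates on the field produced by the earlier parser filter.
  key_name message
  reserve_data true
  # Most messages are not login events; let them pass without error events.
  emit_invalid_record_to_error false
  <parse>
    @type regexp
    # Hypothetical field names: user and client_ip.
    expression /^User '(?<user>[^']+)' logged in from (?<client_ip>\d{1,3}(?:\.\d{1,3}){3})$/
  </parse>
</filter>
```

Filters run in the order they appear in the file, so this block has to come after the filter that creates message and before the <match> block.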
The in_systemd plugin’s internal state, managed by the <storage> configuration, is what allows Fluentd to resume log collection from where it left off. This is implemented by reading and writing the journal’s cursor position to a file. The cursor is a unique identifier that points to a specific entry in the journal, allowing Fluentd to efficiently determine which logs have already been processed.
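If you omit the <storage> section, the cursor is not persisted across restarts, and Fluentd falls back to the read_from_head setting each time it starts. With the default (false) it begins at the end of the journal and silently skips anything logged while it was down; with read_from_head true it re-reads every available entry. Neither is desirable for production systems.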
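The same fallback applies on the very first start, when no cursor has been stored yet. A sketch, assuming you want the existing journal backlog ingested on that first run:

```conf
<source>
  @type systemd
  @id in_systemd
  tag systemd.myhost
  # Consulted only when no stored cursor is available (e.g. first start).
  read_from_head true
  <storage>
    @type local
    path /var/lib/fluentd/systemd.pos
  </storage>
</source>
```

Once a cursor has been written, later restarts resume from it and the read_from_head setting no longer matters.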
The next step is often to add more sophisticated parsing for specific application log formats or to aggregate logs from multiple sources into a single, unified stream.