Building your own Fluentd plugins is surprisingly straightforward once you understand the core mechanics. The most counterintuitive truth is that you don’t need to be a Fluentd expert to write a plugin; you just need to know how to write Ruby and understand how data flows through a basic Unix pipe.
Let’s see a simple example in action. Imagine we want to capture logs from a specific file, tag them with a custom prefix, and then send them to standard output.
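First, let’s define our custom input plugin. This plugin reads a specified file line by line and emits each line as an event. It stops at end of file; unlike the built-in tail input, it does not follow new writes.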
# in_my_file.rb
require 'fluent/plugin/input'

module Fluent::Plugin
  class MyFileInput < Input
    Fluent::Plugin.register_input('my_file', self)

    # The thread helper lets Fluentd manage the reader thread's lifecycle,
    # so no manual Thread#kill is needed in shutdown.
    helpers :thread

    # No default here, so Fluentd itself raises a ConfigError if path is missing.
    config_param :path, :string
    config_param :tag_prefix, :string, default: 'my_app'

    def start
      super
      thread_create(:my_file_reader) { read_file }
    end

    private

    def read_file
      File.open(@path, 'r') do |f|
        f.each_line do |line|
          break unless thread_current_running?
          # Event time must be an EventTime (or Integer), not a Ruby Time.
          router.emit(generate_tag, Fluent::EventTime.now, { 'message' => line.chomp })
          sleep 0.1 # Don't hog the CPU
        end
      end
    end

    def generate_tag
      "#{@tag_prefix}.file.#{@path.gsub(/[^a-zA-Z0-9_.-]/, '_')}"
    end
  end
end
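The tag-building step is plain string manipulation, so it can be tried outside Fluentd. The following is a minimal sketch that assumes the same character whitelist as generate_tag above; build_tag is a hypothetical standalone helper, not part of Fluentd.

```ruby
# Standalone copy of the tag logic from the input plugin (hypothetical helper).
# Characters outside a-z, A-Z, 0-9, '_', '.', and '-' are replaced with '_'.
def build_tag(tag_prefix, path)
  "#{tag_prefix}.file.#{path.gsub(/[^a-zA-Z0-9_.-]/, '_')}"
end

puts build_tag('my_service', '/tmp/my_app.log')
# prints: my_service.file._tmp_my_app.log
```

Because the dot is in the whitelist, the file extension survives and becomes an extra tag segment (log). That matters when you write match patterns that use a single-segment wildcard.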
Next, a filter plugin. This one will add a static field to each record.
# filter_my_static_field.rb
require 'fluent/plugin/filter'

module Fluent::Plugin
  class MyStaticFieldFilter < Filter
    Fluent::Plugin.register_filter('my_static_field', self)

    config_param :static_key, :string, default: 'source_type'
    config_param :static_value, :string, default: 'custom_input'

    # Called once per event; the returned hash replaces the record.
    def filter(tag, time, record)
      record[@static_key] = @static_value
      record
    end
  end
end
Finally, an output plugin. This will simply print the formatted log to standard output.
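# out_my_stdout.rb
require 'json'
require 'fluent/plugin/output'

module Fluent::Plugin
  class MyStdoutOutput < Output
    Fluent::Plugin.register_output('my_stdout', self)

    # Non-buffered outputs implement process. es is an event stream of
    # time and record pairs that all share the same tag.
    def process(tag, es)
      es.each do |time, record|
        # time is a Fluent::EventTime; Time.at converts it for display.
        puts "#{tag} #{Time.at(time)}: #{record.to_json}"
      end
    end
  end
end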
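To use these, save them as in_my_file.rb, filter_my_static_field.rb, and out_my_stdout.rb in a plugin directory: either /etc/fluent/plugin or a directory passed to fluentd with the -p option. Fluentd locates a plugin by its file name, so the in_, filter_, and out_ prefixes matter. Here’s a fluentd.conf to tie it all together: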
<source>
  @type my_file
  path /tmp/my_app.log
  tag_prefix my_service
</source>
<filter>
  @type my_static_field
  static_key environment
  static_value production
</filter>
<match **>
  @type my_stdout
</match>
Now, if you create a file /tmp/my_app.log with some lines like "hello world" and "another log message", and run fluentd -c fluentd.conf, you’ll see output like:
my_service.file._tmp_my_app.log 2023-10-27 10:30:00 +0000: {"message":"hello world","environment":"production"}
my_service.file._tmp_my_app.log 2023-10-27 10:30:01 +0000: {"message":"another log message","environment":"production"}
In action, the system is a series of Ruby classes that receive data, transform it, and pass it along. The router.emit method in the input plugin is your gateway to the Fluentd event stream. The filter method in the filter plugin receives the tag, time, and record, and returns the modified record. The output plugin sits at the end of the chain: in the v1 plugin API, a non-buffered output implements process(tag, es), which receives a tag plus an event stream of time and record pairs and does something with each one.
The core problem Fluentd solves is aggregating and processing logs from diverse sources into a unified format for analysis or storage. Your plugins extend this by allowing you to ingest data from custom sources (like a proprietary daemon’s log file), enrich it with business context (like adding an environment tag), or route it to unique destinations (like a custom metrics endpoint).
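The config_param class method is how you expose configuration options to your plugin. Values come from the .conf file; Fluentd converts them to the declared type and assigns them to instance variables (such as @path) when configure runs, so they are available once super returns. A parameter declared without a default is required, and Fluentd raises a ConfigError if it is missing. The Fluent::Plugin.register_xxx calls map the @type name to your class. Without them, Fluentd will act as if your plugins don’t exist.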
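To make the declare-then-configure flow concrete, here is a toy model of it in plain Ruby. This is a simplified stand-in written for illustration, not Fluentd's implementation: MiniConfigurable and DemoPlugin are hypothetical names, type conversion is skipped, and a missing required value raises ArgumentError rather than Fluent::ConfigError.

```ruby
# Simplified stand-in for the config_param idea; NOT Fluentd's implementation.
module MiniConfigurable
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    # Declared parameters: name => default (or :required).
    def params
      @params ||= {}
    end

    # The type argument is accepted but ignored in this sketch.
    def config_param(name, _type, default: :required)
      params[name] = default
    end
  end

  # Copies values from a config hash into instance variables.
  def configure(conf)
    self.class.params.each do |name, default|
      value = conf.fetch(name.to_s) do
        raise ArgumentError, "'#{name}' parameter is required" if default == :required
        default
      end
      instance_variable_set("@#{name}", value)
    end
  end
end

class DemoPlugin
  include MiniConfigurable

  config_param :path, :string
  config_param :tag_prefix, :string, default: 'my_app'

  attr_reader :path, :tag_prefix
end

plugin = DemoPlugin.new
plugin.configure('path' => '/tmp/my_app.log')
puts plugin.path        # prints: /tmp/my_app.log
puts plugin.tag_prefix  # prints: my_app
```

The point of the sketch is the timing: nothing is set when the object is constructed, and the instance variables only exist after configure has run.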
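A common misconception is that plugins must be complex. For filters, you can simply return record without modification if you don’t want to change anything, or return nil to drop the event entirely. The tag and time arguments are read-only context: reassigning them inside filter has no effect on the event. To change an event’s time, implement filter_with_time instead, which returns a time and record pair. To change its tag, re-emit the event under a new tag from an output plugin.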
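The return-value contract of a filter can be modeled in a few lines of plain Ruby. This is a sketch of the behavior described above, not Fluentd code: apply_filter is a hypothetical helper, and the block plays the role of the filter method.

```ruby
# Simplified model of how a filter's return value is treated; not Fluentd code.
# Each event is a [tag, time, record] triple.
def apply_filter(events)
  events.each_with_object([]) do |(tag, time, record), kept|
    result = yield(tag, time, record)
    # nil means "drop this event"; anything else becomes the new record.
    # Tag and time are carried over untouched.
    kept << [tag, time, result] unless result.nil?
  end
end

events = [
  ['app.web', 1, { 'message' => 'hello world', 'level' => 'info' }],
  ['app.web', 2, { 'message' => 'noisy detail', 'level' => 'debug' }]
]

kept = apply_filter(events) do |_tag, _time, record|
  next nil if record['level'] == 'debug' # drop debug events
  record.merge('environment' => 'production')
end

kept.each { |tag, time, record| puts "#{tag} #{time} #{record.to_a.inspect}" }
```

Only the first event survives, with the extra environment field added and its tag and time unchanged.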
The real power lies in how these components chain together. The output of one plugin becomes the input of the next, forming a data pipeline that you meticulously define. Each plugin is a distinct step, making debugging and understanding data flow remarkably simple once you grasp the tag, time, and record tuple.
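One way to see why step-by-step pipelines are easy to debug is to trace a single event through each stage. The toy pipeline below is illustrative only: the Event struct, the stage names, and run_pipeline are assumptions made for this sketch and do not correspond to Fluentd internals.

```ruby
# Toy pipeline; stage names and the trace format are illustrative only.
Event = Struct.new(:tag, :time, :record)

# Each filter stage maps a record to a new record, or to nil to drop the event.
FILTERS = {
  'add_environment' => ->(record) { record.merge('environment' => 'production') },
  'drop_empty'      => ->(record) { record['message'].empty? ? nil : record }
}.freeze

# Runs one event through every filter in order and records the state after each step.
def run_pipeline(event, filters = FILTERS)
  trace = []
  record = event.record
  filters.each do |name, stage|
    record = stage.call(record)
    trace << [name, record]
    break if record.nil? # a dropped event never reaches later stages
  end
  [record && Event.new(event.tag, event.time, record), trace]
end

result, trace = run_pipeline(Event.new('my_service.file', 1_698_402_600, { 'message' => 'hello world' }))
trace.each { |name, record| puts "#{name}: #{record.to_a.inspect}" }
puts result.tag
```

Each trace entry shows the record exactly as it left that stage, so a surprising field or a dropped event can be pinned to a single step.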
The most unexpected aspect for many is how much routing depends on the tag. Match and filter directives select events by tag pattern, so the tag an input plugin assigns determines the path an event takes through the pipeline. Filters cannot change a tag, but an output plugin can re-emit events under a new tag, which is how plugins such as rewrite_tag_filter build routing logic that goes far beyond simple pattern matching.
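Tag patterns are easier to reason about with a small model. The sketch below covers only the two wildcards: * for exactly one dot-separated tag part and ** for zero or more parts. It is a simplified matcher written for illustration, not Fluentd's own; the real matcher supports additional syntax, and tag_match? and match_parts? are hypothetical names.

```ruby
# Simplified tag matcher covering only '*' and '**'.
# Fluentd's real matcher supports more pattern syntax than this sketch.
def tag_match?(pattern, tag)
  match_parts?(pattern.split('.'), tag.split('.'))
end

def match_parts?(pattern_parts, tag_parts)
  return tag_parts.empty? if pattern_parts.empty?

  head, *rest = pattern_parts
  case head
  when '**' # zero or more tag parts
    (0..tag_parts.length).any? { |n| match_parts?(rest, tag_parts.drop(n)) }
  when '*'  # exactly one tag part
    !tag_parts.empty? && match_parts?(rest, tag_parts.drop(1))
  else      # literal part
    tag_parts.first == head && match_parts?(rest, tag_parts.drop(1))
  end
end

puts tag_match?('my_service.*', 'my_service.file')            # prints: true
puts tag_match?('my_service.*', 'my_service.file._tmp.log')   # prints: false
puts tag_match?('my_service.**', 'my_service.file._tmp.log')  # prints: true
```

The second call shows why tag design matters: a tag with more segments than expected slips past a single-segment wildcard, so a plugin that generates tags effectively decides which directives will see its events.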
The next step in mastering Fluentd plugins is understanding how to handle asynchronous operations and error reporting within your plugins, especially when dealing with external services.