Building your own Fluentd plugins is surprisingly straightforward once you understand the core mechanics. The most counterintuitive truth is that you don’t need to be a Fluentd expert to write a plugin; you just need to know how to write Ruby and understand how data flows through a basic Unix pipe.

Let’s see a simple example in action. Imagine we want to capture logs from a specific file, tag them with a custom prefix, and then send them to standard output.

First, let’s define our custom input plugin. This plugin will continuously read lines from a specified file.

# in_my_file.rb
require 'fluent/plugin/input'

module Fluent::Plugin
  class MyFileInput < Input
    Fluent::Plugin.register_input('my_file', self)

    # The thread helper creates the reader thread and stops it on shutdown.
    helpers :thread

    # No default, so Fluentd raises a config error if 'path' is missing.
    config_param :path, :string
    config_param :tag_prefix, :string, default: 'my_app'

    def start
      super
      thread_create(:my_file_reader, &method(:run))
    end

    private

    def run
      File.open(@path, 'r') do |f|
        while thread_current_running?
          line = f.gets
          if line
            # Event time is a Fluent::EventTime; record keys are strings.
            router.emit(generate_tag, Fluent::EventTime.now, { 'message' => line.chomp })
          else
            sleep 0.1 # At end of file: wait for new lines without hogging the CPU
          end
        end
      end
    end

    def generate_tag
      "#{@tag_prefix}.file.#{@path.gsub(/[^a-zA-Z0-9_.-]/, '_')}"
    end
  end
end

Next, a filter plugin. This one will add a static field to each record.

# filter_my_static_field.rb
require 'fluent/plugin/filter'

module Fluent::Plugin
  class MyStaticFieldFilter < Filter
    Fluent::Plugin.register_filter('my_static_field', self)

    config_param :static_key, :string, default: 'source_type'
    config_param :static_value, :string, default: 'custom_input'

    def filter(tag, time, record)
      record[@static_key] = @static_value
      record
    end
  end
end

Finally, an output plugin. This will simply print the formatted log to standard output.

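# out_my_stdout.rb
require 'json'
require 'fluent/plugin/output'

module Fluent::Plugin
  class MyStdoutOutput < Output
    Fluent::Plugin.register_output('my_stdout', self)

    # Non-buffered outputs implement #process, which receives the tag and an
    # event stream of (time, record) pairs.
    def process(tag, es)
      es.each do |time, record|
        # time is a Fluent::EventTime (or Integer); convert it for display.
        timestamp = Time.at(time.to_i).utc.strftime('%Y-%m-%d %H:%M:%S %z')
        puts "#{tag} #{timestamp}: #{record.to_json}"
      end
    end
  end
end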

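To use these, save them as in_my_file.rb, filter_my_static_field.rb, and out_my_stdout.rb in a plugin directory that Fluentd loads: /etc/fluent/plugin by default, or any directory you pass with the -p option. Fluentd finds local plugins by these file name prefixes (in_, filter_, out_) followed by the @type name. Here’s a fluentd.conf to tie it all together: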

<source>
  @type my_file
  path /tmp/my_app.log
  tag_prefix my_service
</source>

<filter>
  @type my_static_field
  static_key environment
  static_value production
</filter>

<match **>
  @type my_stdout
</match>

Now, if you create a file /tmp/my_app.log with some lines like "hello world" and "another log message", and run fluentd -c fluentd.conf, you’ll see output like:

my_service.file._tmp_my_app.log 2023-10-27 10:30:00 +0000: {"message":"hello world","environment":"production"}
my_service.file._tmp_my_app.log 2023-10-27 10:30:01 +0000: {"message":"another log message","environment":"production"}
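
The tag in that output comes straight from the input plugin’s generate_tag method. As a quick check, here is a standalone sketch of the same substitution in plain Ruby (tag_for is a hypothetical helper name, not part of the plugin). Note that the character class allows dots, so the ".log" suffix survives and becomes one more tag part:

```ruby
# Standalone copy of the tag-building logic from the input plugin.
# tag_for is a hypothetical helper name used only for this illustration.
def tag_for(tag_prefix, path)
  # Keep letters, digits, '_', '.', and '-'; replace everything else with '_'.
  "#{tag_prefix}.file.#{path.gsub(/[^a-zA-Z0-9_.-]/, '_')}"
end

puts tag_for('my_service', '/tmp/my_app.log')
# prints: my_service.file._tmp_my_app.log
```

If you would rather have a tag with no extra dots from the file name, removing the dot from the character class is one option.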

In practice, the pipeline is a series of Ruby classes that receive events, transform them, and pass them along. The router.emit method in the input plugin is your gateway to the Fluentd event stream. The filter method in the filter plugin receives the tag, time, and record, and returns the modified record. A non-buffered output plugin implements process(tag, es), which receives the tag and an event stream of time and record pairs, and does something with each event.

The core problem Fluentd solves is aggregating and processing logs from diverse sources into a unified format for analysis or storage. Your plugins extend this by allowing you to ingest data from custom sources (like a proprietary daemon’s log file), enrich it with business context (like adding an environment tag), or route it to unique destinations (like a custom metrics endpoint).

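The config_param class method is how you expose configuration options to your plugin. The values are set in the .conf file, and Fluentd assigns them to instance variables of the same name (such as @path) when configure calls super; they are not passed to the constructor. A parameter declared without a default is required, and Fluentd raises a configuration error if it is missing. The Fluent::Plugin.register_xxx calls are crucial for Fluentd to discover and load your plugins. Without them, Fluentd will act as if your plugins don’t exist.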
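
To make that mechanism concrete, here is a toy model of a config_param-style macro in plain Ruby. It is not Fluentd’s implementation: ToyPlugin, ToyFileInput, and the error type are hypothetical, and real Fluentd also handles type conversion, inheritance, and nested sections. The sketch only shows the idea of declaring parameters at class level and filling in instance variables at configure time:

```ruby
# Toy model of a config_param-style macro. This is NOT Fluentd's code;
# ToyPlugin and ToyFileInput are hypothetical names for illustration.
class ToyPlugin
  # Per-class registry of declared parameters.
  def self.params
    @params ||= {}
  end

  # Declare a parameter; leaving out :default marks it as required.
  def self.config_param(name, type, **opts)
    params[name] = { type: type, required: !opts.key?(:default), default: opts[:default] }
  end

  # Copy values from the parsed config hash into instance variables.
  def configure(conf)
    self.class.params.each do |name, spec|
      key = name.to_s
      if conf.key?(key)
        value = conf[key]
      elsif spec[:required]
        raise ArgumentError, "'#{key}' parameter is required"
      else
        value = spec[:default]
      end
      instance_variable_set("@#{name}", value)
    end
  end
end

class ToyFileInput < ToyPlugin
  config_param :path, :string
  config_param :tag_prefix, :string, default: 'my_app'
  attr_reader :path, :tag_prefix
end

plugin = ToyFileInput.new
plugin.configure('path' => '/tmp/my_app.log')
puts plugin.path       # prints: /tmp/my_app.log
puts plugin.tag_prefix # prints: my_app
```

Calling configure with an empty hash raises an error in this toy model, which mirrors why a parameter without a default needs no manual presence check.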

A common misconception is that plugins must be complex. For filters, you can simply return record without modification if you don’t want to change anything, or return nil to drop the event entirely. A filter cannot change an event’s tag, though. To change the time, implement filter_with_time(tag, time, record) instead, which returns the time and record as a pair; to change the tag, re-emit the event under a new tag from an output plugin.
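
The return-value contract for filters is easy to model outside Fluentd. The following is a minimal plain-Ruby sketch, assuming a hypothetical apply_filters helper and two made-up filters written as lambdas; it is not how Fluentd runs filters internally, only an illustration of "return the record to keep it, return nil to drop it":

```ruby
require 'json'

# Toy model of a filter chain; apply_filters is a hypothetical helper,
# not part of Fluentd.
def apply_filters(filters, tag, time, record)
  filters.each do |f|
    record = f.call(tag, time, record)
    return nil if record.nil? # a nil result drops the event
  end
  record
end

# Hypothetical filters: one enriches the record, one drops debug lines.
add_env = ->(_tag, _time, record) { record.merge('environment' => 'production') }
drop_debug = ->(_tag, _time, record) { record['message'].start_with?('DEBUG') ? nil : record }

filters = [drop_debug, add_env]
now = Time.now.to_i

kept = apply_filters(filters, 'my_service.file', now, { 'message' => 'hello world' })
dropped = apply_filters(filters, 'my_service.file', now, { 'message' => 'DEBUG noisy' })

puts kept.to_json    # prints: {"message":"hello world","environment":"production"}
puts dropped.inspect # prints: nil
```

Notice that the tag and time are passed in but never returned, which is the same reason a plain filter method cannot alter them.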

The real power lies in how these components chain together. The output of one plugin becomes the input of the next, forming a data pipeline that you meticulously define. Each plugin is a distinct step, making debugging and understanding data flow remarkably simple once you grasp the tag, time, and record tuple.

The most unexpected aspect for many is how much of the pipeline hinges on the tag. You aren’t just filtering records; you’re routing events based on their tags. A custom input chooses the tag when it emits, and a custom output can re-emit events under a new tag through router.emit (using the event_emitter helper), which enables routing logic that goes beyond static pattern matching.
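
To see why the tag matters so much, it helps to model how a match pattern is compared against a tag. The sketch below is a simplified plain-Ruby model, not Fluentd’s matcher: tag_match?, match_parts?, and the route table are hypothetical, and it only handles literal parts, * (exactly one tag part), and ** (zero or more tag parts). Real Fluentd patterns support more, such as {a,b} alternatives.

```ruby
# Simplified model of tag pattern matching. These helpers are hypothetical
# and cover only literal parts, '*' and '**'.
def match_parts?(pattern, tag)
  return tag.empty? if pattern.empty?

  head, *rest = pattern
  case head
  when '**'
    # '**' may consume zero or more tag parts.
    (0..tag.length).any? { |n| match_parts?(rest, tag.drop(n)) }
  when '*'
    # '*' consumes exactly one tag part.
    !tag.empty? && match_parts?(rest, tag.drop(1))
  else
    tag.first == head && match_parts?(rest, tag.drop(1))
  end
end

def tag_match?(pattern, tag)
  match_parts?(pattern.split('.'), tag.split('.'))
end

# Hypothetical route table: the first matching pattern wins.
ROUTES = [
  ['my_service.file.**', :stdout],
  ['**', :archive]
].freeze

def route_for(tag)
  found = ROUTES.find { |pattern, _dest| tag_match?(pattern, tag) }
  found && found.last
end

puts route_for('my_service.file._tmp_my_app.log') # prints: stdout
puts route_for('other_service.metrics')           # prints: archive
```

Because the first matching pattern wins in this model, the order of the entries matters just as the order of match sections does in a configuration file.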

The next step in mastering Fluentd plugins is understanding how to handle asynchronous operations and error reporting within your plugins, especially when dealing with external services.
