Go’s concurrency model makes it a surprisingly good fit for building custom Fluent Bit plugins.
Let’s see what that looks like. Imagine we want to capture system metrics using procfs and send them to Fluent Bit. First, we define our plugin’s configuration.
[SERVICE]
Flush 1
Daemon Off
Log_Level info
[INPUT]
Name go_procfs
Tag system.metrics
Path /proc
Interval_Sec 5
This tells Fluent Bit to load a plugin named go_procfs, tag its output with system.metrics, look for process information in /proc, and poll every 5 seconds.
Now, the Go plugin itself.
package main
import (
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/fluent/fluent-bit-go/output"
)
// ProcfsInputConfig holds configuration for our plugin.
type ProcfsInputConfig struct {
Path string
IntervalSec int
}
// ParseConfig reads configuration from Fluent Bit.
func ParseConfig(cfg *output.Config) ProcfsInputConfig {
path, _ := cfg.Key("Path")
intervalStr, _ := cfg.Key("Interval_Sec")
interval, _ := strconv.Atoi(intervalStr)
if interval == 0 {
interval = 5 // Default interval
}
if path == "" {
path = "/proc" // Default path
}
return ProcfsInputConfig{
Path: path,
IntervalSec: interval,
}
}
// CollectMetrics reads /proc/[pid]/stat and extracts CPU usage.
func CollectMetrics(procPath string) map[string]interface{} {
metrics := make(map[string]interface{})
filepath.Walk(procPath, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
return nil // We only care about directories (PIDs)
}
// Check if the directory name is a number (potential PID)
pidStr := info.Name()
if _, err := strconv.Atoi(pidStr); err != nil {
return nil // Not a PID directory
}
statPath := filepath.Join(path, "stat")
content, err := ioutil.ReadFile(statPath)
if err != nil {
return nil // Couldn't read stat file
}
fields := strings.Fields(string(content))
if len(fields) < 14 {
return nil // Not enough fields in stat file
}
// Field 14: utime - user mode jiffies
// Field 15: stime - kernel mode jiffies
utime, _ := strconv.ParseFloat(fields[13], 64)
stime, _ := strconv.ParseFloat(fields[14], 64)
// For simplicity, we'll just report total CPU time for now.
// A real plugin would calculate delta over time.
totalCPUTime := utime + stime
metrics[fmt.Sprintf("pid_%s_cpu_jiffies", pidStr)] = totalCPUTime
return nil
})
return metrics
}
//export FLB_PLUGIN_INIT
func FLB_PLUGIN_INIT(ctx unsafe.Pointer) int {
cfg := output.GetConfig(ctx)
pluginCfg := ParseConfig(cfg)
output.SetContext(ctx, pluginCfg)
return output.FLB_OK
}
//export FLB_PLUGIN_RUN
func FLB_PLUGIN_RUN(ctx unsafe.Pointer, data unsafe.Pointer) int {
pluginCfg := output.GetContext(ctx).(ProcfsInputConfig)
// Create a new record
record := make(map[string]interface{})
// Collect metrics
procMetrics := CollectMetrics(pluginCfg.Path)
for key, value := range procMetrics {
record[key] = value
}
// Add timestamp
record["timestamp"] = time.Now().UTC().Format(time.RFC3339Nano)
// Pack record and send
// output.FLB_OK, output.FLB_ERROR, output.FLB_RETRY
// The output plugin type expects a tag and a map of records.
// We'll use the tag defined in the configuration.
tag, _ := output.GetTag(ctx)
output.SendRecord(ctx, tag, record)
// Simulate the interval by sleeping. In a real scenario, this would be
// managed by Fluent Bit's input plugin scheduler. For output plugins,
// this function is called whenever data is ready to be processed.
// For an input plugin, you'd typically have a goroutine that sleeps
// and then calls output.SendRecord.
//
// Since this example is structured as an output plugin for demonstration,
// we'll simulate the polling loop here. A true input plugin would have
// a separate goroutine for this.
time.Sleep(time.Duration(pluginCfg.IntervalSec) * time.Second)
return output.FLB_OK
}
//export FLB_PLUGIN_EXIT
func FLB_PLUGIN_EXIT() int {
return output.FLB_OK
}
func main() {
// This is required for the plugin to be loadable by Fluent Bit.
// The actual Go code will be compiled into a shared library (.so).
}
The FLB_PLUGIN_INIT, FLB_PLUGIN_RUN, and FLB_PLUGIN_EXIT functions are the entry points that Fluent Bit expects. FLB_PLUGIN_INIT reads the configuration, FLB_PLUGIN_RUN is called repeatedly to collect and send data, and FLB_PLUGIN_EXIT handles cleanup.
Inside FLB_PLUGIN_RUN, we call CollectMetrics. This function walks the /proc directory, finds subdirectories that are numbers (representing PIDs), and reads their stat file. It then parses the user and kernel CPU jiffies (CPU time spent in user mode and kernel mode, respectively) for each process. For simplicity, this example aggregates these as a single cpu_jiffies metric per PID. A more robust plugin would calculate the delta of these jiffies over time to determine actual CPU usage percentage.
The output.SendRecord function is key here. It takes the Fluent Bit context, the tag (which we fetch using output.GetTag(ctx)), and the collected record (a Go map[string]interface{}) and marshals it into Fluent Bit’s internal format for forwarding.
The time.Sleep is a simplification. For a true input plugin, you’d typically spawn a goroutine in FLB_PLUGIN_INIT that periodically calls CollectMetrics and then uses output.SendRecord (or a similar mechanism for inputs) to push data into Fluent Bit’s internal buffer. The FLB_PLUGIN_RUN function is more idiomatic for output plugins, which are invoked by Fluent Bit when there’s data to process. However, the core logic of collecting and sending data remains the same.
The output.SetContext and output.GetContext functions allow you to store and retrieve arbitrary Go data (like our ProcfsInputConfig) associated with the plugin instance across calls to FLB_PLUGIN_RUN.
The unsafe.Pointer is necessary because Fluent Bit’s C API uses void pointers, and Go needs to interact with them. The fluent-bit-go/output package provides safe wrappers around these.
When you compile this Go code (go build -buildmode=c-shared -o go_procfs.so main.go), you get a shared library. You then place this .so file in Fluent Bit’s plugin directory and configure Fluent Bit to load it as an input.
One thing many people miss is how Fluent Bit manages the lifecycle of these plugins. When Fluent Bit starts, it loads the shared library and calls FLB_PLUGIN_INIT. It then repeatedly calls FLB_PLUGIN_RUN as dictated by the plugin type and configuration. When Fluent Bit shuts down gracefully, it calls FLB_PLUGIN_EXIT to allow for resource cleanup. This makes the Go plugin behave like a native C plugin from Fluent Bit’s perspective.
The next concept you’ll likely encounter is handling errors more gracefully and implementing more sophisticated metric calculations, such as CPU utilization percentages derived from deltas.