07/07/2018

Writing Apache Flume plugin for Telegraf - Golang

At the beginning of this year while working with Data science team, we had a use case where Apache Flume fits in. But first let me quote this from official Flume page:

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms.

After writing a SaltStack formula for it, then "monitoring" step came where we monitor every component in our stack. So as usual, since we already use for monitoring Telegraf (an agent written in Go for collecting, processing, aggregating, and writing metrics) as part of TICK Stack, so I looked for an official plugin for it, but couldn't find one!

I found that it's nice to have one, also to polish my Golang knowledge. And since I already know enough about Telegraf internals, it wasn't that hard, but to deal with Golang. I've took a look on some available resources like "Telegraf contributing guide", and "How to Write a Telegraf Plugin for Beginners". Finally I ended up writing one: telegraf-flume-plugin!

Try it!

So if you want to try/use this plugin, first download Telegraf if you don't have it, then copy flume directory to the following path:

cp -a flume $GOPATH/src/github.com/influxdata/telegraf/plugins/inputs
Then compile Telegraf, and generate a test conf:
# This is just for test purpose.
./telegraf config > telegraf.conf
cat << EOF >> telegraf.conf

[[inputs.flume]]
  name = "flume_agents_test"
  servers = [
    "http://localhost:41414/flume01.json",
    "http://localhost:41414/flume02.json",
  ]
EOF
Run the simple http server with samples data:
cd samples
python -m SimpleHTTPServer 41414
# or
# python3 -m http.server 41414
Finally in Telegraf dir:
./telegraf --config telegraf.conf --input-filter flume --test

Output example

Based on the samples, the output should be the following:

flume_agents_test,type=SOURCE,name=kafka-source01 AppendBatchAcceptedCount=0,AppendReceivedCount=0,KafkaEmptyCount=0,OpenConnectionCount=0,StartTime=1516706313621,StopTime=0,AppendAcceptedCount=0,AppendBatchReceivedCount=0,EventAcceptedCount=73353071,EventReceivedCount=73353071,KafkaCommitTimer=6432357,KafkaEventGetTimer=616114808
flume_agents_test,type=CHANNEL,name=hdfs-channel02 EventPutAttemptCount=4234929,EventPutSuccessCount=4234929
flume_agents_test,type=CHANNEL,name=hdfs-channel04 EventPutAttemptCount=12767892,EventPutSuccessCount=12767892
flume_agents_test,type=CHANNEL,name=null-channel EventPutAttemptCount=17057471,EventPutSuccessCount=17057471
flume_agents_test,name=hdfs-sink02,type=SINK BatchUnderflowCount=19281,ConnectionClosedCount=704,ConnectionCreatedCount=705,ConnectionFailedCount=0,EventDrainAttemptCount=4234929,EventDrainSuccessCount=4233252,BatchCompleteCount=691,BatchEmptyCount=4573,StartTime=1516706291441,StopTime=0
flume_agents_test,type=SINK,name=hdfs-sink04 BatchEmptyCount=36,ConnectionClosedCount=704,ConnectionCreatedCount=707,ConnectionFailedCount=8,StartTime=1516706291445,StopTime=0,BatchCompleteCount=2276,BatchUnderflowCount=2694,EventDrainAttemptCount=12770801,EventDrainSuccessCount=12764502
flume_agents_test,type=CHANNEL,name=hdfs-channel01 EventPutAttemptCount=2757422,EventPutSuccessCount=2757422
flume_agents_test,type=CHANNEL,name=hdfs-channel03 EventPutSuccessCount=36535357,EventPutAttemptCount=36535357
flume_agents_test,type=SINK,name=hdfs-sink01 EventDrainAttemptCount=2757422,EventDrainSuccessCount=2756217,StopTime=0,BatchEmptyCount=9466,ConnectionClosedCount=704,ConnectionCreatedCount=705,StartTime=1516706291440,BatchCompleteCount=317,BatchUnderflowCount=22371,ConnectionFailedCount=0
flume_agents_test,type=SINK,name=hdfs-sink03 EventDrainSuccessCount=36533183,StartTime=1516706291444,BatchUnderflowCount=3138,ConnectionClosedCount=704,ConnectionCreatedCount=706,EventDrainAttemptCount=36536896,BatchCompleteCount=7147,BatchEmptyCount=124,ConnectionFailedCount=4,StopTime=0
flume_agents_test,type=CHANNEL,name=memChannel EventPutSuccessCount=22948908,EventPutAttemptCount=22948908
flume_agents_test,type=CHANNEL,name=fileChannel EventPutSuccessCount=468085,EventPutAttemptCount=468086

The funny thing, when I determined to make a pull request to upstream, I found another one opened 2 days before that! But no harm! I just made a PR over that repo to add some futures from code (like filtrating).

That what I really love about open-source! :D
Actually it was a nice experience, and I got many feedback about the code and style. Now I love Golang much more :D

Powered by Blogger.

Hello, my name is Ahmed AbouZaid, I'm a passionate Tech Lead DevOps Engineer. 👋

I specialize in Cloud-Native and Kubernetes. I'm also a Free/Open source geek and book author. My favorite topics are DevOps transformation, DevSecOps, automation, data, and metrics.

More about me ➡️

Contact Me

Name

Email *

Message *

Start Your DevOps Engineer Journey!

Start Your DevOps Engineer Journey!
Start your DevOps career for free the Agile way in 2024 with the Dynamic DevOps Roadmap ⭐

Latest Post

Bootstrap Cloud-Native bootstrappers like Crossplane with K3d - Automation

I created a logo for the Crossplane Bootstrapper because all good projects deserve a logo. 😁 TL;DR ...

Popular Posts

Blog Archive