Send Flink's logs to ElasticSearch using Log4j

November 29, 2017

Flink uses slf4j as its logging façade, and log4j as the default logging framework (they support logback too). Logs are accessible via Flink’s UI in the JobManager tab which is good for short-lived jobs but unusable for long-lived, streaming applications. You probably want your logs out of there somewhere else; here’s how you can send them to ElasticSearch so you can access them, say, with Kibana.

First, you will need a log4j binding for ElasticSearch; Downfy/log4j-elasticsearch-java-api seems to do the job.

You have to compile the project into a jar and place it in Flink’s lib folder. By default, that project will compile a simple JAR with no dependencies, which is inconvenient because it also depends on elasticsearch and jest. I made a fork that uses Gradle + the Shadow Jar plugin to make a fat jar with everything you need:

git clone https://github.com/casidiablo/log4j-elasticsearch-java-api.git
./gradlew shadowJar

… or you can just download the latest JAR I built if you are lazy enough to trust me.

Lastly, update your log4j.properties file in Flink’s conf directory:

# include elastic search appender to the root logger
log4j.rootLogger=INFO, file, ealstic

# Config ES logging appender
log4j.appender.elastic=com.letfy.log4j.appenders.ElasticSearchClientAppender
log4j.appender.elastic.elasticHost=https://YOUR_ELASTICSEARCH_URL

# more options (see github project for the full list)
log4j.appender.elastic.elasticIndex=logging-index
log4j.appender.elastic.elasticType=logging

One of the changes I made in my fork of log4j-elasticsearch-java-api was adding support for AWS.

Tipically, Amazon managed ElasticSearch clusters are configured with an access policy that restricts access either by IP or by IAM user/role. If your Flink cluster is running on Amazon’s EMR, you need a little bit extra work to make this work:

  1. Configure ElasticSearch’s access policy:
{
  "Version": "2012-10-17",
  "Statement": [
    ...
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::ACCOUNT_ID:role/EC2_ROLE_USED_BY_YOUR_CLUSTER"
      },
      "Action": "es:*", // you can tighten this as much as you want
      "Resource": "arn:aws:es:YOUR_REGION:ACCOUNT_ID:domain/YOUR_ES_CLUSTER_NAME/*"
    }
    ...
  ]
}
  1. Add an entry to the log4j.properties file:
log4j.appender.elastic.awsRegion=us-east-1

Note: on EMR, you can find the file in /etc/flink/conf/ but it’s better to use EMR’s configuration facilities.

© 2017 | Powered by Hugo ♥ | Art by Clip Art ETC