Streaming Vault Audit Logs to Elasticsearch

December 10, 2018
vault ksql kafka elasticsearch kafka-connect

Kibana Dashboard

Confluent’s kafka-connect-elasticsearch connector allows you to read messages, in Avro format, from a Kafka topic and insert them into an Elasticsearch index.

Vault only supports writing to Kafka in JSON format, so we’ll use KSQL to convert the messages to Avro, and then use Kafka Connect to get the messages into Elasticsearch, where Kibana can read them.
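
For reference, a single audit log message on the vault topic looks roughly like this (the values below are made up, and real entries HMAC the sensitive fields); these are the fields the KSQL stream will map:

{
  "time": "2018-12-10T14:06:47.123456Z",
  "type": "response",
  "auth": {
    "client_token": "hmac-sha256:abc123",
    "accessor": "hmac-sha256:def456",
    "display_name": "token",
    "policies": ["default"],
    "token_policies": ["default"],
    "entity_id": "",
    "token_type": "service"
  },
  "request": {
    "id": "7d0fb21e-0001-0002-0003-000000000004",
    "operation": "read",
    "path": "secret/data/myapp",
    "remote_address": "127.0.0.1"
  },
  "response": {
    "data": {
      "error": ""
    }
  },
  "error": ""
}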

CREATE STREAM vault_audit_logs
(
  time VARCHAR,
  type VARCHAR,
  auth STRUCT
     <client_token   VARCHAR,
      accessor       VARCHAR,
      display_name   VARCHAR,
      policies       ARRAY<STRING>,
      token_policies ARRAY<STRING>,
      entity_id      VARCHAR,
      token_type     VARCHAR>,
  request STRUCT
     <id             VARCHAR,
      operation      VARCHAR,
      path           VARCHAR,
      remote_address VARCHAR>,
  response STRUCT<data STRUCT<error VARCHAR>>,
  error VARCHAR
)
 WITH(
  KAFKA_TOPIC='vault',
  VALUE_FORMAT='JSON',
  TIMESTAMP='time',
  TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ss[.SSSSSS][.SSSSS][.SSSS][.SSS][.SS][.S]''Z'''
);
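
Before going further, it’s worth sanity-checking that KSQL is parsing the messages; something like the following from the KSQL CLI should print a few rows (the offset reset just makes the query read messages already on the topic):

SET 'auto.offset.reset' = 'earliest';
SELECT time, type, request->operation, request->path FROM vault_audit_logs LIMIT 5;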

And then persist it to the vault_avro topic:

CREATE STREAM vault_avro WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='vault_avro') AS SELECT * FROM vault_audit_logs;
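
Creating this stream also registers the generated Avro schema with the Schema Registry; assuming the default topic-name subject naming, you can confirm that with:

curl http://localhost:8081/subjects/vault_avro-value/versions/latest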

To send the logs to Elasticsearch, create a Kafka Connect connector that reads from this Avro topic and writes each message to an Elasticsearch index.

provider "kafka-connect" {
  url = "http://localhost:8083"
}

resource "kafka-connect_connector" "vault-es" {
  name = "vault-es"

  config = {
    "connector.class"                     = "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"
    "value.converter"                     = "io.confluent.connect.avro.AvroConverter"
    "key.converter"                       = "org.apache.kafka.connect.storage.StringConverter"
    "key.converter.schema.registry.url"   = "http://localhost:8081"
    "value.converter.schema.registry.url" = "http://localhost:8081"
    "connection.url"                      = "http://localhost:9200"
    "type.name"                           = "type.name=kafka-connect"
    "topics"                              = "vault_avro"
    "key.ignore"                          = "true"
  }
}
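
Once this is applied with terraform apply, Kafka Connect’s REST API will report the connector’s status:

curl http://localhost:8083/connectors/vault-es/status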

It is common to use daily indexes in Elasticsearch, so that old data can be rotated out.

We can use Kafka Connect’s Single Message Transforms to add the timestamp of the message to each document, and route the message to a daily index.

Because the KSQL stream set TIMESTAMP='time', the Kafka record timestamp is the time Vault said the event happened (rather than when the message was produced to Kafka). Insert it into each document with:

"transforms.InsertTimeStamp.type"            = "org.apache.kafka.connect.transforms.InsertField$Value"
"transforms.InsertTimeStamp.timestamp.field" = "timestamp!"

And send to a daily index with:

"transforms.routeTS.type"             = "org.apache.kafka.connect.transforms.TimestampRouter"
"transforms.routeTS.timestamp.format" = "yyyy.MM.dd"
"transforms.routeTS.topic.format"     = "vault-$${timestamp}"

Make sure you enable both transforms with the following line:

"transforms"                                 = "routeTS,InsertTimeStamp"