Alerting Failed Vault Logins With KSQL

January 1, 2019
Terraform Vault Kafka KSQL

[vault-alerts.gif]

When running a Vault cluster, it is important to monitor requests to it and understand whether it's operating as expected: are clients authenticating correctly, or is a potential attack taking place? To do this, we can send Vault's audit logs to Kafka and use Confluent's KSQL to query them. We'll use Terraform to define everything.

Output Audit Logs to Kafka

Create a Kafka topic, and tell Vault to send its audit logs to it.

provider "kafka" {
  bootstrap_servers = ["localhost:9092"]
}

provider "vault" {}

resource "kafka_topic" "vault" {
  name               = "vault"
  replication_factor = 1
  partitions         = 1
}

resource "vault_audit" "kafka" {
  type = "kafka"

  options = {
    topic   = "${kafka_topic.vault.name}"
    address = "localhost:9092"
  }
}
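
For reference, each message on the vault topic is a Vault audit entry in JSON. A failed userpass login produces a response entry shaped roughly like this (abridged, with illustrative values):

{
  "time": "2019-01-01T12:00:00.123456Z",
  "type": "response",
  "auth": {
    "display_name": "userpass-admin",
    "policies": [],
    "token_policies": [],
    "entity_id": "",
    "token_type": "default"
  },
  "request": {
    "id": "c3a0a8a7-0000-0000-0000-000000000000",
    "operation": "update",
    "path": "auth/userpass/login/admin",
    "remote_address": "127.0.0.1"
  },
  "response": {
    "data": {
      "error": "invalid username or password"
    }
  },
  "error": ""
}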

Next, create a stream that can be queried from KSQL. Here we use the time value that Vault outputs as the message timestamp.

KSQL uses Java's DateTimeFormatter format patterns. Vault outputs timestamps with varying sub-second precision, so, to prevent timestamp parsing errors, we have to include all six possible fractional-second patterns: [.SSSSSS][.SSSSS][.SSSS][.SSS][.SS][.S].

CREATE STREAM vault_logs
(time VARCHAR,
 type VARCHAR,
 auth STRUCT<client_token VARCHAR,
             accessor VARCHAR,
             display_name VARCHAR,
             policies ARRAY<STRING>,
             token_policies ARRAY<STRING>,
             entity_id VARCHAR,
             token_type VARCHAR>,
 request STRUCT<id VARCHAR,
                operation VARCHAR,
                path VARCHAR,
                remote_address VARCHAR>,
 response STRUCT<data STRUCT<error VARCHAR>>,
 error VARCHAR)
 WITH (KAFKA_TOPIC='vault',
       VALUE_FORMAT='JSON',
       TIMESTAMP='time',
       TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ss[.SSSSSS][.SSSSS][.SSSS][.SSS][.SS][.S]''Z''');
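
To sanity-check that messages are being parsed, and that the timestamp format is accepted, we can sample a few rows from the KSQL CLI (a quick check; your output will vary):

-- Start reading from the beginning of the topic
SET 'auto.offset.reset' = 'earliest';

-- A continuous query that stops after 3 rows
SELECT time, type FROM vault_logs LIMIT 3;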

We can then create a stream that filters for errors on our Vault auth mount at auth/userpass.

CREATE STREAM vault_failed_login_attempts AS
  SELECT *
  FROM vault_logs
  WHERE
    type = 'response' AND
    response->data->error != '' AND
    request->path LIKE 'auth/userpass%';

Due to a bug in KSQL 5.2, we need to create an intermediary stream which flattens the data, so that we can query the nested fields.

CREATE STREAM vault_failed_login_attempts_flattened AS
  SELECT request->remote_address AS ip,
         request->path AS path
  FROM vault_failed_login_attempts;

Our final table will produce a message when there are more than 5 failed logins within a 30-second window.

CREATE TABLE suspicious_attempts AS
  SELECT ip, COUNT(*)
  FROM vault_failed_login_attempts_flattened
  WINDOW TUMBLING (SIZE 30 SECONDS)
  GROUP BY ip
  HAVING COUNT(*) > 5;
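
To exercise the pipeline end to end, we can generate a burst of failed logins against the login endpoint. A minimal sketch, assuming the userpass method is enabled and admin is just an illustrative username:

# Each attempt fails, producing an audit entry on the vault topic
for i in $(seq 1 10); do
  vault write auth/userpass/login/admin password=wrong-password
done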

Our output topic, SUSPICIOUS_ATTEMPTS, now contains the per-IP counts of failed logins.

> kafka-avro-console-consumer --topic SUSPICIOUS_ATTEMPTS --bootstrap-server localhost:9092 --from-beginning

{"IP":{"string":"127.0.0.1"},"COUNT":{"long":4}}
{"IP":{"string":"127.0.0.1"},"COUNT":{"long":6}}

We can bring in Kafka Connect to read from the topic and send Slack alerts, by using kafka-connect-slack.

provider "kafka-connect" {
  url = "http://localhost:8083"
}

resource "kafka-connect_connector" "vault_alerts" {
  name = "vault_alerts"

  config = {
    "name"              = "vault_alerts"
    "connector.class"   = "net.mongey.kafka.connect.SlackSinkConnector"
    "key.converter"     = "org.apache.kafka.connect.storage.StringConverter"
    "errors.log.enable" = "true"
    "topics"            = "${upper(ksql_table.suspicious_attempts.name)}"
    "slack.token"       = "${var.slack_token}"
    "slack.channel"     = "security"
    "message.template"  = "$${IP} has failed to login $${COUNT} times"
  }
}
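
Note that topics above references ksql_table.suspicious_attempts, which assumes the KSQL statements are also managed through Terraform. A rough sketch with terraform-provider-ksql (the provider url and the name/query attributes are assumptions about that provider's schema):

provider "ksql" {
  url = "http://localhost:8088"
}

# Declares the same table as the CREATE TABLE statement above
resource "ksql_table" "suspicious_attempts" {
  name = "suspicious_attempts"

  query = <<EOF
SELECT ip, COUNT(*)
FROM vault_failed_login_attempts_flattened
WINDOW TUMBLING (SIZE 30 SECONDS)
GROUP BY ip
HAVING COUNT(*) > 5
EOF
}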

And there we go, we've sent alerts for failed Vault logins straight to Slack.