
Consumers

The consumer "consumes" the events written into Kafka and pre-processes each one into a Metric, which an output can then store in a database or send somewhere else.
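A minimal sketch of that consume, normalize, output flow follows. It assumes a hypothetical Metric dataclass, a toy handler for a `key=value` payload, and an in-memory output standing in for a database. None of these names come from cgn_ec_consumer, and the real handler and output classes will differ.

```python
from dataclasses import asdict, dataclass
from typing import Iterable, List, Optional, Protocol


@dataclass
class Metric:
    """Hypothetical normalized record; field names follow the keys used in the config examples."""
    src_ip: str
    src_port: int
    x_ip: str
    x_port: int
    protocol: int
    event: str


class Handler(Protocol):
    def parse(self, raw: bytes) -> Optional[Metric]: ...


class Output(Protocol):
    def process(self, metrics: List[Metric]) -> None: ...


class KeyValueHandler:
    """Hypothetical handler for a 'key=value key=value ...' payload."""

    def parse(self, raw: bytes) -> Optional[Metric]:
        fields = dict(p.split("=", 1) for p in raw.decode().split() if "=" in p)
        try:
            return Metric(
                src_ip=fields["src_ip"],
                src_port=int(fields["src_port"]),
                x_ip=fields["x_ip"],
                x_port=int(fields["x_port"]),
                protocol=int(fields["proto"]),
                event=fields["event"],
            )
        except (KeyError, ValueError):
            return None  # events the handler cannot normalize are dropped


class ListOutput:
    """Stand-in for a database output that just keeps rows in memory."""

    def __init__(self) -> None:
        self.rows: List[dict] = []

    def process(self, metrics: List[Metric]) -> None:
        self.rows.extend(asdict(m) for m in metrics)


def consume(records: Iterable[bytes], handler: Handler, outputs: List[Output]) -> None:
    """Normalize one batch of already-polled Kafka values and fan it out to every output."""
    metrics = [m for m in (handler.parse(r) for r in records) if m is not None]
    for output in outputs:
        output.process(metrics)
```

In this sketch every output receives the same list of Metric objects, so a single handler can feed several outputs without re-parsing the raw events.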

The consumer typically implements vendor-specific logic to normalize each event into one of the few available Metric models.

For example, suppose you ingest session mapping events from two CGNAT vendors: one represents the protocol as an integer (e.g. 6 for TCP or 17 for UDP) and the other as a string (e.g. "TCP" or "UDP"). The typical Metric data model requires the protocol number, so you would implement logic to transform the string into its integer equivalent.
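A sketch of that transformation, assuming a hypothetical `normalize_protocol` helper inside a handler. The lookup table holds the IANA protocol numbers (6 for TCP, 17 for UDP, 1 for ICMP); a real handler would extend it with whatever names its vendor emits.

```python
from typing import Union

# Hypothetical lookup table of IANA protocol numbers keyed by lower-case name.
PROTOCOL_NUMBERS = {"tcp": 6, "udp": 17, "icmp": 1}


def normalize_protocol(value: Union[int, str]) -> int:
    """Return the protocol number whether the vendor sent an integer or a name."""
    if isinstance(value, int):
        return value
    text = value.strip()
    if text.isdigit():  # some vendors send the number as a string, e.g. "6"
        return int(text)
    try:
        return PROTOCOL_NUMBERS[text.lower()]
    except KeyError:
        raise ValueError(f"unknown protocol: {value!r}") from None
```

Raising on unknown names, rather than guessing a default, keeps malformed events from being stored with a wrong protocol number.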

Consumer Basic Architecture

Configuration Examples

Consumer configuration is written in YAML or JSON. When you mount the configuration file into the consumer container, set the CONFIG_FILE environment variable to the file's path so the consumer knows where to find it.
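A minimal sketch of how a consumer might locate and parse that file, assuming the format is chosen by file extension (an assumption; the actual consumer may detect it differently). YAML parsing uses PyYAML's `yaml.safe_load`, imported only when a YAML file is given.

```python
import json
import os
from pathlib import Path
from typing import Any, Dict


def load_config() -> Dict[str, Any]:
    """Read the consumer configuration from the file named by CONFIG_FILE."""
    path = Path(os.environ["CONFIG_FILE"])  # raises KeyError if the variable is unset
    text = path.read_text()
    if path.suffix in (".yaml", ".yml"):
        import yaml  # PyYAML; only required for YAML configs

        return yaml.safe_load(text)
    return json.loads(text)
```

With a container runtime this means passing something like `-e CONFIG_FILE=<path where the file is mounted>` alongside the volume mount.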

YAML

# Global Configuration Settings
kafka_bootstrap_servers: "localhost:9094"
kafka_group_id: "syslog-consumers"
kafka_max_records_poll: 500
batch_size: 30000
processes: 8
metrics_port: 4499

# Handler Configuration
handler:
  type: "cgn_ec_consumer.handlers.nfware.NFWareSyslogHandler"
  options: {}

# Outputs Configuration
outputs:
  # TimeScaleDB Output
  - type: "cgn_ec_consumer.outputs.timescaledb.TimeScaleDBOutput"
    options:
      address: "tsdb"
      port: 5432
      username: "cgnat"
      password: "password123"
      database: "cgnat"
      batch_size: 30000

  # Example HTTP Output
  # - type: "HTTPWebhookOutput"
  #   options:
  #     url: "http://some-api/test"
  #     headers:
  #       x-api-key: default-change-me
  #     timeout: 10
  #   preprocessors:
  #     - name: filter_keys
  #       arguments:
  #         keys:
  #           - src_ip
  #           - src_port
  #           - x_ip
  #           - x_port
  #           - timestamp
  #           - dst_ip
  #           - dst_port
  #           - event
  #     - name: match_kvs
  #       arguments:
  #         kvs:
  #           src_ip: 192.168.3.30
  #     - name: key_exists
  #       arguments:
  #         key: dst_port
  #         ignore_none: true

JSON

{
  "kafka_bootstrap_servers": "localhost:9094",
  "kafka_group_id": "syslog-consumers",
  "kafka_max_records_poll": 500,
  "batch_size": 30000,
  "processes": 8,
  "metrics_port": 4499,
  "handler": {
    "type": "cgn_ec_consumer.handlers.nfware.NFWareSyslogHandler",
    "options": {}
  },

  "outputs": [
    {
      "type": "cgn_ec_consumer.outputs.timescaledb.TimeScaleDBOutput",
      "options": {
        "address": "tsdb",
        "port": 5432,
        "username": "cgnat",
        "password": "password123",
        "database": "cgnat",
        "batch_size": 30000
      },
      "preprocessors": [
        {
          "name": "filter_keys",
          "arguments": {
            "keys": [
              "src_ip",
              "src_port",
              "x_ip",
              "x_port",
              "timestamp",
              "dst_ip",
              "dst_port",
              "event",
              "type"
            ]
          }
        },
        {
          "name": "match_kvs",
          "arguments": {
            "kvs": {
              "src_ip": "192.168.1.20",
              "type": "session-mapping"
            }
          }
        }
      ]
    }
  ]
}
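The preprocessors in the JSON example run per output. The sketch below shows one way such a chain could work, assuming (from the names and arguments only) that filter_keys keeps just the listed keys, match_kvs drops any record that does not match every key/value pair, and the steps run in the order listed. These semantics and the `build_chain`/`apply_chain` helpers are hypothetical, not the consumer's actual code.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional

Record = Dict[str, Any]
# A preprocessor returns a (possibly reduced) record, or None to drop it.
Preprocessor = Callable[[Record], Optional[Record]]


def filter_keys(keys: List[str]) -> Preprocessor:
    """Assumed semantics: keep only the listed keys."""
    wanted = set(keys)
    return lambda record: {k: v for k, v in record.items() if k in wanted}


def match_kvs(kvs: Dict[str, Any]) -> Preprocessor:
    """Assumed semantics: drop records that do not match every key/value pair."""
    return lambda record: record if all(record.get(k) == v for k, v in kvs.items()) else None


# Hypothetical registry mapping the config "name" to a preprocessor factory.
REGISTRY: Dict[str, Callable[..., Preprocessor]] = {
    "filter_keys": filter_keys,
    "match_kvs": match_kvs,
}


def build_chain(specs: List[Dict[str, Any]]) -> List[Preprocessor]:
    """Turn a 'preprocessors' list from the config into callables, in order."""
    return [REGISTRY[spec["name"]](**spec.get("arguments", {})) for spec in specs]


def apply_chain(chain: List[Preprocessor], records: Iterable[Record]) -> List[Record]:
    """Run every record through the chain, stopping early once a step drops it."""
    out = []
    for record in records:
        current: Optional[Record] = record
        for step in chain:
            current = step(current)
            if current is None:
                break
        if current is not None:
            out.append(current)
    return out
```

If the steps do run in order, it matters that the JSON example keeps "type" in filter_keys: had it been filtered out first, the later match_kvs on "type" could never succeed.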

Parameters

| Parameter | Description |
|---|---|
| kafka_bootstrap_servers | Kafka bootstrap server(s) the consumer reads events from |
| kafka_group_id | Kafka consumer group ID |
| kafka_max_records_poll | Maximum number of records pulled from Kafka in a single poll |
| processes | Number of consumer processes to run within a single container |
| metrics_port | Port on which the Prometheus metrics (exported via the Prometheus Python client) are exposed |
| CONFIG_FILE | Environment variable, not a configuration key: path to the configuration file |
| batch_size | TimescaleDB batch insert size |
| handler.type | Handler class used to process events |
| handler.options | Options passed to the handler's constructor |
| outputs[i].type | Output class implementing the output logic |
| outputs[i].options | Options passed to the output's constructor |
| outputs[i].preprocessors[j].name | Name of the preprocessor |
| outputs[i].preprocessors[j].arguments | Arguments passed to the preprocessor when it is initialized |
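The handler.type and outputs[i].type values are dotted Python paths, and their options are passed to the class when it is created. A sketch of how such a value can be resolved with the standard library's `importlib`, assuming options map directly onto constructor keyword arguments; `load_class` and `build_component` are hypothetical helpers, not part of cgn_ec_consumer.

```python
import importlib
from typing import Any, Dict


def load_class(dotted_path: str) -> type:
    """Import 'package.module.ClassName' and return the class object."""
    module_name, _, class_name = dotted_path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected a dotted path, got {dotted_path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


def build_component(spec: Dict[str, Any]) -> Any:
    """Instantiate a handler or output from a {'type': ..., 'options': {...}} entry."""
    cls = load_class(spec["type"])
    return cls(**spec.get("options", {}))  # options become keyword arguments
```

This sketch only handles fully qualified paths such as cgn_ec_consumer.outputs.timescaledb.TimeScaleDBOutput; a bare class name like the HTTPWebhookOutput in the commented YAML example would need a separate lookup that is not shown here.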