In this guide we’ll learn how to ingest data into Apache Pinot from an Apache Kafka cluster configured with SSL and SASL authentication.

Pinot Version: 0.10.0
Code: startreedata/pinot-recipes/kafka-ssl-sasl

Prerequisites

To follow the code examples in this guide, do the following:

  • Install Docker locally.
  • Ensure you have a Kafka cluster with SSL enabled. Confluent Cloud offers a hosted Kafka service with free credits to get started.
  • Download the recipes repository if you haven’t already done so.

In a terminal, navigate to this recipe’s directory by running the following command:

cd pinot-recipes/recipes/kafka-ssl-sasl

Launch Pinot Cluster

You can spin up a Pinot cluster by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, and Zookeeper. You can find the docker-compose.yml file on GitHub.
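The docker-compose.yml file is roughly equivalent to the abridged sketch below. The controller container name matches the one used in the AddTable command later in this guide, but the image versions, other container names, and any extra configuration flags are assumptions, so refer to the file on GitHub for the exact setup:

# Abridged sketch of docker-compose.yml (illustrative only)
version: "3.7"
services:
  zookeeper:
    image: zookeeper:3.8.0
    container_name: zookeeper-ssl-sasl
    ports:
      - "2181:2181"
  pinot-controller:
    image: apachepinot/pinot:0.10.0
    container_name: pinot-controller-ssl-sasl
    command: "StartController -zkAddress zookeeper-ssl-sasl:2181"
    ports:
      - "9000:9000"
    depends_on:
      - zookeeper
  pinot-broker:
    image: apachepinot/pinot:0.10.0
    container_name: pinot-broker-ssl-sasl
    command: "StartBroker -zkAddress zookeeper-ssl-sasl:2181"
    ports:
      - "8099:8099"
    depends_on:
      - pinot-controller
  pinot-server:
    image: apachepinot/pinot:0.10.0
    container_name: pinot-server-ssl-sasl
    command: "StartServer -zkAddress zookeeper-ssl-sasl:2181"
    depends_on:
      - pinot-broker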

Pinot Schema and Table

Let’s create a Pinot Schema and Table.

The schema is defined below:

{
  "schemaName":"events",
  "dimensionFieldSpecs":[
    {
      "name":"uuid",
      "dataType":"STRING"
    }
  ],
  "metricFieldSpecs":[
    {
      "name":"count",
      "dataType":"INT"
    }
  ],
  "dateTimeFieldSpecs":[
    {
      "name":"ts",
      "dataType":"TIMESTAMP",
      "format":"1:MILLISECONDS:EPOCH",
      "granularity":"1:MILLISECONDS"
    }
  ]
}

config/schema.json

And the table configuration is defined below:

{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "events",
      "stream.kafka.broker.list": "<bootstrap.servers>",
      "security.protocol": "SASL_SSL",
      "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<cluster-api-key>\" password=\"<cluster-api-secret>\";",
      "sasl.mechanism": "PLAIN",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
    }
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    }
  },
  "tenants": {},
  "metadata": {}
}

config/table.json

You’ll need to replace <bootstrap.servers> with the host and port of your Kafka cluster.

The credentials that we want to use are specified in the sasl.jaas.config property. You’ll need to replace <cluster-api-key> and <cluster-api-secret> with your own credentials.

If our Kafka cluster did not have SSL enabled, we would need to set security.protocol to SASL_PLAINTEXT instead of SASL_SSL. For an example of using SASL without SSL, see Connecting to Kafka with SASL authentication.
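For reference, with SASL_PLAINTEXT the relevant streamConfigs entries would look like the following sketch. The credential placeholders are illustrative; the rest of the table config stays the same:

"security.protocol": "SASL_PLAINTEXT",
"sasl.mechanism": "PLAIN",
"sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<username>\" password=\"<password>\";",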

Create the table and schema by running the following command:

docker exec -it pinot-controller-ssl-sasl bin/pinot-admin.sh AddTable   \
  -tableConfigFile /config/table.json   \
  -schemaFile /config/schema.json \
  -exec
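Once the command completes, you can confirm that the schema and table were created by calling the Pinot Controller’s REST API, which is served on the same port as the UI:

curl -X GET "http://localhost:9000/schemas/events" -H "accept: application/json"
curl -X GET "http://localhost:9000/tables/events" -H "accept: application/json"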

Ingesting Data

Ingest a few messages into your Kafka cluster:

{"ts": "1649757242937", "uuid": "fc43b2fafbf64d9e8dff8d6be75d881d", "count": 308}
{"ts": "1649757242941", "uuid": "11f2500386ec42be84debba1d5bfd2f7", "count": 515}
{"ts": "1649757242945", "uuid": "f2dcf496957146eaa12605c5d8c005a0", "count": 142}

If you’re using Confluent Cloud, you can produce these messages via the UI.
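If you’d rather produce the messages from the command line, the same SASL settings used in the table config can go into a Kafka client properties file. The sketch below assumes the Kafka CLI tools are installed locally and writes a throwaway client.properties file:

# Write the client connection settings to a properties file
cat > client.properties <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<cluster-api-key>" password="<cluster-api-secret>";
EOF

# Start a console producer against the events topic, then paste the JSON messages above (one per line)
kafka-console-producer.sh \
  --bootstrap-server <bootstrap.servers> \
  --topic events \
  --producer.config client.properties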

Querying

Now let’s navigate to localhost:9000/#/query and copy/paste the following query:

select count(*), sum(count) 
from events 

You will see the following output:

count(*) | sum(count)
-------- | ----------
3        | 965

Query Results
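You can also run the same query over HTTP against the Pinot Broker’s SQL endpoint, assuming the broker is exposed on its default port 8099:

curl -X POST "http://localhost:8099/query/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "select count(*), sum(count) from events"}'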