In this guide we’ll learn how to ingest data into Apache Pinot from an Apache Kafka cluster configured with SASL authentication.
Prerequisites
To follow the code examples in this guide, you must install Docker locally and download recipes.
Navigate to recipe
- If you haven’t already, download recipes.
- In a terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/kafka-sasl
Launch Pinot and Kafka Clusters
You can spin up the Pinot and Kafka clusters with Docker Compose, using the recipe's docker-compose.yml file (typically by running docker-compose up from the recipe directory).
This runs a single instance of each of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and ZooKeeper. You can find the docker-compose.yml file on GitHub.
The Kafka cluster is launched with the following JAAS (Java Authentication and Authorization Service) configuration for SASL:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_alice="alice-secret";
};
Client{};
We have two users:
- admin, with the password admin-secret
- alice, with the password alice-secret
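With the PLAIN login module, each user_<name>="<password>" option in the KafkaServer section defines an account that clients may log in with, while username and password are the credentials the broker itself uses. As a rough illustration, the sketch below pulls the accounts out of the JAAS text shown above. The helper name jaas_users is hypothetical and the parsing is deliberately simplistic.

```python
import re

# KafkaServer section copied from the JAAS configuration in this guide.
KAFKA_SERVER_JAAS = """
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret"
  user_admin="admin-secret"
  user_alice="alice-secret";
};
"""


def jaas_users(jaas_text: str) -> dict[str, str]:
    """Return {username: password} for every user_<name>="<password>" option.

    Hypothetical helper for illustration; it is not part of Kafka or Pinot.
    """
    return dict(re.findall(r'\buser_(\w+)="([^"]*)"', jaas_text))


users = jaas_users(KAFKA_SERVER_JAAS)
print(users)
```

The plain username="admin" option is not matched, because the pattern only looks for the user_ prefix.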
Pinot Schema and Table
Let’s create a Pinot Schema and Table.
The schema is defined below:
{
"schemaName":"events",
"dimensionFieldSpecs":[
{
"name":"uuid",
"dataType":"STRING"
}
],
"metricFieldSpecs":[
{
"name":"count",
"dataType":"INT"
}
],
"dateTimeFieldSpecs":[
{
"name":"ts",
"dataType":"TIMESTAMP",
"format":"1:MILLISECONDS:EPOCH",
"granularity":"1:MILLISECONDS"
}
]
}
config/schema.json
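To make the shape of the schema easier to see, here is a small sketch that flattens the three field-spec groups into a single column-to-type mapping. The function name columns_by_type is made up for this example, and the JSON is the schema shown above.

```python
import json

# Schema copied from config/schema.json as shown in this guide.
SCHEMA_JSON = """
{
  "schemaName": "events",
  "dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],
  "metricFieldSpecs": [{"name": "count", "dataType": "INT"}],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
"""

FIELD_GROUPS = ("dimensionFieldSpecs", "metricFieldSpecs", "dateTimeFieldSpecs")


def columns_by_type(schema: dict) -> dict[str, str]:
    """Map every column name in the schema to its declared data type."""
    columns = {}
    for group in FIELD_GROUPS:
        for spec in schema.get(group, []):
            columns[spec["name"]] = spec["dataType"]
    return columns


columns = columns_by_type(json.loads(SCHEMA_JSON))
print(columns)
```

Each JSON message published to Kafka is expected to carry these three fields, with ts expressed in epoch milliseconds.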
And the table config below:
{
"tableName": "events",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "ts",
"schemaName": "events",
"replication": "1",
"replicasPerPartition": "1"
},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.topic.name": "events",
"stream.kafka.broker.list": "kafka-sasl:9093",
"security.protocol": "SASL_PLAINTEXT",
"sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"admin-secret\";",
"sasl.mechanism": "PLAIN",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
}
},
"ingestionConfig": {
"batchIngestionConfig": {
"segmentIngestionType": "APPEND",
"segmentIngestionFrequency": "DAILY"
}
},
"tenants": {},
"metadata": {}
}
config/table.json
The part of this configuration that we’re interested in is the set of SASL properties inside streamConfigs: security.protocol, sasl.jaas.config, and sasl.mechanism. The credentials that we want to use are specified in the sasl.jaas.config property.
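As a quick sanity check before creating the table, a script along these lines could confirm that the SASL properties are present and show which user Pinot will authenticate as. This is only a sketch: sasl_summary is a hypothetical helper, and the login module class in the sample value is an assumption chosen to match the PLAIN mechanism. The helper reports whatever class the property names.

```python
import re

# Only the SASL-related stream properties are reproduced here.
# Assumption: the login module class below (PlainLoginModule) is illustrative.
stream_configs = {
    "security.protocol": "SASL_PLAINTEXT",
    "sasl.mechanism": "PLAIN",
    "sasl.jaas.config": (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        'username="admin" password="admin-secret";'
    ),
}

REQUIRED_KEYS = ("security.protocol", "sasl.mechanism", "sasl.jaas.config")


def sasl_summary(configs: dict) -> dict:
    """Summarise the SASL settings, raising if a required key is missing."""
    missing = [key for key in REQUIRED_KEYS if key not in configs]
    if missing:
        raise ValueError(f"missing SASL properties: {missing}")
    jaas = configs["sasl.jaas.config"]
    options = dict(re.findall(r'(\w+)="([^"]*)"', jaas))
    return {
        "protocol": configs["security.protocol"],
        "mechanism": configs["sasl.mechanism"],
        "login_module": jaas.split()[0],
        "username": options.get("username"),
    }


summary = sasl_summary(stream_configs)
print(summary)
```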
Create the table and schema by running the following command:
docker exec -it pinot-controller-sasl bin/pinot-admin.sh AddTable \
-tableConfigFile /config/table.json \
-schemaFile /config/schema.json \
-exec
Ingesting Data
Next, we’re going to ingest some data into Kafka:
while true; do
  # Epoch time in milliseconds: the first 13 digits of the nanosecond timestamp
  ts=$(date +%s%N | cut -b1-13)
  # Random UUID with the dashes removed
  uuid=$(tr -d '-' < /proc/sys/kernel/random/uuid)
  # Random integer between 0 and 999
  count=$(( RANDOM % 1000 ))
  echo "{\"ts\": \"${ts}\", \"uuid\": \"${uuid}\", \"count\": ${count}}"
done | docker exec -i kafka-sasl /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --producer.config /etc/kafka/kafka_client.conf \
  --topic events
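The shell loop above emits one JSON document per line. For readers who want to see the record shape in isolation, here is a rough Python equivalent of a single iteration. The function name make_event is hypothetical, and the sketch only builds the message without sending it to Kafka.

```python
import json
import random
import time
import uuid


def make_event() -> str:
    """Build one JSON record shaped like the output of the shell loop."""
    event = {
        # Epoch milliseconds; the shell loop emits ts as a quoted string.
        "ts": str(int(time.time() * 1000)),
        # 32 hexadecimal characters, i.e. a UUID without dashes.
        "uuid": uuid.uuid4().hex,
        # Integer in the range 0..999, like $RANDOM % 1000.
        "count": random.randrange(1000),
    }
    return json.dumps(event)


line = make_event()
print(line)
```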
We need to pass in a configuration file with SASL credentials when producing events as well.
The contents of the configuration file are shown below:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="alice" \
password="alice-secret";
/etc/kafka/kafka_client.conf
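The trailing backslashes in this file are line continuations, so the three physical lines of sasl.jaas.config form a single property value. The sketch below shows how they join, using a deliberately simplified parser. parse_properties is a hypothetical helper that ignores details of the full Java properties format such as escaped backslashes and ":" separators.

```python
# Contents of /etc/kafka/kafka_client.conf as shown in this guide.
CLIENT_PROPERTIES = r"""
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="alice" \
    password="alice-secret";
"""


def parse_properties(text: str) -> dict[str, str]:
    """Parse key=value lines, joining lines that end with a backslash."""
    props = {}
    logical = ""
    for raw in text.splitlines():
        piece = raw.lstrip()  # leading whitespace on a line is ignored
        if not logical and (not piece or piece.startswith(("#", "!"))):
            continue  # skip blank lines and comments
        if piece.endswith("\\"):
            logical += piece[:-1]  # drop the backslash and keep reading
            continue
        logical += piece
        key, _, value = logical.partition("=")
        props[key.strip()] = value.strip()
        logical = ""
    return props


props = parse_properties(CLIENT_PROPERTIES)
print(props["sasl.jaas.config"])
```

Note that the producer logs in as alice, while the Pinot table config uses the admin account.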
Querying
Now let’s navigate to localhost:9000/#/query, then paste in and run the following query:
select count(*), sum(count)
from events
You will see the following output:
| count(*) | sum(count) |
|---|---|
| 64209 | 31917036 |
Query Results
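The same query can also be sent programmatically to a Pinot broker's /query/sql endpoint. The sketch below builds the request and shows how the rows could be read from the response. The broker address is an assumption: 8099 is Pinot's default broker port, but this recipe's docker-compose.yml may not publish it. Both function names are made up for this example.

```python
import json
import urllib.request

# Assumption: a Pinot broker is reachable at this address.
BROKER_URL = "http://localhost:8099"


def build_query_request(sql: str, broker_url: str = BROKER_URL) -> urllib.request.Request:
    """Build a POST request for the broker's SQL query endpoint."""
    body = json.dumps({"sql": sql}).encode("utf-8")
    return urllib.request.Request(
        f"{broker_url}/query/sql",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def result_rows(response: dict) -> list[dict]:
    """Turn a broker response into a list of {column name: value} dicts."""
    table = response["resultTable"]
    names = table["dataSchema"]["columnNames"]
    return [dict(zip(names, row)) for row in table["rows"]]


request = build_query_request("select count(*), sum(count) from events")

# To run the query against a live cluster, uncomment the lines below:
# with urllib.request.urlopen(request) as resp:
#     print(result_rows(json.load(resp)))
```

The counts you get back will differ from the table above, since the producer loop keeps publishing events.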