Step 1: In the Data Portal, click Tables and then click Create Table.
Step 2: Select Kafka as the Data Source.
Step 3: Create a New Connection.
Click New Connection. If you want to use an existing connection, select the connection from the list and proceed to Step 5. Enter a Source Name for the new connection and specify the Broker URL. Select the Security Protocol to be used for authentication from the drop-down list.

Step 4: Configure Connection Parameters
Add the connection parameters according to your authentication method.

Connecting to Kafka with SASL_PLAIN or SASL_SCRAM
Use the following connection JSON when Kafka is configured with the SASL_PLAINTEXT or SASL_SSL security protocol enabled.
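A minimal sketch of such a connection JSON, assuming the SASL_SSL protocol with the SCRAM-SHA-512 mechanism; the broker addresses and credentials are placeholders, and each property is described in the table below:

```json
{
  "stream.kafka.broker.list": "broker1.example.com:9092,broker2.example.com:9092",
  "security.protocol": "SASL_SSL",
  "sasl.mechanism": "SCRAM-SHA-512",
  "stream.kafka.username": "USERNAME",
  "stream.kafka.password": "PASSWORD",
  "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"USERNAME\" password=\"PASSWORD\";"
}
```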
Property Descriptions

| Property Name | Required | Description |
|---|---|---|
| stream.kafka.broker.list | Yes | List of Kafka brokers to connect to, typically in the format host1:port1,host2:port2. |
| security.protocol | Yes | The security protocol used to connect to Kafka (for example, SASL_SSL, PLAINTEXT). This parameter is mandatory when the Kafka cluster uses SASL. |
| sasl.mechanism | Yes | The SASL mechanism for authentication (for example, PLAIN, SCRAM-SHA-512). This parameter is required when security.protocol is set to SASL_SSL or SASL_PLAINTEXT. |
| stream.kafka.username | Required when security.protocol is set to SASL_SSL or SASL_PLAINTEXT | The user name for authenticating with Kafka. |
| stream.kafka.password | Required when security.protocol is set to SASL_SSL or SASL_PLAINTEXT | The password for authenticating with Kafka. |
| sasl.jaas.config | Required when security.protocol is set to SASL_SSL or SASL_PLAINTEXT | The JAAS configuration string for SASL authentication. When the PLAIN SASL mechanism is used: org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD"; When the SCRAM SASL mechanism is used: org.apache.kafka.common.security.scram.ScramLoginModule required username="USERNAME" password="PASSWORD"; |
Connecting to Kafka with SSL Security Protocol
See prerequisites for SSL for steps to obtain the bootstrap server endpoint, the server certificate, the client certificate, and the client key, and to update an SSL certificate.

One-way SSL (Server Authentication)
In one-way SSL, only the server presents its SSL certificate to the client to prove its identity. The client verifies the server’s certificate using a trusted Certificate Authority (CA) before establishing an encrypted connection. Use the following connection JSON when Kafka is configured with the SSL security protocol enabled:
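A minimal sketch of a one-way SSL connection JSON; the broker address and the Base64-encoded server certificate are placeholders (the truststore-based properties in the table below can be used instead when a truststore file is available):

```json
{
  "stream.kafka.broker.list": "broker1.example.com:9093",
  "security.protocol": "SSL",
  "stream.kafka.ssl.server.certificate": "BASE64_ENCODED_SERVER_CERTIFICATE"
}
```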
Two-way SSL (Mutual Authentication)

In two-way SSL, both the client and the server present certificates to authenticate each other. The server verifies the client’s certificate, and the client verifies the server’s certificate. This setup provides a higher level of security and trust and is commonly used. Use the following connection JSON when Kafka is configured with the SSL security protocol enabled. The values for the SSL properties (server/client certificate and key) must be Base64 encoded:
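A minimal sketch of a two-way SSL connection JSON; all certificate and key values are Base64-encoded placeholders:

```json
{
  "stream.kafka.broker.list": "broker1.example.com:9093",
  "security.protocol": "SSL",
  "stream.kafka.ssl.server.certificate": "BASE64_ENCODED_SERVER_CERTIFICATE",
  "stream.kafka.ssl.client.certificate": "BASE64_ENCODED_CLIENT_CERTIFICATE",
  "stream.kafka.ssl.client.key": "BASE64_ENCODED_CLIENT_KEY"
}
```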
Property Descriptions
| Property | Required | Description |
|---|---|---|
| stream.kafka.broker.list | Yes | List of Kafka brokers to connect to, typically in the format host1:port1,host2:port2. |
| security.protocol | Yes | Defines the security protocol used to connect to Kafka (for example, SASL_SSL, SSL, PLAINTEXT). This parameter is mandatory when the Kafka cluster uses SSL or SASL. |
| ssl.truststore.location | Yes | Absolute path to the truststore file that contains trusted CA certificates (used to verify Kafka broker certificates). |
| ssl.truststore.password | Yes | Password used to open the truststore file. |
| stream.kafka.ssl.server.certificate | Required in one-way SSL | Server certificate for connecting to a Kafka cluster secured with SSL. Required when security.protocol is set to SSL. The value must be Base64 encoded. |
| stream.kafka.ssl.client.certificate | Required in two-way SSL | Client certificate for connecting to a Kafka cluster secured with SSL. Required when security.protocol is set to SSL. The value must be Base64 encoded. |
| stream.kafka.ssl.client.key | Required in two-way SSL | Client private key for connecting to a Kafka cluster secured with SSL. Required when security.protocol is set to SSL and stream.kafka.ssl.client.certificate is also set. The value must be Base64 encoded. |
| ssl.keystore.location | Required in two-way SSL | Path to the keystore file containing the client’s certificate and private key. |
| ssl.keystore.password | Required in two-way SSL | Password used to open the keystore file. |
| ssl.key.password | Required in two-way SSL | Password protecting the private key inside the keystore. |
Step 5: Test the Connection and Configure Data Ingestion
After you have configured the connection properties, test the connection to ensure it is working. When the connection is successful, use the following JSON to configure the source’s Kafka topic and data format:
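A minimal sketch, assuming JSON-formatted messages on a hypothetical topic named clickstream-events; every property shown is described in the table below:

```json
{
  "stream.kafka.topic.name": "clickstream-events",
  "stream.kafka.decoder.prop.format": "json",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.consumer.factory.class.name": "ai.startree.pinot.plugin.stream.kafka20.ConfluentKafkaConsumerFactory",
  "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
}
```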
Property Descriptions

| Property | Required | Description |
|---|---|---|
| stream.kafka.decoder.prop.format | Yes | The format of the input messages. Supported values include csv, json, avro, and parquet. |
| stream.kafka.decoder.class.name | Yes | The class name of the decoder used to parse Kafka messages, set according to the message format and schema. For JSON messages, use org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder; for Avro messages, use org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder; for Protobuf messages, use org.apache.pinot.plugin.inputformat.protobuf.KafkaConfluentSchemaRegistryProtoBufMessageDecoder. See Message Decoders for more information. |
| stream.kafka.topic.name | Yes | The name of the Kafka topic from which Pinot will consume data. |
| stream.kafka.consumer.type | Yes | The type of Kafka consumer used in Apache Pinot. Use lowlevel for granular control of partitions and offsets. |
| stream.kafka.consumer.factory.class.name | Yes | The Kafka consumer factory class to use. ai.startree.pinot.plugin.stream.kafka20.ConfluentKafkaConsumerFactory is the default Kafka consumer factory when using Kafka 2.0+ or Confluent Kafka clients. Use ai.startree.pinot.plugin.stream.kafka10.ConfluentKafkaConsumerFactory for legacy Kafka (1.x) environments with Confluent client integration. |
| stream.kafka.consumer.prop.auto.offset.reset | Yes | The behavior when no committed offset exists or offsets are invalid (smallest, latest, none). Use smallest for backfilling or consuming all historical data; use latest for real-time streaming. |
Additional Configuration
The parameters mentioned above are the minimum required to establish the connection. Apache Pinot also provides additional configuration options to fine-tune and control the ingestion process. For a complete list of configuration parameters, refer to the Apache Pinot documentation. In addition, Kafka consumer properties can also be used.
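For example, segment flush thresholds and extra Kafka consumer properties can be added to the same configuration. The sketch below is illustrative only: the flush-threshold keys are standard Apache Pinot stream settings, Kafka consumer properties are assumed to be passed with the stream.kafka.consumer.prop. prefix (as with auto.offset.reset above), and the values are placeholders:

```json
{
  "realtime.segment.flush.threshold.rows": "1000000",
  "realtime.segment.flush.threshold.time": "6h",
  "stream.kafka.consumer.prop.fetch.max.bytes": "52428800"
}
```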
Configure Record Reader

AVRO
One configuration option, AvroRecordReaderConfig, is supported:

- enableLogicalTypes: Enables logical type conversions for specific Avro logical types, such as DECIMAL, UUID, DATE, TIME_MILLIS, TIME_MICROS, TIMESTAMP_MILLIS, and TIMESTAMP_MICROS. For example, if the schema type is INT, the logical type is DATE, and the conversion applied is a TimeConversion, then a value V is converted to the date V days from the epoch start.
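A hypothetical sketch of enabling logical types. The key stream.kafka.decoder.prop.enableLogicalTypes is an assumption about how the AvroRecordReaderConfig option is exposed as a decoder property; verify the exact key for your environment:

```json
{
  "stream.kafka.decoder.prop.enableLogicalTypes": "true"
}
```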
PROTO
ProtoBufRecordReaderConfig is supported in Pinot, and the following configuration is possible.
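A hypothetical sketch; the descriptorFile and protoClassName keys are assumptions based on the Apache Pinot ProtoBuf plugin and may differ in your environment, and the values are placeholders:

```json
{
  "stream.kafka.decoder.prop.descriptorFile": "/path/to/sample.desc",
  "stream.kafka.decoder.prop.protoClassName": "SampleRecord"
}
```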
Step 6: Sample Data
Click Show Sample Data to see a preview of the source data.

Next Step
Proceed with Data Modeling.

