1. In the Data Portal, click Tables and then click Create Table.

2. Select Kafka as the Data Source.

3. Create a New Connection.

Click New Connection. To use an existing connection instead, select it from the list and proceed to Step 5. Otherwise, enter a Source Name for the new connection and specify the Broker URL.
4. Configure Connection Parameters.

SASL_SSL with PLAIN Authentication

StreamNative supports SASL_SSL as the security protocol with PLAIN as the SASL mechanism. Use the following JSON configuration:
{
  "stream.kafka.broker.list": "kafka-abc123.streamnative.cloud:9093",
  "security.protocol": "SASL_SSL",
  "sasl.mechanism": "PLAIN",
  "stream.kafka.username": "YOUR_SERVICE_ACCOUNT_TOKEN_NAME",
  "stream.kafka.password": "YOUR_SERVICE_ACCOUNT_TOKEN_SECRET",
  "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required \n username=\"YOUR_SERVICE_ACCOUNT_TOKEN_NAME\" \n password=\"YOUR_SERVICE_ACCOUNT_TOKEN_SECRET\";"
}

Property Descriptions

| Property | Required | Description |
| --- | --- | --- |
| stream.kafka.broker.list | Yes | The list of Kafka brokers to connect to, typically in the format host1:port1,host2:port2. Required to establish a connection to the StreamNative cluster. |
| security.protocol | Yes | The security protocol used to connect to Kafka. StreamNative only supports SASL_SSL. |
| sasl.mechanism | Yes | The SASL mechanism used for authentication. StreamNative only supports PLAIN. |
| stream.kafka.username | Yes | The StreamNative service account token name used for authentication. |
| stream.kafka.password | Yes | The StreamNative service account token secret used for authentication. |
| sasl.jaas.config | Yes | The JAAS configuration string for SASL authentication. |
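Because the credentials appear twice (as separate properties and embedded in the JAAS string), it can help to assemble the connection JSON programmatically. The sketch below uses a hypothetical helper, `streamnative_connection_config`, which is not part of any StarTree or Kafka API; it only mirrors the JSON structure above, and the broker and token values are placeholders.

```python
import json


def streamnative_connection_config(broker: str, token_name: str, token_secret: str) -> dict:
    """Assemble the SASL_SSL/PLAIN connection properties (hypothetical helper)."""
    # The JAAS string must embed the same token name and secret as the
    # stream.kafka.username / stream.kafka.password properties.
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{token_name}" password="{token_secret}";'
    )
    return {
        "stream.kafka.broker.list": broker,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "stream.kafka.username": token_name,
        "stream.kafka.password": token_secret,
        "sasl.jaas.config": jaas,
    }


# Example with placeholder values; json.dumps escapes the embedded quotes
# exactly as shown in the configuration above.
config = streamnative_connection_config(
    "kafka-abc123.streamnative.cloud:9093", "my-token-name", "my-token-secret"
)
print(json.dumps(config, indent=2))
```

Generating the JAAS string from the same variables avoids the common mistake of updating the username/password properties but leaving stale credentials inside sasl.jaas.config.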
5. Test the Connection and Configure Data Ingestion.

After you have configured the connection properties, test the connection to ensure it functions properly.

Configure Data Ingestion

Once the connection is successful, configure the additional data settings using the following JSON format:
{
  "stream.kafka.topic.name": "",
  "stream.kafka.decoder.prop.format": "",
  "stream.kafka.decoder.class.name": "",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.consumer.factory.class.name": "ai.startree.pinot.plugin.stream.kafka20.ConfluentKafkaConsumerFactory",
  "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
}

Property Descriptions

| Property | Required | Description |
| --- | --- | --- |
| stream.kafka.topic.name | Yes | The name of the Kafka topic from which Pinot will consume data. |
| stream.kafka.decoder.prop.format | Yes | The format of the input data. Supported values include json, avro, and proto. |
| stream.kafka.decoder.class.name | Yes | The class name of the decoder used for Kafka message parsing. Set based on the message format and schema (see the decoder classes below). |
| stream.kafka.consumer.type | No | The type of Kafka consumer used in Apache Pinot. Use lowlevel for granular control of partitions and offsets. |
| stream.kafka.consumer.factory.class.name | Yes | The Kafka consumer factory class to use. The default is ai.startree.pinot.plugin.stream.kafka20.ConfluentKafkaConsumerFactory for Kafka 2.0+ or Confluent Kafka clients. Use ai.startree.pinot.plugin.stream.kafka10.ConfluentKafkaConsumerFactory for legacy Kafka (1.x) environments with Confluent client integration. |
| stream.kafka.consumer.prop.auto.offset.reset | Yes | Defines behavior when no committed offset exists or offsets are invalid. Options: smallest (for backfilling or consuming all historical data), latest (for real-time streaming), or none. |

Decoder classes by message format:
* JSON messages: org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder
* Avro messages: org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder
* Protobuf messages: org.apache.pinot.plugin.inputformat.protobuf.KafkaConfluentSchemaRegistryProtoBufMessageDecoder

See Message Decoders for more information.
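The format value and the decoder class must agree, so the pairing lends itself to a small lookup. The sketch below is a hypothetical helper (`ingestion_config` and the example topic name `orders` are not from any StarTree API); it only fills in the ingestion JSON shown above based on the chosen format and whether you want to backfill history.

```python
# Decoder class per message format, matching the list above.
DECODER_CLASSES = {
    "json": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
    "avro": "org.apache.pinot.plugin.inputformat.avro.confluent."
            "KafkaConfluentSchemaRegistryAvroMessageDecoder",
    "proto": "org.apache.pinot.plugin.inputformat.protobuf."
             "KafkaConfluentSchemaRegistryProtoBufMessageDecoder",
}


def ingestion_config(topic: str, fmt: str, backfill: bool = True) -> dict:
    """Fill in the data-ingestion properties for a topic and format (hypothetical helper)."""
    if fmt not in DECODER_CLASSES:
        raise ValueError(f"unsupported format: {fmt!r}")
    return {
        "stream.kafka.topic.name": topic,
        "stream.kafka.decoder.prop.format": fmt,
        "stream.kafka.decoder.class.name": DECODER_CLASSES[fmt],
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.consumer.factory.class.name":
            "ai.startree.pinot.plugin.stream.kafka20.ConfluentKafkaConsumerFactory",
        # "smallest" replays all retained history; "latest" consumes only new messages.
        "stream.kafka.consumer.prop.auto.offset.reset":
            "smallest" if backfill else "latest",
    }


# Example: ingest a hypothetical "orders" topic with JSON messages.
cfg = ingestion_config("orders", "json")
```

Keeping the format-to-decoder mapping in one place prevents a mismatch such as format "avro" paired with the JSON decoder class, which would cause parsing failures at ingestion time.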
6. Preview the Sample Data.

Click Show Sample Data to preview the source data before finalizing the configuration.

Next Step

Proceed with Data Modeling.