Step 1: In the Data Portal, click Tables and then click Create Table.

Step 2: Select Confluent Cloud as the Data Source.

Step 3: Create a New Connection.

Click New Connection. If you want to use an existing connection, select the connection from the list and proceed to Step 5.

Enter a Source Name for the new connection and specify the Broker URL.

Step 4: Configure Connection Parameters.

Use the following JSON configuration:

{
  "stream.kafka.broker.list": "abc-xyz05.region.aws.confluent.cloud:9092",
  "security.protocol": "SASL_SSL",
  "sasl.mechanism": "PLAIN",
  "stream.confluent.key": "AKIAEXAMPLEKEY",
  "stream.confluent.secret": "SECRETKEYEXAMPLE12345",
  "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required \n username=\"AKIAEXAMPLEKEY\" \n password=\"SECRETKEYEXAMPLE12345\";"
}
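
The escaped quotes in the sasl.jaas.config value above are easy to get wrong when editing the JSON by hand. As a minimal sketch (reusing the same placeholder key and secret), the following Python snippet assembles the JAAS string and prints it with the escaping a JSON value needs:

# Minimal sketch: build the sasl.jaas.config value from a Confluent key/secret pair.
# The key and secret below are the placeholders from the example above; substitute your own.
import json

confluent_key = "AKIAEXAMPLEKEY"
confluent_secret = "SECRETKEYEXAMPLE12345"

# A JAAS entry has the form: <LoginModule> required <option>=<value> ... ;
jaas_config = (
    "org.apache.kafka.common.security.plain.PlainLoginModule required "
    f'username="{confluent_key}" '
    f'password="{confluent_secret}";'
)

# json.dumps adds the backslash escaping required when the string is embedded
# in the connection JSON shown above.
print(json.dumps({"sasl.jaas.config": jaas_config}, indent=2))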

Property Descriptions

* stream.kafka.broker.list (Required): The list of Kafka brokers to connect to, typically in the format host1:port1,host2:port2. Required to establish a connection to Confluent Kafka.
* security.protocol (Required): The security protocol used to connect to Kafka (e.g., SASL_SSL). Mandatory when the Confluent Kafka cluster uses SASL.
* sasl.mechanism (Required): The SASL mechanism used for authentication. This value should be PLAIN.
* stream.confluent.key (Required): The Confluent key for authentication.
* stream.confluent.secret (Required): The Confluent secret for authentication.
* sasl.jaas.config (Required): The JAAS configuration string for SASL authentication.
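
Before testing the connection in the Data Portal, you may want to confirm the broker URL and credentials independently. One way to do this, shown only as a sketch, is with the confluent_kafka Python client; note that the client uses its own property names (bootstrap.servers, sasl.username, sasl.password) rather than the stream.* properties above, and the values below are the same placeholders:

# Minimal sketch: confirm the broker URL and Confluent key/secret are valid
# using the confluent_kafka Python client (pip install confluent-kafka).
# All values below are placeholders.
from confluent_kafka.admin import AdminClient

conf = {
    "bootstrap.servers": "abc-xyz05.region.aws.confluent.cloud:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "AKIAEXAMPLEKEY",          # Confluent key
    "sasl.password": "SECRETKEYEXAMPLE12345",   # Confluent secret
}

admin = AdminClient(conf)
# Fetching cluster metadata fails if the broker URL or credentials are wrong.
metadata = admin.list_topics(timeout=10)
print("Cluster reachable, topics:", sorted(metadata.topics))

If the broker URL or credentials are wrong, the call typically fails with a timeout or authentication error instead of returning metadata.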

Step 5: Test the Connection and Configure Data Ingestion.

After you have configured the connection properties, test the connection to ensure it functions properly.

Once the connection is successful, configure the data ingestion settings using the following JSON format:

{
  "stream.kafka.topic.name": "",
  "stream.kafka.decoder.prop.format": "",
  "stream.kafka.decoder.class.name": "",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.consumer.factory.class.name": "ai.startree.pinot.plugin.stream.kafka20.ConfluentKafkaConsumerFactory",
  "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
}

Property Descriptions

* stream.kafka.topic.name (Required): The name of the Kafka topic from which Pinot will consume data.
* stream.kafka.decoder.prop.format (Required): The format of the input data. Supported values include csv, json, avro, and parquet.
* stream.kafka.decoder.class.name (Required): The class name of the decoder used for Kafka message parsing. Set this based on the message format and schema. Examples:
  * For JSON messages: org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder
  * For Avro messages: org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder
  * For Protobuf messages: org.apache.pinot.plugin.inputformat.protobuf.KafkaConfluentSchemaRegistryProtoBufMessageDecoder
  See Message Decoders for more information.
* stream.kafka.consumer.type (Required): The type of Kafka consumer used in Apache Pinot. Use lowlevel for granular control of partitions and offsets.
* stream.kafka.consumer.factory.class.name (Required): The Kafka consumer factory class to use. The default class is ai.startree.pinot.plugin.stream.kafka20.ConfluentKafkaConsumerFactory for Kafka 2.0+ or Confluent Kafka clients. Use ai.startree.pinot.plugin.stream.kafka10.ConfluentKafkaConsumerFactory for legacy Kafka (1.x) environments with Confluent client integration.
* stream.kafka.consumer.prop.auto.offset.reset (Required): Defines behavior when no committed offset exists or offsets are invalid. Options: smallest (for backfilling or consuming all historical data), latest (for real-time streaming), or none.
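
The preview in Step 6 only shows data if the topic already contains messages. If you need a quick way to publish a test record, the following sketch uses the confluent_kafka Python client outside the Data Portal; the topic name, record fields, and credentials are placeholders, and it assumes a JSON-formatted topic (matching the JSONMessageDecoder example above):

# Minimal sketch: publish one JSON test record so the preview in Step 6 has data.
# Topic name, credentials, and record contents are placeholders.
import json
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "abc-xyz05.region.aws.confluent.cloud:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "AKIAEXAMPLEKEY",
    "sasl.password": "SECRETKEYEXAMPLE12345",
})

record = {"event_id": 1, "event_type": "test", "event_time": 1700000000000}
producer.produce("your-topic-name", value=json.dumps(record).encode("utf-8"))
producer.flush(10)  # block until the broker acknowledges the message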

Step 6: Preview the Data.

Click Show Sample Data to preview the source data before finalizing the configuration.