Connect to Streaming Sources
Connect to Confluent Cloud
Step 1: In the Data Portal, click Tables and then click Create Table.
Step 2: Select Confluent Cloud as the Data Source.
Step 3: Create a New Connection
Click New Connection. To use an existing connection instead, select it from the list and proceed to Step 5.
Enter a Source Name for the new connection and specify the Broker URL.
Step 4: Configure Connection Parameters
Use the following JSON configuration:
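For example, a connection configuration might look like the following. This is a minimal sketch: the broker endpoint and credentials are placeholders, and the `sasl.jaas.config` value follows the standard Kafka `PlainLoginModule` format. Substitute the values from your own Confluent Cloud cluster.

```json
{
  "stream.kafka.broker.list": "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",
  "security.protocol": "SASL_SSL",
  "sasl.mechanism": "PLAIN",
  "stream.confluent.key": "<CONFLUENT_API_KEY>",
  "stream.confluent.secret": "<CONFLUENT_API_SECRET>",
  "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CONFLUENT_API_KEY>\" password=\"<CONFLUENT_API_SECRET>\";"
}
```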
Property Descriptions
| Property | Required | Description |
|---|---|---|
| `stream.kafka.broker.list` | Yes | The list of Kafka brokers to connect to, typically in the format `host1:port1,host2:port2`. Required to establish a connection to Confluent Kafka. |
| `security.protocol` | Yes | The security protocol used to connect to Kafka (e.g., `SASL_SSL`). Mandatory when the Confluent Kafka cluster uses SASL. |
| `sasl.mechanism` | Yes | The SASL mechanism used for authentication. This value should be `PLAIN`. |
| `stream.confluent.key` | Yes | The Confluent key for authentication. |
| `stream.confluent.secret` | Yes | The Confluent secret for authentication. |
| `sasl.jaas.config` | Yes | The JAAS configuration string for SASL authentication. |
Step 5: Test the Connection and Configure Data Ingestion
After you have configured the connection properties, test the connection to ensure it functions properly.
Once the connection test succeeds, configure the additional data ingestion settings using the following JSON format:
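For example, the ingestion settings for a JSON-encoded topic might look like the following. This is an illustrative sketch; the topic name is a placeholder, and the property values are drawn from the descriptions below.

```json
{
  "stream.kafka.topic.name": "my-topic",
  "stream.kafka.decoder.prop.format": "json",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.consumer.factory.class.name": "ai.startree.pinot.plugin.stream.kafka20.ConfluentKafkaConsumerFactory",
  "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
}
```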
Property Descriptions
| Property | Required | Description |
|---|---|---|
| `stream.kafka.topic.name` | Yes | The name of the Kafka topic from which Pinot will consume data. |
| `stream.kafka.decoder.prop.format` | Yes | The format of the input data. Supported values include `csv`, `json`, `avro`, `parquet`, etc. |
| `stream.kafka.decoder.class.name` | Yes | The class name of the decoder used for Kafka message parsing. Set it based on the message format and schema. Examples: for JSON messages, `org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder`; for Avro messages, `org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder`; for Protobuf messages, `org.apache.pinot.plugin.inputformat.protobuf.KafkaConfluentSchemaRegistryProtoBufMessageDecoder`. See Message Decoders for more information. |
| `stream.kafka.consumer.type` | Yes | The type of Kafka consumer used in Apache Pinot. Use `lowlevel` for granular control of partitions and offsets. |
| `stream.kafka.consumer.factory.class.name` | Yes | The Kafka consumer factory class to use. The default is `ai.startree.pinot.plugin.stream.kafka20.ConfluentKafkaConsumerFactory` for Kafka 2.0+ or Confluent Kafka clients. Use `ai.startree.pinot.plugin.stream.kafka10.ConfluentKafkaConsumerFactory` for legacy Kafka (1.x) environments with Confluent client integration. |
| `stream.kafka.consumer.prop.auto.offset.reset` | Yes | Defines the behavior when no committed offset exists or offsets are invalid. Options: `smallest` (for backfilling or consuming all historical data), `latest` (for real-time streaming), or `none`. |
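For Avro messages registered in Confluent Schema Registry, the decoder settings differ from the JSON example above. The sketch below is an assumption based on common Pinot configurations: the topic name and schema registry endpoint are placeholders, and the `stream.kafka.decoder.prop.schema.registry.rest.url` property should be verified against your environment.

```json
{
  "stream.kafka.topic.name": "my-avro-topic",
  "stream.kafka.decoder.prop.format": "avro",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
  "stream.kafka.decoder.prop.schema.registry.rest.url": "https://psrc-xxxxx.us-east-1.aws.confluent.cloud",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.consumer.factory.class.name": "ai.startree.pinot.plugin.stream.kafka20.ConfluentKafkaConsumerFactory",
  "stream.kafka.consumer.prop.auto.offset.reset": "latest"
}
```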
Step 6: Preview the Data
Click Show Sample Data to preview the source data before finalizing the configuration.
Next Step
Proceed with Data Modeling.