Create a connection to ingest data from fully managed Kafka in Confluent Cloud. The following properties configure the connection and authentication:
Property | Required | Description |
---|---|---|
`stream.kafka.broker.list` | Yes | The list of Kafka brokers to connect to, typically in the format `host1:port1,host2:port2`. Required to establish a connection to Confluent Kafka. |
`security.protocol` | Yes | The security protocol used to connect to Kafka (e.g., `SASL_SSL`). Mandatory when the Confluent Kafka cluster uses SASL. |
`sasl.mechanism` | Yes | The SASL mechanism used for authentication. For Confluent Cloud, set this to `PLAIN`. |
`stream.confluent.key` | Yes | The Confluent API key used for authentication. |
`stream.confluent.secret` | Yes | The Confluent API secret used for authentication. |
`sasl.jaas.config` | Yes | The JAAS configuration string for SASL authentication. |
Property | Required | Description |
---|---|---|
`stream.kafka.topic.name` | Yes | The name of the Kafka topic from which Pinot consumes data. |
`stream.kafka.decoder.prop.format` | Yes | The format of the input data. Supported values include `csv`, `json`, `avro`, and `parquet`. |
`stream.kafka.decoder.class.name` | Yes | The class name of the decoder used to parse Kafka messages. Set it based on the message format and schema. Examples:<br>• JSON messages: `org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder`<br>• Avro messages: `org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder`<br>• Protobuf messages: `org.apache.pinot.plugin.inputformat.protobuf.KafkaConfluentSchemaRegistryProtoBufMessageDecoder`<br>See Message Decoders for more information. |
`stream.kafka.consumer.type` | Yes | The type of Kafka consumer used in Apache Pinot. Use `lowlevel` for granular control of partitions and offsets. |
`stream.kafka.consumer.factory.class.name` | Yes | The Kafka consumer factory class to use. The default, `ai.startree.pinot.plugin.stream.kafka20.ConfluentKafkaConsumerFactory`, targets Kafka 2.0+ and Confluent Kafka clients. Use `ai.startree.pinot.plugin.stream.kafka10.ConfluentKafkaConsumerFactory` for legacy Kafka (1.x) environments with Confluent client integration. |
`stream.kafka.consumer.prop.auto.offset.reset` | Yes | Defines behavior when no committed offset exists or offsets are invalid. Options: `smallest` (backfill or consume all historical data), `latest` (real-time streaming only), or `none`. |
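Putting both sets of properties together, a complete `streamConfigs` block in a real-time table config might look like the following sketch. The topic name `transcript-topic`, the broker endpoint, and the credential placeholders are hypothetical, and JSON-encoded messages are assumed:

```json
"streamConfigs": {
  "streamType": "kafka",
  "stream.kafka.broker.list": "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",
  "security.protocol": "SASL_SSL",
  "sasl.mechanism": "PLAIN",
  "stream.confluent.key": "<CONFLUENT_API_KEY>",
  "stream.confluent.secret": "<CONFLUENT_API_SECRET>",
  "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CONFLUENT_API_KEY>\" password=\"<CONFLUENT_API_SECRET>\";",
  "stream.kafka.topic.name": "transcript-topic",
  "stream.kafka.decoder.prop.format": "json",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.consumer.factory.class.name": "ai.startree.pinot.plugin.stream.kafka20.ConfluentKafkaConsumerFactory",
  "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
}
```

Here `smallest` is chosen so the table consumes all available history on first start; switch to `latest` if only newly arriving messages matter.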