Create a connection to stream real-time events using Apache Kafka.
For a Kafka cluster that does not require authentication or encryption, configure the following connection properties:

Property Name | Required | Description |
---|---|---|
stream.kafka.broker.list | Yes | List of Kafka brokers to connect to, typically in the format host1:port1,host2:port2. |
security.protocol | Yes | The security protocol used to connect to Kafka (for example, PLAINTEXT). |
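As a minimal sketch, an unauthenticated connection could combine these properties as follows; the broker addresses are placeholders:

```json
{
  "stream.kafka.broker.list": "kafka-broker-1:9092,kafka-broker-2:9092",
  "security.protocol": "PLAINTEXT"
}
```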
For a Kafka cluster secured with SASL, configure the following connection properties:

Property Name | Required | Description |
---|---|---|
stream.kafka.broker.list | Yes | List of Kafka brokers to connect to, typically in the format host1:port1,host2:port2. |
security.protocol | Yes | The security protocol used to connect to Kafka (for example, SASL_SSL, PLAINTEXT). This parameter is mandatory when the Kafka cluster uses SASL. |
sasl.mechanism | Yes | The SASL mechanism for authentication (for example, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512). This parameter is required when security.protocol is set to SASL_SSL or SASL_PLAINTEXT. |
stream.kafka.username | Required when security.protocol is set to SASL_SSL or SASL_PLAINTEXT | The user name for authenticating with Kafka. |
stream.kafka.password | Required when security.protocol is set to SASL_SSL or SASL_PLAINTEXT | The password for authenticating with Kafka. |
sasl.jaas.config | Required when security.protocol is set to SASL_SSL or SASL_PLAINTEXT | The JAAS configuration string for SASL authentication. When the PLAIN SASL mechanism is used: org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD"; When the SCRAM SASL mechanism is used: org.apache.kafka.common.security.scram.ScramLoginModule required username="USERNAME" password="PASSWORD"; |
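For example, a SASL_SSL connection using the PLAIN mechanism might be configured as in the sketch below; the broker addresses and credentials are placeholders:

```json
{
  "stream.kafka.broker.list": "kafka-broker-1:9092,kafka-broker-2:9092",
  "security.protocol": "SASL_SSL",
  "sasl.mechanism": "PLAIN",
  "stream.kafka.username": "USERNAME",
  "stream.kafka.password": "PASSWORD",
  "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";"
}
```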
For a Kafka cluster secured with SSL, configure the following connection properties:

Property Name | Required | Description |
---|---|---|
stream.kafka.broker.list | Yes | List of Kafka brokers to connect to, typically in the format host1:port1,host2:port2. |
security.protocol | Yes | The security protocol used to connect to Kafka (for example, SASL_SSL, SSL, PLAINTEXT). This parameter is mandatory when the Kafka cluster uses SSL or SASL. |
stream.kafka.server.certificate | Optional | The server certificate for connecting to a Kafka cluster secured with SSL. Required when security.protocol is set to SSL. |
stream.kafka.ssl.client.certificate | Optional | The client certificate for connecting to a Kafka cluster secured with SSL. Required when security.protocol is set to SSL. |
stream.kafka.ssl.client.key | Optional | The private key for the client certificate. Required when security.protocol is set to SSL and stream.kafka.ssl.client.certificate is also set. |
"ssl.truststore.location"
and "ssl.truststore.password"
, can be added to the JSON configuration as needed.
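As an illustration, an SSL-secured connection might combine these properties as in the following sketch; the certificate contents, truststore path, and password are placeholders:

```json
{
  "stream.kafka.broker.list": "kafka-broker-1:9093",
  "security.protocol": "SSL",
  "stream.kafka.server.certificate": "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----",
  "stream.kafka.ssl.client.certificate": "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----",
  "stream.kafka.ssl.client.key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----",
  "ssl.truststore.location": "/path/to/truststore.jks",
  "ssl.truststore.password": "TRUSTSTORE_PASSWORD"
}
```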
Configure the following properties to define the topic, message format, and consumer behavior:

Property Name | Required | Description |
---|---|---|
stream.kafka.decoder.prop.format | Yes | The format of the incoming Kafka messages. Supported values include csv, json, avro, parquet, etc. |
stream.kafka.decoder.class.name | Yes | The class name of the decoder used to parse Kafka messages, set based on the message format and schema. Examples: * For JSON messages, use org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder * For AVRO messages, use org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder * For PROTO messages, use org.apache.pinot.plugin.inputformat.protobuf.KafkaConfluentSchemaRegistryProtoBufMessageDecoder See Message Decoders for more information. |
stream.kafka.topic.name | Yes | The name of the Kafka topic from which Pinot will consume data. |
stream.kafka.consumer.type | Yes | The type of Kafka consumer used in Apache Pinot. Use lowlevel for granular control of partitions and offsets. |
stream.kafka.consumer.factory.class.name | Yes | The Kafka consumer factory class to use. ai.startree.pinot.plugin.stream.kafka20.ConfluentKafkaConsumerFactory is the default Kafka consumer factory when using Kafka 2.0+ or Confluent Kafka clients. Use ai.startree.pinot.plugin.stream.kafka10.ConfluentKafkaConsumerFactory for legacy Kafka (1.x) environments with Confluent client integration. |
stream.kafka.consumer.prop.auto.offset.reset | Yes | The behavior when no committed offset exists or offsets are invalid (smallest, latest, none). Use smallest for backfilling or consuming all historical data. Use latest for real-time streaming. |
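Putting these properties together, a topic and consumer configuration for JSON messages might look like the following sketch; the topic name is a placeholder:

```json
{
  "stream.kafka.topic.name": "my-events-topic",
  "stream.kafka.decoder.prop.format": "json",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.consumer.factory.class.name": "ai.startree.pinot.plugin.stream.kafka20.ConfluentKafkaConsumerFactory",
  "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
}
```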
Record reader configurations are also available for some message formats:

* AVRO: AvroRecordReaderConfig is supported.
* PROTO: ProtoBufRecordReaderConfig exists in Pinot and the following configuration is possible.
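As a minimal sketch, assuming the Confluent Schema Registry decoders listed in the table above, the decoder can be pointed at a registry through decoder properties; the registry URL is a placeholder, and the exact property keys should be verified against your Pinot version:

```json
{
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.protobuf.KafkaConfluentSchemaRegistryProtoBufMessageDecoder",
  "stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081"
}
```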