Step 1: In the Data Portal, click Tables and then click Create Table.

Step 2: Select Redpanda as the Data Source.

Step 3: Create a New Connection.

Click New Connection. If you want to use an existing connection, select the connection from the list and proceed to Step 5.

Enter a Source Name for the new connection and specify the Broker URL.

Select the Security Protocol to use for authentication from the drop-down list.

Step 4: Configure Connection Parameters

Add the connection parameters according to your authentication method.

Connecting to Redpanda with No Authentication

Use the following connection JSON when connecting to Redpanda without authentication.

   {
      "stream.kafka.broker.list": "<broker_list>",
      "security.protocol": "PLAIN_TEXT"
   }
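
For example, an unauthenticated connection to a two-broker Redpanda cluster might look like the following; the host names are placeholders for your own broker endpoints:

   {
      "stream.kafka.broker.list": "redpanda-0.example.com:9092,redpanda-1.example.com:9092",
      "security.protocol": "PLAINTEXT"
   }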

Property Descriptions

Property Name | Required | Description
stream.kafka.broker.list | Yes | List of Kafka brokers to connect to, typically in the format host1:port1,host2:port2.
security.protocol | Yes | Defines the security protocol used to connect to Kafka (for example, PLAINTEXT).

Connecting to Redpanda with SASL_PLAINTEXT or SASL_SSL Security Protocol

Use the following connection JSON when Redpanda is configured with the SASL_PLAINTEXT or SASL_SSL security protocol.

   {
      "streamType": "kafka",
      "stream.kafka.broker.list": "",
      "security.protocol": "",
      "sasl.mechanism": "",
      "stream.kafka.password": "",
      "stream.kafka.username": "",
      "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required \n username=\"\" \n password=\"\";"
   }

Property Descriptions

Property Name | Required | Description
stream.kafka.broker.list | Yes | List of Kafka brokers to connect to, typically in the format host1:port1,host2:port2.
security.protocol | Yes | The security protocol used to connect to Kafka (for example, SASL_SSL, SASL_PLAINTEXT). This parameter is mandatory when the Kafka cluster uses SSL or SASL.
sasl.mechanism | Yes | The SASL mechanism for authentication (for example, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512). This parameter is required when security.protocol is set to SASL_SSL or SASL_PLAINTEXT.
stream.kafka.username | Required when security.protocol is set to SASL_SSL or SASL_PLAINTEXT. | The user name for authenticating with Kafka.
stream.kafka.password | Required when security.protocol is set to SASL_SSL or SASL_PLAINTEXT. | The password for authenticating with Kafka.
sasl.jaas.config | Required when security.protocol is set to SASL_SSL or SASL_PLAINTEXT. | The JAAS configuration string for SASL authentication.

sasl.jaas.config examples:

When the PLAIN SASL mechanism is used:
"org.apache.kafka.common.security.plain.PlainLoginModule required \n username=\"USERNAME\" \n password=\"PASSWORD\";"

When the SCRAM SASL mechanism is used:
"org.apache.kafka.common.security.scram.ScramLoginModule required \n username=\"USERNAME\" \n password=\"PASSWORD\";"
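
To illustrate, a completed SASL_SSL connection using the SCRAM-SHA-256 mechanism might look like the following; the broker address, user name, and password are placeholder values that must be replaced with your own:

   {
      "streamType": "kafka",
      "stream.kafka.broker.list": "redpanda-0.example.com:9092",
      "security.protocol": "SASL_SSL",
      "sasl.mechanism": "SCRAM-SHA-256",
      "stream.kafka.username": "demo-user",
      "stream.kafka.password": "demo-password",
      "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required \n username=\"demo-user\" \n password=\"demo-password\";"
   }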

Connecting to Redpanda with SSL Security Protocol

To connect with SSL, you will need to obtain the bootstrap server endpoint, the server certificate, the client certificate, and the client key.

Use the following connection JSON when Redpanda is configured with the SSL security protocol enabled:

{
  "stream.kafka.broker.list": "",
  "security.protocol": "SSL",
  "stream.kafka.ssl.server.certificate": "",
  "stream.kafka.ssl.client.certificate": "",
  "stream.kafka.ssl.client.key": ""
}

Property Descriptions

Property | Required | Description
stream.kafka.broker.list | Yes | List of Kafka brokers to connect to, typically in the format host1:port1,host2:port2.
security.protocol | Yes | Defines the security protocol used to connect to Kafka (for example, SASL_SSL, SSL, PLAINTEXT). This parameter is mandatory when the Kafka cluster uses SSL or SASL.
stream.kafka.ssl.server.certificate | Optional | Server certificate for connecting to a Kafka cluster secured with SSL. Required when security.protocol is set to SSL.
stream.kafka.ssl.client.certificate | Optional | Client certificate for connecting to a Kafka cluster secured with SSL. Required when security.protocol is set to SSL.
stream.kafka.ssl.client.key | Optional | Client key for connecting to a Kafka cluster secured with SSL. Required when security.protocol is set to SSL and stream.kafka.ssl.client.certificate is also set.

Additional Configuration

Other SSL-related configurations, such as "ssl.truststore.location" and "ssl.truststore.password", can be added to the JSON configuration as needed.
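
To illustrate, an SSL connection with inline PEM content and the optional truststore settings mentioned above might look like the following; every value shown (broker address, certificate and key contents, truststore path and password) is a placeholder for material from your own environment:

{
  "stream.kafka.broker.list": "redpanda-0.example.com:9093",
  "security.protocol": "SSL",
  "stream.kafka.ssl.server.certificate": "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----",
  "stream.kafka.ssl.client.certificate": "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----",
  "stream.kafka.ssl.client.key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----",
  "ssl.truststore.location": "/path/to/truststore.jks",
  "ssl.truststore.password": "changeit"
}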

Step 5: Test the Connection and Configure Data Ingestion

After you have configured the connection properties, test the connection to ensure it is working.

When the connection is successful, use the following JSON to configure the source’s Kafka topic and data format:

{
  "stream.kafka.decoder.prop.format": "",
  "stream.kafka.decoder.class.name": "",
  "stream.kafka.topic.name": "",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.consumer.factory.class.name": "ai.startree.pinot.plugin.stream.kafka20.ConfluentKafkaConsumerFactory",
  "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
}

Property Descriptions

Property | Required | Description
stream.kafka.decoder.prop.format | Yes | The format of the incoming Kafka messages. Supported values include csv, json, avro, parquet, and so on.
stream.kafka.decoder.class.name | Yes | The class name of the decoder used to parse Kafka messages. It is set based on the message format and schema; see the decoder class examples below and Message Decoders for more information.
stream.kafka.topic.name | Yes | The name of the Kafka topic from which Pinot consumes data.
stream.kafka.consumer.type | Yes | The type of Kafka consumer used in Apache Pinot. Use lowlevel for granular control of partitions and offsets.
stream.kafka.consumer.factory.class.name | Yes | The Kafka consumer factory class to use. ai.startree.pinot.plugin.stream.kafka20.ConfluentKafkaConsumerFactory is the default consumer factory when using Kafka 2.0+ or Confluent Kafka clients. Use ai.startree.pinot.plugin.stream.kafka10.ConfluentKafkaConsumerFactory for legacy Kafka (1.x) environments with Confluent client integration.
stream.kafka.consumer.prop.auto.offset.reset | Yes | The behavior when no committed offset exists or offsets are invalid (smallest, latest, none). Use smallest for backfilling or consuming all historical data. Use latest for real-time streaming.

Decoder class examples:

* For JSON messages, use org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder
* For Avro messages, use org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder
* For Protobuf messages, use org.apache.pinot.plugin.inputformat.protobuf.KafkaConfluentSchemaRegistryProtoBufMessageDecoder
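
For example, an ingestion configuration for a topic carrying JSON messages might look like this; the topic name orders is a placeholder for your own topic:

{
  "stream.kafka.decoder.prop.format": "json",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
  "stream.kafka.topic.name": "orders",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.consumer.factory.class.name": "ai.startree.pinot.plugin.stream.kafka20.ConfluentKafkaConsumerFactory",
  "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
}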

Additional Configuration

The parameters mentioned above are the minimum required to establish the connection. Apache Pinot also provides additional configuration options to fine-tune and control the ingestion process; for a complete list of configuration parameters, refer to the Apache Pinot documentation. Kafka consumer properties can also be used.
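
As a sketch of how such extra settings are layered on, the configuration below extends the Step 5 example with a Pinot segment-flush threshold and a pass-through Kafka consumer property (supplied via the stream.kafka.consumer.prop. prefix, as with auto.offset.reset above). The property names and values are illustrative only and should be checked against the Apache Pinot and Kafka consumer documentation for your versions:

{
  "stream.kafka.decoder.prop.format": "json",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
  "stream.kafka.topic.name": "orders",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.consumer.factory.class.name": "ai.startree.pinot.plugin.stream.kafka20.ConfluentKafkaConsumerFactory",
  "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
  "realtime.segment.flush.threshold.rows": "1000000",
  "stream.kafka.consumer.prop.session.timeout.ms": "45000"
}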

Step 6: Sample Data

Click Show Sample Data to see a preview of the source data.