Connect to WarpStream
Step 1: In the Data Portal, click Tables and then click Create Table.
Step 2: Select Kafka as the Data Source.
Step 3: Create a New Connection.
Click New Connection. If you want to use an existing connection, select the connection from the list and proceed to Step 5.
Enter a Source Name for the new connection and specify the Broker URL.
Select the Security Protocol to be used for authentication from the drop-down list.
Step 4: Configure Connection Parameters
Add the connection parameters according to your authentication method.
Connecting to Kafka with SASL (PLAIN or SCRAM)
Use the following connection JSON when the Kafka cluster uses the SASL_PLAINTEXT or SASL_SSL security protocol:
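For example, an illustrative sketch assembled from the properties described below; the broker addresses and credentials are placeholders, and the PLAIN mechanism is shown (for SCRAM, use the ScramLoginModule line from the table instead):

```json
{
  "stream.kafka.broker.list": "broker-1.example.com:9092,broker-2.example.com:9092",
  "security.protocol": "SASL_SSL",
  "sasl.mechanism": "PLAIN",
  "stream.kafka.username": "USERNAME",
  "stream.kafka.password": "PASSWORD",
  "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";"
}
```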
Property Descriptions
Property Name | Required | Description |
---|---|---|
stream.kafka.broker.list | Yes | List of Kafka brokers to connect to, typically in the format host1:port1,host2:port2. |
security.protocol | Yes | The security protocol used to connect to Kafka (for example, SASL_SSL, PLAINTEXT). This parameter is mandatory when the Kafka cluster uses SASL. |
sasl.mechanism | Yes | The SASL mechanism for authentication (for example, PLAIN, SCRAM-SHA-512). This parameter is required when security.protocol is set to SASL_SSL or SASL_PLAINTEXT. |
stream.kafka.username | Required when security.protocol is set to SASL_SSL or SASL_PLAINTEXT | The user name for authenticating with Kafka. |
stream.kafka.password | Required when security.protocol is set to SASL_SSL or SASL_PLAINTEXT | The password for authenticating with Kafka. |
sasl.jaas.config | Required when security.protocol is set to SASL_SSL or SASL_PLAINTEXT | The JAAS configuration string for SASL authentication. With the PLAIN SASL mechanism: org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD"; With the SCRAM SASL mechanism: org.apache.kafka.common.security.scram.ScramLoginModule required username="USERNAME" password="PASSWORD"; |
Connecting to Kafka with SSL Security Protocol
See prerequisites for SSL for steps to obtain the bootstrap server endpoint, the server certificate, the client certificate, and the client key, and to update an SSL certificate.
Use the following connection JSON when WarpStream uses the SSL security protocol:
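For example, an illustrative sketch; the broker address is a placeholder, and the certificate and key values stand in for the artifacts obtained in the SSL prerequisites:

```json
{
  "stream.kafka.broker.list": "broker-1.example.com:9094",
  "security.protocol": "SSL",
  "stream.kafka.server.certificate": "SERVER_CERTIFICATE",
  "stream.kafka.ssl.client.certificate": "CLIENT_CERTIFICATE",
  "stream.kafka.ssl.client.key": "CLIENT_KEY"
}
```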
Property Descriptions
Property | Required | Description |
---|---|---|
stream.kafka.broker.list | Yes | List of Kafka brokers to connect to, typically in the format host1:port1,host2:port2. |
security.protocol | Yes | The security protocol used to connect to Kafka (for example, SASL_SSL, SSL, PLAINTEXT). This parameter is mandatory when the Kafka cluster uses SSL or SASL. |
stream.kafka.server.certificate | Required when security.protocol is set to SSL | The server certificate for connecting to a Kafka cluster secured with SSL. |
stream.kafka.ssl.client.certificate | Required when security.protocol is set to SSL | The client certificate for connecting to a Kafka cluster secured with SSL. |
stream.kafka.ssl.client.key | Required when security.protocol is set to SSL and stream.kafka.ssl.client.certificate is also set | The client key for connecting to a Kafka cluster secured with SSL. |
Additional Configuration
Other SSL-related configurations, such as "ssl.truststore.location" and "ssl.truststore.password", can be added to the JSON configuration as needed.
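For example, a truststore could be referenced as shown below (an illustrative sketch; the path and password are placeholders):

```json
{
  "security.protocol": "SSL",
  "ssl.truststore.location": "/path/to/kafka.client.truststore.jks",
  "ssl.truststore.password": "TRUSTSTORE_PASSWORD"
}
```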
Step 5: Test the Connection and Configure Data Ingestion
After you have configured the connection properties, test the connection to ensure it is working.
When the connection is successful, use the following JSON to configure the source’s Kafka topic and data format:
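For example, an illustrative sketch for a JSON-encoded topic; the topic name is a placeholder, and the decoder, consumer type, factory class, and offset reset values are drawn from the table below:

```json
{
  "stream.kafka.topic.name": "my-topic",
  "stream.kafka.decoder.prop.format": "json",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.consumer.factory.class.name": "ai.startree.pinot.plugin.stream.kafka20.ConfluentKafkaConsumerFactory",
  "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
}
```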
Property Descriptions
Property | Required | Description |
---|---|---|
stream.kafka.decoder.prop.format | Yes | The format of the incoming Kafka messages. Supported values include csv, json, avro, and parquet. |
stream.kafka.decoder.class.name | Yes | The class name of the decoder used for Kafka message parsing. It is set based on the message format and schema. Examples: for JSON messages, use org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder; for Avro messages, use org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder; for Protobuf messages, use org.apache.pinot.plugin.inputformat.protobuf.KafkaConfluentSchemaRegistryProtoBufMessageDecoder. See Message Decoders for more information. |
stream.kafka.topic.name | Yes | The name of the Kafka topic from which Pinot will consume data. |
stream.kafka.consumer.type | Yes | The type of Kafka consumer used in Apache Pinot. Use lowlevel for granular control of partitions and offsets. |
stream.kafka.consumer.factory.class.name | Yes | The Kafka consumer factory class to use. The specified class ai.startree.pinot.plugin.stream.kafka20.ConfluentKafkaConsumerFactory is the default Kafka consumer factory when using Kafka 2.0+ or Confluent Kafka clients. Use ai.startree.pinot.plugin.stream.kafka10.ConfluentKafkaConsumerFactory for legacy Kafka (1.x) environments with Confluent client integration. |
stream.kafka.consumer.prop.auto.offset.reset | Yes | The behavior when no committed offset exists or offsets are invalid (smallest, latest, none). Use smallest for backfilling or consuming all historical data. Use latest for real-time streaming. |
Additional Configuration
The parameters mentioned above are the minimum-required parameters to establish the connection. Apache Pinot also provides additional configuration options to fine-tune and control the ingestion process. For a complete list of configuration parameters, refer to the Apache Pinot documentation. In addition, Kafka Consumer properties can also be used.
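For example, a standard Kafka consumer property such as max.poll.records could be passed through using the stream.kafka.consumer.prop. prefix (an illustrative sketch; confirm exact property names against the Kafka consumer documentation):

```json
{
  "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
  "stream.kafka.consumer.prop.max.poll.records": "500"
}
```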
Configure Record Reader
Step 6: Sample Data
Click Show Sample Data to see a preview of the source data.
Next Step
Proceed with Data Modeling.