Step 1: In the Data Portal, click Tables and then click Create Table.
Step 2: Select Custom as the Data Source.
Native support for AWS MSK is coming soon.

Step 3: Create a New Connection.
Select “Streaming” as the Data Source Category.

Connect to AWS MSK Using IAM-Based SASL Authentication
Use the following JSON configuration when MSK is set up with IAM-based SASL authentication (AWS_MSK_IAM) and TLS (SASL_SSL); a sample configuration is shown after the property descriptions below.

Property Descriptions
| Property | Required | Description |
|---|---|---|
| authentication.type | Yes | Authentication mode for Kafka/MSK. Set to SASL to enable SASL-based authentication. |
| security.protocol | Yes | Kafka security protocol. Use SASL_SSL to enable TLS + SASL for MSK IAM authentication. |
| sasl.mechanism | Yes | SASL mechanism. For AWS MSK IAM-based authentication, set to AWS_MSK_IAM. |
| sasl.jaas.config | Yes | JAAS configuration string for the IAM login module. Must use org.apache.pinot.shaded.software.amazon.msk.auth.iam.IAMLoginModule and specify awsRoleArn (or an equivalent credentials strategy). |
| sasl.client.callback.handler.class | Yes | Callback handler implementation for MSK IAM authentication. Use org.apache.pinot.shaded.software.amazon.msk.auth.iam.IAMClientCallbackHandler. |
| stream.kafka.broker.list | Yes | Comma-separated list of MSK bootstrap brokers with ports, for example b-1.example.amazonaws.com:9098,b-2.example.amazonaws.com:9098. Should match the Broker URL / bootstrap servers configured in the UI. |
| stream.kafka.topic.name | Yes | Name of the MSK topic to consume from, for example sample_topic. |
| stream.kafka.consumer.type | Yes | Kafka consumer mode. lowlevel uses Pinot’s low-level consumer for fine-grained partition/offset control. |
| stream.kafka.consumer.factory.class.name | Yes | Kafka consumer factory implementation. For Kafka 2.0+ clusters (including MSK), use org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory. |
| stream.kafka.decoder.class.name | Yes | Decoder class used to parse Kafka messages, chosen based on the message format and schema: org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder for JSON, org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder for Avro, and org.apache.pinot.plugin.inputformat.protobuf.KafkaConfluentSchemaRegistryProtoBufMessageDecoder for Protobuf. See Message Decoders for more information. |
| stream.kafka.decoder.prop.format | Yes | Format of the messages in Kafka. Supported values include json, avro, and proto. |
| stream.kafka.consumer.prop.auto.offset.reset | No | Starting offset to use when there is no committed offset. smallest (Kafka 0.8-style) is equivalent to the earliest offset. |
| realtime.segment.flush.threshold.rows | Yes | Row-based flush threshold. 0 disables row-based flushing, making segment size and time the only flush triggers. |
| realtime.segment.flush.threshold.segment.size | Yes | Target segment size before flush (e.g., 200M). Controls memory usage and segment count. |
| realtime.segment.flush.threshold.time | Yes | Maximum time before a segment flush, even if the row or size thresholds aren’t met. Example: 24h. |
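
The snippet below is a minimal sketch of how these properties fit together in a single JSON configuration, assuming a JSON-format topic consumed with the low-level consumer. The awsRoleArn value is an illustrative placeholder, and the broker list and topic name reuse the example values from the table above; replace each value to match your environment.

```json
{
  "authentication.type": "SASL",
  "security.protocol": "SASL_SSL",
  "sasl.mechanism": "AWS_MSK_IAM",
  "sasl.jaas.config": "org.apache.pinot.shaded.software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn=\"arn:aws:iam::123456789012:role/example-msk-role\";",
  "sasl.client.callback.handler.class": "org.apache.pinot.shaded.software.amazon.msk.auth.iam.IAMClientCallbackHandler",
  "stream.kafka.broker.list": "b-1.example.amazonaws.com:9098,b-2.example.amazonaws.com:9098",
  "stream.kafka.topic.name": "sample_topic",
  "stream.kafka.consumer.type": "lowlevel",
  "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
  "stream.kafka.decoder.prop.format": "json",
  "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
  "realtime.segment.flush.threshold.rows": "0",
  "realtime.segment.flush.threshold.segment.size": "200M",
  "realtime.segment.flush.threshold.time": "24h"
}
```

If your topic uses Avro or Protobuf instead of JSON, change stream.kafka.decoder.class.name and stream.kafka.decoder.prop.format together, as described in the table above.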
Step 6: Preview the Sample Data
Click Show Sample Data to see a preview of the source data.

Next Step
Proceed with Data Modeling.

