> ## Documentation Index
> Fetch the complete documentation index at: https://docs.startree.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Amazon DynamoDB Message Decoder

DynamoDB provides Change Data Capture (CDC) capabilities through DynamoDB Streams, which capture data modifications in DynamoDB tables. The generated CDC data is written to streaming systems like Kafka and made available in real-time for downstream applications.

Native support for the DynamoDB data format in Pinot allows users to consume CDC data in real-time from DynamoDB tables without complex transformations. As long as the data is available in any of Pinot's supported streaming connectors, it can be ingested into a Pinot table.

## DynamoDB Message Decoder Configurations

To configure a Pinot table to use a DynamoDB formatted streaming source, Pinot provides a decoder - `ai.startree.pinot.plugin.inputformat.dynamodb.DynamoDbMessageDecoder`.

The properties of this decoder are listed below:

| Configuration Key                      | Description                                                                                                                                                                                                   |
| :------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `decoder.class.name`                   | Specifies the primary decoder for DynamoDB messages. Set this to `ai.startree.pinot.plugin.inputformat.dynamodb.DynamoDbMessageDecoder` to enable DynamoDB CDC ingestion.                                     |
| `dynamodb.timeColumnName`              | The column name where the `ApproximateCreationDateTime` from the DynamoDB JSON record should be stored. This timestamp can be used as the table's default time column.                                        |
| `dynamodb.deleteColumnName`            | The column name that will be set to `true` when a `REMOVE` record is received from DynamoDB, and `false` otherwise. This helps track deletion events in your Pinot table.                                     |
| `dynamodb.envelope.decoder.class.name` | Specifies the underlying decoder used to parse the message format. Since DynamoDB messages are in JSON format, this should typically be set to `org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder`. |
| `dynamodb.envelope.decoder.prop.`      | Prefix to be used for any properties associated with the envelope decoder class.                                                                                                                              |

## Configuration Example

When ingesting a DynamoDB formatted payload from a stream, the decoder used for the stream must be `ai.startree.pinot.plugin.inputformat.dynamodb.DynamoDbMessageDecoder`.

The following is an example stream config where the Pinot table is consuming from a JSON-encoded Kafka topic containing DynamoDB CDC payload:

```json theme={null}
{
  "streamConfigs": {
    "streamType": "kafka",
    "stream.kafka.consumer.type": "simple",
    "stream.kafka.topic.name": "dynamodb-cdc-topic",
    "stream.kafka.decoder.class.name": "ai.startree.pinot.plugin.inputformat.dynamodb.DynamoDbMessageDecoder",
    "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
    "stream.kafka.zk.broker.url": "localhost:2181",
    "stream.kafka.broker.list": "localhost:9092",
    "realtime.segment.flush.threshold.time": "1h",
    "realtime.segment.flush.threshold.size": "100",
    "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
    "stream.kafka.decoder.prop.dynamodb.timeColumnName": "created_at_timestamp",
    "stream.kafka.decoder.prop.dynamodb.deleteColumnName": "is_deleted",
    "stream.kafka.decoder.prop.dynamodb.envelope.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder"
  }
}
```

In the above sample, the Kafka consumer factory used is `org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory` and the decoder associated with this stream is `ai.startree.pinot.plugin.inputformat.dynamodb.DynamoDbMessageDecoder`.

The configuration uses several key components:

* The primary decoder `ai.startree.pinot.plugin.inputformat.dynamodb.DynamoDbMessageDecoder` handles the DynamoDB-specific message format
* The `dynamodb.timeColumnName` is populated with the `ApproximateCreationDateTime` from the DynamoDB JSON record
* The `dynamodb.deleteColumnName` is set to `true` when `REMOVE` records are received from DynamoDB
* The `dynamodb.envelope.decoder.class.name` is set to `org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder` since the underlying DynamoDB messages are in JSON format

## Simplified Configuration

The DynamoDB Message Decoder significantly simplifies the configuration needed for ingesting DynamoDB CDC data. Without this decoder, you would need to specify multiple JSON path transformations for each column. Here's a comparison:

### Without DynamoDB Decoder

```json theme={null}
{
  "ingestionConfig": {
    "transformConfigs": [
      {
        "columnName": "rsvp_count",
        "transformFunction": "JSONPATHLONG(dynamodb, '$.NewImage.rsvp_count.N')"
      },
      {
        "columnName": "venue_name",
        "transformFunction": "JSONPATHSTRING(dynamodb, '$.NewImage.venue_name.S')"
      },
      {
        "columnName": "group_city",
        "transformFunction": "JSONPATHSTRING(dynamodb, '$.NewImage.group_city.S')"
      }
      // ... more transform configs for each column
    ]
  }
}
```

### With DynamoDB Decoder

```json theme={null}
{
  "streamConfigs": {
    "stream.kafka.decoder.class.name": "ai.startree.pinot.plugin.inputformat.dynamodb.DynamoDbMessageDecoder",
    "stream.kafka.decoder.prop.dynamodb.timeColumnName": "created_at_timestamp",
    "stream.kafka.decoder.prop.dynamodb.deleteColumnName": "is_deleted",
    "stream.kafka.decoder.prop.dynamodb.envelope.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder"
  }
}
```

The decoder automatically handles the complex DynamoDB JSON structure, including the type indicators (S, N, B) and nested structure, making it much easier to ingest DynamoDB CDC data into Pinot.

<note>
  Properties related to a stream's decoder are prefixed with `stream.$streamType.decoder.prop.`. For the Dynamodb message decoder all properties begin with `stream.kafka.decoder.prop`, as seen above in the `stream.kafka.decoder.prop.dynamodb.source`,
  `stream.kafka.decoder.prop.dynamodb.timeColumn` and `stream.kafka.decoder.prop.dynamodb.envelope.decoder.class.name` properties.
</note>
