To learn how to perform partial upserts on a real-time table, complete the tutorial below.

To get a better understanding of upserts and how they work, see the Full Upserts documentation.

Pinot Version: 1.0.0
Code: startreedata/pinot-recipes/partial-upserts

Prerequisites

To follow the code examples in this guide, you must install Docker locally and download the recipes.

  1. If you haven’t already, download the recipes.
  2. In a terminal, go to the recipe directory by running the following command:
cd pinot-recipes/recipes/upserts-partial

Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

docker-compose up

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Zookeeper, and Kafka. You can find the docker-compose.yml file on GitHub.

Create meetup_rsvp Kafka topic

This recipe explores capturing RSVPs for meetup events. A meetup can be hosted by multiple groups at multiple venues.

Let’s create the meetup_rsvp topic in Kafka to record the RSVPs.

docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
--create --bootstrap-server kafka:9092 --topic meetup_rsvp

Pinot Schema and Table

Now let’s create a Pinot Schema and Table.

First, the schema:

{
    "schemaName": "meetup_rsvp",
    "primaryKeyColumns": [
        "event_id"
    ],
    "dimensionFieldSpecs": [
        {
            "name": "event_id",
            "dataType": "INT"
        },
        {
            "name": "venue_name",
            "dataType": "STRING",
            "singleValueField": false
        },
        {
            "name": "group_name",
            "dataType": "STRING",
            "singleValueField": false
        }
    ],
    "metricFieldSpecs": [
        {
            "name": "rsvp_count",
            "dataType": "INT"
        }
    ],
    "dateTimeFieldSpecs": [
        {
            "name": "mtime",
            "dataType": "LONG",
            "format": "1:MILLISECONDS:EPOCH",
            "granularity": "1:MILLISECONDS"
        }
    ]
}

config/meetup_rsvp_schema.json

Note that the event_id column is designated as the primary key, which is mandatory for upserts in Pinot.

"primaryKeyColumns": [
    "event_id"
]
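Conceptually, the primary key is what lets Pinot keep a single live row per event_id. The Python sketch below models that idea with an in-memory dictionary, using full-upsert behavior (the newer row replaces the stored one) for contrast with the partial mode configured later. The function name upsert_full is hypothetical, and the sketch assumes mtime acts as the comparison column (when no comparison column is configured, Pinot uses the table's time column). It is not how Pinot stores its upsert metadata.

```python
from typing import Any, Dict

Row = Dict[str, Any]


def upsert_full(index: Dict[int, Row], row: Row) -> None:
    """Keep one live row per primary key (event_id).

    In this sketch the incoming row replaces the stored one when its mtime
    is greater than or equal to the stored mtime; older rows are ignored.
    """
    key = row["event_id"]
    current = index.get(key)
    if current is None or row["mtime"] >= current["mtime"]:
        index[key] = row


if __name__ == "__main__":
    index: Dict[int, Row] = {}
    upsert_full(index, {"event_id": 1, "venue_name": "Myanmar", "mtime": 1632930567})
    upsert_full(index, {"event_id": 1, "venue_name": "Hungary", "mtime": 1643574332})
    upsert_full(index, {"event_id": 2, "venue_name": "France", "mtime": 1616646138})
    # Two primary keys, so two live rows; event 1 keeps only its newest venue.
    print(len(index), index[1]["venue_name"])  # 2 Hungary
```

With full upserts, the earlier Myanmar row for event 1 is simply replaced. Partial upserts instead merge selected columns, as configured below.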

We’ll also have the following table config:

{
    "tableName": "meetup_rsvp",
    "tableType": "REALTIME",
    "segmentsConfig": {
        "timeColumnName": "mtime",
        "timeType": "MILLISECONDS",
        "retentionTimeUnit": "DAYS",
        "retentionTimeValue": "1",
        "segmentPushType": "APPEND",
        "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
        "schemaName": "meetup_rsvp",
        "replicasPerPartition": "1"
    },
    "tenants": {},
    "tableIndexConfig": {
        "loadMode": "MMAP",
        "streamConfigs": {
            "streamType": "kafka",
            "stream.kafka.consumer.type": "lowLevel",
            "stream.kafka.topic.name": "meetup_rsvp",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
            "stream.kafka.hlc.zk.connect.string": "zookeeper:2181/kafka",
            "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
            "stream.kafka.zk.broker.url": "zookeeper:2181/kafka",
            "stream.kafka.broker.list": "kafka:9093",
            "realtime.segment.flush.threshold.size": 30,
            "realtime.segment.flush.threshold.rows": 30
        },
        "nullHandlingEnabled": true
    },
    "metadata": {
        "customConfigs": {}
    },
    "routing": {
        "instanceSelectorType": "strictReplicaGroup"
    },
    "upsertConfig": {
        "mode": "PARTIAL",
        "partialUpsertStrategies": {
            "rsvp_count": "INCREMENT",
            "group_name": "UNION",
            "venue_name": "APPEND"
        }
    }
}

config/meetup_rsvp_table.json
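Before submitting the table config, it can help to check a few upsert prerequisites that this config satisfies: a REALTIME table, strictReplicaGroup routing, the lowLevel Kafka consumer, and, for PARTIAL mode, nullHandlingEnabled. The following Python sketch is a hypothetical helper (the function name upsert_config_problems and the script name are illustrative), not an official Pinot validator, and it covers only those checks.

```python
import json
import sys
from typing import Any, Dict, List


def upsert_config_problems(table: Dict[str, Any]) -> List[str]:
    """Return human-readable problems with a table config's upsert setup."""
    problems: List[str] = []
    if table.get("tableType") != "REALTIME":
        problems.append("upserts require a REALTIME table")
    routing = table.get("routing", {})
    if routing.get("instanceSelectorType") != "strictReplicaGroup":
        problems.append("routing.instanceSelectorType should be strictReplicaGroup")
    index_cfg = table.get("tableIndexConfig", {})
    consumer = index_cfg.get("streamConfigs", {}).get("stream.kafka.consumer.type")
    # Only flag the consumer type when it is set to something other than lowLevel.
    if consumer is not None and consumer.lower() != "lowlevel":
        problems.append("upserts require the lowLevel stream consumer")
    upsert = table.get("upsertConfig", {})
    if upsert.get("mode") == "PARTIAL" and not index_cfg.get("nullHandlingEnabled", False):
        problems.append("PARTIAL mode requires tableIndexConfig.nullHandlingEnabled = true")
    return problems


if __name__ == "__main__" and len(sys.argv) > 1:
    # Hypothetical usage: python3 check_upsert_config.py config/meetup_rsvp_table.json
    with open(sys.argv[1]) as f:
        found = upsert_config_problems(json.load(f))
    print("\n".join(found) if found else "no problems found")
```

Keeping the checks in a plain function makes it easy to run them against any config dictionary, for example in a CI step before calling AddTable.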

In this table configuration, the upsert mode is set to PARTIAL, and merge strategies are defined for three columns: rsvp_count, group_name, and venue_name. Columns without an explicit strategy fall back to the default strategy, OVERWRITE.

"upsertConfig": {
    "mode": "PARTIAL",
    "partialUpsertStrategies": {
        "rsvp_count": "INCREMENT",
        "group_name": "UNION",
        "venue_name": "APPEND"
    }
}

When using the APPEND or UNION strategy, the corresponding column must be able to hold multiple values, so declare it as multi-valued in the schema with the following config:

"singleValueField": false
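Because singleValueField defaults to true in a Pinot field spec, it is easy to forget this setting. The Python sketch below cross-checks a schema against partialUpsertStrategies and reports APPEND or UNION columns that are not multi-valued. The helper name single_value_conflicts is hypothetical; it reads only the schema and config keys shown in this recipe.

```python
from typing import Any, Dict, List

# Partial-upsert strategies that merge lists of values.
MULTI_VALUE_STRATEGIES = {"APPEND", "UNION"}


def single_value_conflicts(schema: Dict[str, Any], upsert_config: Dict[str, Any]) -> List[str]:
    """List columns whose strategy needs a multi-value column but which are
    single-value (singleValueField defaults to true) or missing from the schema."""
    specs: Dict[str, Dict[str, Any]] = {}
    for section in ("dimensionFieldSpecs", "metricFieldSpecs", "dateTimeFieldSpecs"):
        for spec in schema.get(section, []):
            specs[spec["name"]] = spec
    conflicts: List[str] = []
    for column, strategy in upsert_config.get("partialUpsertStrategies", {}).items():
        if strategy in MULTI_VALUE_STRATEGIES:
            spec = specs.get(column)
            if spec is None or spec.get("singleValueField", True):
                conflicts.append(column)
    return conflicts


if __name__ == "__main__":
    schema = {
        "dimensionFieldSpecs": [
            {"name": "venue_name", "dataType": "STRING", "singleValueField": False},
            {"name": "group_name", "dataType": "STRING"},  # singleValueField omitted
        ],
        "metricFieldSpecs": [{"name": "rsvp_count", "dataType": "INT"}],
    }
    upsert = {"partialUpsertStrategies": {"rsvp_count": "INCREMENT",
                                          "group_name": "UNION",
                                          "venue_name": "APPEND"}}
    print(single_value_conflicts(schema, upsert))  # ['group_name']
```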

When an RSVP event arrives with an event_id that already exists in the table, the stored row is merged with the new one: rsvp_count is incremented by the incoming value, the group name is added to group_name if it is not already present, and the venue is appended to venue_name.
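The merge behavior described above can be approximated in Python to build intuition; this is a sketch, not Pinot's implementation. The helper names (normalize, merge, replay) are hypothetical, and the sketch assumes that rows for each event_id arrive in mtime order (true for the sample events used later), that a null incoming value keeps the stored value, and that multi-value results keep insertion order even though Pinot treats them as unordered. mtime is left out of the sample events for brevity.

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]

# Mirrors the recipe's partialUpsertStrategies; other columns use OVERWRITE.
STRATEGIES = {"rsvp_count": "INCREMENT", "group_name": "UNION", "venue_name": "APPEND"}
MULTI_VALUE = {"group_name", "venue_name"}

MERGERS: Dict[str, Callable[[Any, Any], Any]] = {
    "INCREMENT": lambda old, new: old + new,
    # UNION: keep each distinct value once (insertion order in this sketch).
    "UNION": lambda old, new: list(dict.fromkeys(old + new)),
    # APPEND: keep every value, duplicates included.
    "APPEND": lambda old, new: old + new,
}


def normalize(row: Row) -> Row:
    """Wrap scalar values of multi-value columns in a one-element list."""
    return {
        k: ([v] if k in MULTI_VALUE and v is not None and not isinstance(v, list) else v)
        for k, v in row.items()
    }


def merge(old: Row, new: Row) -> Row:
    """Merge an incoming row into the stored row for the same primary key."""
    merged = dict(old)
    for column, value in new.items():
        if value is None:
            continue  # in this sketch, a null incoming value keeps the stored value
        strategy = STRATEGIES.get(column, "OVERWRITE")
        if strategy == "OVERWRITE" or merged.get(column) is None:
            merged[column] = value
        else:
            merged[column] = MERGERS[strategy](merged[column], value)
    return merged


def replay(events: List[Row]) -> Dict[int, Row]:
    """Apply events in arrival order, keyed by event_id."""
    table: Dict[int, Row] = {}
    for raw in events:
        row = normalize(raw)
        key = row["event_id"]
        table[key] = merge(table[key], row) if key in table else row
    return table


if __name__ == "__main__":
    events = [
        {"event_id": 3, "venue_name": "Indonesia", "group_name": "C", "rsvp_count": 1},
        {"event_id": 3, "venue_name": "China", "group_name": "C", "rsvp_count": 1},
        {"event_id": 2, "venue_name": "France", "group_name": "C", "rsvp_count": 1},
        {"event_id": 1, "venue_name": "Myanmar", "group_name": "B", "rsvp_count": 1},
        {"event_id": 1, "venue_name": "Hungary", "group_name": "A", "rsvp_count": 1},
        {"event_id": 1, "venue_name": "China", "group_name": "B", "rsvp_count": 1},
    ]
    for key, row in sorted(replay(events).items()):
        print(key, row["rsvp_count"], row["group_name"], row["venue_name"])
```

Run as a script, the sketch prints one merged row per event_id: `1 3 ['B', 'A'] ['Myanmar', 'Hungary', 'China']`, `2 1 ['C'] ['France']`, and `3 2 ['C'] ['Indonesia', 'China']`. Note how the repeated group B for event 1 is kept once (UNION), while every venue is kept (APPEND).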

You can create the table and schema by running the following command:

docker run \
   --network upserts \
   -v $PWD/config:/config \
   apachepinot/pinot:1.0.0 AddTable \
   -tableConfigFile /config/meetup_rsvp_table.json \
   -controllerHost "pinot-controller" \
   -schemaFile /config/meetup_rsvp_schema.json \
   -exec

Produce some RSVP events

We will simulate a few RSVPs by publishing the following events to the meetup_rsvp Kafka topic.

# printf emits one JSON event per line, with no stray "-" or blank lines
printf '%s\n' \
  '{"event_id":3,"venue_name":"Indonesia","group_name":"C","rsvp_count":1,"mtime":"1635140709"}' \
  '{"event_id":3,"venue_name":"China","group_name":"C","rsvp_count":1,"mtime":"1646067689"}' \
  '{"event_id":2,"venue_name":"France","group_name":"C","rsvp_count":1,"mtime":"1616646138"}' \
  '{"event_id":1,"venue_name":"Myanmar","group_name":"B","rsvp_count":1,"mtime":"1632930567"}' \
  '{"event_id":1,"venue_name":"Hungary","group_name":"A","rsvp_count":1,"mtime":"1643574332"}' \
  '{"event_id":1,"venue_name":"China","group_name":"B","rsvp_count":1,"mtime":"1645779637"}' \
  | kcat -P -b localhost:9092 -t meetup_rsvp
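If you prefer to generate the events from a script rather than quoting JSON in the shell, a minimal Python sketch can emit the same six events as newline-delimited JSON. kcat in producer mode (-P) sends each input line as one message by default. The script name produce_rsvps.py and the helper to_ndjson are hypothetical.

```python
import json
import sys
from typing import Any, Dict, Iterable, List

# The same six RSVP events as in the recipe (mtime kept as strings, as there).
EVENTS: List[Dict[str, Any]] = [
    {"event_id": 3, "venue_name": "Indonesia", "group_name": "C", "rsvp_count": 1, "mtime": "1635140709"},
    {"event_id": 3, "venue_name": "China", "group_name": "C", "rsvp_count": 1, "mtime": "1646067689"},
    {"event_id": 2, "venue_name": "France", "group_name": "C", "rsvp_count": 1, "mtime": "1616646138"},
    {"event_id": 1, "venue_name": "Myanmar", "group_name": "B", "rsvp_count": 1, "mtime": "1632930567"},
    {"event_id": 1, "venue_name": "Hungary", "group_name": "A", "rsvp_count": 1, "mtime": "1643574332"},
    {"event_id": 1, "venue_name": "China", "group_name": "B", "rsvp_count": 1, "mtime": "1645779637"},
]


def to_ndjson(events: Iterable[Dict[str, Any]]) -> str:
    """One compact JSON object per line, so each line becomes one Kafka message."""
    return "".join(json.dumps(e, separators=(",", ":")) + "\n" for e in events)


if __name__ == "__main__":
    sys.stdout.write(to_ndjson(EVENTS))
```

Usage: `python3 produce_rsvps.py | kcat -P -b localhost:9092 -t meetup_rsvp`.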

Querying

Once that’s completed, navigate to localhost:9000/#/query and click on the meetup_rsvp table or copy/paste the following query:

select event_id, sum(rsvp_count) as total_rsvp
from meetup_rsvp 
group by event_id
order by total_rsvp desc
limit 10
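The same query can also be sent to the Pinot broker's REST endpoint, POST /query/sql, which accepts a JSON body of the form {"sql": "..."} and returns results under resultTable. The Python sketch below assumes the broker is reachable at localhost:8099 (Pinot's default broker port); check the recipe's docker-compose.yml for the port that is actually published. The helper names are hypothetical.

```python
import json
import urllib.request
from typing import Any, Dict, List

# Assumption: the broker's default port 8099 is published on localhost.
BROKER_URL = "http://localhost:8099/query/sql"

QUERY = """
select event_id, sum(rsvp_count) as total_rsvp
from meetup_rsvp
group by event_id
order by total_rsvp desc
limit 10
"""


def build_request(sql: str, url: str = BROKER_URL) -> urllib.request.Request:
    """Build a POST request with a JSON body of the form {"sql": ...}."""
    body = json.dumps({"sql": sql}).encode("utf-8")
    return urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )


def rows_as_dicts(response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Pair each row in resultTable.rows with the column names in dataSchema."""
    table = response["resultTable"]
    names = table["dataSchema"]["columnNames"]
    return [dict(zip(names, row)) for row in table["rows"]]


def run_query(sql: str = QUERY) -> List[Dict[str, Any]]:
    """Send the query to the broker and return the rows (needs the cluster running)."""
    with urllib.request.urlopen(build_request(sql), timeout=10) as resp:
        return rows_as_dicts(json.load(resp))
```

With the cluster up, `print(run_query())` returns one dictionary per event_id, keyed by the column names in the query.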