To learn how to perform partial upserts on a real-time table, watch the following video, or complete the tutorial below.
VIDEO
To get a better understanding of upserts and how they work, see the Full Upserts documentation.
Prerequisites
To follow the code examples in this guide, you must install Docker locally and download recipes.
Navigate to recipe
If you haven’t already, download recipes.
In terminal, go to the recipe by running the following command:
cd pinot-recipes/recipes/upserts-partial
Launch Pinot Cluster
You can spin up a Pinot Cluster by running the following command:
This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Zookeeper, and Kafka. You can find the docker-compose.yml file on GitHub.
Create meetup_rsvp
Kafka topic
This recipe explores capturing RSVPs for meetup events. A meetup can be hosted by multiple groups, at multiple venues.
Let’s create the meetup_rsvp
topic in Kafka to record the RSVPs.
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
--create --bootstrap-server kafka:9092 --topic meetup_rsvp
Pinot Schema and Table
Now let’s create a Pinot Schema and Table.
First, the schema:
{
"schemaName" : "meetup_rsvp" ,
"primaryKeyColumns" : [
"event_id"
],
"dimensionFieldSpecs" : [
{
"name" : "event_id" ,
"dataType" : "INT"
},
{
"name" : "venue_name" ,
"dataType" : "STRING" ,
"singleValueField" : false
},
{
"dataType" : "STRING" ,
"name" : "group_name" ,
"singleValueField" : false
}
],
"metricFieldSpecs" : [
{
"name" : "rsvp_count" ,
"dataType" : "INT"
}
],
"dateTimeFieldSpecs" : [
{
"name" : "mtime" ,
"dataType" : "LONG" ,
"format" : "1:MILLISECONDS:EPOCH" ,
"granularity" : "1:MILLISECONDS"
}
]
}
config/meetup_rsvp_schema.json
Note that, the event_id
column is appointed as the primary key, which is mandatory for upserts in Pinot.
"primaryKeyColumns" : [
"event_id"
]
We’ll also have the following table config:
{
"tableName" : "meetup_rsvp" ,
"tableType" : "REALTIME" ,
"segmentsConfig" : {
"timeColumnName" : "mtime" ,
"timeType" : "MILLISECONDS" ,
"retentionTimeUnit" : "DAYS" ,
"retentionTimeValue" : "1" ,
"segmentPushType" : "APPEND" ,
"segmentAssignmentStrategy" : "BalanceNumSegmentAssignmentStrategy" ,
"schemaName" : "meetup_rsvp" ,
"replicasPerPartition" : "1"
},
"tenants" : {},
"tableIndexConfig" : {
"loadMode" : "MMAP" ,
"streamConfigs" : {
"streamType" : "kafka" ,
"stream.kafka.consumer.type" : "lowLevel" ,
"stream.kafka.topic.name" : "meetup_rsvp" ,
"stream.kafka.decoder.class.name" : "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder" ,
"stream.kafka.hlc.zk.connect.string" : "zookeeper:2181/kafka" ,
"stream.kafka.consumer.factory.class.name" : "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory" ,
"stream.kafka.zk.broker.url" : "zookeeper:2181/kafka" ,
"stream.kafka.broker.list" : "kafka:9093" ,
"realtime.segment.flush.threshold.size" : 30 ,
"realtime.segment.flush.threshold.rows" : 30
},
"nullHandlingEnabled" : true
},
"metadata" : {
"customConfigs" : {}
},
"routing" : {
"instanceSelectorType" : "strictReplicaGroup"
},
"upsertConfig" : {
"mode" : "PARTIAL" ,
"partialUpsertStrategies" : {
"rsvp_count" : "INCREMENT" ,
"group_name" : "UNION" ,
"venue_name" : "APPEND"
}
}
}
config/orders_table.json
In this table configuration, we only enable upserts on three columns: rsvp_count
, group_name
, and venue_name
. Hence, the mode
is set to PARTIAL
.
"upsertConfig" : {
"mode" : "PARTIAL" ,
"partialUpsertStrategies" : {
"rsvp_count" : "INCREMENT" ,
"group_name" : "UNION" ,
"venue_name" : "APPEND"
}
}
When using the APPEND
strategy, you must make sure that the corresponding column can accept multiple values, by specifying the following config:
"singleValueField": false
When RSVP events are ingested with unique event_id
values, rsvp_count
will be incremented.
The name of the group will added to the group_name
column, if not exists.
Also, the venue will be appended to the venue_name
.
You can create the table and schema by running the following command:`
docker run \
--network upserts \
-v $PWD /config:/config \
apachepinot/pinot:1.0.0 AddTable \
-tableConfigFile /config/meetup_rsvp_table.json \
-controllerHost "pinot-controller" \
-schemaFile /config/meetup_rsvp_schema.json \
-exec
Produce some RSVP events
We will simulate a few RSVPs by publishing the following events to the meetup_rsvp
Kafka topic.
echo - '
{"event_id":3,"venue_name":"Indonesia","group_name":"C","rsvp_count":1,"mtime":"1635140709"}
{"event_id":3,"venue_name":"China","group_name":"C","rsvp_count":1,"mtime":"1646067689"}
{"event_id":2,"venue_name":"France","group_name":"C","rsvp_count":1,"mtime":"1616646138"}
{"event_id":1,"venue_name":"Myanmar","group_name":"B","rsvp_count":1,"mtime":"1632930567"}
{"event_id":1,"venue_name":"Hungary","group_name":"A","rsvp_count":1,"mtime":"1643574332"}
{"event_id":1,"venue_name":"China","group_name":"B","rsvp_count":1,"mtime":"1645779637"}
' | kcat -P -b localhost:9092 -t meetup_rsvp
Querying
Once that’s completed, navigate to localhost:9000/#/query and click on the meetup_rsvp
table or copy/paste the following query:
select event_id, sum (rsvp_count) as total_rsvp
from meetup_rsvp
group by event_id
order by total_rsvp desc
limit 10