- Upsert is usually used when we are capturing events that contain a state transition. This could be the location of a driver or the status of an order. When querying these events, we want to return the latest state, grouped by a primary key.
- The real-time to offline job is used to move segments from a real-time table to an offline table.
The main thing to keep in mind when combining these features is that upsert functionality only applies to real-time tables. As soon as those segments are moved to an offline table, the upsert logic is no longer applied at query time.
We will need to backfill the offline segments created by the real-time to offline job to achieve upsert-like queries.
Prerequisites
To follow the code examples in this guide, you must install Docker locally and download the recipes.

Navigate to recipe
- If you haven’t already, download recipes.
- In terminal, go to the recipe by running the following command:
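A sketch of those two steps, assuming the recipes live in the startreedata/pinot-recipes repository and that this recipe's directory is named upserts-real-time-offline-job (the exact directory name is an assumption):

```bash
# Clone the recipes repository (assumed location)
git clone https://github.com/startreedata/pinot-recipes.git

# Move into this recipe's directory (assumed name)
cd pinot-recipes/recipes/upserts-real-time-offline-job
```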
Launch Pinot Cluster
You can spin up a Pinot Cluster by running the following command:
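Assuming the recipe ships a Docker Compose file that defines the Pinot components and Kafka, that amounts to:

```bash
# Start the containers defined in the recipe's docker-compose.yml,
# detached so the terminal stays free for the next steps
docker-compose up -d
```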
Pinot Schema and Tables

Now let’s create a Pinot Schema, as well as real-time and offline tables. Pinot is going to take care of populating data into the offline table, but it still expects us to configure that table.

Schema
Our schema is going to capture some order events, and looks like this:
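A sketch of that schema, reconstructed from the columns used throughout this guide — the exact field specs are assumptions, but an upsert table does need primaryKeyColumns:

```json
{
  "schemaName": "orders",
  "primaryKeyColumns": ["order_id"],
  "dimensionFieldSpecs": [
    {"name": "order_id", "dataType": "INT"},
    {"name": "customer_id", "dataType": "INT"},
    {"name": "order_status", "dataType": "STRING"}
  ],
  "metricFieldSpecs": [
    {"name": "amount", "dataType": "FLOAT"}
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
```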
Offline Table

The offline table config is defined below:
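A minimal sketch of that config; the replication and tenant settings are assumptions:

```json
{
  "tableName": "orders",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "schemaName": "orders",
    "timeColumnName": "ts",
    "replication": "1"
  },
  "tenants": {},
  "tableIndexConfig": {},
  "metadata": {}
}
```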
Real-Time Table

And the real-time table is defined below:
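A sketch under the same caveats — the Kafka connection details and task periods are assumptions, but the pieces that matter for this guide are the upsertConfig, the RealtimeToOfflineSegmentsTask entry, and the flush threshold discussed below:

```json
{
  "tableName": "orders",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "schemaName": "orders",
    "timeColumnName": "ts",
    "replication": "1"
  },
  "tenants": {},
  "routing": {"instanceSelectorType": "strictReplicaGroup"},
  "tableIndexConfig": {
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "events",
      "stream.kafka.broker.list": "kafka:9092",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "realtime.segment.flush.threshold.rows": "10000"
    }
  },
  "task": {
    "taskTypeConfigsMap": {
      "RealtimeToOfflineSegmentsTask": {
        "bucketTimePeriod": "6h",
        "bufferTimePeriod": "1m"
      }
    }
  },
  "upsertConfig": {"mode": "FULL"},
  "metadata": {}
}
```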
The `realtime.segment.flush.threshold.rows` config is intentionally set to an extremely small value so that the segment will be committed after 10,000 records have been ingested.
In a production system this value should be set much higher, as described in the configuring segment threshold guide.

Pinot’s table config validation doesn’t normally allow upserts to be combined with the RealtimeToOfflineSegmentsTask, so we need to pass `validationTypesToSkip=All` when calling this API endpoint to create the table.
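For example, assuming the controller is listening on localhost:9000 and the table config is saved in a file named config/orders_table.json (the file name is an assumption):

```bash
# Create the real-time table, skipping the validation that would
# otherwise block combining upserts with the RT2OFF task
curl -X POST "http://localhost:9000/tables?validationTypesToSkip=All" \
  -H "Content-Type: application/json" \
  -d @config/orders_table.json
```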
Ingesting Data
Let’s ingest data into the `events` Kafka topic by running the following:
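One way to do that, assuming the order events are stored as newline-delimited JSON in a file such as data/events.json and Kafka is exposed on localhost:9092 (both assumptions):

```bash
# Produce each JSON document as a message on the events topic
kcat -P -b localhost:9092 -t events < data/events.json
```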
We can then query the `orders` table:
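A query along these lines produces the output below; because upserts are enabled, only the latest event for each order_id is returned:

```sql
select amount, customer_id, order_id, order_status, ts
from orders
limit 10;
```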
amount | customer_id | order_id | order_status | ts |
---|---|---|---|---|
29.35 | 104 | 1 | COMPLETED | 1632463391000 |
3.24 | 105 | 2 | COMPLETED | 1632467065000 |
9.77 | 103 | 3 | OPEN | 1632467066000 |
55.52 | 104 | 4 | CANCELLED | 1632467070000 |
12.22 | 105 | 5 | COMPLETED | 1632677270000 |
13.94 | 106 | 6 | OPEN | 1632677270400 |
20.32 | 107 | 7 | OPEN | 1632677270403 |
45.11 | 108 | 8 | OPEN | 1632677270508 |
129.22 | 109 | 9 | OPEN | 1632677270699 |
Scheduling the RT2OFF Job
The Real-Time to Offline Job can be scheduled automatically through the real-time table config or manually via the REST API. We can trigger it manually by running the following command:
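A sketch of that trigger call, assuming the controller is at localhost:9000:

```bash
# Manually schedule the RealtimeToOfflineSegmentsTask for the orders table
curl -X POST \
  "http://localhost:9000/tasks/schedule?taskType=RealtimeToOfflineSegmentsTask&tableName=orders_REALTIME"
```

Once the job has completed, we can see which records were copied into the offline table with a query like this (querying the OFFLINE suffix directly is an assumption about how the recipe phrased it):

```sql
select amount, customer_id, order_id, order_status, ts
from orders_OFFLINE
limit 10;
```

amount | customer_id | order_id | order_status | ts |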
---|---|---|---|---|
29.35 | 104 | 1 | OPEN | 1632463351000 |
29.35 | 104 | 1 | IN_TRANSIT | 1632463361000 |
29.35 | 104 | 1 | COMPLETED | 1632463391000 |
3.24 | 105 | 2 | COMPLETED | 1632467065000 |
9.77 | 103 | 3 | OPEN | 1632467066000 |
55.52 | 104 | 4 | OPEN | 1632467068000 |
55.52 | 104 | 4 | CANCELLED | 1632467070000 |
Notice that we have duplicate records for `order_id=1` and `order_id=4`, which we’ll need to sort out.
The time boundary that indicates where records should be read from is `1632463470000`.
This means that records with a timestamp less than or equal to this value will come from the offline table and records with a timestamp greater than this value will come from the real-time table.
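If you want to inspect the boundary yourself, newer Pinot versions expose it through a debug endpoint on the broker; whether this endpoint is available depends on your Pinot version, and the broker port 8099 is an assumption:

```bash
# Ask the broker for the current time boundary of the hybrid table
curl "http://localhost:8099/debug/timeBoundary/orders"
```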
You can see which records will be returned from our newly created offline segment by running the following query:
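For example — applying the time boundary by hand against the OFFLINE table is an assumption about the recipe's exact query:

```sql
select amount, customer_id, order_id, order_status, ts
from orders_OFFLINE
where ts <= 1632463470000
limit 10;
```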
amount | customer_id | order_id | order_status | ts |
---|---|---|---|---|
29.35 | 104 | 1 | OPEN | 1632463351000 |
29.35 | 104 | 1 | IN_TRANSIT | 1632463361000 |
29.35 | 104 | 1 | COMPLETED | 1632463391000 |
Although we have duplicates for `order_id=1` and `order_id=4`, we’ll only see an impact on queries against the `orders` table for `order_id=1` until the time boundary increases.
You can see which records are returned from the `orders` table by running the following query:
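Judging by the rows below, the query filtered on customer 104, the customer behind both affected orders; a sketch:

```sql
select amount, customer_id, order_id, order_status, ts
from orders
where customer_id = 104
limit 10;
```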
amount | customer_id | order_id | order_status | ts |
---|---|---|---|---|
29.35 | 104 | 1 | OPEN | 1632463351000 |
29.35 | 104 | 1 | IN_TRANSIT | 1632463361000 |
29.35 | 104 | 1 | COMPLETED | 1632463391000 |
55.52 | 104 | 4 | CANCELLED | 1632467070000 |
Replacing the offline segment
Let’s now backfill the records in the offline segment with the documents in `data/orders.json`, which contain only the most recent event for each order. Running the following ingestion job will replace the segment `orders_1632463351000_1632467070000_0`:
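A sketch of such a backfill, assuming the recipe provides a batch ingestion job spec (the file name config/job-spec.yml is an assumption). For the replacement to take effect, the job must produce a segment with the same name as the one it replaces, which the job spec would pin via its segment name generator settings:

```bash
# Rebuild the offline segment from data/orders.json and push it
# to the controller (job spec file name is an assumption)
docker run --rm -v "$PWD:/workspace" apachepinot/pinot:1.0.0 \
  LaunchDataIngestionJob -jobSpecFile /workspace/config/job-spec.yml
```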
Once the segment has been replaced, we can query the `orders` table:
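Re-running the customer 104 query from earlier:

```sql
select amount, customer_id, order_id, order_status, ts
from orders
where customer_id = 104
limit 10;
```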
amount | customer_id | order_id | order_status | ts |
---|---|---|---|---|
29.35 | 104 | 1 | COMPLETED | 1632463391000 |
55.52 | 104 | 4 | CANCELLED | 1632467070000 |
We can also check that the duplicate for `order_id=4` has been fixed by querying the offline table:
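For example:

```sql
select amount, customer_id, order_id, order_status, ts
from orders_OFFLINE
limit 10;
```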
amount | customer_id | order_id | order_status | ts |
---|---|---|---|---|
29.35 | 104 | 1 | COMPLETED | 1632463391000 |
3.24 | 105 | 2 | COMPLETED | 1632467065000 |
9.77 | 103 | 3 | OPEN | 1632467066000 |
55.52 | 104 | 4 | CANCELLED | 1632467070000 |