# Upserts and the real-time to offline job

In this guide, we'll learn how to combine Pinot's upsert feature with the real-time to offline job.

Let's quickly recap these features:

* Upsert is usually used when we are capturing events that contain a state transition.
  This could be the location of a driver or the status of an order.
  When querying these events, we want to return the latest state, grouped by a primary key.
* The real-time to offline job is used to move segments from a real-time table to an offline table.
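
To make "latest state, grouped by a primary key" concrete, here is a minimal Python sketch of the idea, not Pinot's implementation. The `apply_upsert` helper is hypothetical, and the sample rows are a subset of the order events ingested later in this guide.

```python
def apply_upsert(events, key="order_id", time_col="ts"):
    """Keep only the newest event per primary key, comparing on the time column."""
    latest = {}
    for event in events:
        current = latest.get(event[key])
        # An event replaces the stored one if its timestamp is at least as new.
        if current is None or int(event[time_col]) >= int(current[time_col]):
            latest[event[key]] = event
    return list(latest.values())


events = [
    {"order_id": 1, "order_status": "OPEN", "ts": "1632463351000"},
    {"order_id": 1, "order_status": "IN_TRANSIT", "ts": "1632463361000"},
    {"order_id": 1, "order_status": "COMPLETED", "ts": "1632463391000"},
    {"order_id": 4, "order_status": "OPEN", "ts": "1632467068000"},
    {"order_id": 4, "order_status": "CANCELLED", "ts": "1632467070000"},
]

for row in apply_upsert(events):
    print(row["order_id"], row["order_status"])
```

Five events collapse to one row per `order_id`, which is the view an upsert-enabled real-time table is expected to serve.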

<Warning>
  The main thing to keep in mind when combining these features is that upsert functionality only applies to real-time tables.

  As soon as those segments are moved to an offline table, the upsert logic is no longer applied at query time.
  We will need to backfill the offline segments created by the real-time to offline job to achieve upsert-like queries.
</Warning>

## Prerequisites

To follow the code examples in this guide, you must [install Docker](https://docs.docker.com/get-docker/) locally and download the [pinot-recipes](https://github.com/startreedata/pinot-recipes) repository.

## Navigate to recipe

1. If you haven't already, download the recipes.
2. In a terminal, go to the recipe directory by running the following command:

```bash theme={null}
cd pinot-recipes/recipes/upserts-real-time-offline-job
```

## Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

```bash theme={null}
docker compose up
```

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Pinot Minion, Zookeeper, and Kafka. You can find the [docker-compose.yml](https://github.com/startreedata/pinot-recipes/blob/main/recipes/upserts-real-time-offline-job/docker-compose.yml) file on GitHub.

## Pinot Schema and Tables

Now let's create a Pinot Schema, as well as real-time and offline tables. Pinot is going to take care of populating data into the offline table, but it still expects us to configure the table.

### Schema

Our schema is going to capture some order events, and looks like this:

```json theme={null}
{
  "schemaName": "orders",
  "primaryKeyColumns": [
    "order_id"
  ],
  "dimensionFieldSpecs": [{
      "name": "order_id",
      "dataType": "INT"
    },
    {
      "name": "customer_id",
      "dataType": "INT"
    },
    {
      "name": "order_status",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [{
    "name": "amount",
    "dataType": "FLOAT"
  }],
  "dateTimeFieldSpecs": [{
    "name": "ts",
    "dataType": "LONG",
    "format": "1:MILLISECONDS:EPOCH",
    "granularity": "1:MILLISECONDS"
  }]
}
```

*config/orders\_schema.json*

### Offline Table

The offline table config is defined below:

```json theme={null}
{
  "tableName": "orders",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "orders",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "HOURLY"
    }
  },
  "tableIndexConfig": {
    "loadMode": "MMAP"
  },
  "tenants": {},
  "metadata": {}
}
```

*config/orders\_offline\_table.json*

You can create the table and schema by running the following command:

```bash theme={null}
docker run \
  --network upserts \
  -v "$PWD/config:/config" \
  apachepinot/pinot:1.0.0 AddTable \
    -schemaFile /config/orders_schema.json \
    -tableConfigFile /config/orders_offline_table.json \
    -controllerHost "pinot-controller" \
    -exec
```

### Real-Time Table

The real-time table config is defined below:

```json theme={null}
{
  "tableName": "orders",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "timeType": "MILLISECONDS",
    "segmentPushType": "APPEND",
    "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
    "schemaName": "orders",
    "replicasPerPartition": "1"
  },
  "task": {
    "taskTypeConfigsMap": {
      "RealtimeToOfflineSegmentsTask": {
        "bucketTimePeriod": "2h",
        "bufferTimePeriod": "1m"
      }
    }
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "lowLevel",
      "stream.kafka.topic.name": "orders",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.hlc.zk.connect.string": "zookeeper:2181/kafka",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.zk.broker.url": "zookeeper:2181/kafka",
      "stream.kafka.broker.list": "kafka:9093",
      "realtime.segment.flush.threshold.rows": 5
    }
  },
  "metadata": {
    "customConfigs": {}
  },
  "routing": {
    "instanceSelectorType": "strictReplicaGroup"
  },
  "upsertConfig": {
    "mode": "FULL"
  }
}
```

*config/orders\_table.json*

<Warning>
  The `realtime.segment.flush.threshold.rows` config is intentionally set to an extremely small value so that a segment will be committed after only 5 records have been ingested.
  In a production system this value should be set much higher, as described in the [configuring segment threshold](/recipes/configuring-segment-threshold) guide.
</Warning>

Create the table by running the following command:

```bash theme={null}
wget -q --method=POST "http://localhost:9000/tables?validationTypesToSkip=ALL" \
     --header="accept: application/json" \
     --header="Content-Type: application/json" \
     --body-file=config/orders_table.json \
     -O - 
```

You **must** pass in `validationTypesToSkip=ALL` when calling this API endpoint, as shown in the command above.

## Ingesting Data

Let's ingest data into the `orders` Kafka topic by running the following:

```bash theme={null}
echo -e '
{"order_id":1,"customer_id":104,"order_status":"OPEN","amount":29.35,"ts":"1632463351000"}
{"order_id":1,"customer_id":104,"order_status":"IN_TRANSIT","amount":29.35,"ts":"1632463361000"}
{"order_id":1,"customer_id":104,"order_status":"COMPLETED","amount":29.35,"ts":"1632463391000"}
{"order_id":2,"customer_id":105,"order_status":"COMPLETED","amount":3.24,"ts":"1632467065000"}
{"order_id":3,"customer_id":103,"order_status":"OPEN","amount":9.77,"ts":"1632467066000"}
{"order_id":4,"customer_id":104,"order_status":"OPEN","amount":55.52,"ts":"1632467068000"}
{"order_id":4,"customer_id":104,"order_status":"CANCELLED","amount":55.52,"ts":"1632467070000"}
{"order_id":5,"customer_id":105,"order_status":"OPEN","amount":12.22,"ts":"1632667070000"}
{"order_id":5,"customer_id":105,"order_status":"IN_TRANSIT","amount":12.22,"ts":"1632667170000"}
{"order_id":5,"customer_id":105,"order_status":"COMPLETED","amount":12.22,"ts":"1632677270000"}
{"order_id":6,"customer_id":106,"order_status":"OPEN","amount":13.94,"ts":"1632677270400"}
{"order_id":7,"customer_id":107,"order_status":"OPEN","amount":20.32,"ts":"1632677270403"}
{"order_id":8,"customer_id":108,"order_status":"OPEN","amount":45.11,"ts":"1632677270508"}
{"order_id":9,"customer_id":109,"order_status":"OPEN","amount":129.22,"ts":"1632677270699"}
' | kcat -P -b localhost:9092 -t orders
```

Data will make its way into the real-time table.
Let's query the `orders` table:

```sql theme={null}
select * 
from orders 
order by order_id 
limit 10
```

| amount | customer\_id | order\_id | order\_status | ts            |
| ------ | ------------ | --------- | ------------- | ------------- |
| 29.35  | 104          | 1         | COMPLETED     | 1632463391000 |
| 3.24   | 105          | 2         | COMPLETED     | 1632467065000 |
| 9.77   | 103          | 3         | OPEN          | 1632467066000 |
| 55.52  | 104          | 4         | CANCELLED     | 1632467070000 |
| 12.22  | 105          | 5         | COMPLETED     | 1632677270000 |
| 13.94  | 106          | 6         | OPEN          | 1632677270400 |
| 20.32  | 107          | 7         | OPEN          | 1632677270403 |
| 45.11  | 108          | 8         | OPEN          | 1632677270508 |
| 129.22 | 109          | 9         | OPEN          | 1632677270699 |

*Query Results*

As expected, we have the latest event for each order.

## Scheduling the Real-Time to Offline Job

The Real-Time to Offline Job can be scheduled [automatically through the real-time table config](/recipes/real-time-offline-job-automatic-scheduling) or manually via the REST API. We can trigger it manually by running the following command:

```bash theme={null}
tableName="orders_REALTIME"
wget -q --method=POST \
  "http://localhost:9000/tasks/schedule?taskType=RealtimeToOfflineSegmentsTask&tableName=${tableName}" \
  --header="accept: application/json" -O -
```

You can see which records are available in the offline segment by running the following query:

```sql theme={null}
select * 
from orders_OFFLINE
```

| amount | customer\_id | order\_id | order\_status | ts            |
| ------ | ------------ | --------- | ------------- | ------------- |
| 29.35  | 104          | 1         | OPEN          | 1632463351000 |
| 29.35  | 104          | 1         | IN\_TRANSIT   | 1632463361000 |
| 29.35  | 104          | 1         | COMPLETED     | 1632463391000 |
| 3.24   | 105          | 2         | COMPLETED     | 1632467065000 |
| 9.77   | 103          | 3         | OPEN          | 1632467066000 |
| 55.52  | 104          | 4         | OPEN          | 1632467068000 |
| 55.52  | 104          | 4         | CANCELLED     | 1632467070000 |

*Query Results*

We have duplicate records for `order_id=1` and `order_id=4`, which we'll need to sort out.
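
The seven rows in the offline table are consistent with the job processing a single `bucketTimePeriod` window (`2h` in the real-time table config). The sketch below is a rough illustration under one assumption: that the window starts at the earliest event timestamp rounded down to a multiple of the bucket size. The real task also takes `bufferTimePeriod` and segment completion into account, which this sketch ignores; `first_window` is a hypothetical helper.

```python
BUCKET_MS = 2 * 60 * 60 * 1000  # "bucketTimePeriod": "2h" from the real-time table config

# Timestamps of the 14 events ingested earlier in this guide.
event_ts = [
    1632463351000, 1632463361000, 1632463391000, 1632467065000,
    1632467066000, 1632467068000, 1632467070000, 1632667070000,
    1632667170000, 1632677270000, 1632677270400, 1632677270403,
    1632677270508, 1632677270699,
]


def first_window(timestamps, bucket_ms):
    """Assumed rule: round the earliest timestamp down to a bucket boundary."""
    start = min(timestamps) // bucket_ms * bucket_ms
    return start, start + bucket_ms


start, end = first_window(event_ts, BUCKET_MS)
# Events whose timestamp falls inside the half-open window [start, end).
in_window = [ts for ts in event_ts if start <= ts < end]
print(start, end, len(in_window))
```

Under that assumption the window covers the events for orders 1 to 4 only, which matches the rows returned from `orders_OFFLINE`.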

The [time boundary](/recipes/time-boundary-hybrid-table) that indicates where records should be read from is `1632463470000`.
This means that records with a timestamp less than or equal to this value will come from the offline table and records with a timestamp greater than this value will come from the real-time table.
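
The boundary value is consistent with taking the largest timestamp in the offline table and subtracting one hour, which appears to be how the boundary is derived when `segmentIngestionFrequency` is `HOURLY`. Treat that rule as an assumption rather than a specification. The `served_by` helper is hypothetical and only restates the routing rule described above.

```python
HOUR_MS = 60 * 60 * 1000

# Largest timestamp among the rows in orders_OFFLINE.
offline_max_ts = 1632467070000

# Assumption: for an HOURLY offline table the boundary trails the newest offline data by one hour.
time_boundary = offline_max_ts - HOUR_MS


def served_by(ts, boundary=time_boundary):
    """Return which side of the hybrid table answers for a record timestamp."""
    return "OFFLINE" if ts <= boundary else "REALTIME"


print(time_boundary)
print(served_by(1632463391000))  # last event for order_id=1
print(served_by(1632467070000))  # last event for order_id=4
```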

You can see which records will be returned from our newly created offline segment by running the following query:

```sql theme={null}
select * 
from orders_OFFLINE
WHERE ts <= 1632463470000
```

| amount | customer\_id | order\_id | order\_status | ts            |
| ------ | ------------ | --------- | ------------- | ------------- |
| 29.35  | 104          | 1         | OPEN          | 1632463351000 |
| 29.35  | 104          | 1         | IN\_TRANSIT   | 1632463361000 |
| 29.35  | 104          | 1         | COMPLETED     | 1632463391000 |

*Query Results*

So although both `order_id=1` and `order_id=4` have duplicate records in the offline table, queries against the `orders` table will only show duplicates for `order_id=1` until the time boundary advances.

You can see which records are returned from the `orders` table by running the following query:

```sql theme={null}
select * 
from orders
where order_id = 1 or order_id = 4
```

| amount | customer\_id | order\_id | order\_status | ts            |
| ------ | ------------ | --------- | ------------- | ------------- |
| 29.35  | 104          | 1         | OPEN          | 1632463351000 |
| 29.35  | 104          | 1         | IN\_TRANSIT   | 1632463361000 |
| 29.35  | 104          | 1         | COMPLETED     | 1632463391000 |
| 55.52  | 104          | 4         | CANCELLED     | 1632467070000 |

*Query Results*
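
The result above can be reproduced by hand. The following sketch assumes the real-time table still serves its upserted view of all nine orders, and that a hybrid query is the union of offline rows at or before the time boundary and real-time rows after it. `hybrid_query` is a hypothetical helper, and rows are abbreviated to `(order_id, order_status, ts)`.

```python
TIME_BOUNDARY = 1632463470000

# orders_OFFLINE: no upsert, so every event is kept.
offline_rows = [
    (1, "OPEN", 1632463351000),
    (1, "IN_TRANSIT", 1632463361000),
    (1, "COMPLETED", 1632463391000),
    (2, "COMPLETED", 1632467065000),
    (3, "OPEN", 1632467066000),
    (4, "OPEN", 1632467068000),
    (4, "CANCELLED", 1632467070000),
]

# orders_REALTIME: upsert leaves one row per order_id.
realtime_rows = [
    (1, "COMPLETED", 1632463391000),
    (2, "COMPLETED", 1632467065000),
    (3, "OPEN", 1632467066000),
    (4, "CANCELLED", 1632467070000),
    (5, "COMPLETED", 1632677270000),
    (6, "OPEN", 1632677270400),
    (7, "OPEN", 1632677270403),
    (8, "OPEN", 1632677270508),
    (9, "OPEN", 1632677270699),
]


def hybrid_query(order_ids):
    """Union of offline rows up to the boundary and real-time rows after it."""
    offline = [r for r in offline_rows if r[2] <= TIME_BOUNDARY]
    realtime = [r for r in realtime_rows if r[2] > TIME_BOUNDARY]
    return [r for r in offline + realtime if r[0] in order_ids]


for row in hybrid_query({1, 4}):
    print(row)
```

All three events for `order_id=1` come from the offline side, while `order_id=4` is answered by the real-time side and therefore shows only its latest state.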

## Replacing offline segment

Let's now backfill the offline segment with the documents in `data/orders.json`, which contains only the most recent event for each order:

```json theme={null}
{"order_id":1,"customer_id":104,"order_status":"COMPLETED","amount":29.35,"ts":"1632463391000"}
{"order_id":2,"customer_id":105,"order_status":"COMPLETED","amount":3.24,"ts":"1632467065000"}
{"order_id":3,"customer_id":103,"order_status":"OPEN","amount":9.77,"ts":"1632467066000"}
{"order_id":4,"customer_id":104,"order_status":"CANCELLED","amount":55.52,"ts":"1632467070000"}
```

*data/orders.json*
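
A file like this could be produced from the raw events rather than written by hand. The sketch below is one possible approach, not part of the recipe: it keeps events up to the end time of the segment being replaced and then keeps the newest event per `order_id`. The `compact` helper and the `SEGMENT_END_TS` constant are hypothetical names.

```python
import json

SEGMENT_END_TS = 1632467070000  # newest timestamp in the offline segment being replaced

raw_events = """\
{"order_id":1,"customer_id":104,"order_status":"OPEN","amount":29.35,"ts":"1632463351000"}
{"order_id":1,"customer_id":104,"order_status":"IN_TRANSIT","amount":29.35,"ts":"1632463361000"}
{"order_id":1,"customer_id":104,"order_status":"COMPLETED","amount":29.35,"ts":"1632463391000"}
{"order_id":2,"customer_id":105,"order_status":"COMPLETED","amount":3.24,"ts":"1632467065000"}
{"order_id":3,"customer_id":103,"order_status":"OPEN","amount":9.77,"ts":"1632467066000"}
{"order_id":4,"customer_id":104,"order_status":"OPEN","amount":55.52,"ts":"1632467068000"}
{"order_id":4,"customer_id":104,"order_status":"CANCELLED","amount":55.52,"ts":"1632467070000"}
{"order_id":5,"customer_id":105,"order_status":"OPEN","amount":12.22,"ts":"1632667070000"}
"""


def compact(lines, max_ts):
    """Return one JSON line per order_id, holding its newest event up to max_ts."""
    events = [json.loads(line) for line in lines if line.strip()]
    in_segment = [e for e in events if int(e["ts"]) <= max_ts]
    # Sorting by ts means the last write per order_id is the newest event.
    by_time = sorted(in_segment, key=lambda e: int(e["ts"]))
    latest = {e["order_id"]: e for e in by_time}
    return [json.dumps(latest[k], separators=(",", ":")) for k in sorted(latest)]


for line in compact(raw_events.splitlines(), SEGMENT_END_TS):
    print(line)
```

Redirecting the output to `data/orders.json` would give the four documents shown above; the event for `order_id=5` is dropped because it falls outside the segment's time range.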

We'll ingest this file using the following ingestion spec:

```yaml theme={null}
executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
segmentNameGeneratorSpec:
  type: fixed
  configs:
    segment.name: ${segmentName}
jobType: SegmentCreationAndTarPush
inputDirURI: '/data'
includeFileNamePattern: 'glob:**/orders.json'
outputDirURI: '/opt/pinot/data/orders/'
overwriteOutput: true
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'json'
  className: 'org.apache.pinot.plugin.inputformat.json.JSONRecordReader'
tableSpec:
  tableName: 'orders'
pinotClusterSpecs:
  - controllerURI: 'http://pinot-controller:9000'
pushJobSpec:
  pushAttempts: 2
  pushRetryIntervalMillis: 1000
```

*config/job-spec.yml*

Now let's run the job to replace the offline segment, which has the name `orders_1632463351000_1632467070000_0`:

```bash theme={null}
docker run \
  --network upserts \
  -v "$PWD/config:/config" \
  -v "$PWD/data:/data" \
  apachepinot/pinot:1.0.0 LaunchDataIngestionJob \
    -jobSpecFile /config/job-spec.yml \
    -values segmentName='orders_1632463351000_1632467070000_0'
```

Once we've done this we can go back and query the `orders` table:

```sql theme={null}
select * 
from orders
where order_id = 1 or order_id = 4
```

| amount | customer\_id | order\_id | order\_status | ts            |
| ------ | ------------ | --------- | ------------- | ------------- |
| 29.35  | 104          | 1         | COMPLETED     | 1632463391000 |
| 55.52  | 104          | 4         | CANCELLED     | 1632467070000 |

*Query Results*

We can also confirm that `order_id=4` has been fixed by querying the offline table:

```sql theme={null}
select * 
from orders_OFFLINE
```

| amount | customer\_id | order\_id | order\_status | ts            |
| ------ | ------------ | --------- | ------------- | ------------- |
| 29.35  | 104          | 1         | COMPLETED     | 1632463391000 |
| 3.24   | 105          | 2         | COMPLETED     | 1632467065000 |
| 9.77   | 103          | 3         | OPEN          | 1632467066000 |
| 55.52  | 104          | 4         | CANCELLED     | 1632467070000 |

*Query Results*
