> ## Documentation Index
> Fetch the complete documentation index at: https://docs.startree.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Merge Real-Time Segments in Real-Time Tables

To improve disk storage and query performance, we recommend merging segments in real-time Pinot tables using the [Minion merge rollup task](https://docs.pinot.apache.org/operators/operating-pinot/minion-merge-rollup-task).

To learn how to merge segments in real-time Pinot tables, watch the following video, or complete the tutorial below.

<iframe width="560" height="315" src="https://www.youtube.com/embed/6Ck1d5iswm4" title="YouTube video player" frameBorder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowFullScreen />

If your use case supports aggregating data, you may also want to rollup segments in real-time tables. For more information, check out the following video.

<iframe width="560" height="315" src="https://www.youtube.com/embed/jvP64G4kQAo" title="YouTube video player" frameBorder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowFullScreen />

| Pinot Version | 1.0.0                                                                                                                                                     |
| ------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Code          | [startreedata/pinot-recipes/merge-small-segments-realtime](https://github.com/startreedata/pinot-recipes/tree/main/recipes/merge-small-segments-realtime) |

<Info>
  You can also merge segments in offline tables. For more information see the [merge small segments in offline tables guide](/recipes/merge-small-segments)
</Info>

## Prerequisites

To follow the code examples in this guide, you must [install Docker](https://docs.docker.com/get-docker/) locally and download recipes.

## Navigate to recipe

1. If you haven't already, download recipes.
2. In terminal, go to the recipe by running the following command:

```bash theme={null}
cd pinot-recipes/recipes/merge-small-segments-realtime
```

## Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

```bash theme={null}
docker-compose up
```

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, and Zookeeper. You can find the [docker-compose.yml](https://github.com/startreedata/pinot-recipes/blob/main/recipes/ingest-json-files/docker-compose.yml) file on GitHub.

## Controller configuration

The Pinot controller is launched with the following custom configuration:

```properties {8-9} theme={null}
controller.access.protocols.http.port=9000
controller.zk.str=zookeeper-mergesegments:2181
controller.helix.cluster.name=PinotCluster
controller.host=pinot-controller-mergesegments
controller.port=9000
controller.data.dir=/data

controller.task.scheduler.enabled=true
controller.task.frequencyPeriod=5m
```

*controller-conf.conf*

This configuration is needed to enabled Pinot's task scheduler, which we'll use to automatically merge segments.

<Note>
  We've configured the task scheduler to run every 5 minutes, but you'd set that to an hour or more in a production system.
</Note>

## Dataset

We've got the following data generator that generates data in bursts depending on the minute of the hour:

```python theme={null}
import datetime as dt
import uuid
import random
import json
import time

while True:
    now = dt.datetime.now()
    ts = int(now.timestamp() * 1000)
    id = str(uuid.uuid4())
    count = random.randint(0, 1000)
    print(
        json.dumps({"ts": ts, "uuid": id, "count": count})
    )

    sleep_time = 0.01 if now.minute % 2 == 0 else 0.00005
    time.sleep(sleep_time)
```

*datagen.py*

If the minute is divisible by 2, we generate around 20,000 messages per second and if it isn't we generate around 100 messages per second.

An example of a message produced by this script is shown below:

```json theme={null}
{
  "ts": 1680260357275,
  "uuid": "0d4c5cc7-cdc7-42d0-924d-27babc18ad94",
  "count": 126
}
```

We can write that data into Kafka by running the following command:

```bash theme={null}
python datagen.py  2>/dev/null |
jq -cr --arg sep ø '[.uuid, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø
```

## Pinot Schema and Table

Now let's create a Pinot Schema and Table.

First, the schema:

```json theme={null}
{
  "schemaName": "events",
  "dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],
  "metricFieldSpecs": [{"name": "count", "dataType": "INT"}],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
```

*config/schema.json*

We'll also have the following table config:

```json {20-21,25-28,36-40} theme={null}
{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "events",
      "stream.kafka.broker.list": "kafka-mergesegments:9093",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "realtime.segment.flush.threshold.rows":"1000000",
      "realtime.segment.flush.threshold.time":"1m"
    }
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    },
    "transformConfigs": []
  },
  "tenants": {},
  "metadata": {},
  
  "task": {
    "taskTypeConfigsMap": {
      "MergeRollupTask": {
        "5m_2m.mergeType": "concat",
        "5m_2m.bucketTimePeriod": "5m",
        "5m_2m.bufferTimePeriod": "2m"
      }
    }
  }
  
}
```

*config/table.json*

There are a few bits of config that we're interested in.

First up is segment threshold, which is defined like so:

```json theme={null}
"realtime.segment.flush.threshold.rows":"1000000",
"realtime.segment.flush.threshold.time":"1m"
```

This means that a segment will be committed once it contains 1m records or every minute, whichever comes first. For more on configuring the segment threshold, see the [segment threshold guide](/recipes/configuring-segment-threshold).

Next is the `batchIngestionConfig`:

```json theme={null}
"batchIngestionConfig": {
  "segmentIngestionType": "APPEND",
  "segmentIngestionFrequency": "DAILY"
}
```

<Warning>
  This shouldn't be strictly necessary because it's a config that's usually used for offline tables, but the merge/roll-up logic in the current version (0.12.1) relies on it being there. The reliance on this config existing has [already been fixed](https://github.com/apache/pinot/commit/dadd42cff16cb8f4b7ec510ad73455a198b60e37) in the main branch and therefore this config won't be required when 0.13.0 is released.
</Warning>

We're also interested in the `MergeRollupTask`, which is extracted below:

```json theme={null}
"task": {
  "taskTypeConfigsMap": {
    "MergeRollupTask": {
      "5m_2m.mergeType": "concat",
      "5m_2m.bucketTimePeriod": "5m",
      "5m_2m.bufferTimePeriod": "2m"
    }
  }
}
```

This configuration will bucket records from the same 5 minute period and will only process records with a timestamp from more than 2 minutes ago.

<Note>
  We are intentionally using very small values for the `bucketTimePeriod` and `bufferTimePeriod` for the purposes of this example. You'll want to use larger values for production systems.
</Note>

You can create the table and schema by running the following command:

```bash theme={null}
docker run \
   --network mergesegments \
   -v $PWD/config:/config \
   apachepinot/pinot:1.0.0 AddTable \
     -schemaFile /config/schema.json \
     -tableConfigFile /config/table.json \
     -controllerHost "pinot-controller-mergesegments" \
    -exec
```

Remove the `-arm64` suffix if you're not using a Mac M1/M2.

## Viewing segments

We can navigate to the [Pinot UI](http://localhost:9000) and run the following query to see the segments that have been created and the number of records that they contain:

```sql theme={null}
select $segmentName, count(*), 
       ToDateTime(min(ts), 'YYYY-MM-dd HH:mm:ss') AS minDate, 
       ToDateTime(max(ts), 'YYYY-MM-dd HH:mm:ss') AS maxDate
from events 
group by $segmentName
order by max(ts) DESC
limit 50
```

We'll see something like the following output:

| segmentName                        | count(\*) | minDate             | maxDate             |
| ---------------------------------- | --------- | ------------------- | ------------------- |
| events\_\_0\_\_6\_\_20230331T1122Z | 15137     | 2023-03-31 11:21:58 | 2023-03-31 11:22:14 |
| events\_\_0\_\_5\_\_20230331T1120Z | 778623    | 2023-03-31 11:20:54 | 2023-03-31 11:21:58 |
| events\_\_0\_\_4\_\_20230331T1119Z | 110868    | 2023-03-31 11:19:51 | 2023-03-31 11:20:54 |
| events\_\_0\_\_3\_\_20230331T1118Z | 691993    | 2023-03-31 11:18:46 | 2023-03-31 11:19:51 |
| events\_\_0\_\_2\_\_20230331T1117Z | 182076    | 2023-03-31 11:17:46 | 2023-03-31 11:18:46 |
| events\_\_0\_\_1\_\_20230331T1116Z | 602990    | 2023-03-31 11:16:40 | 2023-03-31 11:17:46 |
| events\_\_0\_\_0\_\_20230331T1115Z | 118298    | 2023-03-31 11:15:51 | 2023-03-31 11:16:40 |

*Query Results*

## Merge segments

The job to merge segments runs every 5 minutes, so if we wait a little while it will eventually start.

<Info>
  If you want to manually trigger the merge segments job, see the [merge segments section](/recipes/merge-small-segments) of the merge small segments guide.
</Info>

We can check the Pinot Controller logs to see that it's been triggered:

```bash theme={null}
docker exec -it pinot-controller-mergesegments \
  grep -ri --color "\[MergeRollupTaskGenerator" logs/pinot-all.log
```

<div class="text-output">
  **Output**

  ```text theme={null}
  2023/03/31 11:19:15.651 INFO [MergeRollupTaskGenerator] [pool-10-thread-5] Start generating task configs for table: events_REALTIME for task: MergeRollupTask
  2023/03/31 11:19:15.658 INFO [MergeRollupTaskGenerator] [pool-10-thread-5] Creating the gauge metric for tracking the merge/roll-up task delay for table: events_REALTIME and mergeLevel: 5m_2m.(watermarkMs=1680261300000, bufferTimeMs=120000, bucketTimeMs=300000, taskDelayInNumTimeBuckets=0)
  2023/03/31 11:19:15.658 INFO [MergeRollupTaskGenerator] [pool-10-thread-5] Bucket with start: 1680261300000 and end: 1680261600000 (table : events_REALTIME, mergeLevel : 5m_2m) cannot be merged yet
  2023/03/31 11:19:15.663 INFO [MergeRollupTaskGenerator] [pool-10-thread-5] Finished generating task configs for table: events_REALTIME for task: MergeRollupTask, numTasks: 0
  2023/03/31 11:24:15.672 INFO [MergeRollupTaskGenerator] [pool-10-thread-3] Start generating task configs for table: events_REALTIME for task: MergeRollupTask
  2023/03/31 11:24:15.682 INFO [MergeRollupTaskGenerator] [pool-10-thread-3] Update watermark for table: events_REALTIME, mergeLevel: 5m_2m from: 1680261300000 to: 1680261300000
  2023/03/31 11:24:15.696 INFO [MergeRollupTaskGenerator] [pool-10-thread-3] Finished generating task configs for table: events_REALTIME for task: MergeRollupTask, numTasks: 1
  ```
</div>

And we can check the Pinot Minion logs to see if the job has run:

```bash theme={null}
docker exec -it pinot-minion-mergesegments \
  grep -ri --color "INFO \[MergeRollup" logs/pinot-all.log
```

<div class="text-output">
  **Output**

  ```text theme={null}
  2023/03/31 11:24:16.718 INFO [MergeRollupTaskExecutor] [TaskStateModelFactory-task_thread-0] Starting task: MergeRollupTask with configs: {maxNumRecordsPerSegment=null, partitionBucketTimePeriod=5m, segmentNamePrefix=merged_5m_2m_1680261855683_0_events, authToken=null, downloadURL=http://pinot-controller-mergesegments:9000/segments/events/events__0__0__20230331T1115Z,http://pinot-controller-mergesegments:9000/segments/events/events__0__1__20230331T1116Z,http://pinot-controller-mergesegments:9000/segments/events/events__0__2__20230331T1117Z,http://pinot-controller-mergesegments:9000/segments/events/events__0__3__20230331T1118Z,http://pinot-controller-mergesegments:9000/segments/events/events__0__4__20230331T1119Z, mergeType=concat, mergeLevel=5m_2m, 5m_2m.bufferTimePeriod=2m, 5m_2m.bucketTimePeriod=5m, push.mode=TAR, segmentName=events__0__0__20230331T1115Z,events__0__1__20230331T1116Z,events__0__2__20230331T1117Z,events__0__3__20230331T1118Z,events__0__4__20230331T1119Z, tableName=events_REALTIME, uploadURL=http://pinot-controller-mergesegments:9000/segments, enableReplaceSegments=true, push.controllerUri=http://pinot-controller-mergesegments:9000, roundBucketTimePeriod=null, 5m_2m.mergeType=concat, TASK_ID=Task_MergeRollupTask_cd467be0-472d-44a9-99d3-6e6fcdf69d54_1680261855713_0, overwriteOutput=false}
  2023/03/31 11:24:22.497 INFO [MergeRollupTaskExecutor] [TaskStateModelFactory-task_thread-0] Finished task: MergeRollupTask with configs: {maxNumRecordsPerSegment=null, partitionBucketTimePeriod=5m, segmentNamePrefix=merged_5m_2m_1680261855683_0_events, authToken=null, downloadURL=http://pinot-controller-mergesegments:9000/segments/events/events__0__0__20230331T1115Z,http://pinot-controller-mergesegments:9000/segments/events/events__0__1__20230331T1116Z,http://pinot-controller-mergesegments:9000/segments/events/events__0__2__20230331T1117Z,http://pinot-controller-mergesegments:9000/segments/events/events__0__3__20230331T1118Z,http://pinot-controller-mergesegments:9000/segments/events/events__0__4__20230331T1119Z, mergeType=concat, mergeLevel=5m_2m, 5m_2m.bufferTimePeriod=2m, 5m_2m.bucketTimePeriod=5m, push.mode=TAR, segmentName=events__0__0__20230331T1115Z,events__0__1__20230331T1116Z,events__0__2__20230331T1117Z,events__0__3__20230331T1118Z,events__0__4__20230331T1119Z, tableName=events_REALTIME, uploadURL=http://pinot-controller-mergesegments:9000/segments, enableReplaceSegments=true, push.controllerUri=http://pinot-controller-mergesegments:9000, roundBucketTimePeriod=null, 5m_2m.mergeType=concat, TASK_ID=Task_MergeRollupTask_cd467be0-472d-44a9-99d3-6e6fcdf69d54_1680261855713_0, overwriteOutput=false}. Total time: 5779ms
  ```
</div>

Let's now run the segments query again:

| segmentName                                                                 | count(\*) | minDate             | maxDate             |
| --------------------------------------------------------------------------- | --------- | ------------------- | ------------------- |
| events\_\_0\_\_8\_\_20230331T1124Z                                          | 109890    | 2023-03-31 11:23:59 | 2023-03-31 11:25:08 |
| events\_\_0\_\_7\_\_20230331T1123Z                                          | 743572    | 2023-03-31 11:23:04 | 2023-03-31 11:23:59 |
| events\_\_0\_\_6\_\_20230331T1122Z                                          | 72745     | 2023-03-31 11:21:58 | 2023-03-31 11:23:04 |
| events\_\_0\_\_5\_\_20230331T1120Z                                          | 778623    | 2023-03-31 11:20:54 | 2023-03-31 11:21:58 |
| merged\_5m\_2m\_1680261855683\_0  \_events\_1680261600000\_1680261654662\_1 | 4567      | 2023-03-31 11:20:00 | 2023-03-31 11:20:54 |
| merged\_5m\_2m\_1680261855683\_0  \_events\_1680261351086\_1680261599999\_0 | 1701658   | 2023-03-31 11:15:51 | 2023-03-31 11:19:59 |

We can see that the merge job has rolled up segments 0-4 into two larger segments, containing data from 5 minute windows.

If we wait a few more minutes, the next time it runs it will roll up the rest of the data for the 11:20 to 11:25 window, as well as some of the next window.
We can see the output of the next few runs below:

| segmentName                                                                 | count(\*) | minDate             | maxDate             |
| --------------------------------------------------------------------------- | --------- | ------------------- | ------------------- |
| events\_\_0\_\_18\_\_20230331T1134Z                                         | 124044    | 2023-03-31 11:34:34 | 2023-03-31 11:35:09 |
| events\_\_0\_\_17\_\_20230331T1133Z                                         | 343966    | 2023-03-31 11:33:33 | 2023-03-31 11:34:34 |
| events\_\_0\_\_16\_\_20230331T1132Z                                         | 443483    | 2023-03-31 11:32:26 | 2023-03-31 11:33:33 |
| events\_\_0\_\_15\_\_20230331T1131Z                                         | 439137    | 2023-03-31 11:31:26 | 2023-03-31 11:32:26 |
| events\_\_0\_\_14\_\_20230331T1130Z                                         | 355319    | 2023-03-31 11:30:19 | 2023-03-31 11:31:26 |
| merged\_5m\_2m\_1680262455984\_0  \_events\_1680262200000\_1680262219170\_1 | 1591      | 2023-03-31 11:30:00 | 2023-03-31 11:30:19 |
| merged\_5m\_2m\_1680262455984\_0  \_events\_1680261900010\_1680262199999\_0 | 2368619   | 2023-03-31 11:25:00 | 2023-03-31 11:29:59 |
| merged\_5m\_2m\_1680262155835\_0  \_events\_1680261600000\_1680261899994\_0 | 1604525   | 2023-03-31 11:20:00 | 2023-03-31 11:24:59 |
| merged\_5m\_2m\_1680261855683\_0  \_events\_1680261351086\_1680261599999\_0 | 1701658   | 2023-03-31 11:15:51 | 2023-03-31 11:19:59 |
