# How to pause and resume ingestion of a stream

You might pause and resume ingestion of a stream for the following reasons:

* There’s a problem with the underlying stream, and you need to restart the server, reset offsets, or recreate a topic.
* You want to ingest data from different streams into the same table.
* You discover a mistake in the Pinot ingestion configuration: Pinot is throwing exceptions, and you can't ingest any more data.

To learn how to use the `pauseConsumption` and `resumeConsumption` APIs, which pause and resume ingestion of a stream, watch the following video or complete the tutorial below, starting with the Prerequisites.

<iframe width="560" height="315" src="https://www.youtube.com/embed/u9CwDpMZRog" title="YouTube video player" frameBorder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowFullScreen />

For changes to the table configuration, you don't need to pause and resume; use the [`forceCommit` API](/api-reference/table/force-commit-the-current-consuming-segments) instead.

| Pinot Version | 1.0.0                                                                                                                   |
| ------------- | ----------------------------------------------------------------------------------------------------------------------- |
| Code          | [startreedata/pinot-recipes/pause-resume](https://github.com/startreedata/pinot-recipes/tree/main/recipes/pause-resume) |

## Prerequisites

You will need to [install Docker](https://docs.docker.com/get-docker/) to follow the code examples in this guide.

## Navigate to recipe

1. If you haven't already, download the [pinot-recipes](https://github.com/startreedata/pinot-recipes) repository.
2. In a terminal, navigate to the recipe by running the following command:

```bash theme={null}
cd pinot-recipes/recipes/pause-resume
```

## Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

```bash theme={null}
docker-compose up
```

This command runs a single instance each of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and ZooKeeper.
You can find the [docker-compose.yml](https://github.com/startreedata/pinot-recipes/blob/main/recipes/pause-resume/docker-compose.yml) file on GitHub.
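`docker-compose up` runs in the foreground, so run the remaining commands in a second terminal (or start the cluster with `docker-compose up -d`). The controller can also take a little while to come up. The following minimal sketch waits for it by polling a URL with `curl`; it assumes the controller exposes a `/health` endpoint on `localhost:9000` (the address the REST calls later in this guide use), and `wait_for_url` is a hypothetical helper, not part of the recipe.

```bash theme={null}
# Hypothetical helper: poll a URL until it returns a 2xx response.
# Usage: wait_for_url URL [ATTEMPTS] [DELAY_SECONDS]
wait_for_url() {
  local url="$1" attempts="${2:-30}" delay="${3:-2}" i=1
  while [ "$i" -le "$attempts" ]; do
    # --fail: non-zero exit on HTTP 4xx/5xx; connection errors also fail.
    if curl --fail --silent --max-time 5 --output /dev/null "$url"; then
      return 0
    fi
    # Skip the sleep after the final attempt.
    if [ "$i" -lt "$attempts" ]; then
      sleep "$delay"
    fi
    i=$((i + 1))
  done
  echo "gave up waiting for $url" >&2
  return 1
}
```

For example, `wait_for_url http://localhost:9000/health 60 2 && echo "controller is up"` keeps trying for roughly two minutes before giving up.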

## Data generator

This recipe contains a data generator that creates events with a timestamp, count, and UUID.
You can generate data by running the following command:

```bash theme={null}
python datagen.py 2>/dev/null | head -n1 | jq
```

Output is shown below:

```json theme={null}
{
  "tsString": "2022-11-23T12:08:44.127481Z",
  "uuid": "e1c58795-a009-4e21-ae76-cdd66e090797",
  "count": 203
}
```

## Kafka ingestion

We're going to ingest this data into an Apache Kafka topic using the [kcat](https://github.com/edenhill/kcat) command line tool.
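We'll also use `jq` to turn each event into a single line containing the key (the event's `uuid`) and the payload (the whole event as JSON), separated by `ø`; the `-Kø` flag tells `kcat` to use that character as the key delimiter: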

```bash theme={null}
python datagen.py --sleep 0.0001 2>/dev/null |
jq -cr --arg sep ø '[.uuid, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø
```

We can check that Kafka has some data by running the following command:

```bash theme={null}
docker exec -it kafka-pauseresume kafka-run-class.sh \
  kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic events
```

We'll see something like the following:

```text theme={null}
events:0:19960902
```
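`GetOffsetShell` prints one `topic:partition:offset` line per partition, and by default the offset is the latest one, so for this single-partition topic the number roughly tracks how many messages have been produced. If your topic has several partitions, a sketch like the following (a hypothetical helper, not part of the recipe) adds the offsets up:

```bash theme={null}
# Hypothetical helper: sum the offsets in GetOffsetShell output.
# Input lines look like topic:partition:offset, e.g. events:0:19960902.
# %.0f avoids integer-size limits of %d in some awk implementations.
total_offsets() {
  awk -F: 'NF >= 3 { sum += $NF } END { printf "%.0f\n", sum }'
}
```

To use it, pipe the `GetOffsetShell` command into `total_offsets`, dropping `-it` from `docker exec` since the output goes to a pipe rather than a terminal.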

## Pinot Schema and Table

Now let's create a Pinot Schema and Table.

First, the schema:

```json theme={null}
{
  "schemaName": "events",
  "dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],
  "metricFieldSpecs": [{"name": "count", "dataType": "INT"}],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
```

Now for the table config:

```json {14} theme={null}
{
    "tableName": "events",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "timeColumnName": "ts",
      "schemaName": "events",
      "replication": "1",
      "replicasPerPartition": "1"
    },
    "ingestionConfig":{
      "transformConfigs": [
        {
            "columnName": "ts",
            "transformFunction": "FromDateTime(tsString, 'YYYY-MM-dd''T''HH:mm:ss.SS''Z''')"
        }
      ]
    },
    "tableIndexConfig": {
      "loadMode": "MMAP",
      "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.topic.name": "events",
        "stream.kafka.broker.list": "kafka-pauseresume:9093",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
      }
    },
    "tenants": {},
    "metadata": {}
  }
```

*config/table.json*

The highlighted line contains a transformation function with a subtle error.
The second parameter passed to the `FromDateTime` function describes the format of the DateTime string, which we defined as:

```
YYYY-MM-dd''T''HH:mm:ss.SS''Z''
```

But `tsString` has values in the following format:

```
2022-11-23T12:08:44.127550Z
```

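That is, the pattern doesn't have enough `S` characters for the fractional seconds: the value has 6 digits after the decimal point, so there should be 6 `S` characters rather than 2.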
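One way to avoid this kind of mismatch is to derive the number of `S` characters from a sample value. The following is a minimal sketch, assuming the timestamp always has a fractional part and ends in a literal `Z`; `pattern_for_timestamp` and `to_sql_literal_body` are hypothetical helpers. The second function doubles each single quote, which is how the pattern appears inside the quoted string passed to `FromDateTime` in the table config.

```bash theme={null}
# Hypothetical helper: build a Joda-style pattern whose number of S
# characters matches the fractional seconds of a sample timestamp.
pattern_for_timestamp() {
  local ts="$1" frac s_chars="" i=0
  frac="${ts#*.}"     # 2022-11-23T12:08:44.127550Z -> 127550Z
  frac="${frac%Z}"    # 127550Z -> 127550
  while [ "$i" -lt "${#frac}" ]; do
    s_chars="${s_chars}S"
    i=$((i + 1))
  done
  printf "YYYY-MM-dd'T'HH:mm:ss.%s'Z'\n" "$s_chars"
}

# Hypothetical helper: escape single quotes by doubling them, as required
# inside the SQL string literal in the transformFunction.
to_sql_literal_body() {
  printf '%s\n' "$1" | sed "s/'/''/g"
}

pattern_for_timestamp 2022-11-23T12:08:44.127550Z
# prints YYYY-MM-dd'T'HH:mm:ss.SSSSSS'Z'
to_sql_literal_body "$(pattern_for_timestamp 2022-11-23T12:08:44.127550Z)"
# prints YYYY-MM-dd''T''HH:mm:ss.SSSSSS''Z''
```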

We'll create the table by running the following:

```bash theme={null}
docker run \
   --network pauseresume \
   -v $PWD/config:/config \
   apachepinot/pinot:1.0.0 AddTable \
     -schemaFile /config/schema.json \
     -tableConfigFile /config/table.json \
     -controllerHost "pinot-controller-pauseresume" \
     -exec
```

If we navigate to the Pinot UI, we'll notice that no records have been ingested.
Instead, our server logs are full of the following error:

```text theme={null}
2023/06/02 15:00:31.705 ERROR [LLRealtimeSegmentDataManager_events__0__0__20230602T1459Z] [events__0__0__20230602T1459Z] Caught exception while transforming the record: org.apache.pinot.spi.stream.StreamDataDecoderResult@19717934
java.lang.RuntimeException: Caught exception while evaluation transform function for column: ts
	at org.apache.pinot.segment.local.recordtransformer.ExpressionTransformer.transform(ExpressionTransformer.java:126) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.recordtransformer.CompositeTransformer.transform(CompositeTransformer.java:90) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.segment.creator.TransformPipeline.processPlainRow(TransformPipeline.java:97) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.segment.creator.TransformPipeline.processRow(TransformPipeline.java:92) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.processStreamEvents(LLRealtimeSegmentDataManager.java:559) [pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.consumeLoop(LLRealtimeSegmentDataManager.java:434) [pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager$PartitionConsumer.run(LLRealtimeSegmentDataManager.java:629) [pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.RuntimeException: Caught exception while executing function: fromDateTime(tsString,'YYYY-MM-dd'T'HH:mm:ss.SS'Z'')
	at org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator$FunctionExecutionNode.execute(InbuiltFunctionEvaluator.java:231) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator.evaluate(InbuiltFunctionEvaluator.java:106) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.recordtransformer.ExpressionTransformer.transform(ExpressionTransformer.java:123) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	... 7 more
Caused by: java.lang.IllegalStateException: Caught exception while invoking method: public static long org.apache.pinot.common.function.scalar.DateTimeFunctions.fromDateTime(java.lang.String,java.lang.String) with arguments: [2023-06-02T15:59:58.186685Z, YYYY-MM-dd'T'HH:mm:ss.SS'Z']
	at org.apache.pinot.common.function.FunctionInvoker.invoke(FunctionInvoker.java:130) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator$FunctionExecutionNode.execute(InbuiltFunctionEvaluator.java:229) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator.evaluate(InbuiltFunctionEvaluator.java:106) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.recordtransformer.ExpressionTransformer.transform(ExpressionTransformer.java:123) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	... 7 more
Caused by: java.lang.reflect.InvocationTargetException
	at jdk.internal.reflect.GeneratedMethodAccessor18.invoke(Unknown Source) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
	at org.apache.pinot.common.function.FunctionInvoker.invoke(FunctionInvoker.java:127) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator$FunctionExecutionNode.execute(InbuiltFunctionEvaluator.java:229) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator.evaluate(InbuiltFunctionEvaluator.java:106) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.recordtransformer.ExpressionTransformer.transform(ExpressionTransformer.java:123) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	... 7 more
Caused by: java.lang.IllegalArgumentException: Invalid format: "2023-06-02T15:59:58.186685Z" is malformed at "6685Z"
	at org.joda.time.format.DateTimeParserBucket.doParseMillis(DateTimeParserBucket.java:187) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:826) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.common.function.DateTimePatternHandler.parseDateTimeStringToEpochMillis(DateTimePatternHandler.java:38) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.common.function.scalar.DateTimeFunctions.fromDateTime(DateTimeFunctions.java:271) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at jdk.internal.reflect.GeneratedMethodAccessor18.invoke(Unknown Source) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
	at org.apache.pinot.common.function.FunctionInvoker.invoke(FunctionInvoker.java:127) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator$FunctionExecutionNode.execute(InbuiltFunctionEvaluator.java:229) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator.evaluate(InbuiltFunctionEvaluator.java:106) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	at org.apache.pinot.segment.local.recordtransformer.ExpressionTransformer.transform(ExpressionTransformer.java:123) ~[pinot-all-0.12.0-jar-with-dependencies.jar:0.12.0-118f5e065cb258c171d97a586183759fbc61e2bf]
	... 7 more
```
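Rather than scrolling through the logs, you can count how often this error appears. The following sketch reads log output on stdin; `count_transform_errors` is a hypothetical helper, and the server container name in the usage note below is an assumption, so check `docker ps` for the actual name.

```bash theme={null}
# Hypothetical helper: count record-transformation failures in Pinot
# server log output read from stdin. grep -c prints 0 (and exits 1) when
# nothing matches; "|| true" keeps that case from looking like a failure.
count_transform_errors() {
  grep -c 'Caught exception while transforming the record' || true
}
```

For example, `docker logs pinot-server-pauseresume 2>&1 | count_transform_errors`. Running it again after the fix should show the count no longer growing.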

## The Pause/Resume workflow

We're going to run the following workflow to sort this out:

* Pause ingestion for the table
* Fix the transformation function
* Resume ingestion

Pause consumption by running the following command:

```bash theme={null}
curl -X POST \
  "http://localhost:9000/tables/events/pauseConsumption" \
  -H "accept: application/json"
```
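If you pause and resume tables regularly, you might wrap both calls in small functions. This is a sketch, assuming the controller is reachable at `http://localhost:9000` (override it with the hypothetical `PINOT_CONTROLLER` variable); `consumption_url` and `set_consumption` are hypothetical names. The `consumeFrom` value is passed through unchanged: this guide uses `smallest`, and `largest` is, to our understanding, the other accepted value.

```bash theme={null}
# Controller address; the default matches the curl calls in this guide.
PINOT_CONTROLLER="${PINOT_CONTROLLER:-http://localhost:9000}"

# Print the controller URL for pausing or resuming a table.
#   consumption_url pause TABLE
#   consumption_url resume TABLE [smallest|largest]
consumption_url() {
  local action="$1" table="$2" from="${3:-}"
  case "$action" in
    pause)
      printf '%s/tables/%s/pauseConsumption\n' "$PINOT_CONTROLLER" "$table" ;;
    resume)
      if [ -n "$from" ]; then
        printf '%s/tables/%s/resumeConsumption?consumeFrom=%s\n' \
          "$PINOT_CONTROLLER" "$table" "$from"
      else
        printf '%s/tables/%s/resumeConsumption\n' "$PINOT_CONTROLLER" "$table"
      fi ;;
    *)
      echo "unknown action: $action (expected pause or resume)" >&2
      return 1 ;;
  esac
}

# POST to that URL; --fail turns HTTP error responses into a non-zero exit.
set_consumption() {
  local url
  url="$(consumption_url "$@")" || return 1
  curl --fail --silent --show-error -X POST "$url" -H "accept: application/json"
}
```

With these defined, `set_consumption pause events` is equivalent to the command above, and `set_consumption resume events smallest` matches the resume step later in this guide.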

Next, we fix the config by giving the `FromDateTime` pattern six `S` characters, one for each fractional-second digit:

```json {14} theme={null}
{
    "tableName": "events",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "timeColumnName": "ts",
      "schemaName": "events",
      "replication": "1",
      "replicasPerPartition": "1"
    },
    "ingestionConfig":{
      "transformConfigs": [
        {
            "columnName": "ts",
            "transformFunction": "FromDateTime(tsString, 'YYYY-MM-dd''T''HH:mm:ss.SSSSSS''Z''')"
        }
      ]
    },
    "tableIndexConfig": {
      "loadMode": "MMAP",
      "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.topic.name": "events",
        "stream.kafka.broker.list": "kafka-pauseresume:9093",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
      }
    },
    "tenants": {},
    "metadata": {}
  }
```

*config/table-fixed.json*

Let's apply that:

```bash theme={null}
docker run \
   --network pauseresume \
   -v $PWD/config:/config \
   apachepinot/pinot:1.0.0 AddTable \
     -schemaFile /config/schema.json \
     -tableConfigFile /config/table-fixed.json \
     -controllerHost "pinot-controller-pauseresume" \
     -exec -update
```
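Before resuming, you might confirm that the controller now holds the corrected transformation function. A minimal sketch, assuming `GET /tables/events` on the controller returns the table config as JSON; `has_fixed_pattern` is a hypothetical helper that does a plain-text search rather than parsing the JSON.

```bash theme={null}
# Hypothetical helper: succeed if stdin contains the corrected
# six-digit fractional-seconds pattern, fail otherwise.
has_fixed_pattern() {
  grep -qF 'HH:mm:ss.SSSSSS'
}
```

For example, `curl -s http://localhost:9000/tables/events | has_fixed_pattern && echo "fixed config is live"`.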

And resume consumption:

```bash theme={null}
curl -X POST \
  "http://localhost:9000/tables/events/resumeConsumption?consumeFrom=smallest" \
  -H "accept: application/json"
```

If we navigate back to the Pinot UI, we'll see that the table is now ingesting data, and our server logs no longer show the transformation error.
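To check from the command line that rows are arriving, you can count them with a query. This sketch assumes the broker's SQL endpoint is reachable at `http://localhost:8099/query/sql` (8099 is Pinot's default broker port, but check which ports the recipe's `docker-compose.yml` publishes); `PINOT_BROKER`, `sql_payload`, and `run_query` are hypothetical names.

```bash theme={null}
# Broker address; an assumption, adjust to match docker-compose.yml.
PINOT_BROKER="${PINOT_BROKER:-http://localhost:8099}"

# Wrap a SQL statement in the JSON body the broker expects.
# Note: this sketch does not escape double quotes inside the SQL.
sql_payload() {
  printf '{"sql": "%s"}\n' "$1"
}

# Send the query and print the raw JSON response.
run_query() {
  curl --fail --silent --show-error -X POST "$PINOT_BROKER/query/sql" \
    -H "Content-Type: application/json" \
    -d "$(sql_payload "$1")"
}
```

Running `run_query 'SELECT COUNT(*) FROM events'` twice, a few seconds apart, should show the count increasing while ingestion is active.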
