> In this recipe, we'll learn how to ingest JSON documents from Apache Kafka into Apache Pinot.

# Ingesting JSON files from Kafka

To learn how to ingest JSON files from Apache Kafka into Pinot, watch the following video, or complete the tutorial below.

<iframe width="560" height="315" src="https://www.youtube.com/embed/OCV13UMQrgA" title="YouTube video player" frameBorder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowFullScreen />

[*Mark Needham shows how to ingest JSON files*](https://twitter.com/markhneedham)

If your JSON documents have a nested structure, see [how to ingest complex JSON documents with a nested structure from Kafka into Pinot](https://www.youtube.com/watch?v=Jc03u8rXc2w).

| Pinot Version | 0.10.0                                                                                                                                        |
| ------------- | --------------------------------------------------------------------------------------------------------------------------------------------- |
| Code          | [startreedata/pinot-recipes/ingest-json-files-kafka](https://github.com/startreedata/pinot-recipes/tree/main/recipes/ingest-json-files-kafka) |

## Prerequisites

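To follow the code examples in this guide, you must [install Docker](https://docs.docker.com/get-docker/) locally and download the recipes from the [startreedata/pinot-recipes](https://github.com/startreedata/pinot-recipes) repository.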

## Navigate to recipe

1. If you haven't already, download the recipes.
2. In a terminal, go to the recipe directory by running the following command:

```bash theme={null}
cd pinot-recipes/recipes/ingest-json-files-kafka
```

## Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

```bash theme={null}
docker-compose up
```

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, and Zookeeper. You can find the [docker-compose.yml](https://github.com/startreedata/pinot-recipes/blob/main/recipes/ingest-json-files-kafka/docker-compose.yml) file on GitHub.

## Dataset

We're going to import the following JSON files:

```json theme={null}
{"title": "Valentine's Day", "genre": "Comedy", "year": 2010, "releaseDate": "2010-02-12 00:00:00", "budget": 52000000, "boxOffice": 216500000}
{"title": "The Ugly Truth", "genre": "Comedy", "year": 2009, "releaseDate": "2010-04-14 00:00:00", "budget": 38000000, "boxOffice": 205300000}
{"title": "P.S. I Love You", "genre": "Romance", "year": 2007, "releaseDate": "2007-12-21 00:00:00", "budget": 30000000, "boxOffice": 156800000}
{"title": "Dear John", "genre": "Drama", "year": 2010, "releaseDate": "2010-04-14 00:00:00", "budget": 25000000, "boxOffice": 115000000}
{"title": "The Curious Case of Benjamin Button", "genre": "Fantasy", "year": 2008, "releaseDate": "2008-12-25 00:00:00", "budget": 167000000, "boxOffice": 335800000}
```

*data/import1.jsonl*

```json theme={null}
{"title": "Pirates of the Caribbean: Salazar's Revenge", "genre": "Action", "year": 2017, "releaseDate": "2017-05-26 00:00:00", "budget": 230000000, "boxOffice": 794881442}
{"title": "The Hunger Games", "genre": "Action", "year": 2012, "releaseDate": "2012-03-23 00:00:00", "budget": 78000000, "boxOffice": 694394724}
{"title": "Pride & Prejudice", "genre": "Romance", "year": 2005, "releaseDate": "2005-09-16 00:00:00", "budget": 28000000, "boxOffice": 121616555}
```

*data/import2.jsonl*
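
Each line in these files is a standalone JSON document. Before publishing to Kafka, it can be useful to confirm that every line parses and carries the expected fields. The following is a minimal sketch, not part of the recipe: the field list is inferred from the sample records above, and `parse_jsonl` and `find_problems` are hypothetical helper names.

```python
import json

# Field names and Python types inferred from the sample records above
# (an assumption for illustration, not an official specification).
EXPECTED_FIELDS = {
    "title": str,
    "genre": str,
    "year": int,
    "releaseDate": str,
    "budget": int,
    "boxOffice": int,
}


def parse_jsonl(text):
    """Parse newline-delimited JSON, skipping blank lines."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def find_problems(record):
    """Return a list of human-readable problems found in one record."""
    problems = []
    for name, expected_type in EXPECTED_FIELDS.items():
        if name not in record:
            problems.append(f"missing field: {name}")
        elif not isinstance(record[name], expected_type):
            problems.append(f"{name} is not {expected_type.__name__}")
    return problems


# Two records copied from the files above.
sample = (
    '{"title": "Dear John", "genre": "Drama", "year": 2010, '
    '"releaseDate": "2010-04-14 00:00:00", "budget": 25000000, "boxOffice": 115000000}\n'
    '{"title": "The Hunger Games", "genre": "Action", "year": 2012, '
    '"releaseDate": "2012-03-23 00:00:00", "budget": 78000000, "boxOffice": 694394724}\n'
)

for record in parse_jsonl(sample):
    print(record["title"], find_problems(record))
```

To check a whole file, the same functions could be applied to the text read from `data/import1.jsonl` or `data/import2.jsonl`.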

## Pinot Schema and Table

Now let's create a Pinot Schema and Table.

First, the schema:

```json theme={null}
{
  "schemaName": "movies",
  "dimensionFieldSpecs": [
    {
      "name": "title",
      "dataType": "STRING"
    },
    {
      "name": "genre",
      "dataType": "STRING"
    },
    {
      "name": "year",
      "dataType": "INT"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "budget",
      "dataType": "INT"
    },
    {
      "name": "boxOffice",
      "dataType": "INT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "releaseDate",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
```

*config/schema.json*
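
The sample records carry `releaseDate` as a `yyyy-MM-dd HH:mm:ss` string, while the schema declares a `TIMESTAMP` column expressed in epoch milliseconds. The sketch below illustrates what that conversion looks like for one value, assuming the string is interpreted in UTC (the time zone Pinot actually applies depends on its runtime and is not stated in this recipe). It also checks that the largest `boxOffice` value in the dataset fits in a 32-bit signed `INT`. The helper names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
INT_MIN = -(2**31)     # smallest value of a 32-bit signed integer
INT_MAX = 2**31 - 1    # largest value of a 32-bit signed integer


def to_epoch_millis(value, tz=timezone.utc):
    """Convert a 'yyyy-MM-dd HH:mm:ss' string to milliseconds since the epoch.

    The UTC default is an assumption made for this illustration.
    """
    parsed = datetime.strptime(value, "%Y-%m-%d %H:%M:%S").replace(tzinfo=tz)
    # Integer division of timedeltas avoids floating-point rounding.
    return (parsed - EPOCH) // timedelta(milliseconds=1)


def fits_in_int(value):
    """Return True if the value can be stored in a 32-bit signed INT."""
    return INT_MIN <= value <= INT_MAX


print(to_epoch_millis("2010-02-12 00:00:00"))  # releaseDate of "Valentine's Day"
print(fits_in_int(794881442))                  # largest boxOffice in the dataset
```

If a metric could exceed the `INT` range, declaring it as `LONG` in the schema would be the safer choice.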

We'll also have the following table config:

```json {13-16,19} theme={null}
{
  "tableName": "movies",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "releaseDate",
    "timeType": "MILLISECONDS",
    "schemaName": "movies",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.broker.list": "kafka-json:9093",
      "stream.kafka.consumer.type": "lowLevel",
      "stream.kafka.topic.name": "events",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    },
    "loadMode": "MMAP"
  },
  "task": {
    "taskTypeConfigsMap": {}
  },
  "metadata": {
    "customConfigs": {}
  }
}
```

*config/table.json*

We need to tell Pinot where our Kafka cluster lives (`stream.kafka.broker.list`) and which topic we wish to pull events from (`stream.kafka.topic.name`). We also need to specify an offset value (`stream.kafka.consumer.prop.auto.offset.reset`), which indicates where Pinot should start pulling data in each topic partition. A value of `smallest` means it will start from the earliest offset, and a value of `largest` means it will start from the latest offset.
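
To make the offset setting concrete, here is a minimal sketch that switches a table config from `smallest` to `largest`. It works on a trimmed, in-memory copy of the config shown above rather than on `config/table.json` itself, and `with_offset_reset` is a hypothetical helper. Only the two values described in this section are accepted.

```python
import copy

OFFSET_KEY = "stream.kafka.consumer.prop.auto.offset.reset"
KNOWN_VALUES = {"smallest", "largest"}  # the two values described in this section


def with_offset_reset(table_config, value):
    """Return a copy of the table config with a different offset reset value."""
    if value not in KNOWN_VALUES:
        raise ValueError(f"unsupported offset reset value: {value!r}")
    updated = copy.deepcopy(table_config)  # leave the caller's config untouched
    updated["tableIndexConfig"]["streamConfigs"][OFFSET_KEY] = value
    return updated


# Trimmed version of the table config above, kept inline for illustration.
table_config = {
    "tableName": "movies",
    "tableType": "REALTIME",
    "tableIndexConfig": {
        "streamConfigs": {
            "streamType": "kafka",
            "stream.kafka.topic.name": "events",
            OFFSET_KEY: "smallest",
        }
    },
}

latest_only = with_offset_reset(table_config, "largest")
print(latest_only["tableIndexConfig"]["streamConfigs"][OFFSET_KEY])
```

With `largest`, messages that were already in the topic before Pinot started consuming would be skipped, so `smallest` is the more forgiving choice when the topic may already hold data.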

You can create the table and schema by running the following command:

```bash theme={null}
docker exec -it pinot-controller-json bin/pinot-admin.sh AddTable \
  -tableConfigFile /config/table.json \
  -schemaFile /config/schema.json \
  -exec
```

## Importing data

Now we're going to import the JSON files into Kafka. The commands below send each line of a file as a separate message to the `events` topic:

```bash theme={null}
docker exec -i kafka-json kafka-console-producer.sh \
  --bootstrap-server kafka-json:9092 \
  --topic events < data/import1.jsonl
```

```bash theme={null}
docker exec -i kafka-json kafka-console-producer.sh \
  --bootstrap-server kafka-json:9092 \
  --topic events < data/import2.jsonl
```

## Querying

Once that's completed, navigate to [localhost:9000/#/query](http://localhost:9000/#/query) and click on the `movies` table or copy/paste the following query:

```sql theme={null}
select *
from movies
limit 10
```

You will see the following output:

| boxOffice | budget    | genre   | releaseDate           | title                                       | year |
| --------- | --------- | ------- | --------------------- | ------------------------------------------- | ---- |
| 216500000 | 52000000  | Comedy  | 2010-02-12 00:00:00.0 | Valentine's Day                             | 2010 |
| 205300000 | 38000000  | Comedy  | 2010-04-14 00:00:00.0 | The Ugly Truth                              | 2009 |
| 156800000 | 30000000  | Romance | 2007-12-21 00:00:00.0 | P.S. I Love You                             | 2007 |
| 115000000 | 25000000  | Drama   | 2010-04-14 00:00:00.0 | Dear John                                   | 2010 |
| 335800000 | 167000000 | Fantasy | 2008-12-25 00:00:00.0 | The Curious Case of Benjamin Button         | 2008 |
| 794881442 | 230000000 | Action  | 2017-05-26 00:00:00.0 | Pirates of the Caribbean: Salazar's Revenge | 2017 |
| 694394724 | 78000000  | Action  | 2012-03-23 00:00:00.0 | The Hunger Games                            | 2012 |
| 121616555 | 28000000  | Romance | 2005-09-16 00:00:00.0 | Pride & Prejudice                           | 2005 |

*Query Results*
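
The same query can also be sent programmatically. The sketch below is one possible approach, assuming the Pinot Controller at `localhost:9000` accepts SQL as a JSON body on its `/sql` endpoint and returns a `resultTable` containing `dataSchema.columnNames` and `rows`. The URL, the function names, and the example response are assumptions for illustration and are not part of the recipe.

```python
import json
import urllib.request

# Assumption: the controller from this recipe is reachable on localhost:9000.
CONTROLLER_SQL_URL = "http://localhost:9000/sql"


def build_query_request(sql, url=CONTROLLER_SQL_URL):
    """Build (but do not send) an HTTP POST request carrying the SQL query."""
    body = json.dumps({"sql": sql}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def rows_as_dicts(response):
    """Pair each result row with the column names from the response schema."""
    table = response["resultTable"]
    columns = table["dataSchema"]["columnNames"]
    return [dict(zip(columns, row)) for row in table["rows"]]


def run_query(sql, url=CONTROLLER_SQL_URL):
    """Send the query; this needs the cluster from this recipe to be running."""
    with urllib.request.urlopen(build_query_request(sql, url)) as reply:
        return rows_as_dicts(json.load(reply))


# Hand-written response in the assumed shape, so the parsing step can be
# tried without a running cluster.
example_response = {
    "resultTable": {
        "dataSchema": {"columnNames": ["title", "year"]},
        "rows": [["Dear John", 2010], ["The Hunger Games", 2012]],
    }
}
print(rows_as_dicts(example_response))
```

With the cluster running, `run_query("select * from movies limit 10")` would return one dictionary per row.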
