> ## Documentation Index
> Fetch the complete documentation index at: https://docs.startree.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Ingesting Avro Messages

In this recipe we'll learn how to ingest Avro messages from Apache Kafka into Apache Pinot. The Avro schema will be stored in the Confluent Schema Registry, so we'll learn how to integrate with that as well.

Watch the following video about ingesting Avro encoded messages into Apache Pinot, or follow the tutorial below.

<iframe width="560" height="315" src="https://www.youtube.com/embed/u2DdOZDKnQw" title="YouTube video player" frameBorder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowFullScreen />

| Pinot Version | 1.0.0                                                                                                                 |
| ------------- | --------------------------------------------------------------------------------------------------------------------- |
| Code          | [startreedata/pinot-recipes/ingest-avro](https://github.com/startreedata/pinot-recipes/tree/main/recipes/ingest-avro) |

## Prerequisites

To follow the code examples in this guide, you must [install Docker](https://docs.docker.com/get-docker/) locally and download recipes.

## Navigate to recipe

1. If you haven't already, download recipes.
2. In terminal, go to the recipe by running the following command:

```bash theme={null}
cd pinot-recipes/recipes/ingest-avro
```

## Launch Pinot Cluster

You can spin up a Pinot Cluster by running the following command:

```bash theme={null}
docker-compose up
```

This command will run a single instance of the Pinot Controller, Pinot Server, Pinot Broker, Kafka, Zookeeper, and the Confluent Schema Registry.
You can find the [docker-compose.yml](https://github.com/startreedata/pinot-recipes/blob/main/recipes/ingest-avro/docker-compose.yml) file on GitHub.

## Data Generator

This recipe contains a data generator that creates events with data about people.

It uses the [Faker library](https://faker.readthedocs.io/en/master/), so you'll first need to install that:

```bash theme={null}
pip install faker
```

You can generate data by running the following command:

```bash theme={null}
python datagen.py 2>/dev/null  | head -n1 | jq
```

Output is shown below:

```json theme={null}
{
  "ts": 1680614486181,
  "person": {
    "id": "43e44826-44f5-4474-9a12-6740bfc0e7d8",
    "name": "Elizabeth Morales",
    "email": "qmyers@example.com",
    "age": 40,
    "address": {
      "street_address": "2555 Garza Trafficway",
      "city": "Denisefurt",
      "state": "Nebraska",
      "country": "Mauritania"
    },
    "phone_number": "001-901-538-1610x7283",
    "job": {
      "company": "Reynolds, Hansen and Alexander",
      "position": "Psychotherapist, dance movement",
      "department": "harness robust eyeballs"
    },
    "interests": [
      "Music"
    ],
    "friend_ids": [
      "34ba8e1f-4d94-4624-9920-8c32bb2e0e2e"
    ]
  }
}
```

## Avro Schema

The Avro schema for our messages is described below:

```json theme={null}
{
  "type": "record",
  "name": "PersonMessage",
  "namespace": "ai.startree",
  "fields": [
    {"name": "ts", "type": "long"},
    {
      "name": "person",
      "type": {
        "type": "record",
        "name": "Person",
        "fields": [
          {"name": "id", "type": "string"},
          {"name": "name", "type": "string"},
          {"name": "email", "type": "string"},
          {"name": "age", "type": "int"},
          {
            "name": "address",
            "type": {
              "type": "record",
              "name": "Address",
              "fields": [
                {"name": "street_address", "type": "string"},
                {"name": "city", "type": "string"},
                {"name": "state", "type": "string"},
                {"name": "country", "type": "string"}
              ]
            }
          },
          {"name": "phone_number", "type": "string"},
          {
            "name": "job",
            "type": {
              "type": "record",
              "name": "Job",
              "fields": [
                {"name": "company", "type": "string"},
                {"name": "position", "type": "string"},
                {"name": "department", "type": "string"}
              ]
            }
          },
          {"name": "interests", "type": [{"type": "array", "items": "string"}, "null"]},
          {"name": "friend_ids", "type": [{"type": "array", "items": "string"}, "null"]}
        ]
      }
    }
  ]
}
```

*avro/person-topic-value.avsc*

## Kafka ingestion

We're going to ingest the stream of people into Kafka using the Kafka Python client.
We'll need to install the following libraries:

```bash theme={null}
pip install avro confluent-kafka
```

We're going to stream the messages produced by `datagen.py` into the following script:

```python theme={null}
import json
import sys
from confluent_kafka.avro import AvroProducer
from confluent_kafka import avro

def kafka_producer(schema_name):
    producer_config = {
        'bootstrap.servers': 'localhost:9092',
        'schema.registry.url': 'http://localhost:8081', 
        'broker.address.family': 'v4'
    }

    value_schema = avro.load(f"avro/{schema_name}-value.avsc")
    producer = AvroProducer(producer_config, default_value_schema=value_schema)
    return schema_name, producer

if __name__ == "__main__":
    topic_name, producer = kafka_producer(schema_name="person-topic")

    while True:
        line = sys.stdin.readline().rstrip('\n')
        if not line:
            break
        event = json.loads(line)
        producer.produce(topic = topic_name, value = event)
        producer.flush()
```

*kafkaproducer.py*

This script first creates a Kafka producer that knows about the Avro schema and schema registry.
It then infinitely ingests an infinite stream from `stdin` and writes Avro messages to Kafka.

We can combine the data generator script with this one by running the following code:

```bash theme={null}
python datagen.py | python kafkaproducer.py
```

Once we've done that, let's check the messages are being ingested into the `person-topic` using kcat:

```bash theme={null}
kcat -C -b localhost:9092 -t person-topic \
  -r localhost:8081 -s value=avro \
  -o end -c1 | 
jq
```

The output is shown below:

```json theme={null}
{
  "ts": 1680615014367,
  "person": {
    "id": "04da5a02-4984-46d7-83e4-1e769235e2ab",
    "name": "Wayne Jacobs",
    "email": "austinpittman@example.org",
    "age": 22,
    "address": {
      "street_address": "571 Shaun Corner",
      "city": "Port Marcfort",
      "state": "Florida",
      "country": "Argentina"
    },
    "phone_number": "595.121.1530x308",
    "job": {
      "company": "Gentry-Castillo",
      "position": "Broadcast journalist",
      "department": "innovate enterprise channels"
    },
    "interests": null,
    "friend_ids": null
  }
}
```

Next, we're going to ingest the stream into Pinot.

## Pinot schema and table

Our Pinot schema is shown below:

```json theme={null}
{
    "schemaName": "people",
    "dimensionFieldSpecs": [
      {"name": "person.id", "dataType": "STRING"},
      {"name": "person.name", "dataType": "STRING"},
      {"name": "person.email", "dataType": "STRING"},
      {"name": "person.age", "dataType": "INT"},
      {"name": "person.address.street_address", "dataType": "STRING"},
      {"name": "person.address.city", "dataType": "STRING"},
      {"name": "person.address.state", "dataType": "STRING"},
      {"name": "person.address.country", "dataType": "STRING"},
      {"name": "person.phone_number", "dataType": "STRING"},
      {"name": "person.job.company", "dataType": "STRING"},
      {"name": "person.job.position", "dataType": "STRING"},
      {"name": "person.job.department", "dataType": "STRING"},
      {"name": "person.interests", "dataType": "STRING", "singleValueField": false},
      {"name": "person.friend_ids", "dataType": "STRING", "singleValueField": false}
    ],
    "metricFieldSpecs": [
    ],
    "dateTimeFieldSpecs": [
      {"name": "ts", "dataType": "TIMESTAMP", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS"}
    ]
  }
```

*schema.json*

And the table config is below:

```json {18-21} theme={null}
{
    "tableName": "people",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "timeColumnName": "ts",
      "timeType": "MILLISECONDS",
      "schemaName": "people",
      "replicasPerPartition": "1"
    },
    "tenants": {},
    "tableIndexConfig": {
      "loadMode": "MMAP",
      "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.consumer.type": "lowLevel",
        "stream.kafka.topic.name": "person-topic",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",        
        "stream.kafka.decoder.prop.format": "AVRO",
        "stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081",
        "stream.kafka.decoder.prop.schema.registry.schema.name": "person-topic-value",
        "stream.kafka.broker.list": "kafka:9093",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
      }
    },
    "metadata": {
      "customConfigs": {}
    },
    "ingestionConfig": {
        "complexTypeConfig": {
            "delimiter": "."
          }
    },
    "routing": {
      "instanceSelectorType": "strictReplicaGroup"
    }
}
```

*table.json*

The highlighted section defines a decoder for processing Avro messages via the schema registry.
The URL for the schema registry is defined along with the Avro schema name.

We can create the table and schema by running the following command:

```bash theme={null}
docker run \
   --network ingestavro \
   -v $PWD/config:/config \
   apachepinot/pinot:1.0.0 AddTable \
     -schemaFile /config/schema.json \
     -tableConfigFile /config/table.json \
     -controllerHost "pinot-controller" \
    -exec
```

## Querying Pinot

We can then run the following query via the [Pinot UI](http://localhost:9000/#/query?query=select+person.id%2C+person.interests%2C+person.age%2C+person.address.country%0Afrom+people+%0Awhere+person.friend_ids+%3C%3E+%27null%27%0AAND+person.interests+%3C%3E+%27null%27%0Aand+ARRAYLENGTH%28person.interests%29+%3E+1%0A%0Alimit+10\&tracing=false\&useMSE=false):

```sql theme={null}
select person.id, person.interests, person.age, person.address.country
from people 
where person.friend_ids <> 'null'
AND person.interests <> 'null'
and ARRAYLENGTH(person.interests) > 1
limit 10
```

The output will look something like this:

| person.id                            | person.interests                                      | person.age | person.address.country |
| ------------------------------------ | ----------------------------------------------------- | ---------- | ---------------------- |
| 311b5e5e-060a-45f3-b050-e7b9e268dceb | \["Photography","Cooking","Fashion","Running"]        | 45         | Zambia                 |
| d29e52eb-41d8-4902-92fb-8ed6f51115b1 | \["Cycling","Dancing","Music"]                        | 60         | Qatar                  |
| b1a0cf13-8daf-4120-865f-7ac3ae51c3c4 | \["Yoga","Painting","Swimming"]                       | 48         | Sierra Leone           |
| f774187e-ee0a-4a2f-abbb-e73ba86a1dfb | \["Music","Gardening"]                                | 67         | Belize                 |
| c47cf80b-8104-480f-a9de-046f10f4c3f0 | \["Art","Meditation"]                                 | 31         | Mayotte                |
| c62586ff-24d9-43cb-9bb5-50b7e9a4a297 | \["Painting","Yoga"]                                  | 57         | Azerbaijan             |
| 175a0032-af11-4e58-8ab5-9bfbed70eabf | \["Baking","Fashion","Cooking","Baking","Gardening"]  | 73         | Wallis and Futuna      |
| 82940700-00e1-4d3a-8333-d30165fec35a | \["Fashion","Fashion","Fishing"]                      | 35         | Saint Martin           |
| f27d06e9-686e-42f3-a6d8-742c046ea3d7 | \["Sports","Reading","Reading","Cycling","Traveling"] | 27         | Russian Federation     |
| 0f1189ce-77ae-46cb-9f78-bf841b9f7e86 | \["Sports","Reading","Baking","Reading"]              | 74         | Sri Lanka              |

*Query Results*
