Apache Pinot: Sorted indexes on real-time tables

I’ve recently been learning all about Apache Pinot’s sorted forward indexes, and in my first blog post I explained how they work for offline tables. In this blog post we’ll learn how sorted indexes work with real-time tables.

sorted indexes realtime banner
Figure 1. Apache Pinot: Sorted indexes on real-time tables

Launch Components

We’re going to spin up a local instance of Pinot and Kafka using the following Docker compose config:

docker-compose.yml
version: '3.7'
services:
  zookeeper:
    image: zookeeper:3.5.6
    hostname: zookeeper
    container_name: zookeeper-strava-realtime
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: wurstmeister/kafka:latest
    restart: unless-stopped
    container_name: "kafka-strava"
    ports:
      - "9092:9092"
    expose:
      - "9093"
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-strava-realtime:2181/kafka
      KAFKA_BROKER_ID: 0
      KAFKA_ADVERTISED_HOST_NAME: kafka-strava
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-strava:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
  pinot-controller:
    image: apachepinot/pinot:0.9.3
    command: "StartController -zkAddress zookeeper-strava-realtime:2181 -dataDir /data"
    container_name: "pinot-controller-strava-realtime"
    volumes:
      - ./config:/config
      - ./data-realtime:/data
    restart: unless-stopped
    ports:
      - "9000:9000"
    depends_on:
      - zookeeper
  pinot-broker:
    image: apachepinot/pinot:0.9.3
    command: "StartBroker -zkAddress zookeeper-strava-realtime:2181"
    restart: unless-stopped
    container_name: "pinot-broker-strava-realtime"
    ports:
      - "8099:8099"
    depends_on:
      - pinot-controller
  pinot-server:
    image: apachepinot/pinot:0.9.3
    command: "StartServer -zkAddress zookeeper-strava-realtime:2181"
    restart: unless-stopped
    container_name: "pinot-server-strava-realtime"
    depends_on:
      - pinot-broker

We can launch all the components by running the following command:

docker-compose up

Create Schema

We’re going to explore sorted indexes using a dataset of my Strava activities, the same one that we used in the first blog post. The schema is described below:

/config/schema.json
{
    "schemaName": "activities",
    "dimensionFieldSpecs": [
      {
        "name": "id",
        "dataType": "LONG"
      },
      {
        "name": "altitude",
        "dataType": "DOUBLE"
      },
      {
        "name": "hr",
        "dataType": "INT"
      },
      {
        "name": "cadence",
        "dataType": "INT"
      },
      {
        "name": "rawTime",
        "dataType": "INT"
      },
      {
        "name": "distance",
        "dataType": "DOUBLE"
      },
      {
        "name": "lat",
        "dataType": "DOUBLE"
      },
      {
        "name": "lon",
        "dataType": "DOUBLE"
      },
      {
        "name": "location",
        "dataType": "BYTES"
      }
    ],
    "dateTimeFieldSpecs": [{
      "name": "timestamp",
      "dataType": "TIMESTAMP",
      "format" : "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }]
  }

We can create the schema by running the following command:

docker exec -it pinot-controller-strava-realtime bin/pinot-admin.sh AddSchema \
  -schemaFile /config/schema.json -exec

What is a sorted index?

Before we ingest any data, let’s remind ourselves about the definition of a sorted index.

When a column is physically sorted, Pinot uses a sorted forward index with run-length encoding on top of the dictionary-encoding. Instead of saving dictionary ids for each document id, Pinot will store a pair of start and end document ids for each value.

A diagram showing how a sorted index works conceptually is shown below:

sorted forward
Figure 2. Sorted Forward Index

The advantage of having a sorted index is that queries that filter by that column will be more performant since the query engine doesn’t need to scan through every document to check if they match the filtering criteria.
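To make that idea concrete, here is a minimal Python sketch of a run-length encoded forward index. It is an illustration only, not Pinot's implementation: the function names are made up for this post, and it works on raw values rather than dictionary ids to keep things short.

```python
from bisect import bisect_left


def build_sorted_forward_index(sorted_values):
    """Run-length encode an already sorted column into (value, start_doc_id, end_doc_id) runs."""
    runs = []
    for doc_id, value in enumerate(sorted_values):
        if runs and runs[-1][0] == value:
            # Same value as the previous document: extend the current run.
            runs[-1] = (value, runs[-1][1], doc_id)
        else:
            # New value: start a run that covers just this document.
            runs.append((value, doc_id, doc_id))
    return runs


def doc_id_range(runs, value):
    """Binary-search the runs for `value` and return (start_doc_id, end_doc_id), or None."""
    values = [run[0] for run in runs]
    pos = bisect_left(values, value)
    if pos < len(runs) and runs[pos][0] == value:
        return runs[pos][1], runs[pos][2]
    return None


cadence = [0, 79, 79, 79, 83]  # a small sorted column
runs = build_sorted_forward_index(cadence)
print(runs)                    # [(0, 0, 0), (79, 1, 3), (83, 4, 4)]
print(doc_id_range(runs, 79))  # (1, 3)
```

A filter on a value only has to find one run and read off its document id range, which is why no per-document scan is needed.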

For real-time tables we can specify a sorted index in the tableIndexConfig:

{
    "tableIndexConfig": {
        "sortedColumn": [
            "column_name"
        ]
    }
}

When Pinot commits the segment it will ensure that the data is sorted based on this sorted column. It will then do a single pass over every other column to check whether the data in those columns is sorted. Columns that contain sorted data will also use a sorted forward index. This means that several columns may end up using a sorted index, even though only the configured column is guaranteed to use one.

Note

Sorted indexes are determined for each segment. This means that a column could be sorted in one segment, but not in another one.
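The commit-time behaviour can be sketched in a few lines of Python. This is a simplified model rather than Pinot's code: the helper names are invented, the rows are the five Strava points that we ingest later in this post (with the time strings already converted to epoch milliseconds), and the derived location column is left out.

```python
COLUMNS = ["timestamp", "id", "distance", "altitude", "hr", "cadence", "rawTime", "lat", "lon"]

# The five points in the order they are written to Kafka.
rows = [
    (1570656325000, 2776420839, 1.5, 11.2, 88, 0, 0, 56.265595, 12.859432),
    (1570656327000, 2776420839, 4.6, 11.3, 89, 79, 2, 56.265566, 12.859438),
    (1570656330000, 2776420839, 12.2, 11.4, 92, 79, 5, 56.265503, 12.859488),
    (1570656332000, 2776420839, 18.4, 11.4, 97, 83, 7, 56.265451, 12.85952),
    (1570656326000, 2776420839, 3.1, 11.2, 89, 79, 1, 56.26558, 12.85943),
]


def is_sorted(values):
    """True if the values are in non-decreasing order."""
    return all(a <= b for a, b in zip(values, values[1:]))


def sorted_flags(rows, columns):
    """One pass per column: report whether each column is sorted in the given row order."""
    return {name: is_sorted([row[i] for row in rows]) for i, name in enumerate(columns)}


def commit_segment(rows, columns, sorted_column):
    """Reorder rows by the configured sorted column, then check every column."""
    key = columns.index(sorted_column)
    ordered = sorted(rows, key=lambda row: row[key])
    return ordered, sorted_flags(ordered, columns)


print(sorted_flags(rows, COLUMNS))            # no sorted column configured
_, flags = commit_segment(rows, COLUMNS, "timestamp")
print(flags)                                  # timestamp configured as sorted column
```

Without a sorted column only id comes out as sorted, because it holds a single value. After sorting by timestamp every column except lat and lon is sorted.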

Data Ingestion into Kafka

Now let’s ingest some data into a Kafka topic.

First let’s install the Confluent Kafka client:

pip install confluent-kafka

And now we’ll import some documents into the activities-realtime topic.

Import the following libraries:

import json
from confluent_kafka import Producer

Define an acknowledge function and configure our Kafka producer:

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: {0}: {1}".format(msg.value(), err.str()))

producer = Producer({'bootstrap.servers': 'localhost:9092'})

And finally write some messages to the topic:

points = [
    {'lat': '56.265595', 'lon': '12.859432', 'id': '2776420839', 'distance': '1.5', 'altitude': '11.2', 'hr': '88',
     'cadence': '0', 'time': '2019-10-09 21:25:25+00:00', 'rawTime': '0'},
    {'lat': '56.265566', 'lon': '12.859438', 'id': '2776420839', 'distance': '4.6', 'altitude': '11.3', 'hr': '89',
     'cadence': '79', 'time': '2019-10-09 21:25:27+00:00', 'rawTime': '2'},
    {'lat': '56.265503', 'lon': '12.859488', 'id': '2776420839', 'distance': '12.2', 'altitude': '11.4', 'hr': '92',
     'cadence': '79', 'time': '2019-10-09 21:25:30+00:00', 'rawTime': '5'},
    {'lat': '56.265451', 'lon': '12.85952', 'id': '2776420839', 'distance': '18.4', 'altitude': '11.4', 'hr': '97',
     'cadence': '83', 'time': '2019-10-09 21:25:32+00:00', 'rawTime': '7'},
    {'lat': '56.26558', 'lon': '12.85943', 'id': '2776420839', 'distance': '3.1', 'altitude': '11.2', 'hr': '89',
     'cadence': '79', 'time': '2019-10-09 21:25:26+00:00', 'rawTime': '1'}
]

for point in points:
    payload = json.dumps(point, ensure_ascii=False).encode('utf-8')
    producer.produce(topic='activities-realtime', key=str(point['id']), value=payload, callback=acked)

producer.flush()

We can check that those messages have reached the topic by running the following command:

docker exec -it kafka-strava kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic activities-realtime \
  --from-beginning
Output
{"lat": "56.265595", "lon": "12.859432", "id": "2776420839", "distance": "1.5", "altitude": "11.2", "hr": "88", "cadence": "0", "time": "2019-10-09 21:25:25+00:00", "rawTime": "0"}
{"lat": "56.265566", "lon": "12.859438", "id": "2776420839", "distance": "4.6", "altitude": "11.3", "hr": "89", "cadence": "79", "time": "2019-10-09 21:25:27+00:00", "rawTime": "2"}
{"lat": "56.265503", "lon": "12.859488", "id": "2776420839", "distance": "12.2", "altitude": "11.4", "hr": "92", "cadence": "79", "time": "2019-10-09 21:25:30+00:00", "rawTime": "5"}
{"lat": "56.265451", "lon": "12.85952", "id": "2776420839", "distance": "18.4", "altitude": "11.4", "hr": "97", "cadence": "83", "time": "2019-10-09 21:25:32+00:00", "rawTime": "7"}
{"lat": "56.26558", "lon": "12.85943", "id": "2776420839", "distance": "3.1", "altitude": "11.2", "hr": "89", "cadence": "79", "time": "2019-10-09 21:25:26+00:00", "rawTime": "1"}

Processed a total of 5 messages

It’s looking good so far.

Unsorted Table

Now let’s create a real-time table called activities_realtime to ingest the data from that Kafka topic into Pinot. This table doesn’t specify a sorted column.

/config/table-realtime.json
{
    "tableName": "activities_realtime",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "timeColumnName": "timestamp",
      "schemaName": "activities",
      "replication": "1",
      "replicasPerPartition": "1"
    },
    "tenants": {
      "broker":"DefaultTenant",
      "server":"DefaultTenant"
    },
    "tableIndexConfig": {
        "invertedIndexColumns": [],
        "rangeIndexColumns": [],
        "autoGeneratedInvertedIndex": false,
        "createInvertedIndexDuringSegmentGeneration": false,
        "sortedColumn": [],
        "bloomFilterColumns": [],
        "loadMode": "MMAP",
        "streamConfigs": {
          "streamType": "kafka",
          "stream.kafka.topic.name": "activities-realtime",
          "stream.kafka.broker.list": "kafka-strava:9093",
          "stream.kafka.consumer.type": "lowlevel",
          "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
          "realtime.segment.flush.threshold.rows": "5",
          "realtime.segment.flush.threshold.time": "24h",
          "realtime.segment.flush.segment.size": "100M"
        },
        "noDictionaryColumns": [],
        "onHeapDictionaryColumns": [],
        "varLengthDictionaryColumns": [],
        "enableDefaultStarTree": false,
        "enableDynamicStarTreeCreation": false,
        "aggregateMetrics": false,
        "nullHandlingEnabled": false
    },
    "ingestionConfig": {
      "batchIngestionConfig": {
        "segmentIngestionType": "APPEND",
        "segmentIngestionFrequency": "DAILY"
      },
      "transformConfigs": [
        {"columnName": "location", "transformFunction": "toSphericalGeography(stPoint(lon, lat))" },
        {"columnName": "timestamp", "transformFunction": "FromDateTime(\"time\", 'yyyy-MM-dd HH:mm:ssZ')" }
      ]
    },
    "metadata": {}
  }
Warning

The realtime.segment.flush.threshold.rows config is intentionally set to an extremely small value so that the segment will be committed after 5 records have been ingested. In a production system this value should be set much higher, as described in the real time tuning guide.

We can create a table by running the following command:

docker exec -it pinot-controller-strava-realtime bin/pinot-admin.sh AddTable   \
  -tableConfigFile /config/table-realtime.json   \
  -exec

Once we create this table, Pinot will start ingesting data from the Kafka topic. We can check which segments have been created by navigating to http://localhost:9000/#/tenants/table/activities_realtime_REALTIME. You should see something like the following:

realtime segments
Figure 3. Segments

If we click on the first segment, we’ll see the following:

segment0
Figure 4. Segment 0

We can see that this segment has been committed by looking at the segment.realtime.status property on the right hand side. The segment.total.docs property tells us that this segment contains 5 documents.

Now let’s go back and then click the other segment. If we do that we’ll see the following:

segment1
Figure 5. Segment 1

This one is IN_PROGRESS, and if we add some more messages to the Kafka topic they will go into this segment.
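As a rough mental model of what we've just seen, the Python sketch below splits a single Kafka partition's offsets into segments using only the row threshold. It is a toy, not Pinot's segment completion logic: the function name is invented, and it ignores the time and size thresholds from the table config.

```python
def assign_segments(num_messages, flush_threshold_rows):
    """Split offsets [0, num_messages) into full (committed) segments plus one consuming segment."""
    segments = []
    start = 0
    # Every time the consuming segment reaches the row threshold it is committed.
    while num_messages - start >= flush_threshold_rows:
        end = start + flush_threshold_rows
        segments.append(
            {"sequence": len(segments), "startOffset": start, "endOffset": end, "status": "DONE"}
        )
        start = end
    # Whatever is left over (possibly nothing yet) belongs to the segment still consuming.
    segments.append({"sequence": len(segments), "startOffset": start, "status": "IN_PROGRESS"})
    return segments


for segment in assign_segments(num_messages=5, flush_threshold_rows=5):
    print(segment)
```

With our 5 messages and a threshold of 5 rows, this gives one committed segment covering offsets 0 to 5 and one in-progress segment starting at offset 5.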

Now we’re going to check on the sorted status of the columns for all committed segments, i.e. segment activities_realtime__0__0__20220119T1541Z in this case.

First let’s collect all the columns in the activities schema:

export queryString=`curl -X GET "http://localhost:9000/schemas/activities" -H "accept: application/json" 2>/dev/null |
  jq -r '[.dimensionFieldSpecs,.dateTimeFieldSpecs | .[] | .name ] | join("&columns=")'`
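If the jq expression is hard to read, the Python sketch below does the same thing to a schema that has already been parsed into a dictionary. The function name is made up for illustration and the schema is a trimmed-down version of the one we created earlier.

```python
def columns_query_string(schema):
    """Join every dimension and date-time column name with '&columns=', like the jq filter above."""
    names = [
        spec["name"]
        for section in ("dimensionFieldSpecs", "dateTimeFieldSpecs")
        for spec in schema.get(section, [])
    ]
    return "&columns=".join(names)


# Trimmed-down copy of the activities schema, for illustration only.
schema = {
    "schemaName": "activities",
    "dimensionFieldSpecs": [{"name": "id"}, {"name": "hr"}],
    "dateTimeFieldSpecs": [{"name": "timestamp"}],
}

query_string = columns_query_string(schema)
print(query_string)                 # id&columns=hr&columns=timestamp
print("columns=" + query_string)    # columns=id&columns=hr&columns=timestamp
```

Prefixing the result with columns= gives one columns parameter per column name, which is the form the query string takes once it is appended to the metadata URL.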

And now let’s call the getServerMetaData endpoint to return the segments for the activities_realtime table and filter the response to get the segment name and the column names with their sorted status:

curl -X GET "http://localhost:9000/segments/activities_realtime/metadata?columns=${queryString}" \
  -H "accept: application/json"  2>/dev/null |
  jq -c '.[] | select(.columns != null) | {
    segment: .segmentName,
    columns: .columns | map({(.columnName): .sorted})
  }'
Note

To refresh our minds, the data in the table was imported in the order shown in the table below:

Table 1. activities_realtime
lat        lon        id          distance  altitude  hr  cadence  time                       rawTime
56.265595  12.859432  2776420839  1.5       11.2      88  0        2019-10-09 21:25:25+00:00  0
56.265566  12.859438  2776420839  4.6       11.3      89  79       2019-10-09 21:25:27+00:00  2
56.265503  12.859488  2776420839  12.2      11.4      92  79       2019-10-09 21:25:30+00:00  5
56.265451  12.85952   2776420839  18.4      11.4      97  83       2019-10-09 21:25:32+00:00  7
56.26558   12.85943   2776420839  3.1       11.2      89  79       2019-10-09 21:25:26+00:00  1

If we run this command, we’ll see the following output:

Output
{
    "segment": "activities_realtime__0__0__20220119T1541Z",
    "columns": [
        {"altitude": false},
        {"distance": false},
        {"hr": false},
        {"lon": false},
        {"cadence": false},
        {"rawTime": false},
        {"location": false},
        {"id": true},
        {"lat": false},
        {"timestamp": false}
    ]
}

From the output we can see that almost all of the columns aren’t sorted. The only sorted column is id and that’s because we only have one unique value in that column.
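For anyone who would rather post-process the response in Python than in jq, here is a small sketch of the same filter. The response shape is inferred from the jq expression (entries with segmentName and a columns list of columnName/sorted pairs), the function name is invented, and the sample response is a hand-written, trimmed example rather than real server output.

```python
def sorted_status(metadata):
    """Keep only entries that have column metadata and map each column name to its sorted flag."""
    entries = metadata.values() if isinstance(metadata, dict) else metadata
    return [
        {
            "segment": entry.get("segmentName"),
            "columns": [{col["columnName"]: col["sorted"]} for col in entry["columns"]],
        }
        for entry in entries
        if entry.get("columns") is not None
    ]


# Hand-written sample in the assumed response shape.
response = {
    "activities_realtime__0__0__20220119T1541Z": {
        "segmentName": "activities_realtime__0__0__20220119T1541Z",
        "columns": [
            {"columnName": "id", "sorted": True},
            {"columnName": "timestamp", "sorted": False},
        ],
    },
    "activities_realtime__0__1__20220119T1541Z": {
        "segmentName": "activities_realtime__0__1__20220119T1541Z",
        "columns": None,  # no column metadata, so this entry is filtered out
    },
}

for item in sorted_status(response):
    print(item)
```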

Sorted Table

Now let’s create a real-time table called activities_realtime_sorted that specifies timestamp as a sorted column.

/config/table-realtime.json
{
    "tableName": "activities_realtime_sorted",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "timeColumnName": "timestamp",
      "schemaName": "activities",
      "replication": "1",
      "replicasPerPartition": "1"
    },
    "tenants": {
      "broker":"DefaultTenant",
      "server":"DefaultTenant"
    },
    "tableIndexConfig": {
        "invertedIndexColumns": [],
        "rangeIndexColumns": [],
        "autoGeneratedInvertedIndex": false,
        "createInvertedIndexDuringSegmentGeneration": false,
        "sortedColumn": ["timestamp"],
        "bloomFilterColumns": [],
        "loadMode": "MMAP",
        "streamConfigs": {
          "streamType": "kafka",
          "stream.kafka.topic.name": "activities-realtime",
          "stream.kafka.broker.list": "kafka-strava:9093",
          "stream.kafka.consumer.type": "lowlevel",
          "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
          "realtime.segment.flush.threshold.rows": "5",
          "realtime.segment.flush.threshold.time": "24h",
          "realtime.segment.flush.segment.size": "100M"
        },
        "noDictionaryColumns": [],
        "onHeapDictionaryColumns": [],
        "varLengthDictionaryColumns": [],
        "enableDefaultStarTree": false,
        "enableDynamicStarTreeCreation": false,
        "aggregateMetrics": false,
        "nullHandlingEnabled": false
    },
    "ingestionConfig": {
      "batchIngestionConfig": {
        "segmentIngestionType": "APPEND",
        "segmentIngestionFrequency": "DAILY"
      },
      "transformConfigs": [
        {"columnName": "location", "transformFunction": "toSphericalGeography(stPoint(lon, lat))" },
        {"columnName": "timestamp", "transformFunction": "FromDateTime(\"time\", 'yyyy-MM-dd HH:mm:ssZ')" }
      ]
    },
    "metadata": {}
  }

We can create a table by running the following command:

docker exec -it pinot-controller-strava-realtime bin/pinot-admin.sh AddTable   \
  -tableConfigFile /config/table-realtime.json   \
  -exec

We can call the getSegments endpoint to check which segments have been created for this table:

curl -X GET "http://localhost:9000/segments/activities_realtime_sorted" -H "accept: application/json" 2>/dev/null
Output
[
  {
    "REALTIME": [
      "activities_realtime_sorted__0__0__20220120T0952Z",
      "activities_realtime_sorted__0__1__20220120T0952Z"
    ]
  }
]

We can then call the getSegmentMetadata endpoint to return the metadata for each of these segments:

segments=`curl -X GET "http://localhost:9000/segments/activities_realtime_sorted" \
               -H "accept: application/json" 2>/dev/null |
          jq -r '.[] | .REALTIME[]'`
for segment in ${segments}; do
  curl -X GET "http://localhost:9000/segments/activities_realtime_sorted/${segment}/metadata" \
    -H "accept: application/json" 2>/dev/null | jq '.'
done
Output
{
  "segment.crc": "3864673434",
  "segment.creation.time": "1642672360783",
  "segment.end.time": "1570656332000",
  "segment.flush.threshold.size": "5",
  "segment.index.version": "v3",
  "segment.name": "activities_realtime_sorted__0__0__20220120T0952Z",
  "segment.realtime.download.url": "http://172.21.0.3:9000/segments/activities_realtime_sorted/activities_realtime_sorted__0__0__20220120T0952Z",
  "segment.realtime.endOffset": "5",
  "segment.realtime.numReplicas": "1",
  "segment.realtime.startOffset": "0",
  "segment.realtime.status": "DONE",
  "segment.start.time": "1570656325000",
  "segment.table.name": "activities_realtime_sorted",
  "segment.time.unit": "MILLISECONDS",
  "segment.total.docs": "5",
  "segment.type": "REALTIME"
}
{
  "segment.creation.time": "1642672361525",
  "segment.flush.threshold.size": "5",
  "segment.name": "activities_realtime_sorted__0__1__20220120T0952Z",
  "segment.realtime.numReplicas": "1",
  "segment.realtime.startOffset": "5",
  "segment.realtime.status": "IN_PROGRESS",
  "segment.table.name": "activities_realtime_sorted",
  "segment.type": "REALTIME"
}

From this output we learn that segment activities_realtime_sorted__0__0__20220120T0952Z has already been committed and segment activities_realtime_sorted__0__1__20220120T0952Z is still in progress. Any new records will be added to segment activities_realtime_sorted__0__1__20220120T0952Z.
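The segment.start.time and segment.end.time values in the committed segment's metadata are the earliest and latest timestamps of its five records, expressed in epoch milliseconds. The Python sketch below approximates what the FromDateTime transform in the table config does to the time strings. It is only an approximation in plain Python (the helper name is made up), not the code that Pinot runs.

```python
from datetime import datetime


def to_epoch_millis(time_string):
    """Parse a 'yyyy-MM-dd HH:mm:ss+00:00' style string and return epoch milliseconds."""
    return int(datetime.fromisoformat(time_string).timestamp() * 1000)


times = [
    "2019-10-09 21:25:25+00:00",
    "2019-10-09 21:25:27+00:00",
    "2019-10-09 21:25:30+00:00",
    "2019-10-09 21:25:32+00:00",
    "2019-10-09 21:25:26+00:00",
]

millis = [to_epoch_millis(t) for t in times]
print(min(millis))  # 1570656325000, the segment.start.time above
print(max(millis))  # 1570656332000, the segment.end.time above
```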

Now let’s check on the sorted status of columns in these segments using the getServerMetaData endpoint:

curl -X GET "http://localhost:9000/segments/activities_realtime_sorted/metadata?columns=${queryString}" \
  -H "accept: application/json"  2>/dev/null |
  jq -c '.[] | select(.columns != null) | {
    segment: .segmentName,
    columns: .columns | map({(.columnName): .sorted})
  }'
Note

The table below shows the effective order of the data in the segment when it was committed:

Table 2. activities_realtime_sorted
lat        lon        id          distance  altitude  hr  cadence  time                       rawTime
56.265595  12.859432  2776420839  1.5       11.2      88  0        2019-10-09 21:25:25+00:00  0
56.26558   12.85943   2776420839  3.1       11.2      89  79       2019-10-09 21:25:26+00:00  1
56.265566  12.859438  2776420839  4.6       11.3      89  79       2019-10-09 21:25:27+00:00  2
56.265503  12.859488  2776420839  12.2      11.4      92  79       2019-10-09 21:25:30+00:00  5
56.265451  12.85952   2776420839  18.4      11.4      97  83       2019-10-09 21:25:32+00:00  7

If we run this command, we’ll see the following output:

Output
{
    "segment": "activities_realtime_sorted__0__0__20220120T0952Z",
    "columns": [
        {"altitude": true},
        {"distance": true},
        {"hr": true},
        {"lon": false},
        {"cadence": true},
        {"rawTime": true},
        {"location": false},
        {"id": true},
        {"lat": false},
        {"timestamp": true}
    ]
}

Now the timestamp field is sorted, which is what we would expect, along with a bunch of other fields as well.

It’s unlikely that any of the other fields would remain sorted if we imported more records. distance and rawTime are correlated with timestamp within a single activity, but if a segment contained multiple activities that correlation would be lost.

Conclusion

That’s the end of this mini blog series about Pinot’s sorted indexes. Hopefully it all made sense, but if not feel free to ask any questions on the Pinot Community Slack.
