
Apache Pinot: Sorted indexes on offline tables

I’ve recently been learning all about Apache Pinot’s sorted forward indexes. I was initially going to explain how they work for offline and real-time tables, but the post got a bit long, so instead we’ll have two blog posts. In this one we’ll learn how sorted indexes are applied for offline tables.

Figure 1. Apache Pinot: Sorted indexes on offline tables

Launch Components

We’re going to spin up a local instance of Pinot using the following Docker compose config:

docker-compose.yml
version: '3.7'
services:
  zookeeper:
    image: zookeeper:3.5.6
    hostname: zookeeper
    container_name: zookeeper-strava
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  pinot-controller:
    image: apachepinot/pinot:0.9.3
    command: "StartController -zkAddress zookeeper-strava:2181 -dataDir /data"
    container_name: "pinot-controller-strava"
    volumes:
      - ./config:/config
      - ./data:/data
      - ./input:/input
    restart: unless-stopped
    ports:
      - "9000:9000"
    depends_on:
      - zookeeper
  pinot-broker:
    image: apachepinot/pinot:0.9.3
    command: "StartBroker -zkAddress zookeeper-strava:2181"
    restart: unless-stopped
    container_name: "pinot-broker-strava"
    ports:
      - "8099:8099"
    depends_on:
      - pinot-controller
  pinot-server:
    image: apachepinot/pinot:0.9.3
    command: "StartServer -zkAddress zookeeper-strava:2181"
    restart: unless-stopped
    container_name: "pinot-server-strava"
    depends_on:
      - pinot-broker

We can launch all the components by running the following command:

docker-compose up

Create Schema

We’re going to explore sorted indexes using a dataset of my Strava activities. We’ll be using the following schema:

/config/schema.json
{
    "schemaName": "activities",
    "dimensionFieldSpecs": [
      {
        "name": "id",
        "dataType": "LONG"
      },
      {
        "name": "altitude",
        "dataType": "DOUBLE"
      },
      {
        "name": "hr",
        "dataType": "INT"
      },
      {
        "name": "cadence",
        "dataType": "INT"
      },
      {
        "name": "rawTime",
        "dataType": "INT"
      },
      {
        "name": "distance",
        "dataType": "DOUBLE"
      },
      {
        "name": "lat",
        "dataType": "DOUBLE"
      },
      {
        "name": "lon",
        "dataType": "DOUBLE"
      },
      {
        "name": "location",
        "dataType": "BYTES"
      }
    ],
    "dateTimeFieldSpecs": [{
      "name": "timestamp",
      "dataType": "TIMESTAMP",
      "format" : "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }]
  }

We can create the schema by running the following command:

docker exec -it pinot-controller-strava bin/pinot-admin.sh AddSchema \
  -schemaFile /config/schema.json -exec

Create Table

Now let’s create an offline table based on that schema.

/config/table-offline.json
{
    "tableName": "activities_offline",
    "tableType": "OFFLINE",
    "segmentsConfig": {
      "timeColumnName": "timestamp",
      "replication": 1,
      "schemaName": "activities"
    },
    "tenants": {
      "broker":"DefaultTenant",
      "server":"DefaultTenant"
    },
    "tableIndexConfig": {
      "loadMode": "MMAP"
    },
    "ingestionConfig": {
      "batchIngestionConfig": {
        "segmentIngestionType": "APPEND",
        "segmentIngestionFrequency": "DAILY"
      },
      "transformConfigs": [
        {"columnName": "location", "transformFunction": "toSphericalGeography(stPoint(lon, lat))" },
        {"columnName": "timestamp", "transformFunction": "FromDateTime(\"time\", 'yyyy-MM-dd HH:mm:ssZ')" }
      ]
    },
    "metadata": {}
  }

We can create a table by running the following command:

docker exec -it pinot-controller-strava bin/pinot-admin.sh AddTable \
  -tableConfigFile /config/table-offline.json \
  -exec

What is a sorted index?

Before we ingest any data, let’s remind ourselves about the definition of a sorted index.

When a column is physically sorted, Pinot uses a sorted forward index with run-length encoding on top of the dictionary encoding. Instead of saving dictionary ids for every document id, Pinot stores a pair of start and end document ids for each value.

A diagram showing how a sorted index works conceptually is shown below:

Figure 2. Sorted Forward Index

The advantage of having a sorted index is that queries that filter by that column will be more performant: the query engine can jump straight to the range of document ids for a value rather than scanning every document to check whether it matches the filter criteria.
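Conceptually, building the run-length encoded pairs can be sketched like this (a simplification for illustration, not Pinot's actual implementation):

```python
def build_sorted_forward_index(column):
    """Map each distinct value in a sorted column to a
    (startDocId, endDocId) pair instead of one entry per doc id."""
    index = {}
    for doc_id, value in enumerate(column):
        if value not in index:
            index[value] = (doc_id, doc_id)   # first occurrence starts the run
        else:
            start, _ = index[value]
            index[value] = (start, doc_id)    # extend the run to this doc id
    return index

# A sorted column: 10 documents, 3 distinct values
column = ["a"] * 4 + ["b"] * 3 + ["c"] * 3
index = build_sorted_forward_index(column)
# index == {"a": (0, 3), "b": (4, 6), "c": (7, 9)}

# A filter like WHERE col = 'b' resolves directly to doc ids 4..6
start, end = index["b"]
```

Ten documents collapse to three (start, end) pairs, which is where both the storage and the filtering win come from.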

When creating a segment Pinot does a single pass over every column to check whether the data is sorted. Columns that contain sorted data will use a sorted forward index.

Note

Sorted indexes are determined for each segment. This means that a column could be sorted in one segment, but not in another one.

Ingesting Data

Now let’s import some data into our table. We’ll be importing data from CSV files that contain some of the lat/longs from a few of my Strava runs.

activity1.csv contains the first 5 recorded points from one run:

Table 1. activity1.csv
lat        lon        id          distance  altitude  hr  cadence  time                       rawTime
56.265595  12.859432  2776420839  1.5       11.2      88  0        2019-10-09 21:25:25+00:00  0
56.26558   12.85943   2776420839  3.1       11.2      89  79       2019-10-09 21:25:26+00:00  1
56.265566  12.859438  2776420839  4.6       11.3      89  79       2019-10-09 21:25:27+00:00  2
56.265503  12.859488  2776420839  12.2      11.4      92  79       2019-10-09 21:25:30+00:00  5
56.265451  12.85952   2776420839  18.4      11.4      97  83       2019-10-09 21:25:32+00:00  7

And activity2.csv contains the first 5 recorded points from each of two different runs:

Table 2. activity2.csv
lat         lon        id          distance  altitude  hr  cadence  time                       rawTime
-20.400632  57.597161  3120683481  0.0       284.4     73  97       2020-02-22 03:48:39+00:00  0
-20.400629  57.597158  3120683481  0.2       284.4     73  46       2020-02-22 03:48:40+00:00  1
-20.400595  57.597163  3120683481  4.0       284.3     73  46       2020-02-22 03:48:42+00:00  3
-20.400398  57.597157  3120683481  26.1      285.4     73  82       2020-02-22 03:48:49+00:00  10
-20.400278  57.597101  3120683481  40.7      284.7     76  84       2020-02-22 03:48:53+00:00  14
56.434599   12.838058  3092741860  0.8       12.3      88  59       2020-02-12 06:19:59+00:00  0
56.434604   12.83807   3092741860  1.8       12.3      88  59       2020-02-12 06:20:00+00:00  1
56.434625   12.838106  3092741860  5.0       12.1      87  59       2020-02-12 06:20:03+00:00  4
56.434717   12.838408  3092741860  26.5      11.0      87  82       2020-02-12 06:20:10+00:00  11
56.434718   12.838463  3092741860  29.9      10.8      91  82       2020-02-12 06:20:11+00:00  12
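The import runs a batch ingestion job described by /config/job-spec.yml, which isn't shown in this post. A minimal spec might look something like this; the input/output paths follow the Docker volumes defined earlier, but the exact file used here is an assumption:

```yaml
# /config/job-spec.yml (sketch -- adjust paths to match your setup)
executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: '/input'
includeFileNamePattern: 'glob:**/*.csv'
outputDirURI: '/data'
overwriteOutput: true
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
tableSpec:
  tableName: 'activities_offline'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'
```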

We can import both CSV files by launching an ingestion job:

docker exec \
  -it pinot-controller-strava bin/pinot-admin.sh LaunchDataIngestionJob \
  -jobSpecFile /config/job-spec.yml

Once the job has run we’ll have two segments in our table, which we can see by navigating to http://localhost:9000/#/tenants/table/activities_offline_OFFLINE. You should see something like the following:

Figure 3. Segments

If the segments are showing up we can be reasonably sure that the import has worked, but let’s also navigate to http://localhost:9000/#/query to make sure. If we run a query against the table, we should see something like the following:

Figure 4. Querying the activities_offline table

Checking sorted status

Next, we’re going to check on the sorted status of the columns in each segment.

First let’s collect all the columns in the activities schema:

export queryString=`curl -X GET "http://localhost:9000/schemas/activities" -H "accept: application/json" 2>/dev/null |
  jq -r '[.dimensionFieldSpecs,.dateTimeFieldSpecs | .[] | .name ] | join("&columns=")'`

And now let’s call the getServerMetaData endpoint and filter the response to get the segment name, input file, and column names with sorted status:

curl -X GET "http://localhost:9000/segments/activities_offline/metadata?columns=${queryString}" \
  -H "accept: application/json"  2>/dev/null |
  jq -c '.[] | select(.columns != null) | {
    segment: .segmentName,
    importedFrom: .custom ["input.data.file.uri"],
    columns: .columns | map({(.columnName): .sorted})
  }'

If we run this command, we’ll see the following output:

Output
{
    "segment": "activities_offline_OFFLINE_1581488399000_1582343333000_1",
    "importedFrom": "file:/input-blog/activity2.csv",
    "columns": [
        {"altitude": false},
        {"distance": false},
        {"hr": false},
        {"lon": false},
        {"cadence": false},
        {"rawTime": false},
        {"location": false},
        {"id": false},
        {"lat": true},
        {"timestamp": false}
    ]
}
{
    "segment": "activities_offline_OFFLINE_1570656325000_1570656332000_0",
    "importedFrom": "file:/input-blog/activity1.csv",
    "columns": [
        {"altitude": true},
        {"distance": true},
        {"hr": true},
        {"lon": false},
        {"cadence": true},
        {"rawTime": true},
        {"location": false},
        {"id": true},
        {"lat": false},
        {"timestamp": true}
    ]
}
Note

I have formatted the JSON output using the FracturedJson tool in the web browser to make it easier to read.

From this output we can see that the segment created from activity1.csv has many more sorted columns than the one created from activity2.csv. The only column that was explicitly sorted is timestamp, but rawTime and distance are also sorted because they are correlated with timestamp within an activity.

For the activity2.csv segment the only sorted column is lat, which is sorted by chance more than anything else! None of the other columns are sorted.
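We can sanity-check this with the values from the tables above. The snippet below applies a single-pass sortedness test (like the one Pinot runs at segment creation time) to the lat and timestamp columns of activity2.csv:

```python
def is_sorted(values):
    # Single pass: every value must be >= its predecessor
    return all(a <= b for a, b in zip(values, values[1:]))

# lat values from activity2.csv: two different runs, but the values
# happen to only ever increase, so the column is sorted by coincidence
lat = [-20.400632, -20.400629, -20.400595, -20.400398, -20.400278,
       56.434599, 56.434604, 56.434625, 56.434717, 56.434718]

# timestamps: the second run (2020-02-12) predates the first one
# (2020-02-22), so the column isn't sorted across the whole file
time = ["2020-02-22 03:48:39", "2020-02-22 03:48:40", "2020-02-22 03:48:42",
        "2020-02-22 03:48:49", "2020-02-22 03:48:53",
        "2020-02-12 06:19:59", "2020-02-12 06:20:00", "2020-02-12 06:20:03",
        "2020-02-12 06:20:10", "2020-02-12 06:20:11"]

print(is_sorted(lat))   # True
print(is_sorted(time))  # False
```

This is why activity1.csv, which contains a single run recorded in time order, produces a segment with so many more sorted columns.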

Summary

So that’s the end of this first post explaining how sorted indexes work in Apache Pinot. Hopefully it all made sense, but if not feel free to ask any questions on the Pinot Community Slack.

In our next post we’ll learn how sorted indexes work on real-time tables.
