
Apache Pinot: Exploring range queries

In our last post about the Chicago Crimes dataset and Apache Pinot, we learnt how to use various indexes to filter columns by exact values. In this post we’re going to learn how to write range queries against the dataset.

Figure 1. Apache Pinot - Range Queries

Recap

To recap, the Chicago Crimes dataset contains more than 7 million crimes committed in Chicago from 2001 until today. For each crime we have various identifiers, a timestamp, location, and codes representing the type of crime that’s been committed.

A subset of the data is shown below:

Figure 2. Chicago Crimes Dataset

We loaded this data into Apache Pinot and then analysed the numEntriesScannedInFilter and timeUsedMs values returned in the query metadata to see how well we’d optimised our queries.

Setup

We’re going to use the same Docker Compose script as before, which spins up the various components for Apache Pinot 0.9.0:

docker-compose.yml
version: '3.7'
services:
  zookeeper:
    image: zookeeper:3.5.6
    hostname: zookeeper
    container_name: manual-zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  pinot-controller:
    image: apachepinot/pinot:0.9.0
    command: "StartController -zkAddress manual-zookeeper:2181"
    container_name: "manual-pinot-controller"
    volumes:
      - ./config:/config
      - ./data:/data
    restart: unless-stopped
    ports:
      - "9000:9000"
    depends_on:
      - zookeeper
  pinot-broker:
    image: apachepinot/pinot:0.9.0
    command: "StartBroker -zkAddress manual-zookeeper:2181"
    restart: unless-stopped
    container_name: "manual-pinot-broker"
    volumes:
      - ./config:/config
      - ./data:/data
    ports:
      - "8099:8099"
    depends_on:
      - pinot-controller
  pinot-server:
    image: apachepinot/pinot:0.9.0
    command: "StartServer -zkAddress manual-zookeeper:2181"
    restart: unless-stopped
    container_name: "manual-pinot-server"
    volumes:
      - ./config:/config
      - ./data:/data
    depends_on:
      - pinot-broker

Schema

We’ve made a small change to our schema since the last blog post. Instead of storing the timestamp as a STRING, we’re going to store it as a TIMESTAMP so that we can perform date time operations. We’ll also change the column name from Timestamp to DateEpoch.

The updated schema is shown below:

/config/schema.json
{
    "schemaName": "crimes",
    "dimensionFieldSpecs": [
      {
        "name": "ID",
        "dataType": "INT"
      },
      {
        "name": "CaseNumber",
        "dataType": "STRING"
      },
      {
        "name": "Block",
        "dataType": "STRING"
      },
      {
        "name": "IUCR",
        "dataType": "STRING"
      },
      {
        "name": "PrimaryType",
        "dataType": "STRING"
      },
      {
        "name": "Arrest",
        "dataType": "BOOLEAN"
      },
      {
        "name": "Domestic",
        "dataType": "BOOLEAN"
      },
      {
        "name": "Beat",
        "dataType": "STRING"
      },
      {
        "name": "District",
        "dataType": "STRING"
      },
      {
        "name": "Ward",
        "dataType": "STRING"
      },
      {
        "name": "CommunityArea",
        "dataType": "STRING"
      },
      {
        "name": "FBICode",
        "dataType": "STRING"
      },
      {
        "name": "Latitude",
        "dataType": "DOUBLE"
      },
      {
        "name": "Longitude",
        "dataType": "DOUBLE"
      }
    ],
    "dateTimeFieldSpecs": [
      {
        "name": "DateEpoch",
        "dataType": "TIMESTAMP",
        "format" : "1:MILLISECONDS:EPOCH",
        "granularity": "1:MILLISECONDS"
      }
    ]
}

If you read the first post about this dataset you’ll remember that the timestamp was a DateTime string in the MM/dd/yyyy HH:mm:ss a format, rather than the number of milliseconds since the epoch. We’re going to take care of that with a transformation function in our table config.

Table

The table config is described below:

/config/table.json
{
    "tableName": "crimes",
    "tableType": "OFFLINE",
    "segmentsConfig": {
      "replication": 1
    },
    "tenants": {
      "broker":"DefaultTenant",
      "server":"DefaultTenant"
    },
    "tableIndexConfig": {
      "loadMode": "MMAP",
      "sortedColumn": ["Beat"]
    },
    "nullHandlingEnabled": true,
    "ingestionConfig": {
      "batchIngestionConfig": {
        "segmentIngestionType": "APPEND",
        "segmentIngestionFrequency": "DAILY"
      },
      "transformConfigs": [
        {"columnName": "CaseNumber", "transformFunction": "\"Case Number\"" },
        {"columnName": "PrimaryType", "transformFunction": "\"Primary Type\"" },
        {"columnName": "CommunityArea", "transformFunction": "\"Community Area\"" },
        {"columnName": "FBICode", "transformFunction": "\"FBI Code\"" },
        {"columnName": "DateEpoch", "transformFunction": "FromDateTime(\"Date\", 'MM/dd/yyyy HH:mm:ss a')" }
      ]
    },
    "metadata": {}
}

As mentioned in the previous section, we’re using the FromDateTime transformation function to convert the DateTime strings in the Date column into timestamps.

For more details on how the data transformation works, see my blog post Converting a DateTime String to Timestamp in Apache Pinot.

Now let’s create the table and schema:

docker exec -it manual-pinot-controller bin/pinot-admin.sh AddTable   \
  -tableConfigFile /config/table.json   \
  -schemaFile /config/schema.json -exec

Import CSV

Now we’re going to import the crimes into Pinot, using the following ingestion spec:

/config/job-spec-sorted.yml
executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: '/data'
includeFileNamePattern: 'glob:**/Crimes_beat_sorted.csv'
outputDirURI: ' '
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'crimes'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'

docker exec \
  -it manual-pinot-controller bin/pinot-admin.sh LaunchDataIngestionJob \
  -jobSpecFile /config/job-spec-sorted.yml

Range Querying

Now it’s time to do some range querying. Let’s start by counting the crimes committed on 1st January 2019, which we can do by running the following query:

SELECT count(*)
FROM crimes
WHERE DateEpoch BETWEEN
  FromDateTime('2019-01-01', 'yyyy-MM-dd') AND
  FromDateTime('2019-01-02', 'yyyy-MM-dd')

As in the last post, we’ll be looking at the output in JSON format to see what’s going on under the covers:

Output
{
  "numDocsScanned": 1065,
  "numEntriesScannedInFilter": 7434990,
  "numEntriesScannedPostFilter": 0,
  "timeUsedMs": 80
}

As we might expect, the SQL engine has scanned all 7,434,990 rows to check which ones match the date range. 1,065 crimes were committed on this day and the query took 80 ms.
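To make the bounds of that BETWEEN clause concrete, here is a minimal Python sketch of the two values the FromDateTime calls should evaluate to. It assumes the date strings are interpreted as midnight UTC; the helper names to_epoch_millis and in_range are made up for this illustration and are not part of Pinot.

```python
from datetime import datetime, timezone


def to_epoch_millis(date_string: str) -> int:
    """Parse a yyyy-MM-dd date as midnight UTC (an assumption) and return epoch millis."""
    parsed = datetime.strptime(date_string, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    return int(parsed.timestamp() * 1000)


lower = to_epoch_millis("2019-01-01")
upper = to_epoch_millis("2019-01-02")


def in_range(date_epoch: int) -> bool:
    # BETWEEN is inclusive at both ends, so the upper bound itself matches.
    return lower <= date_epoch <= upper


print(lower, upper)
```

Because BETWEEN includes both endpoints, a crime timestamped at exactly midnight on 2nd January would also be counted. A filter of the form DateEpoch >= lower AND DateEpoch < upper would exclude it.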

Now we’re going to create another table called crimes_range_index that has a range index applied to the DateEpoch column:

/config/table-range-index.json
{
  "tableName": "crimes_range_index",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "replication": 1,
    "schemaName": "crimes" (1)
  },
  "tenants": {
    "broker":"DefaultTenant",
    "server":"DefaultTenant"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "sortedColumn": ["Beat"],
    "rangeIndexVersion": 2,
    "rangeIndexColumns": ["DateEpoch"] (2)
  },
  "nullHandlingEnabled": true,
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    },
    "transformConfigs": [
      {"columnName": "CaseNumber", "transformFunction": "\"Case Number\"" },
      {"columnName": "PrimaryType", "transformFunction": "\"Primary Type\"" },
      {"columnName": "CommunityArea", "transformFunction": "\"Community Area\"" },
      {"columnName": "FBICode", "transformFunction": "\"FBI Code\"" },
      {"columnName": "DateEpoch", "transformFunction": "FromDateTime(\"Date\", 'MM/dd/yyyy HH:mm:ss a')" }
    ]
  },
  "metadata": {}
}
(1) We need to explicitly specify the schema name since it doesn’t match the table name.
(2) Add a range index on the DateEpoch column.
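The new config differs from the original table config in only four places. The Python sketch below derives one from the other to make that difference explicit. The base_config dictionary is a trimmed-down stand-in for /config/table.json (the tenant and ingestion settings are omitted), and with_range_index is a hypothetical helper written for this illustration.

```python
import copy
import json

# Trimmed-down stand-in for /config/table.json (tenants and ingestion config omitted).
base_config = {
    "tableName": "crimes",
    "tableType": "OFFLINE",
    "segmentsConfig": {"replication": 1},
    "tableIndexConfig": {"loadMode": "MMAP", "sortedColumn": ["Beat"]},
}


def with_range_index(config, table_name, schema_name, columns):
    """Return a copy of config renamed and extended with a range index on columns."""
    derived = copy.deepcopy(config)  # leave the original config untouched
    derived["tableName"] = table_name
    # The schema name must be explicit because it no longer matches the table name.
    derived["segmentsConfig"]["schemaName"] = schema_name
    derived["tableIndexConfig"]["rangeIndexVersion"] = 2
    derived["tableIndexConfig"]["rangeIndexColumns"] = list(columns)
    return derived


range_config = with_range_index(base_config, "crimes_range_index", "crimes", ["DateEpoch"])
print(json.dumps(range_config["tableIndexConfig"], indent=2))
```

Everything else, including the sorted index on Beat, is carried over unchanged.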

Run the following command to create the table:

docker exec -it manual-pinot-controller bin/pinot-admin.sh AddTable   \
  -tableConfigFile /config/table-range-index.json   \
  -schemaFile /config/schema.json -exec

Now let’s copy the segment from the crimes table to the crimes_range_index table. We can do this with the following ingestion spec:

/config/job-spec-download-only.yml
executionFrameworkSpec:
  name: 'standalone'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
jobType: SegmentTarPush
outputDirURI: '/opt/pinot/data/crimes'
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
tableSpec:
  tableName: 'crimes_range_index'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'

Run the ingestion spec:

docker exec \
  -it manual-pinot-controller bin/pinot-admin.sh LaunchDataIngestionJob \
  -jobSpecFile /config/job-spec-download-only.yml

For more details on how this ingestion spec works, see my blog post Apache Pinot: Copying a segment to a new table.

Now let’s re-run the query against the new table:

SELECT count(*)
FROM crimes_range_index
WHERE DateEpoch BETWEEN
  FromDateTime('2019-01-01', 'yyyy-MM-dd') AND
  FromDateTime('2019-01-02', 'yyyy-MM-dd')
Output
{
  "numDocsScanned": 1065,
  "numEntriesScannedInFilter": 0,
  "numEntriesScannedPostFilter": 0,
  "timeUsedMs": 14
}

numEntriesScannedInFilter is now down to 0 and the query is more than 5 times faster than it was before.
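If you’d rather collect these numbers from a script than from the query console, something like the following Python sketch could work. It assumes the broker from the Docker Compose file is reachable on localhost:8099 and accepts SQL via a POST to /query/sql; run_query, query_stats, and speedup are names made up for this example. The two dictionaries at the bottom simply repeat the metadata reported above, so the snippet runs without a cluster.

```python
import json
from urllib import request

# Assumption: broker port as exposed in docker-compose.yml.
BROKER_URL = "http://localhost:8099/query/sql"

METADATA_KEYS = (
    "numDocsScanned",
    "numEntriesScannedInFilter",
    "numEntriesScannedPostFilter",
    "timeUsedMs",
)


def run_query(sql, broker_url=BROKER_URL):
    """POST a SQL query to the broker and return the decoded JSON response."""
    payload = json.dumps({"sql": sql}).encode("utf-8")
    req = request.Request(
        broker_url, data=payload, headers={"Content-Type": "application/json"}
    )
    with request.urlopen(req) as response:
        return json.load(response)


def query_stats(response):
    """Keep only the metadata fields discussed in this post."""
    return {key: response[key] for key in METADATA_KEYS}


def speedup(before, after):
    """Ratio of the two reported query times."""
    return before["timeUsedMs"] / after["timeUsedMs"]


# Metadata reported above for the crimes and crimes_range_index tables.
scan = {"numDocsScanned": 1065, "numEntriesScannedInFilter": 7434990,
        "numEntriesScannedPostFilter": 0, "timeUsedMs": 80}
indexed = {"numDocsScanned": 1065, "numEntriesScannedInFilter": 0,
           "numEntriesScannedPostFilter": 0, "timeUsedMs": 14}

print(round(speedup(scan, indexed), 1))
```

With a running cluster, query_stats(run_query("SELECT count(*) FROM crimes")) would return the same four fields for any query. Timings from a single run are noisy, so it would be worth repeating each query a few times before comparing.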

Now let’s see what happens if we filter on multiple columns. We have a sorted index on the Beat column, so let’s find the crimes committed on a specific beat on 1st January 2019:

SELECT count(*)
FROM crimes
WHERE DateEpoch BETWEEN
  FromDateTime('2019-01-01', 'yyyy-MM-dd') AND
  FromDateTime('2019-01-02', 'yyyy-MM-dd')
AND Beat = '0421'
Output
{
  "numDocsScanned": 13,
  "numEntriesScannedInFilter": 57573,
  "numEntriesScannedPostFilter": 0,
  "timeUsedMs": 3
}

And the same query against the crimes_range_index table:

SELECT count(*)
FROM crimes_range_index
WHERE DateEpoch BETWEEN
  FromDateTime('2019-01-01', 'yyyy-MM-dd') AND
  FromDateTime('2019-01-02', 'yyyy-MM-dd')
AND Beat = '0421'
Output
{
  "numDocsScanned": 13,
  "numEntriesScannedInFilter": 0,
  "numEntriesScannedPostFilter": 0,
  "timeUsedMs": 12
}

In this case, the combination of the sorted index on Beat and a scan of the forward index on DateEpoch (3 ms) is quicker than querying indexes on both fields (12 ms).
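One plausible reading of the 57,573 entries scanned by the first query is that the sorted index on Beat narrows the candidates to a contiguous run of documents, and only the DateEpoch values inside that run are then scanned. The Python sketch below is a toy model of that idea, not Pinot’s actual implementation; the data and the function name count_with_sorted_then_scan are invented for illustration.

```python
from bisect import bisect_left, bisect_right


def count_with_sorted_then_scan(beats, date_epochs, beat, lower, upper):
    """Count docs matching beat and lower <= date <= upper.

    beats must be sorted ascending and aligned with date_epochs by doc id.
    Returns (matches, entries_scanned).
    """
    # The sorted column gives a contiguous doc id range without scanning any values.
    start = bisect_left(beats, beat)
    end = bisect_right(beats, beat)

    matches = 0
    entries_scanned = 0
    # Only the date values inside that range are scanned.
    for doc_id in range(start, end):
        entries_scanned += 1
        if lower <= date_epochs[doc_id] <= upper:
            matches += 1
    return matches, entries_scanned


# Made-up data: six documents, sorted by beat.
beats = ["0111", "0111", "0421", "0421", "0421", "0423"]
date_epochs = [5, 20, 10, 15, 40, 12]

matches, scanned = count_with_sorted_then_scan(beats, date_epochs, "0421", 10, 20)
print(matches, scanned)  # prints "2 3"
```

In this model the number of entries scanned equals the number of documents for the beat, rather than the size of the whole table, which is why a scan can be cheap when the first filter is very selective.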

Admittedly this is far from the normal usage of Pinot. I’m only running one query at a time, whereas there would normally be thousands (if not more) of concurrent queries, which would tip things in favour of using the indexes on both fields. Having said that, there is a GitHub issue to have the query engine be smarter when deciding whether to scan or use an index.

If we include more beats, the query that uses both indexes is slightly quicker:

SELECT count(*)
FROM crimes
WHERE DateEpoch BETWEEN
  FromDateTime('2019-01-01', 'yyyy-MM-dd') AND
  FromDateTime('2019-01-02', 'yyyy-MM-dd')
AND Beat IN ('0421', '0423', '0624', '1834', '0511', '1112', '1533', '0823', '0414', '1522')
Output
{
  "numDocsScanned": 90,
  "numEntriesScannedInFilter": 513052,
  "numEntriesScannedPostFilter": 0,
  "timeUsedMs": 12
}

And the same query against the crimes_range_index table:

SELECT count(*)
FROM crimes_range_index
WHERE DateEpoch BETWEEN
  FromDateTime('2019-01-01', 'yyyy-MM-dd') AND
  FromDateTime('2019-01-02', 'yyyy-MM-dd')
AND Beat IN ('0421', '0423', '0624', '1834', '0511', '1112', '1533', '0823', '0414', '1522')
Output
{
  "numDocsScanned": 90,
  "numEntriesScannedInFilter": 0,
  "numEntriesScannedPostFilter": 0,
  "timeUsedMs": 11
}

Conclusion

In this post we learnt how to use Apache Pinot’s range index so that we could find crimes that happened on a given day. We then combined filter predicates and saw that sometimes it can be faster to do a column scan rather than merging together the results of querying two indexes.

We haven’t yet explored how to query the spatial data that this dataset contains, so perhaps that’s what we’ll look at in our next post!
