
Apache Pinot: Importing CSV files with columns containing spaces

I’ve been playing around with one of my favourite datasets from the Chicago Data Portal and spent a while figuring out how to import columns whose names contain spaces into Apache Pinot. In this blog post we’ll learn how to do that using a subset of the data.

Setup

We’re going to spin up a local instance of Pinot using the following Docker compose config:

docker-compose.yml
version: '3.7'
services:
  zookeeper:
    image: zookeeper:3.5.6
    hostname: zookeeper
    container_name: manual-zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  pinot-controller:
    image: apachepinot/pinot:0.9.0
    command: "StartController -zkAddress manual-zookeeper:2181"
    container_name: "manual-pinot-controller"
    volumes:
      - ./config:/config
      - ./data:/data
    restart: unless-stopped
    ports:
      - "9000:9000"
    depends_on:
      - zookeeper
  pinot-broker:
    image: apachepinot/pinot:0.9.0
    command: "StartBroker -zkAddress manual-zookeeper:2181"
    restart: unless-stopped
    container_name: "manual-pinot-broker"
    volumes:
      - ./config:/config
      - ./data:/data
    ports:
      - "8099:8099"
    depends_on:
      - pinot-controller
  pinot-server:
    image: apachepinot/pinot:0.9.0
    command: "StartServer -zkAddress manual-zookeeper:2181"
    restart: unless-stopped
    container_name: "manual-pinot-server"
    volumes:
      - ./config:/config
      - ./data:/data
    depends_on:
      - pinot-broker

We can launch the containers by running the following command:

docker-compose up
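
Once the containers are up, we can sanity check that the controller is ready by hitting its health endpoint, which should return OK:

curl http://localhost:9000/health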

Data

Let’s say we have the following CSV file that contains two columns, one whose name contains a space and one that doesn’t:

Table 1. data/import.csv
ID        Case Number
10224738  HY411648
10224739  HY411615
11646166  JC213529
10224740  HY411595

This CSV contains a subset of the crimes data from the Chicago Data Portal.
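
In raw form, data/import.csv looks like this. Note the comma-separated header row, where the second column name contains a space:

data/import.csv
ID,Case Number
10224738,HY411648
10224739,HY411615
11646166,JC213529
10224740,HY411595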

Create Table

We’re going to create a crimes Pinot table and associated schema based on this CSV file:

config/schema.json
{
    "schemaName": "crimes",
    "dimensionFieldSpecs": [
      {
        "name": "ID",
        "dataType": "INT"
      },
      {
        "name": "Case Number",
        "dataType": "STRING"
      }
    ]
}
config/table.json
{
    "tableName": "crimes",
    "tableType": "OFFLINE",
    "segmentsConfig": {
      "replication": 1
    },
    "tenants": {
      "broker":"DefaultTenant",
      "server":"DefaultTenant"
    },
    "tableIndexConfig": {
      "loadMode": "MMAP"
    },
    "ingestionConfig": {
      "batchIngestionConfig": {
        "segmentIngestionType": "APPEND",
        "segmentIngestionFrequency": "DAILY"
      }
    },
    "metadata": {}
  }

Let’s try to create a table based on this config:

docker exec \
  -it manual-pinot-controller bin/pinot-admin.sh AddTable \
  -tableConfigFile /config/table.json \
  -schemaFile /config/schema.json \
  -exec
2021/11/25 11:57:27.088 ERROR [AddTableCommand] [main] Got Exception to upload Pinot Schema: crimes
org.apache.pinot.common.exception.HttpErrorStatusException: Got error status code: 400 (Bad Request) with reason: "Cannot add invalid schema: crimes. Reason: The column name "Case Number" should not contain blank space." while sending request: http://192.168.144.3:9000/schemas to controller: d15b07933b22, version: Unknown
	at org.apache.pinot.common.utils.FileUploadDownloadClient.sendRequest(FileUploadDownloadClient.java:510) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at org.apache.pinot.common.utils.FileUploadDownloadClient.addSchema(FileUploadDownloadClient.java:616) ~[pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at org.apache.pinot.tools.admin.command.AddTableCommand.uploadSchema(AddTableCommand.java:166) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at org.apache.pinot.tools.admin.command.AddTableCommand.execute(AddTableCommand.java:203) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at org.apache.pinot.tools.Command.call(Command.java:33) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at org.apache.pinot.tools.Command.call(Command.java:29) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at picocli.CommandLine.executeUserObject(CommandLine.java:1953) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at picocli.CommandLine.access$1300(CommandLine.java:145) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2352) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at picocli.CommandLine$RunLast.handle(CommandLine.java:2346) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at picocli.CommandLine$RunLast.handle(CommandLine.java:2311) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2179) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at picocli.CommandLine.execute(CommandLine.java:2078) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at org.apache.pinot.tools.admin.PinotAdministrator.execute(PinotAdministrator.java:161) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]
	at org.apache.pinot.tools.admin.PinotAdministrator.main(PinotAdministrator.java:192) [pinot-all-0.9.0-jar-with-dependencies.jar:0.9.0-cf8b84e8b0d6ab62374048de586ce7da21132906]

Pinot schemas don’t allow column names that contain spaces, so we’ll have to get rid of the space in Case Number. We can update the schema to look like this:

config/schema.json
{
    "schemaName": "crimes",
    "dimensionFieldSpecs": [
      {
        "name": "ID",
        "dataType": "INT"
      },
      {
        "name": "CaseNumber",
        "dataType": "STRING"
      }
    ]
}

If we re-run the AddTable command, we’ll see the following output:

2021/11/25 12:02:04.606 INFO [AddTableCommand] [main] Executing command: AddTable -tableConfigFile /config/table.json -schemaFile /config/schema.json -controllerProtocol http -controllerHost 192.168.144.3 -controllerPort 9000 -user null -password [hidden] -exec
2021/11/25 12:02:05.084 INFO [AddTableCommand] [main] {"status":"Table crimes_OFFLINE succesfully added"}
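
We can optionally double check that the schema was registered by fetching it back from the controller’s REST API:

curl http://localhost:9000/schemas/crimes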

Ingest CSV file

Now we’re going to import the CSV file that we saw at the beginning of the post. To do this, we’ll create the following data ingestion job spec:

config/job-spec.yml
executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: '/data'
includeFileNamePattern: 'glob:**/import.csv'
outputDirURI: '/opt/pinot/data/crimes/segments/'
overwriteOutput: true
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'crimes'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'

We can import the CSV file by running the following command:

docker exec \
  -it manual-pinot-controller bin/pinot-admin.sh LaunchDataIngestionJob \
  -jobSpecFile /config/job-spec.yml
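
Once the job completes, we can check that a segment was created by listing the segments for the crimes table:

curl "http://localhost:9000/segments/crimes" -H "accept: application/json"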

Once we’ve run this we can navigate to the Query Console at http://localhost:9000/#/query and run the following query:

select * from crimes limit 10
Table 2. Result
CaseNumber  ID
null        10224738
null        10224739
null        11646166
null        10224740

Hmmm, CaseNumber is always null, which isn’t what we want. This happens because the column in our schema is called CaseNumber, but the column in the CSV file is called Case Number, so Pinot can’t match them up. To deal with this problem we’ll need to add an ingestion transformation config to our table config.

Update table config and reingest

Let’s update our table config to add the transform config:

config/table.json
{
    "tableName": "crimes",
    "tableType": "OFFLINE",
    "segmentsConfig": {
      "replication": 1
    },
    "tenants": {
      "broker":"DefaultTenant",
      "server":"DefaultTenant"
    },
    "tableIndexConfig": {
      "loadMode": "MMAP"
    },
    "ingestionConfig": {
      "batchIngestionConfig": {
        "segmentIngestionType": "APPEND",
        "segmentIngestionFrequency": "DAILY"
      },
      "transformConfigs": [
        {"columnName": "CaseNumber", "transformFunction": "\"Case Number\"" }
      ]
    },
    "metadata": {}
  }

The columnName refers to the column name in the schema, and the transformFunction describes a function for processing a value from the source data. In this case the function is simply the quoted name of the column in our CSV file, which extracts the values from that column.
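
The same pattern works for any other column whose name contains a space. For example, if our CSV also had a Primary Type column (a hypothetical addition for illustration; it isn’t in our subset), the transform configs would look like this:

"transformConfigs": [
  {"columnName": "CaseNumber", "transformFunction": "\"Case Number\"" },
  {"columnName": "PrimaryType", "transformFunction": "\"Primary Type\"" }
]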

Before we update the table config, let’s first delete the segment that we ingested in the previous section:

Drop crimes table segments
curl -X DELETE "http://localhost:9000/segments/crimes?type=OFFLINE" -H "accept: application/json"

Now we can update the table config:

Update table config
curl 'http://localhost:9000/tables/crimes_OFFLINE' \
 -X 'PUT' \
 -H 'Content-Type: application/json' \
 --data-binary "@config/table.json"
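
To double check that the update took effect, we can fetch the table config back and look for the transformConfigs section:

curl http://localhost:9000/tables/crimes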

And finally, we can run the data ingestion job again:

docker exec \
  -it manual-pinot-controller bin/pinot-admin.sh LaunchDataIngestionJob \
  -jobSpecFile /config/job-spec.yml

Now if we run a query against this table we’ll see populated values for the CaseNumber:

select * from crimes limit 10
Table 3. Result
CaseNumber  ID
HY411648    10224738
HY411615    10224739
JC213529    11646166
HY411595    10224740
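
And since the column is now called CaseNumber, we can reference it directly in our queries. For example, to look up a single case:

select ID, CaseNumber from crimes where CaseNumber = 'HY411648'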
