
Apache Pinot: Failed to generate segment - Input path {} does not exist

In this blog post we’re going to learn how to work around a bug that occurs when ingesting CSV files with the same name into Apache Pinot. I came across this issue while writing a recipe showing how to import data files from different directories.

Figure 1. Apache Pinot: Failed to generate segment - Input path {} does not exist

Setup

We’re going to spin up a local instance of Pinot, along with the Zookeeper instance that it depends on, using the following Docker Compose config:

docker-compose.yml
version: '3.7'
services:
  zookeeper:
    image: zookeeper:3.5.6
    hostname: zookeeper
    container_name: zookeeper-csv
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  pinot-controller:
    image: apachepinot/pinot:0.9.3
    command: "StartController -zkAddress zookeeper-csv:2181 -dataDir /data"
    container_name: "pinot-controller-csv"
    volumes:
      - ./config:/config
      - ./data:/data
      - ./input:/input
    restart: unless-stopped
    ports:
      - "9000:9000"
    depends_on:
      - zookeeper
  pinot-broker:
    image: apachepinot/pinot:0.9.3
    command: "StartBroker -zkAddress zookeeper-csv:2181"
    restart: unless-stopped
    container_name: "pinot-broker-csv"
    ports:
      - "8099:8099"
    depends_on:
      - pinot-controller
  pinot-server:
    image: apachepinot/pinot:0.9.3
    command: "StartServer -zkAddress zookeeper-csv:2181"
    restart: unless-stopped
    container_name: "pinot-server-csv"
    depends_on:
      - pinot-broker

We can launch all the components by running the following command:

docker-compose up

Create Schema

We’re going to use the following schema:

/config/schema.json
{
  "schemaName": "movies",
  "dimensionFieldSpecs": [{
      "name": "id",
      "dataType": "LONG"
    },
    {
      "name": "title",
      "dataType": "STRING"
    },
    {
      "name": "genre",
      "dataType": "STRING"
    },
    {
      "name": "year",
      "dataType": "INT"
    }
  ]
}

We can create the schema by running the following command:

docker exec -it pinot-controller-csv bin/pinot-admin.sh AddSchema \
  -schemaFile /config/schema.json -exec

Create Table

Now let’s create an offline table based on that schema:

/config/table.json
{
  "tableName": "movies",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "replication": 1
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP"
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    },
    "transformConfigs": []
  },
  "metadata": {}
}

We can create the table by running the following command:

docker exec -it pinot-controller-csv bin/pinot-admin.sh AddTable \
  -tableConfigFile /config/table.json \
  -exec

Ingest CSV files

Now we’re going to ingest some CSV files into this table. The files are in the input directory:

tree input/
Output
input/
├── 2000_2009
│   └── movies.csv
├── 2010_2019
│   └── movies.csv
└── 2020_present
    └── movies.csv

3 directories, 3 files

We have three files in different directories, all with the same name. Let’s now try to import them using the following ingestion spec:

/config/job-spec.yml
executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
segmentNameGeneratorSpec:
  configs:
    use.global.directory.sequence.id: true
jobType: SegmentCreationAndTarPush
inputDirURI: '/input'
includeFileNamePattern: 'glob:**/*.csv'
outputDirURI: '/data'
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'movies'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'

Run the following command:

docker exec -it pinot-controller-csv bin/pinot-admin.sh LaunchDataIngestionJob \
  -jobSpecFile /config/job-spec.yml

We’ll see the following output:

Output
java.lang.IllegalStateException: Input path {} does not exist. [/tmp/pinot-957bc026-8f77-4458-85df-d50d0559c159/input/movies.csv]
	at shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:518) ~[pinot-all-0.9.3-jar-with-dependencies.jar:0.9.3-e23f213cf0d16b1e9e086174d734a4db868542cb]
	at org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig.setInputFilePath(SegmentGeneratorConfig.java:416) ~[pinot-all-0.9.3-jar-with-dependencies.jar:0.9.3-e23f213cf0d16b1e9e086174d734a4db868542cb]
	at org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner.run(SegmentGenerationTaskRunner.java:112) ~[pinot-all-0.9.3-jar-with-dependencies.jar:0.9.3-e23f213cf0d16b1e9e086174d734a4db868542cb]
	at org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner.lambda$submitSegmentGenTask$1(SegmentGenerationJobRunner.java:263) ~[pinot-batch-ingestion-standalone-0.9.3-shaded.jar:0.9.3-e23f213cf0d16b1e9e086174d734a4db868542cb]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]
2022/03/10 14:15:59.187 ERROR [SegmentGenerationJobRunner] [pool-2-thread-1] Failed to generate Pinot segment for file - file:/input/2010_2019/movies.csv
java.lang.IllegalStateException: Input path {} does not exist. [/tmp/pinot-957bc026-8f77-4458-85df-d50d0559c159/input/movies.csv]
	at shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:518) ~[pinot-all-0.9.3-jar-with-dependencies.jar:0.9.3-e23f213cf0d16b1e9e086174d734a4db868542cb]
	at org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig.setInputFilePath(SegmentGeneratorConfig.java:416) ~[pinot-all-0.9.3-jar-with-dependencies.jar:0.9.3-e23f213cf0d16b1e9e086174d734a4db868542cb]
	at org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner.run(SegmentGenerationTaskRunner.java:112) ~[pinot-all-0.9.3-jar-with-dependencies.jar:0.9.3-e23f213cf0d16b1e9e086174d734a4db868542cb]
	at org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner.lambda$submitSegmentGenTask$1(SegmentGenerationJobRunner.java:263) ~[pinot-batch-ingestion-standalone-0.9.3-shaded.jar:0.9.3-e23f213cf0d16b1e9e086174d734a4db868542cb]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]

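We get this error because Pinot copies all the input files into the same temporary directory (the /tmp/pinot-957bc026-8f77-4458-85df-d50d0559c159/input directory in the stack trace), ignoring the subdirectories that they came from. It then creates one segment generation task per input file and runs those tasks on separate worker threads. When a task finishes processing its file, it deletes that file, so a later task that expects a file with the same name finds nothing at that path and fails with this error.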

Even if we didn’t run into this error, we still wouldn’t end up with all our CSV files ingested, because each file copied into that directory overwrites the previous one with the same name.
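To make the collision concrete, here is a small simulation of the behaviour described above. It is only a sketch of the idea, not Pinot’s actual code: the function name, the staging directory, and the two sample rows are all made up for illustration.

```shell
#!/usr/bin/env bash
set -eu

# Illustration only: mimic staging same-named files from different
# directories into one flat directory, then cleaning up after each task.
simulate_flat_staging() {
  local root
  root=$(mktemp -d)
  mkdir -p "$root/input/2000_2009" "$root/input/2010_2019" "$root/staging"

  # Hypothetical sample rows, one file per decade.
  printf 'id,title,genre,year\n1,Gladiator,Action,2000\n' > "$root/input/2000_2009/movies.csv"
  printf 'id,title,genre,year\n2,Inception,Sci-Fi,2010\n' > "$root/input/2010_2019/movies.csv"

  # Both copies target staging/movies.csv, so the second overwrites the first.
  cp "$root/input/2000_2009/movies.csv" "$root/staging/"
  cp "$root/input/2010_2019/movies.csv" "$root/staging/"
  echo "files in staging: $(ls "$root/staging" | wc -l | tr -d ' ')"
  echo "surviving rows:"
  tail -n +2 "$root/staging/movies.csv"

  # The first task to finish deletes its input file...
  rm "$root/staging/movies.csv"

  # ...so the second task no longer finds a file at the same path.
  if [ ! -e "$root/staging/movies.csv" ]; then
    echo "second task: input path does not exist"
  fi

  rm -rf "$root"
}

simulate_flat_staging
```

Running it shows a single file in the staging directory containing only the second file’s row, followed by the missing input path for the second task.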

There is a PR pending to fix this bug, but in the meantime we need to make sure that our input files have unique names. In this case we could use the following names:

Output
input/
├── 2000_2009
│   └── 2000_2009.csv
├── 2010_2019
│   └── 2010_2019.csv
└── 2020_present
    └── 2020_present.csv
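
If there are many directories, the renaming can be scripted. The following is a minimal sketch that assumes the layout shown above, where every subdirectory contains a single movies.csv. The two function names are hypothetical helpers, not part of Pinot.

```shell
#!/usr/bin/env bash
set -eu

# Print every CSV file name that occurs more than once under the given directory.
duplicate_csv_names() {
  find "$1" -type f -name '*.csv' -exec basename {} \; | sort | uniq -d
}

# Rename <dir>/<subdir>/movies.csv to <dir>/<subdir>/<subdir>.csv.
rename_to_dir_name() {
  local input_dir="$1" file dir
  for file in "$input_dir"/*/movies.csv; do
    # Skip the unexpanded pattern when no file matches.
    [ -e "$file" ] || continue
    dir=$(dirname "$file")
    mv -- "$file" "$dir/$(basename "$dir").csv"
  done
}

# When a directory is passed as the first argument, rename its files
# and then list any file names that are still duplicated.
if [ "$#" -gt 0 ]; then
  rename_to_dir_name "$1"
  duplicate_csv_names "$1"
fi
```

Saved under a name of our choosing and run with input as its argument, the script renames the three files and prints nothing if all CSV file names are now unique. Calling duplicate_csv_names on its own before an ingestion job is a quick way to check whether a set of input files would hit this bug.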