
Apache Pinot: Resetting a segment after an invalid JSON Transformation

I recently had a typo in a Pinot ingestion transformation function and wanted to have Pinot re-process the Kafka stream without having to restart all the things. In this blog post we’ll learn how to do that.


Setup

We’re going to spin up a local instance of Pinot and Kafka using the following Docker compose config:

docker-compose.yml
version: '3.7'
services:
  zookeeper:
    image: zookeeper:3.5.6
    hostname: zookeeper
    container_name: zookeeper-json
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: wurstmeister/kafka:latest
    restart: unless-stopped
    container_name: "kafka-json"
    ports:
      - "9092:9092"
    expose:
      - "9093"
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-json:2181/kafka
      KAFKA_BROKER_ID: 0
      KAFKA_ADVERTISED_HOST_NAME: kafka-json
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-json:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
  pinot-controller:
    image: apachepinot/pinot:0.9.3
    command: "StartController -zkAddress zookeeper-json:2181"
    container_name: "pinot-controller-json"
    volumes:
      - ./config:/config
    restart: unless-stopped
    ports:
      - "9000:9000"
    depends_on:
      - zookeeper
  pinot-broker:
    image: apachepinot/pinot:0.9.3
    command: "StartBroker -zkAddress zookeeper-json:2181"
    restart: unless-stopped
    container_name: "pinot-broker-json"
    ports:
      - "8099:8099"
    depends_on:
      - pinot-controller
  pinot-server:
    image: apachepinot/pinot:0.9.3
    command: "StartServer -zkAddress zookeeper-json:2181"
    restart: unless-stopped
    container_name: "pinot-server-json"
    depends_on:
      - pinot-broker

We can launch all the components by running the following command:

docker-compose up
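Before moving on, it's worth checking that all the containers came up cleanly. This check isn't part of the workflow, just a convenience:

docker-compose ps

All five services (zookeeper, kafka, and the three Pinot components) should show a state of Up.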

Create Schema

We’re going to use the following schema:

/config/schema.json
{
  "schemaName": "events",
  "dimensionFieldSpecs": [
    {
      "name": "age",
      "dataType": "LONG"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "timestamp",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}

It’s a small schema, but it’s enough for our purposes. We can create it by running the following command:

docker exec -it pinot-controller-json bin/pinot-admin.sh AddSchema \
  -schemaFile /config/schema.json -exec
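To double-check that the schema was registered, we can fetch it back from the controller’s REST API (GET /schemas/{schemaName} is a standard controller endpoint):

curl -X GET "http://localhost:9000/schemas/events" -H "accept: application/json"

This should echo back the schema JSON we just submitted.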

Create Table

Now let’s create a real-time table based on that schema:

/config/table.json
{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "timestamp",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    },
    "transformConfigs": [
      {"columnName": "age", "transformFunction": "JSONPATHLONG(payload, '$.ages')" } (1)
    ]
  },
  "tableIndexConfig": {
      "loadMode": "MMAP",
      "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.topic.name": "events",
        "stream.kafka.broker.list": "kafka-json:9093",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      }
  },
  "tenants": {},
  "metadata": {}
}
1 The typo in the JSON path ('$.ages' instead of '$.age') means that an exception will be thrown when the function is executed.
For documentation on this function, see the JSONPATHLONG page in the Pinot docs.

We can create the table by running the following command:

docker exec -it pinot-controller-json bin/pinot-admin.sh AddTable   \
  -tableConfigFile /config/table.json   \
  -exec
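As with the schema, we can confirm the table was created by asking the controller for its config:

curl -X GET "http://localhost:9000/tables/events" -H "accept: application/json"

The response should contain the REALTIME table config, including our (broken) transformConfigs.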

Ingest Data into Kafka

Now let’s ingest a few messages into the Kafka events topic:

printf '{"timestamp": "2019-10-09 22:25:25", "payload": {"age": 18}}
{"timestamp": "2019-10-09 23:25:25", "payload": {"age": 14}}
{"timestamp": "2019-10-09 23:40:25", "payload": {"age": 16}}\n' |
docker exec -i kafka-json /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic events

We can check that the messages have been ingested by running the following command:

docker exec -i kafka-json /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic events \
  --from-beginning \
  --max-messages 3
Output
{"timestamp": "2019-10-09 22:25:25", "payload": {"age": 18}}
{"timestamp": "2019-10-09 23:25:25", "payload": {"age": 14}}
{"timestamp": "2019-10-09 23:40:25", "payload": {"age": 16}}
Processed a total of 3 messages
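The topic was created implicitly when we produced to it (the default behaviour for this Kafka image). We can describe it to confirm it has a single partition, which will matter when we look at consumer offsets later:

docker exec -i kafka-json /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --topic events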

All good so far. Let’s navigate to http://localhost:9000/#/query and query the events table:

Figure 2. No documents

Hmmm, no documents.

The Debug API

We can find out what’s going on by querying the debug API for this table:

curl -X GET "http://localhost:9000/debug/tables/events?verbosity=0" -H "accept: application/json"
Output
[
  {
    "tableName": "events_REALTIME",
    "numSegments": 1,
    "numServers": 1,
    "numBrokers": 1,
    "segmentDebugInfos": [
      {
        "segmentName": "events__0__0__20220131T1057Z",
        "serverState": {
          "Server_172.24.0.6_8098": {
            "idealState": "CONSUMING",
            "externalView": "CONSUMING",
            "segmentSize": "0 bytes",
            "consumerInfo": {
              "segmentName": "events__0__0__20220131T1057Z",
              "consumerState": "CONSUMING",
              "lastConsumedTimestamp": 1643626843673,
              "partitionToOffsetMap": {
                "0": "3"
              }
            },
            "errorInfo": {
              "timestamp": "2022-01-31 10:57:50 UTC",
              "errorMessage": "Caught exception while transforming the record: {\n  \"nullValueFields\" : [ ],\n  \"fieldToValueMap\" : {\n    \"payload\" : {\n      \"age\" : 16\n    },\n    \"age\" : null,\n    \"timestamp\" : \"2019-10-09 23:40:25\"\n  }\n}",
              "stackTrace": "java.lang.RuntimeException: Caught exception while executing function: jsonPathLong(payload,'$.ages')\n\tat org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator$FunctionExecutionNode.execute(InbuiltFunctionEvaluator.java:124)\n\tat org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator.evaluate(InbuiltFunctionEvaluator.java:88)\n\tat org.apache.pinot.segment.local.recordtransformer.ExpressionTransformer.transform(ExpressionTransformer.java:96)\n\tat org.apache.pinot.segment.local.recordtransformer.CompositeTransformer.transform(CompositeTransformer.java:83)\n\tat org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.processStreamEvents(LLRealtimeSegmentDataManager.java:518)\n\tat org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.consumeLoop(LLRealtimeSegmentDataManager.java:420)\n\tat org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager$PartitionConsumer.run(LLRealtimeSegmentDataManager.java:568)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: java.lang.IllegalStateException: Caught exception while invoking method: public static long org.apache.pinot.common.function.scalar.JsonFunctions.jsonPathLong(java.lang.Object,java.lang.String) with arguments: [{age=16}, $.ages]\n\tat org.apache.pinot.common.function.FunctionInvoker.invoke(FunctionInvoker.java:131)\n\tat org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator$FunctionExecutionNode.execute(InbuiltFunctionEvaluator.java:122)\n\t... 7 more\nCaused by: java.lang.reflect.InvocationTargetException\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)\n\tat org.apache.pinot.common.function.FunctionInvoker.invoke(FunctionInvoker.java:128)\n\t... 8 more\nCaused by: com.jayway.jsonpath.PathNotFoundException: No results for path: $['ages']\n\tat com.jayway.jsonpath.internal.path.EvaluationContextImpl.getValue(EvaluationContextImpl.java:133)\n\tat com.jayway.jsonpath.JsonPath.read(JsonPath.java:187)\n\tat com.jayway.jsonpath.internal.JsonContext.read(JsonContext.java:102)\n\tat com.jayway.jsonpath.internal.JsonContext.read(JsonContext.java:85)\n\tat org.apache.pinot.common.function.scalar.JsonFunctions.jsonPath(JsonFunctions.java:89)\n\tat org.apache.pinot.common.function.scalar.JsonFunctions.jsonPathLong(JsonFunctions.java:152)\n\t... 13 more\n"
            }
          }
        }
      }
    ],
    "serverDebugInfos": [],
    "brokerDebugInfos": [],
    "tableSize": {
      "reportedSize": "0 bytes",
      "estimatedSize": "0 bytes"
    },
    "ingestionStatus": {
      "ingestionState": "HEALTHY",
      "errorMessage": ""
    }
  }
]
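The debug payload is verbose. Assuming jq is installed on the host, we can pull out just the error message; this is a convenience, not part of the workflow:

curl -s "http://localhost:9000/debug/tables/events?verbosity=0" |
  jq '.[0].segmentDebugInfos[0].serverState[].errorInfo.errorMessage'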

As expected, the transformation failed: the $.ages path doesn’t exist in any of those messages, so jsonPathLong threw a PathNotFoundException. Let’s fix the transformation with an updated table config:

/config/table-fixed.json
{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "timestamp",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    },
    "transformConfigs": [
      {"columnName": "age", "transformFunction": "JSONPATHLONG(payload, '$.age')" } (1)
    ]
  },
  "tableIndexConfig": {
      "loadMode": "MMAP",
      "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.topic.name": "events",
        "stream.kafka.broker.list": "kafka-json:9093",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"
      }
  },
  "tenants": {},
  "metadata": {}
}
1 The typo has now been fixed ('$.age' instead of '$.ages')

We can apply the updated table config by running the following command:

docker exec -it pinot-controller-json bin/pinot-admin.sh AddTable   \
  -tableConfigFile /config/table-fixed.json   \
  -exec
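If your Pinot version complains that the table already exists when re-running AddTable, the same config can be applied via the controller’s REST API instead, using the standard PUT /tables/{tableName} endpoint (run this from the directory containing the config folder):

curl -X PUT "http://localhost:9000/tables/events" \
  -H "Content-Type: application/json" \
  -d @config/table-fixed.json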

This fixes the table config, but it doesn’t retrospectively ingest the messages that threw exceptions. We can check the Kafka offset of the consuming segment by running the following:

curl -X GET "http://localhost:9000/tables/events/consumingSegmentsInfo" -H "accept: application/json"
Output
{
  "_segmentToConsumingInfoMap": {
    "events__0__0__20220131T1057Z": [
      {
        "serverName": "Server_172.24.0.6_8098",
        "consumerState": "CONSUMING",
        "lastConsumedTimestamp": 1643627394569,
        "partitionToOffsetMap": {
          "0": "3"
        }
      }
    ]
  }
}

The consumer has reached offset 3 for partition 0, but the messages we want to re-process are at offsets 0-2.
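To sanity-check that number against Kafka itself, we can ask the broker for the topic’s end offset. The GetOffsetShell tool ships with the Kafka 2.x releases used by this image (newer Kafka versions take --bootstrap-server instead of --broker-list):

docker exec -i kafka-json /opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic events

This should print events:0:3, i.e. the next offset to be written to partition 0 is 3.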

Resetting the consuming segment

To do that, we’ll need to reset the consuming segment by running the following command:

curl -X POST "http://localhost:9000/segments/events_REALTIME/events__0__0__20220131T1057Z/reset" -H "accept: application/json"
Output
{"status":"Successfully reset segment: events__0__0__20220131T1057Z of table: events_REALTIME"}

If we go back to the query editor, we’ll see that the documents have now been ingested:

Figure 3. Documents!
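If you prefer the command line to the UI, the same check can be run against the broker’s SQL endpoint (POST /query/sql on the broker’s port, 8099):

curl -X POST "http://localhost:8099/query/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "select * from events limit 10"}'

The response should contain a resultTable with the three ingested rows.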