Mark Needham

Thoughts on Software Development

Spark: pyspark/Hadoop – py4j.protocol.Py4JJavaError: An error occurred while calling o23.load.: org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4

I’ve been playing around with pyspark – Spark’s Python library – and I wanted to execute the following job which takes a file from my local HDFS and then counts how many times each FBI code appears using Spark SQL:

from pyspark import SparkContext
from pyspark.sql import SQLContext
 
sc = SparkContext("local", "Simple App")
sqlContext = SQLContext(sc)
 
file = "hdfs://localhost:9000/user/markneedham/Crimes_-_2001_to_present.csv"
 
sqlContext.load(source="com.databricks.spark.csv", header="true", path = file).registerTempTable("crimes")
rows = sqlContext.sql("select `FBI Code` AS fbiCode, COUNT(*) AS times FROM crimes GROUP BY `FBI Code` ORDER BY times DESC").collect()
 
for row in rows:
    print("{0} -> {1}".format(row.fbiCode, row.times))

I submitted the job and waited:

$ ./spark-1.3.0-bin-hadoop1/bin/spark-submit --driver-memory 5g --packages com.databricks:spark-csv_2.10:1.1.0 fbi_spark.py
...
Traceback (most recent call last):
  File "/Users/markneedham/projects/neo4j-spark-chicago/fbi_spark.py", line 11, in <module>
    sqlContext.load(source="com.databricks.spark.csv", header="true", path = file).registerTempTable("crimes")
  File "/Users/markneedham/projects/neo4j-spark-chicago/spark-1.3.0-bin-hadoop1/python/pyspark/sql/context.py", line 482, in load
    df = self._ssql_ctx.load(source, joptions)
  File "/Users/markneedham/projects/neo4j-spark-chicago/spark-1.3.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/Users/markneedham/projects/neo4j-spark-chicago/spark-1.3.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o23.load.
: org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4
	at org.apache.hadoop.ipc.Client.call(Client.java:1070)
	at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
	at com.sun.proxy.$Proxy7.getProtocolVersion(Unknown Source)
	at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
	at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
	at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
	at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:238)
	at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:203)
	at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1156)
	at org.apache.spark.rdd.RDD.first(RDD.scala:1189)
	at com.databricks.spark.csv.CsvRelation.firstLine$lzycompute(CsvRelation.scala:129)
	at com.databricks.spark.csv.CsvRelation.firstLine(CsvRelation.scala:127)
	at com.databricks.spark.csv.CsvRelation.inferSchema(CsvRelation.scala:109)
	at com.databricks.spark.csv.CsvRelation.<init>(CsvRelation.scala:62)
	at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:115)
	at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:40)
	at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:28)
	at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:290)
	at org.apache.spark.sql.SQLContext.load(SQLContext.scala:679)
	at org.apache.spark.sql.SQLContext.load(SQLContext.scala:667)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:745)

It looks like my Hadoop client and server are using different versions, which in fact they are! We can see from the name of the Spark folder that I’m using Hadoop 1.x there, and if we check the local Hadoop version we’ll notice it’s using the 2.x series:

$ hadoop version
Hadoop 2.6.0

In this case the easiest fix is to use a version of Spark that’s compiled against Hadoop 2.6, which as of now means Spark 1.4.1.
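
The mismatch can be spotted before submitting a job by comparing the Hadoop major version in the Spark folder name with the one printed by `hadoop version`. This is only a minimal sketch: the helper names are made up for illustration, and it assumes the `-bin-hadoopN` naming of the pre-built Spark folders used in this post.

```python
import re


def hadoop_major_from_spark_dir(spark_dir):
    """Extract the Hadoop major version from a name like 'spark-1.3.0-bin-hadoop1'.

    Assumes the '-bin-hadoopN' naming convention; returns None otherwise.
    """
    match = re.search(r"-bin-hadoop(\d+)", spark_dir)
    return int(match.group(1)) if match else None


def hadoop_major_from_version_output(output):
    """Extract the major version from `hadoop version` output such as 'Hadoop 2.6.0'."""
    match = re.search(r"^Hadoop (\d+)\.", output, re.MULTILINE)
    return int(match.group(1)) if match else None


def versions_match(spark_dir, version_output):
    """True only when both major versions are known and equal."""
    client = hadoop_major_from_spark_dir(spark_dir)
    server = hadoop_major_from_version_output(version_output)
    return client is not None and client == server


print(versions_match("spark-1.3.0-bin-hadoop1", "Hadoop 2.6.0"))
print(versions_match("spark-1.4.1-bin-hadoop2.6", "Hadoop 2.6.0"))
```

The first call compares a Hadoop 1.x client with a 2.x server, which is the combination that produced the IPC error above.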

Let’s try and run our job again:

$ ./spark-1.4.1-bin-hadoop2.6/bin/spark-submit --driver-memory 5g --packages com.databricks:spark-csv_2.10:1.1.0 fbi_spark.py
 
06 -> 859197
08B -> 653575
14 -> 488212
18 -> 457782
26 -> 431316
05 -> 257310
07 -> 197404
08A -> 188964
03 -> 157706
11 -> 112675
04B -> 103961
04A -> 60344
16 -> 47279
15 -> 40361
24 -> 31809
10 -> 22467
17 -> 17555
02 -> 17008
20 -> 15190
19 -> 10878
22 -> 8847
09 -> 6358
01A -> 4830
13 -> 1561
12 -> 835
01B -> 16

And it’s working!
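
To sanity check what the SQL query computes, the same group-and-count can be sketched in plain Python on a tiny sample. The sample rows below are invented for illustration and only the `FBI Code` column name comes from the real file; this is not a replacement for the Spark job on the full data set.

```python
import csv
import io
from collections import Counter


def count_by_column(csv_text, column):
    """Count how often each value of `column` appears, most frequent first."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return Counter(row[column] for row in reader).most_common()


# Made-up sample rows, not taken from the real crimes file.
sample = "ID,FBI Code\n1,06\n2,08B\n3,06\n4,14\n"

for fbi_code, times in count_by_column(sample, "FBI Code"):
    print("{0} -> {1}".format(fbi_code, times))
```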

Written by Mark Needham

August 4th, 2015 at 6:35 am

Posted in Spark

Spark: Processing CSV files using Databricks Spark CSV Library

Last year I wrote about exploring the Chicago crime data set using Spark and the OpenCSV parser and while this worked well, a few months ago I noticed that there’s now a spark-csv library which I should probably use instead.

I thought it’d be a fun exercise to translate my code to use it.

So to recap our goal: we want to count how many times each type of crime has been committed. I have a more up to date version of the crimes file now so the numbers won’t be exactly the same.

First let’s launch the spark-shell and register our CSV file as a temporary table so we can query it as if it was a SQL table:

$ ./spark-1.3.0-bin-hadoop1/bin/spark-shell
 
scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext
 
scala> val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv"
crimeFile: String = /Users/markneedham/Downloads/Crimes_-_2001_to_present.csv
 
scala> val sqlContext = new SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@9746157
 
scala> sqlContext.load("com.databricks.spark.csv", Map("path" -> crimeFile, "header" -> "true")).registerTempTable("crimes")
java.lang.RuntimeException: Failed to load class for data source: com.databricks.spark.csv
	at scala.sys.package$.error(package.scala:27)
	at org.apache.spark.sql.sources.ResolvedDataSource$.lookupDataSource(ddl.scala:268)
	at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:279)
	at org.apache.spark.sql.SQLContext.load(SQLContext.scala:679)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
	at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
	at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
	at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
	at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
	at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
	at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
	at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
	at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
	at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
	at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
	at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
	at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
	at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
	at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
	at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
	at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
	at org.apache.spark.repl.Main$.main(Main.scala:31)
	at org.apache.spark.repl.Main.main(Main.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I’ve actually forgotten to tell spark-shell about the CSV package so let’s restart the shell and pass it as an argument:

$ ./spark-1.3.0-bin-hadoop1/bin/spark-shell --packages com.databricks:spark-csv_2.10:1.1.0
 
scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext
 
scala> val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv"
crimeFile: String = /Users/markneedham/Downloads/Crimes_-_2001_to_present.csv
 
scala> val sqlContext = new SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@44587c44
 
scala> sqlContext.load("com.databricks.spark.csv", Map("path" -> crimeFile, "header" -> "true")).registerTempTable("crimes")
...
15/08/02 18:57:46 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/08/02 18:57:46 INFO DAGScheduler: Stage 0 (first at CsvRelation.scala:129) finished in 0.207 s
15/08/02 18:57:46 INFO DAGScheduler: Job 0 finished: first at CsvRelation.scala:129, took 0.267327 s

Now we can write a simple SQL query on our ‘crimes’ table to find the most popular crime types:

scala>  sqlContext.sql(
        """
        select `Primary Type` as primaryType, COUNT(*) AS times
        from crimes
        group by `Primary Type`
        order by times DESC
        """).save("/tmp/agg.csv", "com.databricks.spark.csv")

That spits out a load of CSV ‘part files’ into /tmp/agg.csv so let’s bring in the merge function that we’ve used previously to combine these into one CSV file:

scala> import org.apache.hadoop.conf.Configuration
scala> import org.apache.hadoop.fs._
 
scala> def merge(srcPath: String, dstPath: String): Unit =  {
         val hadoopConfig = new Configuration()
         val hdfs = FileSystem.get(hadoopConfig)
         FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
       }
 
scala> merge("/tmp/agg.csv", "agg.csv")
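
If the part files live on the local file system, the merge step can also be sketched in Python. This is an illustration rather than an equivalent of `FileUtil.copyMerge`: the function name is made up, and it assumes the part files are named `part-*` and should be concatenated in sorted order.

```python
import glob
import os
import shutil


def merge_part_files(src_dir, dst_path):
    """Concatenate every 'part-*' file in src_dir into dst_path.

    Returns the number of part files that were merged.
    """
    part_paths = sorted(glob.glob(os.path.join(src_dir, "part-*")))
    with open(dst_path, "wb") as dst:
        for part_path in part_paths:
            with open(part_path, "rb") as src:
                # Stream the bytes across so large parts are not held in memory.
                shutil.copyfileobj(src, dst)
    return len(part_paths)
```

Calling `merge_part_files("/tmp/agg.csv", "agg.csv")` would then play the same role as the `merge` call above.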

And finally let’s browse the contents of our new CSV file:

$ cat agg.csv
THEFT,1206745
BATTERY,1066110
CRIMINAL DAMAGE,672782
NARCOTICS,662257
OTHER OFFENSE,360824
ASSAULT,354583
BURGLARY,343443
MOTOR VEHICLE THEFT,278014
ROBBERY,218190
DECEPTIVE PRACTICE,197477
CRIMINAL TRESPASS,171363
PROSTITUTION,65660
WEAPONS VIOLATION,56218
PUBLIC PEACE VIOLATION,42446
OFFENSE INVOLVING CHILDREN,37010
CRIM SEXUAL ASSAULT,21346
SEX OFFENSE,21305
GAMBLING,13704
LIQUOR LAW VIOLATION,13264
INTERFERENCE WITH PUBLIC OFFICER,11366
ARSON,9642
HOMICIDE,7192
KIDNAPPING,6029
INTIMIDATION,3443
STALKING,2760
OBSCENITY,331
PUBLIC INDECENCY,123
OTHER NARCOTIC VIOLATION,106
CONCEALED CARRY LICENSE VIOLATION,34
NON-CRIMINAL,31
NON - CRIMINAL,25
RITUALISM,23
HUMAN TRAFFICKING,9
NON-CRIMINAL (SUBJECT SPECIFIED),3
DOMESTIC VIOLENCE,1

Great! We’ve got the same output with much less code which is always a #win.

Written by Mark Needham

August 2nd, 2015 at 6:08 pm

Posted in Spark

Neo4j: Cypher – Removing consecutive duplicates

When writing Cypher queries I sometimes find myself wanting to remove consecutive duplicates in collections that I’ve joined together.

e.g. we might start with the following query, where 1 and 7 each appear twice in a row:

RETURN [1,1,2,3,4,5,6,7,7,8] AS values
 
==> +-----------------------+
==> | values                |
==> +-----------------------+
==> | [1,1,2,3,4,5,6,7,7,8] |
==> +-----------------------+
==> 1 row

We want to end up with [1,2,3,4,5,6,7,8]. We can start by exploding our array and putting consecutive elements next to each other:

WITH [1,1,2,3,4,5,6,7,7,8] AS values
UNWIND RANGE(0, LENGTH(values) - 2) AS idx
RETURN idx, idx+1, values[idx], values[idx+1]
 
==> +-------------------------------------------+
==> | idx | idx+1 | values[idx] | values[idx+1] |
==> +-------------------------------------------+
==> | 0   | 1     | 1           | 1             |
==> | 1   | 2     | 1           | 2             |
==> | 2   | 3     | 2           | 3             |
==> | 3   | 4     | 3           | 4             |
==> | 4   | 5     | 4           | 5             |
==> | 5   | 6     | 5           | 6             |
==> | 6   | 7     | 6           | 7             |
==> | 7   | 8     | 7           | 7             |
==> | 8   | 9     | 7           | 8             |
==> +-------------------------------------------+
==> 9 rows

Next we can filter out rows which have the same values since that means they have consecutive duplicates:

WITH [1,1,2,3,4,5,6,7,7,8] AS values
UNWIND RANGE(0, LENGTH(values) - 2) AS idx
WITH values[idx] AS a, values[idx+1] AS b
WHERE a <> b
RETURN a,b
 
==> +-------+
==> | a | b |
==> +-------+
==> | 1 | 2 |
==> | 2 | 3 |
==> | 3 | 4 |
==> | 4 | 5 |
==> | 5 | 6 |
==> | 6 | 7 |
==> | 7 | 8 |
==> +-------+
==> 7 rows

Now we need to join the collection back together again. Most of the values we want are in field ‘b’ but we also need to grab the first value from field ‘a’:

WITH [1,1,2,3,4,5,6,7,7,8] AS values
UNWIND RANGE(0, LENGTH(values) - 2) AS idx
WITH values[idx] AS a, values[idx+1] AS b
WHERE a <> b
RETURN COLLECT(a)[0] + COLLECT(b) AS noDuplicates
 
==> +-------------------+
==> | noDuplicates      |
==> +-------------------+
==> | [1,2,3,4,5,6,7,8] |
==> +-------------------+
==> 1 row

What about if we have more than 2 duplicates in a row?

WITH [1,1,1,2,3,4,5,5,6,7,7,8] AS values
UNWIND RANGE(0, LENGTH(values) - 2) AS idx
WITH values[idx] AS a, values[idx+1] AS b
WHERE a <> b
RETURN COLLECT(a)[0] + COLLECT(b) AS noDuplicates
 
==> +-------------------+
==> | noDuplicates      |
==> +-------------------+
==> | [1,2,3,4,5,6,7,8] |
==> +-------------------+
==> 1 row

Still happy, good times! Of course, if we have a non-consecutive duplicate it won’t be removed:

WITH [1,1,1,2,3,4,5,5,6,7,7,8,1] AS values
UNWIND RANGE(0, LENGTH(values) - 2) AS idx
WITH values[idx] AS a, values[idx+1] AS b
WHERE a <> b
RETURN COLLECT(a)[0] + COLLECT(b) AS noDuplicates
 
==> +---------------------+
==> | noDuplicates        |
==> +---------------------+
==> | [1,2,3,4,5,6,7,8,1] |
==> +---------------------+
==> 1 row
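
For comparison, the same pairwise idea can be sketched in Python: pair each element with its successor, keep the second element of every pair whose values differ, and put the first element on the front. The function name is made up, and unlike the Cypher version it takes the first element straight from the input list rather than from the filtered pairs.

```python
def remove_consecutive_duplicates(values):
    """Drop elements that are equal to the element immediately before them."""
    if not values:
        return []
    # zip pairs each element with its successor, like values[idx] and values[idx+1].
    pairs = zip(values, values[1:])
    return values[:1] + [b for a, b in pairs if a != b]


print(remove_consecutive_duplicates([1, 1, 2, 3, 4, 5, 6, 7, 7, 8]))
print(remove_consecutive_duplicates([1, 1, 1, 2, 3, 4, 5, 5, 6, 7, 7, 8, 1]))
```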

Written by Mark Needham

July 30th, 2015 at 6:23 am

Posted in neo4j

Neo4j: MERGE’ing on super nodes

In my continued playing with the Chicago crime data set I wanted to connect the crimes committed to their position in the FBI crime type hierarchy.

These are the sub graphs that I want to connect:

[Image: screenshot of the sub graphs]

We have a ‘fbiCode’ on each ‘Crime’ node which indicates which ‘Crime Sub Category’ the crime belongs to.

I started with the following query to connect the nodes together:

MATCH (crime:Crime)
WITH crime SKIP {skip} LIMIT 10000
 
MATCH (subCat:SubCategory {code: crime.fbiCode})
MERGE (crime)-[:CATEGORY]->(subCat)
RETURN COUNT(*) AS crimesProcessed

I had this running inside a Python script which incremented ‘skip’ by 10,000 on each iteration as long as ‘crimesProcessed’ came back with a value > 0.
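
The driving loop looked roughly like the sketch below. It is not the original script: `run_batch` is a hypothetical callable standing in for whatever executes the Cypher query with the given `skip` value and returns ‘crimesProcessed’.

```python
def process_in_batches(run_batch, batch_size=10000):
    """Call run_batch(skip, batch_size) repeatedly until it reports no rows.

    run_batch is a hypothetical stand-in for executing the query above;
    it must return the number of crimes processed in that batch.
    """
    skip = 0
    total = 0
    while True:
        processed = run_batch(skip, batch_size)
        if processed <= 0:
            break
        total += processed
        skip += batch_size
    return total


# Fake batch runner that pretends the database holds 25,000 crimes.
def fake_run_batch(skip, limit):
    return max(0, min(limit, 25000 - skip))


print(process_in_batches(fake_run_batch))
```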

To start with the ‘CATEGORY’ relationships were being created very quickly but it slowed down quite noticeably about 1 million nodes in.

I profiled the queries but the query plans didn’t show anything obviously wrong. My suspicion was that I had a super node problem, where the Cypher runtime was iterating through all of the sub category’s relationships to check whether one of them pointed to the crime on the other side of the ‘MERGE’ statement.

I cancelled the import job and wrote a query to check how many relationships each sub category had. It varied from 1,000 to 93,000, which somewhat confirmed my suspicion.
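
A toy model shows why the direction of the check matters. This is purely illustrative and says nothing about how Neo4j is implemented: the adjacency lists and the function name are assumptions made for the sketch. Checking from the crime’s side means scanning a handful of relationships, while checking from the sub category’s side means scanning tens of thousands.

```python
def relationship_exists(outgoing, incoming, start, end):
    """Check whether start -> end exists by scanning the shorter adjacency list.

    outgoing maps a node to the nodes it points at; incoming maps a node to
    the nodes pointing at it. Both are plain lists, so `in` is a linear scan.
    """
    out_list = outgoing.get(start, [])
    in_list = incoming.get(end, [])
    if len(out_list) <= len(in_list):
        return end in out_list
    return start in in_list


# One sub category ("06") with 93,000 incoming relationships: a super node.
crimes = ["crime{0}".format(i) for i in range(93000)]
incoming = {"06": crimes}
outgoing = {crime: ["06"] for crime in crimes}

# The crime side has a single relationship, so that is the list that gets scanned.
print(relationship_exists(outgoing, incoming, "crime92999", "06"))
```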

Michael suggested tweaking the query to use the shortestpath function to check for the existence of the relationship and then use the ‘CREATE’ clause to create it if it didn’t exist.

The neat thing about the shortestpath function is that it will start from the side with the lowest cardinality and as soon as it finds a relationship it will stop searching. Let’s have a look at that version of the query:

MATCH (crime:Crime)
WITH crime SKIP {skip} LIMIT 10000
MATCH (subCat:SubCategory {code: crime.fbiCode})
WITH crime, subCat, shortestPath((crime)-[:CATEGORY]->(subCat)) AS path
FOREACH(ignoreMe IN CASE WHEN path is NULL THEN [1] ELSE [] END |
  CREATE (crime)-[:CATEGORY]->(subCat))
RETURN COUNT(*)

This worked much better – 10,000 nodes processed in ~ 2.5 seconds – and the time remained constant as more relationships were added. This allowed me to create all the ‘CATEGORY’ relationships, but we can actually do even better if we use CREATE UNIQUE instead of MERGE:

MATCH (crime:Crime)
WITH crime SKIP {skip} LIMIT 10000
 
MATCH (subCat:SubCategory {code: crime.fbiCode})
CREATE UNIQUE (crime)-[:CATEGORY]->(subCat)
RETURN COUNT(*) AS crimesProcessed

Using this query, 10,000 nodes took roughly 250-900 ms to process, which means we can process all the nodes in 5-6 minutes – good times!

I’m not super familiar with the ‘CREATE UNIQUE’ code so I’m not sure that it’s always a good substitute for ‘MERGE’ but on this occasion it does the job.

The lesson for me here is that if a query is taking longer than you think it should, try other constructs (or a combination of them) and see whether things improve – they just might!

Written by Mark Needham

July 28th, 2015 at 9:04 pm

Posted in neo4j

Python: Difference between two datetimes in milliseconds

I’ve been doing a bit of ad hoc measurement of some Cypher queries executed via py2neo and wanted to work out how many milliseconds each query was taking end to end.

I thought there’d be an obvious way of doing this but if there is it’s evaded me so far, and I ended up calculating the difference between two datetime objects, which gave me the following timedelta object:

>>> import datetime
>>> start = datetime.datetime.now()
>>> end = datetime.datetime.now()
 
>>> end - start
datetime.timedelta(0, 3, 519319)

The 3 parts of this object are ‘days’, ‘seconds’ and ‘microseconds’ which I found quite strange!

These are the methods/attributes we have available to us:

>>> dir(end - start)
['__abs__', '__add__', '__class__', '__delattr__', '__div__', '__doc__', '__eq__', '__floordiv__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__le__', '__lt__', '__mul__', '__ne__', '__neg__', '__new__', '__nonzero__', '__pos__', '__radd__', '__rdiv__', '__reduce__', '__reduce_ex__', '__repr__', '__rfloordiv__', '__rmul__', '__rsub__', '__setattr__', '__sizeof__', '__str__', '__sub__', '__subclasshook__', 'days', 'max', 'microseconds', 'min', 'resolution', 'seconds', 'total_seconds']

There’s no ‘milliseconds’ on there so we’ll have to calculate it from what we do have:

>>> diff = end - start
>>> elapsed_ms = (diff.days * 86400000) + (diff.seconds * 1000) + (diff.microseconds / 1000)
 
>>> elapsed_ms
3519

Or we could do the following slightly simpler calculation:

>>> diff.total_seconds() * 1000
3519.319
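
On Python 3 (an assumption, since the session above is Python 2) a timedelta can also be divided by another timedelta, which gives a third way of writing the conversion. The function names below are made up for the sketch.

```python
from datetime import timedelta


def elapsed_ms(diff):
    """Return the length of a timedelta in milliseconds as a float."""
    return diff / timedelta(milliseconds=1)


def elapsed_whole_ms(diff):
    """Return the number of whole milliseconds, discarding the remainder."""
    return diff // timedelta(milliseconds=1)


# Same value as the timedelta shown earlier: 0 days, 3 seconds, 519319 microseconds.
diff = timedelta(days=0, seconds=3, microseconds=519319)
print(elapsed_ms(diff))
print(elapsed_whole_ms(diff))
```

Floor division gives the same whole-millisecond figure as the longer days/seconds/microseconds calculation, while true division keeps the fractional part like `total_seconds() * 1000`.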

And now back to the query profiling!

Written by Mark Needham

July 28th, 2015 at 8:05 pm

Posted in Python

Neo4j: From JSON to CSV to LOAD CSV via jq


In my last blog post I showed how to import a Chicago crime categories & sub categories JSON document using Neo4j’s Cypher query language via the py2neo driver. While this is a good approach for people with a developer background, many of the users I encounter aren’t developers and favour using Cypher via the Neo4j browser.

If we’re going to do this we’ll need to transform our JSON document into a CSV file so that we can use the LOAD CSV command on it. Michael pointed me to the jq tool which comes in very handy.

To recap, this is a part of the JSON file:

{
    "categories": [
        {
            "name": "Index Crime",
            "sub_categories": [
                {
                    "code": "01A",
                    "description": "Homicide 1st & 2nd Degree"
                },
            ]
        },
        {
            "name": "Non-Index Crime",
            "sub_categories": [
                {
                    "code": "01B",
                    "description": "Involuntary Manslaughter"
                },
            ]
        },
        {
            "name": "Violent Crime",
            "sub_categories": [
                {
                    "code": "01A",
                    "description": "Homicide 1st & 2nd Degree"
                },
            ]
        }
    ]
}

We want to get one row for each sub category which contains three columns – category name, sub category code, sub category description.

First we need to pull out the categories:

$ jq ".categories[]" categories.json
 
{
  "name": "Index Crime",
  "sub_categories": [
    {
      "code": "01A",
      "description": "Homicide 1st & 2nd Degree"
    },
  ]
}
{
  "name": "Non-Index Crime",
  "sub_categories": [
    {
      "code": "01B",
      "description": "Involuntary Manslaughter"
    },
  ]
}
{
  "name": "Violent Crime",
  "sub_categories": [
    {
      "code": "01A",
      "description": "Homicide 1st & 2nd Degree"
    },
  ]
}

Next we want to create a row for each sub category with the category alongside it. We can use the pipe operator to combine the two selectors:

$ jq ".categories[] | {name: .name, sub_category: .sub_categories[]}" categories.json
 
{
  "name": "Index Crime",
  "sub_category": {
    "code": "01A",
    "description": "Homicide 1st & 2nd Degree"
  }
}
...
{
  "name": "Non-Index Crime",
  "sub_category": {
    "code": "01B",
    "description": "Involuntary Manslaughter"
  }
}
...
{
  "name": "Violent Crime",
  "sub_category": {
    "code": "01A",
    "description": "Homicide 1st & 2nd Degree"
  }
}

Now we want to un-nest the sub category:

$ jq ".categories[] | {name: .name, sub_category: .sub_categories[]} | [.name, .sub_category.code, .sub_category.description]" categories.json
 
[
  "Index Crime",
  "01A",
  "Homicide 1st & 2nd Degree"
]
 
[
  "Non-Index Crime",
  "01B",
  "Involuntary Manslaughter"
]
 
[
  "Violent Crime",
  "01A",
  "Homicide 1st & 2nd Degree"
]

And finally let’s use the @csv filter to generate CSV lines:

$ jq ".categories[] | {name: .name, sub_category: .sub_categories[]} | [.name, .sub_category.code, .sub_category.description] | @csv" categories.json
"\"Index Crime\",\"01A\",\"Homicide 1st & 2nd Degree\""
"\"Index Crime\",\"02\",\"Criminal Sexual Assault\""
"\"Index Crime\",\"03\",\"Robbery\""
"\"Index Crime\",\"04A\",\"Aggravated Assault\""
"\"Index Crime\",\"04B\",\"Aggravated Battery\""
"\"Index Crime\",\"05\",\"Burglary\""
"\"Index Crime\",\"06\",\"Larceny\""
"\"Index Crime\",\"07\",\"Motor Vehicle Theft\""
"\"Index Crime\",\"09\",\"Arson\""
"\"Non-Index Crime\",\"01B\",\"Involuntary Manslaughter\""
"\"Non-Index Crime\",\"08A\",\"Simple Assault\""
"\"Non-Index Crime\",\"08B\",\"Simple Battery\""
"\"Non-Index Crime\",\"10\",\"Forgery & Counterfeiting\""
"\"Non-Index Crime\",\"11\",\"Fraud\""
"\"Non-Index Crime\",\"12\",\"Embezzlement\""
"\"Non-Index Crime\",\"13\",\"Stolen Property\""
"\"Non-Index Crime\",\"14\",\"Vandalism\""
"\"Non-Index Crime\",\"15\",\"Weapons Violation\""
"\"Non-Index Crime\",\"16\",\"Prostitution\""
"\"Non-Index Crime\",\"17\",\"Criminal Sexual Abuse\""
"\"Non-Index Crime\",\"18\",\"Drug Abuse\""
"\"Non-Index Crime\",\"19\",\"Gambling\""
"\"Non-Index Crime\",\"20\",\"Offenses Against Family\""
"\"Non-Index Crime\",\"22\",\"Liquor License\""
"\"Non-Index Crime\",\"24\",\"Disorderly Conduct\""
"\"Non-Index Crime\",\"26\",\"Misc Non-Index Offense\""
"\"Violent Crime\",\"01A\",\"Homicide 1st & 2nd Degree\""
"\"Violent Crime\",\"02\",\"Criminal Sexual Assault\""
"\"Violent Crime\",\"03\",\"Robbery\""
"\"Violent Crime\",\"04A\",\"Aggravated Assault\""
"\"Violent Crime\",\"04B\",\"Aggravated Battery\""

The only annoying thing about this output is that all the double quotes are escaped. We can sort that out by passing the ‘-r’ flag when we call jq:

$ jq -r ".categories[] | {name: .name, sub_category: .sub_categories[]} | [.name, .sub_category.code, .sub_category.description] | @csv" categories.json
"Index Crime","01A","Homicide 1st & 2nd Degree"
"Index Crime","02","Criminal Sexual Assault"
"Index Crime","03","Robbery"
"Index Crime","04A","Aggravated Assault"
"Index Crime","04B","Aggravated Battery"
"Index Crime","05","Burglary"
"Index Crime","06","Larceny"
"Index Crime","07","Motor Vehicle Theft"
"Index Crime","09","Arson"
"Non-Index Crime","01B","Involuntary Manslaughter"
"Non-Index Crime","08A","Simple Assault"
"Non-Index Crime","08B","Simple Battery"
"Non-Index Crime","10","Forgery & Counterfeiting"
"Non-Index Crime","11","Fraud"
"Non-Index Crime","12","Embezzlement"
"Non-Index Crime","13","Stolen Property"
"Non-Index Crime","14","Vandalism"
"Non-Index Crime","15","Weapons Violation"
"Non-Index Crime","16","Prostitution"
"Non-Index Crime","17","Criminal Sexual Abuse"
"Non-Index Crime","18","Drug Abuse"
"Non-Index Crime","19","Gambling"
"Non-Index Crime","20","Offenses Against Family"
"Non-Index Crime","22","Liquor License"
"Non-Index Crime","24","Disorderly Conduct"
"Non-Index Crime","26","Misc Non-Index Offense"
"Violent Crime","01A","Homicide 1st & 2nd Degree"
"Violent Crime","02","Criminal Sexual Assault"
"Violent Crime","03","Robbery"
"Violent Crime","04A","Aggravated Assault"
"Violent Crime","04B","Aggravated Battery"

Excellent. The only thing left is to write a header and then direct the output into a CSV file and get it into Neo4j:

$ echo "category,sub_category_code,sub_category_description" > categories.csv
$ jq -r ".categories[] |
         {name: .name, sub_category: .sub_categories[]} |
         [.name, .sub_category.code, .sub_category.description] |
         @csv " categories.json >> categories.csv
$ head -n10 categories.csv
category,sub_category_code,sub_category_description
"Index Crime","01A","Homicide 1st & 2nd Degree"
"Index Crime","02","Criminal Sexual Assault"
"Index Crime","03","Robbery"
"Index Crime","04A","Aggravated Assault"
"Index Crime","04B","Aggravated Battery"
"Index Crime","05","Burglary"
"Index Crime","06","Larceny"
"Index Crime","07","Motor Vehicle Theft"
"Index Crime","09","Arson"

LOAD CSV WITH HEADERS FROM "file:///Users/markneedham/projects/neo4j-spark-chicago/categories.csv" AS row
MERGE (c:CrimeCategory {name: row.category})
MERGE (sc:SubCategory {code: row.sub_category_code})
ON CREATE SET sc.description = row.sub_category_description
MERGE (c)-[:CHILD]->(sc)
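
If jq isn’t available, the same JSON to CSV flattening can be sketched with Python’s standard library. The function name is made up for illustration, and the sketch assumes the document has the ‘categories’ / ‘sub_categories’ shape shown above.

```python
import csv
import io
import json


def categories_to_csv(document):
    """Flatten the categories document into CSV text, one row per sub category."""
    out = io.StringIO()
    # Write the header unquoted, matching the echo command used above.
    out.write("category,sub_category_code,sub_category_description\n")
    # Quote every field in the data rows, as jq's @csv does for strings.
    writer = csv.writer(out, quoting=csv.QUOTE_ALL, lineterminator="\n")
    for category in document["categories"]:
        for sub_category in category["sub_categories"]:
            writer.writerow([
                category["name"],
                sub_category["code"],
                sub_category["description"],
            ])
    return out.getvalue()


# A one-category document using values from the file shown earlier.
document = json.loads("""
{"categories": [{"name": "Index Crime",
                 "sub_categories": [{"code": "01A",
                                     "description": "Homicide 1st & 2nd Degree"}]}]}
""")
print(categories_to_csv(document))
```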

And that’s it!

Written by Mark Needham

July 25th, 2015 at 11:05 pm

Posted in neo4j

Neo4j: Loading JSON documents with Cypher

One of the questions I get asked most often is how to load JSON documents into Neo4j, and although Cypher doesn’t have a ‘LOAD JSON’ command we can still get JSON data into the graph.

Michael shows how to do this from various languages in this blog post and I recently wanted to load a JSON document that I generated from Chicago crime types.

This is a snippet of the JSON document:

{
    "categories": [
        {
            "name": "Index Crime", 
            "sub_categories": [
                {
                    "code": "01A", 
                    "description": "Homicide 1st & 2nd Degree"
                }
            ]
        }, 
        {
            "name": "Non-Index Crime", 
            "sub_categories": [
                {
                    "code": "01B", 
                    "description": "Involuntary Manslaughter"
                }
            ]
        }, 
        {
            "name": "Violent Crime", 
            "sub_categories": [
                {
                    "code": "01A", 
                    "description": "Homicide 1st & 2nd Degree"
                }
            ]
        }
    ]
}

We want to create the following graph structure from this document:

[Image: diagram of the target graph structure]

We can then connect the crimes to the appropriate sub category and write aggregation queries that drill down from the category.

To do this we’re going to have to pass the JSON document to Neo4j via its HTTP API rather than through the browser. Luckily there are drivers available for {insert your favourite language here} so we should still be good.

Python is my current go-to language so I’m going to use py2neo to load the data in.

Let’s start by writing a simple query which passes our JSON document in and gets it straight back. Note that I’ve updated my Neo4j password to be ‘foobar’ – replace that with your equivalent if you’re following along:

import json
from py2neo import Graph, authenticate
 
# replace 'foobar' with your password
authenticate("localhost:7474", "neo4j", "foobar")
graph = Graph()
 
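with open('categories.json') as data_file:
    # Use a distinct name so the json module isn't shadowed.
    document = json.load(data_file)
 
query = """
RETURN {json}
"""
 
# Send the Cypher query, passing the parsed document as the 'json' parameter.
print graph.cypher.execute(query, json = document)

$ python import_categories.py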
   | document
---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 1 | {u'categories': [{u'name': u'Index Crime', u'sub_categories': [{u'code': u'01A', u'description': u'Homicide 1st & 2nd Degree'}, {u'code': u'02', u'description': u'Criminal Sexual Assault'}, {u'code': u'03', u'description': u'Robbery'}, {u'code': u'04A', u'description': u'Aggravated Assault'}, {u'code': u'04B', u'description': u'Aggravated Battery'}, {u'code': u'05', u'description': u'Burglary'}, {u'code': u'06', u'description': u'Larceny'}, {u'code': u'07', u'description': u'Motor Vehicle Theft'}, {u'code': u'09', u'description': u'Arson'}]}, {u'name': u'Non-Index Crime', u'sub_categories': [{u'code': u'01B', u'description': u'Involuntary Manslaughter'}, {u'code': u'08A', u'description': u'Simple Assault'}, {u'code': u'08B', u'description': u'Simple Battery'}, {u'code': u'10', u'description': u'Forgery & Counterfeiting'}, {u'code': u'11', u'description': u'Fraud'}, {u'code': u'12', u'description': u'Embezzlement'}, {u'code': u'13', u'description': u'Stolen Property'}, {u'code': u'14', u'description': u'Vandalism'}, {u'code': u'15', u'description': u'Weapons Violation'}, {u'code': u'16', u'description': u'Prostitution'}, {u'code': u'17', u'description': u'Criminal Sexual Abuse'}, {u'code': u'18', u'description': u'Drug Abuse'}, {u'code': u'19', u'description': u'Gambling'}, {u'code': u'20', u'description': u'Offenses Against Family'}, {u'code': u'22', u'description': u'Liquor License'}, {u'code': u'24', u'description': u'Disorderly Conduct'}, {u'code': u'26', u'description': u'Misc Non-Index Offense'}]}, {u'name': u'Violent Crime', u'sub_categories': [{u'code': u'01A', u'description': u'Homicide 1st & 2nd Degree'}, {u'code': u'02', u'description': u'Criminal Sexual Assault'}, {u'code': u'03', u'description': u'Robbery'}, {u'code': u'04A', u'description': u'Aggravated Assault'}, {u'code': u'04B', u'description': u'Aggravated Battery'}]}]}

It’s a bit ugly but we can see that everything’s there! Our next step is to extract each category into its own row. We can do this by accessing the ‘categories’ key in our JSON document and then using the UNWIND clause, which allows us to expand a collection into a sequence of rows:

query = """
WITH {json} AS document
UNWIND document.categories AS category
RETURN category.name
"""

$ python import_categories.py
   | category.name
---+-----------------
 1 | Index Crime
 2 | Non-Index Crime
 3 | Violent Crime
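
To make the row expansion concrete, here’s a rough Python analogy of what UNWIND does to the document. The `document` dict is a trimmed, hand-written stand-in for the real JSON file, and the list comprehension only mimics the Cypher semantics; it isn’t a claim about how Neo4j executes the query.

```python
# Trimmed stand-in for the JSON document (the real file has more sub categories).
document = {
    "categories": [
        {"name": "Index Crime", "sub_categories": [{"code": "01A"}]},
        {"name": "Non-Index Crime", "sub_categories": [{"code": "01B"}]},
        {"name": "Violent Crime", "sub_categories": [{"code": "01A"}]},
    ]
}

# UNWIND document.categories AS category RETURN category.name
# behaves roughly like iterating over the list and emitting one row per element.
rows = [{"category.name": category["name"]} for category in document["categories"]]

for number, row in enumerate(rows, start=1):
    print(number, row["category.name"])
```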

Now we can create a node for each of those categories. We’ll use the MERGE clause so that we can run this script multiple times without ending up with duplicate categories:

query = """
WITH {json} AS document
UNWIND document.categories AS category
MERGE (:CrimeCategory {name: category.name}) 
"""

Let’s quickly check those categories were correctly imported:

match (category:CrimeCategory)
return category

[Image: graph view of the CrimeCategory nodes]

Looking good so far – now for the sub categories. We’re going to use the UNWIND clause to help us out here as well:

query = """
WITH {json} AS document
UNWIND document.categories AS category
UNWIND category.sub_categories AS subCategory
RETURN category.name, subCategory.code, subCategory.description
"""

$ python import_categories.py
    | category.name   | subCategory.code | subCategory.description
----+-----------------+------------------+---------------------------
  1 | Index Crime     | 01A              | Homicide 1st & 2nd Degree
  2 | Index Crime     | 02               | Criminal Sexual Assault
  3 | Index Crime     | 03               | Robbery
  4 | Index Crime     | 04A              | Aggravated Assault
  5 | Index Crime     | 04B              | Aggravated Battery
  6 | Index Crime     | 05               | Burglary
  7 | Index Crime     | 06               | Larceny
  8 | Index Crime     | 07               | Motor Vehicle Theft
  9 | Index Crime     | 09               | Arson
 10 | Non-Index Crime | 01B              | Involuntary Manslaughter
 11 | Non-Index Crime | 08A              | Simple Assault
 12 | Non-Index Crime | 08B              | Simple Battery
 13 | Non-Index Crime | 10               | Forgery & Counterfeiting
 14 | Non-Index Crime | 11               | Fraud
 15 | Non-Index Crime | 12               | Embezzlement
 16 | Non-Index Crime | 13               | Stolen Property
 17 | Non-Index Crime | 14               | Vandalism
 18 | Non-Index Crime | 15               | Weapons Violation
 19 | Non-Index Crime | 16               | Prostitution
 20 | Non-Index Crime | 17               | Criminal Sexual Abuse
 21 | Non-Index Crime | 18               | Drug Abuse
 22 | Non-Index Crime | 19               | Gambling
 23 | Non-Index Crime | 20               | Offenses Against Family
 24 | Non-Index Crime | 22               | Liquor License
 25 | Non-Index Crime | 24               | Disorderly Conduct
 26 | Non-Index Crime | 26               | Misc Non-Index Offense
 27 | Violent Crime   | 01A              | Homicide 1st & 2nd Degree
 28 | Violent Crime   | 02               | Criminal Sexual Assault
 29 | Violent Crime   | 03               | Robbery
 30 | Violent Crime   | 04A              | Aggravated Assault
 31 | Violent Crime   | 04B              | Aggravated Battery
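
Chaining two UNWINDs gives one row per (category, sub category) pair, much like a nested loop. The sketch below shows the idea in Python using a trimmed, hand-written sample of the document rather than the full file.

```python
# Trimmed stand-in for the JSON document.
document = {
    "categories": [
        {"name": "Index Crime", "sub_categories": [
            {"code": "01A", "description": "Homicide 1st & 2nd Degree"},
            {"code": "02", "description": "Criminal Sexual Assault"},
        ]},
        {"name": "Violent Crime", "sub_categories": [
            {"code": "01A", "description": "Homicide 1st & 2nd Degree"},
        ]},
    ]
}

rows = [
    (category["name"], sub["code"], sub["description"])
    for category in document["categories"]   # UNWIND document.categories AS category
    for sub in category["sub_categories"]    # UNWIND category.sub_categories AS subCategory
]

for number, row in enumerate(rows, start=1):
    print(number, *row, sep=" | ")
```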

Let’s give sub categories the MERGE treatment too:

query = """
WITH {json} AS document
UNWIND document.categories AS category
UNWIND category.sub_categories AS subCategory
MERGE (c:CrimeCategory {name: category.name})
MERGE (sc:SubCategory {code: subCategory.code})
ON CREATE SET sc.description = subCategory.description
MERGE (c)-[:CHILD]->(sc)
"""

And finally let’s write a query to check what we’ve imported:

match (category:CrimeCategory)-[:CHILD]->(subCategory)
return *

[Image: graph view of the categories and their sub categories]

What I hadn’t realised before running this query is that some sub categories sit under multiple categories, which is quite an interesting insight. The final Python script is available on GitHub – any questions let me know.
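
Shared sub categories fall out of the import because the query MERGEs them on their code alone, so a code listed under two categories becomes a single node with two CHILD relationships. Here’s a rough Python simulation of that behaviour. The dicts and the set are stand-ins for the graph, the `document` is a trimmed sample, and `run_import` is a made-up name for illustration only.

```python
# Trimmed stand-in for the JSON document.
document = {
    "categories": [
        {"name": "Index Crime", "sub_categories": [
            {"code": "01A", "description": "Homicide 1st & 2nd Degree"},
            {"code": "02", "description": "Criminal Sexual Assault"},
            {"code": "05", "description": "Burglary"},
        ]},
        {"name": "Violent Crime", "sub_categories": [
            {"code": "01A", "description": "Homicide 1st & 2nd Degree"},
            {"code": "02", "description": "Criminal Sexual Assault"},
        ]},
    ]
}

categories = {}      # name -> properties, stands in for :CrimeCategory nodes
sub_categories = {}  # code -> properties, stands in for :SubCategory nodes
children = set()     # (category name, sub category code), stands in for :CHILD


def run_import(doc):
    for category in doc["categories"]:
        for sub in category["sub_categories"]:
            # MERGE (c:CrimeCategory {name: ...}): reuse the node if it exists.
            categories.setdefault(category["name"], {"name": category["name"]})
            # MERGE (sc:SubCategory {code: ...}) ON CREATE SET sc.description = ...
            if sub["code"] not in sub_categories:
                sub_categories[sub["code"]] = {
                    "code": sub["code"],
                    "description": sub["description"],
                }
            # MERGE (c)-[:CHILD]->(sc): adding to a set ignores repeats.
            children.add((category["name"], sub["code"]))


run_import(document)
run_import(document)  # running the import again changes nothing

# Group parent category names by sub category code.
parents = {}
for name, code in children:
    parents.setdefault(code, []).append(name)
shared = {code: sorted(names) for code, names in parents.items() if len(names) > 1}

print(len(categories), len(sub_categories), len(children))
print(sorted(shared.items()))
```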

Written by Mark Needham

July 23rd, 2015 at 6:15 am

Posted in neo4j

Neo4j 2.2.3: neo4j-import – Encoder StringEncoder[2] returned an illegal encoded value 0

I’ve been playing around with the Chicago crime data set again while preparing for a Neo4j webinar next week, and while running the import tool I ran into the following exception:

Importing the contents of these files into tmp/crimes.db:
Nodes:
  /Users/markneedham/projects/neo4j-spark-chicago/tmp/crimes.csv
 
  /Users/markneedham/projects/neo4j-spark-chicago/tmp/beats.csv
 
  /Users/markneedham/projects/neo4j-spark-chicago/tmp/primaryTypes.csv
 
  /Users/markneedham/projects/neo4j-spark-chicago/tmp/locations.csv
Relationships:
  /Users/markneedham/projects/neo4j-spark-chicago/tmp/crimesBeats.csv
 
  /Users/markneedham/projects/neo4j-spark-chicago/tmp/crimesPrimaryTypes.csv
 
  /Users/markneedham/projects/neo4j-spark-chicago/tmp/crimesLocationsCleaned.csv
 
Available memory:
  Free machine memory: 263.17 MB
  Max heap memory : 3.56 GB
 
Nodes
[*>:17.41 MB/s-------------------------|PROPERTIES(3)=|NODE:3|LABEL SCAN----|v:36.30 MB/s(2)===]  3MImport error: Panic called, so exiting
java.lang.RuntimeException: Panic called, so exiting
	at org.neo4j.unsafe.impl.batchimport.staging.AbstractStep.assertHealthy(AbstractStep.java:200)
	at org.neo4j.unsafe.impl.batchimport.staging.AbstractStep.await(AbstractStep.java:191)
	at org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep.receive(ProcessorStep.java:98)
	at org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep.sendDownstream(ProcessorStep.java:224)
	at org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep.access$400(ProcessorStep.java:42)
	at org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep$Sender.send(ProcessorStep.java:250)
	at org.neo4j.unsafe.impl.batchimport.LabelScanStorePopulationStep.process(LabelScanStorePopulationStep.java:60)
	at org.neo4j.unsafe.impl.batchimport.LabelScanStorePopulationStep.process(LabelScanStorePopulationStep.java:37)
	at org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep$4.run(ProcessorStep.java:120)
	at org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep$4.run(ProcessorStep.java:102)
	at org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor$Processor.run(DynamicTaskExecutor.java:237)
Caused by: java.lang.IllegalStateException: Encoder StringEncoder[2] returned an illegal encoded value 0
	at org.neo4j.unsafe.impl.batchimport.cache.idmapping.string.EncodingIdMapper.encode(EncodingIdMapper.java:229)
	at org.neo4j.unsafe.impl.batchimport.cache.idmapping.string.EncodingIdMapper.put(EncodingIdMapper.java:208)
	at org.neo4j.unsafe.impl.batchimport.NodeEncoderStep.process(NodeEncoderStep.java:77)
	at org.neo4j.unsafe.impl.batchimport.NodeEncoderStep.process(NodeEncoderStep.java:43)
	... 3 more

I narrowed the problem down to a specific file and from tracing the code learned that this exception happens when we’ve ended up with a node that doesn’t have an id.

I guessed that this might be due to there being an empty column somewhere in my CSV file so I did a bit of grepping:

$ grep -rn  "\"\"" tmp/locations.csv
tmp/locations.csv:11:"",Location
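
The grep matches two adjacent quotes anywhere on a line, which is fine for a two column file like this one. If we wanted to check only the id column, a small Python sketch along these lines could do it. The sample data and the function name `rows_with_empty_id` are made up for illustration; with a real file we’d pass `open(path, newline="")` instead of the `StringIO`.

```python
import csv
import io

# Hypothetical sample in the same shape as tmp/locations.csv.
sample = io.StringIO(
    'id:ID(Location),:LABEL\n'
    '"RAILROAD PROPERTY",Location\n'
    '"",Location\n'
    '"NEWSSTAND",Location\n'
)


def rows_with_empty_id(handle):
    """Return (line number, row) pairs whose first column is blank."""
    reader = csv.reader(handle)
    next(reader)  # skip the header row
    return [
        (reader.line_num, row)
        for row in reader
        if not row or not row[0].strip()
    ]


empty = rows_with_empty_id(sample)
print(empty)
```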

We can now narrow down the import to just the one line and see if we still get the exception:

$ cat foo.csv
id:ID(Location),:LABEL
"",Location
 
$ ./neo4j-community-2.2.3/bin/neo4j-import --into tmp/foo --nodes foo.csv
Importing the contents of these files into tmp/foo:
Nodes:
  /Users/markneedham/projects/neo4j-spark-chicago/foo.csv
 
Available memory:
  Free machine memory: 2.22 GB
  Max heap memory : 3.56 GB
 
Nodes
Import error: Encoder StringEncoder[2] returned an illegal encoded value 0

Yep, same error. Now we can clean up our CSV file and try again:

$ grep -v  "\"\"" foo.csv > fooCleaned.csv
 
# I put in a few real records so we can see them import
$ cat fooCleaned.csv
id:ID(Location),:LABEL
"RAILROAD PROPERTY",Location
"NEWSSTAND",Location
"SCHOOL, PRIVATE, BUILDING",Location
 
$ ./neo4j-community-2.2.3/bin/neo4j-import --into tmp/foo --nodes fooCleaned.csv
Importing the contents of these files into tmp/foo:
Nodes:
  /Users/markneedham/projects/neo4j-spark-chicago/fooCleaned.csv
 
Available memory:
  Free machine memory: 1.23 GB
  Max heap memory : 3.56 GB
 
Nodes
[*>:??-------------------------------------------------|PROPE|NODE:7.63 MB-----------------|LA] 10k
Done in 110ms
Prepare node index
[*DETECT:7.63 MB-------------------------------------------------------------------------------]   0
Done in 60ms
Calculate dense nodes
[*>:??-----------------------------------------------------------------------------------------]   0
Done in 10ms
Relationships
[*>:??-----------------------------------------------------------------------------------------]   0
Done in 11ms
Node --> Relationship
[*v:??-----------------------------------------------------------------------------------------] 10k
Done in 1ms
Relationship --> Relationship
[*>:??-----------------------------------------------------------------------------------------]   0
Done in 11ms
Node counts
[>:|*COUNT:76.29 MB----------------------------------------------------------------------------] 10k
Done in 46ms
Relationship counts
[*>:??-----------------------------------------------------------------------------------------]   0
Done in 12ms
 
IMPORT DONE in 1s 576ms. Imported:
  3 nodes
  0 relationships
  3 properties

Sweet! We’re back in business.
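
One caveat with `grep -v "\"\""` is that it drops every line containing two adjacent quotes, and in CSV a doubled quote is also how a literal quote is escaped inside a quoted field. A stricter clean-up would filter on the id column only. The following is a minimal sketch of that idea, not the approach used above; the function name `drop_rows_without_id` and the sample rows are assumptions for illustration.

```python
import csv
import io


def drop_rows_without_id(source, target):
    """Copy CSV rows from source to target, skipping rows whose first column is blank."""
    reader = csv.reader(source)
    writer = csv.writer(target, quoting=csv.QUOTE_MINIMAL, lineterminator="\n")
    writer.writerow(next(reader))  # copy the header row unchanged
    kept = 0
    for row in reader:
        if row and row[0].strip():
            writer.writerow(row)
            kept += 1
    return kept


# Hypothetical sample; with real files, open both with newline="".
source = io.StringIO(
    'id:ID(Location),:LABEL\n'
    '"",Location\n'
    '"SCHOOL, PRIVATE, BUILDING",Location\n'
    '"NEWSSTAND",Location\n'
)
target = io.StringIO()

kept = drop_rows_without_id(source, target)
print(kept)
print(target.getvalue(), end="")
```

The writer only quotes fields that need it, so the cleaned file won’t be byte-for-byte identical to the grep version even though it holds the same records.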

Written by Mark Needham

July 21st, 2015 at 6:11 am

Posted in neo4j

R: Bootstrap confidence intervals

I recently came across an interesting post on Julia Evans’ blog showing how to generate a bigger set of data points by sampling the small set of data points that we actually have using bootstrapping. Julia’s examples are all in Python so I thought it’d be a fun exercise to translate them into R.

We’re doing the bootstrapping to simulate the number of no-shows for a flight so we can work out how many seats we can overbook the plane by.

We start out with a small sample of no-shows and work off the assumption that it’s ok to kick someone off a flight 5% of the time. Let’s work out how many people that’d be for our initial sample:

> data = c(0, 1, 3, 2, 8, 2, 3, 4)
> quantile(data, 0.05)
  5% 
0.35

0.35 people! That’s not a particularly useful result so we’re going to resample the initial data set 10,000 times, taking the 5%ile each time, and see if we come up with something better.
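
As an aside, the fractional 0.35 comes from interpolation: R’s default quantile method (type 7) places the 5% point at position (n - 1) * 0.05 = 0.35 between the two smallest values, 0 and 1. Here’s a small Python sketch of that calculation. The function name `quantile_type7` is made up, and the code only covers this one interpolation method.

```python
import math


def quantile_type7(values, p):
    """Linear-interpolation quantile, following R's default (type 7) definition."""
    ordered = sorted(values)
    h = (len(ordered) - 1) * p  # fractional 0-based position in the sorted data
    lower = math.floor(h)
    upper = min(lower + 1, len(ordered) - 1)
    return ordered[lower] + (h - lower) * (ordered[upper] - ordered[lower])


data = [0, 1, 3, 2, 8, 2, 3, 4]
print(quantile_type7(data, 0.05))  # roughly 0.35
```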

We’re going to use the sample function with replacement to generate our resamples:

> sample(data, replace = TRUE)
[1] 0 3 2 8 8 0 8 0
> sample(data, replace = TRUE)
[1] 2 2 4 3 4 4 2 2

Now let’s write a function to do that multiple times:

library(ggplot2)
 
bootstrap_5th_percentile = function(data, n_bootstraps) {
  return(sapply(1:n_bootstraps, 
                function(iteration) quantile(sample(data, replace = TRUE), 0.05)))
}
 
values = bootstrap_5th_percentile(data, 10000)
 
ggplot(aes(x = value), data = data.frame(value = values)) + geom_histogram(binwidth=0.25)

[Image: histogram of the bootstrapped 5th percentiles]

So this visualisation is telling us that we can oversell by 0-2 people but we don’t know an exact number.
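
For anyone who’d rather follow along in Python, the resampling function can be sketched with the standard library alone. This is an approximation of the R code above rather than a port of Julia’s version: `quantile_type7` is a hypothetical helper that mirrors R’s default quantile method, and the fixed seed is only there to make runs repeatable.

```python
import math
import random
from collections import Counter


def quantile_type7(values, p):
    """Linear-interpolation quantile, following R's default (type 7) definition."""
    ordered = sorted(values)
    h = (len(ordered) - 1) * p
    lower = math.floor(h)
    upper = min(lower + 1, len(ordered) - 1)
    return ordered[lower] + (h - lower) * (ordered[upper] - ordered[lower])


def bootstrap_5th_percentile(data, n_bootstraps, rng=random):
    """Resample data with replacement n_bootstraps times, keeping each 5th percentile."""
    results = []
    for _ in range(n_bootstraps):
        resample = rng.choices(data, k=len(data))  # sampling with replacement
        results.append(quantile_type7(resample, 0.05))
    return results


data = [0, 1, 3, 2, 8, 2, 3, 4]
values = bootstrap_5th_percentile(data, 10000, random.Random(42))

# Tally the most common percentiles instead of drawing a histogram.
print(Counter(round(v, 2) for v in values).most_common(5))
```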

Let’s try the same exercise but with a bigger initial data set of 1,000 values rather than just 8. First we’ll generate a distribution (with a mean of 5 and standard deviation of 2) and visualise it:

library(dplyr)
 
df = data.frame(value = rnorm(1000,5, 2))
df = df %>% filter(value >= 0) %>% mutate(value = as.integer(round(value)))
ggplot(aes(x = value), data = df) + geom_histogram(binwidth=1)

[Image: histogram of the generated values]

Our distribution seems to have a lot more values around 4 & 5 whereas the Python version has a flatter distribution – I’m not sure why that is so if you have any ideas let me know. In any case, let’s check the 5%ile for this data set:

> quantile(df$value, 0.05)
5% 
 2

Cool! Now at least we have an integer value rather than the 0.35 we got earlier. Finally let’s do some bootstrapping over our new distribution and see what 5%ile we come up with:

resampled = bootstrap_5th_percentile(df$value, 10000)
byValue = data.frame(value = resampled) %>% count(value)
 
> byValue
Source: local data frame [3 x 2]
 
  value    n
1   1.0    3
2   1.7    2
3   2.0 9995
 
ggplot(aes(x = value, y = n), data = byValue) + geom_bar(stat = "identity")

[Image: bar chart of the bootstrapped 5th percentile counts]

‘2’ is by far the most popular 5%ile here, although the results are weighted more heavily towards that value than in Julia’s Python version, which I imagine is because we’ve sampled from a slightly different distribution.

Written by Mark Needham

July 19th, 2015 at 7:44 pm

Posted in R

R: Blog post frequency anomaly detection

I came across Twitter’s anomaly detection library last year but hadn’t yet had a reason to take it for a test run. Having got my blog post frequency data into shape, I thought it’d be fun to run it through the algorithm.

I wanted to see if it would detect any periods of time when the number of posts differed significantly – I don’t really have an action I’m going to take based on the results, it’s curiosity more than anything else!

First we need to get the library installed. It’s not on CRAN so we need to use devtools to install it from the GitHub repository:

install.packages("devtools")
devtools::install_github("twitter/AnomalyDetection")
library(AnomalyDetection)

The expected data format is two columns – one containing a time stamp and the other a count. For example, here’s the ‘raw_data’ data frame that is in scope once you load the library:

> library(dplyr)
> raw_data %>% head()
            timestamp   count
1 1980-09-25 14:01:00 182.478
2 1980-09-25 14:02:00 176.231
3 1980-09-25 14:03:00 183.917
4 1980-09-25 14:04:00 177.798
5 1980-09-25 14:05:00 165.469
6 1980-09-25 14:06:00 181.878

In our case the timestamps will be the start date of a week and the count the number of posts in that week. But first let’s get some practice calling the anomaly function using the canned data:

res = AnomalyDetectionTs(raw_data, max_anoms=0.02, direction='both', plot=TRUE)
res$plot

[Image: anomaly detection plot for raw_data]

From this visualisation we learn that we should expect both high and low outliers to be identified. Let’s give it a try with the blog post publication data.

We need to get the data into shape so we’ll start by getting a count of the number of blog posts by (week, year) pair (the year and week functions come from the lubridate package):

> df %>% sample_n(5)
                                                           title                date
1425                            Coding: Copy/Paste then refactor 2009-10-31 07:54:31
783  Neo4j 2.0.0-M06 -> 2.0.0-RC1: Working with path expressions 2013-11-23 10:30:41
960                                        R: Removing for loops 2015-04-18 23:53:20
966   R: dplyr - Error in (list: invalid subscript type 'double' 2015-04-27 22:34:43
343                     Parsing XML from the unix terminal/shell 2011-09-03 23:42:11
 
> byWeek = df %>% 
    mutate(year = year(date), week = week(date)) %>% 
    group_by(week, year) %>% summarise(n = n()) %>% 
    ungroup() %>% arrange(desc(n))
 
> byWeek %>% sample_n(5)
Source: local data frame [5 x 3]
 
  week year n
1   44 2009 6
2   37 2011 4
3   39 2012 3
4    7 2013 4
5    6 2010 6
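
For comparison, the same grouping can be sketched in Python. This assumes lubridate’s documented definition of week(), the number of complete seven day periods since 1 January plus one, and uses the five post dates from the sample rows above as input. `week_of_year` is a made-up helper name.

```python
from collections import Counter
from datetime import datetime


def week_of_year(moment):
    """Complete seven-day periods since 1 January, plus one."""
    return (moment.timetuple().tm_yday - 1) // 7 + 1


# Dates taken from the five sampled posts above.
dates = [
    datetime(2009, 10, 31, 7, 54, 31),
    datetime(2013, 11, 23, 10, 30, 41),
    datetime(2015, 4, 18, 23, 53, 20),
    datetime(2015, 4, 27, 22, 34, 43),
    datetime(2011, 9, 3, 23, 42, 11),
]

# Count posts per (week, year) pair, most frequent first.
by_week = Counter((week_of_year(d), d.year) for d in dates)
for (week, year), n in by_week.most_common():
    print(week, year, n)
```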

Great. The next step is to translate this data frame into one containing a date representing the start of that week and the number of posts:

> data = byWeek %>% 
    mutate(start_of_week = calculate_start_of_week(week, year)) %>%
    filter(start_of_week > ymd("2008-07-01")) %>%
    select(start_of_week, n)
 
> data %>% sample_n(5)
Source: local data frame [5 x 2]
 
  start_of_week n
1    2010-09-10 4
2    2013-04-09 4
3    2010-04-30 6
4    2012-03-11 3
5    2014-12-03 3
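
The `calculate_start_of_week` helper isn’t shown in this post, so the following Python sketch is only a guess at what it computes: the first day of a week counted in seven day blocks from 1 January. That guess is at least consistent with the sample output above, where 2010-09-10 falls exactly 36 weeks after 1 January 2010.

```python
from datetime import date, timedelta


def start_of_week(week, year):
    """First day of the given week, counting seven-day blocks from 1 January."""
    return date(year, 1, 1) + timedelta(days=7 * (week - 1))


print(start_of_week(37, 2010))
```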

We’re now ready to plug it into the anomaly detection function:

res = AnomalyDetectionTs(data, 
                         max_anoms=0.02, 
                         direction='both', 
                         plot=TRUE)
res$plot

[Image: anomaly detection plot for weekly blog post counts]

Interestingly I don’t seem to have any low end anomalies – there were a couple of really high frequency weeks when I first started writing and I think one of the other weeks contains a New Year’s Eve when I was particularly bored!

If we group by month instead, only the very first month stands out as an outlier:

data = byMonth %>% 
  mutate(start_of_month = ymd(paste(year, month, 1, sep="-"))) %>%
  filter(start_of_month > ymd("2008-07-01")) %>%
  select(start_of_month, n)
res = AnomalyDetectionTs(data, 
                         max_anoms=0.02, 
                         direction='both',       
                         #longterm = TRUE,
                         plot=TRUE)
res$plot

[Image: anomaly detection plot for monthly blog post counts]

I’m not sure what else to do as far as anomaly detection goes but if you have any ideas please let me know!

Written by Mark Needham

July 17th, 2015 at 11:34 pm

Posted in R
