Mark Needham

Thoughts on Software Development

Archive for the ‘Spark’ Category

Spark: MatchError (of class org.apache.spark.sql.catalyst.expressions.GenericRow)

without comments

I’ve been using Spark again lately to do some pre-processing on the Land Registry data set and ran into an initially confusing problem when trying to parse the CSV file.

I’m using the Databricks CSV parsing library and wrote the following script to go over each row, collect up the address components and then derive a ‘fullAddress’ field.

To refresh, this is what the CSV file looks like:

$ head -n5 pp-complete.csv
"{0C7ADEF5-878D-4066-B785-0000003ED74A}","163000","2003-02-21 00:00","UB5 4PJ","T","N","F","106","","READING ROAD","NORTHOLT","NORTHOLT","EALING","GREATER LONDON","A"
"{35F67271-ABD4-40DA-AB09-00000085B9D3}","247500","2005-07-15 00:00","TA19 9DD","D","N","F","58","","ADAMS MEADOW","ILMINSTER","ILMINSTER","SOUTH SOMERSET","SOMERSET","A"
"{B20B1C74-E8E1-4137-AB3E-0000011DF342}","320000","2010-09-10 00:00","W4 1DZ","F","N","L","58","","WHELLOCK ROAD","","LONDON","EALING","GREATER LONDON","A"
"{7D6B0915-C56B-4275-AF9B-00000156BCE7}","104000","1997-08-27 00:00","NE61 2BH","D","N","F","17","","WESTGATE","MORPETH","MORPETH","CASTLE MORPETH","NORTHUMBERLAND","A"
"{47B60101-B64C-413D-8F60-000002F1692D}","147995","2003-05-02 00:00","PE33 0RU","D","N","F","4","","MASON GARDENS","WEST WINCH","KING'S LYNN","KING'S LYNN AND WEST NORFOLK","NORFOLK","A"
import org.apache.spark.sql.{SQLContext, _}
import org.apache.spark.{SparkConf, SparkContext}
 
case class BlogTransaction(price: Integer, date: String, postCode:String, paon:String, saon:String, street:String, locality:String, city:String, district:String, county:String)
 
object BlogApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
 
    sqlContext.read.format("com.databricks.spark.csv").load("/Users/markneedham/projects/land-registry/pp-complete.csv").registerTempTable("transactions")
 
    val rows = sqlContext.sql("select C1,C2,C3,C7,C8,C9,C10,C11,C12,C13 from transactions where transactions.C3 = 'SW3 4EU'").map(x =>
      Row.fromSeq(x.toSeq ++ Array(Array(x.get(4), x.get(3), x.get(5), x.get(6), x.get(7), x.get(8), x.get(9), x.get(2))
        .map(x => x.toString)
        .filter(x => !x.isEmpty)
        .distinct
        .mkString(" / "))))
 
    val path: String = "/tmp/tx-" + System.currentTimeMillis() + ".csv"
    rows.map {
      case Row(price: Integer, date: String, postCode:String, paon:String, saon:String, street:String, locality:String, city:String, district:String, county:String) =>
        BlogTransaction(price, date, postCode, paon, saon, street, locality, city, district, county) }
      .toDF()
      .write
      .format("com.databricks.spark.csv")
      .save(path)
  }
}

Let’s execute that job against a local Spark worker:

./spark-1.5.0-bin-hadoop2.6/bin/spark-submit --class BlogApp --master local[8] --packages com.databricks:spark-csv_2.10:1.2.0 target/scala-2.10/simple-project_2.10-1.0.jar
 
15/10/27 22:56:41 INFO Executor: Executor killed task 7.0 in stage 1.0 (TID 8)
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 1 times, most recent failure: Lost task 2.0 in stage 1.0 (TID 3, localhost): scala.MatchError: [14850000,2013-11-13 00:00,SW3 4EU,9,,ORMONDE GATE,,LONDON,KENSINGTON AND CHELSEA,GREATER LONDON,9 / ORMONDE GATE / LONDON / KENSINGTON AND CHELSEA / GREATER LONDON / SW3 4EU] (of class org.apache.spark.sql.catalyst.expressions.GenericRow)
	at BlogApp$$anonfun$main$1.apply(BlogApp.scala:24)
	at BlogApp$$anonfun$main$1.apply(BlogApp.scala:24)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$5$$anon$1.next(package.scala:154)
	at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$5$$anon$1.next(package.scala:147)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1109)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1116)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
 
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1124)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1065)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:989)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:965)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:965)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:965)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:897)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:897)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:897)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:896)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1426)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1405)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1405)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1405)
	at com.databricks.spark.csv.package$CsvSchemaRDD.saveAsCsvFile(package.scala:169)
	at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:165)
	at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:170)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
	at BlogApp$.main(BlogApp.scala:30)
	at BlogApp.main(BlogApp.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: scala.MatchError: [14850000,2013-11-13 00:00,SW3 4EU,9,,ORMONDE GATE,,LONDON,KENSINGTON AND CHELSEA,GREATER LONDON,9 / ORMONDE GATE / LONDON / KENSINGTON AND CHELSEA / GREATER LONDON / SW3 4EU] (of class org.apache.spark.sql.catalyst.expressions.GenericRow)
	at BlogApp$$anonfun$main$1.apply(BlogApp.scala:24)
	at BlogApp$$anonfun$main$1.apply(BlogApp.scala:24)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$5$$anon$1.next(package.scala:154)
	at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$5$$anon$1.next(package.scala:147)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1109)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1116)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

So it looks like something is wrong with our matching code, and the only place we do any pattern matching is against Row in the function we map over the rows.

Although I thought price should be an integer, I tweaked it to be a string just in case that was the issue:

case class BlogTransaction(price: Integer, date: String, postCode:String, paon:String, saon:String, street:String, locality:String, city:String, district:String, county:String)
...
case Row(price: Integer, date: String, postCode:String, paon:String, saon:String, street:String, locality:String, city:String, district:String, county:String) =>

changed to:

case class BlogTransaction(price: String, date: String, postCode:String, paon:String, saon:String, street:String, locality:String, city:String, district:String, county:String)
...
case Row(price: String, date: String, postCode:String, paon:String, saon:String, street:String, locality:String, city:String, district:String, county:String) =>

Attempt #2:

./spark-1.5.0-bin-hadoop2.6/bin/spark-submit --class BlogApp --master local[8] --packages com.databricks:spark-csv_2.10:1.2.0 target/scala-2.10/simple-project_2.10-1.0.jar
 
15/10/27 23:01:35 WARN TaskSetManager: Lost task 6.0 in stage 1.0 (TID 7, localhost): TaskKilled (killed intentionally)
Exception in thread "main" 15/10/27 23:01:35 WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 2, localhost): TaskKilled (killed intentionally)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 1 times, most recent failure: Lost task 2.0 in stage 1.0 (TID 3, localhost): scala.MatchError: [14850000,2013-11-13 00:00,SW3 4EU,9,,ORMONDE GATE,,LONDON,KENSINGTON AND CHELSEA,GREATER LONDON,9 / ORMONDE GATE / LONDON / KENSINGTON AND CHELSEA / GREATER LONDON / SW3 4EU] (of class org.apache.spark.sql.catalyst.expressions.GenericRow)
	at BlogApp$$anonfun$main$1.apply(BlogApp.scala:24)
	at BlogApp$$anonfun$main$1.apply(BlogApp.scala:24)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$5$$anon$1.next(package.scala:154)
	at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$5$$anon$1.next(package.scala:147)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1109)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1116)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
 
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1124)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1065)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:989)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:965)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:965)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:965)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:897)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:897)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:897)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:896)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1426)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1405)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1405)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1405)
	at com.databricks.spark.csv.package$CsvSchemaRDD.saveAsCsvFile(package.scala:169)
	at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:165)
	at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:170)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
	at BlogApp$.main(BlogApp.scala:30)
	at BlogApp.main(BlogApp.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: scala.MatchError: [14850000,2013-11-13 00:00,SW3 4EU,9,,ORMONDE GATE,,LONDON,KENSINGTON AND CHELSEA,GREATER LONDON,9 / ORMONDE GATE / LONDON / KENSINGTON AND CHELSEA / GREATER LONDON / SW3 4EU] (of class org.apache.spark.sql.catalyst.expressions.GenericRow)
	at BlogApp$$anonfun$main$1.apply(BlogApp.scala:24)
	at BlogApp$$anonfun$main$1.apply(BlogApp.scala:24)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$5$$anon$1.next(package.scala:154)
	at com.databricks.spark.csv.package$CsvSchemaRDD$$anonfun$5$$anon$1.next(package.scala:147)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply$mcV$sp(PairRDDFunctions.scala:1109)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$6.apply(PairRDDFunctions.scala:1108)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1116)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Hmmm… no improvement. At this point I realised I’d accidentally left the fullAddress argument out of the case statement, so I added it in:

case class BlogTransaction(price: String, date: String, postCode:String, paon:String, saon:String, street:String, locality:String, city:String, district:String, county:String)
...
case Row(price: String, date: String, postCode:String, paon:String, saon:String, street:String, locality:String, city:String, district:String, county:String) =>

changed to:

case class BlogTransaction(price: String, date: String, postCode:String, paon:String, saon:String, street:String, locality:String, city:String, district:String, county:String, fullAddress:String)
...
case Row(price: String, date: String, postCode:String, paon:String, saon:String, street:String, locality:String, city:String, district:String, county:String, fullAddress:String) =>

Attempt #3:

./spark-1.5.0-bin-hadoop2.6/bin/spark-submit --class BlogApp --master local[8] --packages com.databricks:spark-csv_2.10:1.2.0 target/scala-2.10/simple-project_2.10-1.0.jar
...
15/10/27 23:06:03 INFO DAGScheduler: Job 1 finished: saveAsTextFile at package.scala:169, took 39.665661 s

Hoorah, it took a bit of guesswork but it’s finally working!

For completeness, here’s the final version of the Spark job:

import org.apache.spark.sql.{SQLContext, _}
import org.apache.spark.{SparkConf, SparkContext}
 
case class BlogTransaction(price: String, date: String, postCode:String, paon:String, saon:String, street:String, locality:String, city:String, district:String, county:String, fullAddress:String)
 
object BlogApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Simple Application")
 
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
 
    sqlContext.read.format("com.databricks.spark.csv").load("/Users/markneedham/projects/land-registry/pp-complete.csv").registerTempTable("transactions")
 
    val rows = sqlContext.sql("select C1,C2,C3,C7,C8,C9,C10,C11,C12,C13 from transactions where transactions.C3 = 'SW3 4EU'").map(x =>
      Row.fromSeq(x.toSeq ++ Array(Array(x.get(4), x.get(3), x.get(5), x.get(6), x.get(7), x.get(8), x.get(9), x.get(2))
        .map(x => x.toString)
        .filter(x => !x.isEmpty)
        .distinct
        .mkString(" / "))))
 
    val path: String = "/tmp/tx-" + System.currentTimeMillis() + ".csv"
    rows.map {
      case Row(price: String, date: String, postCode:String, paon:String, saon:String, street:String, locality:String, city:String, district:String, county:String, fullAddress:String) =>
        BlogTransaction(price, date, postCode, paon, saon, street, locality, city, district, county, fullAddress) }
      .toDF()
      .write
      .format("com.databricks.spark.csv")
      .save(path)
  }
}
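
As an aside, the reason this kept failing is that a Row pattern match has to line up exactly with the number and types of fields in each row. If you’d rather not depend on that, the final mapping step could pull the fields out by position instead. The following is only a sketch, assuming all eleven columns come through as strings (which they do here, since spark-csv loads everything as strings); it isn’t the code the job above uses:

// Extract each field by position with getString so the mapping doesn't rely on
// the row's arity and types lining up exactly with a pattern.
val transactions = rows.map(row =>
  BlogTransaction(row.getString(0), row.getString(1), row.getString(2), row.getString(3),
    row.getString(4), row.getString(5), row.getString(6), row.getString(7),
    row.getString(8), row.getString(9), row.getString(10)))
transactions.toDF().write.format("com.databricks.spark.csv").save(path)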

Written by Mark Needham

October 27th, 2015 at 11:10 pm

Posted in Spark

Tagged with

SparkR: Add new column to data frame by concatenating other columns

without comments

Continuing with my exploration of the Land Registry open data set using SparkR I wanted to see which road in the UK has had the most property sales over the last 20 years.

To recap, this is what the data frame looks like:

./spark-1.5.0-bin-hadoop2.6/bin/sparkR --packages com.databricks:spark-csv_2.11:1.2.0
 
> sales <- read.df(sqlContext, "pp-complete.csv", "com.databricks.spark.csv", header="false")
 
> head(sales)
                                      C0     C1               C2       C3 C4 C5
1 {0C7ADEF5-878D-4066-B785-0000003ED74A} 163000 2003-02-21 00:00  UB5 4PJ  T  N
2 {35F67271-ABD4-40DA-AB09-00000085B9D3} 247500 2005-07-15 00:00 TA19 9DD  D  N
3 {B20B1C74-E8E1-4137-AB3E-0000011DF342} 320000 2010-09-10 00:00   W4 1DZ  F  N
4 {7D6B0915-C56B-4275-AF9B-00000156BCE7} 104000 1997-08-27 00:00 NE61 2BH  D  N
5 {47B60101-B64C-413D-8F60-000002F1692D} 147995 2003-05-02 00:00 PE33 0RU  D  N
6 {51F797CA-7BEB-4958-821F-000003E464AE} 110000 2013-03-22 00:00 NR35 2SF  T  N
  C6  C7 C8              C9         C10         C11
1  F 106       READING ROAD    NORTHOLT    NORTHOLT
2  F  58       ADAMS MEADOW   ILMINSTER   ILMINSTER
3  L  58      WHELLOCK ROAD                  LONDON
4  F  17           WESTGATE     MORPETH     MORPETH
5  F   4      MASON GARDENS  WEST WINCH KING'S LYNN
6  F   5    WILD FLOWER WAY DITCHINGHAM      BUNGAY
                           C12            C13 C14
1                       EALING GREATER LONDON   A
2               SOUTH SOMERSET       SOMERSET   A
3                       EALING GREATER LONDON   A
4               CASTLE MORPETH NORTHUMBERLAND   A
5 KING'S LYNN AND WEST NORFOLK        NORFOLK   A
6                SOUTH NORFOLK        NORFOLK   A

This document explains the data stored in each field and for this particular query we’re interested in fields C9-C12. The plan is to group the data frame by those fields and then sort by frequency in descending order.

When grouping by multiple fields it tends to be easiest to create a new field which concatenates them all and then group by that.

I started with the following:

> sales$address = paste(sales$C9, sales$C10, sales$C11, sales$C12, sep=", ")
Error in as.character.default(<S4 object of class "Column">) :
  no method for coercing this S4 class to a vector

Not so successful! Next I went even more primitive:

> sales$address = sales$C9 + ", " + sales$C10 + ", " + sales$C11 + ", " + sales$C12
 
> head(sales)
                                      C0     C1               C2       C3 C4 C5
1 {0C7ADEF5-878D-4066-B785-0000003ED74A} 163000 2003-02-21 00:00  UB5 4PJ  T  N
2 {35F67271-ABD4-40DA-AB09-00000085B9D3} 247500 2005-07-15 00:00 TA19 9DD  D  N
3 {B20B1C74-E8E1-4137-AB3E-0000011DF342} 320000 2010-09-10 00:00   W4 1DZ  F  N
4 {7D6B0915-C56B-4275-AF9B-00000156BCE7} 104000 1997-08-27 00:00 NE61 2BH  D  N
5 {47B60101-B64C-413D-8F60-000002F1692D} 147995 2003-05-02 00:00 PE33 0RU  D  N
6 {51F797CA-7BEB-4958-821F-000003E464AE} 110000 2013-03-22 00:00 NR35 2SF  T  N
  C6  C7 C8              C9         C10         C11
1  F 106       READING ROAD    NORTHOLT    NORTHOLT
2  F  58       ADAMS MEADOW   ILMINSTER   ILMINSTER
3  L  58      WHELLOCK ROAD                  LONDON
4  F  17           WESTGATE     MORPETH     MORPETH
5  F   4      MASON GARDENS  WEST WINCH KING'S LYNN
6  F   5    WILD FLOWER WAY DITCHINGHAM      BUNGAY
                           C12            C13 C14 address
1                       EALING GREATER LONDON   A      NA
2               SOUTH SOMERSET       SOMERSET   A      NA
3                       EALING GREATER LONDON   A      NA
4               CASTLE MORPETH NORTHUMBERLAND   A      NA
5 KING'S LYNN AND WEST NORFOLK        NORFOLK   A      NA
6                SOUTH NORFOLK        NORFOLK   A      NA

That at least ran, but all the addresses came out as ‘NA’, which isn’t what we want. After a bit of searching I realised that there’s a concat_ws function I could use for exactly this task:

> sales$address = concat_ws(sep=", ", sales$C9, sales$C10, sales$C11, sales$C12)
 
> head(sales)
                                      C0     C1               C2       C3 C4 C5
1 {0C7ADEF5-878D-4066-B785-0000003ED74A} 163000 2003-02-21 00:00  UB5 4PJ  T  N
2 {35F67271-ABD4-40DA-AB09-00000085B9D3} 247500 2005-07-15 00:00 TA19 9DD  D  N
3 {B20B1C74-E8E1-4137-AB3E-0000011DF342} 320000 2010-09-10 00:00   W4 1DZ  F  N
4 {7D6B0915-C56B-4275-AF9B-00000156BCE7} 104000 1997-08-27 00:00 NE61 2BH  D  N
5 {47B60101-B64C-413D-8F60-000002F1692D} 147995 2003-05-02 00:00 PE33 0RU  D  N
6 {51F797CA-7BEB-4958-821F-000003E464AE} 110000 2013-03-22 00:00 NR35 2SF  T  N
  C6  C7 C8              C9         C10         C11
1  F 106       READING ROAD    NORTHOLT    NORTHOLT
2  F  58       ADAMS MEADOW   ILMINSTER   ILMINSTER
3  L  58      WHELLOCK ROAD                  LONDON
4  F  17           WESTGATE     MORPETH     MORPETH
5  F   4      MASON GARDENS  WEST WINCH KING'S LYNN
6  F   5    WILD FLOWER WAY DITCHINGHAM      BUNGAY
                           C12            C13 C14
1                       EALING GREATER LONDON   A
2               SOUTH SOMERSET       SOMERSET   A
3                       EALING GREATER LONDON   A
4               CASTLE MORPETH NORTHUMBERLAND   A
5 KING'S LYNN AND WEST NORFOLK        NORFOLK   A
6                SOUTH NORFOLK        NORFOLK   A
                                                               address
1                             READING ROAD, NORTHOLT, NORTHOLT, EALING
2                   ADAMS MEADOW, ILMINSTER, ILMINSTER, SOUTH SOMERSET
3                                      WHELLOCK ROAD, , LONDON, EALING
4                           WESTGATE, MORPETH, MORPETH, CASTLE MORPETH
5 MASON GARDENS, WEST WINCH, KING'S LYNN, KING'S LYNN AND WEST NORFOLK
6                  WILD FLOWER WAY, DITCHINGHAM, BUNGAY, SOUTH NORFOLK

That’s more like it! Now let’s see which streets have sold the most properties:

> byAddress = summarize(groupBy(sales, sales$address), count = n(sales$address))
> head(arrange(byAddress, desc(byAddress$count)), 10)
 
                                                            address count
1                          BARBICAN, LONDON, LONDON, CITY OF LONDON  1398
2          CHRISTCHURCH ROAD, BOURNEMOUTH, BOURNEMOUTH, BOURNEMOUTH  1313
3                   MAIDA VALE, LONDON, LONDON, CITY OF WESTMINSTER  1305
4                     ROTHERHITHE STREET, LONDON, LONDON, SOUTHWARK  1253
5             SLOANE AVENUE, LONDON, LONDON, KENSINGTON AND CHELSEA  1219
6  THE STRAND, BRIGHTON MARINA VILLAGE, BRIGHTON, BRIGHTON AND HOVE  1218
7                     FAIRFIELD ROAD, LONDON, LONDON, TOWER HAMLETS  1217
8                             QUEENSTOWN ROAD, , LONDON, WANDSWORTH  1153
9                   UPPER RICHMOND ROAD, LONDON, LONDON, WANDSWORTH  1123
10                      QUEENSTOWN ROAD, LONDON, LONDON, WANDSWORTH  1079
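
As an aside, the same pipeline from Spark’s Scala API uses an identical concat_ws function. This is just a sketch assuming a ‘sales’ DataFrame loaded via spark-csv in a Scala shell, rather than part of the SparkR session above:

import org.apache.spark.sql.functions._
 
// Add a concatenated address column, then count and rank sales per address
val byAddressScala = sales
  .withColumn("address", concat_ws(", ", col("C9"), col("C10"), col("C11"), col("C12")))
  .groupBy("address")
  .count()
byAddressScala.orderBy(desc("count")).show(10)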

Next we’ll drill into the data further but that’s for another post.

Written by Mark Needham

September 21st, 2015 at 10:30 pm

Posted in Spark

Tagged with

SparkR: Error in invokeJava(isStatic = TRUE, className, methodName, …) : java.lang.ClassNotFoundException: Failed to load class for data source: csv.

without comments

I’ve been wanting to play around with SparkR for a while and over the weekend I decided to explore a large Land Registry CSV file containing all the sales of properties in the UK over the last 20 years.

First I started up the SparkR shell with the CSV package loaded in:

./spark-1.5.0-bin-hadoop2.6/bin/sparkR --packages com.databricks:spark-csv_2.11:1.2.0

Next I tried to read the CSV file into a Spark data frame by modifying one of the examples from the tutorial:

> sales <- read.df(sqlContext, "pp-complete.csv", "csv")
15/09/20 19:13:02 ERROR RBackendHandler: loadDF on org.apache.spark.sql.api.r.SQLUtils failed
Error in invokeJava(isStatic = TRUE, className, methodName, ...) :
  java.lang.ClassNotFoundException: Failed to load class for data source: csv.
	at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:67)
	at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:87)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
	at org.apache.spark.sql.api.r.SQLUtils$.loadDF(SQLUtils.scala:156)
	at org.apache.spark.sql.api.r.SQLUtils.loadDF(SQLUtils.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:132)
	at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:79)
	at org.apache.spark.api.r.RBackendH

As far as I can tell I have loaded in the CSV data source so I’m not sure why that doesn’t work.

However, I came across this github issue which suggested passing in the full package name as the 3rd argument of ‘read.df’ rather than just ‘csv’:

> sales <- read.df(sqlContext, "pp-complete.csv", "com.databricks.spark.csv", header="false")
> sales
DataFrame[C0:string, C1:string, C2:string, C3:string, C4:string, C5:string, C6:string, C7:string, C8:string, C9:string, C10:string, C11:string, C12:string, C13:string, C14:string]

And that worked much better! We can now carry on and do some slicing and dicing of the data to see if there are any interesting insights.

Written by Mark Needham

September 21st, 2015 at 10:06 pm

Posted in Spark

Tagged with

Spark: Convert RDD to DataFrame

without comments

As I mentioned in a previous blog post I’ve been playing around with the Databricks Spark CSV library and wanted to take a CSV file, clean it up and then write out a new CSV file containing some of the columns.

I started by processing the CSV file and writing it into a temporary table:

import org.apache.spark.sql.{SQLContext, Row, DataFrame}
 
val sqlContext = new SQLContext(sc)
val crimeFile = "Crimes_-_2001_to_present.csv"
sqlContext.load("com.databricks.spark.csv", Map("path" -> crimeFile, "header" -> "true")).registerTempTable("crimes")

I wanted to get to the point where I could call the following function which writes a DataFrame to disk:

import java.io.File
import org.apache.hadoop.fs.FileUtil
 
private def createFile(df: DataFrame, file: String, header: String): Unit = {
  FileUtil.fullyDelete(new File(file))
  val tmpFile = "tmp/" + System.currentTimeMillis() + "-" + file
  df.distinct.save(tmpFile, "com.databricks.spark.csv")
}

The first file only needs to contain the primary type of crime, which we can extract with the following query:

val rows = sqlContext.sql("select `Primary Type` as primaryType FROM crimes LIMIT 10")
 
rows.collect()
res4: Array[org.apache.spark.sql.Row] = Array([ASSAULT], [ROBBERY], [CRIMINAL DAMAGE], [THEFT], [THEFT], [BURGLARY], [THEFT], [BURGLARY], [THEFT], [CRIMINAL DAMAGE])

Some of the primary types have trailing spaces which I want to get rid of. As far as I can tell Spark’s variant of SQL doesn’t have the LTRIM or RTRIM functions but we can map over ‘rows’ and use the String ‘trim’ function instead:

rows.map { case Row(primaryType: String) => Row(primaryType.trim) }
res8: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[29] at map at DataFrame.scala:776

Now we’ve got an RDD of Rows which we need to convert back to a DataFrame again. ‘sqlContext’ has a function which we might be able to use:

sqlContext.createDataFrame(rows.map { case Row(primaryType: String) => Row(primaryType.trim) })
 
<console>:27: error: overloaded method value createDataFrame with alternatives:
  [A <: Product](data: Seq[A])(implicit evidence$4: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame <and>
  [A <: Product](rdd: org.apache.spark.rdd.RDD[A])(implicit evidence$3: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.rdd.RDD[org.apache.spark.sql.Row])
              sqlContext.createDataFrame(rows.map { case Row(primaryType: String) => Row(primaryType.trim) })
                         ^

These are the signatures we can choose from:

[Screenshot: the createDataFrame method signatures]

If we want to pass in an RDD of type Row we’re going to have to define a StructType or we can convert each row into something more strongly typed:

case class CrimeType(primaryType: String)
 
sqlContext.createDataFrame(rows.map { case Row(primaryType: String) => CrimeType(primaryType.trim) })
res14: org.apache.spark.sql.DataFrame = [primaryType: string]
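
As an aside, the other route mentioned above, defining a StructType, would look roughly like this. It’s only a sketch against the Spark 1.3-era createDataFrame(rowRDD, schema) overload, not what we end up using:

import org.apache.spark.sql.types.{StringType, StructField, StructType}
 
// Describe the single string column explicitly and pair the schema with the RDD[Row]
val schema = StructType(Array(StructField("primaryType", StringType, nullable = true)))
val crimeTypes = sqlContext.createDataFrame(
  rows.map { case Row(primaryType: String) => Row(primaryType.trim) }, schema)

The case class version involves less ceremony though, so we’ll stick with that.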

Great, we’ve got our DataFrame which we can now plug into the ‘createFile’ function like so:

createFile(
  sqlContext.createDataFrame(rows.map { case Row(primaryType: String) => CrimeType(primaryType.trim) }),
  "/tmp/crimeTypes.csv",
  "crimeType:ID(CrimeType)")

We can actually do better though!

Since we’ve got an RDD of a specific class we can make use of the ‘rddToDataFrameHolder’ implicit function and then the ‘toDF’ function on ‘DataFrameHolder’. This is what the code looks like:

import sqlContext.implicits._
createFile(
  rows.map { case Row(primaryType: String) => CrimeType(primaryType.trim) }.toDF(),
  "/tmp/crimeTypes.csv",
  "crimeType:ID(CrimeType)")

And we’re done!

Written by Mark Needham

August 6th, 2015 at 9:11 pm

Posted in Spark

Tagged with

Spark: pyspark/Hadoop – py4j.protocol.Py4JJavaError: An error occurred while calling o23.load.: org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4

without comments

I’ve been playing around with pyspark – Spark’s Python library – and I wanted to execute the following job which takes a file from my local HDFS and then counts how many times each FBI code appears using Spark SQL:

from pyspark import SparkContext
from pyspark.sql import SQLContext
 
sc = SparkContext("local", "Simple App")
sqlContext = SQLContext(sc)
 
file = "hdfs://localhost:9000/user/markneedham/Crimes_-_2001_to_present.csv"
 
sqlContext.load(source="com.databricks.spark.csv", header="true", path = file).registerTempTable("crimes")
rows = sqlContext.sql("select `FBI Code` AS fbiCode, COUNT(*) AS times FROM crimes GROUP BY `FBI Code` ORDER BY times DESC").collect()
 
for row in rows:
    print("{0} -> {1}".format(row.fbiCode, row.times))

I submitted the job and waited:

$ ./spark-1.3.0-bin-hadoop1/bin/spark-submit --driver-memory 5g --packages com.databricks:spark-csv_2.10:1.1.0 fbi_spark.py
...
Traceback (most recent call last):
  File "/Users/markneedham/projects/neo4j-spark-chicago/fbi_spark.py", line 11, in <module>
    sqlContext.load(source="com.databricks.spark.csv", header="true", path = file).registerTempTable("crimes")
  File "/Users/markneedham/projects/neo4j-spark-chicago/spark-1.3.0-bin-hadoop1/python/pyspark/sql/context.py", line 482, in load
    df = self._ssql_ctx.load(source, joptions)
  File "/Users/markneedham/projects/neo4j-spark-chicago/spark-1.3.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/Users/markneedham/projects/neo4j-spark-chicago/spark-1.3.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o23.load.
: org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4
	at org.apache.hadoop.ipc.Client.call(Client.java:1070)
	at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
	at com.sun.proxy.$Proxy7.getProtocolVersion(Unknown Source)
	at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
	at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
	at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
	at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:238)
	at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:203)
	at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1156)
	at org.apache.spark.rdd.RDD.first(RDD.scala:1189)
	at com.databricks.spark.csv.CsvRelation.firstLine$lzycompute(CsvRelation.scala:129)
	at com.databricks.spark.csv.CsvRelation.firstLine(CsvRelation.scala:127)
	at com.databricks.spark.csv.CsvRelation.inferSchema(CsvRelation.scala:109)
	at com.databricks.spark.csv.CsvRelation.<init>(CsvRelation.scala:62)
	at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:115)
	at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:40)
	at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:28)
	at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:290)
	at org.apache.spark.sql.SQLContext.load(SQLContext.scala:679)
	at org.apache.spark.sql.SQLContext.load(SQLContext.scala:667)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:745)

It looks like my Hadoop client and server are using different versions, which in fact they are! We can see from the name of the Spark folder that I’m using a Hadoop 1.x build there, and if we check the local Hadoop version we’ll notice it’s from the 2.x series:

$ hadoop version
Hadoop 2.6.0

In this case the easiest fix is to use a version of Spark that’s compiled against Hadoop 2.6, which as of now means Spark 1.4.1.

Let’s try and run our job again:

$ ./spark-1.4.1-bin-hadoop2.6/bin/spark-submit --driver-memory 5g --packages com.databricks:spark-csv_2.10:1.1.0 fbi_spark.py
 
06 -> 859197
08B -> 653575
14 -> 488212
18 -> 457782
26 -> 431316
05 -> 257310
07 -> 197404
08A -> 188964
03 -> 157706
11 -> 112675
04B -> 103961
04A -> 60344
16 -> 47279
15 -> 40361
24 -> 31809
10 -> 22467
17 -> 17555
02 -> 17008
20 -> 15190
19 -> 10878
22 -> 8847
09 -> 6358
01A -> 4830
13 -> 1561
12 -> 835
01B -> 16

And it’s working!

Written by Mark Needham

August 4th, 2015 at 6:35 am

Posted in Spark

Tagged with

Spark: Processing CSV files using Databricks Spark CSV Library

with one comment

Last year I wrote about exploring the Chicago crime data set using Spark and the OpenCSV parser. While that worked well, a few months ago I noticed that there’s now a spark-csv library which I should probably use instead.

I thought it’d be a fun exercise to translate my code to use it.

So to recap our goal: we want to count how many times each type of crime has been committed. I have a more up to date version of the crimes file now so the numbers won’t be exactly the same.

First let’s launch the spark-shell and register our CSV file as a temporary table so we can query it as if it was a SQL table:

$ ./spark-1.3.0-bin-hadoop1/bin/spark-shell
 
scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext
 
scala> val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv"
crimeFile: String = /Users/markneedham/Downloads/Crimes_-_2001_to_present.csv
 
scala> val sqlContext = new SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@9746157
 
scala> sqlContext.load("com.databricks.spark.csv", Map("path" -> crimeFile, "header" -> "true")).registerTempTable("crimes")
java.lang.RuntimeException: Failed to load class for data source: com.databricks.spark.csv
	at scala.sys.package$.error(package.scala:27)
	at org.apache.spark.sql.sources.ResolvedDataSource$.lookupDataSource(ddl.scala:268)
	at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:279)
	at org.apache.spark.sql.SQLContext.load(SQLContext.scala:679)
        at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
	at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
	at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
	at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
	at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
	at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
	at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
	at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
	at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
	at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
	at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
	at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
	at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
	at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
	at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
	at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
	at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
	at org.apache.spark.repl.Main$.main(Main.scala:31)
	at org.apache.spark.repl.Main.main(Main.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I’ve actually forgotten to tell spark-shell about the CSV package so let’s restart the shell and pass it as an argument:

$ ./spark-1.3.0-bin-hadoop1/bin/spark-shell --packages com.databricks:spark-csv_2.10:1.1.0
 
scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext
 
scala> val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv"
crimeFile: String = /Users/markneedham/Downloads/Crimes_-_2001_to_present.csv
 
scala> val sqlContext = new SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@44587c44
 
scala> sqlContext.load("com.databricks.spark.csv", Map("path" -> crimeFile, "header" -> "true")).registerTempTable("crimes")
...
15/08/02 18:57:46 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/08/02 18:57:46 INFO DAGScheduler: Stage 0 (first at CsvRelation.scala:129) finished in 0.207 s
15/08/02 18:57:46 INFO DAGScheduler: Job 0 finished: first at CsvRelation.scala:129, took 0.267327 s

Now we can write a simple SQL query on our ‘crimes’ table to find the most popular crime types:

scala>  sqlContext.sql(
        """
        select `Primary Type` as primaryType, COUNT(*) AS times
        from crimes
        group by `Primary Type`
        order by times DESC
        """).save("/tmp/agg.csv", "com.databricks.spark.csv")

That spits out a load of CSV ‘part files’ into /tmp/agg.csv so let’s bring in the merge function that we’ve used previously to combine these into one CSV file:

scala> import org.apache.hadoop.conf.Configuration
scala> import org.apache.hadoop.fs._
 
scala> def merge(srcPath: String, dstPath: String): Unit =  {
         val hadoopConfig = new Configuration()
         val hdfs = FileSystem.get(hadoopConfig)
         FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
       }
 
scala> merge("/tmp/agg.csv", "agg.csv")

And finally let’s browse the contents of our new CSV file:

$ cat agg.csv
THEFT,1206745
BATTERY,1066110
CRIMINAL DAMAGE,672782
NARCOTICS,662257
OTHER OFFENSE,360824
ASSAULT,354583
BURGLARY,343443
MOTOR VEHICLE THEFT,278014
ROBBERY,218190
DECEPTIVE PRACTICE,197477
CRIMINAL TRESPASS,171363
PROSTITUTION,65660
WEAPONS VIOLATION,56218
PUBLIC PEACE VIOLATION,42446
OFFENSE INVOLVING CHILDREN,37010
CRIM SEXUAL ASSAULT,21346
SEX OFFENSE,21305
GAMBLING,13704
LIQUOR LAW VIOLATION,13264
INTERFERENCE WITH PUBLIC OFFICER,11366
ARSON,9642
HOMICIDE,7192
KIDNAPPING,6029
INTIMIDATION,3443
STALKING,2760
OBSCENITY,331
PUBLIC INDECENCY,123
OTHER NARCOTIC VIOLATION,106
CONCEALED CARRY LICENSE VIOLATION,34
NON-CRIMINAL,31
NON - CRIMINAL,25
RITUALISM,23
HUMAN TRAFFICKING,9
NON-CRIMINAL (SUBJECT SPECIFIED),3
DOMESTIC VIOLENCE,1

Great! We’ve got the same output with much less code which is always a #win.

Written by Mark Needham

August 2nd, 2015 at 6:08 pm

Posted in Spark

Tagged with

Spark: Generating CSV files to import into Neo4j

with one comment

About a year ago Ian pointed me at a Chicago Crime data set which seemed like a good fit for Neo4j and after much procrastination I’ve finally got around to importing it.

The data set covers crimes committed from 2001 until now. It contains around 4 million crimes along with metadata about those crimes, such as the location, type of crime and year.

The contents of the file follow this structure:

$ head -n 10 ~/Downloads/Crimes_-_2001_to_present.csv
ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
9464711,HX114160,01/14/2014 05:00:00 AM,028XX E 80TH ST,0560,ASSAULT,SIMPLE,APARTMENT,false,true,0422,004,7,46,08A,1196652,1852516,2014,01/20/2014 12:40:05 AM,41.75017626412204,-87.55494559131228,"(41.75017626412204, -87.55494559131228)"
9460704,HX113741,01/14/2014 04:55:00 AM,091XX S JEFFERY AVE,031A,ROBBERY,ARMED: HANDGUN,SIDEWALK,false,false,0413,004,8,48,03,1191060,1844959,2014,01/18/2014 12:39:56 AM,41.729576153145636,-87.57568059471686,"(41.729576153145636, -87.57568059471686)"
9460339,HX113740,01/14/2014 04:44:00 AM,040XX W MAYPOLE AVE,1310,CRIMINAL DAMAGE,TO PROPERTY,RESIDENCE,false,true,1114,011,28,26,14,1149075,1901099,2014,01/16/2014 12:40:00 AM,41.884543798701515,-87.72803579358926,"(41.884543798701515, -87.72803579358926)"
9461467,HX114463,01/14/2014 04:43:00 AM,059XX S CICERO AVE,0820,THEFT,$500 AND UNDER,PARKING LOT/GARAGE(NON.RESID.),false,false,0813,008,13,64,06,1145661,1865031,2014,01/16/2014 12:40:00 AM,41.785633535413176,-87.74148516669783,"(41.785633535413176, -87.74148516669783)"
9460355,HX113738,01/14/2014 04:21:00 AM,070XX S PEORIA ST,0820,THEFT,$500 AND UNDER,STREET,true,false,0733,007,17,68,06,1171480,1858195,2014,01/16/2014 12:40:00 AM,41.766348042591375,-87.64702037047671,"(41.766348042591375, -87.64702037047671)"
9461140,HX113909,01/14/2014 03:17:00 AM,016XX W HUBBARD ST,0610,BURGLARY,FORCIBLE ENTRY,COMMERCIAL / BUSINESS OFFICE,false,false,1215,012,27,24,05,1165029,1903111,2014,01/16/2014 12:40:00 AM,41.889741146006095,-87.66939334853973,"(41.889741146006095, -87.66939334853973)"
9460361,HX113731,01/14/2014 03:12:00 AM,022XX S WENTWORTH AVE,0820,THEFT,$500 AND UNDER,CTA TRAIN,false,false,0914,009,25,34,06,1175363,1889525,2014,01/20/2014 12:40:05 AM,41.85223460427207,-87.63185047834335,"(41.85223460427207, -87.63185047834335)"
9461691,HX114506,01/14/2014 03:00:00 AM,087XX S COLFAX AVE,0650,BURGLARY,HOME INVASION,RESIDENCE,false,false,0423,004,7,46,05,1195052,1847362,2014,01/17/2014 12:40:17 AM,41.73607283858007,-87.56097809501115,"(41.73607283858007, -87.56097809501115)"
9461792,HX114824,01/14/2014 03:00:00 AM,012XX S CALIFORNIA BLVD,0810,THEFT,OVER $500,STREET,false,false,1023,010,28,29,06,1157929,1894034,2014,01/17/2014 12:40:17 AM,41.86498077118534,-87.69571529596696,"(41.86498077118534, -87.69571529596696)"

Since I wanted to import this into Neo4j I needed to massage the data a bit, because the neo4j-import tool expects to receive CSV files containing the nodes and relationships we want to create.


I’d been looking at Spark towards the end of last year and the pre-processing of the big initial file into smaller CSV files containing nodes and relationships seemed like a good fit.

I therefore needed to create a Spark job to do this. We’ll then pass this job to a Spark executor running locally and it will spit out CSV files.


We start by creating a Scala object with a main method that will contain our processing code. Inside that main method we’ll instantiate a Spark context:

import org.apache.spark.{SparkConf, SparkContext}
 
object GenerateCSVFiles {  
    def main(args: Array[String]) {    
        val conf = new SparkConf().setAppName("Chicago Crime Dataset")    
        val sc = new SparkContext(conf)  
    }
}

Easy enough. Next we’ll read in the CSV file. I found the easiest way to reference this was with an environment variable but perhaps there’s a more idiomatic way:

import java.io.File
import org.apache.spark.{SparkConf, SparkContext}
 
object GenerateCSVFiles {
  def main(args: Array[String]) {
    var crimeFile = System.getenv("CSV_FILE")
 
    if(crimeFile == null || !new File(crimeFile).exists()) {
      throw new RuntimeException("Cannot find CSV file [" + crimeFile + "]")
    }
 
    println("Using %s".format(crimeFile))
 
    val conf = new SparkConf().setAppName("Chicago Crime Dataset")
 
    val sc = new SparkContext(conf)
    val crimeData = sc.textFile(crimeFile).cache()
  }
}

The type of crimeData is RDD[String] – Spark’s way of representing the (lazily evaluated) lines of the CSV file. This also includes the header of the file so let’s write a function to get rid of that since we’ll be generating our own headers for the different files:

import org.apache.spark.rdd.RDD
 
// http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAEYYnxYuEaie518ODdn-fR7VvD39d71=CgB_Dxw_4COVXgmYYQ@mail.gmail.com%3E
def dropHeader(data: RDD[String]): RDD[String] = {
  data.mapPartitionsWithIndex((idx, lines) => {
    if (idx == 0) {
      lines.drop(1)
    }
    lines
  })
}
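For completeness, the ‘withoutHeader’ RDD used in the rest of the post is just the result of applying this function to ‘crimeData’:

val withoutHeader: RDD[String] = dropHeader(crimeData)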

Now we’re ready to start generating our new CSV files so we’ll write a function which parses each line and extracts the appropriate columns. I’m using Open CSV for this:

import java.io.File
 
import au.com.bytecode.opencsv.CSVParser
import org.apache.hadoop.fs.FileUtil
import org.apache.spark.rdd.RDD
 
def generateFile(file: String, withoutHeader: RDD[String], fn: Array[String] => Array[String], header: String, distinct: Boolean = true, separator: String = ",") = {
  FileUtil.fullyDelete(new File(file))
 
  val tmpFile = "/tmp/" + System.currentTimeMillis() + "-" + file
  val rows: RDD[String] = withoutHeader.mapPartitions(lines => {
    val parser = new CSVParser(',')
    lines.map(line => {
      val columns = parser.parseLine(line)
      fn(columns).mkString(separator)
    })
  })
 
  if (distinct) rows.distinct().saveAsTextFile(tmpFile) else rows.saveAsTextFile(tmpFile)
}

We then call this function like this:

generateFile("/tmp/crimes.csv", withoutHeader, columns => Array(columns(0),"Crime", columns(2), columns(6)), "id:ID(Crime),:LABEL,date,description", false)

The output written to ‘tmpFile’ is actually 32 ‘part files’, but I wanted to merge those together into individual CSV files that were easier to work with.

I won’t paste the full job here but if you want to take a look it’s on github.
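One way to do that merging is with Hadoop’s FileUtil.copyMerge, which the posts below cover in more detail. A minimal sketch, reusing the ‘file’ and ‘tmpFile’ values from generateFile above:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
 
// squash the part files under tmpFile into a single CSV at 'file'
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
FileUtil.copyMerge(hdfs, new Path(tmpFile), hdfs, new Path(file), false, hadoopConfig, null)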

Now we need to submit the job to Spark. I’ve wrapped this in a script if you want to follow along but these are the contents:

./spark-1.1.0-bin-hadoop1/bin/spark-submit \
--driver-memory 5g \
--class GenerateCSVFiles \
--master local[8] \
target/scala-2.10/playground_2.10-1.0.jar \
$@

If we execute that we’ll see the following output:

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Crimes_-_2001_to_present.csv
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/04/15 00:31:44 INFO SparkContext: Running Spark version 1.3.0
...
15/04/15 00:47:26 INFO TaskSchedulerImpl: Removed TaskSet 8.0, whose tasks have all completed, from pool
15/04/15 00:47:26 INFO DAGScheduler: Stage 8 (saveAsTextFile at GenerateCSVFiles.scala:51) finished in 2.702 s
15/04/15 00:47:26 INFO DAGScheduler: Job 4 finished: saveAsTextFile at GenerateCSVFiles.scala:51, took 8.715588 s
 
real	0m44.935s
user	4m2.259s
sys	0m14.159s

and these CSV files will be generated:

$ ls -alh /tmp/*.csv
-rwxrwxrwx  1 markneedham  wheel   3.0K 14 Apr 07:37 /tmp/beats.csv
-rwxrwxrwx  1 markneedham  wheel   217M 14 Apr 07:37 /tmp/crimes.csv
-rwxrwxrwx  1 markneedham  wheel    84M 14 Apr 07:37 /tmp/crimesBeats.csv
-rwxrwxrwx  1 markneedham  wheel   120M 14 Apr 07:37 /tmp/crimesPrimaryTypes.csv
-rwxrwxrwx  1 markneedham  wheel   912B 14 Apr 07:37 /tmp/primaryTypes.csv

Let’s have a quick look at what they contain:

$ head -n 10 /tmp/beats.csv
id:ID(Beat),:LABEL
1135,Beat
1421,Beat
2312,Beat
1113,Beat
1014,Beat
2411,Beat
1333,Beat
2521,Beat
1652,Beat
$ head -n 10 /tmp/crimes.csv
id:ID(Crime),:LABEL,date,description
9464711,Crime,01/14/2014 05:00:00 AM,SIMPLE
9460704,Crime,01/14/2014 04:55:00 AM,ARMED: HANDGUN
9460339,Crime,01/14/2014 04:44:00 AM,TO PROPERTY
9461467,Crime,01/14/2014 04:43:00 AM,$500 AND UNDER
9460355,Crime,01/14/2014 04:21:00 AM,$500 AND UNDER
9461140,Crime,01/14/2014 03:17:00 AM,FORCIBLE ENTRY
9460361,Crime,01/14/2014 03:12:00 AM,$500 AND UNDER
9461691,Crime,01/14/2014 03:00:00 AM,HOME INVASION
9461792,Crime,01/14/2014 03:00:00 AM,OVER $500
$ head -n 10 /tmp/crimesBeats.csv
:START_ID(Crime),:END_ID(Beat),:TYPE
5896915,0733,ON_BEAT
9208776,2232,ON_BEAT
8237555,0111,ON_BEAT
6464775,0322,ON_BEAT
6468868,0411,ON_BEAT
4189649,0524,ON_BEAT
7620897,0421,ON_BEAT
7720402,0321,ON_BEAT
5053025,1115,ON_BEAT

Looking good. Let’s get them imported into Neo4j:

$ ./neo4j-community-2.2.0/bin/neo4j-import --into /tmp/my-neo --nodes /tmp/crimes.csv --nodes /tmp/beats.csv --nodes /tmp/primaryTypes.csv --relationships /tmp/crimesBeats.csv --relationships /tmp/crimesPrimaryTypes.csv
Nodes
[*>:45.76 MB/s----------------------------------|PROPERTIES(2)=============|NODE:3|v:118.05 MB/]  4M
Done in 5s 605ms
Prepare node index
[*RESOLVE:64.85 MB-----------------------------------------------------------------------------]  4M
Done in 4s 930ms
Calculate dense nodes
[>:42.33 MB/s-------------------|*PREPARE(7)===================================|CALCULATOR-----]  8M
Done in 5s 417ms
Relationships
[>:42.33 MB/s-------------|*PREPARE(7)==========================|RELATIONSHIP------------|v:44.]  8M
Done in 6s 62ms
Node --> Relationship
[*>:??-----------------------------------------------------------------------------------------]  4M
Done in 324ms
Relationship --> Relationship
[*LINK-----------------------------------------------------------------------------------------]  8M
Done in 1s 984ms
Node counts
[*>:??-----------------------------------------------------------------------------------------]  4M
Done in 360ms
Relationship counts
[*>:??-----------------------------------------------------------------------------------------]  8M
Done in 653ms
 
IMPORT DONE in 26s 517ms

Next I updated conf/neo4j-server.properties to point to my new database:

#***************************************************************
# Server configuration
#***************************************************************
 
# location of the database directory
#org.neo4j.server.database.location=data/graph.db
org.neo4j.server.database.location=/tmp/my-neo

Now I can start up Neo4j and explore the data:

$ ./neo4j-community-2.2.0/bin/neo4j start
MATCH (:Crime)-[r:CRIME_TYPE]->() 
RETURN r 
LIMIT 10

There are lots more relationships and entities that we could pull out of this data set; what I’ve done is just a start. So if you’re up for some more Chicago crime exploration, the code and instructions explaining how to run it are on github.
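As a taster, pulling the ‘Location Description’ column out into its own nodes would just be another couple of generateFile calls along the lines of the following sketch (the ‘Location’ label and ‘AT_LOCATION’ relationship type are names I’ve made up for illustration):

generateFile("/tmp/locations.csv", withoutHeader,
  columns => Array(columns(7), "Location"),
  "id:ID(Location),:LABEL")
 
generateFile("/tmp/crimesLocations.csv", withoutHeader,
  columns => Array(columns(0), columns(7), "AT_LOCATION"),
  ":START_ID(Crime),:END_ID(Location),:TYPE", distinct = false)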

Written by Mark Needham

April 14th, 2015 at 10:56 pm

Posted in Spark

Tagged with

Spark: Write to CSV file with header using saveAsFile

with one comment

In my last blog post I showed how to write to a single CSV file using Spark and Hadoop and the next thing I wanted to do was add a header row to the resulting file.

Hadoop’s FileUtil#copyMerge function does take a String parameter but it adds this text to the end of each partition file, which isn’t quite what we want.
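For reference, the stock call looks something like this (paths borrowed from the example further down); the final String argument is the text that ends up after each part file rather than once at the top:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
 
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
// "type,count" would be appended after *each* part file, not written once at the top
FileUtil.copyMerge(hdfs, new Path("/tmp/primaryTypes.csv"), hdfs,
  new Path("/tmp/singlePrimaryTypes.csv"), false, hadoopConfig, "type,count")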

However, if we copy that function into our own FileUtil class we can restructure it to do what we want:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import java.io.IOException;
 
public class MyFileUtil {
    public static boolean copyMergeWithHeader(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile, boolean deleteSource, Configuration conf, String header) throws IOException {
        dstFile = checkDest(srcDir.getName(), dstFS, dstFile, false);
        if(!srcFS.getFileStatus(srcDir).isDir()) {
            return false;
        } else {
            FSDataOutputStream out = dstFS.create(dstFile);
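            // write the header once, before any of the part file contents are copied in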
            if(header != null) {
                out.write((header + "\n").getBytes("UTF-8"));
            }
 
            try {
                FileStatus[] contents = srcFS.listStatus(srcDir);
 
                for(int i = 0; i < contents.length; ++i) {
                    if(!contents[i].isDir()) {
                        FSDataInputStream in = srcFS.open(contents[i].getPath());
 
                        try {
                            IOUtils.copyBytes(in, out, conf, false);
 
                        } finally {
                            in.close();
                        }
                    }
                }
            } finally {
                out.close();
            }
 
            return deleteSource ? srcFS.delete(srcDir, true) : true;
        }
    }
 
    private static Path checkDest(String srcName, FileSystem dstFS, Path dst, boolean overwrite) throws IOException {
        if(dstFS.exists(dst)) {
            FileStatus sdst = dstFS.getFileStatus(dst);
            if(sdst.isDir()) {
                if(null == srcName) {
                    throw new IOException("Target " + dst + " is a directory");
                }
 
                return checkDest((String)null, dstFS, new Path(dst, srcName), overwrite);
            }
 
            if(!overwrite) {
                throw new IOException("Target " + dst + " already exists");
            }
        }
        return dst;
    }
}

We can then update our merge function to call this instead:

def merge(srcPath: String, dstPath: String, header:String): Unit =  {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  MyFileUtil.copyMergeWithHeader(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, header)
}

We call merge from our code like this:

merge(file, destinationFile, "type,count")

I wasn’t sure how to import my Java-based class into the Spark shell so I compiled the code into a JAR and submitted it as a job instead:

$ sbt package
[info] Loading global plugins from /Users/markneedham/.sbt/0.13/plugins
[info] Loading project definition from /Users/markneedham/projects/spark-play/playground/project
[info] Set current project to playground (in build file:/Users/markneedham/projects/spark-play/playground/)
[info] Compiling 3 Scala sources to /Users/markneedham/projects/spark-play/playground/target/scala-2.10/classes...
[info] Packaging /Users/markneedham/projects/spark-play/playground/target/scala-2.10/playground_2.10-1.0.jar ...
[info] Done packaging.
[success] Total time: 8 s, completed 30-Nov-2014 08:12:26
 
$ time ./bin/spark-submit --class "WriteToCsvWithHeader" --master local[4] /path/to/playground/target/scala-2.10/playground_2.10-1.0.jar
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.propertie
...
14/11/30 08:16:15 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
14/11/30 08:16:15 INFO SparkContext: Job finished: saveAsTextFile at WriteToCsvWithHeader.scala:49, took 0.589036 s
 
real	0m13.061s
user	0m38.977s
sys	0m3.393s

And if we look at our destination file:

$ cat /tmp/singlePrimaryTypes.csv
type,count
THEFT,859197
BATTERY,757530
NARCOTICS,489528
CRIMINAL DAMAGE,488209
BURGLARY,257310
OTHER OFFENSE,253964
ASSAULT,247386
MOTOR VEHICLE THEFT,197404
ROBBERY,157706
DECEPTIVE PRACTICE,137538
CRIMINAL TRESPASS,124974
PROSTITUTION,47245
WEAPONS VIOLATION,40361
PUBLIC PEACE VIOLATION,31585
OFFENSE INVOLVING CHILDREN,26524
CRIM SEXUAL ASSAULT,14788
SEX OFFENSE,14283
GAMBLING,10632
LIQUOR LAW VIOLATION,8847
ARSON,6443
INTERFERE WITH PUBLIC OFFICER,5178
HOMICIDE,4846
KIDNAPPING,3585
INTERFERENCE WITH PUBLIC OFFICER,3147
INTIMIDATION,2471
STALKING,1985
OFFENSES INVOLVING CHILDREN,355
OBSCENITY,219
PUBLIC INDECENCY,86
OTHER NARCOTIC VIOLATION,80
RITUALISM,12
NON-CRIMINAL,12
OTHER OFFENSE ,6
NON - CRIMINAL,2
NON-CRIMINAL (SUBJECT SPECIFIED),2

Happy days!

The code is available as a gist if you want to see all the details.

Written by Mark Needham

November 30th, 2014 at 8:21 am

Posted in Spark

Tagged with

Spark: Write to CSV file

with 6 comments

A couple of weeks ago I wrote about how I’d been using Spark to explore a City of Chicago Crime data set and, having worked out how many of each crime had been committed, I wanted to write that to a CSV file.

Spark provides a saveAsTextFile function which allows us to save RDDs, so I refactored my code into the following format to allow me to use that:

import java.io.File
 
import au.com.bytecode.opencsv.CSVParser
import org.apache.hadoop.fs.FileUtil
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
 
def dropHeader(data: RDD[String]): RDD[String] = {
  data.mapPartitionsWithIndex((idx, lines) => {
    if (idx == 0) {
      lines.drop(1)
    }
    lines
  })
}
 
// https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-present/ijzp-q8t2
val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv"
 
val crimeData = sc.textFile(crimeFile).cache()
val withoutHeader: RDD[String] = dropHeader(crimeData)
 
val file = "/tmp/primaryTypes.csv"
FileUtil.fullyDelete(new File(file))
 
val partitions: RDD[(String, Int)] = withoutHeader.mapPartitions(lines => {
  val parser = new CSVParser(',')
  lines.map(line => {
    val columns = parser.parseLine(line)
    (columns(5), 1)
  })
})
 
val counts = partitions.
  reduceByKey {case (x,y) => x + y}.
  sortBy {case (key, value) => -value}.
  map { case (key, value) => Array(key, value).mkString(",") }
 
counts.saveAsTextFile(file)

If we run that code from the Spark shell we end up with a folder called /tmp/primaryTypes.csv containing multiple part files:

$ ls -lah /tmp/primaryTypes.csv/
total 496
drwxr-xr-x  66 markneedham  wheel   2.2K 30 Nov 07:17 .
drwxrwxrwt  80 root         wheel   2.7K 30 Nov 07:16 ..
-rw-r--r--   1 markneedham  wheel     8B 30 Nov 07:16 ._SUCCESS.crc
-rw-r--r--   1 markneedham  wheel    12B 30 Nov 07:16 .part-00000.crc
-rw-r--r--   1 markneedham  wheel    12B 30 Nov 07:16 .part-00001.crc
-rw-r--r--   1 markneedham  wheel    12B 30 Nov 07:16 .part-00002.crc
-rw-r--r--   1 markneedham  wheel    12B 30 Nov 07:16 .part-00003.crc
...
-rwxrwxrwx   1 markneedham  wheel     0B 30 Nov 07:16 _SUCCESS
-rwxrwxrwx   1 markneedham  wheel    28B 30 Nov 07:16 part-00000
-rwxrwxrwx   1 markneedham  wheel    17B 30 Nov 07:16 part-00001
-rwxrwxrwx   1 markneedham  wheel    23B 30 Nov 07:16 part-00002
-rwxrwxrwx   1 markneedham  wheel    16B 30 Nov 07:16 part-00003
...

If we look at some of those part files we can see that it’s written the crime types and counts as expected:

$ cat /tmp/primaryTypes.csv/part-00000
THEFT,859197
BATTERY,757530
 
$ cat /tmp/primaryTypes.csv/part-00003
BURGLARY,257310

This is fine if we’re going to pass those CSV files into another Hadoop-based job, but I actually want a single CSV file so it’s not quite what I want.

One way to achieve this is to force everything to be calculated on one partition which will mean we only get one part file generated:

val counts = partitions.repartition(1).
  reduceByKey {case (x,y) => x + y}.
  sortBy {case (key, value) => -value}.
  map { case (key, value) => Array(key, value).mkString(",") }
 
 
counts.saveAsTextFile(file)

part-00000 now looks like this:

$ cat !$
cat /tmp/primaryTypes.csv/part-00000
THEFT,859197
BATTERY,757530
NARCOTICS,489528
CRIMINAL DAMAGE,488209
BURGLARY,257310
OTHER OFFENSE,253964
ASSAULT,247386
MOTOR VEHICLE THEFT,197404
ROBBERY,157706
DECEPTIVE PRACTICE,137538
CRIMINAL TRESPASS,124974
PROSTITUTION,47245
WEAPONS VIOLATION,40361
PUBLIC PEACE VIOLATION,31585
OFFENSE INVOLVING CHILDREN,26524
CRIM SEXUAL ASSAULT,14788
SEX OFFENSE,14283
GAMBLING,10632
LIQUOR LAW VIOLATION,8847
ARSON,6443
INTERFERE WITH PUBLIC OFFICER,5178
HOMICIDE,4846
KIDNAPPING,3585
INTERFERENCE WITH PUBLIC OFFICER,3147
INTIMIDATION,2471
STALKING,1985
OFFENSES INVOLVING CHILDREN,355
OBSCENITY,219
PUBLIC INDECENCY,86
OTHER NARCOTIC VIOLATION,80
NON-CRIMINAL,12
RITUALISM,12
OTHER OFFENSE ,6
NON - CRIMINAL,2
NON-CRIMINAL (SUBJECT SPECIFIED),2

This works but it’s quite a bit slower than when we were doing the aggregation across partitions so it’s not ideal.

Instead, what we can do is make use of one of Hadoop’s merge functions which squashes part files together into a single file.

First we add the Hadoop dependency to our SBT build file:

libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.5.2"

Now let’s bring our merge function into the Spark shell:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
 
def merge(srcPath: String, dstPath: String): Unit =  {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
}

And now let’s make use of it:

val file = "/tmp/primaryTypes.csv"
FileUtil.fullyDelete(new File(file))
 
val destinationFile= "/tmp/singlePrimaryTypes.csv"
FileUtil.fullyDelete(new File(destinationFile))
 
val counts = partitions.
reduceByKey {case (x,y) => x + y}.
sortBy {case (key, value) => -value}.
map { case (key, value) => Array(key, value).mkString(",") }
 
counts.saveAsTextFile(file)
 
merge(file, destinationFile)

And now we’ve got the best of both worlds:

$ cat /tmp/singlePrimaryTypes.csv
THEFT,859197
BATTERY,757530
NARCOTICS,489528
CRIMINAL DAMAGE,488209
BURGLARY,257310
OTHER OFFENSE,253964
ASSAULT,247386
MOTOR VEHICLE THEFT,197404
ROBBERY,157706
DECEPTIVE PRACTICE,137538
CRIMINAL TRESPASS,124974
PROSTITUTION,47245
WEAPONS VIOLATION,40361
PUBLIC PEACE VIOLATION,31585
OFFENSE INVOLVING CHILDREN,26524
CRIM SEXUAL ASSAULT,14788
SEX OFFENSE,14283
GAMBLING,10632
LIQUOR LAW VIOLATION,8847
ARSON,6443
INTERFERE WITH PUBLIC OFFICER,5178
HOMICIDE,4846
KIDNAPPING,3585
INTERFERENCE WITH PUBLIC OFFICER,3147
INTIMIDATION,2471
STALKING,1985
OFFENSES INVOLVING CHILDREN,355
OBSCENITY,219
PUBLIC INDECENCY,86
OTHER NARCOTIC VIOLATION,80
RITUALISM,12
NON-CRIMINAL,12
OTHER OFFENSE ,6
NON - CRIMINAL,2
NON-CRIMINAL (SUBJECT SPECIFIED),2

The full code is available as a gist if you want to play around with it.

Written by Mark Needham

November 30th, 2014 at 7:40 am

Posted in Spark

Tagged with

Spark: Parse CSV file and group by column value

with one comment

I’ve found myself working with large CSV files quite frequently and, realising that my existing toolset didn’t let me explore them quickly, I thought I’d spend a bit of time looking at Spark to see if it could help.

I’m working with a crime data set released by the City of Chicago: it’s 1GB in size and contains details of 4 million crimes:

$ ls -alh ~/Downloads/Crimes_-_2001_to_present.csv
-rw-r--r--@ 1 markneedham  staff   1.0G 16 Nov 12:14 /Users/markneedham/Downloads/Crimes_-_2001_to_present.csv
 
$ wc -l ~/Downloads/Crimes_-_2001_to_present.csv
 4193441 /Users/markneedham/Downloads/Crimes_-_2001_to_present.csv

We can get a rough idea of the contents of the file by looking at the first row along with the header:

$ head -n 2 ~/Downloads/Crimes_-_2001_to_present.csv
ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
9464711,HX114160,01/14/2014 05:00:00 AM,028XX E 80TH ST,0560,ASSAULT,SIMPLE,APARTMENT,false,true,0422,004,7,46,08A,1196652,1852516,2014,01/20/2014 12:40:05 AM,41.75017626412204,-87.55494559131228,"(41.75017626412204, -87.55494559131228)"

I wanted to do a count of the ‘Primary Type’ column to see how many of each crime we have. Using just Unix command line tools this is how we’d do that:

$ time tail +2 ~/Downloads/Crimes_-_2001_to_present.csv | cut -d, -f6  | sort | uniq -c | sort -rn
859197 THEFT
757530 BATTERY
489528 NARCOTICS
488209 CRIMINAL DAMAGE
257310 BURGLARY
253964 OTHER OFFENSE
247386 ASSAULT
197404 MOTOR VEHICLE THEFT
157706 ROBBERY
137538 DECEPTIVE PRACTICE
124974 CRIMINAL TRESPASS
47245 PROSTITUTION
40361 WEAPONS VIOLATION
31585 PUBLIC PEACE VIOLATION
26524 OFFENSE INVOLVING CHILDREN
14788 CRIM SEXUAL ASSAULT
14283 SEX OFFENSE
10632 GAMBLING
8847 LIQUOR LAW VIOLATION
6443 ARSON
5178 INTERFERE WITH PUBLIC OFFICER
4846 HOMICIDE
3585 KIDNAPPING
3147 INTERFERENCE WITH PUBLIC OFFICER
2471 INTIMIDATION
1985 STALKING
 355 OFFENSES INVOLVING CHILDREN
 219 OBSCENITY
  86 PUBLIC INDECENCY
  80 OTHER NARCOTIC VIOLATION
  12 RITUALISM
  12 NON-CRIMINAL
   6 OTHER OFFENSE
   2 NON-CRIMINAL (SUBJECT SPECIFIED)
   2 NON - CRIMINAL
 
real	2m37.495s
user	3m0.337s
sys	0m1.471s

This isn’t too bad, but it seems like the type of calculation that Spark is made for, so I had a look at how I could go about doing that. To start with I created an SBT project with the following build file:

name := "playground"
 
version := "1.0"
 
scalaVersion := "2.10.4"
 
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"
 
libraryDependencies += "net.sf.opencsv" % "opencsv" % "2.3"
 
ideaExcludeFolders += ".idea"
 
ideaExcludeFolders += ".idea_modules"

I downloaded Spark and after unpacking it launched the Spark shell:

$ pwd
/Users/markneedham/projects/spark-play/spark-1.1.0/spark-1.1.0-bin-hadoop1
 
$ ./bin/spark-shell
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.1.0
      /_/
 
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_51)
...
Spark context available as sc.
 
scala>

I first import some classes I’m going to need:

scala> import au.com.bytecode.opencsv.CSVParser
import au.com.bytecode.opencsv.CSVParser
 
scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

Now, following the quick start example, we’ll create a Resilient Distributed Dataset (RDD) from our Crime CSV file:

scala> val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv"
crimeFile: String = /Users/markneedham/Downloads/Crimes_-_2001_to_present.csv
 
scala> val crimeData = sc.textFile(crimeFile).cache()
14/11/16 22:31:16 INFO MemoryStore: ensureFreeSpace(32768) called with curMem=0, maxMem=278302556
14/11/16 22:31:16 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 265.4 MB)
crimeData: org.apache.spark.rdd.RDD[String] = /Users/markneedham/Downloads/Crimes_-_2001_to_present.csv MappedRDD[1] at textFile at <console>:17

Our next step is to process each line of the file using our CSV Parser. A simple way to do this would be to create a new CSVParser for each line:

scala> crimeData.map(line => {
         val parser = new CSVParser(',')
         parser.parseLine(line).mkString(",")
       }).take(5).foreach(println)
14/11/16 22:35:49 INFO SparkContext: Starting job: take at <console>:23
...
4/11/16 22:35:49 INFO SparkContext: Job finished: take at <console>:23, took 0.013904 s
ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
9464711,HX114160,01/14/2014 05:00:00 AM,028XX E 80TH ST,0560,ASSAULT,SIMPLE,APARTMENT,false,true,0422,004,7,46,08A,1196652,1852516,2014,01/20/2014 12:40:05 AM,41.75017626412204,-87.55494559131228,(41.75017626412204, -87.55494559131228)
9460704,HX113741,01/14/2014 04:55:00 AM,091XX S JEFFERY AVE,031A,ROBBERY,ARMED: HANDGUN,SIDEWALK,false,false,0413,004,8,48,03,1191060,1844959,2014,01/18/2014 12:39:56 AM,41.729576153145636,-87.57568059471686,(41.729576153145636, -87.57568059471686)
9460339,HX113740,01/14/2014 04:44:00 AM,040XX W MAYPOLE AVE,1310,CRIMINAL DAMAGE,TO PROPERTY,RESIDENCE,false,true,1114,011,28,26,14,1149075,1901099,2014,01/16/2014 12:40:00 AM,41.884543798701515,-87.72803579358926,(41.884543798701515, -87.72803579358926)
9461467,HX114463,01/14/2014 04:43:00 AM,059XX S CICERO AVE,0820,THEFT,$500 AND UNDER,PARKING LOT/GARAGE(NON.RESID.),false,false,0813,008,13,64,06,1145661,1865031,2014,01/16/2014 12:40:00 AM,41.785633535413176,-87.74148516669783,(41.785633535413176, -87.74148516669783)

That works but it’s a bit wasteful to create a new CSVParser each time so instead let’s just create one for each partition that Spark splits our file up into:

scala> crimeData.mapPartitions(lines => {
         val parser = new CSVParser(',')
         lines.map(line => {
           parser.parseLine(line).mkString(",")
         })
       }).take(5).foreach(println)
14/11/16 22:38:44 INFO SparkContext: Starting job: take at <console>:25
...
14/11/16 22:38:44 INFO SparkContext: Job finished: take at <console>:25, took 0.015216 s
ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
9464711,HX114160,01/14/2014 05:00:00 AM,028XX E 80TH ST,0560,ASSAULT,SIMPLE,APARTMENT,false,true,0422,004,7,46,08A,1196652,1852516,2014,01/20/2014 12:40:05 AM,41.75017626412204,-87.55494559131228,(41.75017626412204, -87.55494559131228)
9460704,HX113741,01/14/2014 04:55:00 AM,091XX S JEFFERY AVE,031A,ROBBERY,ARMED: HANDGUN,SIDEWALK,false,false,0413,004,8,48,03,1191060,1844959,2014,01/18/2014 12:39:56 AM,41.729576153145636,-87.57568059471686,(41.729576153145636, -87.57568059471686)
9460339,HX113740,01/14/2014 04:44:00 AM,040XX W MAYPOLE AVE,1310,CRIMINAL DAMAGE,TO PROPERTY,RESIDENCE,false,true,1114,011,28,26,14,1149075,1901099,2014,01/16/2014 12:40:00 AM,41.884543798701515,-87.72803579358926,(41.884543798701515, -87.72803579358926)
9461467,HX114463,01/14/2014 04:43:00 AM,059XX S CICERO AVE,0820,THEFT,$500 AND UNDER,PARKING LOT/GARAGE(NON.RESID.),false,false,0813,008,13,64,06,1145661,1865031,2014,01/16/2014 12:40:00 AM,41.785633535413176,-87.74148516669783,(41.785633535413176, -87.74148516669783)

You’ll notice that we’ve still got the header being printed, which isn’t ideal, so let’s get rid of it! I expected there to be a ‘drop’ function which would allow me to do that but in fact there isn’t. Instead we can make use of our knowledge that the first partition will contain the first line and strip it out that way:

scala> def dropHeader(data: RDD[String]): RDD[String] = {
         data.mapPartitionsWithIndex((idx, lines) => {
           if (idx == 0) {
             lines.drop(1)
           }
           lines
         })
       }
dropHeader: (data: org.apache.spark.rdd.RDD[String])org.apache.spark.rdd.RDD[String]

Now let’s grab the first 5 lines again and print them out:

scala> val withoutHeader: RDD[String] = dropHeader(crimeData)
withoutHeader: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at mapPartitionsWithIndex at <console>:14
 
scala> withoutHeader.mapPartitions(lines => {
         val parser = new CSVParser(',')
         lines.map(line => {
           parser.parseLine(line).mkString(",")
         })
       }).take(5).foreach(println)
14/11/16 22:43:27 INFO SparkContext: Starting job: take at <console>:29
...
14/11/16 22:43:27 INFO SparkContext: Job finished: take at <console>:29, took 0.018557 s
9464711,HX114160,01/14/2014 05:00:00 AM,028XX E 80TH ST,0560,ASSAULT,SIMPLE,APARTMENT,false,true,0422,004,7,46,08A,1196652,1852516,2014,01/20/2014 12:40:05 AM,41.75017626412204,-87.55494559131228,(41.75017626412204, -87.55494559131228)
9460704,HX113741,01/14/2014 04:55:00 AM,091XX S JEFFERY AVE,031A,ROBBERY,ARMED: HANDGUN,SIDEWALK,false,false,0413,004,8,48,03,1191060,1844959,2014,01/18/2014 12:39:56 AM,41.729576153145636,-87.57568059471686,(41.729576153145636, -87.57568059471686)
9460339,HX113740,01/14/2014 04:44:00 AM,040XX W MAYPOLE AVE,1310,CRIMINAL DAMAGE,TO PROPERTY,RESIDENCE,false,true,1114,011,28,26,14,1149075,1901099,2014,01/16/2014 12:40:00 AM,41.884543798701515,-87.72803579358926,(41.884543798701515, -87.72803579358926)
9461467,HX114463,01/14/2014 04:43:00 AM,059XX S CICERO AVE,0820,THEFT,$500 AND UNDER,PARKING LOT/GARAGE(NON.RESID.),false,false,0813,008,13,64,06,1145661,1865031,2014,01/16/2014 12:40:00 AM,41.785633535413176,-87.74148516669783,(41.785633535413176, -87.74148516669783)
9460355,HX113738,01/14/2014 04:21:00 AM,070XX S PEORIA ST,0820,THEFT,$500 AND UNDER,STREET,true,false,0733,007,17,68,06,1171480,1858195,2014,01/16/2014 12:40:00 AM,41.766348042591375,-87.64702037047671,(41.766348042591375, -87.64702037047671)

We’re finally in good shape to extract the values from the ‘Primary Type’ column and count how many times each of those appears in our data set:

scala> withoutHeader.mapPartitions(lines => {
         val parser=new CSVParser(',')
         lines.map(line => {
           val columns = parser.parseLine(line)
           Array(columns(5)).mkString(",")
         })
       }).countByValue().toList.sortBy(-_._2).foreach(println)
14/11/16 22:45:20 INFO SparkContext: Starting job: countByValue at <console>:30
14/11/16 22:45:20 INFO DAGScheduler: Got job 7 (countByValue at <console>:30) with 32 output partitions (allowLocal=false)
...
14/11/16 22:45:30 INFO SparkContext: Job finished: countByValue at <console>:30, took 9.796565 s
(THEFT,859197)
(BATTERY,757530)
(NARCOTICS,489528)
(CRIMINAL DAMAGE,488209)
(BURGLARY,257310)
(OTHER OFFENSE,253964)
(ASSAULT,247386)
(MOTOR VEHICLE THEFT,197404)
(ROBBERY,157706)
(DECEPTIVE PRACTICE,137538)
(CRIMINAL TRESPASS,124974)
(PROSTITUTION,47245)
(WEAPONS VIOLATION,40361)
(PUBLIC PEACE VIOLATION,31585)
(OFFENSE INVOLVING CHILDREN,26524)
(CRIM SEXUAL ASSAULT,14788)
(SEX OFFENSE,14283)
(GAMBLING,10632)
(LIQUOR LAW VIOLATION,8847)
(ARSON,6443)
(INTERFERE WITH PUBLIC OFFICER,5178)
(HOMICIDE,4846)
(KIDNAPPING,3585)
(INTERFERENCE WITH PUBLIC OFFICER,3147)
(INTIMIDATION,2471)
(STALKING,1985)
(OFFENSES INVOLVING CHILDREN,355)
(OBSCENITY,219)
(PUBLIC INDECENCY,86)
(OTHER NARCOTIC VIOLATION,80)
(NON-CRIMINAL,12)
(RITUALISM,12)
(OTHER OFFENSE ,6)
(NON-CRIMINAL (SUBJECT SPECIFIED),2)
(NON - CRIMINAL,2)

We get the same results as with the Unix commands, except it took less than 10 seconds to calculate, which is pretty cool!

Written by Mark Needham

November 16th, 2014 at 10:53 pm

Posted in Spark

Tagged with