Mark Needham

Thoughts on Software Development

R: Time to/from the weekend

with one comment

In my last post I showed some examples using R’s lubridate package and another problem it made really easy to solve was working out how close a particular date time was to the weekend.

I wanted to write a function which would return the previous Sunday or upcoming Saturday depending on which was closer.

lubridate’s floor_date and ceiling_date functions make this quite simple.

e.g. if we want to round the 18th December down to the beginning of the week and up to the beginning of the next week we could do the following:

> library(lubridate)
> floor_date(ymd("2014-12-18"), "week")
[1] "2014-12-14 UTC"
 
> ceiling_date(ymd("2014-12-18"), "week")
[1] "2014-12-21 UTC"

For the date in the future we actually want to grab the Saturday rather than the Sunday so we’ll subtract one day from that:

> ceiling_date(ymd("2014-12-18"), "week") - days(1)
[1] "2014-12-20 UTC"

Now let’s put that together into a function which finds the closest weekend for a given date:

findClosestWeekendDay = function(dateToLookup) {
  before = floor_date(dateToLookup, "week") + hours(23) + minutes(59) + seconds(59)
  after  = ceiling_date(dateToLookup, "week") - days(1)
  if((dateToLookup - before) < (after - dateToLookup)) {
    before  
  } else {
    after  
  }
}
 
> findClosestWeekendDay(ymd_hms("2014-12-13 13:33:29"))
[1] "2014-12-13 UTC"
 
> findClosestWeekendDay(ymd_hms("2014-12-14 18:33:29"))
[1] "2014-12-14 23:59:59 UTC"
 
> findClosestWeekendDay(ymd_hms("2014-12-15 18:33:29"))
[1] "2014-12-14 23:59:59 UTC"
 
> findClosestWeekendDay(ymd_hms("2014-12-17 11:33:29"))
[1] "2014-12-14 23:59:59 UTC"
 
> findClosestWeekendDay(ymd_hms("2014-12-17 13:33:29"))
[1] "2014-12-20 UTC"
 
> findClosestWeekendDay(ymd_hms("2014-12-19 13:33:29"))
[1] "2014-12-20 UTC"

I’ve set the Sunday date at 23:59:59 so that I can use this date in the next step where we want to calculate how how many hours it is from the current date to the nearest weekend.

I ended up with this function:

distanceFromWeekend = function(dateToLookup) {
  before = floor_date(dateToLookup, "week") + hours(23) + minutes(59) + seconds(59)
  after  = ceiling_date(dateToLookup, "week") - days(1)
  timeToBefore = dateToLookup - before
  timeToAfter = after - dateToLookup
 
  if(timeToBefore < 0 || timeToAfter < 0) {
    0  
  } else {
    if(timeToBefore < timeToAfter) {
      timeToBefore / dhours(1)
    } else {
      timeToAfter / dhours(1)
    }
  }
}
 
> distanceFromWeekend(ymd_hms("2014-12-13 13:33:29"))
[1] 0
 
> distanceFromWeekend(ymd_hms("2014-12-14 18:33:29"))
[1] 0
 
> distanceFromWeekend(ymd_hms("2014-12-15 18:33:29"))
[1] 18.55833
 
> distanceFromWeekend(ymd_hms("2014-12-17 11:33:29"))
[1] 59.55833
 
> distanceFromWeekend(ymd_hms("2014-12-17 13:33:29"))
[1] 58.44194
 
> distanceFromWeekend(ymd_hms("2014-12-19 13:33:29"))
[1] 10.44194

While this works it’s quite slow when you run it over a data frame which contains a lot of rows.

There must be a clever R way of doing the same thing (perhaps using matrices) which I haven’t figured out yet so if you know how to speed it up do let me know.

Written by Mark Needham

December 13th, 2014 at 8:38 pm

Posted in R

Tagged with ,

R: Numeric representation of date time

without comments

I’ve been playing around with date times in R recently and I wanted to derive a numeric representation for a given value to make it easier to see the correlation between time and another variable.

e.g. December 13th 2014 17:30 should return 17.5 since it’s 17.5 hours since midnight.

Using the standard R libraries we would write the following code:

> december13 = as.POSIXlt("2014-12-13 17:30:00")
> as.numeric(december13 - trunc(december13, "day"), units="hours")
[1] 17.5

That works pretty well but Antonios recently introduced me to the lubridate so I thought I’d give that a try as well.

The first nice thing about lubridate is that we can use the date we created earlier and call the floor_date function rather than truncate:

> (december13 - floor_date(december13, "day"))
Time difference of 17.5 hours

That gives us back a difftime

> class((december13 - floor_date(december13, "day")))
[1] "difftime"

…which we can divide by different units to get the granularity we want:

> diff = (december13 - floor_date(december13, "day"))
> diff / dhours(1)
[1] 17.5
 
> diff / ddays(1)
[1] 0.7291667
 
> diff / dminutes(1)
[1] 1050

Pretty neat!

lubridate also has some nice functions for creating dates/date times. e.g.

> ymd_hms("2014-12-13 17:00:00")
[1] "2014-12-13 17:00:00 UTC"
 
> ymd_hm("2014-12-13 17:00")
[1] "2014-12-13 17:00:00 UTC"
 
> ymd_h("2014-12-13 17")
[1] "2014-12-13 17:00:00 UTC"
 
> ymd("2014-12-13")
[1] "2014-12-13 UTC"

And if you want a different time zone that’s pretty easy too:

> with_tz(ymd("2014-12-13"), "GMT")
[1] "2014-12-13 GMT"

Written by Mark Needham

December 13th, 2014 at 7:58 pm

Posted in R

Tagged with ,

R: data.table/dplyr/lubridate – Error in wday(date, label = TRUE, abbr = FALSE) : unused arguments (label = TRUE, abbr = FALSE)

without comments

I spent a couple of hours playing around with data.table this evening and tried changing some code written using a data frame to use a data table instead.

I started off by building a data frame which contains all the weekends between 2010 and 2015…

> library(lubridate)
 
> library(dplyr)
 
> dates = data.frame(date = seq( dmy("01-01-2010"), to=dmy("01-01-2015"), by="day" ))
 
> dates = dates %>% filter(wday(date, label = TRUE, abbr = FALSE) %in% c("Saturday", "Sunday"))

…which works fine:

> dates %>% head()
         date
1: 2010-01-02
2: 2010-01-03
3: 2010-01-09
4: 2010-01-10
5: 2010-01-16
6: 2010-01-17

I then tried to change the code to use a data table instead which led to the following error:

> library(data.table)
 
> dates = data.table(date = seq( dmy("01-01-2010"), to=dmy("01-01-2015"), by="day" ))
 
> dates = dates %>% filter(wday(date, label = TRUE, abbr = FALSE) %in% c("Saturday", "Sunday"))
Error in wday(date, label = TRUE, abbr = FALSE) : 
  unused arguments (label = TRUE, abbr = FALSE)

I wasn’t sure what was going on so I went back to the data frame version to check if that still worked…

> dates = data.frame(date = seq( dmy("01-01-2010"), to=dmy("01-01-2015"), by="day" ))
 
> dates = dates %>% filter(wday(date, label = TRUE, abbr = FALSE) %in% c("Saturday", "Sunday"))
Error in wday(c(1262304000, 1262390400, 1262476800, 1262563200, 1262649600,  : 
  unused arguments (label = TRUE, abbr = FALSE)

…except it now didn’t work either! I decided to check what wday was referring to…

Help on topic ‘wday’ was found in the following packages:

Integer based date class
(in package data.table in library /Library/Frameworks/R.framework/Versions/3.1/Resources/library)
Get/set days component of a date-time.
(in package lubridate in library /Library/Frameworks/R.framework/Versions/3.1/Resources/library)

…and realised that data.table has its own wday function – I’d been caught out by R’s global scoping of all the things!

We can probably work around that by the order in which we require the various libraries but for now I’m just prefixing the call to wday and all is well:

dates = dates %>% filter(lubridate::wday(date, label = TRUE, abbr = FALSE) %in% c("Saturday", "Sunday"))

Written by Mark Needham

December 11th, 2014 at 7:03 pm

Posted in R

Tagged with ,

R: Cleaning up and plotting Google Trends data

with one comment

I recently came across an excellent article written by Stian Haklev in which he describes things he wishes he’d been told before starting out with R, one being to do all data clean up in code which I thought I’d give a try.

My goal is to leave the raw data completely unchanged, and do all the transformation in code, which can be rerun at any time.

While I’m writing the scripts, I’m often jumping around, selectively executing individual lines or code blocks, running commands to inspect the data in the REPL (read-evaluate-print-loop, where each command is executed as soon as you type enter, in the picture above it’s the pane to the right), etc.

But I try to make sure that when I finish up, the script is runnable by itself.

I thought the Google Trends data set would be an interesting one to play around with as it gives you a CSV containing several different bits of data of which I’m only interested in ‘interest over time’.

It’s not very easy to automate the download of the CSV file so I did that bit manually and automated everything from there onwards.

The first step was to read the CSV file and explore some of the rows to see what it contained:

> library(dplyr)
 
> googleTrends = read.csv("/Users/markneedham/Downloads/report.csv", row.names=NULL)
 
> googleTrends %>% head()
##                   row.names Web.Search.interest..neo4j
## 1 Worldwide; 2004 - present                           
## 2        Interest over time                           
## 3                      Week                      neo4j
## 4   2004-01-04 - 2004-01-10                          0
## 5   2004-01-11 - 2004-01-17                          0
## 6   2004-01-18 - 2004-01-24                          0
 
> googleTrends %>% sample_n(10)
##                   row.names Web.Search.interest..neo4j
## 109 2006-01-08 - 2006-01-14                          0
## 113 2006-02-05 - 2006-02-11                          0
## 267 2009-01-18 - 2009-01-24                          0
## 199 2007-09-30 - 2007-10-06                          0
## 522 2013-12-08 - 2013-12-14                         88
## 265 2009-01-04 - 2009-01-10                          0
## 285 2009-05-24 - 2009-05-30                          0
## 318 2010-01-10 - 2010-01-16                          0
## 495 2013-06-02 - 2013-06-08                         79
## 28  2004-06-20 - 2004-06-26                          0
 
> googleTrends %>% tail()
##                row.names Web.Search.interest..neo4j
## 658        neo4j example                   Breakout
## 659 neo4j graph database                   Breakout
## 660           neo4j java                   Breakout
## 661           neo4j node                   Breakout
## 662           neo4j rest                   Breakout
## 663       neo4j tutorial                   Breakout

We only want to keep the rows which contain (week, interest) pairs so the first thing we’ll do is rename the columns:

names(googleTrends) = c("week", "score")

Now we want to strip out the rows which don’t contain (week, interest) pairs. The easiest way to do this is to look for rows which don’t contain date values in the ‘week’ column.

First we need to split the start and end dates in that column by using the strsplit function.

I found it much easier to apply the function to each row individually rather than passing in a list of values so I created a dummy column with a row number in to allow me to do that (a trick Antonios showed me):

> googleTrends %>% 
    mutate(ind = row_number()) %>% 
    group_by(ind) %>%
    mutate(dates = strsplit(week, " - "),
           start = dates[[1]][1] %>% strptime("%Y-%m-%d") %>% as.character(),
           end =   dates[[1]][2] %>% strptime("%Y-%m-%d") %>% as.character()) %>%
    head()
## Source: local data frame [6 x 6]
## Groups: ind
## 
##                        week score ind    dates      start        end
## 1 Worldwide; 2004 - present     1   1 <chr[2]>         NA         NA
## 2        Interest over time     1   2 <chr[1]>         NA         NA
## 3                      Week    90   3 <chr[1]>         NA         NA
## 4   2004-01-04 - 2004-01-10     3   4 <chr[2]> 2004-01-04 2004-01-10
## 5   2004-01-11 - 2004-01-17     3   5 <chr[2]> 2004-01-11 2004-01-17
## 6   2004-01-18 - 2004-01-24     3   6 <chr[2]> 2004-01-18 2004-01-24

Now we need to get rid of the rows which have an NA value for ‘start’ or ‘end':

> googleTrends %>% 
    mutate(ind = row_number()) %>% 
    group_by(ind) %>%
    mutate(dates = strsplit(week, " - "),
           start = dates[[1]][1] %>% strptime("%Y-%m-%d") %>% as.character(),
           end =   dates[[1]][2] %>% strptime("%Y-%m-%d") %>% as.character()) %>%
    filter(!is.na(start) | !is.na(end)) %>% 
    head()
## Source: local data frame [6 x 6]
## Groups: ind
## 
##                      week score ind    dates      start        end
## 1 2004-01-04 - 2004-01-10     3   4 <chr[2]> 2004-01-04 2004-01-10
## 2 2004-01-11 - 2004-01-17     3   5 <chr[2]> 2004-01-11 2004-01-17
## 3 2004-01-18 - 2004-01-24     3   6 <chr[2]> 2004-01-18 2004-01-24
## 4 2004-01-25 - 2004-01-31     3   7 <chr[2]> 2004-01-25 2004-01-31
## 5 2004-02-01 - 2004-02-07     3   8 <chr[2]> 2004-02-01 2004-02-07
## 6 2004-02-08 - 2004-02-14     3   9 <chr[2]> 2004-02-08 2004-02-14

Next we’ll get rid of ‘week’, ‘ind’ and ‘dates’ as we aren’t going to need those anymore:

> cleanGoogleTrends = googleTrends %>% 
    mutate(ind = row_number()) %>% 
    group_by(ind) %>%
    mutate(dates = strsplit(week, " - "),
           start = dates[[1]][1] %>% strptime("%Y-%m-%d") %>% as.character(),
           end =   dates[[1]][2] %>% strptime("%Y-%m-%d") %>% as.character()) %>%
    filter(!is.na(start) | !is.na(end)) %>%
    ungroup() %>%
    select(-c(ind, dates, week))
 
> cleanGoogleTrends %>% head()
## Source: local data frame [6 x 3]
## 
##   score      start        end
## 1     3 2004-01-04 2004-01-10
## 2     3 2004-01-11 2004-01-17
## 3     3 2004-01-18 2004-01-24
## 4     3 2004-01-25 2004-01-31
## 5     3 2004-02-01 2004-02-07
## 6     3 2004-02-08 2004-02-14
 
> cleanGoogleTrends %>% sample_n(10)
## Source: local data frame [10 x 3]
## 
##    score      start        end
## 1      8 2010-09-26 2010-10-02
## 2     73 2013-11-17 2013-11-23
## 3     52 2012-07-01 2012-07-07
## 4      3 2005-06-19 2005-06-25
## 5      3 2004-12-12 2004-12-18
## 6      3 2009-09-06 2009-09-12
## 7     71 2014-09-14 2014-09-20
## 8      3 2004-12-26 2005-01-01
## 9     62 2013-03-03 2013-03-09
## 10     3 2006-03-19 2006-03-25
 
> cleanGoogleTrends %>% tail()
## Source: local data frame [6 x 3]
## 
##   score      start        end
## 1    80 2014-10-19 2014-10-25
## 2    80 2014-10-26 2014-11-01
## 3    84 2014-11-02 2014-11-08
## 4    81 2014-11-09 2014-11-15
## 5    83 2014-11-16 2014-11-22
## 6     2 2014-11-23 2014-11-29

Ok now we’re ready to plot. This was my first attempt:

> library(ggplot2)
> ggplot(aes(x = start, y = score), data = cleanGoogleTrends) + 
    geom_line(size = 0.5)
## geom_path: Each group consist of only one observation. Do you need to adjust the group aesthetic?
2014 12 09 17 57 49

As you can see, not too successful! The first mistake I’ve made is not telling ggplot that the ‘start’ column is a date and so it can use that ordering when plotting:

> cleanGoogleTrends = cleanGoogleTrends %>% mutate(start =  as.Date(start))
> ggplot(aes(x = start, y = score), data = cleanGoogleTrends) + 
    geom_line(size = 0.5)

2014 12 09 18 00 03

My next mistake is that ‘score’ is not being treated as a continuous variable and so we’re ending up with this very strange looking chart. We can see that if we call the class function:

> class(cleanGoogleTrends$score)
## [1] "factor"

Let’s fix that and plot again:

> cleanGoogleTrends = cleanGoogleTrends %>% mutate(score = as.numeric(score))
> ggplot(aes(x = start, y = score), data = cleanGoogleTrends) + 
    geom_line(size = 0.5)

2014 12 09 18 02 39

That’s much better but there is quite a bit of noise in the week to week scores which we can flatten a bit by plotting a rolling mean of the last 4 weeks instead:

> library(zoo)
> cleanGoogleTrends = cleanGoogleTrends %>% 
    mutate(rolling = rollmean(score, 4, fill = NA, align=c("right")),
           start =  as.Date(start))
 
> ggplot(aes(x = start, y = rolling), data = cleanGoogleTrends) + 
    geom_line(size = 0.5)

2014 12 09 18 05 26

Here’s the full code if you want to reproduce:

library(dplyr)
library(zoo)
library(ggplot2)
 
googleTrends = read.csv("/Users/markneedham/Downloads/report.csv", row.names=NULL)
names(googleTrends) = c("week", "score")
 
cleanGoogleTrends = googleTrends %>% 
  mutate(ind = row_number()) %>% 
  group_by(ind) %>%
  mutate(dates = strsplit(week, " - "),
         start = dates[[1]][1] %>% strptime("%Y-%m-%d") %>% as.character(),
         end =   dates[[1]][2] %>% strptime("%Y-%m-%d") %>% as.character()) %>%
  filter(!is.na(start) | !is.na(end)) %>%
  ungroup() %>%
  select(-c(ind, dates, week)) %>%
  mutate(start =  as.Date(start),
         score = as.numeric(score),
         rolling = rollmean(score, 4, fill = NA, align=c("right")))
 
ggplot(aes(x = start, y = rolling), data = cleanGoogleTrends) + 
  geom_line(size = 0.5)

My next step is to plot the Google Trends scores against my meetup data set to see if there’s any interesting correlations going on.

As an aside I made use of knitr while putting together this post – it works really well for checking that you’ve included all the steps and that it actually works!

Written by Mark Needham

December 9th, 2014 at 6:14 pm

Posted in R

Tagged with ,

R: dplyr – mutate with strptime (incompatible size/wrong result size)

without comments

Having worked out how to translate a string into a date or NA if it wasn’t the appropriate format the next thing I wanted to do was store the result of the transformation in my data frame.

I started off with this:

data = data.frame(x = c("2014-01-01", "2014-02-01", "foo"))
> data
           x
1 2014-01-01
2 2014-02-01
3        foo

And when I tried to do the date translation ran into the following error:

> data %>% mutate(y = strptime(x, "%Y-%m-%d"))
Error: wrong result size (11), expected 3 or 1

As I understand it this error is telling us that we are trying to put a value into the data frame which represents 11 rows rather than 3 rows or 1 row.

It turns out that storing POSIXlts in a data frame isn’t such a good idea! In this case we can use the as.character function to create a character vector which can be stored in the data frame:

> data %>% mutate(y = strptime(x, "%Y-%m-%d") %>% as.character())
           x          y
1 2014-01-01 2014-01-01
2 2014-02-01 2014-02-01
3        foo       <NA>

We can then get rid of the NA row by using the is.na function:

> data %>% mutate(y = strptime(x, "%Y-%m-%d") %>% as.character()) %>% filter(!is.na(y))
           x          y
1 2014-01-01 2014-01-01
2 2014-02-01 2014-02-01

And a final tweak so that we have 100% pipelining goodness:

> data %>% 
    mutate(y = x %>% strptime("%Y-%m-%d") %>% as.character()) %>%
    filter(!is.na(y))
           x          y
1 2014-01-01 2014-01-01
2 2014-02-01 2014-02-01

Written by Mark Needham

December 8th, 2014 at 7:02 pm

Posted in R

Tagged with ,

R: String to Date or NA

without comments

I’ve been trying to clean up a CSV file which contains some rows with dates and some not – I only want to keep the cells which do have dates so I’ve been trying to work out how to do that.

My first thought was that I’d try and find a function which would convert the contents of the cell into a date if it was in date format and NA if not. I could then filter out the NA values using the is.na function.

I started out with the as.Date function…

> as.Date("2014-01-01")
[1] "2014-01-01"
 
> as.Date("foo")
Error in charToDate(x) : 
  character string is not in a standard unambiguous format

…but that throws an error if we have a non date value so it’s not so useful in this case.

Instead we can make use of the strptime function which does exactly what we want:

> strptime("2014-01-01", "%Y-%m-%d")
[1] "2014-01-01 GMT"
 
> strptime("foo", "%Y-%m-%d")
[1] NA

We can then feed those values into is.na..

> strptime("2014-01-01", "%Y-%m-%d") %>% is.na()
[1] FALSE
 
> strptime("foo", "%Y-%m-%d") %>% is.na()
[1] TRUE

…and we have exactly the behaviour we were looking for.

Written by Mark Needham

December 7th, 2014 at 7:29 pm

Posted in R

Tagged with ,

R: Applying a function to every row of a data frame

without comments

In my continued exploration of London’s meetups I wanted to calculate the distance from meetup venues to a centre point in London.

I’ve created a gist containing the coordinates of some of the venues that host NoSQL meetups in London town if you want to follow along:

library(dplyr)
 
# https://gist.github.com/mneedham/7e926a213bf76febf5ed
venues = read.csv("/tmp/venues.csv")
 
venues %>% head()
##                        venue      lat       lon
## 1              Skills Matter 51.52482 -0.099109
## 2                   Skinkers 51.50492 -0.083870
## 3          Theodore Bullfrog 51.50878 -0.123749
## 4 The Skills Matter eXchange 51.52452 -0.099231
## 5               The Guardian 51.53373 -0.122340
## 6            White Bear Yard 51.52227 -0.109804

Now to do the calculation. I’ve chosen the Centre Point building in Tottenham Court Road as our centre point. We can use the distHaversine function in the geosphere library allows us to do the calculation:

options("scipen"=100, "digits"=4)
library(geosphere)
 
centre = c(-0.129581, 51.516578)
aVenue = venues %>% slice(1)
aVenue
##           venue   lat      lon
## 1 Skills Matter 51.52 -0.09911

Now we can calculate the distance from Skillsmatter to our centre point:

distHaversine(c(aVenue$lon, aVenue$lat), centre)
## [1] 2302

That works pretty well so now we want to apply it to every row in the venues data frame and add an extra column containing that value.

This was my first attempt…

venues %>% mutate(distHaversine(c(lon,lat),centre))
## Error in .pointsToMatrix(p1): Wrong length for a vector, should be 2

…which didn’t work quite as I’d imagined!

I eventually found my way to the by function which allows you to ‘apply a function to a data frame split by factors’. In this case I wouldn’t be grouping rows by a factor – I’d apply the function to each row separately.

I wired everything up like so:

distanceFromCentre = by(venues, 1:nrow(venues), function(row) { distHaversine(c(row$lon, row$lat), centre)  })
 
distanceFromCentre %>% head()
## 1:nrow(venues)
##      1      2      3      4      5      6 
## 2301.6 3422.6  957.5 2280.6 1974.1 1509.5

We can now add the distances to our venues data frame:

venuesWithCentre = venues %>% 
  mutate(distanceFromCentre = by(venues, 1:nrow(venues), function(row) { distHaversine(c(row$lon, row$lat), centre)  }))
 
venuesWithCentre %>% head()
##                        venue   lat      lon distanceFromCentre
## 1              Skills Matter 51.52 -0.09911             2301.6
## 2                   Skinkers 51.50 -0.08387             3422.6
## 3          Theodore Bullfrog 51.51 -0.12375              957.5
## 4 The Skills Matter eXchange 51.52 -0.09923             2280.6
## 5               The Guardian 51.53 -0.12234             1974.1
## 6            White Bear Yard 51.52 -0.10980             1509.5

Et voila!

Written by Mark Needham

December 4th, 2014 at 6:31 am

Posted in R

Tagged with ,

Spark: Write to CSV file with header using saveAsFile

with one comment

In my last blog post I showed how to write to a single CSV file using Spark and Hadoop and the next thing I wanted to do was add a header row to the resulting row.

Hadoop’s FileUtil#copyMerge function does take a String parameter but it adds this text to the end of each partition file which isn’t quite what we want.

However, if we copy that function into our own FileUtil class we can restructure it to do what we want:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import java.io.IOException;
 
public class MyFileUtil {
    public static boolean copyMergeWithHeader(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile, boolean deleteSource, Configuration conf, String header) throws IOException {
        dstFile = checkDest(srcDir.getName(), dstFS, dstFile, false);
        if(!srcFS.getFileStatus(srcDir).isDir()) {
            return false;
        } else {
            FSDataOutputStream out = dstFS.create(dstFile);
            if(header != null) {
                out.write((header + "\n").getBytes("UTF-8"));
            }
 
            try {
                FileStatus[] contents = srcFS.listStatus(srcDir);
 
                for(int i = 0; i < contents.length; ++i) {
                    if(!contents[i].isDir()) {
                        FSDataInputStream in = srcFS.open(contents[i].getPath());
 
                        try {
                            IOUtils.copyBytes(in, out, conf, false);
 
                        } finally {
                            in.close();
                        }
                    }
                }
            } finally {
                out.close();
            }
 
            return deleteSource?srcFS.delete(srcDir, true):true;
        }
    }
 
    private static Path checkDest(String srcName, FileSystem dstFS, Path dst, boolean overwrite) throws IOException {
        if(dstFS.exists(dst)) {
            FileStatus sdst = dstFS.getFileStatus(dst);
            if(sdst.isDir()) {
                if(null == srcName) {
                    throw new IOException("Target " + dst + " is a directory");
                }
 
                return checkDest((String)null, dstFS, new Path(dst, srcName), overwrite);
            }
 
            if(!overwrite) {
                throw new IOException("Target " + dst + " already exists");
            }
        }
        return dst;
    }
}

We can then update our merge function to call this instead:

def merge(srcPath: String, dstPath: String, header:String): Unit =  {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  MyFileUtil.copyMergeWithHeader(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, header)
}

We call merge from our code like this:

merge(file, destinationFile, "type,count")

I wasn’t sure how to import my Java based class into the Spark shell so I compiled the code into a JAR and submitted it as a job instead:

$ sbt package
[info] Loading global plugins from /Users/markneedham/.sbt/0.13/plugins
[info] Loading project definition from /Users/markneedham/projects/spark-play/playground/project
[info] Set current project to playground (in build file:/Users/markneedham/projects/spark-play/playground/)
[info] Compiling 3 Scala sources to /Users/markneedham/projects/spark-play/playground/target/scala-2.10/classes...
[info] Packaging /Users/markneedham/projects/spark-play/playground/target/scala-2.10/playground_2.10-1.0.jar ...
[info] Done packaging.
[success] Total time: 8 s, completed 30-Nov-2014 08:12:26
 
$ time ./bin/spark-submit --class "WriteToCsvWithHeader" --master local[4] /path/to/playground/target/scala-2.10/playground_2.10-1.0.jar
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.propertie
...
14/11/30 08:16:15 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
14/11/30 08:16:15 INFO SparkContext: Job finished: saveAsTextFile at WriteToCsvWithHeader.scala:49, took 0.589036 s
 
real	0m13.061s
user	0m38.977s
sys	0m3.393s

And if we look at our destination file:

$ cat /tmp/singlePrimaryTypes.csv
type,count
THEFT,859197
BATTERY,757530
NARCOTICS,489528
CRIMINAL DAMAGE,488209
BURGLARY,257310
OTHER OFFENSE,253964
ASSAULT,247386
MOTOR VEHICLE THEFT,197404
ROBBERY,157706
DECEPTIVE PRACTICE,137538
CRIMINAL TRESPASS,124974
PROSTITUTION,47245
WEAPONS VIOLATION,40361
PUBLIC PEACE VIOLATION,31585
OFFENSE INVOLVING CHILDREN,26524
CRIM SEXUAL ASSAULT,14788
SEX OFFENSE,14283
GAMBLING,10632
LIQUOR LAW VIOLATION,8847
ARSON,6443
INTERFERE WITH PUBLIC OFFICER,5178
HOMICIDE,4846
KIDNAPPING,3585
INTERFERENCE WITH PUBLIC OFFICER,3147
INTIMIDATION,2471
STALKING,1985
OFFENSES INVOLVING CHILDREN,355
OBSCENITY,219
PUBLIC INDECENCY,86
OTHER NARCOTIC VIOLATION,80
RITUALISM,12
NON-CRIMINAL,12
OTHER OFFENSE ,6
NON - CRIMINAL,2
NON-CRIMINAL (SUBJECT SPECIFIED),2

Happy days!

The code is available as a gist if you want to see all the details.

Written by Mark Needham

November 30th, 2014 at 8:21 am

Posted in Spark

Tagged with

Spark: Write to CSV file

without comments

A couple of weeks ago I wrote how I’d been using Spark to explore a City of Chicago Crime data set and having worked out how many of each crime had been committed I wanted to write that to a CSV file.

Spark provides a saveAsTextFile function which allows us to save RDD’s so I refactored my code into the following format to allow me to use that:

import au.com.bytecode.opencsv.CSVParser
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
 
def dropHeader(data: RDD[String]): RDD[String] = {
  data.mapPartitionsWithIndex((idx, lines) => {
    if (idx == 0) {
      lines.drop(1)
    }
    lines
  })
}
 
// https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-present/ijzp-q8t2
val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv"
 
val crimeData = sc.textFile(crimeFile).cache()
val withoutHeader: RDD[String] = dropHeader(crimeData)
 
val file = "/tmp/primaryTypes.csv"
FileUtil.fullyDelete(new File(file))
 
val partitions: RDD[(String, Int)] = withoutHeader.mapPartitions(lines => {
  val parser = new CSVParser(',')
  lines.map(line => {
    val columns = parser.parseLine(line)
    (columns(5), 1)
  })
})
 
val counts = partitions.
  reduceByKey {case (x,y) => x + y}.
  sortBy {case (key, value) => -value}.
  map { case (key, value) => Array(key, value).mkString(",") }
 
counts.saveAsTextFile(file)

If we run that code from the Spark shell we end up with a folder called /tmp/primaryTypes.csv containing multiple part files:

$ ls -lah /tmp/primaryTypes.csv/
total 496
drwxr-xr-x  66 markneedham  wheel   2.2K 30 Nov 07:17 .
drwxrwxrwt  80 root         wheel   2.7K 30 Nov 07:16 ..
-rw-r--r--   1 markneedham  wheel     8B 30 Nov 07:16 ._SUCCESS.crc
-rw-r--r--   1 markneedham  wheel    12B 30 Nov 07:16 .part-00000.crc
-rw-r--r--   1 markneedham  wheel    12B 30 Nov 07:16 .part-00001.crc
-rw-r--r--   1 markneedham  wheel    12B 30 Nov 07:16 .part-00002.crc
-rw-r--r--   1 markneedham  wheel    12B 30 Nov 07:16 .part-00003.crc
...
-rwxrwxrwx   1 markneedham  wheel     0B 30 Nov 07:16 _SUCCESS
-rwxrwxrwx   1 markneedham  wheel    28B 30 Nov 07:16 part-00000
-rwxrwxrwx   1 markneedham  wheel    17B 30 Nov 07:16 part-00001
-rwxrwxrwx   1 markneedham  wheel    23B 30 Nov 07:16 part-00002
-rwxrwxrwx   1 markneedham  wheel    16B 30 Nov 07:16 part-00003
...

If we look at some of those part files we can see that it’s written the crime types and counts as expected:

$ cat /tmp/primaryTypes.csv/part-00000
THEFT,859197
BATTERY,757530
 
$ cat /tmp/primaryTypes.csv/part-00003
BURGLARY,257310

This is fine if we’re going to pass those CSV files into another Hadoop based job but I actually want a single CSV file so it’s not quite what I want.

One way to achieve this is to force everything to be calculated on one partition which will mean we only get one part file generated:

val counts = partitions.repartition(1).
  reduceByKey {case (x,y) => x + y}.
  sortBy {case (key, value) => -value}.
  map { case (key, value) => Array(key, value).mkString(",") }
 
 
counts.saveAsTextFile(file)

part-00000 now looks like this:

$ cat !$
cat /tmp/primaryTypes.csv/part-00000
THEFT,859197
BATTERY,757530
NARCOTICS,489528
CRIMINAL DAMAGE,488209
BURGLARY,257310
OTHER OFFENSE,253964
ASSAULT,247386
MOTOR VEHICLE THEFT,197404
ROBBERY,157706
DECEPTIVE PRACTICE,137538
CRIMINAL TRESPASS,124974
PROSTITUTION,47245
WEAPONS VIOLATION,40361
PUBLIC PEACE VIOLATION,31585
OFFENSE INVOLVING CHILDREN,26524
CRIM SEXUAL ASSAULT,14788
SEX OFFENSE,14283
GAMBLING,10632
LIQUOR LAW VIOLATION,8847
ARSON,6443
INTERFERE WITH PUBLIC OFFICER,5178
HOMICIDE,4846
KIDNAPPING,3585
INTERFERENCE WITH PUBLIC OFFICER,3147
INTIMIDATION,2471
STALKING,1985
OFFENSES INVOLVING CHILDREN,355
OBSCENITY,219
PUBLIC INDECENCY,86
OTHER NARCOTIC VIOLATION,80
NON-CRIMINAL,12
RITUALISM,12
OTHER OFFENSE ,6
NON - CRIMINAL,2
NON-CRIMINAL (SUBJECT SPECIFIED),2

This works but it’s quite a bit slower than when we were doing the aggregation across partitions so it’s not ideal.

Instead, what we can do is make use of one of Hadoop’s merge functions which squashes part files together into a single file.

First we import Hadoop into our SBT file:

libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.5.2"

Now let’s bring our merge function into the Spark shell:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
 
def merge(srcPath: String, dstPath: String): Unit =  {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
}

And now let’s make use of it:

val file = "/tmp/primaryTypes.csv"
FileUtil.fullyDelete(new File(file))
 
val destinationFile= "/tmp/singlePrimaryTypes.csv"
FileUtil.fullyDelete(new File(destinationFile))
 
val counts = partitions.
reduceByKey {case (x,y) => x + y}.
sortBy {case (key, value) => -value}.
map { case (key, value) => Array(key, value).mkString(",") }
 
counts.saveAsTextFile(file)
 
merge(file, destinationFile)

And now we’ve got the best of both worlds:

$ cat /tmp/singlePrimaryTypes.csv
THEFT,859197
BATTERY,757530
NARCOTICS,489528
CRIMINAL DAMAGE,488209
BURGLARY,257310
OTHER OFFENSE,253964
ASSAULT,247386
MOTOR VEHICLE THEFT,197404
ROBBERY,157706
DECEPTIVE PRACTICE,137538
CRIMINAL TRESPASS,124974
PROSTITUTION,47245
WEAPONS VIOLATION,40361
PUBLIC PEACE VIOLATION,31585
OFFENSE INVOLVING CHILDREN,26524
CRIM SEXUAL ASSAULT,14788
SEX OFFENSE,14283
GAMBLING,10632
LIQUOR LAW VIOLATION,8847
ARSON,6443
INTERFERE WITH PUBLIC OFFICER,5178
HOMICIDE,4846
KIDNAPPING,3585
INTERFERENCE WITH PUBLIC OFFICER,3147
INTIMIDATION,2471
STALKING,1985
OFFENSES INVOLVING CHILDREN,355
OBSCENITY,219
PUBLIC INDECENCY,86
OTHER NARCOTIC VIOLATION,80
RITUALISM,12
NON-CRIMINAL,12
OTHER OFFENSE ,6
NON - CRIMINAL,2
NON-CRIMINAL (SUBJECT SPECIFIED),2

The full code is available as a gist if you want to play around with it.

Written by Mark Needham

November 30th, 2014 at 7:40 am

Posted in Spark

Tagged with

Docker/Neo4j: Port forwarding on Mac OS X not working

without comments

Prompted by Ognjen Bubalo’s excellent blog post I thought it was about time I tried running Neo4j on a docker container on my Mac Book Pro to make it easier to play around with different data sets.

I got the container up and running by following Ognien’s instructions and had the following ports forwarded to my host machine:

$ docker ps
CONTAINER ID        IMAGE                 COMMAND                CREATED             STATUS              PORTS                                              NAMES
c62f8601e557        tpires/neo4j:latest   "/bin/bash -c /launc   About an hour ago   Up About an hour    0.0.0.0:49153->1337/tcp, 0.0.0.0:49154->7474/tcp   neo4j

This should allow me to access Neo4j on port 49154 but when I tried to access that host:port pair I got a connection refused message:

$ curl -v http://localhost:49154
* Adding handle: conn: 0x7ff369803a00
* Adding handle: send: 0
* Adding handle: recv: 0
* Curl_addHandleToPipeline: length: 1
* - Conn 0 (0x7ff369803a00) send_pipe: 1, recv_pipe: 0
* About to connect() to localhost port 49154 (#0)
*   Trying ::1...
*   Trying 127.0.0.1...
*   Trying fe80::1...
* Failed connect to localhost:49154; Connection refused
* Closing connection 0
curl: (7) Failed connect to localhost:49154; Connection refused

My first thought was the maybe Neo4j hadn’t started up correctly inside the container so I checked the logs:

$ docker logs --tail=10 c62f8601e557
10:59:12.994 [main] INFO  o.e.j.server.handler.ContextHandler - Started o.e.j.w.WebAppContext@2edfbe28{/webadmin,jar:file:/usr/share/neo4j/system/lib/neo4j-server-2.1.5-static-web.jar!/webadmin-html,AVAILABLE}
10:59:13.449 [main] INFO  o.e.j.server.handler.ContextHandler - Started o.e.j.s.ServletContextHandler@192efb4e{/db/manage,null,AVAILABLE}
10:59:13.699 [main] INFO  o.e.j.server.handler.ContextHandler - Started o.e.j.s.ServletContextHandler@7e94c035{/db/data,null,AVAILABLE}
10:59:13.714 [main] INFO  o.e.j.w.StandardDescriptorProcessor - NO JSP Support for /browser, did not find org.apache.jasper.servlet.JspServlet
10:59:13.715 [main] INFO  o.e.j.server.handler.ContextHandler - Started o.e.j.w.WebAppContext@3e84ae71{/browser,jar:file:/usr/share/neo4j/system/lib/neo4j-browser-2.1.5.jar!/browser,AVAILABLE}
10:59:13.807 [main] INFO  o.e.j.server.handler.ContextHandler - Started o.e.j.s.ServletContextHandler@4b6690b1{/,null,AVAILABLE}
10:59:13.819 [main] INFO  o.e.jetty.server.ServerConnector - Started ServerConnector@495350f0{HTTP/1.1}{c62f8601e557:7474}
10:59:13.900 [main] INFO  o.e.jetty.server.ServerConnector - Started ServerConnector@23ad0c5a{SSL-HTTP/1.1}{c62f8601e557:7473}
2014-11-27 10:59:13.901+0000 INFO  [API] Server started on: http://c62f8601e557:7474/
2014-11-27 10:59:13.902+0000 INFO  [API] Remote interface ready and available at [http://c62f8601e557:7474/]

Nope! It’s up and running perfectly fine which suggested the problemw was with port forwarding.

I eventually found my way to Chris Jones’ ‘How to use Docker on OS X: The Missing Guide‘ which explained the problem:

The Problem: Docker forwards ports from the container to the host, which is boot2docker, not OS X.

The Solution: Use the VM’s IP address.

So to access Neo4j on my machine I need to use the VM’s IP address rather than localhost. We can get the VM’s IP address like so:

$ boot2docker ip
 
The VM's Host only interface IP address is: 192.168.59.103

Let’s strip out that surrounding text though:

$ boot2docker ip 2> /dev/null
192.168.59.103

Now if we cURL using that IP instead:

$ curl -v http://192.168.59.103:49154
* About to connect() to 192.168.59.103 port 49154 (#0)
*   Trying 192.168.59.103...
* Adding handle: conn: 0x7fd794003a00
* Adding handle: send: 0
* Adding handle: recv: 0
* Curl_addHandleToPipeline: length: 1
* - Conn 0 (0x7fd794003a00) send_pipe: 1, recv_pipe: 0
* Connected to 192.168.59.103 (192.168.59.103) port 49154 (#0)
> GET / HTTP/1.1
> User-Agent: curl/7.30.0
> Host: 192.168.59.103:49154
> Accept: */*
>
< HTTP/1.1 200 OK
< Content-Type: application/json; charset=UTF-8
< Access-Control-Allow-Origin: *
< Content-Length: 112
* Server Jetty(9.0.5.v20130815) is not blacklisted
< Server: Jetty(9.0.5.v20130815)
<
{
  "management" : "http://192.168.59.103:49154/db/manage/",
  "data" : "http://192.168.59.103:49154/db/data/"
* Connection #0 to host 192.168.59.103 left intact

Happy days!

Chris has solutions to lots of other common problems people come across when using Docker with Mac OS X so it’s worth having a flick through his post.

Written by Mark Needham

November 27th, 2014 at 12:28 pm

Posted in Software Development

Tagged with