Mark Needham

Thoughts on Software Development

Archive for the ‘Languages’ Category

Luigi: Defining dynamic requirements (on output files)


In my last blog post I showed how to convert a JSON document containing meetup groups into a CSV file using Luigi, the Python library for building data pipelines. As well as creating that CSV file I wanted to go back to the meetup.com API and download all the members of those groups.

This was a rough flow of what I wanted to do:

  • Take JSON document containing all groups
  • Parse that document and for each group:
    • Call the /members endpoint
    • Save each one of those files as a JSON file
  • Iterate over all those JSON files and create a members CSV file

In the previous post we created the GroupsToJSON task which calls the /groups endpoint on the meetup API and creates the file /tmp/groups.json.

Our new task has that as its initial requirement:

class MembersToCSV(luigi.Task):
    key = luigi.Parameter()
    lat = luigi.Parameter()
    lon = luigi.Parameter()
 
    def requires(self):
        yield GroupsToJSON(self.key, self.lat, self.lon)

But we also want to create a requirement on a task that will make those calls to the /members endpoint and store the result in a JSON file.

One of the patterns that Luigi imposes on us is that each task should only create one file, so we actually have a requirement on a collection of tasks rather than just one. It took me a little while to get my head around that!

We don’t know the parameters of those tasks at compile time – we can only calculate them by parsing the JSON file produced by GroupsToJSON.

In Luigi terminology what we want to create is a dynamic requirement. A dynamic requirement is defined inside the run method of a task and can rely on the output of any tasks specified in the requires method, which is exactly what we need.

This code does the delegating part of the job:

class MembersToCSV(luigi.Task):
    key = luigi.Parameter()
    lat = luigi.Parameter()
    lon = luigi.Parameter()
 
 
    def run(self):
        outputs = []
        for input in self.input():
            with input.open('r') as group_file:
                groups_json = json.load(group_file)
                groups = [str(group['id']) for group in groups_json]
 
 
                for group_id in groups:
                    members = MembersToJSON(group_id, self.key)
                    outputs.append(members.output().path)
                    yield members
 
 
    def requires(self):
        yield GroupsToJSON(self.key, self.lat, self.lon)

Inside our run method we iterate over the output of GroupsToJSON (which is our input) and yield another task, collecting its output paths in the outputs array that we’ll use later.

MembersToJSON looks like this:

class MembersToJSON(luigi.Task):
    group_id = luigi.IntParameter()
    key = luigi.Parameter()
 
 
    def run(self):
        results = []
        uri = "https://api.meetup.com/2/members?&group_id={0}&key={1}".format(self.group_id, self.key)
        while uri:
            r = requests.get(uri)
            response = r.json()
            results.extend(response["results"])
            uri = response["meta"]["next"] or None
 
 
        with self.output().open("w") as output:
            json.dump(results, output)
 
    def output(self):
        return luigi.LocalTarget("/tmp/members/{0}.json".format(self.group_id))

This task generates one file per group containing a list of all the members of that group.

We can now go back to MembersToCSV and convert those JSON files into a single CSV file:

class MembersToCSV(luigi.Task):
    out_path = "/tmp/members.csv"
    key = luigi.Parameter()
    lat = luigi.Parameter()
    lon = luigi.Parameter()
 
 
    def run(self):
        outputs = []
        for input in self.input():
            with input.open('r') as group_file:
                groups_json = json.load(group_file)
                groups = [str(group['id']) for group in groups_json]
 
 
                for group_id in groups:
                    members = MembersToJSON(group_id, self.key)
                    outputs.append(members.output().path)
                    yield members
 
        with self.output().open("w") as output:
            writer = csv.writer(output, delimiter=",")
            writer.writerow(["id", "name", "joined", "topics", "groupId"])
 
            for path in outputs:
                group_id = path.split("/")[-1].replace(".json", "")
                with open(path) as json_data:
                    d = json.load(json_data)
                    for member in d:
                        topic_ids = ";".join([str(topic["id"]) for topic in member["topics"]])
                        if "name" in member:
                            writer.writerow([member["id"], member["name"], member["joined"], topic_ids, group_id])
 
    def output(self):
        return luigi.LocalTarget(self.out_path)
 
    def requires(self):
        yield GroupsToJSON(self.key, self.lat, self.lon)

We then just need to add our new task as a requirement of the wrapper task:
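
# a reconstruction: the Meetup wrapper task from the previous post, with the extra yield added
class Meetup(luigi.WrapperTask):
    def run(self):
        print("Running Meetup")

    def requires(self):
        key = os.environ['MEETUP_API_KEY']
        lat = os.getenv('LAT', "51.5072")
        lon = os.getenv('LON', "0.1275")

        yield GroupsToCSV(key, lat, lon)
        yield MembersToCSV(key, lat, lon)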

And we’re ready to roll:

$ PYTHONPATH="." luigi --module blog --local-scheduler Meetup --workers 3

We’ve defined the number of workers here because the calls to the /members endpoint can be executed in parallel, and there are ~600 of them to make.

All the code from both blog posts is available as a gist if you want to play around with it.

Any questions/advice let me know in the comments or I’m @markhneedham on twitter.

Written by Mark Needham

March 28th, 2017 at 5:39 am

Posted in Python


Luigi: An ExternalProgramTask example – Converting JSON to CSV


I’ve been playing around with the Python library Luigi, which is used to build pipelines of batch jobs, and I struggled to find an example of an ExternalProgramTask, so this is my attempt at filling that void.

Luigi - the Python data library for building data science pipelines

I’m building a little data pipeline to get data from the meetup.com API and put it into CSV files that can be loaded into Neo4j using the LOAD CSV command.

The first task I created calls the /groups endpoint and saves the result into a JSON file:

import luigi
import requests
import json
from collections import Counter
 
class GroupsToJSON(luigi.Task):
    key = luigi.Parameter()
    lat = luigi.Parameter()
    lon = luigi.Parameter()
 
    def run(self):
        seed_topic = "nosql"
        uri = "https://api.meetup.com/2/groups?&topic={0}&lat={1}&lon={2}&key={3}".format(seed_topic, self.lat, self.lon, self.key)
 
        r = requests.get(uri)
        all_topics = [topic["urlkey"]  for result in r.json()["results"] for topic in result["topics"]]
        c = Counter(all_topics)
 
        topics = [entry[0] for entry in c.most_common(10)]
 
        groups = {}
        for topic in topics:
            uri = "https://api.meetup.com/2/groups?&topic={0}&lat={1}&lon={2}&key={3}".format(topic, self.lat, self.lon, self.key)
            r = requests.get(uri)
            for group in r.json()["results"]:
                groups[group["id"]] = group
 
        with self.output().open('w') as groups_file:
            json.dump(list(groups.values()), groups_file, indent=4, sort_keys=True)
 
    def output(self):
        return luigi.LocalTarget("/tmp/groups.json")

We define a few parameters at the top of the class which will be passed in when this task is executed. The most interesting lines of the run function are the last couple where we write the JSON to a file. self.output() refers to the target defined in the output function which in this case is /tmp/groups.json.

Now we need to create a task to convert that JSON file into CSV format. The jq command line tool does this job well so we’ll use that. The following task does the job:

from luigi.contrib.external_program import ExternalProgramTask
 
class GroupsToCSV(ExternalProgramTask):
    file_path = "/tmp/groups.csv"
    key = luigi.Parameter()
    lat = luigi.Parameter()
    lon = luigi.Parameter()
 
    def program_args(self):
        return ["./groups.sh", self.input()[0].path, self.output().path]
 
    def output(self):
        return luigi.LocalTarget(self.file_path)
 
    def requires(self):
        yield GroupsToJSON(self.key, self.lat, self.lon)

groups.sh

#!/bin/bash
 
in=${1}
out=${2}
 
echo "id,name,urlname,link,rating,created,description,organiserName,organiserMemberId" > ${out}
jq -r '.[] | [.id, .name, .urlname, .link, .rating, .created, .description, .organizer.name, .organizer.member_id] | @csv' ${in} >> ${out}

I wanted to call jq directly from the Python code but I couldn’t figure out how to do it so putting that code in a shell script is my workaround.
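
For what it’s worth, one way to avoid the shell script would be to shell out to jq from a plain luigi.Task using subprocess. This is only a sketch (GroupsToCSVInline is a made-up name), not what I actually ran:

import subprocess

class GroupsToCSVInline(luigi.Task):
    # hypothetical alternative to GroupsToCSV + groups.sh
    file_path = "/tmp/groups.csv"
    key = luigi.Parameter()
    lat = luigi.Parameter()
    lon = luigi.Parameter()

    def run(self):
        jq_filter = ('.[] | [.id, .name, .urlname, .link, .rating, .created, '
                     '.description, .organizer.name, .organizer.member_id] | @csv')
        rows = subprocess.check_output(["jq", "-r", jq_filter, self.input()[0].path])
        with self.output().open("w") as out:
            out.write("id,name,urlname,link,rating,created,description,organiserName,organiserMemberId\n")
            out.write(rows.decode("utf-8"))

    def output(self):
        return luigi.LocalTarget(self.file_path)

    def requires(self):
        yield GroupsToJSON(self.key, self.lat, self.lon)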

The last piece of the puzzle is a wrapper task that launches the others:

import os
 
class Meetup(luigi.WrapperTask):
    def run(self):
        print("Running Meetup")
 
    def requires(self):
        key = os.environ['MEETUP_API_KEY']
        lat = os.getenv('LAT', "51.5072")
        lon = os.getenv('LON', "0.1275")
 
        yield GroupsToCSV(key, lat, lon)

Now we’re ready to run the tasks:

$ PYTHONPATH="." luigi --module blog --local-scheduler Meetup
DEBUG: Checking if Meetup() is complete
DEBUG: Checking if GroupsToCSV(key=xxx, lat=51.5072, lon=0.1275) is complete
INFO: Informed scheduler that task   Meetup__99914b932b   has status   PENDING
DEBUG: Checking if GroupsToJSON(key=xxx, lat=51.5072, lon=0.1275) is complete
INFO: Informed scheduler that task   GroupsToCSV_xxx_51_5072_0_1275_e07372cebf   has status   PENDING
INFO: Informed scheduler that task   GroupsToJSON_xxx_51_5072_0_1275_e07372cebf   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 4452] Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) running   GroupsToJSON(key=xxx, lat=51.5072, lon=0.1275)
INFO: [pid 4452] Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) done      GroupsToJSON(key=xxx, lat=51.5072, lon=0.1275)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   GroupsToJSON_xxx_51_5072_0_1275_e07372cebf   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 4452] Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) running   GroupsToCSV(key=xxx, lat=51.5072, lon=0.1275)
INFO: Running command: ./groups.sh /tmp/groups.json /tmp/groups.csv
INFO: [pid 4452] Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) done      GroupsToCSV(key=xxx, lat=51.5072, lon=0.1275)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   GroupsToCSV_xxx_51_5072_0_1275_e07372cebf   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 4452] Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) running   Meetup()
Running Meetup
INFO: [pid 4452] Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) done      Meetup()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Meetup__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====
 
Scheduled 3 tasks of which:
* 3 ran successfully:
    - 1 GroupsToCSV(key=xxx, lat=51.5072, lon=0.1275)
    - 1 GroupsToJSON(key=xxx, lat=51.5072, lon=0.1275)
    - 1 Meetup()
 
This progress looks :) because there were no failed tasks or missing external dependencies
 
===== Luigi Execution Summary =====

Looks good! Let’s quickly look at our CSV file:

$ head -n10 /tmp/groups.csv 
id,name,urlname,link,rating,created,description,organiserName,organiserMemberId
1114381,"London NoSQL, MySQL, Open Source Community","london-nosql-mysql","https://www.meetup.com/london-nosql-mysql/",4.28,1208505614000,"<p>Meet others in London interested in NoSQL, MySQL, and Open Source Databases.</p>","Sinead Lawless",185675230
1561841,"Enterprise Search London Meetup","es-london","https://www.meetup.com/es-london/",4.66,1259157419000,"<p>Enterprise Search London is a meetup for anyone interested in building search and discovery experiences — from intranet search and site search, to advanced discovery applications and beyond.</p>
<p>Disclaimer: This meetup is NOT about SEO or search engine marketing.</p>
<p><strong>What people are saying:</strong></p>
<ul>
<li><span>""Join this meetup if you have a passion for enterprise search and user experience that you would like to share with other able-minded practitioners."" — Vegard Sandvold</span></li>
<li><span>""Full marks for vision and execution. Looking forward to the next Meetup."" — Martin White</span></li>
<li><span>“Consistently excellent” — Helen Lippell</span></li>
</ul>

Sweet! And what if we run it again?

$ PYTHONPATH="." luigi --module blog --local-scheduler Meetup
DEBUG: Checking if Meetup() is complete
INFO: Informed scheduler that task   Meetup__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=172768377, workers=1, host=Marks-MBP-4, username=markneedham, pid=4531) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====
 
Scheduled 1 tasks of which:
* 1 present dependencies were encountered:
    - 1 Meetup()
 
Did not run any tasks
This progress looks :) because there were no failed tasks or missing external dependencies
 
===== Luigi Execution Summary =====

As expected, nothing happens since our dependencies are already satisfied, and we have our first Luigi pipeline up and running.

Written by Mark Needham

March 25th, 2017 at 2:09 pm

Posted in Python


Python 3: TypeError: Object of type ‘dict_values’ is not JSON serializable


I’ve recently upgraded to Python 3 (I know, took me a while!) and realised that one of my scripts that writes JSON to a file no longer works!

This is a simplified version of what I’m doing:

>>> import json
>>> x = {"mark": {"name": "Mark"}, "michael": {"name": "Michael"}  } 
>>> json.dumps(x.values())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/encoder.py", line 180, in default
    o.__class__.__name__)
TypeError: Object of type 'dict_values' is not JSON serializable

Python 2.7 would be perfectly happy:

>>> json.dumps(x.values())
'[{"name": "Michael"}, {"name": "Mark"}]'

The difference is in the results returned by the values method:

# Python 2.7.10
>>> x.values()
[{'name': 'Michael'}, {'name': 'Mark'}]
 
# Python 3.6.0
>>> x.values()
dict_values([{'name': 'Mark'}, {'name': 'Michael'}])
>>>

Python 3 no longer returns a list; instead we get a dict_values view over the data.
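
Being a view means it reflects later changes to the dict. A quick demo of my own, not from the original script:

>>> values = x.values()
>>> x["michaela"] = {"name": "Michaela"}
>>> len(values)
3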

Luckily this is easy to resolve – we just need to wrap the call to values with a call to list:

>>> json.dumps(list(x.values()))
'[{"name": "Mark"}, {"name": "Michael"}]'

This version works with Python 2.7 as well so if I accidentally run the script with an old version the world isn’t going to explode.
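
Alternatively, I believe we can pass a default function: the encoder calls it on anything it can’t serialise, and list happily consumes a dict_values:

>>> json.dumps(x.values(), default=list)
'[{"name": "Mark"}, {"name": "Michael"}]'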

Written by Mark Needham

March 19th, 2017 at 4:40 pm

Posted in Python


ReactJS/Material-UI: Cannot resolve module ‘material-ui/lib/’


I’ve been playing around with ReactJS and the Material-UI library over the weekend and ran into this error while trying to follow one of the examples from the demo application:

ERROR in ./src/app/modules/Foo.js
Module not found: Error: Cannot resolve module 'material-ui/lib/Subheader' in /Users/markneedham/neo/reactjs-test/src/app/modules
 @ ./src/app/modules/Foo.js 13:17-53
webpack: Failed to compile.

This was the component code:

import React from 'react'
import Subheader from 'material-ui/lib/Subheader'
 
export default React.createClass({
  render() {
    return <div>
    <Subheader>Some Text</Subheader>
    </div>
  }
})

which is then rendered like this:

import Foo from './modules/Foo'
render(Foo, document.getElementById("app"))

I came across this post on Stack Overflow which seemed to describe a similar issue and led me to realise that I was actually on the wrong version of the documentation. I’m using version 0.16.7 but the demo I copied from is for version 0.15.0-alpha.1!

This is the component code that we actually want:

import React from 'react'
import Subheader from 'material-ui/Subheader'
 
export default React.createClass({
  render() {
    return <div>
    <Subheader>Some Text</Subheader>
    </div>
  }
})

And that’s all I had to change. You’ll see the same error for several other components, and it looks like the change was made between the 0.14.x and 0.15.x series of the library.

Written by Mark Needham

February 12th, 2017 at 10:43 pm

Posted in Javascript


Go: Multi-threaded writing to a CSV file


As part of a Go script I’ve been working on I wanted to write to a CSV file from multiple goroutines, but realised that the built-in CSV writer isn’t thread-safe.

My first attempt at writing to the CSV file looked like this:

package main
 
 
import (
	"encoding/csv"
	"os"
	"log"
	"strconv"
)
 
func main() {
 
	csvFile, err := os.Create("/tmp/foo.csv")
	if err != nil {
		log.Panic(err)
	}
 
	w := csv.NewWriter(csvFile)
	w.Write([]string{"id1","id2","id3"})
 
	count := 100
	done := make(chan bool, count)
 
	for i := 0; i < count; i++ {
		go func(i int) {
			w.Write([]string {strconv.Itoa(i), strconv.Itoa(i), strconv.Itoa(i)})
			done <- true
		}(i)
	}
 
	for i:=0; i < count; i++ {
		<- done
	}
	w.Flush()
}

This script should output the numbers from 0-99 three times on each line. Some rows in the file are written correctly, but as we can see below, some aren’t:

40,40,40
37,37,37
38,38,38
18,18,39
^@,39,39
...
67,67,70,^@70,70
65,65,65
73,73,73
66,66,66
72,72,72
75,74,75,74,75
74
7779^@,79,77
...

One way that we can make our script safe is to use a mutex whenever we’re calling any methods on the CSV writer. I wrote the following code to do this:

type CsvWriter struct {
	mutex *sync.Mutex
	csvWriter *csv.Writer
}
 
func NewCsvWriter(fileName string) (*CsvWriter, error) {
	csvFile, err := os.Create(fileName)
	if err != nil {
		return nil, err
	}
	w := csv.NewWriter(csvFile)
	return &CsvWriter{csvWriter:w, mutex: &sync.Mutex{}}, nil
}
 
func (w *CsvWriter) Write(row []string) {
	w.mutex.Lock()
	w.csvWriter.Write(row)
	w.mutex.Unlock()
}
 
func (w *CsvWriter) Flush() {
	w.mutex.Lock()
	w.csvWriter.Flush()
	w.mutex.Unlock()
}

We create a mutex when NewCsvWriter instantiates CsvWriter and then use it in the Write and Flush functions so that only one goroutine at a time can access the underlying csv.Writer. We then tweak the initial script to use this type instead of calling csv.Writer directly:

func main() {
	w, err := NewCsvWriter("/tmp/foo-safe.csv")
	if err != nil {
		log.Panic(err)
	}
 
	w.Write([]string{"id1","id2","id3"})
 
	count := 100
	done := make(chan bool, count)
 
	for i := 0; i < count; i++ {
		go func(i int) {
			w.Write([]string {strconv.Itoa(i), strconv.Itoa(i), strconv.Itoa(i)})
			done <- true
		}(i)
	}
 
	for i:=0; i < count; i++ {
		<- done
	}
	w.Flush()
}

And now if we inspect the CSV file all lines have been written successfully:

...
25,25,25
13,13,13
29,29,29
32,32,32
26,26,26
30,30,30
27,27,27
31,31,31
28,28,28
34,34,34
35,35,35
33,33,33
37,37,37
36,36,36
...
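
An alternative design, sketched here as my own take rather than anything from the original script: skip the mutex entirely and hand the csv.Writer to a single dedicated goroutine that drains a channel of rows. Since only that goroutine ever touches the writer, there’s nothing to lock:

package main

import (
	"encoding/csv"
	"log"
	"os"
	"strconv"
	"sync"
)

func main() {
	csvFile, err := os.Create("/tmp/foo-channel.csv")
	if err != nil {
		log.Panic(err)
	}

	w := csv.NewWriter(csvFile)
	rows := make(chan []string)
	writerDone := make(chan bool)

	// the only goroutine that ever touches the csv.Writer
	go func() {
		for row := range rows {
			w.Write(row)
		}
		w.Flush()
		writerDone <- true
	}()

	rows <- []string{"id1", "id2", "id3"}

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			rows <- []string{strconv.Itoa(i), strconv.Itoa(i), strconv.Itoa(i)}
		}(i)
	}

	wg.Wait()
	close(rows)  // no more rows are coming
	<-writerDone // wait for the writer to flush before exiting
}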

That’s all for now. If you have any suggestions for a better way to do this do let me know in the comments or on twitter – I’m @markhneedham

Written by Mark Needham

January 31st, 2017 at 5:57 am

Posted in Go


Go vs Python: Parsing a JSON response from a HTTP API


As part of a recommendations with Neo4j talk that I’ve presented a few times over the last year I have a set of scripts that download some data from the meetup.com API.

They’re all written in Python but I thought it’d be a fun exercise to see what they’d look like in Go. My eventual goal is to try and parallelise the API calls.

This is the Python version of the script:

import requests
import os
import json
 
key =  os.environ['MEETUP_API_KEY']
lat = "51.5072"
lon = "0.1275"
 
seed_topic = "nosql"
uri = "https://api.meetup.com/2/groups?&amp;topic={0}&amp;lat={1}&amp;lon={2}&amp;key={3}".format(seed_topic, lat, lon, key)
 
r = requests.get(uri)
all_topics = [topic["urlkey"]  for result in r.json()["results"] for topic in result["topics"]]
 
for topic in all_topics:
    print topic

We’re using the requests library to send a request to the meetup API to get the groups which have the topic ‘nosql’ in the London area. We then parse the response and print out the topics.

Now to do the same thing in Go! The first bit of the script is almost identical:

import (
	"fmt"
	"os"
	"net/http"
	"log"
	"time"
)
 
func handleError(err error) {
	if err != nil {
		fmt.Println(err)
		log.Fatal(err)
	}
}
 
func main() {
	var httpClient = &http.Client{Timeout: 10 * time.Second}
 
	seedTopic := "nosql"
	lat := "51.5072"
	lon := "0.1275"
	key := os.Getenv("MEETUP_API_KEY")
 
	uri := fmt.Sprintf("https://api.meetup.com/2/groups?&topic=%s&lat=%s&lon=%s&key=%s", seedTopic, lat, lon, key)
 
	response, err := httpClient.Get(uri)
	handleError(err)
	defer response.Body.Close()
	fmt.Println(response)
}

If we run that this is the output we see:

$ go run cmd/blog/main.go
 
&{200 OK 200 HTTP/2.0 2 0 map[X-Meetup-Request-Id:[2d3be3c7-a393-4127-b7aa-076f150499e6] X-Ratelimit-Reset:[10] Cf-Ray:[324093a73f1135d2-LHR] X-Oauth-Scopes:[basic] Etag:["35a941c5ea3df9df4204d8a4a2d60150"] Server:[cloudflare-nginx] Set-Cookie:[__cfduid=d54db475299a62af4bb963039787e2e3d1484894864; expires=Sat, 20-Jan-18 06:47:44 GMT; path=/; domain=.meetup.com; HttpOnly] X-Meetup-Server:[api7] X-Ratelimit-Limit:[30] X-Ratelimit-Remaining:[29] X-Accepted-Oauth-Scopes:[basic] Vary:[Accept-Encoding,User-Agent,Accept-Language] Date:[Fri, 20 Jan 2017 06:47:45 GMT] Content-Type:[application/json;charset=utf-8]] 0xc420442260 -1 [] false true map[] 0xc4200d01e0 0xc4202b2420}

So far so good. Now we need to parse the response that comes back.

Most of the examples that I came across suggest creating a struct with all the fields that you want to extract from the JSON document, but that feels like overkill for such a simple script.
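
For comparison, here’s roughly what the struct version would look like, with the field names assumed from the JSON fields we use below:

// a sketch of the struct-based decoding; only the fields we care about need declaring
type groupsResponse struct {
	Results []struct {
		Topics []struct {
			URLKey string `json:"urlkey"`
		} `json:"topics"`
	} `json:"results"`
}

var target groupsResponse
json.NewDecoder(response.Body).Decode(&target)

for _, group := range target.Results {
	for _, topic := range group.Topics {
		fmt.Println(topic.URLKey)
	}
}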

Instead we can just create maps of (string -> interface{}) and then apply type conversions where appropriate. I ended up with the following code to extract the topics:

import "encoding/json"
 
var target map[string]interface{}
decoder := json.NewDecoder(response.Body)
decoder.Decode(&target)
 
for _, rawGroup := range target["results"].([]interface{}) {
    group := rawGroup.(map[string]interface{})
    for _, rawTopic := range group["topics"].([]interface{}) {
        topic := rawTopic.(map[string]interface{})
        fmt.Println(topic["urlkey"])
    }
}

It’s more verbose than the Python version because we have to explicitly assert the type of everything we take out of the map at every stage, but it’s not too bad. This is the full script:

package main
 
import (
	"fmt"
	"os"
	"net/http"
	"log"
	"time"
	"encoding/json"
)
 
func handleError(err error) {
	if err != nil {
		fmt.Println(err)
		log.Fatal(err)
	}
}
 
func main() {
	var httpClient = &http.Client{Timeout: 10 * time.Second}
 
	seedTopic := "nosql"
	lat := "51.5072"
	lon := "0.1275"
	key := os.Getenv("MEETUP_API_KEY")
 
	uri := fmt.Sprintf("https://api.meetup.com/2/groups?&topic=%s&lat=%s&lon=%s&key=%s", seedTopic, lat, lon, key)
 
	response, err := httpClient.Get(uri)
	handleError(err)
	defer response.Body.Close()
 
	var target map[string]interface{}
	decoder := json.NewDecoder(response.Body)
	decoder.Decode(&target)
 
	for _, rawGroup := range target["results"].([]interface{}) {
		group := rawGroup.(map[string]interface{})
		for _, rawTopic := range group["topics"].([]interface{}) {
			topic := rawTopic.(map[string]interface{})
			fmt.Println(topic["urlkey"])
		}
	}
}

Once I’ve got these topics the next step is to make more API calls to get the groups for those topics.

I want to make those API calls in parallel while making sure I don’t exceed the rate limit restrictions on the API, and I think I can make use of goroutines, channels, and timers to do that. But that’s for another post!
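
As a rough idea of the timer part in the meantime, something like this might work. It’s a sketch with made-up topics, not code from that future post:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	limiter := time.Tick(200 * time.Millisecond) // at most ~5 calls per second
	topics := []string{"nosql", "neo4j", "graph-databases"}

	var wg sync.WaitGroup
	for _, topic := range topics {
		wg.Add(1)
		go func(topic string) {
			defer wg.Done()
			<-limiter // wait for a tick before firing the request
			fmt.Println("calling the API for", topic)
		}(topic)
	}
	wg.Wait()
}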

Written by Mark Needham

January 21st, 2017 at 10:49 am

Posted in Python


Go: First attempt at channels


In a previous blog post I mentioned that I wanted to extract blips from The ThoughtWorks Radar into a CSV file and I thought this would be a good mini project for me to practice using Go.

In particular I wanted to try using channels and this seemed like a good chance to do that.

I watched a talk by Rob Pike on designing concurrent applications where he uses the following definition of concurrency:

Concurrency is a way to structure a program by breaking it into pieces that can be executed independently.

He then demonstrates this with a diagram in which the independently executing pieces are connected by channels.


I broke the scraping application down into four parts:

  1. Find the links of blips to download ->
  2. Download the blips ->
  3. Scrape the data from each page ->
  4. Write the data into a CSV file

I don’t think we gain much by parallelising steps 1) or 4) but steps 2) and 3) seem easily parallelisable. Therefore we’ll use a single goroutine for steps 1) and 4) and multiple goroutines for steps 2) and 3).

We’ll create two channels:

  • filesToScrape
  • filesScraped

And they will interact with our components like this:

  • 2) will write the path of the downloaded files into filesToScrape
  • 3) will read from filesToScrape and write the scraped content into filesScraped
  • 4) will read from filesScraped and put that information into a CSV file.


I decided to write a completely serial version of the scraping application first so that I could compare it to the parallel version. I had the following common code:

scrape/scrape.go

package scrape
 
import (
	"github.com/PuerkitoBio/goquery"
	"os"
	"bufio"
	"fmt"
	"log"
	"strings"
	"net/http"
	"io"
)
 
func checkError(err error) {
	if err != nil {
		fmt.Println(err)
		log.Fatal(err)
	}
}
 
type Blip struct {
	Link  string
	Title string
}
 
func (blip Blip) Download() File {
	parts := strings.Split(blip.Link, "/")
	fileName := "rawData/items/" + parts[len(parts) - 1]
 
	if _, err := os.Stat(fileName); os.IsNotExist(err) {
		resp, err := http.Get("http://www.thoughtworks.com" + blip.Link)
		checkError(err)
		body := resp.Body
 
		file, err := os.Create(fileName)
		checkError(err)
 
		// copy the response body straight to the file; an unflushed bufio.Writer would drop buffered bytes
		io.Copy(file, body)
		file.Close()
		body.Close()
	}
 
	return File{Title: blip.Title, Path: fileName }
}
 
type File struct {
	Title string
	Path  string
}
 
func (fileToScrape File ) Scrape() ScrapedFile {
	file, err := os.Open(fileToScrape.Path)
	checkError(err)
 
	doc, err := goquery.NewDocumentFromReader(bufio.NewReader(file))
	checkError(err)
	file.Close()
 
	var entries []map[string]string
	doc.Find("div.blip-timeline-item").Each(func(i int, s *goquery.Selection) {
		entry := make(map[string]string)
		entry["time"] = s.Find("div.blip-timeline-item__time").First().Text()
		entry["outcome"] = strings.Trim(s.Find("div.blip-timeline-item__ring span").First().Text(), " ")
		entry["description"] = s.Find("div.blip-timeline-item__lead").First().Text()
		entries = append(entries, entry)
	})
 
	return ScrapedFile{File:fileToScrape, Entries:entries}
}
 
type ScrapedFile struct {
	File    File
	Entries []map[string]string
}
 
func FindBlips(pathToRadar string) []Blip {
	blips := make([]Blip, 0)
 
	file, err := os.Open(pathToRadar)
	checkError(err)
 
	doc, err := goquery.NewDocumentFromReader(bufio.NewReader(file))
	checkError(err)
 
	doc.Find(".blip").Each(func(i int, s *goquery.Selection) {
		item := s.Find("a")
		title := item.Text()
		link, _ := item.Attr("href")
		blips = append(blips, Blip{Title: title, Link: link })
	})
 
	return blips
}

Note that we’re using the goquery library to scrape the HTML files that we download.

A Blip is used to represent an item that appears on the radar e.g. .NET Core. A File is a representation of that blip on my local file system and a ScrapedFile contains the local representation of a blip and has an array containing every appearance the blip has made in radars over time.

Let’s have a look at the single threaded version of the scraper:

cmd/single/main.go

package main
 
import (
	"fmt"
	"encoding/csv"
	"os"
	"github.com/mneedham/neo4j-thoughtworks-radar/scrape"
)
 
 
func main() {
 
	blips := scrape.FindBlips("rawData/twRadar.html")
 
	var filesToScrape []scrape.File
	for _, blip := range blips {
		filesToScrape = append(filesToScrape, blip.Download())
	}
 
	var filesScraped []scrape.ScrapedFile
	for _, file := range filesToScrape {
		filesScraped = append(filesScraped, file.Scrape())
	}
 
	blipsCsvFile, _ := os.Create("import/blipsSingle.csv")
	writer := csv.NewWriter(blipsCsvFile)
	defer blipsCsvFile.Close()
 
	writer.Write([]string{"technology", "date", "suggestion" })
	for _, scrapedFile := range filesScraped {
		fmt.Println(scrapedFile.File.Title)
		for _, blip := range scrapedFile.Entries {
			writer.Write([]string{scrapedFile.File.Title, blip["time"], blip["outcome"] })
		}
	}
	writer.Flush()
}

rawData/twRadar.html is a local copy of the A-Z page which contains all the blips. This version is reasonably simple: we create an array containing all the blips, scrape them into another array, and then write that array into a CSV file. And if we run it:

$ time go run cmd/single/main.go 
 
real	3m10.354s
user	0m1.140s
sys	0m0.586s
 
$ head -n10 import/blipsSingle.csv 
technology,date,suggestion
.NET Core,Nov 2016,Assess
.NET Core,Nov 2015,Assess
.NET Core,May 2015,Assess
A single CI instance for all teams,Nov 2016,Hold
A single CI instance for all teams,Apr 2016,Hold
Acceptance test of journeys,Mar 2012,Trial
Acceptance test of journeys,Jul 2011,Trial
Acceptance test of journeys,Jan 2011,Trial
Accumulate-only data,Nov 2015,Assess

It takes a few minutes and most of the time will be taken in the blip.Download() function – work which is easily parallelisable. Let’s have a look at the parallel version where goroutines use channels to communicate with each other:

cmd/parallel/main.go

package main
 
import (
	"os"
	"encoding/csv"
	"github.com/mneedham/neo4j-thoughtworks-radar/scrape"
)
 
func main() {
	var filesToScrape chan scrape.File = make(chan scrape.File)
	var filesScraped chan scrape.ScrapedFile = make(chan scrape.ScrapedFile)
	defer close(filesToScrape)
	defer close(filesScraped)
 
	blips := scrape.FindBlips("rawData/twRadar.html")
 
	for _, blip := range blips {
		go func(blip scrape.Blip) { filesToScrape <- blip.Download() }(blip)
	}
 
	for i := 0; i < len(blips); i++ {
		select {
		case file := <-filesToScrape:
			go func(file scrape.File) { filesScraped <- file.Scrape() }(file)
		}
	}
 
	blipsCsvFile, _ := os.Create("import/blips.csv")
	writer := csv.NewWriter(blipsCsvFile)
	defer blipsCsvFile.Close()
 
	writer.Write([]string{"technology", "date", "suggestion" })
	for i := 0; i < len(blips); i++ {
		select {
		case scrapedFile := <-filesScraped:
			for _, blip := range scrapedFile.Entries {
				writer.Write([]string{scrapedFile.File.Title, blip["time"], blip["outcome"] })
			}
		}
	}
	writer.Flush()
}

Let’s remove the files we just downloaded and give this version a try.

$ rm rawData/items/*
 
$ time go run cmd/parallel/main.go 
 
real	0m6.689s
user	0m2.544s
sys	0m0.904s
 
$ head -n10 import/blips.csv 
technology,date,suggestion
Zucchini,Oct 2012,Assess
Reactive Extensions for .Net,May 2013,Assess
Manual infrastructure management,Mar 2012,Hold
Manual infrastructure management,Jul 2011,Hold
JavaScript micro frameworks,Oct 2012,Trial
JavaScript micro frameworks,Mar 2012,Trial
NPM for all the things,Apr 2016,Trial
NPM for all the things,Nov 2015,Trial
PowerShell,Mar 2012,Trial

So we’re down from 190 seconds to 7 seconds, pretty cool! One interesting thing is that the order of the values in the CSV file will be different since the goroutines won’t necessarily come back in the same order that they were launched. We do end up with the same number of values:

$ wc -l import/blips.csv 
    1361 import/blips.csv
 
$ wc -l import/blipsSingle.csv 
    1361 import/blipsSingle.csv

And we can check that the contents are identical:

$ cat import/blipsSingle.csv  | sort > /tmp/blipsSingle.csv
 
$ cat import/blips.csv  | sort > /tmp/blips.csv
 
$ diff /tmp/blips.csv /tmp/blipsSingle.csv


The code in this post is all on github. I’m sure I’ve made some mistakes/there are ways that this could be done better so do let me know in the comments or I’m @markhneedham on twitter.

Written by Mark Needham

December 24th, 2016 at 10:45 am

Posted in Go


Go: cannot execute binary file: Exec format error


In an earlier blog post I mentioned that I’d been building an internal application to learn a bit of Go and I wanted to deploy it to AWS.

Since the application was only going to live for a couple of days I didn’t want to spend a long time building anything fancy, so my plan was just to build the executable, scp it to my AWS instance, and then run it.

My initial (somewhat naive) approach was to just build the project on my Mac and upload and run it:

$ go build
 
$ scp myapp ubuntu@aws...
 
$ ssh ubuntu@aws...
 
$ ./myapp
-bash: ./myapp: cannot execute binary file: Exec format error

That didn’t go so well! By reading Ask Ubuntu and Dave Cheney’s blog post on cross compilation I realised that I just needed to set the appropriate environment variables before running go build.

The following did the trick:

env GOOS=linux GOARCH=amd64 GOARM=7 go build
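
GOARM only has an effect when targeting ARM, so it’s redundant alongside GOARCH=amd64, though it does no harm. A quick way to sanity check the result before uploading it is the file command, whose output for a build like this should mention something like “ELF 64-bit LSB executable, x86-64”:

$ file myapp
myapp: ELF 64-bit LSB executable, x86-64, ...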

And that’s it! I’m sure there are more sophisticated ways of doing this that I’ll come to learn about, but for now this worked for me.

Written by Mark Needham

December 23rd, 2016 at 6:24 pm

Posted in Go


Go: Templating with the Gin Web Framework


I spent a bit of time over the last week building a little internal web application using Go and the Gin Web Framework and it took me a while to get the hang of the templating language so I thought I’d write up some examples.

Before we get started, I’ve got my GOPATH set to the following path:

$ echo $GOPATH
/Users/markneedham/projects/gocode

And the project containing the examples sits inside the src directory:

$ pwd
/Users/markneedham/projects/gocode/src/github.com/mneedham/golang-gin-templating-demo

Let’s first install Gin:

$ go get gopkg.in/gin-gonic/gin.v1

It gets installed here:

$ ls -lh $GOPATH/src/gopkg.in
total 0
drwxr-xr-x   3 markneedham  staff   102B 23 Dec 10:55 gin-gonic

Now let’s create a main function to launch our web application:

demo.go

package main
 
import (
	"github.com/gin-gonic/gin"
	"net/http"
)
 
func main() {
	router := gin.Default()
	router.LoadHTMLGlob("templates/*")
 
	// our handlers will go here
 
	router.Run("0.0.0.0:9090")
}

We’re launching our application on port 9090 and the templates live in the templates directory which is located relative to the file containing the main function:

$ ls -lh
total 8
-rw-r--r--  1 markneedham  staff   570B 23 Dec 13:34 demo.go
drwxr-xr-x  4 markneedham  staff   136B 23 Dec 13:34 templates

Arrays

Let’s create a route which will display the values of an array in an unordered list:

	router.GET("/array", func(c *gin.Context) {
		var values []int
		for i := 0; i < 5; i++ {
			values = append(values, i)
		}
 
		c.HTML(http.StatusOK, "array.tmpl", gin.H{"values": values})
	})

templates/array.tmpl

<ul>
  {{ range .values }}
  <li>{{ . }}</li>
  {{ end }}
</ul>

And now we’ll cURL our application to see what we get back:

$ curl http://localhost:9090/array
<ul>
  <li>0</li>
  <li>1</li>
  <li>2</li>
  <li>3</li>
  <li>4</li>
</ul>

What about if we have an array of structs instead of just strings?

import "strconv"
 
type Foo struct {
	Value1 int
	Value2 string
}
 
	router.GET("/arrayStruct", func(c *gin.Context) {
		var values []Foo
		for i := 0; i < 5; i++ {
			values = append(values, Foo{Value1: i, Value2: "value " + strconv.Itoa(i)})
		}
 
		c.HTML(http.StatusOK, "arrayStruct.tmpl", gin.H{"values": values})
	})

templates/arrayStruct.tmpl

<ul>
  {{ range .values }}
  <li>{{ .Value1 }} -> {{ .Value2 }}</li>
  {{ end }}
</ul>

cURL time:

$ curl http://localhost:9090/arrayStruct
<ul>
  <li>0 -> value 0</li>
  <li>1 -> value 1</li>
  <li>2 -> value 2</li>
  <li>3 -> value 3</li>
  <li>4 -> value 4</li>  
</ul>

Maps

Now let’s do the same for maps.

	router.GET("/map", func(c *gin.Context) {
		values := make(map[string]string)
		values["language"] = "Go"
		values["version"] = "1.7.4"
 
		c.HTML(http.StatusOK, "map.tmpl", gin.H{"myMap": values})
	})

templates/map.tmpl

<ul>
  {{ range .myMap }}
  <li>{{ . }}</li>
  {{ end }}
</ul>

And cURL it:

$ curl http://localhost:9090/map
<ul>
  <li>Go</li>
  <li>1.7.4</li>
</ul>

What if we want to see the keys as well?

	router.GET("/mapKeys", func(c *gin.Context) {
		values := make(map[string]string)
		values["language"] = "Go"
		values["version"] = "1.7.4"
 
		c.HTML(http.StatusOK, "mapKeys.tmpl", gin.H{"myMap": values})
	})

templates/mapKeys.tmpl

<ul>
  {{ range $key, $value := .myMap }}
  <li>{{ $key }} -> {{ $value }}</li>
  {{ end }}
</ul>

$ curl http://localhost:9090/mapKeys
<ul>  
  <li>language -> Go</li>  
  <li>version -> 1.7.4</li>  
</ul>

And finally, what if we want to select specific values from the map?

	router.GET("/mapSelectKeys", func(c *gin.Context) {
		values := make(map[string]string)
		values["language"] = "Go"
		values["version"] = "1.7.4"
 
		c.HTML(http.StatusOK, "mapSelectKeys.tmpl", gin.H{"myMap": values})
	})

templates/mapSelectKeys.tmpl

<ul>
  <li>Language: {{ .myMap.language }}</li>
  <li>Version: {{ .myMap.version }}</li>
</ul>

$ curl http://localhost:9090/mapSelectKeys
<ul>
  <li>Language: Go</li>
  <li>Version: 1.7.4</li>
</ul>

I’ve found the Hugo Go Template Primer helpful for figuring this out so that’s a good reference if you get stuck. You can find a go file containing all the examples on github if you want to use that as a starting point.

Written by Mark Needham

December 23rd, 2016 at 2:30 pm

Posted in Go


scikit-learn: First steps with log_loss


Over the last week I’ve spent a little bit of time playing around with the data in the Kaggle TalkingData Mobile User Demographics competition, and came across a notebook written by dune_dweller showing how to run a logistic regression algorithm on the dataset.

The metric used to evaluate the output in this competition is multi-class logarithmic loss, which is implemented by the log_loss function in the scikit-learn library.

I’ve not used it before so I created a small example to get to grips with it.

Let’s say we have 3 rows to predict and we happen to know that they should be labelled ‘bam’, ‘ham’, and ‘spam’ respectively:

>>> actual_labels = ["bam", "ham", "spam"]


To work out the log loss score we need to make a prediction for what we think each label actually is. We do this by passing an array containing a probability between 0 and 1 for each label, e.g. if we think the first label is definitely ‘bam’ then we’d pass [1, 0, 0], whereas if we thought it had a 50-50 chance of being ‘bam’ or ‘spam’ then we might pass [0.5, 0, 0.5]. As far as I can tell the classes get sorted into alphabetical order, so we need to provide our predictions in that order.

Let’s give it a try. First we’ll import the function:

>>> from sklearn.metrics import log_loss

Now let’s see what score we get if we make a perfect prediction:

>>> log_loss(actual_labels,  [[1, 0, 0], [0, 1, 0], [0, 0, 1]])
2.1094237467877998e-15

What about if we make a completely wrong prediction?

>>> log_loss(actual_labels,  [[0, 0, 1], [1, 0, 0], [0, 1, 0]])
34.538776394910684

We can reverse engineer this score to work out the probability that we’ve predicted the correct class.

If we look at the case where the average log loss exceeds 1, that happens when log(pij) < -1, where j is the true class. This means that the predicted probability for that class would be less than exp(-1), or around 0.368. So seeing a log loss greater than one is to be expected whenever your model gives less than a 36.8% probability estimate for the correct class.
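
We can double-check that constant (my arithmetic, not from the post):

>>> import math
>>> math.exp(-1)
0.36787944117144233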

This is the formula for log loss:

$$\text{logloss} = -\frac{1}{N}\sum_{i=1}^{N}\sum_{j=1}^{M} y_{ij} \log(p_{ij})$$

Here yij is 1 when j is the correct class for row i and 0 otherwise, and pij is the probability assigned to that class.

The interesting thing about this formula is that we only care about the correct class. The yij value of 0 cancels out the wrong classes.

In our two examples so far we actually already know the probability estimate for the correct class – 100% in the first case and 0% in the second case, but we can plug in the numbers to check we end up with the same result.

First we need to work out what value would have been passed to the log function, which is easy in this case: the probability pij assigned to the correct class was 1 when every prediction was exactly right, and effectively 0 when every prediction was completely wrong:

# every prediction exactly right
>>> math.log(1)
0.0
 
>>> math.exp(0)
1.0
# every prediction completely wrong
>>> math.log(0.000000001)
-20.72326583694641
 
>>> math.exp(-20.72326583694641)
1.0000000000000007e-09

I used a really small value instead of 0 in the second example because log tends towards negative infinity as its argument approaches 0 (and math.log(0) itself raises a ValueError).

Let’s try another example where we have less certainty:

>>> print log_loss(actual_labels, [[0.8, 0.1, 0.1], [0.3, 0.6, 0.1], [0.15, 0.15, 0.7]])
0.363548039673

We’ll have to do a bit more work to figure out what value was being passed to the log function this time, but not too much. This is roughly the calculation being performed:

# 0.363548039673 = -1/3 * (log(0.8) + log(0.6) + log(0.7))
 
>>> print log_loss(actual_labels,  [[0.8, 0.1, 0.1], [0.3, 0.6, 0.1], [0.15, 0.15, 0.7]])
0.363548039673
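
Evaluating that expression directly confirms it (again my arithmetic, not the post’s):

>>> import math
>>> round(-1.0/3 * (math.log(0.8) + math.log(0.6) + math.log(0.7)), 12)
0.363548039673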

In this case, on average our probability estimate would be:

# we put in the negative value since we multiplied by -1/N
>>> math.exp(-0.363548039673)
0.6952053289772744

We gave the correct classes probability estimates of 80%, 60%, and 70% for our 3 labels, so an overall estimate of 69.5% seems about right.

One more example. This time we’ll make one more very certain (90%) prediction for ‘spam’:

>>> print log_loss(["bam", "ham", "spam", "spam"], [[0.8, 0.1, 0.1], [0.3, 0.6, 0.1], [0.15, 0.15, 0.7], [0.05, 0.05, 0.9]])
0.299001158669
 
>>> math.exp(-0.299001158669)
0.741558550213609

74% accuracy overall, sounds about right!

Written by Mark Needham

September 14th, 2016 at 5:33 am

Posted in Machine Learning,Python
