Mark Needham

Thoughts on Software Development

AWS Lambda: Encrypted environment variables


Continuing on from my post showing how to create a ‘Hello World’ AWS Lambda function, I wanted to pass encrypted environment variables to my function.

The following function takes in both an encrypted and unencrypted variable and prints them out.

Don’t print out decrypted values in a real function; this is just so we can see the example working!

import boto3
import os
 
from base64 import b64decode
 
def lambda_handler(event, context):
    encrypted = os.environ['ENCRYPTED_VALUE']
    decrypted = boto3.client('kms').decrypt(CiphertextBlob=b64decode(encrypted))['Plaintext']
 
    # Don't print out your decrypted value in a real function! This is just to show how it works.
    print("Decrypted value:", decrypted)
 
    plain_text = os.environ["PLAIN_TEXT_VALUE"]
    print("Plain text:", plain_text)

Now we’ll zip up our function into HelloWorldEncrypted.zip, ready to send to AWS.

zip HelloWorldEncrypted.zip HelloWorldEncrypted.py

Now it’s time to upload our function to AWS and create the associated environment variables.

If you’re using a Python editor then you’ll need to install boto3 locally to keep the editor happy, but you don’t need to include boto3 in the code you send to AWS Lambda – it comes pre-installed.

Now we write the following code to automate the creation of our Lambda function:

import boto3
from base64 import b64encode
 
fn_name = "HelloWorldEncrypted"
kms_key = "arn:aws:kms:[aws-zone]:[your-aws-id]:key/[your-kms-key-id]"
fn_role = 'arn:aws:iam::[your-aws-id]:role/lambda_basic_execution'
 
lambda_client = boto3.client('lambda')
kms_client = boto3.client('kms')
 
encrypt_me = "abcdefg"
encrypted = b64encode(kms_client.encrypt(Plaintext=encrypt_me, KeyId=kms_key)["CiphertextBlob"])
 
plain_text = 'hijklmno'
 
lambda_client.create_function(
        FunctionName=fn_name,
        Runtime='python2.7',
        Role=fn_role,
        Handler="{0}.lambda_handler".format(fn_name),
        Code={ 'ZipFile': open("{0}.zip".format(fn_name), 'rb').read(),},
        Environment={
            'Variables': {
                'ENCRYPTED_VALUE': encrypted,
                'PLAIN_TEXT_VALUE': plain_text,
            }
        },
        KMSKeyArn=kms_key
)

The tricky bit for me here was figuring out that I needed to base64 encode the output of the KMS client’s encrypt call before passing it in as the environment variable value. The KMS client relies on a KMS key that we need to set up. We can see a list of all our KMS keys by running the following command:

$ aws kms list-keys

The format of these keys is arn:aws:kms:[zone]:[account-id]:key/[key-id].
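
If you’d rather look the ARN up from code than copy/paste it, the KMS client exposes the same listing via boto3. A minimal sketch – purely illustrative, the creation script above still expects the real ARN to be filled in:

import boto3

kms_client = boto3.client('kms')

# each entry in 'Keys' contains a 'KeyId' and the full 'KeyArn'
for key in kms_client.list_keys()["Keys"]:
    print(key["KeyArn"])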

Now let’s run that script to create our Lambda function:

$ python CreateHelloWorldEncrypted.py

Let’s check it got created:

$ aws lambda list-functions --query "Functions[*].FunctionName"
[
    "HelloWorldEncrypted", 
]

And now let’s execute the function:

$ aws lambda invoke --function-name HelloWorldEncrypted --invocation-type RequestResponse --log-type Tail /tmp/out | jq ".LogResult"
"U1RBUlQgUmVxdWVzdElkOiA5YmNlM2E1MC0xODMwLTExZTctYjFlNi1hZjQxZDYzMzYxZDkgVmVyc2lvbjogJExBVEVTVAooJ0RlY3J5cHRlZCB2YWx1ZTonLCAnYWJjZGVmZycpCignUGxhaW4gdGV4dDonLCAnaGlqa2xtbm8nKQpFTkQgUmVxdWVzdElkOiA5YmNlM2E1MC0xODMwLTExZTctYjFlNi1hZjQxZDYzMzYxZDkKUkVQT1JUIFJlcXVlc3RJZDogOWJjZTNhNTAtMTgzMC0xMWU3LWIxZTYtYWY0MWQ2MzM2MWQ5CUR1cmF0aW9uOiAzNjAuMDQgbXMJQmlsbGVkIER1cmF0aW9uOiA0MDAgbXMgCU1lbW9yeSBTaXplOiAxMjggTUIJTWF4IE1lbW9yeSBVc2VkOiAyNCBNQgkK"

That’s a bit hard to read, so some decoding is needed:

$ echo "U1RBUlQgUmVxdWVzdElkOiA5YmNlM2E1MC0xODMwLTExZTctYjFlNi1hZjQxZDYzMzYxZDkgVmVyc2lvbjogJExBVEVTVAooJ0RlY3J5cHRlZCB2YWx1ZTonLCAnYWJjZGVmZycpCignUGxhaW4gdGV4dDonLCAnaGlqa2xtbm8nKQpFTkQgUmVxdWVzdElkOiA5YmNlM2E1MC0xODMwLTExZTctYjFlNi1hZjQxZDYzMzYxZDkKUkVQT1JUIFJlcXVlc3RJZDogOWJjZTNhNTAtMTgzMC0xMWU3LWIxZTYtYWY0MWQ2MzM2MWQ5CUR1cmF0aW9uOiAzNjAuMDQgbXMJQmlsbGVkIER1cmF0aW9uOiA0MDAgbXMgCU1lbW9yeSBTaXplOiAxMjggTUIJTWF4IE1lbW9yeSBVc2VkOiAyNCBNQgkK" | base64 --decode
START RequestId: 9bce3a50-1830-11e7-b1e6-af41d63361d9 Version: $LATEST
('Decrypted value:', 'abcdefg')
('Plain text:', 'hijklmno')
END RequestId: 9bce3a50-1830-11e7-b1e6-af41d63361d9
REPORT RequestId: 9bce3a50-1830-11e7-b1e6-af41d63361d9	Duration: 360.04 ms	Billed Duration: 400 ms 	Memory Size: 128 MB	Max Memory Used: 24 MB

And it worked, hoorah!

Written by Mark Needham

April 3rd, 2017 at 5:49 am

Posted in Software Development


AWS Lambda: Programmatically create a Python ‘Hello World’ function


I’ve been playing around with AWS Lambda over the last couple of weeks and I wanted to automate the creation of these functions and all their surrounding config.

Let’s say we have the following Hello World function:

def lambda_handler(event, context):
    print("Hello world")

To upload it to AWS we need to put it inside a zip file so let’s do that:

$ zip HelloWorld.zip HelloWorld.py
$ unzip -l HelloWorld.zip 
Archive:  HelloWorld.zip
  Length     Date   Time    Name
 --------    ----   ----    ----
       61  04-02-17 22:04   HelloWorld.py
 --------                   -------
       61                   1 file

Now we’re ready to write a script to create our AWS lambda function.

import boto3
 
lambda_client = boto3.client('lambda')
 
fn_name = "HelloWorld"
fn_role = 'arn:aws:iam::[your-aws-id]:role/lambda_basic_execution'
 
lambda_client.create_function(
    FunctionName=fn_name,
    Runtime='python2.7',
    Role=fn_role,
    Handler="{0}.lambda_handler".format(fn_name),
    Code={'ZipFile': open("{0}.zip".format(fn_name), 'rb').read(), },
)

[your-aws-id] needs to be replaced with the identifier of our AWS account. We can find that out by running the following command with the AWS CLI:

$ aws ec2 describe-security-groups --query 'SecurityGroups[0].OwnerId' --output text
123456789012
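
Another option, sketched here with boto3 rather than the CLI, is to ask the STS API who we are – the account id comes back in the Account field of the response:

import boto3

# GetCallerIdentity returns the account id, user id, and ARN of the caller
print(boto3.client('sts').get_caller_identity()["Account"])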

Now we can create our function:

$ python CreateHelloWorld.py


And if we test the function we’ll see the expected “Hello world” output.

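One way to test it without the console is to invoke the function with boto3 and decode the tail of its execution log. A sketch, assuming the function above has already been created:

import boto3
from base64 import b64decode

lambda_client = boto3.client('lambda')

response = lambda_client.invoke(
    FunctionName="HelloWorld",
    InvocationType='RequestResponse',
    LogType='Tail'
)

# the last few KB of the execution log come back base64 encoded
print(b64decode(response["LogResult"]))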

Written by Mark Needham

April 2nd, 2017 at 10:11 pm

Posted in Software Development


My top 10 technology podcasts


For the last six months I’ve been listening to 2 or 3 technology podcasts every day while out running and on my commute and I thought it’d be cool to share some of my favourites.

I listen to all of these on the Podbean android app which seems pretty good. It can’t read the RSS feeds of some podcasts but other than that it’s worked well.

Anyway, on with the podcasts:

Software Engineering Daily

This is the most reliable of all the podcasts I’ve listened to and a new episode is posted every weekday.

It sweeps across lots of different areas of technology – there’s a bit of software development, a bit of data engineering, and a bit of infrastructure.

Every now and then there’s a focus on a particular topic area or company which I find really interesting e.g. in 2015 there was a week of Bitcoin focused episodes and more recently there’s been a bunch of episodes about Stripe.

Partially Derivative

This one is more of a data science podcast and covers lots of different areas in that space, but thankfully keeps the conversation at a level that a non data scientist like me can understand.

I especially liked the post US election episode where they talked about the problems with polling and how most election predictions had ended up being wrong.

There’s roughly one new episode a week.

O’Reilly Bots podcast

I didn’t know anything about bots before I listened to this podcast and it was quite addictive – I powered through all the episodes in a few weeks.

They cover all sorts of topics that I’d have never thought of – why have developers got interested in bots? How do UIs differ to ones in apps? How do users find out about bots?

I really enjoy listening to this one but it’s been a bit quiet recently.

Datanauts

I found this one really useful for getting the hang of infrastructure topics. I wanted to learn a bit more about Kubernetes a few months ago and they had an episode which gives an overview as well as more detailed episodes.

One neat feature of this podcast is that after each part of an interview the hosts summarise what they picked up from that segment. I like that it gives you a few seconds to think about what you picked up and whether it matches the summary.

Some of the episodes go really deep into specific infrastructure topics and I struggle to follow along but there are enough other ones to keep me happy.

Becoming a Data Scientist

This one mirrors the journey of Renee Teate getting into data science and bringing everyone along on the journey.

Each episode is paired with a learning exercise for the listener to try, and although I haven’t done any of the learning exercises yet I like how some interviews are structured around them, e.g. Sebastian Raschka was interviewed about model accuracy in the week that topic was being explored in the learning club.

If you’re interested in data science topics but aren’t a data scientist yourself this is a good one to listen to.

This Week In Machine Learning and AI Podcast

This one mostly goes well over my head but it’s still interesting to listen to other people talk about stuff they’re working on.

There’s a lot of focus on Deep Learning so I think I need to learn a bit more about that and then the episodes will make more sense.

The last episode with Evan Wright was much more accessible. I need more like that one!

The Women in Tech Show

I came across Edaena Salinas on Software Engineering Daily and didn’t initially realise that Edaena had a podcast until a couple of weeks ago.

There’s lots of interesting content on this one. The episodes on data driven marketing and unconscious bias are my favourites of the ones I’ve listened to so far.

The Bitcoin Podcast

I listened to a few shows about bitcoin on Software Engineering Daily and found this podcast while trying to learn more.

Some of the episodes are general enough that I can follow along but others use a lot of blockchain-specific terminology that leaves me feeling a bit lost.

I especially liked the episode that featured Greg Walker of learnmeabitcoin fame. Greg uses Neo4j as part of the website and presented at the London Neo4j meetup earlier this week.

Go Time

This one has a chat based format that I really like. They have a cool section called ‘free software Friday’ at the end of each show where everybody calls out a piece of software or maintainer that they’re grateful for.

I was playing around with Go in November/December last year so it was really helpful in pointing me in the right direction. I haven’t done any Go recently so it’s more a general interest show for now.

Change Log

This one covers lots of different topics, mostly around different open source projects.

The really cool thing about this one is they get every guest to explain their ‘origin story’ i.e. how did they get into software and what was their path to the current job. The interview with Nathan Sobo about Atom was particularly good in this respect.

It’s always interesting to hear how other people got started and contrast it with my own experiences.

Another cool feature of this podcast is that they sometimes have episodes where they interview people at open source conferences.

That’s it folks

That’s all for now. Hopefully there’s one or more in there that you haven’t listened to before.

If you’ve got any suggestions for other ones I should listen to, let me know in the comments or send me a message on twitter @markhneedham.

Written by Mark Needham

March 30th, 2017 at 10:38 pm

Luigi: Defining dynamic requirements (on output files)


In my last blog post I showed how to convert a JSON document containing meetup groups into a CSV file using Luigi, the Python library for building data pipelines. As well as creating that CSV file I wanted to go back to the meetup.com API and download all the members of those groups.

This was a rough flow of what I wanted to do:

  • Take JSON document containing all groups
  • Parse that document and for each group:
    • Call the /members endpoint
    • Save each one of those files as a JSON file
  • Iterate over all those JSON files and create a members CSV file

In the previous post we created the GroupsToJSON task which calls the /groups endpoint on the meetup API and creates the file /tmp/groups.json.

Our new task has that as its initial requirement:

class MembersToCSV(luigi.Task):
    key = luigi.Parameter()
    lat = luigi.Parameter()
    lon = luigi.Parameter()
 
    def requires(self):
        yield GroupsToJSON(self.key, self.lat, self.lon)

But we also want to create a requirement on a task that will make those calls to the /members endpoint and store the result in a JSON file.

One of the patterns that Luigi imposes on us is that each task should only create one file, so we actually have a requirement on a collection of tasks rather than just one. It took me a little while to get my head around that!

We don’t know the parameters of those tasks at compile time – we can only calculate them by parsing the JSON file produced by GroupsToJSON.

In Luigi terminology what we want to create is a dynamic requirement. A dynamic requirement is defined inside the run method of a task and can rely on the output of any tasks specified in the requires method, which is exactly what we need.

This code does the delegating part of the job:

class MembersToCSV(luigi.Task):
    key = luigi.Parameter()
    lat = luigi.Parameter()
    lon = luigi.Parameter()
 
 
    def run(self):
        outputs = []
        for input in self.input():
            with input.open('r') as group_file:
                groups_json = json.load(group_file)
                groups = [str(group['id']) for group in groups_json]
 
 
                for group_id in groups:
                    members = MembersToJSON(group_id, self.key)
                    outputs.append(members.output().path)
                    yield members
 
 
    def requires(self):
        yield GroupsToJSON(self.key, self.lat, self.lon)

Inside our run method we iterate over the output of GroupsToJSON (which is our input), yield a MembersToJSON task for each group, and collect each of those tasks’ output paths in the outputs list so we can use them later.

MembersToJSON looks like this:

class MembersToJSON(luigi.Task):
    group_id = luigi.IntParameter()
    key = luigi.Parameter()
 
 
    def run(self):
        results = []
        uri = "https://api.meetup.com/2/members?&group_id={0}&key={1}".format(self.group_id, self.key)
        while True:
            if uri is None:
                break
            r = requests.get(uri)
            response = r.json()
            for result in response["results"]:
                results.append(result)
            uri = response["meta"]["next"] if response["meta"]["next"] else None
 
 
        with self.output().open("w") as output:
            json.dump(results, output)
 
    def output(self):
        return luigi.LocalTarget("/tmp/members/{0}.json".format(self.group_id))

This task generates one file per group containing a list of all the members of that group.

We can now go back to MembersToCSV and convert those JSON files into a single CSV file:

class MembersToCSV(luigi.Task):
    out_path = "/tmp/members.csv"
    key = luigi.Parameter()
    lat = luigi.Parameter()
    lon = luigi.Parameter()
 
 
    def run(self):
        outputs = []
        for input in self.input():
            with input.open('r') as group_file:
                groups_json = json.load(group_file)
                groups = [str(group['id']) for group in groups_json]
 
 
                for group_id in groups:
                    members = MembersToJSON(group_id, self.key)
                    outputs.append(members.output().path)
                    yield members
 
        with self.output().open("w") as output:
            writer = csv.writer(output, delimiter=",")
            writer.writerow(["id", "name", "joined", "topics", "groupId"])
 
            for path in outputs:
                group_id = path.split("/")[-1].replace(".json", "")
                with open(path) as json_data:
                    d = json.load(json_data)
                    for member in d:
                        topic_ids = ";".join([str(topic["id"]) for topic in member["topics"]])
                        if "name" in member:
                            writer.writerow([member["id"], member["name"], member["joined"], topic_ids, group_id])
 
    def output(self):
        return luigi.LocalTarget(self.out_path)
 
    def requires(self):
        yield GroupsToJSON(self.key, self.lat, self.lon)

We then just need to add our new task as a requirement of the wrapper task:
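
Based on the Meetup wrapper task from the previous post, a sketch of what that looks like – the only assumption is that MembersToCSV takes the same key/lat/lon parameters, and the only new line is the extra yield:

import os

class Meetup(luigi.WrapperTask):
    def run(self):
        print("Running Meetup")

    def requires(self):
        key = os.environ['MEETUP_API_KEY']
        lat = os.getenv('LAT', "51.5072")
        lon = os.getenv('LON', "0.1275")

        yield GroupsToCSV(key, lat, lon)
        # the new requirement
        yield MembersToCSV(key, lat, lon)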

And we’re ready to roll:

$ PYTHONPATH="." luigi --module blog --local-scheduler Meetup --workers 3

We’ve defined the number of workers here as we can execute those calls to the /members endpoint in parallel and there are ~ 600 calls to make.

All the code from both blog posts is available as a gist if you want to play around with it.

Any questions/advice let me know in the comments or I’m @markhneedham on twitter.

Written by Mark Needham

March 28th, 2017 at 5:39 am

Posted in Python


Luigi: An ExternalProgramTask example – Converting JSON to CSV


I’ve been playing around with the Python library Luigi, which is used to build pipelines of batch jobs, and I struggled to find an example of an ExternalProgramTask, so this is my attempt at filling that void.

Luigi - the Python data library for building data science pipelines

I’m building a little data pipeline to get data from the meetup.com API and put it into CSV files that can be loaded into Neo4j using the LOAD CSV command.

The first task I created calls the /groups endpoint and saves the result into a JSON file:

import luigi
import requests
import json
from collections import Counter
 
class GroupsToJSON(luigi.Task):
    key = luigi.Parameter()
    lat = luigi.Parameter()
    lon = luigi.Parameter()
 
    def run(self):
        seed_topic = "nosql"
        uri = "https://api.meetup.com/2/groups?&topic={0}&lat={1}&lon={2}&key={3}".format(seed_topic, self.lat, self.lon, self.key)
 
        r = requests.get(uri)
        all_topics = [topic["urlkey"]  for result in r.json()["results"] for topic in result["topics"]]
        c = Counter(all_topics)
 
        topics = [entry[0] for entry in c.most_common(10)]
 
        groups = {}
        for topic in topics:
            uri = "https://api.meetup.com/2/groups?&topic={0}&lat={1}&lon={2}&key={3}".format(topic, self.lat, self.lon, self.key)
            r = requests.get(uri)
            for group in r.json()["results"]:
                groups[group["id"]] = group
 
        with self.output().open('w') as groups_file:
            json.dump(list(groups.values()), groups_file, indent=4, sort_keys=True)
 
    def output(self):
        return luigi.LocalTarget("/tmp/groups.json")

We define a few parameters at the top of the class which will be passed in when this task is executed. The most interesting lines of the run function are the last couple where we write the JSON to a file. self.output() refers to the target defined in the output function which in this case is /tmp/groups.json.

Now we need to create a task to convert that JSON file into CSV format. The jq command line tool does this job well so we’ll use that. The following task does the job:

from luigi.contrib.external_program import ExternalProgramTask
 
class GroupsToCSV(ExternalProgramTask):
    file_path = "/tmp/groups.csv"
    key = luigi.Parameter()
    lat = luigi.Parameter()
    lon = luigi.Parameter()
 
    def program_args(self):
        return ["./groups.sh", self.input()[0].path, self.output().path]
 
    def output(self):
        return luigi.LocalTarget(self.file_path)
 
    def requires(self):
        yield GroupsToJSON(self.key, self.lat, self.lon)

groups.sh

#!/bin/bash
 
in=${1}
out=${2}
 
echo "id,name,urlname,link,rating,created,description,organiserName,organiserMemberId" > ${out}
jq -r '.[] | [.id, .name, .urlname, .link, .rating, .created, .description, .organizer.name, .organizer.member_id] | @csv' ${in} >> ${out}

I wanted to call jq directly from the Python code but I couldn’t figure out how to do it so putting that code in a shell script is my workaround.
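
For what it’s worth, one way to call jq from Python is to shell out to it with the subprocess module. A rough sketch of the same transformation, assuming jq is on the PATH – this is just the shell script translated, not what the task above uses:

import subprocess

def groups_to_csv(in_path, out_path):
    jq_filter = '.[] | [.id, .name, .urlname, .link, .rating, .created, .description, .organizer.name, .organizer.member_id] | @csv'
    with open(out_path, "w") as out_file:
        # write the header row, then let jq append the data rows
        out_file.write("id,name,urlname,link,rating,created,description,organiserName,organiserMemberId\n")
        out_file.flush()
        subprocess.check_call(["jq", "-r", jq_filter, in_path], stdout=out_file)

groups_to_csv("/tmp/groups.json", "/tmp/groups.csv")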

The last piece of the puzzle is a wrapper task that launches the others:

import os
 
class Meetup(luigi.WrapperTask):
    def run(self):
        print("Running Meetup")
 
    def requires(self):
        key = os.environ['MEETUP_API_KEY']
        lat = os.getenv('LAT', "51.5072")
        lon = os.getenv('LON', "0.1275")
 
        yield GroupsToCSV(key, lat, lon)

Now we’re ready to run the tasks:

$ PYTHONPATH="." luigi --module blog --local-scheduler Meetup
DEBUG: Checking if Meetup() is complete
DEBUG: Checking if GroupsToCSV(key=xxx, lat=51.5072, lon=0.1275) is complete
INFO: Informed scheduler that task   Meetup__99914b932b   has status   PENDING
DEBUG: Checking if GroupsToJSON(key=xxx, lat=51.5072, lon=0.1275) is complete
INFO: Informed scheduler that task   GroupsToCSV_xxx_51_5072_0_1275_e07372cebf   has status   PENDING
INFO: Informed scheduler that task   GroupsToJSON_xxx_51_5072_0_1275_e07372cebf   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 4452] Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) running   GroupsToJSON(key=xxx, lat=51.5072, lon=0.1275)
INFO: [pid 4452] Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) done      GroupsToJSON(key=xxx, lat=51.5072, lon=0.1275)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   GroupsToJSON_xxx_51_5072_0_1275_e07372cebf   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 4452] Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) running   GroupsToCSV(key=xxx, lat=51.5072, lon=0.1275)
INFO: Running command: ./groups.sh /tmp/groups.json /tmp/groups.csv
INFO: [pid 4452] Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) done      GroupsToCSV(key=xxx, lat=51.5072, lon=0.1275)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   GroupsToCSV_xxx_51_5072_0_1275_e07372cebf   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 4452] Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) running   Meetup()
Running Meetup
INFO: [pid 4452] Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) done      Meetup()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Meetup__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=970508581, workers=1, host=Marks-MBP-4, username=markneedham, pid=4452) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====
 
Scheduled 3 tasks of which:
* 3 ran successfully:
    - 1 GroupsToCSV(key=xxx, lat=51.5072, lon=0.1275)
    - 1 GroupsToJSON(key=xxx, lat=51.5072, lon=0.1275)
    - 1 Meetup()
 
This progress looks :) because there were no failed tasks or missing external dependencies
 
===== Luigi Execution Summary =====

Looks good! Let’s quickly look at our CSV file:

$ head -n10 /tmp/groups.csv 
id,name,urlname,link,rating,created,description,organiserName,organiserMemberId
1114381,"London NoSQL, MySQL, Open Source Community","london-nosql-mysql","https://www.meetup.com/london-nosql-mysql/",4.28,1208505614000,"<p>Meet others in London interested in NoSQL, MySQL, and Open Source Databases.</p>","Sinead Lawless",185675230
1561841,"Enterprise Search London Meetup","es-london","https://www.meetup.com/es-london/",4.66,1259157419000,"<p>Enterprise Search London is a meetup for anyone interested in building search and discovery experiences — from intranet search and site search, to advanced discovery applications and beyond.</p>
<p>Disclaimer: This meetup is NOT about SEO or search engine marketing.</p>
<p><strong>What people are saying:</strong></p>
<ul>
<li><span>""Join this meetup if you have a passion for enterprise search and user experience that you would like to share with other able-minded practitioners."" — Vegard Sandvold</span></li>
<li><span>""Full marks for vision and execution. Looking forward to the next Meetup."" — Martin White</span></li>
<li><span>“Consistently excellent” — Helen Lippell</span></li>
</ul>

Sweet! And what if we run it again?

$ PYTHONPATH="." luigi --module blog --local-scheduler Meetup
DEBUG: Checking if Meetup() is complete
INFO: Informed scheduler that task   Meetup__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=172768377, workers=1, host=Marks-MBP-4, username=markneedham, pid=4531) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====
 
Scheduled 1 tasks of which:
* 1 present dependencies were encountered:
    - 1 Meetup()
 
Did not run any tasks
This progress looks :) because there were no failed tasks or missing external dependencies
 
===== Luigi Execution Summary =====

As expected nothing happens since our dependencies are already satisfied and we have our first Luigi pipeline up and running.

Written by Mark Needham

March 25th, 2017 at 2:09 pm

Posted in Python


Python 3: TypeError: Object of type ‘dict_values’ is not JSON serializable


I’ve recently upgraded to Python 3 (I know, took me a while!) and realised that one of my scripts that writes JSON to a file no longer works!

This is a simplified version of what I’m doing:

>>> import json
>>> x = {"mark": {"name": "Mark"}, "michael": {"name": "Michael"}  } 
>>> json.dumps(x.values())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/encoder.py", line 180, in default
    o.__class__.__name__)
TypeError: Object of type 'dict_values' is not JSON serializable

Python 2.7 would be perfectly happy:

>>> json.dumps(x.values())
'[{"name": "Michael"}, {"name": "Mark"}]'

The difference is in the results returned by the values method:

# Python 2.7.10
>>> x.values()
[{'name': 'Michael'}, {'name': 'Mark'}]
 
# Python 3.6.0
>>> x.values()
dict_values([{'name': 'Mark'}, {'name': 'Michael'}])
>>>

Python 3 no longer returns a list; instead we get a dict_values view wrapped around the data.

Luckily this is easy to resolve – we just need to wrap the call to values with a call to list:

>>> json.dumps(list(x.values()))
'[{"name": "Mark"}, {"name": "Michael"}]'

This version works with Python 2.7 as well, so if I accidentally run the script with an old version the world isn’t going to explode.
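
Another option, if there are lots of call sites to chase down, is to give json.dumps a default function to fall back on for anything it can’t serialise. A small sketch – not something the original script needed, just an alternative:

>>> json.dumps(x.values(), default=list)
'[{"name": "Mark"}, {"name": "Michael"}]'

default is only consulted for values the encoder can’t handle, so on Python 2.7, where values() already returns a list, it’s simply ignored.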

Written by Mark Needham

March 19th, 2017 at 4:40 pm

Posted in Python


Neo4j: apoc.date.parse – java.lang.IllegalArgumentException: Illegal pattern character ‘T’ / java.text.ParseException: Unparseable date: “2012-11-12T08:46:15Z”


I often find myself wanting to convert date strings into Unix timestamps using Neo4j’s APOC library and unfortunately some sources don’t use the format that apoc.date.parse expects.

e.g.

return apoc.date.parse("2012-11-12T08:46:15Z",'s') 
AS ts
 
Failed to invoke function `apoc.date.parse`: 
Caused by: java.lang.IllegalArgumentException: java.text.ParseException: Unparseable date: "2012-11-12T08:46:15Z"

We need to define the format explicitly, so the SimpleDateFormat documentation comes in handy. I tried the following:

return apoc.date.parse("2012-11-12T08:46:15Z",'s',"yyyy-MM-ddTHH:mm:ssZ") 
AS ts
 
Failed to invoke function `apoc.date.parse`: 
Caused by: java.lang.IllegalArgumentException: Illegal pattern character 'T'

Hmmm, we need to quote the ‘T’ character – we can’t just include it in the pattern. Let’s try again:

return  apoc.date.parse("2012-11-12T08:46:15Z",'s',"yyyy-MM-dd'T'HH:mm:ssZ") 
AS ts
 
Failed to invoke function `apoc.date.parse`: 
Caused by: java.lang.IllegalArgumentException: java.text.ParseException: Unparseable date: "2012-11-12T08:46:15Z"

The problem now is that we haven’t quoted the ‘Z’ but the error doesn’t indicate that – not sure why!

We can either quote the ‘Z’:

return  apoc.date.parse("2012-11-12T08:46:15Z",'s',"yyyy-MM-dd'T'HH:mm:ss'Z'") 
AS ts
 
╒══════════╕
│"ts"      │
╞══════════╡
│1352709975│
└──────────┘

Or we can match the timezone using ‘XXX’:

return  apoc.date.parse("2012-11-12T08:46:15Z",'s',"yyyy-MM-dd'T'HH:mm:ssXXX") 
AS ts
 
╒══════════╕
│"ts"      │
╞══════════╡
│1352709975│
└──────────┘

Written by Mark Needham

March 6th, 2017 at 8:52 pm

Posted in neo4j


Neo4j: Graphing the ‘My name is…I work’ Twitter meme


Over the last few days I’ve been watching the chain of ‘My name is…’ tweets kicked off by DHH with interest. As I understand it, the idea is to show that coding interview riddles/hard tasks on a whiteboard are ridiculous.

Other people quoted that tweet and added their own piece, and yesterday Eduardo Hernacki suggested that traversing this chain of tweets seemed tailor-made for Neo4j.

Michael was quickly on the scene and created a Cypher query which calls the Twitter API and creates a Neo4j graph from the resulting JSON response. The only tricky bit is creating a ‘bearer token’ but Jason Kotchoff has a helpful gist showing how to generate one from your Twitter consumer key and consumer secret.
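
For reference, the app-only auth flow boils down to a single POST request. A rough sketch with the requests library, assuming you have the consumer key and secret to hand – the gist above is the authoritative version:

import requests

consumer_key = "<your-consumer-key>"
consumer_secret = "<your-consumer-secret>"

# exchange the consumer key/secret for an application-only bearer token
response = requests.post(
    "https://api.twitter.com/oauth2/token",
    auth=(consumer_key, consumer_secret),
    data={"grant_type": "client_credentials"})

print(response.json()["access_token"])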

Now that we’ve got our bearer token let’s create a parameter to store it. Type the following in the Neo4j browser:

:param bearer: '<your-bearer-token-goes-here>'

Now we’re ready to query the Twitter API. We’ll start with the search API and find all tweets which contain the text ‘”my name” “I work”‘. That will return a JSON response containing lots of tweets. We’ll then create a node for each tweet it returns, a node for the user who posted the tweet, a node for the tweet it quotes, and relationships to glue them all together.

We’re going to use the apoc.load.jsonParams procedure from the APOC library to help us import the data. If you want to follow along you can use a Neo4j sandbox instance which comes with APOC installed. For your local Neo4j installation, grab the APOC jar and put it into your plugins folder before restarting Neo4j.

This is the query in full:

WITH 'https://api.twitter.com/1.1/search/tweets.json?count=100&result_type=recent&lang=en&q=' as url, {bearer} as bearer
 
CALL apoc.load.jsonParams(url + "%22my%20name%22%20is%22%20%22I%20work%22",{Authorization:"Bearer "+bearer},null) yield value
 
UNWIND value.statuses as status
WITH status, status.user as u, status.entities as e
WHERE status.quoted_status_id is not null
 
// create a node for the original tweet
MERGE (t:Tweet {id:status.id}) 
ON CREATE SET t.text=status.text,t.created_at=status.created_at,t.retweet_count=status.retweet_count, t.favorite_count=status.favorite_count
 
// create a node for the author + a POSTED relationship from the author to the tweet
MERGE (p:User {name:u.screen_name})
MERGE (p)-[:POSTED]->(t)
 
// create a MENTIONED relationship from the tweet to any users mentioned in the tweet
FOREACH (m IN e.user_mentions | MERGE (mu:User {name:m.screen_name}) MERGE (t)-[:MENTIONED]->(mu))
 
// create a node for the quoted tweet and create a QUOTED relationship from the original tweet to the quoted one
MERGE (q:Tweet {id:status.quoted_status_id})
MERGE (t)-[:QUOTED]->(q)
 
// repeat the above steps for the quoted tweet
WITH t as t0, status.quoted_status as status WHERE status is not null
WITH t0, status, status.user as u, status.entities as e
 
MERGE (t:Tweet {id:status.id}) 
ON CREATE SET t.text=status.text,t.created_at=status.created_at,t.retweet_count=status.retweet_count, t.favorite_count=status.favorite_count
 
MERGE (t0)-[:QUOTED]->(t)
 
MERGE (p:User {name:u.screen_name})
MERGE (p)-[:POSTED]->(t)
 
FOREACH (m IN e.user_mentions | MERGE (mu:User {name:m.screen_name}) MERGE (t)-[:MENTIONED]->(mu))
 
MERGE (q:Tweet {id:status.quoted_status_id})
MERGE (t)-[:QUOTED]->(q);

The resulting graph looks like this:

MATCH p=()-[r:QUOTED]->() RETURN p LIMIT 25

[Graph visualization of the tweets and their QUOTED relationships]

A more interesting query would be to find the path from DHH to Eduardo which we can find with the following query:

match path = (dhh:Tweet {id: 834146806594433025})<-[:QUOTED*]-(eduardo:Tweet{id: 836400531983724545})
UNWIND NODES(path) AS tweet
MATCH (tweet)<-[:POSTED]-(user)
RETURN tweet, user

This query:

  • starts from DHH’s tweet
  • traverses all QUOTED relationships until it finds Eduardo’s tweet
  • collects all those tweets and then finds the author
  • returns the tweet and the author

And this is the output:

[Graph visualization of the path from DHH’s tweet to Eduardo’s]

I ran a couple of other queries against the Twitter API to hydrate some nodes that we hadn’t set all the properties on – you can see all the queries on this gist.

For the next couple of days I also have a sandbox running https://10-0-1-157-32898.neo4jsandbox.com/browser/. You can login using the credentials readonly/twitter.

If you have any questions/suggestions let me know in the comments, @markhneedham on twitter, or email the Neo4j DevRel team – devrel@neo4j.com.

Written by Mark Needham

February 28th, 2017 at 3:50 pm

Posted in neo4j


Neo4j: How do null values even work?


Every now and then I find myself wanting to import a CSV file into Neo4j and I always get confused with how to handle the various null values that can lurk within.

Let’s start with an example that doesn’t have a CSV file in sight. Consider the following list and my attempt to only return null values:

WITH [null, "null", "", "Mark"] AS values
UNWIND values AS value
WITH value WHERE value = null
RETURN value
 
(no changes, no records)

Hmm that’s weird. I’d have expected that to at least keep the first value in the collection. What about if we do the inverse?

WITH [null, "null", "", "Mark"] AS values
UNWIND values AS value
WITH value WHERE value <> null
RETURN value
 
(no changes, no records)

Still nothing! Let’s try returning the output of our comparisons rather than filtering rows:

WITH [null, "null", "", "Mark"] AS values
UNWIND values AS value
RETURN value = null AS outcome
 
╒═══════╤═════════╕
│"value"│"outcome"│
╞═══════╪═════════╡
│null   │null     │
├───────┼─────────┤
│"null" │null     │
├───────┼─────────┤
│""     │null     │
├───────┼─────────┤
│"Mark" │null     │
└───────┴─────────┘

Ok so that isn’t what we expected. Everything has an ‘outcome’ of ‘null’! What about if we want to check whether the value is the string “Mark”?

WITH [null, "null", "", "Mark"] AS values
UNWIND values AS value
RETURN value = "Mark" AS outcome
 
╒═══════╤═════════╕
│"value"│"outcome"│
╞═══════╪═════════╡
│null   │null     │
├───────┼─────────┤
│"null" │false    │
├───────┼─────────┤
│""     │false    │
├───────┼─────────┤
│"Mark" │true     │
└───────┴─────────┘

From executing this query we learn that if one side of a comparison is null then the return value is always going to be null.

So how do we exclude a row if it’s null?

It turns out we have to use the ‘is’ keyword rather than using the equality operator. Let’s see what that looks like:

WITH [null, "null", "", "Mark"] AS values
UNWIND values AS value
WITH value WHERE value is null
RETURN value
 
╒═══════╕
│"value"│
╞═══════╡
│null   │
└───────┘

And the positive case:

WITH [null, "null", "", "Mark"] AS values
UNWIND values AS value
WITH value WHERE value is not null
RETURN value
 
╒═══════╕
│"value"│
╞═══════╡
│"null" │
├───────┤
│""     │
├───────┤
│"Mark" │
└───────┘

What if we want to get rid of empty strings?

WITH [null, "null", "", "Mark"] AS values
UNWIND values AS value
WITH value WHERE value <> ""
RETURN value
 
╒═══════╕
│"value"│
╞═══════╡
│"null" │
├───────┤
│"Mark" │
└───────┘

Interestingly that also gets rid of the null value which I hadn’t expected. But if we look for values matching the empty string:

WITH [null, "null", "", "Mark"] AS values
UNWIND values AS value
WITH value WHERE value = ""
RETURN value
 
╒═══════╕
│"value"│
╞═══════╡
│""     │
└───────┘

It’s not there either! Hmm what’s going on here:

WITH [null, "null", "", "Mark"] AS values
UNWIND values AS value
RETURN value, value = "" AS isEmpty, value <> "" AS isNotEmpty
 
╒═══════╤═════════╤════════════╕
│"value"│"isEmpty"│"isNotEmpty"│
╞═══════╪═════════╪════════════╡
│null   │null     │null        │
├───────┼─────────┼────────────┤
│"null" │false    │true        │
├───────┼─────────┼────────────┤
│""     │true     │false       │
├───────┼─────────┼────────────┤
│"Mark" │false    │true        │
└───────┴─────────┴────────────┘

null values seem to get filtered out for every type of equality match unless we explicitly check that a value ‘is null’.

So how do we use this knowledge when we’re parsing CSV files using Neo4j’s LOAD CSV tool?

Let’s say we have a CSV file that looks like this:

$ cat nulls.csv
name,company
"Mark",
"Michael",""
"Will",null
"Ryan","Neo4j"

So none of the first three rows have a value for ‘company’. I don’t have any value at all, Michael has an empty string, and Will has a null value. Let’s see how LOAD CSV interprets this:

load csv with headers from "file:///nulls.csv" AS row
RETURN row
 
╒═════════════════════════════════╕
│"row"                            │
╞═════════════════════════════════╡
│{"name":"Mark","company":null}   │
├─────────────────────────────────┤
│{"name":"Michael","company":""}  │
├─────────────────────────────────┤
│{"name":"Will","company":"null"} │
├─────────────────────────────────┤
│{"name":"Ryan","company":"Neo4j"}│
└─────────────────────────────────┘

We’ve got the full sweep of all the combinations from above. We’d like to create a Person node for each row but only create a Company node and associated ‘WORKS_FOR’ relationship if an actual company is defined – we don’t want to create a null company.

So we only want to create a company node and ‘WORKS_FOR’ relationship for the Ryan row.

The following query does the trick:

load csv with headers from "file:///nulls.csv" AS row
MERGE (p:Person {name: row.name})
WITH p, row
WHERE row.company <> "" AND row.company <> "null"
MERGE (c:Company {name: row.company})
MERGE (p)-[:WORKS_FOR]->(c)
 
Added 5 labels, created 5 nodes, set 5 properties, created 1 relationship, statement completed in 117 ms.

And if we visualise what’s been created:

[Graph visualization of the created Person nodes and the single WORKS_FOR relationship]

Perfect. Perhaps this behaviour is obvious but it always trips me up so hopefully it’ll be useful to someone else as well!

There’s also a section on the Neo4j developer pages describing even more null scenarios that’s worth checking out.

Written by Mark Needham

February 22nd, 2017 at 11:28 pm

Posted in neo4j


Neo4j: Analysing a CSV file using LOAD CSV and Cypher


Last week we ran our first online meetup for several years and I wanted to analyse the stats that YouTube lets you download for an event.

The file I downloaded looked like this:

$ cat ~/Downloads/youtube_stats_pW9boJoUxO0.csv 
Video IDs:, pW9boJoUxO0, Start time:, Wed Feb 15 08:57:55 2017, End time:, Wed Feb 15 10:03:10 2017
Playbacks, Peak concurrent viewers, Total view time (hours), Average session length (minutes)
348, 112, 97.125, 16.7456896552, 
 
Country code, AR, AT, BE, BR, BY, CA, CH, CL, CR, CZ, DE, DK, EC, EE, ES, FI, FR, GB, HU, IE, IL, IN, IT, LB, LU, LV, MY, NL, NO, NZ, PK, PL, QA, RO, RS, RU, SE, TR, US, VN, ZA
Playbacks, 2, 2, 1, 14, 1, 10, 2, 1, 1, 1, 27, 1, 1, 1, 3, 1, 25, 54, 1, 4, 6, 8, 1, 1, 1, 1, 1, 23, 1, 1, 1, 1, 1, 1, 2, 6, 22, 1, 114, 1, 1
Peak concurrent viewers, 2, 1, 1, 4, 1, 5, 1, 1, 0, 0, 11, 1, 1, 1, 2, 1, 6, 25, 1, 3, 3, 2, 1, 1, 1, 1, 1, 9, 1, 1, 0, 1, 0, 1, 1, 3, 7, 0, 44, 1, 0
Total view time (hours), 1.075, 0.0166666666667, 0.175, 2.58333333333, 0.00833333333333, 3.01666666667, 0.858333333333, 0.0583333333333, 0.0, 0.0, 8.69166666667, 0.8, 0.0166666666667, 0.0583333333333, 0.966666666667, 0.0166666666667, 4.20833333333, 20.8333333333, 0.00833333333333, 1.39166666667, 1.75, 0.766666666667, 0.00833333333333, 0.15, 0.0333333333333, 1.05833333333, 0.0333333333333, 7.36666666667, 0.0583333333333, 0.916666666667, 0.0, 0.00833333333333, 0.0, 0.00833333333333, 0.4, 1.10833333333, 5.28333333333, 0.0, 32.7333333333, 0.658333333333, 0.0
Average session length (minutes), 32.25, 0.5, 10.5, 11.0714285714, 0.5, 18.1, 25.75, 3.5, 0.0, 0.0, 19.3148148148, 48.0, 1.0, 3.5, 19.3333333333, 1.0, 10.1, 23.1481481481, 0.5, 20.875, 17.5, 5.75, 0.5, 9.0, 2.0, 63.5, 2.0, 19.2173913043, 3.5, 55.0, 0.0, 0.5, 0.0, 0.5, 12.0, 11.0833333333, 14.4090909091, 0.0, 17.2280701754, 39.5, 0.0

I want to look at the country-specific stats, so the first 4 lines aren’t interesting to me:

$ tail -n+5 youtube_stats_pW9boJoUxO0.csv > youtube.csv

I then put the youtube.csv file into the import directory of Neo4j and wrote the following query to return a row representing each country and its score for each of the metrics:

load csv with headers from "file:///youtube.csv" AS row
WITH [key in keys(row) where key <> "Country code"] AS keys, row, row["Country code"] AS heading
UNWIND keys AS key
RETURN key AS country, heading AS key, row[key] AS value
 
╒═════════╤═══════════╤═══════╕
│"country"│"key"      │"value"│
╞═════════╪═══════════╪═══════╡
│" SE"    │"Playbacks"│"22"   │
├─────────┼───────────┼───────┤
│" GB"    │"Playbacks"│"54"   │
├─────────┼───────────┼───────┤
│" FR"    │"Playbacks"│"25"   │
├─────────┼───────────┼───────┤
│" RS"    │"Playbacks"│"2"    │
├─────────┼───────────┼───────┤
│" LV"    │"Playbacks"│"1"    │
└─────────┴───────────┴───────┘

Now I want to create a node representing each country and create a property for each of the metrics. Since the property names are going to be dynamic I’ll make use of the APOC library, which I drop into my plugins directory. I then tweaked the query to create the nodes:

load csv with headers from "https://dl.dropboxusercontent.com/u/14493611/youtube.csv" AS row
WITH [key in keys(row) where key <> "Country code"] AS keys, row, row["Country code"] AS heading
UNWIND keys AS key
WITH key AS country, heading AS key, row[key] AS value
MERGE (c:Country {name: replace(country, " ", "")})
WITH *
CALL apoc.create.setProperty(c, key, toInteger(value))
YIELD node
RETURN COUNT(*)

We can now see which country provided the most viewers:

MATCH (n:Country) 
RETURN n.name, n.Playbacks AS playbacks, n.`Total view time (hours)` AS viewTimeInHours, n.`Peak concurrent viewers` AS peakConcViewers, n.`Average session length (minutes)` AS aveSessionMins
ORDER BY playbacks DESC
LIMIT 10
 
╒════════╤═══════════╤═════════════════╤═════════════════╤════════════════╕
│"n.name"│"playbacks"│"viewTimeInHours"│"peakConcViewers"│"aveSessionMins"│
╞════════╪═══════════╪═════════════════╪═════════════════╪════════════════╡
│"US"    │"114"      │"32"             │"44"             │"17"            │
├────────┼───────────┼─────────────────┼─────────────────┼────────────────┤
│"GB"    │"54"       │"20"             │"25"             │"23"            │
├────────┼───────────┼─────────────────┼─────────────────┼────────────────┤
│"DE"    │"27"       │"8"              │"11"             │"19"            │
├────────┼───────────┼─────────────────┼─────────────────┼────────────────┤
│"FR"    │"25"       │"4"              │"6"              │"10"            │
├────────┼───────────┼─────────────────┼─────────────────┼────────────────┤
│"NL"    │"23"       │"7"              │"9"              │"19"            │
├────────┼───────────┼─────────────────┼─────────────────┼────────────────┤
│"SE"    │"22"       │"5"              │"7"              │"14"            │
├────────┼───────────┼─────────────────┼─────────────────┼────────────────┤
│"BR"    │"14"       │"2"              │"4"              │"11"            │
├────────┼───────────┼─────────────────┼─────────────────┼────────────────┤
│"CA"    │"10"       │"3"              │"5"              │"18"            │
├────────┼───────────┼─────────────────┼─────────────────┼────────────────┤
│"IN"    │"8"        │"0"              │"2"              │"5"             │
├────────┼───────────┼─────────────────┼─────────────────┼────────────────┤
│"IL"    │"6"        │"1"              │"3"              │"17"            │
└────────┴───────────┴─────────────────┴─────────────────┴────────────────┘

The United States is first, unsurprisingly, followed by the UK, Germany, and France. We ran the meetup at 5pm UK time so it was a friendly enough time for this side of the globe, but not so friendly for Asia or Australia, so it’s not too surprising we don’t see anybody from there!

For my last trick I wanted to see the full names of the countries so I downloaded the 2 digit codes for each country along with their full name.

I then updated my graph:

load csv with headers from "file:///countries.csv" AS row
MATCH (c:Country {name: row.Code})
SET c.fullName = row.Name;

Now let’s re-run our query and show the country full names instead:

MATCH (n:Country) 
RETURN n.fullName, n.Playbacks AS playbacks, n.`Total view time (hours)` AS viewTimeInHours, n.`Peak concurrent viewers` AS peakConcViewers, n.`Average session length (minutes)` AS aveSessionMins
ORDER BY playbacks DESC
LIMIT 10
 
╒════════════════╤═══════════╤═════════════════╤═════════════════╤════════════════╕
│"n.fullName"    │"playbacks"│"viewTimeInHours"│"peakConcViewers"│"aveSessionMins"│
╞════════════════╪═══════════╪═════════════════╪═════════════════╪════════════════╡
│"United States" │"114"      │"32"             │"44"             │"17"            │
├────────────────┼───────────┼─────────────────┼─────────────────┼────────────────┤
│"United Kingdom"│"54"       │"20"             │"25"             │"23"            │
├────────────────┼───────────┼─────────────────┼─────────────────┼────────────────┤
│"Germany"       │"27"       │"8"              │"11"             │"19"            │
├────────────────┼───────────┼─────────────────┼─────────────────┼────────────────┤
│"France"        │"25"       │"4"              │"6"              │"10"            │
├────────────────┼───────────┼─────────────────┼─────────────────┼────────────────┤
│"Netherlands"   │"23"       │"7"              │"9"              │"19"            │
├────────────────┼───────────┼─────────────────┼─────────────────┼────────────────┤
│"Sweden"        │"22"       │"5"              │"7"              │"14"            │
├────────────────┼───────────┼─────────────────┼─────────────────┼────────────────┤
│"Brazil"        │"14"       │"2"              │"4"              │"11"            │
├────────────────┼───────────┼─────────────────┼─────────────────┼────────────────┤
│"Canada"        │"10"       │"3"              │"5"              │"18"            │
├────────────────┼───────────┼─────────────────┼─────────────────┼────────────────┤
│"India"         │"8"        │"0"              │"2"              │"5"             │
├────────────────┼───────────┼─────────────────┼─────────────────┼────────────────┤
│"Israel"        │"6"        │"1"              │"3"              │"17"            │
└────────────────┴───────────┴─────────────────┴─────────────────┴────────────────┘

And that’s the end of my analysis with no relationships in sight!

Written by Mark Needham

February 19th, 2017 at 10:39 pm

Posted in neo4j
