Mark Needham

Thoughts on Software Development

Neo4j: Cypher – Using MERGE with schema indexes/constraints


A couple of weeks ago I wrote about cypher’s MERGE clause, and over the last few days I’ve been exploring how it works when used with schema indexes and unique constraints.

A common use case with Neo4j is to model users and events where an event could be a tweet, Facebook post or Pinterest pin. The model might look like this:

[Model diagram: (User)-[:CREATED_EVENT]->(Event)]

We’d have a stream of (user, event) pairs and a cypher statement like the following to get the data into Neo4j:

MERGE (u:User {id: {userId}})
MERGE (e:Event {id: {eventId}})
MERGE (u)-[:CREATED_EVENT]->(e)
RETURN u, e

We’d like to ensure that we don’t get duplicate users or events and MERGE provides the semantics to do this:

MERGE ensures that a pattern exists in the graph. Either the pattern already exists, or it needs to be created.
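
As a quick illustration, running the following statement twice in a single session would create the user on the first run and match it on the second. The ON CREATE / ON MATCH clauses (and the created/lastSeen properties) are only there so we can see which branch fired:

MERGE (u:User {id: 1})
ON CREATE SET u.created = timestamp()
ON MATCH SET u.lastSeen = timestamp()
RETURN u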

I wanted to see what would happen if I wrote a script that tried to create the same (user, event) pairs concurrently and ended up with the following:

import org.neo4j.cypher.javacompat.ExecutionEngine;
import org.neo4j.cypher.javacompat.ExecutionResult;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;
import org.neo4j.helpers.collection.MapUtil;
import org.neo4j.kernel.impl.util.FileUtils;
 
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
 
public class MergeTime
{
    public static void main(String[] args) throws Exception
    {
        String pathToDb = "/tmp/foo";
        FileUtils.deleteRecursively(new File(pathToDb));
 
        GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase( pathToDb );
        final ExecutionEngine engine = new ExecutionEngine( db );
 
        ExecutorService executor = Executors.newFixedThreadPool( 50 );
        final Random random = new Random();
 
        final int numberOfUsers = 10;
        final int numberOfEvents = 50;
        int iterations = 100;
        final List<Integer> userIds = generateIds( numberOfUsers );
        final List<Integer> eventIds = generateIds( numberOfEvents );
        List<Future<?>> merges = new ArrayList<>();
        for ( int i = 0; i < iterations; i++ )
        {
            Integer userId = userIds.get(random.nextInt(numberOfUsers));
            Integer eventId = eventIds.get(random.nextInt(numberOfEvents));
            merges.add(executor.submit(mergeAway( engine, userId, eventId) ));
        }
 
        for ( Future<?> merge : merges )
        {
            merge.get();
        }
 
        executor.shutdown();
 
        ExecutionResult userResult = engine.execute("MATCH (u:User) RETURN u.id AS userId, COUNT(u) AS count ORDER BY userId");
 
        System.out.println(userResult.dumpToString());
 
    }
 
    private static Runnable mergeAway(final ExecutionEngine engine,
                                      final Integer userId, final Integer eventId)
    {
        return new Runnable()
        {
            @Override
            public void run()
            {
                try
                {
                    ExecutionResult result = engine.execute(
                            "MERGE (u:User {id: {userId}})\n" +
                            "MERGE (e:Event {id: {eventId}})\n" +
                            "MERGE (u)-[:CREATED_EVENT]->(m)\n" +
                            "RETURN u, e",
                            MapUtil.map( "userId", userId, "eventId", eventId) );
 
                    // exhaust the result iterator so the statement actually executes
                    for ( Map<String, Object> row : result ) { }
                }
                catch ( Exception e )
                {
                    e.printStackTrace();
                }
            }
        };
    }
 
    private static List<Integer> generateIds( int amount )
    {
        List<Integer> ids = new ArrayList<>();
        for ( int i = 1; i <= amount; i++ )
        {
            ids.add( i );
        }
        return ids;
    }
}

We create a maximum of 10 users and 50 events and then submit 100 random (user, event) pairs to a pool of 50 concurrent threads. Afterwards we execute a query which checks how many nodes have been created for each user id and get the following output:

+----------------+
| userId | count |
+----------------+
| 1      | 6     |
| 2      | 3     |
| 3      | 4     |
| 4      | 8     |
| 5      | 9     |
| 6      | 7     |
| 7      | 5     |
| 8      | 3     |
| 9      | 3     |
| 10     | 2     |
+----------------+
10 rows

Next I added a schema index on users and events to see if that would make any difference, something Javad Karabi recently asked about on the user group.

CREATE INDEX ON :User(id)
CREATE INDEX ON :Event(id)

We wouldn’t expect this to make a difference, as schema indexes don’t ensure uniqueness: two concurrent transactions can still both fail to find a node and each create their own copy. I ran it anyway and got the following output:

+----------------+
| userId | count |
+----------------+
| 1      | 2     |
| 2      | 9     |
| 3      | 7     |
| 4      | 2     |
| 5      | 3     |
| 6      | 7     |
| 7      | 7     |
| 8      | 6     |
| 9      | 5     |
| 10     | 3     |
+----------------+
10 rows

If we want to ensure uniqueness of users and events we need to add a unique constraint on the id of both of these labels:

CREATE CONSTRAINT ON (user:User) ASSERT user.id IS UNIQUE
CREATE CONSTRAINT ON (event:Event) ASSERT event.id IS UNIQUE
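
In the test harness this just means executing those two statements before any of the merge jobs are submitted. Something like this, reusing the engine from above (each execute call runs in its own transaction, which suits schema operations since they can’t be mixed with data operations):

engine.execute( "CREATE CONSTRAINT ON (user:User) ASSERT user.id IS UNIQUE" );
engine.execute( "CREATE CONSTRAINT ON (event:Event) ASSERT event.id IS UNIQUE" );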

Now if we run the test we’ll only end up with one of each user:

+----------------+
| userId | count |
+----------------+
| 1      | 1     |
| 2      | 1     |
| 3      | 1     |
| 4      | 1     |
| 5      | 1     |
| 6      | 1     |
| 7      | 1     |
| 8      | 1     |
| 9      | 1     |
| 10     | 1     |
+----------------+
10 rows

We’d see the same type of result if we ran a similar query checking for the uniqueness of events.
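
For reference, the equivalent check for events would look something like this:

MATCH (e:Event) RETURN e.id AS eventId, COUNT(e) AS count ORDER BY eventId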

As far as I can tell, this duplication of the nodes we merge on only happens when we try to create the same node twice concurrently. Once the node has been created we can use MERGE with a non-unique index and a duplicate node won’t get created.
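
In other words, once the graph is populated, re-running the original statement (with literal ids here for illustration) simply matches the existing pattern, even without constraints:

MERGE (u:User {id: 1})
MERGE (e:Event {id: 1})
MERGE (u)-[:CREATED_EVENT]->(e)
RETURN u, e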

All the code from this post is available as a gist if you want to play around with it.


Written by Mark Needham

December 23rd, 2013 at 1:30 pm

Posted in neo4j
