Mark Needham

Thoughts on Software Development

Archive for the ‘Java’ tag

Java: Determining the status of data import using kill signals

with 2 comments

A few weeks ago I was working on the initial import of ~60 million pieces of data into Neo4j and we kept running into a problem where the import process seemed to freeze and nothing more was imported.

It was very difficult to tell what was happening inside the process – taking a thread dump merely told us that it was attempting to process one line of the CSV file and was somehow unable to do so.

One way to help debug this would have been to print out every single line of the CSV as we processed it and then watch where it got stuck, but this seemed a bit overkill. Ideally we wanted to print out the line we were processing only on demand.

As luck would have it, we could do exactly this by sending a kill signal to the import process and having it print out where it had got to. We had to make sure we picked a signal which wasn’t already being handled by the JVM and decided to go with ‘SIGTRAP’, i.e. kill -5 [pid].

We came across a neat blog post that explained how to wire everything up and then created our own version:

import sun.misc.Signal;
import sun.misc.SignalHandler;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class Kill3Handler implements SignalHandler
{
    private AtomicInteger linesProcessed;
    private AtomicReference<Map<String, Object>> lastRowProcessed;
 
    public Kill3Handler( AtomicInteger linesProcessed, AtomicReference<Map<String, Object>> lastRowProcessed )
    {
        this.linesProcessed = linesProcessed;
        this.lastRowProcessed = lastRowProcessed;
    }
 
    @Override
    public void handle( Signal signal )
    {
        // called whenever the signal we registered for (SIGTRAP in our case) is received
        System.out.println("Last Line Processed: " + linesProcessed.get() + " " + lastRowProcessed.get());
    }
}

We then wired that up like so:

AtomicInteger linesProcessed = new AtomicInteger( 0 );
AtomicReference<Map<String, Object>> lastRowProcessed = new AtomicReference<>(  );
Kill3Handler kill3Handler = new Kill3Handler( linesProcessed, lastRowProcessed );
Signal.handle(new Signal("TRAP"), kill3Handler);
 
// as we iterate each line we update those variables
 
linesProcessed.incrementAndGet();
lastRowProcessed.getAndSet( properties ); // properties = a representation of the row we're processing
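
To see it end to end, here’s a minimal, self-contained sketch which reuses the Kill3Handler above – the class name and the simulated one-row-per-second loop are just for illustration, not our actual import code. Run it and then send kill -5 <pid> from another terminal to see the handler print the current position:

import sun.misc.Signal;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class ImportStatusExample
{
    public static void main( String[] args ) throws InterruptedException
    {
        AtomicInteger linesProcessed = new AtomicInteger( 0 );
        AtomicReference<Map<String, Object>> lastRowProcessed = new AtomicReference<>();

        Signal.handle( new Signal( "TRAP" ), new Kill3Handler( linesProcessed, lastRowProcessed ) );

        // pretend to process one CSV row per second
        while ( true )
        {
            Map<String, Object> properties = new HashMap<>();
            properties.put( "line", linesProcessed.get() );

            linesProcessed.incrementAndGet();
            lastRowProcessed.getAndSet( properties );

            Thread.sleep( 1000 );
        }
    }
}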

This worked really well for us and we were able to work out that we had a slight problem with some of the data in our CSV file which was causing it to be processed incorrectly.

We hadn’t been able to see this by visual inspection since the CSV files were a few GB in size. We’d therefore only skimmed a few lines as a sanity check.

I didn’t even know you could do this but it’s a neat trick to keep in mind – I’m sure it will come in useful again.

Written by Mark Needham

July 23rd, 2014 at 10:20 pm

Posted in Java


Jersey/JAX-RS: Streaming JSON

without comments

About a year ago I wrote a blog post showing how to stream an HTTP response using Jersey/JAX-RS and I recently wanted to do the same thing, but this time with JSON.

A common pattern is to build up our Java object and then ask for a JSON string representation of it, but that isn’t the most efficient use of memory because we end up holding both the Java object and its string representation.

This is particularly problematic if we need to return a lot of data in a response.

By writing a little bit more code we can get our response to stream to the client as soon as some of it is ready rather than building the whole result and sending it all in one go:

@Path("/resource")
public class MadeUpResource
{
    private final ObjectMapper objectMapper;
 
    public MadeUpResource() {
        objectMapper = new ObjectMapper();
    }
 
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public Response loadHierarchy(@PathParam( "pkPerson" ) String pkPerson) {
        final Map<Integer, String> people  = new HashMap<>();
        people.put(1, "Michael");
        people.put(2, "Mark");
 
        StreamingOutput stream = new StreamingOutput() {
            @Override
            public void write(OutputStream os) throws IOException, WebApplicationException
            {
                JsonGenerator jg = objectMapper.getJsonFactory().createJsonGenerator( os, JsonEncoding.UTF8 );
                jg.writeStartArray();
 
                for ( Map.Entry<Integer, String> person : people.entrySet()  )
                {
                    jg.writeStartObject();
                    jg.writeFieldName( "id" );
                    jg.writeString( person.getKey().toString() );
                    jg.writeFieldName( "name" );
                    jg.writeString( person.getValue() );
                    jg.writeEndObject();
                }
                jg.writeEndArray();
 
                jg.flush();
                jg.close();
            }
        };
 
 
        return Response.ok().entity( stream ).type( MediaType.APPLICATION_JSON ).build();
    }
}

If we run that this is the output we’d see:

[{"id":"1","name":"Michael"},{"id":"2","name":"Mark"}]

It’s a simple example but hopefully it’s easy to see how we could translate that if we wanted to stream more complex data.
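
On the client side it’s also possible to consume the response incrementally rather than waiting for the full body. This is just a rough sketch (not part of the original example) using Jackson’s streaming parser over a plain HttpURLConnection, and the URL is a hypothetical deployment of MadeUpResource:

import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonToken;
 
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
 
public class StreamingJsonClient
{
    public static void main( String[] args ) throws Exception
    {
        // hypothetical URL - wherever MadeUpResource happens to be deployed
        URL url = new URL( "http://localhost:8080/resource" );
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
 
        InputStream in = connection.getInputStream();
        JsonParser parser = new JsonFactory().createJsonParser( in );
 
        // handle each person as soon as its tokens arrive rather than reading the whole array first
        while ( parser.nextToken() != null )
        {
            if ( parser.getCurrentToken() == JsonToken.FIELD_NAME && "name".equals( parser.getCurrentName() ) )
            {
                parser.nextToken();
                System.out.println( "received: " + parser.getText() );
            }
        }
        parser.close();
    }
}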

Written by Mark Needham

April 30th, 2014 at 1:24 am

Posted in Java


Java 8: Lambda Expressions vs AutoCloseable

with 3 comments

If you used earlier versions of Neo4j via its Java API with Java 6 you probably have code similar to the following to ensure write operations happen within a transaction:

public class StylesOfTx
{
    public static void main( String[] args ) throws IOException
    {
        String path = "/tmp/tx-style-test";
        FileUtils.deleteRecursively(new File(path));
 
        GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase( path );
 
        Transaction tx = db.beginTx();
        try 
        {
            db.createNode();
            tx.success();
        } 
        finally 
        {
            tx.close();
        }
    }
}

In Neo4j 2.0 Transaction started extending AutoCloseable which meant that you could use ‘try with resources’ and the ‘close’ method would be automatically called when the block finished:

public class StylesOfTx
{
    public static void main( String[] args ) throws IOException
    {
        String path = "/tmp/tx-style-test";
        FileUtils.deleteRecursively(new File(path));
 
        GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase( path );
 
        try ( Transaction tx = db.beginTx() )
        {
            Node node = db.createNode();
            tx.success();
        }
    }
}

This works quite well although it’s still possible to have transactions hanging around in an application when people don’t use this syntax – the old style is still permissible.

In Venkat Subramaniam’s Java 8 book he suggests an alternative, lambda-based approach:

public class StylesOfTx
{
    public static void main( String[] args ) throws IOException
    {
        String path = "/tmp/tx-style-test";
        FileUtils.deleteRecursively(new File(path));
 
        GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase( path );
 
        Db.withinTransaction(db, neo4jDb -> {
            Node node = neo4jDb.createNode();
        });
    }
 
    static class Db {
        public static void withinTransaction(GraphDatabaseService db, Consumer<GraphDatabaseService> fn) {
            try ( Transaction tx = db.beginTx() )
            {
                fn.accept(db);
                tx.success();
            }
        }
    }
}

The ‘withinTransaction’ function would actually go on GraphDatabaseService or similar rather than being on that Db class but it was easier to put it on there for this example.

A disadvantage of this style is that you don’t have explicit control over the transaction for handling the failure case – it’s assumed that if ‘tx.success()’ isn’t called then the transaction failed and should be rolled back. I’m not sure what percentage of use cases actually need such fine-grained control though.
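
One way to keep the lambda style while regaining that control is to hand the transaction to the block as well – a sketch of my own variation (not from the book), using a java.util.function.BiConsumer so the caller decides when to call ‘tx.success()’:

    static class Db {
        public static void withinTransaction(GraphDatabaseService db, BiConsumer<GraphDatabaseService, Transaction> fn) {
            try ( Transaction tx = db.beginTx() )
            {
                // the block decides whether to call tx.success() or tx.failure()
                fn.accept(db, tx);
            }
        }
    }
 
    // usage:
    Db.withinTransaction(db, (neo4jDb, tx) -> {
        Node node = neo4jDb.createNode();
        tx.success();
    });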

Brian Hurt refers to this as the ‘hole in the middle’ pattern and I imagine we’ll start seeing more code of this ilk once Java 8 is released and becomes more widely used.

Written by Mark Needham

February 26th, 2014 at 7:32 am

Posted in Java


Jersey: Ignoring SSL certificate – javax.net.ssl.SSLHandshakeException: java.security.cert.CertificateException

without comments

Last week Alistair and I were working on an internal application and we needed to make an HTTPS request directly to an AWS machine whose certificate was issued for a different host.

We were using jersey-client, so our code looked something like this:

Client client = Client.create();
 
client.resource("https://some-aws-host.compute-1.amazonaws.com").post();
// and so on

When we ran this we predictably ran into trouble:

com.sun.jersey.api.client.ClientHandlerException: javax.net.ssl.SSLHandshakeException: java.security.cert.CertificateException: No subject alternative DNS name matching some-aws-host.compute-1.amazonaws.com found.
	at com.sun.jersey.client.urlconnection.URLConnectionClientHandler.handle(URLConnectionClientHandler.java:149)
	at com.sun.jersey.api.client.Client.handle(Client.java:648)
	at com.sun.jersey.api.client.WebResource.handle(WebResource.java:670)
	at com.sun.jersey.api.client.WebResource.post(WebResource.java:241)
	at com.neotechnology.testlab.manager.bootstrap.ManagerAdmin.takeBackup(ManagerAdmin.java:33)
	at com.neotechnology.testlab.manager.bootstrap.ManagerAdminTest.foo(ManagerAdminTest.java:11)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:157)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:202)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:65)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
Caused by: javax.net.ssl.SSLHandshakeException: java.security.cert.CertificateException: No subject alternative DNS name matching some-aws-host.compute-1.amazonaws.com found.
	at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
	at sun.security.ssl.SSLSocketImpl.fatal(SSLSocketImpl.java:1884)
	at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:276)
	at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:270)
	at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1341)
	at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:153)
	at sun.security.ssl.Handshaker.processLoop(Handshaker.java:868)
	at sun.security.ssl.Handshaker.process_record(Handshaker.java:804)
	at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1016)
	at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1312)
	at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1339)
	at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1323)
	at sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:563)
	at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
	at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1300)
	at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:468)
	at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
	at com.sun.jersey.client.urlconnection.URLConnectionClientHandler._invoke(URLConnectionClientHandler.java:240)
	at com.sun.jersey.client.urlconnection.URLConnectionClientHandler.handle(URLConnectionClientHandler.java:147)
	... 31 more
Caused by: java.security.cert.CertificateException: No subject alternative DNS name matching some-aws-host.compute-1.amazonaws.com found.
	at sun.security.util.HostnameChecker.matchDNS(HostnameChecker.java:191)
	at sun.security.util.HostnameChecker.match(HostnameChecker.java:93)
	at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:347)
	at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:203)
	at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:126)
	at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1323)
	... 45 more

We figured that we needed to get our client to ignore the certificate and came across this Stack Overflow thread which had some suggestions on how to do this.

None of the suggestions worked on their own, but a combination of a couple of them did the trick:

public Client hostIgnoringClient() {
    try
    {
        SSLContext sslcontext = SSLContext.getInstance( "TLS" );
        sslcontext.init( null, null, null );
        DefaultClientConfig config = new DefaultClientConfig();
        Map<String, Object> properties = config.getProperties();
        HTTPSProperties httpsProperties = new HTTPSProperties(
                new HostnameVerifier()
                {
                    @Override
                    public boolean verify( String s, SSLSession sslSession )
                    {
                        // skip hostname verification - accept whichever host the certificate was issued for
                        return true;
                    }
                }, sslcontext
        );
        properties.put( HTTPSProperties.PROPERTY_HTTPS_PROPERTIES, httpsProperties );
        config.getClasses().add( JacksonJsonProvider.class );
        return Client.create( config );
    }
    catch ( KeyManagementException | NoSuchAlgorithmException e )
    {
        throw new RuntimeException( e );
    }
}
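
The original request then just needs to use this client instead of the default one – something like:

Client client = hostIgnoringClient();
 
client.resource("https://some-aws-host.compute-1.amazonaws.com").post();
// and so on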

You’re welcome Future Mark.

Written by Mark Needham

February 26th, 2014 at 12:12 am

Posted in Java


Java 8: Group by with collections

without comments

In my continued reading of Venkat Subramaniam’s ‘Functional Programming in Java’ I’ve reached the part of the book where the Stream#collect function is introduced.

We want to take a collection of people, group them by age and return a map of (age -> people’s names), which is exactly where this comes in handy.

To refresh, this is what the Person class looks like:

static class Person {
    private String name;
    private int age;
 
    Person(String name, int age) {
 
        this.name = name;
        this.age = age;
    }
 
    @Override
    public String toString() {
        return String.format("Person{name='%s', age=%d}", name, age);
    }
}

And we can write the following code in Java 8 to get a map of people’s names grouped by age:

Stream<Person> people = Stream.of(new Person("Paul", 24), new Person("Mark", 30), new Person("Will", 28));
Map<Integer, List<String>> peopleByAge = people
    .collect(groupingBy(p -> p.age, mapping((Person p) -> p.name, toList())));
System.out.println(peopleByAge);
{24=[Paul], 28=[Will], 30=[Mark]}

We’re running the ‘collect’ function over the collection, grouping by the ‘age’ property as we go and collecting the names of the people rather than the people themselves.
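
For comparison with the Ruby group_by shown below, if we drop the mapping collector we get back the people themselves grouped by age – a quick sketch:

Stream<Person> people = Stream.of(new Person("Paul", 24), new Person("Mark", 30), new Person("Will", 28));
Map<Integer, List<Person>> peopleByAge = people.collect(groupingBy(p -> p.age));
System.out.println(peopleByAge);
// {24=[Person{name='Paul', age=24}], 28=[Person{name='Will', age=28}], 30=[Person{name='Mark', age=30}]}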

This is a little bit different to what you’d do in Ruby where there’s a ‘group_by’ function which you can call on a collection:

> people = [ {:name => "Paul", :age => 24}, {:name => "Mark", :age => 30}, {:name => "Will", :age => 28}]
> people.group_by { |p| p[:age] }
=> {24=>[{:name=>"Paul", :age=>24}], 30=>[{:name=>"Mark", :age=>30}], 28=>[{:name=>"Will", :age=>28}]}

This gives us back lists of people grouped by age, but we need to apply an additional ‘map’ operation to turn those into lists of names instead:

> people.group_by { |p| p[:age] }.map { |k,v| [k, v.map { |person| person[:name] } ] }
=> [[24, ["Paul"]], [30, ["Mark"]], [28, ["Will"]]]

At this stage we’ve got an array of (age, names) pairs but luckily Ruby 2.1.0 has a function ‘to_h’ which we can call to get back to a hash again:

> people.group_by { |p| p[:age] }.map { |k,v| [k, v.map { |person| person[:name] } ] }.to_h
=> {24=>["Paul"], 30=>["Mark"], 28=>["Will"]}

If we want to follow the Java approach of grouping by a property while running a reduce over the collection we’d have something like the following:

> people.reduce({}) { |acc, item| acc[item[:age]] ||=[]; acc[item[:age]] << item[:name]; acc }
=> {24=>["Paul"], 30=>["Mark"], 28=>["Will"]}

If we’re using Clojure then we might end up with something like this instead:

(def people
  [{:name "Paul", :age 24} {:name "Mark", :age 30} {:name "Will", :age 28}])
 
> (reduce (fn [acc [k v]] (assoc-in acc [k] (map :name v))) {} (group-by :age people))
{28 ("Will"), 30 ("Mark"), 24 ("Paul")}

I thought the Java version looked a bit weird to begin with but it’s actually not too bad having worked through the problem in a couple of other languages.

It’d be good to know whether there’s a better way of doing this in Ruby/Clojure though!

Written by Mark Needham

February 23rd, 2014 at 7:16 pm

Posted in Java


Java 8: Sorting values in collections

without comments

Having realised that Java 8 is due for its GA release within the next few weeks, I thought it was about time I had a look at it, and over the last week I’ve been reading Venkat Subramaniam’s book.

I’m up to chapter 3 which covers sorting a collection of people. The Person class is defined roughly like so:

static class Person {
    private String name;
    private int age;
 
    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }
 
    @Override
    public String toString() {
        return String.format("Person{name='%s', age=%d}", name, age);
    }
}

In the first example we take a list of people and then sort them in ascending age order:

List<Person> people = Arrays.asList(new Person("Paul", 24), new Person("Mark", 30), new Person("Will", 28));
people.stream().sorted((p1, p2) -> p1.age - p2.age).forEach(System.out::println);
Person{name='Paul', age=24}
Person{name='Will', age=28}
Person{name='Mark', age=30}

If we were to write a function to do the same thing in Java 7 it’d look like this:

Collections.sort(people, new Comparator<Person>() {
    @Override
    public int compare(Person o1, Person o2) {
        return o1.age - o2.age;
    }
});
 
for (Person person : people) {
    System.out.println(person);
}

Java 8 has reduced the amount of code we have to write, although it’s still more verbose than the equivalent in Ruby:

> people = [ {:name => "Paul", :age => 24}, {:name => "Mark", :age => 30}, {:name => "Will", :age => 28}]
> people.sort_by { |p| p[:age] }
=> [{:name=>"Paul", :age=>24}, {:name=>"Will", :age=>28}, {:name=>"Mark", :age=>30}]

A few pages later Venkat shows how you can get close to this by using the Comparator#comparing function:

Function<Person, Integer> byAge = p -> p.age;
people.stream().sorted(comparing(byAge)).forEach(System.out::println);

I thought I could make this simpler by inlining the ‘byAge’ lambda like this:

people.stream().sorted(comparing(p -> p.age)).forEach(System.out::println);

This seems to compile and run correctly, although IntelliJ 13.0 suggests there is a ‘cyclic inference’ problem. IntelliJ is happy if we explicitly cast the lambda like this:

people.stream().sorted(comparing((Function<Person, Integer>) p -> p.age)).forEach(System.out::println);

IntelliJ also seems happy if we explicitly type ‘p’ in the lambda, so I think I’ll go with that for the moment:

people.stream().sorted(comparing((Person p) -> p.age)).forEach(System.out::println);
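
Since age is an int, another option is Comparator#comparingInt, which avoids boxing the sort key – a small sketch along the same lines, giving the same output as before:

people.stream().sorted(Comparator.comparingInt((Person p) -> p.age)).forEach(System.out::println);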

Written by Mark Needham

February 23rd, 2014 at 2:43 pm

Posted in Java


Java: Handling a RuntimeException in a Runnable

with 2 comments

At the end of last year I was playing around with running scheduled tasks to monitor a Neo4j cluster and one of the problems I ran into was that the monitoring would sometimes exit.

I eventually realised that this was because a RuntimeException was being thrown inside the Runnable’s run method and I wasn’t handling it. The following code demonstrates the problem:

import java.util.concurrent.*;
 
public class RunnableBlog {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
 
        executor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                    System.out.println(Thread.currentThread().getName() + " -> " + System.currentTimeMillis());
                    throw new RuntimeException("game over");
            }
        }, 0, 1000, TimeUnit.MILLISECONDS).get();
 
 
        System.out.println("exit");
        executor.shutdown();
    }
}

If we run that code we’ll see the RuntimeException, but because the exception escapes the run method the scheduled task is never run again and the executor never gets shut down:

Exception in thread "main" pool-1-thread-1 -> 1391212558074
java.util.concurrent.ExecutionException: java.lang.RuntimeException: game over
	at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:252)
	at java.util.concurrent.FutureTask.get(FutureTask.java:111)
	at RunnableBlog.main(RunnableBlog.java:11)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:601)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
Caused by: java.lang.RuntimeException: game over
	at RunnableBlog$1.run(RunnableBlog.java:16)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
	at java.lang.Thread.run(Thread.java:722)

At the time I ended up adding a try/catch block and printing the exception like so:

public class RunnableBlog {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
 
        executor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + " -> " + System.currentTimeMillis());
                    throw new RuntimeException("game over");
                } catch (RuntimeException e) {
                    e.printStackTrace();
                }
            }
        }, 0, 1000, TimeUnit.MILLISECONDS).get();
 
        System.out.println("exit");
        executor.shutdown();
    }
}

This allows the exception to be seen and, since it no longer escapes the run method, the task keeps on being scheduled:

java.lang.RuntimeException: game over
pool-1-thread-1 -> 1391212651955
	at RunnableBlog$1.run(RunnableBlog.java:16)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
	at java.lang.Thread.run(Thread.java:722)
pool-1-thread-1 -> 1391212652956
java.lang.RuntimeException: game over
	at RunnableBlog$1.run(RunnableBlog.java:16)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
	at java.lang.Thread.run(Thread.java:722)
pool-1-thread-1 -> 1391212653955
java.lang.RuntimeException: game over
	at RunnableBlog$1.run(RunnableBlog.java:16)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
	at java.lang.Thread.run(Thread.java:722)

This worked well and allowed me to keep monitoring the cluster.

However, I recently started reading ‘Java Concurrency in Practice’ (only 6 years after I bought it!) and realised that this might not be the proper way of handling the RuntimeException – it suggests handing the exception off to the thread’s UncaughtExceptionHandler instead:

public class RunnableBlog {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
 
        executor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + " -> " + System.currentTimeMillis());
                    throw new RuntimeException("game over");
                } catch (RuntimeException e) {
                    Thread t = Thread.currentThread();
                    t.getUncaughtExceptionHandler().uncaughtException(t, e);
                }
            }
        }, 0, 1000, TimeUnit.MILLISECONDS).get();
 
        System.out.println("exit");
        executor.shutdown();
    }
}

I don’t see much difference between the two approaches so it’d be great if someone could explain to me why this approach is better than my previous one of catching the exception and printing the stack trace.
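
For what it’s worth, whichever reporting style wins, the boilerplate can be factored out into a small wrapper so that every scheduled Runnable gets the same treatment – a sketch of my own rather than something from the book:

static Runnable exceptionReporting(final Runnable delegate) {
    return new Runnable() {
        @Override
        public void run() {
            try {
                delegate.run();
            } catch (RuntimeException e) {
                // report the error but swallow it so the task keeps getting rescheduled
                Thread t = Thread.currentThread();
                t.getUncaughtExceptionHandler().uncaughtException(t, e);
            }
        }
    };
}
 
// usage:
// executor.scheduleAtFixedRate(exceptionReporting(task), 0, 1000, TimeUnit.MILLISECONDS).get();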

Written by Mark Needham

January 31st, 2014 at 11:59 pm

Posted in Java


Java: Work out the serialVersionUID of a class

without comments

Earlier in the week I wanted to work out the serialVersionUID of a serializable class so that I could override its toString method without breaking everything.

I came across Frank Kim’s blog post, which suggested using the serialver tool that comes with the JDK.

I created a little Maven project to test this tool out on a very simple class:

import java.io.Serializable;
 
public class SerialiseMe implements Serializable
{
 
}

If we compile that class into a JAR and then run the serialver tool we see the following output:

$ serialver -classpath target/serialiser-0.0.1-SNAPSHOT.jar SerialiseMe
SerialiseMe:    static final long serialVersionUID = -6060222249255158490L;
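
If you’d rather not shell out to serialver, the same value can be read programmatically via ObjectStreamClass, which computes the default serialVersionUID for a class that doesn’t declare one – a small sketch:

import java.io.ObjectStreamClass;
 
public class SerialVersionPrinter
{
    public static void main( String[] args )
    {
        // should print the same value as the serialver output above
        System.out.println( ObjectStreamClass.lookup( SerialiseMe.class ).getSerialVersionUID() );
    }
}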

I wanted to quickly confirm that I could serialise and deserialise this class using this value, so I wrote the following bit of code to serialise the class (while it didn’t yet have a serialVersionUID):

import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
 
public class Serialiser
{
    public static void main( String[] args ) throws IOException
    {
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        ObjectOutputStream oout = new ObjectOutputStream( bout );
 
        Object value = new SerialiseMe();
 
        oout.writeObject( value );
        oout.close();
        byte[] bytes = bout.toByteArray();
 
        FileOutputStream fileOutputStream = new FileOutputStream( "/tmp/foo.txt" );
        fileOutputStream.write( bytes );
        fileOutputStream.close();
    }
}

After I’d done that, I wrote the following bit of code to deserialise the file:

import org.apache.commons.io.IOUtils;   // assuming commons-io for IOUtils
 
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
 
public class Deserialiser
{
    public static void main( String[] args ) throws IOException, ClassNotFoundException
    {
        FileInputStream fileInputStream = new FileInputStream( new File( "/tmp/foo.txt" ) );
        byte[] bytes = IOUtils.toByteArray( fileInputStream );
 
        ByteArrayInputStream in = new ByteArrayInputStream( bytes, 0, bytes.length );
        ObjectInputStream oin = new ObjectInputStream( in );
        Object object = oin.readObject();
    }
}

I plugged the serialVersionUID into the class and was able to deserialise it correctly. I tried changing one of the digits just to check it would blow up, and indeed it did:

import java.io.Serializable;
 
public class SerialiseMe implements Serializable
{
    static final long serialVersionUID = -6060222249255158491L;
}
Exception in thread "main" java.io.InvalidClassException: SerialiseMe; local class incompatible: stream classdesc serialVersionUID = -6060222249255158490, local class serialVersionUID = -6060222249255158491
	at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:604)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
	at Deserialiser.main(Deserialiser.java:18)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:601)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)

serialver #ftw!

Written by Mark Needham

January 31st, 2014 at 6:51 am

Posted in Java


RxJava: From Future to Observable

without comments

I first came across Reactive Extensions about 4 years ago on Matthew Podwysocki’s blog but then didn’t hear much about it until I saw Matthew give a talk at Code Mesh a few weeks ago.

It seems to have grown in popularity recently and I noticed that there’s now a Java version called RxJava written by Netflix.

I thought I’d give it a try by changing some code I wrote while exploring cypher’s MERGE function to expose an Observable instead of Futures.

To recap, we have 50 threads and we do 100 iterations where we create random (user, event) pairs. We create a maximum of 10 users and 50 events and the goal is to concurrently send requests for the same pairs.

In the example from my other post I was throwing away the result of each query, whereas here I return the result so that I have something to subscribe to.

The outline of the code looks like this:

public class MergeTimeRx
{
    public static void main( final String[] args ) throws InterruptedException, IOException
    {
        String pathToDb = "/tmp/foo";
        FileUtils.deleteRecursively( new File( pathToDb ) );
 
        GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase( pathToDb );
        final ExecutionEngine engine = new ExecutionEngine( db );
 
        int numberOfThreads = 50;
        int numberOfUsers = 10;
        int numberOfEvents = 50;
        int iterations = 100;
 
        Observable<ExecutionResult> events = processEvents( engine, numberOfUsers, numberOfEvents, numberOfThreads, iterations );
 
        events.subscribe( new Action1<ExecutionResult>()
        {
            @Override
            public void call( ExecutionResult result )
            {
                for ( Map<String, Object> row : result )
                {
                    // drain the rows; we don't actually need the values here
                }
            }
        } );
 
        ....
    }
 
}

The nice thing about using RxJava is that there’s no mention of how we got our collection of ExecutionResults – it’s not important. We just have a stream of them, and by calling the subscribe function on the Observable we’ll be informed whenever another one becomes available.

Most of the examples I found show how to generate events from a single thread but I wanted to use a thread pool so that I could fire off lots of requests at the same time. The processEvents method ended up looking like this:

    private static Observable<ExecutionResult> processEvents( final ExecutionEngine engine, final int numberOfUsers, final int numberOfEvents, final int numberOfThreads, final int iterations )
    {
        final Random random = new Random();
        final List<Integer> userIds = generateIds( numberOfUsers );
        final List<Integer> eventIds = generateIds( numberOfEvents );
 
        return Observable.create( new Observable.OnSubscribeFunc<ExecutionResult>()
        {
            @Override
            public Subscription onSubscribe( final Observer<? super ExecutionResult> observer )
            {
                final ExecutorService executor = Executors.newFixedThreadPool( numberOfThreads );
 
                List<Future<ExecutionResult>> jobs = new ArrayList<>();
                for ( int i = 0; i < iterations; i++ )
                {
                    Future<ExecutionResult> job = executor.submit( new Callable<ExecutionResult>()
                    {
                        @Override
                        public ExecutionResult call()
                        {
                            Integer userId = userIds.get( random.nextInt( numberOfUsers ) );
                            Integer eventId = eventIds.get( random.nextInt( numberOfEvents ) );
 
                            return engine.execute(
                                    "MERGE (u:User {id: {userId}})\n" +
                                    "MERGE (e:Event {id: {eventId}})\n" +
                                    "MERGE (u)-[:HAS_EVENT]->(e)\n" +
                                    "RETURN u, e",
                                    MapUtil.map( "userId", userId, "eventId", eventId ) );
                        }
                    } );
                    jobs.add( job );
                }
 
                for ( Future<ExecutionResult> future : jobs )
                {
                    try
                    {
                        observer.onNext( future.get() );
                    }
                    catch ( InterruptedException | ExecutionException ignored )
                    {
                    }
                }
 
                observer.onCompleted();
                executor.shutdown();
 
                return Subscriptions.empty();
            }
        } );
    }

I’m not sure if that’s the correct way of using Observables so please let me know in the comments if I’ve got it wrong.

I wasn’t sure what the proper way of handling errors was. I initially had a call to observer#onError in the catch block but that means that no further events are produced which wasn’t what I wanted.
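
One option is to make the subscriber failure-aware by implementing the full Observer interface (rx.Observer) rather than passing a single Action1 – a sketch; as noted above, once onError has been called no further events will arrive:

events.subscribe( new Observer<ExecutionResult>()
{
    @Override
    public void onNext( ExecutionResult result )
    {
        // consume the result as before
    }
 
    @Override
    public void onError( Throwable e )
    {
        e.printStackTrace();
    }
 
    @Override
    public void onCompleted()
    {
        System.out.println( "done" );
    }
} );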

The code is available as a gist if you want to play around with it. I added the following dependency to get the RxJava library:

    <dependency>
      <groupId>com.netflix.rxjava</groupId>
      <artifactId>rxjava-core</artifactId>
      <version>0.15.1</version>
    </dependency>

Written by Mark Needham

December 28th, 2013 at 9:46 pm

Posted in Java


Java: Schedule a job to run on a time interval

with 2 comments

Recently I’ve spent some time building a set of tests around rolling upgrades between Neo4j versions and as part of that I wanted to log the state of the cluster as the upgrade was happening.

The main thread of the test blocks until the upgrade is done, so I wanted to log from another thread every few seconds. Alistair pointed me at the ScheduledExecutorService, which worked quite nicely.

I ended up with a test which looked roughly like this:

import com.sun.jersey.api.client.Client;   // assuming Jersey 1.x client and Jackson 1.x
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.jaxrs.JacksonJsonProvider;
import org.codehaus.jackson.node.ArrayNode;
import org.junit.Test;
 
import javax.ws.rs.core.MediaType;
import java.net.URI;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
 
public class MyUpgradeTest {
    @Test
    public void shouldUpgradeFromOneVersionToAnother() throws InterruptedException
    {
        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        scheduledExecutorService.scheduleAtFixedRate( new LogAllTheThings(), 0, 1, TimeUnit.SECONDS );
 
        Thread.sleep(10000);
        // do upgrade of cluster
        scheduledExecutorService.shutdown();
    }
 
    static class LogAllTheThings implements Runnable
    {
        @Override
        public void run()
        {
            Date time = new Date( System.currentTimeMillis() );
 
            try
            {
                Map<String, Object> masterProperties = selectedProperties( client(), URI.create( "http://localhost:7474/" ) );
                System.out.println( String.format( "%s: %s", time, masterProperties ) );
            }
            catch ( Exception ignored )
            {
                ignored.printStackTrace();
            }
        }
 
        private static Client client()
        {
            DefaultClientConfig defaultClientConfig = new DefaultClientConfig();
            defaultClientConfig.getClasses().add( JacksonJsonProvider.class );
            return Client.create( defaultClientConfig );
        }
 
        public static Map<String, Object> selectedProperties( Client client, URI uri )
        {
            Map<String, Object> jmxProperties = new HashMap<String, Object>();
 
            ArrayNode transactionsProperties = jmxBean( client, uri, "org.neo4j/instance%3Dkernel%230%2Cname%3DTransactions" );
            addProperty( jmxProperties, transactionsProperties, "LastCommittedTxId" );
 
            ArrayNode kernelProperties = jmxBean( client, uri, "org.neo4j/instance%3Dkernel%230%2Cname%3DKernel" );
            addProperty( jmxProperties, kernelProperties, "KernelVersion" );
 
            ArrayNode haProperties = jmxBean( client, uri, "org.neo4j/instance%3Dkernel%230%2Cname%3DHigh+Availability" );
            addProperty( jmxProperties, haProperties, "Role" );
            addProperty( jmxProperties, haProperties, "InstanceId" );
 
            return jmxProperties;
        }
 
        private static void addProperty( Map<String, Object> jmxProperties, ArrayNode properties, String propertyName )
        {
            jmxProperties.put( propertyName, getProperty( properties, propertyName ) );
        }
 
        private static String getProperty( ArrayNode properties, String propertyName )
        {
            for ( JsonNode property : properties )
            {
                if ( property.get( "name" ).asText().equals( propertyName ) )
                {
                    return property.get( "value" ).asText();
                }
            }
 
            throw new RuntimeException( "Could not find requested property: " + propertyName );
        }
 
        private static ArrayNode jmxBean( Client client, URI uri, String beanExtension )
        {
            ClientResponse clientResponse = client
                    .resource( uri + "db/manage/server/jmx/domain/" + beanExtension )
                    .accept( MediaType.APPLICATION_JSON )
                    .get( ClientResponse.class );
 
            JsonNode transactionsBean = clientResponse.getEntity( JsonNode.class );
            return (ArrayNode) transactionsBean.get( 0 ).get( "attributes" );
        }
    }
}

LogAllTheThings gets called once every second and logs the KernelVersion, InstanceId, LastCommittedTxId and Role, which the Neo4j server exposes as JMX properties.

If we run that against a local Neo4j cluster we’d see something like the following:

Sun Nov 17 22:31:55 GMT 2013: {KernelVersion=Neo4j - Graph Database Kernel 2.0.0-M06, InstanceId=1, LastCommittedTxId=18, Role=master}
Sun Nov 17 22:31:56 GMT 2013: {KernelVersion=Neo4j - Graph Database Kernel 2.0.0-M06, InstanceId=1, LastCommittedTxId=18, Role=master}
Sun Nov 17 22:31:57 GMT 2013: {KernelVersion=Neo4j - Graph Database Kernel 2.0.0-M06, InstanceId=1, LastCommittedTxId=18, Role=master}
Sun Nov 17 22:31:58 GMT 2013: {KernelVersion=Neo4j - Graph Database Kernel 2.0.0-M06, InstanceId=1, LastCommittedTxId=18, Role=master}
Sun Nov 17 22:31:59 GMT 2013: {KernelVersion=Neo4j - Graph Database Kernel 2.0.0-M06, InstanceId=1, LastCommittedTxId=18, Role=master}
...
removed for brevity

The next step was to get the properties of all the members of the cluster at the same time. To do that we can introduce another ExecutorService with a thread pool of 3 so that each machine is queried (at least close to) simultaneously:

    static class LogAllTheThings implements Runnable
    {
        private ExecutorService executorService = Executors.newFixedThreadPool( 3 );
 
        @Override
        public void run()
        {
            List<URI> machines = new ArrayList<>(  );
            machines.add(URI.create( "http://localhost:7474/" ));
            machines.add(URI.create( "http://localhost:7484/" ));
            machines.add(URI.create( "http://localhost:7494/" ));
 
            Map<URI, Future<Map<String, Object>>> futureJmxProperties = new HashMap<>(  );
            for ( final URI machine : machines )
            {
                Future<Map<String, Object>> futureProperties = executorService.submit( new Callable<Map<String, Object>>()
                {
                    @Override
                    public Map<String, Object> call() throws Exception
                    {
                        try
                        {
                            return selectedProperties( client(), machine );
                        }
                        catch ( Exception ignored )
                        {
                            ignored.printStackTrace();
                            return new HashMap<>();
                        }
                    }
                } );
 
                futureJmxProperties.put( machine, futureProperties );
            }
 
            Date time = new Date( System.currentTimeMillis() );
            System.out.println( time );
            for ( Map.Entry<URI, Future<Map<String, Object>>> uriFutureEntry : futureJmxProperties.entrySet() )
            {
                try
                {
                    System.out.println( "==> " + uriFutureEntry.getValue().get() );
                }
                catch ( Exception ignored )
                {
 
                }
            }
        }
 
        // other methods the same as above
    }

We submit each job to the ExecutorService and receive back a Future which we store in a map before retrieving its result later on. If we run that we’ll see the following output:

Sun Nov 17 22:49:58 GMT 2013
==> {KernelVersion=Neo4j - Graph Database Kernel 2.0.0-M06, InstanceId=1, LastCommittedTxId=18, Role=master}
==> {KernelVersion=Neo4j - Graph Database Kernel 2.0.0-M06, InstanceId=2, LastCommittedTxId=18, Role=slave}
==> {KernelVersion=Neo4j - Graph Database Kernel 2.0.0-M06, InstanceId=3, LastCommittedTxId=18, Role=slave}
Sun Nov 17 22:49:59 GMT 2013
==> {KernelVersion=Neo4j - Graph Database Kernel 2.0.0-M06, InstanceId=1, LastCommittedTxId=18, Role=master}
==> {KernelVersion=Neo4j - Graph Database Kernel 2.0.0-M06, InstanceId=2, LastCommittedTxId=18, Role=slave}
==> {KernelVersion=Neo4j - Graph Database Kernel 2.0.0-M06, InstanceId=3, LastCommittedTxId=18, Role=slave}
Sun Nov 17 22:50:00 GMT 2013
==> {KernelVersion=Neo4j - Graph Database Kernel 2.0.0-M06, InstanceId=1, LastCommittedTxId=18, Role=master}
==> {KernelVersion=Neo4j - Graph Database Kernel 2.0.0-M06, InstanceId=2, LastCommittedTxId=18, Role=slave}
==> {KernelVersion=Neo4j - Graph Database Kernel 2.0.0-M06, InstanceId=3, LastCommittedTxId=18, Role=slave}
 
...
removed for brevity

Overall the approach works quite well although I’m always open to learning of a better way if there is one!
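
One small variation would be ExecutorService#invokeAll, which submits all the callables and blocks until they’ve finished, saving us from tracking the futures ourselves – a rough sketch reusing selectedProperties and client() from above:

            List<Callable<Map<String, Object>>> tasks = new ArrayList<>();
            for ( final URI machine : machines )
            {
                tasks.add( new Callable<Map<String, Object>>()
                {
                    @Override
                    public Map<String, Object> call() throws Exception
                    {
                        return selectedProperties( client(), machine );
                    }
                } );
            }
 
            try
            {
                // invokeAll returns the futures in the same order the tasks were submitted
                for ( Future<Map<String, Object>> future : executorService.invokeAll( tasks ) )
                {
                    try
                    {
                        System.out.println( "==> " + future.get() );
                    }
                    catch ( ExecutionException ignored )
                    {
                    }
                }
            }
            catch ( InterruptedException e )
            {
                Thread.currentThread().interrupt();
            }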

Written by Mark Needham

November 17th, 2013 at 10:58 pm

Posted in Java
