Archive for the ‘algorithms’ tag
Bellman-Ford algorithm in Python using vectorisation/numpy
I recently wrote about an implementation of the Bellman-Ford shortest path algorithm and concluded by saying that it took 27 seconds to calculate the shortest path in the graph for any node.
This seemed a bit slow and while browsing the Coursera forums I came across a suggestion that the algorithm would run much more quickly if we used vectorization with numpy rather than nested for loops.
Vectorisation refers to a problem solving approach where we make use of matrix operations, which is what numpy allows us to do.
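As a tiny illustration of the idea (my own example, not from the original post), here's the difference between combining two sequences with a Python-level loop and with a single vectorised numpy operation:

```python
import numpy as np

a = [1, 2, 3, 4]
b = [10, 20, 30, 40]

# loop version: one Python-level operation per element
loop_result = []
for x, y in zip(a, b):
    loop_result.append(x + y)

# vectorised version: a single numpy operation over whole arrays
vector_result = np.array(a) + np.array(b)

print(loop_result)             # [11, 22, 33, 44]
print(vector_result.tolist())  # [11, 22, 33, 44]
```

Both give the same answer but the numpy version does the per-element work in C rather than in the Python interpreter, which is where the speed-up in this post comes from.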
To refresh, the core of the original algorithm reads like this:
for i in range(1, vertices):
    for v in range(0, vertices):
        previous_cache = cache
        least_adjacent_cost = calculate_least_adjacent_cost(adjacency_list, i, v, previous_cache)
        cache[v] = min(previous_cache[v], least_adjacent_cost)

# detecting negative cycles
for v in range(0, vertices):
    previous_cache = copy.deepcopy(cache)
    least_adjacent_cost = calculate_least_adjacent_cost(adjacency_list, i, v, previous_cache)
    cache[v] = min(previous_cache[v], least_adjacent_cost)

if(not cache == previous_cache):
    raise Exception("negative cycle detected")

shortest_path = min(cache)
We want to try and simplify the first two lines where we have for loops through i and v.
I couldn’t see a way to apply matrix operations to the i loop since each iteration of i makes use of the result of the previous iteration but with the v loop the calculation of the shortest path for each vertex is independent of the calculation for other vertices so we can vectorise this bit of code.
We want to try and get to the point where we can use numpy’s minimum function where we’d pass in an array representing the previous iteration of i and an array that represents the newly calculated cost for each vertex v.
# We want to get to this point
previous_cache = cache[:] # here we copy the contents of cache into previous_cache
cache = minimum(previous_cache, new_shortest_paths)
It was initially a bit tricky to see how we could go about this but after sketching it out on paper it became clear that we needed to add the values in previous_cache to the weights of the edges between different vertices and then find the minimum combined value for each vertex.
It seemed much easier to achieve this if we used an adjacency matrix rather than an adjacency list to represent the graph and if we do that then the following example shows how we’d go about vectorising the algorithm.
If our previous_cache had the following values:
1 2 3 4 5 6 # these are the previous shortest paths for vertex 0,1…,n 
And our adjacency matrix looked like this:
inf inf  4 inf inf inf  # edges for vertex 0
 -2 inf inf inf inf inf  # edges for vertex 1
inf  -1 inf inf inf inf  # and so on...
inf inf  2 inf inf  1
inf inf -3 inf inf -4
inf inf inf inf inf inf
where the numeric values represent the edge weights between vertices and those with a value of ‘inf’ don’t have a direct edge we’d expect the initial combination of these two data structures to look like this:
inf inf 5 inf inf inf  # edges for vertex 0
  0 inf inf inf inf inf  # edges for vertex 1
inf 2 inf inf inf inf  # and so on...
inf inf 6 inf inf 5
inf inf 2 inf inf 1
inf inf inf inf inf inf
where 1 has been added to every value in the first row, 2 has been added to every value in the second row and so on.
We can achieve that with the following code:
>>> previous = arange(1,7).reshape((1,6))
>>> previous
array([[1, 2, 3, 4, 5, 6]])
>>> adjacency_matrix = array([[inf,inf,4,inf,inf,inf],
...                           [-2,inf,inf,inf,inf,inf],
...                           [inf,-1,inf,inf,inf,inf],
...                           [inf,inf,2,inf,inf,1],
...                           [inf,inf,-3,inf,inf,-4],
...                           [inf,inf,inf,inf,inf,inf]])
>>> adjacency_matrix
array([[ inf,  inf,   4.,  inf,  inf,  inf],
       [ -2.,  inf,  inf,  inf,  inf,  inf],
       [ inf,  -1.,  inf,  inf,  inf,  inf],
       [ inf,  inf,   2.,  inf,  inf,   1.],
       [ inf,  inf,  -3.,  inf,  inf,  -4.],
       [ inf,  inf,  inf,  inf,  inf,  inf]])
>>> previous.T + adjacency_matrix
array([[ inf,  inf,   5.,  inf,  inf,  inf],
       [  0.,  inf,  inf,  inf,  inf,  inf],
       [ inf,   2.,  inf,  inf,  inf,  inf],
       [ inf,  inf,   6.,  inf,  inf,   5.],
       [ inf,  inf,   2.,  inf,  inf,   1.],
       [ inf,  inf,  inf,  inf,  inf,  inf]])
Here we used the transpose function to get our previous variable into the right shape so that we could apply its first value to every item in the first row of adjacency_matrix, its second value to every item in the second row and so on.
What we actually want is the shortest path for each vertex so we need to take the minimum value from each row for which the min function comes in handy.
>>> result = previous.T + adjacency_matrix
>>> result.min(axis=1)
array([ 5.,  0.,  2.,  5.,  1.,  inf])
We have to tell it to apply itself to each row by passing ‘axis=1’ otherwise it will just take the minimum value of the whole matrix.
Now to get our newly calculated cache we just need to combine our previous value with this new one:
>>> previous
array([[1, 2, 3, 4, 5, 6]])
>>> result.min(axis=1)
array([ 5.,  0.,  2.,  5.,  1.,  inf])
>>> minimum(previous, result.min(axis=1))
array([[ 1.,  0.,  2.,  4.,  1.,  6.]])
Now if we put this into our algorithm it ends up looking like this:
adjacency_matrix = zeros((vertices, vertices))
adjacency_matrix[:] = float("inf")
for line in file.readlines():
    tail, head, weight = line.split(" ")
    adjacency_matrix[int(head)-1][int(tail)-1] = int(weight)

def initialise_cache(vertices, s):
    cache = empty(vertices)
    cache[:] = float("inf")
    cache[s] = 0
    return cache

cache = initialise_cache(vertices, 0)
for i in range(1, vertices):
    previous_cache = cache[:]
    combined = (previous_cache.T + adjacency_matrix).min(axis=1)
    cache = minimum(previous_cache, combined)

# checking for negative cycles
previous_cache = cache[:]
combined = (previous_cache.T + adjacency_matrix).min(axis=1)
cache = minimum(previous_cache, combined)

if(not alltrue(cache == previous_cache)):
    raise Exception("negative cycle detected")
The only numpy function that’s new is alltrue which is used to check whether every value of two arrays is the same.
The code is on github and the running time is now down from 27 seconds to 5 seconds per shortest path which is pretty cool I think!
Bellman-Ford algorithm in Python
The latest problem of the Algorithms 2 class required us to write an algorithm to calculate the shortest path between two nodes on a graph and one algorithm which allows us to do this is Bellman-Ford.
Bellman-Ford computes the single source shortest path, which means that if we have a 5 vertex graph we’d need to run it 5 times to find the shortest path from each vertex and then find the shortest of those shortest paths.
One nice thing about Bellman-Ford compared to Dijkstra is that it’s able to handle negative edge weights.
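To make the "run it once per vertex" idea concrete, here's a hedged sketch (my own illustration, not the post's code) using a minimal edge-list Bellman-Ford on a tiny hypothetical graph:

```python
def bellman_ford(vertices, edges, s):
    # cache[v] = shortest known distance from source s to v
    cache = [float("inf")] * vertices
    cache[s] = 0
    for _ in range(vertices - 1):
        for tail, head, weight in edges:
            if cache[tail] + weight < cache[head]:
                cache[head] = cache[tail] + weight
    return cache

# a tiny hypothetical graph as (tail, head, weight) tuples
edges = [(0, 1, 4), (0, 2, 2), (2, 1, 1), (1, 2, 5)]
vertices = 3

# run the single source version once per vertex...
all_paths = [bellman_ford(vertices, edges, s) for s in range(vertices)]

# ...then take the shortest of those shortest paths (ignoring zero-length s->s entries)
shortest = min(d for row in all_paths for d in row if d != 0)
print(shortest)  # 1
```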
The pseudocode of the algorithm is as follows:
 Let A = 2D array of size n * n where n is the number of vertices
 Initialise A[0,s] = 0; A[0,v] = infinity for all v ≠ s where s is the node we’re finding the shortest path for

 for i = 1,2,3,…,n-1
  for each v ∈ V
   A[i,v] = min { A[i-1, v],
                  min (w,v) ∈ E { A[i-1, w] + Cost_wv }
                }   where (w,v) are the incoming edges of vertex v
 check for negative cycles by running the loop one more time (for each v ∈ V) and checking A[n] = A[n-1]
 If they are equal then return A[n-1] otherwise report a negative cycle.
My first version looked like this:
import os
file = open(os.path.dirname(os.path.realpath(__file__)) + "/g_small.txt")
vertices, edges = map(lambda x: int(x), file.readline().replace("\n", "").split(" "))

adjacency_list = [[] for k in xrange(vertices)]
for line in file.readlines():
    tail, head, weight = line.split(" ")
    adjacency_list[int(head)-1].append({"from" : int(tail), "weight" : int(weight)})

s = 0
cache = [[0 for k in xrange(vertices+1)] for j in xrange(vertices+1)]
cache[0][s] = 0
for v in range(0, vertices):
    if v != s:
        cache[0][v] = float("inf")

for i in range(1, vertices):
    for v in range(0, vertices):
        least_adjacent_cost = calculate_least_adjacent_cost(adjacency_list, i, v, cache[i-1])
        cache[i][v] = min(cache[i-1][v], least_adjacent_cost)

# detecting negative cycles
for v in range(0, vertices):
    least_adjacent_cost = calculate_least_adjacent_cost(adjacency_list, i, v, cache[vertices-1])
    cache[vertices][v] = min(cache[vertices-1][v], least_adjacent_cost)

if(not cache[vertices] == cache[vertices-1]):
    raise Exception("negative cycle detected")

shortest_path = min(cache[vertices-1])
print("Shortest Path: " + str(shortest_path))
where calculate_least_adjacent_cost is defined like so:
def calculate_least_adjacent_cost(adjacency_list, i, v, cache):
    adjacent_nodes = adjacency_list[v]

    least_adjacent_cost = float("inf")
    for node in adjacent_nodes:
        adjacent_cost = cache[node["from"]-1] + node["weight"]
        if adjacent_cost < least_adjacent_cost:
            least_adjacent_cost = adjacent_cost
    return least_adjacent_cost
As you can see we’re using an adjacency list as our graph representation, mapping from each node to its corresponding edges. We then initialise the cache as per the algorithm and then have two nested for loops which we use to work out the shortest path from s to each vertex.
The calculate_least_adjacent_cost function is used to work out which of the incoming edges to a vertex has the lowest cost, taking into account previous shortest path calculations that we’ve done up to the source vertices of those incoming edges.
We then have an extra call at the end to check for negative cycles. If there is no change in the values calculated from s to each vertex then we know we don’t have any negative cycles because otherwise one of them would have come into effect and given us a different result.
This algorithm works but it’s inefficient in its use of space – our cache has size n*n when we only ever care about 2 of the rows – the previous and current ones – so we can just use a list instead.
If we do this the code now looks like this:
s = 0
cache = [[] for j in xrange(vertices+1)]
cache[s] = 0
for v in range(0, vertices):
    if v != s:
        cache[v] = float("inf")

for i in range(1, vertices):
    for v in range(0, vertices):
        previous_cache = cache
        least_adjacent_cost = calculate_least_adjacent_cost(adjacency_list, i, v, previous_cache)
        cache[v] = min(previous_cache[v], least_adjacent_cost)

# detecting negative cycles
for v in range(0, vertices):
    previous_cache = copy.deepcopy(cache)
    least_adjacent_cost = calculate_least_adjacent_cost(adjacency_list, i, v, previous_cache)
    cache[v] = min(previous_cache[v], least_adjacent_cost)

if(not cache == previous_cache):
    raise Exception("negative cycle detected")
By doing this we lose the history of the algorithm over the run, which means that in its current state we wouldn’t be able to work out what the shortest path was – we only know its cost.
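One common way to recover the path itself without keeping the full table (a sketch of my own, not part of the original solution) is to record a predecessor for each vertex whenever we improve its cost, and then walk the predecessors back from the target:

```python
def bellman_ford_with_path(vertices, edges, s):
    # edges is a hypothetical list of (tail, head, weight) tuples
    cache = [float("inf")] * vertices
    predecessor = [None] * vertices
    cache[s] = 0
    for _ in range(vertices - 1):
        for tail, head, weight in edges:
            if cache[tail] + weight < cache[head]:
                cache[head] = cache[tail] + weight
                predecessor[head] = tail  # remember how we got here
    return cache, predecessor

def path_to(v, predecessor):
    # walk back from v to the source via the recorded predecessors
    path = [v]
    while predecessor[v] is not None:
        v = predecessor[v]
        path.append(v)
    return list(reversed(path))

edges = [(0, 1, 4), (0, 2, 2), (2, 1, 1)]
cache, predecessor = bellman_ford_with_path(3, edges, 0)
print(path_to(1, predecessor))  # [0, 2, 1]
```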
For a 1000 node, 47978 edge graph it takes 27 seconds to go over the whole graph and detect a negative cycle if there is one.
The code for this algorithm is on github as always.
Kruskal’s Algorithm using union find in Ruby
I recently wrote a blog post describing my implementation of Kruskal’s algorithm – a greedy algorithm used to find a minimum spanning tree (MST) of a graph – and while it does the job it’s not particularly quick.
It takes 20 seconds to calculate the MST for a 500 node, ~2000 edge graph.
One way that we can improve the performance of the algorithm is by storing the MST in a union find/disjoint set data structure.
To refresh, Kruskal’s algorithm does the following:
 sort edges E in order of increasing cost
 let T = minimum spanning tree, m = number of edges
 for i=1 to m:
 let e = selected edge (u,v)
 if there’s no way to go between u and v using edges in T:
 add e to T
 return T
In the first version of the algorithm we stored the MST in an array and then we did a depth first search to check that adding an edge to the array wasn’t going to create a cycle which would mean we no longer had an MST.
As I understand it the way that we use the union find data structure is that we start out with n connected components i.e. every node is in its own connected component.
We then merge these components together as we come across new edges which connect nodes in different components until we only have one component left at which point we should have our MST.
 sort edges E in order of increasing cost
 initialise union-find uf with n connected components
 let T = minimum spanning tree, m = number of edges
 for i=1 to m:
 let e = selected edge (u,v)
 if u and v not in same connected component in uf:
 merge u and v into same connected component in uf
 add e to T
 return T
I came across this bit of code written by Michael Luckeneder which implements the union find data structure in Ruby and adapted that slightly to fit the nomenclature used in the Algorithms 2 videos.
The union find data structure looks like this:
class UnionFind
  def initialize(n)
    @leaders = 1.upto(n).inject([]) { |leaders, i| leaders[i] = i; leaders }
  end

  def connected?(id1, id2)
    @leaders[id1] == @leaders[id2]
  end

  def union(id1, id2)
    leader_1, leader_2 = @leaders[id1], @leaders[id2]
    @leaders.map! { |i| (i == leader_1) ? leader_2 : i }
  end
end
We have two methods:
 connected? which we use to check whether or not two nodes are in the same connected component.
 union which we use to put two nodes into the same connected component.
The way it’s implemented is that we have a collection of ‘leaders’ and initially each node is its own leader. As we find edges which belong in the MST we call union passing the two nodes as arguments. After this method executes every node which has the same leader as the first node will now instead have the same leader as the second node.
For example if we have a simple graph with edges 1 > 2 > 3 > 4 > 5 our initial union find data structure would look like this:
> uf = UnionFind.new 5 => #<UnionFind:0x45e5a9b3 @leaders=[nil, 1, 2, 3, 4, 5]> 
At this stage each node is in its own connected component but if we process the edge 1 > 2 we’d first check if those two nodes were already in the same connected component:
> uf.connected?(1,2) => false 
Given that they aren’t we can call union:
> uf.union(1,2) => [nil, 2, 2, 3, 4, 5] 
As we can see from the output nodes 1 & 2 now both have the same leader since they are in the same connected component while the other nodes still remain on their own.
We could then process the 2 > 3 edge which would put nodes 1, 2 & 3 together:
> uf.union(2,3) => [nil, 3, 3, 3, 4, 5] 
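As a cross-check, the same leader-array behaviour can be sketched in Python (an illustrative translation of my own, 1-indexed like the Ruby version, not the post's code):

```python
class UnionFind:
    def __init__(self, n):
        # nodes are numbered 1..n; each node starts as its own leader
        self.leaders = {i: i for i in range(1, n + 1)}

    def connected(self, a, b):
        return self.leaders[a] == self.leaders[b]

    def union(self, a, b):
        leader_a, leader_b = self.leaders[a], self.leaders[b]
        # everyone who followed a's leader now follows b's leader
        for node, leader in self.leaders.items():
            if leader == leader_a:
                self.leaders[node] = leader_b

uf = UnionFind(5)
uf.union(1, 2)
uf.union(2, 3)
print(uf.connected(1, 3))  # True
print(uf.connected(1, 4))  # False
```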
The outline for Kruskal’s algorithm which makes use of this data structure is like so:
set = UnionFind.new number_of_nodes
@minimum_spanning_tree = []

edges = file.drop(1).
        map { |x| x.gsub(/\n/, "").split(" ").map(&:to_i) }.
        map { |one, two, weight| { :from => one, :to => two, :weight => weight } }.
        sort_by { |x| x[:weight] }

edges.each do |edge|
  if !set.connected?(edge[:from], edge[:to])
    @minimum_spanning_tree << edge
    set.union(edge[:from], edge[:to])
  end
end

puts "MST: #{@minimum_spanning_tree}"
puts "Cost: #{@minimum_spanning_tree.inject(0) { |acc, x| acc + x[:weight] }}"
This version of the algorithm runs in 1.9 seconds, a significant improvement on the initial version. The full code is on github as usual!
Kruskal’s Algorithm in Ruby
Last week I wrote a couple of posts showing different implementations of Prim’s algorithm – an algorithm used to find a minimum spanning tree in a graph – and a similar algorithm is Kruskal’s algorithm.
Kruskal’s algorithm also finds a minimum spanning tree but it goes about it in a slightly different way.
Prim’s algorithm takes an approach whereby we select nodes and then find connecting edges until we’ve covered all the nodes. Kruskal’s algorithm, on the other hand, drives from the edges of lowest costs and makes sure that we don’t create a cycle in our spanning tree by adding an edge to it.
The pseudocode for Kruskal’s algorithm reads like so:
 sort edges E in order of increasing cost
 let T = minimum spanning tree, m = number of edges
 for i=1 to m:
 let e = selected edge (u,v)
 if there’s no way to go between u and v using edges in T:
 add e to T
 return T
The full code is on github and the outline of the algorithm reads like so:
@minimum_spanning_tree = []

edges = file.drop(1).
        map { |x| x.gsub(/\n/, "").split(" ").map(&:to_i) }.
        map { |one, two, weight| { :from => one, :to => two, :weight => weight } }.
        sort_by { |x| x[:weight] }

edges.each do |edge|
  @minimum_spanning_tree << edge unless has_cycles edge
end
The main bit of code is a variation on depth first search to check if adding an edge creates a cycle which would mean we’re adding an edge which isn’t needed as part of a minimum spanning tree.
def has_cycles(edge)
  node_one, node_two = edge[:from], edge[:to]
  @minimum_spanning_tree.each { |x| x[:explored] = false }
  cycle_between(node_one, node_two, @minimum_spanning_tree.dup)
end

def cycle_between(one, two, edges)
  adjacent_edges = edges.select { |edge| edge[:to] == one || edge[:from] == one }
  return false if adjacent_edges.empty?
  adjacent_edges.reject { |edge| edge[:explored] }.each do |edge|
    edge[:explored] = true
    other_node = (edge[:from] == one) ? edge[:to] : edge[:from]
    return true if other_node == two || cycle_between(other_node, two, edges)
  end
  false
end
cycle_between is the depth first search but we’re using it to tell us whether there’s already a path between two nodes in the graph and we exit as soon as we determine that.
I added some code on line 3 to reset the explored status of edges each time that has_cycles is called. This is needed even though I make a copy of @minimum_spanning_tree (on line 4) before mutating it – dup is only a shallow copy, so the edge hashes inside the array are still shared and keep their :explored flags between calls.
Otherwise I think this is a fairly standard solution. If we wanted to go from node 1 to node 3 and we had the following edges: 1 > 4 > 5 > 3 we’d make these method calls:
# excluding weights and explored status for brevity
cycle_between(1, 3, [{:from => 1, :to => 4}, {:from => 4, :to => 5}, {:from => 5, :to => 3}])
cycle_between(4, 3, [{:from => 1, :to => 4}, {:from => 4, :to => 5}, {:from => 5, :to => 3}])
cycle_between(5, 3, [{:from => 1, :to => 4}, {:from => 4, :to => 5}, {:from => 5, :to => 3}])
The function would exit on that last iteration because we’d match our target node ‘3’.
This algorithm wasn’t one of the assignments for the class but I used the same data set that was provided for Prim’s algorithm and was able to get the same output so I figure it works!
Prim’s algorithm using a heap/priority queue in Ruby
I recently wrote a blog post describing my implementation of Prim’s Algorithm for the Algorithms 2 class and while it comes up with the right answer for the supplied data set it takes almost 30 seconds to do so!
In one of the lectures Tim Roughgarden points out that we’re doing the same calculations multiple times to work out the next smallest edge to include in our minimal spanning tree and could use a heap to speed things up.
A heap works well in this situation because one of the reasons we might use a heap is to speed up repeated minimum computations i.e. working out the minimum weighted edge to add to our spanning tree.
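A minimal illustration of that property (in Python's heapq, not the post's Ruby PriorityQueue): building the heap is O(n), and each subsequent minimum extraction is O(log n) rather than a fresh O(n) scan:

```python
import heapq

# hypothetical edge weights pushed into a heap
weights = [7, 3, 9, 1, 5]
heapq.heapify(weights)  # O(n) to build

# each heappop returns the current minimum in O(log n)
mins = [heapq.heappop(weights) for _ in range(3)]
print(mins)  # [1, 3, 5]
```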
The pseudocode for the Prim’s algorithm which uses a heap reads like this:
 Let X = nodes covered so far, V = all the nodes in the graph, E = all the edges in the graph
 Pick an arbitrary initial node s and put that into X
 for v ∈ V – X
 key[v] = cheapest edge (u,v) with u ∈ X
 while X ≠ V:
 let v = extract-min(heap) (i.e. v is the node which has the minimal edge cost into X)
 Add v to X
 for each edge (v, w) ∈ E
  if w ∈ V – X (i.e. w is a node which hasn’t yet been covered)
   Delete w from heap
   recompute key[w] = min(key[w], weight(v, w)) (key[w] would only change if the weight of the edge (v,w) is less than the current weight for that key)
   reinsert w into the heap
We store the uncovered nodes in the heap and set their priority to be the cheapest edge from that node into the set of nodes which we’re already covered.
I came across the PriorityQueue gem which actually seems to be better than a heap because we can have the node as the key and then set the priority of the key to be the edge weight. When you extract the minimum value from the priority queue it makes use of this priority to return the minimum one.
The outline of my solution to this problem looks like this:
MAX_VALUE = (2**(0.size * 8 - 2) - 1)

adjacency_matrix = create_adjacency_matrix
@nodes_spanned_so_far, spanning_tree_cost = [1], 0

heap = PriorityQueue.new
nodes_left_to_cover.each do |node|
  cheapest_nodes = get_edges(adjacency_matrix, node-1).
                   select { |_, other_node_index| @nodes_spanned_so_far.include?(other_node_index + 1) } || []

  cheapest = cheapest_nodes.inject([]) do |all_edges, (weight, index)|
    all_edges << { :start => node, :end => index + 1, :weight => weight }
    all_edges
  end.sort { |x, y| x[:weight] <=> y[:weight] }.first

  weight = !cheapest.nil? ? cheapest[:weight] : MAX_VALUE
  heap[node] = weight
end

while !nodes_left_to_cover.empty?
  cheapest_node, weight = heap.delete_min
  spanning_tree_cost += weight
  @nodes_spanned_so_far << cheapest_node

  edges_with_potential_change = get_edges(adjacency_matrix, cheapest_node-1).
                                reject { |_, node_index| @nodes_spanned_so_far.include?(node_index + 1) }
  edges_with_potential_change.each do |weight, node_index|
    heap.change_priority(node_index+1,
                         [heap.priority(node_index+1), adjacency_matrix[cheapest_node-1][node_index]].min)
  end
end

puts "total spanning tree cost #{spanning_tree_cost}"
I couldn’t see a way to keep track of the edges that comprise the minimal spanning tree so in this version I’ve created a variable which keeps track of the edge weights as we go rather than computing the total at the end.
We start off by initialising the priority queue to contain entries for each of the nodes in the graph.
We do this by finding the edges that go from each node to the nodes that we’ve already covered. In this case the only node we’ve covered is node 1 so the priorities for most nodes will be MAX_VALUE and for nodes which have an edge to node 1 it’ll be the weight of that edge.
While we still have nodes left to cover we take the next node with the cheapest weight from the priority queue and add it to the collection of nodes that we’ve covered. We then iterate through the nodes which have an edge to the node we just removed and update the priority queue if necessary.
The time taken for this version of the algorithm to run against the data set was 0.3 seconds as compared to the 29 seconds of the naive implementation.
As usual the code is on github – I need to figure out how to keep track of the edges so if anyone has any suggestions that’d be cool.
Prim’s Algorithm in Ruby
One of the first programming assignments of the Algorithms 2 course was to code Prim’s algorithm – a greedy algorithm used to find the minimum spanning tree of a connected weighted undirected graph.
In simpler terms we need to find the path of least cost which connects all of the nodes together and there can’t be any cycles in that path.
Wikipedia has a neat diagram which shows this more clearly:
The pseudocode for the algorithm is as follows:
 Let X = nodes covered so far, T = edges covered so far, V = all the nodes in the graph
 Pick an initial arbitrary node s – it doesn’t matter which one it is
 while X ≠ V:
 let e = (u,v) be the cheapest edge of the graph where u ∈ X and v ∉ X
  i.e. u is a node that has been covered and v a node that has not yet been covered
 Add e to T
 Add v to X
At the end of the algorithm we’ll have a collection of all the nodes in the graph and a collection of the edges we need to follow in order to create a minimal spanning tree. If we sum the weights of those edges we get the cost of the tree.
I used an adjacency matrix to represent the graph i.e. a 2 dimensional array of size n*n (where n = number of nodes in the graph).
If node 0 had an edge to node 3 of weight 4 then we’d put this entry into the matrix:
adjacency_matrix[0][3] = 4 
I tried to get my implementation of the algorithm to look as close to the pseudocode as possible and I ended up with the following:
adjacency_matrix = create_adjacency_matrix
first_edge = select_first_edge(adjacency_matrix)
@nodes_spanned_so_far, @edges = [first_edge[:start], first_edge[:end]], [first_edge]

while !nodes_left_to_cover.empty?
  cheapest_edge = find_cheapest_edge(adjacency_matrix, @nodes_spanned_so_far, number_of_nodes)
  @edges << cheapest_edge
  @nodes_spanned_so_far << cheapest_edge[:start]
end
The code became a bit messy in parts because it relies on a 0 indexed array yet the names of the nodes start at 1. There are therefore loads of +1s and -1s dotted around the place.
The method to work out the next cheapest node looks like this:
def find_cheapest_edge(adjacency_matrix, nodes_spanned_so_far, number_of_nodes)
  available_nodes = (0..number_of_nodes-1).to_a.
                    reject { |node_index| nodes_spanned_so_far.include?(node_index + 1) }

  cheapest_edges = available_nodes.inject([]) do |acc, node_index|
    get_edges(adjacency_matrix, node_index).
      select { |_, other_node_index| nodes_spanned_so_far.include?(other_node_index + 1) }.
      each do |weight, other_node_index|
        acc << { :start => node_index + 1, :end => other_node_index + 1, :weight => weight }
      end
    acc
  end

  cheapest_edges.sort { |x, y| x[:weight] <=> y[:weight] }.first
end

def get_edges(adjacency_matrix, node_index)
  adjacency_matrix[node_index].each_with_index.reject { |edge, index| edge.nil? }
end
We first get all the nodes which haven’t already been spanned and then build up a collection of the edges between nodes we’ve already spanned and ones that we haven’t.
The other bit of interesting code is the creation of the adjacency matrix at the beginning:
def create_adjacency_matrix
  adjacency_matrix = [].tap { |m| number_of_nodes.times { m << Array.new(number_of_nodes) } }
  file.drop(1).
       map { |x| x.gsub(/\n/, "").split(" ").map(&:to_i) }.
       each do |(node1, node2, weight)|
         adjacency_matrix[node1 - 1][node2 - 1] = weight
         adjacency_matrix[node2 - 1][node1 - 1] = weight
       end
  adjacency_matrix
end
Here we first parse the file, which involves skipping the first header line and then converting each remaining line into a collection of integers representing the two nodes and their corresponding edge weight.
We then put two entries into the adjacency matrix, one entry from node A to node B and one entry from node B to node A. The reason we do this is that this is an undirected graph so we can go either way between the nodes.
To work out the cost of the minimum spanning tree I use this line of code at the end:
puts "edges: #{@edges}, total spanning tree cost #{@edges.inject(0) { |acc, edge| acc + edge[:weight] }}"
My full solution is on github so if anyone has any suggestions/improvements they’re always welcome.
Algorithms: Rabin-Karp in Haskell
I recently came across a blog post describing the Rabin-Karp algorithm – an algorithm that uses hashing to find a pattern string in some text – and thought it would be interesting to try and write a version of it in Haskell.
This algorithm is typically used when we want to search for multiple pattern strings in a text e.g. when detecting plagiarism or as a primitive way of detecting code duplication, but my initial version only lets you search for one pattern.
For text of length n and p patterns of combined length m, its average and best case running time is O(n+m) in space O(p), but its worst-case time is O(nm).
1  function RabinKarp(string s[1..n], string sub[1..m])
2    hsub := hash(sub[1..m]);  hs := hash(s[1..m])
3    for i from 1 to n-m+1
4      if hs = hsub
5        if s[i..i+m-1] = sub
6          return i
7      hs := hash(s[i+1..i+m])
8    return not found
On line 2 we compute the initial hash value for the pattern and for the first m characters of the text where m represents the number of characters in the pattern.
We then work through the text comparing the pattern hash with our current version of the text hash each time.
If they match then we check that the characters in those positions also match, since two different strings can hash to the same value, and if they do then we’re done.
Line 7 is the interesting line because if we recalculate the hash from scratch each time then it will take O(m) time which means the whole algorithm will take O(nm) time since it’s called in a loop.
We therefore need to use a rolling hash which will allow us to work out the value of the next hash from the current one.
For example if we’re searching for a three letter pattern in the text ‘markus’ then on our first iteration of the loop we’ll have hash(“mar”) and on the second iteration we need to work out hash(“ark”).
The hash of “mar” already contains the hash value of “ar” so we need to remove the hash value of the “m” and then add the hash value of “k” to get our new hash value.
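We can sketch that rolling idea in Python (an illustration of my own, not the post's Haskell) with the same simple base-256 hash; r = 256 and the modulus q match the values used later in the post:

```python
r, q = 256, 1920475943

def hash_of(text, m):
    # hash of the first m characters, taking the modulus at each step
    acc = 0
    for ch in text[:m]:
        acc = (r * acc + ord(ch)) % q
    return acc

def rehash(existing, first_char, next_char, m):
    # remove the leading character's contribution, shift everything up
    # one position, then add the incoming character
    without_first = existing - (pow(r, m - 1, q) * ord(first_char)) % q
    return (without_first * r + ord(next_char)) % q

h_mar = hash_of("markus", 3)            # hash of "mar"
h_ark = rehash(h_mar, "m", "k", 3)      # roll "mar" -> "ark"
print(h_ark == hash_of("arkus", 3))     # True
```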
I used this Java version of the algorithm as a template.
This is the main function:
import Data.Char
import Data.List
import Data.Maybe

rabinKarp :: String -> String -> Int
rabinKarp text pattern =
  if pattern == "" then -1
  else fromMaybe (-1) $
       mapOnMaybe fst $
       find matchingString $
       zip [0..] $
       scanl nextHash (hash text m) $
       windowed (m+1) text
  where n = length text
        m = length pattern
        nextHash currentHash chars = reHash currentHash (head chars) (last chars) m
        matchingString (offset, textHash) =
          hash pattern m == textHash && pattern == subString text offset m
I applied the windowed function to the text to break it down into a collection of smaller strings which included the next character that we would need to look at.
e.g. if we were searching for a 3 character pattern in “markusauerelius” then this would be the collection of values we iterate over.
> windowed 4 "markusauerelius"
["mark","arku","rkus","kusa","usau","saue","auer","uere","erel","reli","eliu","lius"]
I then use scanl to apply a reduction over that collection, passing in the hash of the previous 3 characters each time in this example. I used scanl instead of foldl so that I could see the value of the hash on each iteration rather than only at the end.
Next I use a zip to get the indexes of each letter in the text and then I look for the first entry in the collection which matches the pattern hash and has the same sequence of characters.
The mapOnMaybe is used to grab the index of the match and then the last bit of the line returns a ‘-1’ if there is no match.
I’m assuming that scanl is lazily evaluated and in this case will therefore only evaluate up to where a match is found – if that assumption’s wrong then this version of Rabin Karp is very inefficient!
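For anyone more comfortable with Python, the windowed/scanl combination used above has a close analogue in a list comprehension plus itertools.accumulate (the function names here are illustrative, not from the post):

```python
from itertools import accumulate

def windowed(size, text):
    # all consecutive substrings of the given size
    return [text[i:i+size] for i in range(len(text) - size + 1)]

print(windowed(4, "markus"))  # ['mark', 'arku', 'rkus']

# scanl's Python analogue: accumulate with an explicit combining
# function and an initial value, keeping every intermediate result
running = list(accumulate([1, 2, 3, 4], lambda acc, x: acc + x, initial=0))
print(running)  # [0, 1, 3, 6, 10]
```

Like scanl, accumulate yields each intermediate value, which is what lets the search stop at the first match rather than reducing the whole list.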
The hash and reHash functions referenced above are defined like so:
globalQ = 1920475943
globalR = 256

hash = hash' globalR globalQ
hash' r q string m = foldl (\acc x -> (r * acc + ord x) `mod` q) 0 $ take m string
> hash "markusaerelius" 3
7168370
Written in English this is the definition of the hash function:
((r^(m-1) * ascii char) + (r^(m-2) * ascii char) + … + (r^0 * ascii char)) % q
where r = 256, q = 1920475943
Which translates to the following in Haskell:
((256^2 * ord ‘m’) + (256^1 * ord ‘a’) + (256^0 * ord ‘r’)) `mod` 1920475943
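We can check that arithmetic directly with a quick Python sanity check of the same formula:

```python
q = 1920475943

# hash("mar") with r = 256: each character weighted by a power of 256
h = ((256 ** 2) * ord("m") + (256 ** 1) * ord("a") + (256 ** 0) * ord("r")) % q
print(h)  # 7168370
```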
I was having trouble with casting bigger values when I wrote a function which described this more explicitly which is why the current ‘hash’ function applies the modulo function on each iteration.
The reHash function is defined like this:
reHash = reHash' globalR globalQ

reHash' r q existingHash firstChar nextChar m =
  (takeOffFirstChar `mod` fromIntegral q * fromIntegral r + ord nextChar) `mod` fromIntegral q
  where rm = if m > 0 then (fromIntegral r ^ fromIntegral (m-1)) `mod` fromIntegral q else 0
        takeOffFirstChar = existingHash - fromIntegral rm * ord firstChar
In English:
reHash = ((existingHash – ((r^(m-1) % q) * ascii firstChar)) % q * r + ascii nextChar) % q
First we remove the hash of the first character – which has a value of ‘r^(m-1) * ascii char’ from our hash function – and then we multiply the whole thing by ‘r’ to push each character up by one position
e.g. the 2nd character would initially have a hash value of ‘r^(m-2) * ascii char’. We multiply by ‘r’ so it now has a hash value of ‘r^(m-1) * ascii char’.
Then we add the ascii value of the next character along and we have our new hash value.
We can compare the results we get from using hash and reHash to check it’s working:
> hash "mar" 3 7168370 > hash "ark" 3 6386283 
> reHash 7168370 'm' 'k' 3
6386283
I hardcoded ‘globalQ’ to make life a bit easier for myself but in a proper implementation we’d randomly generate it.
‘globalR’ would be constant and I wanted it to be available to the hash and reHash functions without needing to be passed explicitly, which is why I’ve partially applied hash’ and reHash’ to achieve this.
We can then run the algorithm like so:
> rabinKarp "markusaerelius" "sae" 5 
> rabinKarp "markusaerelius" "blah" 1 
My whole solution is available on hpaste and as usual if you see any ways to code this better please let me know.
Algorithms: Flood Fill in Haskell – Abstracting the common
In the comments of my blog post describing the flood fill algorithm in Haskell David Turner pointed out that the way I was passing the grid around was quite error prone.
floodFill :: Array (Int, Int) Colour -> (Int, Int) -> Colour -> Colour -> Array (Int, Int) Colour
floodFill grid point@(x, y) target replacement =
  if (not $ inBounds grid point) || grid ! (x, y) /= target
  then grid
  else gridNorth
  where grid'     = replace grid point replacement
        gridEast  = floodFill grid' (x+1, y) target replacement
        gridWest  = floodFill gridEast (x-1, y) target replacement
        gridSouth = floodFill gridWest (x, y+1) target replacement
        gridNorth = floodFill gridSouth (x, y-1) target replacement
I actually did pass the wrong grid variable around while I was writing it and ended up quite confused as to why it wasn’t working as I expected.
David’s suggestion based on the above version of the algorithm was that I might want to use the state monad to thread state rather than explicitly passing it around like I am here.
I had a go at writing the function using the state monad but realised while doing so that I could instead fold over a cell’s neighbours, removing the need to thread state at all.
I didn’t see the ‘neighbours’ concept the first time I wrote it but David wrote a version of the algorithm in which he introduced that.
This is the resulting function:
floodFill :: Array (Int, Int) Colour -> (Int, Int) -> Colour -> Colour -> Array (Int, Int) Colour
floodFill grid point target replacement =
  if target == replacement
  then grid
  else let gridWithSquareReplaced = if onGrid point then grid // [(point, replacement)] else grid
           validNeighbours = filter (onGrid `and` sameAsTarget) neighbours
       in foldl (\grid point -> floodFill grid point target replacement) gridWithSquareReplaced validNeighbours
  where neighbours = let (x, y) = point in [(x+1, y), (x-1, y), (x, y+1), (x, y-1)]
        sameAsTarget point = grid ! point == target
        onGrid = inRange $ bounds grid

and :: (a -> Bool) -> (a -> Bool) -> a -> Bool
and f g x = f x && g x
I initially get all of the neighbouring squares but then filter those out if they don’t fit on the grid or aren’t of the required colour. We can then use ‘foldl’ to iterate over each neighbour passing the grid along as the accumulator.
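The predicate-combining trick stands on its own too. A minimal standalone example of the same ‘and’ combinator (hiding the Prelude’s and to avoid the name clash):

```haskell
import Prelude hiding (and)

-- combine two predicates into one, as used for validNeighbours above
and :: (a -> Bool) -> (a -> Bool) -> a -> Bool
and f g x = f x && g x

main :: IO ()
main = print (filter (even `and` (> 2)) [1 .. 10])  -- [4,6,8,10]
```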
David also wrote a more succinct function to work out whether or not a value fits on the grid so I’ve stolen that as well!
I’m still not sure I totally understand when I should be using ‘let’ and ‘where’ but in this example the functions I’ve put in the ‘where’ section are helper functions whereas the values I defined in the ‘let’ section are domain concepts.
Whether that’s the correct way to think of the two I’m not sure!
Algorithms: Flood Fill in Haskell
Flood fill is an algorithm used to work out which nodes are connected to a certain node in a multi-dimensional array. In this case we’ll use a two-dimensional array.
The idea is that we decide that we want to change the colour of one of the cells in the array and have its immediate neighbours who share its initial colour have their colour changed too i.e. the colour floods its way through the grid.
The algorithm is described on Wikipedia like so:
Flood-fill (node, target-color, replacement-color):
 If the color of node is not equal to target-color, return.
 Set the color of node to replacement-color.
 Perform Flood-fill (one step to the west of node, target-color, replacement-color).
 Perform Flood-fill (one step to the east of node, target-color, replacement-color).
 Perform Flood-fill (one step to the north of node, target-color, replacement-color).
 Perform Flood-fill (one step to the south of node, target-color, replacement-color).
 Return.
I decided to have a go at implementing it in Haskell and ended up with the following code:
import Data.Array

data Colour = White | Black | Blue | Green | Red deriving (Show, Eq)
inBounds :: Array (Int, Int) Colour -> (Int, Int) -> Bool
inBounds grid (x, y) = x >= lowx && x <= highx && y >= lowy && y <= highy
  where ((lowx, lowy), (highx, highy)) = bounds grid

replace :: Array (Int, Int) Colour -> (Int, Int) -> Colour -> Array (Int, Int) Colour
replace grid point replacement = if inBounds grid point then grid // [(point, replacement)] else grid
floodFill :: Array (Int, Int) Colour -> (Int, Int) -> Colour -> Colour -> Array (Int, Int) Colour
floodFill grid point@(x, y) target replacement =
  if (not $ inBounds grid point) || grid ! (x, y) /= target
  then grid
  else gridNorth
  where grid'     = replace grid point replacement
        gridEast  = floodFill grid' (x+1, y) target replacement
        gridWest  = floodFill gridEast (x-1, y) target replacement
        gridSouth = floodFill gridWest (x, y+1) target replacement
        gridNorth = floodFill gridSouth (x, y-1) target replacement
Since we can’t mutate the array, but only create new instances of it, we have to pass it onto the next recursive call such that the recursive call to the ‘East’ passes its result to the one recursing to the ‘West’ and so on.
It seemed easier to check that the square we wanted to change was actually on the grid inside the replace function rather than before calling it, so I pushed that logic in there. The bounds function was useful for working out whether we were still on the grid.
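The bounds and inRange functions from Data.Array can be explored in isolation on a small array – a minimal standalone sketch:

```haskell
import Data.Array

main :: IO ()
main = do
  -- a 2x2 grid indexed from (0,0) to (1,1)
  let grid = listArray ((0, 0), (1, 1)) [1, 2, 3, 4] :: Array (Int, Int) Int
  print (bounds grid)                   -- ((0,0),(1,1))
  print (inRange (bounds grid) (1, 1))  -- True
  print (inRange (bounds grid) (2, 0))  -- False
```

inRange applied to the result of bounds is exactly the onGrid check from the follow-up version of the algorithm.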
If we start with this grid:
> printGrid $ (toComplexArray [[White, White, White, Blue, Blue], [Blue, White, Blue, Blue, Blue], [Blue, Blue, Blue, Green, Green], [Green, Red, Blue, Black, Black], [Blue, Blue, Blue, Green, Blue]])
White White White Blue Blue
Blue White Blue Blue Blue
Blue Blue Blue Green Green
Green Red Blue Black Black
Blue Blue Blue Green Blue
Let’s say we want to change the ‘Blue’ value on the second row down, third column across (position (1,2)) in the array to be ‘Red’:
> printGrid $ floodFill (toComplexArray [[White, White, White, Blue, Blue], [Blue, White, Blue, Blue, Blue], [Blue, Blue, Blue, Green, Green], [Green, Red, Blue, Black, Black], [Blue, Blue, Blue, Green, Blue]]) (1,2) Blue Red
White White White Red Red
Red White Red Red Red
Red Red Red Green Green
Green Red Red Black Black
Red Red Red Green Blue
The toComplexArray function is used to convert a list into an ‘Array’ because it’s much easier to create a list for testing purposes. The code for that is on github if you’re interested.
I described the printGrid function in my last post and the rest of the code is on my github haskell repository.
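Neither helper is reproduced in this post, but hypothetical versions consistent with the output above (row-major, indexed from (0,0)) could look like this – the real ones on github may differ:

```haskell
import Data.Array

data Colour = White | Black | Blue | Green | Red deriving (Show, Eq)

-- hypothetical toComplexArray: lay a list of rows out in a 2D array, indexed from (0,0)
toComplexArray :: [[Colour]] -> Array (Int, Int) Colour
toComplexArray rows = listArray ((0, 0), (length rows - 1, length (head rows) - 1)) (concat rows)

-- hypothetical printGrid: print one row of the grid per line
printGrid :: Array (Int, Int) Colour -> IO ()
printGrid grid = mapM_ putStrLn rowStrings
  where ((lowx, lowy), (highx, highy)) = bounds grid
        rowStrings = [unwords [show (grid ! (x, y)) | y <- [lowy .. highy]] | x <- [lowx .. highx]]

main :: IO ()
main = printGrid (toComplexArray [[White, Blue], [Red, Green]])
```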