Sunday, February 10, 2008

More Erlang Fun: Distributed, Fault Tolerant MapReduce

In the Erlang Challenge posting, I showed how to implement a simple distributed parallel map function. You can pass it a list of nodes, and it performs each application of F1 on a random node ("node" means a connected Erlang VM that may exist on a different machine) from the list. Finally, it collects the results in the order of the arguments.

What Erlang nailed besides concurrent and distributed programming is fault tolerance. When processes die or nodes go down, the VM notifies their supervisors so they can perform error recovery. My previous example didn't demonstrate this capability, so in this article I'll show how to implement a similar solution that also takes node failures into account. In addition, instead of doing just a simple parallel map, this example folds the results, making it a basic MapReduce implementation.
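If you haven't used node monitoring before, here's a minimal sketch of the mechanism this article builds on: erlang:monitor_node/2 asks the runtime to send the calling process a {nodedown, Node} message if the monitored node disconnects. A process running this hypothetical helper would block until the watched node goes down:

%% hypothetical helper: block until Node disconnects, then report it
watch(Node) ->
    erlang:monitor_node(Node, true),
    receive
        {nodedown, Node} ->
            io:format("~p went down~n", [Node])
    end.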

(The reason I'm adding the 'reduce' phase is that it allows me to not worry about the order in which the results are accumulated. This implies that the reduce operation must be commutative.)
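For instance, summing is commutative, so folding over the results gives the same answer regardless of the order they arrive in, whereas consing results onto a list is order-sensitive:

1> lists:foldl(fun(X, Acc) -> X + Acc end, 0, [1, 2, 3]).
6
2> lists:foldl(fun(X, Acc) -> X + Acc end, 0, [3, 1, 2]).
6
3> lists:foldl(fun(X, Acc) -> [X | Acc] end, [], [1, 2, 3]).
[3,2,1]
4> lists:foldl(fun(X, Acc) -> [X | Acc] end, [], [3, 1, 2]).
[2,1,3]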

The function mapreduce() takes a function F1 and applies it to the elements of list L. Each application is performed on a remote node from Nodes. As opposed to the pmap() example, this implementation chooses the next node from the list in a rotating fashion rather than randomly (i.e., if Nodes is [A, B, C], then F1 is applied on remote nodes in the order A, B, C, A, B, C, A ...). The results are collected, and if any nodes went down, their computations are retried on the remaining nodes until all computations succeed or no nodes are left. If all the computations succeed, we call lists:foldl(F2, Acc, Vals), where Vals contains the results of the computations.
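For example, here is a hypothetical shell session that squares the numbers 1 through 10 on two nodes and sums the results (the node names are made up, and both would have to be connected VMs with the mapreduce module loaded):

1> mapreduce:mapreduce(fun(X) -> X * X end,
       fun(Val, Acc) -> Val + Acc end,
       0,
       lists:seq(1, 10),
       ['a@host1', 'b@host2']).
385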

Implementation details:

- do_map() returns a list of {Pid, X} tuples, where Pid is the process on the remote node in which F1 was applied to X. The reason we want to remember X is that if Pid's node goes down, we can later retry evaluating F1(X) on a different node.
- For each {Pid, X} tuple, collect() receives either an {ok, Pid, Val} message, indicating the computation succeeded, or a {nodedown, Node} message, indicating that the node on which Pid lived went down. collect() folds the results into a tuple of the form {Successes, Failures, RemainingNodes}. If Failures is not empty, collect() calls do_map() on the failed inputs and the remaining nodes, then recursively calls itself on the result, appending each round's successes to the accumulated list.

The code is below. (Note that I didn't test it, so it may have bugs :) )


-module(mapreduce).
-export([mapreduce/5]).

mapreduce(F1, F2, Acc, L, Nodes) ->
    %% map F1 over the elements of L on nodes from Nodes
    Results = do_map(F1, L, Nodes),

    %% collect the results, retrying in the event of failures
    Vals = collect(F1, Results, Nodes),

    %% perform the reduce operation
    lists:foldl(F2, Acc, Vals).


%% exit if we ran out of good nodes
do_map(_F1, _L, []) -> exit(no_nodes);
do_map(F1, L, Nodes) ->
    Parent = self(),

    %% apply F1 to the values of L on remote nodes in a rotating fashion
    {Results, _Nodes1} =
        lists:foldl(
          fun(X, {Acc, [Node | Rest]}) ->
                  erlang:monitor_node(Node, true),
                  Fun = fun() -> Parent ! {ok, self(), (catch F1(X))} end,
                  Pid = spawn(Node, Fun),
                  {[{Pid, X} | Acc], Rest ++ [Node]}
          end, {[], Nodes}, L),
    Results.

collect(F1, Results, Nodes) ->
    collect(F1, Results, Nodes, []).

collect(F1, Results, Nodes, Acc) ->
    {Successes, Failures, RemainingNodes} =
        lists:foldl(
          fun({Pid, X}, {Successes1, Failures1, Nodes1}) ->
                  Node = node(Pid),
                  receive
                      {ok, Pid, Val} ->
                          {[Val | Successes1], Failures1, Nodes1};

                      %% we may receive this message because of the call to
                      %% monitor_node()
                      {nodedown, Node} ->
                          {Successes1, [X | Failures1], Nodes1 -- [Node]}
                  end
          end, {[], [], Nodes}, Results),

    if Failures =/= [] ->
            %% retry the failed computations on the remaining nodes
            %% and add the results to the current list of successes
            Results2 = do_map(F1, Failures, RemainingNodes),
            collect(F1, Results2, RemainingNodes, Successes ++ Acc);
       true ->
            Successes ++ Acc
    end.


This is more complex than the pmap() example, but I think it packs a lot of functionality into a relatively small amount of code.

7 comments:

Luke Hoersten said...

I noticed this is kind of an extended version of the question I asked you over Facebook. Very nice.

links for 2008-02-12 « Bloggitation said...

[...] More Erlang Fun: Distributed, Fault Tolerant MapReduce (tags: erlang programming blog cluster 247up) [...]

Duane Johnson said...

I'm fairly new to Erlang, so please excuse my ignorance. I was wondering if the module should export mapreduce/5 since there doesn't seem to be a 4-argument mapreduce?

Yariv said...

Oops, it was a late change that I didn't check with the compiler. Thanks for pointing it out.

Tim Dysinger said...

I made a small change (monitor_node call needs a boolean arg) and added some eunit tests:


-module(mapreduce).
-export([mapreduce/5]).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.

mapreduce(F1, F2, Acc, L, Nodes) ->
    %% map F1 over the elements of L on nodes from Nodes
    Results = do_map(F1, L, Nodes),

    %% collect the results, retrying in the event of failures
    Vals = collect(F1, Results, Nodes),

    %% perform the reduce operation
    lists:foldl(F2, Acc, Vals).


%% exit if we ran out of good nodes
do_map(_F1, _L, []) -> exit(no_nodes);
do_map(F1, L, Nodes) ->
    Parent = self(),

    %% apply F1 to the values of L on remote nodes in a rotating fashion
    {Results, _Nodes1} =
        lists:foldl(
          fun(X, {Acc, [Node | Rest]}) ->
                  erlang:monitor_node(Node, true),
                  Fun = fun() -> Parent ! {ok, self(), (catch F1(X))} end,
                  Pid = spawn(Node, Fun),
                  {[{Pid, X} | Acc], Rest ++ [Node]}
          end, {[], Nodes}, L),
    Results.

collect(F1, Results, Nodes) ->
    collect(F1, Results, Nodes, []).

collect(F1, Results, Nodes, Acc) ->
    {Successes, Failures, RemainingNodes} =
        lists:foldl(
          fun({Pid, X}, {Successes1, Failures1, Nodes1}) ->
                  Node = node(Pid),
                  receive
                      {ok, Pid, Val} ->
                          {[Val | Successes1], Failures1, Nodes1};

                      %% we may receive this message because of the call to
                      %% monitor_node()
                      {nodedown, Node} ->
                          {Successes1, [X | Failures1], Nodes1 -- [Node]}
                  end
          end, {[], [], Nodes}, Results),

    if Failures =/= [] ->
            %% retry the failed computations on the remaining nodes
            %% and add the results to the current list of successes
            Results2 = do_map(F1, Failures, RemainingNodes),
            collect(F1, Results2, RemainingNodes, Successes ++ Acc);
       true ->
            Successes ++ Acc
    end.

-ifdef(TEST).

pass_through_test() ->
    ?assertEqual([one, two, three],
                 mapreduce(fun(X) -> X end,
                           fun(X, Acc) -> lists:append(Acc, [X]) end,
                           [],
                           [one, two, three],
                           [node()])).

multiply_by_2_no_reduce_test() ->
    ?assertEqual([2, 4, 6],
                 mapreduce(fun(X) -> X * 2 end,
                           fun(X, Acc) -> lists:append(Acc, [X]) end,
                           [],
                           [1, 2, 3],
                           [node()])).

multiply_by_2_and_reduce_test() ->
    ?assertEqual([2, 4, 6],
                 mapreduce(fun(X) -> X * 2 end,
                           fun(X, Acc) ->
                                   case lists:member(X, Acc) of
                                       false -> lists:append(Acc, [X]);
                                       _ -> Acc
                                   end
                           end,
                           [],
                           [1, 2, 2, 1, 3],
                           [node()])).

-endif.

Tim Dysinger said...

Here's the Hadoop word count ported to Erlang as an eunit test for the mapreduce module (sorry for the formatting, Yariv; maybe you can fix it).

%%% Port of -> http://wiki.apache.org/hadoop/WordCount
word_count_test() ->
    Words = [the, quick, brown, fox, jumped, over, the, lazy, dog],
    Result =
        mapreduce(fun(X) -> {X, 1} end,
                  fun({Word, _}, Dict) ->
                          case dict:is_key(Word, Dict) of
                              true -> dict:store(Word,
                                                 dict:fetch(Word, Dict) + 1,
                                                 Dict);
                              false -> dict:store(Word, 1, Dict)
                          end
                  end,
                  dict:new(),
                  Words,
                  [node()]),
    [?assertEqual(case Word of the -> 2; _ -> 1 end,
                  dict:fetch(Word, Result)) || Word <- Words].

Tim Dysinger said...

The other things that are needed are:

1- ability to add new nodes during the (sometimes) long run - "alternates" to step in for failed nodes.

2- ability to "combine" (local mini-reduce - post-map/pre-partition)

3- ability to partition (a pre-reduce function that determines reduce grouping)

4- doing the reduction on remote nodes (not all on the master)