Monday, March 09, 2009

Parallel merge sort in Erlang

I've been thinking lately about the problem of scaling a service like Twitter or the Facebook news feed. When a user visits the site, you want to show her a list of all the recent updates from her friends, sorted by date. It's easy when the user doesn't have too many friends and all the updates are on a single database (as in Twoorl's case :P). You use this query:


"select * from update where uid in ([fid1], [fid2], ...) order by creation_date desc limit 20"


(After making sure you created an index on uid and creation_date, of course :) )

However, what do you do when the user has many thousands of friends, and each friend's updates are stored on a different database? Clearly, you should fetch those updates in parallel. In Erlang, it's easy. You use pmap():


fetch_updates(Uids) ->
    pmap(
      fun(Uid) ->
              Db = get_db_for_user(Uid),
              query(Db, [<<"select * from update where uid =">>,
                         Uid, <<" order by creation_date desc limit 20">>])
      end, Uids).


%% Applies the function Fun to each element of the list in parallel
pmap(Fun, List) ->
    Parent = self(),
    %% spawn the processes
    Refs =
        lists:map(
          fun(Elem) ->
                  Ref = make_ref(),
                  spawn(
                    fun() ->
                            Parent ! {Ref, Fun(Elem)}
                    end),
                  Ref
          end, List),

    %% collect the results
    lists:map(
      fun(Ref) ->
              receive
                  {Ref, Elem} ->
                      Elem
              end
      end, Refs).
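

As a quick sanity check, here's pmap() from the shell (assuming it's exported from the psort module used in the benchmarks below). Results come back in input order, because each result is matched against its own reference:


> psort:pmap(fun(X) -> X * X end, [1, 2, 3, 4, 5]).
[1,4,9,16,25]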


Getting the updates is straightforward. However, what do you do once you've got them? Merging thousands of lists can take a long time, especially if you do it in a single process. The last thing you want is for your site's performance to grind to a halt when users add lots of friends.

Fortunately, merging a list of lists isn't too hard to do in parallel. Once you've implemented your nifty parallel merge algorithm, you can theoretically speed up response time by adding more cores to your web servers. This should help you maintain low latency even for very dense social graphs.

So, how do you merge a list of sorted lists in parallel in Erlang? There is probably more than one way of doing it, but this is what I came up with: you create a list of single-element lists. You scan through the main list, and for each pair of lists you spawn a process that merges the two lists and sends the result to the parent process. The parent process collects all the results and repeats as long as there is more than one result. When only one result is left, the parent returns it.

Let's start with the base case of how to merge two lists:


%% Merges two sorted lists
merge(L1, L2) -> merge(L1, L2, []).

merge(L1, [], Acc) -> lists:reverse(Acc) ++ L1;
merge([], L2, Acc) -> lists:reverse(Acc) ++ L2;
merge(L1 = [Hd1 | Tl1], L2 = [Hd2 | Tl2], Acc) ->
    {Hd, L11, L21} =
        if Hd1 < Hd2 ->
                {Hd1, Tl1, L2};
           true ->
                {Hd2, L1, Tl2}
        end,
    merge(L11, L21, [Hd | Acc]).
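

A tiny shell check (again assuming the function is exported from psort):


> psort:merge([1, 3, 5], [2, 4, 6, 8]).
[1,2,3,4,5,6,8]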


Now, to the more interesting part: how to merge a list of sorted lists in parallel.


%% Merges all the lists in parallel
merge_all(Lists) ->
    merge_all(Lists, 0).

%% When there are no lists to collect or to merge, return an
%% empty list.
merge_all([], 0) ->
    [];

%% When no lists are left to merge, we collect the results of
%% all the merges that were done in spawned processes
%% and recursively merge them.
merge_all([], N) ->
    Lists = collect(N, []),
    merge_all(Lists, 0);

%% If only one list remains, merge it with the result
%% of all the pair-wise merges.
merge_all([L], N) ->
    merge(L, merge_all([], N));

%% If two or more lists remain, spawn a process to merge
%% the first two lists and move on to the remaining lists
%% without blocking. Also, increment the number
%% of spawned processes so we know how many results
%% to collect later.
merge_all([L1, L2 | Tl], N) ->
    Parent = self(),
    spawn(
      fun() ->
              Res = merge(L1, L2),
              Parent ! Res
      end),
    merge_all(Tl, N + 1).

%% Collects the results of N merges (the order
%% doesn't matter).
collect(0, Acc) -> Acc;
collect(N, Acc) ->
    L = receive
            Res -> Res
        end,
    collect(N - 1, [L | Acc]).
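

A minimal shell example, assuming the function is exported from psort. Each input list must already be sorted:


> psort:merge_all([[3], [1], [2]]).
[1,2,3]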


So, how well does this perform? I ran a benchmark on my 2.5 GHz Core 2 Duo MacBook Pro. First, I created a list of a million random numbers, each between 1 and a million:


> L = [random:uniform(1000000) || N <- lists:seq(1, 1000000)].


Then, I timed how long it takes to sort the list, first with lists:sort() and then with my shiny new parallel merge function.


> timer:tc(lists, sort, [L]).
{688149,
[1,1,1,1,2,6,7,8,10,11,11,11,13,13,14,15,15,16,17,17,20,21,
21,22,23,24,29|...]}


Less than a second. lists:sort() is pretty fast!

Before we can pass the list of numbers into merge_all(), we have to break it up into multiple lists with a single element in each list:


> Lists = [[E] || E <- L].


Now for the moment of truth:


> timer:tc(psort, merge_all, [Lists]).
{8187563,
[1,1,1,1,2,6,7,8,10,11,11,11,13,13,14,15,15,16,17,17,20,21,
21,22,23,24,29|...]}


About 8.2 seconds :(

It's not exactly an improvement, but at least we learned something: in this test case, the overhead of process spawning and inter-process communication outweighed the benefits of parallelism. It would be interesting to run the same test on machines that have more than two cores, but I don't have any at my disposal right now.
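
One plausible way to cut that overhead, sketched below but not benchmarked: when a pair of lists is too small to be worth a process, merge it inline in the parent and mail ourselves the result, so the collect() bookkeeping above is unchanged. The threshold value is an arbitrary guess, and since length/1 is O(n), a serious version would track list sizes instead of recomputing them:


-define(SPAWN_THRESHOLD, 1000). %% arbitrary cutoff, needs tuning

merge_all2([], 0) -> [];
merge_all2([], N) -> merge_all2(collect(N, []), 0);
merge_all2([L], N) -> merge(L, merge_all2([], N));
merge_all2([L1, L2 | Tl], N)
  when length(L1) + length(L2) < ?SPAWN_THRESHOLD ->
    %% too small to be worth a process: merge inline, but send
    %% ourselves the result so collect/2 still counts it
    self() ! merge(L1, L2),
    merge_all2(Tl, N + 1);
merge_all2([L1, L2 | Tl], N) ->
    Parent = self(),
    spawn(fun() -> Parent ! merge(L1, L2) end),
    merge_all2(Tl, N + 1).
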

Another factor to consider is that lists:sort() is AFAIK implemented in C, and therefore has an unfair advantage over a function implemented in pure Erlang. Indeed, I tried sorting the list with the following pure Erlang quicksort function:


qsort([]) -> [];
qsort([H]) -> [H];
qsort([H | T]) ->
    qsort([E || E <- T, E =< H]) ++
        [H] ++
        qsort([E || E <- T, E > H]).



> timer:tc(psort, qsort, [L]).
{2066387,
[1,2,3,3,4,5,6,6,7,7,8,8,10,10,10,12,13,14,14,15,17,18,19,
22,24,26,26|...]}


It took about 2 seconds to sort the million numbers.

The performance of merge_all() doesn't seem great, but consider that we spawned ~1,000,000 processes during this test. The merge had ~19 levels of recursion (log2 500,000 ~= 18.9). At each level, we spawned half the number of processes of the previous level, so the total is 500,000*(1 + 1/2 + 1/4 + ... + 1/2^18) ~= 1,000,000 (a geometric series, http://en.wikipedia.org/wiki/Geometric_series). That works out to 8.2 seconds / 1,000,000 processes ~= 0.000008 seconds per process. It's actually quite impressive!
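
You can sanity-check that arithmetic in the shell; the comments show the approximate results:


> math:log(500000) / math:log(2). % ~18.9, so ~19 levels of merging
> round(500000 * (1 - math:pow(0.5, 19)) / (1 - 0.5)). % ~1,000,000 processes
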

Let's go back to the original problem. It wasn't to sort one big list, but to merge a list of sorted lists with 20 items in each list. In this scenario, we still benefit from parallelism but we don't pay for the overhead of spawning hundreds of thousands of processes to merge tiny lists in the first few levels of recursion. Let's see how long it takes merge_all() to merge a million random numbers split between 50,000 sorted lists.


> Lists = [lists:sort([random:uniform(1000000) || N <- lists:seq(1, 20)])
           || N1 <- lists:seq(1, 50000)].
> timer:tc(psort, merge_all, [Lists]).
{2259870,
[1,1,4,5,8,10,10,12,12,13,14,14,14,16,16,17,18,18,18,18,18,
19,19,19,21,22,25|...]}


This function call took just over 2 seconds to run, roughly the same time as qsort(), yet it involved spawning 25,000*(1 - 0.5^15)/(1 - 0.5) ~= 50,000 processes! Now the benefits of concurrency start to become more obvious.

Can you think of ways to improve performance further? Let me know!

13 comments:

Eran Sandler said...

This is sort of the reduce part of a Map-Reduce.
You mapped out all the updates from all friends and then reduced them a lot of times until you got one coherent list.

Other than the fact that it might have taken less time on a multi-CPU/core (>2) machine, with Erlang you could easily even ship this off to a different computer (at least some of it).

The only problem is that with an 8 CPU machine, instead of running ~25,000 processes on each CPU you would run ~6,250, which means each CPU is going to "waste" 0.25 seconds. Either way, this would probably be worth calculating as a batch job somehow, and not in real time, since you can easily starve a machine for a quarter of a second :-)

But it's an interesting approach, nonetheless :-D

Andrey Nikolaev said...

I've tested it on 8 cores (2 x E5420 @ 2.50GHz).

Without SMP and with only one erl scheduler:

$ erl +S 1 -smp disable -pz ./
1> Lists = [lists:sort([random:uniform(1000000) || N <- lists:seq(1, 20)])
            || N1 <- lists:seq(1, 50000)].
2> timer:tc(psort, merge_all, [Lists]).
{2525667,
[1,1,1,1,2,6,7,8,10,11,11,11,13,13,14,15,15,16,17,17,20,21,
21,22,23,24,29|...]}
3> timer:tc(psort, merge_all, [Lists]).
{2216642,
[1,1,1,1,2,6,7,8,10,11,11,11,13,13,14,15,15,16,17,17,20,21,
21,22,23,24,29|...]}
4> timer:tc(psort, merge_all, [Lists]).
{2241898,
[1,1,1,1,2,6,7,8,10,11,11,11,13,13,14,15,15,16,17,17,20,21,
21,22,23,24,29|...]}



With SMP and 8 erl schedulers:

$ erl +S 8 -smp enable -pz ./
Erlang (BEAM) emulator version 5.6.5 [source] [smp:8] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.6.5 (abort with ^G)
1> Lists = [lists:sort([random:uniform(1000000) || N <- lists:seq(1, 20)])
            || N1 <- lists:seq(1, 50000)].
2> timer:tc(psort, merge_all, [Lists]).
{3588451,
[1,1,1,1,2,6,7,8,10,11,11,11,13,13,14,15,15,16,17,17,20,21,
21,22,23,24,29|...]}
3> timer:tc(psort, merge_all, [Lists]).
{3223897,
[1,1,1,1,2,6,7,8,10,11,11,11,13,13,14,15,15,16,17,17,20,21,
21,22,23,24,29|...]}
4> timer:tc(psort, merge_all, [Lists]).
{3362644,
[1,1,1,1,2,6,7,8,10,11,11,11,13,13,14,15,15,16,17,17,20,21,
21,22,23,24,29|...]}


So, there is no performance boost.

John Bender said...

Glad to see you're posting again, hope it continues.

Jake Donham said...

Seems like master / worker parallelism could be effective here. Instead of spawning a process for every pair of lists, keep a pool of processes, and have them request pairs of lists from the master then return the merged list. You really only need as many processes in the pool as you have processors.

Furthermore, in your code you proceed in phases (issue a bunch of merges, collect them all, repeat), so you lose some parallelism at the end of the phase when there are fewer active processes than processors. Instead, the master could return merged lists to the pool of lists needing merging (until there is just one list in the pool when you're done) so it's only at the end of the whole run that you have fewer tasks than processors.

As you discovered, the granularity of the tasks is important to balance parallelism against communication cost. There is a great but little-known book that you might be interested in on these topics: Carriero and Gelernter's How to Write Parallel Programs (http://www.amazon.com/dp/026203171X).
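
Here's a minimal sketch of the pool Jake describes, reusing merge/2 from the post. The message protocol, worker count, and shutdown handling are illustrative assumptions, not his design (a real version would monitor the workers rather than flush stray messages):


%% NWorkers processes pull pairs from a master that feeds
%% merged results back into the pool of pending lists.
pool_merge_all(Lists, NWorkers) ->
    Master = self(),
    Workers = [spawn(fun() -> worker(Master) end)
               || _ <- lists:seq(1, NWorkers)],
    Result = master(Lists, 0),
    [W ! stop || W <- Workers], %% shut the pool down
    flush_ready(),              %% drop leftover ready messages
    Result.

%% A worker asks for work, merges the pair it gets, and loops.
worker(Master) ->
    Master ! {ready, self()},
    receive
        {merge, L1, L2} ->
            Master ! {merged, merge(L1, L2)},
            worker(Master);
        stop ->
            ok
    end.

%% The master hands out a pair whenever a worker is ready and at
%% least two lists are pending; merged results rejoin the pool, so
%% parallelism only tails off at the very end of the run.
master([], 0) -> [];
master([Result], 0) -> Result;
master(Lists, Outstanding) ->
    receive
        {merged, L} ->
            master([L | Lists], Outstanding - 1);
        {ready, W} when length(Lists) >= 2 ->
            [L1, L2 | Tl] = Lists,
            W ! {merge, L1, L2},
            master(Tl, Outstanding + 1)
    end.

flush_ready() ->
    receive {ready, _} -> flush_ready()
    after 0 -> ok
    end.
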

Matt Williamson said...

It would be nice to see a number much bigger than 1,000,000 run across multiple machines.

Jebu Ittiachen said...

Nothing significant, but combining the collect with the merge shaves a second off the merge and now does better than the quicksort :) This is in tune with what Jake mentioned about losing parallelism between the merge and collect phases.

%% mmerge_all takes two lists off the head of the list and spawns a
%% process to merge them; the merged result is sent back to the
%% master, which appends it to the tail of the list. This continues
%% until only one list remains and no mergers are left.
%% Call it as mmerge_all(Lists, 0).
mmerge_all([List], 0) ->
    List;
mmerge_all([L1, L2 | Tl], N) ->
    Parent = self(),
    spawn(
      fun() ->
              Res = merge(L1, L2),
              Parent ! {list, Res}
      end),
    mmerge_all(Tl, N + 1);
mmerge_all(List, N) ->
    L =
        receive
            {list, L1} ->
                lists:append(List, [L1])
        end,
    mmerge_all(L, N - 1).

Alex Arnon said...

Shifting around all these lists must also incur a penalty. What happens when we're close to completion and start sending around lists of hundreds of thousands of elements?

Maybe each worker should actually keep its sorted list, and then forward it directly to another worker (arbitrarily chosen by the master) once its work queue is truly empty. It could then wait for either more work or a signal to terminate.

Michael S. said...

Switching sort techniques once the lists get small isn't really "cheating"--glibc's qsort switches to insertion sort when the sublists get small. See

http://sourceware.org/cgi-bin/cvsweb.cgi/libc/stdlib/qsort.c?rev=1.12.2.1&content-type=text/x-cvsweb-markup&cvsroot=glibc

Ulf Wiger said...

lists:sort/1 is not a BIF, actually. It's a merge sort algorithm implemented completely in Erlang. The one "dirty trick" it uses is calling lists:reverse(L1, L2), which is the same as doing lists:reverse(L1) ++ L2, but much faster (this is not especially dirty, since the function is documented...)

Your quicksort most likely takes a beating since it uses append.
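
For reference, with lists:reverse/2 the first two clauses of merge/3 from the post would become the following; the behavior is unchanged:


%% lists:reverse(Acc, L1) == lists:reverse(Acc) ++ L1, minus the append
merge(L1, [], Acc) -> lists:reverse(Acc, L1);
merge([], L2, Acc) -> lists:reverse(Acc, L2);
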

Witek BARYLUK said...

You actually don't need a fully sorted list in this case, just the first N elements, so your mergers can stop once they have collected those first N elements. The same optimisation is possible in qsort (after partitioning, recurse into the right side only if it's needed to fill the first N).

I'm also interested in what happens with a more complex comparison key, where the comparison itself dominates the cost (something like now() used as the date key, for example).
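
A sketch of the first part of Witek's suggestion: a merge that stops after the first N elements, which would be 20 in the news feed example (merge_n is an illustrative name, not his code):


%% Merges two sorted lists but keeps only the first N elements.
merge_n(L1, L2, N) -> merge_n(L1, L2, N, []).

merge_n(_, _, 0, Acc) -> lists:reverse(Acc);
merge_n(L1, [], N, Acc) -> lists:reverse(Acc) ++ lists:sublist(L1, N);
merge_n([], L2, N, Acc) -> lists:reverse(Acc) ++ lists:sublist(L2, N);
merge_n(L1 = [Hd1 | Tl1], L2 = [Hd2 | Tl2], N, Acc) ->
    if Hd1 < Hd2 -> merge_n(Tl1, L2, N - 1, [Hd1 | Acc]);
       true      -> merge_n(L1, Tl2, N - 1, [Hd2 | Acc])
    end.
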

tmd said...

"When a user visits the site, you want to show her a list of all the recent updates from her friends, sorted by date."

Is Facebook just for women?

Chris Hagan said...

Would you mind going into a little more detail about timing spawned processes in Erlang? I'm trying at the moment to benchmark a heavily parallel server, but what I'm seeing is that timer:tc/3 seems to time the first process until it terminates, which isn't necessarily all the work that needed to be executed - the spawned processes continue until they also terminate, but the stopwatch has stopped already.

In your map/reduce case, of course, you've got your parent receiving so it's certain that the spawned processes are finished before the stopwatch stops, but that is a side effect of those spawned processes notifying their parent.

I don't want to have to end every process throughout my system by signalling a termination just so that I can benchmark. What are your thoughts on a general practice for accurate timing of the interval until all work initiated by a method call is complete?

Jim Dukhovny said...

Can you just take a "penalty" on store rather than on retrieve?
If you need to keep the top 100 posts from friends sorted by date, then...
For every user, keep a "Recent top 100" list, and every time someone makes a post, update all their friends' "Recent top 100" lists by putting the new post at #1 and kicking the last one out.
I am a front-end guy :) so the user's perception of Facebook being fast will be much better if the retrieve is fast rather than the save...
Just a subjective thought :)
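
A sketch of the push-on-write approach Jim describes; get_recent/1 and set_recent/2 are hypothetical stand-ins for whatever storage holds each user's cached list:


%% When a user posts, push the update onto each friend's cached
%% "recent" list, capped at the 100 newest entries.
push_update(Update, FriendIds) ->
    lists:foreach(
      fun(Fid) ->
              Recent = get_recent(Fid), %% hypothetical read from cache
              set_recent(Fid, lists:sublist([Update | Recent], 100))
      end, FriendIds).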