`send` optimization for sending to many processes

For cases where you are sending a single message to a large number of processes (tens or hundreds of thousands), currently the only way is to loop through each pid and call send. This is unfortunate because you have to do a lot of extra work copying and sending individual messages to pids which are almost certainly grouped into a much smaller number of nodes. We use a custom solution for handling this (GitHub - discord/manifold: Fast batch message passing between nodes for Erlang/Elixir.), but it feels like something that could be a great bit of built-in language functionality.

In my mind this is solved by adding something like send(list of pids, message), which would internally group the pids into lists based on their node, and then send a message (i guess a new distribution control opcode?) to each node with the message and the list of pids on that node it should be sent to.

I’ve been experimenting a bit with the OTP source locally and if folks like this idea I’d be very happy to drive the implementation work.

4 Likes

Didn’t Joe Armstrong suggest doing something like this once? He had some sort of extended !! idea and if this wasn’t part of that, it would have fitted well. For that matter, [pid…] ! message fits too.

Like this?

send(Pids, Message) ->
    maps:foreach(fun(N, Ps) when  N =:= node() -> lists:foreach(fun(P) -> P ! Message end, Ps);
                    (N, Ps) -> erpc:cast(N, lists, foreach, [fun(P) -> P ! Message end, Ps])
                 end,
                 maps:groups_from_list(fun erlang:node/1, Pids)).
2 Likes

Similar yes. One final thing you’d likely want to do on each receiving node is spread out the effort of sending all those messages across a few threads. In the library I linked in the OP we do that with phash and a few genservers on each receiving end.

1 Like

I also wonder if this could lead to optimizations even sending messages within the same node.

For example, if you need to send messages to 3 processes, you have to traverse the message three times and copy them to the new processes.

But with this API, you can copy contiguously to memory the first time and the next 2 times is a memcpy operation, with no additional traversals. This may be useful in modules such as pg too. /cc @max-au

4 Likes

you can copy contiguously to memory the first time and the next 2 times is a memcpy operation

For this to work within the node, all receiving processes must use off-heap message queue. Otherwise processes have different heap layouts, and deep copy is necessary to created boxed terms, bump reference counters etc… But using off-heap message queue defeats the purpose.

It can probably work over distribution (because message is serialised to a binary buffer), but implementation isn’t trivial at all, considering that nodes in the cluster may run different ERTS versions.

You’re right that I bumped into this problem implementing pg. In fact it helped me to come up with much cleaner design, so I’m grateful this problem exists :sweat_smile: WhatsApp has been battling pg2 for quite some time, to a degree that we had over 20 patches at some point. I’ve got a lot of internal war stories about it. I should probably publish someday, when I’m less lazy to deal with comms. In original pg2 implementation every Pid in every group is monitored by all pg2 gen_servers within the cluster, and it does not really work for 500k processes in 50k groups.

We have a number of solutions to support fanout, mainly to overcome an issue with deeply nested messages being slow to serlialise. We “pre-serialise” those using term_to_binary, and then fan out, avoiding duplicate serialisation cost. Since it’s the same content sent, local message sends are really fast (a couple of binary RefCounter bumps). Remote sends also benefit from not having to serialise the same content multiple times. It pretty much does the trick original poster is after, but does not need complex logic in ERTS.

The only optimisation I can still think of could be batching messages sent between pair of nodes. Which we rarely need, hence I don’t have generic implementation. But it should not be too hard to implement a function that fans out a single message to thousands of processes via a proxy process on the remote node. Something like this (typing in the editor, so may not compile):

large_fanout(Pids, Msg) ->
    SendTo = maps:groups_from_list([{node(P), P} || P <- Pids]),
    BinMsg = term_to_binary(Msg),
    [{proxy, Node} ! {ProxyTo, BinMsg} || {Node, PidsOnNode} <- maps:to_list(Nodes)].

Assuming every remote node runs a gen_server registered under proxy name:

handle_info({LocalPids, BinMsg}, State) ->
    [LocalPid ! BinMsg || LocalPid <- LocalPids],
    {noreply, State}.

Apparently all LocalPids must do binary_to_term upon receiving such a message.

I think this would be performant enough to avoid complex “partitioned worker pool” implementation.

7 Likes

I wasn’t thinking much about local sends, that’s an interesting case indeed. I’m definitely not the expert on OTP internals here but if folks have ideas for how to improve that case, it seems worth including in this effort, since we will have a list of local pids anyway.

We “pre-serialise” those using term_to_binary, and then fan out, avoiding duplicate serialisation cost

I like this idea, definitely adding it to my mental toolbox :smile:

1 Like

You still have the problem of having to de-serialise the serialised binary at each receiving process. And the receiving process has to realise that the binary is not a “normal” binary but something which needs to be de-serialised. This would work if the send and receive were wrapped.

IIRC if you are sending the remote then the serialisation and de-serialisation are a little specialised in that somethings can be handled more efficiently, (atoms?). This is handled in the BEAM, I think. Again if you are using known wrapped send and receive.

3 Likes

That’s done concurrently, therefore can leverage many cores on many nodes. Sending process is usually the bottleneck. In the actual implementation, the message contains more than just a binary - also some routing information, and metadata. Hence sending part looks like {proxy, ReceivingNode} ! {do_something, Binary} where handle_info({do_something, Bin}, State) does binary_to_term.

Actually, that (atom cache for dist connections) reminded me of another important aspect. For binary_to_term, it’s possible to specify safe option, which we recommend over unbounded atom creation (to avoid atom space exhaustion). I don’t exactly remember if it’s possible to enable something like that for distribution. So in a sense using binary-wrapped messages can be safer.

1 Like

About the safe distribution I don’t know. I will have to try and find out.

1 Like

So it seems like people are interested in this I guess… What should I do to drive it forward? I’ve been playing around with the OTP source and implementing this a bit while I’m on holiday. Should I just open a PR against the github repo? Do I need to do something more formal?

1 Like

I once discussed this problem of sending a message to a very large number of processes with Joe Armstrong and he concluded the best approach would be to do it using a broadcast tree. Many, many years later I am about to give that idea a go for distributed PubSub I am implementing in Bondy.

In fact I am thinking of implementing an algorithm similar to Epidemic Broadcast Trees (Plumtree) for a local set of processes.

Has anyone implemented a broadcast tree based solution for this problem?

3 Likes

BTW if I remember correctly the issue we were having is that the BEAM (circa Erlang/OTP 17) would penalise a process that was sending to many messages to too many processes (I am talking several tens of thousands). So a way to overcome the issue was to split the messaging between processes and thus the broadcast tree idea. Not sure if this description is accurate and if it still valid in Erlang/OTP 25).

1 Like

I believe the penalization for sending messages has been removed in relatively recent OTP.

4 Likes

Interesting. Can anyone from OTP Core Team confirm this?
Thanks

1 Like

Yes, senders are no longer penalized. This has been the case for several major releases.

6 Likes

That’s a great way to start a discussion. The important part would be to provide scenarios and benchmarks to measure performance improvements.

3 Likes

I opened a draft PR here: add send/2 override for multiple pids by devsnek · Pull Request #6638 · erlang/otp · GitHub

2 Likes

Many moons ago when I had to implement this use case, I naĂŻvely thought it would be a good idea to mess with the priority whilst performing the send, on the presumption that this would reduce context switching churn (single node):

send_data(Subs, Data) ->
    Msg = {downlink_data, Data},
    OldPri = process_flag(priority, high),
    [Pid ! Msg || Pid <- Subs],
    process_flag(priority, OldPri),
    ok.

This thread jogged my memory and I’d be interested in opinions on the wisdom of this approach, especially WRT recent OTP versions (as I recall, this was originally targeted at OTP 17 as maps had just become a thing).

1 Like

@snek may be of interest to you:

2 Likes