you can copy the message contiguously into memory the first time, and the next 2 times it is just a memcpy operation
For this to work within the node, all receiving processes must use an off-heap message queue. Otherwise processes have different heap layouts, and a deep copy is necessary to create boxed terms, bump reference counters etc… But using an off-heap message queue defeats the purpose.
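For reference, a process opts into an off-heap message queue via the message_queue_data option; a minimal spawn_opt/2 sketch (do_something/1 is a hypothetical handler):

Pid = spawn_opt(
          fun() -> receive Msg -> do_something(Msg) end end,
          [{message_queue_data, off_heap}]).

The same flag can also be set from inside a running process with process_flag(message_queue_data, off_heap).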
It can probably work over distribution (because the message is serialised to a binary buffer), but the implementation isn’t trivial at all, considering that nodes in the cluster may run different ERTS versions.
You’re right that I bumped into this problem implementing pg. In fact it helped me to come up with a much cleaner design, so I’m grateful this problem exists.

WhatsApp has been battling pg2 for quite some time, to a degree that we had over 20 patches at some point. I’ve got a lot of internal war stories about it; I should probably publish them someday, when I’m less lazy to deal with comms. In the original pg2 implementation, every Pid in every group is monitored by all pg2 gen_servers within the cluster, and that does not really work for 500k processes in 50k groups.
We have a number of solutions to support fanout, mainly to overcome an issue with deeply nested messages being slow to serialise. We “pre-serialise” those using term_to_binary, and then fan out, avoiding the duplicate serialisation cost. Since the same content is sent to everyone, local message sends are really fast (a couple of binary RefCounter bumps). Remote sends also benefit from not having to serialise the same content multiple times. It pretty much does the trick the original poster is after, but does not need complex logic in ERTS.
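Roughly, the trick looks like this (a minimal sketch; fanout/2 is a made-up name):

fanout(Pids, Msg) ->
    %% Serialise once; the result is a refc binary shared by every send
    Bin = term_to_binary(Msg),
    %% Each local send only bumps the binary's reference counter;
    %% recipients call binary_to_term/1 to recover the original term
    [Pid ! Bin || Pid <- Pids],
    ok.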
The only optimisation I can still think of could be batching messages sent between a pair of nodes. We rarely need it, hence I don’t have a generic implementation. But it should not be too hard to implement a function that fans out a single message to thousands of processes via a proxy process on the remote node. Something like this (typing in the editor, so it may not compile):
large_fanout(Pids, Msg) ->
    %% Group destination pids by the node they live on (maps:groups_from_list/2 requires OTP 25+)
    SendTo = maps:groups_from_list(fun erlang:node/1, Pids),
    %% Serialise the payload once
    BinMsg = term_to_binary(Msg),
    %% One message per node; the remote proxy re-fans it out locally
    [{proxy, Node} ! {PidsOnNode, BinMsg}
        || {Node, PidsOnNode} <- maps:to_list(SendTo)].
Assuming every remote node runs a gen_server registered under the name proxy:
handle_info({LocalPids, BinMsg}, State) ->
    %% Forward the still-serialised binary to every local recipient
    [LocalPid ! BinMsg || LocalPid <- LocalPids],
    {noreply, State}.
Naturally, all LocalPids must call binary_to_term/1 upon receiving such a message.
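On the receiving side it’s just the decode step; a hypothetical receive loop (handle_msg/2 is made up):

loop(State) ->
    receive
        BinMsg when is_binary(BinMsg) ->
            %% Recover the original term from the pre-serialised payload
            Msg = binary_to_term(BinMsg),
            loop(handle_msg(Msg, State))
    end.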
I think this would be performant enough to avoid a complex “partitioned worker pool” implementation.