Efficient storage using lists

Hi There,

This question relates to my Erlang-Red project, but it is really about how to store data efficiently in Erlang - bear with me here.

To demonstrate the question, here are two executions of the same flow. The top one is Erlang-Red, the bottom one is Node-RED. Both are shown in real time, with no speeding up or slowing down of frame rates, and both use exactly the same flow.

Erlang-Red:

erlang-red-storage-2

Node-RED:

node-red-storage-3

What the flow does is generate 100_000 messages and then do various forms of batching and joining. What a join node here does is wait for X number of messages and then send a single batched message containing all those messages - so it’s buffering the messages as they arrive.

The green debug nodes are counting messages as they arrive. So the top join is buffering 2000 messages and then sends out a single message with those 2000 messages attached. The debug counter shows 50 at the end as 100_000 / 2_000 = 50. The join 3000 does the same but with 3000 messages and hence its debug counter shows 33 at the end. And so on.

Now the problem: Node-RED takes about 8 seconds to execute the entire flow. Erlang-Red takes about a minute to do the same. Same data, same flow, same logic, same machine.

Perhaps my implementation is really bad, but the bottom flow, which also handles the 100_000 messages, runs at a comparable rate - so it can’t be a general implementation issue.

What I found was that if I don’t store the messages in the join node, then it’s much faster. Of course that defeats the purpose. So my question is: how can I store the data efficiently?

Currently the implementation is using gen_server with a list of messages, i.e. #{ store => [Msg | Store]} becomes the state of the gen_server with each new message. Each join node is a gen_server process and each is receiving the message independently from other join nodes.

I read somewhere that Erlang uses a linked list as the storage mechanism for lists. So I thought I’d use Store ++ [Msg] instead, but that made no difference. What made a difference was #{ store => [Msg]}, i.e. no storage - then the execution times were comparable, but no storage is no answer!

Any tips greatly appreciated.


The Erlang Efficiency Guide explicitly warns against this type of operation.
++ copies its left operand, so Store ++ [Msg] requires a full traversal of Store; the longer Store gets, the longer each append takes.
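To make the cost difference concrete, here is a minimal sketch that builds the same list both ways and times each with timer:tc/1. The module and function names are hypothetical, and the absolute timings will vary by machine; only the growth pattern matters.

```erlang
-module(append_cost).
-export([compare/1]).

%% Build a list of N integers two ways and time each (in microseconds).
compare(N) ->
    {AppendUs, L1} = timer:tc(fun() -> build_append(N) end),
    {ConsUs, L2} = timer:tc(fun() -> build_cons(N) end),
    true = (L1 =:= L2),  % both strategies yield the same list
    #{append_us => AppendUs, cons_us => ConsUs}.

%% Store ++ [Msg]: copies the whole Store on every step, O(N^2) overall.
build_append(N) ->
    lists:foldl(fun(I, Store) -> Store ++ [I] end, [], lists:seq(1, N)).

%% [Msg | Store]: one cons cell per step plus a single reverse, O(N) overall.
build_cons(N) ->
    Reversed = lists:foldl(fun(I, Store) -> [I | Store] end, [], lists:seq(1, N)),
    lists:reverse(Reversed).
```

Calling append_cost:compare(20000) in the shell should show append_us growing much faster than cons_us as N increases.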

Do the following instead: the simplest implementation of a buffer is a list where new messages are added to the head of the list:

add(Elem, Buffer) -> [Elem | Buffer].

This operation simply creates a new cons cell; it’s a fast operation.

When it’s time to flush the buffer, the list is reversed once:

flush(Buffer) -> do_something(lists:reverse(Buffer)).

lists:reverse is implemented with the help of a BIF, so it’s fast.
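The add and flush steps above can be combined into a small join buffer. This is only a sketch: the tuple layout and the names new/1 and add/2 are assumptions for illustration, not part of Erlang-Red. Keeping an explicit counter avoids calling length/1, which would traverse the list on every message.

```erlang
-module(join_buffer).
-export([new/1, add/2]).

%% Hypothetical buffer representation: {Limit, Count, ReversedMsgs}.
new(Limit) when is_integer(Limit), Limit > 0 ->
    {Limit, 0, []}.

%% Returns {flush, Batch, EmptyBuffer} once Limit messages have arrived,
%% otherwise {ok, NewBuffer}. Batch is in arrival order.
add(Msg, {Limit, Count, Acc}) when Count + 1 =:= Limit ->
    {flush, lists:reverse([Msg | Acc]), {Limit, 0, []}};
add(Msg, {Limit, Count, Acc}) ->
    {ok, {Limit, Count + 1, [Msg | Acc]}}.
```

A gen_server could keep this tuple in its state and send the batch downstream whenever add/2 returns flush.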

If you need something like a FIFO rather than a simple buffer, consider using the queue module from the standard library.
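For reference, a minimal sketch of FIFO usage with the queue module (the module name queue_demo is made up for this example):

```erlang
-module(queue_demo).
-export([run/0]).

%% Enqueue three items at the rear, dequeue one from the front.
run() ->
    Q0 = queue:new(),
    Q1 = queue:in(a, Q0),
    Q2 = queue:in(b, Q1),
    Q3 = queue:in(c, Q2),
    {{value, First}, Q4} = queue:out(Q3),
    %% First is the oldest element; the rest keep their arrival order.
    {First, queue:to_list(Q4)}.
```

Note that queue:len/1 is O(N), so a joiner that flushes at a fixed size would still want to keep its own counter.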


I actually did use [Msg | Store] initially but swapped it out because of the lists:reverse needed in a couple of places. For my use case, both performed similarly, i.e. there wasn’t a performance breakthrough from swapping.

I’ll go back to [Msg | Store] since it is the more elegant solution - personally speaking.

Thanks for the tip, I’ll have a look at queue and perhaps there is something there :+1:


@ieQu1 has provided great input; I will just add a few additional points:

Move the buffer out of the process

ETS per-joiner - ETS keeps data outside the process heap, so your gen_server heap stays tiny (fast GC). Inserts are O(1) on average for set tables and O(log N) for the ordered_set used below:

%% Create one ETS table per joiner
init(Opts) ->
    Tab = ets:new(?MODULE, [ordered_set, private, {write_concurrency, true}]),
    {ok, #{tab => Tab, n => maps:get(n, Opts), cnt => 0, next_idx => 1}}.

handle_cast({msg, Msg}, S = #{tab := T, cnt := C, n := N, next_idx := I}) ->
    %% Insert outside the process heap
    true = ets:insert(T, {I, Msg}),
    C1   = C + 1,
    I1   = I + 1,
    case C1 == N of
        false ->
            {noreply, S#{cnt := C1, next_idx := I1}};
        true ->
            %% Extract in order, build batch, and clear
            Batch = ets:foldl(fun({_, M}, Acc) -> [M|Acc] end, [], T),
            ets:delete_all_objects(T),  %% cheap truncate
            %% Reverse once if you care about original order
            send_downstream(lists:reverse(Batch)),
            {noreply, S#{cnt := 0}}
    end.

Notes:

  • ordered_set + integer index preserves order with foldl as above (you can also select ranges).
  • You still copy terms when inserting and when reading the batch back - but you avoid repeated copying on every state update and keep the process heap small, which is what usually wins.
  • If batches are huge and you just forward them, consider streaming them in chunks instead of materializing a single mega list.
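The chunked streaming mentioned in the last note could look roughly like the sketch below, which uses ets:select/3 with a limit and then follows the continuation with ets:select/1. The module name, fold_chunks/4 and the {Index, Msg} row layout are assumptions that mirror the snippet above.

```erlang
-module(ets_chunks).
-export([fold_chunks/4, demo/0]).

%% Walk table T and call Fun(Chunk, Acc) with at most ChunkSize messages
%% per call. For an ordered_set the rows come back in key order.
fold_chunks(T, ChunkSize, Fun, Acc0) ->
    MatchSpec = [{{'_', '$1'}, [], ['$1']}],  % keep only the message part
    loop(ets:select(T, MatchSpec, ChunkSize), Fun, Acc0).

loop('$end_of_table', _Fun, Acc) ->
    Acc;
loop({Msgs, Continuation}, Fun, Acc) ->
    loop(ets:select(Continuation), Fun, Fun(Msgs, Acc)).

%% Insert five messages and read them back in chunks of at most two.
demo() ->
    T = ets:new(?MODULE, [ordered_set, private]),
    true = ets:insert(T, [{I, {msg, I}} || I <- lists:seq(1, 5)]),
    Chunks = fold_chunks(T, 2, fun(Chunk, Acc) -> [Chunk | Acc] end, []),
    true = ets:delete(T),
    lists:reverse(Chunks).
```

In a joiner, Fun would send each chunk downstream instead of collecting it, so the full batch never has to exist as one list on the process heap.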

If messages aren’t binaries, make them binaries

The BEAM does zero-copy message passing for large binaries (larger than 64 bytes), which are stored off-heap and reference-counted. If your “message” is a map/list of small values, you pay a copy on every hop.

Options:

  • Encode once at the edge (producer) to a binary (e.g., term_to_binary/1 or your own compact format), ship binaries through the graph, and only decode where needed.
    • Pro: off-heap, ref-counted, cheap to pass and to keep in ETS or lists.
    • Con: encode/decode cost, but often still a net win at 100k messages.
  • If you control payloads, flatten to iolists of binaries (no nested maps/lists of small ints).
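As a rough illustration of the encode-once option, the sketch below round-trips a message through term_to_binary/1 and binary_to_term/1. The module name and the message fields are hypothetical; real Erlang-Red messages may look different.

```erlang
-module(msg_codec).
-export([encode/1, decode/1, demo/0]).

%% Encode at the producer edge; the result is an ordinary binary.
encode(Msg) ->
    term_to_binary(Msg).

%% Decode only in nodes that need to inspect the message.
decode(Bin) when is_binary(Bin) ->
    binary_to_term(Bin).

demo() ->
    %% Hypothetical message shape, for illustration only.
    Msg = #{<<"_msgid">> => 42, <<"payload">> => lists:seq(1, 100)},
    Bin = encode(Msg),
    %% Over 64 bytes, so it is kept off-heap and passed by reference.
    {byte_size(Bin) > 64, decode(Bin) =:= Msg}.
```

If the binaries can come from untrusted sources, binary_to_term/2 with the safe option is the more cautious choice.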

Tune the joiner process for less GC pain

Spawn the joiner with a bigger initial heap and fewer fullsweeps (see the erlang module documentation, erts v16.0.2), e.g.:

Pid = proc_lib:spawn_opt(?MODULE, init, [Opts],
                         [{min_heap_size, 32768}, {fullsweep_after, 20}]).

After sending a large batch, force a GC (see the erlang module documentation, erts v16.0.2):

erlang:garbage_collect().

Consider hibernate only if the joiner stays idle long; not useful under sustained load.
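Since the joiners are gen_server processes, the same spawn options can also be passed through the spawn_opt start option of gen_server:start_link/3. The following is a minimal sketch with a hypothetical module name; the gc_settings call exists only so the effect can be checked.

```erlang
-module(tuned_joiner).
-behaviour(gen_server).
-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2]).

%% Start the joiner with a larger initial heap and fewer fullsweeps.
start_link(Opts) ->
    SpawnOpts = [{min_heap_size, 32768}, {fullsweep_after, 20}],
    gen_server:start_link(?MODULE, Opts, [{spawn_opt, SpawnOpts}]).

init(Opts) ->
    {ok, #{opts => Opts}}.

%% Report the GC settings of this process so they can be inspected.
handle_call(gc_settings, _From, State) ->
    {garbage_collection, GC} = erlang:process_info(self(), garbage_collection),
    {reply, GC, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.
```

The reported min_heap_size may be larger than the requested value, because the runtime rounds it up to one of its internal heap sizes.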

Don’t batch what you don’t need to batch

If multiple joiners (2k, 3k, 5k, …) are fed the same 100k stream, you are multiplying memory pressure and copying. You can:

  • Fan-out with binaries only (see the “If messages aren’t binaries, make them binaries” point), or
  • Insert once into a shared ETS table and let each joiner keep only indices; at flush time each joiner selects its window without duplicating storage.
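A sketch of the shared-table idea, assuming messages are stored as {Index, Msg} rows in an ordered_set and each joiner only remembers the first and last index of its current window. The module, table name and window/3 are hypothetical.

```erlang
-module(shared_window).
-export([window/3, demo/0]).

%% Return the messages stored under integer keys From..To (inclusive).
%% For an ordered_set the result is in key order.
window(Tab, From, To) ->
    MatchSpec = [{{'$1', '$2'},
                  [{'>=', '$1', From}, {'=<', '$1', To}],
                  ['$2']}],
    ets:select(Tab, MatchSpec).

%% One table, two different windows read from it without duplicating rows.
demo() ->
    Tab = ets:new(shared_msgs, [ordered_set, protected]),
    true = ets:insert(Tab, [{I, {msg, I}} || I <- lists:seq(1, 10)]),
    A = window(Tab, 1, 4),
    B = window(Tab, 5, 10),
    true = ets:delete(Tab),
    {A, B}.
```

For joiners in separate processes the table has to be protected or public so that they can read it, and something has to decide when rows are no longer needed by any joiner.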

Also, some useful resources


Just implemented that and it made a big difference. I was using small hashes as Msg objects and converting them to binaries made the implementation quicker but still nowhere near the NodeJS implementation.

I’ll now redo it to use an ETS table and see how that goes.

I was watching the memory usage and there is definitely something not being cleaned up - memory usage just explodes and doesn’t get reclaimed. It doesn’t look like the garbage collector is collecting, for some reason.

Thanks for the tips :+1:


I can’t do that since each joiner knows nothing of any other joiners. So the independence is vital, but this could be simplified by using graph magic (not implemented): when the flow is deployed to the server, something in the server does an analysis and creates a shared ETS table for the joins, not because the user identified them as such but because the server did a graph analysis, i.e. recognising the common connection to the generator node and then realising that the data will be the same, etc.

Doing an analysis of the flows before the processes are initiated is definitely worth doing, especially for something like this:

At the moment, Erlang-Red creates separate Erlang processes for each junction node - however, these are purely decorative. These processes could be optimised away - on the server side - by connecting the respective nodes directly.

Which in turn would have to be done before creating shared ETS tables for joiners :wink:

Superb! I just put together a reference implementation such that each joiner gets its own ETS table (based on process id not module name) and the memory usage is down to basically zero and the speed is near to NodeJS. Brilliant!

Thanks for the code snippet, I took it one-to-one :+1:

That’s the final step once I’ve completed the conversion to ETS tables - will need to fix several nodes that do message buffering/storage.


Amazing results! Always glad to help with cool projects! :sign_of_the_horns:


The final commit shows the changes - in the end, it wasn’t that much and now the performance is better than Node-RED - so it was a good day!

Thanks for all the advice, and I’d gladly take more based on the new implementation :wink:


Looks great! But… why do you convert pids to atoms before storing them? I suppose you can store them as is, without converting.

Do you mean this line:

    Tab = ets:new(
        list_to_atom(pid_to_list(MyPid)),
        [ordered_set, private, {write_concurrency, true}]
    ),

Or something else? That line is there because ets:new/2 only takes an atom as the name, strangely not a Pid.

There might be other places where I convert processes, e.g. when sending json, but none that I recall now…


You can use the same atom since these are private (and not named) tables. If they were named tables (created with named_table), the name would need to be unique.

What you need to store in the process is the reference ets:new returns.

A simple example: two processes, each using a private table named test

24> Pid1 = spawn(fun() -> T = ets:new(test, [private,set]), ets:insert(T, {a, 1}), timer:sleep(10_000), io:format("~p~n", [{self(), T, ets:tab2list(T)}]) end).
<0.123.0>
25> Pid2 = spawn(fun() -> T = ets:new(test, [private,set]), ets:insert(T, {b, 1}), timer:sleep(10_000), io:format("~p~n", [{self(), T, ets:tab2list(T)}]) end).
<0.125.0>
{<0.123.0>,#Ref<0.839174049.1753350153.75415>,[{a,1}]}
{<0.125.0>,#Ref<0.839174049.1753350153.75430>,[{b,1}]}

Edited to specify that this is really about named_table. When a table is not named, you can re-use the same atom in table creation. Converting pids to atoms is a recipe for disaster, potentially leading to atom exhaustion.
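The same point can be checked inside a single process. In this minimal sketch (module name made up), two unnamed tables are created with the same atom, and each call returns its own table identifier:

```erlang
-module(ets_names).
-export([demo/0]).

demo() ->
    %% Without named_table the atom is only a label, not a lookup key.
    T1 = ets:new(joiner, [ordered_set, private]),
    T2 = ets:new(joiner, [ordered_set, private]),
    true = ets:insert(T1, {1, a}),
    true = ets:insert(T2, {1, b}),
    %% The tables are distinct and hold independent contents.
    Result = {T1 =/= T2, ets:lookup(T1, 1), ets:lookup(T2, 1)},
    true = ets:delete(T1),
    true = ets:delete(T2),
    Result.
```

The value to keep in the process state is the identifier returned by ets:new/2, so no per-process atom is needed.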


Yes, I meant this one - it adds load to the system if you do this without any reason. It’s better to store them as is, and it should perform better. For JSON, yes, you do need the conversion, but in the part mentioned I don’t think you need it.

Totally! Or just use ?MODULE instead of process.

Of course, I forgot about that - oh, dear :face_with_peeking_eye:

Thanks to both of you for picking this up!

EDIT: fixed with commit.
