Efficient storage using lists

@ieQu1 has already provided great input; I will just add a few more points:

Move the buffer out of the process

ETS per joiner - ETS keeps data outside the process heap, so your gen_server heap stays tiny (fast GC). Inserts into an ordered_set cost O(log N); a plain set would be O(1) on average but does not keep the entries in order:

%% Create one ETS table per joiner.
%% The table is private (only the owner touches it), so write_concurrency would add nothing.
init(Opts) ->
    Tab = ets:new(?MODULE, [ordered_set, private]),
    {ok, #{tab => Tab, n => maps:get(n, Opts), cnt => 0, next_idx => 1}}.

handle_cast({msg, Msg}, S = #{tab := T, cnt := C, n := N, next_idx := I}) ->
    %% Insert outside the process heap
    true = ets:insert(T, {I, Msg}),
    C1   = C + 1,
    I1   = I + 1,
    case C1 == N of
        false ->
            {noreply, S#{cnt := C1, next_idx := I1}};
        true ->
            %% Extract in key order; prepending yields a reversed list
            Batch = ets:foldl(fun({_, M}, Acc) -> [M|Acc] end, [], T),
            ets:delete_all_objects(T),  %% cheap truncate
            %% Reverse once if you care about original order
            send_downstream(lists:reverse(Batch)),
            %% Restart the index so it does not grow without bound
            {noreply, S#{cnt := 0, next_idx := 1}}
    end.

Notes:

  • ordered_set + integer index preserves order with foldl as above (you can also select ranges).
  • You still copy terms when inserting and when reading the batch back - but you avoid repeated copying on every state update and keep the process heap small, which is what usually wins.
  • If batches are huge and you just forward them, consider streaming them in chunks instead of materializing a single mega list.
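
The chunked-streaming idea from the last note could look roughly like the sketch below. It assumes the {Idx, Msg} row layout used above; the module name chunked_flush, the function flush_in_chunks/3, and the SendFun callback are made up for illustration and stand in for whatever your send_downstream does.

```erlang
-module(chunked_flush).
-export([flush_in_chunks/3]).

%% Walk the table at most ChunkSize rows at a time and pass each list of
%% messages to SendFun (hypothetical callback), then truncate the table.
%% On an ordered_set table the rows come back in key order.
flush_in_chunks(Tab, ChunkSize, SendFun) ->
    MatchSpec = [{{'_', '$1'}, [], ['$1']}],   %% {Idx, Msg} -> Msg
    drain(ets:select(Tab, MatchSpec, ChunkSize), SendFun),
    true = ets:delete_all_objects(Tab),
    ok.

drain('$end_of_table', _SendFun) ->
    ok;
drain({[], Continuation}, SendFun) ->
    %% Defensive: skip an empty chunk instead of sending it
    drain(ets:select(Continuation), SendFun);
drain({Chunk, Continuation}, SendFun) ->
    SendFun(Chunk),
    drain(ets:select(Continuation), SendFun).
```

Only one chunk is materialized on the joiner heap at a time, at the price of the receiver having to cope with a batch that arrives in several pieces.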

If messages aren’t binaries, make them binaries

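The BEAM passes large binaries (more than 64 bytes) between processes on the same node by reference: they live off-heap and are reference-counted, so only a small handle is copied. If your “message” is a map/list of small values, you pay a full copy on every hop.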

Options:

  • Encode once at the edge (producer) to a binary (e.g., term_to_binary/1 or your own compact format), ship binaries through the graph, and only decode where needed.
    • Pro: off-heap, ref-counted, cheap to pass and to keep in ETS or lists.
    • Con: encode/decode cost, but often still a net win at 100k messages.
  • If you control payloads, flatten to iolists of binaries (no nested maps/lists of small ints).
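
A minimal sketch of the encode-once option, assuming term_to_binary/1 is an acceptable wire format for you. The module name edge_codec and its two functions are hypothetical wrappers, not part of any library.

```erlang
-module(edge_codec).
-export([encode/1, decode/1]).

%% Producer side: serialize the term once at the edge.
%% Results larger than 64 bytes are stored off-heap and reference-counted,
%% so later sends within the same node copy only a small handle.
encode(Term) ->
    term_to_binary(Term).

%% Consumer side: decode only in the stage that needs the structured value.
%% Assumption: the binary comes from your own producer. For data from an
%% untrusted source, binary_to_term(Bin, [safe]) is the more careful choice.
decode(Bin) when is_binary(Bin) ->
    binary_to_term(Bin).
```

Intermediate stages, the joiner included, can then store and forward the binary untouched.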

Tune the joiner process for less GC pain

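Start the joiner with a bigger initial heap and an explicit fullsweep_after (see the spawn options in the erlang module docs, erts v16.0.2). min_heap_size is given in words. Note that the default fullsweep_after is 65535, so a low value such as 20 makes full sweeps more frequent, which helps release references to off-heap binaries sooner; raise it if you want fewer full sweeps. Since the joiner is a gen_server, pass the options through spawn_opt, e.g.:

{ok, Pid} = gen_server:start_link(?MODULE, Opts,
                                  [{spawn_opt, [{min_heap_size, 32768}, {fullsweep_after, 20}]}]).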

After sending a large batch, you can force a GC from inside the joiner (erlang:garbage_collect/0, erts v16.0.2):

erlang:garbage_collect().

Consider hibernating (returning {noreply, State, hibernate} from the callback) only if the joiner stays idle for long periods; it is not useful under sustained load.

Don’t batch what you don’t need to batch

If multiple joiners (2k, 3k, 5k, …) are fed the same 100k stream, you are multiplying memory pressure and copying. You can:

  • Fan out binaries only (see “If messages aren’t binaries, make them binaries” above), or
  • Insert once into a shared ETS table and let each joiner keep only indices; at flush time each joiner selects its window without duplicating storage.
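
The shared-table option could be sketched as follows. Everything here is an assumption for illustration: the module name shared_buffer, a single writer that assigns consecutive integer indices, and joiners that remember only the first and last index of their current window.

```erlang
-module(shared_buffer).
-export([new/0, append/3, window/3]).

%% One table for the whole stream. It is public so the joiners can read it,
%% and read_concurrency favours the many-readers, one-writer pattern.
new() ->
    ets:new(shared_buffer, [ordered_set, public, {read_concurrency, true}]).

%% The single writer stores each message exactly once under its index.
append(Tab, Idx, Msg) when is_integer(Idx) ->
    true = ets:insert(Tab, {Idx, Msg}),
    ok.

%% At flush time a joiner reads its window FromIdx..ToIdx, in order.
%% Indices that are missing from the table are simply skipped.
window(Tab, FromIdx, ToIdx) ->
    [Msg || Idx <- lists:seq(FromIdx, ToIdx),
            {_, Msg} <- ets:lookup(Tab, Idx)].
```

Each joiner still copies its window onto its own heap when it reads it, but the stream is stored only once between flushes. Someone also has to delete rows once every joiner has moved past them; that bookkeeping is left out of the sketch.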

Also, some useful resources
