A process for synchronizing sends to a TCP socket — is it a good idea?

Ah… TLDR: I think you don’t need C and D processes. Process A can do all their tasks.

You’d rarely see gen_tcp:recv being used in real-world Erlang projects. Most of the time you set {active, true} (risky) or {active, once} (conservative) or {active, N} (best performance, but bit more complex) on socket so the process that owns the socket receives TCP data asynchronously from the socket in its mailbox, see Erlang -- gen_tcp.

Blocking send - I also rarely see people being too worried about it. For UDP I think this is not a problem (worst case packet will be dropped) in general, and in gen_tcp sends are buffered, see Erlang -- gen_tcp, but if buffer gets full it could block and then it might get tricky yep… There is a send_timeout option.

And I’d suggest that “radio station streamer” process should not do any networking operation. The cleaner approach is for “radio station” process to kkep the list of “client” process in it’s state and for it to broadcast lists:foreach(fun(Pid) -> Pid ! {data, MediaDataChunk} end, State#state.subscribers) media data to subscribers (eg, start a timer and broadcast 128kb chunk each second) and let “client” processes write it to their UDP sockets.
You may also make “client” process send “ack” mesages back to radiostation so “station” stops broadcasting to a specific client if there are too much non-acked messages so you don’t run out-of-memory if one of the clients stuck on some blocking operation.

This project, for example, implements somewhat similar structure: https://github.com/seriyps/mtproto_proxy/blob/master/src/mtp_down_conn.erl#L58
There we have small number of mtp_down_conn processes (representing multiplexed connection to “media source”) which all have large number of mtp_handler processes (representing individual client connections) connected to them. And it has quite advanced “ack” policy implemented there. Was able to serve few Gb/s with it. It’s not exactly the same, because mtp_down_conn doesn’t do broadcasting, but it does multiplexing and “routing” (it reads messages from “upstream” socket, figures out to which one of the subscribers it is addressed and sends the media to this particular subscriber and other way around).

1 Like

In active mode, is there a way to specify the number of bytes to get each time (like gen_tcp:recv(Socket, Length)? If not, we need process D.

Well, in some way you can limit the upper bound by setting {buffer, N} option. But most of the apps don’t really care about that, but they have an internal “buffer” field in their state. When they receive new TCP message, they append it to the end of this binary .buffer and then try to parse it as deep as possible, potentially getting and processing multiople commands, leaving the unparseable “tail” to wait for the next TCP chunk like, eg

handle_info({tcp, Sock, Data}, #state{buf = Buf} = State) ->
  NewBuf = <<Buf/binary, Data/binary>>,
  BufTail = parse_and_process_buffer(NewBuf),  % there might be multiple commands in `NewBuf`
  {noreply, State#state{buf = BufTail}}.

Real-life example: PostgreSQL connector https://github.com/epgsql/epgsql/blob/7ba52768cf0ea7d084df24d4275a88eef4db13c2/src/epgsql_sock.erl#L289-L291

Yes, it’s not the same patterns as one used to see in C/Python/Go languages. But I’d say it’s the standard in Erlang/Elixir and it works very well after you get used to them.

1 Like

Ah, this makes sense. Thank you for your answers!