A process for synchronizing sends to a TCP socket — is it a good idea?

Suppose two processes A and B may send data to the same TCP socket. Is it a good idea to have a process C dedicated to synchronizing sends to the socket?

Erlang code for process C:

-module('socket-sender').

-export([start_sender/1, sender/1]).

start_sender(Socket) ->
  spawn('socket-sender', sender, [Socket]).

sender(Socket) ->
  receive
    {From, Data} ->
      Result = case gen_tcp:send(Socket, Data) of
        ok -> ok;
        {error, Reason} -> disconnected
      end,
      From ! Result,
      sender(Socket)
  end.

When process A or B wants to send data to the TCP socket, it will send {self(), Data} to the sender process C, and wait for response. If C responds with ok, then things are good; if C responds with disconnected, process A or B will terminate itself.


But there is one issue with the above sender code: how would the sender process terminate?

Hi and welcome!

In my current project I have a similar approach, namely a dedicated process, which manages a socket.

A big difference: for anything bigger than a “baremetal worker process” I am using OTP, in this case my socket_srv is a gen_server.
This takes much of the boiler plate code into the background, and I can concentrate on the functionality.

Terminating the server: I see two ways: either define a “quit” message, which makes the server terminate, or (less polite) call exit/2 from another process. That depends entirely on your use case.

PS: I forgot your first question, whether it is a good idea:
Generally yes, IMHO. If you have any constraints for the protocol over the socket, a socket server would be a good place to decide.
Also, if you do logging, there is only one place where you need to log events.

1 Like

Suppose process A and B are intended to handle one tcp connection, and should terminate when the connection breaks. Both A and B use process C for sending to the tcp socket to avoid race conditions.

When C receives a “request” for sending data (i.e. {From, Data}) and calls gen_tcp:send, it may get {error, Reason} and find that the connection is broken. How should processes A, B, and C communicate the message so that all of them terminate?

In OTP, the most simple setup would be to put all three processes under a supervisor with the one_for_all strategy and in C, do

ok = gen_tcp:send(Socket, Data),

If an error occurs, C will crash (because the match fails), and the supervisor will kill A and B as well and restart all of them.

If you stick with plain processes, you will have to add more messages between A, B and C.
Or you could use erlang:monitor or erlang:link to notify about errors. A link takes a process down when the linked process terminates, a monitor merely sends a message to the monitoring process, that the monitored process is down.

1 Like

Thank you!

More context on my question: this is actually a scenario I met in a Golang project. In Golang, I managed to do it using mutexes. For each TCP socket, I have a mutex to synchronize sends, and another mutex to synchronize receives. When a connection breaks, process A and B will know that on the next send or receive, and do whatever they need to (including but not limited to termination). Since process C is replaced by a mutex here, I don’t need to worry about terminating C.

Then I thought: mutexes don’t fit into a message-passing paradigm; can I achieve the same thing only by message passing, without mutexes?

I tried to do so in Golang, but couldn’t come up with a scheme where A, B, and C terminate properly. Since Erlang doesn’t have mutexes, I suppose Erlang people should know how to do this. So I posted my question in this forum.


With that said, here’s another requirement: while process A should terminate once the TCP connection breaks, process B should do something (say, removing the broken connection from its list of connections) and then continue running. I think one_for_all is no longer fit. What would you do in this circumstance?

B is more equal than A:
In this case we remove B from the supervisor (maybe create a dedicated supervisor for B) and have it monitor C.

The monitoring mechanism sends a message to B when C terminates, and B does its cleanup job.

What is missing here is how the processes know each other.
A and B must know C. In a small scenario, you can register C with a name, which is known to A and B.

After a restart of C, B has to renew its monitor.

Another thing which I have not yet mentioned is that the message sending is absolutely one-way-ish. There is no error return if the receiving process has died shortly before the message is sent.
You can handle this with timeouts, either use the timeout of gen_server:call, or use and after in a receive.
(Using is_process_alive() for this is not advisable, it cannot be done atomically with the send and would open a race condition).

A bit out of scope of your requirement: if it does not absolutely have to be B, who does the cleaning, I would introduce another process for that. processes are really cheap.
Cleanup is none of B’s regular tasks, so adding this responsibility to B is the beginning of making it a moloch.
D, the cleaner, would just sit and wait for Cs death (with a monitor), and A,B and C could still be under the same supervisor.

That is to say: I am by far not among the most experienced around here, so there might be more elegant ways…

1 Like

I would use a supervisor tree where the processes handling the connection are located under the same supervisor and the process that owns the socket should be a significant child
so that the supervisor and all its children are terminated when the significate child terminates, which it should do when it gets the message that the socket connection has been closed by the peer.

3 Likes

A third way, let another process tell the supervisor to terminate the child in question.

Oh, I forgot about those :sweat_smile: Yeah, that would also be an elegant way to do it.

that’s a nice one, hire a hitman, so to say. :wink:
(also reminds me of “50 ways to leave your lover”)

But supervisor:terminate_child() only terminates, you would also have to tell the supervisor to restart_child.

Typically you do not restart processes that handle TCP connections as the connection is lost. You spawn new children that will handle the new connections.

1 Like

I did not yet know this pattern.
You mean that the supervisor’s child would be a manager for children with sockets?

Technically, a restarted supervisor child and a spawned process are quite equal, I think.

Restarted children and new children are not really equal, because you can not automatically restart the socket process with the same socket as argument it has to have a new socket as argument.

I mean that you would have a dynamic subtree with a supervisor that has the socket handling processes as children, and the child that owns the socket will be a significant child.

3 Likes

With this requirement, how would process B be prompted to do something (say, removing the broken connection from its list of connections) after C (the socket owner) terminates?

This task could be solved in multiple ways, it would depend on the protocol and more specific “business requirements”.
For example, is it possible that the 2nd party (where we are sending TCP data) may send something back? To what process should we forward it? A or B?
Are processes A and B “fixed” or there could be more of them added and removed?

Maybe it would make sense to have the networking protocol implementation code inside the process C and let A and B send not the raw bytes to it but more high-level messages which will be encoded by C according o the protocol.
Are we sure we want to pass already open Socket to C? Maybe it’s better to let C open and manage the socket? That’s how most of database clients work for example GitHub - epgsql/epgsql: Erlang PostgreSQL client library..

Maybe we even don’t need to crash C if socket disconnects and just re-open the connection (see for example how GUN http client works (it provides permanent socket connection and lets client send HTTP requests over it and it re-establishes the connection under the hood if it is closed) GitHub - ninenines/gun: HTTP/1.1, HTTP/2, Websocket client (and more) for Erlang/OTP.

Maybe it would fit better to not rely entirely on supervision tree here but just make A and B do erlang:monitor/2 the C process and act somehow when it crashes.

If you describe your applications requirements in more details then it would be easier to suggest the best-fit solution. While it is the norm to “wrap” a socket in Erlang process when it needs concurrent access, there are many ways it can be done.

1 Like

Thank you!

This is actually a course project in Golang. I was able to solve the problem with mutexes, but I’m curious if I could do so without mutexes (i.e., by pure message-passing).

Course project handout: Project 1: Snowcast (F23) - HackMD

A brief description:

build a simplified version of an Internet radio station that includes a server and client. The server is responsible for keeping track of subscribed clients and sending them streaming data at a precise rate (16KiB/s). Clients provide the user’s interface and are responsible for joining/switching/leaving stations, and “playing” the music.

The ‘server’ streams radio out to a multicast address over UDP on the loopback (lo) interface.

For each connecting client subscribe to the multicast group on the loopback interface and just read the UDP datagrams and copy them blindly to the client’s TCP socket.

The problem then only becomes a framing issue as you are not going to have to care about packet loss on the loopback interface; well if you do you have other problems which code is not going to save you from.

To make sure you do not exceed 16KiB/s, setsockopt(SO_SNDBUF) on the TCP socket to a low value.

Why write code when you have a kernel willing to do the work for you :slight_smile:

Enjoy

1 Like

I read through the whole specification and I don’t really get why do you need two processes to send data to the same socket there.

Are you trying to implement the “server” from this assignment? What roles are the processes A, B and C then?

The way I would probably design this system is:

  • one “streamer” process for each “station” that maintains a list of “listeners” in its state; it does not do any network operations directly, but just reads the media file at a constant rate and “broadcasts” it by sending erlang message to each “listener” erlang process
  • if we don’t want to rely on acceptor pool like “ranch”, then I’d also have one “acceptor” process that holds the listening TCP socket and spawns a new worker for each new connected client
  • and one Erlang process for each “subscriber”. This subscriber is spawned by socket listener described above, it keeps the control plane tcp socket in its state, it receives a control commands and if it receives a “please start streaming this station” then it creates an UDP socket, stores it in its state and then sends a “subscribe” Erlang message to the station process. So this process is included in the list of processes the “station” process should broadcast the radio media data. And when this “client” process receives the broadcast from “station” process, it writes it to the UDP socket. If client process receives “unsubscribe/change station” from control socket, it sends “unsubscribe” Erlang message to the radio station process and sends “subscribe” to another station process.
top_supervisor
--station_supervisor
---- station1
---- station2
---- station3
---- stationN
-- socket_acceptor
-- client_supervisor
---- client1
---- client2
---- clientN

1 Like

Re-reading the spec:

the server MUST send song data to the the UDP port specified in the client’s Hello message

maybe the “stationN” process doesn’teven have to broadcast the media data to “clientN” subscribers in Erlang. Maybe “clientN” should tell “stationN” to what IP:PORT the station should send data for this client and “stationN” would open UDP socket and do the network broadcast itself. It should be ok for UDP (but for TCP can be a problem due to backpressure, but that’s another story).

However the solution when each “clientN” process holds both TCP control socket and UDP media socket sounds a bit cleaner code, because then we can re-use the UDP socket when switching from “stationX” to “stationY”.

1 Like

Thank you for your time and effort! I really appreciate it!

Yes, this post is about the “server”, since both the user side is trivial. Process A is the worker spawned by the “acceptor” process to handle control commands from the client and to send streaming data to the UDP port (one for each new connection). Process B is the “streamer” (one for each station). Process C, if needed, manages sends to the tcp socket (one for each connection). Other processes should send a request process C to send data. Similarly, process D, if needed, manages recvs from the tcp socket (one for each connection). Other processes should send a request process D to receive data. One purpose that process C and D serve, is to avoid blocking due to gen_tcp:send/2 and gen_tcp:recv/2.

I agree. In this design, we need process C and D to avoid blocking send and recv, is that right?