Connection closed on gen_tcp but not socket when using tcpkali

I’m on a quest to implement a fast acceptor pool in Elixir, so I’ve been benchmarking different ones, and I noticed that gen_tcp-based ones don’t seem to work well with tcpkali (https://github.com/satori-com/tcpkali), with tcpkali sometimes not being able to keep enough connections open. socket-based ones do seem to work, though, so I wonder what the difference is.

Here's the gen_tcp example:
defmodule Pool do
  use GenServer
  require Logger

  def start_link(opts) do
    {genserver_opts, opts} = Keyword.split(opts, [:name, :debug])
    GenServer.start_link(__MODULE__, opts, genserver_opts)
  end

  @impl true
  def init(opts) do
    Process.flag(:trap_exit, true)

    handler = Keyword.fetch!(opts, :handler)
    ip = Keyword.get(opts, :ip, {127, 0, 0, 1})
    port = Keyword.get(opts, :port, 0)
    min_acceptors = opts[:min_acceptors] || 100

    {:ok, socket} = :gen_tcp.listen(port, mode: :binary, ip: ip, active: false, backlog: 1024)

    state = %{socket: socket, handler: handler}
    for _ <- 1..min_acceptors, do: start_acceptor(state)

    {:ok, state}
  end

  @impl true
  def handle_cast(:accepted, state) do
    start_acceptor(state)
    {:noreply, state}
  end

  @impl true
  def handle_info({:EXIT, _pid, {:error, :emfile = reason}}, state) do
    Logger.error("No more file descriptors, shutting down")
    {:stop, reason, state}
  end

  def handle_info({:EXIT, _pid, :normal}, state) do
    {:noreply, state}
  end

  def handle_info({:EXIT, _pid, reason}, state) do
    Logger.error(Exception.format_exit(reason))
    {:noreply, state}
  end

  defp start_acceptor(state) do
    %{socket: socket, handler: handler} = state
    start_acceptor(self(), socket, handler)
  end

  defp start_acceptor(parent, listen_socket, handler) do
    :proc_lib.spawn_link(__MODULE__, :__accept__, [parent, listen_socket, handler])
  end

  @doc false
  def __accept__(parent, listen_socket, handler) do
    case :gen_tcp.accept(listen_socket, :timer.seconds(10)) do
      {:ok, socket} ->
        GenServer.cast(parent, :accepted)
        continue_loop(socket, handler, _state = [])

      {:error, reason} when reason in [:timeout, :econnaborted] ->
        __accept__(parent, listen_socket, handler)

      {:error, :closed} ->
        :ok

      {:error, _reason} = error ->
        exit(error)
    end
  end

  defp continue_loop(socket, handler, state) do
    case handle_data(socket, handler, state) do
      {:continue, state} -> continue_loop(socket, handler, state)
      {:close, _state} -> :ok
      {:error, :closed} -> :ok
      {:error, _reason} = error -> exit(error)
    end
  end

  defp handle_data(socket, handler, state) do
    with {:ok, data} <- :gen_tcp.recv(socket, 0, :timer.seconds(30)) do
      handler.handle_data(data, socket, state)
    end
  end
end

defmodule Echo do
  def handle_data(data, socket, state) do
    :gen_tcp.send(socket, data)
    {:continue, state}
  end
end

Pool.start_link(handler: Echo, port: 8000)

And here’s the ongoing tcpkali output:

$ tcpkali --message '$' --connections=100 --duration=15s 127.0.0.1:8000
Destination: [127.0.0.1]:8000
Interface lo0 address [127.0.0.1]:0
Using interface lo0 to connect to [127.0.0.1]:8000
Ramped up to 100 connections.
Traffic 4015.196↓, 4495.600↑ Mbps (conns 0↓ 100↑ 0⇡; seen 541)

Here "seen 541" means that 541 connections have been opened even though the target is 100. With gen_tcp, connections seemingly get closed prematurely in this benchmark and tcpkali has to open new ones. This is not the case with socket; see below.

And here's the socket example:
defmodule Pool do
  use GenServer
  require Logger

  def start_link(opts) do
    {genserver_opts, opts} = Keyword.split(opts, [:name, :debug])
    GenServer.start_link(__MODULE__, opts, genserver_opts)
  end

  @impl true
  def init(opts) do
    Process.flag(:trap_exit, true)

    handler = Keyword.fetch!(opts, :handler)
    ip = Keyword.get(opts, :ip, {127, 0, 0, 1})
    port = Keyword.get(opts, :port, 0)
    min_acceptors = opts[:min_acceptors] || 100

    {:ok, socket} = :socket.open(:inet, :stream, :tcp)
    :ok = :socket.bind(socket, %{family: :inet, port: port, addr: ip})
    :ok = :socket.listen(socket, _backlog = 1024)

    state = %{socket: socket, handler: handler}
    for _ <- 1..min_acceptors, do: start_acceptor(state)

    {:ok, state}
  end

  @impl true
  def handle_cast(:accepted, state) do
    start_acceptor(state)
    {:noreply, state}
  end

  @impl true
  def handle_info({:EXIT, _pid, {:error, :emfile = reason}}, state) do
    Logger.error("No more file descriptors, shutting down")
    {:stop, reason, state}
  end

  def handle_info({:EXIT, _pid, :normal}, state) do
    {:noreply, state}
  end

  def handle_info({:EXIT, _pid, reason}, state) do
    Logger.error(Exception.format_exit(reason))
    {:noreply, state}
  end

  defp start_acceptor(state) do
    %{socket: socket, handler: handler} = state
    start_acceptor(self(), socket, handler)
  end

  defp start_acceptor(parent, listen_socket, handler) do
    :proc_lib.spawn_link(__MODULE__, :__accept__, [parent, listen_socket, handler])
  end

  @doc false
  def __accept__(parent, listen_socket, handler) do
    case :socket.accept(listen_socket, :timer.seconds(10)) do
      {:ok, socket} ->
        GenServer.cast(parent, :accepted)
        continue_loop(socket, handler, _state = [])

      {:error, reason} when reason in [:timeout, :econnaborted] ->
        __accept__(parent, listen_socket, handler)

      {:error, :closed} ->
        :ok

      {:error, _reason} = error ->
        exit(error)
    end
  end

  defp continue_loop(socket, handler, state) do
    case handle_data(socket, handler, state) do
      {:continue, state} -> continue_loop(socket, handler, state)
      {:close, _state} -> :ok
      {:error, :closed} -> :ok
      {:error, _reason} = error -> exit(error)
    end
  end

  defp handle_data(socket, handler, state) do
    with {:ok, data} <- :socket.recv(socket, 0, :timer.seconds(30)) do
      handler.handle_data(data, socket, state)
    end
  end
end

defmodule Echo do
  def handle_data(data, socket, state) do
    :socket.send(socket, data)
    {:continue, state}
  end
end

Pool.start_link(handler: Echo, port: 8000)

And here's the ongoing tcpkali output:

$ tcpkali --message '$' --connections=100 --duration=15s 127.0.0.1:8000
Destination: [127.0.0.1]:8000
Interface lo0 address [127.0.0.1]:0
Using interface lo0 to connect to [127.0.0.1]:8000
Ramped up to 100 connections.
Traffic 4085.053↓, 4201.748↑ Mbps (conns 0↓ 100↑ 0⇡; seen 100)

I’d be happy to learn how to find out what’s different between the two implementations.


I was interested in this and, while I do not know what OS you’re testing on, I was able to replicate the issue on macOS (which I don’t normally run tests like this on). Tuning the send and receive buffers makes the problem go away. You can do this with gen_tcp itself (either tune the user-level buffer or sndbuf/recbuf) or tune it at the OS level outside of Erlang.

As to the differences, it may simply be that socket is currently a wee bit slower (a guess!), as I can’t imagine how they would end up with wildly different buffer sizes, but I don’t know the innards of this code either. I’d love to hear from @raimo though :slight_smile:

Edit:

I forgot to note, the problem was obvious after looking at packet dumps (i.e., TCP zero windows were being slung around).


Thank you for looking into this! I’m on a Mac too. I’ll try reproducing this on Linux later today.

it may simply be that socket is currently a wee bit slower

Just a data point, but on a mac the socket version actually shows slightly higher throughput and packet rate:

# gen_tcp
$ tcpkali --message '$' --connections=3 --duration=15s 127.0.0.1:8000
Destination: [127.0.0.1]:8000
Interface lo0 address [127.0.0.1]:0
Using interface lo0 to connect to [127.0.0.1]:8000
Ramped up to 3 connections.
Total data sent:     11214.1 MiB (11758841980 bytes)
Total data received: 11213.0 MiB (11757700876 bytes)
Bandwidth per channel: 4180.483⇅ Mbps (522560.4 kBps)
Aggregate bandwidth: 6270.421↓, 6271.029↑ Mbps
Packet rate estimate: 542421.5↓, 572179.8↑ (6↓, 10↑ TCP MSS/op)
Test duration: 15.0008 s.

# socket
$ tcpkali --message '$' --connections=3 --duration=15s 127.0.0.1:8000
Destination: [127.0.0.1]:8000
Interface lo0 address [127.0.0.1]:0
Using interface lo0 to connect to [127.0.0.1]:8000
Ramped up to 3 connections.
Total data sent:     14996.7 MiB (15725185361 bytes)
Total data received: 14987.9 MiB (15715985733 bytes)
Bandwidth per channel: 5586.146⇅ Mbps (698268.3 kBps)
Aggregate bandwidth: 8376.768↓, 8381.671↑ Mbps
Packet rate estimate: 741095.5↓, 735514.8↑ (11↓, 21↑ TCP MSS/op)
Test duration: 15.0091 s.

You are welcome :slight_smile:

Try putting sndbuf: 5000, recbuf: 4000 in your :gen_tcp.listen/2 call. You should find that you get a higher rate than socket. This setting should also allow you to test with the original parameters with little to no error rate (I got zero). I’d be quite interested to know if it’s tcpkali that can’t keep up. One side is definitely going faster than the other, but I only glanced at a packet capture vs performing any real analysis.

Of course, I’d be remiss not to state that putting in buffer sizes like this is generally only tenable in testing (but it depends) :slight_smile:

You are encouraged to use :inet.getopts/2 to retrieve the sizes set by your operating system.
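For example, something like this (a sketch of the listen call from the first post, with the buffer sizes suggested above; values are for testing only):

{:ok, socket} =
  :gen_tcp.listen(8000,
    mode: :binary,
    ip: {127, 0, 0, 1},
    active: false,
    backlog: 1024,
    # test-only buffer sizes suggested above
    sndbuf: 5000,
    recbuf: 4000
  )

# ask the OS what it actually set
:inet.getopts(socket, [:sndbuf, :recbuf, :buffer])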


I’d be quite interested to know if it’s tcpkali that can’t keep up.

A bit of apples to oranges, but tcpkali against a tcpkali sink (not echo) shows quite a bit higher “throughput”.

Listener: tcpkali --listen-port 8000 --duration 60s
Client: tcpkali --message '$' --connections 3 --duration 15s 127.0.0.1:8000

Result

Destination: [127.0.0.1]:8000
Interface lo0 address [127.0.0.1]:0
Using interface lo0 to connect to [127.0.0.1]:8000
Ramped up to 3 connections.
Total data sent:     80129.2 MiB (84021518026 bytes)
Total data received: 0 bytes (0 bytes)
Bandwidth per channel: 14928.876⇅ Mbps (1866109.5 kBps)
Aggregate bandwidth: 0.000↓, 44786.629↑ Mbps
Packet rate estimate: 0.0↓, 3875338.0↑ (0↓, 36↑ TCP MSS/op)
Test duration: 15.0083 s.

and here’s the same client against a gen_tcp sink, with sndbuf: 5000, recbuf: 4000 in options:

Destination: [127.0.0.1]:8000
Interface lo0 address [127.0.0.1]:0
Using interface lo0 to connect to [127.0.0.1]:8000
Ramped up to 3 connections.
Total data sent:     51931.5 MiB (54454156991 bytes)
Total data received: 0 bytes (0 bytes)
Bandwidth per channel: 9679.152⇅ Mbps (1209894.0 kBps)
Aggregate bandwidth: 0.000↓, 29037.455↑ Mbps
Packet rate estimate: 0.0↓, 2553170.8↑ (0↓, 37↑ TCP MSS/op)
Test duration: 15.0025 s.

Default values for the gen_tcp buffers were [recbuf: 131072, sndbuf: 131072], and for socket they were:

iex> :socket.getopt(socket, {:otp, :rcvbuf})       
{:ok, 8192}
iex> :socket.getopt(socket, {:socket, :sndbuf})    
{:ok, 131072}
iex> :socket.getopt(socket, {:socket, :rcvbuf})
{:ok, 131072}
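
(For reference, the gen_tcp values came from :inet.getopts/2 on the listen socket; a sketch, with listen_socket being an assumed variable name:)

iex> :inet.getopts(listen_socket, [:recbuf, :sndbuf])
{:ok, [recbuf: 131072, sndbuf: 131072]}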

Thank you for the pointers. I’ll continue exploring!

I can give you a little bit of info about the different buffer sizes:

There are (at least) 3 sizes both in inet_drv and in socket.
In inet_drv we have buffer, recbuf and sndbuf.
The corresponding options in socket are: {otp,rcvbuf}, {socket,rcvbuf} and {socket,sndbuf}.

recbuf and {socket,rcvbuf} are the OS socket option SO_RCVBUF.
sndbuf and {socket,sndbuf} are the OS socket option SO_SNDBUF.
Note that the naming is more consistent in socket.
The OS does as it pleases with these values.
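
In code the mapping looks roughly like this (a sketch, not from the thread; it assumes an OTP 24+ socket:setopt/3 that takes a {Level, Opt} tuple, and reuses the sizes reported earlier purely as placeholders):

# inet_drv / gen_tcp side
:inet.setopts(socket, buffer: 8192, recbuf: 131_072, sndbuf: 131_072)

# socket side, same three knobs
:socket.setopt(sock, {:otp, :rcvbuf}, 8192)         # ~ inet's buffer
:socket.setopt(sock, {:socket, :rcvbuf}, 131_072)   # SO_RCVBUF, ~ inet's recbuf
:socket.setopt(sock, {:socket, :sndbuf}, 131_072)   # SO_SNDBUF, ~ inet's sndbuf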

buffer and {otp,rcvbuf} are both the default size to allocate when receiving without specifying a size, that is, a receive with size 0, which I think is what you do in your tests here.
I had a look at a VM on the ‘master’ branch and the default for buffer is 1460 and for {otp,rcvbuf} it is 8192, so it is ridiculously (traditionally) small for inet_drv.

In inet_drv there is a coupling between these options that I do not remember the details of, but when you set one option, others are set to the same value if they are known to be lower.

Edit: I checked. If you set recbuf then buffer is set to the same size, if it is smaller and has not been explicitly set.
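
A quick way to see that coupling in a shell (a sketch; exact numbers depend on OS and OTP version):

{:ok, ls1} = :gen_tcp.listen(0, [])
:inet.getopts(ls1, [:buffer, :recbuf])
# buffer is at its small default here

{:ok, ls2} = :gen_tcp.listen(0, recbuf: 131_072)
:inet.getopts(ls2, [:buffer, :recbuf])
# buffer has been raised to match recbuf, since it was smaller
# and had not been set explicitly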

In addition to this, inet_drv has an I/O queue because it needs to maintain the notion that output does not block. You can bang to a port (an Erlang port()) and a bang does not block. So all output, unless it can be sent immediately through the OS socket send() call, is buffered, and gen_tcp:send/* returns immediately.

socket:send/*, being more low level, on the other hand blocks if the OS socket send() call blocks, so when socket:send/* returns all data has been handed over to the OS TCP stack.

This difference might give better throughput in simple throughput benchmarks, since inet_drv returns to receive more data sooner, so receive and send can overlap. To get something similar with socket you can buffer yourself and make bigger writes. Receiving and sending in different processes (banging binaries between them) might also allow the operations to overlap.
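
A rough sketch of that last idea (not code from this thread; names are made up) — one process keeps calling :socket.recv while a separate linked process does the blocking :socket.send calls:

defmodule SplitEcho do
  # to be called with an accepted :socket connection
  def handle(socket) do
    sender = spawn_link(fn -> send_loop(socket) end)
    recv_loop(socket, sender)
  end

  defp recv_loop(socket, sender) do
    case :socket.recv(socket, 0, :timer.seconds(30)) do
      {:ok, data} ->
        # bang the binary to the sender process and keep reading
        send(sender, {:data, data})
        recv_loop(socket, sender)

      {:error, _reason} ->
        :socket.close(socket)
    end
  end

  defp send_loop(socket) do
    receive do
      {:data, data} ->
        # blocks only this process; the receiver keeps reading in parallel
        _ = :socket.send(socket, data)
        send_loop(socket)
    end
  end
end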

Unfortunately I have not had time to look at the implementations (I am no good at Elixir) to figure out why the number of connections seen gets above your target number, but I guess debugging that, so you can trust the code to do what you expect, would be a good start…


This is it more or less, except I stuffed it all into a single module:

-module(kali_test).

-behaviour(gen_server).

-export([
    code_change/3,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    init/1,
    start_link/0,
    start_link/1,
    terminate/2
]).

-export([accept/3, continue_loop/3, handle_data/2]).

-define(ACCEPT_TIMEOUT, 10000).
-define(RECV_TIMEOUT, 30000).

start_link() -> start_link([]).

start_link(Opts) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, Opts, []).

accept(Parent, ListenSocket, Handler) ->
    case gen_tcp:accept(ListenSocket, ?ACCEPT_TIMEOUT) of
        {ok, Socket} ->
            %% let the pool know so it can start a replacement acceptor
            gen_server:cast(Parent, accepted),
            continue_loop(Socket, Handler, _State = []);
        {error, Reason} when
            Reason =:= timeout orelse
                Reason =:= econnaborted
        ->
            accept(Parent, ListenSocket, Handler);
        {error, closed} ->
            ok;
        {error, _Reason} = Error ->
            exit(Error)
    end.

continue_loop(Socket, Handler, State) ->
    case Handler:handle_data(Socket, State) of
        {continue, State1} ->
            continue_loop(Socket, Handler, State1);
        {close, _State} ->
            ok;
        {error, closed} ->
            ok;
        {error, _Reason} = Error ->
            exit(Error)
    end.

handle_data(Socket, State) ->
    case gen_tcp:recv(Socket, 0, ?RECV_TIMEOUT) of
        {ok, _Data} ->
            {continue, State};
        Err ->
            Err
    end.

init(Opts) ->
    process_flag(trap_exit, true),
    Handler = proplists:get_value(handler, Opts, kali_test),
    IpAddr = proplists:get_value(ip, Opts, {127, 0, 0, 1}),
    Port = proplists:get_value(port, Opts, 8000),
    NumAcceptors =
        case proplists:get_value(min_acceptors, Opts, 1) of
            N when N >= 1 ->
                N;
            _ ->
                exit(your_min_acceptors_option_is_bad_and_you_should_feel_bad)
        end,

    {ok, Socket} =
        gen_tcp:listen(
            Port,
            [
                {mode, binary},
                {ip, IpAddr},
                {active, false},
                {backlog, 1024}
            ]
        ),
    State = #{listen_socket => Socket, handler => Handler},
    [start_acceptor(State) || _I <- lists:seq(1, NumAcceptors)],
    {ok, State}.

handle_call(_Request, _From, State) ->
    {noreply, State}.

handle_cast(accepted, State) ->
    start_acceptor(State),
    {noreply, State};
handle_cast(_, State) ->
    {noreply, State}.

handle_info({'EXIT', _Pid, {error, emfile = Reason}}, State) ->
    {stop, Reason, State};
handle_info(_, State) ->
    {noreply, State}.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

terminate(_Reason, _State) ->
    ok.

start_acceptor(#{listen_socket := ListenSocket, handler := Handler} = _State) ->
    start_acceptor(self(), ListenSocket, Handler).

start_acceptor(Parent, ListenSocket, Handler) ->
    proc_lib:spawn_link(
        kali_test,
        accept,
        [Parent, ListenSocket, Handler]
    ).

Thank you for that wealth of knowledge. Curious me just had to look further; I had to prove to myself that Erlang (really inet_drv) is not closing connections, tcpkali is :slight_smile:

There are two interesting things that happen. On the first connection, right after the SYN, ACK is sent back from the Erlang side, tcpkali sends a RST. That can account for one (though I’m not sure that’s reported).

At some point, as tcpkali is opening connections (it concurrently starts sending data rather than opening all connections and then flooding), tcpkali starts sending FIN, ACK to the Erlang side, and the Erlang side politely says “Sure, goodbye”.

Now, I love to be wrong, but packet captures usually don’t lie, especially over loopback :slight_smile:

[packet capture screenshot: the odd RST]

[packet capture screenshot: the FIN, ACK exchanges]

I would encourage @ruslandoga to break out tcpdump, Wireshark, etc. and analyze this themselves :slight_smile:

Edit:

I forgot to say, this clearly has something to do with buffers, zero-window fun, etc. It may not be tcpkali at all, but rather OS defaults working against you here, or it might be both.

Also note, you can rid yourself of the issue by slowing down the Erlang send side. Put in a sleep call with an argument of a few ms (this will surely vary) and notice you get the same effect as increasing the buffer size; this jibes with Raimo’s commentary.
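
For example (a sketch against the Echo handler from the first post; the 2 ms value is arbitrary):

defmodule SlowEcho do
  def handle_data(data, socket, state) do
    # slow the send side down by a few milliseconds
    Process.sleep(2)
    :gen_tcp.send(socket, data)
    {:continue, state}
  end
end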
