I’m on a quest to implement a fast acceptor pool in Elixir, so I’ve been benchmarking different ones. I noticed that `:gen_tcp`-based pools don’t seem to work well with [tcpkali](https://github.com/satori-com/tcpkali) (a fast multi-core TCP and WebSockets load generator): tcpkali sometimes can't maintain the requested number of connections. `:socket`-based pools do seem to work, though, so I wonder what the difference is.
Here's the `:gen_tcp` example:
defmodule Pool do
  @moduledoc """
  A minimal acceptor pool built on `:gen_tcp`.

  The pool process owns the listening socket and starts `:min_acceptors`
  acceptor processes, each blocked in `:gen_tcp.accept/2`. When an acceptor
  gets a connection it casts `:accepted` to the pool, which starts a
  replacement acceptor. The acceptor then serves the connection by passing
  every received chunk to `handler.handle_data(data, socket, state)`.

  Options:

    * `:handler` (required) - module exporting `handle_data/3`
    * `:ip` - address to listen on, defaults to `{127, 0, 0, 1}`
    * `:port` - port to listen on, defaults to `0`
    * `:min_acceptors` - number of acceptors started at boot, defaults to `100`
    * `:name`, `:debug` - passed through to `GenServer.start_link/3`
  """
  use GenServer
  require Logger

  @doc """
  Starts the pool linked to the caller.

  `:name` and `:debug` are split off and given to `GenServer.start_link/3`.
  All other options go to `init/1`.
  """
  def start_link(opts) do
    {genserver_opts, opts} = Keyword.split(opts, [:name, :debug])
    GenServer.start_link(__MODULE__, opts, genserver_opts)
  end

  @impl true
  def init(opts) do
    # Trap exits so that acceptor/connection exits arrive as {:EXIT, ...}
    # messages (see handle_info/2) instead of taking the pool down.
    Process.flag(:trap_exit, true)
    handler = Keyword.fetch!(opts, :handler)
    ip = Keyword.get(opts, :ip, {127, 0, 0, 1})
    port = Keyword.get(opts, :port, 0)
    min_acceptors = opts[:min_acceptors] || 100

    case :gen_tcp.listen(port, mode: :binary, ip: ip, active: false, backlog: 1024) do
      {:ok, socket} ->
        state = %{socket: socket, handler: handler}
        start_acceptors(state, min_acceptors)
        {:ok, state}

      # e.g. :eaddrinuse - stop with the listen error itself instead of
      # crashing on a MatchError.
      {:error, reason} ->
        {:stop, reason}
    end
  end

  # An acceptor has taken a connection: start another one to replace it.
  @impl true
  def handle_cast(:accepted, state) do
    start_acceptor(state)
    {:noreply, state}
  end

  # Running out of file descriptors is treated as fatal for the whole pool.
  @impl true
  def handle_info({:EXIT, _pid, {:error, :emfile = reason}}, state) do
    Logger.error("No more file descriptors, shutting down")
    {:stop, reason, state}
  end

  def handle_info({:EXIT, _pid, :normal}, state) do
    {:noreply, state}
  end

  # Any other abnormal exit is logged. No replacement is started here: the
  # pool only replaces acceptors when it receives the :accepted cast.
  def handle_info({:EXIT, _pid, reason}, state) do
    Logger.error(Exception.format_exit(reason))
    {:noreply, state}
  end

  # Starts `count` acceptors. A recursive helper is used instead of
  # `for _ <- 1..count`, because `1..0` is a decreasing range and would start
  # two acceptors. A count of zero or less starts none.
  defp start_acceptors(state, count) when is_integer(count) and count > 0 do
    start_acceptor(state)
    start_acceptors(state, count - 1)
  end

  defp start_acceptors(_state, count) when is_integer(count), do: :ok

  defp start_acceptor(state) do
    %{socket: socket, handler: handler} = state
    start_acceptor(self(), socket, handler)
  end

  defp start_acceptor(parent, listen_socket, handler) do
    :proc_lib.spawn_link(__MODULE__, :__accept__, [parent, listen_socket, handler])
  end

  @doc false
  # Acceptor loop: waits up to 10 seconds per accept attempt and retries on
  # :timeout / :econnaborted. A closed listen socket ends the acceptor
  # normally. Any other accept error exits, and the pool logs it.
  def __accept__(parent, listen_socket, handler) do
    case :gen_tcp.accept(listen_socket, :timer.seconds(10)) do
      {:ok, socket} ->
        GenServer.cast(parent, :accepted)
        continue_loop(socket, handler, _state = [])

      {:error, reason} when reason in [:timeout, :econnaborted] ->
        __accept__(parent, listen_socket, handler)

      {:error, :closed} ->
        :ok

      {:error, _reason} = error ->
        exit(error)
    end
  end

  # Serves one connection until the handler returns {:close, _}, the peer
  # closes, or an error occurs. A non-:closed error, including a 30-second
  # recv timeout, exits the process.
  defp continue_loop(socket, handler, state) do
    case handle_data(socket, handler, state) do
      {:continue, state} -> continue_loop(socket, handler, state)
      {:close, _state} -> :ok
      {:error, :closed} -> :ok
      {:error, _reason} = error -> exit(error)
    end
  end

  # Receives whatever bytes are available (length 0) and hands them to the
  # handler. A recv error is returned unchanged.
  defp handle_data(socket, handler, state) do
    with {:ok, data} <- :gen_tcp.recv(socket, 0, :timer.seconds(30)) do
      handler.handle_data(data, socket, state)
    end
  end
end
defmodule Echo do
  @moduledoc """
  Connection handler that writes every received chunk back to the peer
  over `:gen_tcp`.
  """

  @doc """
  Sends `data` back on `socket`.

  Returns `{:continue, state}` when `:gen_tcp.send/2` succeeds. When it fails,
  returns the `{:error, reason}` tuple instead of ignoring it, so a connection
  loop that treats errors as terminal can stop.
  """
  def handle_data(data, socket, state) do
    case :gen_tcp.send(socket, data) do
      :ok -> {:continue, state}
      {:error, _reason} = error -> error
    end
  end
end
# Serve the Echo handler on port 8000 (Pool's default :ip is 127.0.0.1).
Pool.start_link(port: 8000, handler: Echo)
And here’s the tcpkali output while it runs:
$ tcpkali --message '$' --connections=100 --duration=15s 127.0.0.1:8000
Destination: [127.0.0.1]:8000
Interface lo0 address [127.0.0.1]:0
Using interface lo0 to connect to [127.0.0.1]:8000
Ramped up to 100 connections.
Traffic 4015.196↓, 4495.600↑ Mbps (conns 0↓ 100↑ 0⇡; seen 541)
Here, `seen 541` means that 541 connections have been opened even though the target is 100. With `:gen_tcp`, connections seemingly get closed prematurely in this benchmark, so tcpkali has to open new ones. This is not the case with `:socket`; see below.
And here's the `:socket` example:
defmodule Pool do
  @moduledoc """
  A minimal acceptor pool built on the `:socket` module.

  The pool process opens the listening IPv4 TCP socket and starts
  `:min_acceptors` acceptor processes, each blocked in `:socket.accept/2`.
  When an acceptor gets a connection it casts `:accepted` to the pool, which
  starts a replacement acceptor. The acceptor then serves the connection by
  passing every received chunk to `handler.handle_data(data, socket, state)`.

  Options:

    * `:handler` (required) - module exporting `handle_data/3`
    * `:ip` - address to bind, defaults to `{127, 0, 0, 1}`
    * `:port` - port to bind, defaults to `0`
    * `:min_acceptors` - number of acceptors started at boot, defaults to `100`
    * `:name`, `:debug` - passed through to `GenServer.start_link/3`
  """
  use GenServer
  require Logger

  @doc """
  Starts the pool linked to the caller.

  `:name` and `:debug` are split off and given to `GenServer.start_link/3`.
  All other options go to `init/1`.
  """
  def start_link(opts) do
    {genserver_opts, opts} = Keyword.split(opts, [:name, :debug])
    GenServer.start_link(__MODULE__, opts, genserver_opts)
  end

  @impl true
  def init(opts) do
    # Trap exits so that acceptor/connection exits arrive as {:EXIT, ...}
    # messages (see handle_info/2) instead of taking the pool down.
    Process.flag(:trap_exit, true)
    handler = Keyword.fetch!(opts, :handler)
    ip = Keyword.get(opts, :ip, {127, 0, 0, 1})
    port = Keyword.get(opts, :port, 0)
    min_acceptors = opts[:min_acceptors] || 100

    case listen(ip, port) do
      {:ok, socket} ->
        state = %{socket: socket, handler: handler}
        start_acceptors(state, min_acceptors)
        {:ok, state}

      # e.g. :eaddrinuse - stop with the socket error itself instead of
      # crashing on a MatchError.
      {:error, reason} ->
        {:stop, reason}
    end
  end

  # An acceptor has taken a connection: start another one to replace it.
  @impl true
  def handle_cast(:accepted, state) do
    start_acceptor(state)
    {:noreply, state}
  end

  # Running out of file descriptors is treated as fatal for the whole pool.
  @impl true
  def handle_info({:EXIT, _pid, {:error, :emfile = reason}}, state) do
    Logger.error("No more file descriptors, shutting down")
    {:stop, reason, state}
  end

  def handle_info({:EXIT, _pid, :normal}, state) do
    {:noreply, state}
  end

  # Any other abnormal exit is logged. No replacement is started here: the
  # pool only replaces acceptors when it receives the :accepted cast.
  def handle_info({:EXIT, _pid, reason}, state) do
    Logger.error(Exception.format_exit(reason))
    {:noreply, state}
  end

  # Opens an IPv4 TCP socket, then binds and listens on it. If bind or listen
  # fails, the opened socket is closed explicitly before the error is returned.
  defp listen(ip, port) do
    with {:ok, socket} <- :socket.open(:inet, :stream, :tcp) do
      case bind_and_listen(socket, ip, port) do
        :ok ->
          {:ok, socket}

        {:error, _reason} = error ->
          :socket.close(socket)
          error
      end
    end
  end

  defp bind_and_listen(socket, ip, port) do
    with :ok <- :socket.bind(socket, %{family: :inet, port: port, addr: ip}) do
      :socket.listen(socket, _backlog = 1024)
    end
  end

  # Starts `count` acceptors. A recursive helper is used instead of
  # `for _ <- 1..count`, because `1..0` is a decreasing range and would start
  # two acceptors. A count of zero or less starts none.
  defp start_acceptors(state, count) when is_integer(count) and count > 0 do
    start_acceptor(state)
    start_acceptors(state, count - 1)
  end

  defp start_acceptors(_state, count) when is_integer(count), do: :ok

  defp start_acceptor(state) do
    %{socket: socket, handler: handler} = state
    start_acceptor(self(), socket, handler)
  end

  defp start_acceptor(parent, listen_socket, handler) do
    :proc_lib.spawn_link(__MODULE__, :__accept__, [parent, listen_socket, handler])
  end

  @doc false
  # Acceptor loop: waits up to 10 seconds per accept attempt and retries on
  # :timeout / :econnaborted. A closed listen socket ends the acceptor
  # normally. Any other accept error exits, and the pool logs it.
  def __accept__(parent, listen_socket, handler) do
    case :socket.accept(listen_socket, :timer.seconds(10)) do
      {:ok, socket} ->
        GenServer.cast(parent, :accepted)
        continue_loop(socket, handler, _state = [])

      {:error, reason} when reason in [:timeout, :econnaborted] ->
        __accept__(parent, listen_socket, handler)

      {:error, :closed} ->
        :ok

      {:error, _reason} = error ->
        exit(error)
    end
  end

  # Serves one connection until the handler returns {:close, _}, the peer
  # closes, or an error occurs. A non-:closed error, including a 30-second
  # recv timeout, exits the process.
  defp continue_loop(socket, handler, state) do
    case handle_data(socket, handler, state) do
      {:continue, state} -> continue_loop(socket, handler, state)
      {:close, _state} -> :ok
      {:error, :closed} -> :ok
      {:error, _reason} = error -> exit(error)
    end
  end

  # Receives whatever bytes are available (length 0) and hands them to the
  # handler. A recv error is returned unchanged.
  defp handle_data(socket, handler, state) do
    with {:ok, data} <- :socket.recv(socket, 0, :timer.seconds(30)) do
      handler.handle_data(data, socket, state)
    end
  end
end
defmodule Echo do
  @moduledoc """
  Connection handler that writes every received chunk back to the peer
  using the `:socket` module.
  """

  @doc """
  Sends `data` back on `socket`.

  Returns `{:continue, state}` when `:socket.send/2` succeeds. When it fails,
  returns `{:error, reason}` instead of ignoring it, so a connection loop
  that treats errors as terminal can stop. For a partial send
  (`{:error, {reason, rest}}`), the unsent rest is dropped and only the
  reason is returned.
  """
  def handle_data(data, socket, state) do
    case :socket.send(socket, data) do
      :ok -> {:continue, state}
      {:error, {reason, rest}} when is_binary(rest) -> {:error, reason}
      {:error, _reason} = error -> error
    end
  end
end
# Serve the Echo handler on port 8000 (Pool's default :ip is 127.0.0.1).
Pool.start_link(port: 8000, handler: Echo)
And here's the tcpkali output while it runs:
$ tcpkali --message '$' --connections=100 --duration=15s 127.0.0.1:8000
Destination: [127.0.0.1]:8000
Interface lo0 address [127.0.0.1]:0
Using interface lo0 to connect to [127.0.0.1]:8000
Ramped up to 100 connections.
Traffic 4085.053↓, 4201.748↑ Mbps (conns 0↓ 100↑ 0⇡; seen 100)
I’d be happy to learn how to find out what’s different between the two implementations.
