I’m on a quest to implement a fast acceptor pool in Elixir, so I’ve been benchmarking different ones. I noticed that `gen_tcp`-based pools don’t seem to work well with tcpkali (GitHub: satori-com/tcpkali, a fast multi-core TCP and WebSockets load generator): tcpkali is sometimes unable to collect enough connections. `socket`-based pools do seem to work, though, so I wonder what the difference is.
Here's the `gen_tcp` example:
defmodule Pool do
  @moduledoc """
  A `:gen_tcp` acceptor pool.

  The pool process owns the listen socket and starts `:min_acceptors` linked
  processes that block in `:gen_tcp.accept/2`. When an acceptor gets a
  connection it casts `:accepted` to the pool (which spawns a replacement
  acceptor) and then serves that connection itself.

  ## Options

    * `:handler` (required) - module exporting `handle_data(data, socket, state)`
      that returns `{:continue, state}`, `{:close, state}` or `{:error, reason}`
    * `:ip` - address to listen on, defaults to `{127, 0, 0, 1}`
    * `:port` - port to listen on, defaults to `0`
    * `:min_acceptors` - number of acceptors started up front, defaults to `100`
    * `:name`, `:debug` - passed on to `GenServer.start_link/3`
  """

  use GenServer
  require Logger

  @doc """
  Starts the pool and links it to the caller.

  Returns `{:error, reason}` when the listen socket cannot be opened.
  """
  def start_link(opts) do
    {genserver_opts, opts} = Keyword.split(opts, [:name, :debug])
    GenServer.start_link(__MODULE__, opts, genserver_opts)
  end

  @impl true
  def init(opts) do
    # Trap exits so acceptor terminations arrive as {:EXIT, pid, reason} messages.
    Process.flag(:trap_exit, true)

    handler = Keyword.fetch!(opts, :handler)
    ip = Keyword.get(opts, :ip, {127, 0, 0, 1})
    port = Keyword.get(opts, :port, 0)
    min_acceptors = opts[:min_acceptors] || 100

    case :gen_tcp.listen(port, mode: :binary, ip: ip, active: false, backlog: 1024) do
      {:ok, socket} ->
        state = %{socket: socket, handler: handler}
        start_acceptors(state, min_acceptors)
        {:ok, state}

      {:error, reason} ->
        # Report the listen failure (e.g. :eaddrinuse) as the stop reason
        # instead of crashing on a failed match.
        {:stop, reason}
    end
  end

  @impl true
  def handle_cast(:accepted, state) do
    # An acceptor turned into a connection process; start one to replace it.
    start_acceptor(state)
    {:noreply, state}
  end

  @impl true
  def handle_info({:EXIT, _pid, {:error, :emfile = reason}}, state) do
    Logger.error("No more file descriptors, shutting down")
    {:stop, reason, state}
  end

  def handle_info({:EXIT, _pid, :normal}, state) do
    {:noreply, state}
  end

  def handle_info({:EXIT, _pid, reason}, state) do
    Logger.error(Exception.format_exit(reason))
    {:noreply, state}
  end

  # Starts `count` acceptors. A plain `1..count` range would be descending for
  # `count < 1` and start acceptors even for a zero-sized pool, so count down
  # explicitly instead.
  defp start_acceptors(state, count) when is_integer(count) and count > 0 do
    start_acceptor(state)
    start_acceptors(state, count - 1)
  end

  defp start_acceptors(_state, count) when is_integer(count), do: :ok

  defp start_acceptor(state) do
    %{socket: socket, handler: handler} = state
    start_acceptor(self(), socket, handler)
  end

  defp start_acceptor(parent, listen_socket, handler) do
    :proc_lib.spawn_link(__MODULE__, :__accept__, [parent, listen_socket, handler])
  end

  @doc false
  # Acceptor entry point: waits for a connection, tells the pool about it and
  # then serves the connection in this same process.
  def __accept__(parent, listen_socket, handler) do
    case :gen_tcp.accept(listen_socket, :timer.seconds(10)) do
      {:ok, socket} ->
        GenServer.cast(parent, :accepted)
        continue_loop(socket, handler, _state = [])

      # Nothing was accepted; go back to waiting.
      {:error, reason} when reason in [:timeout, :econnaborted] ->
        __accept__(parent, listen_socket, handler)

      # The listen socket is closed; return so the acceptor ends normally.
      {:error, :closed} ->
        :ok

      {:error, _reason} = error ->
        exit(error)
    end
  end

  # Receives and dispatches data until the handler asks to close, the
  # connection is closed, or an error makes the process exit.
  defp continue_loop(socket, handler, state) do
    case handle_data(socket, handler, state) do
      {:continue, state} -> continue_loop(socket, handler, state)
      {:close, _state} -> :ok
      {:error, :closed} -> :ok
      {:error, _reason} = error -> exit(error)
    end
  end

  # Reads whatever is available (30 second timeout) and hands it to the handler.
  # A failed recv falls through `with` and is returned as `{:error, reason}`.
  defp handle_data(socket, handler, state) do
    with {:ok, data} <- :gen_tcp.recv(socket, 0, :timer.seconds(30)) do
      handler.handle_data(data, socket, state)
    end
  end
end
defmodule Echo do
  @moduledoc """
  Handler that writes every received chunk straight back to the peer.
  """

  @doc """
  Sends `data` back over `socket`.

  Returns `{:continue, state}` when the write succeeds. When `:gen_tcp.send/2`
  fails, its `{:error, reason}` tuple is returned instead, so the caller does
  not keep looping on a connection it can no longer write to.
  """
  def handle_data(data, socket, state) do
    with :ok <- :gen_tcp.send(socket, data) do
      {:continue, state}
    end
  end
end
# Boot the echo pool on port 8000.
pool_opts = [handler: Echo, port: 8000]
Pool.start_link(pool_opts)
And here’s the ongoing `tcpkali` output:
$ tcpkali --message '$' --connections=100 --duration=15s 127.0.0.1:8000
Destination: [127.0.0.1]:8000
Interface lo0 address [127.0.0.1]:0
Using interface lo0 to connect to [127.0.0.1]:8000
Ramped up to 100 connections.
Traffic 4015.196↓, 4495.600↑ Mbps (conns 0↓ 100↑ 0⇡; seen 541)
Here `seen 541` means that 541 connections have been opened even though the target is 100. With `gen_tcp`, connections seemingly get closed prematurely in this benchmark and `tcpkali` has to create new ones. This is not the case with `socket`; please see below.
And here's the `socket` example:
defmodule Pool do
  @moduledoc """
  A `:socket` acceptor pool.

  The pool process opens the listen socket and starts `:min_acceptors` linked
  processes that block in `:socket.accept/2`. When an acceptor gets a
  connection it casts `:accepted` to the pool (which spawns a replacement
  acceptor) and then serves that connection itself.

  ## Options

    * `:handler` (required) - module exporting `handle_data(data, socket, state)`
      that returns `{:continue, state}`, `{:close, state}` or `{:error, reason}`
    * `:ip` - address to listen on, defaults to `{127, 0, 0, 1}`
    * `:port` - port to listen on, defaults to `0`
    * `:min_acceptors` - number of acceptors started up front, defaults to `100`
    * `:name`, `:debug` - passed on to `GenServer.start_link/3`
  """

  use GenServer
  require Logger

  @doc """
  Starts the pool and links it to the caller.

  Returns `{:error, reason}` when the listen socket cannot be set up.
  """
  def start_link(opts) do
    {genserver_opts, opts} = Keyword.split(opts, [:name, :debug])
    GenServer.start_link(__MODULE__, opts, genserver_opts)
  end

  @impl true
  def init(opts) do
    # Trap exits so acceptor terminations arrive as {:EXIT, pid, reason} messages.
    Process.flag(:trap_exit, true)

    handler = Keyword.fetch!(opts, :handler)
    ip = Keyword.get(opts, :ip, {127, 0, 0, 1})
    port = Keyword.get(opts, :port, 0)
    min_acceptors = opts[:min_acceptors] || 100

    # Any failing step (e.g. :eaddrinuse from bind) becomes the stop reason
    # instead of a crash on a failed match.
    with {:ok, socket} <- :socket.open(:inet, :stream, :tcp),
         :ok <- :socket.bind(socket, %{family: :inet, port: port, addr: ip}),
         # 1024 is the listen backlog.
         :ok <- :socket.listen(socket, 1024) do
      state = %{socket: socket, handler: handler}
      start_acceptors(state, min_acceptors)
      {:ok, state}
    else
      {:error, reason} -> {:stop, reason}
    end
  end

  @impl true
  def handle_cast(:accepted, state) do
    # An acceptor turned into a connection process; start one to replace it.
    start_acceptor(state)
    {:noreply, state}
  end

  @impl true
  def handle_info({:EXIT, _pid, {:error, :emfile = reason}}, state) do
    Logger.error("No more file descriptors, shutting down")
    {:stop, reason, state}
  end

  def handle_info({:EXIT, _pid, :normal}, state) do
    {:noreply, state}
  end

  def handle_info({:EXIT, _pid, reason}, state) do
    Logger.error(Exception.format_exit(reason))
    {:noreply, state}
  end

  # Starts `count` acceptors. A plain `1..count` range would be descending for
  # `count < 1` and start acceptors even for a zero-sized pool, so count down
  # explicitly instead.
  defp start_acceptors(state, count) when is_integer(count) and count > 0 do
    start_acceptor(state)
    start_acceptors(state, count - 1)
  end

  defp start_acceptors(_state, count) when is_integer(count), do: :ok

  defp start_acceptor(state) do
    %{socket: socket, handler: handler} = state
    start_acceptor(self(), socket, handler)
  end

  defp start_acceptor(parent, listen_socket, handler) do
    :proc_lib.spawn_link(__MODULE__, :__accept__, [parent, listen_socket, handler])
  end

  @doc false
  # Acceptor entry point: waits for a connection, tells the pool about it and
  # then serves the connection in this same process.
  def __accept__(parent, listen_socket, handler) do
    case :socket.accept(listen_socket, :timer.seconds(10)) do
      {:ok, socket} ->
        GenServer.cast(parent, :accepted)
        continue_loop(socket, handler, _state = [])

      # Nothing was accepted; go back to waiting.
      {:error, reason} when reason in [:timeout, :econnaborted] ->
        __accept__(parent, listen_socket, handler)

      # The listen socket is closed; return so the acceptor ends normally.
      {:error, :closed} ->
        :ok

      {:error, _reason} = error ->
        exit(error)
    end
  end

  # Receives and dispatches data until the handler asks to close, the
  # connection is closed, or an error makes the process exit.
  defp continue_loop(socket, handler, state) do
    case handle_data(socket, handler, state) do
      {:continue, state} -> continue_loop(socket, handler, state)
      {:close, _state} -> :ok
      {:error, :closed} -> :ok
      {:error, _reason} = error -> exit(error)
    end
  end

  # Reads whatever is available (30 second timeout) and hands it to the handler.
  # A failed recv falls through `with` and is returned as `{:error, reason}`.
  defp handle_data(socket, handler, state) do
    with {:ok, data} <- :socket.recv(socket, 0, :timer.seconds(30)) do
      handler.handle_data(data, socket, state)
    end
  end
end
defmodule Echo do
  @moduledoc """
  Handler that writes every received chunk straight back to the peer.
  """

  @doc """
  Sends `data` back over `socket`.

  Returns `{:continue, state}` when the write succeeds. When `:socket.send/2`
  fails, its error tuple is returned instead, so the caller does not keep
  looping on a connection it can no longer write to.
  """
  def handle_data(data, socket, state) do
    case :socket.send(socket, data) do
      :ok -> {:continue, state}
      {:error, _reason} = error -> error
    end
  end
end
# Boot the echo pool on port 8000.
pool_opts = [handler: Echo, port: 8000]
Pool.start_link(pool_opts)
And here's the ongoing `tcpkali` output:
$ tcpkali --message '$' --connections=100 --duration=15s 127.0.0.1:8000
Destination: [127.0.0.1]:8000
Interface lo0 address [127.0.0.1]:0
Using interface lo0 to connect to [127.0.0.1]:8000
Ramped up to 100 connections.
Traffic 4085.053↓, 4201.748↑ Mbps (conns 0↓ 100↑ 0⇡; seen 100)
I’d be happy to learn how to find out what’s different between the two implementations.