Need help using emqtt with MQTT over QUIC on Erlang/OTP 27

I’m trying to use emqtt with MQTT over QUIC on Erlang/OTP 27 [erts-15.1.2] to communicate with an EK9160 Bus Coupler. I can establish communication with the device from the command line using these instructions.

 cd ~/emqtt/_build/emqtt/rel/emqtt/bin

With the QUIC option (--enable-quic=true) and QoS 1:

./emqtt sub -h '192.168.178.33' -p '14567' --enable-quic=true -q '1' -k '300' -t 'EK9160/EK-98EFB6/Stream1/Json/Tx/Data'

./emqtt pub -h '192.168.178.33' -p '14567' --enable-quic=true -q '1' -k '300' -t 'EK9160/EK-98EFB6/Stream1/Json/Rx/Data' -k '30' --payload '{"Values":{"Slave 2 (EL2008).Channel 1.Output":false}}'

To communicate with emqtt using an Erlang/OTP gen_server, I created a new myapp application project using rebar3 and the following rebar.config file

alberto@debian:~/myapp$ cat rebar.config 
{erl_opts, [debug_info]}.
{deps, [{emqtt, {git, "https://github.com/emqx/emqtt.git", {branch, "master"}}} ]}.

{shell, [
    %% {config, "config/sys.config"},
    {apps, [myapp]}
]}.

And then placed the following ek9160.erl file I created in the src directory of myapp

alberto@debian:~/myapp/_build/default/lib$ ls
cowlib  emqtt  getopt  gun  myapp  quicer  snabbkaffe

Here is a copy of the Erlang module

-module(ek9160).
-behaviour(gen_server).
-export([start/0, cnct/0, sub/0, ping/0]).
-export([init/1, handle_call/3, handle_cast/2]).

-record(state, {clientID}).

cnct() ->
   gen_server:call(ek9160, cnct).
sub()->
   gen_server:call(ek9160, sub).
ping() ->
   gen_server:call(ek9160, ping).

start() ->
  gen_server:start_link({local, ek9160}, ek9160, [], []).

init(_Args) ->
  {ok, ConnPid} = emqtt:start_link([{host, '192.168.178.33'}, {port, 14567}, {keepalive,300}]),
  State = #state{clientID = ConnPid},
  {ok, State}.

handle_call(cnct, _From, State) ->
   {ok, Props} = emqtt:quic_connect(State#state.clientID),
   io:fromat("Props ~p~n", [Props]),
   {reply, ok, State};
handle_call(sub, _From, State) ->
   Properties = #{'Payload-Format-Indicator' => 0,
                  %'Content-Type' => binary(),
                  'Server-Keep-Alive' => 300,
                  'Maximum-QoS' => 1,
                  'Retain-Available' => 1},
   SubOpts = [{qos, 1}],
   Subscriptions = [{<<"EK9160/EK-98EFB6/Stream1/Json/Tx/Data">>, SubOpts}],
   {ok, _Props, _ReasonCodes} = emqtt:subscribe(State#state.clientID, Properties, Subscriptions),
   {reply, ok, State};  
handle_call(ping, _From, State) ->
   pong = emqtt:ping(State#state.clientID),
   {reply, ok, State}.

 handle_cast(_Msg, _State) ->
    NewState = {},
    {noreply, NewState}.

Unfortunately, when I run the module I get the following error report.

alberto@debian:~/myapp$ rebar3 shell
===> Verifying dependencies...
===> Analyzing applications...
===> Compiling myapp
Erlang/OTP 27 [erts-15.1.2] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1] [jit:ns]

Eshell V15.1.2 (press Ctrl+G to abort, type help(). for help)
===> Booted cowlib
===> Booted gun
===> Booted emqtt
===> Booted myapp
1> ek9160:start().
{ok,<0.218.0>}

2> ek9160:sub().
=ERROR REPORT==== 9-Dec-2024::18:46:34.324796 ===
    state: initialized
    msg: unexpected_event
    clientid: <<"emqtt-debian-d12747649e95ead816f4">>
    event: {subscribe,#{'Payload-Format-Indicator' => 0,
                        'Server-Keep-Alive' => 300,'Maximum-QoS' => 1,
                        'Retain-Available' => 1},
                      [{<<"EK9160/EK-98EFB6/Stream1/Json/Tx/Data">>,
                        #{nl => 0,qos => 1,rap => 0,rh => 0}}]}
    event_type: {call,{<0.218.0>,#Ref<0.2299435312.546570241.15840>}}
=ERROR REPORT==== 9-Dec-2024::18:46:39.347036 ===
** State machine <0.219.0> terminating
** When server state  = {initialized,
                            {state,undefined,<0.218.0>,undefined,
                                '192.168.178.33',14567,[],emqtt_sock,
                                undefined,[],60000,false,
                                <<"emqtt-debian-d12747649e95ead816f4">>,true,
                                undefined,undefined,4,<<"MQTT">>,300,
                                undefined,false,false,undefined,#{},[],#{},
                                #{seq => 1,sent => #{},
                                  max_inflight => infinity},
                                #{},true,30000,undefined,30000,undefined,
                                undefined,1,false,
                                {none,
                                    #{version => 4,max_size => 268435455,
                                      strict_mode => false}},
                                0,5000,false,undefined,
                                {[],[]},
                                #{}}}
** Reason for termination = exit:{{timeout,{gen_server,call,[ek9160,sub]}},
                                  [{gen_server,call,2,
                                       [{file,"gen_server.erl"},{line,1142}]},
                                   {erl_eval,do_apply,7,
                                       [{file,"erl_eval.erl"},{line,904}]},
                                   {shell,exprs,7,
                                       [{file,"shell.erl"},{line,893}]},
                                   {shell,eval_exprs,7,
                                       [{file,"shell.erl"},{line,849}]},
                                   {shell,eval_loop,4,
                                       [{file,"shell.erl"},{line,834}]}]}
** Callback modules = [emqtt]
** Callback mode = state_functions
** Stacktrace =
**  [{gen_statem,loop_receive,3,[{file,"gen_statem.erl"},{line,3625}]},
     {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,329}]}]

=CRASH REPORT==== 9-Dec-2024::18:46:39.352544 ===
  crasher:
    initial call: emqtt:init/1
    pid: <0.219.0>
    registered_name: []
    exception exit: {{timeout,{gen_server,call,[ek9160,sub]}},
                     [{gen_server,call,2,
                                  [{file,"gen_server.erl"},{line,1142}]},
                      {erl_eval,do_apply,7,[{file,"erl_eval.erl"},{line,904}]},
                      {shell,exprs,7,[{file,"shell.erl"},{line,893}]},
                      {shell,eval_exprs,7,[{file,"shell.erl"},{line,849}]},
                      {shell,eval_loop,4,[{file,"shell.erl"},{line,834}]}]}
      in function  gen_statem:loop_receive/3 (gen_statem.erl, line 3625)
    ancestors: [ek9160,<0.214.0>,<0.208.0>,<0.207.0>,<0.66.0>,<0.70.0>,
                  <0.65.0>,kernel_sup,<0.47.0>]
    message_queue_len: 0
    messages: []
    links: []
    dictionary: [{rand_seed,{#{max => 288230376151711743,type => exsplus,
                                next => #Fun<rand.5.40079776>,
                                jump => #Fun<rand.3.40079776>},
                              [242607169756497238|259535589815849374]}}]
    trap_exit: true
    status: running
    heap_size: 6772
    stack_size: 29
    reductions: 21367
  neighbours:

** exception exit: {timeout,{gen_server,call,[ek9160,sub]}}
     in function  gen_server:call/2 (gen_server.erl, line 1142)
3>

The crux of the error is ** Reason for termination = exit:{{timeout,{gen_server,call,[ek9160,sub]}},

I tried to remedy this problem by adding an extended server timeout for the following call, but that didn’t fix the problem.

sub()->
   gen_server:call(ek9160, sub, 6000).

I don’t know if the problem is related to the properties and options I added to the subscription instructions or if I’m calling the emqtt methods incorrectly resulting in the OTP error report.

Would any of you in the Erlang Forum know what I should do to fix the communication problem?

I only briefly viewed the code, and I didn’t try to compile and run it, but here’s a few things:

  1. State of the emqtt process is initialized according to the logs. In this state the only acceptable command is connect, hence unexpected_event error. You need to connect emqtt to the broker first.
  2. Your code implements a connect function, but it has a typo, io:fromat("Props ~p~n", [Props]), so I assume it wasn’t called.

I hope it helps.

P.S. emqtt is one of the EMQX projects; the best way to reach the developers is by creating a discussion here: emqx/emqx · Discussions · GitHub

Good morning ieQu1 <https://erlangforums.com/u/iequ1>

I’m running Erlang/OTP 27 [erts-15.1.2] on Debian 12. This is a requirement for running the emqtt client with MQTT over QUIC.

I just rewrote the module, which is now called com.erl. Here it is:

-module(com).
-behaviour(gen_server).

-export([start/0, stop/0, cnct/0, subscribe/0]).
-export([init/1, handle_call/3, handle_cast/2, terminate/2, handle_info/2]).

-record(state, {clientID}).

%% CLIENT FUNCTIONS

%% start() -> {ok, pid()} | {error, Reason}
%% Starts the com server. Called by supervisor
start() ->
    gen_server:start_link({local, com}, com, [], []).

%% stop() -> ok.
%% Stops the com server.
stop() ->
    gen_server:cast(com, stop).

cnct() ->
    gen_server:cast(com, self()).

subscribe() ->
    gen_server:call(com, subscribe).

init([]) ->
    State = #state{clientID = self()},
    {ok, State}.

%% -spec(open_quic_connection(client()) -> ok | {error, term()}).
handle_cast(cnct, Pid) ->
    io:format("Pid ~p~n", [Pid]),
    State = #state{clientID = Pid},
    emqtt:open_quic_connection(State#state.clientID),
    emqtt:quic_mqtt_connect(State#state.clientID),
    {noreply, State};
handle_cast(stop, State) ->
    {stop, normal, State}.

%% -spec(subscribe(client(), topic() | {topic(), qos() | qos_name() | [subopt()]} | [{topic(), qos()}])
%%       -> subscribe_ret()).
handle_call(subscribe, _From, State) ->
    Properties = #{'Payload-Format-Indicator' => 0,
                   'Message-Expiry-Interval' => 10,
                   'Content-Type' => 1,
                   'Server-Keep-Alive' => 30,
                   %'Maximum-QoS' => 1,
                   'Retain-Available' => 1},
    SubOpts = [{qos, 1}],
    Subscriptions = [{<<"EK9160/EK-98EFB6/Stream1/Json/Tx/Data">>, SubOpts}],
    emqtt:subscribe(State#state.clientID, Properties, Subscriptions),
    {noreply, State}.

handle_info(_Msg, State) ->
    {noreply, State}.

%% terminate(Reason, LoopData) -> ok.
%% Termination callback. Does nothing, but should instead kill clients.
terminate(_Reason, _LoopData) ->
    ok.

I thought this might work, but I’m still getting errors:

alberto@debian:~/testapp$ rebar3 shell
===> Verifying dependencies...
===> Analyzing applications...
===> Compiling testapp
Erlang/OTP 27 [erts-15.1.2] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1] [jit:ns]

===> Booted testapp
Eshell V15.1.2 (press Ctrl+G to abort, type help(). for help)
1> com:start().
{ok,<0.215.0>}
2> com:cnct().
ok
=ERROR REPORT==== 12-Dec-2024::10:13:04.180443 ===
** Generic server com terminating
** Last message in was {'$gen_cast',<0.213.0>}
** When Server state == {state,<0.215.0>}
** Reason for termination ==
** {function_clause,[{com,handle_cast,
                          [<0.213.0>,{state,<0.215.0>}],
                          [{file,"/home/alberto/testapp/src/com.erl"},
                           {line,35}]},
                     {gen_server,try_handle_cast,3,
                                 [{file,"gen_server.erl"},{line,2371}]},
                     {gen_server,handle_msg,6,
                                 [{file,"gen_server.erl"},{line,2433}]},
                     {proc_lib,init_p_do_apply,3,
                               [{file,"proc_lib.erl"},{line,329}]}]}

=CRASH REPORT==== 12-Dec-2024::10:13:04.181474 ===
  crasher:
    initial call: com:init/1
    pid: <0.215.0>
    registered_name: com
    exception error: no function clause matching
                     com:handle_cast(<0.213.0>,{state,<0.215.0>}) (/home/alberto/testapp/src/com.erl, line 35)
      in function  gen_server:try_handle_cast/3 (gen_server.erl, line 2371)
      in call from gen_server:handle_msg/6 (gen_server.erl, line 2433)
    ancestors: [<0.213.0>,<0.210.0>,<0.207.0>,<0.66.0>,<0.70.0>,<0.65.0>,
                  kernel_sup,<0.47.0>]
    message_queue_len: 0
    messages: []
    links: [<0.213.0>]
    dictionary: []
    trap_exit: false
    status: running
    heap_size: 6772
    stack_size: 29
    reductions: 6172
  neighbours:
    neighbour:
      pid: <0.213.0>
      registered_name: []
      initial_call: {erlang,apply,2}
      current_function: {shell,eval_loop,4}
      ancestors: [<0.210.0>,<0.207.0>,<0.66.0>,<0.70.0>,<0.65.0>,kernel_sup,
                    <0.47.0>]
      message_queue_len: 0
      links: [<0.210.0>,<0.215.0>]
      trap_exit: false
      status: waiting
      heap_size: 610
      stack_size: 8
      reductions: 2639
      current_stacktrace: [{shell,eval_loop,4,[{file,"shell.erl"},{line,828}]}]

** exception error: no function clause matching
                    com:handle_cast(<0.213.0>,{state,<0.215.0>}) (/home/alberto/testapp/src/com.erl, line 35)
     in function  gen_server:try_handle_cast/3 (gen_server.erl, line 2371)
     in call from gen_server:handle_msg/6 (gen_server.erl, line 2433)
     in call from proc_lib:init_p_do_apply/3 (proc_lib.erl, line 329)
3>

I believe the problem occurs because the clientID is the Pid of the module being used to call the emqtt functions.

In the error report you will see this text

=ERROR REPORT==== 12-Dec-2024::10:13:04.180443 ===
** Generic server com terminating
** Last message in was {'$gen_cast',<0.213.0>}
** When Server state == {state,<0.215.0>}
** Reason for termination ==
** {function_clause,[{com,handle_cast,
                          [<0.213.0>,{state,<0.215.0>}],
                          [{file,"/home/alberto/testapp/src/com.erl"},
                           {line,35}]},

I have been stuck on this problem for three days now and don’t know how to fix it. I hope you can help.

Alberto

Your cnct function sends the caller’s pid to the process (gen_server) registered as com. You have no handle_cast clause for that in your gen_server, only clauses that handle the atoms cnct and stop.
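
To make that concrete, here is a minimal, self-contained sketch that has nothing to do with emqtt. The module name cast_demo and the connected flag are made up for illustration. The API function casts the atom that a handle_cast clause actually matches on, and a catch-all clause logs anything else instead of letting the server crash with function_clause (whether you want such a catch-all or prefer to let it crash is a design choice).

```erlang
-module(cast_demo).
-behaviour(gen_server).

-export([start/0, connect/0, is_connected/0]).
-export([init/1, handle_call/3, handle_cast/2]).

start() ->
    gen_server:start({local, ?MODULE}, ?MODULE, [], []).

%% Cast the atom that handle_cast/2 matches on, not self().
connect() ->
    gen_server:cast(?MODULE, cnct).

is_connected() ->
    gen_server:call(?MODULE, is_connected).

init([]) ->
    {ok, #{connected => false}}.

handle_call(is_connected, _From, State = #{connected := Connected}) ->
    {reply, Connected, State}.

handle_cast(cnct, State) ->
    {noreply, State#{connected := true}};
handle_cast(Other, State) ->
    %% Catch-all clause: log the unexpected message and keep running.
    logger:warning("unexpected cast: ~p", [Other]),
    {noreply, State}.
```

With this in place, gen_server:cast(cast_demo, self()) only produces a log line, while cast_demo:connect() reaches the cnct clause.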

Hi Maria, I have altered the module. Here’s how it reads now

-module(com).
-behaviour(gen_server).

-export([start/0, stop/0, cnct/0, subscribe/0]).
-export([init/1, handle_call/3, handle_cast/2, terminate/2, handle_info/2]).

-record(state, {clientID}).

%% CLIENT FUNCTIONS

%% start() -> {ok, pid()} | {error, Reason}
%% Starts the com server. Called by supervisor
start() ->
    gen_server:start_link({local, com}, com, [], []).

%% stop() -> ok.
%% Stops the com server.
stop() ->
    gen_server:cast(com, stop).

cnct() ->
    gen_server:cast(com, cnct).

subscribe() ->
    gen_server:call(com, subscribe).

init([]) ->
    State = #state{clientID = self()},
    {ok, State}.

%% -spec(open_quic_connection(client()) -> ok | {error, term()}).
handle_cast(cnct, State) ->
    io:format("State Pid ~p~n", [State#state.clientID]),
    %emqtt:open_quic_connection(State#state.clientID),
    emqtt:quic_mqtt_connect(State#state.clientID),
    {noreply, State};
handle_cast(stop, State) ->
    {stop, normal, State}.

%% -spec(subscribe(client(), topic() | {topic(), qos() | qos_name() | [subopt()]} | [{topic(), qos()}])
%%       -> subscribe_ret()).
handle_call(subscribe, _From, State) ->
    Properties = #{'Payload-Format-Indicator' => 0,
                   'Message-Expiry-Interval' => 10,
                   'Content-Type' => 1,
                   'Server-Keep-Alive' => 30,
                   %'Maximum-QoS' => 1,
                   'Retain-Available' => 1},
    SubOpts = [{qos, 1}],
    Subscriptions = [{<<"EK9160/EK-98EFB6/Stream1/Json/Tx/Data">>, SubOpts}],
    emqtt:subscribe(State#state.clientID, Properties, Subscriptions),
    {noreply, State}.

handle_info(_Msg, State) ->
    {noreply, State}.

%% terminate(Reason, LoopData) -> ok.
%% Termination callback. Does nothing, but should instead kill clients.
terminate(_Reason, _LoopData) ->
    ok.

Well does it work now?

To be clear, I can’t help with emqtt stuff. Never used it, probably won’t in the foreseeable future. I was just pointing out the reason for the function_clause error you received there, which should be gone now.

A few more remarks, though.


Storing and carrying around the gen_server’s own pid in the state seems rather pointless. In all places where you need it (in your current implementation, that is), self() can be used again directly.


You could use gen_server:stop/1,3, instead of casting stop and handling that in the gen_server process by stopping. One difference is that gen_server:stop/1,3 will only return after the gen_server process has exited, whereas your stop/0 function will return immediately with the actual stopping happening asynchronously, later. Since you are using a registered process, you will likely want to wait, that is, if you’re calling your (async) stop and start quickly after one another, you’re likely to encounter already_started errors because the gen_server process has not exited yet and is still registered as com.


This is really hard to read because you’re posting via e-mail, so there is no code formatting and everything shows up as plain text (and [] being displayed as nothing does not help (@AstonJ :wink:)). I’m not sure whether formatting can be used in e-mail posts, so could you please post on the Forum via the web interface instead?
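
To illustrate the remark about gen_server:stop/1,3 above, here is a minimal sketch. The module name stop_demo is made up for illustration and the callbacks do nothing useful. The point is only that stop/0 returns after the process has exited, so the registered name is free again by the time it returns.

```erlang
-module(stop_demo).
-behaviour(gen_server).

-export([start/0, stop/0]).
-export([init/1, handle_call/3, handle_cast/2, terminate/2]).

start() ->
    gen_server:start({local, ?MODULE}, ?MODULE, [], []).

%% Synchronous stop: returns ok only after the server has terminated.
stop() ->
    gen_server:stop(?MODULE).

init([]) ->
    {ok, #{}}.

handle_call(_Request, _From, State) ->
    {reply, ok, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

%% Called before the process exits; a place to clean up resources.
terminate(_Reason, _State) ->
    ok.
```

With this version, calling stop_demo:stop() and then stop_demo:start() right away should not run into already_started, because the old process is gone before stop/0 returns.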

I’ve answered on github.

Hi Maria

I fully agree with you. When something is entirely new, there are always unexpected stumbling stones along the way that may lead to wrong interpretations.

The README.md of GitHub - emqx/emqtt: Erlang MQTT 5.0 Client states that the clientId is the Pid returned from calling start_link():

Connect to the MQTT server over TCP or TLS and send a CONNECT packet with the options specified in start_link/1, 2. Client must be a pid returned from start_link/1, 2 or a name specified in start_link/1, 2.

Then if you look into the emqtt source code you will find this function

-spec(start_link() -> gen_statem:start_ret()).

start_link() -> start_link([]).
-spec(start_link(map() | [option()]) -> gen_statem:start_ret()).
start_link(Options) when is_map(Options) ->
    start_link(maps:to_list(Options));
start_link(Options) when is_list(Options) ->
    ok = emqtt_props:validate(
            proplists:get_value(properties, Options, #{})),
    StatmOpts = case proplists:get_bool(low_mem, Options) of
                    false -> [];
                    true ->
                        [{spawn_opt, [{min_heap_size, 16},
                                      {min_bin_vheap_size,16}
                                     ]},
                         {hibernate_after, 50}
                        ]
                end,

    case proplists:get_value(name, Options) of
        undefined ->
            gen_statem:start_link(?MODULE, [with_owner(Options)], StatmOpts);
        Name when is_atom(Name) ->
            gen_statem:start_link({local, Name}, ?MODULE, [with_owner(Options)], StatmOpts)
    end.

Now that this has become a bit clearer to me, I’ve altered the “init()…” part of my com.erl module to read like this:

init([]) ->
    {ok, ConnPid} = emqtt:start_link([{host, '192.168.178.33'}, {port, 14567}]),
    io:format("ConnPid ~p~n", [ConnPid]),
    State = #state{clientID = ConnPid},
    {ok, State}.

This fixes the previous error I was getting when calling my cnct() API function.

alberto@debian:~/testapp$ rebar3 shell
===> Verifying dependencies...
===> Analyzing applications...
===> Compiling testapp
Erlang/OTP 27 [erts-15.1.2] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1] [jit:ns]

===> Booted testapp
Eshell V15.1.2 (press Ctrl+G to abort, type help(). for help)
1> com:start().
ConnPid <0.216.0>
{ok,<0.215.0>}
2> com:cnct().
ConnPid <0.216.0>
ok
3> 

However, there is still a problem when running the subscription function. The data for the subscription comes from an EK9160 Bus Coupler device. Once configured, it automatically publishes data from the sensors and actuators connected to the EK9160 Bus Coupler’s slaves/terminals. The emqtt client should return the data it subscribes to from the MQTT version 5 broker. In my com.erl module I’m trying to do that here:

%% -spec(subscribe(client(), topic() | {topic(), qos() | qos_name() | [subopt()]} | [{topic(), qos()}])
      %% -> subscribe_ret()).
handle_call(subscribe, _From, State) ->
    Properties = #{'Payload-Format-Indicator' => 0,
                  'Message-Expiry-Interval' => 10,
                  'Content-Type' => 1,
                  'Server-Keep-Alive' => 30,
                  %'Maximum-QoS' => 1,
                  'Retain-Available' => 1},
   SubOpts = [{qos, 1}],
   Subscriptions = [{<<"EK9160/EK-98EFB6/Stream1/Json/Tx/Data">>, SubOpts}],
   Reply = emqtt:subscribe(State#state.clientID, Properties, Subscriptions),
   {reply, Reply, State}.

3> com:subscribe().
=ERROR REPORT==== 13-Dec-2024::11:47:33.739127 ===
    state: initialized
    msg: unexpected_event
    clientid: <<"emqtt-debian-6d93a9f0c1d5cc80ff87">>
    event: {subscribe,#{'Content-Type' => 1,'Message-Expiry-Interval' => 10,
                        'Payload-Format-Indicator' => 0,
                        'Retain-Available' => 1,'Server-Keep-Alive' => 30},
                      [{<<"EK9160/EK-98EFB6/Stream1/Json/Tx/Data">>,
                        #{nl => 0,qos => 1,rap => 0,rh => 0}}]}
    event_type: {call,{<0.208.0>,#Ref<0.1895992169.1106247681.15896>}}
** exception exit: {timeout,{gen_server,call,[com,subscribe]}}
     in function  gen_server:call/2 (gen_server.erl, line 1142)
=ERROR REPORT==== 13-Dec-2024::11:47:39.244897 ===
** State machine <0.209.0> terminating
** When server state  = {initialized,
                         {state,undefined,<0.208.0>,undefined,
                          '192.168.178.33',14567,[],emqtt_quic,
                          {quic,#Ref<0.1895992169.1106378753.15883>,
                           #Ref<0.1895992169.1106378753.15885>},
                          [{handle,#Ref<0.1895992169.1106378753.15883>}],
                          60000,false,<<"emqtt-debian-6d93a9f0c1d5cc80ff87">>,
                          true,undefined,undefined,4,<<"MQTT">>,60,undefined,
                          false,false,undefined,#{},
                          [{call,
                            {callid,connect,
                             {quic,#Ref<0.1895992169.1106378753.15883>,
                              #Ref<0.1895992169.1106378753.15885>},
                             undefined},
                            {<0.208.0>,#Ref<0.1895992169.1106247681.15884>},
                            undefined,
                            {1734,86843,284304}}],
                          #{},
                          #{seq => 1,sent => #{},max_inflight => infinity},
                          #{},true,30000,undefined,30000,undefined,undefined,
                          2,false,
                          {none,
                           #{version => 4,max_size => 268435455,
                             strict_mode => false}},
                          0,5000,false,undefined,
                          {[],[]},
                          #{clientid =>
                             <<"emqtt-debian-6d93a9f0c1d5cc80ff87">>,
                            reconnect => false,
                            control_stream_sock =>
                             {quic,#Ref<0.1895992169.1106378753.15883>,
                              #Ref<0.1895992169.1106378753.15885>},
                            conn_parse_state =>
                             {none,
                              #{version => 4,max_size => 268435455,
                                strict_mode => false}},
                            data_stream_socks => [],logic_stream_map => #{},
                            stream_parse_state =>
                             #{{quic,#Ref<0.1895992169.1106378753.15883>,
                                #Ref<0.1895992169.1106378753.15885>} =>
                                {none,
                                 #{version => 4,max_size => 268435455,
                                   strict_mode => false}}},
                            state_name => init,is_local => true,
                            is_unidir => false,
                            quic_conn_cb => emqtt_quic_connection,
                            quic_stream_cb => emqtt_quic_stream}}}
** Reason for termination = exit:{{timeout,{gen_server,call,[com,subscribe]}},
                                  [{gen_server,call,2,
                                       [{file,"gen_server.erl"},{line,1142}]},
                                   {erl_eval,do_apply,7,
                                       [{file,"erl_eval.erl"},{line,904}]},
                                   {shell,exprs,7,
                                       [{file,"shell.erl"},{line,893}]},
                                   {shell,eval_exprs,7,
                                       [{file,"shell.erl"},{line,849}]},
                                   {shell,eval_loop,4,
                                       [{file,"shell.erl"},{line,834}]}]}
** Callback modules = [emqtt]
** Callback mode = state_functions
** Stacktrace =
**  [{gen_statem,loop_receive,3,[{file,"gen_statem.erl"},{line,3625}]},
     {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,329}]}]

=CRASH REPORT==== 13-Dec-2024::11:47:39.245498 ===
  crasher:
    initial call: emqtt:init/1
    pid: <0.209.0>
    registered_name: []
    exception exit: {{timeout,{gen_server,call,[com,subscribe]}},
                     [{gen_server,call,2,
                                  [{file,"gen_server.erl"},{line,1142}]},
                      {erl_eval,do_apply,7,[{file,"erl_eval.erl"},{line,904}]},
                      {shell,exprs,7,[{file,"shell.erl"},{line,893}]},
                      {shell,eval_exprs,7,[{file,"shell.erl"},{line,849}]},
                      {shell,eval_loop,4,[{file,"shell.erl"},{line,834}]}]}
      in function  gen_statem:loop_receive/3 (gen_statem.erl, line 3625)
    ancestors: [com,<0.206.0>,<0.203.0>,<0.200.0>,<0.66.0>,<0.70.0>,
                  <0.65.0>,kernel_sup,<0.47.0>]
    message_queue_len: 2
    messages: [{quic,send_shutdown_complete,
                        #Ref<0.1895992169.1106378753.15885>,false},
                  {quic,stream_closed,#Ref<0.1895992169.1106378753.15885>,
                        #{error => 346,status => user_canceled,
                          is_app_closing => false,is_shutdown_by_app => false,
                          is_closed_remotely => false,
                          is_conn_shutdown => true}}]
    links: []
    dictionary: [{rand_seed,{#{max => 288230376151711743,type => exsplus,
                                next => #Fun<rand.5.40079776>,
                                jump => #Fun<rand.3.40079776>},
                              [282812234431751097|68555478662159310]}}]
    trap_exit: true
    status: running
    heap_size: 987
    stack_size: 29
    reductions: 29222
  neighbours:

I believe I can resolve this problem by using a gen_statem instead of a gen_server, as I’m currently doing in the com.erl module. A gen_statem call has an infinite timeout by default, which should take care of the timeout error. Eventually, when I put the module under a supervisor, I can set a finite timeout.

I’ll keep you posted.

ClientID and client are not interchangeable terms. ClientID is a string that identifies the client to the broker, as per semantics of the MQTT protocol. EMQTT client is an Erlang process used as a proxy for communicating with the broker. It maintains quic connection, etc.

Regarding your implementation, why is cnct a separate call and not part of init? As I explained earlier, the client cannot process any commands (such as subscribe) until it’s connected to the broker: subscribe is a control packet that the client sends to the broker. Without a connection, the client cannot relay this command to the broker.
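
A minimal sketch of that idea, not tested against an EK9160 or any broker: the gen_server starts the emqtt client and connects over QUIC inside init/1, so subscribe is only ever called on a connected client. It uses emqtt:start_link/1, emqtt:quic_connect/1 and emqtt:subscribe/3 as they appear earlier in this thread. The module name com_sketch and the clientid value are made up for illustration, and the empty properties map is an assumption on my part (as far as I know, an MQTT 5 SUBSCRIBE packet carries only Subscription-Identifier and User-Property).

```erlang
-module(com_sketch).
-behaviour(gen_server).

-export([start/0, subscribe/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

%% client holds the emqtt client process (a pid), not the MQTT ClientID.
-record(state, {client}).

start() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

subscribe() ->
    gen_server:call(?MODULE, subscribe).

init([]) ->
    {ok, Client} = emqtt:start_link([{host, '192.168.178.33'},
                                     {port, 14567},
                                     %% Hypothetical ClientID string sent to the broker.
                                     {clientid, <<"com-sketch">>},
                                     {keepalive, 300}]),
    %% Connect before anything else, so later calls find a connected client.
    case emqtt:quic_connect(Client) of
        {ok, _Props}    -> {ok, #state{client = Client}};
        {error, Reason} -> {stop, Reason}
    end.

handle_call(subscribe, _From, State = #state{client = Client}) ->
    Topic = <<"EK9160/EK-98EFB6/Stream1/Json/Tx/Data">>,
    %% Assumption: no SUBSCRIBE properties are needed, hence the empty map.
    Reply = emqtt:subscribe(Client, #{}, [{Topic, [{qos, 1}]}]),
    {reply, Reply, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

%% The process that started the client is its owner and receives
%% incoming messages as {publish, Msg}.
handle_info({publish, Msg}, State) ->
    io:format("received ~p~n", [Msg]),
    {noreply, State};
handle_info(_Other, State) ->
    {noreply, State}.
```

If the connection attempt fails, init/1 returns {stop, Reason}, so start/0 returns an error instead of leaving a server around whose client is still in the initialized state.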

Hi ieQu1,

indeed. I just realized it today, and not being clear about how to use it, I turned to see if someone in the Erlang Forum could shed some light my way - and there you were, pointing out that it belongs in the init section of my module. You’re really keen :wink:

May I ask whether I need to include the option as below

{quic_opts, {quicer:conn_opts(), quicer:stream_opts()}}

followed by the code I used in the wrong place, shown below?

emqtt:open_quic_connection(State#state.clientID),
emqtt:quic_mqtt_connect(State#state.clientID),

Sorry Maria just noticed the @mention.

Markdown should still work via email - just need three backticks before and after the code block. There are some tips in this thread and this one.