Hello,
pgmp
is a PostgreSQL client using some of the more recent features of OTP - socket, send_request/4 and check_response/3, crypto:pbkdf2_hmac/5. With support for simple and extended query, and logical replication to ETS.
Replication to ETS is a couple of steps:
create table xy (x integer primary key, y text);
insert into xy values (1, 'foo');
Create a PostgreSQL publication for that table:
create publication xy for table xy;
The current state of the table is replicated into an ETS table also called xy
:
1> ets:i(xy).
<1 > {1, <<"foo">>}
EOT (q)uit (p)Digits (k)ill
Introspection on the PostgreSQL metadata is done by pgmp
so that x
is used as the primary key for the replicated ETS table.
Thereafter, CRUD changes on the underlying PostgreSQL table will be automatically pushed to pgmp
and reflected in the ETS table.
An example of logical replication of a single table with a composite key:
create table xyz (x integer, y integer, z integer, primary key (x, y));
insert into xyz values (1, 1, 3);
Create a PostgreSQL publication for that table:
create publication xyz for table xyz;
With pgmp
configured for replication, the stanza:
{pgmp, [...
{replication_logical_publication_names, <<"xy,xyz">>},
...]}
Where replication_logical_publication_names
is a comma separated list of publication names for pgmp
to replicate. The contents of the PostgreSQL table is replicated into an ETS table of the same name.
1> ets:i(xyz).
<1 > {{1, 1}, 3}
EOT (q)uit (p)Digits (k)ill
Note that replication introspects the PostgreSQL table metadata so that {1, 1}
(x, y) is automatically used as the composite key.
There are two mechanisms for making asynchronous requests to pmgp
.
You can immediately wait for a response (via receive_response/1).
1> gen_statem:receive_response(pgmp_connection:query(#{sql => <<"select 2*3">>})).
{reply, [{row_description, [<<"?column?">>]},
{data_row, [6]},
{command_complete, {select, 1}}]}
Effectively the above turns an asynchronous call into a synchronous request that immediately blocks the current process until it receives the reply.
However, it is likely that you can continue doing some other important work, e.g., responding to other messages, while waiting for the response from pgmp
. In which case using the send_request/4 and check_response/3 pattern is preferred.
If you’re using pgmp
within another gen_*
behaviour (gen_server
, gen_statem
, etc), this is very likely to be the option to choose. So using another gen_statem
as an example:
The following init/1
sets up some state with a request id collection to maintain our outstanding asynchronous requests.
init([]) ->
{ok, ready, #{requests => gen_statem:reqids_new()}}.
You can then use the label
and requests
parameter to pgmp
to identify the response to your asynchronous request as follows:
handle_event(internal, commit = Label, _, _) ->
{keep_state_and_data,
{next_event, internal, {query, #{label => Label, sql => <<"commit">>}}}};
handle_event(internal, {query, Arg}, _, #{requests := Requests} = Data) ->
{keep_state,
Data#{requests := pgmp_connection:query(Arg#{requests => Requests})}};
A call to any of pgmp_connection
functions: query/1
, parse/1
, bind/1
, describe/1
or execute/1
take a map of parameters. If that map includes both a label
and requests
then the request is made using send_request/4. The response will be received as an info
message as follows:
handle_event(info, Msg, _, #{requests := Existing} = Data) ->
case gen_statem:check_response(Msg, Existing, true) of
{{reply, Reply}, Label, Updated} ->
# You have a response with a Label so that you stitch it
# back to the original request...
do_something_with_response(Reply, Label),
{keep_state, Data#{requests := Updated}};
...deal with other cases...
end.
Regards.
Peter.