Pgmp - PostgreSQL client with logical replication to ETS


pgmp is a PostgreSQL client using some of the more recent features of OTP (socket, send_request/4 and check_response/3, crypto:pbkdf2_hmac/5), with support for simple and extended query, and logical replication to ETS.

Replication to ETS takes a couple of steps. Create a table and insert a row:

create table xy (x integer primary key, y text);
insert into xy values (1, 'foo');

Create a PostgreSQL publication for that table:

create publication xy for table xy;

The current state of the table is replicated into an ETS table also called xy:

1> ets:i(xy).
<1   > {1, <<"foo">>}
EOT  (q)uit (p)Digits (k)ill

Introspection on the PostgreSQL metadata is done by pgmp so that x is used as the primary key for the replicated ETS table.

Thereafter, CRUD changes on the underlying PostgreSQL table will be automatically pushed to pgmp and reflected in the ETS table.

An example of logical replication of a single table with a composite key:

create table xyz (x integer, y integer, z integer, primary key (x, y));
insert into xyz values (1, 1, 3);

Create a PostgreSQL publication for that table:

create publication xyz for table xyz;

pgmp is configured for replication with the following stanza:

 {pgmp, [...
         {replication_logical_publication_names, <<"xy,xyz">>},

Here replication_logical_publication_names is a comma-separated list of publication names for pgmp to replicate. The contents of each PostgreSQL table are replicated into an ETS table of the same name.

1> ets:i(xyz).
<1   > {{1, 1}, 3}
EOT  (q)uit (p)Digits (k)ill

Note that replication introspects the PostgreSQL table metadata so that {1, 1} (x, y) is automatically used as the composite key.
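Because the key is an ordinary tuple, a row can be looked up by passing {X, Y} as the key. A minimal sketch, using a locally created stand-in table rather than the one pgmp populates (the module name xyz_lookup and the table name xyz_demo are hypothetical, and the table type and access mode pgmp actually uses are not assumed here):

```erlang
-module(xyz_lookup).
-export([demo/0]).

%% Builds a stand-in ETS table with the same row shape as the
%% replicated xyz table: {{X, Y}, Z}, keyed on the first element.
demo() ->
    T = ets:new(xyz_demo, [set, {keypos, 1}]),
    true = ets:insert(T, {{1, 1}, 3}),
    %% The composite key (x, y) is looked up as a single tuple.
    [{{1, 1}, Z}] = ets:lookup(T, {1, 1}),
    %% A key that is not present yields an empty list.
    [] = ets:lookup(T, {1, 2}),
    true = ets:delete(T),
    Z.
```

Against the replicated table the equivalent call would be ets:lookup(xyz, {1, 1}), assuming the table is readable from the calling process.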

There are two mechanisms for making asynchronous requests to pgmp.

You can immediately wait for a response (via receive_response/1).

1> gen_statem:receive_response(pgmp_connection:query(#{sql => <<"select 2*3">>})).
{reply, [{row_description, [<<"?column?">>]},
         {data_row, [6]},
         {command_complete, {select, 1}}]}

Effectively the above turns an asynchronous call into a synchronous request that immediately blocks the current process until it receives the reply.

However, it is likely that you can continue doing some other important work, e.g., responding to other messages, while waiting for the response from pgmp. In that case, the send_request/4 and check_response/3 pattern is preferred.

If you’re using pgmp within another gen_* behaviour (gen_server, gen_statem, etc.), this is very likely the option to choose. Take another gen_statem as an example.

The following init/1 sets up some state with a request id collection to maintain our outstanding asynchronous requests.

init([]) ->
    {ok, ready, #{requests => gen_statem:reqids_new()}}.

You can then use the label and requests parameters to pgmp to identify the response to your asynchronous request as follows:

handle_event(internal, commit = Label, _, _) ->
    {keep_state_and_data,
     {next_event, internal, {query, #{label => Label, sql => <<"commit">>}}}};
handle_event(internal, {query, Arg}, _, #{requests := Requests} = Data) ->
    {keep_state,
     Data#{requests := pgmp_connection:query(Arg#{requests => Requests})}};

A call to any of the pgmp_connection functions query/1, parse/1, bind/1, describe/1 or execute/1 takes a map of parameters. If that map includes both a label and requests, then the request is made using send_request/4. The response will be received as an info message as follows:

handle_event(info, Msg, _, #{requests := Existing} = Data) ->
    case gen_statem:check_response(Msg, Existing, true) of
        {{reply, Reply}, Label, Updated} ->
            %% You have a response with a Label so that you can stitch it
            %% back to the original request...
            do_something_with_response(Reply, Label),
            {keep_state, Data#{requests := Updated}};

        %% ...with other cases...
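The label and request id collection pattern can also be tried without a database. What follows is a rough sketch, assuming OTP 25 or later, with a trivial gen_server standing in for pgmp. The module name reqid_demo, the {double, N} request and the labels first and second are all hypothetical. It uses the gen_server variants of send_request/4 and check_response/3, which mirror the gen_statem ones.

```erlang
-module(reqid_demo).
-behaviour(gen_server).
-export([run/0]).
-export([init/1, handle_call/3, handle_cast/2]).

%% Stand-in server: replies to {double, N} with 2 * N.
init([]) -> {ok, #{}}.

handle_call({double, N}, _From, State) -> {reply, 2 * N, State}.

handle_cast(_Msg, State) -> {noreply, State}.

%% Sends two labelled requests without blocking, then collects the
%% replies into a map keyed by label.
run() ->
    {ok, Server} = gen_server:start_link(?MODULE, [], []),
    R0 = gen_server:reqids_new(),
    R1 = gen_server:send_request(Server, {double, 3}, first, R0),
    R2 = gen_server:send_request(Server, {double, 5}, second, R1),
    Replies = collect(R2, #{}),
    ok = gen_server:stop(Server),
    Replies.

collect(Requests, Acc) ->
    case gen_server:reqids_size(Requests) of
        0 ->
            Acc;
        _ ->
            receive
                Msg ->
                    %% Delete = true removes the answered request
                    %% from the collection.
                    case gen_server:check_response(Msg, Requests, true) of
                        {{reply, Reply}, Label, Updated} ->
                            collect(Updated, Acc#{Label => Reply});
                        {{error, {Reason, _ServerRef}}, Label, Updated} ->
                            collect(Updated, Acc#{Label => {error, Reason}});
                        no_request ->
                            Acc;
                        no_reply ->
                            %% Unrelated message: keep waiting.
                            collect(Requests, Acc)
                    end
            end
    end.
```

Calling reqid_demo:run() returns #{first => 6, second => 10}. Inside a real gen_statem the receive loop goes away, since each message arrives in the info handler as in the clause above.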




Ah, only one vital detail missing :slight_smile:

Thanks @LeonardB


Nice! All the modern features I like!

Do you have some load tests for this? Maybe a comparison with the good old epgsql library? Some documentation or article on when it is a good idea to migrate from one to another and how to do so would also be very useful :slight_smile:


Thanks. I’ve really enjoyed the combination of send_request and check_response, allowing me to reply knowing that the caller isn’t blocked while waiting. They’re a really nice recent OTP feature. Prior to this, I was always “replying” with a cast (with the overhead of monitoring the caller, etc.), which never felt quite “right”.

Load tests - performance isn’t something I’ve really worried about yet. The main focus is getting the codec for the various types done, using binary types rather than text. PostgreSQL offers both binary and text representations. Binary is usually “better”, and is one of the advantages of parse, bind, execute in the extended query mode. Simple query only uses the textual representation (unless I’ve missed something in simple query), which isn’t particularly efficient (unless your data is mostly text!). Though binary mode is only documented by the C code :slight_smile:

Replication uses binary mode for the initial transactional snapshot of the table. With a local PostgreSQL, a snapshot of ~112k rows currently takes roughly 4 seconds to load into ETS, batching 5k rows at a time (another feature of extended query). The “raw” socket protocol is handed off to a “middle man” that understands the underlying protocol, but the actual row decoding isn’t done until the data reaches its destination, just before it goes into ETS. I’ll turn to performance more once the codec has better type coverage.

Migration - I’ve also not really thought about that right now (sorry). It would kind of depend on how embedded the “textual” types are - for example, table OIDs are string integers, rather than “integers”. And arrays of OIDs are a string of space-separated integers (IIRC), rather than just a list of integers. If you’re using basic SQL types it is probably pretty OK :slight_smile: YMMV. It would also depend on how much send_request/check_response goodness you want too.

I have a bit of a stumble on SQL timestamps - in PostgreSQL they’re micros since {2000, 1, 1}. At the moment I’m creating a calendar:datetime tuple after munging from the PG epoch to Unix, but will probably switch to a system time integer in micros. SQL time is also micros since 00:00:00, which I’m struggling to find a decent representation for… {Hour, Minute, Second}.Micros?
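For anyone following the epoch arithmetic, here is a rough sketch of the two conversions using only the calendar module. It is not pgmp’s codec: the module name pg_epoch and its function names are hypothetical, and the {{Hour, Minute, Second}, Micros} shape for SQL time is just one possible representation, not a decision.

```erlang
-module(pg_epoch).
-export([to_system_time/1, to_datetime/1, time_of_day/1]).

-define(MICROS_PER_SECOND, 1000000).

%% Seconds between the Unix epoch (1970-01-01) and the
%% PostgreSQL epoch (2000-01-01), both at 00:00:00 UTC.
pg_epoch_offset() ->
    calendar:datetime_to_gregorian_seconds({{2000, 1, 1}, {0, 0, 0}})
        - calendar:datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}}).

%% Microseconds since the PG epoch -> microseconds since the Unix epoch.
to_system_time(PgMicros) ->
    PgMicros + pg_epoch_offset() * ?MICROS_PER_SECOND.

%% Microseconds since the PG epoch -> calendar:datetime() in UTC.
%% The sub-second part is dropped by the conversion.
to_datetime(PgMicros) ->
    calendar:system_time_to_universal_time(to_system_time(PgMicros),
                                           microsecond).

%% Microseconds since 00:00:00 -> {{Hour, Minute, Second}, Micros}.
time_of_day(Micros) when Micros >= 0,
                         Micros < 86400 * ?MICROS_PER_SECOND ->
    {calendar:seconds_to_time(Micros div ?MICROS_PER_SECOND),
     Micros rem ?MICROS_PER_SECOND}.
```

For example, pg_epoch:to_datetime(0) gives {{2000,1,1},{0,0,0}} and pg_epoch:time_of_day(3723000042) gives {{1,2,3},42}.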