Pgmp - PostgreSQL client with logical replication to ETS


pgmp is a PostgreSQL client that uses some of the more recent features of OTP: socket, send_request/4 and check_response/3, and crypto:pbkdf2_hmac/5. It supports simple and extended query, and logical replication to ETS.

Replication to ETS takes a couple of steps. First, create a table with some data:

create table xy (x integer primary key, y text);
insert into xy values (1, 'foo');

Create a PostgreSQL publication for that table:

create publication xy for table xy;

The current state of the table is replicated into an ETS table also called xy:

1> ets:i(xy).
<1   > {1, <<"foo">>}
EOT  (q)uit (p)Digits (k)ill

pgmp introspects the PostgreSQL table metadata, so that x is used as the key of the replicated ETS table.

Thereafter, any inserts, updates or deletes on the underlying PostgreSQL table are automatically pushed to pgmp and reflected in the ETS table.

An example of logical replication of a single table with a composite key:

create table xyz (x integer, y integer, z integer, primary key (x, y));
insert into xyz values (1, 1, 3);

Create a PostgreSQL publication for that table:

create publication xyz for table xyz;

Configure pgmp for replication with the stanza:

 {pgmp, [...,
         {replication_logical_publication_names, <<"xy,xyz">>},
         ...]}

where replication_logical_publication_names is a comma-separated list of the publication names for pgmp to replicate. The contents of each published PostgreSQL table are replicated into an ETS table of the same name:

1> ets:i(xyz).
<1   > {{1, 1}, 3}
EOT  (q)uit (p)Digits (k)ill

Note that replication introspects the PostgreSQL table metadata so that {1, 1} (x, y) is automatically used as the composite key.
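Reading the replica is then plain ETS. A minimal sketch, assuming the replica is an ordinary set table keyed on the {X, Y} tuple as in the ets:i/1 output above. The table here (xyz_demo) and the module name replica_read are hypothetical stand-ins created by hand, since pgmp owns the real xyz table:

```erlang
-module(replica_read).
-export([demo/0]).

demo() ->
    %% Hypothetical stand-in for the replica: pgmp creates and populates
    %% the real xyz table; here a private set table is filled by hand
    %% with rows of the same shape, {{X, Y}, Z}.
    Tab = ets:new(xyz_demo, [set]),
    true = ets:insert(Tab, [{{1, 1}, 3}, {{1, 2}, 4}, {{2, 1}, 5}]),

    %% Point lookup on the full composite key (x, y).
    [{{1, 1}, 3}] = ets:lookup(Tab, {1, 1}),
    3 = ets:lookup_element(Tab, {1, 1}, 2),

    %% Partial key: every row with x = 1, whatever y is.
    Rows = lists:sort(ets:match_object(Tab, {{1, '_'}, '_'})),
    true = ets:delete(Tab),
    Rows.
```

On a set table a lookup with the full key is a hash lookup, whereas a pattern that binds only part of the key (here x = 1) has to scan the whole table.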

There are two mechanisms for making asynchronous requests to pgmp.

The first is to wait for the response immediately (via gen_statem:receive_response/1):

1> gen_statem:receive_response(pgmp_connection:query(#{sql => <<"select 2*3">>})).
{reply, [{row_description, [<<"?column?">>]},
         {data_row, [6]},
         {command_complete, {select, 1}}]}

Effectively, the above turns an asynchronous request into a synchronous one, blocking the current process until it receives the reply.

However, it is likely that your process can continue doing other important work, e.g., responding to other messages, while waiting for the response from pgmp. In that case, the send_request/4 and check_response/3 pattern is preferred.

If you’re using pgmp within another gen_* behaviour (gen_server, gen_statem, etc.), this is very likely to be the option to choose. Using another gen_statem as an example:

The following init/1 sets up the state with a request id collection to track our outstanding asynchronous requests:

init([]) ->
    {ok, ready, #{requests => gen_statem:reqids_new()}}.

You can then use the label and requests parameters to pgmp to identify the response to your asynchronous request as follows:

handle_event(internal, commit = Label, _, _) ->
    {keep_state_and_data,
     {next_event, internal, {query, #{label => Label, sql => <<"commit">>}}}};
handle_event(internal, {query, Arg}, _, #{requests := Requests} = Data) ->
    {keep_state,
     Data#{requests := pgmp_connection:query(Arg#{requests => Requests})}};

Each of the pgmp_connection functions query/1, parse/1, bind/1, describe/1 and execute/1 takes a map of parameters. If that map includes both a label and requests, the request is made using send_request/4, and the response is received as an info message as follows:

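handle_event(info, Msg, _, #{requests := Existing} = Data) ->
    case gen_statem:check_response(Msg, Existing, true) of
        {{reply, Reply}, Label, Updated} ->
            %% You have a response with a Label so that you can
            %% stitch it back to the original request...
            do_something_with_response(Reply, Label),
            {keep_state, Data#{requests := Updated}};
        {{error, {Reason, _ServerRef}}, Label, Updated} ->
            %% One option: stop when a request fails.
            {stop, {Label, Reason}, Data#{requests := Updated}};
        NotOurs when NotOurs == no_request; NotOurs == no_reply ->
            %% Not a response to any outstanding request.
            keep_state_and_data
    end.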
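The same pattern can be tried without PostgreSQL. The sketch below (OTP 25 or later) puts a trivial gen_statem in place of pgmp_connection; the module name reqids_demo and the {multiply, A, B} request are made up for illustration. It issues two labelled requests with gen_statem:send_request/4 without blocking, then uses check_response/3 to stitch each reply back to its label:

```erlang
-module(reqids_demo).
-behaviour(gen_statem).

-export([demo/0]).
-export([init/1, callback_mode/0, handle_event/4]).

%% A stand-in server: replies to {multiply, A, B} with A * B, playing the
%% role that pgmp_connection plays for a query such as "select 2*3".
init([]) ->
    {ok, ready, #{}}.

callback_mode() ->
    handle_event_function.

handle_event({call, From}, {multiply, A, B}, _State, _Data) ->
    {keep_state_and_data, {reply, From, A * B}}.

%% Client side: two labelled requests go out before any reply is read.
demo() ->
    {ok, Server} = gen_statem:start(?MODULE, [], []),
    Requests = lists:foldl(
                 fun({Label, Request}, Acc) ->
                         gen_statem:send_request(Server, Request, Label, Acc)
                 end,
                 gen_statem:reqids_new(),
                 [{first, {multiply, 2, 3}}, {second, {multiply, 4, 5}}]),
    Replies = collect(Requests, #{}),
    ok = gen_statem:stop(Server),
    Replies.

%% Match incoming messages against the outstanding requests until none
%% are left, collecting each reply under its label.
collect(Requests, Replies) ->
    case gen_statem:reqids_size(Requests) of
        0 ->
            Replies;
        _ ->
            receive
                Msg ->
                    case gen_statem:check_response(Msg, Requests, true) of
                        {{reply, Reply}, Label, Updated} ->
                            collect(Updated, Replies#{Label => Reply});
                        {{error, {Reason, _ServerRef}}, Label, Updated} ->
                            collect(Updated, Replies#{Label => {error, Reason}});
                        no_reply ->
                            %% Not one of ours; this demo simply drops it,
                            %% a real gen_* process would handle it elsewhere.
                            collect(Requests, Replies)
                    end
            after 5000 ->
                    {timeout, Replies}
            end
    end.
```

reqids_demo:demo() returns #{first => 6, second => 20}. Inside a gen_statem or gen_server, the receive loop is replaced by the behaviour’s own info handling, as in the handle_event(info, ...) clause above.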




Ah, only one vital detail missing 🙂

Thanks @LeonardB


Nice! All the modern features I like!

Do you have some load tests for this? Maybe a comparison with the good old epgsql library? Some documentation or article on when it is a good idea to migrate from one to the other, and how to do so, would also be very useful 🙂


Thanks. I’ve really enjoyed the combination of send_request and check_response, which lets me reply knowing that the caller isn’t blocked while waiting. They’re a really nice recent OTP feature. Prior to this, I was always “replying” with a cast (with the overhead of monitoring the caller, etc.), which never felt quite “right”.

Load tests: performance isn’t something I’ve really worried about yet. The main focus is getting the codec done for the various types, using the binary rather than the text representations. PostgreSQL offers both, and binary is usually “better”; it is one of the advantages of parse, bind and execute in extended query mode. Simple query only uses the textual representation (unless I’ve missed something), which isn’t particularly efficient unless your data is mostly text! Though binary mode is only documented by the C code 🙂

Replication uses binary mode for the initial transactional snapshot of the table. With a local PostgreSQL, a snapshot of ~112k rows currently takes roughly 4 seconds into ETS, batching 5k rows at a time (another feature of extended query). The “raw” socket protocol is handed off to a “middle man” that understands the underlying protocol, but the actual row decoding isn’t done until the rows reach their destination, just before they go into ETS. I’ll turn to performance once the codec has better type coverage.
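To illustrate the difference between the two representations (a sketch, not pgmp’s codec; the module name int4_codec is made up), here is how an int4 value arrives in each. In binary format it is always four bytes, signed and big-endian; in text format it is decimal digits that have to be parsed:

```erlang
-module(int4_codec).
-export([decode/2]).

%% Binary format: an int4 is four bytes, big-endian (network order), signed.
decode(binary, <<Value:32/signed-big>>) ->
    Value;
%% Text format: the same value as decimal ASCII digits.
decode(text, Digits) when is_binary(Digits) ->
    binary_to_integer(Digits).
```

For example, 2147483647 takes 4 bytes in binary but 10 characters as text, and the binary form is decoded with a single pattern match.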

Migration: I’ve also not really thought about that yet (sorry). It would depend on how embedded the “textual” types are. For example, table OIDs are string integers rather than integers, and arrays of OIDs are a string of space-separated integers (IIRC) rather than just a list of integers. If you’re using basic SQL types it is probably pretty OK 🙂 YMMV. It would also depend on how much send_request/check_response goodness you want.
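As an example of the extra step those textual types need, here is a sketch that turns the text form of an array of OIDs into a list of integers, assuming (as recalled above) that the OIDs are separated by spaces. The module name oidvector_text is hypothetical:

```erlang
-module(oidvector_text).
-export([decode/1]).

%% Text format of an array of OIDs: decimal strings separated by spaces.
%% trim_all drops empty parts, e.g. from repeated spaces.
decode(Text) when is_binary(Text) ->
    [binary_to_integer(Oid) || Oid <- binary:split(Text, <<" ">>, [global, trim_all])].
```

A single OID in text form needs only binary_to_integer/1, e.g. binary_to_integer(<<"16384">>).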

I’ve stumbled a bit on SQL timestamps: in PostgreSQL they’re microseconds since {2000, 1, 1}. At the moment I’m creating a calendar:datetime tuple after munging from the PG epoch to the Unix epoch, but will probably switch to a system time integer in micros. SQL time is also in micros, since 00:00:00, and I’m struggling to find a decent representation for it… {Hour, Minute, Second}.Micros?
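One way to do the conversions described above, as a sketch: the module name pg_time is hypothetical, and the {{Hour, Minute, Second}, Micros} shape for SQL time is just one possible answer to the question above. The offset between the two epochs is computed with the calendar module rather than hard-coded:

```erlang
-module(pg_time).
-export([timestamp_to_unix_micros/1, timestamp_to_datetime/1, time_to_hms/1]).

-define(MICROS_PER_SECOND, 1000000).

%% Gregorian seconds at the PostgreSQL epoch, 2000-01-01 00:00:00.
pg_epoch() ->
    calendar:datetime_to_gregorian_seconds({{2000, 1, 1}, {0, 0, 0}}).

%% Gregorian seconds at the Unix epoch, 1970-01-01 00:00:00.
unix_epoch() ->
    calendar:datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}}).

%% Micros since 2000-01-01 to micros since 1970-01-01, the same
%% representation as erlang:system_time(microsecond).
timestamp_to_unix_micros(PgMicros) ->
    PgMicros + (pg_epoch() - unix_epoch()) * ?MICROS_PER_SECOND.

%% Micros since 2000-01-01 to {calendar:datetime(), Micros}.
timestamp_to_datetime(PgMicros) ->
    Seconds = floor_div(PgMicros, ?MICROS_PER_SECOND),
    Micros = PgMicros - Seconds * ?MICROS_PER_SECOND,
    {calendar:gregorian_seconds_to_datetime(pg_epoch() + Seconds), Micros}.

%% SQL time: micros since 00:00:00 to {{Hour, Minute, Second}, Micros}.
%% Plain arithmetic rather than calendar:seconds_to_time/1, which only
%% accepts seconds below 86400, while PostgreSQL also allows 24:00:00.
time_to_hms(TimeMicros) when TimeMicros >= 0 ->
    Seconds = TimeMicros div ?MICROS_PER_SECOND,
    {{Seconds div 3600, (Seconds rem 3600) div 60, Seconds rem 60},
     TimeMicros rem ?MICROS_PER_SECOND}.

%% Integer division rounding towards negative infinity (for B > 0), so
%% that timestamps before 2000-01-01 get a non-negative Micros part.
floor_div(A, B) when A >= 0 -> A div B;
floor_div(A, B) -> -((-A + B - 1) div B).
```

For example, pg_time:timestamp_to_datetime(0) returns {{{2000, 1, 1}, {0, 0, 0}}, 0}, and pg_time:time_to_hms(3723000042) returns {{1, 2, 3}, 42}.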


This is exciting! Nice work.



Version 0.8.0 of pgmp includes support for two new PostgreSQL 15 features in logical streaming replication into ETS: row filters and column lists.

Fuller details are here, with a couple of examples below:

Column Lists

create table t1 (id int,
                 a text,
                 b text,
                 c text,
                 d text,
                 e text,
                 primary key(id));

Create a publication p1, with a column list to reduce the number of columns that will be replicated:

create publication p1
  for table t1 (id, a, b, d);

Insert some test data into the newly created t1 table:

insert into t1
  values (1, 'a-1', 'b-1', 'c-1', 'd-1', 'e-1'),
         (2, 'a-2', 'b-2', 'c-2', 'd-2', 'e-2'),
         (3, 'a-3', 'b-3', 'c-3', 'd-3', 'e-3');

As part of the replication process, pgmp introspects the table metadata for t1 and determines that column id is the primary key. This column is used as the key of the ETS replica of t1. The contents of the replicated table:

2> lists:sort(ets:tab2list(t1)).
[{1, <<"a-1">>, <<"b-1">>, <<"d-1">>},
 {2, <<"a-2">>, <<"b-2">>, <<"d-2">>},
 {3, <<"a-3">>, <<"b-3">>, <<"d-3">>}]

Any changes in the PostgreSQL database are now replicated in real time to the ETS replica:

insert into t1
  values (4, 'a-4', 'b-4', 'c-4', 'd-4', 'e-4');

Meanwhile in pgmp:

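3> lists:sort(ets:tab2list(t1)).
[{1, <<"a-1">>, <<"b-1">>, <<"d-1">>},
 {2, <<"a-2">>, <<"b-2">>, <<"d-2">>},
 {3, <<"a-3">>, <<"b-3">>, <<"d-3">>},
 {4, <<"a-4">>, <<"b-4">>, <<"d-4">>}]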
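PostgreSQL applies the column list on the publisher, so only id, a, b and d are ever sent. The effect on each row is a projection, which this sketch mimics locally; the module name column_list_demo and project/3 are hypothetical and not part of pgmp:

```erlang
-module(column_list_demo).
-export([project/3]).

%% Keep only the published columns of a row, in table column order.
project(Columns, Published, Row) ->
    list_to_tuple([Value || {Column, Value} <- lists:zip(Columns, tuple_to_list(Row)),
                            lists:member(Column, Published)]).
```

For example, projecting the row (4, 'a-4', 'b-4', 'c-4', 'd-4', 'e-4') of t1 with the column list of p1 gives {4, <<"a-4">>, <<"b-4">>, <<"d-4">>}.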

Row Filters

create table t2 (a int,
                 b int,
                 c text,
                 primary key(a, c));

Create a publication p2 with a row filter:

create publication p2
  for table t2
  where (a > 5 and c = 'NSW');

With some initial data:

insert into t2
  values (2, 102, 'NSW'),
         (3, 103, 'QLD'),
         (4, 104, 'VIC'),
         (5, 105, 'ACT'),
         (6, 106, 'NSW'),
         (7, 107, 'NT'),
         (8, 108, 'QLD'),
         (9, 109, 'NSW');

Only those rows that match the filter where (a > 5 and c = 'NSW') are replicated:

1> lists:sort(ets:tab2list(t2)).
[{{6, <<"NSW">>}, 106},
 {{9, <<"NSW">>}, 109}]

Updates are streamed in real time into ETS:

update t2 set b = 999 where a = 6;

2> lists:sort(ets:tab2list(t2)).
[{{6, <<"NSW">>}, 999},
 {{9, <<"NSW">>}, 109}]

Changing data so that it now meets the row filter is also replicated in real time into ETS:

update t2 set a = 555 where a = 2;

3> lists:sort(ets:tab2list(t2)).
[{{6, <<"NSW">>}, 999},
 {{9, <<"NSW">>}, 109},
 {{555, <<"NSW">>}, 102}]

Similarly, changing data so that it no longer meets the row filter is replicated in real time, removing the row from ETS:

update t2 set c = 'VIC' where a = 9;

4> lists:sort(ets:tab2list(t2)).
[{{6, <<"NSW">>}, 999},
 {{555, <<"NSW">>}, 102}]
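The PostgreSQL 15 documentation describes how an UPDATE is transformed when a row filter is in place: if only the new row matches, it is sent as an INSERT; if only the old row matches, it is sent as a DELETE; if both match, it is an UPDATE; if neither matches, nothing is sent. The sketch below replays the examples above through those rules against a hand-made ETS table. The module and function names are hypothetical; in practice PostgreSQL evaluates the filter and pgmp applies the resulting changes:

```erlang
-module(row_filter_demo).
-export([demo/0]).

%% The row filter of publication p2: where (a > 5 and c = 'NSW').
matches({A, _B, C}) ->
    A > 5 andalso C =:= <<"NSW">>.

%% A row {A, B, C} of t2 as an ETS entry keyed on the primary key (a, c).
entry({A, B, C}) ->
    {{A, C}, B}.

key(Row) ->
    element(1, entry(Row)).

%% How an UPDATE from Old to New lands in a replica of filtered rows.
update(Tab, Old, New) ->
    case {matches(Old), matches(New)} of
        {true, true} ->
            %% Still published: an ordinary update (the key may change).
            true = ets:delete(Tab, key(Old)),
            true = ets:insert(Tab, entry(New));
        {false, true} ->
            %% Now meets the filter: arrives as an insert.
            true = ets:insert(Tab, entry(New));
        {true, false} ->
            %% No longer meets the filter: arrives as a delete.
            true = ets:delete(Tab, key(Old));
        {false, false} ->
            %% Never published: nothing arrives.
            true
    end.

demo() ->
    Tab = ets:new(t2_demo, [set]),
    Initial = [{2, 102, <<"NSW">>}, {3, 103, <<"QLD">>}, {4, 104, <<"VIC">>},
               {5, 105, <<"ACT">>}, {6, 106, <<"NSW">>}, {7, 107, <<"NT">>},
               {8, 108, <<"QLD">>}, {9, 109, <<"NSW">>}],
    %% Initial snapshot: only rows matching the filter are copied.
    true = ets:insert(Tab, [entry(Row) || Row <- Initial, matches(Row)]),
    Snapshot = lists:sort(ets:tab2list(Tab)),
    %% The three updates from the walkthrough, as {Old, New} row pairs.
    update(Tab, {6, 106, <<"NSW">>}, {6, 999, <<"NSW">>}),
    update(Tab, {2, 102, <<"NSW">>}, {555, 102, <<"NSW">>}),
    update(Tab, {9, 109, <<"NSW">>}, {9, 109, <<"VIC">>}),
    Final = lists:sort(ets:tab2list(Tab)),
    true = ets:delete(Tab),
    {Snapshot, Final}.
```

row_filter_demo:demo() returns the initial snapshot [{{6, <<"NSW">>}, 106}, {{9, <<"NSW">>}, 109}] and the final contents [{{6, <<"NSW">>}, 999}, {{555, <<"NSW">>}, 102}].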



@shortishly if I update my ETS table using ets:xyz, does pgmp reflect the change into Postgres?
I’d like to persist all my ETS tables to some durable storage to survive crashes, power-off…
Is pgmp a viable solution for my case?


Hi @zabrane,

Updates to ets:xyz are not reflected into PostgreSQL: data is replicated one way, from PostgreSQL into ETS. Effectively, ets:xyz is a read-only copy of data held in PostgreSQL. You could, however, make a request to PostgreSQL via pgmp to update xyz set x = X, which would then result in your ets:xyz being updated through replication.

If you’re thinking about durable ETS storage (in no particular order):

Which to choose will really depend on the details of your use case.


@shortishly thanks. I’m aware of all the alternatives and I was looking for something else.