Pgmp - PostgreSQL client with logical replication to ETS


pgmp is a PostgreSQL client that uses some of the more recent features of OTP: socket, send_request/4 and check_response/3, and crypto:pbkdf2_hmac/5. It supports simple and extended query, and logical replication to ETS.

Replication to ETS takes a couple of steps. First, create a table with some data:

create table xy (x integer primary key, y text);
insert into xy values (1, 'foo');

Create a PostgreSQL publication for that table:

create publication xy for table xy;

The current state of the table is replicated into an ETS table also called xy:

1> ets:i(xy).
<1   > {1, <<"foo">>}
EOT  (q)uit (p)Digits (k)ill

Introspection on the PostgreSQL metadata is done by pgmp so that x is used as the primary key for the replicated ETS table.

Thereafter, CRUD changes on the underlying PostgreSQL table will be automatically pushed to pgmp and reflected in the ETS table.

An example of logical replication of a single table with a composite key:

create table xyz (x integer, y integer, z integer, primary key (x, y));
insert into xyz values (1, 1, 3);

Create a PostgreSQL publication for that table:

create publication xyz for table xyz;

Configure pgmp for replication with the stanza:

 {pgmp, [...
         {replication_logical_publication_names, <<"xy,xyz">>},

Here replication_logical_publication_names is a comma-separated list of publication names for pgmp to replicate. The contents of each PostgreSQL table are replicated into an ETS table of the same name.

1> ets:i(xyz).
<1   > {{1, 1}, 3}
EOT  (q)uit (p)Digits (k)ill

Note that replication introspects the PostgreSQL table metadata so that {1, 1} (x, y) is automatically used as the composite key.
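As a rough illustration of what a composite key means on the ETS side, the sketch below builds a local stand-in table (it does not talk to pgmp or PostgreSQL) and looks a row up by its {x, y} tuple. The module name, the table name xyz_demo and the set table type are assumptions made for the example.

```erlang
-module(composite_key_demo).
-export([run/0]).

%% Local stand-in for the replicated xyz table: the key is the
%% {x, y} tuple in the first element, the remaining column follows.
run() ->
    Tab = ets:new(xyz_demo, [set]),      % table type is an assumption
    true = ets:insert(Tab, {{1, 1}, 3}),
    Found = ets:lookup(Tab, {1, 1}),     % both key parts must match
    Missing = ets:lookup(Tab, {1, 2}),   % no row for this pair
    true = ets:delete(Tab),
    {Found, Missing}.
```

Against a replicated table the equivalent call would be ets:lookup(xyz, {1, 1}), assuming the table is readable from the calling process.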

There are two mechanisms for making asynchronous requests to pgmp.

You can immediately wait for a response (via receive_response/1).

1> gen_statem:receive_response(pgmp_connection:query(#{sql => <<"select 2*3">>})).
{reply, [{row_description, [<<"?column?">>]},
         {data_row, [6]},
         {command_complete, {select, 1}}]}

Effectively the above turns an asynchronous call into a synchronous request that immediately blocks the current process until it receives the reply.

However, it is likely that you can continue doing some other important work, e.g., responding to other messages, while waiting for the response from pgmp. In which case using the send_request/4 and check_response/3 pattern is preferred.

If you’re using pgmp within another gen_* behaviour (gen_server, gen_statem, etc), this is very likely to be the option to choose. So using another gen_statem as an example:

The following init/1 sets up some state with a request id collection to maintain our outstanding asynchronous requests.

init([]) ->
    {ok, ready, #{requests => gen_statem:reqids_new()}}.

You can then use the label and requests parameter to pgmp to identify the response to your asynchronous request as follows:

handle_event(internal, commit = Label, _, _) ->
    {keep_state_and_data,
     {next_event, internal, {query, #{label => Label, sql => <<"commit">>}}}};
handle_event(internal, {query, Arg}, _, #{requests := Requests} = Data) ->
    {keep_state,
     Data#{requests := pgmp_connection:query(Arg#{requests => Requests})}};

A call to any of the pgmp_connection functions query/1, parse/1, bind/1, describe/1 or execute/1 takes a map of parameters. If that map includes both a label and requests then the request is made using send_request/4. The response will be received as an info message as follows:

handle_event(info, Msg, _, #{requests := Existing} = Data) ->
    case gen_statem:check_response(Msg, Existing, true) of
        {{reply, Reply}, Label, Updated} ->
            %% You have a response with a Label so that you can stitch it
            %% back to the original request...
            do_something_with_response(Reply, Label),
            {keep_state, Data#{requests := Updated}};
        %% ...deal with other cases...
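To try the label and request id collection pattern without a database, here is a minimal self-contained sketch. It uses a toy gen_server that doubles numbers as a stand-in for pgmp, so the module name, the {double, N} request and the labels are all hypothetical; only reqids_new/0, send_request/4, reqids_size/1 and check_response/3 are the OTP 25 calls discussed above.

```erlang
-module(reqid_demo).
-behaviour(gen_server).
-export([run/0]).
-export([init/1, handle_call/3, handle_cast/2]).

%% Toy server standing in for pgmp: replies with twice the number.
init([]) -> {ok, #{}}.
handle_call({double, N}, _From, State) -> {reply, 2 * N, State}.
handle_cast(_Msg, State) -> {noreply, State}.

run() ->
    {ok, Pid} = gen_server:start(?MODULE, [], []),
    R0 = gen_server:reqids_new(),
    %% Each request carries a label and is added to the collection.
    R1 = gen_server:send_request(Pid, {double, 3}, first, R0),
    R2 = gen_server:send_request(Pid, {double, 5}, second, R1),
    Replies = collect(R2, #{}),
    ok = gen_server:stop(Pid),
    Replies.

%% Receive messages until every outstanding request has a reply,
%% using the label to stitch each reply back to its request.
collect(Requests, Acc) ->
    case gen_server:reqids_size(Requests) of
        0 ->
            Acc;
        _ ->
            receive
                Msg ->
                    case gen_server:check_response(Msg, Requests, true) of
                        {{reply, Reply}, Label, Updated} ->
                            collect(Updated, Acc#{Label => Reply});
                        no_reply ->
                            %% Unrelated message: dropped in this sketch.
                            collect(Requests, Acc)
                    end
            end
    end.
```

Calling reqid_demo:run() should return a map from label to reply. In a real gen_statem or gen_server the receive loop is replaced by the info callback shown above, so unrelated messages are handled rather than dropped.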




Ah, only one vital detail missing :slight_smile:

Thanks @LeonardB


Nice! All the modern features I like!

Do you have some load tests for this? Maybe a comparison with the good old epgsql library? Some documentation or article on when it is a good idea to migrate from one to another and how to do so would also be very useful :slight_smile:


Thanks. I’ve really enjoyed the combination of send_request and check_response, allowing me to reply knowing that the caller isn’t blocked while waiting. They’re a really nice recent OTP feature. Prior to this, I was always “replying” with a cast (with the overhead of monitoring the caller/etc), which never felt quite “right”.

Load tests - performance isn’t something I’ve really worried about yet. Main focus is getting the codec for various types done - using binary types rather than text. PostgreSQL offers both binary and text representations. Binary is usually “better”, and is one of the advantages of parse, bind, execute in the extended query mode. Simple query only uses the textual representation (unless I’ve missed something in simple query) which isn’t particularly efficient (unless your data is mostly text!). Though binary mode is only documented by the C code :slight_smile: Replication uses binary mode for the initial table transactional snapshot. With a local PostgreSQL a snapshot of ~112k rows currently takes roughly 4 seconds into ETS - that is batching 5k rows (another feature of extended query). The “raw” socket protocol is handed off to a “middle man” that understands the underlying protocol, but the actual row decoding isn’t done until it reaches the destination just before it goes into ETS. Will turn to performance more once the codec has better type coverage.

Migration - I’ve also not really thought about it right now (sorry). It would kind of depend on how embedded the “textual” types are - for example, table OIDs are string integers, rather than “integers”. And arrays of OIDs are a string of space separated integers (IIRC), rather than just a list of integers. If you’re using basic SQL types it is probably pretty OK :slight_smile: YMMV. It would also depend on how much send_request/check_response goodness you want too.

I have a bit of a stumble on SQL timestamps - In PostgreSQL they’re micros since {2000, 1, 1}. At the moment I’m creating a calendar:datetime tuple after munging from PG epoch to Unix, but will probably switch to a system time integer in micros. SQL Time is also micros since 00:00:00, which I’m struggling to find a decent representation for… {Hour, Minute, Second}.Micros?
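For anyone following along, the epoch arithmetic described above can be sketched as below. This is only an illustration of the conversion, not pgmp's codec: the module and function names are hypothetical, and the {{Hour, Minute, Second}, Micros} pair is just one possible shape for SQL time.

```erlang
-module(pg_epoch_demo).
-export([to_system_time/1, to_datetime/1, time_of_day/1]).

-define(MICROS_PER_SECOND, 1000000).

%% Seconds from the Unix epoch (1970-01-01) to the PostgreSQL epoch
%% (2000-01-01), computed with calendar rather than hard-coded.
epoch_offset_seconds() ->
    calendar:datetime_to_gregorian_seconds({{2000, 1, 1}, {0, 0, 0}})
        - calendar:datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}}).

%% PostgreSQL timestamp (micros since 2000-01-01) to system time in micros.
to_system_time(PgMicros) ->
    PgMicros + epoch_offset_seconds() * ?MICROS_PER_SECOND.

%% The same instant as a calendar:datetime() tuple (sub-second part dropped).
to_datetime(PgMicros) ->
    calendar:system_time_to_universal_time(to_system_time(PgMicros),
                                           microsecond).

%% SQL time (micros since 00:00:00, assumed to be under 24 hours) split
%% into {{Hour, Minute, Second}, Micros}.
time_of_day(Micros) ->
    {calendar:seconds_to_time(Micros div ?MICROS_PER_SECOND),
     Micros rem ?MICROS_PER_SECOND}.
```

Keeping the value as an integer system time preserves the microseconds, which the calendar:datetime() tuple cannot carry.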


This is exciting! Nice work.



Version 0.8.0 of pgmp includes support for some new PostgreSQL 15 features, adding row filters and column lists to logical streaming replication into ETS.

Fuller details are here, with a couple of examples below:

Column Lists

create table t1 (id int,
                 a text,
                 b text,
                 c text,
                 d text,
                 e text,
                 primary key(id));

Create a publication p1, with a column list to reduce the number of columns that will be replicated:

create publication p1
  for table t1 (id, a, b, d);

Insert some test data into the newly created t1 table:

insert into t1 values
     (1, 'a-1', 'b-1', 'c-1', 'd-1', 'e-1'),
     (2, 'a-2', 'b-2', 'c-2', 'd-2', 'e-2'),
     (3, 'a-3', 'b-3', 'c-3', 'd-3', 'e-3');

As part of the replication process pgmp introspects the table metadata for t1 determining that column id is the primary key. This is the column that is used as the primary key in ETS replica of t1. The contents of the replicated table:

2> lists:sort(ets:tab2list(t1)).
[{1, <<"a-1">>, <<"b-1">>, <<"d-1">>},
 {2, <<"a-2">>, <<"b-2">>, <<"d-2">>},
 {3, <<"a-3">>, <<"b-3">>, <<"d-3">>}]

Any changes on the PostgreSQL database will now be replicated in real-time to the ETS replica.

insert into t1
  values (4, 'a-4', 'b-4', 'c-4', 'd-4', 'e-4');

Meanwhile in pgmp:

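3> lists:sort(ets:tab2list(t1)).
[{1, <<"a-1">>, <<"b-1">>, <<"d-1">>},
 {2, <<"a-2">>, <<"b-2">>, <<"d-2">>},
 {3, <<"a-3">>, <<"b-3">>, <<"d-3">>},
 {4, <<"a-4">>, <<"b-4">>, <<"d-4">>}]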

Row Filters

create table t2 (a int,
                 b int,
                 c text,
                 primary key(a, c));

Creating a publication p2 with a row filter:

create publication p2
  for table t2
  where (a > 5 and c = 'NSW');

With some initial data:

insert into t2 values
    (2, 102, 'NSW'),
    (3, 103, 'QLD'),
    (4, 104, 'VIC'),
    (5, 105, 'ACT'),
    (6, 106, 'NSW'),
    (7, 107, 'NT'),
    (8, 108, 'QLD'),
    (9, 109, 'NSW');

Only those rows that match the filter (a > 5 and c = 'NSW') are replicated:

1> lists:sort(ets:tab2list(t2)).
[{{6, <<"NSW">>}, 106},
 {{9, <<"NSW">>}, 109}]
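To make the effect of the filter concrete, this sketch applies the same predicate to the rows inserted above using a plain list comprehension. It only mimics the filtering locally (PostgreSQL evaluates the real row filter on the publisher side), and the module name is hypothetical.

```erlang
-module(row_filter_demo).
-export([run/0]).

%% The rows inserted into t2, as {a, b, c} tuples.
rows() ->
    [{2, 102, <<"NSW">>}, {3, 103, <<"QLD">>},
     {4, 104, <<"VIC">>}, {5, 105, <<"ACT">>},
     {6, 106, <<"NSW">>}, {7, 107, <<"NT">>},
     {8, 108, <<"QLD">>}, {9, 109, <<"NSW">>}].

%% Keep rows matching (a > 5 and c = 'NSW') and shape them like the
%% ETS replica: the composite key {a, c} first, then b.
run() ->
    lists:sort([{{A, C}, B} || {A, B, C} <- rows(),
                               A > 5,
                               C =:= <<"NSW">>]).
```

Only the rows with a = 6 and a = 9 survive, keyed on {a, c}.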

Updates are streamed in real-time into ETS:

update t2 set b = 999 where a = 6;

2> lists:sort(ets:tab2list(t2)).
[{{6, <<"NSW">>}, 999},
 {{9, <<"NSW">>}, 109}]

A change that makes a row meet the row filter is replicated in real-time into ETS:

update t2 set a = 555 where a = 2;

3> lists:sort(ets:tab2list(t2)).
[{{6, <<"NSW">>}, 999},
 {{9, <<"NSW">>}, 109},
 {{555, <<"NSW">>}, 102}]

Similarly, a change that makes a row no longer meet the row filter removes it from the ETS replica in real-time:

update t2 set c = 'VIC' where a = 9;

4> lists:sort(ets:tab2list(t2)).
[{{6, <<"NSW">>}, 999},
 {{555, <<"NSW">>}, 102}]



@shortishly if i update my ETS table using ets:xyz, does pgmp reflect the change into Postgres?
I’d like to persist all my ETS tables to some durable storage to survive crashes, poweroff…
Is pgmp a viable solution for my case?


Hi @zabrane,

Updates to ets:xyz are not reflected into PostgreSQL. Data is being replicated from PostgreSQL into ETS one way. Effectively, ets:xyz is a read only copy of data held in PostgreSQL. You could however, make a request to PostgreSQL via pgmp to update xyz set x = X, which would then result in your ets:xyz being updated through replication.

If you’re thinking about durable ETS storage (in no particular order):

Which to choose, will really depend on the detail of your use case.


@shortishly thanks. I’m aware of all the alternatives and i was looking for something else.



I maintain pgo which I created pg_types for. I thought it’d be great if the Erlang postgres clients shared an implementation of encoding/decoding types since there is a lot we can differ on in the rest of the implementation of a client but when it comes to type encoding/decoding it is mostly the same.

I still need to look closer at pgmp but plan to – I’d love to find it meets my needs and I can sunset pgo :slight_smile:

One thing I think is missing from my quick look is the ability to hook in any instrumentation.


Thanks @tsloughter

PG… Oh! Nice! Maybe pgmp should have been PGO2 :slight_smile:

I’ll take a look at pg_types too - I spent a while with pgmp working on property based tests for the various types, especially checking for precision of real (6 decimal digits), double precision (15), numeric, which certainly found some sharp edges in the code! Also, the x_log_data types for replication, my main motivation/justification for pgmp.

Instrumentation/observability - it has been on my TODO for a while. I need to kick the tyres on Telemetry. Happy to collaborate on this if that could work for you.


Nice, the property tests of pgmp made me the most interested – pg_types is tested through pgo which has manually done, not very good, tests :slight_smile:

I’m happy to collaborate on instrumentation. telemetry is probably the best way to go to provide an easy way for people to hook in any metrics library. I’d suggest still following opentelemetry-specification/ at main · open-telemetry/opentelemetry-specification · GitHub and opentelemetry-specification/ at main · open-telemetry/opentelemetry-specification · GitHub for some of what data to provide to the user. Those conventions are the ones I’d use in an OpenTelemetry adapter that uses the pgmp telemetry events to report traces and metrics.

In case there is any confusion, telemetry and OpenTelemetry are entirely separate :slight_smile: telemetry provides a generic event framework; OpenTelemetry is a full trace/metrics/logs spec and implementation.


First pass of pgmp with telemetry:

  • socket (low level recv/send);
  • the middleman (higher level PostgreSQL protocol); and
  • the logical replication layer.

The major spans (start/stop) for the simple and extended query are all present. All are listed in the readme.

I’m planning to do another pass through again in a week or so, picking up telemetry for errors (socket failures/etc) and (if) any other feedback.

Is there a “standard” way of naming (and the types of) telemetry events? For example, I’ve left the duration on a span as native time, rather than micro/millis/etc. The only real template I’ve found is cowboy_telemetry.

I’ve written the events to support metrics, rather than tracing… because that is my itch :slight_smile: Not sure if I’ve missed anything as a result.

I’ve not integrated pgmp with any telemetry adapters, so it is just generating events - I think that’s the right thing to do for pgmp, so that it doesn’t prescribe a particular adapter/environment. There is an extension point in pgmp to supply the module used to handle telemetry events using application (or environment variables) with an example here.

I’ve written some telemetry adapters in pgec that integrate with Prometheus/VictoriaMetrics here:


I’d suggest fewer events. I don’t think anyone will want to create a span for bind/describe but instead just the query as a whole and may want metrics about stuff like the time bind took.


I’m a bit late to the party but I came across your gen_statem blogposts via here and found this library! The ETS replication via logical is a really cool application of logical replication.

I was reading a fair bit through the code yesterday and I hope you don’t mind me asking: What made you go for using gen_statem? I haven’t seen it used much (maybe I have been looking in the wrong spots) but your blog posts on it made me curious. Looking at the code, I found the query and connection logic as well as ETS replication logic relatively easy to follow but it was an approach I did not see before. If I understood it properly, you queue outstanding requests by connection owner in here and then return the results to the client as they come in from the backing connection.

Is the benefit of using gen_statem performance? Or maintainability of the code: you can more easily reason about how to deal with queries in different states of the connection?

Sorry for my noob-ish questions but I’m curious to learn :slight_smile:


Thanks for taking the time to look at the code, the comments and great questions!

why use gen_statem?

gen_statem provides a generic state machine behaviour that has replaced its predecessor gen_fsm since Erlang/OTP 20. In OTP terms that is still pretty fresh :slight_smile:

If you’re thinking “I don’t have states, therefore it must be a gen_server” - then think again, especially if you’re dealing with timers or connecting with resources on demand. You can also create your own events (a bit like calling a local function) which can be traced and logged via sys - which is great for understanding the events that led up to a postmortem.

Also, while the default timeout of 5 seconds in gen_server:call is perfect for finding deadlocks in development, it (probably) isn’t something you want in production. It is a real pain to remember to go through and change every gen_server call. Whereas gen_statem:call uses infinity as the default.

For example, PGMP uses postpone to create the sockets to PostgreSQL in a lazy way. That is easy to do in gen_statem, but isn’t possible in gen_server without writing the code yourself. You can also change the callback module being used, which is great for parts of the protocol that are isolated: while authenticating it uses a different callback module depending on the method being used (all the protocol for MD5 is in a separate module to SASL).
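As a small self-contained sketch of the postpone idea (not PGMP's actual code), the state machine below starts in a drained state with no resource, postpones any call that arrives, and answers it once a timer moves it to available. The module name, the request event and the 100 ms delay standing in for "the socket is now connected" are all assumptions for the example.

```erlang
-module(postpone_demo).
-behaviour(gen_statem).
-export([run/0]).
-export([callback_mode/0, init/1, handle_event/4]).

callback_mode() -> handle_event_function.

%% Start with no resource; pretend one becomes ready after 100 ms.
init([]) ->
    {ok, drained, #{}, [{state_timeout, 100, ready}]}.

%% Nothing available yet: postpone the call. gen_statem retries
%% postponed events automatically after the next state change.
handle_event({call, _}, request, drained, _) ->
    {keep_state_and_data, postpone};

%% The resource is ready: change state, which replays postponed calls.
handle_event(state_timeout, ready, drained, Data) ->
    {next_state, available, Data};

%% Now the call can be answered.
handle_event({call, From}, request, available, _) ->
    {keep_state_and_data, {reply, From, ok}}.

run() ->
    {ok, Pid} = gen_statem:start(?MODULE, [], []),
    %% Sent while drained; the reply only arrives once available.
    Reply = gen_statem:call(Pid, request),
    ok = gen_statem:stop(Pid),
    Reply.
```

The caller never sees the intermediate state: the call simply completes a little later, without any hand-written queue in the callback module.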

I think of gen_statem just as a better gen_server that happens to have states. It can have a steep learning curve, but it is worth the investment (repeated reading of the design principles helped my understanding a lot). I find it easier to understand than a gen_server emulating parts of a gen_statem, but then I’m also not supporting OTP prior to 25.

queuing requests

Both call and cast are the traditional mechanisms of making a request to an Erlang/OTP service. Available since OTP 23, send_request allows you to make an asynchronous (similar to cast) request and later receive a reply (similar to call). Cake and eat it! PGMP uses this a lot. You are correct, in the connection manager (amongst other places), the send_request is asynchronously sending requests and receiving replies - they’re stitched together with a label (the third parameter in your link).


Thank you very much for the insightful reply, it helped a lot :slight_smile: After some tinkering, I fully agree that it’s worth the investment of learning!


I’m interested in the built-in pool functionality. Having some metrics regarding that would be nice too, to be able to figure out pool utilization over time.


Sorry, looks like I missed the pool off that list.

Just look for {telemetry, Name, Map} and you’ll see where events are being raised.

As an example pgmp/pgmp_connection.erl at develop · shortishly/pgmp · GitHub

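%% Excerpt. The state pattern of the first clause and the
%% {keep_state_and_data, [postpone, ...]} wrapper were lost from this
%% post and are reconstructed from the description below (assumption);
%% see the linked source for the exact code.
handle_event({call, _},
             {request, _},
             drained,
             #{max := Max,
               owners := Owners,
               connections := Connections})
  when map_size(Connections) + map_size(Owners) < Max ->
    {ok, _} = pgmp_pool_sup:start_child(),
    {keep_state_and_data,
     [postpone,
      nei({telemetry, new, #{count => 1}})]};

handle_event({call, _}, {request, _}, drained, _) ->
    {keep_state_and_data,
     [postpone,
      nei({telemetry, postponed, #{count => 1}})]};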

Where “nei(X)” is {next_event, internal, X}.

The first clause handles the case where the pool is smaller than the number of allowed connections, so a new connection can be spawned. A telemetry event for pgmp_connection, “new”, with a count of 1 is raised. The request is postponed; when the new connection is ready we transition to available, allowing the first queued request to take the connection.

The second clause handles the case where the pool has already reached the number of allowed connections. Here we raise a pgmp_connection “postponed” telemetry event, also with a count of 1. If and when the gen_statem transitions to “available”, this request will be retried and, if there is a connection available, it can continue.

Note that pgmp only emits the telemetry events; you will need to attach listeners that connect them to your monitoring of choice.

For a concrete example: pgec has adapters for prometheus/Victoria metrics. If you run make shell on pgec:

3> [M || #{name := Name} = M <- metrics:all(), string:prefix(atom_to_list(Name), "pgmp_connection") /= nomatch].
[#{label => #{},name => pgmp_connection_reservations_size,
   type => gauge,value => 0},
 #{label => #{},name => pgmp_connection_pool_size,
   type => gauge,value => 5},
 #{label => #{},name => pgmp_connection_owners_size,
   type => gauge,value => 0},
 #{label => #{},name => pgmp_connection_outstanding_size,
   type => gauge,value => 0},
 #{label => #{},name => pgmp_connection_new_count,
   type => counter,value => 1},
 #{label => #{},name => pgmp_connection_monitors_size,
   type => gauge,value => 0},
 #{label => #{},name => pgmp_connection_connections_size,
   type => gauge,value => 1}]

The example in pgec also has Grafana dashboards setup, which are simplest to see by checking out that project, browsing the Quickstart in and running:

docker compose --profile all up --detach --remove-orphans