A shared FIFO queue is implemented using ETS

Um… huh? Our shq stores nothing in the pdict; it uses an ETS table to store values.

That is probably because in is an async cast, while out is a call that waits for a response.

The process does not really manage the data; it manages access to the data, which is stored in an ETS table it owns.

Basically, the process has two integer pointers (counters): one for the front of the queue, one for the rear. The values are stored in the ETS table under contiguous keys between those two pointers; this way we can use a set table (O(1)) and don’t need ordered_set (O(log n)).

As there are two updates involved in this process (counter update and insert or take), this needs to be synchronized, otherwise there are race conditions which may corrupt the queue, leading to holes and orphans.

If you do an in, the value is stored with the current rear counter as the key, and the counter is then increased.
If you do an out, the value at the current front counter is taken, the front counter is increased, and the taken value is returned.
If both counters are equal, the queue is empty.

Like this:

  • after shq:start the queue is empty:
    • the data table D is empty <>
    • the front counter F is 0
    • the rear counter R is 0
  • with an shq:in(Shq, foo), this happens:
    • an object is inserted in the data table D, with current R as the key: <{0, foo}>
    • the front counter stays at 0
    • the rear counter is increased to 1
  • with an shq:out(Shq), this happens:
    • the object whose key is the current value of F is taken from the data table D, so the value V is foo, and the data table D is empty again <>
    • the front counter is increased to 1
    • the rear counter stays at 1
    • V is returned

The reverse (*_r) functions do basically the same, but the roles of F and R are reversed in that case.


Not sure I explained that well… :thinking: Anyway. The crucial point is that it is guaranteed that there are no keys in the data table below the front counter and none at or above the rear counter (either would be an inaccessible orphan, i.e. a memory leak), and that all keys between them are contiguous (so, no holes). This way, we can predict the keys with only those two counters, and don’t have to rely on the internal ordering of ordered_set.
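To make that concrete, here is a minimal sketch of the two-counter scheme (module name and API are made up for illustration; it keeps the counters in a tuple instead of behind a managing process, so unlike shq it is not safe for concurrent use):

    -module(counter_queue).
    -export([new/0, in/2, out/1]).

    %% State is {Table, Front, Rear}; exactly the keys
    %% Front..Rear-1 are occupied.
    new() ->
        {ets:new(counter_queue, [set, private]), 0, 0}.

    %% Insert under the current rear key, then advance the rear counter.
    in({Tab, F, R}, Value) ->
        true = ets:insert(Tab, {R, Value}),
        {Tab, F, R + 1}.

    %% Equal counters mean the queue is empty.
    out({_Tab, F, R} = Q) when F =:= R ->
        {empty, Q};
    %% Take the value at the front key, then advance the front counter.
    out({Tab, F, R}) ->
        [{F, Value}] = ets:take(Tab, F),
        {{value, Value}, {Tab, F + 1, R}}.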

2 Likes

Although set is O(1) and ordered_set is O(log n), I tested and found that their efficiency is not very different, so there is not much difference between using ordered_set and a plain set to implement FIFO queues.
But ets:first/ets:last on a set table is much slower than on an ordered_set.
And ets:info(Tid, size) on an ordered_set is very slow.

You use one process to manage the two counters. It is very effective, but it adds an additional layer of message interaction.

2 Likes

Using ordered_set with write_concurrency set to true also enables decentralized_counters by default; you can disable that option to make ets:info(Tid, size) much faster. Details in the blog post: Decentralized ETS Counters for Better Scalability - Erlang/OTP
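For example, something like this (a sketch; the table name is made up, and the decentralized_counters option only exists in reasonably recent OTP releases):

    %% ordered_set with write_concurrency, but with decentralized
    %% counters explicitly turned off, so ets:info(Tab, size) does
    %% not have to sum per-scheduler counters.
    Tab = ets:new(my_queue, [ordered_set,
                             public,
                             {write_concurrency, true},
                             {decentralized_counters, false}]),
    ets:info(Tab, size).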

3 Likes

It seems to be the best option.
Thanks!

2 Likes

That’s the point of processes, though: synchronization points. You’d have to synchronize it some other way anyway. You could do it in ETS, but that’s a lot of manual management to deal with, and it isn’t going to be able to apply backpressure. In addition, processes can proactively send to listeners and can do lots of other things.

Yeah this is what I’m exceptionally curious about…

Yeah, I worry about that too. You can’t synchronize multiple calls to ETS without going through a process; a process is a mutex by virtue of its message queue.


Personally, I’m not even sure I’d use ETS, just a process holding the state itself. If you need it to serialize the state out, it can; if you need to be able to register a listener/monitor for when something enters, you easily can; you don’t have data race conditions, etc. etc. In addition, you still get backpressure and system handling and all, plus better tracing! There’s not really a CAS-like instruction on ETS to build a mutex that way. Using atomics you could, but that’s a lot of work to do right without data race issues, more than you might think, and it means you are basically stuck in not being able to go multi-node later either.
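A rough sketch of that alternative, a plain gen_server holding the whole queue in its state via the stdlib queue module (module name and API are hypothetical, not shq itself):

    -module(state_queue).
    -behaviour(gen_server).
    -export([start_link/0, in/2, out/1]).
    -export([init/1, handle_call/3, handle_cast/2]).

    start_link() -> gen_server:start_link(?MODULE, [], []).

    in(Pid, Value) -> gen_server:cast(Pid, {in, Value}).
    out(Pid)       -> gen_server:call(Pid, out).

    init([]) -> {ok, queue:new()}.

    %% The queue lives entirely in the process state, no ETS at all.
    handle_cast({in, Value}, Q) ->
        {noreply, queue:in(Value, Q)}.

    handle_call(out, _From, Q0) ->
        case queue:out(Q0) of
            {{value, V}, Q1} -> {reply, {ok, V}, Q1};
            {empty, Q1}      -> {reply, empty, Q1}
        end.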

2 Likes

Based on the simple test results, using ETS is better than using a single managing process in terms of speed and memory, so I finally chose the ETS implementation.
If you consider multiple nodes, I believe you can do it over RPC.

1 Like

FWIW, the Feuerlabs KVDB database had support for queues - FIFO, LIFO, keyed, priority, etc.
http://www.erlang-factory.com/conference/SFBay2013/speakers/UlfWiger
https://github.com/Feuerlabs/kvdb

The design might be interesting for some. We used a particular key structure that allowed us to handle tons of separate queues in a single table, and to push, pop and iterate efficiently - even skip through subtrees. This was used to manage device configuration trees.

The queue system was also used to implement a CRON-style persistent scheduling system.

The code hasn’t been actively maintained for some years, but I’d be open to help move it forward into working order.

BR,
Ulf

6 Likes

I watched it; it was very interesting. Thank you very much for your suggestion.

2 Likes

Yes, it would, because there’s no synchronization on it then, which is the issue and is likely to cause odd, random bugs.

2 Likes

In Erlang, everything (with a few unfortunate exceptions like modules/persistent_terms) is owned by a process. ETS tables also have a single owner process. When that process terminates, the table is either deleted or passed to the heir process.

So when you access an ETS table, you can think of it as accessing the table owner process’s state directly, circumventing the message-passing mechanisms (and therefore losing linearisability).

That said, even if ETS is utilised only to break linearisability, it still needs to have an owning process.
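A small sketch of the heir mechanism mentioned above (all names illustrative): when the owner terminates, the heir is handed the table in an 'ETS-TRANSFER' message.

    Heir = self(),
    spawn(fun() ->
              %% owner creates the table and terminates right away
              ets:new(demo, [set, public, {heir, Heir, handover_data}])
          end),
    receive
        {'ETS-TRANSFER', Tab, _OldOwner, handover_data} ->
            io:format("now owning table ~p~n", [Tab])
    end.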

Unrelated question: I wonder what the purpose of implementing a FIFO queue in Erlang is, when every single process has a mailbox that is exactly a FIFO queue?

3 Likes

My initial purpose was to write an Erlang factories-and-workers library [GitHub - ErlGameWorld/eFaw: eralng's Factories and workers.]. I’ve finished it; you can read it.
The idea is that some other process sends tasks to the factory, and then the workers in the factory automatically do the work.

2 Likes

Hm, not quite, in my understanding :wink:

The typical use case for such a shared queue is a number n of producers that generate and insert tasks into the queue, and a number m of workers that fetch tasks from the queue and execute them.

To achieve this via only the message queues, the producers would have to know the workers and manage them to some degree, like when workers shut down or new ones are started.

Also, once something is pushed to a particular worker, it can’t be “fetched back”, for example when a worker takes very long for a task and has more tasks queued up after it while other workers are idle. Along the same lines, if a particular worker crashes while executing some task, it will take all other (later) tasks in its queue with it.

Finally, the message queue is also used for all sorts of other stuff the worker may do, not just queued tasks, so it would either have to resort to selective receives that leave the queued tasks in the message queue, or store them internally.
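The selective-receive variant would look roughly like this sketch (handle_control/1 and run_task/1 are hypothetical helpers): control messages are served first, while queued tasks stay in the mailbox until the worker gets around to them.

    worker_loop() ->
        receive
            {control, Msg} ->
                handle_control(Msg)          %% hypothetical helper
        after 0 ->
            receive
                {task, Task} ->
                    run_task(Task);          %% hypothetical helper
                {control, Msg} ->
                    handle_control(Msg)
            end
        end,
        worker_loop().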

2 Likes

We kept working for a while on the for-fun implementation shq which we started earlier (see this post).

It is slower than the other ideas discussed here, which circumvent going through a process and access an ETS table directly. But it is comparatively easy to use (much like the queue module, but, you know, shared), and it guarantees consistency (because it works through a managing process).

The managing process is a gen_server. We played with implementing it as a special process in order to remove the gen_server overhead, but scrapped it again. While there is a small performance gain, it is not all that great, and manually implementing what gen_server offers is a drag :sweat_smile:

So we left the initial emphasis on speed behind and instead added some (IMO) nice features :wink: On top of the “instant” out/out_r, there are also out_wait/out_r_wait, which can be used to wait (block the caller) until an item arrives in case the queue is currently empty, and out_nowait/out_r_nowait, which basically do the same without blocking the caller (when an item arrives, it is sent to the caller in a message).
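For illustration, a hypothetical usage sketch based only on the functions named in this thread (the exact return value shapes are assumptions; check the actual shq documentation):

    {ok, Q} = shq:start(),       %% return shape assumed
    shq:in(Q, task1),            %% enqueue at the rear
    Front = shq:out(Q),          %% "instant" take from the front
    Back = shq:out_r(Q),         %% same, but from the rear
    Waited = shq:out_wait(Q),    %% blocks the caller until an item arrives
    shq:out_nowait(Q).           %% returns immediately; when an item
                                 %% arrives, it is sent to the caller
                                 %% in a message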

I have a feeling that something like this has been done before, and not only once. But it is fun :smiley:
(And no, still no docs or tests :stuck_out_tongue_winking_eye: Specs are there now, though)

4 Likes

I see! So it’s a “pool of workers” pattern. I do have some reservations about the whole premise of implementing such a pattern in Erlang. In my experience, the primary purpose of it would be to limit concurrency. That is, “no more than m workers can run concurrently”. Essentially a semaphore synchronisation primitive.

This scenario is only necessary when producers don’t get any backpressure and can generate an unbounded number of tasks. If there is a way to slow producers down (exert backpressure), the “worker pool” pattern may be avoided altogether.

Interestingly, in older OTP (up to some R18 or so) there was a basic backpressure mechanism for message sending: the reduction cost of sending a message to a backlogged process correlated with the receiver’s message queue length. I don’t remember the reasoning why it was removed.

3 Likes

That’s the point, yes. Or you may think of it as a task buffer, such that sudden bursts of work don’t hit a limited underlying resource all at once.

In the company I work for, we receive such bursts from the webserver - like in the morning, around the time when everybody out there turns on their computer and checks their e-mail. We need to answer each request quickly, but the actual tasks resulting from it (which involve some rummaging around in databases) can be done somewhat later (i.e., not immediately).

This scenario is only necessary when producers don’t get any backpressure and can generate an unbounded number of tasks. If there is a way to slow producers down (exert backpressure), the “worker pool” pattern may be avoided altogether.

Yes, but in our case we don’t want that, because exerting backpressure on the producers would mean exerting backpressure on the users, which we want to avoid (as much as possible).

That aside, a backpressure mechanism for this kind of queue is a good idea, too. It would essentially mean limiting the number of tasks that can be enqueued, and rejecting in operations when the queue is full.
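In the counter scheme from earlier in the thread, such a bound could look roughly like this fragment of the managing gen_server (record and field names illustrative; note that in would have to be a call rather than a cast, so the caller can see the rejection):

    -record(state, {tab, front = 0, rear = 0, max}).

    %% Reject when the queue already holds Max items (Rear - Front).
    handle_call({in, _Value}, _From,
                State = #state{front = F, rear = R, max = Max})
      when R - F >= Max ->
        {reply, {error, full}, State};
    %% Otherwise insert under the rear key and advance the rear counter.
    handle_call({in, Value}, _From, State = #state{tab = Tab, rear = R}) ->
        true = ets:insert(Tab, {R, Value}),
        {reply, ok, State#state{rear = R + 1}}.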

2 Likes

We put some effort into this after all :slight_smile: and have now finished the first version of shq. If you want to try it out, it is available on GitHub (GitHub - hnc-agency/shq) and Hex (shq | Hex). Enjoy :smiley:

8 Likes

It’s great. I thought you might be interested in building a generic Erlang asynchronous network library. Are you interested in looking into this? GitHub - ErlGameWorld/eNet: erlang's net library :grinning: :grin: :grin: :grin: :grin: I sometimes like to find people to work on things with.

2 Likes

I’ll look into it tomorrow. But at first glance, I see a lot of Chinese, of which I know nothing at all :sweat_smile:

2 Likes