A shared FIFO queue implemented using ETS

I recently needed a multi-process shared FIFO, and I thought of a few ways to implement it:

1. A lock-free queue, but Erlang does not support CAS.
2. Implement one as a NIF; I found some NIF implementations on GitHub, but that is not my first choice.
3. Implement it like this using ETS.
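For reference, a rough sketch of what option 3 could look like; this is my own reconstruction, not the original snippet, and the module name and the do_out/1 retry loop are assumptions based on the discussion that follows:

```erlang
-module(fifo_sketch).
-export([new/0, in/2, out/1]).

new() ->
    %% ordered_set keeps items sorted by their integer keys
    ets:new(?MODULE, [ordered_set, public, {write_concurrency, true}]).

in(Tab, Value) ->
    %% monotonic unique_integer keys preserve insertion order
    ets:insert(Tab, {erlang:unique_integer([monotonic]), Value}),
    ok.

out(Tab) ->
    do_out(Tab).

%% Without an ets:first_take/1, we must read the first key and then
%% try to take it; another process may win the race, so we retry.
do_out(Tab) ->
    case ets:first(Tab) of
        '$end_of_table' ->
            empty;
        Key ->
            case ets:take(Tab, Key) of
                [{_, Value}] -> {ok, Value};
                [] -> do_out(Tab)  % lost the race, try again
            end
    end.
```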


But I think the do_out/1 function may spin idly many times under concurrent calls.
The way to improve this would be for ets to offer something like first_take/1, last_take/1 and rand_take/1.

3 Likes

Have you looked at Erlang queue module?

https://www.erlang.org/doc/man/queue.html

3 Likes

Yes, I have seen it, but I want a shared FIFO queue, and I don’t want to use a process to manage it and then call out/call in through that process.

3 Likes

Don’t you still need a process to manage the ETS table now?

2 Likes
  • Normally I would create this queue in the Administrator process
2 Likes
  • It is easy to implement a process pool or work pool based on this shared queue

    So adding ets:first_take and ets:last_take would make it even better.
2 Likes

Busy-looping like do_out does is very expensive. If the item isn’t in the queue, consider using something like process groups (pg) to send a message when a new item is inserted, so processes can wait for that message rather than looping constantly.
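A sketch of that idea, assuming the default pg scope and a made-up group name fifo_waiters; real code would need to handle missed wake-ups more carefully:

```erlang
-module(pg_notify_sketch).
-export([start/0, in/2, out/1, wait_out/1]).

start() ->
    {ok, _} = pg:start_link(),  % default pg scope
    ets:new(fifo, [ordered_set, public]).

in(Tab, Value) ->
    ets:insert(Tab, {erlang:unique_integer([monotonic]), Value}),
    %% wake anyone blocked in wait_out/1
    [Pid ! new_item || Pid <- pg:get_members(fifo_waiters)],
    ok.

out(Tab) ->
    case ets:first(Tab) of
        '$end_of_table' -> empty;
        Key ->
            case ets:take(Tab, Key) of
                [{_, V}] -> {ok, V};
                [] -> out(Tab)
            end
    end.

%% Block until an item is available instead of busy-looping.
wait_out(Tab) ->
    case out(Tab) of
        {ok, V} ->
            {ok, V};
        empty ->
            ok = pg:join(fifo_waiters, self()),
            receive new_item -> ok end,
            pg:leave(fifo_waiters, self()),
            wait_out(Tab)
    end.
```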

I also think using a process to manage the queue would work better. RabbitMQ does this: each queue is a process. Internally the process can still use ETS as the queue implementation if you’d like to avoid GC latency.

4 Likes

Thank you for your suggestion. I looked at the implementation of pg; it is built on gen_server + ets, so maybe it is not what I want. One gen_server:call needs about 1µs, while one read/write of a FIFO in ets may need 200-350ns, and under high concurrency, funneling a large number of concurrent requests into a single process is not what I want to see. If do_out is expensive under concurrency, the main reason is still the lack of a function like first_take; maybe using a NIF to implement a lock-free queue is the last resort. Finally, about the loop: I think you can use a FIFO queue to store idle, waiting workers; when inserting work, if there is a sleeping process in that queue, take one out and wake it up to do the work.
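That last idea might be sketched like this; all names are made up, the two FIFOs are naive ETS ordered_set queues, and the park/wake race between checking the idle queue and a worker parking itself is deliberately ignored to keep it short:

```erlang
-module(wakeup_sketch).
-export([new/0, in/2, worker_loop/2, q_in/2, q_out/1]).

new() ->
    {ets:new(work, [ordered_set, public]),
     ets:new(idle, [ordered_set, public])}.

%% Tiny ETS-based FIFO helpers, same retry pattern as discussed above.
q_in(Tab, Item) ->
    ets:insert(Tab, {erlang:unique_integer([monotonic]), Item}),
    ok.

q_out(Tab) ->
    case ets:first(Tab) of
        '$end_of_table' -> empty;
        Key ->
            case ets:take(Tab, Key) of
                [{_, Item}] -> {ok, Item};
                [] -> q_out(Tab)
            end
    end.

%% Producer: hand the work to a parked worker if one is waiting,
%% otherwise enqueue it.
in({WorkQ, IdleQ}, Work) ->
    case q_out(IdleQ) of
        {ok, Pid} -> Pid ! {work, Work};   % wake a sleeping worker
        empty     -> q_in(WorkQ, Work)     % nobody waiting
    end,
    ok.

%% Worker: take work if available, otherwise park and sleep until woken.
worker_loop({WorkQ, IdleQ} = Qs, Fun) ->
    case q_out(WorkQ) of
        {ok, Work} ->
            Fun(Work);
        empty ->
            q_in(IdleQ, self()),
            receive {work, W} -> Fun(W) end
    end,
    worker_loop(Qs, Fun).
```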

2 Likes

Hi :slight_smile:

I can understand the requirement for a shared queue; that would indeed be a nice thing to have. But why the requirement for such extreme speed, ie where 200-350ns vs 1µs makes a difference?

I don’t know what you are planning to use this for, but in a general sense the primary use case for something like a shared queue is a producer/consumer setup. Producers create and enqueue tasks, consumers (workers) dequeue tasks and execute them, you know the pattern. But typically, the tasks are comparatively expensive and long-running, so I have doubts that a difference of 650-800ns in fetching a task would make a noticeable difference.

You might have something entirely different in mind, though. As you are willing to resort to implementing NIFs to gain speed, it looks like you desperately need it :wink: But I can’t imagine what for, so please tell me :slight_smile:

3 Likes

Shouldn’t that be erlang:unique_integer([monotonic])? Because without monotonic, it is only guaranteed that you will get a unique integer, not that it will be larger than any unique integer that was generated before, which is a requirement to guarantee queue ordering.
Using monotonic makes this a very expensive operation, however :sweat: OTOH, in-speed may be of lesser importance than out-speed?
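A quick way to see the difference: plain erlang:unique_integer/0 only guarantees uniqueness, while the monotonic option guarantees each call returns a strictly larger integer than all earlier calls, which is what the key ordering relies on:

```erlang
%% Strictly increasing across the whole runtime system, so later
%% inserts always sort after earlier ones in an ordered_set.
A = erlang:unique_integer([monotonic]),
B = erlang:unique_integer([monotonic]),
true = B > A.
```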


Another drawback (IMO :sweat_smile:) of this solution is that it can only be used locally, ie processes wanting to call in/out will have to reside on the same node where new was initially called, because ETS tables are local :confused:


Anyway… my interest is sparked, I’m getting some funny ideas in my head (involving processes, though :wink:), and I might just give it a stab :laughing:

4 Likes

Count me in, I’m bored ATM :grinning_face_with_smiling_eyes:

2 Likes

In fact, the producer/consumer may not care about those few hundred ns. I just wanted to write a faster queue as underlying support, and I ended up implementing a project based on that queue: GitHub - ErlGameWorld/eFaw: eralng's Factories and workers. It’s not fully written yet, and its operating mode is different from poolboy’s: GitHub - devinus/poolboy: A hunky Erlang worker pool factory.

2 Likes

During a short burst of concurrent enqueues, strict order is not required, only a unique id that is roughly in order.

2 Likes

I used ETS to implement a FIFO, but ETS does not support first_take/last_take, and I wanted a more complete implementation. I thought a NIF would be faster, but in my tests it used too much memory. So I have implemented ets:first_take and ets:last_take myself, like this:

/*
** first_take(Tab)
*/
BIF_RETTYPE ets_first_take_1(BIF_ALIST_1)
{
    DbTable* tb;
    int cret;
    Eterm first_key;
    Eterm ret;

    CHECK_TABLES();

    DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_take_2);

    cret = tb->common.meth->db_first(BIF_P, tb, &first_key);
    ASSERT(cret == DB_ERROR_NONE); (void)cret;

    cret = tb->common.meth->db_take(BIF_P, tb, first_key, &ret);
    ASSERT(cret == DB_ERROR_NONE); (void)cret;

    db_unlock(tb, LCK_WRITE_REC);
    BIF_RET(ret);
}

/*
** last_take(Tab)
*/
BIF_RETTYPE ets_last_take_1(BIF_ALIST_1)
{
    DbTable* tb;
    int cret;
    Eterm last_key;
    Eterm ret;

    CHECK_TABLES();

    DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_take_2);

    cret = tb->common.meth->db_last(BIF_P, tb, &last_key);
    ASSERT(cret == DB_ERROR_NONE); (void)cret;

    cret = tb->common.meth->db_take(BIF_P, tb, last_key, &ret);
    ASSERT(cret == DB_ERROR_NONE); (void)cret;

    db_unlock(tb, LCK_WRITE_REC);
    BIF_RET(ret);
}

That’s the easiest way to do it. A better implementation is possible, but it would take a long time to write. Do you want to try?

2 Likes

FWIW, this is what we came up with in a hurry: GitHub - hnc-agency/shq (no docs, no tests, no specs, just the bare implementation :sweat_smile:).

Downside: it uses a managing gen_server process (one per queue), which is what you tried to avoid. However, the gen_server overhead could be removed if necessary by implementing it as a streamlined special process; for a quick implementation we went with gen_server.

Upsides that (should) make up for that are…

  • constant time (O(1)) access for all operations (both insertion and retrieval, both front and rear) because we avoided the ordered_set table type and used set instead
  • avoiding the (arguably expensive) erlang:unique_integer altogether while ensuring queue order
  • making it work across nodes, ie removing the local node restriction (or rather, this comes for free because we are using a process)
  • no need for a named table (but the managing process can be named if you so wish)
  • fully OTP compliant, ie it can be used in a supervision tree

Aside from the gen_server overhead, I believe this is as fast as we can get with the things we already have at our hands, ie without resorting to NIF magic or implementing new functions in ets :wink:

3 Likes

Interesting. Crazy thought, but since you’re using simple integer values to track the head/tail keys, could you not use two counters in ets and thereby bypass the gen_server cast/call requirement entirely? Ie using ets:update_counter for the keys.

2 Likes

I looked at the implementation of shq; it is great. It’s 12 o’clock at night here now, so I’ll run some tests tomorrow to compare the speed.

The erlang:unique_integer function doesn’t cost much; look at this:

utTc:tm(1000, 100, erlang, unique_integer, []).
=====================
execute Args:[]
execute Fun :unique_integer
execute Mod :erlang
execute LoopTime:100
execute ProcCnts:1000
PMaxTime: 40098(ns) 0.00004(s)
PMinTime: 8715(ns) 0.000009(s)
PSumTime: 9708507(ns) 0.009709(s)
PAvgTime: 9708.507(ns) 0.00001(s)
FAvgTime: 97.08507(ns) 0.0(s)
PGrar : 170(cn) 0.17(%)
PLess : 830(cn) 0.83(%)
=====================

FwQueue can also work across nodes, like this:
erpc:cast(Node, fwQueue, in, [QName, QData]).

2 Likes

By the way, you can use this module: eUtils/utTc.erl at master · ErlGameWorld/eUtils · GitHub. It makes it easy to measure the execution time of a function, both sequentially and concurrently.

2 Likes

Obviously without tests or anything, but here is a version just using ets with front/rear counters.

-module(shq).

-export([init/1,
         in/2,
         in_r/2,
         out/1,
         out_r/1,
         size/1]).

%% The queue lives in a set table; the front/rear rows are integer
%% cursors, and items are stored under the integer keys in between.
init(Tab) ->
    Ref = ets:new(Tab, [public, set]),
    ets:insert(Ref, [{front, 0}, {rear, 0}]),
    {ok, Ref}.

%% Reserve the next rear slot atomically, then insert there.
in(Ref, Value) ->
    true = ets:insert(Ref, {ets:update_counter(Ref, rear, 1) - 1, Value}),
    ok.

%% Insert at the front by moving the front cursor back one slot.
in_r(Ref, Value) ->
    true = ets:insert(Ref, {ets:update_counter(Ref, front, -1), Value}),
    ok.

%% Claim the current front slot atomically, then take it. Note that
%% an out/1 on an empty queue still advances the front cursor.
out(Ref) ->
    KF = ets:update_counter(Ref, front, 1) - 1,
    case ets:take(Ref, KF) of
        [{_, V}] ->
            {ok, V};
        [] ->
            empty
    end.

out_r(Ref) ->
    KR1 = ets:update_counter(Ref, rear, -1),
    case ets:take(Ref, KR1) of
        [{_, V}] ->
            {ok, V};
        [] ->
            empty
    end.

size(Ref) ->
    ets:lookup_element(Ref, rear, 2) - ets:lookup_element(Ref, front, 2).

Using the suggested utTc module above to test.
In both tests, 1_000_000 entries were added to the table before running.

> utTc:tm(1000,100,shq, out, [T]). 
=====================
execute Args:[#Ref<0.2264322891.2797469697.147954>]
execute Fun :out
execute Mod :shq
execute LoopTime:100
execute ProcCnts:1000
PMaxTime:   12487019(ns)   0.012487(s)
PMinTime:     328385(ns)   0.000328(s)
PSumTime: 3748337374(ns)   3.748337(s)
PAvgTime: 3748337.37(ns)   0.003748(s)
FAvgTime: 37483.3737(ns)   0.000037(s)
PGrar   :        397(cn)       0.40(%)
PLess   :        603(cn)       0.60(%)
=====================

and for comparison, the original gen_server based shq module (named shq2 locally)

 utTc:tm(1000,100,shq2, out, [Ref]).
=====================
execute Args:[<0.89.0>]
execute Fun :out
execute Mod :shq2
execute LoopTime:100
execute ProcCnts:1000
PMaxTime:  353619294(ns)   0.353619(s)
PMinTime:  348715711(ns)   0.348716(s)
PSumTime: 3514640250(ns) 351.464025(s)
PAvgTime: 351464025.(ns)   0.351464(s)
FAvgTime: 3514640.25(ns)   0.003515(s)
PGrar   :        507(cn)       0.51(%)
PLess   :        493(cn)       0.49(%)
=====================
3 Likes

This is a really fun thread! I’m enjoying reading the code and ideas being shared

Or perhaps use atomics, if it’s OK for the integers to be bounded.
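For instance (a sketch, all names assumed): keeping the front/rear cursors in an atomics array instead of ETS counter rows.

```erlang
-module(atomics_sketch).
-export([new/0, in/2, out/1]).

new() ->
    %% index 1 = front cursor, index 2 = rear cursor, both start at 0;
    %% signed 64-bit, hence the "bounded integers" caveat above
    {atomics:new(2, [{signed, true}]), ets:new(q, [public, set])}.

in({Ctr, Tab}, Value) ->
    %% atomically reserve the next rear slot, then insert there
    Key = atomics:add_get(Ctr, 2, 1) - 1,
    true = ets:insert(Tab, {Key, Value}),
    ok.

out({Ctr, Tab}) ->
    %% atomically claim the current front slot, then take it
    Key = atomics:add_get(Ctr, 1, 1) - 1,
    case ets:take(Tab, Key) of
        [{_, V}] -> {ok, V};
        [] -> empty
    end.
```

Like the ets:update_counter version above, an out/1 on an empty queue still advances the front cursor past the rear, so real code would need underflow handling.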

2 Likes