How to handle only certain PIDs in receive?

Wait… re-reading yield_results in your opening post, the Pid isn’t matched against. Instead, you’re receiving either a 2-tuple or 'DOWN'-tuple in each step, as many as there are elements in the first argument list. And while this has obvious issues like the possibility (or rather, likelihood) of skipping results and leaving messages in the mailbox, none of them are related to the order of the elements in the argument list. What would you have expected to be pointed out in that regard?

(Sorry for the Spanish/porteño, English-speaking friends, but I think it will be clearer for @patricio3661 this way)

Aaaaaah, man! Now I get what you want! When you wrote…

How will I ensure that in yield_results(..) ONLY the messages with the certain, previously spawned PIDs, are processed? The ones contained in Monitors

…you weren’t asking “How do I make yield_results(..) not process messages that aren’t in Monitors?”; you meant “How do I make yield_results(…) process only the messages that are in Monitors even if they arrive in any order (not necessarily the order in which they appear in that list)?”.

That, if you don’t mind discarding the other messages (that is… the ones that come from processes other than the ones you started and that look like {Pid, Thing}), is done more or less like this…

yield_results([], Results) ->
    lists:reverse(Results);
yield_results(Monitors, Results) ->
    receive
        {Pid, Result} when is_pid(Pid) ->
            case lists:keytake(Pid, 1, Monitors) of
                %% Not one of our processes: drop the message and keep waiting.
                false -> yield_results(Monitors, Results);
                {value, _, OtherMonitors} ->
                    yield_results(OtherMonitors, [Result | Results])
            end;

        {'DOWN', _ActualMonitorRef, process, Pid, _Reason} ->
            case lists:keytake(Pid, 1, Monitors) of
                %% 'DOWN' from a process we did not start: drop it as well.
                false -> yield_results(Monitors, Results);
                {value, _, OtherMonitors} ->
                    yield_results(OtherMonitors, Results)
            end

    after 5000 ->
        % (3) (?) How to throw only for my PIDs?
        throw("Timeout!")
    end.

(I still don’t understand item (3) :P)

Now, that returns the results in the order in which you received them… which may have nothing to do with the order of Monitors. If that doesn’t bother you, all good… otherwise… you’ll have to do something more complicated, for which I would probably use maps instead of lists.
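To make the maps idea a bit more concrete, here is a minimal sketch, not a definitive implementation. It assumes the same spawn_monitor setup and {Pid, Result} reply shape as in the opening post; the module name pdp_any_order and the sleep-based process_single_item_logic/1 are made up for illustration. Messages whose Pid is not a key of the pending map are left in the mailbox rather than discarded, and the results are sorted back into input order at the end.

```erlang
-module(pdp_any_order).
-export([process_all_items/1]).

%% Hypothetical stand-in for the real work: sleep Item ms, return {ok, Item}.
process_single_item_logic(Item) ->
    timer:sleep(Item),
    {ok, Item}.

process_all_items(Items) ->
    Parent = self(),
    %% Spawn one monitored worker per item and remember its input position.
    {Pending, _NextIndex} = lists:foldl(
        fun(Item, {Acc, Index}) ->
            {Pid, Ref} = spawn_monitor(fun() ->
                Parent ! {self(), process_single_item_logic(Item)}
            end),
            {Acc#{Pid => {Ref, Index}}, Index + 1}
        end,
        {#{}, 1},
        Items),
    yield_results(Pending, #{}).

yield_results(Pending, Done) when map_size(Pending) =:= 0 ->
    %% Sort by input position to restore the order of Items.
    [Result || {_Index, Result} <- lists:keysort(1, maps:to_list(Done))];
yield_results(Pending, Done) ->
    receive
        %% The guard only accepts messages from workers we are still waiting for.
        {Pid, Result} when is_map_key(Pid, Pending) ->
            {{Ref, Index}, Rest} = maps:take(Pid, Pending),
            demonitor(Ref, [flush]),
            yield_results(Rest, Done#{Index => Result});
        %% A worker died without replying: stop waiting for it.
        {'DOWN', _Ref, process, Pid, _Reason} when is_map_key(Pid, Pending) ->
            yield_results(maps:remove(Pid, Pending), Done)
    after 5000 ->
        throw("Timeout!")
    end.
```

Calling pdp_any_order:process_all_items([30, 0, 10]) receives the replies as they come in but still returns them in input order.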

Hope that helps.

Hi @elbrujohalcon, I didn’t understand a word you wrote, but I understand your code and I can see a few issues with it :wink:

  • It steals and ignores messages that are 2-tuples with a pid as first element as well as any 'DOWN' messages from the message queue that were not sent by the spawned processes; those were sent by other processes, likely for a purpose.
  • It doesn’t keep the results in order, which means the results cannot be related to the Items that were processed (if that is important).
  • It leaves at least one 'DOWN' message in the queue even if all goes well.
  • If it hits the timeout and the throw is caught, there may be late results and 'DOWN' messages coming in from the spawned processes, polluting the message queue.

Item by item:

  • Yes, and I explicitly mentioned it to @patricio3661 :white_check_mark:
  • Yes, and I explicitly mentioned it to @patricio3661 :white_check_mark:
  • I didn’t notice this, I wonder if this will be a problem for him :question:
  • True… but all solutions so far have the same issue, right? Well… except if you remove that after 5000 block. I guess, it’s up to him to decide what to do, but I don’t think he’s concerned about that at all.

Thanks for the review, @Maria-12648430


But you see how it happens, yes? It could be solved by demonitor+flush, you get the monitor ref in the result of the lists:keytake.
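For illustration, a sketch of what demonitor+flush could look like in the keytake version. The module name pdp_flush is made up, and Monitors is assumed to be a list of {Pid, MonitorRef} pairs as returned by spawn_monitor/1. Messages from unknown senders are still dropped here, exactly as in the code above.

```erlang
-module(pdp_flush).
-export([yield_results/2]).

%% Monitors is assumed to be a list of {Pid, MonitorRef} pairs.
yield_results([], Results) ->
    lists:reverse(Results);
yield_results(Monitors, Results) ->
    receive
        {Pid, Result} when is_pid(Pid) ->
            case lists:keytake(Pid, 1, Monitors) of
                false ->
                    %% Not one of ours: dropped, as before.
                    yield_results(Monitors, Results);
                {value, {Pid, MonitorRef}, OtherMonitors} ->
                    %% Stop monitoring and remove a 'DOWN' that may already be queued.
                    demonitor(MonitorRef, [flush]),
                    yield_results(OtherMonitors, [Result | Results])
            end;
        {'DOWN', MonitorRef, process, Pid, _Reason} ->
            case lists:keytake(Pid, 1, Monitors) of
                %% Pid and MonitorRef are bound, so this matches only our own monitor.
                {value, {Pid, MonitorRef}, OtherMonitors} ->
                    yield_results(OtherMonitors, Results);
                _ ->
                    yield_results(Monitors, Results)
            end
    after 5000 ->
        throw("Timeout!")
    end.
```

With the flush in place, a successful run should no longer leave the 'DOWN' messages of the spawned processes behind.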

Indeed :wink:

Solving that would be a bit more complicated… It would require using an alias for sending the result messages from the spawned processes. In the after clause, deactivate the alias so no more messages from the spawned processes come in, demonitor+flush all remaining monitors, and flush out all 2-tuples with one of the remaining pids in the first element. And in the final yield_results clause (which doesn’t need a lists:reverse around Results since the results are out of order, anyway), also deactivate the alias. Something like that :woman_shrugging:
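A rough sketch of those steps, under a few assumptions: it needs OTP 24 or later for alias/0 and unalias/1, the module name pdp_alias and the sleep-based work/1 are made up, and the timeout is passed in as an argument only to make it easy to try out. As before, 2-tuples and 'DOWN' messages from unknown senders are dropped.

```erlang
-module(pdp_alias).
-export([process_all_items/2]).

%% Hypothetical stand-in for the real work.
work(Item) ->
    timer:sleep(Item),
    {ok, Item}.

process_all_items(Items, Timeout) ->
    Alias = alias(),
    %% Workers reply through the alias instead of the parent's pid.
    Monitors = [spawn_monitor(fun() -> Alias ! {self(), work(Item)} end)
                || Item <- Items],
    yield_results(Alias, Monitors, [], Timeout).

yield_results(Alias, [], Results, _Timeout) ->
    unalias(Alias),
    Results;
yield_results(Alias, Monitors, Results, Timeout) ->
    receive
        {Pid, Result} when is_pid(Pid) ->
            case lists:keytake(Pid, 1, Monitors) of
                false ->
                    yield_results(Alias, Monitors, Results, Timeout);
                {value, {Pid, Ref}, Rest} ->
                    demonitor(Ref, [flush]),
                    yield_results(Alias, Rest, [Result | Results], Timeout)
            end;
        {'DOWN', _Ref, process, Pid, _Reason} ->
            yield_results(Alias, lists:keydelete(Pid, 1, Monitors), Results, Timeout)
    after Timeout ->
        %% Late replies sent via the alias are dropped from here on.
        unalias(Alias),
        lists:foreach(
            fun({LatePid, LateRef}) ->
                demonitor(LateRef, [flush]),
                %% Remove a reply that was queued before the alias was deactivated.
                receive {LatePid, _Late} -> ok after 0 -> ok end
            end,
            Monitors),
        throw("Timeout!")
    end.
```

If the throw is caught, the mailbox should then be free of late results and 'DOWN' messages from this batch. Note that the timeout applies to each receive, as in the earlier versions, not to the batch as a whole.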

:hugs:


Look at the code again.
Not a pattern alone. But a pattern and a concrete value of Pid that’s expected to arrive on this step of the iteration.

Yes, it does matter to me. I don’t want to process messages from other processes. Why would I want to?

Still can’t find anything wrong with it. I used your initial code, with some modifications, to make this module:

-module(pdp).
-compile([export_all, nowarn_export_all]).

process_all_items(Items) ->
    Monitors = lists:map(fun process_single_item/1, Items),
    yield_results(Monitors, []).

process_single_item(Item) ->
    ParentPid = self(),
    spawn_monitor(fun() ->
        ParentPid ! {self(), process_single_item_logic(Item)}
    end).

yield_results([], Results) ->
    lists:reverse(Results);
yield_results([{Pid, MonitorRef} | Rest], Results) ->
    receive
        {Pid, Result} ->
            demonitor(MonitorRef, [flush]),
            yield_results(Rest, [Result | Results]);
        {'DOWN', MonitorRef, process, Pid, _Reason} ->
            yield_results(Rest, Results)
    after 5000 ->
        throw("Timeout!")
    end.

process_single_item_logic(Item) ->
    %% Simulate work by sleeping for Item milliseconds.
    timer:sleep(Item),
    {ok, Item}.

I simplified process_single_item/1 a little for the sake of brevity. I implemented process_single_item_logic to wait for the time given in Item (to simulate work being done), then return {ok, Item} (to show up in the results). Most significantly, I modified yield_results/2 to use selective receives with concrete values of a Pid that’s expected to arrive on this step of the iteration (your words) in the receive patterns.

Now, if I run it, like this for example…

1> pdp:process_all_items([1000, 0, 2000, 4000, 3000]).
[{ok,1000},{ok,0},{ok,2000},{ok,4000},{ok,3000}]

… it returns all results, in the order in which the items were put in (and in consequence, the order in which the processes were started), even though the processes took different times and thus answered in another order (second process immediately, first process after 1s, third process after 2s, fifth process after 3s, fourth process after 4s) than how they are received in the iteration.

And even if I insert messages like {SomePid, Something} and 'DOWN' messages beforehand, it still works:

2> self() ! {self(), foo}.                               
{<0.81.0>,foo}
3> monitor(process, spawn(fun() -> ok end)).
#Ref<0.3676790454.1990459394.21072>
4> pdp:process_all_items([1000, 0, 2000, 4000, 3000]).
[{ok,1000},{ok,0},{ok,2000},{ok,4000},{ok,3000}]
5> flush().
Shell got {<0.81.0>,foo}
Shell got {'DOWN',#Ref<0.3676790454.1990459394.21072>,process,<0.92.0>,normal}
ok

So, where is the problem, what else do you expect to happen? :thinking:


@patricio3661 You fundamentally misunderstand either how selective receive works, or variable scoping rules and/or pattern matching.

yield_results([{Pid, MonitorRef} | Rest], Results) ->

^ This line of code binds variables Pid, MonitorRef, Rest and Results for the entire scope of the function clause.

Next statement,

receive
      {MonitorRef, Result} ->

uses the already bound value of MonitorRef and searches the mailbox for messages that have the form of a 2-tuple where the first element is equal* to the value of MonitorRef. If such a message is already present in the mailbox, the receive clause binds a new variable Result to the value of the second element of the tuple. If it’s not found, the process is blocked until a message conforming to the pattern is received.
Note that Erlang does NOT shadow variable assignments, unlike, say, OCaml where the following construction is allowed:

let a = (1, 1) in
  match a with
  | (a, _) -> a;; (* a has been shadowed *)

Which evaluates to 1. In Erlang such construct can never match because it would imply 1 = (1, 1).

In other words, while most FPLs treat variables by rules of lambda-calculus, Erlang treats them more like a system of equations.
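The same point in Erlang itself, as a small sketch (the module and function names are made up for illustration). In case_demo/0 the already bound A inside the pattern is compared rather than rebound, so the first clause cannot match. In receive_demo/0 the bound Pid makes the receive selective: the message tagged with another pid is skipped and stays in the mailbox.

```erlang
-module(bound_match).
-export([case_demo/0, receive_demo/0]).

case_demo() ->
    A = {1, 1},
    case A of
        %% A is bound: this asks for a 2-tuple whose first element equals {1, 1}.
        {A, _} -> matched;
        _ -> no_match
    end.

receive_demo() ->
    Pid = self(),
    Other = spawn(fun() -> ok end),
    self() ! {Other, first},
    self() ! {Pid, second},
    receive
        %% Pid is bound: only a 2-tuple starting with our own pid is taken.
        {Pid, Msg} -> Msg
    end.
```

Here case_demo() returns no_match and receive_demo() returns second, even though {Other, first} was sent earlier.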

* There’s a subtle difference between matching terms and comparing them equal, but let’s not over-complicate matters here.

This should be Pid, not MonitorRef, because the spawned processes send {self(), ...}. Otherwise, I agree :wink:

Well, let’s say that:

  1. You don’t process them, you discard them.
  2. In general, if messages arrive at your process and you don’t care about them, it’s reasonable to ignore them.
  3. For messages like that (of the form {Pid, Thing}) to reach your process, some other process had to send them… and for that, since your process isn’t registered (that is, it has no name), the other process has to know your process’s Pid. While that’s not impossible, considering that you are the owner (or author) of the code running on your node… it is, let’s say, very unlikely that some other process will wildly send you a message like that, right? And if it does… and you ignore it… tough luck. Let it crash!

In any case, given that you didn’t say otherwise, I take it that I finally understood your original problem. That being so, I think the best (fastest) way to solve it would be to talk it over live. If you’re interested, look for me on the Erlang Slack or the EEF one and we’ll continue the conversation there.

For the English-speaking folks, let me rephrase @patricio3661’s problem the best that I can: What he needs is a way for yield_results/2 to avoid imposing an order on the reception of the messages. He does want to process only the results from the processes he’s monitoring (i.e. Monitors) and not others… but he doesn’t need to receive them in order.

Thanks for the gist and keeping us in the loop @elbrujohalcon :slight_smile:

So while now we know the actual problem, I don’t think that it makes much of a difference whether you receive in Monitor order (which is easy) or in as-they-come-in order (which is more complicated). (There is a difference, more on that later). Nevertheless, no matter whether he receives the messages in Monitor order or as-they-come-in, yield_results will still take the same time, roughly that of the longest running of the spawned processes.


The difference I mentioned is in the timeout behavior.

Using @juhlig’s example code (i.e., receiving in Monitor order) from above, a call such as pdp:process_all_items([4000, 6000]) will succeed after 6s and return [{ok, 4000}, {ok, 6000}], whereas pdp:process_all_items([6000, 4000]) will time out after 5s. A call like pdp:process_all_items([4000, 10000]) will time out after 9s and pdp:process_all_items([10000, 4000]) will time out after 5s.

Using a receiving as they come approach, pdp:process_all_items([6000, 4000]) will always succeed after 6s no matter what the argument order is, and may return either [{ok, 6000}, {ok, 4000}] or [{ok, 4000}, {ok, 6000}]. A call like pdp:process_all_items([4000, 10000]) will always time out after 9s, no matter what the argument order is.


It will always return [{ok, 4000}, {ok, 6000}] :wink:

Indeed, my bad :sweat_smile:

If you want to receive in any order but only from the same set of processes started for that batch, you could generate a batch reference with make_ref() and pass that as the monitor tag to erlang:monitor/3. Then you can match on the batch reference instead of one of the PIDs.
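A sketch of how the batch reference idea might look, with some assumptions on top of the suggestion: it needs OTP 24 or later for the {tag, Term} monitor option, the workers are assumed to include the batch reference in their reply as {Batch, Pid, Result}, and the module name pdp_batch and sleep-based work/1 are made up. With the tag set, the monitor message arrives as {Batch, MonitorRef, process, Pid, Reason} instead of a 'DOWN' tuple, so both receive patterns match on the batch reference only.

```erlang
-module(pdp_batch).
-export([process_all_items/1]).

%% Hypothetical stand-in for the real work.
work(Item) ->
    timer:sleep(Item),
    {ok, Item}.

process_all_items(Items) ->
    Parent = self(),
    Batch = make_ref(),
    Pids = [begin
                Pid = spawn(fun() -> Parent ! {Batch, self(), work(Item)} end),
                %% The tag replaces 'DOWN' in the monitor message for this monitor.
                _Ref = erlang:monitor(process, Pid, [{tag, Batch}]),
                Pid
            end || Item <- Items],
    Results = yield_results(Batch, length(Pids), #{}),
    %% Return results in input order; workers that died without replying are skipped.
    [Result || Pid <- Pids, {ok, Result} <- [maps:find(Pid, Results)]].

yield_results(_Batch, 0, Results) ->
    Results;
yield_results(Batch, Remaining, Results) ->
    receive
        %% Reply from a worker of this batch, in whatever order it arrives.
        {Batch, Pid, Result} when is_pid(Pid) ->
            yield_results(Batch, Remaining, Results#{Pid => Result});
        %% Tagged monitor message: one worker of this batch has terminated.
        {Batch, _MonitorRef, process, _Pid, _Reason} ->
            yield_results(Batch, Remaining - 1, Results)
    after 5000 ->
        throw("Timeout!")
    end.
```

Because a worker's reply is sent before it exits, its reply is received before its monitor message, so counting the monitor messages is enough to know when the batch is complete. Unrelated messages never match either pattern and stay in the mailbox.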


Because you’re trying to explain to yourself something you already know, and so you keep missing what was asked about in the initial question.

That’s it then. Bingo.

How should I (or anybody for that matter) know what you know or don’t know or understood or misunderstood? :thinking:

You got roughly the same answer from all the people who tried to help, including me, in varying degrees of detail. If none of them answered your question, it might be because your question wasn’t all that clear to begin with, and should have been rephrased or refined?

Just saying.
