How to handle only certain PIDs in receive?

This is draft code for asynchronous data processing:

process_all_items(Items) ->
    Monitors = lists:map(fun process_single_item/1, Items),
    _MonitoredPIDs = lists:map(fun({Pid, _MonitorRef}) -> Pid end, Monitors),
    yield_results(Monitors, []).

process_single_item(Item) ->
    ParentPid = self(),
    spawn_monitor(fun() ->
        Result = case process_single_item_logic(Item) of
            {ok, Result} -> {<data ok, something>};
            {error, Reason} -> {<error, something2>};
        end,

        ParentPid ! {self(), Result}
    end).


yield_results([], Results) ->
    lists:reverse(Results);

    
% (!)
yield_results([MonitorRef | Rest], Results) ->
    receive
        % (1) (?) How to handle only my PIDs?
        {_Pid, Result} ->
            yield_results(Rest, [Result | Results]);

        % (2) (?) How to handle only my PIDs?
        {'DOWN', _ActualMonitorRef, process, _Pid, _Reason} ->
            yield_results(Rest, Results)
    after 5000 ->
        % (3) (?) How to throw only for my PIDs?
        throw("Timeout!")
    end.

How will I ensure that in yield_results(..) ONLY the messages with the certain, previously spawned PIDs, are processed? The ones contained in Monitors

Good question! spawn_monitor returns a {pid(), reference()} tuple, see the docs.

So modify your receive to pattern match:

yield_results([{MonitorPid, _Ref} | Rest], Results) ->
    receive
      {MonitorPid, Result} ->

I know what it returns. The question isn’t about it.

It seems you might have only read half of my answer. Unless I am mistaken, you have the pids inside of the MonitorRefs and can use those in the receive head.

@mpope is right. I’ll explain line by line.

Since process_single_item/1 returns the result of spawn_monitor/1, which is {Pid, MonitorRef}, Monitors is a list of {Pid, MonitorRef} tuples.

The line that builds _MonitoredPIDs is pointless: its result is never used.

In the call yield_results(Monitors, []), what gets passed as the first argument to yield_results/2 is Monitors, that is, the list of {Pid, MonitorRef} tuples from the spawn_monitor/1 calls in process_single_item/1.

The head of the second yield_results/2 clause should therefore read yield_results([{Pid, MonitorRef} | Rest], Results) ->, for the reason given above.

The receive clauses should read…

        {Pid, Result} ->
            demonitor(MonitorRef, [flush]),
            yield_results(Rest, [Result | Results]);

        {'DOWN', MonitorRef, process, Pid, _Reason} ->
            yield_results(Rest, Results)

… which also answers the question about timing out only for messages from your Pids, since the selective receive only selects result or 'DOWN' messages originating from your Pids.
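Putting the corrections above together, a minimal runnable sketch could look like the following. The module name collect_demo and the helper work/1 are hypothetical; work/1 only stands in for process_single_item_logic/1 from the question, and the timeout raises error(timeout) rather than throwing a string.

```erlang
-module(collect_demo).
-export([process_all_items/1]).

%% Hypothetical stand-in for process_single_item_logic/1 from the question.
work(Item) when is_integer(Item) -> {ok, Item * 2};
work(Item) -> {error, {not_an_integer, Item}}.

process_all_items(Items) ->
    Parent = self(),
    %% spawn_monitor/1 returns {Pid, MonitorRef}; keep both for each worker.
    Monitors = [spawn_monitor(fun() -> Parent ! {self(), work(Item)} end)
                || Item <- Items],
    yield_results(Monitors, []).

yield_results([], Results) ->
    lists:reverse(Results);
yield_results([{Pid, MonitorRef} | Rest], Results) ->
    receive
        %% Pid is already bound, so only a reply tagged with this Pid matches.
        {Pid, Result} ->
            %% Drop the monitor and any 'DOWN' message already queued for it.
            demonitor(MonitorRef, [flush]),
            yield_results(Rest, [Result | Results]);
        %% The worker died before replying; skip it.
        {'DOWN', MonitorRef, process, Pid, _Reason} ->
            yield_results(Rest, Results)
    after 5000 ->
        error(timeout)
    end.
```

Because each step waits for one specific Pid, the results come back in spawn order, regardless of the order in which the workers finish.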


The original question hasn’t been answered. It wasn’t about asking for the code.

I’ll gladly be the third: mpope’s answer is an answer to your question. It uses the fact that Erlang’s receive traverses the whole mailbox to find matching patterns. Thus, by using the value that you pass, as it is the result of spawn_monitor, you will only collect results that are shaped {Pid, ...} (though any process could have generated those).

If you really really insist on matching the PIDs, you can use a map (or a v2 set) of PIDs instead of the explicit monitors (which weren’t used correctly anyway as you never matched the MonitorRef, which in turn wasn’t an actual erlang:monitor result but a tuple):

yield_results(Pids, Results) when map_size(Pids) =:= 0 ->
    lists:reverse(Results);
yield_results(Pids, Results) ->
    receive
        {Pid, Result} when is_map_key(Pid, Pids) ->
            yield_results(Pids, [Result | Results]);
        {'DOWN', _MonitorRef, process, Pid, _Reason} ->
            yield_results(maps:remove(Pid, Pids), Results)
    after 5000 -> ...
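
For reference, here is a self-contained sketch of the same map-based idea. It is only one possible way to fill in the details: the module name map_collect, the functions collect/1 and wait/2, and the choice to map each Pid to its monitor reference are assumptions made for illustration, not part of the snippet above.

```erlang
-module(map_collect).
-export([collect/1]).

%% Runs each zero-arity fun in its own monitored process and returns the
%% results in arrival order (not spawn order).
collect(Funs) ->
    Parent = self(),
    %% spawn_monitor/1 returns {Pid, MonitorRef}, so the list of results
    %% converts directly into a map Pid => MonitorRef.
    Pending = maps:from_list(
        [spawn_monitor(fun() -> Parent ! {self(), F()} end) || F <- Funs]),
    wait(Pending, []).

wait(Pending, Acc) when map_size(Pending) =:= 0 ->
    lists:reverse(Acc);
wait(Pending, Acc) ->
    receive
        %% Accept a reply from any worker that is still pending.
        {Pid, Result} when is_map_key(Pid, Pending) ->
            {MonitorRef, Rest} = maps:take(Pid, Pending),
            demonitor(MonitorRef, [flush]),
            wait(Rest, [Result | Acc]);
        %% Accept a 'DOWN' only if it belongs to one of our monitors.
        {'DOWN', MonitorRef, process, Pid, _Reason}
          when map_get(Pid, Pending) =:= MonitorRef ->
            wait(maps:remove(Pid, Pending), Acc)
    after 5000 ->
        error(timeout)
    end.
```

Messages that match neither clause stay in the mailbox untouched, and the after clause fires only when none of the pending workers has produced a matching message for 5 seconds.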

A much easier solution to issues like this is to generate a ref in the spawning function and match on that instead, so something like:

process_all_items(Items) ->
    Ref = make_ref(),
    Monitors = [process_single_item(Ref, Item) || Item <- Items],
    yield_results(Monitors, Ref, []).

process_single_item(Ref, Item) ->
    ParentPid = self(),
    {_Pid, MonitorRef} = spawn_monitor(fun() ->
        Result = case process_single_item_logic(Item) of
            {ok, Result} -> {<data ok, something>};
            {error, Reason} -> {<error, something2>};
        end,

        ParentPid ! {Ref, Result}
    end),
    MonitorRef.

yield_results([], _Ref, Results) ->
    lists:reverse(Results);
    
% (!)
yield_results([MonitorRef | Rest] = Monitors, Ref, Results) ->
    receive
        {Ref, Result} ->
            yield_results(Monitors, Ref, [Result | Results]);

        {'DOWN', MonitorRef, process, _Pid, _Reason} ->
            yield_results(Rest, Ref, Results)
    after 5000 ->
        error(timeout)
    end.

I still think that @mpope’s answer is the best one, but I guess this is what you’re looking for, @patricio3661 …

yield_results([{MonitoredPid, _} = MonitorRef | Rest], Results) ->
    receive
        {MonitoredPid, Result} ->
            yield_results(Rest, [Result | Results]);

        {'DOWN', _ActualMonitorRef, process, MonitoredPid, _Reason} ->
            yield_results(Rest, Results);

        _ -> %% This consumes and discards all unwanted messages
            yield_results([MonitorRef | Rest], Results)
    after 5000 ->
        % (3) (?) How to throw only for my PIDs?
        throw("Timeout!")
    end.

In other words… use pattern-matching in the receive clauses with the PIDs you’re looking for.

I left question (3) in place because I don’t really know what you want there. It could be…

  • How to throw only if I don’t get a message from my PIDs after 5 seconds? (i.e., I don’t care about messages from other PIDs). In that case, just remove my last receive clause (the one that ignores messages).
  • How to throw after 5 seconds of complete inactivity? (i.e. it spent 5 seconds without receiving any message, from your PIDs or any other) In that case, the code I wrote above works.
  • How to throw after 5 seconds of inactivity but only if all the PIDs are dead already? (i.e., you only want to throw a timeout if you waited for a message that will never arrive). Since you’re monitoring all your PIDs, you can be 100% certain that you’ll get a 'DOWN' message for each of them. Therefore, if you want to wait until all the PIDs are done, just remove the after clause.

It’s strange that everyone claims that someone’s answer is the best one. For whom is it, though?

And does the author of the question think so as well? I don’t know. Maybe somebody should ask him directly?

You have been given two solutions; as I remember them:

  1. Loop over a list of pids and use selective receive matching on one Pid at a time.
  2. Store the pids as keys in a map and use selective receive on any of them in parallel through the is_map_key/2 guard.

Both have their pros and cons. Does either of them work for you, and if not, why not?

How will I ensure that in yield_results(..) ONLY the messages with the certain, previously spawned PIDs, are processed? The ones contained in Monitors

Normal Erlang messages do not contain any metadata, not even sender’s pid, if that is what you mean. In general, it’s impossible to know the origin of an arbitrary message, and for a good reason too: the message can originate from a port, C node, NIF running on a dirty scheduler or other places that may not even have a pid. If you need to process messages only from certain senders and ignore or discard the others, it’s your job to design a protocol that tags messages with the appropriate metadata (sender’s pid, monitor ref or whatever).

You were given plenty of examples of such protocols. I don’t know what magical alternative solution you expect, because there is none. Check OTP’s own sources if you don’t believe me: every gen:call message is tagged with a monitor reference for request/reply matching.
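
To illustrate the tagging idea, here is a rough sketch of a request/reply exchange that uses a monitor reference as the tag. This is not OTP's actual code: the module tagged_call, the echo server, and the {request, From, Tag, Msg} message shape are all invented for illustration.

```erlang
-module(tagged_call).
-export([start_echo/0, call/2]).

%% Hypothetical server: answers {request, From, Tag, Msg} with {Tag, {echo, Msg}}.
start_echo() ->
    spawn(fun echo_loop/0).

echo_loop() ->
    receive
        {request, From, Tag, Msg} ->
            From ! {Tag, {echo, Msg}},
            echo_loop()
    end.

call(Server, Msg) ->
    %% The monitor reference is unique, so it doubles as the request tag.
    Tag = monitor(process, Server),
    Server ! {request, self(), Tag, Msg},
    receive
        %% Only the reply carrying our tag matches; other messages stay queued.
        {Tag, Reply} ->
            demonitor(Tag, [flush]),
            {ok, Reply};
        %% The server died (or never existed) before replying.
        {'DOWN', Tag, process, Server, Reason} ->
            {error, Reason}
    after 5000 ->
        demonitor(Tag, [flush]),
        {error, timeout}
    end.
```

The caller never needs to know who sent a message; it only needs to recognise the tag it handed out itself.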


Do you think any of these combined form an answer? If not, which part of your original question do you feel that they have not addressed?

Admittedly, beyond being terse, my answer didn’t explain itself well at all. But the machinery behind it has since been kindly addressed by others.

Then sorry, I don’t get it, please restate your question.

If it is about receiving only messages originating from a specific sender, then @ieQu1’s answer is the best one :grin::grin::grin: *runs*

But seriously, everybody including myself was just trying to be helpful by suggesting how you can make your example use case work despite the fact that @ieQu1 explained.


For whom is it, though?

In my case, at least, I said so because I was thinking of all the other members of the community arriving at this thread with doubts similar to the ones you expressed in your original question. I didn’t want them to be confused and think that my answer was better than @mpope’s when, in fact, it was just an alternative solution, considering that you didn’t like @mpope’s one :wink:


Regarding my initial question: I’ve understood it. However, there’s an issue in my code:

it assumes that yield_results/2 will receive the results in the same order in which the tasks/jobs were spawned earlier.

yield_results([{Pid, MonitorRef} | Rest], Results) ->
  % ..... 

The head of the list is one specific Pid each time.

But there’s no guarantee. And why would there be? A list of tasks/jobs with side effects (process_single_item_logic in this case) can finish in any order, whereas yield_results/2 expects the Pid-s to arrive in a fixed order.

Why would it work properly then? Why has no one pointed this out in the beginning?

Because…

… it doesn’t. If you do a selective receive (which you do), you will get the next message that matches one of the receive patterns, no matter if it is the next message in the queue or a later one. For illustration:

1> Parent = self().
<0.81.0>
2> Pids = [spawn(fun() -> timer:sleep(T*1000), Parent ! self() end) || T <- lists:seq(4, 0, -1)].
[<0.84.0>,<0.85.0>,<0.86.0>,<0.87.0>,<0.88.0>]
3> [receive Pid -> {message, Pid} end || Pid <- Pids].
[{message,<0.84.0>},
 {message,<0.85.0>},
 {message,<0.86.0>},
 {message,<0.87.0>},
 {message,<0.88.0>}]

This works despite the fact that the messages did arrive in reverse order from the order in which the processes were spawned.

It may be just me, but I find it strange that you complain about missing a comment on an issue in your code (which isn’t an issue) when all the time you were insisting that the question wasn’t about the code and that the original question wasn’t answered :man_shrugging:


Look, let’s try to approach it differently. Given your code (or a revised version of it), what is your input, what do you expect as output, and what do you actually get instead?

I didn’t ask about the pattern in receive, but about the head of the list of Pid-s: each iteration expects one specific Pid.

yield_results([{Pid, MonitorRef} | Rest], Results) ->

which can’t be effective, because other messages, with other Pid-s from the list, may have arrived in the mailbox first, whereas receive keeps waiting for the one with the specific Pid for this step of the iteration.

Yes, so? You will get it if any message in the mailbox, anywhere in the mailbox, matches one of the receive patterns. If there is no matching message in the mailbox, receive will wait until one arrives, or the timeout expires.

1> self() ! a.
a
2> self() ! b.
b
3> self() ! c.
c

You now have 3 messages, a, b and c in your mailbox, in that order.

4> receive b -> b end.
b

You got the first message matching the receive pattern b from the mailbox, even though it was not at the head of the mailbox. a and c remain in the mailbox.
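
The same experiment can be packaged as a small module that also inspects what is left in the mailbox afterwards. This is just a sketch: the module name mailbox_demo and the function run/0 are made up for illustration, and the work happens in a fresh process so that its mailbox starts out empty.

```erlang
-module(mailbox_demo).
-export([run/0]).

run() ->
    Parent = self(),
    Ref = make_ref(),
    spawn(fun() ->
        %% Queue three messages in this order: a, b, c.
        self() ! a,
        self() ! b,
        self() ! c,
        %% Selective receive: skips a, takes b, leaves the rest alone.
        Got = receive b -> b end,
        %% Look at what is still waiting in the mailbox.
        {messages, Left} = process_info(self(), messages),
        Parent ! {Ref, Got, Left}
    end),
    receive
        {Ref, Got0, Left0} -> {Got0, Left0}
    after 1000 ->
        error(timeout)
    end.
```

Calling mailbox_demo:run() returns {b, [a, c]}: the receive picked b out of the middle of the queue, and a and c are still there in their original order.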
