Iterator - lazy sequences simulating `lists` API (for processing data that does not fit to RAM)

This library allows you to work with “too large to fit to RAM” or infinity-long data the same way as you would work with lists using lists module.

Let’s say you have a 100Gb text file that has many lines which may or may not be numbers and we want to calculate a total sum of numbers (why not?).

foo
1234
321
barbar
baz
543

If this file was just 1Mb, you would just read it whole into memory, split it by \n and process the resulting list using lists:filter and lists:sum or lists:foldl:

% get lines
{ok, Bin} = file:read_file("1mb_file.txt"),
Lines = binary:split(B, <<"\n">>, [global]),
MatchingLines = lists:filter(
  fun(Line) ->
    % does the line contain number?
    case re:run(Line, "^[0-9]+$") of
            nomatch ->
                false;
            {match, _} ->
                true
     end
 end, LinesIterator),
% convert to integers
Integers = lists:map(fun erlang:binary_to_integer/1, MatchingLines),
% sum integers
Sum = lists:sum(Integers).

However what are we going to do if the file does not fit into RAM? Then we would have to read the file line-by-line and write complex, nested, non-reusable, manual tail-recursive functions to get the same result. Smth like

process(F, Sum) ->
  case file:read_line(F) of
    {ok, Line} ->
      case re:run(Line, "^[0-9]+$") of
            nomatch ->
                process(F, Sum);
            {match, _} ->
                process(F, Sum + binary_to_integer(Line))
     end;
    eof ->
      Sum
  end.

iterator.erl lets you use the same high-order functions as in lists module (or your own) without reading the whole input at once:

LinesIterator = file_line_iterator("100GB_file.txt"), % implementation see in repo README
MatchingIterator =
    iterator:filter(
        fun(Line) ->
            case re:run(Line, "^[0-9]+$") of
                nomatch ->
                    false;
                {match, _} ->
                    true
            end
        end, LinesIterator).
IntegerIterator = iterator:map(fun erlang:binary_to_integer/1, MatchingIterator).
Sum = iterator:fold(fun erlang:'+'/2, 0, IntegerIterator).

The code looks identical to the first example (uses iterator: module in place of lists:), but it never reads more than one line at a time into memory!

Library also includes iterator_pmap: parallel version of lists:map that processes the input iterator elements on a pool of worker processes and maps the results to output iterator (there are ordered and unordered versions):

1> I = iterator:from_list(lists:seq(1, 100)).
...
2> I1 = iterator_pmap:pmap(fun(T) -> timer:sleep(T), T end, I, #{ordered => false}).
...
3> timer:tc(fun() -> iterator:to_list(I1) end).
{559483, [1,2,3,4,5|...]}

so it takes just 559ms to process the whole list on 10 (configurable) workers instead of 5s

4> timer:tc(fun() -> lists:map(fun(T) -> timer:sleep(T), T end, lists:seq(1, 100)) end).
{5151070, [1,2,3,4,5|...]}

We use it at Klarna to run a batch maintenance jobs or to iterate through database tables mostly.

It is available on hex

Hex docs

https://hexdocs.pm/iterator/

And github

7 Likes

Reminds me of… Lazy - fun project exploring lazy sequences in Erlang :sweat_smile:

The API of lazy is generally more verbose than yours, but I think it is a better solution towards lazyness in general. Which may mean it is over-engineered, too, since what do you want to really use lazy stuff for other than reading/processing potentially huge files or databases, which is what your lib does? :heart:

4 Likes

Yes, I remember I saw lazy and was inspired by it. But there were some parts that I didn’t like but I don’t remember what exactly it was :smile: Maybe because sequence was represented by just a fun with closure vs explicit state. Or maybe I wanted to emulate lists API as close as possible…
But true, they are very similar!

4 Likes

A few updates since the last announce. After some time of using the library for practical tasks to do long-running (hours or even days) maintenance jobs a couple more features were introduced:

Progress reporting

If we have an input that is several millons long and it takes a few days to process, it’s nice to get some progress reporting. I was inspired by UNIX pv tool that is inserted somewhere in the middle of the shell pipeline and it reports the flow of data through it. So iterator:pv/3 or alias iterator:progress/3 were added:

I0 = ...,
I1 = iterator:pv(
  fun(SampleElement, TimePassed, ItemsPassed, TotalItems) ->
    TimeS = erlang:convert_time_unit(TimePassed, native, second),
    ?LOG_INFO("Processed ~p items. Pace is ~p per-second. Current item: ~p",
              [TotalItems, ItemsPassed / TimeS, SampleElement]) % beware div by zero
  end,
  #{for_each_n => 1000,
    every_s => 30},
  I0),
...

This example will log the current progress either every 30 seconds or after processing every 1000 elements (whichever triggers first).

Pace throttling

Sometimes we don’t want to overload the database or CPU or another resource by processing data too quick. Then we can explicitly slow-down the processing. Two new rate-limiting pass-through iterators are added: iterator_rate:leaky_bucket/2 (never exceeds requested rate) and iterator_rate:token_bucket/2 (allows short bursts, but guarantees amortized rate).

This example will print 30 times per second (rate). However if producer or consumer would somehow pause for 2 seconds and then resumes, it will allow a short burst of 60 items (capacity):

I0 = iterator:from_list(lists:seq(1, 1000)),
I1 = iterator_rate:token_bucket(
  #{
    rate => 30,
    capacity => 60,
    window_ms => 1000
   },
   I0),
iterator:foreach(fun(I) ->
  io:format("~p: ~p~n", [calendar:local_time(), I])
end, I1).
3 Likes