module Concurrent::Stream
Overview
Influenced by Ruby's parallel gem and Java streams.
Creating a stream:
- Channel#parallel creates a Stream::Source reading from the Channel.
- Enumerable#parallel creates a Channel and Stream::Source reading from it.
Stream operations:
- #map { } - Same as Enumerable#map but runs in a fiber pool.
- #select { } - Same as Enumerable#select but runs in a fiber pool.
- #batch(size) { } - Groups results into chunks of up to the given size.
- #run { } - Runs the block in a fiber pool. Further processing is not possible except for #wait.
- #tee { } - Runs the block in a fiber pool, passing the original message to the next Stream.
- #serial - Returns an Enumerable collecting results from a parallel Stream.
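The operations above can be chained into a pipeline. The following is a minimal sketch, not a definitive usage guide: the require path is an assumption about how the shard is laid out, and only the methods listed in this overview are used. Because stages run in a fiber pool, the order of results should not be relied on, so the example reduces with an order-independent sum.

```crystal
# Assumption: this require path provides Enumerable#parallel.
require "concurrent/enumerable"

# Enumerable#parallel creates a Channel and a Stream::Source reading from it.
total = (1..10).parallel
  .select(&.even?)     # filter stage, runs in a fiber pool
  .map { |n| n * 2 }   # transform stage, runs in a fiber pool
  .serial              # back to a regular Enumerable, ends parallel processing
  .sum

puts total
```

Ending the chain with #serial (or #to_a) is what collects the results and closes out the parallel stages.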
Final results and error handling
All method chains should end with #wait, #serial, or #to_a, all of which gather errors and end parallel processing. You may omit #wait when using #run for background tasks where completion is not guaranteed. In that case, make sure to catch all exceptions in the run block; otherwise the internal exception channel may fill, causing the entire pipeline to stop.
Error handling
Use #wait, #serial, or #to_a to receive errors, or rescue within any block. Better handling is a WIP.
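Both approaches can be sketched as follows. This is an illustration under stated assumptions: the require path is assumed, the -1 sentinel is hypothetical, and the second half assumes that errors gathered by #wait surface as a raised exception (the exact exception type is not specified here, so a bare rescue is used).

```crystal
# Assumption: this require path provides Enumerable#parallel.
require "concurrent/enumerable"

# Approach 1: rescue inside the block so the error never leaves the fiber.
results = (1..5).parallel.map { |n|
  begin
    raise ArgumentError.new("bad input") if n == 3
    n * 10
  rescue ex : ArgumentError
    -1 # hypothetical sentinel marking a failed item
  end
}.to_a

# Approach 2: let the block raise and receive the error at the end of the chain.
# Assumption: #wait re-raises errors gathered from the fiber pool.
begin
  (1..5).parallel.run { |n|
    raise "boom" if n == 3
  }.wait
rescue ex
  puts "pipeline failed: #{ex.message}"
end
```

Rescuing inside the block keeps the rest of the pipeline running for the remaining items, while rescuing around #wait treats any failure as a failure of the whole chain.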
EXPERIMENTAL
Defined in:
concurrent/stream.cr
Constant Summary
- Log = ::Log.for(self)