| Copyright | (c) Justin Le 2019 |
|---|---|
| License | BSD3 |
| Maintainer | [email protected] |
| Stability | experimental |
| Portability | non-portable |
| Safe Haskell | Safe-Inferred |
| Language | Haskell2010 |
Data.Conduino
Description
Base API for Pipe. See documentation for Pipe, .|, and runPipe
for information on usage.
A "prelude" of useful pipes can be found in Data.Conduino.Combinators.
Why a stream processing library?
A stream processing library is a way to build stream processors in a composable way: rather than defining your entire stream processing function as a single recursive loop with some global state, you think about each "stage" of the process and isolate each piece of state to its own segment. Each component can contain its own isolated state:
    >>> runPipePure $ sourceList [1..10] .| scan (+) 0 .| sinkList
    [1,3,6,10,15,21,28,36,45,55]
All of these components have internal "state":
- sourceList keeps track of "which" item in the list to yield next
- scan keeps track of the current running sum
- sinkList keeps track of all items that have been seen so far, as a list
They all work together without knowing any other component's internal state, so you can write your total streaming function without concerning yourself, at each stage, with the whole.
In addition, there are useful functions to "combine" stream processors:
- zipSink combines sinks in an "and" sort of way: it combines two sinks in parallel and finishes when all finish.
- altSink combines sinks in an "or" sort of way: it combines two sinks in parallel and finishes when any of them finishes.
- zipSource combines sources in parallel and collates their outputs.
Stream processing libraries are also useful for streaming composition of monadic effects (like IO or State).
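As an illustration of streaming over monadic effects, here is a minimal sketch of a sink that accumulates into an IORef in IO. It assumes the conduino package is available and uses only the signatures listed on this page plus lift from transformers; sumInto and sumToTen are hypothetical names chosen for the example.

```haskell
import Control.Monad.Trans.Class (lift)
import Data.Conduino (Pipe, awaitForever, runPipe, (.|))
import Data.Conduino.Combinators (sourceList)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- | Hypothetical sink: adds every input to a mutable counter in IO.
-- It never yields, and finishes with the upstream result when upstream does.
sumInto :: IORef Int -> Pipe Int o u IO u
sumInto ref = awaitForever $ \x -> lift (modifyIORef' ref (+ x))

-- | Run the whole pipeline in IO and read back the accumulated total.
sumToTen :: IO Int
sumToTen = do
  ref <- newIORef 0
  runPipe $ sourceList [1 .. 10] .| sumInto ref
  readIORef ref
```

Each stage still only knows about its own state; the effect lives in the underlying monad m, which is IO here.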
Synopsis
- data Pipe i o u m a
- (.|) :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m r
- runPipe :: Monad m => Pipe () Void u m a -> m a
- runPipePure :: Pipe () Void Void Identity a -> a
- awaitEither :: Pipe i o u m (Either u i)
- await :: Pipe i o u m (Maybe i)
- awaitWith :: (i -> Pipe i o u m u) -> Pipe i o u m u
- awaitSurely :: Pipe i o Void m i
- awaitForever :: (i -> Pipe i o u m a) -> Pipe i o u m u
- yield :: o -> Pipe i o u m ()
- yieldLazy :: o -> Pipe i o u m ()
- (&|) :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (v, r)
- (|.) :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m v
- fuseBoth :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (v, r)
- fuseUpstream :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m v
- fuseBothMaybe :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (Maybe v, r)
- squeezePipe :: Monad m => Pipe i o u m a -> m ([o], Either (i -> Pipe i o u m a) a)
- squeezePipeEither :: Monad m => Pipe i o u m a -> m ([o], Either (Either u i -> Pipe i o u m a) a)
- feedPipe :: Monad m => [i] -> Pipe i o u m a -> m ([o], Either (i -> Pipe i o u m a) ([i], a))
- feedPipeEither :: Monad m => [i] -> Pipe i o u m a -> m ([o], Either (Either u i -> Pipe i o u m a) ([i], a))
- mapInput :: (i -> j) -> Pipe j o u m a -> Pipe i o u m a
- mapOutput :: (p -> o) -> Pipe i p u m a -> Pipe i o u m a
- mapUpRes :: (u -> v) -> Pipe i o v m a -> Pipe i o u m a
- trimapPipe :: (i -> j) -> (p -> o) -> (u -> v) -> Pipe j p v m a -> Pipe i o u m a
- passthrough :: Monad m => Pipe i o u m a -> Pipe i (Maybe i, o) u m a
- hoistPipe :: (Monad m, Monad n) => (forall x. m x -> n x) -> Pipe i o u m a -> Pipe i o u n a
- feedbackPipe :: Monad m => Pipe x x u m a -> Pipe x x u m a
- feedbackPipeEither :: Monad m => Pipe (Either i o) o u m a -> Pipe i o u m a
- newtype ZipSource m a = ZipSource { getZipSource :: Pipe () a Void m () }
- unconsZipSource :: Monad m => ZipSource m a -> m (Maybe (Maybe a, ZipSource m a))
- zipSource :: Monad m => Pipe () (a -> b) u m () -> Pipe () a v m () -> Pipe () b w m ()
- newtype ZipSink i u m a = ZipSink { getZipSink :: Pipe i Void u m a }
- zipSink :: Monad m => Pipe i Void u m (a -> b) -> Pipe i Void u m a -> Pipe i Void u m b
- altSink :: Monad m => Pipe i Void u m a -> Pipe i Void u m a -> Pipe i Void u m a
- toListT :: Applicative m => Pipe () o u m () -> ListT m (Maybe o)
- fromListT :: Monad m => ListT m (Maybe o) -> Pipe i o u m ()
- pattern PipeList :: Monad m => ListT m (Maybe a) -> Pipe () a u m ()
- withSource :: Pipe () o u m () -> (Maybe (o, m r) -> m r) -> m r
- genSource :: (forall r. (Maybe (o, m r) -> m r) -> m r) -> Pipe i o u m ()
Documentation
data Pipe i o u m a
Similar to a conduit from the conduit package.
For a Pipe i o u m a, you have:
- i: Type of input stream (the things you can await)
- o: Type of output stream (the things you yield)
- u: Type of the result of the upstream pipe (outputted when the upstream pipe terminates)
- m: Underlying monad (the things you can lift)
- a: Result type when the pipe terminates (outputted when finished, with pure or return)
Some specializations:
- If i is (), the pipe is a source: it doesn't need anything to produce items. It will pump out items on its own, for pipes downstream to receive and process.
- If o is Void, the pipe is a sink: it will never yield anything downstream. It will consume items from things upstream, and produce a result (a) if and when it terminates.
- If u is Void, then the pipe's upstream is limitless, and never terminates. This means that you can use awaitSurely instead of await, to await a value that is guaranteed to come. You'll get an i instead of a Maybe i.
- If a is Void, then the pipe never terminates: it will keep on consuming and/or producing values forever. If this is a sink, it means that the sink will never terminate, and so runPipe will also never terminate. If it is a source, it means that if you chain something downstream with .|, that downstream pipe can use awaitSurely to guarantee something being passed down.
Applicative and Monadic sequencing of pipes chains by exhaustion.
    do pipeX
       pipeY
       pipeZ
is a pipe itself, that behaves like pipeX until it terminates, then
pipeY until it terminates, then pipeZ until it terminates. The
Monad instance allows you to choose "which pipe to behave like next"
based on the terminating result of a previous pipe.
    do x <- pipeX
       pipeBasedOn x
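The two forms of sequencing described above can be sketched as follows. This is only an illustration under the signatures listed on this page; twoPhases and scaleByFirst are hypothetical names, and sourceList and sinkList are assumed to come from Data.Conduino.Combinators.

```haskell
import Data.Conduino (Pipe, await, awaitForever, runPipePure, yield, (.|))
import Data.Conduino.Combinators (sinkList, sourceList)

-- | Behaves like the first source until it is exhausted, then like the second.
twoPhases :: Pipe i Int u m ()
twoPhases = do
  sourceList [1, 2, 3]
  sourceList [10, 20]

-- | Reads one input, then chooses how to behave based on it:
-- every later input is multiplied by that first value.
scaleByFirst :: Pipe Int Int u m ()
scaleByFirst = do
  first <- await
  case first of
    Nothing -> pure ()
    Just k  -> () <$ awaitForever (\x -> yield (k * x))

sequenced :: [Int]
sequenced = runPipePure $ twoPhases .| sinkList
-- [1,2,3,10,20]

scaled :: [Int]
scaled = runPipePure $ sourceList [2, 5, 7] .| scaleByFirst .| sinkList
-- [10,14]
```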
Usually you would use it by chaining together pipes with
.| and then running the result with
runPipe.
    runPipe $ someSource .| somePipe .| someOtherPipe .| someSink
See .| and runPipe for more information
on usage.
For a "prelude" of commonly used Pipes, see
Data.Conduino.Combinators.
(.|) :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m r infixr 2
The main operator for chaining pipes together. pipe1 .| pipe2 will
connect the output of pipe1 to the input of pipe2.
Running a pipe will draw from pipe2, and if pipe2 ever asks for
input (with await or something similar), it will block until pipe1
outputs something (or signals termination).
The structure of a full pipeline usually looks like:
    runPipe $ someSource .| somePipe .| someOtherPipe .| someSink
Where you route a source into a series of pipes, which eventually ends
up at a sink. runPipe will then produce the result of that sink.
runPipe :: Monad m => Pipe () Void u m a -> m a
Run a pipe that is both a source and a sink (an "effect") into the effect that it represents.
Usually you would construct this using something like:
    runPipe $ someSource .| somePipe .| someOtherPipe .| someSink
runPipe will produce the result of that final sink.
Some common errors you might receive:
- i is not (): If you give a pipe where the first parameter ("input") is not (), it means that your pipe is not a producer. Pre-compose it (using .|) with a producer of the type you need.
  For example, if you have a myPipe :: Pipe Int o u m a, this is a pipe that is awaiting Ints from upstream. Pre-compose with a producer of Ints, like sourceList [1,2,3] .| myPipe, in order to be able to run it.
- o is not Void: If you give a pipe where the second parameter ("output") is not Void, it means that your pipe is not a consumer. Post-compose it (using .|) with a consumer of the type you need.
  For example, if you have myPipe :: Pipe i Int u m a, this is a pipe that is yielding Ints downstream that are going unhandled. Post-compose it with a consumer of Ints, like myPipe .| foldl (+) 0, in order to be able to run it.
  If you just want to ignore all downstream yields, post-compose with sinkNull.
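Both fixes can be seen together in a small sketch. The pipe doubler is hypothetical and is neither a source nor a sink, so it cannot be run on its own. The combinators sourceList, foldl and sinkNull are assumed to be the ones from Data.Conduino.Combinators, imported qualified because foldl clashes with the Prelude.

```haskell
import Data.Conduino (Pipe, awaitForever, runPipePure, yield, (.|))
import qualified Data.Conduino.Combinators as C

-- | Awaits Ints and yields Ints, so both i and o need to be handled.
doubler :: Pipe Int Int u m u
doubler = awaitForever $ \x -> yield (2 * x)

-- | Pre-compose with a producer and post-compose with a consumer.
doubledSum :: Int
doubledSum = runPipePure $ C.sourceList [1, 2, 3] .| doubler .| C.foldl (+) 0
-- 12

-- | Post-compose with sinkNull to discard everything that is yielded.
ignored :: ()
ignored = runPipePure $ C.sourceList [1, 2, 3] .| doubler .| C.sinkNull
```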
Primitives
awaitEither :: Pipe i o u m (Either u i)
Await on upstream output. Will block until it receives an i
(expected input type) or a u if the upstream pipe terminates.
await :: Pipe i o u m (Maybe i)
Await input from upstream. Will block until upstream yields.
Will return Nothing if the upstream pipe finishes and terminates.
If the upstream pipe never terminates, then you can use awaitSurely to
guarantee a result.
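A hand-written pipe typically pattern matches on the Maybe returned by await to decide whether to continue. The following is a minimal sketch; pairSums is a hypothetical name, and sourceList and sinkList are assumed to come from Data.Conduino.Combinators.

```haskell
import Data.Conduino (Pipe, await, runPipePure, yield, (.|))
import Data.Conduino.Combinators (sinkList, sourceList)

-- | Sums inputs in adjacent pairs. A trailing unpaired input is passed
-- through unchanged, and the pipe stops once upstream has finished.
pairSums :: Pipe Int Int u m ()
pairSums = do
  mx <- await
  case mx of
    Nothing -> pure ()
    Just x -> do
      my <- await
      case my of
        Nothing -> yield x
        Just y  -> yield (x + y) >> pairSums

pairSummed :: [Int]
pairSummed = runPipePure $ sourceList [1, 2, 3, 4, 5] .| pairSums .| sinkList
-- [3,7,5]
```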
awaitSurely :: Pipe i o Void m i
Await input from upstream where the upstream pipe is guaranteed to never terminate.
A common type error will occur if u (upstream pipe result type) is not
Void -- it might be () or some non-Void type. This means that the
upstream pipe terminates, so awaiting cannot be assured.
In that case, either change your upstream pipe to be one that never
terminates (which is most likely not possible), or use await instead
of awaitSurely.
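For comparison, here is a sketch of a case where awaitSurely does type check: the upstream source has result type Void because it never terminates. The names naturals and firstTwo are hypothetical and exist only for this example.

```haskell
import Data.Conduino (Pipe, awaitSurely, runPipePure, yield, (.|))
import Data.Void (Void)

-- | An endless source: its result type is Void because it never finishes.
naturals :: Pipe i Int u m Void
naturals = go 0
  where
    go n = yield n >> go (n + 1)

-- | Upstream result type is Void, so each await is guaranteed an input
-- and no Maybe needs to be unwrapped.
firstTwo :: Pipe Int o Void m (Int, Int)
firstTwo = (,) <$> awaitSurely <*> awaitSurely

firstTwoNaturals :: (Int, Int)
firstTwoNaturals = runPipePure $ naturals .| firstTwo
-- (0,1)
```

The pipeline finishes even though naturals is infinite, because running a pipe draws from the downstream end and stops as soon as firstTwo terminates.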
awaitForever :: (i -> Pipe i o u m a) -> Pipe i o u m u
A useful utility function over repeated awaits. Will repeatedly
await and then continue with the given pipe whenever the upstream pipe
yields.
Can be used to implement many pipe combinators:
    map f = awaitForever $ \x -> yield (f x)
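In the same style, a filtering combinator can be sketched with awaitForever. The name keepIf is hypothetical (the combinators module may well provide its own filter), and sourceList and sinkList are assumed to come from Data.Conduino.Combinators.

```haskell
import Control.Monad (when)
import Data.Conduino (Pipe, awaitForever, runPipePure, yield, (.|))
import Data.Conduino.Combinators (sinkList, sourceList)

-- | Passes along only the inputs that satisfy the predicate.
keepIf :: (i -> Bool) -> Pipe i i u m u
keepIf p = awaitForever $ \x -> when (p x) (yield x)

evens :: [Int]
evens = runPipePure $ sourceList [1 .. 10] .| keepIf even .| sinkList
-- [2,4,6,8,10]
```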
yield :: o -> Pipe i o u m ()
Send output downstream.
Since v0.2.3.0, yield is strict in its argument. See yieldLazy for the original behavior.
yieldLazy :: o -> Pipe i o u m ()
Send output downstream without forcing its argument.
Since: 0.2.3.0
Special chaining
(&|) :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (v, r) infixr 2
Like .|, but get the result of both pipes on termination, instead
of just the second. This means that p &| q will only terminate with a result when
both p and q terminate. (Typically, p .| q would terminate as soon as
q terminates.)
Since: 0.2.1.0
(|.) :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m v infixr 2
Like .|, but keep the result of the first pipe, instead of the
second. This means that p |. q will only terminate with a result when
both p and q terminate. (Typically, p .| q would terminate as soon as
q terminates.)
Since: 0.2.1.0
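The difference between &| and |. shows up when the upstream pipe has an interesting result of its own. This is a minimal sketch with a hypothetical source labelled that finishes with a String; sourceList and sinkList are assumed to come from Data.Conduino.Combinators.

```haskell
import Data.Conduino (Pipe, runPipePure, (&|), (|.))
import Data.Conduino.Combinators (sinkList, sourceList)

-- | Yields three items and then terminates with a label as its result.
labelled :: Pipe i Int u m String
labelled = sourceList [1, 2, 3] >> pure "done"

-- | Keeps the results of both the upstream pipe and the sink.
bothResults :: (String, [Int])
bothResults = runPipePure $ labelled &| sinkList
-- ("done",[1,2,3])

-- | Keeps only the upstream result.
upstreamOnly :: String
upstreamOnly = runPipePure $ labelled |. sinkList
-- "done"
```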
fuseBoth :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (v, r)
Useful prefix version of &|.
Since: 0.2.1.0
fuseUpstream :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m v
Useful prefix version of |..
Since: 0.2.1.0
Incremental running
squeezePipeEither :: Monad m => Pipe i o u m a -> m ([o], Either (Either u i -> Pipe i o u m a) a)
Squeeze a pipe by extracting all output that can be extracted before
any input is requested. Returns a Left if the pipe eventually does
request input (as a continuation on the new input, or a terminating u
value), or a Right if the pipe terminates with a value before ever
asking for input.
Since: 0.2.1.0
feedPipe
Arguments
| :: Monad m | |
| => [i] | input to feed in |
| -> Pipe i o u m a | |
| -> m ([o], Either (i -> Pipe i o u m a) ([i], a)) | |
Repeatedly run squeezePipe by giving it items from an input list.
Returns the outputs observed, and Left if the input list was exhausted
with more input expected, or Right if the pipe terminated, with the
leftover inputs and output result.
Since: 0.2.1.0
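A small sketch of incremental running with feedPipe, based on the description above rather than on the library source. The pipe addTwo and the helper feedAddTwo are hypothetical; the helper drops the continuation in the Left case so that the outcome can be compared and printed.

```haskell
import Data.Conduino (Pipe, await, feedPipe, yield)
import Data.Functor.Identity (runIdentity)

-- | Awaits two inputs, yields their sum, and then terminates.
addTwo :: Pipe Int Int u m ()
addTwo = do
  mx <- await
  my <- await
  case (+) <$> mx <*> my of
    Just s  -> yield s
    Nothing -> pure ()

-- | Feeds a list into addTwo. Returns the outputs observed, together with
-- Nothing if the pipe is still waiting for input, or Just the leftover
-- inputs if it terminated.
feedAddTwo :: [Int] -> ([Int], Maybe [Int])
feedAddTwo xs = runIdentity $ do
  (outs, r) <- feedPipe xs addTwo
  pure (outs, either (const Nothing) (Just . fst) r)
```

Under this reading, feedAddTwo [1, 2, 3] should give ([3], Just [3]): the pipe consumes two inputs, emits their sum, and leaves the third input unused. feedAddTwo [1] should give ([], Nothing), since the pipe is still waiting for a second input.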
feedPipeEither
Arguments
| :: Monad m | |
| => [i] | input to feed in |
| -> Pipe i o u m a | |
| -> m ([o], Either (Either u i -> Pipe i o u m a) ([i], a)) | |
Repeatedly run squeezePipeEither by giving it items from an input
list. Returns the outputs observed, and Left if the input list was
exhausted with more input expected (or a u terminating upstream
value), or Right if the pipe terminated, with the leftover inputs and
output result.
Since: 0.2.1.0
Pipe transformers
mapInput :: (i -> j) -> Pipe j o u m a -> Pipe i o u m a
(Contravariantly) map over the expected input type.
mapOutput :: (p -> o) -> Pipe i p u m a -> Pipe i o u m a
Map over the downstream output type.
If you want to map over the result type, use fmap.
mapUpRes :: (u -> v) -> Pipe i o v m a -> Pipe i o u m a
(Contravariantly) map over the upstream result type.
trimapPipe :: (i -> j) -> (p -> o) -> (u -> v) -> Pipe j p v m a -> Pipe i o u m a
Map over the input type, output type, and upstream result type.
If you want to map over the result type, use fmap.
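As a quick sketch of the direction of each mapping: mapInput adapts what a pipe receives, while mapOutput adapts what it emits. The names scaledInput and shownOutput are hypothetical, and sourceList and sinkList are assumed to come from Data.Conduino.Combinators.

```haskell
import Data.Conduino (mapInput, mapOutput, runPipePure, (.|))
import Data.Conduino.Combinators (sinkList, sourceList)

-- | The sink sees each input only after it has been doubled.
scaledInput :: [Int]
scaledInput = runPipePure $ sourceList [1, 2, 3] .| mapInput (* 2) sinkList
-- [2,4,6]

-- | The source's Int outputs are rendered as Strings before going downstream.
shownOutput :: [String]
shownOutput = runPipePure $ mapOutput show (sourceList [1, 2, 3 :: Int]) .| sinkList
-- ["1","2","3"]
```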
hoistPipe :: (Monad m, Monad n) => (forall x. m x -> n x) -> Pipe i o u m a -> Pipe i o u n a
Transform the underlying monad of a pipe.
Note that if you are trying to work with monad transformers, this is probably not what you want. See Data.Conduino.Lift for tools for working with underlying monad transformers.
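One simple use is to take a pipeline written over Identity and run it in another monad. The following is a minimal sketch with hypothetical names purePipeline and runInIO; sourceList and sinkList are assumed to come from Data.Conduino.Combinators.

```haskell
import Data.Conduino (Pipe, hoistPipe, runPipe, (.|))
import Data.Conduino.Combinators (sinkList, sourceList)
import Data.Functor.Identity (Identity, runIdentity)
import Data.Void (Void)

-- | A complete pipeline whose underlying monad is Identity.
purePipeline :: Pipe () Void u Identity [Int]
purePipeline = sourceList [1, 2, 3] .| sinkList

-- | The same pipeline with Identity replaced by IO.
runInIO :: IO [Int]
runInIO = runPipe $ hoistPipe (pure . runIdentity) purePipeline
```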
feedbackPipe :: Monad m => Pipe x x u m a -> Pipe x x u m a
Loop a pipe into itself.
- Will feed all output back to the input
- Will only ask for input upstream if output is stalled.
- Yields all outputted values downstream, effectively duplicating them.
Since: 0.2.1.0
feedbackPipeEither :: Monad m => Pipe (Either i o) o u m a -> Pipe i o u m a
A version of feedbackPipe that distinguishes upstream input from
downstream output fed back. Gets Left from upstream, and Right from
its own output.
- Will feed all output back to the input
- Will only ask for input upstream if output is stalled.
- Yields all outputted values downstream, effectively duplicating them.
Since: 0.2.2.0
Wrappers
newtype ZipSource m a
A newtype wrapper over a source (Pipe () o Void) that gives it an
alternative Applicative and Alternative instance, matching "ListT
done right".
<*> will pair up each output that the sources produce: if you await
a value from downstream, it will wait until both paired sources yield
before passing them on together.
<|> will completely exhaust the first source before moving on to the
next source.
ZipSource is effectively equivalent to "ListT done right", the true
List Monad transformer. <|> is concatenation. You can use this type
with lift to lift a yielding action and <|> to sequence yields to
implement the pattern described in
http://www.haskellforall.com/2014/11/how-to-build-library-agnostic-streaming.html,
where you can write streaming producers in a polymorphic way, and have
it run with pipes, conduit, etc.
The main difference is that its Applicative instance ("zipping") is
different from the traditional Applicative instance for ListT
("all combinations"). Effectively this becomes like a "zipping"
Applicative instance for ListT.
If you want a Monad (or MonadIO) instance,
use ListT instead, and convert using toListT/fromListT or the
PipeList pattern/constructor.
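The two instances can be sketched side by side. This is an illustration only: zipped and appended are hypothetical names, the two zipped sources deliberately have the same length, and sourceList and sinkList are assumed to come from Data.Conduino.Combinators.

```haskell
import Control.Applicative ((<|>))
import Data.Conduino (ZipSource (..), runPipePure, (.|))
import Data.Conduino.Combinators (sinkList, sourceList)

-- | <*> pairs up the outputs of the two sources, one from each at a time.
zipped :: [(Int, Char)]
zipped = runPipePure $ getZipSource z .| sinkList
  where
    z = (,) <$> ZipSource (sourceList [1, 2, 3]) <*> ZipSource (sourceList "abc")
-- [(1,'a'),(2,'b'),(3,'c')]

-- | <|> exhausts the first source and then moves on to the second.
appended :: [Int]
appended = runPipePure $ getZipSource z .| sinkList
  where
    z = ZipSource (sourceList [1, 2]) <|> ZipSource (sourceList [3])
-- [1,2,3]
```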
Constructors
- ZipSource
Fields
- getZipSource :: Pipe () a Void m ()
Instances
- MonadTrans ZipSource
- Monad m => Alternative (ZipSource m)
- Monad m => Applicative (ZipSource m)
- Functor (ZipSource m)
unconsZipSource :: Monad m => ZipSource m a -> m (Maybe (Maybe a, ZipSource m a))
ZipSource is effectively ListT returning a Maybe. As such, you
can use unconsZipSource to "peel off" the first yielded item, if it
exists, and return the "rest of the list".
zipSource :: Monad m => Pipe () (a -> b) u m () -> Pipe () a v m () -> Pipe () b w m ()
Takes two sources and runs them in parallel, collating their outputs.
Since: 0.2.1.0
newtype ZipSink i u m a
A newtype wrapper over a sink (Pipe i Void) that gives it an
alternative Applicative and Alternative instance.
<*> will distribute input over both sinks, and output a final result
once both sinks finish.
<|> will distribute input over both sinks, and output a final result
as soon as one or the other finishes.
Constructors
- ZipSink
Fields
- getZipSink :: Pipe i Void u m a
Instances
- MonadTrans (ZipSink i u)
- Monad m => Alternative (ZipSink i u m)
- Monad m => Applicative (ZipSink i u m)
- Functor (ZipSink i u m)
zipSink :: Monad m => Pipe i Void u m (a -> b) -> Pipe i Void u m a -> Pipe i Void u m b
Distributes input to both sinks, and finishes with the final result once both finish.
Forms an identity with pure.
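For instance, two sinks can consume the same stream in a single pass. This is a minimal sketch with a hypothetical name sumAndItems; foldl, sinkList and sourceList are assumed to be the combinators from Data.Conduino.Combinators.

```haskell
import Data.Conduino (runPipePure, zipSink, (.|))
import qualified Data.Conduino.Combinators as C

-- | Computes the sum and collects the items from the same inputs.
-- The first sink is mapped to return a function, as zipSink expects.
sumAndItems :: (Int, [Int])
sumAndItems =
  runPipePure $
    C.sourceList [1, 2, 3] .| zipSink ((,) <$> C.foldl (+) 0) C.sinkList
-- (6,[1,2,3])
```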
altSink :: Monad m => Pipe i Void u m a -> Pipe i Void u m a -> Pipe i Void u m a
Distributes input to both sinks, and finishes with the result of the one that finishes first.
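A sketch of the "whichever finishes first" behavior, using one sink that stops after a single input and one that would run to the end of the stream. The name firstOrTotal is hypothetical, and foldl and sourceList are assumed to come from Data.Conduino.Combinators.

```haskell
import Data.Conduino (altSink, await, runPipePure, (.|))
import qualified Data.Conduino.Combinators as C

-- | The left sink finishes after its first input; the right sink would
-- only finish once upstream is exhausted. The left one wins.
firstOrTotal :: Maybe Int
firstOrTotal =
  runPipePure $
    C.sourceList [1, 2, 3] .| altSink await (Just <$> C.foldl (+) 0)
-- Just 1
```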