abstract class AbstractPushStreamImpl<T> extends java.lang.Object implements PushStream<T>
Modifier and Type | Class and Description |
---|---|
private static class |
AbstractPushStreamImpl.ArrayQueue<E> |
(package private) static class |
AbstractPushStreamImpl.State |
Modifier and Type | Field and Description |
---|---|
protected java.util.concurrent.atomic.AtomicReference<AbstractPushStreamImpl.State> |
closed |
private Function<T,T> |
IDENTITY |
protected java.util.concurrent.atomic.AtomicReference<PushEventConsumer<T>> |
next |
protected java.util.concurrent.atomic.AtomicReference<java.lang.Runnable> |
onCloseCallback |
protected java.util.concurrent.atomic.AtomicReference<java.util.function.Consumer<? super java.lang.Throwable>> |
onErrorCallback |
protected PromiseFactory |
promiseFactory |
protected PushStreamProvider |
psp |
Constructor and Description |
---|
AbstractPushStreamImpl(PushStreamProvider psp,
PromiseFactory promiseFactory) |
Modifier and Type | Method and Description |
---|---|
PushStream<T> |
adjustBackPressure(java.util.function.LongUnaryOperator adjustment)
Changes the back-pressure propagated by this pipeline stage.
|
PushStream<T> |
adjustBackPressure(java.util.function.ToLongBiFunction<T,java.lang.Long> adjustment)
Changes the back-pressure propagated by this pipeline stage.
|
private <R> void |
aggregateAndForward(java.util.function.BiFunction<java.lang.Long,java.util.Collection<T>,R> f,
AbstractPushStreamImpl<R> eventStream,
PushEvent<? extends T> event,
java.util.Queue<T> queue,
java.util.concurrent.Executor executor,
long elapsed) |
private <R> long |
aggregateAndForward(Function<java.util.Collection<T>,R> f,
AbstractPushStreamImpl<R> eventStream,
PushEvent<? extends T> event,
java.util.Queue<T> queue) |
Promise<java.lang.Boolean> |
allMatch(Predicate<? super T> predicate)
Closes the channel and resolve the promise with false when the predicate
does not matches a pay load.
|
Promise<java.lang.Boolean> |
anyMatch(Predicate<? super T> predicate)
Close the channel and resolve the promise with true when the predicate
matches a payload.
|
<R> PushStream<R> |
asyncMap(int n,
int delay,
Function<? super T,Promise<? extends R>> mapper)
Asynchronously map the payload values.
|
protected abstract boolean |
begin() |
PushStream<T> |
buffer()
Buffer the events in a queue using default values for the queue size and
other behaviors.
|
<U extends java.util.concurrent.BlockingQueue<PushEvent<? extends T>>> |
buildBuffer()
Build a buffer to enqueue events in a queue using custom values for the
queue size and other behaviors.
|
(package private) void |
check(java.util.concurrent.atomic.AtomicLong lastTime,
long timeout) |
void |
close()
Close this PushStream by sending an event of type
PushEvent.EventType.CLOSE downstream. |
protected boolean |
close(PushEvent<T> event) |
protected boolean |
close(PushEvent<T> event,
boolean sendDownStreamEvent) |
<R> PushStream<R> |
coalesce(Function<? super T,java.util.Optional<R>> accumulator)
Coalesces a number of events into a new type of event.
|
<R> PushStream<R> |
coalesce(int count,
Function<java.util.Collection<T>,R> f)
Coalesces a number of events into a new type of event.
|
<R> PushStream<R> |
coalesce(java.util.function.IntSupplier count,
Function<java.util.Collection<T>,R> f)
Coalesces a number of events into a new type of event.
|
<R,A> Promise<R> |
collect(java.util.stream.Collector<? super T,A,R> collector)
See Stream.
|
Promise<java.lang.Long> |
count()
See Stream.
|
PushStream<T> |
distinct()
Remove any duplicates.
|
PushStream<T> |
filter(Predicate<? super T> predicate)
Only pass events downstream when the predicate tests true.
|
Promise<java.util.Optional<T>> |
findAny()
Close the channel and resolve the promise with the first element.
|
Promise<java.util.Optional<T>> |
findFirst()
Close the channel and resolve the promise with the first element.
|
<R> PushStream<R> |
flatMap(Function<? super T,? extends PushStream<? extends R>> mapper)
Flat map the payload value (turn one event into 0..n events of
potentially another type).
|
Promise<java.lang.Void> |
forEach(java.util.function.Consumer<? super T> action)
Execute the action for each event received until the channel is closed.
|
Promise<java.lang.Long> |
forEachEvent(PushEventConsumer<? super T> action)
Pass on each event to another consumer until the stream is closed.
|
PushStream<T> |
fork(int n,
int delay,
java.util.concurrent.Executor ex)
Execute the downstream events in up to n background threads.
|
protected java.util.Queue<T> |
getQueueForInternalBuffering(int size) |
private <R> java.lang.Runnable |
getWindowTask(AbstractPushStreamImpl<R> eventStream,
java.util.function.BiFunction<java.lang.Long,java.util.Collection<T>,R> f,
java.util.function.Supplier<java.time.Duration> time,
java.util.function.IntSupplier maxEvents,
java.lang.Object lock,
long expectedCounter,
java.util.concurrent.atomic.AtomicReference<java.util.Queue<T>> queueRef,
java.util.concurrent.atomic.AtomicLong timestamp,
java.util.concurrent.atomic.AtomicLong counter,
java.util.concurrent.atomic.AtomicLong previousWindowSize,
java.util.concurrent.Executor executor) |
protected long |
handleEvent(PushEvent<? extends T> event) |
PushStream<T> |
limit(java.time.Duration maxTime)
Automatically close the channel after the given amount of time has
elapsed.
|
PushStream<T> |
limit(long maxSize)
Automatically close the channel after the maxSize number of elements is
received.
|
<R> PushStream<R> |
map(Function<? super T,? extends R> mapper)
Map a payload value.
|
Promise<java.util.Optional<T>> |
max(java.util.Comparator<? super T> comparator)
See Stream.
|
PushStream<T> |
merge(PushEventSource<? extends T> source)
Merge in the events from another source.
|
PushStream<T> |
merge(PushStream<? extends T> source)
Merge in the events from another PushStream.
|
Promise<java.util.Optional<T>> |
min(java.util.Comparator<? super T> comparator)
See Stream.
|
Promise<java.lang.Boolean> |
noneMatch(Predicate<? super T> predicate)
Closes the channel and resolve the promise with false when the predicate
matches any pay load.
|
PushStream<T> |
onClose(java.lang.Runnable closeHandler)
Must be run after the channel is closed.
|
PushStream<T> |
onError(java.util.function.Consumer<? super java.lang.Throwable> closeHandler)
Must be run after the channel is closed.
|
Promise<java.util.Optional<T>> |
reduce(java.util.function.BinaryOperator<T> accumulator)
Standard reduce without identity, so the return is an Optional.
|
Promise<T> |
reduce(T identity,
java.util.function.BinaryOperator<T> accumulator)
Standard reduce, see Stream.
|
<U> Promise<U> |
reduce(U identity,
java.util.function.BiFunction<U,? super T,U> accumulator,
java.util.function.BinaryOperator<U> combiner)
Standard reduce with identity, accumulator and combiner.
|
PushStream<T> |
sequential()
Ensure that any events are delivered sequentially.
|
PushStream<T> |
skip(long n)
Skip a number of events in the channel.
|
PushStream<T> |
sorted()
Sorted the elements, assuming that T extends Comparable.
|
PushStream<T> |
sorted(java.util.Comparator<? super T> comparator)
Sorted the elements with the given comparator.
|
PushStream<T>[] |
split(Predicate<? super T>... predicates)
Split the events to different streams based on a predicate.
|
PushStream<T> |
timeout(java.time.Duration maxTime)
Automatically fail the channel if no events are received for the
indicated length of time.
|
Promise<java.lang.Object[]> |
toArray()
Collect the payloads in an Object array after the channel is closed.
|
<A extends T> |
toArray(java.util.function.IntFunction<A[]> generator)
Collect the payloads in an Object array after the channel is closed.
|
private void |
updateNext(PushEventConsumer<T> consumer) |
protected abstract void |
upstreamClose(PushEvent<?> close) |
<R> PushStream<R> |
window(java.time.Duration time,
java.util.concurrent.Executor executor,
Function<java.util.Collection<T>,R> f)
Buffers a number of events over a fixed time interval and then forwards
the events to an accumulator function.
|
<R> PushStream<R> |
window(java.time.Duration time,
Function<java.util.Collection<T>,R> f)
Buffers a number of events over a fixed time interval and then forwards
the events to an accumulator function.
|
<R> PushStream<R> |
window(java.util.function.Supplier<java.time.Duration> time,
java.util.function.IntSupplier maxEvents,
java.util.function.BiFunction<java.lang.Long,java.util.Collection<T>,R> f)
Buffers a number of events over a variable time interval and then
forwards the events to an accumulator function.
|
<R> PushStream<R> |
window(java.util.function.Supplier<java.time.Duration> time,
java.util.function.IntSupplier maxEvents,
java.util.concurrent.Executor ex,
java.util.function.BiFunction<java.lang.Long,java.util.Collection<T>,R> f)
Buffers a number of events over a variable time interval and then
forwards the events to an accumulator function.
|
protected final PushStreamProvider psp
protected final PromiseFactory promiseFactory
protected final java.util.concurrent.atomic.AtomicReference<AbstractPushStreamImpl.State> closed
protected final java.util.concurrent.atomic.AtomicReference<PushEventConsumer<T>> next
protected final java.util.concurrent.atomic.AtomicReference<java.lang.Runnable> onCloseCallback
protected final java.util.concurrent.atomic.AtomicReference<java.util.function.Consumer<? super java.lang.Throwable>> onErrorCallback
AbstractPushStreamImpl(PushStreamProvider psp, PromiseFactory promiseFactory)
protected abstract boolean begin()
protected abstract void upstreamClose(PushEvent<?> close)
public void close()
PushStream
PushEvent.EventType.CLOSE
downstream. Closing a PushStream is a
safe operation that will not throw an Exception.
Calling close()
on a closed PushStream has no effect.
close
in interface java.lang.AutoCloseable
close
in interface PushStream<T>
public PushStream<T> onClose(java.lang.Runnable closeHandler)
PushStream
onClose
in interface PushStream<T>
closeHandler
- Will be called on closepublic PushStream<T> onError(java.util.function.Consumer<? super java.lang.Throwable> closeHandler)
PushStream
onError
in interface PushStream<T>
closeHandler
- Will be called on closeprivate void updateNext(PushEventConsumer<T> consumer)
public PushStream<T> filter(Predicate<? super T> predicate)
PushStream
filter
in interface PushStream<T>
predicate
- The predicate that is tested (not null)public <R> PushStream<R> map(Function<? super T,? extends R> mapper)
PushStream
map
in interface PushStream<T>
mapper
- The map functionpublic <R> PushStream<R> asyncMap(int n, int delay, Function<? super T,Promise<? extends R>> mapper)
PushStream
The PushStream limits the number of concurrently running mapping operations, and returns back pressure based on the number of existing queued operations.
asyncMap
in interface PushStream<T>
n
- number of simultaneous promises to usedelay
- Nr of ms/promise that is queued back pressuremapper
- The mapping functionpublic <R> PushStream<R> flatMap(Function<? super T,? extends PushStream<? extends R>> mapper)
PushStream
flatMap
in interface PushStream<T>
mapper
- The flat map functionpublic PushStream<T> distinct()
PushStream
distinct
in interface PushStream<T>
public PushStream<T> sorted()
PushStream
sorted
in interface PushStream<T>
public PushStream<T> sorted(java.util.Comparator<? super T> comparator)
PushStream
sorted
in interface PushStream<T>
public PushStream<T> limit(long maxSize)
PushStream
limit
in interface PushStream<T>
maxSize
- Maximum number of elements has been receivedpublic PushStream<T> limit(java.time.Duration maxTime)
PushStream
limit
in interface PushStream<T>
maxTime
- The maximum time that the stream should remain openpublic PushStream<T> timeout(java.time.Duration maxTime)
PushStream
TimeoutException
will be sent.timeout
in interface PushStream<T>
maxTime
- The length of time that the stream should remain open
when no events are being received.void check(java.util.concurrent.atomic.AtomicLong lastTime, long timeout)
public PushStream<T> skip(long n)
PushStream
skip
in interface PushStream<T>
n
- number of elements to skippublic PushStream<T> fork(int n, int delay, java.util.concurrent.Executor ex)
PushStream
fork
in interface PushStream<T>
n
- number of simultaneous background threads to usedelay
- Nr of ms/thread that is queued back pressureex
- an executor to use for the background threads.public PushStream<T> buffer()
PushStream
Buffers are useful for "bursty" event sources which produce a number of
events close together, then none for some time. These bursts can
sometimes overwhelm downstream event consumers. Buffering will not,
however, protect downstream components from a source which produces
events faster than they can be consumed. For fast sources
PushStream.filter(Predicate)
and PushStream.coalesce(int, Function)
PushStream.fork(int, int, Executor)
are better choices.
buffer
in interface PushStream<T>
public <U extends java.util.concurrent.BlockingQueue<PushEvent<? extends T>>> PushStreamBuilder<T,U> buildBuffer()
PushStream
Buffers are useful for "bursty" event sources which produce a number of
events close together, then none for some time. These bursts can
sometimes overwhelm downstream event consumers. Buffering will not,
however, protect downstream components from a source which produces
events faster than they can be consumed. For fast sources
PushStream.filter(Predicate)
and PushStream.coalesce(int, Function)
PushStream.fork(int, int, Executor)
are better choices.
Buffers are also useful as "circuit breakers" in the pipeline. If a
QueuePolicyOption.FAIL
is used then a full buffer will trigger
the stream to close, preventing an event storm from reaching the client.
buildBuffer
in interface PushStream<T>
public PushStream<T> merge(PushEventSource<? extends T> source)
PushStream
merge
in interface PushStream<T>
source
- The source to merge in.public PushStream<T> merge(PushStream<? extends T> source)
PushStream
merge
in interface PushStream<T>
source
- The source to merge in.public PushStream<T>[] split(Predicate<? super T>... predicates)
PushStream
This method differs from other methods of PushStream in three significant ways:
split
in interface PushStream<T>
predicates
- the predicates to testpublic PushStream<T> sequential()
PushStream
sequential
in interface PushStream<T>
public <R> PushStream<R> coalesce(Function<? super T,java.util.Optional<R>> accumulator)
PushStream
coalesce
in interface PushStream<T>
public <R> PushStream<R> coalesce(int count, Function<java.util.Collection<T>,R> f)
PushStream
coalesce
in interface PushStream<T>
public <R> PushStream<R> coalesce(java.util.function.IntSupplier count, Function<java.util.Collection<T>,R> f)
PushStream
coalesce
in interface PushStream<T>
private <R> long aggregateAndForward(Function<java.util.Collection<T>,R> f, AbstractPushStreamImpl<R> eventStream, PushEvent<? extends T> event, java.util.Queue<T> queue) throws java.lang.Exception
java.lang.Exception
public <R> PushStream<R> window(java.time.Duration time, Function<java.util.Collection<T>,R> f)
PushStream
PushStream
.
window
in interface PushStream<T>
public <R> PushStream<R> window(java.time.Duration time, java.util.concurrent.Executor executor, Function<java.util.Collection<T>,R> f)
PushStream
window
in interface PushStream<T>
public <R> PushStream<R> window(java.util.function.Supplier<java.time.Duration> time, java.util.function.IntSupplier maxEvents, java.util.function.BiFunction<java.lang.Long,java.util.Collection<T>,R> f)
PushStream
PushStream
.
window
in interface PushStream<T>
public <R> PushStream<R> window(java.util.function.Supplier<java.time.Duration> time, java.util.function.IntSupplier maxEvents, java.util.concurrent.Executor ex, java.util.function.BiFunction<java.lang.Long,java.util.Collection<T>,R> f)
PushStream
PushStream
.
window
in interface PushStream<T>
protected java.util.Queue<T> getQueueForInternalBuffering(int size)
private <R> java.lang.Runnable getWindowTask(AbstractPushStreamImpl<R> eventStream, java.util.function.BiFunction<java.lang.Long,java.util.Collection<T>,R> f, java.util.function.Supplier<java.time.Duration> time, java.util.function.IntSupplier maxEvents, java.lang.Object lock, long expectedCounter, java.util.concurrent.atomic.AtomicReference<java.util.Queue<T>> queueRef, java.util.concurrent.atomic.AtomicLong timestamp, java.util.concurrent.atomic.AtomicLong counter, java.util.concurrent.atomic.AtomicLong previousWindowSize, java.util.concurrent.Executor executor)
private <R> void aggregateAndForward(java.util.function.BiFunction<java.lang.Long,java.util.Collection<T>,R> f, AbstractPushStreamImpl<R> eventStream, PushEvent<? extends T> event, java.util.Queue<T> queue, java.util.concurrent.Executor executor, long elapsed)
public PushStream<T> adjustBackPressure(java.util.function.LongUnaryOperator adjustment)
PushStream
The supplied function receives the back pressure returned by the next pipeline stage and returns the back pressure that should be returned by this stage. This function will not be called if the previous pipeline stage returns negative back pressure.
adjustBackPressure
in interface PushStream<T>
public PushStream<T> adjustBackPressure(java.util.function.ToLongBiFunction<T,java.lang.Long> adjustment)
PushStream
The supplied function receives the data object passed to the next pipeline stage and the back pressure that was returned by that stage when accepting it. The function returns the back pressure that should be returned by this stage. This function will not be called if the previous pipeline stage returns negative back pressure.
adjustBackPressure
in interface PushStream<T>
public Promise<java.lang.Void> forEach(java.util.function.Consumer<? super T> action)
PushStream
This is a terminal operation
forEach
in interface PushStream<T>
action
- The action to performpublic Promise<java.lang.Object[]> toArray()
PushStream
This is a terminal operation
toArray
in interface PushStream<T>
public <A extends T> Promise<A[]> toArray(java.util.function.IntFunction<A[]> generator)
PushStream
This is a terminal operation
toArray
in interface PushStream<T>
public Promise<T> reduce(T identity, java.util.function.BinaryOperator<T> accumulator)
PushStream
This is a terminal operation
reduce
in interface PushStream<T>
identity
- The identity/begin valueaccumulator
- The accumulatorpublic Promise<java.util.Optional<T>> reduce(java.util.function.BinaryOperator<T> accumulator)
PushStream
This is a terminal operation
reduce
in interface PushStream<T>
accumulator
- The accumulatorpublic <U> Promise<U> reduce(U identity, java.util.function.BiFunction<U,? super T,U> accumulator, java.util.function.BinaryOperator<U> combiner)
PushStream
This is a terminal operation
reduce
in interface PushStream<T>
combiner
- combines two U's into one U (for example, combine two
lists)public <R,A> Promise<R> collect(java.util.stream.Collector<? super T,A,R> collector)
PushStream
This is a terminal operation
collect
in interface PushStream<T>
public Promise<java.util.Optional<T>> min(java.util.Comparator<? super T> comparator)
PushStream
This is a terminal operation
min
in interface PushStream<T>
public Promise<java.util.Optional<T>> max(java.util.Comparator<? super T> comparator)
PushStream
This is a terminal operation
max
in interface PushStream<T>
public Promise<java.lang.Long> count()
PushStream
This is a terminal operation
count
in interface PushStream<T>
public Promise<java.lang.Boolean> anyMatch(Predicate<? super T> predicate)
PushStream
This is a short circuiting terminal operation
anyMatch
in interface PushStream<T>
public Promise<java.lang.Boolean> allMatch(Predicate<? super T> predicate)
PushStream
This is a short circuiting terminal operation
allMatch
in interface PushStream<T>
public Promise<java.lang.Boolean> noneMatch(Predicate<? super T> predicate)
PushStream
This is a short circuiting terminal operation
noneMatch
in interface PushStream<T>
public Promise<java.util.Optional<T>> findFirst()
PushStream
findFirst
in interface PushStream<T>
public Promise<java.util.Optional<T>> findAny()
PushStream
This is a terminal operation
findAny
in interface PushStream<T>
public Promise<java.lang.Long> forEachEvent(PushEventConsumer<? super T> action)
PushStream
This is a terminal operation
forEachEvent
in interface PushStream<T>