Streams.java
001 /*
002  * Java Genetic Algorithm Library (jenetics-6.2.0).
003  * Copyright (c) 2007-2021 Franz Wilhelmstötter
004  *
005  * Licensed under the Apache License, Version 2.0 (the "License");
006  * you may not use this file except in compliance with the License.
007  * You may obtain a copy of the License at
008  *
009  *      http://www.apache.org/licenses/LICENSE-2.0
010  *
011  * Unless required by applicable law or agreed to in writing, software
012  * distributed under the License is distributed on an "AS IS" BASIS,
013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014  * See the License for the specific language governing permissions and
015  * limitations under the License.
016  *
017  * Author:
018  *    Franz Wilhelmstötter (franz.wilhelmstoetter@gmail.com)
019  */
020 package io.jenetics.util;
021 
022 import static java.time.Clock.systemUTC;
023 import static java.util.Objects.requireNonNull;
024 
025 import java.time.Clock;
026 import java.time.Duration;
027 import java.util.Comparator;
028 import java.util.function.BinaryOperator;
029 import java.util.function.Function;
030 import java.util.stream.Stream;
031 
032 /**
033  * This class contains factory methods for (flat) mapping stream elements. The
034  * functions of this class can be used in the following way.
035  *
036  <pre>{@code
037  * final ISeq<Integer> values = new Random().ints(0, 100).boxed()
038  *     .limit(100)
039  *     .flatMap(Streams.toIntervalMax(13))
040  *     .collect(ISeq.toISeq());
041  * }</pre>
042  *
043  @author <a href="mailto:franz.wilhelmstoetter@gmail.com">Franz Wilhelmstötter</a>
044  @since 6.0
045  @version 6.0
046  */
047 public final class Streams {
048     private Streams() {}
049 
050 
051     /**
052      * Return a new flat-mapper function, which guarantees a strictly increasing
053      * stream, from an arbitrarily ordered source stream. Note that this
054      * function doesn't sort the stream. It <em>just</em> skips the <em>out of
055      * order</em> elements.
056      *
057      <pre>{@code
058      *     +----3--2--5--4--7--7--4--9----|
059      *        toStrictlyIncreasing()
060      *     +----3-----5-----7--------9----|
061      * }</pre>
062      *
063      <pre>{@code
064      * final ISeq<Integer> values = new Random().ints(0, 100)
065      *     .boxed()
066      *     .limit(100)
067      *     .flatMap(Streams.toStrictlyIncreasing())
068      *     .collect(ISeq.toISeq());
069      *
070      * System.out.println(values);
071      * // [6,47,65,78,96,96,99]
072      * }</pre>
073      *
074      *
075      @param <C> the comparable type
076      @return a new flat-mapper function
077      */
078     public static <C extends Comparable<? super C>>
079     Function<C, Stream<C>> toStrictlyIncreasing() {
080         return strictlyImproving(Streams::max);
081     }
082 
083     /**
084      * Return a new flat-mapper function, which guarantees a strictly decreasing
085      * stream, from an arbitrarily ordered source stream. Note that this
086      * function doesn't sort the stream. It <em>just</em> skips the <em>out of
087      * order</em> elements.
088      *
089      <pre>{@code
090      *     +----9--8--9--5--6--6--2--9----|
091      *        toStrictlyDecreasing()
092      *     +----9--8-----5--------2-------|
093      * }</pre>
094      *
095      <pre>{@code
096      * final ISeq<Integer> values = new Random().ints(0, 100)
097      *     .boxed()
098      *     .limit(100)
099      *     .flatMap(Streams.toStrictlyDecreasing())
100      *     .collect(ISeq.toISeq());
101      *
102      * System.out.println(values);
103      * // [45,32,15,12,3,1]
104      * }</pre>
105      *
106      @param <C> the comparable type
107      @return a new flat-mapper function
108      */
109     public static <C extends Comparable<? super C>>
110     Function<C, Stream<C>> toStrictlyDecreasing() {
111         return strictlyImproving(Streams::min);
112     }
113 
114     /**
115      * Return a new flat-mapper function, which guarantees a strictly improving
116      * stream, from an arbitrarily ordered source stream. Note that this
117      * function doesn't sort the stream. It <em>just</em> skips the <em>out of
118      * order</em> elements.
119      *
120      <pre>{@code
121      * final ISeq<Integer> values = new Random().ints(0, 100)
122      *     .boxed()
123      *     .limit(100)
124      *     .flatMap(Streams.toStrictlyImproving(Comparator.naturalOrder()))
125      *     .collect(ISeq.toISeq());
126      *
127      * System.out.println(values);
128      * // [6,47,65,78,96,96,99]
129      * }</pre>
130      *
131      @see #toStrictlyIncreasing()
132      @see #toStrictlyDecreasing()
133      *
134      @param <T> the element type
135      @param comparator the comparator used for testing the elements
136      @return a new flat-mapper function
137      */
138     public static <T> Function<T, Stream<T>>
139     toStrictlyImproving(final Comparator<? super T> comparator) {
140         return strictlyImproving((a, b-> best(comparator, a, b));
141     }
142 
143     private static <C> Function<C, Stream<C>>
144     strictlyImproving(final BinaryOperator<C> comparator) {
145         requireNonNull(comparator);
146 
147         return new Function<>() {
148             private C _best;
149 
150             @Override
151             public Stream<C> apply(final C result) {
152                 final C best = comparator.apply(_best, result);
153 
154                 final Stream<C> stream = best == _best
155                     ? Stream.empty()
156                     : Stream.of(best);
157 
158                 _best = best;
159 
160                 return stream;
161             }
162         };
163     }
164 
165     private static <T extends Comparable<? super T>> T max(final T a, final T b) {
166         return best(Comparator.naturalOrder(), a, b);
167     }
168 
169     private static <T extends Comparable<? super T>> T min(final T a, final T b) {
170         return best(Comparator.reverseOrder(), a, b);
171     }
172 
173     private static <T>
174     T best(final Comparator<? super T> comparator, final T a, final T b) {
175         if (a == null && b == nullreturn null;
176         if (a == nullreturn b;
177         if (b == nullreturn a;
178         return comparator.compare(a, b>= ? a : b;
179     }
180 
181     /**
182      * Return a new flat-mapper function which returns (emits) the maximal value
183      * of the last <em>n</em> elements.
184      *
185      <pre>{@code
186      *          +----3---+----3---+
187      *          |        |        |
188      *     +----9--8--3--3--5--4--2--9----|
189      *        toIntervalMax(3)
190      *     +----------9--------5----------|
191      * }</pre>
192      *
193      @param size the size of the slice
194      @param <C> the element type
195      @return a new flat-mapper function
196      @throws IllegalArgumentException if the given size is smaller than one
197      */
198     public static <C extends Comparable<? super C>>
199     Function<C, Stream<C>> toIntervalMax(final int size) {
200         return sliceBest(Streams::max, size);
201     }
202 
203     /**
204      * Return a new flat-mapper function which returns (emits) the minimal value
205      * of the last <em>n</em> elements.
206      *
207      <pre>{@code
208      *          +----3---+----3---+
209      *          |        |        |
210      *     +----9--8--3--3--1--4--2--9----|
211      *        toIntervalMin(3)
212      *     +----------3--------1----------|
213      * }</pre>
214      *
215      @param size the size of the slice
216      @param <C> the element type
217      @return a new flat-mapper function
218      @throws IllegalArgumentException if the given size is smaller than one
219      */
220     public static <C extends Comparable<? super C>>
221     Function<C, Stream<C>> toIntervalMin(final int size) {
222         return sliceBest(Streams::min, size);
223     }
224 
225     /**
226      * Return a new flat-mapper function which returns (emits) the minimal value
227      * of the last <em>n</em> elements.
228      *
229      @see #toIntervalMax(int)
230      @see #toIntervalMin(int)
231      *
232      @param <C> the element type
233      @param size the size of the slice
234      @param comparator the comparator used for testing the elements
235      @return a new flat-mapper function
236      @throws IllegalArgumentException if the given size is smaller than one
237      @throws NullPointerException if the given {@code comparator} is
238      *         {@code null}
239      */
240     public static <C> Function<C, Stream<C>>
241     toIntervalBest(final Comparator<? super C> comparator, final int size) {
242         requireNonNull(comparator);
243         return sliceBest((a, b-> best(comparator, a, b), size);
244     }
245 
246     private static <C> Function<C, Stream<C>> sliceBest(
247         final BinaryOperator<C> comp,
248         final int rangeSize
249     ) {
250         requireNonNull(comp);
251         if (rangeSize < 1) {
252             throw new IllegalArgumentException(
253                 "Range size must be at least one: " + rangeSize
254             );
255         }
256 
257         return new Function<>() {
258             private int _count = 0;
259             private C _best;
260 
261             @Override
262             public Stream<C> apply(final C value) {
263                 ++_count;
264                 _best = comp.apply(_best, value);
265 
266                 final Stream<C> result;
267                 if (_count >= rangeSize) {
268                     result = Stream.of(_best);
269                     _count = 0;
270                     _best = null;
271                 else {
272                     result = Stream.empty();
273                 }
274 
275                 return result;
276             }
277         };
278     }
279 
280     /**
281      * Return a new flat-mapper function which returns (emits) the maximal value
282      * of the elements emitted within the given {@code timespan}.
283      *
284      <pre>{@code
285      *          +---3s---+---3s---+
286      *          |        |        |
287      *     +----9--8--3--3--5--4--2--9----|
288      *        toIntervalMax(3s)
289      *     +----------9--------5----------|
290      * }</pre>
291      *
292      @see #toIntervalMax(Duration, Clock)
293      *
294      @param <C> the element type
295      @param timespan the timespan the elements are collected for the
296      *        calculation slice
297      @return a new flat-mapper function
298      @throws IllegalArgumentException if the given size is smaller than one
299      @throws NullPointerException if the given {@code timespan} is {@code null}
300      */
301     public static <C extends Comparable<? super C>>
302     Function<C, Stream<C>> toIntervalMax(final Duration timespan) {
303         return sliceBest(Streams::max, timespan, systemUTC());
304     }
305 
306     /**
307      * Return a new flat-mapper function which returns (emits) the maximal value
308      * of the elements emitted within the given {@code timespan}.
309      *
310      <pre>{@code
311      *          +---3s---+---3s---+
312      *          |        |        |
313      *     +----9--8--3--3--5--4--2--9----|
314      *        toIntervalMax(3s)
315      *     +----------9--------5----------|
316      * }</pre>
317      *
318      @see #toIntervalMax(Duration)
319      *
320      @param <C> the element type
321      @param timespan the timespan the elements are collected for the
322      *        calculation slice
323      @param clock the {@code clock} used for measuring the {@code timespan}
324      @return a new flat-mapper function
325      @throws IllegalArgumentException if the given size is smaller than one
326      @throws NullPointerException if one of the arguments is {@code null}
327      */
328     public static <C extends Comparable<? super C>>
329     Function<C, Stream<C>> toIntervalMax(final Duration timespan, final Clock clock) {
330         return sliceBest(Streams::max, timespan, clock);
331     }
332 
333     /**
334      * Return a new flat-mapper function which returns (emits) the minimal value
335      * of the elements emitted within the given {@code timespan}.
336      *
337      <pre>{@code
338      *          +---3s---+---3s---+
339      *          |        |        |
340      *     +----9--8--3--3--1--4--2--9----|
341      *        toIntervalMin(3s)
342      *     +----------3--------1----------|
343      * }</pre>
344      *
345      @see #toIntervalMin(Duration, Clock)
346      *
347      @param <C> the element type
348      @param timespan the timespan the elements are collected for the
349      *        calculation slice
350      @return a new flat-mapper function
351      @throws IllegalArgumentException if the given size is smaller than one
352      @throws NullPointerException if the given {@code timespan} is {@code null}
353      */
354     public static <C extends Comparable<? super C>>
355     Function<C, Stream<C>> toIntervalMin(final Duration timespan) {
356         return sliceBest(Streams::min, timespan, systemUTC());
357     }
358 
359     /**
360      * Return a new flat-mapper function which returns (emits) the minimal value
361      * of the elements emitted within the given {@code timespan}.
362      *
363      <pre>{@code
364      *          +---3s---+---3s---+
365      *          |        |        |
366      *     +----9--8--3--3--1--4--2--9----|
367      *        toIntervalMin(3s)
368      *     +----------3--------1----------|
369      * }</pre>
370      *
371      @see #toIntervalMin(Duration)
372      *
373      @param <C> the element type
374      @param timespan the timespan the elements are collected for the
375      *        calculation slice
376      @param clock the {@code clock} used for measuring the {@code timespan}
377      @return a new flat-mapper function
378      @throws IllegalArgumentException if the given size is smaller than one
379      @throws NullPointerException if one of the arguments is {@code null}
380      */
381     public static <C extends Comparable<? super C>>
382     Function<C, Stream<C>> toIntervalMin(final Duration timespan, final Clock clock) {
383         return sliceBest(Streams::min, timespan, clock);
384     }
385 
386     /**
387      * Return a new flat-mapper function which returns (emits) the minimal value
388      * of the elements emitted within the given {@code timespan}.
389      *
390      @see #toIntervalMin(Duration)
391      @see #toIntervalMax(Duration)
392      *
393      @param <C> the element type
394      @param comparator the comparator used for testing the elements
395      @param timespan the timespan the elements are collected for the
396      *        calculation slice
397      @return a new flat-mapper function
398      @throws IllegalArgumentException if the given size is smaller than one
399      @throws NullPointerException if one of the arguments is {@code null}
400      */
401     public static <C> Function<C, Stream<C>>
402     toIntervalBest(final Comparator<? super C> comparator, final Duration timespan) {
403         requireNonNull(comparator);
404         return sliceBest((a, b-> best(comparator, a, b), timespan, systemUTC());
405     }
406 
407     /**
408      * Return a new flat-mapper function which returns (emits) the minimal value
409      * of the elements emitted within the given {@code timespan}.
410      *
411      @param <C> the element type
412      @param comparator the comparator used for testing the elements
413      @param timespan the timespan the elements are collected for the
414      *        calculation slice
415      @param clock the {@code clock} used for measuring the {@code timespan}
416      @return a new flat-mapper function
417      @throws IllegalArgumentException if the given size is smaller than one
418      @throws NullPointerException if one of the arguments is {@code null}
419      */
420     public static <C> Function<C, Stream<C>>
421     toIntervalBest(
422         final Comparator<? super C> comparator,
423         final Duration timespan,
424         final Clock clock
425     ) {
426         requireNonNull(comparator);
427         return sliceBest((a, b-> best(comparator, a, b), timespan, clock);
428     }
429 
430     private static <C> Function<C, Stream<C>> sliceBest(
431         final BinaryOperator<C> comp,
432         final Duration timespan,
433         final Clock clock
434     ) {
435         requireNonNull(comp);
436         requireNonNull(timespan);
437 
438         return new Function<>() {
439             private final long _timespan  = timespan.toMillis();
440 
441             private long _start = 0;
442             private long _end = 0;
443             private C _best;
444 
445             @Override
446             public Stream<C> apply(final C value) {
447                 if (_start == 0) {
448                     _start = clock.millis();
449                 }
450 
451                 _best = comp.apply(_best, value);
452                 _end = clock.millis();
453 
454                 final Stream<C> result;
455                 if (_end - _start >= _timespan) {
456                     result = Stream.of(_best);
457                     _start = 0;
458                     _best = null;
459                 else {
460                     result = Stream.empty();
461                 }
462 
463                 return result;
464             }
465         };
466     }
467 
468 }