001/*
002 * Java Genetic Algorithm Library (jenetics-7.2.0).
003 * Copyright (c) 2007-2023 Franz Wilhelmstötter
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 * Author:
018 *    Franz Wilhelmstötter (franz.wilhelmstoetter@gmail.com)
019 */
020package io.jenetics.util;
021
022import static java.time.Clock.systemUTC;
023import static java.util.Objects.requireNonNull;
024
025import java.time.Clock;
026import java.time.Duration;
027import java.util.Comparator;
028import java.util.function.BinaryOperator;
029import java.util.function.Function;
030import java.util.stream.Stream;
031
032/**
033 * This class contains factory methods for (flat) mapping stream elements. The
034 * functions of this class can be used in the following way.
035 *
036 * <pre>{@code
037 * final ISeq<Integer> values = new Random().ints(0, 100).boxed()
038 *     .limit(100)
039 *     .flatMap(Streams.toIntervalMax(13))
040 *     .collect(ISeq.toISeq());
041 * }</pre>
042 *
043 * @author <a href="mailto:franz.wilhelmstoetter@gmail.com">Franz Wilhelmstötter</a>
044 * @since 6.0
045 * @version 6.0
046 */
047public final class Streams {
048        private Streams() {}
049
050
051        /**
052         * Return a new flat-mapper function, which guarantees a strictly increasing
053         * stream, from an arbitrarily ordered source stream. Note that this
054         * function doesn't sort the stream. It <em>just</em> skips the <em>out of
055         * order</em> elements.
056         *
057         * <pre>{@code
058         *     +----3--2--5--4--7--7--4--9----|
059         *        toStrictlyIncreasing()
060         *     +----3-----5-----7--------9----|
061         * }</pre>
062         *
063         * <pre>{@code
064         * final ISeq<Integer> values = new Random().ints(0, 100)
065         *     .boxed()
066         *     .limit(100)
067         *     .flatMap(Streams.toStrictlyIncreasing())
068         *     .collect(ISeq.toISeq());
069         *
070         * System.out.println(values);
071         * // [6,47,65,78,96,96,99]
072         * }</pre>
073         *
074         *
075         * @param <C> the comparable type
076         * @return a new flat-mapper function
077         */
078        public static <C extends Comparable<? super C>>
079        Function<C, Stream<C>> toStrictlyIncreasing() {
080                return strictlyImproving(Streams::max);
081        }
082
083        /**
084         * Return a new flat-mapper function, which guarantees a strictly decreasing
085         * stream, from an arbitrarily ordered source stream. Note that this
086         * function doesn't sort the stream. It <em>just</em> skips the <em>out of
087         * order</em> elements.
088         *
089         * <pre>{@code
090         *     +----9--8--9--5--6--6--2--9----|
091         *        toStrictlyDecreasing()
092         *     +----9--8-----5--------2-------|
093         * }</pre>
094         *
095         * <pre>{@code
096         * final ISeq<Integer> values = new Random().ints(0, 100)
097         *     .boxed()
098         *     .limit(100)
099         *     .flatMap(Streams.toStrictlyDecreasing())
100         *     .collect(ISeq.toISeq());
101         *
102         * System.out.println(values);
103         * // [45,32,15,12,3,1]
104         * }</pre>
105         *
106         * @param <C> the comparable type
107         * @return a new flat-mapper function
108         */
109        public static <C extends Comparable<? super C>>
110        Function<C, Stream<C>> toStrictlyDecreasing() {
111                return strictlyImproving(Streams::min);
112        }
113
114        /**
115         * Return a new flat-mapper function, which guarantees a strictly improving
116         * stream, from an arbitrarily ordered source stream. Note that this
117         * function doesn't sort the stream. It <em>just</em> skips the <em>out of
118         * order</em> elements.
119         *
120         * <pre>{@code
121         * final ISeq<Integer> values = new Random().ints(0, 100)
122         *     .boxed()
123         *     .limit(100)
124         *     .flatMap(Streams.toStrictlyImproving(Comparator.naturalOrder()))
125         *     .collect(ISeq.toISeq());
126         *
127         * System.out.println(values);
128         * // [6,47,65,78,96,96,99]
129         * }</pre>
130         *
131         * @see #toStrictlyIncreasing()
132         * @see #toStrictlyDecreasing()
133         *
134         * @param <T> the element type
135         * @param comparator the comparator used for testing the elements
136         * @return a new flat-mapper function
137         */
138        public static <T> Function<T, Stream<T>>
139        toStrictlyImproving(final Comparator<? super T> comparator) {
140                return strictlyImproving((a, b) -> best(comparator, a, b));
141        }
142
143        private static <C> Function<C, Stream<C>>
144        strictlyImproving(final BinaryOperator<C> comparator) {
145                requireNonNull(comparator);
146
147                return new Function<>() {
148                        private C _best;
149
150                        @Override
151                        public Stream<C> apply(final C result) {
152                                final C best = comparator.apply(_best, result);
153
154                                final Stream<C> stream = best == _best
155                                        ? Stream.empty()
156                                        : Stream.of(best);
157
158                                _best = best;
159
160                                return stream;
161                        }
162                };
163        }
164
165        private static <T extends Comparable<? super T>> T max(final T a, final T b) {
166                return best(Comparator.naturalOrder(), a, b);
167        }
168
169        private static <T extends Comparable<? super T>> T min(final T a, final T b) {
170                return best(Comparator.reverseOrder(), a, b);
171        }
172
173        private static <T>
174        T best(final Comparator<? super T> comparator, final T a, final T b) {
175                if (a == null && b == null) return null;
176                if (a == null) return b;
177                if (b == null) return a;
178                return comparator.compare(a, b) >= 0 ? a : b;
179        }
180
181        /**
182         * Return a new flat-mapper function which returns (emits) the maximal value
183         * of the last <em>n</em> elements.
184         *
185         * <pre>{@code
186         *          +----3---+----3---+
187         *          |        |        |
188         *     +----9--8--3--3--5--4--2--9----|
189         *        toIntervalMax(3)
190         *     +----------9--------5----------|
191         * }</pre>
192         *
193         * @param size the size of the slice
194         * @param <C> the element type
195         * @return a new flat-mapper function
196         * @throws IllegalArgumentException if the given size is smaller than one
197         */
198        public static <C extends Comparable<? super C>>
199        Function<C, Stream<C>> toIntervalMax(final int size) {
200                return sliceBest(Streams::max, size);
201        }
202
203        /**
204         * Return a new flat-mapper function which returns (emits) the minimal value
205         * of the last <em>n</em> elements.
206         *
207         * <pre>{@code
208         *          +----3---+----3---+
209         *          |        |        |
210         *     +----9--8--3--3--1--4--2--9----|
211         *        toIntervalMin(3)
212         *     +----------3--------1----------|
213         * }</pre>
214         *
215         * @param size the size of the slice
216         * @param <C> the element type
217         * @return a new flat-mapper function
218         * @throws IllegalArgumentException if the given size is smaller than one
219         */
220        public static <C extends Comparable<? super C>>
221        Function<C, Stream<C>> toIntervalMin(final int size) {
222                return sliceBest(Streams::min, size);
223        }
224
225        /**
226         * Return a new flat-mapper function which returns (emits) the minimal value
227         * of the last <em>n</em> elements.
228         *
229         * @see #toIntervalMax(int)
230         * @see #toIntervalMin(int)
231         *
232         * @param <C> the element type
233         * @param size the size of the slice
234         * @param comparator the comparator used for testing the elements
235         * @return a new flat-mapper function
236         * @throws IllegalArgumentException if the given size is smaller than one
237         * @throws NullPointerException if the given {@code comparator} is
238         *         {@code null}
239         */
240        public static <C> Function<C, Stream<C>>
241        toIntervalBest(final Comparator<? super C> comparator, final int size) {
242                requireNonNull(comparator);
243                return sliceBest((a, b) -> best(comparator, a, b), size);
244        }
245
246        private static <C> Function<C, Stream<C>> sliceBest(
247                final BinaryOperator<C> comp,
248                final int rangeSize
249        ) {
250                requireNonNull(comp);
251                if (rangeSize < 1) {
252                        throw new IllegalArgumentException(
253                                "Range size must be at least one: " + rangeSize
254                        );
255                }
256
257                return new Function<>() {
258                        private int _count = 0;
259                        private C _best;
260
261                        @Override
262                        public Stream<C> apply(final C value) {
263                                ++_count;
264                                _best = comp.apply(_best, value);
265
266                                final Stream<C> result;
267                                if (_count >= rangeSize) {
268                                        result = Stream.of(_best);
269                                        _count = 0;
270                                        _best = null;
271                                } else {
272                                        result = Stream.empty();
273                                }
274
275                                return result;
276                        }
277                };
278        }
279
280        /**
281         * Return a new flat-mapper function which returns (emits) the maximal value
282         * of the elements emitted within the given {@code timespan}.
283         *
284         * <pre>{@code
285         *          +---3s---+---3s---+
286         *          |        |        |
287         *     +----9--8--3--3--5--4--2--9----|
288         *        toIntervalMax(3s)
289         *     +----------9--------5----------|
290         * }</pre>
291         *
292         * @see #toIntervalMax(Duration, Clock)
293         *
294         * @param <C> the element type
295         * @param timespan the timespan the elements are collected for the
296         *        calculation slice
297         * @return a new flat-mapper function
298         * @throws IllegalArgumentException if the given size is smaller than one
299         * @throws NullPointerException if the given {@code timespan} is {@code null}
300         */
301        public static <C extends Comparable<? super C>>
302        Function<C, Stream<C>> toIntervalMax(final Duration timespan) {
303                return sliceBest(Streams::max, timespan, systemUTC());
304        }
305
306        /**
307         * Return a new flat-mapper function which returns (emits) the maximal value
308         * of the elements emitted within the given {@code timespan}.
309         *
310         * <pre>{@code
311         *          +---3s---+---3s---+
312         *          |        |        |
313         *     +----9--8--3--3--5--4--2--9----|
314         *        toIntervalMax(3s)
315         *     +----------9--------5----------|
316         * }</pre>
317         *
318         * @see #toIntervalMax(Duration)
319         *
320         * @param <C> the element type
321         * @param timespan the timespan the elements are collected for the
322         *        calculation slice
323         * @param clock the {@code clock} used for measuring the {@code timespan}
324         * @return a new flat-mapper function
325         * @throws IllegalArgumentException if the given size is smaller than one
326         * @throws NullPointerException if one of the arguments is {@code null}
327         */
328        public static <C extends Comparable<? super C>>
329        Function<C, Stream<C>> toIntervalMax(final Duration timespan, final Clock clock) {
330                return sliceBest(Streams::max, timespan, clock);
331        }
332
333        /**
334         * Return a new flat-mapper function which returns (emits) the minimal value
335         * of the elements emitted within the given {@code timespan}.
336         *
337         * <pre>{@code
338         *          +---3s---+---3s---+
339         *          |        |        |
340         *     +----9--8--3--3--1--4--2--9----|
341         *        toIntervalMin(3s)
342         *     +----------3--------1----------|
343         * }</pre>
344         *
345         * @see #toIntervalMin(Duration, Clock)
346         *
347         * @param <C> the element type
348         * @param timespan the timespan the elements are collected for the
349         *        calculation slice
350         * @return a new flat-mapper function
351         * @throws IllegalArgumentException if the given size is smaller than one
352         * @throws NullPointerException if the given {@code timespan} is {@code null}
353         */
354        public static <C extends Comparable<? super C>>
355        Function<C, Stream<C>> toIntervalMin(final Duration timespan) {
356                return sliceBest(Streams::min, timespan, systemUTC());
357        }
358
359        /**
360         * Return a new flat-mapper function which returns (emits) the minimal value
361         * of the elements emitted within the given {@code timespan}.
362         *
363         * <pre>{@code
364         *          +---3s---+---3s---+
365         *          |        |        |
366         *     +----9--8--3--3--1--4--2--9----|
367         *        toIntervalMin(3s)
368         *     +----------3--------1----------|
369         * }</pre>
370         *
371         * @see #toIntervalMin(Duration)
372         *
373         * @param <C> the element type
374         * @param timespan the timespan the elements are collected for the
375         *        calculation slice
376         * @param clock the {@code clock} used for measuring the {@code timespan}
377         * @return a new flat-mapper function
378         * @throws IllegalArgumentException if the given size is smaller than one
379         * @throws NullPointerException if one of the arguments is {@code null}
380         */
381        public static <C extends Comparable<? super C>>
382        Function<C, Stream<C>> toIntervalMin(final Duration timespan, final Clock clock) {
383                return sliceBest(Streams::min, timespan, clock);
384        }
385
386        /**
387         * Return a new flat-mapper function which returns (emits) the minimal value
388         * of the elements emitted within the given {@code timespan}.
389         *
390         * @see #toIntervalMin(Duration)
391         * @see #toIntervalMax(Duration)
392         *
393         * @param <C> the element type
394         * @param comparator the comparator used for testing the elements
395         * @param timespan the timespan the elements are collected for the
396         *        calculation slice
397         * @return a new flat-mapper function
398         * @throws IllegalArgumentException if the given size is smaller than one
399         @throws NullPointerException if one of the arguments is {@code null}
400         */
401        public static <C> Function<C, Stream<C>>
402        toIntervalBest(final Comparator<? super C> comparator, final Duration timespan) {
403                requireNonNull(comparator);
404                return sliceBest((a, b) -> best(comparator, a, b), timespan, systemUTC());
405        }
406
407        /**
408         * Return a new flat-mapper function which returns (emits) the minimal value
409         * of the elements emitted within the given {@code timespan}.
410         *
411         * @param <C> the element type
412         * @param comparator the comparator used for testing the elements
413         * @param timespan the timespan the elements are collected for the
414         *        calculation slice
415         * @param clock the {@code clock} used for measuring the {@code timespan}
416         * @return a new flat-mapper function
417         * @throws IllegalArgumentException if the given size is smaller than one
418         @throws NullPointerException if one of the arguments is {@code null}
419         */
420        public static <C> Function<C, Stream<C>>
421        toIntervalBest(
422                final Comparator<? super C> comparator,
423                final Duration timespan,
424                final Clock clock
425        ) {
426                requireNonNull(comparator);
427                return sliceBest((a, b) -> best(comparator, a, b), timespan, clock);
428        }
429
430        private static <C> Function<C, Stream<C>> sliceBest(
431                final BinaryOperator<C> comp,
432                final Duration timespan,
433                final Clock clock
434        ) {
435                requireNonNull(comp);
436                requireNonNull(timespan);
437
438                return new Function<>() {
439                        private final long _timespan  = timespan.toMillis();
440
441                        private long _start = 0;
442                        private C _best;
443
444                        @Override
445                        public Stream<C> apply(final C value) {
446                                if (_start == 0) {
447                                        _start = clock.millis();
448                                }
449
450                                _best = comp.apply(_best, value);
451                                long end = clock.millis();
452
453                                final Stream<C> result;
454                                if (end - _start >= _timespan) {
455                                        result = Stream.of(_best);
456                                        _start = 0;
457                                        _best = null;
458                                } else {
459                                        result = Stream.empty();
460                                }
461
462                                return result;
463                        }
464                };
465        }
466
467}