001/*
002 * Java Genetic Algorithm Library (jenetics-8.1.0).
003 * Copyright (c) 2007-2024 Franz Wilhelmstötter
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 * Author:
018 *    Franz Wilhelmstötter (franz.wilhelmstoetter@gmail.com)
019 */
020package io.jenetics.util;
021
022import static java.time.Clock.systemUTC;
023import static java.util.Objects.requireNonNull;
024
025import java.time.Clock;
026import java.time.Duration;
027import java.util.Comparator;
028import java.util.function.BinaryOperator;
029import java.util.function.Function;
030import java.util.stream.Stream;
031
032/**
033 * This class contains factory methods for (flat) mapping stream elements. The
034 * functions of this class can be used in the following way.
035 * {@snippet lang="java":
036 * final ISeq<Integer> values = new Random().ints(0, 100).boxed()
037 *     .limit(100)
038 *     .flatMap(Streams.toIntervalMax(13))
039 *     .collect(ISeq.toISeq());
040 * }
041 *
042 * @author <a href="mailto:franz.wilhelmstoetter@gmail.com">Franz Wilhelmstötter</a>
043 * @since 6.0
044 * @version 6.0
045 */
046public final class Streams {
047        private Streams() {}
048
049
050        /**
051         * Return a new flat-mapper function, which guarantees a strictly increasing
052         * stream, from an arbitrarily ordered source stream. Note that this
053         * function doesn't sort the stream. It <em>just</em> skips the <em>out of
054         * order</em> elements.
055         *
056         * <pre>{@code
057         *     +----3--2--5--4--7--7--4--9----|
058         *        toStrictlyIncreasing()
059         *     +----3-----5-----7--------9----|
060         * }</pre>
061         *
062         * {@snippet lang="java":
063         * final ISeq<Integer> values = new Random().ints(0, 100)
064         *     .boxed()
065         *     .limit(100)
066         *     .flatMap(Streams.toStrictlyIncreasing())
067         *     .collect(ISeq.toISeq());
068         *
069         * System.out.println(values);
070         * // [6,47,65,78,96,96,99]
071         * }
072         *
073         *
074         * @param <C> the comparable type
075         * @return a new flat-mapper function
076         */
077        public static <C extends Comparable<? super C>>
078        Function<C, Stream<C>> toStrictlyIncreasing() {
079                return strictlyImproving(Streams::max);
080        }
081
082        /**
083         * Return a new flat-mapper function, which guarantees a strictly decreasing
084         * stream, from an arbitrarily ordered source stream. Note that this
085         * function doesn't sort the stream. It <em>just</em> skips the <em>out of
086         * order</em> elements.
087         *
088         * <pre>{@code
089         *     +----9--8--9--5--6--6--2--9----|
090         *        toStrictlyDecreasing()
091         *     +----9--8-----5--------2-------|
092         * }</pre>
093         *
094         * {@snippet lang="java":
095         * final ISeq<Integer> values = new Random().ints(0, 100)
096         *     .boxed()
097         *     .limit(100)
098         *     .flatMap(Streams.toStrictlyDecreasing())
099         *     .collect(ISeq.toISeq());
100         *
101         * System.out.println(values);
102         * // [45,32,15,12,3,1]
103         * }
104         *
105         * @param <C> the comparable type
106         * @return a new flat-mapper function
107         */
108        public static <C extends Comparable<? super C>>
109        Function<C, Stream<C>> toStrictlyDecreasing() {
110                return strictlyImproving(Streams::min);
111        }
112
113        /**
114         * Return a new flat-mapper function, which guarantees a strictly improving
115         * stream, from an arbitrarily ordered source stream. Note that this
116         * function doesn't sort the stream. It <em>just</em> skips the <em>out of
117         * order</em> elements.
118         * {@snippet lang="java":
119         * final ISeq<Integer> values = new Random().ints(0, 100)
120         *     .boxed()
121         *     .limit(100)
122         *     .flatMap(Streams.toStrictlyImproving(Comparator.naturalOrder()))
123         *     .collect(ISeq.toISeq());
124         *
125         * System.out.println(values);
126         * // [6,47,65,78,96,96,99]
127         * }
128         *
129         * @see #toStrictlyIncreasing()
130         * @see #toStrictlyDecreasing()
131         *
132         * @param <T> the element type
133         * @param comparator the comparator used for testing the elements
134         * @return a new flat-mapper function
135         */
136        public static <T> Function<T, Stream<T>>
137        toStrictlyImproving(final Comparator<? super T> comparator) {
138                return strictlyImproving((a, b) -> best(comparator, a, b));
139        }
140
141        private static <C> Function<C, Stream<C>>
142        strictlyImproving(final BinaryOperator<C> comparator) {
143                requireNonNull(comparator);
144
145                return new Function<>() {
146                        private C _best;
147
148                        @Override
149                        public Stream<C> apply(final C result) {
150                                final C best = comparator.apply(_best, result);
151
152                                final Stream<C> stream = best == _best
153                                        ? Stream.empty()
154                                        : Stream.of(best);
155
156                                _best = best;
157
158                                return stream;
159                        }
160                };
161        }
162
163        private static <T extends Comparable<? super T>> T max(final T a, final T b) {
164                return best(Comparator.naturalOrder(), a, b);
165        }
166
167        private static <T extends Comparable<? super T>> T min(final T a, final T b) {
168                return best(Comparator.reverseOrder(), a, b);
169        }
170
171        private static <T>
172        T best(final Comparator<? super T> comparator, final T a, final T b) {
173                if (a == null && b == null) return null;
174                if (a == null) return b;
175                if (b == null) return a;
176                return comparator.compare(a, b) >= 0 ? a : b;
177        }
178
179        /**
180         * Return a new flat-mapper function which returns (emits) the maximal value
181         * of the last <em>n</em> elements.
182         *
183         * <pre>{@code
184         *          +----3---+----3---+
185         *          |        |        |
186         *     +----9--8--3--3--5--4--2--9----|
187         *        toIntervalMax(3)
188         *     +----------9--------5----------|
189         * }</pre>
190         *
191         * @param size the size of the slice
192         * @param <C> the element type
193         * @return a new flat-mapper function
194         * @throws IllegalArgumentException if the given size is smaller than one
195         */
196        public static <C extends Comparable<? super C>>
197        Function<C, Stream<C>> toIntervalMax(final int size) {
198                return sliceBest(Streams::max, size);
199        }
200
201        /**
202         * Return a new flat-mapper function which returns (emits) the minimal value
203         * of the last <em>n</em> elements.
204         *
205         * <pre>{@code
206         *          +----3---+----3---+
207         *          |        |        |
208         *     +----9--8--3--3--1--4--2--9----|
209         *        toIntervalMin(3)
210         *     +----------3--------1----------|
211         * }</pre>
212         *
213         * @param size the size of the slice
214         * @param <C> the element type
215         * @return a new flat-mapper function
216         * @throws IllegalArgumentException if the given size is smaller than one
217         */
218        public static <C extends Comparable<? super C>>
219        Function<C, Stream<C>> toIntervalMin(final int size) {
220                return sliceBest(Streams::min, size);
221        }
222
223        /**
224         * Return a new flat-mapper function which returns (emits) the minimal value
225         * of the last <em>n</em> elements.
226         *
227         * @see #toIntervalMax(int)
228         * @see #toIntervalMin(int)
229         *
230         * @param <C> the element type
231         * @param size the size of the slice
232         * @param comparator the comparator used for testing the elements
233         * @return a new flat-mapper function
234         * @throws IllegalArgumentException if the given size is smaller than one
235         * @throws NullPointerException if the given {@code comparator} is
236         *         {@code null}
237         */
238        public static <C> Function<C, Stream<C>>
239        toIntervalBest(final Comparator<? super C> comparator, final int size) {
240                requireNonNull(comparator);
241                return sliceBest((a, b) -> best(comparator, a, b), size);
242        }
243
244        private static <C> Function<C, Stream<C>> sliceBest(
245                final BinaryOperator<C> comp,
246                final int rangeSize
247        ) {
248                requireNonNull(comp);
249                if (rangeSize < 1) {
250                        throw new IllegalArgumentException(
251                                "Range size must be at least one: " + rangeSize
252                        );
253                }
254
255                return new Function<>() {
256                        private int _count = 0;
257                        private C _best;
258
259                        @Override
260                        public Stream<C> apply(final C value) {
261                                ++_count;
262                                _best = comp.apply(_best, value);
263
264                                final Stream<C> result;
265                                if (_count >= rangeSize) {
266                                        result = Stream.of(_best);
267                                        _count = 0;
268                                        _best = null;
269                                } else {
270                                        result = Stream.empty();
271                                }
272
273                                return result;
274                        }
275                };
276        }
277
278        /**
279         * Return a new flat-mapper function which returns (emits) the maximal value
280         * of the elements emitted within the given {@code timespan}.
281         *
282         * <pre>{@code
283         *          +---3s---+---3s---+
284         *          |        |        |
285         *     +----9--8--3--3--5--4--2--9----|
286         *        toIntervalMax(3s)
287         *     +----------9--------5----------|
288         * }</pre>
289         *
290         * @see #toIntervalMax(Duration, Clock)
291         *
292         * @param <C> the element type
293         * @param timespan the timespan the elements are collected for the
294         *        calculation slice
295         * @return a new flat-mapper function
296         * @throws IllegalArgumentException if the given size is smaller than one
297         * @throws NullPointerException if the given {@code timespan} is {@code null}
298         */
299        public static <C extends Comparable<? super C>>
300        Function<C, Stream<C>> toIntervalMax(final Duration timespan) {
301                return sliceBest(Streams::max, timespan, systemUTC());
302        }
303
304        /**
305         * Return a new flat-mapper function which returns (emits) the maximal value
306         * of the elements emitted within the given {@code timespan}.
307         *
308         * <pre>{@code
309         *          +---3s---+---3s---+
310         *          |        |        |
311         *     +----9--8--3--3--5--4--2--9----|
312         *        toIntervalMax(3s)
313         *     +----------9--------5----------|
314         * }</pre>
315         *
316         * @see #toIntervalMax(Duration)
317         *
318         * @param <C> the element type
319         * @param timespan the timespan the elements are collected for the
320         *        calculation slice
321         * @param clock the {@code clock} used for measuring the {@code timespan}
322         * @return a new flat-mapper function
323         * @throws IllegalArgumentException if the given size is smaller than one
324         * @throws NullPointerException if one of the arguments is {@code null}
325         */
326        public static <C extends Comparable<? super C>>
327        Function<C, Stream<C>> toIntervalMax(final Duration timespan, final Clock clock) {
328                return sliceBest(Streams::max, timespan, clock);
329        }
330
331        /**
332         * Return a new flat-mapper function which returns (emits) the minimal value
333         * of the elements emitted within the given {@code timespan}.
334         *
335         * <pre>{@code
336         *          +---3s---+---3s---+
337         *          |        |        |
338         *     +----9--8--3--3--1--4--2--9----|
339         *        toIntervalMin(3s)
340         *     +----------3--------1----------|
341         * }</pre>
342         *
343         * @see #toIntervalMin(Duration, Clock)
344         *
345         * @param <C> the element type
346         * @param timespan the timespan the elements are collected for the
347         *        calculation slice
348         * @return a new flat-mapper function
349         * @throws IllegalArgumentException if the given size is smaller than one
350         * @throws NullPointerException if the given {@code timespan} is {@code null}
351         */
352        public static <C extends Comparable<? super C>>
353        Function<C, Stream<C>> toIntervalMin(final Duration timespan) {
354                return sliceBest(Streams::min, timespan, systemUTC());
355        }
356
357        /**
358         * Return a new flat-mapper function which returns (emits) the minimal value
359         * of the elements emitted within the given {@code timespan}.
360         *
361         * <pre>{@code
362         *          +---3s---+---3s---+
363         *          |        |        |
364         *     +----9--8--3--3--1--4--2--9----|
365         *        toIntervalMin(3s)
366         *     +----------3--------1----------|
367         * }</pre>
368         *
369         * @see #toIntervalMin(Duration)
370         *
371         * @param <C> the element type
372         * @param timespan the timespan the elements are collected for the
373         *        calculation slice
374         * @param clock the {@code clock} used for measuring the {@code timespan}
375         * @return a new flat-mapper function
376         * @throws IllegalArgumentException if the given size is smaller than one
377         * @throws NullPointerException if one of the arguments is {@code null}
378         */
379        public static <C extends Comparable<? super C>>
380        Function<C, Stream<C>> toIntervalMin(final Duration timespan, final Clock clock) {
381                return sliceBest(Streams::min, timespan, clock);
382        }
383
384        /**
385         * Return a new flat-mapper function which returns (emits) the minimal value
386         * of the elements emitted within the given {@code timespan}.
387         *
388         * @see #toIntervalMin(Duration)
389         * @see #toIntervalMax(Duration)
390         *
391         * @param <C> the element type
392         * @param comparator the comparator used for testing the elements
393         * @param timespan the timespan the elements are collected for the
394         *        calculation slice
395         * @return a new flat-mapper function
396         * @throws IllegalArgumentException if the given size is smaller than one
397         @throws NullPointerException if one of the arguments is {@code null}
398         */
399        public static <C> Function<C, Stream<C>>
400        toIntervalBest(final Comparator<? super C> comparator, final Duration timespan) {
401                requireNonNull(comparator);
402                return sliceBest((a, b) -> best(comparator, a, b), timespan, systemUTC());
403        }
404
405        /**
406         * Return a new flat-mapper function which returns (emits) the minimal value
407         * of the elements emitted within the given {@code timespan}.
408         *
409         * @param <C> the element type
410         * @param comparator the comparator used for testing the elements
411         * @param timespan the timespan the elements are collected for the
412         *        calculation slice
413         * @param clock the {@code clock} used for measuring the {@code timespan}
414         * @return a new flat-mapper function
415         * @throws IllegalArgumentException if the given size is smaller than one
416         @throws NullPointerException if one of the arguments is {@code null}
417         */
418        public static <C> Function<C, Stream<C>>
419        toIntervalBest(
420                final Comparator<? super C> comparator,
421                final Duration timespan,
422                final Clock clock
423        ) {
424                requireNonNull(comparator);
425                return sliceBest((a, b) -> best(comparator, a, b), timespan, clock);
426        }
427
428        private static <C> Function<C, Stream<C>> sliceBest(
429                final BinaryOperator<C> comp,
430                final Duration timespan,
431                final Clock clock
432        ) {
433                requireNonNull(comp);
434                requireNonNull(timespan);
435
436                return new Function<>() {
437                        private final long _timespan  = timespan.toMillis();
438
439                        private long _start = 0;
440                        private C _best;
441
442                        @Override
443                        public Stream<C> apply(final C value) {
444                                if (_start == 0) {
445                                        _start = clock.millis();
446                                }
447
448                                _best = comp.apply(_best, value);
449                                long end = clock.millis();
450
451                                final Stream<C> result;
452                                if (end - _start >= _timespan) {
453                                        result = Stream.of(_best);
454                                        _start = 0;
455                                        _best = null;
456                                } else {
457                                        result = Stream.empty();
458                                }
459
460                                return result;
461                        }
462                };
463        }
464
465}