001/*
002 * Java Genetic Algorithm Library (jenetics-9.0.0).
003 * Copyright (c) 2007-2026 Franz Wilhelmstötter
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 * Author:
018 *    Franz Wilhelmstötter (franz.wilhelmstoetter@gmail.com)
019 */
020package io.jenetics.util;
021
022import static java.time.Clock.systemUTC;
023import static java.util.Objects.requireNonNull;
024
025import java.time.Clock;
026import java.time.Duration;
027import java.util.Comparator;
028import java.util.function.BinaryOperator;
029import java.util.function.Function;
030import java.util.stream.Stream;
031
032/**
033 * This class contains factory methods for (flat) mapping stream element. The
034 * functions of this class can be used in the following way.
035 * {@snippet lang="java":
036 * final ISeq<Integer> values = new Random().ints(0, 100).boxed()
037 *     .limit(100)
038 *     .flatMap(Streams.toIntervalMax(13))
039 *     .collect(ISeq.toISeq());
040 * }
041 *
042 * @author <a href="mailto:franz.wilhelmstoetter@gmail.com">Franz Wilhelmstötter</a>
043 * @since 6.0
044 * @version 6.0
045 */
046public final class Streams {
047        private Streams() {
048        }
049
050
051        /**
052         * Return a new flat-mapper function, which guarantees a strictly increasing
053         * stream, from an arbitrarily ordered source stream. Note that this
054         * function doesn't sort the stream. It <em>just</em> skips the <em>out of
055         * order</em> elements.
056         *
057         * <pre>{@code
058         *     +----3--2--5--4--7--7--4--9----|
059         *        toStrictlyIncreasing()
060         *     +----3-----5-----7--------9----|
061         * }</pre>
062         *
063         * {@snippet lang="java":
064         * final ISeq<Integer> values = new Random().ints(0, 100)
065         *     .boxed()
066         *     .limit(100)
067         *     .flatMap(Streams.toStrictlyIncreasing())
068         *     .collect(ISeq.toISeq());
069         *
070         * System.out.println(values);
071         * // [6,47,65,78,96,96,99]
072         * }
073         *
074         *
075         * @param <C> the comparable type
076         * @return a new flat-mapper function
077         */
078        public static <C extends Comparable<? super C>>
079        Function<C, Stream<C>> toStrictlyIncreasing() {
080                return strictlyImproving(Streams::max);
081        }
082
083        /**
084         * Return a new flat-mapper function, which guarantees a strictly decreasing
085         * stream, from an arbitrarily ordered source stream. Note that this
086         * function doesn't sort the stream. It <em>just</em> skips the <em>out of
087         * order</em> elements.
088         *
089         * <pre>{@code
090         *     +----9--8--9--5--6--6--2--9----|
091         *        toStrictlyDecreasing()
092         *     +----9--8-----5--------2-------|
093         * }</pre>
094         *
095         * {@snippet lang="java":
096         * final ISeq<Integer> values = new Random().ints(0, 100)
097         *     .boxed()
098         *     .limit(100)
099         *     .flatMap(Streams.toStrictlyDecreasing())
100         *     .collect(ISeq.toISeq());
101         *
102         * System.out.println(values);
103         * // [45,32,15,12,3,1]
104         * }
105         *
106         * @param <C> the comparable type
107         * @return a new flat-mapper function
108         */
109        public static <C extends Comparable<? super C>>
110        Function<C, Stream<C>> toStrictlyDecreasing() {
111                return strictlyImproving(Streams::min);
112        }
113
114        /**
115         * Return a new flat-mapper function, which guarantees a strictly improving
116         * stream, from an arbitrarily ordered source stream. Note that this
117         * function doesn't sort the stream. It <em>just</em> skips the <em>out of
118         * order</em> elements.
119         * {@snippet lang="java":
120         * final ISeq<Integer> values = new Random().ints(0, 100)
121         *     .boxed()
122         *     .limit(100)
123         *     .flatMap(Streams.toStrictlyImproving(Comparator.naturalOrder()))
124         *     .collect(ISeq.toISeq());
125         *
126         * System.out.println(values);
127         * // [6,47,65,78,96,96,99]
128         * }
129         *
130         * @see #toStrictlyIncreasing()
131         * @see #toStrictlyDecreasing()
132         *
133         * @param <T> the element type
134         * @param comparator the comparator used for testing the elements
135         * @return a new flat-mapper function
136         */
137        public static <T> Function<T, Stream<T>>
138        toStrictlyImproving(final Comparator<? super T> comparator) {
139                return strictlyImproving((a, b) -> best(comparator, a, b));
140        }
141
142        private static <C> Function<C, Stream<C>>
143        strictlyImproving(final BinaryOperator<C> comparator) {
144                requireNonNull(comparator);
145
146                return new Function<>() {
147                        private C _best;
148
149                        @Override
150                        public Stream<C> apply(final C result) {
151                                final C best = comparator.apply(_best, result);
152
153                                final Stream<C> stream = best == _best
154                                        ? Stream.empty()
155                                        : Stream.of(best);
156
157                                _best = best;
158
159                                return stream;
160                        }
161                };
162        }
163
164        private static <T extends Comparable<? super T>> T max(final T a, final T b) {
165                return best(Comparator.naturalOrder(), a, b);
166        }
167
168        private static <T extends Comparable<? super T>> T min(final T a, final T b) {
169                return best(Comparator.reverseOrder(), a, b);
170        }
171
172        private static <T>
173        T best(final Comparator<? super T> comparator, final T a, final T b) {
174                if (a == null && b == null) return null;
175                if (a == null) return b;
176                if (b == null) return a;
177                return comparator.compare(a, b) >= 0 ? a : b;
178        }
179
180        /**
181         * Return a new flat-mapper function which returns (emits) the maximal value
182         * of the last <em>n</em> elements.
183         *
184         * <pre>{@code
185         *          +----3---+----3---+
186         *          |        |        |
187         *     +----9--8--3--3--5--4--2--9----|
188         *        toIntervalMax(3)
189         *     +----------9--------5----------|
190         * }</pre>
191         *
192         * @param size the size of the slice
193         * @param <C> the element type
194         * @return a new flat-mapper function
195         * @throws IllegalArgumentException if the given size is smaller than one
196         */
197        public static <C extends Comparable<? super C>>
198        Function<C, Stream<C>> toIntervalMax(final int size) {
199                return sliceBest(Streams::max, size);
200        }
201
202        /**
203         * Return a new flat-mapper function which returns (emits) the minimal value
204         * of the last <em>n</em> elements.
205         *
206         * <pre>{@code
207         *          +----3---+----3---+
208         *          |        |        |
209         *     +----9--8--3--3--1--4--2--9----|
210         *        toIntervalMin(3)
211         *     +----------3--------1----------|
212         * }</pre>
213         *
214         * @param size the size of the slice
215         * @param <C> the element type
216         * @return a new flat-mapper function
217         * @throws IllegalArgumentException if the given size is smaller than one
218         */
219        public static <C extends Comparable<? super C>>
220        Function<C, Stream<C>> toIntervalMin(final int size) {
221                return sliceBest(Streams::min, size);
222        }
223
224        /**
225         * Return a new flat-mapper function which returns (emits) the minimal value
226         * of the last <em>n</em> elements.
227         *
228         * @see #toIntervalMax(int)
229         * @see #toIntervalMin(int)
230         *
231         * @param <C> the element type
232         * @param size the size of the slice
233         * @param comparator the comparator used for testing the elements
234         * @return a new flat-mapper function
235         * @throws IllegalArgumentException if the given size is smaller than one
236         * @throws NullPointerException if the given {@code comparator} is
237         *         {@code null}
238         */
239        public static <C> Function<C, Stream<C>>
240        toIntervalBest(final Comparator<? super C> comparator, final int size) {
241                requireNonNull(comparator);
242                return sliceBest((a, b) -> best(comparator, a, b), size);
243        }
244
245        private static <C> Function<C, Stream<C>> sliceBest(
246                final BinaryOperator<C> comp,
247                final int rangeSize
248        ) {
249                requireNonNull(comp);
250                if (rangeSize < 1) {
251                        throw new IllegalArgumentException(
252                                "Range size must be at least one: " + rangeSize
253                        );
254                }
255
256                return new Function<>() {
257                        private int _count = 0;
258                        private C _best;
259
260                        @Override
261                        public Stream<C> apply(final C value) {
262                                ++_count;
263                                _best = comp.apply(_best, value);
264
265                                final Stream<C> result;
266                                if (_count >= rangeSize) {
267                                        result = Stream.of(_best);
268                                        _count = 0;
269                                        _best = null;
270                                } else {
271                                        result = Stream.empty();
272                                }
273
274                                return result;
275                        }
276                };
277        }
278
279        /**
280         * Return a new flat-mapper function which returns (emits) the maximal value
281         * of the elements emitted within the given {@code timespan}.
282         *
283         * <pre>{@code
284         *          +---3s---+---3s---+
285         *          |        |        |
286         *     +----9--8--3--3--5--4--2--9----|
287         *        toIntervalMax(3s)
288         *     +----------9--------5----------|
289         * }</pre>
290         *
291         * @see #toIntervalMax(Duration, Clock)
292         *
293         * @param <C> the element type
294         * @param timespan the timespan the elements are collected for the
295         *        calculation slice
296         * @return a new flat-mapper function
297         * @throws IllegalArgumentException if the given size is smaller than one
298         * @throws NullPointerException if the given {@code timespan} is {@code null}
299         */
300        public static <C extends Comparable<? super C>>
301        Function<C, Stream<C>> toIntervalMax(final Duration timespan) {
302                return sliceBest(Streams::max, timespan, systemUTC());
303        }
304
305        /**
306         * Return a new flat-mapper function which returns (emits) the maximal value
307         * of the elements emitted within the given {@code timespan}.
308         *
309         * <pre>{@code
310         *          +---3s---+---3s---+
311         *          |        |        |
312         *     +----9--8--3--3--5--4--2--9----|
313         *        toIntervalMax(3s)
314         *     +----------9--------5----------|
315         * }</pre>
316         *
317         * @see #toIntervalMax(Duration)
318         *
319         * @param <C> the element type
320         * @param timespan the timespan the elements are collected for the
321         *        calculation slice
322         * @param clock the {@code clock} used for measuring the {@code timespan}
323         * @return a new flat-mapper function
324         * @throws IllegalArgumentException if the given size is smaller than one
325         * @throws NullPointerException if one of the arguments is {@code null}
326         */
327        public static <C extends Comparable<? super C>>
328        Function<C, Stream<C>> toIntervalMax(final Duration timespan, final Clock clock) {
329                return sliceBest(Streams::max, timespan, clock);
330        }
331
332        /**
333         * Return a new flat-mapper function which returns (emits) the minimal value
334         * of the elements emitted within the given {@code timespan}.
335         *
336         * <pre>{@code
337         *          +---3s---+---3s---+
338         *          |        |        |
339         *     +----9--8--3--3--1--4--2--9----|
340         *        toIntervalMin(3s)
341         *     +----------3--------1----------|
342         * }</pre>
343         *
344         * @see #toIntervalMin(Duration, Clock)
345         *
346         * @param <C> the element type
347         * @param timespan the timespan the elements are collected for the
348         *        calculation slice
349         * @return a new flat-mapper function
350         * @throws IllegalArgumentException if the given size is smaller than one
351         * @throws NullPointerException if the given {@code timespan} is {@code null}
352         */
353        public static <C extends Comparable<? super C>>
354        Function<C, Stream<C>> toIntervalMin(final Duration timespan) {
355                return sliceBest(Streams::min, timespan, systemUTC());
356        }
357
358        /**
359         * Return a new flat-mapper function which returns (emits) the minimal value
360         * of the elements emitted within the given {@code timespan}.
361         *
362         * <pre>{@code
363         *          +---3s---+---3s---+
364         *          |        |        |
365         *     +----9--8--3--3--1--4--2--9----|
366         *        toIntervalMin(3s)
367         *     +----------3--------1----------|
368         * }</pre>
369         *
370         * @see #toIntervalMin(Duration)
371         *
372         * @param <C> the element type
373         * @param timespan the timespan the elements are collected for the
374         *        calculation slice
375         * @param clock the {@code clock} used for measuring the {@code timespan}
376         * @return a new flat-mapper function
377         * @throws IllegalArgumentException if the given size is smaller than one
378         * @throws NullPointerException if one of the arguments is {@code null}
379         */
380        public static <C extends Comparable<? super C>>
381        Function<C, Stream<C>> toIntervalMin(final Duration timespan, final Clock clock) {
382                return sliceBest(Streams::min, timespan, clock);
383        }
384
385        /**
386         * Return a new flat-mapper function which returns (emits) the minimal value
387         * of the elements emitted within the given {@code timespan}.
388         *
389         * @see #toIntervalMin(Duration)
390         * @see #toIntervalMax(Duration)
391         *
392         * @param <C> the element type
393         * @param comparator the comparator used for testing the elements
394         * @param timespan the timespan the elements are collected for the
395         *        calculation slice
396         * @return a new flat-mapper function
397         * @throws IllegalArgumentException if the given size is smaller than one
398         @throws NullPointerException if one of the arguments is {@code null}
399         */
400        public static <C> Function<C, Stream<C>>
401        toIntervalBest(final Comparator<? super C> comparator, final Duration timespan) {
402                requireNonNull(comparator);
403                return sliceBest((a, b) -> best(comparator, a, b), timespan, systemUTC());
404        }
405
406        /**
407         * Return a new flat-mapper function which returns (emits) the minimal value
408         * of the elements emitted within the given {@code timespan}.
409         *
410         * @param <C> the element type
411         * @param comparator the comparator used for testing the elements
412         * @param timespan the timespan the elements are collected for the
413         *        calculation slice
414         * @param clock the {@code clock} used for measuring the {@code timespan}
415         * @return a new flat-mapper function
416         * @throws IllegalArgumentException if the given size is smaller than one
417         @throws NullPointerException if one of the arguments is {@code null}
418         */
419        public static <C> Function<C, Stream<C>>
420        toIntervalBest(
421                final Comparator<? super C> comparator,
422                final Duration timespan,
423                final Clock clock
424        ) {
425                requireNonNull(comparator);
426                return sliceBest((a, b) -> best(comparator, a, b), timespan, clock);
427        }
428
429        private static <C> Function<C, Stream<C>> sliceBest(
430                final BinaryOperator<C> comp,
431                final Duration timespan,
432                final Clock clock
433        ) {
434                requireNonNull(comp);
435                requireNonNull(timespan);
436
437                return new Function<>() {
438                        private final long _timespan  = timespan.toMillis();
439
440                        private long _start = 0;
441                        private C _best;
442
443                        @Override
444                        public Stream<C> apply(final C value) {
445                                if (_start == 0) {
446                                        _start = clock.millis();
447                                }
448
449                                _best = comp.apply(_best, value);
450                                long end = clock.millis();
451
452                                final Stream<C> result;
453                                if (end - _start >= _timespan) {
454                                        result = Stream.of(_best);
455                                        _start = 0;
456                                        _best = null;
457                                } else {
458                                        result = Stream.empty();
459                                }
460
461                                return result;
462                        }
463                };
464        }
465
466}