001 /*
002 * Java Genetic Algorithm Library (jenetics-6.3.0).
003 * Copyright (c) 2007-2021 Franz Wilhelmstötter
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 * Author:
018 * Franz Wilhelmstötter (franz.wilhelmstoetter@gmail.com)
019 */
020 package io.jenetics.util;
021
022 import static java.time.Clock.systemUTC;
023 import static java.util.Objects.requireNonNull;
024
025 import java.time.Clock;
026 import java.time.Duration;
027 import java.util.Comparator;
028 import java.util.function.BinaryOperator;
029 import java.util.function.Function;
030 import java.util.stream.Stream;
031
/**
 * This class contains factory methods for (flat) mapping stream elements. The
 * functions of this class can be used in the following way.
 *
 * <pre>{@code
 * final ISeq<Integer> values = new Random().ints(0, 100).boxed()
 *     .limit(100)
 *     .flatMap(Streams.toIntervalMax(13))
 *     .collect(ISeq.toISeq());
 * }</pre>
 *
 * <p>
 * All functions returned by this class are <em>stateful</em>: they remember
 * the best element seen so far (plus a counter or a start time) in plain,
 * unsynchronized fields. Create a fresh function for every stream; the
 * functions are presumably meant for sequential streams only.
 *
 * @author <a href="mailto:franz.wilhelmstoetter@gmail.com">Franz Wilhelmstötter</a>
 * @since 6.0
 * @version 6.0
 */
public final class Streams {
    private Streams() {}


    /**
     * Return a new flat-mapper function, which guarantees a strictly increasing
     * stream, from an arbitrarily ordered source stream. Note that this
     * function doesn't sort the stream. It <em>just</em> skips the <em>out of
     * order</em> elements. Elements which are equal to the current maximum
     * are skipped as well.
     *
     * <pre>{@code
     * +----3--2--5--4--7--7--4--9----|
     *        toStrictlyIncreasing()
     * +----3-----5-----7--------9----|
     * }</pre>
     *
     * <pre>{@code
     * final ISeq<Integer> values = new Random().ints(0, 100)
     *     .boxed()
     *     .limit(100)
     *     .flatMap(Streams.toStrictlyIncreasing())
     *     .collect(ISeq.toISeq());
     *
     * System.out.println(values);
     * // [6,47,65,78,96,99]
     * }</pre>
     *
     * @param <C> the comparable type
     * @return a new flat-mapper function
     */
    public static <C extends Comparable<? super C>>
    Function<C, Stream<C>> toStrictlyIncreasing() {
        return strictlyImproving(Streams::max);
    }

    /**
     * Return a new flat-mapper function, which guarantees a strictly decreasing
     * stream, from an arbitrarily ordered source stream. Note that this
     * function doesn't sort the stream. It <em>just</em> skips the <em>out of
     * order</em> elements. Elements which are equal to the current minimum
     * are skipped as well.
     *
     * <pre>{@code
     * +----9--8--9--5--6--6--2--9----|
     *        toStrictlyDecreasing()
     * +----9--8-----5--------2-------|
     * }</pre>
     *
     * <pre>{@code
     * final ISeq<Integer> values = new Random().ints(0, 100)
     *     .boxed()
     *     .limit(100)
     *     .flatMap(Streams.toStrictlyDecreasing())
     *     .collect(ISeq.toISeq());
     *
     * System.out.println(values);
     * // [45,32,15,12,3,1]
     * }</pre>
     *
     * @param <C> the comparable type
     * @return a new flat-mapper function
     */
    public static <C extends Comparable<? super C>>
    Function<C, Stream<C>> toStrictlyDecreasing() {
        return strictlyImproving(Streams::min);
    }

    /**
     * Return a new flat-mapper function, which guarantees a strictly improving
     * stream, from an arbitrarily ordered source stream. Note that this
     * function doesn't sort the stream. It <em>just</em> skips the <em>out of
     * order</em> elements. An element <em>improves</em> the stream if it is
     * strictly greater, according to the given {@code comparator}, than every
     * element emitted before.
     *
     * <pre>{@code
     * final ISeq<Integer> values = new Random().ints(0, 100)
     *     .boxed()
     *     .limit(100)
     *     .flatMap(Streams.toStrictlyImproving(Comparator.naturalOrder()))
     *     .collect(ISeq.toISeq());
     *
     * System.out.println(values);
     * // [6,47,65,78,96,99]
     * }</pre>
     *
     * @see #toStrictlyIncreasing()
     * @see #toStrictlyDecreasing()
     *
     * @param <T> the element type
     * @param comparator the comparator used for testing the elements
     * @return a new flat-mapper function
     * @throws NullPointerException if the given {@code comparator} is
     *         {@code null}
     */
    public static <T> Function<T, Stream<T>>
    toStrictlyImproving(final Comparator<? super T> comparator) {
        // Fail fast, like the toIntervalBest methods do, instead of throwing
        // when the second stream element is mapped.
        requireNonNull(comparator);
        return strictlyImproving((a, b) -> best(comparator, a, b));
    }

    /**
     * Create the stateful flat-mapper which only lets pass elements that
     * replace the best element seen so far.
     *
     * @param comparator the operator which selects the better one of its two
     *        arguments; the first argument is the current best element
     *        ({@code null} for the first call), the second one the new element
     * @param <C> the element type
     * @return a new flat-mapper function
     * @throws NullPointerException if the given operator is {@code null}
     */
    private static <C> Function<C, Stream<C>>
    strictlyImproving(final BinaryOperator<C> comparator) {
        requireNonNull(comparator);

        return new Function<>() {
            private C _best;

            @Override
            public Stream<C> apply(final C result) {
                final C best = comparator.apply(_best, result);

                // The identity comparison is intended: the operators used
                // in this class return one of their two arguments and prefer
                // the first one (the current best) on ties.
                final Stream<C> stream = best == _best
                    ? Stream.empty()
                    : Stream.of(best);

                _best = best;

                return stream;
            }
        };
    }

    /**
     * Return the greater of the two values, according to their natural order.
     * A {@code null} value is never preferred over a non-{@code null} one.
     */
    private static <T extends Comparable<? super T>> T max(final T a, final T b) {
        return best(Comparator.naturalOrder(), a, b);
    }

    /**
     * Return the smaller of the two values, according to their natural order.
     * A {@code null} value is never preferred over a non-{@code null} one.
     */
    private static <T extends Comparable<? super T>> T min(final T a, final T b) {
        return best(Comparator.reverseOrder(), a, b);
    }

    /**
     * Return the greater of the two values, according to the given
     * {@code comparator}. If both values compare as equal, {@code a} is
     * returned. A {@code null} value loses against every non-{@code null}
     * value; the comparator is not called in this case.
     */
    private static <T>
    T best(final Comparator<? super T> comparator, final T a, final T b) {
        if (a == null && b == null) return null;
        if (a == null) return b;
        if (b == null) return a;
        return comparator.compare(a, b) >= 0 ? a : b;
    }

    /**
     * Return a new flat-mapper function which returns (emits) the maximal value
     * of the last <em>n</em> elements. Elements of a trailing, incomplete
     * slice are not emitted.
     *
     * <pre>{@code
     *          +----3---+----3---+
     *          |        |        |
     * +----9--8--3--3--5--4--2--9----|
     *        toIntervalMax(3)
     * +----------9--------5----------|
     * }</pre>
     *
     * @param size the size of the slice
     * @param <C> the element type
     * @return a new flat-mapper function
     * @throws IllegalArgumentException if the given size is smaller than one
     */
    public static <C extends Comparable<? super C>>
    Function<C, Stream<C>> toIntervalMax(final int size) {
        return sliceBest(Streams::max, size);
    }

    /**
     * Return a new flat-mapper function which returns (emits) the minimal value
     * of the last <em>n</em> elements. Elements of a trailing, incomplete
     * slice are not emitted.
     *
     * <pre>{@code
     *          +----3---+----3---+
     *          |        |        |
     * +----9--8--3--3--1--4--2--9----|
     *        toIntervalMin(3)
     * +----------3--------1----------|
     * }</pre>
     *
     * @param size the size of the slice
     * @param <C> the element type
     * @return a new flat-mapper function
     * @throws IllegalArgumentException if the given size is smaller than one
     */
    public static <C extends Comparable<? super C>>
    Function<C, Stream<C>> toIntervalMin(final int size) {
        return sliceBest(Streams::min, size);
    }

    /**
     * Return a new flat-mapper function which returns (emits) the best value
     * of the last <em>n</em> elements. The <em>best</em> value is the greatest
     * one, according to the given {@code comparator}.
     *
     * @see #toIntervalMax(int)
     * @see #toIntervalMin(int)
     *
     * @param <C> the element type
     * @param size the size of the slice
     * @param comparator the comparator used for testing the elements
     * @return a new flat-mapper function
     * @throws IllegalArgumentException if the given size is smaller than one
     * @throws NullPointerException if the given {@code comparator} is
     *         {@code null}
     */
    public static <C> Function<C, Stream<C>>
    toIntervalBest(final Comparator<? super C> comparator, final int size) {
        requireNonNull(comparator);
        return sliceBest((a, b) -> best(comparator, a, b), size);
    }

    /**
     * Create the stateful flat-mapper which emits the best element of every
     * {@code rangeSize} consecutive elements.
     *
     * @param comp the operator which selects the better one of two elements
     * @param rangeSize the number of elements of one slice
     * @param <C> the element type
     * @return a new flat-mapper function
     * @throws IllegalArgumentException if {@code rangeSize < 1}
     * @throws NullPointerException if the given operator is {@code null}
     */
    private static <C> Function<C, Stream<C>> sliceBest(
        final BinaryOperator<C> comp,
        final int rangeSize
    ) {
        requireNonNull(comp);
        if (rangeSize < 1) {
            throw new IllegalArgumentException(
                "Range size must be at least one: " + rangeSize
            );
        }

        return new Function<>() {
            private int _count = 0;
            private C _best;

            @Override
            public Stream<C> apply(final C value) {
                ++_count;
                _best = comp.apply(_best, value);

                // The slice is not complete yet: emit nothing.
                if (_count < rangeSize) {
                    return Stream.empty();
                }

                // Emit the best element and start a new slice.
                final Stream<C> result = Stream.of(_best);
                _count = 0;
                _best = null;
                return result;
            }
        };
    }

    /**
     * Return a new flat-mapper function which returns (emits) the maximal value
     * of the elements emitted within the given {@code timespan}. The time is
     * measured with the {@link Clock#systemUTC()} clock.
     *
     * <pre>{@code
     *          +---3s---+---3s---+
     *          |        |        |
     * +----9--8--3--3--5--4--2--9----|
     *        toIntervalMax(3s)
     * +----------9--------5----------|
     * }</pre>
     *
     * @see #toIntervalMax(Duration, Clock)
     *
     * @param <C> the element type
     * @param timespan the timespan the elements are collected for the
     *        calculation slice
     * @return a new flat-mapper function
     * @throws NullPointerException if the given {@code timespan} is {@code null}
     */
    public static <C extends Comparable<? super C>>
    Function<C, Stream<C>> toIntervalMax(final Duration timespan) {
        return sliceBest(Streams::max, timespan, systemUTC());
    }

    /**
     * Return a new flat-mapper function which returns (emits) the maximal value
     * of the elements emitted within the given {@code timespan}.
     *
     * <pre>{@code
     *          +---3s---+---3s---+
     *          |        |        |
     * +----9--8--3--3--5--4--2--9----|
     *        toIntervalMax(3s)
     * +----------9--------5----------|
     * }</pre>
     *
     * @see #toIntervalMax(Duration)
     *
     * @param <C> the element type
     * @param timespan the timespan the elements are collected for the
     *        calculation slice
     * @param clock the {@code clock} used for measuring the {@code timespan}
     * @return a new flat-mapper function
     * @throws NullPointerException if one of the arguments is {@code null}
     */
    public static <C extends Comparable<? super C>>
    Function<C, Stream<C>> toIntervalMax(final Duration timespan, final Clock clock) {
        return sliceBest(Streams::max, timespan, clock);
    }

    /**
     * Return a new flat-mapper function which returns (emits) the minimal value
     * of the elements emitted within the given {@code timespan}. The time is
     * measured with the {@link Clock#systemUTC()} clock.
     *
     * <pre>{@code
     *          +---3s---+---3s---+
     *          |        |        |
     * +----9--8--3--3--1--4--2--9----|
     *        toIntervalMin(3s)
     * +----------3--------1----------|
     * }</pre>
     *
     * @see #toIntervalMin(Duration, Clock)
     *
     * @param <C> the element type
     * @param timespan the timespan the elements are collected for the
     *        calculation slice
     * @return a new flat-mapper function
     * @throws NullPointerException if the given {@code timespan} is {@code null}
     */
    public static <C extends Comparable<? super C>>
    Function<C, Stream<C>> toIntervalMin(final Duration timespan) {
        return sliceBest(Streams::min, timespan, systemUTC());
    }

    /**
     * Return a new flat-mapper function which returns (emits) the minimal value
     * of the elements emitted within the given {@code timespan}.
     *
     * <pre>{@code
     *          +---3s---+---3s---+
     *          |        |        |
     * +----9--8--3--3--1--4--2--9----|
     *        toIntervalMin(3s)
     * +----------3--------1----------|
     * }</pre>
     *
     * @see #toIntervalMin(Duration)
     *
     * @param <C> the element type
     * @param timespan the timespan the elements are collected for the
     *        calculation slice
     * @param clock the {@code clock} used for measuring the {@code timespan}
     * @return a new flat-mapper function
     * @throws NullPointerException if one of the arguments is {@code null}
     */
    public static <C extends Comparable<? super C>>
    Function<C, Stream<C>> toIntervalMin(final Duration timespan, final Clock clock) {
        return sliceBest(Streams::min, timespan, clock);
    }

    /**
     * Return a new flat-mapper function which returns (emits) the best value
     * of the elements emitted within the given {@code timespan}. The
     * <em>best</em> value is the greatest one, according to the given
     * {@code comparator}. The time is measured with the
     * {@link Clock#systemUTC()} clock.
     *
     * @see #toIntervalMin(Duration)
     * @see #toIntervalMax(Duration)
     *
     * @param <C> the element type
     * @param comparator the comparator used for testing the elements
     * @param timespan the timespan the elements are collected for the
     *        calculation slice
     * @return a new flat-mapper function
     * @throws NullPointerException if one of the arguments is {@code null}
     */
    public static <C> Function<C, Stream<C>>
    toIntervalBest(final Comparator<? super C> comparator, final Duration timespan) {
        requireNonNull(comparator);
        return sliceBest((a, b) -> best(comparator, a, b), timespan, systemUTC());
    }

    /**
     * Return a new flat-mapper function which returns (emits) the best value
     * of the elements emitted within the given {@code timespan}. The
     * <em>best</em> value is the greatest one, according to the given
     * {@code comparator}.
     *
     * @param <C> the element type
     * @param comparator the comparator used for testing the elements
     * @param timespan the timespan the elements are collected for the
     *        calculation slice
     * @param clock the {@code clock} used for measuring the {@code timespan}
     * @return a new flat-mapper function
     * @throws NullPointerException if one of the arguments is {@code null}
     */
    public static <C> Function<C, Stream<C>>
    toIntervalBest(
        final Comparator<? super C> comparator,
        final Duration timespan,
        final Clock clock
    ) {
        requireNonNull(comparator);
        return sliceBest((a, b) -> best(comparator, a, b), timespan, clock);
    }

    /**
     * Create the stateful flat-mapper which emits the best element of all
     * elements seen within the given {@code timespan}. The time measurement
     * of a slice starts with its first element. The slice is closed, and its
     * best element emitted, by the first element which arrives after the
     * timespan has elapsed; this closing element is part of the slice.
     *
     * @param comp the operator which selects the better one of two elements
     * @param timespan the duration of one slice, with millisecond resolution
     * @param clock the clock used for measuring the time
     * @param <C> the element type
     * @return a new flat-mapper function
     * @throws NullPointerException if one of the arguments is {@code null}
     */
    private static <C> Function<C, Stream<C>> sliceBest(
        final BinaryOperator<C> comp,
        final Duration timespan,
        final Clock clock
    ) {
        requireNonNull(comp);
        requireNonNull(timespan);
        // Fail at creation time instead of when the first element is mapped.
        requireNonNull(clock);

        return new Function<>() {
            private final long _timespan = timespan.toMillis();

            // An explicit flag is used instead of a magic start value, since
            // every long, including zero, is a valid clock reading.
            private boolean _started = false;
            private long _start;
            private C _best;

            @Override
            public Stream<C> apply(final C value) {
                if (!_started) {
                    _start = clock.millis();
                    _started = true;
                }

                _best = comp.apply(_best, value);
                final long end = clock.millis();

                // The timespan has not elapsed yet: emit nothing.
                if (end - _start < _timespan) {
                    return Stream.empty();
                }

                // Emit the best element and start a new slice with the next
                // incoming element.
                final Stream<C> result = Stream.of(_best);
                _started = false;
                _best = null;
                return result;
            }
        };
    }

}
|