001/* 002 * Java Genetic Algorithm Library (jenetics-9.0.0). 003 * Copyright (c) 2007-2026 Franz Wilhelmstötter 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 * Author: 018 * Franz Wilhelmstötter (franz.wilhelmstoetter@gmail.com) 019 */ 020package io.jenetics.util; 021 022import static java.time.Clock.systemUTC; 023import static java.util.Objects.requireNonNull; 024 025import java.time.Clock; 026import java.time.Duration; 027import java.util.Comparator; 028import java.util.function.BinaryOperator; 029import java.util.function.Function; 030import java.util.stream.Stream; 031 032/** 033 * This class contains factory methods for (flat) mapping stream element. The 034 * functions of this class can be used in the following way. 035 * {@snippet lang="java": 036 * final ISeq<Integer> values = new Random().ints(0, 100).boxed() 037 * .limit(100) 038 * .flatMap(Streams.toIntervalMax(13)) 039 * .collect(ISeq.toISeq()); 040 * } 041 * 042 * @author <a href="mailto:franz.wilhelmstoetter@gmail.com">Franz Wilhelmstötter</a> 043 * @since 6.0 044 * @version 6.0 045 */ 046public final class Streams { 047 private Streams() { 048 } 049 050 051 /** 052 * Return a new flat-mapper function, which guarantees a strictly increasing 053 * stream, from an arbitrarily ordered source stream. Note that this 054 * function doesn't sort the stream. It <em>just</em> skips the <em>out of 055 * order</em> elements. 056 * 057 * <pre>{@code 058 * +----3--2--5--4--7--7--4--9----| 059 * toStrictlyIncreasing() 060 * +----3-----5-----7--------9----| 061 * }</pre> 062 * 063 * {@snippet lang="java": 064 * final ISeq<Integer> values = new Random().ints(0, 100) 065 * .boxed() 066 * .limit(100) 067 * .flatMap(Streams.toStrictlyIncreasing()) 068 * .collect(ISeq.toISeq()); 069 * 070 * System.out.println(values); 071 * // [6,47,65,78,96,96,99] 072 * } 073 * 074 * 075 * @param <C> the comparable type 076 * @return a new flat-mapper function 077 */ 078 public static <C extends Comparable<? super C>> 079 Function<C, Stream<C>> toStrictlyIncreasing() { 080 return strictlyImproving(Streams::max); 081 } 082 083 /** 084 * Return a new flat-mapper function, which guarantees a strictly decreasing 085 * stream, from an arbitrarily ordered source stream. Note that this 086 * function doesn't sort the stream. It <em>just</em> skips the <em>out of 087 * order</em> elements. 088 * 089 * <pre>{@code 090 * +----9--8--9--5--6--6--2--9----| 091 * toStrictlyDecreasing() 092 * +----9--8-----5--------2-------| 093 * }</pre> 094 * 095 * {@snippet lang="java": 096 * final ISeq<Integer> values = new Random().ints(0, 100) 097 * .boxed() 098 * .limit(100) 099 * .flatMap(Streams.toStrictlyDecreasing()) 100 * .collect(ISeq.toISeq()); 101 * 102 * System.out.println(values); 103 * // [45,32,15,12,3,1] 104 * } 105 * 106 * @param <C> the comparable type 107 * @return a new flat-mapper function 108 */ 109 public static <C extends Comparable<? super C>> 110 Function<C, Stream<C>> toStrictlyDecreasing() { 111 return strictlyImproving(Streams::min); 112 } 113 114 /** 115 * Return a new flat-mapper function, which guarantees a strictly improving 116 * stream, from an arbitrarily ordered source stream. Note that this 117 * function doesn't sort the stream. It <em>just</em> skips the <em>out of 118 * order</em> elements. 119 * {@snippet lang="java": 120 * final ISeq<Integer> values = new Random().ints(0, 100) 121 * .boxed() 122 * .limit(100) 123 * .flatMap(Streams.toStrictlyImproving(Comparator.naturalOrder())) 124 * .collect(ISeq.toISeq()); 125 * 126 * System.out.println(values); 127 * // [6,47,65,78,96,96,99] 128 * } 129 * 130 * @see #toStrictlyIncreasing() 131 * @see #toStrictlyDecreasing() 132 * 133 * @param <T> the element type 134 * @param comparator the comparator used for testing the elements 135 * @return a new flat-mapper function 136 */ 137 public static <T> Function<T, Stream<T>> 138 toStrictlyImproving(final Comparator<? super T> comparator) { 139 return strictlyImproving((a, b) -> best(comparator, a, b)); 140 } 141 142 private static <C> Function<C, Stream<C>> 143 strictlyImproving(final BinaryOperator<C> comparator) { 144 requireNonNull(comparator); 145 146 return new Function<>() { 147 private C _best; 148 149 @Override 150 public Stream<C> apply(final C result) { 151 final C best = comparator.apply(_best, result); 152 153 final Stream<C> stream = best == _best 154 ? Stream.empty() 155 : Stream.of(best); 156 157 _best = best; 158 159 return stream; 160 } 161 }; 162 } 163 164 private static <T extends Comparable<? super T>> T max(final T a, final T b) { 165 return best(Comparator.naturalOrder(), a, b); 166 } 167 168 private static <T extends Comparable<? super T>> T min(final T a, final T b) { 169 return best(Comparator.reverseOrder(), a, b); 170 } 171 172 private static <T> 173 T best(final Comparator<? super T> comparator, final T a, final T b) { 174 if (a == null && b == null) return null; 175 if (a == null) return b; 176 if (b == null) return a; 177 return comparator.compare(a, b) >= 0 ? a : b; 178 } 179 180 /** 181 * Return a new flat-mapper function which returns (emits) the maximal value 182 * of the last <em>n</em> elements. 183 * 184 * <pre>{@code 185 * +----3---+----3---+ 186 * | | | 187 * +----9--8--3--3--5--4--2--9----| 188 * toIntervalMax(3) 189 * +----------9--------5----------| 190 * }</pre> 191 * 192 * @param size the size of the slice 193 * @param <C> the element type 194 * @return a new flat-mapper function 195 * @throws IllegalArgumentException if the given size is smaller than one 196 */ 197 public static <C extends Comparable<? super C>> 198 Function<C, Stream<C>> toIntervalMax(final int size) { 199 return sliceBest(Streams::max, size); 200 } 201 202 /** 203 * Return a new flat-mapper function which returns (emits) the minimal value 204 * of the last <em>n</em> elements. 205 * 206 * <pre>{@code 207 * +----3---+----3---+ 208 * | | | 209 * +----9--8--3--3--1--4--2--9----| 210 * toIntervalMin(3) 211 * +----------3--------1----------| 212 * }</pre> 213 * 214 * @param size the size of the slice 215 * @param <C> the element type 216 * @return a new flat-mapper function 217 * @throws IllegalArgumentException if the given size is smaller than one 218 */ 219 public static <C extends Comparable<? super C>> 220 Function<C, Stream<C>> toIntervalMin(final int size) { 221 return sliceBest(Streams::min, size); 222 } 223 224 /** 225 * Return a new flat-mapper function which returns (emits) the minimal value 226 * of the last <em>n</em> elements. 227 * 228 * @see #toIntervalMax(int) 229 * @see #toIntervalMin(int) 230 * 231 * @param <C> the element type 232 * @param size the size of the slice 233 * @param comparator the comparator used for testing the elements 234 * @return a new flat-mapper function 235 * @throws IllegalArgumentException if the given size is smaller than one 236 * @throws NullPointerException if the given {@code comparator} is 237 * {@code null} 238 */ 239 public static <C> Function<C, Stream<C>> 240 toIntervalBest(final Comparator<? super C> comparator, final int size) { 241 requireNonNull(comparator); 242 return sliceBest((a, b) -> best(comparator, a, b), size); 243 } 244 245 private static <C> Function<C, Stream<C>> sliceBest( 246 final BinaryOperator<C> comp, 247 final int rangeSize 248 ) { 249 requireNonNull(comp); 250 if (rangeSize < 1) { 251 throw new IllegalArgumentException( 252 "Range size must be at least one: " + rangeSize 253 ); 254 } 255 256 return new Function<>() { 257 private int _count = 0; 258 private C _best; 259 260 @Override 261 public Stream<C> apply(final C value) { 262 ++_count; 263 _best = comp.apply(_best, value); 264 265 final Stream<C> result; 266 if (_count >= rangeSize) { 267 result = Stream.of(_best); 268 _count = 0; 269 _best = null; 270 } else { 271 result = Stream.empty(); 272 } 273 274 return result; 275 } 276 }; 277 } 278 279 /** 280 * Return a new flat-mapper function which returns (emits) the maximal value 281 * of the elements emitted within the given {@code timespan}. 282 * 283 * <pre>{@code 284 * +---3s---+---3s---+ 285 * | | | 286 * +----9--8--3--3--5--4--2--9----| 287 * toIntervalMax(3s) 288 * +----------9--------5----------| 289 * }</pre> 290 * 291 * @see #toIntervalMax(Duration, Clock) 292 * 293 * @param <C> the element type 294 * @param timespan the timespan the elements are collected for the 295 * calculation slice 296 * @return a new flat-mapper function 297 * @throws IllegalArgumentException if the given size is smaller than one 298 * @throws NullPointerException if the given {@code timespan} is {@code null} 299 */ 300 public static <C extends Comparable<? super C>> 301 Function<C, Stream<C>> toIntervalMax(final Duration timespan) { 302 return sliceBest(Streams::max, timespan, systemUTC()); 303 } 304 305 /** 306 * Return a new flat-mapper function which returns (emits) the maximal value 307 * of the elements emitted within the given {@code timespan}. 308 * 309 * <pre>{@code 310 * +---3s---+---3s---+ 311 * | | | 312 * +----9--8--3--3--5--4--2--9----| 313 * toIntervalMax(3s) 314 * +----------9--------5----------| 315 * }</pre> 316 * 317 * @see #toIntervalMax(Duration) 318 * 319 * @param <C> the element type 320 * @param timespan the timespan the elements are collected for the 321 * calculation slice 322 * @param clock the {@code clock} used for measuring the {@code timespan} 323 * @return a new flat-mapper function 324 * @throws IllegalArgumentException if the given size is smaller than one 325 * @throws NullPointerException if one of the arguments is {@code null} 326 */ 327 public static <C extends Comparable<? super C>> 328 Function<C, Stream<C>> toIntervalMax(final Duration timespan, final Clock clock) { 329 return sliceBest(Streams::max, timespan, clock); 330 } 331 332 /** 333 * Return a new flat-mapper function which returns (emits) the minimal value 334 * of the elements emitted within the given {@code timespan}. 335 * 336 * <pre>{@code 337 * +---3s---+---3s---+ 338 * | | | 339 * +----9--8--3--3--1--4--2--9----| 340 * toIntervalMin(3s) 341 * +----------3--------1----------| 342 * }</pre> 343 * 344 * @see #toIntervalMin(Duration, Clock) 345 * 346 * @param <C> the element type 347 * @param timespan the timespan the elements are collected for the 348 * calculation slice 349 * @return a new flat-mapper function 350 * @throws IllegalArgumentException if the given size is smaller than one 351 * @throws NullPointerException if the given {@code timespan} is {@code null} 352 */ 353 public static <C extends Comparable<? super C>> 354 Function<C, Stream<C>> toIntervalMin(final Duration timespan) { 355 return sliceBest(Streams::min, timespan, systemUTC()); 356 } 357 358 /** 359 * Return a new flat-mapper function which returns (emits) the minimal value 360 * of the elements emitted within the given {@code timespan}. 361 * 362 * <pre>{@code 363 * +---3s---+---3s---+ 364 * | | | 365 * +----9--8--3--3--1--4--2--9----| 366 * toIntervalMin(3s) 367 * +----------3--------1----------| 368 * }</pre> 369 * 370 * @see #toIntervalMin(Duration) 371 * 372 * @param <C> the element type 373 * @param timespan the timespan the elements are collected for the 374 * calculation slice 375 * @param clock the {@code clock} used for measuring the {@code timespan} 376 * @return a new flat-mapper function 377 * @throws IllegalArgumentException if the given size is smaller than one 378 * @throws NullPointerException if one of the arguments is {@code null} 379 */ 380 public static <C extends Comparable<? super C>> 381 Function<C, Stream<C>> toIntervalMin(final Duration timespan, final Clock clock) { 382 return sliceBest(Streams::min, timespan, clock); 383 } 384 385 /** 386 * Return a new flat-mapper function which returns (emits) the minimal value 387 * of the elements emitted within the given {@code timespan}. 388 * 389 * @see #toIntervalMin(Duration) 390 * @see #toIntervalMax(Duration) 391 * 392 * @param <C> the element type 393 * @param comparator the comparator used for testing the elements 394 * @param timespan the timespan the elements are collected for the 395 * calculation slice 396 * @return a new flat-mapper function 397 * @throws IllegalArgumentException if the given size is smaller than one 398 @throws NullPointerException if one of the arguments is {@code null} 399 */ 400 public static <C> Function<C, Stream<C>> 401 toIntervalBest(final Comparator<? super C> comparator, final Duration timespan) { 402 requireNonNull(comparator); 403 return sliceBest((a, b) -> best(comparator, a, b), timespan, systemUTC()); 404 } 405 406 /** 407 * Return a new flat-mapper function which returns (emits) the minimal value 408 * of the elements emitted within the given {@code timespan}. 409 * 410 * @param <C> the element type 411 * @param comparator the comparator used for testing the elements 412 * @param timespan the timespan the elements are collected for the 413 * calculation slice 414 * @param clock the {@code clock} used for measuring the {@code timespan} 415 * @return a new flat-mapper function 416 * @throws IllegalArgumentException if the given size is smaller than one 417 @throws NullPointerException if one of the arguments is {@code null} 418 */ 419 public static <C> Function<C, Stream<C>> 420 toIntervalBest( 421 final Comparator<? super C> comparator, 422 final Duration timespan, 423 final Clock clock 424 ) { 425 requireNonNull(comparator); 426 return sliceBest((a, b) -> best(comparator, a, b), timespan, clock); 427 } 428 429 private static <C> Function<C, Stream<C>> sliceBest( 430 final BinaryOperator<C> comp, 431 final Duration timespan, 432 final Clock clock 433 ) { 434 requireNonNull(comp); 435 requireNonNull(timespan); 436 437 return new Function<>() { 438 private final long _timespan = timespan.toMillis(); 439 440 private long _start = 0; 441 private C _best; 442 443 @Override 444 public Stream<C> apply(final C value) { 445 if (_start == 0) { 446 _start = clock.millis(); 447 } 448 449 _best = comp.apply(_best, value); 450 long end = clock.millis(); 451 452 final Stream<C> result; 453 if (end - _start >= _timespan) { 454 result = Stream.of(_best); 455 _start = 0; 456 _best = null; 457 } else { 458 result = Stream.empty(); 459 } 460 461 return result; 462 } 463 }; 464 } 465 466}