001/* 002 * Java Genetic Algorithm Library (jenetics-8.1.0). 003 * Copyright (c) 2007-2024 Franz Wilhelmstötter 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 * Author: 018 * Franz Wilhelmstötter (franz.wilhelmstoetter@gmail.com) 019 */ 020package io.jenetics.util; 021 022import static java.time.Clock.systemUTC; 023import static java.util.Objects.requireNonNull; 024 025import java.time.Clock; 026import java.time.Duration; 027import java.util.Comparator; 028import java.util.function.BinaryOperator; 029import java.util.function.Function; 030import java.util.stream.Stream; 031 032/** 033 * This class contains factory methods for (flat) mapping stream elements. The 034 * functions of this class can be used in the following way. 035 * {@snippet lang="java": 036 * final ISeq<Integer> values = new Random().ints(0, 100).boxed() 037 * .limit(100) 038 * .flatMap(Streams.toIntervalMax(13)) 039 * .collect(ISeq.toISeq()); 040 * } 041 * 042 * @author <a href="mailto:franz.wilhelmstoetter@gmail.com">Franz Wilhelmstötter</a> 043 * @since 6.0 044 * @version 6.0 045 */ 046public final class Streams { 047 private Streams() {} 048 049 050 /** 051 * Return a new flat-mapper function, which guarantees a strictly increasing 052 * stream, from an arbitrarily ordered source stream. Note that this 053 * function doesn't sort the stream. It <em>just</em> skips the <em>out of 054 * order</em> elements. 055 * 056 * <pre>{@code 057 * +----3--2--5--4--7--7--4--9----| 058 * toStrictlyIncreasing() 059 * +----3-----5-----7--------9----| 060 * }</pre> 061 * 062 * {@snippet lang="java": 063 * final ISeq<Integer> values = new Random().ints(0, 100) 064 * .boxed() 065 * .limit(100) 066 * .flatMap(Streams.toStrictlyIncreasing()) 067 * .collect(ISeq.toISeq()); 068 * 069 * System.out.println(values); 070 * // [6,47,65,78,96,96,99] 071 * } 072 * 073 * 074 * @param <C> the comparable type 075 * @return a new flat-mapper function 076 */ 077 public static <C extends Comparable<? super C>> 078 Function<C, Stream<C>> toStrictlyIncreasing() { 079 return strictlyImproving(Streams::max); 080 } 081 082 /** 083 * Return a new flat-mapper function, which guarantees a strictly decreasing 084 * stream, from an arbitrarily ordered source stream. Note that this 085 * function doesn't sort the stream. It <em>just</em> skips the <em>out of 086 * order</em> elements. 087 * 088 * <pre>{@code 089 * +----9--8--9--5--6--6--2--9----| 090 * toStrictlyDecreasing() 091 * +----9--8-----5--------2-------| 092 * }</pre> 093 * 094 * {@snippet lang="java": 095 * final ISeq<Integer> values = new Random().ints(0, 100) 096 * .boxed() 097 * .limit(100) 098 * .flatMap(Streams.toStrictlyDecreasing()) 099 * .collect(ISeq.toISeq()); 100 * 101 * System.out.println(values); 102 * // [45,32,15,12,3,1] 103 * } 104 * 105 * @param <C> the comparable type 106 * @return a new flat-mapper function 107 */ 108 public static <C extends Comparable<? super C>> 109 Function<C, Stream<C>> toStrictlyDecreasing() { 110 return strictlyImproving(Streams::min); 111 } 112 113 /** 114 * Return a new flat-mapper function, which guarantees a strictly improving 115 * stream, from an arbitrarily ordered source stream. Note that this 116 * function doesn't sort the stream. It <em>just</em> skips the <em>out of 117 * order</em> elements. 118 * {@snippet lang="java": 119 * final ISeq<Integer> values = new Random().ints(0, 100) 120 * .boxed() 121 * .limit(100) 122 * .flatMap(Streams.toStrictlyImproving(Comparator.naturalOrder())) 123 * .collect(ISeq.toISeq()); 124 * 125 * System.out.println(values); 126 * // [6,47,65,78,96,96,99] 127 * } 128 * 129 * @see #toStrictlyIncreasing() 130 * @see #toStrictlyDecreasing() 131 * 132 * @param <T> the element type 133 * @param comparator the comparator used for testing the elements 134 * @return a new flat-mapper function 135 */ 136 public static <T> Function<T, Stream<T>> 137 toStrictlyImproving(final Comparator<? super T> comparator) { 138 return strictlyImproving((a, b) -> best(comparator, a, b)); 139 } 140 141 private static <C> Function<C, Stream<C>> 142 strictlyImproving(final BinaryOperator<C> comparator) { 143 requireNonNull(comparator); 144 145 return new Function<>() { 146 private C _best; 147 148 @Override 149 public Stream<C> apply(final C result) { 150 final C best = comparator.apply(_best, result); 151 152 final Stream<C> stream = best == _best 153 ? Stream.empty() 154 : Stream.of(best); 155 156 _best = best; 157 158 return stream; 159 } 160 }; 161 } 162 163 private static <T extends Comparable<? super T>> T max(final T a, final T b) { 164 return best(Comparator.naturalOrder(), a, b); 165 } 166 167 private static <T extends Comparable<? super T>> T min(final T a, final T b) { 168 return best(Comparator.reverseOrder(), a, b); 169 } 170 171 private static <T> 172 T best(final Comparator<? super T> comparator, final T a, final T b) { 173 if (a == null && b == null) return null; 174 if (a == null) return b; 175 if (b == null) return a; 176 return comparator.compare(a, b) >= 0 ? a : b; 177 } 178 179 /** 180 * Return a new flat-mapper function which returns (emits) the maximal value 181 * of the last <em>n</em> elements. 182 * 183 * <pre>{@code 184 * +----3---+----3---+ 185 * | | | 186 * +----9--8--3--3--5--4--2--9----| 187 * toIntervalMax(3) 188 * +----------9--------5----------| 189 * }</pre> 190 * 191 * @param size the size of the slice 192 * @param <C> the element type 193 * @return a new flat-mapper function 194 * @throws IllegalArgumentException if the given size is smaller than one 195 */ 196 public static <C extends Comparable<? super C>> 197 Function<C, Stream<C>> toIntervalMax(final int size) { 198 return sliceBest(Streams::max, size); 199 } 200 201 /** 202 * Return a new flat-mapper function which returns (emits) the minimal value 203 * of the last <em>n</em> elements. 204 * 205 * <pre>{@code 206 * +----3---+----3---+ 207 * | | | 208 * +----9--8--3--3--1--4--2--9----| 209 * toIntervalMin(3) 210 * +----------3--------1----------| 211 * }</pre> 212 * 213 * @param size the size of the slice 214 * @param <C> the element type 215 * @return a new flat-mapper function 216 * @throws IllegalArgumentException if the given size is smaller than one 217 */ 218 public static <C extends Comparable<? super C>> 219 Function<C, Stream<C>> toIntervalMin(final int size) { 220 return sliceBest(Streams::min, size); 221 } 222 223 /** 224 * Return a new flat-mapper function which returns (emits) the minimal value 225 * of the last <em>n</em> elements. 226 * 227 * @see #toIntervalMax(int) 228 * @see #toIntervalMin(int) 229 * 230 * @param <C> the element type 231 * @param size the size of the slice 232 * @param comparator the comparator used for testing the elements 233 * @return a new flat-mapper function 234 * @throws IllegalArgumentException if the given size is smaller than one 235 * @throws NullPointerException if the given {@code comparator} is 236 * {@code null} 237 */ 238 public static <C> Function<C, Stream<C>> 239 toIntervalBest(final Comparator<? super C> comparator, final int size) { 240 requireNonNull(comparator); 241 return sliceBest((a, b) -> best(comparator, a, b), size); 242 } 243 244 private static <C> Function<C, Stream<C>> sliceBest( 245 final BinaryOperator<C> comp, 246 final int rangeSize 247 ) { 248 requireNonNull(comp); 249 if (rangeSize < 1) { 250 throw new IllegalArgumentException( 251 "Range size must be at least one: " + rangeSize 252 ); 253 } 254 255 return new Function<>() { 256 private int _count = 0; 257 private C _best; 258 259 @Override 260 public Stream<C> apply(final C value) { 261 ++_count; 262 _best = comp.apply(_best, value); 263 264 final Stream<C> result; 265 if (_count >= rangeSize) { 266 result = Stream.of(_best); 267 _count = 0; 268 _best = null; 269 } else { 270 result = Stream.empty(); 271 } 272 273 return result; 274 } 275 }; 276 } 277 278 /** 279 * Return a new flat-mapper function which returns (emits) the maximal value 280 * of the elements emitted within the given {@code timespan}. 281 * 282 * <pre>{@code 283 * +---3s---+---3s---+ 284 * | | | 285 * +----9--8--3--3--5--4--2--9----| 286 * toIntervalMax(3s) 287 * +----------9--------5----------| 288 * }</pre> 289 * 290 * @see #toIntervalMax(Duration, Clock) 291 * 292 * @param <C> the element type 293 * @param timespan the timespan the elements are collected for the 294 * calculation slice 295 * @return a new flat-mapper function 296 * @throws IllegalArgumentException if the given size is smaller than one 297 * @throws NullPointerException if the given {@code timespan} is {@code null} 298 */ 299 public static <C extends Comparable<? super C>> 300 Function<C, Stream<C>> toIntervalMax(final Duration timespan) { 301 return sliceBest(Streams::max, timespan, systemUTC()); 302 } 303 304 /** 305 * Return a new flat-mapper function which returns (emits) the maximal value 306 * of the elements emitted within the given {@code timespan}. 307 * 308 * <pre>{@code 309 * +---3s---+---3s---+ 310 * | | | 311 * +----9--8--3--3--5--4--2--9----| 312 * toIntervalMax(3s) 313 * +----------9--------5----------| 314 * }</pre> 315 * 316 * @see #toIntervalMax(Duration) 317 * 318 * @param <C> the element type 319 * @param timespan the timespan the elements are collected for the 320 * calculation slice 321 * @param clock the {@code clock} used for measuring the {@code timespan} 322 * @return a new flat-mapper function 323 * @throws IllegalArgumentException if the given size is smaller than one 324 * @throws NullPointerException if one of the arguments is {@code null} 325 */ 326 public static <C extends Comparable<? super C>> 327 Function<C, Stream<C>> toIntervalMax(final Duration timespan, final Clock clock) { 328 return sliceBest(Streams::max, timespan, clock); 329 } 330 331 /** 332 * Return a new flat-mapper function which returns (emits) the minimal value 333 * of the elements emitted within the given {@code timespan}. 334 * 335 * <pre>{@code 336 * +---3s---+---3s---+ 337 * | | | 338 * +----9--8--3--3--1--4--2--9----| 339 * toIntervalMin(3s) 340 * +----------3--------1----------| 341 * }</pre> 342 * 343 * @see #toIntervalMin(Duration, Clock) 344 * 345 * @param <C> the element type 346 * @param timespan the timespan the elements are collected for the 347 * calculation slice 348 * @return a new flat-mapper function 349 * @throws IllegalArgumentException if the given size is smaller than one 350 * @throws NullPointerException if the given {@code timespan} is {@code null} 351 */ 352 public static <C extends Comparable<? super C>> 353 Function<C, Stream<C>> toIntervalMin(final Duration timespan) { 354 return sliceBest(Streams::min, timespan, systemUTC()); 355 } 356 357 /** 358 * Return a new flat-mapper function which returns (emits) the minimal value 359 * of the elements emitted within the given {@code timespan}. 360 * 361 * <pre>{@code 362 * +---3s---+---3s---+ 363 * | | | 364 * +----9--8--3--3--1--4--2--9----| 365 * toIntervalMin(3s) 366 * +----------3--------1----------| 367 * }</pre> 368 * 369 * @see #toIntervalMin(Duration) 370 * 371 * @param <C> the element type 372 * @param timespan the timespan the elements are collected for the 373 * calculation slice 374 * @param clock the {@code clock} used for measuring the {@code timespan} 375 * @return a new flat-mapper function 376 * @throws IllegalArgumentException if the given size is smaller than one 377 * @throws NullPointerException if one of the arguments is {@code null} 378 */ 379 public static <C extends Comparable<? super C>> 380 Function<C, Stream<C>> toIntervalMin(final Duration timespan, final Clock clock) { 381 return sliceBest(Streams::min, timespan, clock); 382 } 383 384 /** 385 * Return a new flat-mapper function which returns (emits) the minimal value 386 * of the elements emitted within the given {@code timespan}. 387 * 388 * @see #toIntervalMin(Duration) 389 * @see #toIntervalMax(Duration) 390 * 391 * @param <C> the element type 392 * @param comparator the comparator used for testing the elements 393 * @param timespan the timespan the elements are collected for the 394 * calculation slice 395 * @return a new flat-mapper function 396 * @throws IllegalArgumentException if the given size is smaller than one 397 @throws NullPointerException if one of the arguments is {@code null} 398 */ 399 public static <C> Function<C, Stream<C>> 400 toIntervalBest(final Comparator<? super C> comparator, final Duration timespan) { 401 requireNonNull(comparator); 402 return sliceBest((a, b) -> best(comparator, a, b), timespan, systemUTC()); 403 } 404 405 /** 406 * Return a new flat-mapper function which returns (emits) the minimal value 407 * of the elements emitted within the given {@code timespan}. 408 * 409 * @param <C> the element type 410 * @param comparator the comparator used for testing the elements 411 * @param timespan the timespan the elements are collected for the 412 * calculation slice 413 * @param clock the {@code clock} used for measuring the {@code timespan} 414 * @return a new flat-mapper function 415 * @throws IllegalArgumentException if the given size is smaller than one 416 @throws NullPointerException if one of the arguments is {@code null} 417 */ 418 public static <C> Function<C, Stream<C>> 419 toIntervalBest( 420 final Comparator<? super C> comparator, 421 final Duration timespan, 422 final Clock clock 423 ) { 424 requireNonNull(comparator); 425 return sliceBest((a, b) -> best(comparator, a, b), timespan, clock); 426 } 427 428 private static <C> Function<C, Stream<C>> sliceBest( 429 final BinaryOperator<C> comp, 430 final Duration timespan, 431 final Clock clock 432 ) { 433 requireNonNull(comp); 434 requireNonNull(timespan); 435 436 return new Function<>() { 437 private final long _timespan = timespan.toMillis(); 438 439 private long _start = 0; 440 private C _best; 441 442 @Override 443 public Stream<C> apply(final C value) { 444 if (_start == 0) { 445 _start = clock.millis(); 446 } 447 448 _best = comp.apply(_best, value); 449 long end = clock.millis(); 450 451 final Stream<C> result; 452 if (end - _start >= _timespan) { 453 result = Stream.of(_best); 454 _start = 0; 455 _best = null; 456 } else { 457 result = Stream.empty(); 458 } 459 460 return result; 461 } 462 }; 463 } 464 465}