001/* 002 * Java Genetic Algorithm Library (jenetics-7.2.0). 003 * Copyright (c) 2007-2023 Franz Wilhelmstötter 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 * Author: 018 * Franz Wilhelmstötter (franz.wilhelmstoetter@gmail.com) 019 */ 020package io.jenetics.util; 021 022import static java.time.Clock.systemUTC; 023import static java.util.Objects.requireNonNull; 024 025import java.time.Clock; 026import java.time.Duration; 027import java.util.Comparator; 028import java.util.function.BinaryOperator; 029import java.util.function.Function; 030import java.util.stream.Stream; 031 032/** 033 * This class contains factory methods for (flat) mapping stream elements. The 034 * functions of this class can be used in the following way. 035 * 036 * <pre>{@code 037 * final ISeq<Integer> values = new Random().ints(0, 100).boxed() 038 * .limit(100) 039 * .flatMap(Streams.toIntervalMax(13)) 040 * .collect(ISeq.toISeq()); 041 * }</pre> 042 * 043 * @author <a href="mailto:franz.wilhelmstoetter@gmail.com">Franz Wilhelmstötter</a> 044 * @since 6.0 045 * @version 6.0 046 */ 047public final class Streams { 048 private Streams() {} 049 050 051 /** 052 * Return a new flat-mapper function, which guarantees a strictly increasing 053 * stream, from an arbitrarily ordered source stream. Note that this 054 * function doesn't sort the stream. It <em>just</em> skips the <em>out of 055 * order</em> elements. 056 * 057 * <pre>{@code 058 * +----3--2--5--4--7--7--4--9----| 059 * toStrictlyIncreasing() 060 * +----3-----5-----7--------9----| 061 * }</pre> 062 * 063 * <pre>{@code 064 * final ISeq<Integer> values = new Random().ints(0, 100) 065 * .boxed() 066 * .limit(100) 067 * .flatMap(Streams.toStrictlyIncreasing()) 068 * .collect(ISeq.toISeq()); 069 * 070 * System.out.println(values); 071 * // [6,47,65,78,96,96,99] 072 * }</pre> 073 * 074 * 075 * @param <C> the comparable type 076 * @return a new flat-mapper function 077 */ 078 public static <C extends Comparable<? super C>> 079 Function<C, Stream<C>> toStrictlyIncreasing() { 080 return strictlyImproving(Streams::max); 081 } 082 083 /** 084 * Return a new flat-mapper function, which guarantees a strictly decreasing 085 * stream, from an arbitrarily ordered source stream. Note that this 086 * function doesn't sort the stream. It <em>just</em> skips the <em>out of 087 * order</em> elements. 088 * 089 * <pre>{@code 090 * +----9--8--9--5--6--6--2--9----| 091 * toStrictlyDecreasing() 092 * +----9--8-----5--------2-------| 093 * }</pre> 094 * 095 * <pre>{@code 096 * final ISeq<Integer> values = new Random().ints(0, 100) 097 * .boxed() 098 * .limit(100) 099 * .flatMap(Streams.toStrictlyDecreasing()) 100 * .collect(ISeq.toISeq()); 101 * 102 * System.out.println(values); 103 * // [45,32,15,12,3,1] 104 * }</pre> 105 * 106 * @param <C> the comparable type 107 * @return a new flat-mapper function 108 */ 109 public static <C extends Comparable<? super C>> 110 Function<C, Stream<C>> toStrictlyDecreasing() { 111 return strictlyImproving(Streams::min); 112 } 113 114 /** 115 * Return a new flat-mapper function, which guarantees a strictly improving 116 * stream, from an arbitrarily ordered source stream. Note that this 117 * function doesn't sort the stream. It <em>just</em> skips the <em>out of 118 * order</em> elements. 119 * 120 * <pre>{@code 121 * final ISeq<Integer> values = new Random().ints(0, 100) 122 * .boxed() 123 * .limit(100) 124 * .flatMap(Streams.toStrictlyImproving(Comparator.naturalOrder())) 125 * .collect(ISeq.toISeq()); 126 * 127 * System.out.println(values); 128 * // [6,47,65,78,96,96,99] 129 * }</pre> 130 * 131 * @see #toStrictlyIncreasing() 132 * @see #toStrictlyDecreasing() 133 * 134 * @param <T> the element type 135 * @param comparator the comparator used for testing the elements 136 * @return a new flat-mapper function 137 */ 138 public static <T> Function<T, Stream<T>> 139 toStrictlyImproving(final Comparator<? super T> comparator) { 140 return strictlyImproving((a, b) -> best(comparator, a, b)); 141 } 142 143 private static <C> Function<C, Stream<C>> 144 strictlyImproving(final BinaryOperator<C> comparator) { 145 requireNonNull(comparator); 146 147 return new Function<>() { 148 private C _best; 149 150 @Override 151 public Stream<C> apply(final C result) { 152 final C best = comparator.apply(_best, result); 153 154 final Stream<C> stream = best == _best 155 ? Stream.empty() 156 : Stream.of(best); 157 158 _best = best; 159 160 return stream; 161 } 162 }; 163 } 164 165 private static <T extends Comparable<? super T>> T max(final T a, final T b) { 166 return best(Comparator.naturalOrder(), a, b); 167 } 168 169 private static <T extends Comparable<? super T>> T min(final T a, final T b) { 170 return best(Comparator.reverseOrder(), a, b); 171 } 172 173 private static <T> 174 T best(final Comparator<? super T> comparator, final T a, final T b) { 175 if (a == null && b == null) return null; 176 if (a == null) return b; 177 if (b == null) return a; 178 return comparator.compare(a, b) >= 0 ? a : b; 179 } 180 181 /** 182 * Return a new flat-mapper function which returns (emits) the maximal value 183 * of the last <em>n</em> elements. 184 * 185 * <pre>{@code 186 * +----3---+----3---+ 187 * | | | 188 * +----9--8--3--3--5--4--2--9----| 189 * toIntervalMax(3) 190 * +----------9--------5----------| 191 * }</pre> 192 * 193 * @param size the size of the slice 194 * @param <C> the element type 195 * @return a new flat-mapper function 196 * @throws IllegalArgumentException if the given size is smaller than one 197 */ 198 public static <C extends Comparable<? super C>> 199 Function<C, Stream<C>> toIntervalMax(final int size) { 200 return sliceBest(Streams::max, size); 201 } 202 203 /** 204 * Return a new flat-mapper function which returns (emits) the minimal value 205 * of the last <em>n</em> elements. 206 * 207 * <pre>{@code 208 * +----3---+----3---+ 209 * | | | 210 * +----9--8--3--3--1--4--2--9----| 211 * toIntervalMin(3) 212 * +----------3--------1----------| 213 * }</pre> 214 * 215 * @param size the size of the slice 216 * @param <C> the element type 217 * @return a new flat-mapper function 218 * @throws IllegalArgumentException if the given size is smaller than one 219 */ 220 public static <C extends Comparable<? super C>> 221 Function<C, Stream<C>> toIntervalMin(final int size) { 222 return sliceBest(Streams::min, size); 223 } 224 225 /** 226 * Return a new flat-mapper function which returns (emits) the minimal value 227 * of the last <em>n</em> elements. 228 * 229 * @see #toIntervalMax(int) 230 * @see #toIntervalMin(int) 231 * 232 * @param <C> the element type 233 * @param size the size of the slice 234 * @param comparator the comparator used for testing the elements 235 * @return a new flat-mapper function 236 * @throws IllegalArgumentException if the given size is smaller than one 237 * @throws NullPointerException if the given {@code comparator} is 238 * {@code null} 239 */ 240 public static <C> Function<C, Stream<C>> 241 toIntervalBest(final Comparator<? super C> comparator, final int size) { 242 requireNonNull(comparator); 243 return sliceBest((a, b) -> best(comparator, a, b), size); 244 } 245 246 private static <C> Function<C, Stream<C>> sliceBest( 247 final BinaryOperator<C> comp, 248 final int rangeSize 249 ) { 250 requireNonNull(comp); 251 if (rangeSize < 1) { 252 throw new IllegalArgumentException( 253 "Range size must be at least one: " + rangeSize 254 ); 255 } 256 257 return new Function<>() { 258 private int _count = 0; 259 private C _best; 260 261 @Override 262 public Stream<C> apply(final C value) { 263 ++_count; 264 _best = comp.apply(_best, value); 265 266 final Stream<C> result; 267 if (_count >= rangeSize) { 268 result = Stream.of(_best); 269 _count = 0; 270 _best = null; 271 } else { 272 result = Stream.empty(); 273 } 274 275 return result; 276 } 277 }; 278 } 279 280 /** 281 * Return a new flat-mapper function which returns (emits) the maximal value 282 * of the elements emitted within the given {@code timespan}. 283 * 284 * <pre>{@code 285 * +---3s---+---3s---+ 286 * | | | 287 * +----9--8--3--3--5--4--2--9----| 288 * toIntervalMax(3s) 289 * +----------9--------5----------| 290 * }</pre> 291 * 292 * @see #toIntervalMax(Duration, Clock) 293 * 294 * @param <C> the element type 295 * @param timespan the timespan the elements are collected for the 296 * calculation slice 297 * @return a new flat-mapper function 298 * @throws IllegalArgumentException if the given size is smaller than one 299 * @throws NullPointerException if the given {@code timespan} is {@code null} 300 */ 301 public static <C extends Comparable<? super C>> 302 Function<C, Stream<C>> toIntervalMax(final Duration timespan) { 303 return sliceBest(Streams::max, timespan, systemUTC()); 304 } 305 306 /** 307 * Return a new flat-mapper function which returns (emits) the maximal value 308 * of the elements emitted within the given {@code timespan}. 309 * 310 * <pre>{@code 311 * +---3s---+---3s---+ 312 * | | | 313 * +----9--8--3--3--5--4--2--9----| 314 * toIntervalMax(3s) 315 * +----------9--------5----------| 316 * }</pre> 317 * 318 * @see #toIntervalMax(Duration) 319 * 320 * @param <C> the element type 321 * @param timespan the timespan the elements are collected for the 322 * calculation slice 323 * @param clock the {@code clock} used for measuring the {@code timespan} 324 * @return a new flat-mapper function 325 * @throws IllegalArgumentException if the given size is smaller than one 326 * @throws NullPointerException if one of the arguments is {@code null} 327 */ 328 public static <C extends Comparable<? super C>> 329 Function<C, Stream<C>> toIntervalMax(final Duration timespan, final Clock clock) { 330 return sliceBest(Streams::max, timespan, clock); 331 } 332 333 /** 334 * Return a new flat-mapper function which returns (emits) the minimal value 335 * of the elements emitted within the given {@code timespan}. 336 * 337 * <pre>{@code 338 * +---3s---+---3s---+ 339 * | | | 340 * +----9--8--3--3--1--4--2--9----| 341 * toIntervalMin(3s) 342 * +----------3--------1----------| 343 * }</pre> 344 * 345 * @see #toIntervalMin(Duration, Clock) 346 * 347 * @param <C> the element type 348 * @param timespan the timespan the elements are collected for the 349 * calculation slice 350 * @return a new flat-mapper function 351 * @throws IllegalArgumentException if the given size is smaller than one 352 * @throws NullPointerException if the given {@code timespan} is {@code null} 353 */ 354 public static <C extends Comparable<? super C>> 355 Function<C, Stream<C>> toIntervalMin(final Duration timespan) { 356 return sliceBest(Streams::min, timespan, systemUTC()); 357 } 358 359 /** 360 * Return a new flat-mapper function which returns (emits) the minimal value 361 * of the elements emitted within the given {@code timespan}. 362 * 363 * <pre>{@code 364 * +---3s---+---3s---+ 365 * | | | 366 * +----9--8--3--3--1--4--2--9----| 367 * toIntervalMin(3s) 368 * +----------3--------1----------| 369 * }</pre> 370 * 371 * @see #toIntervalMin(Duration) 372 * 373 * @param <C> the element type 374 * @param timespan the timespan the elements are collected for the 375 * calculation slice 376 * @param clock the {@code clock} used for measuring the {@code timespan} 377 * @return a new flat-mapper function 378 * @throws IllegalArgumentException if the given size is smaller than one 379 * @throws NullPointerException if one of the arguments is {@code null} 380 */ 381 public static <C extends Comparable<? super C>> 382 Function<C, Stream<C>> toIntervalMin(final Duration timespan, final Clock clock) { 383 return sliceBest(Streams::min, timespan, clock); 384 } 385 386 /** 387 * Return a new flat-mapper function which returns (emits) the minimal value 388 * of the elements emitted within the given {@code timespan}. 389 * 390 * @see #toIntervalMin(Duration) 391 * @see #toIntervalMax(Duration) 392 * 393 * @param <C> the element type 394 * @param comparator the comparator used for testing the elements 395 * @param timespan the timespan the elements are collected for the 396 * calculation slice 397 * @return a new flat-mapper function 398 * @throws IllegalArgumentException if the given size is smaller than one 399 @throws NullPointerException if one of the arguments is {@code null} 400 */ 401 public static <C> Function<C, Stream<C>> 402 toIntervalBest(final Comparator<? super C> comparator, final Duration timespan) { 403 requireNonNull(comparator); 404 return sliceBest((a, b) -> best(comparator, a, b), timespan, systemUTC()); 405 } 406 407 /** 408 * Return a new flat-mapper function which returns (emits) the minimal value 409 * of the elements emitted within the given {@code timespan}. 410 * 411 * @param <C> the element type 412 * @param comparator the comparator used for testing the elements 413 * @param timespan the timespan the elements are collected for the 414 * calculation slice 415 * @param clock the {@code clock} used for measuring the {@code timespan} 416 * @return a new flat-mapper function 417 * @throws IllegalArgumentException if the given size is smaller than one 418 @throws NullPointerException if one of the arguments is {@code null} 419 */ 420 public static <C> Function<C, Stream<C>> 421 toIntervalBest( 422 final Comparator<? super C> comparator, 423 final Duration timespan, 424 final Clock clock 425 ) { 426 requireNonNull(comparator); 427 return sliceBest((a, b) -> best(comparator, a, b), timespan, clock); 428 } 429 430 private static <C> Function<C, Stream<C>> sliceBest( 431 final BinaryOperator<C> comp, 432 final Duration timespan, 433 final Clock clock 434 ) { 435 requireNonNull(comp); 436 requireNonNull(timespan); 437 438 return new Function<>() { 439 private final long _timespan = timespan.toMillis(); 440 441 private long _start = 0; 442 private C _best; 443 444 @Override 445 public Stream<C> apply(final C value) { 446 if (_start == 0) { 447 _start = clock.millis(); 448 } 449 450 _best = comp.apply(_best, value); 451 long end = clock.millis(); 452 453 final Stream<C> result; 454 if (end - _start >= _timespan) { 455 result = Stream.of(_best); 456 _start = 0; 457 _best = null; 458 } else { 459 result = Stream.empty(); 460 } 461 462 return result; 463 } 464 }; 465 } 466 467}