001 /*
002 * Java Genetic Algorithm Library (jenetics-4.1.0).
003 * Copyright (c) 2007-2018 Franz Wilhelmstötter
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 * Author:
018 * Franz Wilhelmstötter (franz.wilhelmstoetter@gmail.com)
019 */
020 package io.jenetics.stat;
021
022 import static java.lang.Double.compare;
023 import static java.lang.String.format;
024 import static java.util.Objects.requireNonNull;
025
026 import java.util.Arrays;
027 import java.util.Objects;
028 import java.util.function.DoubleConsumer;
029 import java.util.function.ToDoubleFunction;
030 import java.util.stream.Collector;
031
032 /**
033 * Implementation of the quantile estimation algorithm published by
034 * <p>
035 * <strong>Raj JAIN and Imrich CHLAMTAC</strong>:
036 * <em>
037 * The P<sup>2</sup> Algorithm for Dynamic Calculation of Quantiles and
038 * Histograms Without Storing Observations
039 * </em>
040 * <br>
041 * [<a href="http://www.cse.wustl.edu/~jain/papers/ftp/psqr.pdf">Communications
042 * of the ACM; October 1985, Volume 28, Number 10</a>]
043 * <p>
044 * This class is designed to work with (though does not require) streams. For
045 * example, you can compute the quantile with:
046 * <pre>{@code
047 * final DoubleStream stream = ...
048 * final Quantile quantile = stream.collect(
049 * () -> new Quantile(0.23),
050 * Quantile::accept,
051 * Quantile::combine
052 * );
053 * }</pre>
054 *
055 * @implNote
056 * This implementation is not thread safe. However, it is safe to use on a
057 * parallel stream, because the parallel implementation of
058 * {@link java.util.stream.Stream#collect Stream.collect()} provides the
059 * necessary partitioning, isolation, and merging of results for safe and
060 * efficient parallel execution.
061 * <br>
062 * Using this class in the {@code collect} method of an parallel stream can
063 * lead to an reduced accuracy of the quantile value. Since this implementation
064 * is an estimation algorithm, combining the estimations will only work for
065 * large streams ({@code size >> 1000}).
066 *
067 * @see <a href="http://en.wikipedia.org/wiki/Quantile">Wikipedia: Quantile</a>
068 *
069 * @author <a href="mailto:franz.wilhelmstoetter@gmail.com">Franz Wilhelmstötter</a>
070 * @since 1.0
071 * @version 3.7
072 */
073 public class Quantile implements DoubleConsumer {
074
075 private long _samples = 0;
076
077 // The desired quantile.
078 private final double _quantile;
079
080 // Marker heights.
081 private final double[] _q = {0, 0, 0, 0, 0};
082
083 // Marker positions.
084 private final double[] _n = {0, 0, 0, 0, 0};
085
086 // Desired marker positions.
087 private final double[] _nn = {0, 0, 0};
088
089 // Desired marker position increments.
090 private final double[] _dn = {0, 0, 0};
091
092 private boolean _initialized;
093
094 /**
095 * Create a new quantile accumulator with the given value.
096 *
097 * @param quantile the wished quantile value.
098 * @throws IllegalArgumentException if the {@code quantile} is not in the
099 * range {@code [0, 1]}.
100 */
101 public Quantile(final double quantile) {
102 _quantile = quantile;
103 init(quantile);
104 }
105
106 private void init(final double quantile) {
107 check(quantile);
108
109 Arrays.fill(_q, 0);
110 Arrays.fill(_n, 0);
111 Arrays.fill(_nn, 0);
112 Arrays.fill(_dn, 0);
113
114 _n[0] = -1.0;
115 _q[2] = 0.0;
116 _initialized =
117 compare(quantile, 0.0) == 0 ||
118 compare(quantile, 1.0) == 0;
119
120 _samples = 0;
121 }
122
123 private static void check(final double quantile) {
124 if (quantile < 0.0 || quantile > 1) {
125 throw new IllegalArgumentException(format(
126 "Quantile (%s) not in the valid range of [0, 1]", quantile
127 ));
128 }
129 }
130
131 /**
132 * Reset this object to its initial state.
133 */
134 public void reset() {
135 init(_quantile);
136 }
137
138 /**
139 * Return the <em>quantile</em> {@code this} object has been parametrized
140 * with.
141 *
142 * @since 3.1
143 *
144 * @return the <em>quantile</em> {@code this} object has been parametrized
145 * with
146 */
147 public double getQuantile() {
148 return _quantile;
149 }
150
151 /**
152 * Return the computed quantile value.
153 *
154 * @return the quantile value.
155 */
156 public double getValue() {
157 return _q[2];
158 }
159
160 /**
161 * Return the number of samples the quantile value was calculated of.
162 *
163 *
164 * @return the number of samples the quantile value was calculated of
165 */
166 public long getSamples() {
167 return _samples;
168 }
169
170 @Override
171 public void accept(final double value) {
172 if (!_initialized) {
173 initialize(value);
174 } else {
175 update(value);
176 }
177
178 ++_samples;
179 }
180
181 /**
182 * Combine two {@code Quantile} objects.
183 *
184 * @since 3.1
185 *
186 * @param other the other {@code Quantile} object to combine
187 * @return {@code this}
188 * @throws java.lang.NullPointerException if the {@code other} object is
189 * {@code null}.
190 * @throws java.lang.IllegalArgumentException if the {@link #getQuantile}
191 * of the {@code other} object differs from {@code this} one.
192 */
193 public Quantile combine(final Quantile other) {
194 if (_quantile != other._quantile) {
195 throw new IllegalArgumentException(format(
196 "Can't perform combine, the quantile are not equal: %s != %s",
197 _quantile, other._quantile
198 ));
199 }
200
201 _samples += other._samples;
202
203 if (_quantile == 0.0) {
204 _q[2] = Math.min(_q[2], other._q[2]);
205 } else if (_quantile == 1.0) {
206 _q[2] = Math.max(_q[2], other._q[2]);
207 } else {
208 // Combine the marker positions.
209 _n[1] += other._n[1];
210 _n[2] += other._n[2];
211 _n[3] += other._n[3];
212 _n[4] += other._n[4];
213
214 // Combine the marker height.
215 _q[0] = Math.min(_q[0], other._q[0]);
216 _q[1] = (_q[1] + other._q[1])*0.5;
217 _q[2] = (_q[2] + other._q[2])*0.5;
218 _q[3] = (_q[3] + other._q[3])*0.5;
219 _q[4] = Math.max(_q[4], other._q[4]);
220
221 // Combine position of markers.
222 _nn[0] += other._nn[0];
223 _nn[1] += other._nn[1];
224 _nn[2] += other._nn[2];
225
226 adjustMarkerHeights();
227 }
228
229 return this;
230 }
231
232 private void initialize(double value) {
233 if (_n[0] < 0.0) {
234 _n[0] = 0.0;
235 _q[0] = value;
236 } else if (_n[1] == 0.0) {
237 _n[1] = 1.0;
238 _q[1] = value;
239 } else if (_n[2] == 0.0) {
240 _n[2] = 2.0;
241 _q[2] = value;
242 } else if (_n[3] == 0.0) {
243 _n[3] = 3.0;
244 _q[3] = value;
245 } else if (_n[4] == 0.0) {
246 _n[4] = 4.0;
247 _q[4] = value;
248 }
249
250 if (_n[4] != 0.0) {
251 Arrays.sort(_q);
252
253 _nn[0] = 2.0*_quantile;
254 _nn[1] = 4.0*_quantile;
255 _nn[2] = 2.0*_quantile + 2.0;
256
257 _dn[0] = _quantile/2.0;
258 _dn[1] = _quantile;
259 _dn[2] = (1.0 + _quantile)/2.0;
260
261 _initialized = true;
262 }
263 }
264
265 private void update(double value) {
266 assert _initialized;
267
268 // If min or max, handle as special case; otherwise, ...
269 if (_quantile == 0.0) {
270 if (value < _q[2]) {
271 _q[2] = value;
272 }
273 } else if (_quantile == 1.0) {
274 if (value > _q[2]) {
275 _q[2] = value;
276 }
277 } else {
278 // Increment marker locations and update min and max.
279 if (value < _q[0]) {
280 ++_n[1]; ++_n[2]; ++_n[3]; ++_n[4]; _q[0] = value;
281 } else if (value < _q[1]) {
282 ++_n[1]; ++_n[2]; ++_n[3]; ++_n[4];
283 } else if (value < _q[2]) {
284 ++_n[2]; ++_n[3]; ++_n[4];
285 } else if (value < _q[3]) {
286 ++_n[3]; ++_n[4];
287 } else if (value < _q[4]) {
288 ++_n[4];
289 } else {
290 ++_n[4]; _q[4] = value;
291 }
292
293 // Increment positions of markers k + 1
294 _nn[0] += _dn[0];
295 _nn[1] += _dn[1];
296 _nn[2] += _dn[2];
297
298 adjustMarkerHeights();
299 }
300 }
301
302 // Adjust heights of markers 0 to 2 if necessary
303 private void adjustMarkerHeights() {
304 double mm = _n[1] - 1.0;
305 double mp = _n[1] + 1.0;
306 if (_nn[0] >= mp && _n[2] > mp) {
307 _q[1] = qPlus(mp, _n[0], _n[1], _n[2], _q[0], _q[1], _q[2]);
308 _n[1] = mp;
309 } else if (_nn[0] <= mm && _n[0] < mm) {
310 _q[1] = qMinus(mm, _n[0], _n[1], _n[2], _q[0], _q[1], _q[2]);
311 _n[1] = mm;
312 }
313
314 mm = _n[2] - 1.0;
315 mp = _n[2] + 1.0;
316 if (_nn[1] >= mp && _n[3] > mp) {
317 _q[2] = qPlus(mp, _n[1], _n[2], _n[3], _q[1], _q[2], _q[3]);
318 _n[2] = mp;
319 } else if (_nn[1] <= mm && _n[1] < mm) {
320 _q[2] = qMinus(mm, _n[1], _n[2], _n[3], _q[1], _q[2], _q[3]);
321 _n[2] = mm;
322 }
323
324 mm = _n[3] - 1.0;
325 mp = _n[3] + 1.0;
326 if (_nn[2] >= mp && _n[4] > mp) {
327 _q[3] = qPlus(mp, _n[2], _n[3], _n[4], _q[2], _q[3], _q[4]);
328 _n[3] = mp;
329 } else if (_nn[2] <= mm && _n[2] < mm) {
330 _q[3] = qMinus(mm, _n[2], _n[3], _n[4], _q[2], _q[3], _q[4]);
331 _n[3] = mm;
332 }
333 }
334
335 private static double qPlus(
336 final double mp,
337 final double m0,
338 final double m1,
339 final double m2,
340 final double q0,
341 final double q1,
342 final double q2
343 ) {
344 double result =
345 q1 +
346 ((mp - m0)*(q2 - q1)/(m2 - m1) +
347 (m2 - mp)*(q1 - q0)/(m1 - m0))/(m2 - m0);
348
349 if (result > q2) {
350 result = q1 + (q2 - q1)/(m2 - m1);
351 }
352
353 return result;
354 }
355
356 private static double qMinus(
357 final double mm,
358 final double m0,
359 final double m1,
360 final double m2,
361 final double q0,
362 final double q1,
363 final double q2
364 ) {
365 double result =
366 q1 -
367 ((mm - m0)*(q2 - q1)/(m2 - m1) +
368 (m2 - mm)*(q1 - q0)/(m1 - m0))/(m2 - m0);
369
370 if (q0 > result) {
371 result = q1 + (q0 - q1)/(m0 - m1);
372 }
373
374 return result;
375 }
376
377 /**
378 * Compares the state of two {@code Quantile} objects. This is
379 * a replacement for the {@link #equals(Object)} which is not advisable to
380 * implement for this mutable object. If two object have the same state, it
381 * has still the same state when updated with the same value.
382 * <pre>{@code
383 * final Quantile q1 = ...;
384 * final Quantile q2 = ...;
385 *
386 * if (q1.sameState(q2)) {
387 * final double value = random.nextDouble();
388 * q1.accept(value);
389 * q2.accept(value);
390 *
391 * assert q1.sameState(q2);
392 * assert q2.sameState(q1);
393 * assert q1.sameState(q1);
394 * }
395 * }</pre>
396 *
397 * @since 3.7
398 *
399 * @param other the other object for the test
400 * @return {@code true} the {@code this} and the {@code other} objects have
401 * the same state, {@code false} otherwise
402 */
403 public boolean sameState(final Quantile other) {
404 return Objects.equals(_quantile, other._quantile) &&
405 Arrays.equals(_dn, other._dn) &&
406 Arrays.equals(_n, other._n) &&
407 Arrays.equals(_nn, other._nn) &&
408 Arrays.equals(_q, other._q);
409 }
410
411 @Override
412 public String toString() {
413 return format(
414 "%s[samples=%d, quantile=%f]",
415 getClass().getSimpleName(), getSamples(), getValue()
416 );
417 }
418
419
420 static Quantile median() {
421 return new Quantile(0.5);
422 }
423
424 /**
425 * Return a {@code Collector} which applies an double-producing mapping
426 * function to each input element, and returns quantiles for the resulting
427 * values.
428 *
429 * <pre>{@code
430 * final Stream<SomeObject> stream = ...
431 * final Quantile quantile = stream
432 * .collect(toQuantile(0.25, v -> v.doubleValue()));
433 * }</pre>
434 *
435 * @param quantile the wished quantile value.
436 * @param mapper a mapping function to apply to each element
437 * @param <T> the type of the input elements
438 * @return a {@code Collector} implementing the quantiles reduction
439 * @throws java.lang.NullPointerException if the given {@code mapper} is
440 * {@code null}
441 * @throws IllegalArgumentException if the {@code quantile} is not in the
442 * range {@code [0, 1]}.
443 */
444 public static <T> Collector<T, ?, Quantile> toQuantile(
445 final double quantile,
446 final ToDoubleFunction<? super T> mapper
447 ) {
448 check(quantile);
449 requireNonNull(mapper);
450
451 return Collector.of(
452 () -> new Quantile(quantile),
453 (r, t) -> r.accept(mapper.applyAsDouble(t)),
454 Quantile::combine
455 );
456 }
457
458 /**
459 * Return a {@code Collector} which applies an double-producing mapping
460 * function to each input element, and returns the median for the resulting
461 * values.
462 *
463 * <pre>{@code
464 * final Stream<SomeObject> stream = ...
465 * final Quantile median = stream.collect(toMedian(v -> v.doubleValue()));
466 * }</pre>
467 *
468 * @param mapper a mapping function to apply to each element
469 * @param <T> the type of the input elements
470 * @return a {@code Collector} implementing the quantiles reduction
471 * @throws java.lang.NullPointerException if the given {@code mapper} is
472 * {@code null}
473 */
474 public static <T> Collector<T, ?, Quantile> toMedian(
475 final ToDoubleFunction<? super T> mapper
476 ) {
477 requireNonNull(mapper);
478
479 return Collector.of(
480 Quantile::median,
481 (r, t) -> r.accept(mapper.applyAsDouble(t)),
482 Quantile::combine
483 );
484 }
485
486 }
|