001 /*
002 * Java Genetic Algorithm Library (jenetics-6.2.0).
003 * Copyright (c) 2007-2021 Franz Wilhelmstötter
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 * Author:
018 * Franz Wilhelmstötter (franz.wilhelmstoetter@gmail.com)
019 */
020 package io.jenetics.util;
021
022 import static java.util.Objects.requireNonNull;
023
024 import java.util.concurrent.CancellationException;
025 import java.util.concurrent.Executor;
026 import java.util.concurrent.Flow;
027 import java.util.concurrent.Flow.Subscriber;
028 import java.util.concurrent.SubmissionPublisher;
029 import java.util.concurrent.atomic.AtomicBoolean;
030 import java.util.function.BiConsumer;
031 import java.util.stream.Stream;
032
033 import io.jenetics.internal.util.Lifecycle.ExtendedCloseable;
034
035 /**
036 * This class allows to create a reactive {@link Flow.Publisher} from a given
037 * Java {@link Stream}.
038 *
039 * <pre>{@code
040 * final Stream<Long> stream = engine.stream()
041 * .limit(33)
042 * .map(EvolutionResult::generation);
043 *
044 * try (var publisher = new StreamPublisher<Long>()) {
045 * publisher.subscribe(new Subscriber<>() {
046 * private Subscription subscription;
047 * \@Override
048 * public void onSubscribe(final Subscription subscription) {
049 * this.subscription = subscription;
050 * this.subscription.request(1);
051 * }
052 * \@Override
053 * public void onNext(final Long g) {
054 * System.out.println("Got new generation: " + g);
055 * subscription.request(1);
056 * }
057 * \@Override
058 * public void onError(final Throwable throwable) {
059 * }
060 * \@Override
061 * public void onComplete() {
062 * System.out.println("Evolution completed.");
063 * }
064 * });
065 *
066 * // Attaching the stream, starts the element publishing.
067 * publisher.attach(stream);
068 *
069 * ...
070 * }
071 * }</pre>
072 *
073 * @param <T> the element type of the publisher
074 *
075 * @author <a href="mailto:franz.wilhelmstoetter@gmail.com">Franz Wilhelmstötter</a>
076 * @version 6.0
077 * @since 6.0
078 */
079 public class StreamPublisher<T> extends SubmissionPublisher<T> {
080
081 private final Object _lock = new Object(){};
082
083 private final AtomicBoolean _proceed = new AtomicBoolean(true);
084
085 private Stream<? extends T> _stream;
086 private Thread _thread;
087
088 /**
089 * Creates a new {@code StreamPublisher} using the given {@code Executor}
090 * for async delivery to subscribers, with the given maximum buffer size for
091 * each subscriber.
092 *
093 * @param executor the executor to use for async delivery, supporting
094 * creation of at least one independent thread
095 * @param maxBufferCapacity the maximum capacity for each subscriber's buffer
096 * @param handler if non-null, procedure to invoke upon exception thrown in
097 * method {@code onNext}
098 * @throws NullPointerException if one of the arguments is {@code null}
099 * @throws IllegalArgumentException if maxBufferCapacity not positive
100 */
101 public StreamPublisher(
102 final Executor executor,
103 final int maxBufferCapacity,
104 final BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler
105 ) {
106 super(executor, maxBufferCapacity, handler);
107 }
108
109 /**
110 * Creates a new {@code StreamPublisher} using the given {@code Executor}
111 * for async delivery to subscribers, with the given maximum buffer size for
112 * each subscriber, and no handler for Subscriber exceptions in method
113 * {@link java.util.concurrent.Flow.Subscriber#onNext(Object)}.
114 *
115 * @param executor the executor to use for async delivery, supporting
116 * creation of at least one independent thread
117 * @param maxBufferCapacity the maximum capacity for each subscriber's buffer
118 * @throws NullPointerException if the given {@code executor} is {@code null}
119 * @throws IllegalArgumentException if maxBufferCapacity not positive
120 */
121 public StreamPublisher(final Executor executor, final int maxBufferCapacity) {
122 super(executor, maxBufferCapacity);
123 }
124
125 /**
126 * Creates a new publisher using the {@code ForkJoinPool.commonPool()} for
127 * async delivery to subscribers (unless it does not support a parallelism
128 * level of at least two, in which case, a new Thread is created to run each
129 * task), with maximum buffer capacity of {@code Flow.defaultBufferSize()},
130 * and no handler for Subscriber exceptions in method onNext.
131 */
132 public StreamPublisher() {
133 }
134
135 /**
136 * Attaches the given stream to the publisher. This method automatically
137 * starts the publishing of the elements read from the stream. The attached
138 * {@code stream} is closed, when {@code this} publisher is closed.
139 *
140 * @param stream the {@code stream} to attach
141 * @throws NullPointerException if the given {@code stream} is {@code null}
142 * @throws IllegalStateException if a stream is already attached to this
143 * publisher
144 */
145 public synchronized void attach(final Stream<? extends T> stream) {
146 requireNonNull(stream);
147
148 synchronized (_lock) {
149 if (_stream != null) {
150 throw new IllegalStateException(
151 "Already attached evolution stream."
152 );
153 }
154
155 _stream = stream.takeWhile(e -> _proceed.get());
156 _thread = new Thread(() -> {
157 try {
158 _stream.forEach(this::submit);
159 close();
160 } catch(CancellationException e) {
161 Thread.currentThread().interrupt();
162 close();
163 } catch (Throwable e) {
164 closeExceptionally(e);
165 }
166 });
167 _thread.start();
168 }
169 }
170
171 /**
172 * Unless already closed, issues {@code onComplete} signals to current
173 * subscribers, and disallows subsequent attempts to publish. Upon return,
174 * this method does NOT guarantee that all subscribers have yet completed.
175 */
176 @Override
177 public void close() {
178 synchronized (_lock) {
179 final var closeable = ExtendedCloseable.of(
180 () -> { if (_thread != null) _thread.interrupt(); },
181 () -> { if (_stream != null) _stream.close(); }
182 );
183
184 _proceed.set(false);
185 closeable.silentClose();
186 }
187 super.close();
188 }
189
190 }
|