001 /*
002 * Java Genetic Algorithm Library (jenetics-6.0.0).
003 * Copyright (c) 2007-2020 Franz Wilhelmstötter
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 * Author:
018 * Franz Wilhelmstötter (franz.wilhelmstoetter@gmail.com)
019 */
020 package io.jenetics.util;
021
022 import static java.util.Objects.requireNonNull;
023
024 import java.util.concurrent.CancellationException;
025 import java.util.concurrent.Executor;
026 import java.util.concurrent.Flow;
027 import java.util.concurrent.Flow.Subscriber;
028 import java.util.concurrent.SubmissionPublisher;
029 import java.util.concurrent.atomic.AtomicBoolean;
030 import java.util.function.BiConsumer;
031 import java.util.stream.Stream;
032
033 /**
034 * This class allows to create a reactive {@link Flow.Publisher} from a given
035 * Java {@link Stream}.
036 *
037 * <pre>{@code
038 * final Stream<Long> stream = engine.stream()
039 * .limit(33)
040 * .map(EvolutionResult::generation);
041 *
042 * try (var publisher = new StreamPublisher<Long>()) {
043 * publisher.subscribe(new Subscriber<>() {
044 * private Subscription subscription;
045 * \@Override
046 * public void onSubscribe(final Subscription subscription) {
047 * this.subscription = subscription;
048 * this.subscription.request(1);
049 * }
050 * \@Override
051 * public void onNext(final Long g) {
052 * System.out.println("Got new generation: " + g);
053 * subscription.request(1);
054 * }
055 * \@Override
056 * public void onError(final Throwable throwable) {
057 * }
058 * \@Override
059 * public void onComplete() {
060 * System.out.println("Evolution completed.");
061 * }
062 * });
063 *
064 * // Attaching the stream, starts the element publishing.
065 * publisher.attach(stream);
066 *
067 * ...
068 * }
069 * }</pre>
070 *
071 * @param <T> the element type of the publisher
072 *
073 * @author <a href="mailto:franz.wilhelmstoetter@gmail.com">Franz Wilhelmstötter</a>
074 * @version 6.0
075 * @since 6.0
076 */
077 public class StreamPublisher<T> extends SubmissionPublisher<T> {
078
079 private final Object _lock = new Object(){};
080
081 private final AtomicBoolean _proceed = new AtomicBoolean(true);
082
083 private Stream<? extends T> _stream;
084 private Thread _thread;
085
086 /**
087 * Creates a new {@code StreamPublisher} using the given {@code Executor}
088 * for async delivery to subscribers, with the given maximum buffer size for
089 * each subscriber.
090 *
091 * @param executor the executor to use for async delivery, supporting
092 * creation of at least one independent thread
093 * @param maxBufferCapacity the maximum capacity for each subscriber's buffer
094 * @param handler if non-null, procedure to invoke upon exception thrown in
095 * method {@code onNext}
096 * @throws NullPointerException if one of the arguments is {@code null}
097 * @throws IllegalArgumentException if maxBufferCapacity not positive
098 */
099 public StreamPublisher(
100 final Executor executor,
101 final int maxBufferCapacity,
102 final BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler
103 ) {
104 super(executor, maxBufferCapacity, handler);
105 }
106
107 /**
108 * Creates a new {@code StreamPublisher} using the given {@code Executor}
109 * for async delivery to subscribers, with the given maximum buffer size for
110 * each subscriber, and no handler for Subscriber exceptions in method
111 * {@link java.util.concurrent.Flow.Subscriber#onNext(Object)}.
112 *
113 * @param executor the executor to use for async delivery, supporting
114 * creation of at least one independent thread
115 * @param maxBufferCapacity the maximum capacity for each subscriber's buffer
116 * @throws NullPointerException if the given {@code executor} is {@code null}
117 * @throws IllegalArgumentException if maxBufferCapacity not positive
118 */
119 public StreamPublisher(final Executor executor, final int maxBufferCapacity) {
120 super(executor, maxBufferCapacity);
121 }
122
123 /**
124 * Creates a new publisher using the {@code ForkJoinPool.commonPool()} for
125 * async delivery to subscribers (unless it does not support a parallelism
126 * level of at least two, in which case, a new Thread is created to run each
127 * task), with maximum buffer capacity of {@code Flow.defaultBufferSize()},
128 * and no handler for Subscriber exceptions in method onNext.
129 */
130 public StreamPublisher() {
131 }
132
133 /**
134 * Attaches the given stream to the publisher. This method automatically
135 * starts the publishing of the elements read from the stream.
136 *
137 * @param stream the {@code stream} to attach
138 * @throws NullPointerException if the given {@code stream} is {@code null}
139 * @throws IllegalStateException if a stream is already attached to this
140 * publisher
141 */
142 public synchronized void attach(final Stream<? extends T> stream) {
143 requireNonNull(stream);
144
145 synchronized (_lock) {
146 if (_stream != null) {
147 throw new IllegalStateException(
148 "Already attached evolution stream."
149 );
150 }
151
152 _stream = stream.takeWhile(e -> _proceed.get());
153 _thread = new Thread(() -> {
154 try {
155 _stream.forEach(this::submit);
156 close();
157 } catch(CancellationException e) {
158 Thread.currentThread().interrupt();
159 close();
160 } catch (Throwable e) {
161 closeExceptionally(e);
162 }
163 });
164 _thread.start();
165 }
166 }
167
168 /**
169 * Unless already closed, issues {@code onComplete} signals to current
170 * subscribers, and disallows subsequent attempts to publish. Upon return,
171 * this method does NOT guarantee that all subscribers have yet completed.
172 */
173 @Override
174 public void close() {
175 synchronized (_lock) {
176 _proceed.set(false);
177 if (_thread != null) {
178 _thread.interrupt();
179 }
180 }
181 super.close();
182 }
183
184 }
|