StreamPublisher.java
001 /*
002  * Java Genetic Algorithm Library (jenetics-6.2.0).
003  * Copyright (c) 2007-2021 Franz Wilhelmstötter
004  *
005  * Licensed under the Apache License, Version 2.0 (the "License");
006  * you may not use this file except in compliance with the License.
007  * You may obtain a copy of the License at
008  *
009  *      http://www.apache.org/licenses/LICENSE-2.0
010  *
011  * Unless required by applicable law or agreed to in writing, software
012  * distributed under the License is distributed on an "AS IS" BASIS,
013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014  * See the License for the specific language governing permissions and
015  * limitations under the License.
016  *
017  * Author:
018  *    Franz Wilhelmstötter (franz.wilhelmstoetter@gmail.com)
019  */
020 package io.jenetics.util;
021 
022 import static java.util.Objects.requireNonNull;
023 
024 import java.util.concurrent.CancellationException;
025 import java.util.concurrent.Executor;
026 import java.util.concurrent.Flow;
027 import java.util.concurrent.Flow.Subscriber;
028 import java.util.concurrent.SubmissionPublisher;
029 import java.util.concurrent.atomic.AtomicBoolean;
030 import java.util.function.BiConsumer;
031 import java.util.stream.Stream;
032 
033 import io.jenetics.internal.util.Lifecycle.ExtendedCloseable;
034 
035 /**
036  * This class allows to create a reactive {@link Flow.Publisher} from a given
037  * Java {@link Stream}.
038  *
039  <pre>{@code
040  * final Stream<Long> stream = engine.stream()
041  *     .limit(33)
042  *     .map(EvolutionResult::generation);
043  *
044  * try (var publisher = new StreamPublisher<Long>()) {
045  *     publisher.subscribe(new Subscriber<>() {
046  *         private Subscription subscription;
047  *         \@Override
048  *         public void onSubscribe(final Subscription subscription) {
049  *             this.subscription = subscription;
050  *             this.subscription.request(1);
051  *         }
052  *         \@Override
053  *         public void onNext(final Long g) {
054  *             System.out.println("Got new generation: " + g);
055  *             subscription.request(1);
056  *         }
057  *         \@Override
058  *         public void onError(final Throwable throwable) {
059  *         }
060  *         \@Override
061  *         public void onComplete() {
062  *             System.out.println("Evolution completed.");
063  *         }
064  *     });
065  *
066  *     // Attaching the stream, starts the element publishing.
067  *     publisher.attach(stream);
068  *
069  *     ...
070  * }
071  * }</pre>
072  *
073  @param <T> the element type of the publisher
074  *
075  @author <a href="mailto:franz.wilhelmstoetter@gmail.com">Franz Wilhelmstötter</a>
076  @version 6.0
077  @since 6.0
078  */
079 public class StreamPublisher<T> extends SubmissionPublisher<T> {
080 
081     private final Object _lock = new Object(){};
082 
083     private final AtomicBoolean _proceed = new AtomicBoolean(true);
084 
085     private Stream<? extends T> _stream;
086     private Thread _thread;
087 
088     /**
089      * Creates a new {@code StreamPublisher} using the given {@code Executor}
090      * for async delivery to subscribers, with the given maximum buffer size for
091      * each subscriber.
092      *
093      @param executor the executor to use for async delivery, supporting
094      *        creation of at least one independent thread
095      @param maxBufferCapacity the maximum capacity for each subscriber's buffer
096      @param handler if non-null, procedure to invoke upon exception thrown in
097      *        method {@code onNext}
098      @throws NullPointerException if one of the arguments is {@code null}
099      @throws IllegalArgumentException if maxBufferCapacity not positive
100      */
101     public StreamPublisher(
102         final Executor executor,
103         final int maxBufferCapacity,
104         final BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler
105     ) {
106         super(executor, maxBufferCapacity, handler);
107     }
108 
109     /**
110      * Creates a new {@code StreamPublisher} using the given {@code Executor}
111      * for async delivery to subscribers, with the given maximum buffer size for
112      * each subscriber, and no handler for Subscriber exceptions in method
113      {@link java.util.concurrent.Flow.Subscriber#onNext(Object)}.
114      *
115      @param executor the executor to use for async delivery, supporting
116      *        creation of at least one independent thread
117      @param maxBufferCapacity the maximum capacity for each subscriber's buffer
118      @throws NullPointerException if the given {@code executor} is {@code null}
119      @throws IllegalArgumentException if maxBufferCapacity not positive
120      */
121     public StreamPublisher(final Executor executor, final int maxBufferCapacity) {
122         super(executor, maxBufferCapacity);
123     }
124 
125     /**
126      * Creates a new publisher using the {@code ForkJoinPool.commonPool()} for
127      * async delivery to subscribers (unless it does not support a parallelism
128      * level of at least two, in which case, a new Thread is created to run each
129      * task), with maximum buffer capacity of {@code Flow.defaultBufferSize()},
130      * and no handler for Subscriber exceptions in method onNext.
131      */
132     public StreamPublisher() {
133     }
134 
135     /**
136      * Attaches the given stream to the publisher. This method automatically
137      * starts the publishing of the elements read from the stream. The attached
138      * {@code stream} is closed, when {@code this} publisher is closed.
139      *
140      @param stream the {@code stream} to attach
141      @throws NullPointerException if the given {@code stream} is {@code null}
142      @throws IllegalStateException if a stream is already attached to this
143      *         publisher
144      */
145     public synchronized void attach(final Stream<? extends T> stream) {
146         requireNonNull(stream);
147 
148         synchronized (_lock) {
149             if (_stream != null) {
150                 throw new IllegalStateException(
151                     "Already attached evolution stream."
152                 );
153             }
154 
155             _stream = stream.takeWhile(e -> _proceed.get());
156             _thread = new Thread(() -> {
157                 try {
158                     _stream.forEach(this::submit);
159                     close();
160                 catch(CancellationException e) {
161                     Thread.currentThread().interrupt();
162                     close();
163                 catch (Throwable e) {
164                     closeExceptionally(e);
165                 }
166             });
167             _thread.start();
168         }
169     }
170 
171     /**
172      * Unless already closed, issues {@code onComplete} signals to current
173      * subscribers, and disallows subsequent attempts to publish. Upon return,
174      * this method does NOT guarantee that all subscribers have yet completed.
175      */
176     @Override
177     public void close() {
178         synchronized (_lock) {
179             final var closeable = ExtendedCloseable.of(
180                 () -> if (_thread != null_thread.interrupt()},
181                 () -> if (_stream != null_stream.close()}
182             );
183 
184             _proceed.set(false);
185             closeable.silentClose();
186         }
187         super.close();
188     }
189 
190 }