StreamPublisher.java
001 /*
002  * Java Genetic Algorithm Library (jenetics-6.3.0).
003  * Copyright (c) 2007-2021 Franz Wilhelmstötter
004  *
005  * Licensed under the Apache License, Version 2.0 (the "License");
006  * you may not use this file except in compliance with the License.
007  * You may obtain a copy of the License at
008  *
009  *      http://www.apache.org/licenses/LICENSE-2.0
010  *
011  * Unless required by applicable law or agreed to in writing, software
012  * distributed under the License is distributed on an "AS IS" BASIS,
013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014  * See the License for the specific language governing permissions and
015  * limitations under the License.
016  *
017  * Author:
018  *    Franz Wilhelmstötter (franz.wilhelmstoetter@gmail.com)
019  */
020 package io.jenetics.util;
021 
022 import static java.util.Objects.requireNonNull;
023 
024 import java.util.concurrent.CancellationException;
025 import java.util.concurrent.Executor;
026 import java.util.concurrent.Flow;
027 import java.util.concurrent.Flow.Subscriber;
028 import java.util.concurrent.SubmissionPublisher;
029 import java.util.concurrent.atomic.AtomicBoolean;
030 import java.util.function.BiConsumer;
031 import java.util.stream.Stream;
032 
033 import io.jenetics.internal.util.Lifecycle.ExtendedCloseable;
034 
035 /**
036  * This class allows to create a reactive {@link Flow.Publisher} from a given
037  * Java {@link Stream}.
038  *
039  <pre>{@code
040  * final Stream<Long> stream = engine.stream()
041  *     .limit(33)
042  *     .map(EvolutionResult::generation);
043  *
044  * try (var publisher = new StreamPublisher<Long>()) {
045  *     publisher.subscribe(new Subscriber<>() {
046  *         private Subscription subscription;
047  *         \@Override
048  *         public void onSubscribe(final Subscription subscription) {
049  *             (this.subscription = subscription).request(1);
050  *         }
051  *         \@Override
052  *         public void onNext(final Long g) {
053  *             System.out.println("Got new generation: " + g);
054  *             subscription.request(1);
055  *         }
056  *         \@Override
057  *         public void onError(final Throwable throwable) {
058  *         }
059  *         \@Override
060  *         public void onComplete() {
061  *             System.out.println("Evolution completed.");
062  *         }
063  *     });
064  *
065  *     // Attaching the stream, starts the element publishing.
066  *     publisher.attach(stream);
067  *
068  *     ...
069  * }
070  * }</pre>
071  *
072  @param <T> the element type of the publisher
073  *
074  @author <a href="mailto:franz.wilhelmstoetter@gmail.com">Franz Wilhelmstötter</a>
075  @version 6.0
076  @since 6.0
077  */
078 public class StreamPublisher<T> extends SubmissionPublisher<T> {
079 
080     private final Object _lock = new Object(){};
081 
082     private final AtomicBoolean _proceed = new AtomicBoolean(true);
083 
084     private Stream<? extends T> _stream;
085     private Thread _thread;
086 
087     /**
088      * Creates a new {@code StreamPublisher} using the given {@code Executor}
089      * for async delivery to subscribers, with the given maximum buffer size for
090      * each subscriber.
091      *
092      @param executor the executor to use for async delivery, supporting
093      *        creation of at least one independent thread
094      @param maxBufferCapacity the maximum capacity for each subscriber's buffer
095      @param handler if non-null, procedure to invoke upon exception thrown in
096      *        method {@code onNext}
097      @throws NullPointerException if one of the arguments is {@code null}
098      @throws IllegalArgumentException if maxBufferCapacity not positive
099      */
100     public StreamPublisher(
101         final Executor executor,
102         final int maxBufferCapacity,
103         final BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler
104     ) {
105         super(executor, maxBufferCapacity, handler);
106     }
107 
108     /**
109      * Creates a new {@code StreamPublisher} using the given {@code Executor}
110      * for async delivery to subscribers, with the given maximum buffer size for
111      * each subscriber, and no handler for Subscriber exceptions in method
112      {@link java.util.concurrent.Flow.Subscriber#onNext(Object)}.
113      *
114      @param executor the executor to use for async delivery, supporting
115      *        creation of at least one independent thread
116      @param maxBufferCapacity the maximum capacity for each subscriber's buffer
117      @throws NullPointerException if the given {@code executor} is {@code null}
118      @throws IllegalArgumentException if maxBufferCapacity not positive
119      */
120     public StreamPublisher(final Executor executor, final int maxBufferCapacity) {
121         super(executor, maxBufferCapacity);
122     }
123 
124     /**
125      * Creates a new publisher using the {@code ForkJoinPool.commonPool()} for
126      * async delivery to subscribers (unless it does not support a parallelism
127      * level of at least two, in which case, a new Thread is created to run each
128      * task), with maximum buffer capacity of {@code Flow.defaultBufferSize()},
129      * and no handler for Subscriber exceptions in method onNext.
130      */
131     public StreamPublisher() {
132     }
133 
134     /**
135      * Attaches the given stream to the publisher. This method automatically
136      * starts the publishing of the elements read from the stream. The attached
137      * {@code stream} is closed, when {@code this} publisher is closed.
138      *
139      @param stream the {@code stream} to attach
140      @throws NullPointerException if the given {@code stream} is {@code null}
141      @throws IllegalStateException if a stream is already attached to this
142      *         publisher
143      */
144     public synchronized void attach(final Stream<? extends T> stream) {
145         requireNonNull(stream);
146 
147         synchronized (_lock) {
148             if (_stream != null) {
149                 throw new IllegalStateException(
150                     "Already attached evolution stream."
151                 );
152             }
153 
154             _stream = stream.takeWhile(e -> _proceed.get());
155             _thread = new Thread(() -> {
156                 try {
157                     _stream.forEach(this::submit);
158                     close();
159                 catch(CancellationException e) {
160                     Thread.currentThread().interrupt();
161                     close();
162                 catch (Throwable e) {
163                     closeExceptionally(e);
164                 }
165             });
166             _thread.start();
167         }
168     }
169 
170     /**
171      * Unless already closed, issues {@code onComplete} signals to current
172      * subscribers, and disallows subsequent attempts to publish. Upon return,
173      * this method does NOT guarantee that all subscribers have yet completed.
174      */
175     @Override
176     public void close() {
177         synchronized (_lock) {
178             final var closeable = ExtendedCloseable.of(
179                 () -> if (_thread != null_thread.interrupt()},
180                 () -> if (_stream != null_stream.close()}
181             );
182 
183             _proceed.set(false);
184             closeable.silentClose();
185         }
186         super.close();
187     }
188 
189 }