StreamPublisher.java
001 /*
002  * Java Genetic Algorithm Library (jenetics-6.0.0).
003  * Copyright (c) 2007-2020 Franz Wilhelmstötter
004  *
005  * Licensed under the Apache License, Version 2.0 (the "License");
006  * you may not use this file except in compliance with the License.
007  * You may obtain a copy of the License at
008  *
009  *      http://www.apache.org/licenses/LICENSE-2.0
010  *
011  * Unless required by applicable law or agreed to in writing, software
012  * distributed under the License is distributed on an "AS IS" BASIS,
013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014  * See the License for the specific language governing permissions and
015  * limitations under the License.
016  *
017  * Author:
018  *    Franz Wilhelmstötter (franz.wilhelmstoetter@gmail.com)
019  */
020 package io.jenetics.util;
021 
022 import static java.util.Objects.requireNonNull;
023 
024 import java.util.concurrent.CancellationException;
025 import java.util.concurrent.Executor;
026 import java.util.concurrent.Flow;
027 import java.util.concurrent.Flow.Subscriber;
028 import java.util.concurrent.SubmissionPublisher;
029 import java.util.concurrent.atomic.AtomicBoolean;
030 import java.util.function.BiConsumer;
031 import java.util.stream.Stream;
032 
033 /**
034  * This class allows to create a reactive {@link Flow.Publisher} from a given
035  * Java {@link Stream}.
036  *
037  <pre>{@code
038  * final Stream<Long> stream = engine.stream()
039  *     .limit(33)
040  *     .map(EvolutionResult::generation);
041  *
042  * try (var publisher = new StreamPublisher<Long>()) {
043  *     publisher.subscribe(new Subscriber<>() {
044  *         private Subscription subscription;
045  *         \@Override
046  *         public void onSubscribe(final Subscription subscription) {
047  *             this.subscription = subscription;
048  *             this.subscription.request(1);
049  *         }
050  *         \@Override
051  *         public void onNext(final Long g) {
052  *             System.out.println("Got new generation: " + g);
053  *             subscription.request(1);
054  *         }
055  *         \@Override
056  *         public void onError(final Throwable throwable) {
057  *         }
058  *         \@Override
059  *         public void onComplete() {
060  *             System.out.println("Evolution completed.");
061  *         }
062  *     });
063  *
064  *     // Attaching the stream, starts the element publishing.
065  *     publisher.attach(stream);
066  *
067  *     ...
068  * }
069  * }</pre>
070  *
071  @param <T> the element type of the publisher
072  *
073  @author <a href="mailto:franz.wilhelmstoetter@gmail.com">Franz Wilhelmstötter</a>
074  @version 6.0
075  @since 6.0
076  */
077 public class StreamPublisher<T> extends SubmissionPublisher<T> {
078 
079     private final Object _lock = new Object(){};
080 
081     private final AtomicBoolean _proceed = new AtomicBoolean(true);
082 
083     private Stream<? extends T> _stream;
084     private Thread _thread;
085 
086     /**
087      * Creates a new {@code StreamPublisher} using the given {@code Executor}
088      * for async delivery to subscribers, with the given maximum buffer size for
089      * each subscriber.
090      *
091      @param executor the executor to use for async delivery, supporting
092      *        creation of at least one independent thread
093      @param maxBufferCapacity the maximum capacity for each subscriber's buffer
094      @param handler if non-null, procedure to invoke upon exception thrown in
095      *        method {@code onNext}
096      @throws NullPointerException if one of the arguments is {@code null}
097      @throws IllegalArgumentException if maxBufferCapacity not positive
098      */
099     public StreamPublisher(
100         final Executor executor,
101         final int maxBufferCapacity,
102         final BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler
103     ) {
104         super(executor, maxBufferCapacity, handler);
105     }
106 
107     /**
108      * Creates a new {@code StreamPublisher} using the given {@code Executor}
109      * for async delivery to subscribers, with the given maximum buffer size for
110      * each subscriber, and no handler for Subscriber exceptions in method
111      {@link java.util.concurrent.Flow.Subscriber#onNext(Object)}.
112      *
113      @param executor the executor to use for async delivery, supporting
114      *        creation of at least one independent thread
115      @param maxBufferCapacity the maximum capacity for each subscriber's buffer
116      @throws NullPointerException if the given {@code executor} is {@code null}
117      @throws IllegalArgumentException if maxBufferCapacity not positive
118      */
119     public StreamPublisher(final Executor executor, final int maxBufferCapacity) {
120         super(executor, maxBufferCapacity);
121     }
122 
123     /**
124      * Creates a new publisher using the {@code ForkJoinPool.commonPool()} for
125      * async delivery to subscribers (unless it does not support a parallelism
126      * level of at least two, in which case, a new Thread is created to run each
127      * task), with maximum buffer capacity of {@code Flow.defaultBufferSize()},
128      * and no handler for Subscriber exceptions in method onNext.
129      */
130     public StreamPublisher() {
131     }
132 
133     /**
134      * Attaches the given stream to the publisher. This method automatically
135      * starts the publishing of the elements read from the stream.
136      *
137      @param stream the {@code stream} to attach
138      @throws NullPointerException if the given {@code stream} is {@code null}
139      @throws IllegalStateException if a stream is already attached to this
140      *         publisher
141      */
142     public synchronized void attach(final Stream<? extends T> stream) {
143         requireNonNull(stream);
144 
145         synchronized (_lock) {
146             if (_stream != null) {
147                 throw new IllegalStateException(
148                     "Already attached evolution stream."
149                 );
150             }
151 
152             _stream = stream.takeWhile(e -> _proceed.get());
153             _thread = new Thread(() -> {
154                 try {
155                     _stream.forEach(this::submit);
156                     close();
157                 catch(CancellationException e) {
158                     Thread.currentThread().interrupt();
159                     close();
160                 catch (Throwable e) {
161                     closeExceptionally(e);
162                 }
163             });
164             _thread.start();
165         }
166     }
167 
168     /**
169      * Unless already closed, issues {@code onComplete} signals to current
170      * subscribers, and disallows subsequent attempts to publish. Upon return,
171      * this method does NOT guarantee that all subscribers have yet completed.
172      */
173     @Override
174     public void close() {
175         synchronized (_lock) {
176             _proceed.set(false);
177             if (_thread != null) {
178                 _thread.interrupt();
179             }
180         }
181         super.close();
182     }
183 
184 }