001/*
002 * Java Genetic Algorithm Library (jenetics-8.1.0).
003 * Copyright (c) 2007-2024 Franz Wilhelmstötter
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 * Author:
018 *    Franz Wilhelmstötter (franz.wilhelmstoetter@gmail.com)
019 */
020package io.jenetics.util;
021
022import static java.util.Objects.requireNonNull;
023
024import java.util.concurrent.CancellationException;
025import java.util.concurrent.Executor;
026import java.util.concurrent.Flow;
027import java.util.concurrent.Flow.Subscriber;
028import java.util.concurrent.SubmissionPublisher;
029import java.util.concurrent.atomic.AtomicBoolean;
030import java.util.function.BiConsumer;
031import java.util.stream.Stream;
032
033import io.jenetics.internal.util.Lifecycle.Releasable;
034
035/**
036 * This class allows creating a reactive {@link Flow.Publisher} from a given
037 * Java {@link Stream}.
038 * {@snippet lang="java":
039 * final Stream<Long> stream = engine.stream()
040 *     .limit(33)
041 *     .map(EvolutionResult::generation);
042 *
043 * try (var publisher = new StreamPublisher<Long>()) {
044 *     publisher.subscribe(new Subscriber<>() {
045 *         private Subscription subscription;
046 *         @Override
047 *         public void onSubscribe(final Subscription subscription) {
048 *             (this.subscription = subscription).request(1);
049 *         }
050 *         @Override
051 *         public void onNext(final Long g) {
052 *             System.out.println("Got new generation: " + g);
053 *             subscription.request(1);
054 *         }
055 *         @Override
056 *         public void onError(final Throwable throwable) {
057 *         }
058 *         @Override
059 *         public void onComplete() {
060 *             System.out.println("Evolution completed.");
061 *         }
062 *     });
063 *
064 *     // Attaching the stream, starts the element publishing.
065 *     publisher.attach(stream);
066 *
067 *     // ...
068 * }
069 * }
070 *
071 * @param <T> the element type of the publisher
072 *
073 * @author <a href="mailto:franz.wilhelmstoetter@gmail.com">Franz Wilhelmstötter</a>
074 * @version 6.0
075 * @since 6.0
076 */
077public class StreamPublisher<T> extends SubmissionPublisher<T> {
078
079        private final Object _lock = new Object(){};
080
081        private final AtomicBoolean _proceed = new AtomicBoolean(true);
082
083        private Stream<? extends T> _stream;
084        private Thread _thread;
085
086        /**
087         * Creates a new {@code StreamPublisher} using the given {@code Executor}
088         * for async delivery to subscribers, with the given maximum buffer size for
089         * each subscriber.
090         *
091         * @param executor the executor to use for async delivery, supporting
092         *        creation of at least one independent thread
093         * @param maxBufferCapacity the maximum capacity for each subscriber's buffer
094         * @param handler if non-null, procedure to invoke upon exception thrown in
095         *        method {@code onNext}
096         * @throws NullPointerException if one of the arguments is {@code null}
097         * @throws IllegalArgumentException if maxBufferCapacity not positive
098         */
099        public StreamPublisher(
100                final Executor executor,
101                final int maxBufferCapacity,
102                final BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler
103        ) {
104                super(executor, maxBufferCapacity, handler);
105        }
106
107        /**
108         * Creates a new {@code StreamPublisher} using the given {@code Executor}
109         * for async delivery to subscribers, with the given maximum buffer size for
110         * each subscriber, and no handler for Subscriber exceptions in method
111         * {@link java.util.concurrent.Flow.Subscriber#onNext(Object)}.
112         *
113         * @param executor the executor to use for async delivery, supporting
114         *        creation of at least one independent thread
115         * @param maxBufferCapacity the maximum capacity for each subscriber's buffer
116         * @throws NullPointerException if the given {@code executor} is {@code null}
117         * @throws IllegalArgumentException if maxBufferCapacity not positive
118         */
119        public StreamPublisher(final Executor executor, final int maxBufferCapacity) {
120                super(executor, maxBufferCapacity);
121        }
122
123        /**
124         * Creates a new publisher using the {@code ForkJoinPool.commonPool()} for
125         * async delivery to subscribers (unless it does not support a parallelism
126         * level of at least two, in which case, a new Thread is created to run each
127         * task), with maximum buffer capacity of {@code Flow.defaultBufferSize()},
128         * and no handler for Subscriber exceptions in method onNext.
129         */
130        public StreamPublisher() {
131        }
132
133        /**
134         * Attaches the given stream to the publisher. This method automatically
135         * starts the publishing of the elements read from the stream. The attached
136         * {@code stream} is closed, when {@code this} publisher is closed.
137         *
138         * @param stream the {@code stream} to attach
139         * @throws NullPointerException if the given {@code stream} is {@code null}
140         * @throws IllegalStateException if a stream is already attached to this
141         *         publisher
142         */
143        public synchronized void attach(final Stream<? extends T> stream) {
144                requireNonNull(stream);
145
146                synchronized (_lock) {
147                        if (_stream != null) {
148                                throw new IllegalStateException(
149                                        "Already attached evolution stream."
150                                );
151                        }
152
153                        _stream = stream.takeWhile(e -> _proceed.get());
154                        _thread = new Thread(() -> {
155                                try {
156                                        _stream.forEach(this::submit);
157                                        close();
158                                } catch(CancellationException e) {
159                                        Thread.currentThread().interrupt();
160                                        close();
161                                } catch (Throwable e) {
162                                        closeExceptionally(e);
163                                }
164                        });
165                        _thread.start();
166                }
167        }
168
169        /**
170         * Unless already closed, issues {@code onComplete} signals to current
171         * subscribers, and disallows later attempts to publish. Upon return, this
172         * method does NOT guarantee that all subscribers have already completed.
173         */
174        @Override
175        public void close() {
176                synchronized (_lock) {
177                        final var closeable = Releasable.of(
178                                () -> { if (_thread != null) _thread.interrupt(); },
179                                () -> { if (_stream != null) _stream.close(); }
180                        );
181
182                        _proceed.set(false);
183                        closeable.silentRelease();
184                }
185                super.close();
186        }
187
188}