001/*
002 * Java Genetic Algorithm Library (jenetics-8.0.0).
003 * Copyright (c) 2007-2024 Franz Wilhelmstötter
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 * Author:
018 *    Franz Wilhelmstötter (franz.wilhelmstoetter@gmail.com)
019 */
020package io.jenetics.util;
021
022import static java.util.Objects.requireNonNull;
023
024import java.util.concurrent.CancellationException;
025import java.util.concurrent.Executor;
026import java.util.concurrent.Flow;
027import java.util.concurrent.Flow.Subscriber;
028import java.util.concurrent.SubmissionPublisher;
029import java.util.concurrent.atomic.AtomicBoolean;
030import java.util.function.BiConsumer;
031import java.util.stream.Stream;
032
033import io.jenetics.internal.util.Lifecycle.ExtendedCloseable;
034
035/**
036 * This class allows creating a reactive {@link Flow.Publisher} from a given
037 * Java {@link Stream}.
038 *
039 * {@snippet lang="java":
040 * final Stream<Long> stream = engine.stream()
041 *     .limit(33)
042 *     .map(EvolutionResult::generation);
043 *
044 * try (var publisher = new StreamPublisher<Long>()) {
045 *     publisher.subscribe(new Subscriber<>() {
046 *         private Subscription subscription;
047 *         @Override
048 *         public void onSubscribe(final Subscription subscription) {
049 *             (this.subscription = subscription).request(1);
050 *         }
051 *         @Override
052 *         public void onNext(final Long g) {
053 *             System.out.println("Got new generation: " + g);
054 *             subscription.request(1);
055 *         }
056 *         @Override
057 *         public void onError(final Throwable throwable) {
058 *         }
059 *         @Override
060 *         public void onComplete() {
061 *             System.out.println("Evolution completed.");
062 *         }
063 *     });
064 *
065 *     // Attaching the stream, starts the element publishing.
066 *     publisher.attach(stream);
067 *
068 *     // ...
069 * }
070 * }
071 *
072 * @param <T> the element type of the publisher
073 *
074 * @author <a href="mailto:franz.wilhelmstoetter@gmail.com">Franz Wilhelmstötter</a>
075 * @version 6.0
076 * @since 6.0
077 */
078public class StreamPublisher<T> extends SubmissionPublisher<T> {
079
080        private final Object _lock = new Object(){};
081
082        private final AtomicBoolean _proceed = new AtomicBoolean(true);
083
084        private Stream<? extends T> _stream;
085        private Thread _thread;
086
087        /**
088         * Creates a new {@code StreamPublisher} using the given {@code Executor}
089         * for async delivery to subscribers, with the given maximum buffer size for
090         * each subscriber.
091         *
092         * @param executor the executor to use for async delivery, supporting
093         *        creation of at least one independent thread
094         * @param maxBufferCapacity the maximum capacity for each subscriber's buffer
095         * @param handler if non-null, procedure to invoke upon exception thrown in
096         *        method {@code onNext}
097         * @throws NullPointerException if one of the arguments is {@code null}
098         * @throws IllegalArgumentException if maxBufferCapacity not positive
099         */
100        public StreamPublisher(
101                final Executor executor,
102                final int maxBufferCapacity,
103                final BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler
104        ) {
105                super(executor, maxBufferCapacity, handler);
106        }
107
108        /**
109         * Creates a new {@code StreamPublisher} using the given {@code Executor}
110         * for async delivery to subscribers, with the given maximum buffer size for
111         * each subscriber, and no handler for Subscriber exceptions in method
112         * {@link java.util.concurrent.Flow.Subscriber#onNext(Object)}.
113         *
114         * @param executor the executor to use for async delivery, supporting
115         *        creation of at least one independent thread
116         * @param maxBufferCapacity the maximum capacity for each subscriber's buffer
117         * @throws NullPointerException if the given {@code executor} is {@code null}
118         * @throws IllegalArgumentException if maxBufferCapacity not positive
119         */
120        public StreamPublisher(final Executor executor, final int maxBufferCapacity) {
121                super(executor, maxBufferCapacity);
122        }
123
124        /**
125         * Creates a new publisher using the {@code ForkJoinPool.commonPool()} for
126         * async delivery to subscribers (unless it does not support a parallelism
127         * level of at least two, in which case, a new Thread is created to run each
128         * task), with maximum buffer capacity of {@code Flow.defaultBufferSize()},
129         * and no handler for Subscriber exceptions in method onNext.
130         */
131        public StreamPublisher() {
132        }
133
134        /**
135         * Attaches the given stream to the publisher. This method automatically
136         * starts the publishing of the elements read from the stream. The attached
137         * {@code stream} is closed, when {@code this} publisher is closed.
138         *
139         * @param stream the {@code stream} to attach
140         * @throws NullPointerException if the given {@code stream} is {@code null}
141         * @throws IllegalStateException if a stream is already attached to this
142         *         publisher
143         */
144        public synchronized void attach(final Stream<? extends T> stream) {
145                requireNonNull(stream);
146
147                synchronized (_lock) {
148                        if (_stream != null) {
149                                throw new IllegalStateException(
150                                        "Already attached evolution stream."
151                                );
152                        }
153
154                        _stream = stream.takeWhile(e -> _proceed.get());
155                        _thread = new Thread(() -> {
156                                try {
157                                        _stream.forEach(this::submit);
158                                        close();
159                                } catch(CancellationException e) {
160                                        Thread.currentThread().interrupt();
161                                        close();
162                                } catch (Throwable e) {
163                                        closeExceptionally(e);
164                                }
165                        });
166                        _thread.start();
167                }
168        }
169
170        /**
171         * Unless already closed, issues {@code onComplete} signals to current
172         * subscribers, and disallows subsequent attempts to publish. Upon return,
173         * this method does NOT guarantee that all subscribers have already completed.
174         */
175        @Override
176        public void close() {
177                synchronized (_lock) {
178                        final var closeable = ExtendedCloseable.of(
179                                () -> { if (_thread != null) _thread.interrupt(); },
180                                () -> { if (_stream != null) _stream.close(); }
181                        );
182
183                        _proceed.set(false);
184                        closeable.silentClose();
185                }
186                super.close();
187        }
188
189}