001/* 002 * Java Genetic Algorithm Library (jenetics-7.2.0). 003 * Copyright (c) 2007-2023 Franz Wilhelmstötter 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 * Author: 018 * Franz Wilhelmstötter (franz.wilhelmstoetter@gmail.com) 019 */ 020package io.jenetics.util; 021 022import static java.util.Objects.requireNonNull; 023 024import java.util.concurrent.CancellationException; 025import java.util.concurrent.Executor; 026import java.util.concurrent.Flow; 027import java.util.concurrent.Flow.Subscriber; 028import java.util.concurrent.SubmissionPublisher; 029import java.util.concurrent.atomic.AtomicBoolean; 030import java.util.function.BiConsumer; 031import java.util.stream.Stream; 032 033import io.jenetics.internal.util.Lifecycle.ExtendedCloseable; 034 035/** 036 * This class allows creating a reactive {@link Flow.Publisher} from a given 037 * Java {@link Stream}. 038 * 039 * <pre>{@code 040 * final Stream<Long> stream = engine.stream() 041 * .limit(33) 042 * .map(EvolutionResult::generation); 043 * 044 * try (var publisher = new StreamPublisher<Long>()) { 045 * publisher.subscribe(new Subscriber<>() { 046 * private Subscription subscription; 047 * \@Override 048 * public void onSubscribe(final Subscription subscription) { 049 * (this.subscription = subscription).request(1); 050 * } 051 * \@Override 052 * public void onNext(final Long g) { 053 * System.out.println("Got new generation: " + g); 054 * subscription.request(1); 055 * } 056 * \@Override 057 * public void onError(final Throwable throwable) { 058 * } 059 * \@Override 060 * public void onComplete() { 061 * System.out.println("Evolution completed."); 062 * } 063 * }); 064 * 065 * // Attaching the stream, starts the element publishing. 066 * publisher.attach(stream); 067 * 068 * ... 069 * } 070 * }</pre> 071 * 072 * @param <T> the element type of the publisher 073 * 074 * @author <a href="mailto:franz.wilhelmstoetter@gmail.com">Franz Wilhelmstötter</a> 075 * @version 6.0 076 * @since 6.0 077 */ 078public class StreamPublisher<T> extends SubmissionPublisher<T> { 079 080 private final Object _lock = new Object(){}; 081 082 private final AtomicBoolean _proceed = new AtomicBoolean(true); 083 084 private Stream<? extends T> _stream; 085 private Thread _thread; 086 087 /** 088 * Creates a new {@code StreamPublisher} using the given {@code Executor} 089 * for async delivery to subscribers, with the given maximum buffer size for 090 * each subscriber. 091 * 092 * @param executor the executor to use for async delivery, supporting 093 * creation of at least one independent thread 094 * @param maxBufferCapacity the maximum capacity for each subscriber's buffer 095 * @param handler if non-null, procedure to invoke upon exception thrown in 096 * method {@code onNext} 097 * @throws NullPointerException if one of the arguments is {@code null} 098 * @throws IllegalArgumentException if maxBufferCapacity not positive 099 */ 100 public StreamPublisher( 101 final Executor executor, 102 final int maxBufferCapacity, 103 final BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler 104 ) { 105 super(executor, maxBufferCapacity, handler); 106 } 107 108 /** 109 * Creates a new {@code StreamPublisher} using the given {@code Executor} 110 * for async delivery to subscribers, with the given maximum buffer size for 111 * each subscriber, and no handler for Subscriber exceptions in method 112 * {@link java.util.concurrent.Flow.Subscriber#onNext(Object)}. 113 * 114 * @param executor the executor to use for async delivery, supporting 115 * creation of at least one independent thread 116 * @param maxBufferCapacity the maximum capacity for each subscriber's buffer 117 * @throws NullPointerException if the given {@code executor} is {@code null} 118 * @throws IllegalArgumentException if maxBufferCapacity not positive 119 */ 120 public StreamPublisher(final Executor executor, final int maxBufferCapacity) { 121 super(executor, maxBufferCapacity); 122 } 123 124 /** 125 * Creates a new publisher using the {@code ForkJoinPool.commonPool()} for 126 * async delivery to subscribers (unless it does not support a parallelism 127 * level of at least two, in which case, a new Thread is created to run each 128 * task), with maximum buffer capacity of {@code Flow.defaultBufferSize()}, 129 * and no handler for Subscriber exceptions in method onNext. 130 */ 131 public StreamPublisher() { 132 } 133 134 /** 135 * Attaches the given stream to the publisher. This method automatically 136 * starts the publishing of the elements read from the stream. The attached 137 * {@code stream} is closed, when {@code this} publisher is closed. 138 * 139 * @param stream the {@code stream} to attach 140 * @throws NullPointerException if the given {@code stream} is {@code null} 141 * @throws IllegalStateException if a stream is already attached to this 142 * publisher 143 */ 144 public synchronized void attach(final Stream<? extends T> stream) { 145 requireNonNull(stream); 146 147 synchronized (_lock) { 148 if (_stream != null) { 149 throw new IllegalStateException( 150 "Already attached evolution stream." 151 ); 152 } 153 154 _stream = stream.takeWhile(e -> _proceed.get()); 155 _thread = new Thread(() -> { 156 try { 157 _stream.forEach(this::submit); 158 close(); 159 } catch(CancellationException e) { 160 Thread.currentThread().interrupt(); 161 close(); 162 } catch (Throwable e) { 163 closeExceptionally(e); 164 } 165 }); 166 _thread.start(); 167 } 168 } 169 170 /** 171 * Unless already closed, issues {@code onComplete} signals to current 172 * subscribers, and disallows subsequent attempts to publish. Upon return, 173 * this method does NOT guarantee that all subscribers have already completed. 174 */ 175 @Override 176 public void close() { 177 synchronized (_lock) { 178 final var closeable = ExtendedCloseable.of( 179 () -> { if (_thread != null) _thread.interrupt(); }, 180 () -> { if (_stream != null) _stream.close(); } 181 ); 182 183 _proceed.set(false); 184 closeable.silentClose(); 185 } 186 super.close(); 187 } 188 189}