001/* 002 * Java Genetic Algorithm Library (jenetics-8.1.0). 003 * Copyright (c) 2007-2024 Franz Wilhelmstötter 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 * Author: 018 * Franz Wilhelmstötter (franz.wilhelmstoetter@gmail.com) 019 */ 020package io.jenetics.util; 021 022import static java.util.Objects.requireNonNull; 023 024import java.util.concurrent.CancellationException; 025import java.util.concurrent.Executor; 026import java.util.concurrent.Flow; 027import java.util.concurrent.Flow.Subscriber; 028import java.util.concurrent.SubmissionPublisher; 029import java.util.concurrent.atomic.AtomicBoolean; 030import java.util.function.BiConsumer; 031import java.util.stream.Stream; 032 033import io.jenetics.internal.util.Lifecycle.Releasable; 034 035/** 036 * This class allows creating a reactive {@link Flow.Publisher} from a given 037 * Java {@link Stream}. 038 * {@snippet lang="java": 039 * final Stream<Long> stream = engine.stream() 040 * .limit(33) 041 * .map(EvolutionResult::generation); 042 * 043 * try (var publisher = new StreamPublisher<Long>()) { 044 * publisher.subscribe(new Subscriber<>() { 045 * private Subscription subscription; 046 * @Override 047 * public void onSubscribe(final Subscription subscription) { 048 * (this.subscription = subscription).request(1); 049 * } 050 * @Override 051 * public void onNext(final Long g) { 052 * System.out.println("Got new generation: " + g); 053 * subscription.request(1); 054 * } 055 * @Override 056 * public void onError(final Throwable throwable) { 057 * } 058 * @Override 059 * public void onComplete() { 060 * System.out.println("Evolution completed."); 061 * } 062 * }); 063 * 064 * // Attaching the stream, starts the element publishing. 065 * publisher.attach(stream); 066 * 067 * // ... 068 * } 069 * } 070 * 071 * @param <T> the element type of the publisher 072 * 073 * @author <a href="mailto:franz.wilhelmstoetter@gmail.com">Franz Wilhelmstötter</a> 074 * @version 6.0 075 * @since 6.0 076 */ 077public class StreamPublisher<T> extends SubmissionPublisher<T> { 078 079 private final Object _lock = new Object(){}; 080 081 private final AtomicBoolean _proceed = new AtomicBoolean(true); 082 083 private Stream<? extends T> _stream; 084 private Thread _thread; 085 086 /** 087 * Creates a new {@code StreamPublisher} using the given {@code Executor} 088 * for async delivery to subscribers, with the given maximum buffer size for 089 * each subscriber. 090 * 091 * @param executor the executor to use for async delivery, supporting 092 * creation of at least one independent thread 093 * @param maxBufferCapacity the maximum capacity for each subscriber's buffer 094 * @param handler if non-null, procedure to invoke upon exception thrown in 095 * method {@code onNext} 096 * @throws NullPointerException if one of the arguments is {@code null} 097 * @throws IllegalArgumentException if maxBufferCapacity not positive 098 */ 099 public StreamPublisher( 100 final Executor executor, 101 final int maxBufferCapacity, 102 final BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler 103 ) { 104 super(executor, maxBufferCapacity, handler); 105 } 106 107 /** 108 * Creates a new {@code StreamPublisher} using the given {@code Executor} 109 * for async delivery to subscribers, with the given maximum buffer size for 110 * each subscriber, and no handler for Subscriber exceptions in method 111 * {@link java.util.concurrent.Flow.Subscriber#onNext(Object)}. 112 * 113 * @param executor the executor to use for async delivery, supporting 114 * creation of at least one independent thread 115 * @param maxBufferCapacity the maximum capacity for each subscriber's buffer 116 * @throws NullPointerException if the given {@code executor} is {@code null} 117 * @throws IllegalArgumentException if maxBufferCapacity not positive 118 */ 119 public StreamPublisher(final Executor executor, final int maxBufferCapacity) { 120 super(executor, maxBufferCapacity); 121 } 122 123 /** 124 * Creates a new publisher using the {@code ForkJoinPool.commonPool()} for 125 * async delivery to subscribers (unless it does not support a parallelism 126 * level of at least two, in which case, a new Thread is created to run each 127 * task), with maximum buffer capacity of {@code Flow.defaultBufferSize()}, 128 * and no handler for Subscriber exceptions in method onNext. 129 */ 130 public StreamPublisher() { 131 } 132 133 /** 134 * Attaches the given stream to the publisher. This method automatically 135 * starts the publishing of the elements read from the stream. The attached 136 * {@code stream} is closed, when {@code this} publisher is closed. 137 * 138 * @param stream the {@code stream} to attach 139 * @throws NullPointerException if the given {@code stream} is {@code null} 140 * @throws IllegalStateException if a stream is already attached to this 141 * publisher 142 */ 143 public synchronized void attach(final Stream<? extends T> stream) { 144 requireNonNull(stream); 145 146 synchronized (_lock) { 147 if (_stream != null) { 148 throw new IllegalStateException( 149 "Already attached evolution stream." 150 ); 151 } 152 153 _stream = stream.takeWhile(e -> _proceed.get()); 154 _thread = new Thread(() -> { 155 try { 156 _stream.forEach(this::submit); 157 close(); 158 } catch(CancellationException e) { 159 Thread.currentThread().interrupt(); 160 close(); 161 } catch (Throwable e) { 162 closeExceptionally(e); 163 } 164 }); 165 _thread.start(); 166 } 167 } 168 169 /** 170 * Unless already closed, issues {@code onComplete} signals to current 171 * subscribers, and disallows later attempts to publish. Upon return, this 172 * method does NOT guarantee that all subscribers have already completed. 173 */ 174 @Override 175 public void close() { 176 synchronized (_lock) { 177 final var closeable = Releasable.of( 178 () -> { if (_thread != null) _thread.interrupt(); }, 179 () -> { if (_stream != null) _stream.close(); } 180 ); 181 182 _proceed.set(false); 183 closeable.silentRelease(); 184 } 185 super.close(); 186 } 187 188}