001/* 002 * Java Genetic Algorithm Library (jenetics-7.2.0). 003 * Copyright (c) 2007-2023 Franz Wilhelmstötter 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 * Author: 018 * Franz Wilhelmstötter (franz.wilhelmstoetter@gmail.com) 019 */ 020package io.jenetics.internal.util; 021 022import static java.lang.Math.ceil; 023import static java.lang.Math.max; 024import static java.util.Objects.requireNonNull; 025 026import java.util.ArrayList; 027import java.util.Iterator; 028import java.util.List; 029import java.util.concurrent.CancellationException; 030import java.util.concurrent.ExecutionException; 031import java.util.concurrent.Executor; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.ForkJoinPool; 034import java.util.concurrent.ForkJoinTask; 035import java.util.concurrent.Future; 036import java.util.concurrent.FutureTask; 037 038import io.jenetics.util.Seq; 039 040/** 041 * @author <a href="mailto:franz.wilhelmstoetter@gmail.com">Franz Wilhelmstötter</a> 042 * @version 3.8 043 * @since 2.0 044 */ 045public abstract class Concurrency implements Executor, AutoCloseable { 046 047 public static final int CORES = Runtime.getRuntime().availableProcessors(); 048 049 public static final Concurrency SERIAL_EXECUTOR = new SerialConcurrency(); 050 051 public abstract void execute(final Seq<? extends Runnable> runnables); 052 053 @Override 054 public abstract void close(); 055 056 /** 057 * Return the underlying {@code Executor}, which is used for performing the 058 * actual task execution. 059 * 060 * @return the underlying {@code Executor} object 061 */ 062 public Executor getInnerExecutor() { 063 return this; 064 } 065 066 /** 067 * Return a new Concurrency object from the given executor. 068 * 069 * @param executor the underlying Executor 070 * @return a new Concurrency object 071 */ 072 public static Concurrency with(final Executor executor) { 073 if (executor instanceof ForkJoinPool e) { 074 return new ForkJoinPoolConcurrency(e); 075 } else if (executor instanceof ExecutorService e) { 076 return new ExecutorServiceConcurrency(e); 077 } else if (executor == SERIAL_EXECUTOR) { 078 return SERIAL_EXECUTOR; 079 } else { 080 return new ExecutorConcurrency(executor); 081 } 082 } 083 084 /** 085 * Return a new Concurrency object using the common ForkJoinPool. 086 * 087 * @return a new Concurrency object using the new ForkJoinPool 088 */ 089 public static Concurrency withCommonPool() { 090 return with(ForkJoinPool.commonPool()); 091 } 092 093 094 /** 095 * This Concurrency uses a ForkJoinPool. 096 */ 097 private static final class ForkJoinPoolConcurrency extends Concurrency { 098 private final List<ForkJoinTask<?>> _tasks = new ArrayList<>(); 099 private final ForkJoinPool _pool; 100 101 ForkJoinPoolConcurrency(final ForkJoinPool pool) { 102 _pool = requireNonNull(pool); 103 } 104 105 @Override 106 public void execute(final Runnable runnable) { 107 _tasks.add(_pool.submit(runnable)); 108 } 109 110 @Override 111 public void execute(final Seq<? extends Runnable> runnables) { 112 if (runnables.nonEmpty()) { 113 _tasks.add(_pool.submit(new RunnablesAction(runnables))); 114 } 115 } 116 117 @Override 118 public Executor getInnerExecutor() { 119 return _pool; 120 } 121 122 @Override 123 public void close() { 124 Concurrency.join(_tasks); 125 } 126 } 127 128 /** 129 * This Concurrency uses an ExecutorService. 130 */ 131 private static final class ExecutorServiceConcurrency extends Concurrency { 132 private final List<Future<?>> _futures = new ArrayList<>(); 133 private final ExecutorService _service; 134 135 ExecutorServiceConcurrency(final ExecutorService service) { 136 _service = requireNonNull(service); 137 } 138 139 @Override 140 public void execute(final Runnable command) { 141 _futures.add(_service.submit(command)); 142 } 143 144 @Override 145 public void execute(final Seq<? extends Runnable> runnables) { 146 if (runnables.nonEmpty()) { 147 final int[] parts = partition( 148 runnables.size(), 149 max( 150 (CORES + 1)*2, 151 (int)ceil(runnables.size()/(double)maxBatchSize()) 152 ) 153 ); 154 155 for (int i = 0; i < parts.length - 1; ++i) { 156 execute(new RunnablesRunnable(runnables, parts[i], parts[i + 1])); 157 } 158 } 159 } 160 161 @Override 162 public Executor getInnerExecutor() { 163 return _service; 164 } 165 166 @Override 167 public void close() { 168 Concurrency.join(_futures); 169 } 170 171 } 172 173 private static void join(final Iterable<? extends Future<?>> jobs) { 174 Future<?> task = null; 175 Iterator<? extends Future<?>> tasks = null; 176 try { 177 tasks = jobs.iterator(); 178 while (tasks.hasNext()) { 179 task = tasks.next(); 180 task.get(); 181 } 182 } catch (ExecutionException e) { 183 Concurrency.cancel(task, tasks); 184 final String msg = e.getCause() != null 185 ? e.getCause().getMessage() 186 : null; 187 throw (CancellationException)new CancellationException(msg) 188 .initCause(e.getCause()); 189 } catch (InterruptedException e) { 190 Thread.currentThread().interrupt(); 191 Concurrency.cancel(task, tasks); 192 final String msg = e.getMessage(); 193 throw (CancellationException)new CancellationException(msg) 194 .initCause(e); 195 } 196 } 197 198 private static void cancel( 199 final Future<?> task, 200 final Iterator<? extends Future<?>> tasks 201 ) { 202 if (task != null) { 203 task.cancel(true); 204 } 205 if (tasks != null) { 206 tasks.forEachRemaining(t -> t.cancel(true)); 207 } 208 } 209 210 /** 211 * This Concurrency uses an Executor. 212 */ 213 private static final class ExecutorConcurrency extends Concurrency { 214 private final List<FutureTask<?>> _tasks = new ArrayList<>(); 215 private final Executor _executor; 216 217 ExecutorConcurrency(final Executor executor) { 218 _executor = requireNonNull(executor); 219 } 220 221 @Override 222 public void execute(final Runnable command) { 223 final FutureTask<?> task = new FutureTask<>(command, null); 224 _tasks.add(task); 225 _executor.execute(task); 226 } 227 228 @Override 229 public void execute(final Seq<? extends Runnable> runnables) { 230 if (runnables.nonEmpty()) { 231 final int[] parts = partition( 232 runnables.size(), 233 max( 234 (CORES + 1)*2, 235 (int)ceil(runnables.size()/(double)maxBatchSize()) 236 ) 237 ); 238 239 for (int i = 0; i < parts.length - 1; ++i) { 240 execute(new RunnablesRunnable(runnables, parts[i], parts[i + 1])); 241 } 242 } 243 } 244 245 @Override 246 public void close() { 247 Concurrency.join(_tasks); 248 } 249 } 250 251 /** 252 * This Concurrency executes the runnables within the main thread. 253 */ 254 private static final class SerialConcurrency extends Concurrency { 255 256 @Override 257 public void execute(final Runnable command) { 258 command.run(); 259 } 260 261 @Override 262 public void execute(final Seq<? extends Runnable> runnables) { 263 runnables.forEach(Runnable::run); 264 } 265 266 @Override 267 public void close() { 268 } 269 } 270 271 272 /** 273 * Return a array with the indexes of the partitions of an array with the 274 * given size. The length of the returned array is {@code min(size, prts) + 1}. 275 * <p> 276 * Some examples: 277 * <pre> 278 * partition(10, 3): [0, 3, 6, 10] 279 * partition(15, 6): [0, 2, 4, 6, 9, 12, 15] 280 * partition(5, 10): [0, 1, 2, 3, 4, 5] 281 * </pre> 282 * 283 * The following examples print the start index (inclusive) and the end 284 * index (exclusive) of the {@code partition(15, 6)}. 285 * <pre>{@code 286 * int[] parts = partition(15, 6); 287 * for (int i = 0; i < parts.length - 1; ++i) { 288 * System.out.println(i + ": " + parts[i] + "\t" + parts[i + 1]); 289 * } 290 * }</pre> 291 * <pre> 292 * 0: 0 2 293 * 1: 2 4 294 * 2: 4 6 295 * 3: 6 9 296 * 4: 9 12 297 * 5: 12 15 298 * </pre> 299 * 300 * This example shows how this can be used in a concurrent environment: 301 * <pre>{@code 302 * try (final Concurrency c = Concurrency.start()) { 303 * final int[] parts = arrays.partition(population.size(), _maxThreads); 304 * 305 * for (int i = 0; i < parts.length - 1; ++i) { 306 * final int part = i; 307 * c.execute(new Runnable() { @Override public void run() { 308 * for (int j = parts[part + 1]; --j <= parts[part];) { 309 * population.get(j).evaluate(); 310 * } 311 * }}); 312 * } 313 * } 314 * }</pre> 315 * 316 * @param size the size of the array to partition. 317 * @param parts the number of parts the (virtual) array should be partitioned. 318 * @return the partition array with the length of {@code min(size, parts) + 1}. 319 * @throws IllegalArgumentException if {@code size} or {@code p} is less than one. 320 */ 321 private static int[] partition(final int size, final int parts) { 322 if (size < 1) { 323 throw new IllegalArgumentException( 324 "Size must greater than zero: " + size 325 ); 326 } 327 if (parts < 1) { 328 throw new IllegalArgumentException( 329 "Number of partitions must greater than zero: " + parts 330 ); 331 } 332 333 final int pts = Math.min(size, parts); 334 final int[] partition = new int[pts + 1]; 335 336 final int bulk = size/pts; 337 final int rest = size%pts; 338 assert (bulk*pts + rest) == size; 339 340 for (int i = 0, n = pts - rest; i < n; ++i) { 341 partition[i] = i*bulk; 342 } 343 for (int i = 0, n = rest + 1; i < n; ++i) { 344 partition[pts - rest + i] = (pts - rest)*bulk + i*(bulk + 1); 345 } 346 347 return partition; 348 } 349 350 static int maxBatchSize() { 351 return Env.maxBatchSize; 352 } 353 354 @SuppressWarnings("removal") 355 private static final class Env { 356 private static final int maxBatchSize = max( 357 java.security.AccessController.doPrivileged( 358 (java.security.PrivilegedAction<Integer>)() -> Integer.getInteger( 359 "io.jenetics.concurrency.maxBatchSize", 360 Integer.MAX_VALUE 361 )), 362 1 363 ); 364 } 365 366}