001/*
002 * Java Genetic Algorithm Library (jenetics-7.1.0).
003 * Copyright (c) 2007-2022 Franz Wilhelmstötter
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 * Author:
018 *    Franz Wilhelmstötter (franz.wilhelmstoetter@gmail.com)
019 */
020package io.jenetics.internal.util;
021
022import static java.lang.Math.ceil;
023import static java.lang.Math.max;
024import static java.util.Objects.requireNonNull;
025
026import java.util.ArrayList;
027import java.util.Iterator;
028import java.util.List;
029import java.util.concurrent.CancellationException;
030import java.util.concurrent.ExecutionException;
031import java.util.concurrent.Executor;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.ForkJoinPool;
034import java.util.concurrent.ForkJoinTask;
035import java.util.concurrent.Future;
036import java.util.concurrent.FutureTask;
037
038import io.jenetics.util.Seq;
039
040/**
041 * @author <a href="mailto:franz.wilhelmstoetter@gmail.com">Franz Wilhelmstötter</a>
042 * @version 3.8
043 * @since 2.0
044 */
045public abstract class Concurrency implements Executor, AutoCloseable {
046
047        public static final int CORES = Runtime.getRuntime().availableProcessors();
048
049        public static final Concurrency SERIAL_EXECUTOR = new SerialConcurrency();
050
051        public abstract void execute(final Seq<? extends Runnable> runnables);
052
053        @Override
054        public abstract void close();
055
056        /**
057         * Return the underlying {@code Executor}, which is used for performing the
058         * actual task execution.
059         *
060         * @return the underlying {@code Executor} object
061         */
062        public Executor getInnerExecutor() {
063                return this;
064        }
065
066        /**
067         * Return an new Concurrency object from the given executor.
068         *
069         * @param executor the underlying Executor
070         * @return a new Concurrency object
071         */
072        public static Concurrency with(final Executor executor) {
073                if (executor instanceof ForkJoinPool e) {
074                        return new ForkJoinPoolConcurrency(e);
075                } else if (executor instanceof ExecutorService e) {
076                        return new ExecutorServiceConcurrency(e);
077                } else if (executor == SERIAL_EXECUTOR) {
078                        return SERIAL_EXECUTOR;
079                } else {
080                        return new ExecutorConcurrency(executor);
081                }
082        }
083
084        /**
085         * Return a new Concurrency object using the common ForkJoinPool.
086         *
087         * @return a new Concurrency object using the new ForkJoinPool
088         */
089        public static Concurrency withCommonPool() {
090                return with(ForkJoinPool.commonPool());
091        }
092
093
094        /**
095         * This Concurrency uses a ForkJoinPool.
096         */
097        private static final class ForkJoinPoolConcurrency extends Concurrency {
098                private final List<ForkJoinTask<?>> _tasks = new ArrayList<>();
099                private final ForkJoinPool _pool;
100
101                ForkJoinPoolConcurrency(final ForkJoinPool pool) {
102                        _pool = requireNonNull(pool);
103                }
104
105                @Override
106                public void execute(final Runnable runnable) {
107                        _tasks.add(_pool.submit(runnable));
108                }
109
110                @Override
111                public void execute(final Seq<? extends Runnable> runnables) {
112                        if (runnables.nonEmpty()) {
113                                _tasks.add(_pool.submit(new RunnablesAction(runnables)));
114                        }
115                }
116
117                @Override
118                public Executor getInnerExecutor() {
119                        return _pool;
120                }
121
122                @Override
123                public void close() {
124                        Concurrency.join(_tasks);
125                }
126        }
127
128        /**
129         * This Concurrency uses an ExecutorService.
130         */
131        private static final class ExecutorServiceConcurrency extends Concurrency {
132                private final List<Future<?>> _futures = new ArrayList<>();
133                private final ExecutorService _service;
134
135                ExecutorServiceConcurrency(final ExecutorService service) {
136                        _service = requireNonNull(service);
137                }
138
139                @Override
140                public void execute(final Runnable command) {
141                        _futures.add(_service.submit(command));
142                }
143
144                @Override
145                public void execute(final Seq<? extends Runnable> runnables) {
146                        if (runnables.nonEmpty()) {
147                                final int[] parts = partition(
148                                        runnables.size(),
149                                        max(
150                                                (CORES + 1)*2,
151                                                (int)ceil(runnables.size()/(double)Env.maxBatchSize)
152                                        )
153                                );
154
155                                for (int i = 0; i < parts.length - 1; ++i) {
156                                        execute(new RunnablesRunnable(runnables, parts[i], parts[i + 1]));
157                                }
158                        }
159                }
160
161                @Override
162                public Executor getInnerExecutor() {
163                        return _service;
164                }
165
166                @Override
167                public void close() {
168                        Concurrency.join(_futures);
169                }
170
171        }
172
173        private static void join(final Iterable<? extends Future<?>> jobs) {
174                Future<?> task = null;
175                Iterator<? extends Future<?>> tasks = null;
176                try {
177                        tasks = jobs.iterator();
178                        while (tasks.hasNext()) {
179                                task = tasks.next();
180                                task.get();
181                        }
182                } catch (ExecutionException e) {
183                        Concurrency.cancel(task, tasks);
184                        final String msg = e.getCause() != null
185                                ? e.getCause().getMessage()
186                                : null;
187                        throw (CancellationException)new CancellationException(msg)
188                                .initCause(e.getCause());
189                } catch (InterruptedException e) {
190                        Thread.currentThread().interrupt();
191                        Concurrency.cancel(task, tasks);
192                        final String msg = e.getMessage();
193                        throw (CancellationException)new CancellationException(msg)
194                                .initCause(e);
195                }
196        }
197
198        private static void cancel(
199                final Future<?> task,
200                final Iterator<? extends Future<?>> tasks
201        ) {
202                if (task != null) {
203                        task.cancel(true);
204                }
205                if (tasks != null) {
206                        tasks.forEachRemaining(t -> t.cancel(true));
207                }
208        }
209
210        /**
211         * This Concurrency uses an Executor.
212         */
213        private static final class ExecutorConcurrency extends Concurrency {
214                private final List<FutureTask<?>> _tasks = new ArrayList<>();
215                private final Executor _executor;
216
217                ExecutorConcurrency(final Executor executor) {
218                        _executor = requireNonNull(executor);
219                }
220
221                @Override
222                public void execute(final Runnable command) {
223                        final FutureTask<?> task = new FutureTask<>(command, null);
224                        _tasks.add(task);
225                        _executor.execute(task);
226                }
227
228                @Override
229                public void execute(final Seq<? extends Runnable> runnables) {
230                        if (runnables.nonEmpty()) {
231                                final int[] parts = partition(
232                                        runnables.size(),
233                                        max(
234                                                (CORES + 1)*2,
235                                                (int)ceil(runnables.size()/(double)Env.maxBatchSize)
236                                        )
237                                );
238
239                                for (int i = 0; i < parts.length - 1; ++i) {
240                                        execute(new RunnablesRunnable(runnables, parts[i], parts[i + 1]));
241                                }
242                        }
243                }
244
245                @Override
246                public void close() {
247                        Concurrency.join(_tasks);
248                }
249        }
250
251        /**
252         * This Concurrency executes the runnables within the main thread.
253         */
254        private static final class SerialConcurrency extends Concurrency {
255
256                @Override
257                public void execute(final Runnable command) {
258                        command.run();
259                }
260
261                @Override
262                public void execute(final Seq<? extends Runnable> runnables) {
263                        runnables.forEach(Runnable::run);
264                }
265
266                @Override
267                public void close() {
268                }
269        }
270
271
272        /**
273         * Return a array with the indexes of the partitions of an array with the
274         * given size. The length of the returned array is {@code min(size, prts) + 1}.
275         * <p>
276         * Some examples:
277         * <pre>
278         *       partition(10, 3): [0, 3, 6, 10]
279         *       partition(15, 6): [0, 2, 4, 6, 9, 12, 15]
280         *       partition(5, 10): [0, 1, 2, 3, 4, 5]
281         * </pre>
282         *
283         * The following examples prints the start index (inclusive) and the end
284         * index (exclusive) of the {@code partition(15, 6)}.
285         * <pre>{@code
286         * int[] parts = partition(15, 6);
287         * for (int i = 0; i < parts.length - 1; ++i) {
288         *     System.out.println(i + ": " + parts[i] + "\t" + parts[i + 1]);
289         * }
290         * }</pre>
291         * <pre>
292         *       0: 0   2
293         *       1: 2   4
294         *       2: 4   6
295         *       3: 6   9
296         *       4: 9   12
297         *       5: 12  15
298         * </pre>
299         *
300         * This example shows how this can be used in an concurrent environment:
301         * <pre>{@code
302         * try (final Concurrency c = Concurrency.start()) {
303         *     final int[] parts = arrays.partition(population.size(), _maxThreads);
304         *
305         *     for (int i = 0; i < parts.length - 1; ++i) {
306         *         final int part = i;
307         *         c.execute(new Runnable() { @Override public void run() {
308         *             for (int j = parts[part + 1]; --j <= parts[part];) {
309         *                 population.get(j).evaluate();
310         *             }
311         *         }});
312         *     }
313         * }
314         * }</pre>
315         *
316         * @param size the size of the array to partition.
317         * @param parts the number of parts the (virtual) array should be partitioned.
318         * @return the partition array with the length of {@code min(size, parts) + 1}.
319         * @throws IllegalArgumentException if {@code size} or {@code p} is less than one.
320         */
321        private static int[] partition(final int size, final int parts) {
322                if (size < 1) {
323                        throw new IllegalArgumentException(
324                                "Size must greater than zero: " + size
325                        );
326                }
327                if (parts < 1) {
328                        throw new IllegalArgumentException(
329                                "Number of partitions must greater than zero: " + parts
330                        );
331                }
332
333                final int pts = Math.min(size, parts);
334                final int[] partition = new int[pts + 1];
335
336                final int bulk = size/pts;
337                final int rest = size%pts;
338                assert (bulk*pts + rest) == size;
339
340                for (int i = 0, n = pts - rest; i < n; ++i) {
341                        partition[i] = i*bulk;
342                }
343                for (int i = 0, n = rest + 1; i < n; ++i) {
344                        partition[pts - rest + i] = (pts - rest)*bulk + i*(bulk + 1);
345                }
346
347                return partition;
348        }
349
350        @SuppressWarnings("removal")
351        private static final class Env {
352                private static final int maxBatchSize = max(
353                        java.security.AccessController.doPrivileged(
354                                (java.security.PrivilegedAction<Integer>)() -> Integer.getInteger(
355                                        "io.jenetics.concurrency.maxBatchSize",
356                                        Integer.MAX_VALUE
357                                )),
358                        1
359                );
360        }
361
362}