Changeset 9351 in josm


Ignore:
Timestamp:
2016-01-09T15:41:47+01:00 (9 years ago)
Author:
simon04
Message:

Refactoring: use Fork/Join Tasks for parallel execution

Location:
trunk/src/org/openstreetmap/josm
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/org/openstreetmap/josm/data/osm/MultipolygonBuilder.java

    r9243 r9351  
    1414import java.util.concurrent.Callable;
    1515import java.util.concurrent.ExecutionException;
    16 import java.util.concurrent.ExecutorService;
     16import java.util.concurrent.ForkJoinPool;
     17import java.util.concurrent.ForkJoinTask;
    1718import java.util.concurrent.Future;
     19import java.util.concurrent.RecursiveTask;
    1820
    1921import org.openstreetmap.josm.tools.Geometry;
     
    3133public class MultipolygonBuilder {
    3234
    33     private static final Pair<Integer, ExecutorService> THREAD_POOL =
    34             Utils.newThreadPool("multipolygon_creation.numberOfThreads", "multipolygon-builder-%d", Thread.NORM_PRIORITY);
     35    private static final ForkJoinPool THREAD_POOL =
     36            Utils.newForkJoinPool("multipolygon_creation.numberOfThreads", "multipolygon-builder-%d", Thread.NORM_PRIORITY);
    3537
    3638    /**
     
    303305     */
    304306    private static List<PolygonLevel> findOuterWaysMultiThread(List<JoinedPolygon> boundaryWays) {
    305         final List<PolygonLevel> result = new ArrayList<>();
    306         final List<Worker> tasks = new ArrayList<>();
    307         final int bucketsize = Math.max(32, boundaryWays.size()/THREAD_POOL.a/3);
    308         final int noBuckets = (boundaryWays.size() + bucketsize - 1) / bucketsize;
    309         final boolean singleThread = THREAD_POOL.a == 1 || noBuckets == 1;
    310         for (int i = 0; i < noBuckets; i++) {
    311             int from = i*bucketsize;
    312             int to = Math.min((i+1)*bucketsize, boundaryWays.size());
    313             List<PolygonLevel> target = singleThread ? result : new ArrayList<PolygonLevel>(to - from);
    314             tasks.add(new Worker(boundaryWays, from, to, target));
    315         }
    316         if (singleThread) {
    317             try {
    318                 for (Worker task : tasks) {
    319                     if (task.call() == null) {
    320                         return null;
    321                     }
    322                 }
    323             } catch (Exception ex) {
    324                 throw new RuntimeException(ex);
    325             }
    326         } else if (!tasks.isEmpty()) {
    327             try {
    328                 for (Future<List<PolygonLevel>> future : THREAD_POOL.b.invokeAll(tasks)) {
    329                     List<PolygonLevel> res = future.get();
    330                     if (res == null) {
    331                         return null;
    332                     }
    333                     result.addAll(res);
    334                 }
    335             } catch (InterruptedException | ExecutionException ex) {
    336                 throw new RuntimeException(ex);
    337             }
    338         }
    339         return result;
    340     }
    341 
    342     private static class Worker implements Callable<List<PolygonLevel>> {
     307        return THREAD_POOL.invoke(new Worker(boundaryWays, 0, boundaryWays.size(), new ArrayList<PolygonLevel>(),
     308                Math.max(32, boundaryWays.size() / THREAD_POOL.getParallelism() / 3)));
     309    }
     310
     311    private static class Worker extends RecursiveTask<List<PolygonLevel>> {
    343312
    344313        private final List<JoinedPolygon> input;
     
    346315        private final int to;
    347316        private final List<PolygonLevel> output;
    348 
    349         Worker(List<JoinedPolygon> input, int from, int to, List<PolygonLevel> output) {
     317        private final int directExecutionTaskSize;
     318
     319        Worker(List<JoinedPolygon> input, int from, int to, List<PolygonLevel> output, int directExecutionTaskSize) {
    350320            this.input = input;
    351321            this.from = from;
    352322            this.to = to;
    353323            this.output = output;
     324            this.directExecutionTaskSize = directExecutionTaskSize;
    354325        }
    355326
     
    407378
    408379        @Override
    409         public List<PolygonLevel> call() throws Exception {
     380        protected List<PolygonLevel> compute() {
     381            if (to - from < directExecutionTaskSize) {
     382                return computeDirectly();
     383            } else {
     384                final Collection<ForkJoinTask<List<PolygonLevel>>> tasks = new ArrayList<>();
     385                for (int fromIndex = from; fromIndex < to; fromIndex += directExecutionTaskSize) {
     386                    final List<PolygonLevel> output = new ArrayList<>();
     387                    tasks.add(new Worker(input, fromIndex, Math.min(fromIndex + directExecutionTaskSize, to), output, directExecutionTaskSize));
     388                }
     389                for (ForkJoinTask<List<PolygonLevel>> task : tasks) {
     390                    output.addAll(task.join());
     391                }
     392                return output;
     393            }
     394        }
     395
     396        List<PolygonLevel> computeDirectly() {
    410397            for (int i = from; i < to; i++) {
    411398                if (processOuterWay(0, input, output, input.get(i)) == null) {
  • trunk/src/org/openstreetmap/josm/data/osm/visitor/paint/StyledMapRenderer.java

    r9284 r9351  
    3333import java.util.List;
    3434import java.util.Map;
    35 import java.util.concurrent.Callable;
    36 import java.util.concurrent.ExecutionException;
    37 import java.util.concurrent.ExecutorService;
    38 import java.util.concurrent.Future;
     35import java.util.concurrent.ForkJoinPool;
     36import java.util.concurrent.ForkJoinTask;
     37import java.util.concurrent.RecursiveTask;
    3938
    4039import javax.swing.AbstractButton;
     
    7877import org.openstreetmap.josm.tools.Geometry.AreaAndPerimeter;
    7978import org.openstreetmap.josm.tools.ImageProvider;
    80 import org.openstreetmap.josm.tools.Pair;
    8179import org.openstreetmap.josm.tools.Utils;
    8280
     
    8785public class StyledMapRenderer extends AbstractMapRenderer {
    8886
    89     private static final Pair<Integer, ExecutorService> THREAD_POOL =
    90             Utils.newThreadPool("mappaint.StyledMapRenderer.style_creation.numberOfThreads", "styled-map-renderer-%d", Thread.NORM_PRIORITY);
     87    private static final ForkJoinPool THREAD_POOL =
     88            Utils.newForkJoinPool("mappaint.StyledMapRenderer.style_creation.numberOfThreads", "styled-map-renderer-%d", Thread.NORM_PRIORITY);
    9189
    9290    /**
     
    17611759    }
    17621760
    1763     private class ComputeStyleListWorker implements Callable<List<StyleRecord>>, Visitor {
     1761    private class ComputeStyleListWorker extends RecursiveTask<List<StyleRecord>> implements Visitor {
    17641762        private final List<? extends OsmPrimitive> input;
    1765         private final int from;
    1766         private final int to;
    17671763        private final List<StyleRecord> output;
    17681764
    17691765        private final ElemStyles styles = MapPaintStyles.getStyles();
     1766        private final int directExecutionTaskSize;
    17701767
    17711768        private final boolean drawArea = circum <= Main.pref.getInteger("mappaint.fillareas", 10000000);
     
    17761773         * Constructs a new {@code ComputeStyleListWorker}.
    17771774         * @param input the primitives to process
    1778          * @param from first index of <code>input</code> to use
    1779          * @param to last index + 1
    17801775         * @param output the list of styles to which styles will be added
     1776         * @param directExecutionTaskSize the threshold deciding whether to subdivide the tasks
    17811777         */
    1782         ComputeStyleListWorker(final List<? extends OsmPrimitive> input, int from, int to, List<StyleRecord> output) {
     1778        ComputeStyleListWorker(final List<? extends OsmPrimitive> input, List<StyleRecord> output, int directExecutionTaskSize) {
    17831779            this.input = input;
    1784             this.from = from;
    1785             this.to = to;
    17861780            this.output = output;
     1781            this.directExecutionTaskSize = directExecutionTaskSize;
    17871782            this.styles.setDrawMultipolygon(drawMultipolygon);
    17881783        }
    17891784
    17901785        @Override
    1791         public List<StyleRecord> call() throws Exception {
     1786        protected List<StyleRecord> compute() {
     1787            if (input.size() <= directExecutionTaskSize) {
     1788                return computeDirectly();
     1789            } else {
     1790                final Collection<ForkJoinTask<List<StyleRecord>>> tasks = new ArrayList<>();
     1791                for (int fromIndex = 0; fromIndex < input.size(); fromIndex += directExecutionTaskSize) {
     1792                    final int toIndex = Math.min(fromIndex + directExecutionTaskSize, input.size());
     1793                    final List<StyleRecord> output = new ArrayList<>(directExecutionTaskSize);
     1794                    tasks.add(new ComputeStyleListWorker(input.subList(fromIndex, toIndex), output, directExecutionTaskSize).fork());
     1795                }
     1796                for (ForkJoinTask<List<StyleRecord>> task : tasks) {
     1797                    output.addAll(task.join());
     1798                }
     1799                return output;
     1800            }
     1801        }
     1802
     1803        public List<StyleRecord> computeDirectly() {
    17921804            MapCSSStyleSource.STYLE_SOURCE_LOCK.readLock().lock();
    17931805            try {
    1794                 for (int i = from; i < to; i++) {
    1795                     OsmPrimitive osm = input.get(i);
     1806                for (final OsmPrimitive osm : input) {
    17961807                    if (osm.isDrawable()) {
    17971808                        osm.accept(this);
     
    18531864    }
    18541865
    1855     private class ConcurrentTasksHelper {
    1856 
    1857         private final List<StyleRecord> allStyleElems;
    1858 
    1859         ConcurrentTasksHelper(List<StyleRecord> allStyleElems) {
    1860             this.allStyleElems = allStyleElems;
    1861         }
    1862 
    1863         void process(List<? extends OsmPrimitive> prims) {
    1864             final List<ComputeStyleListWorker> tasks = new ArrayList<>();
    1865             final int bucketsize = Math.max(100, prims.size()/THREAD_POOL.a/3);
    1866             final int noBuckets = (prims.size() + bucketsize - 1) / bucketsize;
    1867             final boolean singleThread = THREAD_POOL.a == 1 || noBuckets == 1;
    1868             for (int i = 0; i < noBuckets; i++) {
    1869                 int from = i*bucketsize;
    1870                 int to = Math.min((i+1)*bucketsize, prims.size());
    1871                 List<StyleRecord> target = singleThread ? allStyleElems : new ArrayList<StyleRecord>(to - from);
    1872                 tasks.add(new ComputeStyleListWorker(prims, from, to, target));
    1873             }
    1874             if (singleThread) {
    1875                 try {
    1876                     for (ComputeStyleListWorker task : tasks) {
    1877                         task.call();
    1878                     }
    1879                 } catch (Exception ex) {
    1880                     throw new RuntimeException(ex);
    1881                 }
    1882             } else if (!tasks.isEmpty()) {
    1883                 try {
    1884                     for (Future<List<StyleRecord>> future : THREAD_POOL.b.invokeAll(tasks)) {
    1885                         allStyleElems.addAll(future.get());
    1886                     }
    1887                 } catch (InterruptedException | ExecutionException ex) {
    1888                     throw new RuntimeException(ex);
    1889                 }
    1890             }
    1891         }
    1892     }
    1893 
    18941866    @Override
    18951867    public void render(final DataSet data, boolean renderVirtualNodes, Bounds bounds) {
     
    19141886            final List<StyleRecord> allStyleElems = new ArrayList<>(nodes.size()+ways.size()+relations.size());
    19151887
    1916             ConcurrentTasksHelper helper = new ConcurrentTasksHelper(allStyleElems);
    1917 
    19181888            // Need to process all relations first.
    19191889            // Reason: Make sure, ElemStyles.getStyleCacheWithRange is
     
    19211891            // (Could be synchronized, but try to avoid this for
    19221892            // performance reasons.)
    1923             helper.process(relations);
    1924             helper.process(new CompositeList<>(nodes, ways));
     1893            THREAD_POOL.invoke(new ComputeStyleListWorker(relations, allStyleElems,
     1894                    Math.max(20, relations.size() / THREAD_POOL.getParallelism() / 3)));
     1895            THREAD_POOL.invoke(new ComputeStyleListWorker(new CompositeList<>(nodes, ways), allStyleElems,
     1896                    Math.max(100, (nodes.size() + ways.size()) / THREAD_POOL.getParallelism() / 3)));
    19251897
    19261898            if (benchmark) {
  • trunk/src/org/openstreetmap/josm/tools/Utils.java

    r9297 r9351  
    5151import java.util.concurrent.ExecutorService;
    5252import java.util.concurrent.Executors;
     53import java.util.concurrent.ForkJoinPool;
     54import java.util.concurrent.ForkJoinWorkerThread;
    5355import java.util.concurrent.ThreadFactory;
    5456import java.util.concurrent.atomic.AtomicLong;
     
    14111413
    14121414    /**
    1413      * Returns a pair containing the number of threads (n), and a thread pool (if n &gt; 1) to perform
    1414      * multi-thread computation in the context of the given preference key.
    1415      * @param pref The preference key
     1415     * Returns a {@link ForkJoinPool} with the parallelism given by the preference key.
     1416     * @param pref The preference key to determine parallelism
    14161417     * @param nameFormat see {@link #newThreadFactory(String, int)}
    14171418     * @param threadPriority see {@link #newThreadFactory(String, int)}
    1418      * @return a pair containing the number of threads (n), and a thread pool (if n &gt; 1, null otherwise)
    1419      * @since 7423
    1420      */
    1421     public static Pair<Integer, ExecutorService> newThreadPool(String pref, String nameFormat, int threadPriority) {
     1419     * @return a {@link ForkJoinPool}
     1420     */
     1421    public static ForkJoinPool newForkJoinPool(String pref, final String nameFormat, final int threadPriority) {
    14221422        int noThreads = Main.pref.getInteger(pref, Runtime.getRuntime().availableProcessors());
    1423         ExecutorService pool = noThreads <= 1 ? null : Executors.newFixedThreadPool(noThreads, newThreadFactory(nameFormat, threadPriority));
    1424         return new Pair<>(noThreads, pool);
     1423        return new ForkJoinPool(noThreads, new ForkJoinPool.ForkJoinWorkerThreadFactory() {
     1424            final AtomicLong count = new AtomicLong(0);
     1425            @Override
     1426            public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
     1427                final ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
     1428                thread.setName(String.format(Locale.ENGLISH, nameFormat, count.getAndIncrement()));
     1429                thread.setPriority(threadPriority);
     1430                return thread;
     1431            }
     1432        }, null, true);
    14251433    }
    14261434
Note: See TracChangeset for help on using the changeset viewer.