Changeset 14261 in josm


Ignore:
Timestamp:
2018-09-19T21:57:27+02:00 (6 years ago)
Author:
wiktorn
Message:

Use corePoolSize = maxPoolSize and unbounded queue

Remove hack with calling setCorePoolSize to spawn new threads when already
corePoolSize is running. For bigger sizes of the ThreadPool setCorePoolSize took
a long time and blocked EDT thread. Now we will have always `maximum concurren
downloads` threads running for TMS layers, but this is not a big issue, as the
thread pool is shared between imagery layers.

As long as number of threads will not reach the maximum concurrent downloads
the maximum concurrent downloads per host limit will not be enforced (first n
downloads).

See: #16734

Location:
trunk
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/org/openstreetmap/josm/data/cache/HostLimitQueue.java

    r12865 r14261  
    99import java.util.concurrent.LinkedBlockingDeque;
    1010import java.util.concurrent.Semaphore;
    11 import java.util.concurrent.ThreadPoolExecutor;
    1211import java.util.concurrent.TimeUnit;
    1312
     
    3433    private final int hostLimit;
    3534
    36     private ThreadPoolExecutor executor;
    37 
    38     private int corePoolSize;
    39 
    40     private int maximumPoolSize;
    41 
    4235    /**
    4336     * Creates an unbounded queue
     
    4639    public HostLimitQueue(int hostLimit) {
    4740        super(); // create unbounded queue
     41        this.hostLimit = hostLimit;
     42    }
     43
     44    /**
     45     * Creates bounded queue
     46     * @param hostLimit how many parallel calls to host to allow
     47     * @param queueLimit how deep the queue should be
     48     */
     49    public HostLimitQueue(int hostLimit, int queueLimit) {
     50        super(queueLimit); // create bounded queue
    4851        this.hostLimit = hostLimit;
    4952    }
     
    117120    }
    118121
    119     /**
    120      * Set the executor for which this queue works. It's needed to spawn new threads.
    121      * See: http://stackoverflow.com/questions/9622599/java-threadpoolexecutor-strategy-direct-handoff-with-queue#
    122      *
    123      * @param executor executor for which this queue works
    124      */
    125     public void setExecutor(ThreadPoolExecutor executor) {
    126         this.executor = executor;
    127         this.maximumPoolSize = executor.getMaximumPoolSize();
    128         this.corePoolSize = executor.getCorePoolSize();
    129     }
    130 
    131     @Override
    132     public boolean offer(Runnable e) {
    133         if (!super.offer(e)) {
    134             return false;
    135         }
    136 
    137         if (executor != null) {
    138             // See: http://stackoverflow.com/questions/9622599/java-threadpoolexecutor-strategy-direct-handoff-with-queue#
    139             // force spawn of a thread if not reached maximum
    140             int currentPoolSize = executor.getPoolSize();
    141             if (currentPoolSize < maximumPoolSize
    142                     && currentPoolSize >= corePoolSize) {
    143                 executor.setCorePoolSize(currentPoolSize + 1);
    144                 executor.setCorePoolSize(corePoolSize);
    145             }
    146         }
    147         return true;
    148     }
    149 
    150122    private Semaphore getSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
    151123        String host;
  • trunk/src/org/openstreetmap/josm/data/imagery/TMSCachedTileLoader.java

    r13733 r14261  
    6868     */
    6969    public static ThreadPoolExecutor getNewThreadPoolExecutor(String nameFormat, int workers) {
    70         HostLimitQueue workQueue = new HostLimitQueue(HOST_LIMIT.get().intValue());
     70        return getNewThreadPoolExecutor(nameFormat, workers, HOST_LIMIT.get().intValue());
     71    }
     72
     73    /**
     74     * @param nameFormat see {@link Utils#newThreadFactory(String, int)}
     75     * @param workers number of worker thread to keep
     76     * @param hostLimit number of concurrent downloads per host allowed
     77     * @return new ThreadPoolExecutor that will use a @see HostLimitQueue based queue
     78     */
     79    public static ThreadPoolExecutor getNewThreadPoolExecutor(String nameFormat, int workers, int hostLimit) {
    7180        ThreadPoolExecutor executor = new ThreadPoolExecutor(
    72                 0, // 0 so for unused thread pools threads will eventually die, freeing also the threadpool
    73                 workers, // do not this number of threads
    74                 300, // keepalive for thread
     81                workers, // keep core pool the same size as max, as we use unbounded queue so there will
     82                workers, // be never more threads than corePoolSize
     83                300, // keep alive for thread
    7584                TimeUnit.SECONDS,
    76                 workQueue,
     85                new HostLimitQueue(hostLimit),
    7786                Utils.newThreadFactory(nameFormat, Thread.NORM_PRIORITY)
    7887                );
    79         workQueue.setExecutor(executor);
    8088        return executor;
    8189    }
  • trunk/src/org/openstreetmap/josm/data/imagery/WMSCachedTileLoader.java

    r13740 r14261  
    2424
    2525    /**
    26      * Creates a TileLoader with separate WMS downloader.
     26     * Creates a TileLoader with separate WMS download thread pool.
    2727     *
    2828     * @param listener that will be notified when tile is loaded
  • trunk/test/unit/org/openstreetmap/josm/data/cache/HostLimitQueueTest.java

    r13733 r14261  
    1414import org.junit.Rule;
    1515import org.junit.Test;
     16import org.openstreetmap.josm.data.imagery.TMSCachedTileLoader;
    1617import org.openstreetmap.josm.data.imagery.TileJobOptions;
    1718import org.openstreetmap.josm.testutils.JOSMTestRules;
    1819import org.openstreetmap.josm.tools.Logging;
    19 import org.openstreetmap.josm.tools.Utils;
    2020
    2121import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
     
    3333    public JOSMTestRules test = new JOSMTestRules().preferences().timeout(20 * 1000);
    3434
    35     private static ThreadPoolExecutor getNewThreadPoolExecutor(String nameFormat, int workers, int queueLimit) {
    36         HostLimitQueue workQueue = new HostLimitQueue(queueLimit);
    37         ThreadPoolExecutor executor = new ThreadPoolExecutor(
    38                 0, // 0 so for unused thread pools threads will eventually die, freeing also the threadpool
    39                 workers, // do not this number of threads
    40                 300, // keepalive for thread
    41                 TimeUnit.SECONDS,
    42                 workQueue,
    43                 Utils.newThreadFactory(nameFormat, Thread.NORM_PRIORITY)
    44                 );
    45         workQueue.setExecutor(executor);
    46         return executor;
    47     }
    4835
    4936    /**
     
    9481    @Test
    9582    public void testSingleThreadPerHost() throws Exception {
    96         ThreadPoolExecutor tpe = getNewThreadPoolExecutor("test-%d", 3, 1);
     83        ThreadPoolExecutor tpe = TMSCachedTileLoader.getNewThreadPoolExecutor("test-%d", 3, 1);
    9784        ICacheAccess<String, CacheEntry> cache = JCSCacheManager.getCache("test", 3, 0, "");
    9885        AtomicInteger counter = new AtomicInteger(0);
     
    10794        assertEquals(10, counter.get());
    10895        // although there are 3 threads, we can make only 1 parallel call to localhost
    109         // so it should take ~10 seconds to finish
     96        // first three jobs will be not limited, as they spawn the thread
     97        // so it should take ~8 seconds to finish
    11098        // if it's shorter, it means that host limit does not work
    111         assertTrue("Expected duration between 9 and 11 seconds not met. Actual duration: " + (duration /1000),
    112                 duration < 11*1000 & duration > 9*1000);
     99        assertTrue("Expected duration between 8 and 11 seconds not met. Actual duration: " + (duration /1000),
     100                duration < 11*1000 & duration > 8*1000);
    113101    }
    114102
     
    119107    @Test
    120108    public void testMultipleThreadPerHost() throws Exception {
    121         ThreadPoolExecutor tpe = getNewThreadPoolExecutor("test-%d", 3, 2);
     109        ThreadPoolExecutor tpe = TMSCachedTileLoader.getNewThreadPoolExecutor("test-%d", 3, 2);
    122110        ICacheAccess<String, CacheEntry> cache = JCSCacheManager.getCache("test", 3, 0, "");
    123111        AtomicInteger counter = new AtomicInteger(0);
     
    144132    @Test
    145133    public void testTwoHosts() throws Exception {
    146         ThreadPoolExecutor tpe = getNewThreadPoolExecutor("test-%d", 3, 1);
     134        ThreadPoolExecutor tpe = TMSCachedTileLoader.getNewThreadPoolExecutor("test-%d", 3, 1);
    147135        ICacheAccess<String, CacheEntry> cache = JCSCacheManager.getCache("test", 3, 0, "");
    148136        AtomicInteger counter = new AtomicInteger(0);
Note: See TracChangeset for help on using the changeset viewer.