Changeset 8314 in josm for trunk/src/org/openstreetmap
- Timestamp:
- 2015-05-02T22:53:56+02:00 (10 years ago)
- Location:
- trunk/src/org/openstreetmap/josm/data
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/org/openstreetmap/josm/data/cache/JCSCachedTileLoaderJob.java
r8291 r8314 56 56 * maximum download threads that will be started 57 57 */ 58 public static final IntegerProperty THREAD_LIMIT = new IntegerProperty("cache.jcs.max_threads", 10); 58 public final static IntegerProperty THREAD_LIMIT = new IntegerProperty("cache.jcs.max_threads", 10); 59 60 public static class LIFOQueue extends LinkedBlockingDeque<Runnable> { 61 public LIFOQueue(int capacity) { 62 super(capacity); 63 } 64 65 @Override 66 public boolean offer(Runnable t) { 67 return super.offerFirst(t); 68 } 69 70 @Override 71 public Runnable remove() { 72 return super.removeFirst(); 73 } 74 } 75 76 77 /* 78 * ThreadPoolExecutor starts new threads, until THREAD_LIMIT is reached. Then it puts tasks into LIFOQueue, which is fairly 79 * small, but we do not want a lot of outstanding tasks queued, but rather prefer the class consumer to resubmit the task, which are 80 * important right now. 81 * 82 * This way, if some task gets outdated (for example - user paned the map, and we do not want to download this tile any more), 83 * the task will not be resubmitted, and thus - never queued. 84 * 85 * There is no point in canceling tasks, that are already taken by worker threads (if we made so much effort, we can at least cache 86 * the response, so later it could be used). We could actually cancel what is in LIFOQueue, but this is a tradeoff between simplicity 87 * and performance (we do want to have something to offer to worker threads before tasks will be resubmitted by class consumer) 88 */ 59 89 private static Executor DOWNLOAD_JOB_DISPATCHER = new ThreadPoolExecutor( 60 90 2, // we have a small queue, so threads will be quickly started (threads are started only, when queue is full) … … 63 93 TimeUnit.SECONDS, 64 94 // make queue of LIFO type - so recently requested tiles will be loaded first (assuming that these are which user is waiting to see) 65 new LinkedBlockingDeque<Runnable>(5) { 66 /* keep the queue size fairly small, we do not want to 67 download a lot of tiles, that user is not seeing anyway */ 68 @Override 69 public boolean offer(Runnable t) { 70 return super.offerFirst(t); 71 } 72 73 @Override 74 public Runnable remove() { 75 return super.removeFirst(); 76 } 77 } 95 new LIFOQueue(5) 78 96 ); 97 79 98 private static ConcurrentMap<String,Set<ICachedLoaderListener>> inProgress = new ConcurrentHashMap<>(); 80 99 private static ConcurrentMap<String, Boolean> useHead = new ConcurrentHashMap<>(); … … 157 176 // object not in cache, so submit work to separate thread 158 177 try { 159 // use getter method, so subclasses may override executors, to get separate thread pool 160 getDownloadExecutor().execute(JCSCachedTileLoaderJob.this); 178 if (executionGuard()) { 179 // use getter method, so subclasses may override executors, to get separate thread pool 180 getDownloadExecutor().execute(this); 181 } else { 182 log.log(Level.FINE, "JCS - guard rejected job for: {0}", getCacheKey()); 183 finishLoading(LoadResult.REJECTED); 184 } 161 185 } catch (RejectedExecutionException e) { 162 186 // queue was full, try again later … … 165 189 } 166 190 } 191 } 192 193 /** 194 * Guard method for execution. If guard returns true, the execution of download task will commence 195 * otherwise, execution will finish with result LoadResult.REJECTED 196 * 197 * It is responsibility of the overriding class, to handle properly situation in finishLoading class 198 * @return 199 */ 200 protected boolean executionGuard() { 201 return true; 202 } 203 204 /** 205 * This method is run when job has finished 206 */ 207 protected void executionFinished() { 167 208 } 168 209 … … 219 260 } 220 261 } finally { 262 executionFinished(); 221 263 currentThread.setName(oldName); 222 264 } -
trunk/src/org/openstreetmap/josm/data/imagery/TMSCachedTileLoaderJob.java
r8307 r8314 8 8 import java.util.concurrent.ConcurrentHashMap; 9 9 import java.util.concurrent.Executor; 10 import java.util.concurrent.LinkedBlockingDeque;11 10 import java.util.concurrent.Semaphore; 12 11 import java.util.concurrent.ThreadPoolExecutor; … … 42 41 43 42 /** 43 * Limit definition for per host concurrent connections 44 */ 45 public final static IntegerProperty HOST_LIMIT = new IntegerProperty("imagery.tms.tmsloader.maxjobsperhost", 6); 46 47 /* 48 * Host limit guards the area - between submission to the queue up to loading is finished. It uses executionGuard method 49 * from JCSCachedTileLoaderJob to acquire the semaphore, and releases it - when loadingFinished is called (but not when 50 * LoadResult.GUARD_REJECTED is set) 51 * 52 */ 53 54 private Semaphore getSemaphore() { 55 String host = getUrl().getHost(); 56 Semaphore limit = HOST_LIMITS.get(host); 57 if (limit == null) { 58 synchronized(HOST_LIMITS) { 59 limit = HOST_LIMITS.get(host); 60 if (limit == null) { 61 limit = new Semaphore(HOST_LIMIT.get().intValue()); 62 HOST_LIMITS.put(host, limit); 63 } 64 } 65 } 66 return limit; 67 } 68 69 private boolean acquireSemaphore() { 70 boolean ret = true; 71 Semaphore limit = getSemaphore(); 72 if (limit != null) { 73 ret = limit.tryAcquire(); 74 if (!ret) { 75 Main.debug("rejecting job because of per host limit"); 76 } 77 } 78 return ret; 79 80 } 81 82 private void releaseSemaphore() { 83 Semaphore limit = getSemaphore(); 84 if (limit != null) { 85 limit.release(); 86 } 87 } 88 89 90 private static Map<String, Semaphore> HOST_LIMITS = new ConcurrentHashMap<>(); 91 92 /** 44 93 * overrides the THREAD_LIMIT in superclass, as we want to have separate limit and pool for TMS 45 94 */ 46 public static final IntegerProperty THREAD_LIMIT = new IntegerProperty("imagery.tms.tmsloader.maxjobs", 25); 47 48 /** 49 * Limit definition for per host concurrent connections 50 */ 51 public static final IntegerProperty HOST_LIMIT = new IntegerProperty("imagery.tms.tmsloader.maxjobsperhost", 6); 52 53 54 private static class LIFOQueue extends LinkedBlockingDeque<Runnable> { 55 public LIFOQueue(int capacity) { 56 super(capacity); 57 } 58 59 private final static Semaphore getSemaphore(Runnable r) { 60 if (!(r instanceof TMSCachedTileLoaderJob)) 61 return null; 62 TMSCachedTileLoaderJob cachedJob = (TMSCachedTileLoaderJob) r; 63 Semaphore limit = HOST_LIMITS.get(cachedJob.getUrl().getHost()); 64 if (limit == null) { 65 synchronized(HOST_LIMITS) { 66 limit = HOST_LIMITS.get(cachedJob.getUrl().getHost()); 67 if (limit == null) { 68 limit = new Semaphore(HOST_LIMIT.get().intValue()); 69 HOST_LIMITS.put(cachedJob.getUrl().getHost(), limit); 70 } 71 } 72 } 73 return limit; 74 } 75 76 private boolean acquireSemaphore(Runnable r) { 77 boolean ret = true; 78 Semaphore limit = getSemaphore(r); 79 if (limit != null) { 80 ret = limit.tryAcquire(); 81 if (!ret) { 82 Main.debug("rejecting job because of per host limit"); 83 } 84 } 85 return ret; 86 } 87 88 @Override 89 public boolean offer(Runnable t) { 90 return acquireSemaphore(t) && super.offerFirst(t); 91 } 92 93 private Runnable releaseSemaphore(Runnable r) { 94 Semaphore limit = getSemaphore(r); 95 if (limit != null) 96 limit.release(); 97 return r; 98 } 99 100 @Override 101 public Runnable remove() { 102 return releaseSemaphore(super.removeFirst()); 103 } 104 105 @Override 106 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { 107 return releaseSemaphore(super.poll(timeout, unit)); 108 } 109 110 @Override 111 public Runnable take() throws InterruptedException { 112 return releaseSemaphore(super.take()); 113 } 114 } 115 116 private static Map<String, Semaphore> HOST_LIMITS = new ConcurrentHashMap<>(); 95 public final static IntegerProperty THREAD_LIMIT = new IntegerProperty("imagery.tms.tmsloader.maxjobs", 25); 117 96 118 97 /** … … 143 122 } 144 123 145 146 124 /** 147 125 * Constructor for creating a job, to get a specific tile from cache … … 175 153 * this doesn't needs to be synchronized, as it's not that costly to keep only one execution 176 154 * in parallel, but URL creation and Tile.getUrl() are costly and are not needed when fetching 177 * data from cache 155 * data from cache, that's why URL creation is postponed until it's needed 178 156 * 179 157 * We need to have static url value for TileLoaderJob, as for some TileSources we might get different … … 234 212 } 235 213 214 @Override 215 protected boolean executionGuard() { 216 return acquireSemaphore(); 217 } 218 219 @Override 220 protected void executionFinished() { 221 releaseSemaphore(); 222 } 223 236 224 public void submit() { 237 225 tile.initLoading(); … … 246 234 case FAILURE: 247 235 tile.setError("Problem loading tile"); 236 // no break intentional here 248 237 case SUCCESS: 249 238 handleNoTileAtZoom(); … … 254 243 } 255 244 } 245 // no break intentional here 256 246 case REJECTED: 257 // do not set anything here, leave waiting sign247 // do nothing 258 248 } 259 249 if (listener != null) {
Note:
See TracChangeset
for help on using the changeset viewer.