source: osm/applications/editors/josm/plugins/imagerycache/src/org/mapdb/AsyncWriteEngine.java@ 29484

Last change on this file since 29484 was 29484, checked in by akks, 11 years ago

JOSM/ImageryCache: updated MapDB (no more deadlocks, Java 1.6 compatible), less crashes, multiple-JOSM support

File size: 10.7 KB
Line 
1/*
2 * Copyright (c) 2012 Jan Kotek
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.mapdb;
18
19import java.lang.ref.WeakReference;
20import java.util.concurrent.*;
21import java.util.concurrent.atomic.AtomicLong;
22import java.util.concurrent.locks.*;
23
24/**
25 * {@link Engine} wrapper which provides asynchronous serialization and write.
26 * This class takes an object instance, passes it to background thread (using Queue)
27 * where it is serialized and written to disk.
28 * <p/>
29 * Async write does not affect commit durability, write queue is flushed before each commit.
30 *
31 * @author Jan Kotek
32 */
33public class AsyncWriteEngine extends EngineWrapper implements Engine {
34
35 protected static final AtomicLong threadCounter = new AtomicLong();
36 protected final long threadNum = threadCounter.incrementAndGet();
37
38 protected final BlockingQueue<Long> newRecids = new ArrayBlockingQueue<Long>(128);
39
40 protected volatile boolean closeInProgress = false;
41 protected final CountDownLatch shutdownCondition = new CountDownLatch(2);
42 protected final int asyncFlushDelay;
43
44 protected static final Object DELETED = new Object();
45 protected final ReentrantLock[] writeLocks = Utils.newLocks(32);
46
47 protected final ReentrantReadWriteLock commitLock;
48
49 protected Throwable writerFailedException = null;
50
51
52 protected final LongConcurrentHashMap<Fun.Tuple2<Object,Serializer>> items = new LongConcurrentHashMap<Fun.Tuple2<Object, Serializer>>();
53
54 protected final Thread newRecidsThread = new Thread("MapDB prealloc #"+threadNum){
55 @Override public void run() {
56 try{
57 for(;;){
58 if(closeInProgress || (parentEngineWeakRef!=null && parentEngineWeakRef.get()==null) || writerFailedException!=null) return;
59 Long newRecid = getWrappedEngine().put(Utils.EMPTY_STRING, Serializer.EMPTY_SERIALIZER);
60 newRecids.put(newRecid);
61 }
62 } catch (Throwable e) {
63 writerFailedException = e;
64 }finally {
65 shutdownCondition.countDown();
66 }
67 }
68 };
69
70 protected final Thread writerThread = new Thread("MapDB writer #"+threadNum){
71 @Override public void run() {
72 try{
73
74 for(;;){
75 LongMap.LongMapIterator<Fun.Tuple2<Object,Serializer>> iter = items.longMapIterator();
76
77 if(!iter.moveToNext()){
78 //empty map, pause for a moment to give it chance to fill
79 if( (parentEngineWeakRef!=null && parentEngineWeakRef.get()==null) || writerFailedException!=null) return;
80 Thread.sleep(asyncFlushDelay);
81 if(closeInProgress){
82 //lock world and write everything
83 Utils.lockAll(writeLocks);
84 try{
85 while(!items.isEmpty()){
86 iter = items.longMapIterator();
87 while(iter.moveToNext()){
88 long recid = iter.key();
89 Fun.Tuple2<Object,Serializer> value = iter.value();
90 if(value.a==DELETED){
91 AsyncWriteEngine.super.delete(recid, value.b);
92 }else{
93 AsyncWriteEngine.super.update(recid, value.a, value.b);
94 }
95 items.remove(recid, value);
96 }
97 }
98 return;
99 }finally{
100 Utils.unlockAll(writeLocks);
101 }
102 }
103 }else do{
104 //iterate over items and write them
105 long recid = iter.key();
106
107 Utils.lock(writeLocks,recid);
108 try{
109 Fun.Tuple2<Object,Serializer> value = iter.value();
110 if(value.a==DELETED){
111 AsyncWriteEngine.super.delete(recid, value.b);
112 }else{
113 AsyncWriteEngine.super.update(recid, value.a, value.b);
114 }
115 items.remove(recid, value);
116 }finally {
117 Utils.unlock(writeLocks, recid);
118 }
119 }while(iter.moveToNext());
120
121 }
122 } catch (Throwable e) {
123 writerFailedException = e;
124 }finally {
125 shutdownCondition.countDown();
126 }
127 }
128 };
129
130
131
132 protected AsyncWriteEngine(Engine engine, boolean _transactionsDisabled, boolean _powerSavingMode, int _asyncFlushDelay) {
133 super(engine);
134
135 newRecidsThread.setDaemon(true);
136 writerThread.setDaemon(true);
137
138 commitLock = _transactionsDisabled? null: new ReentrantReadWriteLock();
139 newRecidsThread.start();
140 writerThread.start();
141 asyncFlushDelay = _asyncFlushDelay;
142
143 }
144
145 @Override
146 public <A> long put(A value, Serializer<A> serializer) {
147 if(commitLock!=null) commitLock.readLock().lock();
148 try{
149 try {
150 Long recid = newRecids.take(); //TODO possible deadlock while closing
151 update(recid, value, serializer);
152 return recid;
153 } catch (InterruptedException e) {
154 throw new RuntimeException(e);
155 }
156 }finally{
157 if(commitLock!=null) commitLock.readLock().unlock();
158 }
159
160 }
161
162 protected void checkState() {
163 if(closeInProgress) throw new IllegalAccessError("db has been closed");
164 if(writerFailedException!=null) throw new RuntimeException("Writer thread failed", writerFailedException);
165 }
166
167 @Override
168 public <A> A get(long recid, Serializer<A> serializer) {
169 if(commitLock!=null) commitLock.readLock().lock();
170 try{
171 Utils.lock(writeLocks,recid);
172 try{
173 checkState();
174 Fun.Tuple2<Object,Serializer> item = items.get(recid);
175 if(item!=null){
176 if(item.a == DELETED) return null;
177 return (A) item.a;
178 }
179
180 return super.get(recid, serializer);
181 }finally{
182 Utils.unlock(writeLocks,recid);
183 }
184 }finally{
185 if(commitLock!=null) commitLock.readLock().unlock();
186 }
187 }
188
189 @Override
190 public <A> void update(long recid, A value, Serializer<A> serializer) {
191
192 if(commitLock!=null && serializer!=SerializerPojo.serializer) commitLock.readLock().lock();
193 try{
194
195 Utils.lock(writeLocks, recid);
196 try{
197 checkState();
198 items.put(recid, new Fun.Tuple2(value,serializer));
199 }finally{
200 Utils.unlock(writeLocks, recid);
201 }
202 }finally{
203 if(commitLock!=null&& serializer!=SerializerPojo.serializer) commitLock.readLock().unlock();
204 }
205
206 }
207
208 @Override
209 public <A> boolean compareAndSwap(long recid, A expectedOldValue, A newValue, Serializer<A> serializer) {
210 //TODO commit lock?
211 Utils.lock(writeLocks, recid);
212 try{
213 checkState();
214 Fun.Tuple2<Object, Serializer> existing = items.get(recid);
215 A oldValue = existing!=null? (A) existing.a : super.get(recid, serializer);
216 if(oldValue == expectedOldValue || (oldValue!=null && oldValue.equals(expectedOldValue))){
217 items.put(recid, new Fun.Tuple2(newValue,serializer));
218 return true;
219 }else{
220 return false;
221 }
222 }finally{
223 Utils.unlock(writeLocks, recid);
224
225 }
226 }
227
228 @Override
229 public <A> void delete(long recid, Serializer<A> serializer) {
230 update(recid, (A) DELETED, serializer);
231 }
232
233 @Override
234 public void close() {
235 try {
236 if(closeInProgress) return;
237 closeInProgress = true;
238 //put preallocated recids back to store
239 for(Long recid = newRecids.poll(); recid!=null; recid = newRecids.poll()){
240 super.delete(recid, Serializer.EMPTY_SERIALIZER);
241 }
242 //TODO commit after returning recids?
243
244 //wait for worker threads to shutdown
245 shutdownCondition.await();
246
247
248 super.close();
249 } catch (InterruptedException e) {
250 throw new RuntimeException(e);
251 }
252 }
253
254
255
256 protected WeakReference<Engine> parentEngineWeakRef = null;
257
258 /**
259 * Main thread may die, leaving Writer Thread orphaned.
260 * To prevent this we periodically check if WeakReference was GCed.
261 * This method sets WeakReference to user facing Engine,
262 * if this instance if GCed it means that user may no longer manage
263 * and we can exit Writer Thread.
264 *
265 * @param parentEngineReference reference to user facing Engine
266 */
267 public void setParentEngineReference(Engine parentEngineReference) {
268 parentEngineWeakRef = new WeakReference<Engine>(parentEngineReference);
269 }
270
271 @Override
272 public void commit() {
273 checkState();
274 if(commitLock==null){
275 super.commit();
276 return;
277 }
278 commitLock.writeLock().lock();
279 try{
280 while(!items.isEmpty()) {
281 checkState();
282 LockSupport.parkNanos(100);
283 }
284
285 super.commit();
286 }finally {
287 commitLock.writeLock().unlock();
288 }
289 }
290
291 @Override
292 public void rollback() {
293 checkState();
294 if(commitLock == null) throw new UnsupportedOperationException("transactions disabled");
295 commitLock.writeLock().lock();
296 try{
297 while(!items.isEmpty()) LockSupport.parkNanos(100);
298 newRecids.clear();
299 super.rollback();
300 }finally {
301 commitLock.writeLock().unlock();
302 }
303 }
304
305}
Note: See TracBrowser for help on using the repository browser.