source: osm/applications/editors/josm/plugins/imagerycache/src/org/mapdb/AsyncWriteEngine.java@ 30532

Last change on this file since 30532 was 30532, checked in by donvip, 10 years ago

[josm_plugins] fix compilation warnings

File size: 10.8 KB
Line 
1/*
2 * Copyright (c) 2012 Jan Kotek
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.mapdb;
18
19import java.lang.ref.WeakReference;
20import java.util.concurrent.*;
21import java.util.concurrent.atomic.AtomicLong;
22import java.util.concurrent.locks.*;
23
24/**
25 * {@link Engine} wrapper which provides asynchronous serialization and write.
26 * This class takes an object instance, passes it to background thread (using Queue)
27 * where it is serialized and written to disk.
28 * <p/>
29 * Async write does not affect commit durability, write queue is flushed before each commit.
30 *
31 * @author Jan Kotek
32 */
33public class AsyncWriteEngine extends EngineWrapper implements Engine {
34
35 protected static final AtomicLong threadCounter = new AtomicLong();
36 protected final long threadNum = threadCounter.incrementAndGet();
37
38 protected final BlockingQueue<Long> newRecids = new ArrayBlockingQueue<Long>(128);
39
40 protected volatile boolean closeInProgress = false;
41 protected final CountDownLatch shutdownCondition = new CountDownLatch(2);
42 protected final int asyncFlushDelay;
43
44 protected static final Object DELETED = new Object();
45 protected final ReentrantLock[] writeLocks = Utils.newLocks(32);
46
47 protected final ReentrantReadWriteLock commitLock;
48
49 protected Throwable writerFailedException = null;
50
51 protected final LongConcurrentHashMap<Fun.Tuple2<Object,Serializer<Object>>> items = new LongConcurrentHashMap<>();
52
53 protected final Thread newRecidsThread = new Thread("MapDB prealloc #"+threadNum){
54 @Override public void run() {
55 try{
56 for(;;){
57 if(closeInProgress || (parentEngineWeakRef!=null && parentEngineWeakRef.get()==null) || writerFailedException!=null) return;
58 Long newRecid = getWrappedEngine().put(Utils.EMPTY_STRING, Serializer.EMPTY_SERIALIZER);
59 newRecids.put(newRecid);
60 }
61 } catch (Throwable e) {
62 writerFailedException = e;
63 }finally {
64 shutdownCondition.countDown();
65 }
66 }
67 };
68
69 protected final Thread writerThread = new Thread("MapDB writer #"+threadNum){
70 @Override public void run() {
71 try{
72
73 for(;;){
74 LongMap.LongMapIterator<Fun.Tuple2<Object,Serializer<Object>>> iter = items.longMapIterator();
75
76 if(!iter.moveToNext()){
77 //empty map, pause for a moment to give it chance to fill
78 if( (parentEngineWeakRef!=null && parentEngineWeakRef.get()==null) || writerFailedException!=null) return;
79 Thread.sleep(asyncFlushDelay);
80 if(closeInProgress){
81 //lock world and write everything
82 Utils.lockAll(writeLocks);
83 try{
84 while(!items.isEmpty()){
85 iter = items.longMapIterator();
86 while(iter.moveToNext()){
87 long recid = iter.key();
88 Fun.Tuple2<Object,Serializer<Object>> value = iter.value();
89 if(value.a==DELETED){
90 AsyncWriteEngine.super.delete(recid, value.b);
91 }else{
92 AsyncWriteEngine.super.update(recid, value.a, value.b);
93 }
94 items.remove(recid, value);
95 }
96 }
97 return;
98 }finally{
99 Utils.unlockAll(writeLocks);
100 }
101 }
102 }else do{
103 //iterate over items and write them
104 long recid = iter.key();
105
106 Utils.lock(writeLocks,recid);
107 try{
108 Fun.Tuple2<Object,Serializer<Object>> value = iter.value();
109 if(value.a==DELETED){
110 AsyncWriteEngine.super.delete(recid, value.b);
111 }else{
112 AsyncWriteEngine.super.update(recid, value.a, value.b);
113 }
114 items.remove(recid, value);
115 }finally {
116 Utils.unlock(writeLocks, recid);
117 }
118 }while(iter.moveToNext());
119
120 }
121 } catch (Throwable e) {
122 writerFailedException = e;
123 }finally {
124 shutdownCondition.countDown();
125 }
126 }
127 };
128
129
130
131 protected AsyncWriteEngine(Engine engine, boolean _transactionsDisabled, boolean _powerSavingMode, int _asyncFlushDelay) {
132 super(engine);
133
134 newRecidsThread.setDaemon(true);
135 writerThread.setDaemon(true);
136
137 commitLock = _transactionsDisabled? null: new ReentrantReadWriteLock();
138 newRecidsThread.start();
139 writerThread.start();
140 asyncFlushDelay = _asyncFlushDelay;
141
142 }
143
144 @Override
145 public <A> long put(A value, Serializer<A> serializer) {
146 if(commitLock!=null) commitLock.readLock().lock();
147 try{
148 try {
149 Long recid = newRecids.take(); //TODO possible deadlock while closing
150 update(recid, value, serializer);
151 return recid;
152 } catch (InterruptedException e) {
153 throw new RuntimeException(e);
154 }
155 }finally{
156 if(commitLock!=null) commitLock.readLock().unlock();
157 }
158
159 }
160
161 protected void checkState() {
162 if(closeInProgress) throw new IllegalAccessError("db has been closed");
163 if(writerFailedException!=null) throw new RuntimeException("Writer thread failed", writerFailedException);
164 }
165
166 @SuppressWarnings("unchecked")
167 @Override
168 public <A> A get(long recid, Serializer<A> serializer) {
169 if(commitLock!=null) commitLock.readLock().lock();
170 try{
171 Utils.lock(writeLocks,recid);
172 try{
173 checkState();
174 Fun.Tuple2<Object,Serializer<Object>> item = items.get(recid);
175 if(item!=null){
176 if(item.a == DELETED) return null;
177 return (A) item.a;
178 }
179
180 return super.get(recid, serializer);
181 }finally{
182 Utils.unlock(writeLocks,recid);
183 }
184 }finally{
185 if(commitLock!=null) commitLock.readLock().unlock();
186 }
187 }
188
189 @SuppressWarnings({ "rawtypes", "unchecked" })
190 @Override
191 public <A> void update(long recid, A value, Serializer<A> serializer) {
192
193 if(commitLock!=null && serializer!=SerializerPojo.serializer) commitLock.readLock().lock();
194 try{
195
196 Utils.lock(writeLocks, recid);
197 try{
198 checkState();
199 items.put(recid, new Fun.Tuple2(value,serializer));
200 }finally{
201 Utils.unlock(writeLocks, recid);
202 }
203 }finally{
204 if(commitLock!=null&& serializer!=SerializerPojo.serializer) commitLock.readLock().unlock();
205 }
206
207 }
208
209 @SuppressWarnings({ "unchecked", "rawtypes" })
210 @Override
211 public <A> boolean compareAndSwap(long recid, A expectedOldValue, A newValue, Serializer<A> serializer) {
212 //TODO commit lock?
213 Utils.lock(writeLocks, recid);
214 try{
215 checkState();
216 Fun.Tuple2<Object, Serializer<Object>> existing = items.get(recid);
217 A oldValue = existing!=null? (A) existing.a : super.get(recid, serializer);
218 if(oldValue == expectedOldValue || (oldValue!=null && oldValue.equals(expectedOldValue))){
219 items.put(recid, new Fun.Tuple2(newValue,serializer));
220 return true;
221 }else{
222 return false;
223 }
224 }finally{
225 Utils.unlock(writeLocks, recid);
226
227 }
228 }
229
230 @SuppressWarnings("unchecked")
231 @Override
232 public <A> void delete(long recid, Serializer<A> serializer) {
233 update(recid, (A) DELETED, serializer);
234 }
235
236 @Override
237 public void close() {
238 try {
239 if(closeInProgress) return;
240 closeInProgress = true;
241 //put preallocated recids back to store
242 for(Long recid = newRecids.poll(); recid!=null; recid = newRecids.poll()){
243 super.delete(recid, Serializer.EMPTY_SERIALIZER);
244 }
245 //TODO commit after returning recids?
246
247 //wait for worker threads to shutdown
248 shutdownCondition.await();
249
250 super.close();
251 } catch (InterruptedException e) {
252 throw new RuntimeException(e);
253 }
254 }
255
256 protected WeakReference<Engine> parentEngineWeakRef = null;
257
258 /**
259 * Main thread may die, leaving Writer Thread orphaned.
260 * To prevent this we periodically check if WeakReference was GCed.
261 * This method sets WeakReference to user facing Engine,
262 * if this instance if GCed it means that user may no longer manage
263 * and we can exit Writer Thread.
264 *
265 * @param parentEngineReference reference to user facing Engine
266 */
267 public void setParentEngineReference(Engine parentEngineReference) {
268 parentEngineWeakRef = new WeakReference<Engine>(parentEngineReference);
269 }
270
271 @Override
272 public void commit() {
273 checkState();
274 if(commitLock==null){
275 super.commit();
276 return;
277 }
278 commitLock.writeLock().lock();
279 try{
280 while(!items.isEmpty()) {
281 checkState();
282 LockSupport.parkNanos(100);
283 }
284
285 super.commit();
286 }finally {
287 commitLock.writeLock().unlock();
288 }
289 }
290
291 @Override
292 public void rollback() {
293 checkState();
294 if(commitLock == null) throw new UnsupportedOperationException("transactions disabled");
295 commitLock.writeLock().lock();
296 try{
297 while(!items.isEmpty()) LockSupport.parkNanos(100);
298 newRecids.clear();
299 super.rollback();
300 }finally {
301 commitLock.writeLock().unlock();
302 }
303 }
304}
Note: See TracBrowser for help on using the repository browser.