source: osm/applications/editors/josm/plugins/imagerycache/src/org/mapdb/Queues.java@ 29484

Last change on this file since 29484 was 29484, checked in by akks, 11 years ago

JOSM/ImageryCache: updated MapDB (no more deadlocks, Java 1.6 compatible), fewer crashes, multiple-JOSM support

File size: 19.2 KB
Line 
1package org.mapdb;
2
3import java.io.DataInput;
4import java.io.DataOutput;
5import java.io.IOException;
6import java.util.Collection;
7import java.util.Iterator;
8import java.util.NoSuchElementException;
9import java.util.concurrent.locks.Lock;
10import java.util.concurrent.locks.ReentrantLock;
11
12/**
13 * Various queue algorithms
14 */
15public final class Queues {
16
/** Not instantiable: this class only groups the nested queue types and their static factory methods. */
private Queues() {
}
18
19
20 public static abstract class SimpleQueue<E> implements java.util.Queue<E>{
21
22 protected final Engine engine;
23 protected final Serializer<E> serializer;
24
25 protected final Atomic.Long head;
26
27
28 protected static class NodeSerializer<E> implements Serializer<Node<E>> {
29 private final Serializer<E> serializer;
30
31 public NodeSerializer(Serializer<E> serializer) {
32 this.serializer = serializer;
33 }
34
35 @Override
36 public void serialize(DataOutput out, Node<E> value) throws IOException {
37 if(value==Node.EMPTY) return;
38 Utils.packLong(out,value.next);
39 serializer.serialize(out, value.value);
40 }
41
42 @Override
43 public Node<E> deserialize(DataInput in, int available) throws IOException {
44 if(available==0)return Node.EMPTY;
45 return new Node<E>(Utils.unpackLong(in), serializer.deserialize(in,-1));
46 }
47 }
48
49 protected final Serializer<Node<E>> nodeSerializer;
50
51
52 public SimpleQueue(Engine engine, Serializer<E> serializer, long headRecid) {
53 this.engine = engine;
54 this.serializer = serializer;
55 if(headRecid == 0) headRecid = engine.put(0L, Serializer.LONG_SERIALIZER);
56 head = new Atomic.Long(engine,headRecid);
57 nodeSerializer = new NodeSerializer<E>(serializer);
58 }
59
60
61 /**
62 * Closes underlying storage and releases all resources.
63 * Used mostly with temporary collections where engine is not accessible.
64 */
65 public void close(){
66 engine.close();
67 }
68
69
70
71 protected static final class Node<E>{
72
73 protected static final Node EMPTY = new Node(0L, null);
74
75 final protected long next;
76 final protected E value;
77
78 public Node(long next, E value) {
79 this.next = next;
80 this.value = value;
81 }
82
83 @Override
84 public boolean equals(Object o) {
85 if (this == o) return true;
86 if (o == null || getClass() != o.getClass()) return false;
87
88 Node node = (Node) o;
89
90 if (next != node.next) return false;
91 if (value != null ? !value.equals(node.value) : node.value != null) return false;
92
93 return true;
94 }
95
96 @Override
97 public int hashCode() {
98 int result = (int) (next ^ (next >>> 32));
99 result = 31 * result + (value != null ? value.hashCode() : 0);
100 return result;
101 }
102 }
103
104 @Override
105 public void clear() {
106 while(!isEmpty())
107 poll();
108 }
109
110
111 @Override
112 public E remove() {
113 E ret = poll();
114 if(ret == null) throw new NoSuchElementException();
115 return ret;
116 }
117
118
119 @Override
120 public E element() {
121 E ret = peek();
122 if(ret == null) throw new NoSuchElementException();
123 return ret;
124 }
125
126
127 @Override
128 public boolean offer(E e) {
129 return add(e);
130 }
131
132
133 @Override
134 public boolean isEmpty() {
135 return head.get()==0;
136 }
137
138
139 @Override
140 public int size() {
141 throw new UnsupportedOperationException();
142 }
143
144
145 @Override
146 public boolean contains(Object o) {
147 throw new UnsupportedOperationException();
148 }
149
150 @Override
151 public Iterator<E> iterator() {
152 throw new UnsupportedOperationException();
153 }
154
155 @Override
156 public Object[] toArray() {
157 throw new UnsupportedOperationException();
158 }
159
160 @Override
161 public <T> T[] toArray(T[] a) {
162 throw new UnsupportedOperationException();
163 }
164
165
166 @Override
167 public boolean remove(Object o) {
168 throw new UnsupportedOperationException();
169 }
170
171 @Override
172 public boolean containsAll(Collection<?> c) {
173 throw new UnsupportedOperationException();
174 }
175
176 @Override
177 public boolean addAll(Collection<? extends E> c) {
178 throw new UnsupportedOperationException();
179 }
180
181 @Override
182 public boolean removeAll(Collection<?> c) {
183 throw new UnsupportedOperationException();
184 }
185
186 @Override
187 public boolean retainAll(Collection<?> c) {
188 throw new UnsupportedOperationException();
189 }
190 }
191
192 /**
193 * Last in first out lock-free queue
194 *
195 * @param <E>
196 */
197 public static class Stack<E> extends SimpleQueue<E> {
198
199 protected final boolean useLocks;
200 protected final ReentrantLock[] locks;
201
202
203 public Stack(Engine engine, Serializer<E> serializer, long headerRecid, boolean useLocks) {
204 super(engine, serializer, headerRecid);
205 this.useLocks = useLocks;
206 locks = useLocks? Utils.newLocks(32) : null;
207 }
208
209 @Override
210 public E peek() {
211 while(true){
212 long head2 = head.get();
213 if(0 == head2) return null;
214 Node<E> n = engine.get(head2, nodeSerializer);
215 long head3 = head.get();
216 if(0 == head2) return null;
217 if(head2 == head3) return (E) n.value;
218 }
219 }
220
221 @Override
222 public E poll() {
223 long head2 = 0;
224 Node<E> n;
225 do{
226 if(useLocks && head2!=0)Utils.lock(locks,head2);
227 head2 =head.get();
228 if(head2 == 0) return null;
229
230 if(useLocks && head2!=0)Utils.lock(locks,head2);
231 n = engine.get(head2, nodeSerializer);
232 }while(n==null || !head.compareAndSet(head2, n.next));
233 if(useLocks && head2!=0){
234 engine.delete(head2,Serializer.LONG_SERIALIZER);
235 Utils.unlock(locks,head2);
236 }else{
237 engine.update(head2, null, nodeSerializer);
238 }
239 return (E) n.value;
240 }
241
242
243 @Override
244 public boolean add(E e) {
245 long head2 = head.get();
246 Node<E> n = new Node<E>(head2, e);
247 long recid = engine.put(n, nodeSerializer);
248 while(!head.compareAndSet(head2, recid)){
249 //failed to update head, so read new value and start over
250 head2 = head.get();
251 n = new Node<E>(head2, e);
252 engine.update(recid, n, nodeSerializer);
253 }
254 return true;
255 }
256 }
257
258 protected static final class StackRoot{
259 final long headerRecid;
260 final boolean useLocks;
261 final Serializer serializer;
262
263 public StackRoot(long headerRecid, boolean useLocks, Serializer serializer) {
264 this.headerRecid = headerRecid;
265 this.useLocks = useLocks;
266 this.serializer = serializer;
267 }
268 }
269
270 protected static final class StackRootSerializer implements Serializer<StackRoot>{
271
272 final Serializer<Serializer> serialierSerializer;
273
274 public StackRootSerializer(Serializer<Serializer> serialierSerializer) {
275 this.serialierSerializer = serialierSerializer;
276 }
277
278 @Override
279 public void serialize(DataOutput out, StackRoot value) throws IOException {
280 out.write(SerializationHeader.MAPDB_STACK);
281 Utils.packLong(out, value.headerRecid);
282 out.writeBoolean(value.useLocks);
283 serialierSerializer.serialize(out,value.serializer);
284 }
285
286 @Override
287 public StackRoot deserialize(DataInput in, int available) throws IOException {
288 if(in.readUnsignedByte()!=SerializationHeader.MAPDB_STACK) throw new InternalError();
289 return new StackRoot(
290 Utils.unpackLong(in),
291 in.readBoolean(),
292 serialierSerializer.deserialize(in,-1)
293 );
294 }
295 }
296
297 static <E> long createStack(Engine engine, Serializer<Serializer> serializerSerializer, Serializer<E> serializer, boolean useLocks){
298 long headerRecid = engine.put(0L, Serializer.LONG_SERIALIZER);
299 StackRoot root = new StackRoot(headerRecid, useLocks, serializer);
300 StackRootSerializer rootSerializer = new StackRootSerializer(serializerSerializer);
301 return engine.put(root, rootSerializer);
302 }
303
304 static <E> Stack<E> getStack(Engine engine, Serializer<Serializer> serializerSerializer, long rootRecid){
305 StackRoot root = engine.get(rootRecid, new StackRootSerializer(serializerSerializer));
306 return new Stack<E>(engine, root.serializer, root.headerRecid, root.useLocks);
307 }
308
309 /**
310 * First in first out lock-free queue
311 *
312 * @param <E>
313 */
314 public static class Queue<E> extends SimpleQueue<E> {
315
316 protected final Atomic.Long tail;
317 protected final Atomic.Long size;
318
319 public Queue(Engine engine, Serializer<E> serializer, long headerRecid, long nextTailRecid, long sizeRecid) {
320 super(engine, serializer,headerRecid);
321 tail = new Atomic.Long(engine,nextTailRecid);
322 size = new Atomic.Long(engine,sizeRecid);
323 }
324
325 @Override
326 public boolean add(E item){
327 final long nextTail = engine.put((Node<E>)Node.EMPTY, nodeSerializer);
328 Node<E> n = new Node<E>(nextTail, item);
329 long tail2 = tail.get();
330 while(!engine.compareAndSwap(tail2, (Node<E>)Node.EMPTY, n, nodeSerializer)){
331 tail2 = tail.get();
332 }
333 head.compareAndSet(0,tail2);
334 tail.set(nextTail);
335 size.incrementAndGet();
336 return true;
337 }
338
339 @Override
340 public E poll(){
341 while(true){
342 long head2 = head.get();
343 if(head2 == 0)return null;
344 Node<E> n = engine.get(head2,nodeSerializer);
345 if(n==null){
346 //TODO we need to know when queue is empty and we can break the cycle
347 // I am not really sure under what concurrent situation is n==null, so there is 'size' hack
348 // but 'size' hack is probably not thread-safe
349 if(size.get()==0)return null ;
350 continue;
351 }
352 if(!engine.compareAndSwap(head2,n, (Node<E>)Node.EMPTY, nodeSerializer))
353 continue;
354 if(!head.compareAndSet(head2,n.next)) throw new InternalError();
355 size.decrementAndGet();
356 return n.value;
357 }
358 }
359
360 @Override
361 public E peek() {
362 long head2 = head.get();
363 if(head2==0) return null;
364 Node<E> n = engine.get(head2,nodeSerializer);
365 while(n == null){
366 if(size.get()==0) return null;
367 n = engine.get(head2,nodeSerializer);
368 }
369
370 return n.value;
371 }
372 }
373
374
375 protected static final class QueueRoot{
376 final long headerRecid;
377 final long nextTailRecid;
378 final Serializer serializer;
379 final long sizeRecid;
380
381 public QueueRoot(long headerRecid, long nextTailRecid, long sizeRecid, Serializer serializer) {
382 this.headerRecid = headerRecid;
383 this.nextTailRecid = nextTailRecid;
384 this.serializer = serializer;
385 this.sizeRecid = sizeRecid;
386 }
387 }
388
389 protected static final class QueueRootSerializer implements Serializer<QueueRoot>{
390
391 final Serializer<Serializer> serialierSerializer;
392
393 public QueueRootSerializer(Serializer<Serializer> serialierSerializer) {
394 this.serialierSerializer = serialierSerializer;
395 }
396
397 @Override
398 public void serialize(DataOutput out, QueueRoot value) throws IOException {
399 out.write(SerializationHeader.MAPDB_QUEUE);
400 Utils.packLong(out, value.headerRecid);
401 Utils.packLong(out, value.nextTailRecid);
402 Utils.packLong(out, value.sizeRecid);
403 serialierSerializer.serialize(out,value.serializer);
404 }
405
406 @Override
407 public QueueRoot deserialize(DataInput in, int available) throws IOException {
408 if(in.readUnsignedByte()!=SerializationHeader.MAPDB_QUEUE) throw new InternalError();
409 return new QueueRoot(
410 Utils.unpackLong(in),
411 Utils.unpackLong(in),
412 Utils.unpackLong(in),
413 serialierSerializer.deserialize(in,-1)
414 );
415 }
416 }
417
418 static <E> long createQueue(Engine engine, Serializer<Serializer> serializerSerializer, Serializer<E> serializer){
419 long headerRecid = engine.put(0L, Serializer.LONG_SERIALIZER);
420 long nextTail = engine.put(SimpleQueue.Node.EMPTY, new SimpleQueue.NodeSerializer(null));
421 long nextTailRecid = engine.put(nextTail, Serializer.LONG_SERIALIZER);
422 long sizeRecid = engine.put(0L, Serializer.LONG_SERIALIZER);
423 QueueRoot root = new QueueRoot(headerRecid, nextTailRecid, sizeRecid, serializer);
424 QueueRootSerializer rootSerializer = new QueueRootSerializer(serializerSerializer);
425 return engine.put(root, rootSerializer);
426 }
427
428
429 static <E> Queue<E> getQueue(Engine engine, Serializer<Serializer> serializerSerializer, long rootRecid){
430 QueueRoot root = engine.get(rootRecid, new QueueRootSerializer(serializerSerializer));
431 return new Queue<E>(engine, root.serializer, root.headerRecid, root.nextTailRecid,root.sizeRecid);
432 }
433
434 public static class CircularQueue<E> extends SimpleQueue<E> {
435
436 protected final Atomic.Long headInsert;
437 //TODO is there a way to implement this without global locks?
438 protected final Lock lock = new ReentrantLock();
439 protected final long size;
440
441 public CircularQueue(Engine engine, Serializer serializer, long headRecid, long headInsertRecid, long size) {
442 super(engine, serializer, headRecid);
443 headInsert = new Atomic.Long(engine, headInsertRecid);
444 this.size = size;
445 }
446
447 @Override
448 public boolean add(Object o) {
449 lock.lock();
450 try{
451 long nRecid = headInsert.get();
452 Node<E> n = engine.get(nRecid, nodeSerializer);
453 n = new Node<E>(n.next, (E) o);
454 engine.update(nRecid, n, nodeSerializer);
455 headInsert.set(n.next);
456 //move 'poll' head if it points to currently replaced item
457 head.compareAndSet(nRecid, n.next);
458 return true;
459 }finally {
460 lock.unlock();
461 }
462 }
463
464 @Override
465 public void clear() {
466 // praise locking
467 lock.lock();
468 try {
469 for (int i = 0; i < size; i++) {
470 poll();
471 }
472 } finally {
473 lock.unlock();
474 }
475 }
476
477 @Override
478 public E poll() {
479 lock.lock();
480 try{
481 long nRecid = head.get();
482 Node<E> n = engine.get(nRecid, nodeSerializer);
483 engine.update(nRecid, new Node<E>(n.next, null), nodeSerializer);
484 head.set(n.next);
485 return n.value;
486 }finally {
487 lock.unlock();
488 }
489 }
490
491 @Override
492 public E peek() {
493 lock.lock();
494 try{
495 long nRecid = head.get();
496 Node<E> n = engine.get(nRecid, nodeSerializer);
497 return n.value;
498 }finally {
499 lock.unlock();
500 }
501 }
502 }
503
504 protected static final class CircularQueueRoot{
505 final long headerRecid;
506 final long headerInsertRecid;
507 final Serializer serializer;
508 final long sizeRecid;
509
510 public CircularQueueRoot(long headerRecid, long headerInsertRecid, long sizeRecid, Serializer serializer) {
511 this.headerRecid = headerRecid;
512 this.headerInsertRecid = headerInsertRecid;
513 this.serializer = serializer;
514 this.sizeRecid = sizeRecid;
515 }
516 }
517
518 protected static final class CircularQueueRootSerializer implements Serializer<CircularQueueRoot>{
519
520 final Serializer<Serializer> serialierSerializer;
521
522 public CircularQueueRootSerializer(Serializer<Serializer> serialierSerializer) {
523 this.serialierSerializer = serialierSerializer;
524 }
525
526 @Override
527 public void serialize(DataOutput out, CircularQueueRoot value) throws IOException {
528 out.write(SerializationHeader.MAPDB_CIRCULAR_QUEUE);
529 Utils.packLong(out, value.headerRecid);
530 Utils.packLong(out, value.headerInsertRecid);
531 Utils.packLong(out, value.sizeRecid);
532 serialierSerializer.serialize(out,value.serializer);
533 }
534
535 @Override
536 public CircularQueueRoot deserialize(DataInput in, int available) throws IOException {
537 if(in.readUnsignedByte()!=SerializationHeader.MAPDB_CIRCULAR_QUEUE) throw new InternalError();
538 return new CircularQueueRoot(
539 Utils.unpackLong(in),
540 Utils.unpackLong(in),
541 Utils.unpackLong(in),
542 serialierSerializer.deserialize(in,-1)
543 );
544 }
545 }
546
547 static <E> long createCircularQueue(Engine engine, Serializer<Serializer> serializerSerializer, Serializer<E> serializer, long size){
548 if(size<2) throw new IllegalArgumentException();
549 //insert N Nodes empty nodes into a circle
550 long prevRecid = 0;
551 long firstRecid = 0;
552 Serializer<SimpleQueue.Node> nodeSer = new SimpleQueue.NodeSerializer(serializerSerializer);
553 for(long i=0;i<size;i++){
554 SimpleQueue.Node n = new SimpleQueue.Node(prevRecid, null);
555 prevRecid = engine.put(n, nodeSer);
556 if(firstRecid==0) firstRecid = prevRecid;
557 }
558 //update first node to point to last recid
559 engine.update(firstRecid, new SimpleQueue.Node(prevRecid, null), nodeSer );
560
561 long headerRecid = engine.put(prevRecid, Serializer.LONG_SERIALIZER);
562 long headerInsertRecid = engine.put(prevRecid, Serializer.LONG_SERIALIZER);
563
564 CircularQueueRoot root = new CircularQueueRoot(headerRecid, headerInsertRecid, size, serializer);
565 CircularQueueRootSerializer rootSerializer = new CircularQueueRootSerializer(serializerSerializer);
566 return engine.put(root, rootSerializer);
567 }
568
569
570 static <E> CircularQueue<E> getCircularQueue(Engine engine, Serializer<Serializer> serializerSerializer, long rootRecid){
571 CircularQueueRoot root = engine.get(rootRecid, new CircularQueueRootSerializer(serializerSerializer));
572 return new CircularQueue<E>(engine, root.serializer, root.headerRecid, root.headerInsertRecid,root.sizeRecid);
573 }
574
575}
Note: See TracBrowser for help on using the repository browser.