source: osm/applications/editors/josm/plugins/imagerycache/src/org/mapdb/Queues.java@ 30532

Last change on this file since 30532 was 30532, checked in by donvip, 10 years ago

[josm_plugins] fix compilation warnings

File size: 19.6 KB
Line 
1package org.mapdb;
2
3import java.io.DataInput;
4import java.io.DataOutput;
5import java.io.IOException;
6import java.util.Collection;
7import java.util.Iterator;
8import java.util.NoSuchElementException;
9import java.util.concurrent.locks.Lock;
10import java.util.concurrent.locks.ReentrantLock;
11
12/**
13 * Various queue algorithms
14 */
15public final class Queues {
16
/** Static holder for the queue implementations below; never instantiated. */
private Queues() {
}
18
19
20 public static abstract class SimpleQueue<E> implements java.util.Queue<E>{
21
22 protected final Engine engine;
23 protected final Serializer<E> serializer;
24
25 protected final Atomic.Long head;
26
27
28 protected static class NodeSerializer<E> implements Serializer<Node<E>> {
29 private final Serializer<E> serializer;
30
31 public NodeSerializer(Serializer<E> serializer) {
32 this.serializer = serializer;
33 }
34
35 @Override
36 public void serialize(DataOutput out, Node<E> value) throws IOException {
37 if(value==Node.EMPTY) return;
38 Utils.packLong(out,value.next);
39 serializer.serialize(out, value.value);
40 }
41
42 @SuppressWarnings("unchecked")
43 @Override
44 public Node<E> deserialize(DataInput in, int available) throws IOException {
45 if(available==0)return Node.EMPTY;
46 return new Node<E>(Utils.unpackLong(in), serializer.deserialize(in,-1));
47 }
48 }
49
50 protected final Serializer<Node<E>> nodeSerializer;
51
52
53 public SimpleQueue(Engine engine, Serializer<E> serializer, long headRecid) {
54 this.engine = engine;
55 this.serializer = serializer;
56 if(headRecid == 0) headRecid = engine.put(0L, Serializer.LONG_SERIALIZER);
57 head = new Atomic.Long(engine,headRecid);
58 nodeSerializer = new NodeSerializer<E>(serializer);
59 }
60
61
62 /**
63 * Closes underlying storage and releases all resources.
64 * Used mostly with temporary collections where engine is not accessible.
65 */
66 public void close(){
67 engine.close();
68 }
69
70
71
72 protected static final class Node<E>{
73
74 @SuppressWarnings({ "unchecked", "rawtypes" })
75 protected static final Node EMPTY = new Node(0L, null);
76
77 final protected long next;
78 final protected E value;
79
80 public Node(long next, E value) {
81 this.next = next;
82 this.value = value;
83 }
84
85 @Override
86 public boolean equals(Object o) {
87 if (this == o) return true;
88 if (o == null || getClass() != o.getClass()) return false;
89
90 Node node = (Node) o;
91
92 if (next != node.next) return false;
93 if (value != null ? !value.equals(node.value) : node.value != null) return false;
94
95 return true;
96 }
97
98 @Override
99 public int hashCode() {
100 int result = (int) (next ^ (next >>> 32));
101 result = 31 * result + (value != null ? value.hashCode() : 0);
102 return result;
103 }
104 }
105
106 @Override
107 public void clear() {
108 while(!isEmpty())
109 poll();
110 }
111
112
113 @Override
114 public E remove() {
115 E ret = poll();
116 if(ret == null) throw new NoSuchElementException();
117 return ret;
118 }
119
120
121 @Override
122 public E element() {
123 E ret = peek();
124 if(ret == null) throw new NoSuchElementException();
125 return ret;
126 }
127
128
129 @Override
130 public boolean offer(E e) {
131 return add(e);
132 }
133
134
135 @Override
136 public boolean isEmpty() {
137 return head.get()==0;
138 }
139
140
141 @Override
142 public int size() {
143 throw new UnsupportedOperationException();
144 }
145
146
147 @Override
148 public boolean contains(Object o) {
149 throw new UnsupportedOperationException();
150 }
151
152 @Override
153 public Iterator<E> iterator() {
154 throw new UnsupportedOperationException();
155 }
156
157 @Override
158 public Object[] toArray() {
159 throw new UnsupportedOperationException();
160 }
161
162 @Override
163 public <T> T[] toArray(T[] a) {
164 throw new UnsupportedOperationException();
165 }
166
167
168 @Override
169 public boolean remove(Object o) {
170 throw new UnsupportedOperationException();
171 }
172
173 @Override
174 public boolean containsAll(Collection<?> c) {
175 throw new UnsupportedOperationException();
176 }
177
178 @Override
179 public boolean addAll(Collection<? extends E> c) {
180 throw new UnsupportedOperationException();
181 }
182
183 @Override
184 public boolean removeAll(Collection<?> c) {
185 throw new UnsupportedOperationException();
186 }
187
188 @Override
189 public boolean retainAll(Collection<?> c) {
190 throw new UnsupportedOperationException();
191 }
192 }
193
194 /**
195 * Last in first out lock-free queue
196 *
197 * @param <E>
198 */
199 public static class Stack<E> extends SimpleQueue<E> {
200
201 protected final boolean useLocks;
202 protected final ReentrantLock[] locks;
203
204
205 public Stack(Engine engine, Serializer<E> serializer, long headerRecid, boolean useLocks) {
206 super(engine, serializer, headerRecid);
207 this.useLocks = useLocks;
208 locks = useLocks? Utils.newLocks(32) : null;
209 }
210
211 @Override
212 public E peek() {
213 while(true){
214 long head2 = head.get();
215 if(0 == head2) return null;
216 Node<E> n = engine.get(head2, nodeSerializer);
217 long head3 = head.get();
218 if(0 == head2) return null;
219 if(head2 == head3) return (E) n.value;
220 }
221 }
222
223 @Override
224 public E poll() {
225 long head2 = 0;
226 Node<E> n;
227 do{
228 if(useLocks && head2!=0)Utils.lock(locks,head2);
229 head2 =head.get();
230 if(head2 == 0) return null;
231
232 if(useLocks && head2!=0)Utils.lock(locks,head2);
233 n = engine.get(head2, nodeSerializer);
234 }while(n==null || !head.compareAndSet(head2, n.next));
235 if(useLocks && head2!=0){
236 engine.delete(head2,Serializer.LONG_SERIALIZER);
237 Utils.unlock(locks,head2);
238 }else{
239 engine.update(head2, null, nodeSerializer);
240 }
241 return (E) n.value;
242 }
243
244
245 @Override
246 public boolean add(E e) {
247 long head2 = head.get();
248 Node<E> n = new Node<E>(head2, e);
249 long recid = engine.put(n, nodeSerializer);
250 while(!head.compareAndSet(head2, recid)){
251 //failed to update head, so read new value and start over
252 head2 = head.get();
253 n = new Node<E>(head2, e);
254 engine.update(recid, n, nodeSerializer);
255 }
256 return true;
257 }
258 }
259
260 protected static final class StackRoot{
261 final long headerRecid;
262 final boolean useLocks;
263 final Serializer serializer;
264
265 public StackRoot(long headerRecid, boolean useLocks, Serializer serializer) {
266 this.headerRecid = headerRecid;
267 this.useLocks = useLocks;
268 this.serializer = serializer;
269 }
270 }
271
272 protected static final class StackRootSerializer implements Serializer<StackRoot>{
273
274 final Serializer<Serializer> serialierSerializer;
275
276 public StackRootSerializer(Serializer<Serializer> serialierSerializer) {
277 this.serialierSerializer = serialierSerializer;
278 }
279
280 @Override
281 public void serialize(DataOutput out, StackRoot value) throws IOException {
282 out.write(SerializationHeader.MAPDB_STACK);
283 Utils.packLong(out, value.headerRecid);
284 out.writeBoolean(value.useLocks);
285 serialierSerializer.serialize(out,value.serializer);
286 }
287
288 @Override
289 public StackRoot deserialize(DataInput in, int available) throws IOException {
290 if(in.readUnsignedByte()!=SerializationHeader.MAPDB_STACK) throw new InternalError();
291 return new StackRoot(
292 Utils.unpackLong(in),
293 in.readBoolean(),
294 serialierSerializer.deserialize(in,-1)
295 );
296 }
297 }
298
299 static <E> long createStack(Engine engine, Serializer<Serializer> serializerSerializer, Serializer<E> serializer, boolean useLocks){
300 long headerRecid = engine.put(0L, Serializer.LONG_SERIALIZER);
301 StackRoot root = new StackRoot(headerRecid, useLocks, serializer);
302 StackRootSerializer rootSerializer = new StackRootSerializer(serializerSerializer);
303 return engine.put(root, rootSerializer);
304 }
305
306 @SuppressWarnings("unchecked")
307 static <E> Stack<E> getStack(Engine engine, Serializer<Serializer> serializerSerializer, long rootRecid){
308 StackRoot root = engine.get(rootRecid, new StackRootSerializer(serializerSerializer));
309 return new Stack<E>(engine, root.serializer, root.headerRecid, root.useLocks);
310 }
311
312 /**
313 * First in first out lock-free queue
314 *
315 * @param <E>
316 */
317 public static class Queue<E> extends SimpleQueue<E> {
318
319 protected final Atomic.Long tail;
320 protected final Atomic.Long size;
321
322 public Queue(Engine engine, Serializer<E> serializer, long headerRecid, long nextTailRecid, long sizeRecid) {
323 super(engine, serializer,headerRecid);
324 tail = new Atomic.Long(engine,nextTailRecid);
325 size = new Atomic.Long(engine,sizeRecid);
326 }
327
328 @SuppressWarnings("unchecked")
329 @Override
330 public boolean add(E item){
331 final long nextTail = engine.put((Node<E>)Node.EMPTY, nodeSerializer);
332 Node<E> n = new Node<E>(nextTail, item);
333 long tail2 = tail.get();
334 while(!engine.compareAndSwap(tail2, (Node<E>)Node.EMPTY, n, nodeSerializer)){
335 tail2 = tail.get();
336 }
337 head.compareAndSet(0,tail2);
338 tail.set(nextTail);
339 size.incrementAndGet();
340 return true;
341 }
342
343 @SuppressWarnings("unchecked")
344 @Override
345 public E poll(){
346 while(true){
347 long head2 = head.get();
348 if(head2 == 0)return null;
349 Node<E> n = engine.get(head2,nodeSerializer);
350 if(n==null){
351 //TODO we need to know when queue is empty and we can break the cycle
352 // I am not really sure under what concurrent situation is n==null, so there is 'size' hack
353 // but 'size' hack is probably not thread-safe
354 if(size.get()==0)return null ;
355 continue;
356 }
357 if(!engine.compareAndSwap(head2,n, (Node<E>)Node.EMPTY, nodeSerializer))
358 continue;
359 if(!head.compareAndSet(head2,n.next)) throw new InternalError();
360 size.decrementAndGet();
361 return n.value;
362 }
363 }
364
365 @Override
366 public E peek() {
367 long head2 = head.get();
368 if(head2==0) return null;
369 Node<E> n = engine.get(head2,nodeSerializer);
370 while(n == null){
371 if(size.get()==0) return null;
372 n = engine.get(head2,nodeSerializer);
373 }
374
375 return n.value;
376 }
377 }
378
379
380 protected static final class QueueRoot{
381 final long headerRecid;
382 final long nextTailRecid;
383 final Serializer serializer;
384 final long sizeRecid;
385
386 public QueueRoot(long headerRecid, long nextTailRecid, long sizeRecid, Serializer serializer) {
387 this.headerRecid = headerRecid;
388 this.nextTailRecid = nextTailRecid;
389 this.serializer = serializer;
390 this.sizeRecid = sizeRecid;
391 }
392 }
393
394 protected static final class QueueRootSerializer implements Serializer<QueueRoot>{
395
396 final Serializer<Serializer> serialierSerializer;
397
398 public QueueRootSerializer(Serializer<Serializer> serialierSerializer) {
399 this.serialierSerializer = serialierSerializer;
400 }
401
402 @Override
403 public void serialize(DataOutput out, QueueRoot value) throws IOException {
404 out.write(SerializationHeader.MAPDB_QUEUE);
405 Utils.packLong(out, value.headerRecid);
406 Utils.packLong(out, value.nextTailRecid);
407 Utils.packLong(out, value.sizeRecid);
408 serialierSerializer.serialize(out,value.serializer);
409 }
410
411 @Override
412 public QueueRoot deserialize(DataInput in, int available) throws IOException {
413 if(in.readUnsignedByte()!=SerializationHeader.MAPDB_QUEUE) throw new InternalError();
414 return new QueueRoot(
415 Utils.unpackLong(in),
416 Utils.unpackLong(in),
417 Utils.unpackLong(in),
418 serialierSerializer.deserialize(in,-1)
419 );
420 }
421 }
422
423 @SuppressWarnings("unchecked")
424 static <E> long createQueue(Engine engine, Serializer<Serializer> serializerSerializer, Serializer<E> serializer){
425 long headerRecid = engine.put(0L, Serializer.LONG_SERIALIZER);
426 long nextTail = engine.put(SimpleQueue.Node.EMPTY, new SimpleQueue.NodeSerializer(null));
427 long nextTailRecid = engine.put(nextTail, Serializer.LONG_SERIALIZER);
428 long sizeRecid = engine.put(0L, Serializer.LONG_SERIALIZER);
429 QueueRoot root = new QueueRoot(headerRecid, nextTailRecid, sizeRecid, serializer);
430 QueueRootSerializer rootSerializer = new QueueRootSerializer(serializerSerializer);
431 return engine.put(root, rootSerializer);
432 }
433
434 @SuppressWarnings("unchecked")
435 static <E> Queue<E> getQueue(Engine engine, Serializer<Serializer> serializerSerializer, long rootRecid){
436 QueueRoot root = engine.get(rootRecid, new QueueRootSerializer(serializerSerializer));
437 return new Queue<E>(engine, root.serializer, root.headerRecid, root.nextTailRecid,root.sizeRecid);
438 }
439
440 public static class CircularQueue<E> extends SimpleQueue<E> {
441
442 protected final Atomic.Long headInsert;
443 //TODO is there a way to implement this without global locks?
444 protected final Lock lock = new ReentrantLock();
445 protected final long size;
446
447 @SuppressWarnings("unchecked")
448 public CircularQueue(Engine engine, Serializer serializer, long headRecid, long headInsertRecid, long size) {
449 super(engine, serializer, headRecid);
450 headInsert = new Atomic.Long(engine, headInsertRecid);
451 this.size = size;
452 }
453
454 @SuppressWarnings("unchecked")
455 @Override
456 public boolean add(Object o) {
457 lock.lock();
458 try{
459 long nRecid = headInsert.get();
460 Node<E> n = engine.get(nRecid, nodeSerializer);
461 n = new Node<E>(n.next, (E) o);
462 engine.update(nRecid, n, nodeSerializer);
463 headInsert.set(n.next);
464 //move 'poll' head if it points to currently replaced item
465 head.compareAndSet(nRecid, n.next);
466 return true;
467 }finally {
468 lock.unlock();
469 }
470 }
471
472 @Override
473 public void clear() {
474 // praise locking
475 lock.lock();
476 try {
477 for (int i = 0; i < size; i++) {
478 poll();
479 }
480 } finally {
481 lock.unlock();
482 }
483 }
484
485 @Override
486 public E poll() {
487 lock.lock();
488 try{
489 long nRecid = head.get();
490 Node<E> n = engine.get(nRecid, nodeSerializer);
491 engine.update(nRecid, new Node<E>(n.next, null), nodeSerializer);
492 head.set(n.next);
493 return n.value;
494 }finally {
495 lock.unlock();
496 }
497 }
498
499 @Override
500 public E peek() {
501 lock.lock();
502 try{
503 long nRecid = head.get();
504 Node<E> n = engine.get(nRecid, nodeSerializer);
505 return n.value;
506 }finally {
507 lock.unlock();
508 }
509 }
510 }
511
512 protected static final class CircularQueueRoot{
513 final long headerRecid;
514 final long headerInsertRecid;
515 final Serializer serializer;
516 final long sizeRecid;
517
518 public CircularQueueRoot(long headerRecid, long headerInsertRecid, long sizeRecid, Serializer serializer) {
519 this.headerRecid = headerRecid;
520 this.headerInsertRecid = headerInsertRecid;
521 this.serializer = serializer;
522 this.sizeRecid = sizeRecid;
523 }
524 }
525
526 protected static final class CircularQueueRootSerializer implements Serializer<CircularQueueRoot>{
527
528 final Serializer<Serializer> serialierSerializer;
529
530 public CircularQueueRootSerializer(Serializer<Serializer> serialierSerializer) {
531 this.serialierSerializer = serialierSerializer;
532 }
533
534 @Override
535 public void serialize(DataOutput out, CircularQueueRoot value) throws IOException {
536 out.write(SerializationHeader.MAPDB_CIRCULAR_QUEUE);
537 Utils.packLong(out, value.headerRecid);
538 Utils.packLong(out, value.headerInsertRecid);
539 Utils.packLong(out, value.sizeRecid);
540 serialierSerializer.serialize(out,value.serializer);
541 }
542
543 @Override
544 public CircularQueueRoot deserialize(DataInput in, int available) throws IOException {
545 if(in.readUnsignedByte()!=SerializationHeader.MAPDB_CIRCULAR_QUEUE) throw new InternalError();
546 return new CircularQueueRoot(
547 Utils.unpackLong(in),
548 Utils.unpackLong(in),
549 Utils.unpackLong(in),
550 serialierSerializer.deserialize(in,-1)
551 );
552 }
553 }
554
555 @SuppressWarnings({ "rawtypes", "unchecked" })
556 static <E> long createCircularQueue(Engine engine, Serializer<Serializer> serializerSerializer, Serializer<E> serializer, long size){
557 if(size<2) throw new IllegalArgumentException();
558 //insert N Nodes empty nodes into a circle
559 long prevRecid = 0;
560 long firstRecid = 0;
561 Serializer<SimpleQueue.Node> nodeSer = new SimpleQueue.NodeSerializer(serializerSerializer);
562 for(long i=0;i<size;i++){
563 SimpleQueue.Node n = new SimpleQueue.Node(prevRecid, null);
564 prevRecid = engine.put(n, nodeSer);
565 if(firstRecid==0) firstRecid = prevRecid;
566 }
567 //update first node to point to last recid
568 engine.update(firstRecid, new SimpleQueue.Node(prevRecid, null), nodeSer );
569
570 long headerRecid = engine.put(prevRecid, Serializer.LONG_SERIALIZER);
571 long headerInsertRecid = engine.put(prevRecid, Serializer.LONG_SERIALIZER);
572
573 CircularQueueRoot root = new CircularQueueRoot(headerRecid, headerInsertRecid, size, serializer);
574 CircularQueueRootSerializer rootSerializer = new CircularQueueRootSerializer(serializerSerializer);
575 return engine.put(root, rootSerializer);
576 }
577
578
579 static <E> CircularQueue<E> getCircularQueue(Engine engine, Serializer<Serializer> serializerSerializer, long rootRecid){
580 CircularQueueRoot root = engine.get(rootRecid, new CircularQueueRootSerializer(serializerSerializer));
581 return new CircularQueue<E>(engine, root.serializer, root.headerRecid, root.headerInsertRecid,root.sizeRecid);
582 }
583
584}
Note: See TracBrowser for help on using the repository browser.