source: osm/applications/editors/josm/plugins/imagerycache/src/org/mapdb/Queues.java@ 29363

Last change on this file since 29363 was 29363, checked in by akks, 11 years ago

JOSM/ImageryCache: Initial commit

File size: 19.0 KB
Line 
1package org.mapdb;
2
3
4
5import java.io.DataInput;
6import java.io.DataOutput;
7import java.io.IOException;
8import java.util.Collection;
9import java.util.Iterator;
10import java.util.NoSuchElementException;
11import java.util.concurrent.locks.Lock;
12import java.util.concurrent.locks.ReentrantLock;
13
14/**
15 * Various queue algorithms.
16 */
17public final class Queues {
18
19 private Queues(){}
20
21
22 public static abstract class SimpleQueue<E> implements java.util.Queue<E>{
23
24 protected final Engine engine;
25 protected final Serializer<E> serializer;
26
27 protected final Atomic.Long head;
28
29
30 protected static class NodeSerializer<E> implements Serializer<Node<E>> {
31 private final Serializer<E> serializer;
32
33 public NodeSerializer(Serializer<E> serializer) {
34 this.serializer = serializer;
35 }
36
37 @Override
38 public void serialize(DataOutput out, Node<E> value) throws IOException {
39 if(value==Node.EMPTY) return;
40 Utils.packLong(out,value.next);
41 serializer.serialize(out, value.value);
42 }
43
44 @Override
45 public Node<E> deserialize(DataInput in, int available) throws IOException {
46 if(available==0)return Node.EMPTY;
47 return new Node<E>(Utils.unpackLong(in), serializer.deserialize(in,-1));
48 }
49 }
50
51 protected final Serializer<Node<E>> nodeSerializer;
52
53
54 public SimpleQueue(Engine engine, Serializer<E> serializer, long headRecid) {
55 this.engine = engine;
56 this.serializer = serializer;
57 if(headRecid == 0) headRecid = engine.put(0L, Serializer.LONG_SERIALIZER);
58 head = new Atomic.Long(engine,headRecid);
59 nodeSerializer = new NodeSerializer<E>(serializer);
60 }
61
62
63 /**
64 * Closes underlying storage and releases all resources.
65 * Used mostly with temporary collections where engine is not accessible.
66 */
67 public void close(){
68 engine.close();
69 }
70
71
72
73 protected static final class Node<E>{
74
75 protected static final Node EMPTY = new Node(0L, null);
76
77 final protected long next;
78 final protected E value;
79
80 public Node(long next, E value) {
81 this.next = next;
82 this.value = value;
83 }
84
85 @Override
86 public boolean equals(Object o) {
87 if (this == o) return true;
88 if (o == null || getClass() != o.getClass()) return false;
89
90 Node node = (Node) o;
91
92 if (next != node.next) return false;
93 if (value != null ? !value.equals(node.value) : node.value != null) return false;
94
95 return true;
96 }
97
98 @Override
99 public int hashCode() {
100 int result = (int) (next ^ (next >>> 32));
101 result = 31 * result + (value != null ? value.hashCode() : 0);
102 return result;
103 }
104 }
105
106 @Override
107 public void clear() {
108 while(!isEmpty())
109 remove();
110 }
111
112
113 @Override
114 public E remove() {
115 E ret = poll();
116 if(ret == null) throw new NoSuchElementException();
117 return ret;
118 }
119
120
121 @Override
122 public E element() {
123 E ret = peek();
124 if(ret == null) throw new NoSuchElementException();
125 return ret;
126
127 }
128
129
130 @Override
131 public boolean offer(E e) {
132 return add(e);
133 }
134
135
136
137 @Override
138 public boolean isEmpty() {
139 return head.get()==0;
140 }
141
142
143 @Override
144 public int size() {
145 throw new UnsupportedOperationException();
146 }
147
148
149 @Override
150 public boolean contains(Object o) {
151 throw new UnsupportedOperationException();
152 }
153
154 @Override
155 public Iterator<E> iterator() {
156 throw new UnsupportedOperationException();
157 }
158
159 @Override
160 public Object[] toArray() {
161 throw new UnsupportedOperationException();
162 }
163
164 @Override
165 public <T> T[] toArray(T[] a) {
166 throw new UnsupportedOperationException();
167 }
168
169
170 @Override
171 public boolean remove(Object o) {
172 throw new UnsupportedOperationException();
173 }
174
175 @Override
176 public boolean containsAll(Collection<?> c) {
177 throw new UnsupportedOperationException();
178 }
179
180 @Override
181 public boolean addAll(Collection<? extends E> c) {
182 throw new UnsupportedOperationException();
183 }
184
185 @Override
186 public boolean removeAll(Collection<?> c) {
187 throw new UnsupportedOperationException();
188 }
189
190 @Override
191 public boolean retainAll(Collection<?> c) {
192 throw new UnsupportedOperationException();
193 }
194 }
195
196 /**
197 * Last in first out lock-free queue
198 *
199 * @param <E>
200 */
201 public static class Stack<E> extends SimpleQueue<E> {
202
203 protected final boolean useLocks;
204 protected final Locks.RecidLocks locks;
205
206
207
208 public Stack(Engine engine, Serializer<E> serializer, long headerRecid, boolean useLocks) {
209 super(engine, serializer, headerRecid);
210 this.useLocks = useLocks;
211 locks = useLocks? new Locks.LongHashMapRecidLocks() : null;
212 }
213
214 @Override
215 public E peek() {
216 while(true){
217 long head2 = head.get();
218 if(0 == head2) return null;
219 Node<E> n = engine.get(head2, nodeSerializer);
220 long head3 = head.get();
221 if(0 == head2) return null;
222 if(head2 == head3) return (E) n.value;
223 }
224 }
225
226 @Override
227 public E poll() {
228 long head2 = 0;
229 Node<E> n;
230 do{
231 if(useLocks && head2!=0)locks.unlock(head2);
232 head2 =head.get();
233 if(head2 == 0) return null;
234
235 if(useLocks && head2!=0)locks.lock(head2);
236 n = engine.get(head2, nodeSerializer);
237 }while(n==null || !head.compareAndSet(head2, n.next));
238 if(useLocks && head2!=0){
239 engine.delete(head2,Serializer.LONG_SERIALIZER);
240 locks.unlock(head2);
241 }else{
242 engine.update(head2, null, nodeSerializer);
243 }
244 return (E) n.value;
245 }
246
247
248 @Override
249 public boolean add(E e) {
250 long head2 = head.get();
251 Node<E> n = new Node<E>(head2, e);
252 long recid = engine.put(n, nodeSerializer);
253 while(!head.compareAndSet(head2, recid)){
254 //failed to update head, so read new value and start over
255 head2 = head.get();
256 n = new Node<E>(head2, e);
257 engine.update(recid, n, nodeSerializer);
258 }
259 return true;
260 }
261 }
262
263 protected static final class StackRoot{
264 final long headerRecid;
265 final boolean useLocks;
266 final Serializer serializer;
267
268 public StackRoot(long headerRecid, boolean useLocks, Serializer serializer) {
269 this.headerRecid = headerRecid;
270 this.useLocks = useLocks;
271 this.serializer = serializer;
272 }
273 }
274
275 protected static final class StackRootSerializer implements Serializer<StackRoot>{
276
277 final Serializer<Serializer> serialierSerializer;
278
279 public StackRootSerializer(Serializer<Serializer> serialierSerializer) {
280 this.serialierSerializer = serialierSerializer;
281 }
282
283 @Override
284 public void serialize(DataOutput out, StackRoot value) throws IOException {
285 out.write(SerializationHeader.MAPDB_STACK);
286 Utils.packLong(out, value.headerRecid);
287 out.writeBoolean(value.useLocks);
288 serialierSerializer.serialize(out,value.serializer);
289 }
290
291 @Override
292 public StackRoot deserialize(DataInput in, int available) throws IOException {
293 if(in.readUnsignedByte()!=SerializationHeader.MAPDB_STACK) throw new InternalError();
294 return new StackRoot(
295 Utils.unpackLong(in),
296 in.readBoolean(),
297 serialierSerializer.deserialize(in,-1)
298 );
299 }
300 }
301
302 static <E> long createStack(Engine engine, Serializer<Serializer> serializerSerializer, Serializer<E> serializer, boolean useLocks){
303 long headerRecid = engine.put(0L, Serializer.LONG_SERIALIZER);
304 StackRoot root = new StackRoot(headerRecid, useLocks, serializer);
305 StackRootSerializer rootSerializer = new StackRootSerializer(serializerSerializer);
306 return engine.put(root, rootSerializer);
307 }
308
309 static <E> Stack<E> getStack(Engine engine, Serializer<Serializer> serializerSerializer, long rootRecid){
310 StackRoot root = engine.get(rootRecid, new StackRootSerializer(serializerSerializer));
311 return new Stack<E>(engine, root.serializer, root.headerRecid, root.useLocks);
312 }
313
314 /**
315 * First in first out lock-free queue
316 *
317 * @param <E>
318 */
319 public static class Queue<E> extends SimpleQueue<E> {
320
321 protected final Atomic.Long tail;
322 protected final Atomic.Long size;
323
324 public Queue(Engine engine, Serializer<E> serializer, long headerRecid, long nextTailRecid, long sizeRecid) {
325 super(engine, serializer,headerRecid);
326 tail = new Atomic.Long(engine,nextTailRecid);
327 size = new Atomic.Long(engine,sizeRecid);
328 }
329
330
331 @Override
332 public boolean isEmpty() {
333 return head.get() == 0;
334 }
335
336 public boolean add(E item){
337 final long nextTail = engine.put((Node<E>)Node.EMPTY, nodeSerializer);
338 Node<E> n = new Node<E>(nextTail, item);
339 long tail2 = tail.get();
340 while(!engine.compareAndSwap(tail2, (Node<E>)Node.EMPTY, n, nodeSerializer)){
341 tail2 = tail.get();
342 }
343 head.compareAndSet(0,tail2);
344 tail.set(nextTail);
345 size.incrementAndGet();
346 return true;
347 }
348
349 public E poll(){
350 while(true){
351 long head2 = head.get();
352 if(head2 == 0)return null;
353 Node<E> n = engine.get(head2,nodeSerializer);
354 if(n==null){
355 //TODO we need to know when queue is empty and we can break the cycle
356 // I am not really sure under what concurrent situation is n==null, so there is 'size' hack
357 // but 'size' hack is probably not thread-safe
358 if(size.get()==0)return null ;
359 continue;
360 }
361 if(!engine.compareAndSwap(head2,n, (Node<E>)Node.EMPTY, nodeSerializer))
362 continue;
363 if(!head.compareAndSet(head2,n.next)) throw new InternalError();
364 size.decrementAndGet();
365 return n.value;
366 }
367 }
368
369 @Override
370 public E peek() {
371 long head2 = head.get();
372 if(head2==0) return null;
373 Node<E> n = engine.get(head2,nodeSerializer);
374 while(n == null){
375 if(size.get()==0) return null;
376 n = engine.get(head2,nodeSerializer);
377 }
378
379 return n.value;
380 }
381 }
382
383
384 protected static final class QueueRoot{
385 final long headerRecid;
386 final long nextTailRecid;
387 final Serializer serializer;
388 final long sizeRecid;
389
390 public QueueRoot(long headerRecid, long nextTailRecid, long sizeRecid, Serializer serializer) {
391 this.headerRecid = headerRecid;
392 this.nextTailRecid = nextTailRecid;
393 this.serializer = serializer;
394 this.sizeRecid = sizeRecid;
395 }
396 }
397
398 protected static final class QueueRootSerializer implements Serializer<QueueRoot>{
399
400 final Serializer<Serializer> serialierSerializer;
401
402 public QueueRootSerializer(Serializer<Serializer> serialierSerializer) {
403 this.serialierSerializer = serialierSerializer;
404 }
405
406 @Override
407 public void serialize(DataOutput out, QueueRoot value) throws IOException {
408 out.write(SerializationHeader.MAPDB_QUEUE);
409 Utils.packLong(out, value.headerRecid);
410 Utils.packLong(out, value.nextTailRecid);
411 Utils.packLong(out, value.sizeRecid);
412 serialierSerializer.serialize(out,value.serializer);
413 }
414
415 @Override
416 public QueueRoot deserialize(DataInput in, int available) throws IOException {
417 if(in.readUnsignedByte()!=SerializationHeader.MAPDB_QUEUE) throw new InternalError();
418 return new QueueRoot(
419 Utils.unpackLong(in),
420 Utils.unpackLong(in),
421 Utils.unpackLong(in),
422 serialierSerializer.deserialize(in,-1)
423 );
424 }
425 }
426
427 static <E> long createQueue(Engine engine, Serializer<Serializer> serializerSerializer, Serializer<E> serializer){
428 long headerRecid = engine.put(0L, Serializer.LONG_SERIALIZER);
429 long nextTail = engine.put(SimpleQueue.Node.EMPTY, new SimpleQueue.NodeSerializer(null));
430 long nextTailRecid = engine.put(nextTail, Serializer.LONG_SERIALIZER);
431 long sizeRecid = engine.put(0L, Serializer.LONG_SERIALIZER);
432 QueueRoot root = new QueueRoot(headerRecid, nextTailRecid, sizeRecid, serializer);
433 QueueRootSerializer rootSerializer = new QueueRootSerializer(serializerSerializer);
434 return engine.put(root, rootSerializer);
435 }
436
437
438 static <E> Queue<E> getQueue(Engine engine, Serializer<Serializer> serializerSerializer, long rootRecid){
439 QueueRoot root = engine.get(rootRecid, new QueueRootSerializer(serializerSerializer));
440 return new Queue<E>(engine, root.serializer, root.headerRecid, root.nextTailRecid,root.sizeRecid);
441 }
442
443 public static class CircularQueue<E> extends SimpleQueue<E> {
444
445 protected final Atomic.Long headInsert;
446 //TODO is there a way to implement this without global locks?
447 protected final Lock lock = new ReentrantLock();
448 protected final long size;
449
450 public CircularQueue(Engine engine, Serializer serializer, long headRecid, long headInsertRecid, long size) {
451 super(engine, serializer, headRecid);
452 headInsert = new Atomic.Long(engine, headInsertRecid);
453 this.size = size;
454 }
455
456 @Override
457 public boolean add(Object o) {
458 lock.lock();
459 try{
460 long nRecid = headInsert.get();
461 Node<E> n = engine.get(nRecid, nodeSerializer);
462 n = new Node<E>(n.next, (E) o);
463 engine.update(nRecid, n, nodeSerializer);
464 headInsert.set(n.next);
465 //move 'poll' head if it points to currently replaced item
466 head.compareAndSet(nRecid, n.next);
467 return true;
468 }finally {
469 lock.unlock();
470 }
471 }
472
473 @Override
474 public E poll() {
475 lock.lock();
476 try{
477 long nRecid = head.get();
478 Node<E> n = engine.get(nRecid, nodeSerializer);
479 engine.update(nRecid, new Node<E>(n.next, null), nodeSerializer);
480 head.set(n.next);
481 return n.value;
482 }finally {
483 lock.unlock();
484 }
485 }
486
487 @Override
488 public E peek() {
489 lock.lock();
490 try{
491 long nRecid = head.get();
492 Node<E> n = engine.get(nRecid, nodeSerializer);
493 return n.value;
494 }finally {
495 lock.unlock();
496 }
497 }
498 }
499
500 protected static final class CircularQueueRoot{
501 final long headerRecid;
502 final long headerInsertRecid;
503 final Serializer serializer;
504 final long sizeRecid;
505
506 public CircularQueueRoot(long headerRecid, long headerInsertRecid, long sizeRecid, Serializer serializer) {
507 this.headerRecid = headerRecid;
508 this.headerInsertRecid = headerInsertRecid;
509 this.serializer = serializer;
510 this.sizeRecid = sizeRecid;
511 }
512 }
513
514 protected static final class CircularQueueRootSerializer implements Serializer<CircularQueueRoot>{
515
516 final Serializer<Serializer> serialierSerializer;
517
518 public CircularQueueRootSerializer(Serializer<Serializer> serialierSerializer) {
519 this.serialierSerializer = serialierSerializer;
520 }
521
522 @Override
523 public void serialize(DataOutput out, CircularQueueRoot value) throws IOException {
524 out.write(SerializationHeader.MAPDB_CIRCULAR_QUEUE);
525 Utils.packLong(out, value.headerRecid);
526 Utils.packLong(out, value.headerInsertRecid);
527 Utils.packLong(out, value.sizeRecid);
528 serialierSerializer.serialize(out,value.serializer);
529 }
530
531 @Override
532 public CircularQueueRoot deserialize(DataInput in, int available) throws IOException {
533 if(in.readUnsignedByte()!=SerializationHeader.MAPDB_CIRCULAR_QUEUE) throw new InternalError();
534 return new CircularQueueRoot(
535 Utils.unpackLong(in),
536 Utils.unpackLong(in),
537 Utils.unpackLong(in),
538 serialierSerializer.deserialize(in,-1)
539 );
540 }
541 }
542
543 static <E> long createCircularQueue(Engine engine, Serializer<Serializer> serializerSerializer, Serializer<E> serializer, long size){
544 if(size<2) throw new IllegalArgumentException();
545 //insert N Nodes empty nodes into a circle
546 long prevRecid = 0;
547 long firstRecid = 0;
548 Serializer<SimpleQueue.Node> nodeSer = new SimpleQueue.NodeSerializer(serializerSerializer);
549 for(long i=0;i<size;i++){
550 SimpleQueue.Node n = new SimpleQueue.Node(prevRecid, null);
551 prevRecid = engine.put(n, nodeSer);
552 if(firstRecid==0) firstRecid = prevRecid;
553 }
554 //update first node to point to last recid
555 engine.update(firstRecid, new SimpleQueue.Node(prevRecid, null), nodeSer );
556
557 long headerRecid = engine.put(prevRecid, Serializer.LONG_SERIALIZER);
558 long headerInsertRecid = engine.put(prevRecid, Serializer.LONG_SERIALIZER);
559
560 CircularQueueRoot root = new CircularQueueRoot(headerRecid, headerInsertRecid, size, serializer);
561 CircularQueueRootSerializer rootSerializer = new CircularQueueRootSerializer(serializerSerializer);
562 return engine.put(root, rootSerializer);
563 }
564
565
566 static <E> CircularQueue<E> getCircularQueue(Engine engine, Serializer<Serializer> serializerSerializer, long rootRecid){
567 CircularQueueRoot root = engine.get(rootRecid, new CircularQueueRootSerializer(serializerSerializer));
568 return new CircularQueue<E>(engine, root.serializer, root.headerRecid, root.headerInsertRecid,root.sizeRecid);
569 }
570
571
572}
Note: See TracBrowser for help on using the repository browser.