source: osm/applications/editors/josm/plugins/imagerycache/src/org/mapdb/StoreDirect.java@ 29484

Last change on this file since 29484 was 29484, checked in by akks, 11 years ago

JOSM/ImageryCache: updated MapDB (no more deadlocks, Java 1.6 compatible), less crashes, multiple-JOSM support

File size: 22.1 KB
Line 
1/*
2 * Copyright (c) 2012 Jan Kotek
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.mapdb;
17
18import java.io.File;
19import java.io.IOError;
20import java.io.IOException;
21import java.util.Arrays;
22import java.util.concurrent.locks.ReentrantLock;
23import java.util.concurrent.locks.ReentrantReadWriteLock;
24
25/**
26 * Storage Engine which saves record directly into file.
27 * Is used when transaction journal is disabled.
28 *
29 * @author Jan Kotek
30 */
31public class StoreDirect implements Engine{
32
33 protected static final long MASK_OFFSET = 0x0000FFFFFFFFFFFFL;
34
35 protected static final long MASK_SIZE = 0x7fff000000000000L;
36 protected static final long MASK_IS_LINKED = 0x8000000000000000L;
37
38 protected static final long HEADER = 9032094932889042394L;
39
40 /** maximal non linked record size */
41 protected static final int MAX_REC_SIZE = 32767;
42
43 /** number of free physical slots */
44 protected static final int PHYS_FREE_SLOTS_COUNT = 2048;
45
46 /** index file offset where current size of index file is stored*/
47 protected static final int IO_INDEX_SIZE = 1*8;
48 /** index file offset where current size of phys file is stored */
49 protected static final int IO_PHYS_SIZE = 2*8;
50 /** index file offset where reference to longstack of free recid is stored*/
51 protected static final int IO_FREE_RECID = 15*8;
52
53 /** index file offset where first recid available to user is stored */
54 protected static final int IO_USER_START = IO_FREE_RECID+PHYS_FREE_SLOTS_COUNT*8+8;
55
56 public static final String DATA_FILE_EXT = ".p";
57
58
59 static final int LONG_STACK_PER_PAGE = 204;
60
61 static final int LONG_STACK_PAGE_SIZE = 8 + LONG_STACK_PER_PAGE * 6;
62
63
64 protected final ReentrantReadWriteLock[] locks = Utils.newReadWriteLocks(32);
65 protected final ReentrantLock structuralLock = new ReentrantLock();
66
67 protected Volume index;
68 protected Volume phys;
69
70 protected long physSize;
71 protected long indexSize;
72
73 protected final boolean deleteFilesAfterClose;
74
75 protected final boolean readOnly;
76
/**
 * Opens a store on the volumes produced by the given factory. An empty index
 * volume receives a freshly created structure; otherwise the headers of both
 * files are verified and the stored file sizes are loaded from the index.
 *
 * @param volFac factory creating the index and the physical volume
 * @param readOnly when true, file sizes are not written back on commit/close
 * @param deleteFilesAfterClose when true, closing the store deletes both files
 */
public StoreDirect(Volume.Factory volFac, boolean readOnly, boolean deleteFilesAfterClose) {
    this.readOnly = readOnly;
    this.deleteFilesAfterClose = deleteFilesAfterClose;

    this.index = volFac.createIndexVolume();
    this.phys = volFac.createPhysVolume();

    if(!index.isEmpty()){
        // existing store: validate it, then pick up the persisted sizes
        checkHeaders();
        this.indexSize = index.getLong(IO_INDEX_SIZE);
        this.physSize = index.getLong(IO_PHYS_SIZE);
        return;
    }
    // brand new store
    createStructure();
}
93
/**
 * Opens a writable store whose files are kept after it is closed.
 *
 * @param volFac factory creating the index and the physical volume
 */
public StoreDirect(Volume.Factory volFac) {
    this(volFac, false, false);
}
97
98 protected void checkHeaders() {
99 if(index.getLong(0)!=HEADER||phys.getLong(0)!=HEADER)throw new IOError(new IOException("storage has invalid header"));
100 }
101
102 protected void createStructure() {
103 indexSize = IO_USER_START+Engine.LAST_RESERVED_RECID*8+8;
104 index.ensureAvailable(indexSize);
105 for(int i=0;i<indexSize;i+=8) index.putLong(i,0L);
106 index.putLong(0, HEADER);
107 index.putLong(IO_INDEX_SIZE,indexSize);
108 physSize =16;
109 phys.ensureAvailable(physSize);
110 phys.putLong(0, HEADER);
111 index.putLong(IO_PHYS_SIZE,physSize);
112 }
113
114
115 @Override
116 public <A> long put(A value, Serializer<A> serializer) {
117 DataOutput2 out = serialize(value, serializer);
118
119 structuralLock.lock();
120 final long ioRecid;
121 final long[] indexVals;
122 try{
123 ioRecid = freeIoRecidTake(true) ;
124 indexVals = physAllocate(out.pos,true);
125 }finally {
126 structuralLock.unlock();
127 }
128
129 put2(out, ioRecid, indexVals);
130
131 return (ioRecid-IO_USER_START)/8;
132 }
133
134 private void put2(DataOutput2 out, long ioRecid, long[] indexVals) {
135 index.putLong(ioRecid, indexVals[0]);
136 //write stuff
137 if(indexVals.length==1||indexVals[1]==0){ //is more then one? ie linked
138 //write single
139
140 phys.putData(indexVals[0]&MASK_OFFSET, out.buf, 0, out.pos);
141
142 }else{
143 int outPos = 0;
144 //write linked
145 for(int i=0;i<indexVals.length;i++){
146 final int c = ccc(indexVals.length, i);
147 final long indexVal = indexVals[i];
148 final boolean isLast = (indexVal & MASK_IS_LINKED) ==0;
149 if(isLast!=(i==indexVals.length-1)) throw new InternalError();
150 final int size = (int) ((indexVal& MASK_SIZE)>>48);
151 final long offset = indexVal&MASK_OFFSET;
152
153 //write data
154 phys.putData(offset+c,out.buf,outPos, size-c);
155 outPos+=size-c;
156
157 if(c>0){
158 //write position of next linked record
159 phys.putLong(offset, indexVals[i+1]);
160 }
161 if(c==12){
162 //write total size in first record
163 phys.putInt(offset+8, out.pos);
164 }
165 }
166 if(outPos!=out.pos) throw new InternalError();
167 }
168 }
169
170
171 @Override
172 public <A> A get(long recid, Serializer<A> serializer) {
173 final long ioRecid = IO_USER_START + recid*8;
174 Utils.readLock(locks, recid);
175 try{
176 return get2(ioRecid,serializer);
177 }catch(IOException e){
178 throw new IOError(e);
179 }finally{
180 Utils.readUnlock(locks, recid);
181 }
182 }
183
184 protected <A> A get2(long ioRecid,Serializer<A> serializer) throws IOException {
185 long indexVal = index.getLong(ioRecid);
186
187 int size = (int) ((indexVal&MASK_SIZE)>>>48);
188 DataInput2 di;
189 long offset = indexVal&MASK_OFFSET;
190 if((indexVal&MASK_IS_LINKED)==0){
191 //read single record
192 di = phys.getDataInput(offset, size);
193
194 }else{
195 //is linked, first construct buffer we will read data to
196 int totalSize = phys.getInt(offset+8);
197 int pos = 0;
198 int c = 12;
199 byte[] buf = new byte[totalSize];
200 //read parts into segment
201 for(;;){
202 DataInput2 in = phys.getDataInput(offset + c, size-c);
203 in.readFully(buf,pos,size-c);
204 pos+=size-c;
205 if(c==0) break;
206 //read next part
207 long next = phys.getLong(offset);
208 offset = next&MASK_OFFSET;
209 size = (int) ((next&MASK_SIZE)>>>48);
210 //is the next part last?
211 c = ((next&MASK_IS_LINKED)==0)? 0 : 8;
212 }
213 if(pos!=totalSize) throw new InternalError();
214 di = new DataInput2(buf);
215 size = totalSize;
216 }
217 int start = di.pos;
218 A ret = serializer.deserialize(di,size);
219 if(size+start>di.pos)throw new InternalError("data were not fully read, check your serializier");
220 if(size+start<di.pos)throw new InternalError("data were read beyond record size, check your serializier");
221 return ret;
222 }
223
224
225 @Override
226 public <A> void update(long recid, A value, Serializer<A> serializer) {
227 DataOutput2 out = serialize(value, serializer);
228
229 final long ioRecid = IO_USER_START + recid*8;
230
231 Utils.writeLock(locks, recid);
232 try{
233 final long[] indexVals;
234 structuralLock.lock();
235 try{
236 indexVals = physAllocate(out.pos,true);
237 }finally {
238 structuralLock.unlock();
239 }
240
241 put2(out, ioRecid, indexVals);
242 }finally{
243 Utils.writeUnlock(locks, recid);
244 }
245 }
246
247 @Override
248 public <A> boolean compareAndSwap(long recid, A expectedOldValue, A newValue, Serializer<A> serializer) {
249 final long ioRecid = IO_USER_START + recid*8;
250 Utils.writeLock(locks, recid);
251 try{
252 /*
253 * deserialize old value
254 */
255
256 A oldVal = get2(ioRecid,serializer);
257
258 /*
259 * compare oldValue and expected
260 */
261 if((oldVal == null && expectedOldValue!=null) || (oldVal!=null && !oldVal.equals(expectedOldValue)))
262 return false;
263
264 /*
265 * write new value
266 */
267 DataOutput2 out = serialize(newValue, serializer);
268
269 final long[] indexVals;
270 structuralLock.lock();
271 try{
272 indexVals = physAllocate(out.pos,true);
273 }finally {
274 structuralLock.unlock();
275 }
276
277 put2(out, ioRecid, indexVals);
278 return true;
279 }catch(IOException e){
280 throw new IOError(e);
281 }finally{
282 Utils.writeUnlock(locks, recid);
283 }
284 }
285
286 @Override
287 public <A> void delete(long recid, Serializer<A> serializer) {
288 final long ioRecid = IO_USER_START + recid*8;
289 Utils.writeLock(locks, recid);
290 try{
291 //get index val and zero it out
292 final long indexVal = index.getLong(ioRecid);
293 index.putLong(ioRecid,0L);
294
295 long[] linkedRecords = null;
296 int linkedPos = 0;
297 if((indexVal&MASK_IS_LINKED)!=0){
298 //record is composed of multiple linked records, so collect all of them
299 linkedRecords = new long[2];
300
301 //traverse linked records
302 long linkedVal = phys.getLong(indexVal&MASK_OFFSET);
303 for(;;){
304 if(linkedPos==linkedRecords.length) //grow if necessary
305 linkedRecords = Arrays.copyOf(linkedRecords, linkedRecords.length*2);
306 //store last linkedVal
307 linkedRecords[linkedPos] = linkedVal;
308
309 if((linkedVal&MASK_IS_LINKED)==0){
310 break; //this is last linked record, so break
311 }
312 //move and read to next
313 linkedPos++;
314 linkedVal = phys.getLong(linkedVal&MASK_OFFSET);
315 }
316 }
317
318 //now lock everything and mark free space
319 structuralLock.lock();
320 try{
321 //free recid
322 freeIoRecidPut(ioRecid);
323 //free first record pointed from indexVal
324 freePhysPut(indexVal);
325
326 //if there are more linked records, free those as well
327 if(linkedRecords!=null){
328 for(int i=0;i<linkedPos;i++){
329 freePhysPut(linkedRecords[i]);
330 }
331 }
332 }finally {
333 structuralLock.unlock();
334 }
335
336 }finally{
337 Utils.writeUnlock(locks, recid);
338 }
339 }
340
341 protected long[] physAllocate(int size, boolean ensureAvail) {
342 if(size==0L) return new long[]{0L};
343 //append to end of file
344 if(size<MAX_REC_SIZE){
345 long indexVal = freePhysTake(size,ensureAvail);
346 indexVal |= ((long)size)<<48;
347 return new long[]{indexVal};
348 }else{
349 long[] ret = new long[2];
350 int retPos = 0;
351 int c = 12;
352
353 while(size>0){
354 if(retPos == ret.length) ret = Arrays.copyOf(ret, ret.length*2);
355 int allocSize = Math.min(size, MAX_REC_SIZE);
356 size -= allocSize - c;
357
358 //append to end of file
359 long indexVal = freePhysTake(allocSize, ensureAvail);
360 indexVal |= (((long)allocSize)<<48);
361 if(c!=0) indexVal|=MASK_IS_LINKED;
362 ret[retPos++] = indexVal;
363
364 c = size<=MAX_REC_SIZE ? 0 : 8;
365 }
366 if(size!=0) throw new InternalError();
367
368 return Arrays.copyOf(ret, retPos);
369 }
370 }
371
372
373
374 protected static long roundTo16(long offset){
375 long rem = offset%16;
376 if(rem!=0) offset +=16-rem;
377 return offset;
378 }
379
380
381 protected static int ccc(int size, int i) {
382 return (size==1|| i==size-1)? 0: (i==0?12:8);
383 }
384
385
386
387 @Override
388 public void close() {
389 structuralLock.lock();
390 Utils.writeLockAll(locks);
391 if(!readOnly){
392 index.putLong(IO_PHYS_SIZE,physSize);
393 index.putLong(IO_INDEX_SIZE,indexSize);
394 }
395
396 index.sync();
397 phys.sync();
398 index.close();
399 phys.close();
400 if(deleteFilesAfterClose){
401 index.deleteFile();
402 phys.deleteFile();
403 }
404 index = null;
405 phys = null;
406 Utils.writeUnlockAll(locks);
407 structuralLock.unlock();
408 }
409
410 @Override
411 public boolean isClosed() {
412 return index==null;
413 }
414
415 @Override
416 public void commit() {
417 if(!readOnly){
418 index.putLong(IO_PHYS_SIZE,physSize);
419 index.putLong(IO_INDEX_SIZE,indexSize);
420 }
421 index.sync();
422 phys.sync();
423
424 }
425
426 @Override
427 public void rollback() throws UnsupportedOperationException {
428 throw new UnsupportedOperationException("rollback not supported with journal disabled");
429 }
430
431 @Override
432 public boolean isReadOnly() {
433 return readOnly;
434 }
435
436 @Override
437 public void compact() {
438
439 if(readOnly) throw new IllegalAccessError();
440 index.putLong(IO_PHYS_SIZE,physSize);
441 index.putLong(IO_INDEX_SIZE,indexSize);
442
443 if(index.getFile()==null) throw new UnsupportedOperationException("compact not supported for memory storage yet");
444 structuralLock.lock();
445 for(ReentrantReadWriteLock l:locks) l.writeLock().lock();
446 try{
447 //create secondary files for compaction
448 //TODO RAF
449 //TODO memory based stores
450 final File indexFile = index.getFile();
451 final File physFile = phys.getFile();
452 final boolean isRaf = index instanceof Volume.RandomAccessFileVol;
453 Volume.Factory fab = Volume.fileFactory(false, isRaf, new File(indexFile+".compact"));
454 StoreDirect store2 = new StoreDirect(fab);
455 store2.structuralLock.lock();
456
457 //transfer stack of free recids
458 for(long recid =longStackTake(IO_FREE_RECID);
459 recid!=0; recid=longStackTake(IO_FREE_RECID)){
460 store2.longStackPut(recid, IO_FREE_RECID);
461 }
462
463 //iterate over recids and transfer physical records
464 store2.index.putLong(IO_INDEX_SIZE, indexSize);
465
466 for(long ioRecid = IO_USER_START; ioRecid<indexSize;ioRecid+=8){
467 byte[] bb = get2(ioRecid,Serializer.BYTE_ARRAY_SERIALIZER);
468 long[] indexVals = store2.physAllocate(bb.length,true);
469 DataOutput2 out = new DataOutput2();
470 out.buf = bb;
471 out.pos = bb.length;
472 store2.index.ensureAvailable(ioRecid+8);
473 store2.put2(out, ioRecid,indexVals);
474 }
475
476
477
478 File indexFile2 = store2.index.getFile();
479 File physFile2 = store2.phys.getFile();
480 store2.structuralLock.unlock();
481 store2.close();
482
483 long time = System.currentTimeMillis();
484 File indexFile_ = new File(indexFile.getPath()+"_"+time+"_orig");
485 File physFile_ = new File(physFile.getPath()+"_"+time+"_orig");
486
487 index.close();
488 phys.close();
489 if(!indexFile.renameTo(indexFile_))throw new InternalError("could not rename file");
490 if(!physFile.renameTo(physFile_))throw new InternalError("could not rename file");
491
492 if(!indexFile2.renameTo(indexFile))throw new InternalError("could not rename file");
493 //TODO process may fail in middle of rename, analyze sequence and add recovery
494 if(!physFile2.renameTo(physFile))throw new InternalError("could not rename file");
495
496 indexFile_.delete();
497 physFile_.delete();
498
499 Volume.Factory fac2 = Volume.fileFactory(false, isRaf, indexFile);
500 index = fac2.createIndexVolume();
501 phys = fac2.createPhysVolume();
502
503 }catch(IOException e){
504 throw new IOError(e);
505 }finally {
506 structuralLock.unlock();
507 for(ReentrantReadWriteLock l:locks) l.writeLock().unlock();
508 }
509
510 }
511
512
513 protected long longStackTake(final long ioList) {
514 if(!structuralLock.isLocked())throw new InternalError();
515 if(ioList<IO_FREE_RECID || ioList>=IO_USER_START) throw new IllegalArgumentException("wrong ioList: "+ioList);
516
517 final long dataOffset = index.getLong(ioList) &MASK_OFFSET;
518 if(dataOffset == 0)
519 return 0; //there is no such list, so just return 0
520
521
522 final int numberOfRecordsInPage = phys.getUnsignedByte(dataOffset);
523
524 if(numberOfRecordsInPage<=0)
525 throw new InternalError();
526 if(numberOfRecordsInPage> LONG_STACK_PER_PAGE)
527 throw new InternalError();
528
529 final long ret = phys.getSixLong(dataOffset + 2 + numberOfRecordsInPage * 6);
530
531 //was it only record at that page?
532 if(numberOfRecordsInPage == 1){
533 //yes, delete this page
534 final long previousListPhysid =phys.getSixLong(dataOffset+2);
535 if(previousListPhysid !=0){
536 //update index so it points to previous page
537 index.putLong(ioList , previousListPhysid | (((long) LONG_STACK_PAGE_SIZE) << 48));
538 }else{
539 //zero out index
540 index.putLong(ioList , 0L);
541 }
542 //put space used by this page into free list
543 freePhysPut(dataOffset | (((long)LONG_STACK_PAGE_SIZE)<<48));
544 }else{
545 //no, it was not last record at this page, so just decrement the counter
546 phys.putUnsignedByte(dataOffset, (byte) (numberOfRecordsInPage - 1));
547 }
548
549 //System.out.println("longStackTake: "+ioList+" - "+ret);
550
551 return ret;
552
553 }
554
555
556 protected void longStackPut(final long ioList, long offset){
557 offset = offset & MASK_OFFSET;
558 if(!structuralLock.isLocked())throw new InternalError();
559 if(ioList<IO_FREE_RECID || ioList>=IO_USER_START) throw new InternalError("wrong ioList: "+ioList);
560
561 //System.out.println("longStackPut: "+ioList+" - "+offset);
562
563 //index position was cleared, put into free index list
564 final long listPhysid2 = index.getLong(ioList) &MASK_OFFSET;
565
566 if(listPhysid2 == 0){ //empty list?
567 //yes empty, create new page and fill it with values
568 final long listPhysid = freePhysTake(LONG_STACK_PAGE_SIZE,true) &MASK_OFFSET;
569 if(listPhysid == 0) throw new InternalError();
570 //set previous Free Index List page to zero as this is first page
571 phys.putSixLong(listPhysid + 2, 0L);
572 //set number of free records in this page to 1
573 phys.putUnsignedByte(listPhysid, (byte) 1);
574 //set record
575 phys.putSixLong(listPhysid + 8, offset);
576 //and update index file with new page location
577 index.putLong(ioList , (((long) LONG_STACK_PAGE_SIZE) << 48) | listPhysid);
578 }else{
579 final int numberOfRecordsInPage = phys.getUnsignedByte(listPhysid2);
580 if(numberOfRecordsInPage == LONG_STACK_PER_PAGE){ //is current page full?
581 //yes it is full, so we need to allocate new page and write our number there
582
583 final long listPhysid = freePhysTake(LONG_STACK_PAGE_SIZE,true) &MASK_OFFSET;
584 if(listPhysid == 0) throw new InternalError();
585 //final ByteBuffers dataBuf = dataBufs[((int) (listPhysid / BUF_SIZE))];
586 //set location to previous page
587 phys.putSixLong(listPhysid + 2, listPhysid2);
588 //set number of free records in this page to 1
589 phys.putUnsignedByte(listPhysid, (byte) 1);
590 //set free record
591 phys.putSixLong(listPhysid + 8, offset);
592 //and update index file with new page location
593 index.putLong(ioList , (((long) LONG_STACK_PAGE_SIZE) << 48) | listPhysid);
594 }else{
595 //there is space on page, so just write released recid and increase the counter
596 phys.putSixLong(listPhysid2 + 8 + 6 * numberOfRecordsInPage, offset);
597 phys.putUnsignedByte(listPhysid2, (byte) (numberOfRecordsInPage + 1));
598 }
599 }
600 }
601
602
603
604 protected void freeIoRecidPut(long ioRecid) {
605 longStackPut(IO_FREE_RECID, ioRecid);
606 }
607
608 protected long freeIoRecidTake(boolean ensureAvail){
609 long ioRecid = longStackTake(IO_FREE_RECID);
610 if(ioRecid!=0) return ioRecid;
611 indexSize+=8;
612 if(ensureAvail)
613 index.ensureAvailable(indexSize);
614 return indexSize-8;
615 }
616
617 protected static final long size2ListIoRecid(long size){
618 return IO_FREE_RECID + 8 + ((size-1)/16)*8;
619 }
620 protected void freePhysPut(long indexVal) {
621 long size = (indexVal&MASK_SIZE) >>>48;
622 longStackPut(size2ListIoRecid(size), indexVal & MASK_OFFSET);
623 }
624
625 protected long freePhysTake(int size, boolean ensureAvail) {
626 if(size==0)throw new IllegalArgumentException();
627
628 //check free space
629 long ret = longStackTake(size2ListIoRecid(size));
630 if(ret!=0) return ret;
631 //not available, increase file size
632 if(physSize%Volume.BUF_SIZE+size>Volume.BUF_SIZE)
633 physSize += Volume.BUF_SIZE - physSize%Volume.BUF_SIZE;
634 long physSize2 = physSize;
635 physSize = roundTo16(physSize+size);
636 if(ensureAvail)
637 phys.ensureAvailable(physSize);
638 return physSize2;
639 }
640
641
642 protected <A> DataOutput2 serialize(A value, Serializer<A> serializer) {
643 try {
644 DataOutput2 out = new DataOutput2();
645 serializer.serialize(out,value);
646 return out;
647 } catch (IOException e) {
648 throw new IOError(e);
649 }
650 }
651
652}
Note: See TracBrowser for help on using the repository browser.