1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.directory.mavibot.btree;
21
22
23 import java.io.Closeable;
24 import java.io.EOFException;
25 import java.io.File;
26 import java.io.FileOutputStream;
27 import java.io.IOException;
28 import java.io.RandomAccessFile;
29 import java.nio.ByteBuffer;
30 import java.nio.channels.FileChannel;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32
33 import org.apache.directory.mavibot.btree.exception.InitializationException;
34 import org.apache.directory.mavibot.btree.exception.KeyNotFoundException;
35 import org.apache.directory.mavibot.btree.exception.MissingSerializerException;
36 import org.apache.directory.mavibot.btree.serializer.BufferHandler;
37 import org.apache.directory.mavibot.btree.serializer.LongSerializer;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41
42
43
44
45
46
47
48
49
50 class InMemoryBTree<K, V> extends AbstractBTree<K, V> implements Closeable
51 {
52
53 protected static final Logger LOG = LoggerFactory.getLogger( InMemoryBTree.class );
54
55
56 public static final String DEFAULT_JOURNAL = "mavibot.log";
57
58
59 public static final String DATA_SUFFIX = ".db";
60
61
62 public static final String JOURNAL_SUFFIX = ".log";
63
64
65
66 private File file;
67
68
69 private boolean withJournal;
70
71
72 private File journal;
73
74
75 private File envDir;
76
77
78 private FileChannel journalChannel = null;
79
80
81
82
83
84 InMemoryBTree()
85 {
86 super();
87 setType( BTreeTypeEnum.IN_MEMORY );
88 }
89
90
91
92
93
94
95
96
97 InMemoryBTree( InMemoryBTreeConfiguration<K, V> configuration )
98 {
99 super();
100 String btreeName = configuration.getName();
101
102 if ( btreeName == null )
103 {
104 throw new IllegalArgumentException( "BTree name cannot be null" );
105 }
106
107 String filePath = configuration.getFilePath();
108
109 if ( filePath != null )
110 {
111 envDir = new File( filePath );
112 }
113
114
115 setName( btreeName );
116 setPageSize( configuration.getPageSize() );
117 setKeySerializer( configuration.getKeySerializer() );
118 setValueSerializer( configuration.getValueSerializer() );
119 setAllowDuplicates( configuration.isAllowDuplicates() );
120 setType( configuration.getType() );
121
122 readTimeOut = configuration.getReadTimeOut();
123 writeBufferSize = configuration.getWriteBufferSize();
124
125 if ( keySerializer.getComparator() == null )
126 {
127 throw new IllegalArgumentException( "Comparator should not be null" );
128 }
129
130
131 BTreeHeader<K, V> newBtreeHeader = new BTreeHeader<K, V>();
132
133
134
135 newBtreeHeader.setBTreeHeaderOffset( 0L );
136 newBtreeHeader.setRevision( 0L );
137 newBtreeHeader.setNbElems( 0L );
138 newBtreeHeader.setRootPage( new InMemoryLeaf<K, V>( this ) );
139 newBtreeHeader.setRootPageOffset( 0L );
140
141 btreeRevisions.put( 0L, newBtreeHeader );
142 currentBtreeHeader = newBtreeHeader;
143
144
145 try
146 {
147 init();
148 }
149 catch ( IOException ioe )
150 {
151 throw new InitializationException( ioe.getMessage() );
152 }
153 }
154
155
156
157
158
159
160
161 private void init() throws IOException
162 {
163
164 if ( envDir != null )
165 {
166 if ( !envDir.exists() )
167 {
168 boolean created = envDir.mkdirs();
169
170 if ( !created )
171 {
172 throw new IllegalStateException( "Could not create the directory " + envDir + " for storing data" );
173 }
174 }
175
176 this.file = new File( envDir, getName() + DATA_SUFFIX );
177
178 this.journal = new File( envDir, file.getName() + JOURNAL_SUFFIX );
179 setType( BTreeTypeEnum.BACKED_ON_DISK );
180 }
181
182
183 readTransactions = new ConcurrentLinkedQueue<ReadTransaction<K, V>>();
184
185
186 transactionManager = new InMemoryTransactionManager();
187
188
189
190 if ( getType() == BTreeTypeEnum.BACKED_ON_DISK )
191 {
192 if ( file.length() > 0 )
193 {
194
195 load( file );
196 }
197
198 withJournal = true;
199
200 FileOutputStream stream = new FileOutputStream( journal );
201 journalChannel = stream.getChannel();
202
203
204
205 if ( journal.length() > 0 )
206 {
207 applyJournal();
208 }
209 }
210 else
211 {
212 setType( BTreeTypeEnum.IN_MEMORY );
213
214
215 BTreeHeader<K, V> btreeHeader = new BTreeHeader<K, V>();
216 btreeHeader.setRootPage( new InMemoryLeaf<K, V>( this ) );
217 btreeHeader.setBtree( this );
218 storeRevision( btreeHeader );
219 }
220
221
222
223
224 }
225
226
227
228
229
230 protected ReadTransaction<K, V> beginReadTransaction()
231 {
232 BTreeHeader<K, V> btreeHeader = getBtreeHeader();
233
234 ReadTransaction<K, V> readTransaction = new ReadTransaction<K, V>( btreeHeader, readTransactions );
235
236 readTransactions.add( readTransaction );
237
238 return readTransaction;
239 }
240
241
242
243
244
245 protected ReadTransaction<K, V> beginReadTransaction( long revision )
246 {
247 BTreeHeader<K, V> btreeHeader = getBtreeHeader( revision );
248
249 if ( btreeHeader != null )
250 {
251 ReadTransaction<K, V> readTransaction = new ReadTransaction<K, V>( btreeHeader, readTransactions );
252
253 readTransactions.add( readTransaction );
254
255 return readTransaction;
256 }
257 else
258 {
259 return null;
260 }
261 }
262
263
264
265
266
267 public void close() throws IOException
268 {
269
270
271
272
273 if ( getType() == BTreeTypeEnum.BACKED_ON_DISK )
274 {
275
276 flush();
277 journalChannel.close();
278 }
279 }
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294 protected Tuple<K, V> delete( K key, V value, long revision ) throws IOException
295 {
296 if ( revision == -1L )
297 {
298 revision = currentRevision.get() + 1;
299 }
300
301 BTreeHeader<K, V> oldBtreeHeader = getBtreeHeader();
302 BTreeHeader<K, V> newBtreeHeader = createNewBtreeHeader( oldBtreeHeader, revision );
303 newBtreeHeader.setBtree( this );
304
305
306
307 Tuple<K, V> tuple = null;
308
309
310
311 DeleteResult<K, V> result = getRootPage().delete( key, value, revision );
312
313 if ( result instanceof NotPresentResult )
314 {
315
316 return null;
317 }
318
319
320
321
322 if ( result instanceof RemoveResult )
323 {
324
325 RemoveResult<K, V> removeResult = ( RemoveResult<K, V> ) result;
326
327 Page<K, V> modifiedPage = removeResult.getModifiedPage();
328
329
330 newBtreeHeader.setRootPage( modifiedPage );
331 tuple = removeResult.getRemovedElement();
332 }
333
334 if ( withJournal )
335 {
336
337 writeToJournal( new Deletion<K, V>( key ) );
338 }
339
340
341 if ( tuple != null )
342 {
343 newBtreeHeader.decrementNbElems();
344 }
345
346 storeRevision( newBtreeHeader );
347
348
349 if ( oldBtreeHeader.getNbUsers() == 0 )
350 {
351 btreeRevisions.remove( oldBtreeHeader.getRevision() );
352 }
353
354 return tuple;
355 }
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371 InsertResult<K, V> insert( K key, V value, long revision ) throws IOException
372 {
373
374
375 if ( revision == -1L )
376 {
377 revision = currentRevision.get() + 1;
378 }
379
380 BTreeHeader<K, V> oldBtreeHeader = getBtreeHeader();
381 BTreeHeader<K, V> newBtreeHeader = createNewBtreeHeader( oldBtreeHeader, revision );
382 newBtreeHeader.setBtree( this );
383
384
385
386 V modifiedValue = null;
387
388
389
390
391 InsertResult<K, V> result = newBtreeHeader.getRootPage().insert( key, value, revision );
392
393 if ( result instanceof ExistsResult )
394 {
395 return result;
396 }
397
398 if ( result instanceof ModifyResult )
399 {
400 ModifyResult<K, V> modifyResult = ( ( ModifyResult<K, V> ) result );
401
402 Page<K, V> modifiedPage = modifyResult.getModifiedPage();
403
404
405
406 newBtreeHeader.setRootPage( modifiedPage );
407
408 modifiedValue = modifyResult.getModifiedValue();
409 }
410 else
411 {
412
413
414 SplitResult<K, V> splitResult = ( ( SplitResult<K, V> ) result );
415
416 K pivot = splitResult.getPivot();
417 Page<K, V> leftPage = splitResult.getLeftPage();
418 Page<K, V> rightPage = splitResult.getRightPage();
419
420
421 newBtreeHeader.setRootPage( new InMemoryNode<K, V>( this, revision, pivot, leftPage, rightPage ) );
422 }
423
424
425 if ( withJournal )
426 {
427 writeToJournal( new Addition<K, V>( key, value ) );
428 }
429
430
431
432 if ( modifiedValue == null )
433 {
434 newBtreeHeader.incrementNbElems();
435 }
436
437 storeRevision( newBtreeHeader );
438
439 if ( oldBtreeHeader.getNbUsers() == 0 )
440 {
441 long oldRevision = oldBtreeHeader.getRevision();
442
443 if ( oldRevision < newBtreeHeader.getRevision() )
444 {
445 btreeRevisions.remove( oldBtreeHeader.getRevision() );
446 }
447 }
448
449
450 return result;
451 }
452
453
454
455
456
457
458
459
460
461
462 private void writeBuffer( FileChannel channel, ByteBuffer bb, byte[] buffer ) throws IOException
463 {
464 int size = buffer.length;
465 int pos = 0;
466
467
468 do
469 {
470 if ( bb.remaining() >= size )
471 {
472
473 bb.put( buffer, pos, size );
474 size = 0;
475 }
476 else
477 {
478
479 int len = bb.remaining();
480 size -= len;
481 bb.put( buffer, pos, len );
482 pos += len;
483
484 bb.flip();
485
486 channel.write( bb );
487
488 bb.clear();
489 }
490 }
491 while ( size > 0 );
492 }
493
494
495
496
497
498
499 public void flush( File file ) throws IOException
500 {
501 File parentFile = file.getParentFile();
502 File baseDirectory = null;
503
504 if ( parentFile != null )
505 {
506 baseDirectory = new File( file.getParentFile().getAbsolutePath() );
507 }
508 else
509 {
510 baseDirectory = new File( "." );
511 }
512
513
514 File tmpFileFD = File.createTempFile( "mavibot", null, baseDirectory );
515 FileOutputStream stream = new FileOutputStream( tmpFileFD );
516 FileChannel ch = stream.getChannel();
517
518
519 ByteBuffer bb = ByteBuffer.allocateDirect( writeBufferSize );
520
521 try
522 {
523 TupleCursor<K, V> cursor = browse();
524
525 if ( keySerializer == null )
526 {
527 throw new MissingSerializerException( "Cannot flush the btree without a Key serializer" );
528 }
529
530 if ( valueSerializer == null )
531 {
532 throw new MissingSerializerException( "Cannot flush the btree without a Value serializer" );
533 }
534
535
536 bb.putLong( getBtreeHeader().getNbElems() );
537
538 while ( cursor.hasNext() )
539 {
540 Tuple<K, V> tuple = cursor.next();
541
542 byte[] keyBuffer = keySerializer.serialize( tuple.getKey() );
543
544 writeBuffer( ch, bb, keyBuffer );
545
546 byte[] valueBuffer = valueSerializer.serialize( tuple.getValue() );
547
548 writeBuffer( ch, bb, valueBuffer );
549 }
550
551
552 if ( bb.position() > 0 )
553 {
554 bb.flip();
555 ch.write( bb );
556 }
557
558
559 ch.force( true );
560 ch.close();
561 }
562 catch ( KeyNotFoundException knfe )
563 {
564 knfe.printStackTrace();
565 throw new IOException( knfe.getMessage() );
566 }
567
568
569 File backupFile = File.createTempFile( "mavibot", null, baseDirectory );
570 file.renameTo( backupFile );
571
572
573 tmpFileFD.renameTo( file );
574
575
576 backupFile.delete();
577 }
578
579
580
581
582
583
584
585 private void applyJournal() throws IOException
586 {
587 if ( !journal.exists() )
588 {
589 throw new IOException( "The journal does not exist" );
590 }
591
592 FileChannel channel =
593 new RandomAccessFile( journal, "rw" ).getChannel();
594 ByteBuffer buffer = ByteBuffer.allocate( 65536 );
595
596 BufferHandler bufferHandler = new BufferHandler( channel, buffer );
597
598
599 try
600 {
601 while ( true )
602 {
603
604 byte[] type = bufferHandler.read( 1 );
605
606 if ( type[0] == Modification.ADDITION )
607 {
608
609 K key = keySerializer.deserialize( bufferHandler );
610
611
612
613
614 V value = valueSerializer.deserialize( bufferHandler );
615
616
617
618
619 insert( key, value, getBtreeHeader().getRevision() );
620 }
621 else
622 {
623
624 K key = keySerializer.deserialize( bufferHandler );
625
626
627 delete( key, getBtreeHeader().getRevision() );
628 }
629 }
630 }
631 catch ( EOFException eofe )
632 {
633 eofe.printStackTrace();
634
635 journalChannel.truncate( 0 );
636 }
637 }
638
639
640
641
642
643
644
645
646
647 public void load( File file ) throws IOException
648 {
649 if ( !file.exists() )
650 {
651 throw new IOException( "The file does not exist" );
652 }
653
654 FileChannel channel =
655 new RandomAccessFile( file, "rw" ).getChannel();
656 ByteBuffer buffer = ByteBuffer.allocate( 65536 );
657
658 BufferHandler bufferHandler = new BufferHandler( channel, buffer );
659
660 long nbElems = LongSerializer.deserialize( bufferHandler.read( 8 ) );
661
662
663 boolean isJournalActivated = withJournal;
664
665 withJournal = false;
666
667
668 for ( long i = 0; i < nbElems; i++ )
669 {
670
671 K key = keySerializer.deserialize( bufferHandler );
672
673
674 V value = valueSerializer.deserialize( bufferHandler );
675
676
677 insert( key, value, getBtreeHeader().getRevision() );
678 }
679
680
681 withJournal = isJournalActivated;
682
683
684
685 }
686
687
688
689
690
691
692
693
694
695
696 public Page<K, V> getRootPage( long revision ) throws IOException, KeyNotFoundException
697 {
698
699 return getBtreeHeader().getRootPage();
700 }
701
702
703
704
705
706
707
708 public Page<K, V> getRootPage()
709 {
710 return getBtreeHeader().getRootPage();
711 }
712
713
714 void setRootPage( Page<K, V> root )
715 {
716 getBtreeHeader().setRootPage( root );
717 }
718
719
720
721
722
723
724 public void flush() throws IOException
725 {
726 if ( getType() == BTreeTypeEnum.BACKED_ON_DISK )
727 {
728
729 flush( file );
730 journalChannel.truncate( 0 );
731 }
732 }
733
734
735
736
737
738 public File getFile()
739 {
740 return file;
741 }
742
743
744
745
746
747
748
749 public void setFilePath( String filePath )
750 {
751 if ( filePath != null )
752 {
753 envDir = new File( filePath );
754 }
755 }
756
757
758
759
760
761 public File getJournal()
762 {
763 return journal;
764 }
765
766
767
768
769
770 public boolean isInMemory()
771 {
772 return getType() == BTreeTypeEnum.IN_MEMORY;
773 }
774
775
776
777
778
779 public boolean isPersistent()
780 {
781 return getType() == BTreeTypeEnum.IN_MEMORY;
782 }
783
784
785 private void writeToJournal( Modification<K, V> modification )
786 throws IOException
787 {
788 if ( modification instanceof Addition )
789 {
790 byte[] keyBuffer = keySerializer.serialize( modification.getKey() );
791 ByteBuffer bb = ByteBuffer.allocateDirect( keyBuffer.length + 1 );
792 bb.put( Modification.ADDITION );
793 bb.put( keyBuffer );
794 bb.flip();
795
796 journalChannel.write( bb );
797
798 byte[] valueBuffer = valueSerializer.serialize( modification.getValue() );
799 bb = ByteBuffer.allocateDirect( valueBuffer.length );
800 bb.put( valueBuffer );
801 bb.flip();
802
803 journalChannel.write( bb );
804 }
805 else if ( modification instanceof Deletion )
806 {
807 byte[] keyBuffer = keySerializer.serialize( modification.getKey() );
808 ByteBuffer bb = ByteBuffer.allocateDirect( keyBuffer.length + 1 );
809 bb.put( Modification.DELETION );
810 bb.put( keyBuffer );
811 bb.flip();
812
813 journalChannel.write( bb );
814 }
815
816
817 journalChannel.force( true );
818 }
819
820
821
822
823
824
825 private BTreeHeader<K, V> createNewBtreeHeader( BTreeHeader<K, V> btreeHeader, long revision )
826 {
827 BTreeHeader<K, V> newBtreeHeader = new BTreeHeader<K, V>();
828
829 newBtreeHeader.setBTreeHeaderOffset( btreeHeader.getBTreeHeaderOffset() );
830 newBtreeHeader.setRevision( revision );
831 newBtreeHeader.setNbElems( btreeHeader.getNbElems() );
832 newBtreeHeader.setRootPage( btreeHeader.getRootPage() );
833
834 return newBtreeHeader;
835 }
836
837
838
839
840
841 public String toString()
842 {
843 StringBuilder sb = new StringBuilder();
844
845 switch ( getType() )
846 {
847 case IN_MEMORY:
848 sb.append( "In-memory " );
849 break;
850
851 case BACKED_ON_DISK:
852 sb.append( "Persistent " );
853 break;
854
855 default:
856 sb.append( "Wrong type... " );
857 break;
858 }
859
860 sb.append( "BTree" );
861 sb.append( "[" ).append( getName() ).append( "]" );
862 sb.append( "( pageSize:" ).append( getPageSize() );
863
864 if ( getBtreeHeader().getRootPage() != null )
865 {
866 sb.append( ", nbEntries:" ).append( getBtreeHeader().getNbElems() );
867 }
868 else
869 {
870 sb.append( ", nbEntries:" ).append( 0 );
871 }
872
873 sb.append( ", comparator:" );
874
875 if ( keySerializer.getComparator() == null )
876 {
877 sb.append( "null" );
878 }
879 else
880 {
881 sb.append( keySerializer.getComparator().getClass().getSimpleName() );
882 }
883
884 sb.append( ", DuplicatesAllowed: " ).append( isAllowDuplicates() );
885
886 if ( getType() == BTreeTypeEnum.BACKED_ON_DISK )
887 {
888 try
889 {
890 sb.append( ", file : " );
891
892 if ( file != null )
893 {
894 sb.append( file.getCanonicalPath() );
895 }
896 else
897 {
898 sb.append( "Unknown" );
899 }
900
901 sb.append( ", journal : " );
902
903 if ( journal != null )
904 {
905 sb.append( journal.getCanonicalPath() );
906 }
907 else
908 {
909 sb.append( "Unkown" );
910 }
911 }
912 catch ( IOException ioe )
913 {
914
915 }
916 }
917
918 sb.append( ") : \n" );
919 sb.append( getRootPage().dumpPage( "" ) );
920
921 return sb.toString();
922 }
923 }