1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.directory.mavibot.btree;
21  
22  
23  import java.io.File;
24  import java.io.IOException;
25  import java.io.RandomAccessFile;
26  import java.nio.ByteBuffer;
27  import java.nio.channels.FileChannel;
28  import java.util.ArrayList;
29  import java.util.HashMap;
30  import java.util.HashSet;
31  import java.util.LinkedHashMap;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Queue;
35  import java.util.Set;
36  import java.util.concurrent.LinkedBlockingQueue;
37  import java.util.concurrent.atomic.AtomicLong;
38  import java.util.concurrent.locks.ReadWriteLock;
39  import java.util.concurrent.locks.ReentrantLock;
40  import java.util.concurrent.locks.ReentrantReadWriteLock;
41  
42  import org.apache.directory.mavibot.btree.exception.BTreeAlreadyManagedException;
43  import org.apache.directory.mavibot.btree.exception.BTreeCreationException;
44  import org.apache.directory.mavibot.btree.exception.EndOfFileExceededException;
45  import org.apache.directory.mavibot.btree.exception.FileException;
46  import org.apache.directory.mavibot.btree.exception.InvalidOffsetException;
47  import org.apache.directory.mavibot.btree.exception.KeyNotFoundException;
48  import org.apache.directory.mavibot.btree.exception.RecordManagerException;
49  import org.apache.directory.mavibot.btree.serializer.ElementSerializer;
50  import org.apache.directory.mavibot.btree.serializer.IntSerializer;
51  import org.apache.directory.mavibot.btree.serializer.LongArraySerializer;
52  import org.apache.directory.mavibot.btree.serializer.LongSerializer;
53  import org.apache.directory.mavibot.btree.util.Strings;
54  import org.slf4j.Logger;
55  import org.slf4j.LoggerFactory;
56  
57  
58  /**
59   * The RecordManager is used to manage the file in which we will store the B-trees.
60   * A RecordManager will manage more than one B-tree.<br/>
61   *
62   * It stores data in fixed-size pages (default size is 512 bytes), which may be linked to one
63   * another if the data we want to store is too big for a single page.
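 *
 * A minimal usage sketch ; the {@code manage()}, {@code insert()} and {@code StringSerializer}
 * calls below are assumed from the public Mavibot API and are not defined in this file :
 * <pre>
 * // Open or create the database file
 * RecordManager recordManager = new RecordManager( "/tmp/mavibot.db" );
 *
 * // Create and attach a user B-tree, using the same configuration based
 * // factory this class uses for its internal B-trees
 * PersistedBTreeConfiguration&lt;String, String&gt; configuration =
 *     new PersistedBTreeConfiguration&lt;String, String&gt;();
 * configuration.setName( "example" );
 * configuration.setKeySerializer( StringSerializer.INSTANCE );
 * configuration.setValueSerializer( StringSerializer.INSTANCE );
 *
 * BTree&lt;String, String&gt; btree = BTreeFactory.createPersistedBTree( configuration );
 * recordManager.manage( btree );
 *
 * // Update the B-tree within a transaction
 * recordManager.beginTransaction();
 * btree.insert( "key", "value" );
 * recordManager.commit();
 * </pre>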
64   *
65   * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
66   */
67  public class RecordManager extends AbstractTransactionManager
68  {
69      /** The Logger used by this class */
70      protected static final Logger LOG = LoggerFactory.getLogger( RecordManager.class );
71  
72      /** The Logger used to trace TXN operations */
73      protected static final Logger TXN_LOG = LoggerFactory.getLogger( "TXN_LOG" );
74  
75      /** The Logger used to trace page operations */
76      protected static final Logger LOG_PAGES = LoggerFactory.getLogger( "org.apache.directory.mavibot.LOG_PAGES" );
77  
78      /** A dedicated logger for the check */
79      protected static final Logger LOG_CHECK = LoggerFactory.getLogger( "org.apache.directory.mavibot.LOG_CHECK" );
80  
81      /** The associated file */
82      private File file;
83  
84      /** The channel used to read and write data */
85      /* no qualifier */FileChannel fileChannel;
86  
87      /** The number of managed B-trees */
88      /* no qualifier */int nbBtree;
89  
90      /** The first free page */
91      /* no qualifier */long firstFreePage;
92  
93      /** Some counters to track the number of free pages */
94      public AtomicLong nbFreedPages = new AtomicLong( 0 );
95      public AtomicLong nbCreatedPages = new AtomicLong( 0 );
96      public AtomicLong nbReusedPages = new AtomicLong( 0 );
97      public AtomicLong nbUpdateRMHeader = new AtomicLong( 0 );
98      public AtomicLong nbUpdateBtreeHeader = new AtomicLong( 0 );
99      public AtomicLong nbUpdatePageIOs = new AtomicLong( 0 );
100 
101     /** The offset of the end of the file */
102     private long endOfFileOffset;
103 
104     /**
105      * A B-tree used to manage the pages that have been copied in a new version.
106      * Those pages can be reclaimed when the associated version is dead.
107      **/
108     /* no qualifier */ BTree<RevisionName, long[]> copiedPageBtree;
109 
110     /** A constant for an offset on a non existing page */
111     public static final long NO_PAGE = -1L;
112 
113     /** The number of bytes used to store the size of a page */
114     private static final int PAGE_SIZE = 4;
115 
116     /** The size of the link to next page */
117     private static final int LINK_SIZE = 8;
118 
119     /** Some constants */
120     private static final int BYTE_SIZE = 1;
121     /* no qualifier */static final int INT_SIZE = 4;
122     /* no qualifier */static final int LONG_SIZE = 8;
123 
124     /** The default page size */
125     public static final int DEFAULT_PAGE_SIZE = 512;
126 
127     /** The minimal page size. Can't be below 64, as we have to store many things in the RMHeader */
128     private static final int MIN_PAGE_SIZE = 64;
129 
130     /** The RecordManager header size */
131     /* no qualifier */static int RECORD_MANAGER_HEADER_SIZE = DEFAULT_PAGE_SIZE;
132 
133     /** A global buffer used to store the RecordManager header */
134     private ByteBuffer RECORD_MANAGER_HEADER_BUFFER;
135 
136     /** A byte array used to store the RecordManager header */
137     private byte[] RECORD_MANAGER_HEADER_BYTES;
138 
139     /** The length of an Offset, as a negative value */
140     private byte[] LONG_LENGTH = new byte[]
141         { ( byte ) 0xFF, ( byte ) 0xFF, ( byte ) 0xFF, ( byte ) 0xF8 };
142 
143     /** The RecordManager underlying page size. */
144     /* no qualifier */int pageSize = DEFAULT_PAGE_SIZE;
145 
146     /** The set of managed B-trees */
147     private Map<String, BTree<Object, Object>> managedBtrees;
148 
149     /** The queue of recently closed transactions */
150     private Queue<RevisionName> closedTransactionsQueue = new LinkedBlockingQueue<RevisionName>();
151 
152     /** The default file name */
153     private static final String DEFAULT_FILE_NAME = "mavibot.db";
154 
155     /** A flag set to true if we want to keep old revisions */
156     private boolean keepRevisions;
157 
158     /** A flag used by internal btrees */
159     public static final boolean INTERNAL_BTREE = true;
160 
161     /** A flag used by internal btrees */
162     public static final boolean NORMAL_BTREE = false;
163 
164     /** The B-tree of B-trees */
165     /* no qualifier */ BTree<NameRevision, Long> btreeOfBtrees;
166 
167     /** The B-tree of B-trees management btree name */
168     /* no qualifier */static final String BTREE_OF_BTREES_NAME = "_btree_of_btrees_";
169 
170     /** The CopiedPages management btree name */
171     /* no qualifier */static final String COPIED_PAGE_BTREE_NAME = "_copiedPageBtree_";
172 
173     /** The current B-tree of B-trees header offset */
174     /* no qualifier */long currentBtreeOfBtreesOffset;
175 
176     /** The previous B-tree of B-trees header offset */
177     private long previousBtreeOfBtreesOffset = NO_PAGE;
178 
179     /** The offset on the current copied pages B-tree */
180     /* no qualifier */ long currentCopiedPagesBtreeOffset = NO_PAGE;
181 
182     /** The offset on the previous copied pages B-tree */
183     private long previousCopiedPagesBtreeOffset = NO_PAGE;
184 
185     /** A lock to protect the transaction handling */
186     private ReentrantLock transactionLock = new ReentrantLock();
187 
188     /** A ThreadLocal used to store the current transaction level */
189     private static final ThreadLocal<Integer> context = new ThreadLocal<Integer>();
190 
191     /** The list of PageIO that can be freed after a commit */
192     List<PageIO> freedPages = new ArrayList<PageIO>();
193 
194     /** The list of PageIO that can be freed after a rollback */
195     private List<PageIO> allocatedPages = new ArrayList<PageIO>();
196 
197     /** A Map keeping the latest revisions for each managed BTree */
198     private Map<String, BTreeHeader<?, ?>> currentBTreeHeaders = new HashMap<String, BTreeHeader<?, ?>>();
199 
200     /** A Map storing the new revisions when some changes have been made in some BTrees */
201     private Map<String, BTreeHeader<?, ?>> newBTreeHeaders = new HashMap<String, BTreeHeader<?, ?>>();
202 
203     /** A lock to protect the BtreeHeader maps */
204     private ReadWriteLock btreeHeadersLock = new ReentrantReadWriteLock();
205 
206     /** A value stored into the transaction context for rolled-back transactions */
207     private static final int ROLLBACKED_TXN = 0;
208 
209     /** A lock to protect the freepage pointers */
210     private ReentrantLock freePageLock = new ReentrantLock();
211 
212     /** the space reclaimer */
213     private PageReclaimer reclaimer;
214 
215     /** variable to keep track of the write commit count */
216     private int commitCount = 0;
217 
218     /** the threshold at which the PageReclaimer will be run to free the copied pages */
219     // FIXME the value below was chosen empirically : anything higher than that
220     // results in a "This thread does not hold the transactionLock" error
221     private int pageReclaimerThreshold = 70;
222     
223     /* a flag used to disable the free page reclaimer (used for internal testing only) */
224     private boolean disableReclaimer = false;
225 
226     public Map<Long, Integer> writeCounter = new HashMap<Long, Integer>();
227 
228 
229     /**
230      * Create a Record manager which will either create the underlying file
231      * or load an existing one. If a folder is provided, then we will create
232      * a file with a default name : mavibot.db
233      *
234      * @param fileName The file name, or a folder name
235      */
236     public RecordManager( String fileName )
237     {
238         this( fileName, DEFAULT_PAGE_SIZE );
239     }
240 
241 
242     /**
243      * Create a Record manager which will either create the underlying file
244      * or load an existing one. If a folder is provided, then we will create
245      * a file with a default name : mavibot.db
246      *
247      * @param fileName The file name, or a folder name
248      * @param pageSize the size of a page on disk, in bytes
249      */
250     public RecordManager( String fileName, int pageSize )
251     {
252         managedBtrees = new LinkedHashMap<String, BTree<Object, Object>>();
253 
254         if ( pageSize < MIN_PAGE_SIZE )
255         {
256             this.pageSize = MIN_PAGE_SIZE;
257         }
258         else
259         {
260             this.pageSize = pageSize;
261         }
262 
263         RECORD_MANAGER_HEADER_BUFFER = ByteBuffer.allocate( this.pageSize );
264         RECORD_MANAGER_HEADER_BYTES = new byte[this.pageSize];
265         RECORD_MANAGER_HEADER_SIZE = this.pageSize;
266 
267         // Open the file or create it
268         File tmpFile = new File( fileName );
269 
270         if ( tmpFile.isDirectory() )
271         {
272             // It's a directory : use the default mavibot file name inside it
273             tmpFile = new File( tmpFile, DEFAULT_FILE_NAME );
274         }
275 
276         // We have to create a new file, if it does not already exist
277         boolean isNewFile = createFile( tmpFile );
278 
279         try
280         {
281             RandomAccessFile randomFile = new RandomAccessFile( file, "rw" );
282             fileChannel = randomFile.getChannel();
283 
284             // get the current end of file offset
285             endOfFileOffset = fileChannel.size();
286 
287             if ( isNewFile )
288             {
289                 initRecordManager();
290             }
291             else
292             {
293                 loadRecordManager();
294             }
295 
296             reclaimer = new PageReclaimer( this );
297             runReclaimer();
298         }
299         catch ( Exception e )
300         {
301             LOG.error( "Error while initializing the RecordManager : {}", e.getMessage() );
302             LOG.error( "", e );
303             throw new RecordManagerException( e );
304         }
305     }
306 
307 
308     /**
309      * runs the PageReclaimer to free the copied pages
310      */
311     private void runReclaimer()
312     {
313         if ( disableReclaimer )
314         {
315             LOG.warn( "Free page reclaimer is disabled, this should not be disabled on production systems." );
316             return; 
317         }
318         
319         try
320         {
321             commitCount = 0;
322             reclaimer.reclaim();
323             // must update the headers after reclaim operation
324             updateRecordManagerHeader();
325         }
326         catch ( Exception e )
327         {
328             LOG.warn( "PageReclaimer failed to free the pages", e );
329         }
330     }
331 
332 
333     /**
334      * Create the mavibot file if it does not exist
335      */
336     private boolean createFile( File mavibotFile )
337     {
338         try
339         {
340             boolean creation = mavibotFile.createNewFile();
341 
342             file = mavibotFile;
343 
344             if ( mavibotFile.length() == 0 )
345             {
346                 return true;
347             }
348             else
349             {
350                 return creation;
351             }
352         }
353         catch ( IOException ioe )
354         {
355             LOG.error( "Cannot create the file {}", mavibotFile.getName() );
356             return false;
357         }
358     }
359 
360 
361     /**
362      * We will create a brand new RecordManager file, containing nothing but the RecordManager header,
363      * a B-tree to manage the old revisions we want to keep and
364      * a B-tree used to manage pages associated with old versions.
365      * <br/>
366      * The RecordManager header contains the following details :
367      * <pre>
368      * +--------------------------+
369      * | PageSize                 | 4 bytes : The size of a physical page (default to 512)
370      * +--------------------------+
371      * |  NbTree                  | 4 bytes : The number of managed B-trees (zero or more)
372      * +--------------------------+
373      * | FirstFree                | 8 bytes : The offset of the first free page
374      * +--------------------------+
375      * | current BoB offset       | 8 bytes : The offset of the current BoB
376      * +--------------------------+
377      * | previous BoB offset      | 8 bytes : The offset of the previous BoB
378      * +--------------------------+
379      * | current CP btree offset  | 8 bytes : The offset of the current Copied Pages B-tree
380      * +--------------------------+
381      * | previous CP btree offset | 8 bytes : The offset of the previous Copied Pages B-tree
382      * +--------------------------+
383      * </pre>
384      *
385      * We then store the B-tree managing the pages that have been copied when we have added
386      * or deleted an element in the B-tree. They are associated with a version.
387      *
388      * Last, we add the B-tree that keeps track of each revision we can access.
389      */
390     private void initRecordManager() throws IOException
391     {
392         // Create a new Header
393         nbBtree = 0;
394         firstFreePage = NO_PAGE;
395         currentBtreeOfBtreesOffset = NO_PAGE;
396 
397         updateRecordManagerHeader();
398 
399         // Set the offset of the end of the file
400         endOfFileOffset = fileChannel.size();
401 
402         // First, create the btree of btrees <NameRevision, Long>
403         createBtreeOfBtrees();
404 
405         // Now, initialize the Copied Page B-tree
406         createCopiedPagesBtree();
407 
408         // Inject these B-trees into the RecordManager. They are internal B-trees.
409         try
410         {
411             manageSubBtree( btreeOfBtrees );
412 
413             currentBtreeOfBtreesOffset = ( ( PersistedBTree<NameRevision, Long> ) btreeOfBtrees ).getBtreeHeader()
414                 .getBTreeHeaderOffset();
415             updateRecordManagerHeader();
416 
417             // Inject the BtreeOfBtrees into the currentBtreeHeaders map
418             currentBTreeHeaders.put( BTREE_OF_BTREES_NAME,
419                 ( ( PersistedBTree<NameRevision, Long> ) btreeOfBtrees ).getBtreeHeader() );
420             newBTreeHeaders.put( BTREE_OF_BTREES_NAME,
421                 ( ( PersistedBTree<NameRevision, Long> ) btreeOfBtrees ).getBtreeHeader() );
422 
423             // The Copied Pages B-tree
424             manageSubBtree( copiedPageBtree );
425 
426             currentCopiedPagesBtreeOffset = ( ( PersistedBTree<RevisionName, long[]> ) copiedPageBtree ).getBtreeHeader().getBTreeHeaderOffset();
427             updateRecordManagerHeader();
428             
429             // Inject the CopiedPagesBTree into the currentBtreeHeaders map
430             currentBTreeHeaders.put( COPIED_PAGE_BTREE_NAME, ( ( PersistedBTree<RevisionName, long[]> ) copiedPageBtree ).getBtreeHeader() );
431             newBTreeHeaders.put( COPIED_PAGE_BTREE_NAME, ( ( PersistedBTree<RevisionName, long[]> ) copiedPageBtree ).getBtreeHeader() );
432         }
433         catch ( BTreeAlreadyManagedException btame )
434         {
435             // Can't happen here.
436         }
437 
438         // We are all set ! Verify the file
439         if ( LOG_CHECK.isDebugEnabled() )
440         {
441             MavibotInspector.check( this );
442         }
443 
444     }
445 
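    // A minimal sketch of how the RecordManager header documented above maps onto a buffer,
    // following the field order and sizes given in the diagram. This method is illustrative
    // only : the real serialization is performed by updateRecordManagerHeader().
    private ByteBuffer serializeRecordManagerHeaderSketch()
    {
        ByteBuffer header = ByteBuffer.allocate( pageSize );

        header.putInt( pageSize );                        // 4 bytes : the page size
        header.putInt( nbBtree );                         // 4 bytes : the number of managed B-trees
        header.putLong( firstFreePage );                  // 8 bytes : the first free page offset
        header.putLong( currentBtreeOfBtreesOffset );     // 8 bytes : the current BoB offset
        header.putLong( previousBtreeOfBtreesOffset );    // 8 bytes : the previous BoB offset
        header.putLong( currentCopiedPagesBtreeOffset );  // 8 bytes : the current Copied Pages B-tree offset
        header.putLong( previousCopiedPagesBtreeOffset ); // 8 bytes : the previous Copied Pages B-tree offset

        header.flip();

        return header;
    }
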
446 
447     /**
448      * Create the B-tree of B-trees
449      */
450     private void createBtreeOfBtrees()
451     {
452         PersistedBTreeConfiguration<NameRevision, Long> configuration = new PersistedBTreeConfiguration<NameRevision, Long>();
453         configuration.setKeySerializer( NameRevisionSerializer.INSTANCE );
454         configuration.setName( BTREE_OF_BTREES_NAME );
455         configuration.setValueSerializer( LongSerializer.INSTANCE );
456         configuration.setBtreeType( BTreeTypeEnum.BTREE_OF_BTREES );
457         configuration.setCacheSize( PersistedBTree.DEFAULT_CACHE_SIZE );
458 
459         btreeOfBtrees = BTreeFactory.createPersistedBTree( configuration );
460     }
461 
462 
463     /**
464      * Create the CopiedPagesBtree
465      */
466     private void createCopiedPagesBtree()
467     {
468         PersistedBTreeConfiguration<RevisionName, long[]> configuration = new PersistedBTreeConfiguration<RevisionName, long[]>();
469         configuration.setKeySerializer( RevisionNameSerializer.INSTANCE );
470         configuration.setName( COPIED_PAGE_BTREE_NAME );
471         configuration.setValueSerializer( LongArraySerializer.INSTANCE );
472         configuration.setBtreeType( BTreeTypeEnum.COPIED_PAGES_BTREE );
473         configuration.setCacheSize( PersistedBTree.DEFAULT_CACHE_SIZE );
474 
475         copiedPageBtree = BTreeFactory.createPersistedBTree( configuration );
476     }
477 
478 
479     /**
480      * Load the BTrees from the disk.
481      *
482      * @throws InstantiationException
483      * @throws IllegalAccessException
484      * @throws ClassNotFoundException
485      * @throws NoSuchFieldException
486      * @throws SecurityException
487      * @throws IllegalArgumentException
488      */
489     private void loadRecordManager() throws IOException, ClassNotFoundException, IllegalAccessException,
490         InstantiationException, IllegalArgumentException, SecurityException, NoSuchFieldException, KeyNotFoundException
491     {
492         if ( fileChannel.size() != 0 )
493         {
494             ByteBuffer recordManagerHeader = ByteBuffer.allocate( RECORD_MANAGER_HEADER_SIZE );
495 
496             // The file exists, we have to load the data now
497             fileChannel.read( recordManagerHeader );
498 
499             recordManagerHeader.rewind();
500 
501             // read the RecordManager Header :
502             // +---------------------+
503             // | PageSize            | 4 bytes : The size of a physical page (default to 512)
504             // +---------------------+
505             // | NbTree              | 4 bytes : The number of managed B-trees (at least 1)
506             // +---------------------+
507             // | FirstFree           | 8 bytes : The offset of the first free page
508             // +---------------------+
509             // | current BoB offset  | 8 bytes : The offset of the current B-tree of B-trees
510             // +---------------------+
511             // | previous BoB offset | 8 bytes : The offset of the previous B-tree of B-trees
512             // +---------------------+
513             // | current CP offset   | 8 bytes : The offset of the current Copied Pages B-tree
514             // +---------------------+
515             // | previous CP offset  | 8 bytes : The offset of the previous Copied Pages B-tree
516             // +---------------------+
517 
518             // The page size
519             pageSize = recordManagerHeader.getInt();
520 
521             // The number of managed B-trees
522             nbBtree = recordManagerHeader.getInt();
523 
524             // The first free page
525             firstFreePage = recordManagerHeader.getLong();
526 
527             // Read all the free pages
528             checkFreePages();
529 
530             // The current BOB offset
531             currentBtreeOfBtreesOffset = recordManagerHeader.getLong();
532 
533             // The previous BOB offset
534             previousBtreeOfBtreesOffset = recordManagerHeader.getLong();
535 
536             // The current Copied Pages B-tree offset
537             currentCopiedPagesBtreeOffset = recordManagerHeader.getLong();
538 
539             // The previous Copied Pages B-tree offset
540             previousCopiedPagesBtreeOffset = recordManagerHeader.getLong();
541 
542             // read the B-tree of B-trees
543             PageIO[] bobHeaderPageIos = readPageIOs( currentBtreeOfBtreesOffset, Long.MAX_VALUE );
544 
545             btreeOfBtrees = BTreeFactory.<NameRevision, Long> createPersistedBTree( BTreeTypeEnum.BTREE_OF_BTREES );
546             //BTreeFactory.<NameRevision, Long> setBtreeHeaderOffset( ( PersistedBTree<NameRevision, Long> )btreeOfBtrees, currentBtreeOfBtreesOffset );
547 
548             loadBtree( bobHeaderPageIos, btreeOfBtrees );
549 
550             // read the copied page B-tree
551             PageIO[] copiedPagesPageIos = readPageIOs( currentCopiedPagesBtreeOffset, Long.MAX_VALUE );
552 
553             copiedPageBtree = BTreeFactory.<RevisionName, long[]> createPersistedBTree( BTreeTypeEnum.COPIED_PAGES_BTREE );
554             //( ( PersistedBTree<RevisionName, long[]> ) copiedPageBtree ).setBtreeHeaderOffset( currentCopiedPagesBtreeOffset );
555 
556             loadBtree( copiedPagesPageIos, copiedPageBtree );
557 
558             // Now, read all the B-trees from the btree of btrees
559             TupleCursor<NameRevision, Long> btreeCursor = btreeOfBtrees.browse();
560             Map<String, Long> loadedBtrees = new HashMap<String, Long>();
561 
562             // loop on all the btrees we have, and keep only the latest revision
563             long currentRevision = -1L;
564 
565             while ( btreeCursor.hasNext() )
566             {
567                 Tuple<NameRevision, Long> btreeTuple = btreeCursor.next();
568                 NameRevision nameRevision = btreeTuple.getKey();
569                 long btreeOffset = btreeTuple.getValue();
570                 long revision = nameRevision.getValue();
571 
572                 // Check if we already have processed this B-tree
573                 Long loadedBtreeRevision = loadedBtrees.get( nameRevision.getName() );
574 
575                 if ( loadedBtreeRevision != null )
576                 {
577                     // The btree has already been loaded : keep only the latest revision
578                     if ( revision > currentRevision )
579                     {
580                         // We have a newer revision : switch to the new revision (we keep the offset atm)
581                         loadedBtrees.put( nameRevision.getName(), btreeOffset );
582                         currentRevision = revision;
583                     }
584                 }
585                 else
586                 {
587                     // This is a new B-tree
588                     loadedBtrees.put( nameRevision.getName(), btreeOffset );
589                     currentRevision = nameRevision.getRevision();
590                 }
591             }
592 
593             // TODO : clean up the old revisions...
594 
595 
596             // Now, we can load the real btrees using the offsets
597             for ( String btreeName : loadedBtrees.keySet() )
598             {
599                 long btreeOffset = loadedBtrees.get( btreeName );
600 
601                 PageIO[] btreePageIos = readPageIOs( btreeOffset, Long.MAX_VALUE );
602 
603                 BTree<?, ?> btree = BTreeFactory.<NameRevision, Long> createPersistedBTree();
604                 //( ( PersistedBTree<NameRevision, Long> ) btree ).setBtreeHeaderOffset( btreeOffset );
605                 loadBtree( btreePageIos, btree );
606 
607                 // Add the btree into the map of managed B-trees
608                 managedBtrees.put( btreeName, ( BTree<Object, Object> ) btree );
609             }
610 
611             // We are done ! Let's finish with the last initialization parts
612             endOfFileOffset = fileChannel.size();
613         }
614     }
615 
616 
617     /**
618      * Starts a transaction
619      */
620     public void beginTransaction()
621     {
622         if ( TXN_LOG.isDebugEnabled() )
623         {
624             TXN_LOG.debug( "Beginning a new transaction on thread {}, TxnLevel {}",
625                 Thread.currentThread().getName(), getTxnLevel() );
626         }
627 
628         // First, take the lock if it's not already taken
629         if ( !( ( ReentrantLock ) transactionLock ).isHeldByCurrentThread() )
630         {
631             TXN_LOG.debug( "--> Lock taken" );
632             transactionLock.lock();
633         }
634         else
635         {
636             TXN_LOG.debug( "..o The current thread already holds the lock" );
637         }
638 
639         // Now, check the TLS state
640         incrementTxnLevel();
641     }
642 
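    // Transactions nest : beginTransaction() increments the per-thread counter kept in the
    // 'context' ThreadLocal, and only the outermost commit() (counter back to zero) releases
    // the transactionLock. An illustrative calling pattern (error handling elided) :
    //
    //   recordManager.beginTransaction();     // outer transaction : takes the lock, level = 1
    //   recordManager.beginTransaction();     // inner transaction : level = 2, lock already held
    //   ...                                   // updates on the managed B-trees
    //   recordManager.commit();               // inner commit : level back to 1
    //   recordManager.commit();               // outer commit : level = 0, lock released
    //
    // A rollback() resets the level and releases the lock immediately, so it must be the last
    // call made for that transaction by the current thread.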
643 
644     /**
645      * Commits a transaction
646      */
647     public void commit()
648     {
649         // We *must* own the transactionLock
650         if ( !transactionLock.isHeldByCurrentThread() )
651         {
652             String name = Thread.currentThread().getName();
653             String err = "This thread, '" + name + "' does not hold the transactionLock ";
654             TXN_LOG.error( err );
655             throw new RecordManagerException( err );
656         }
657 
658         if ( TXN_LOG.isDebugEnabled() )
659         {
660             TXN_LOG.debug( "Committing a transaction on thread {}, TxnLevel {}",
661                 Thread.currentThread().getName(), getTxnLevel() );
662         }
663 
664         if ( !fileChannel.isOpen() )
665         {
666             // We still have to decrement the transaction level
667             int txnLevel = decrementTxnLevel();
668 
669             if ( txnLevel == 0 )
670             {
671                 // We can safely release the lock
672                 // The file has been closed, nothing remains to commit, let's get out
673                 transactionLock.unlock();
674             }
675 
676             return;
677         }
678 
679         int nbTxnStarted = context.get();
680 
681         switch ( nbTxnStarted )
682         {
683             case ROLLBACKED_TXN:
684                 // The transaction was rolled back, quit immediately
685                 transactionLock.unlock();
686 
687                 return;
688 
689             case 1:
690                 // We are done with the transaction, we can update the RMHeader and swap the BTreeHeaders
691                 // First update the RMHeader to be sure that we have a way to restore from a crash
692                 updateRecordManagerHeader();
693 
694                 // Swap the BtreeHeaders maps
695                 swapCurrentBtreeHeaders();
696 
697                 // We can now free pages
698                 for ( PageIO pageIo : freedPages )
699                 {
700                     try
701                     {
702                         free( pageIo );
703                     }
704                     catch ( IOException ioe )
705                     {
706                         throw new RecordManagerException( ioe.getMessage() );
707                     }
708                 }
709 
710                 // Release the allocated and freed pages list
711                 freedPages.clear();
712                 allocatedPages.clear();
713 
714                 // And update the RMHeader again, removing the old references to BOB and CPB b-tree headers
715                 // here, we have to erase the old references to keep only the new ones.
716                 updateRecordManagerHeader();
717 
718                 commitCount++;
719 
720                 if ( commitCount >= pageReclaimerThreshold )
721                 {
722                     runReclaimer();
723                 }
724 
725                 // Finally, decrement the number of started transactions
726                 // and release the global lock if possible
727                 int txnLevel = decrementTxnLevel();
728 
729                 if ( txnLevel == 0 )
730                 {
731                     transactionLock.unlock();
732                 }
733 
734                 return;
735 
736             default:
737                 // We are inside an existing transaction. Just update the necessary elements
738                 // Update the RMHeader to be sure that we have a way to restore from a crash
739                 updateRecordManagerHeader();
740 
741                 // Swap the BtreeHeaders maps
742                 //swapCurrentBtreeHeaders();
743 
744                 // We can now free pages
745                 for ( PageIO pageIo : freedPages )
746                 {
747                     try
748                     {
749                         free( pageIo );
750                     }
751                     catch ( IOException ioe )
752                     {
753                         throw new RecordManagerException( ioe.getMessage() );
754                     }
755                 }
756 
757                 // Release the allocated and freed pages list
758                 freedPages.clear();
759                 allocatedPages.clear();
760 
761                 // And update the RMHeader again, removing the old references to BOB and CPB b-tree headers
762                 // here, we have to erase the old references to keep only the new ones.
763                 updateRecordManagerHeader();
764 
765                 commitCount++;
766 
767                 if ( commitCount >= pageReclaimerThreshold )
768                 {
769                     runReclaimer();
770                 }
771 
772                 // Finally, decrement the number of started transactions
773                 // and release the global lock
774                 txnLevel = decrementTxnLevel();
775 
776                 if ( txnLevel == 0 )
777                 {
778                     transactionLock.unlock();
779                 }
780 
781                 return;
782         }
783     }
784 
785 
786     public boolean isContextOk()
787     {
788         return ( context.get() == null ) || ( context.get() == 0 );
789     }
790 
791 
792     /**
793      * Get the transactionLevel, i.e. the number of nested update operations
794      */
795     private int getTxnLevel()
796     {
797         Integer nbTxnLevel = context.get();
798 
799         if ( nbTxnLevel == null )
800         {
801             return -1;
802         }
803 
804         return nbTxnLevel;
805     }
806 
807 
808     /**
809      * Increment the transactionLevel
810      */
811     private void incrementTxnLevel()
812     {
813         Integer nbTxnLevel = context.get();
814 
815         if ( nbTxnLevel == null )
816         {
817             context.set( 1 );
818         }
819         else
820         {
821             // And increment the counter of inner txn.
822             context.set( nbTxnLevel + 1 );
823         }
824 
825         if ( TXN_LOG.isDebugEnabled() )
826         {
827             TXN_LOG.debug( "Incrementing the TxnLevel : {}", context.get() );
828         }
829     }
830 
831 
832     /**
833      * Decrement the transactionLevel
834      */
835     private int decrementTxnLevel()
836     {
837         int nbTxnStarted = context.get() - 1;
838 
839         context.set( nbTxnStarted );
840 
841         if ( TXN_LOG.isDebugEnabled() )
842         {
843             TXN_LOG.debug( "Decrementing the TxnLevel : {}", context.get() );
844         }
845 
846         return nbTxnStarted;
847     }
848 
849 
850     /**
851      * Rollback a transaction
852      */
853     public void rollback()
854     {
855         // We *must* own the transactionLock
856         if ( !transactionLock.isHeldByCurrentThread() )
857         {
858             TXN_LOG.error( "This thread does not hold the transactionLock" );
859             throw new RecordManagerException( "This thread does not hold the transactionLock" );
860         }
861 
862         if ( TXN_LOG.isDebugEnabled() )
863         {
864             TXN_LOG.debug( "Rolling back a transaction on thread {}, TxnLevel {}",
865                 Thread.currentThread().getName(), getTxnLevel() );
866         }
867 
868         // Reset the counter
869         context.set( ROLLBACKED_TXN );
870 
871         // We can now free allocated pages, this is the end of the transaction
872         for ( PageIO pageIo : allocatedPages )
873         {
874             try
875             {
876                 free( pageIo );
877             }
878             catch ( IOException ioe )
879             {
880                 throw new RecordManagerException( ioe.getMessage() );
881             }
882         }
883 
884         // Release the allocated and freed pages list
885         freedPages.clear();
886         allocatedPages.clear();
887 
888         // And update the RMHeader
889         updateRecordManagerHeader();
890 
891         // And restore the BTreeHeaders new Map to the current state
892         revertBtreeHeaders();
893 
894         // This is an all-or-nothing operation : we can't have a transaction within
895         // a transaction that would survive an inner transaction rollback.
896         transactionLock.unlock();
897     }
898 
899 
900     /**
901      * Reads all the PageIOs that are linked to the page at the given position, including
902      * the first page.
903      *
904      * @param position The position of the first page
905      * @param limit The maximum bytes to read. Set this value to -1 when the size is unknown.
906      * @return An array of pages
907      */
908     /*no qualifier*/PageIO[] readPageIOs( long position, long limit ) throws IOException, EndOfFileExceededException
909     {
910         LOG.debug( "Read PageIOs at position {}", position );
911 
912         if ( limit <= 0 )
913         {
914             limit = Long.MAX_VALUE;
915         }
916 
917         PageIO firstPage = fetchPage( position );
918         firstPage.setSize();
919         List<PageIO> listPages = new ArrayList<PageIO>();
920         listPages.add( firstPage );
921         long dataRead = pageSize - LONG_SIZE - INT_SIZE;
922 
923         // Iterate on the pages, if needed
924         long nextPage = firstPage.getNextPage();
925 
926         if ( ( dataRead < limit ) && ( nextPage != NO_PAGE ) )
927         {
928             while ( dataRead < limit )
929             {
930                 PageIO page = fetchPage( nextPage );
931                 listPages.add( page );
932                 nextPage = page.getNextPage();
933                 dataRead += pageSize - LONG_SIZE;
934 
935                 if ( nextPage == NO_PAGE )
936                 {
937                     page.setNextPage( NO_PAGE );
938                     break;
939                 }
940             }
941         }
942 
943         LOG.debug( "Nb of PageIOs read : {}", listPages.size() );
944 
945         // Return
946         return listPages.toArray( new PageIO[]
947             {} );
948     }
949 
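    // A worked example of the chaining arithmetic above : the first PageIO carries an 8-byte
    // link to the next page plus a 4-byte size field, so it holds ( pageSize - 12 ) data bytes,
    // while each following PageIO only carries the 8-byte link and holds ( pageSize - 8 ) bytes.
    // With the default 512-byte pages, a 2000-byte record thus needs
    // 1 + ceil( ( 2000 - 500 ) / 504 ) = 4 PageIOs.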
950 
951     /**
952      * Check the offset to be sure it's a valid one :
953      * <ul>
954      * <li>It's >= 0</li>
955      * <li>It's below the end of the file</li>
956      * <li>It's a multiple of the pageSize</li>
957      * </ul>
958      * @param offset The offset to check
959      * @throws InvalidOffsetException If the offset is not valid
960      */
961     /* no qualifier */void checkOffset( long offset )
962     {
963         if ( ( offset < 0 ) || ( offset > endOfFileOffset ) || ( ( offset % pageSize ) != 0 ) )
964         {
965             throw new InvalidOffsetException( "Bad Offset : " + offset );
966         }
967     }
968 
969 
970     /**
971      * Read a B-tree from the disk. The meta-data are at the given position in the list of pages.
972      * We load a B-tree in two steps : first, we load the B-tree header, then the common information
973      *
974      * @param pageIos The list of pages containing the meta-data
975      * @param btree The B-tree we have to initialize
976      * @throws InstantiationException
977      * @throws IllegalAccessException
978      * @throws ClassNotFoundException
979      * @throws NoSuchFieldException
980      * @throws SecurityException
981      * @throws IllegalArgumentException
982      */
983     private <K, V> void loadBtree( PageIO[] pageIos, BTree<K, V> btree ) throws EndOfFileExceededException,
984         IOException, ClassNotFoundException, IllegalAccessException, InstantiationException, IllegalArgumentException,
985         SecurityException, NoSuchFieldException
986     {
987         loadBtree( pageIos, btree, null );
988     }
989 
990 
991     /**
992      * Read a B-tree from the disk. The meta-data are at the given position in the list of pages.
993      * We load a B-tree in two steps : first, we load the B-tree header, then the common information
994      *
995      * @param pageIos The list of pages containing the meta-data
996      * @param btree The B-tree we have to initialize
997      * @throws InstantiationException
998      * @throws IllegalAccessException
999      * @throws ClassNotFoundException
1000      * @throws NoSuchFieldException
1001      * @throws SecurityException
1002      * @throws IllegalArgumentException
1003      */
1004     /* no qualifier */<K, V> void loadBtree( PageIO[] pageIos, BTree btree, BTree<K, V> parentBTree )
1005         throws EndOfFileExceededException,
1006         IOException, ClassNotFoundException, IllegalAccessException, InstantiationException, IllegalArgumentException,
1007         SecurityException, NoSuchFieldException
1008     {
1009         long dataPos = 0L;
1010 
1011         // Process the B-tree header
1012         BTreeHeader<K, V> btreeHeader = new BTreeHeader<K, V>();
1013         btreeHeader.setBtree( btree );
1014 
1015         // The BtreeHeader offset
1016         btreeHeader.setBTreeHeaderOffset( pageIos[0].getOffset() );
1017 
1018         // The B-tree current revision
1019         long revision = readLong( pageIos, dataPos );
1020         btreeHeader.setRevision( revision );
1021         dataPos += LONG_SIZE;
1022 
1023         // The nb elems in the tree
1024         long nbElems = readLong( pageIos, dataPos );
1025         btreeHeader.setNbElems( nbElems );
1026         dataPos += LONG_SIZE;
1027 
1028         // The B-tree rootPage offset
1029         long rootPageOffset = readLong( pageIos, dataPos );
1030         btreeHeader.setRootPageOffset( rootPageOffset );
1031         dataPos += LONG_SIZE;
1032 
1033         // The B-tree information offset
1034         long btreeInfoOffset = readLong( pageIos, dataPos );
1035 
1036         // Now, process the common information
1037         PageIO[] infoPageIos = readPageIOs( btreeInfoOffset, Long.MAX_VALUE );
1038         ( ( PersistedBTree<K, V> ) btree ).setBtreeInfoOffset( infoPageIos[0].getOffset() );
1039         dataPos = 0L;
1040 
1041         // The B-tree page size
1042         int btreePageSize = readInt( infoPageIos, dataPos );
1043         BTreeFactory.setPageSize( btree, btreePageSize );
1044         dataPos += INT_SIZE;
1045 
1046         // The tree name
1047         ByteBuffer btreeNameBytes = readBytes( infoPageIos, dataPos );
1048         dataPos += INT_SIZE + btreeNameBytes.limit();
1049         String btreeName = Strings.utf8ToString( btreeNameBytes );
1050         BTreeFactory.setName( btree, btreeName );
1051 
1052         // The keySerializer FQCN
1053         ByteBuffer keySerializerBytes = readBytes( infoPageIos, dataPos );
1054         dataPos += INT_SIZE + keySerializerBytes.limit();
1055 
1056         String keySerializerFqcn = "";
1057 
1058         if ( keySerializerBytes != null )
1059         {
1060             keySerializerFqcn = Strings.utf8ToString( keySerializerBytes );
1061         }
1062 
1063         BTreeFactory.setKeySerializer( btree, keySerializerFqcn );
1064 
1065         // The valueSerializer FQCN
1066         ByteBuffer valueSerializerBytes = readBytes( infoPageIos, dataPos );
1067 
1068         String valueSerializerFqcn = "";
1069         dataPos += INT_SIZE + valueSerializerBytes.limit();
1070 
1071         if ( valueSerializerBytes != null )
1072         {
1073             valueSerializerFqcn = Strings.utf8ToString( valueSerializerBytes );
1074         }
1075 
1076         BTreeFactory.setValueSerializer( btree, valueSerializerFqcn );
1077 
1078         // The B-tree allowDuplicates flag
1079         int allowDuplicates = readInt( infoPageIos, dataPos );
1080         ( ( PersistedBTree<K, V> ) btree ).setAllowDuplicates( allowDuplicates != 0 );
1081         dataPos += INT_SIZE;
1082 
1083         // Set the recordManager in the btree
1084         ( ( PersistedBTree<K, V> ) btree ).setRecordManager( this );
1085 
1086         // Set the current revision to the one stored in the B-tree header
1087         // Here, we have to tell the BTree to keep this revision in the
1088         // btreeRevisions Map, thus the 'true' parameter at the end.
1089         ( ( PersistedBTree<K, V> ) btree ).storeRevision( btreeHeader, true );
1090 
1091         // Now, init the B-tree
1092         ( ( PersistedBTree<K, V> ) btree ).init( parentBTree );
1093 
1094         // Update the BtreeHeaders Maps
1095         currentBTreeHeaders.put( btree.getName(), ( ( PersistedBTree<K, V> ) btree ).getBtreeHeader() );
1096         newBTreeHeaders.put( btree.getName(), ( ( PersistedBTree<K, V> ) btree ).getBtreeHeader() );
1097 
1098         // Read the rootPage pages on disk
1099         PageIO[] rootPageIos = readPageIOs( rootPageOffset, Long.MAX_VALUE );
1100 
1101         Page<K, V> btreeRoot = readPage( btree, rootPageIos );
1102         BTreeFactory.setRecordManager( btree, this );
1103 
1104         BTreeFactory.setRootPage( btree, btreeRoot );
1105     }
1106 
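    // Summary of the on-disk layout deserialized by loadBtree() above :
    //
    // B-tree header page(s) :
    //   revision          ( 8 bytes )
    //   nbElems           ( 8 bytes )
    //   rootPageOffset    ( 8 bytes )
    //   btreeInfoOffset   ( 8 bytes )
    //
    // B-tree info page(s), pointed to by btreeInfoOffset :
    //   pageSize          ( 4 bytes )
    //   name              ( length-prefixed bytes )
    //   keySerializer     ( length-prefixed FQCN )
    //   valueSerializer   ( length-prefixed FQCN )
    //   allowDuplicates   ( 4 bytes )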
1107 
1108     /**
1109      * Deserialize a Page from a B-tree at a given position
1110      *
1111      * @param btree The B-tree we want to read a Page from
1112      * @param offset The position in the file for this page
1113      * @return The read page
1114      * @throws EndOfFileExceededException If we have reached the end of the file while reading the page
1115      */
1116     public <K, V> Page<K, V> deserialize( BTree<K, V> btree, long offset ) throws EndOfFileExceededException,
1117         IOException
1118     {
1119         checkOffset( offset );
1120         PageIO[] rootPageIos = readPageIOs( offset, Long.MAX_VALUE );
1121 
1122         Page<K, V> page = readPage( btree, rootPageIos );
1123 
1124         return page;
1125     }
1126 
1127 
1128     /**
1129      * Read a page from some PageIO for a given B-tree
1130      * @param btree The B-tree we want to read a page for
1131      * @param pageIos The PageIO containing the raw data
1132      * @return The read Page if successful
1133      * @throws IOException If the deserialization failed
1134      */
1135     private <K, V> Page<K, V> readPage( BTree<K, V> btree, PageIO[] pageIos ) throws IOException
1136     {
1137         // Deserialize the page now
1138         long position = 0L;
1139 
1140         // The revision
1141         long revision = readLong( pageIos, position );
1142         position += LONG_SIZE;
1143 
1144         // The number of elements in the page
1145         int nbElems = readInt( pageIos, position );
1146         position += INT_SIZE;
1147 
1148         // The size of the data containing the keys and values
1149         Page<K, V> page = null;
1150 
1151         // Reads the bytes containing all the keys and values, if we have some
1152         // We read a big block of data into a ByteBuffer, then we will process
1153         // this ByteBuffer
1154         ByteBuffer byteBuffer = readBytes( pageIos, position );
1155 
1156         // Now, deserialize the data block. If the number of elements
1157         // is positive, it's a Leaf, otherwise it's a Node
1158         // Note that only a leaf can have 0 elements, and it's the root page then.
1159         if ( nbElems >= 0 )
1160         {
1161             // It's a leaf
1162             page = readLeafKeysAndValues( btree, nbElems, revision, byteBuffer, pageIos );
1163         }
1164         else
1165         {
1166             // It's a node
1167             page = readNodeKeysAndValues( btree, -nbElems, revision, byteBuffer, pageIos );
1168         }
1169 
1170         ( ( AbstractPage<K, V> ) page ).setOffset( pageIos[0].getOffset() );
1171         if ( pageIos.length > 1 )
1172         {
1173             ( ( AbstractPage<K, V> ) page ).setLastOffset( pageIos[pageIos.length - 1].getOffset() );
1174         }
1175 
1176         return page;
1177     }
1178 
1179 
1180     /**
1181      * Deserialize a Leaf from some PageIOs
1182      */
1183     private <K, V> PersistedLeaf<K, V> readLeafKeysAndValues( BTree<K, V> btree, int nbElems, long revision,
1184         ByteBuffer byteBuffer, PageIO[] pageIos )
1185     {
1186         // It's a leaf, create it
1187         PersistedLeaf<K, V> leaf = ( PersistedLeaf<K, V> ) BTreeFactory.createLeaf( btree, revision, nbElems );
1188 
1189         // Store the page offset on disk
1190         leaf.setOffset( pageIos[0].getOffset() );
1191         leaf.setLastOffset( pageIos[pageIos.length - 1].getOffset() );
1192 
1193         int[] keyLengths = new int[nbElems];
1194         int[] valueLengths = new int[nbElems];
1195 
1196         boolean isNotSubTree = ( btree.getType() != BTreeTypeEnum.PERSISTED_SUB );
1197 
1198         // Read each key and value
1199         for ( int i = 0; i < nbElems; i++ )
1200         {
1201             if ( isNotSubTree )
1202             {
1203                 // Read the number of values
1204                 int nbValues = byteBuffer.getInt();
1205                 PersistedValueHolder<V> valueHolder = null;
1206 
1207                 if ( nbValues < 0 )
1208                 {
1209                     // This is a sub-btree
1210                     byte[] btreeOffsetBytes = new byte[LONG_SIZE];
1211                     byteBuffer.get( btreeOffsetBytes );
1212 
1213                     // Create the valueHolder. As the number of values is negative, we have to
1214                     // switch it back to a positive value : since the count starts at -1 for 0 values, we add 1.
1215                     valueHolder = new PersistedValueHolder<V>( btree, 1 - nbValues, btreeOffsetBytes );
1216                 }
1217                 else
1218                 {
1219                     // This is an array
1220                     // Read the value's array length
1221                     valueLengths[i] = byteBuffer.getInt();
1222 
1223                     // This is an Array of values, read the byte[] associated with it
1224                     byte[] arrayBytes = new byte[valueLengths[i]];
1225                     byteBuffer.get( arrayBytes );
1226                     valueHolder = new PersistedValueHolder<V>( btree, nbValues, arrayBytes );
1227                 }
1228 
1229                 BTreeFactory.setValue( btree, leaf, i, valueHolder );
1230             }
1231 
1232             keyLengths[i] = byteBuffer.getInt();
1233             byte[] data = new byte[keyLengths[i]];
1234             byteBuffer.get( data );
1235             BTreeFactory.setKey( btree, leaf, i, data );
1236         }
1237 
1238         return leaf;
1239     }
1240 
1241 
1242     /**
1243      * Deserialize a Node from some PageIos
1244      */
1245     private <K, V> PersistedNode<K, V> readNodeKeysAndValues( BTree<K, V> btree, int nbElems, long revision,
1246         ByteBuffer byteBuffer, PageIO[] pageIos ) throws IOException
1247     {
1248         PersistedNode<K, V> node = ( PersistedNode<K, V> ) BTreeFactory.createNode( btree, revision, nbElems );
1249 
1250         // Read each value and key
1251         for ( int i = 0; i < nbElems; i++ )
1252         {
1253             // This is an Offset
1254             long offset = LongSerializer.INSTANCE.deserialize( byteBuffer );
1255             long lastOffset = LongSerializer.INSTANCE.deserialize( byteBuffer );
1256 
1257             PersistedPageHolder<K, V> valueHolder = new PersistedPageHolder<K, V>( btree, null, offset, lastOffset );
1258             node.setValue( i, valueHolder );
1259 
1260             // Read the key length
1261             int keyLength = byteBuffer.getInt();
1262 
1263             int currentPosition = byteBuffer.position();
1264 
1265             // and the key value
1266             K key = btree.getKeySerializer().deserialize( byteBuffer );
1267 
1268             // Set the new position now
1269             byteBuffer.position( currentPosition + keyLength );
1270 
1271             BTreeFactory.setKey( btree, node, i, key );
1272         }
1273 
1274         // and read the last value, as it's a node
1275         long offset = LongSerializer.INSTANCE.deserialize( byteBuffer );
1276         long lastOffset = LongSerializer.INSTANCE.deserialize( byteBuffer );
1277 
1278         PersistedPageHolder<K, V> valueHolder = new PersistedPageHolder<K, V>( btree, null, offset, lastOffset );
1279         node.setValue( nbElems, valueHolder );
1280 
1281         return node;
1282     }
1283 
1284 
1285     /**
1286      * Read a byte[] from pages.
1287      *
1288      * @param pageIos The pages we want to read the byte[] from
1289      * @param position The position in the data stored in those pages
1290      * @return The byte[] we have read
1291      */
1292     /* no qualifier */ByteBuffer readBytes( PageIO[] pageIos, long position )
1293     {
1294         // Read the byte[] length first
1295         int length = readInt( pageIos, position );
1296         position += INT_SIZE;
1297 
1298         // Compute the page in which the data is stored, given the
1299         // current position
1300         int pageNb = computePageNb( position );
1301 
1302         // Compute the position in the current page
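        // (each PageIO starts with an 8-byte link to the next page, and the first one also
        // carries a 4-byte size field : add those headers back to the logical position, then
        // subtract the full pages already consumed)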
1303         int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
1304 
1305         // Check that the length is correct : it should fit in the provided pageIos
1306         int pageEnd = computePageNb( position + length );
1307 
1308         if ( pageEnd > pageIos.length )
1309         {
1310             // This is wrong...
1311             LOG.error( "Wrong size : {}, it's larger than the number of provided pages {}", length, pageIos.length );
1312             throw new ArrayIndexOutOfBoundsException();
1313         }
1314 
1315         ByteBuffer pageData = pageIos[pageNb].getData();
1316         int remaining = pageData.capacity() - pagePos;
1317 
1318         if ( length == 0 )
1319         {
1320             // No bytes to read : return null;
1321             return null;
1322         }
1323         else
1324         {
1325             ByteBuffer bytes = ByteBuffer.allocate( length );
1326 
1327             while ( length > 0 )
1328             {
1329                 if ( length <= remaining )
1330                 {
1331                     pageData.mark();
1332                     pageData.position( pagePos );
1333                     int oldLimit = pageData.limit();
1334                     pageData.limit( pagePos + length );
1335                     bytes.put( pageData );
1336                     pageData.limit( oldLimit );
1337                     pageData.reset();
1338                     bytes.rewind();
1339 
1340                     return bytes;
1341                 }
1342 
1343                 pageData.mark();
1344                 pageData.position( pagePos );
1345                 int oldLimit = pageData.limit();
1346                 pageData.limit( pagePos + remaining );
1347                 bytes.put( pageData );
1348                 pageData.limit( oldLimit );
1349                 pageData.reset();
1350                 pageNb++;
1351                 pagePos = LINK_SIZE;
1352                 pageData = pageIos[pageNb].getData();
1353                 length -= remaining;
1354                 remaining = pageData.capacity() - pagePos;
1355             }
1356 
1357             bytes.rewind();
1358 
1359             return bytes;
1360         }
1361     }
1362 
1363 
1364     /**
1365      * Read an int from pages
1366      * @param pageIos The pages we want to read the int from
1367      * @param position The position in the data stored in those pages
1368      * @return The int we have read
1369      */
1370     /* no qualifier */int readInt( PageIO[] pageIos, long position )
1371     {
1372         // Compute the page in which the data is stored, given the
1373         // current position
1374         int pageNb = computePageNb( position );
1375 
1376         // Compute the position in the current page
1377         int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
1378 
1379         ByteBuffer pageData = pageIos[pageNb].getData();
1380         int remaining = pageData.capacity() - pagePos;
1381         int value = 0;
1382 
1383         if ( remaining >= INT_SIZE )
1384         {
1385             value = pageData.getInt( pagePos );
1386         }
1387         else
1388         {
1389             value = 0;
1390 
1391             switch ( remaining )
1392             {
1393                 case 3:
1394                     value += ( ( pageData.get( pagePos + 2 ) & 0x00FF ) << 8 );
1395                     // Fallthrough !!!
1396 
1397                 case 2:
1398                     value += ( ( pageData.get( pagePos + 1 ) & 0x00FF ) << 16 );
1399                     // Fallthrough !!!
1400 
1401                 case 1:
1402                     value += ( pageData.get( pagePos ) << 24 );
1403                     break;
1404             }
1405 
1406             // Now deal with the next page
1407             pageData = pageIos[pageNb + 1].getData();
1408             pagePos = LINK_SIZE;
1409 
1410             switch ( remaining )
1411             {
1412                 case 1:
1413                     value += ( pageData.get( pagePos ) & 0x00FF ) << 16;
1414                     // fallthrough !!!
1415 
1416                 case 2:
1417                     value += ( pageData.get( pagePos + 2 - remaining ) & 0x00FF ) << 8;
1418                     // fallthrough !!!
1419 
1420                 case 3:
1421                     value += ( pageData.get( pagePos + 3 - remaining ) & 0x00FF );
1422                     break;
1423             }
1424         }
1425 
1426         return value;
1427     }
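    // A worked example of the split read above (a sketch, assuming the default 512-byte pages
    // described in the class Javadoc) : when only 2 bytes remain in the current page, the int is
    // rebuilt from both pages :
    //
    //     value  = ( data[pagePos]       & 0xFF ) << 24   // first switch, case 2 then case 1
    //            + ( data[pagePos + 1]   & 0xFF ) << 16
    //     value += ( next[LINK_SIZE]     & 0xFF ) << 8    // second switch, case 2 then case 3
    //            + ( next[LINK_SIZE + 1] & 0xFF );
    //
    // The two high-order bytes come from the tail of the current page, the two low-order bytes
    // from just after the next page's 8-byte link.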
1428 
1429 
1430     /**
1431      * Read a byte from pages
1432      * @param pageIos The pages we want to read the byte from
1433      * @param position The position in the data stored in those pages
1434      * @return The byte we have read
1435      */
1436     private byte readByte( PageIO[] pageIos, long position )
1437     {
1438         // Compute the page in which we will store the data given the
1439         // current position
1440         int pageNb = computePageNb( position );
1441 
1442         // Compute the position in the current page
1443         int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
1444 
1445         ByteBuffer pageData = pageIos[pageNb].getData();
1446         byte value = 0;
1447 
1448         value = pageData.get( pagePos );
1449 
1450         return value;
1451     }
1452 
1453 
1454     /**
1455      * Read a long from pages
1456      * @param pageIos The pages we want to read the long from
1457      * @param position The position in the data stored in those pages
1458      * @return The long we have read
1459      */
1460     /* no qualifier */long readLong( PageIO[] pageIos, long position )
1461     {
1462         // Compute the page in which we will store the data given the
1463         // current position
1464         int pageNb = computePageNb( position );
1465 
1466         // Compute the position in the current page
1467         int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
1468 
1469         ByteBuffer pageData = pageIos[pageNb].getData();
1470         int remaining = pageData.capacity() - pagePos;
1471         long value = 0L;
1472 
1473         if ( remaining >= LONG_SIZE )
1474         {
1475             value = pageData.getLong( pagePos );
1476         }
1477         else
1478         {
1479             switch ( remaining )
1480             {
1481                 case 7:
1482                     value += ( ( ( long ) pageData.get( pagePos + 6 ) & 0x00FF ) << 8 );
1483                     // Fallthrough !!!
1484 
1485                 case 6:
1486                     value += ( ( ( long ) pageData.get( pagePos + 5 ) & 0x00FF ) << 16 );
1487                     // Fallthrough !!!
1488 
1489                 case 5:
1490                     value += ( ( ( long ) pageData.get( pagePos + 4 ) & 0x00FF ) << 24 );
1491                     // Fallthrough !!!
1492 
1493                 case 4:
1494                     value += ( ( ( long ) pageData.get( pagePos + 3 ) & 0x00FF ) << 32 );
1495                     // Fallthrough !!!
1496 
1497                 case 3:
1498                     value += ( ( ( long ) pageData.get( pagePos + 2 ) & 0x00FF ) << 40 );
1499                     // Fallthrough !!!
1500 
1501                 case 2:
1502                     value += ( ( ( long ) pageData.get( pagePos + 1 ) & 0x00FF ) << 48 );
1503                     // Fallthrough !!!
1504 
1505                 case 1:
1506                     value += ( ( long ) pageData.get( pagePos ) << 56 );
1507                     break;
1508             }
1509 
1510             // Now deal with the next page
1511             pageData = pageIos[pageNb + 1].getData();
1512             pagePos = LINK_SIZE;
1513 
1514             switch ( remaining )
1515             {
1516                 case 1:
1517                     value += ( ( long ) pageData.get( pagePos ) & 0x00FF ) << 48;
1518                     // fallthrough !!!
1519 
1520                 case 2:
1521                     value += ( ( long ) pageData.get( pagePos + 2 - remaining ) & 0x00FF ) << 40;
1522                     // fallthrough !!!
1523 
1524                 case 3:
1525                     value += ( ( long ) pageData.get( pagePos + 3 - remaining ) & 0x00FF ) << 32;
1526                     // fallthrough !!!
1527 
1528                 case 4:
1529                     value += ( ( long ) pageData.get( pagePos + 4 - remaining ) & 0x00FF ) << 24;
1530                     // fallthrough !!!
1531 
1532                 case 5:
1533                     value += ( ( long ) pageData.get( pagePos + 5 - remaining ) & 0x00FF ) << 16;
1534                     // fallthrough !!!
1535 
1536                 case 6:
1537                     value += ( ( long ) pageData.get( pagePos + 6 - remaining ) & 0x00FF ) << 8;
1538                     // fallthrough !!!
1539 
1540                 case 7:
1541                     value += ( ( long ) pageData.get( pagePos + 7 - remaining ) & 0x00FF );
1542                     break;
1543             }
1544         }
1545 
1546         return value;
1547     }
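    // Same split-read scheme as readInt(), extended to 8 bytes : with remaining == 3, for instance,
    // the three high-order bytes (bits 63..40) come from the tail of the current page and the five
    // low-order bytes (bits 39..0) from the next page, right after its link.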
1548 
1549 
1550     /**
1551      * Manage a B-tree. The btree will be added and managed by this RecordManager. We will create a
1552      * Manage a B-tree. The B-tree will be added to and managed by this RecordManager. We will create a
1553      * new RootPage for this added B-tree, which will contain no data.<br/>
1554      * This method is thread-safe.
1555      * Managing a B-tree is a matter of storing a reference to the managed B-tree in the B-tree of B-trees.
1556      * We store a tuple of NameRevision (where revision is 0L) and an offset to the B-tree header.
1557      * At the same time, we keep track of the managed B-trees in a Map.
1558      *
1559      * @param btree The new B-tree to manage.
1561      * @throws BTreeAlreadyManagedException If the B-tree is already managed
1562      * @throws IOException if there was a problem while accessing the file
1563      */
1564     public synchronized <K, V> void manage( BTree<K, V> btree ) throws BTreeAlreadyManagedException, IOException
1565     {
1566         beginTransaction();
1567 
1568         try
1569         {
1570             LOG.debug( "Managing the btree {}", btree.getName() );
1571             BTreeFactory.setRecordManager( btree, this );
1572 
1573             String name = btree.getName();
1574 
1575             if ( managedBtrees.containsKey( name ) )
1576             {
1577                 // There is already a B-tree with this name in the recordManager...
1578                 LOG.error( "There is already a B-tree named '{}' managed by this recordManager", name );
1579                 rollback();
1580                 throw new BTreeAlreadyManagedException( name );
1581             }
1582 
1583             // Now, write the B-tree information
1584             long btreeInfoOffset = writeBtreeInfo( btree );
1585             BTreeHeader<K, V> btreeHeader = ( ( AbstractBTree<K, V> ) btree ).getBtreeHeader();
1586             ( ( PersistedBTree<K, V> ) btree ).setBtreeInfoOffset( btreeInfoOffset );
1587 
1588             // Serialize the B-tree root page
1589             Page<K, V> rootPage = btreeHeader.getRootPage();
1590 
1591             PageIO[] rootPageIos = serializePage( btree, btreeHeader.getRevision(), rootPage );
1592 
1593             // Get the reference on the first page
1594             long rootPageOffset = rootPageIos[0].getOffset();
1595 
1596             // Store the rootPageOffset into the Btree header and into the rootPage
1597             btreeHeader.setRootPageOffset( rootPageOffset );
1598             ( ( PersistedLeaf<K, V> ) rootPage ).setOffset( rootPageOffset );
1599 
1600             LOG.debug( "Flushing the newly managed '{}' btree rootpage", btree.getName() );
1601             flushPages( rootPageIos );
1602 
1603             // And the B-tree header
1604             long btreeHeaderOffset = writeBtreeHeader( btree, btreeHeader );
1605 
1606             // Now, if this is a new B-tree, add it to the B-tree of B-trees
1607             // Add the btree into the map of managed B-trees
1608             managedBtrees.put( name, ( BTree<Object, Object> ) btree );
1609 
1610             // And in the Map of currentBtreeHeaders and newBtreeHeaders
1611             currentBTreeHeaders.put( name, btreeHeader );
1612             newBTreeHeaders.put( name, btreeHeader );
1613 
1614             // We can safely increment the number of managed B-trees
1615             nbBtree++;
1616 
1617             // Create the new NameRevision
1618             NameRevision nameRevision = new NameRevision( name, 0L );
1619 
1620             // Inject it into the B-tree of B-tree
1621             btreeOfBtrees.insert( nameRevision, btreeHeaderOffset );
1622             commit();
1623         }
1624         catch ( IOException ioe )
1625         {
1626             rollback();
1627             throw ioe;
1628         }
1629     }
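    // A rough usage sketch. The factory method and serializer names below are illustrative and may
    // differ from the actual API; only manage() itself is taken from this class :
    //
    //     RecordManager recordManager = new RecordManager( "data.db" );
    //     BTree<Long, String> btree = BTreeFactory.createPersistedBTree( "people",
    //         LongSerializer.INSTANCE, StringSerializer.INSTANCE );
    //     recordManager.manage( btree );   // registers the B-tree in the B-tree of B-trees
    //     btree.insert( 1L, "alice" );     // further updates go through the managed B-tree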
1630 
1631 
1632     /**
1633      * Manage a sub-B-tree. Managing a B-tree is a matter of storing a reference to the managed B-tree
1634      * in the B-tree of B-trees.
1635      * We store a tuple of NameRevision (where revision is 0L) and an offset to the B-tree header.
1636      * At the same time, we keep track of the managed B-trees in a Map.
1637      *
1638      * @param btree The new B-tree to manage.
1639      *
1640      * @throws BTreeAlreadyManagedException If the B-tree is already managed
1641      * @throws IOException If there was a problem while accessing the file
1642      */
1643     public synchronized <K, V> void manageSubBtree( BTree<K, V> btree )
1644         throws BTreeAlreadyManagedException, IOException
1645     {
1646         LOG.debug( "Managing the sub-btree {}", btree.getName() );
1647         BTreeFactory.setRecordManager( btree, this );
1648 
1649         String name = btree.getName();
1650 
1651         if ( managedBtrees.containsKey( name ) )
1652         {
1653             // There is already a sub-B-tree with this name in the recordManager...
1654             LOG.error( "There is already a sub-B-tree named '{}' managed by this recordManager", name );
1655             throw new BTreeAlreadyManagedException( name );
1656         }
1657 
1658         // Now, write the sub-B-tree information
1659         long btreeInfoOffset = writeBtreeInfo( btree );
1660         BTreeHeader<K, V> btreeHeader = ( ( AbstractBTree<K, V> ) btree ).getBtreeHeader();
1661         ( ( PersistedBTree<K, V> ) btree ).setBtreeInfoOffset( btreeInfoOffset );
1662 
1663         // Serialize the B-tree root page
1664         Page<K, V> rootPage = btreeHeader.getRootPage();
1665 
1666         PageIO[] rootPageIos = serializePage( btree, btreeHeader.getRevision(), rootPage );
1667 
1668         // Get the reference on the first page
1669         long rootPageOffset = rootPageIos[0].getOffset();
1670 
1671         // Store the rootPageOffset into the Btree header and into the rootPage
1672         btreeHeader.setRootPageOffset( rootPageOffset );
1673 
1674         ( ( AbstractPage<K, V> ) rootPage ).setOffset( rootPageOffset );
1675 
1676         LOG.debug( "Flushing the newly managed '{}' btree rootpage", btree.getName() );
1677         flushPages( rootPageIos );
1678 
1679         // And the B-tree header
1680         long btreeHeaderOffset = writeBtreeHeader( btree, btreeHeader );
1681 
1682         // Now, if this is a new B-tree, add it to the B-tree of B-trees
1683         // Add the btree into the map of managed B-trees
1684         if ( ( btree.getType() != BTreeTypeEnum.BTREE_OF_BTREES ) && 
1685             ( btree.getType() != BTreeTypeEnum.COPIED_PAGES_BTREE ) && 
1686             ( btree.getType() != BTreeTypeEnum.PERSISTED_SUB ) )
1687         {
1688             managedBtrees.put( name, ( BTree<Object, Object> ) btree );
1689         }
1690 
1691         // And in the Map of currentBtreeHeaders and newBtreeHeaders
1692         currentBTreeHeaders.put( name, btreeHeader );
1693         newBTreeHeaders.put( name, btreeHeader );
1694 
1695         // Create the new NameRevision
1696         NameRevision nameRevision = new NameRevision( name, 0L );
1697 
1698         // Inject it into the B-tree of B-tree
1699         if ( ( btree.getType() != BTreeTypeEnum.BTREE_OF_BTREES ) && 
1700             ( btree.getType() != BTreeTypeEnum.COPIED_PAGES_BTREE ) && 
1701             ( btree.getType() != BTreeTypeEnum.PERSISTED_SUB ) )
1702         {
1703             // We can safely increment the number of managed B-trees
1704             nbBtree++;
1705 
1706             btreeOfBtrees.insert( nameRevision, btreeHeaderOffset );
1707         }
1708 
1709         updateRecordManagerHeader();
1710     }
1711 
1712 
1713     /**
1714      * Serialize a new Page. It will contain the following data :<br/>
1715      * <ul>
1716      * <li>the revision : a long</li>
1717      * <li>the number of elements : an int (negative if it's a Node, positive or zero if it's a Leaf)</li>
1718      * <li>the size of the values/keys when serialized</li>
1719      * <li>the keys : an array of serialized keys</li>
1720      * <li>the values : an array of references to the children pageIO offset (stored as long)
1721      * if it's a Node, or a list of values if it's a Leaf</li>
1722      * </ul>
1723      *
1724      * @param btree The B-tree the page belongs to
1725      * @param revision The page revision
1726      * @param page The page to serialize
1727      * @return An array of PageIOs containing the serialized page
1728      * @throws IOException If there was a problem while writing the data on disk
1730      */
1731     private <K, V> PageIO[] serializePage( BTree<K, V> btree, long revision, Page<K, V> page ) throws IOException
1732     {
1733         int nbElems = page.getNbElems();
1734 
1735         boolean isNotSubTree = ( btree.getType() != BTreeTypeEnum.PERSISTED_SUB );
1736 
1737         if ( nbElems == 0 )
1738         {
1739             return serializeRootPage( revision );
1740         }
1741         else
1742         {
1743             // Prepare a list of byte[] that will contain the serialized page
1744             int nbBuffers = 1 + 1 + 1 + nbElems * 3;
1745             int dataSize = 0;
1746             int serializedSize = 0;
1747 
1748             if ( page.isNode() )
1749             {
1750                 // A Node has one more value to store
1751                 nbBuffers++;
1752             }
1753 
1754             // Now, we can create the list with the right size
1755             List<byte[]> serializedData = new ArrayList<byte[]>( nbBuffers );
1756 
1757             // The revision
1758             byte[] buffer = LongSerializer.serialize( revision );
1759             serializedData.add( buffer );
1760             serializedSize += buffer.length;
1761 
1762             // The number of elements
1763             // Make it a negative value if it's a Node
1764             int pageNbElems = nbElems;
1765 
1766             if ( page.isNode() )
1767             {
1768                 pageNbElems = -nbElems;
1769             }
1770 
1771             buffer = IntSerializer.serialize( pageNbElems );
1772             serializedData.add( buffer );
1773             serializedSize += buffer.length;
1774 
1775             // Iterate on the keys and values. We first serialize the value, then the key
1776             // until we are done with all of them. If we are serializing a Node, we have
1777             // to serialize one more value
1778             for ( int pos = 0; pos < nbElems; pos++ )
1779             {
1780                 // Start with the value
1781                 if ( page.isNode() )
1782                 {
1783                     dataSize += serializeNodeValue( ( PersistedNode<K, V> ) page, pos, serializedData );
1784                     dataSize += serializeNodeKey( ( PersistedNode<K, V> ) page, pos, serializedData );
1785                 }
1786                 else
1787                 {
1788                     if ( isNotSubTree )
1789                     {
1790                         dataSize += serializeLeafValue( ( PersistedLeaf<K, V> ) page, pos, serializedData );
1791                     }
1792 
1793                     dataSize += serializeLeafKey( ( PersistedLeaf<K, V> ) page, pos, serializedData );
1794                 }
1795             }
1796 
1797             // Nodes have one more value to serialize
1798             if ( page.isNode() )
1799             {
1800                 dataSize += serializeNodeValue( ( PersistedNode<K, V> ) page, nbElems, serializedData );
1801             }
1802 
1803             // Store the data size
1804             buffer = IntSerializer.serialize( dataSize );
1805             serializedData.add( 2, buffer );
1806             serializedSize += buffer.length;
1807 
1808             serializedSize += dataSize;
1809 
1810             // We are done. Allocate the pages we need to store the data
1811             PageIO[] pageIos = getFreePageIOs( serializedSize );
1812 
1813             // And store the data into those pages
1814             long position = 0L;
1815 
1816             for ( byte[] bytes : serializedData )
1817             {
1818                 position = storeRaw( position, bytes, pageIos );
1819             }
1820 
1821             return pageIos;
1822         }
1823     }
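    // Layout produced above, sketched for a Leaf holding 2 keys (positions are within the logical
    // data area, before it is split across PageIOs) :
    //
    //     [ revision (long) ][ nbElems = 2 (int) ][ dataSize (int) ]
    //     [ value #0 ][ key length #0 ][ key #0 ]
    //     [ value #1 ][ key length #1 ][ key #1 ]
    //
    // For a Node, nbElems is stored negated, each value is the (offset, lastOffset) pair of a child
    // page, and one extra value (the right-most child) follows the last key. For PERSISTED_SUB
    // leaves the values are skipped and only the keys are written.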
1824 
1825 
1826     /**
1827      * Serialize a Node's key
1828      */
1829     private <K, V> int serializeNodeKey( PersistedNode<K, V> node, int pos, List<byte[]> serializedData )
1830     {
1831         KeyHolder<K> holder = node.getKeyHolder( pos );
1832         byte[] buffer = ( ( PersistedKeyHolder<K> ) holder ).getRaw();
1833 
1834         // We have to store the serialized key length
1835         byte[] length = IntSerializer.serialize( buffer.length );
1836         serializedData.add( length );
1837 
1838         // And store the serialized key now if not null
1839         if ( buffer.length != 0 )
1840         {
1841             serializedData.add( buffer );
1842         }
1843 
1844         return buffer.length + INT_SIZE;
1845     }
1846 
1847 
1848     /**
1849      * Serialize a Node's Value. We store the two offsets of the child page.
1850      */
1851     private <K, V> int serializeNodeValue( PersistedNode<K, V> node, int pos, List<byte[]> serializedData )
1852         throws IOException
1853     {
1854         // For a node, we just store the children's offsets
1855         Page<K, V> child = node.getReference( pos );
1856 
1857         // The first offset
1858         byte[] buffer = LongSerializer.serialize( ( ( AbstractPage<K, V> ) child ).getOffset() );
1859         serializedData.add( buffer );
1860         int dataSize = buffer.length;
1861 
1862         // The last offset
1863         buffer = LongSerializer.serialize( ( ( AbstractPage<K, V> ) child ).getLastOffset() );
1864         serializedData.add( buffer );
1865         dataSize += buffer.length;
1866 
1867         return dataSize;
1868     }
1869 
1870 
1871     /**
1872      * Serialize a Leaf's key
1873      */
1874     private <K, V> int serializeLeafKey( PersistedLeaf<K, V> leaf, int pos, List<byte[]> serializedData )
1875     {
1876         int dataSize = 0;
1877         KeyHolder<K> keyHolder = leaf.getKeyHolder( pos );
1878         byte[] keyData = ( ( PersistedKeyHolder<K> ) keyHolder ).getRaw();
1879 
1880         if ( keyData != null )
1881         {
1882             // We have to store the serialized key length
1883             byte[] length = IntSerializer.serialize( keyData.length );
1884             serializedData.add( length );
1885 
1886             // And the key data
1887             serializedData.add( keyData );
1888             dataSize += keyData.length + INT_SIZE;
1889         }
1890         else
1891         {
1892             serializedData.add( IntSerializer.serialize( 0 ) );
1893             dataSize += INT_SIZE;
1894         }
1895 
1896         return dataSize;
1897     }
1898 
1899 
1900     /**
1901      * Serialize a Leaf's Value.
1902      */
1903     private <K, V> int serializeLeafValue( PersistedLeaf<K, V> leaf, int pos, List<byte[]> serializedData )
1904         throws IOException
1905     {
1906         // The value can be an Array or a sub-btree, but we don't care
1907         // we just iterate on all the values
1908         ValueHolder<V> valueHolder = leaf.getValue( pos );
1909         int dataSize = 0;
1910         int nbValues = valueHolder.size();
1911 
1912         if ( nbValues == 0 )
1913         {
1914             // No value.
1915             byte[] buffer = IntSerializer.serialize( nbValues );
1916             serializedData.add( buffer );
1917 
1918             return buffer.length;
1919         }
1920 
1921         if ( !valueHolder.isSubBtree() )
1922         {
1923             // Write the nb elements first
1924             byte[] buffer = IntSerializer.serialize( nbValues );
1925             serializedData.add( buffer );
1926             dataSize = INT_SIZE;
1927 
1928             // We have a serialized value. Just flush it
1929             byte[] data = ( ( PersistedValueHolder<V> ) valueHolder ).getRaw();
1930             dataSize += data.length;
1931 
1932             // Store the data size
1933             buffer = IntSerializer.serialize( data.length );
1934             serializedData.add( buffer );
1935             dataSize += INT_SIZE;
1936 
1937             // and add the data if it's not 0
1938             if ( data.length > 0 )
1939             {
1940                 serializedData.add( data );
1941             }
1942         }
1943         else
1944         {
1945             // Store the nbValues as a negative number. We add 1 so that 0 is not confused with an Array value
1946             byte[] buffer = IntSerializer.serialize( -( nbValues + 1 ) );
1947             serializedData.add( buffer );
1948             dataSize += buffer.length;
1949 
1950             // the B-tree offset
1951             buffer = LongSerializer.serialize( ( ( PersistedValueHolder<V> ) valueHolder ).getOffset() );
1952             serializedData.add( buffer );
1953             dataSize += buffer.length;
1954         }
1955 
1956         return dataSize;
1957     }
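    // Value-count encoding used above, as a quick reference :
    //
    //     3 values kept in an array     ->  3 stored, then [ raw data length ][ raw data ]
    //     3 values kept in a sub-btree  ->  -4 stored ( -(3 + 1) ), then the sub-btree offset
    //     0 values                      ->  0 stored, nothing else
    //
    // A reader can thus tell the three cases apart from the sign of the first int alone.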
1958 
1959 
1960     /**
1961      * Write a root page with no elements in it
1962      */
1963     private PageIO[] serializeRootPage( long revision ) throws IOException
1964     {
1965         // We will have a single page if we have no elements
1966         PageIO[] pageIos = new PageIO[1];
1967 
1968         // This is either a new root page or a new page that will be filled later
1969         PageIO newPage = fetchNewPage();
1970 
1971         // We need first to create a byte[] that will contain all the data
1972         // For the root page, this is easy, as we only have to store the revision,
1973         // and the number of elements, which is 0.
1974         long position = 0L;
1975 
1976         position = store( position, revision, newPage );
1977         position = store( position, 0, newPage );
1978 
1979         // Update the page size now
1980         newPage.setSize( ( int ) position );
1981 
1982         // Insert the result into the array of PageIO
1983         pageIos[0] = newPage;
1984 
1985         return pageIos;
1986     }
1987 
1988 
1989     /**
1990      * Update the RecordManager header, injecting the following data :
1991      *
1992      * <pre>
1993      * +---------------------+
1994      * | PageSize            | 4 bytes : The size of a physical page (default to 512)
1995      * +---------------------+
1996      * | NbTree              | 4 bytes : The number of managed B-trees (at least 1)
1997      * +---------------------+
1998      * | FirstFree           | 8 bytes : The offset of the first free page
1999      * +---------------------+
2000      * | current BoB offset  | 8 bytes : The offset of the current B-tree of B-trees
2001      * +---------------------+
2002      * | previous BoB offset | 8 bytes : The offset of the previous B-tree of B-trees
2003      * +---------------------+
2004      * | current CP offset   | 8 bytes : The offset of the current CopiedPages B-tree
2005      * +---------------------+
2006      * | previous CP offset  | 8 bytes : The offset of the previous CopiedPages B-tree
2007      * +---------------------+
2008      * </pre>
2009      */
2010     public void updateRecordManagerHeader()
2011     {
2012         // The page size
2013         int position = writeData( RECORD_MANAGER_HEADER_BYTES, 0, pageSize );
2014 
2015         // The number of managed B-trees
2016         position = writeData( RECORD_MANAGER_HEADER_BYTES, position, nbBtree );
2017 
2018         // The first free page
2019         position = writeData( RECORD_MANAGER_HEADER_BYTES, position, firstFreePage );
2020 
2021         // The offset of the current B-tree of B-trees
2022         position = writeData( RECORD_MANAGER_HEADER_BYTES, position, currentBtreeOfBtreesOffset );
2023 
2024         // The offset of the previous B-tree of B-trees
2025         position = writeData( RECORD_MANAGER_HEADER_BYTES, position, previousBtreeOfBtreesOffset );
2026 
2027         // The offset of the current CopiedPages B-tree
2028         position = writeData( RECORD_MANAGER_HEADER_BYTES, position, currentCopiedPagesBtreeOffset );
2029 
2030         // The offset of the previous CopiedPages B-tree
2031         position = writeData( RECORD_MANAGER_HEADER_BYTES, position, previousCopiedPagesBtreeOffset );
2032 
2033         // Write the RecordManager header on disk
2034         RECORD_MANAGER_HEADER_BUFFER.put( RECORD_MANAGER_HEADER_BYTES );
2035         RECORD_MANAGER_HEADER_BUFFER.flip();
2036 
2037         LOG.debug( "Update RM header" );
2038 
2039         if ( LOG_PAGES.isDebugEnabled() )
2040         {
2041             StringBuilder sb = new StringBuilder();
2042 
2043             sb.append( "First free page     : 0x" ).append( Long.toHexString( firstFreePage ) ).append( "\n" );
2044             sb.append( "Current BOB header  : 0x" ).append( Long.toHexString( currentBtreeOfBtreesOffset ) ).append( "\n" );
2045             sb.append( "Previous BOB header : 0x" ).append( Long.toHexString( previousBtreeOfBtreesOffset ) ).append( "\n" );
2046             sb.append( "Current CPB header  : 0x" ).append( Long.toHexString( currentCopiedPagesBtreeOffset ) ).append( "\n" );
2047             sb.append( "Previous CPB header : 0x" ).append( Long.toHexString( previousCopiedPagesBtreeOffset ) ).append( "\n" );
2048 
2049             if ( firstFreePage != NO_PAGE )
2050             {
2051                 long freePage = firstFreePage;
2052                 sb.append( "free pages list : " );
2053 
2054                 boolean isFirst = true;
2055 
2056                 while ( freePage != NO_PAGE )
2057                 {
2058                     if ( isFirst )
2059                     {
2060                         isFirst = false;
2061                     }
2062                     else
2063                     {
2064                         sb.append( " -> " );
2065                     }
2066 
2067                     sb.append( "0x" ).append( Long.toHexString( freePage ) );
2068 
2069                     try
2070                     {
2071                         PageIO[] freePageIO = readPageIOs( freePage, 8 );
2072 
2073                         freePage = freePageIO[0].getNextPage();
2074                     }
2075                     catch ( EndOfFileExceededException e )
2076                     {
2077                         // We can't read this free page : log the error and stop following the chain
2078                         LOG.error( "Cannot read the free page at 0x{}", Long.toHexString( freePage ), e );
2079                         break;
2080                     }
2081                     catch ( IOException e )
2082                     {
2083                         // Same as above : log the error and stop following the chain
2084                         LOG.error( "I/O error while reading the free page at 0x{}", Long.toHexString( freePage ), e );
2085                         break;
2086                     }
2085                 }
2086 
2087             }
2088 
2089             LOG_PAGES.debug( "Update RM Header : \n{}", sb.toString() );
2090         }
2091 
2092         try
2093         {
2094 
2095             Integer nbTxnStarted = context.get();
2096 
2097             if ( ( nbTxnStarted == null ) || ( nbTxnStarted <= 1 ) )
2098             {
2099                 //System.out.println( "Writing page at 0000" );
2100                 writeCounter.put( 0L, writeCounter.containsKey( 0L ) ? writeCounter.get( 0L ) + 1 : 1 );
2101                 fileChannel.write( RECORD_MANAGER_HEADER_BUFFER, 0 );
2102             }
2103         }
2104         catch ( IOException ioe )
2105         {
2106             throw new FileException( ioe.getMessage() );
2107         }
2108 
2109         RECORD_MANAGER_HEADER_BUFFER.clear();
2110 
2111         // Reset the old versions
2112         previousBtreeOfBtreesOffset = -1L;
2113         previousCopiedPagesBtreeOffset = -1L;
2114 
2115         nbUpdateRMHeader.incrementAndGet();
2116     }
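    // Resulting header layout, with byte offsets derived from the writeData() calls above :
    //
    //     bytes  0.. 3 : pageSize
    //     bytes  4.. 7 : nbBtree
    //     bytes  8..15 : firstFreePage
    //     bytes 16..23 : currentBtreeOfBtreesOffset
    //     bytes 24..31 : previousBtreeOfBtreesOffset
    //     bytes 32..39 : currentCopiedPagesBtreeOffset
    //     bytes 40..47 : previousCopiedPagesBtreeOffset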
2117 
2118 
2119     /**
2120      * Store the new offsets of the B-tree of B-trees and of the CopiedPages B-tree, keeping track
2121      * of the previous versions : the new offsets become the current ones, and the former current
2122      * offsets become the previous ones. The header itself is written to disk by
2123      * {@link #updateRecordManagerHeader()}.
2124      *
2125      * @param newBtreeOfBtreesOffset The offset of the new B-tree of B-trees header, or -1L if unchanged
2126      * @param newCopiedPageBtreeOffset The offset of the new CopiedPages B-tree header, or -1L if unchanged
2139      */
2140     public void updateRecordManagerHeader( long newBtreeOfBtreesOffset, long newCopiedPageBtreeOffset )
2141     {
2142         if ( newBtreeOfBtreesOffset != -1L )
2143         {
2144             previousBtreeOfBtreesOffset = currentBtreeOfBtreesOffset;
2145             currentBtreeOfBtreesOffset = newBtreeOfBtreesOffset;
2146         }
2147 
2148         if ( newCopiedPageBtreeOffset != -1L )
2149         {
2150             previousCopiedPagesBtreeOffset = currentCopiedPagesBtreeOffset;
2151             currentCopiedPagesBtreeOffset = newCopiedPageBtreeOffset;
2152         }
2153     }
2154 
2155 
2156     /**
2157      * Inject an int into a byte[] at a given position.
2158      */
2159     private int writeData( byte[] buffer, int position, int value )
2160     {
2161         buffer[position] = ( byte ) ( value >>> 24 );
2162         buffer[position + 1] = ( byte ) ( value >>> 16 );
2163         buffer[position + 2] = ( byte ) ( value >>> 8 );
2164         buffer[position + 3] = ( byte ) ( value );
2165 
2166         return position + 4;
2167     }
2168 
2169 
2170     /**
2171      * Inject a long into a byte[] at a given position.
2172      */
2173     private int writeData( byte[] buffer, int position, long value )
2174     {
2175         buffer[position] = ( byte ) ( value >>> 56 );
2176         buffer[position + 1] = ( byte ) ( value >>> 48 );
2177         buffer[position + 2] = ( byte ) ( value >>> 40 );
2178         buffer[position + 3] = ( byte ) ( value >>> 32 );
2179         buffer[position + 4] = ( byte ) ( value >>> 24 );
2180         buffer[position + 5] = ( byte ) ( value >>> 16 );
2181         buffer[position + 6] = ( byte ) ( value >>> 8 );
2182         buffer[position + 7] = ( byte ) ( value );
2183 
2184         return position + 8;
2185     }
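    // For instance, writeData( buffer, 8, 0x1234L ) writes the big-endian bytes
    // 00 00 00 00 00 00 12 34 into buffer[8..15] and returns 16, the position of the next field.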
2186 
2187 
2188     /**
2189      * Add a new <btree, revision> tuple into the B-tree of B-trees.
2190      *
2191      * @param name The B-tree name
2192      * @param revision The B-tree revision
2193      * @param btreeHeaderOffset The B-tree offset
2194      * @throws IOException If the update failed
2195      */
2196     /* no qualifier */<K, V> void addInBtreeOfBtrees( String name, long revision, long btreeHeaderOffset )
2197         throws IOException
2198     {
2199         checkOffset( btreeHeaderOffset );
2200         NameRevision nameRevision = new NameRevision( name, revision );
2201 
2202         btreeOfBtrees.insert( nameRevision, btreeHeaderOffset );
2203 
2204         // Update the B-tree of B-trees offset
2205         currentBtreeOfBtreesOffset = getNewBTreeHeader( BTREE_OF_BTREES_NAME ).getBTreeHeaderOffset();
2206     }
2207 
2208 
2209     /**
2210      * Add a new <btree, revision> tuple into the CopiedPages B-tree.
2211      *
2212      * @param name The B-tree name
2213      * @param revision The B-tree revision
2214      * @param pages The list of copied pages to record for this revision
2215      * @throws IOException If the update failed
2216      */
2217     /* no qualifier */<K, V> void addInCopiedPagesBtree( String name, long revision, List<Page<K, V>> pages )
2218         throws IOException
2219     {
2220         RevisionName revisionName = new RevisionName( revision, name );
2221 
2222         long[] pageOffsets = new long[pages.size()];
2223         int pos = 0;
2224 
2225         for ( Page<K, V> page : pages )
2226         {
2227             pageOffsets[pos++] = ( ( AbstractPage<K, V> ) page ).getOffset();
2228         }
2229 
2230         copiedPageBtree.insert( revisionName, pageOffsets );
2231 
2232         // Update the CopiedPageBtree offset
2233         currentCopiedPagesBtreeOffset = ( ( AbstractBTree<RevisionName, long[]> ) copiedPageBtree ).getBtreeHeader().getBTreeHeaderOffset();
2234     }
2235 
2236 
2237     /**
2238      * Internal method used to update the B-tree of B-trees offset
2239      * @param btreeOfBtreesOffset The new offset
2240      */
2241     /* no qualifier */void setBtreeOfBtreesOffset( long btreeOfBtreesOffset )
2242     {
2243         checkOffset( btreeOfBtreesOffset );
2244         this.currentBtreeOfBtreesOffset = btreeOfBtreesOffset;
2245     }
2246 
2247 
2248     /**
2249      * Write the B-tree header on disk. We will write the following information :
2250      * <pre>
2251      * +------------+
2252      * | revision   | The B-tree revision
2253      * +------------+
2254      * | nbElems    | The B-tree number of elements
2255      * +------------+
2256      * | rootPage   | The root page offset
2257      * +------------+
2258      * | BtreeInfo  | The B-tree info offset
2259      * +------------+
2260      * </pre>
2261      * @param btree The B-tree whose header has to be written
2262      * @param btreeHeader The B-tree header to write
2263      * @return The B-tree header offset
2264      * @throws IOException If we weren't able to write the B-tree header
2265      */
2266     /* no qualifier */<K, V> long writeBtreeHeader( BTree<K, V> btree, BTreeHeader<K, V> btreeHeader )
2267         throws IOException
2268     {
2269         int bufferSize =
2270             LONG_SIZE + // The revision
2271                 LONG_SIZE + // the number of element
2272                 LONG_SIZE + // The root page offset
2273                 LONG_SIZE; // The B-tree info page offset
2274 
2275         // Get the pageIOs we need to store the data. We may need more than one.
2276         PageIO[] btreeHeaderPageIos = getFreePageIOs( bufferSize );
2277 
2278         // Store the B-tree header Offset into the B-tree
2279         long btreeHeaderOffset = btreeHeaderPageIos[0].getOffset();
2280 
2281         // Now store the B-tree data in the pages :
2282         // - the B-tree revision
2283         // - the B-tree number of elements
2284         // - the B-tree root page offset
2285         // - the B-tree info page offset
2286         // Starts at 0
2287         long position = 0L;
2288 
2289         // The B-tree current revision
2290         position = store( position, btreeHeader.getRevision(), btreeHeaderPageIos );
2291 
2292         // The nb elems in the tree
2293         position = store( position, btreeHeader.getNbElems(), btreeHeaderPageIos );
2294 
2295         // Now, we can inject the B-tree rootPage offset into the B-tree header
2296         position = store( position, btreeHeader.getRootPageOffset(), btreeHeaderPageIos );
2297 
2298         // The B-tree info page offset
2299         position = store( position, ( ( PersistedBTree<K, V> ) btree ).getBtreeInfoOffset(), btreeHeaderPageIos );
2300 
2301         // And flush the pages to disk now
2302         LOG.debug( "Flushing the newly managed '{}' btree header", btree.getName() );
2303 
2304         if ( LOG_PAGES.isDebugEnabled() )
2305         {
2306             LOG_PAGES.debug( "Writing BTreeHeader revision {} for {}", btreeHeader.getRevision(), btree.getName() );
2307             StringBuilder sb = new StringBuilder();
2308 
2309             sb.append( "Offset : " ).append( Long.toHexString( btreeHeaderOffset ) ).append( "\n" );
2310             sb.append( "    Revision : " ).append( btreeHeader.getRevision() ).append( "\n" );
2311             sb.append( "    NbElems  : " ).append( btreeHeader.getNbElems() ).append( "\n" );
2312             sb.append( "    RootPage : 0x" ).append( Long.toHexString( btreeHeader.getRootPageOffset() ) )
2313                 .append( "\n" );
2314             sb.append( "    Info     : 0x" )
2315                 .append( Long.toHexString( ( ( PersistedBTree<K, V> ) btree ).getBtreeInfoOffset() ) ).append( "\n" );
2316 
2317             LOG_PAGES.debug( "Btree Header[{}]\n{}", btreeHeader.getRevision(), sb.toString() );
2318         }
2319 
2320         flushPages( btreeHeaderPageIos );
2321 
2322         btreeHeader.setBTreeHeaderOffset( btreeHeaderOffset );
2323 
2324         return btreeHeaderOffset;
2325     }
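    // For example, a header with revision 3, 42 elements, a root page at 0x2000 and a btreeInfo at
    // 0x1000 occupies 32 data bytes : [ 3 ][ 42 ][ 0x2000 ][ 0x1000 ], each stored as an 8-byte
    // big-endian long through store( position, long, pageIos ).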
2326 
2327 
2328     /**
2329      * Write the B-tree information on disk. We will write the following information :
2330      * <pre>
2331      * +------------+
2332      * | pageSize   | The B-tree page size (i.e. the maximum number of elements per page)
2333      * +------------+
2334      * | nameSize   | The B-tree name size
2335      * +------------+
2336      * | name       | The B-tree name
2337      * +------------+
2338      * | keySerSize | The keySerializer FQCN size
2339      * +------------+
2340      * | keySerFQCN | The keySerializer FQCN
2341      * +------------+
2342      * | valSerSize | The Value serializer FQCN size
2343      * +------------+
2344      * | valSerFQCN | The valueSerializer FQCN
2345      * +------------+
2346      * | dups       | The flag that tells if duplicate values are allowed
2347      * +------------+
2348      * </pre>
2349      * @param btree The B-tree whose information has to be written
2350      * @return The B-tree information offset
2351      * @throws IOException If we weren't able to write the B-tree information
2352      */
2353     private <K, V> long writeBtreeInfo( BTree<K, V> btree ) throws IOException
2354     {
2355         // We will add the newly managed B-tree at the end of the header.
2356         byte[] btreeNameBytes = Strings.getBytesUtf8( btree.getName() );
2357         byte[] keySerializerBytes = Strings.getBytesUtf8( btree.getKeySerializerFQCN() );
2358         byte[] valueSerializerBytes = Strings.getBytesUtf8( btree.getValueSerializerFQCN() );
2359 
2360         int bufferSize =
2361             INT_SIZE + // The page size
2362                 INT_SIZE + // The name size
2363                 btreeNameBytes.length + // The name
2364                 INT_SIZE + // The keySerializerBytes size
2365                 keySerializerBytes.length + // The keySerializerBytes
2366                 INT_SIZE + // The valueSerializerBytes size
2367                 valueSerializerBytes.length + // The valueSerializerBytes
2368                 INT_SIZE; // The allowDuplicates flag
2369 
2370         // Get the pageIOs we need to store the data. We may need more than one.
2371         PageIO[] btreeHeaderPageIos = getFreePageIOs( bufferSize );
2372 
2373         // Keep the B-tree header Offset into the B-tree
2374         long btreeInfoOffset = btreeHeaderPageIos[0].getOffset();
2375 
2376         // Now store the B-tree information data in the pages :
2377         // - the B-tree page size
2378         // - the B-tree name
2379         // - the keySerializer FQCN
2380         // - the valueSerializer FQCN
2381         // - the flags that tell if the dups are allowed
2382         // Starts at 0
2383         long position = 0L;
2384 
2385         // The B-tree page size
2386         position = store( position, btree.getPageSize(), btreeHeaderPageIos );
2387 
2388         // The tree name
2389         position = store( position, btreeNameBytes, btreeHeaderPageIos );
2390 
2391         // The keySerializer FQCN
2392         position = store( position, keySerializerBytes, btreeHeaderPageIos );
2393 
2394         // The valueSerializer FQCN
2395         position = store( position, valueSerializerBytes, btreeHeaderPageIos );
2396 
2397         // The allowDuplicates flag
2398         position = store( position, ( btree.isAllowDuplicates() ? 1 : 0 ), btreeHeaderPageIos );
2399 
2400         // And flush the pages to disk now
2401         LOG.debug( "Flushing the newly managed '{}' btree header", btree.getName() );
2402         flushPages( btreeHeaderPageIos );
2403 
2404         return btreeInfoOffset;
2405     }
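    // Sketch of the resulting info block for a hypothetical B-tree named "test" with 16 elements
    // per page and duplicates disabled (the lengths are UTF-8 byte counts, written by store()
    // before each byte[]) :
    //
    //     [ 16 ]                                  B-tree page size (int)
    //     [ 4 ][ "test" ]                         name length + name bytes
    //     [ keySerFQCN length ][ keySerFQCN ]     key serializer class name
    //     [ valSerFQCN length ][ valSerFQCN ]     value serializer class name
    //     [ 0 ]                                   allowDuplicates flag (1 when duplicates are allowed)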
2406 
2407 
2408     /**
2409      * Update the B-tree header after a B-tree modification. This will make the latest modification
2410      * visible.<br/>
2411      * We update the following fields :
2412      * <ul>
2413      * <li>the revision</li>
2414      * <li>the number of elements</li>
2415      * <li>the B-tree root page offset</li>
2416      * </ul>
2417      * <br/>
2418      * As a result, a new version of the BTreeHeader will be created, which will replace the previous
2419      * B-tree header
2420      * @param btree The B-tree to update
2421      * @param btreeHeaderOffset The offset of the modified btree header
2422      * @return The offset of the new B-tree Header
2423      * @throws IOException If we weren't able to write the file on disk
2424      * @throws EndOfFileExceededException If we tried to write after the end of the file
2425      */
2426     /* no qualifier */<K, V> long updateBtreeHeader( BTree<K, V> btree, long btreeHeaderOffset )
2427         throws EndOfFileExceededException, IOException
2428     {
2429         return updateBtreeHeader( btree, btreeHeaderOffset, false );
2430     }
2431 
2432 
2433     /**
2434      * Update the B-tree header after a B-tree modification. This will make the latest modification
2435      * visible.<br/>
2436      * We update the following fields :
2437      * <ul>
2438      * <li>the revision</li>
2439      * <li>the number of elements</li>
2440      * <li>the reference to the current B-tree revisions</li>
2441      * <li>the reference to the old B-tree revisions</li>
2442      * </ul>
2443      * <br/>
2444      * As a result, the existing BTreeHeader is updated in place
2445      * @param btree The B-tree to update
2446      * @param btreeHeaderOffset The offset of the modified btree header
2447      * @throws IOException If we weren't able to write the file on disk
2448      * @throws EndOfFileExceededException If we tried to write after the end of the file
2450      */
2451     /* no qualifier */<K, V> void updateBtreeHeaderOnPlace( BTree<K, V> btree, long btreeHeaderOffset )
2452         throws EndOfFileExceededException,
2453         IOException
2454     {
2455         updateBtreeHeader( btree, btreeHeaderOffset, true );
2456     }
2457 
2458 
2459     /**
2460      * Update the B-tree header after a B-tree modification. This will make the latest modification
2461      * visible.<br/>
2462      * We update the following fields :
2463      * <ul>
2464      * <li>the revision</li>
2465      * <li>the number of elements</li>
2466      * <li>the reference to the current B-tree revisions</li>
2467      * <li>the reference to the old B-tree revisions</li>
2468      * </ul>
2469      * <br/>
2470      * As a result, a new version of the BTreeHeader will be created, which may replace the previous
2471      * B-tree header (if the onPlace flag is set to true) or a new set of pageIos will contain the new
2472      * version.
2473      *
2474      * @param btree The B-tree to update
2475      * @param btreeHeaderOffset The offset of the modified btree header
2476      * @param onPlace Tells if we modify the B-tree in place, or if we create a copy
2477      * @return The offset of the new B-tree Header if it has changed (ie, when the onPlace flag is set to false), NO_PAGE otherwise
2478      * @throws EndOfFileExceededException If we tried to write after the end of the file
2479      * @throws IOException If there was an error while writing the data on disk
2480      */
2481     private <K, V> long updateBtreeHeader( BTree<K, V> btree, long btreeHeaderOffset, boolean onPlace )
2482         throws EndOfFileExceededException, IOException
2483     {
2484         // Read the pageIOs associated with this B-tree
2485         PageIO[] pageIos;
2486         long newBtreeHeaderOffset = NO_PAGE;
2487         long offset = ( ( PersistedBTree<K, V> ) btree ).getBtreeOffset();
2488 
2489         if ( onPlace )
2490         {
2491             // We just have to update the existing BTreeHeader
2492             long headerSize = LONG_SIZE + LONG_SIZE + LONG_SIZE;
2493 
2494             pageIos = readPageIOs( offset, headerSize );
2495 
2496             // Now, update the revision
2497             long position = 0;
2498 
2499             position = store( position, btree.getRevision(), pageIos );
2500             position = store( position, btree.getNbElems(), pageIos );
2501             position = store( position, btreeHeaderOffset, pageIos );
2502 
2503             // Write the pages on disk
2504             if ( LOG.isDebugEnabled() )
2505             {
2506                 LOG.debug( "-----> Flushing the '{}' B-treeHeader", btree.getName() );
2507                 LOG.debug( "  revision : " + btree.getRevision() + ", NbElems : " + btree.getNbElems()
2508                     + ", btreeHeader offset : 0x"
2509                     + Long.toHexString( btreeHeaderOffset ) );
2510             }
2511 
2512             // Rewrite the pages at the same place
2514             LOG.debug( "Rewriting the B-treeHeader on place for B-tree " + btree.getName() );
2515             flushPages( pageIos );
2516         }
2517         else
2518         {
2519             // We have to read and copy the existing BTreeHeader and to create a new one
2520             pageIos = readPageIOs( offset, Long.MAX_VALUE );
2521 
2522             // Now, copy every read page
2523             PageIO[] newPageIOs = new PageIO[pageIos.length];
2524             int pos = 0;
2525 
2526             for ( PageIO pageIo : pageIos )
2527             {
2528                 // Fetch a free page
2529                 newPageIOs[pos] = fetchNewPage();
2530 
2531                 // keep track of the allocated and copied pages so that we can
2532                 // free them when we do a commit or rollback, if the btree is a management one
2533                 if ( ( btree.getType() == BTreeTypeEnum.BTREE_OF_BTREES )
2534                     || ( btree.getType() == BTreeTypeEnum.COPIED_PAGES_BTREE ) )
2535                 {
2536                     freedPages.add( pageIo );
2537                     allocatedPages.add( newPageIOs[pos] );
2538                 }
2539 
2540                 pageIo.copy( newPageIOs[pos] );
2541 
2542                 if ( pos > 0 )
2543                 {
2544                     newPageIOs[pos - 1].setNextPage( newPageIOs[pos].getOffset() );
2545                 }
2546 
2547                 pos++;
2548             }
2549 
2550             // store the new btree header offset
2551             // and update the revision
2552             long position = 0;
2553 
2554             position = store( position, btree.getRevision(), newPageIOs );
2555             position = store( position, btree.getNbElems(), newPageIOs );
2556             position = store( position, btreeHeaderOffset, newPageIOs );
2557 
2558             // We got a new place on disk for the modified BTreeHeader : flush it
2559             LOG.debug( "Writing the new B-treeHeader for B-tree " + btree.getName() );
2561             flushPages( newPageIOs );
2562 
2563             newBtreeHeaderOffset = newPageIOs[0].getOffset();
2564         }
2565 
2566         nbUpdateBtreeHeader.incrementAndGet();
2567 
2568         if ( LOG_CHECK.isDebugEnabled() )
2569         {
2570             MavibotInspector.check( this );
2571         }
2572 
2573         return newBtreeHeaderOffset;
2574     }
2575 
2576 
2577     /**
2578      * Write the pages on disk, either at the end of the file, or at
2579      * the position they were taken from.
2580      *
2581      * @param pageIos The list of pages to write
2582      * @throws IOException If the write failed
2583      */
2584     private void flushPages( PageIO... pageIos ) throws IOException
2585     {
2586         if ( LOG.isDebugEnabled() )
2587         {
2588             for ( PageIO pageIo : pageIos )
2589             {
2590                 dump( pageIo );
2591             }
2592         }
2593 
2594         for ( PageIO pageIo : pageIos )
2595         {
2596             pageIo.getData().rewind();
2597             long pos = pageIo.getOffset();
2598 
2599             if ( fileChannel.size() < ( pageIo.getOffset() + pageSize ) )
2600             {
2601                 LOG.debug( "Adding a page at the end of the file" );
2602                 // This is a page we have to add to the file
2603                 pos = fileChannel.size();
2604                 fileChannel.write( pageIo.getData(), pos );
2605                 //fileChannel.force( false );
2606             }
2607             else
2608             {
2609                 LOG.debug( "Writing a page at position {}", pageIo.getOffset() );
2610                 fileChannel.write( pageIo.getData(), pageIo.getOffset() );
2611                 //fileChannel.force( false );
2612             }
2613 
2614             //System.out.println( "Writing page at " + Long.toHexString( pos ) );
2615             writeCounter.put( pos, writeCounter.containsKey( pos ) ? writeCounter.get( pos ) + 1 : 1 );
2616 
2617             nbUpdatePageIOs.incrementAndGet();
2618 
2619             pageIo.getData().rewind();
2620         }
2621     }
2622 
2623 
2624     /**
2625      * Compute the page in which we will store data given an offset, when
2626      * we have a list of pages.
2627      *
2628      * @param offset The position in the data
2629      * @return The page number in which the offset will start
2630      */
2631     private int computePageNb( long offset )
2632     {
2633         long pageNb = 0;
2634 
2635         offset -= pageSize - LINK_SIZE - PAGE_SIZE;
2636 
2637         if ( offset < 0 )
2638         {
2639             return ( int ) pageNb;
2640         }
2641 
2642         pageNb = 1 + offset / ( pageSize - LINK_SIZE );
2643 
2644         return ( int ) pageNb;
2645     }
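    // Worked example, assuming the default 512-byte pages : the first PageIO keeps
    // 512 - 8 (link) - 4 (size field) = 500 data bytes, every following PageIO keeps 512 - 8 = 504.
    // So positions 0..499 map to page 0, positions 500..1003 to page 1, position 1004 to page 2,
    // and so on.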
2646 
2647 
2648     /**
2649      * Stores a byte[] into one or more PageIOs (depending on whether the byte[] is stored
2650      * across a page boundary or not)
2651      *
2652      * @param position The position in a virtual byte[] if all the pages were contiguous
2653      * @param bytes The byte[] to serialize
2654      * @param pageIos The pageIOs we have to store the data in
2655      * @return The new offset
2656      */
2657     private long store( long position, byte[] bytes, PageIO... pageIos )
2658     {
2659         if ( bytes != null )
2660         {
2661             // Write the bytes length
2662             position = store( position, bytes.length, pageIos );
2663 
2664             // Compute the page in which we will store the data given the
2665             // current position
2666             int pageNb = computePageNb( position );
2667 
2668             // Get back the buffer in this page
2669             ByteBuffer pageData = pageIos[pageNb].getData();
2670 
2671             // Compute the position in the current page
2672             int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
2673 
2674             // Compute the remaining size in the page
2675             int remaining = pageData.capacity() - pagePos;
2676             int nbStored = bytes.length;
2677 
2678             // And now, write the bytes until we have none
2679             while ( nbStored > 0 )
2680             {
2681                 if ( remaining > nbStored )
2682                 {
2683                     pageData.mark();
2684                     pageData.position( pagePos );
2685                     pageData.put( bytes, bytes.length - nbStored, nbStored );
2686                     pageData.reset();
2687                     nbStored = 0;
2688                 }
2689                 else
2690                 {
2691                     pageData.mark();
2692                     pageData.position( pagePos );
2693                     pageData.put( bytes, bytes.length - nbStored, remaining );
2694                     pageData.reset();
2695                     pageNb++;
2696                     pageData = pageIos[pageNb].getData();
2697                     pagePos = LINK_SIZE;
2698                     nbStored -= remaining;
2699                     remaining = pageData.capacity() - pagePos;
2700                 }
2701             }
2702 
2703             // We are done
2704             position += bytes.length;
2705         }
2706         else
2707         {
2708             // No bytes : write 0 and return
2709             position = store( position, 0, pageIos );
2710         }
2711 
2712         return position;
2713     }
2714 
2715 
2716     /**
2717      * Stores a byte[] into one or more PageIOs (depending on whether the byte[] is stored
2718      * across a page boundary or not). We don't add the byte[] size, it's already present
2719      * in the received byte[].
2720      *
2721      * @param position The position in a virtual byte[] if all the pages were contiguous
2722      * @param bytes The byte[] to serialize
2723      * @param pageIos The pageIOs we have to store the data in
2724      * @return The new offset
2725      */
2726     private long storeRaw( long position, byte[] bytes, PageIO... pageIos )
2727     {
2728         if ( bytes != null )
2729         {
2730             // Compute the page in which we will store the data given the
2731             // current position
2732             int pageNb = computePageNb( position );
2733 
2734             // Get back the buffer in this page
2735             ByteBuffer pageData = pageIos[pageNb].getData();
2736 
2737             // Compute the position in the current page
2738             int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
2739 
2740             // Compute the remaining size in the page
2741             int remaining = pageData.capacity() - pagePos;
2742             int nbStored = bytes.length;
2743 
2744             // And now, write the bytes until we have none
2745             while ( nbStored > 0 )
2746             {
2747                 if ( remaining > nbStored )
2748                 {
2749                     pageData.mark();
2750                     pageData.position( pagePos );
2751                     pageData.put( bytes, bytes.length - nbStored, nbStored );
2752                     pageData.reset();
2753                     nbStored = 0;
2754                 }
2755                 else
2756                 {
2757                     pageData.mark();
2758                     pageData.position( pagePos );
2759                     pageData.put( bytes, bytes.length - nbStored, remaining );
2760                     pageData.reset();
2761                     pageNb++;
2762 
2763                     if ( pageNb == pageIos.length )
2764                     {
2765                         // We can stop here : we have reached the end of the pages
2766                         break;
2767                     }
2768 
2769                     pageData = pageIos[pageNb].getData();
2770                     pagePos = LINK_SIZE;
2771                     nbStored -= remaining;
2772                     remaining = pageData.capacity() - pagePos;
2773                 }
2774             }
2775 
2776             // We are done
2777             position += bytes.length;
2778         }
2779         else
2780         {
2781             // No bytes : write 0 and return
2782             position = store( position, 0, pageIos );
2783         }
2784 
2785         return position;
2786     }
2787 
2788 
2789     /**
2790      * Stores an Integer into one or more PageIOs (depending on whether the int is stored
2791      * across a page boundary or not)
2792      *
2793      * @param position The position in a virtual byte[] if all the pages were contiguous
2794      * @param value The int to serialize
2795      * @param pageIos The pageIOs we have to store the data in
2796      * @return The new offset
2797      */
2798     private long store( long position, int value, PageIO... pageIos )
2799     {
2800         // Compute the page in which we will store the data given the
2801         // current position
2802         int pageNb = computePageNb( position );
2803 
2804         // Compute the position in the current page
2805         int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
2806 
2807         // Get back the buffer in this page
2808         ByteBuffer pageData = pageIos[pageNb].getData();
2809 
2810         // Compute the remaining size in the page
2811         int remaining = pageData.capacity() - pagePos;
2812 
2813         if ( remaining < INT_SIZE )
2814         {
2815             // We have to split the serialized int across two pages
2816 
2817             switch ( remaining )
2818             {
2819                 case 3:
2820                     pageData.put( pagePos + 2, ( byte ) ( value >>> 8 ) );
2821                     // Fallthrough !!!
2822 
2823                 case 2:
2824                     pageData.put( pagePos + 1, ( byte ) ( value >>> 16 ) );
2825                     // Fallthrough !!!
2826 
2827                 case 1:
2828                     pageData.put( pagePos, ( byte ) ( value >>> 24 ) );
2829                     break;
2830             }
2831 
2832             // Now deal with the next page
2833             pageData = pageIos[pageNb + 1].getData();
2834             pagePos = LINK_SIZE;
2835 
2836             switch ( remaining )
2837             {
2838                 case 1:
2839                     pageData.put( pagePos, ( byte ) ( value >>> 16 ) );
2840                     // fallthrough !!!
2841 
2842                 case 2:
2843                     pageData.put( pagePos + 2 - remaining, ( byte ) ( value >>> 8 ) );
2844                     // fallthrough !!!
2845 
2846                 case 3:
2847                     pageData.put( pagePos + 3 - remaining, ( byte ) ( value ) );
2848                     break;
2849             }
2850         }
2851         else
2852         {
2853             // Store the value in the page at the selected position
2854             pageData.putInt( pagePos, value );
2855         }
2856 
2857         // Increment the position to reflect the addition of an Int (4 bytes)
2858         position += INT_SIZE;
2859 
2860         return position;
2861     }
2862 
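     /*
      * Illustrative sketch of the boundary handling in store( position, int, ... ) above, for the
      * case where only 'remaining' (1 to 3) bytes are left in the current PageIO : the high-order
      * bytes land at the end of the current page and the low-order bytes go right after the next
      * page's 8-byte link, preserving big-endian order. The helper name is an illustrative
      * assumption.
      */
     private static byte[][] splitIntSketch( int value, int remaining )
     {
         byte[] head = new byte[remaining]; // written at the end of the current page
         byte[] tail = new byte[4 - remaining]; // written at the start of the next page's data area

         for ( int i = 0; i < 4; i++ )
         {
             // The i-th byte of the big-endian representation of the int
             byte b = ( byte ) ( value >>> ( 24 - 8 * i ) );

             if ( i < remaining )
             {
                 head[i] = b;
             }
             else
             {
                 tail[i - remaining] = b;
             }
         }

         return new byte[][] { head, tail };
     }
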
2863 
2864     /**
2865      * Stores a Long into one or more PageIOs (depending on whether the long is stored
2866      * across a page boundary or not)
2867      *
2868      * @param position The position in a virtual byte[] if all the pages were contiguous
2869      * @param value The long to serialize
2870      * @param pageIos The pageIOs we have to store the data in
2871      * @return The new offset
2872      */
2873     private long store( long position, long value, PageIO... pageIos )
2874     {
2875         // Compute the page in which we will store the data given the
2876         // current position
2877         int pageNb = computePageNb( position );
2878 
2879         // Compute the position in the current page
2880         int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
2881 
2882         // Get back the buffer in this page
2883         ByteBuffer pageData = pageIos[pageNb].getData();
2884 
2885         // Compute the remaining size in the page
2886         int remaining = pageData.capacity() - pagePos;
2887 
2888         if ( remaining < LONG_SIZE )
2889         {
2890             // We have to split the serialized long across two pages
2891 
2892             switch ( remaining )
2893             {
2894                 case 7:
2895                     pageData.put( pagePos + 6, ( byte ) ( value >>> 8 ) );
2896                     // Fallthrough !!!
2897 
2898                 case 6:
2899                     pageData.put( pagePos + 5, ( byte ) ( value >>> 16 ) );
2900                     // Fallthrough !!!
2901 
2902                 case 5:
2903                     pageData.put( pagePos + 4, ( byte ) ( value >>> 24 ) );
2904                     // Fallthrough !!!
2905 
2906                 case 4:
2907                     pageData.put( pagePos + 3, ( byte ) ( value >>> 32 ) );
2908                     // Fallthrough !!!
2909 
2910                 case 3:
2911                     pageData.put( pagePos + 2, ( byte ) ( value >>> 40 ) );
2912                     // Fallthrough !!!
2913 
2914                 case 2:
2915                     pageData.put( pagePos + 1, ( byte ) ( value >>> 48 ) );
2916                     // Fallthrough !!!
2917 
2918                 case 1:
2919                     pageData.put( pagePos, ( byte ) ( value >>> 56 ) );
2920                     break;
2921             }
2922 
2923             // Now deal with the next page
2924             pageData = pageIos[pageNb + 1].getData();
2925             pagePos = LINK_SIZE;
2926 
2927             switch ( remaining )
2928             {
2929                 case 1:
2930                     pageData.put( pagePos, ( byte ) ( value >>> 48 ) );
2931                     // fallthrough !!!
2932 
2933                 case 2:
2934                     pageData.put( pagePos + 2 - remaining, ( byte ) ( value >>> 40 ) );
2935                     // fallthrough !!!
2936 
2937                 case 3:
2938                     pageData.put( pagePos + 3 - remaining, ( byte ) ( value >>> 32 ) );
2939                     // fallthrough !!!
2940 
2941                 case 4:
2942                     pageData.put( pagePos + 4 - remaining, ( byte ) ( value >>> 24 ) );
2943                     // fallthrough !!!
2944 
2945                 case 5:
2946                     pageData.put( pagePos + 5 - remaining, ( byte ) ( value >>> 16 ) );
2947                     // fallthrough !!!
2948 
2949                 case 6:
2950                     pageData.put( pagePos + 6 - remaining, ( byte ) ( value >>> 8 ) );
2951                     // fallthrough !!!
2952 
2953                 case 7:
2954                     pageData.put( pagePos + 7 - remaining, ( byte ) ( value ) );
2955                     break;
2956             }
2957         }
2958         else
2959         {
2960             // Store the value in the page at the selected position
2961             pageData.putLong( pagePos, value );
2962         }
2963 
2964         // Increment the position to reflect the addition of a Long (8 bytes)
2965         position += LONG_SIZE;
2966 
2967         return position;
2968     }
2969 
2970 
2971     /**
2972      * Write the page in a serialized form.
2973      *
2974      * @param btree The persistedBtree we will create a new PageHolder for
2975      * @param newPage The page to write on disk
2976      * @param newRevision The page's revision
2977      * @return A PageHolder containing the copied page
2978      * @throws IOException If the page can't be written on disk
2979      */
2980     /* No qualifier*/<K, V> PageHolder<K, V> writePage( BTree<K, V> btree, Page<K, V> newPage,
2981         long newRevision ) throws IOException
2982     {
2983         // We first need to save the new page on disk
2984         PageIO[] pageIos = serializePage( btree, newRevision, newPage );
2985 
2986         if ( LOG_PAGES.isDebugEnabled() )
2987         {
2988             LOG_PAGES.debug( "Write data for '{}' btree", btree.getName() );
2989 
2990             logPageIos( pageIos );
2991         }
2992 
2993         // Write the page on disk
2994         flushPages( pageIos );
2995 
2996         // Build the resulting reference
2997         long offset = pageIos[0].getOffset();
2998         long lastOffset = pageIos[pageIos.length - 1].getOffset();
2999         PersistedPageHolder<K, V> pageHolder = new PersistedPageHolder<K, V>( btree, newPage, offset,
3000             lastOffset );
3001 
3002         return pageHolder;
3003     }
3004 
3005 
3006     /* No qualifier */static void logPageIos( PageIO[] pageIos )
3007     {
3008         int pageNb = 0;
3009 
3010         for ( PageIO pageIo : pageIos )
3011         {
3012             StringBuilder sb = new StringBuilder();
3013             sb.append( "PageIO[" ).append( pageNb ).append( "]:0x" );
3014             sb.append( Long.toHexString( pageIo.getOffset() ) ).append( "/" );
3015             sb.append( pageIo.getSize() );
3016             pageNb++;
3017 
3018             ByteBuffer data = pageIo.getData();
3019 
3020             int position = data.position();
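                 // The +12 accounts for the PageIO header : the 8-byte next-page link and the 4-byte data size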
3021             int dataLength = ( int ) pageIo.getSize() + 12;
3022 
3023             if ( dataLength > data.limit() )
3024             {
3025                 dataLength = data.limit();
3026             }
3027 
3028             byte[] bytes = new byte[dataLength];
3029 
3030             data.get( bytes );
3031             data.position( position );
3032             int pos = 0;
3033 
3034             for ( byte b : bytes )
3035             {
3036                 int mod = pos % 16;
3037 
3038                 switch ( mod )
3039                 {
3040                     case 0:
3041                         sb.append( "\n    " );
3042                         // No break
3043                     case 4:
3044                     case 8:
3045                     case 12:
3046                         sb.append( " " );
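                             // Fallthrough : the byte itself is appended by the cases below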
3047                     case 1:
3048                     case 2:
3049                     case 3:
3050                     case 5:
3051                     case 6:
3052                     case 7:
3053                     case 9:
3054                     case 10:
3055                     case 11:
3056                     case 13:
3057                     case 14:
3058                     case 15:
3059                         sb.append( Strings.dumpByte( b ) ).append( " " );
3060                 }
3061                 pos++;
3062             }
3063 
3064             LOG_PAGES.debug( sb.toString() );
3065         }
3066     }
3067 
3068 
3069     /**
3070      * Compute the number of pages needed to store some specific size of data.
3071      *
3072      * @param dataSize The size of the data we want to store in pages
3073      * @return The number of pages needed
3074      */
3075     private int computeNbPages( int dataSize )
3076     {
3077         if ( dataSize <= 0 )
3078         {
3079             return 0;
3080         }
3081 
3082         // Compute the number of pages needed.
3083         // Considering that each page can contain PageSize bytes,
3084         // but that the first 8 bytes are used for links and we
3085         // use 4 bytes to store the data size, the number of needed
3086         // pages is :
3087         // NbPages = ( (dataSize - (PageSize - 8 - 4 )) / (PageSize - 8) ) + 1
3088         // NbPages += ( if (dataSize - (PageSize - 8 - 4 )) % (PageSize - 8) > 0 : 1 : 0 )
3089         int availableSize = ( pageSize - LONG_SIZE );
3090         int nbNeededPages = 1;
3091 
3092         // Compute the number of pages that will be full but the first page
3093         if ( dataSize > availableSize - INT_SIZE )
3094         {
3095             int remainingSize = dataSize - ( availableSize - INT_SIZE );
3096             nbNeededPages += remainingSize / availableSize;
3097             int remain = remainingSize % availableSize;
3098 
3099             if ( remain > 0 )
3100             {
3101                 nbNeededPages++;
3102             }
3103         }
3104 
3105         return nbNeededPages;
3106     }
3107 
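     /*
      * Worked example for computeNbPages(), assuming the default 512-byte page size : the first
      * page can hold 512 - 8 - 4 = 500 data bytes and every following page 512 - 8 = 504 bytes,
      * so storing 1200 bytes needs 1 + ceil( ( 1200 - 500 ) / 504 ) = 3 pages.
      */
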
3108 
3109     /**
3110      * Get as many pages as needed to store the data of the given size. The returned
3111      * PageIOs are all linked together.
3112      *
3113      * @param dataSize The data size
3114      * @return An array of pages, enough to store the full data
3115      */
3116     private PageIO[] getFreePageIOs( int dataSize ) throws IOException
3117     {
3118         if ( dataSize == 0 )
3119         {
3120             return new PageIO[]
3121                 {};
3122         }
3123 
3124         int nbNeededPages = computeNbPages( dataSize );
3125 
3126         PageIO[] pageIOs = new PageIO[nbNeededPages];
3127 
3128         // The first page : set the size
3129         pageIOs[0] = fetchNewPage();
3130         pageIOs[0].setSize( dataSize );
3131 
3132         for ( int i = 1; i < nbNeededPages; i++ )
3133         {
3134             pageIOs[i] = fetchNewPage();
3135 
3136             // Create the link
3137             pageIOs[i - 1].setNextPage( pageIOs[i].getOffset() );
3138         }
3139 
3140         return pageIOs;
3141     }
3142 
3143 
3144     /**
3145      * Return a new Page. We take one of the existing free pages, or we create
3146      * a new page at the end of the file.
3147      *
3148      * @return The fetched PageIO
3149      */
3150     private PageIO fetchNewPage() throws IOException
3151     {
3152         //System.out.println( "Fetching new page" );
3153         if ( firstFreePage == NO_PAGE )
3154         {
3155             nbCreatedPages.incrementAndGet();
3156 
3157             // We don't have any free page. Allocate a new page at the end
3158             // of the file
3159             PageIO newPage = new PageIO( endOfFileOffset );
3160 
3161             endOfFileOffset += pageSize;
3162 
3163             ByteBuffer data = ByteBuffer.allocateDirect( pageSize );
3164 
3165             newPage.setData( data );
3166             newPage.setNextPage( NO_PAGE );
3167             newPage.setSize( 0 );
3168 
3169             LOG.debug( "Requiring a new page at offset {}", newPage.getOffset() );
3170 
3171             return newPage;
3172         }
3173         else
3174         {
3175             nbReusedPages.incrementAndGet();
3176 
3177             freePageLock.lock();
3178 
3179             // We have some existing free page. Fetch it from disk
3180             PageIO pageIo = fetchPage( firstFreePage );
3181 
3182             // Update the firstFreePage pointer
3183             firstFreePage = pageIo.getNextPage();
3184 
3185             freePageLock.unlock();
3186 
3187             // overwrite the data of old page
3188             ByteBuffer data = ByteBuffer.allocateDirect( pageSize );
3189             pageIo.setData( data );
3190 
3191             pageIo.setNextPage( NO_PAGE );
3192             pageIo.setSize( 0 );
3193 
3194             LOG.debug( "Reused page at offset {}", pageIo.getOffset() );
3195 
3196             return pageIo;
3197         }
3198     }
3199 
3200 
3201     /**
3202      * Fetch a page from disk, knowing its position in the file.
3203      *
3204      * @param offset The position in the file
3205      * @return The found page
3206      */
3207     /* no qualifier */PageIO fetchPage( long offset ) throws IOException, EndOfFileExceededException
3208     {
3209         checkOffset( offset );
3210 
3211         if ( fileChannel.size() < offset + pageSize )
3212         {
3213             // Error : we are past the end of the file
3214             throw new EndOfFileExceededException( "We are fetching a page on " + offset +
3215                 " when the file's size is " + fileChannel.size() );
3216         }
3217         else
3218         {
3219             // Read the page
3220             fileChannel.position( offset );
3221 
3222             ByteBuffer data = ByteBuffer.allocate( pageSize );
3223             fileChannel.read( data );
3224             data.rewind();
3225 
3226             PageIO readPage = new PageIO( offset );
3227             readPage.setData( data );
3228 
3229             return readPage;
3230         }
3231     }
3232 
3233 
3234     /**
3235      * @return the pageSize
3236      */
3237     public int getPageSize()
3238     {
3239         return pageSize;
3240     }
3241 
3242 
3243     /**
3244      * Set the page size, i.e. the number of bytes a page can store.
3245      *
3246      * @param pageSize The number of bytes for a page
3247      */
3248     /* no qualifier */void setPageSize( int pageSize )
3249     {
3250         if ( pageSize >= 13 )
3251         {
3252             this.pageSize = pageSize;
3253         }
3254         else
3255         {
3256             this.pageSize = DEFAULT_PAGE_SIZE;
3257         }
3258     }
3259 
3260 
3261     /**
3262      * Close the RecordManager and flush everything on disk
3263      */
3264     public void close() throws IOException
3265     {
3266         beginTransaction();
3267 
3268         // Close all the managed B-trees
3269         for ( BTree<Object, Object> tree : managedBtrees.values() )
3270         {
3271             tree.close();
3272         }
3273 
3274         // Close the management B-trees
3275         copiedPageBtree.close();
3276         btreeOfBtrees.close();
3277 
3278         managedBtrees.clear();
3279 
3280         // Write the data
3281         fileChannel.force( true );
3282 
3283         // And close the channel
3284         fileChannel.close();
3285 
3286         commit();
3287     }
3288 
3289     /** Hex chars */
3290     private static final byte[] HEX_CHAR = new byte[]
3291         { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' };
3292 
3293 
3294     public static String dump( byte octet )
3295     {
3296         return new String( new byte[]
3297             { HEX_CHAR[( octet & 0x00F0 ) >> 4], HEX_CHAR[octet & 0x000F] } );
3298     }
3299 
3300 
3301     /**
3302      * Dump a pageIO
3303      */
3304     private void dump( PageIO pageIo )
3305     {
3306         ByteBuffer buffer = pageIo.getData();
3307         buffer.mark();
3308         byte[] longBuffer = new byte[LONG_SIZE];
3309         byte[] intBuffer = new byte[INT_SIZE];
3310 
3311         // get the next page offset
3312         buffer.get( longBuffer );
3313         long nextOffset = LongSerializer.deserialize( longBuffer );
3314 
3315         // Get the data size
3316         buffer.get( intBuffer );
3317         int size = IntSerializer.deserialize( intBuffer );
3318 
3319         buffer.reset();
3320 
3321         System.out.println( "PageIO[" + Long.toHexString( pageIo.getOffset() ) + "], size = " + size + ", NEXT PageIO:"
3322             + Long.toHexString( nextOffset ) );
3323         System.out.println( " 0  1  2  3  4  5  6  7  8  9  A  B  C  D  E  F " );
3324         System.out.println( "+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+" );
3325 
3326         for ( int i = 0; i < buffer.limit(); i += 16 )
3327         {
3328             System.out.print( "|" );
3329 
3330             for ( int j = 0; j < 16; j++ )
3331             {
3332                 System.out.print( dump( buffer.get() ) );
3333 
3334                 if ( j == 15 )
3335                 {
3336                     System.out.println( "|" );
3337                 }
3338                 else
3339                 {
3340                     System.out.print( " " );
3341                 }
3342             }
3343         }
3344 
3345         System.out.println( "+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+" );
3346 
3347         buffer.reset();
3348     }
3349 
3350 
3351     /**
3352      * Dump the RecordManager file
3353      * (I/O errors are caught and reported on the console)
3354      */
3355     public void dump()
3356     {
3357         System.out.println( "/---------------------------- Dump ----------------------------\\" );
3358 
3359         try
3360         {
3361             RandomAccessFile randomFile = new RandomAccessFile( file, "r" );
3362             FileChannel fileChannel = randomFile.getChannel();
3363 
3364             ByteBuffer recordManagerHeader = ByteBuffer.allocate( RECORD_MANAGER_HEADER_SIZE );
3365 
3366             // load the RecordManager header
3367             fileChannel.read( recordManagerHeader );
3368 
3369             recordManagerHeader.rewind();
3370 
3371             // The page size
3372             long fileSize = fileChannel.size();
3373             int pageSize = recordManagerHeader.getInt();
3374             long nbPages = fileSize / pageSize;
3375 
3376             // The number of managed B-trees
3377             int nbBtree = recordManagerHeader.getInt();
3378 
3379             // The first free page
3380             long firstFreePage = recordManagerHeader.getLong();
3381 
3382             // The current B-tree of B-trees
3383             long currentBtreeOfBtreesPage = recordManagerHeader.getLong();
3384 
3385             // The previous B-tree of B-trees
3386             long previousBtreeOfBtreesPage = recordManagerHeader.getLong();
3387 
3388             // The current CopiedPages B-tree
3389             long currentCopiedPagesBtreePage = recordManagerHeader.getLong();
3390 
3391             // The previous CopiedPages B-tree
3392             long previousCopiedPagesBtreePage = recordManagerHeader.getLong();
3393 
3394             System.out.println( "  RecordManager" );
3395             System.out.println( "  -------------" );
3396             System.out.println( "  Size = 0x" + Long.toHexString( fileSize ) );
3397             System.out.println( "  NbPages = " + nbPages );
3398             System.out.println( "    Header " );
3399             System.out.println( "      page size : " + pageSize );
3400             System.out.println( "      nbTree : " + nbBtree );
3401             System.out.println( "      firstFreePage : 0x" + Long.toHexString( firstFreePage ) );
3402             System.out.println( "      current BOB : 0x" + Long.toHexString( currentBtreeOfBtreesPage ) );
3403             System.out.println( "      previous BOB : 0x" + Long.toHexString( previousBtreeOfBtreesPage ) );
3404             System.out.println( "      current CopiedPages : 0x" + Long.toHexString( currentCopiedPagesBtreePage ) );
3405             System.out.println( "      previous CopiedPages : 0x" + Long.toHexString( previousCopiedPagesBtreePage ) );
3406 
3407             // Dump the Free pages list
3408             dumpFreePages( firstFreePage );
3409 
3410             // Dump the B-tree of B-trees
3411             dumpBtreeHeader( currentBtreeOfBtreesPage );
3412 
3413             // Dump the previous B-tree of B-trees if any
3414             if ( previousBtreeOfBtreesPage != NO_PAGE )
3415             {
3416                 dumpBtreeHeader( previousBtreeOfBtreesPage );
3417             }
3418 
3419             // Dump the CopiedPages B-tree
3420             dumpBtreeHeader( currentCopiedPagesBtreePage );
3421 
3422             // Dump the previous B-tree of B-trees if any
3423             if ( previousCopiedPagesBtreePage != NO_PAGE )
3424             {
3425                 dumpBtreeHeader( previousCopiedPagesBtreePage );
3426             }
3427 
3428             // Dump all the user's B-trees
3429             randomFile.close();
3430             System.out.println( "\\---------------------------- Dump ----------------------------/" );
3431         }
3432         catch ( IOException ioe )
3433         {
3434             System.out.println( "Exception while dumping the file : " + ioe.getMessage() );
3435         }
3436     }
3437 
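     /*
      * RecordManager header layout, as decoded by dump() above (given as a reading aid, not as an
      * authoritative format specification) :
      *
      *   offset  0, 4 bytes (int)  : page size
      *   offset  4, 4 bytes (int)  : number of managed B-trees
      *   offset  8, 8 bytes (long) : first free page offset
      *   offset 16, 8 bytes (long) : current B-tree of B-trees offset
      *   offset 24, 8 bytes (long) : previous B-tree of B-trees offset
      *   offset 32, 8 bytes (long) : current CopiedPages B-tree offset
      *   offset 40, 8 bytes (long) : previous CopiedPages B-tree offset
      */
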
3438 
3439     /**
3440      * Dump the free pages
3441      */
3442     private void dumpFreePages( long freePageOffset ) throws EndOfFileExceededException, IOException
3443     {
3444         System.out.println( "\n  FreePages : " );
3445         int pageNb = 1;
3446 
3447         while ( freePageOffset != NO_PAGE )
3448         {
3449             PageIO pageIo = fetchPage( freePageOffset );
3450 
3451             System.out.println( "    freePage[" + pageNb + "] : 0x" + Long.toHexString( pageIo.getOffset() ) );
3452 
3453             freePageOffset = pageIo.getNextPage();
3454             pageNb++;
3455         }
3456     }
3457 
3458 
3459     /**
3460      * Dump a B-tree Header
3461      */
3462     private long dumpBtreeHeader( long btreeOffset ) throws EndOfFileExceededException, IOException
3463     {
3464         // First read the B-tree header
3465         PageIO[] pageIos = readPageIOs( btreeOffset, Long.MAX_VALUE );
3466 
3467         long dataPos = 0L;
3468 
3469         // The B-tree current revision
3470         long revision = readLong( pageIos, dataPos );
3471         dataPos += LONG_SIZE;
3472 
3473         // The nb elems in the tree
3474         long nbElems = readLong( pageIos, dataPos );
3475         dataPos += LONG_SIZE;
3476 
3477         // The B-tree rootPage offset
3478         long rootPageOffset = readLong( pageIos, dataPos );
3479         dataPos += LONG_SIZE;
3480 
3481         // The B-tree page size
3482         int btreePageSize = readInt( pageIos, dataPos );
3483         dataPos += INT_SIZE;
3484 
3485         // The tree name
3486         ByteBuffer btreeNameBytes = readBytes( pageIos, dataPos );
3487         dataPos += INT_SIZE + btreeNameBytes.limit();
3488         String btreeName = Strings.utf8ToString( btreeNameBytes );
3489 
3490         // The keySerializer FQCN
3491         ByteBuffer keySerializerBytes = readBytes( pageIos, dataPos );
3492         dataPos += INT_SIZE + keySerializerBytes.limit();
3493 
3494         String keySerializerFqcn = "";
3495 
3496         if ( keySerializerBytes != null )
3497         {
3498             keySerializerFqcn = Strings.utf8ToString( keySerializerBytes );
3499         }
3500 
3501         // The valueSerializer FQCN
3502         ByteBuffer valueSerializerBytes = readBytes( pageIos, dataPos );
3503 
3504         String valueSerializerFqcn = "";
3505         dataPos += INT_SIZE + valueSerializerBytes.limit();
3506 
3507         if ( valueSerializerBytes != null )
3508         {
3509             valueSerializerFqcn = Strings.utf8ToString( valueSerializerBytes );
3510         }
3511 
3512         // The B-tree allowDuplicates flag
3513         int allowDuplicates = readInt( pageIos, dataPos );
3514         boolean dupsAllowed = allowDuplicates != 0;
3515 
3516         dataPos += INT_SIZE;
3517 
3518         //        System.out.println( "\n  B-Tree " + btreeName );
3519         //        System.out.println( "  ------------------------- " );
3520 
3521         //        System.out.println( "    nbPageIOs[" + pageIos.length + "] = " + pageIoList );
3522         if ( LOG.isDebugEnabled() )
3523         {
3524             StringBuilder sb = new StringBuilder();
3525             boolean isFirst = true;
3526 
3527             for ( PageIO pageIo : pageIos )
3528             {
3529                 if ( isFirst )
3530                 {
3531                     isFirst = false;
3532                 }
3533                 else
3534                 {
3535                     sb.append( ", " );
3536                 }
3537 
3538                 sb.append( "0x" ).append( Long.toHexString( pageIo.getOffset() ) );
3539             }
3540 
3541             String pageIoList = sb.toString();
3542 
3543             LOG.debug( "    PageIOs[{}] = {}", pageIos.length, pageIoList );
3544 
3545             //        System.out.println( "    dataSize = "+ pageIos[0].getSize() );
3546             LOG.debug( "    dataSize = {}", pageIos[0].getSize() );
3547 
3548             LOG.debug( "    B-tree '{}'", btreeName );
3549             LOG.debug( "    revision : {}", revision );
3550             LOG.debug( "    nbElems : {}", nbElems );
3551             LOG.debug( "    rootPageOffset : 0x{}", Long.toHexString( rootPageOffset ) );
3552             LOG.debug( "    B-tree page size : {}", btreePageSize );
3553             LOG.debug( "    keySerializer : '{}'", keySerializerFqcn );
3554             LOG.debug( "    valueSerializer : '{}'", valueSerializerFqcn );
3555             LOG.debug( "    dups allowed : {}", dupsAllowed );
3556             //
3557             //        System.out.println( "    B-tree '" + btreeName + "'" );
3558             //        System.out.println( "    revision : " + revision );
3559             //        System.out.println( "    nbElems : " + nbElems );
3560             //        System.out.println( "    rootPageOffset : 0x" + Long.toHexString( rootPageOffset ) );
3561             //        System.out.println( "    B-tree page size : " + btreePageSize );
3562             //        System.out.println( "    keySerializer : " + keySerializerFqcn );
3563             //        System.out.println( "    valueSerializer : " + valueSerializerFqcn );
3564             //        System.out.println( "    dups allowed : " + dupsAllowed );
3565         }
3566 
3567         return rootPageOffset;
3568     }
3569 
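     /*
      * On-disk layout of a B-tree header, as decoded by dumpBtreeHeader() above (given as a
      * reading aid, not as an authoritative format specification) :
      *
      *   8 bytes (long) : revision
      *   8 bytes (long) : number of elements
      *   8 bytes (long) : rootPage offset
      *   4 bytes (int)  : B-tree page size
      *   4 + n bytes    : B-tree name (length-prefixed UTF-8)
      *   4 + n bytes    : key serializer FQCN (length-prefixed UTF-8)
      *   4 + n bytes    : value serializer FQCN (length-prefixed UTF-8)
      *   4 bytes (int)  : allowDuplicates flag (0 means duplicates are not allowed)
      */
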
3570 
3571     /**
3572      * Get the number of managed trees. We don't count the CopiedPage B-tree and the B-tree of B-trees
3573      *
3574      * @return The number of managed B-trees
3575      */
3576     public int getNbManagedTrees()
3577     {
3578         return nbBtree;
3579     }
3580 
3581 
3582     /**
3583      * Get the managed B-trees. We don't return the CopiedPage B-tree nor the B-tree of B-trees.
3584      *
3585      * @return The managed B-trees
3586      */
3587     public Set<String> getManagedTrees()
3588     {
3589         Set<String> btrees = new HashSet<String>( managedBtrees.keySet() );
3590 
3591         return btrees;
3592     }
3593 
3594 
3595     /**
3596      * Stores the copied pages into the CopiedPages B-tree
3597      *
3598      * @param name The B-tree name
3599      * @param revision The revision
3600      * @param copiedPages The pages that have been copied while creating this revision
3601      * @throws IOException If we weren't able to store the data on disk
3602      */
3603     /* No Qualifier */void storeCopiedPages( String name, long revision, long[] copiedPages ) throws IOException
3604     {
3605         RevisionName revisionName = new RevisionName( revision, name );
3606 
3607         copiedPageBtree.insert( revisionName, copiedPages );
3608     }
3609 
3610 
3611     /**
3612      * Store a reference to an old rootPage into the B-tree of B-trees
3613      *
3614      * @param btree The B-tree we want to keep an old RootPage for
3615      * @param rootPage The old rootPage
3616      * @throws IOException If we have an issue while writing on disk
3617      */
3618     /* No qualifier */<K, V> void storeRootPage( BTree<K, V> btree, Page<K, V> rootPage ) throws IOException
3619     {
3620         if ( !isKeepRevisions() )
3621         {
3622             return;
3623         }
3624 
3625         if ( btree == copiedPageBtree )
3626         {
3627             return;
3628         }
3629 
3630         NameRevision nameRevision = new NameRevision( btree.getName(), rootPage.getRevision() );
3631 
3632         ( ( AbstractBTree<NameRevision, Long> ) btreeOfBtrees ).insert( nameRevision,
3633             ( ( AbstractPage<K, V> ) rootPage ).getOffset(), 0 );
3634 
3635         if ( LOG_CHECK.isDebugEnabled() )
3636         {
3637             MavibotInspector.check( this );
3638         }
3639     }
3640 
3641 
3642     /**
3643      * Fetch the rootPage of a given B-tree for a given revision.
3644      *
3645      * @param btree The B-tree we are interested in
3646      * @param revision The revision we want to get back
3647      * @return The rootPage for this B-tree and this revision, if any
3648      * @throws KeyNotFoundException If we can't find the rootPage for this revision and this B-tree
3649      * @throws IOException If we had an issue while accessing the data on disk
3650      */
3651     /* No qualifier */<K, V> Page<K, V> getRootPage( BTree<K, V> btree, long revision ) throws KeyNotFoundException,
3652         IOException
3653     {
3654         if ( btree.getRevision() == revision )
3655         {
3656             // We are asking for the current revision
3657             return btree.getRootPage();
3658         }
3659 
3660         // Get the B-tree header offset
3661         NameRevision nameRevision = new NameRevision( btree.getName(), revision );
3662         long btreeHeaderOffset = btreeOfBtrees.get( nameRevision );
3663 
3664         // get the B-tree rootPage
3665         Page<K, V> btreeRoot = readRootPage( btree, btreeHeaderOffset );
3666 
3667         return btreeRoot;
3668     }
3669 
3670 
3671     /**
3672      * Read a root page from the B-tree header offset
3673      */
3674     private <K, V> Page<K, V> readRootPage( BTree<K, V> btree, long btreeHeaderOffset )
3675         throws EndOfFileExceededException, IOException
3676     {
3677         // Read the B-tree header pages on disk
3678         PageIO[] btreeHeaderPageIos = readPageIOs( btreeHeaderOffset, Long.MAX_VALUE );
3679         long dataPos = LONG_SIZE + LONG_SIZE;
3680 
3681         // The B-tree rootPage offset
3682         long rootPageOffset = readLong( btreeHeaderPageIos, dataPos );
3683 
3684         // Read the rootPage pages on disk
3685         PageIO[] rootPageIos = readPageIOs( rootPageOffset, Long.MAX_VALUE );
3686 
3687         // Now, convert it to a Page
3688         Page<K, V> btreeRoot = readPage( btree, rootPageIos );
3689 
3690         return btreeRoot;
3691     }
3692 
3693 
3694     /**
3695      * Get one managed tree, knowing its name.
3696      *
3697      * @param name The B-tree name we are looking for
3698      * @return The managed B-tree
3699      */
3700     public <K, V> BTree<K, V> getManagedTree( String name )
3701     {
3702         return ( BTree<K, V> ) managedBtrees.get( name );
3703     }
3704 
3705 
3706     /**
3707      * Move a list of pages to the free page list. A logical page is associated with one
3708      * or more physical PageIOs, which are on the disk. We have to move all those PageIO instances
3709      * to the free list, and do the same in memory (we try to keep a reference to a set of
3710      * free pages).
3711      *
3712      * @param btree The B-tree which owned the pages
3713      * @param revision The current revision
3714      * @param pages The pages to free
3715      * @throws IOException If we had a problem while updating the file
3716      * @throws EndOfFileExceededException If we tried to write after the end of the file
3717      */
3718     /* Package protected */<K, V> void freePages( BTree<K, V> btree, long revision, List<Page<K, V>> pages )
3719         throws EndOfFileExceededException, IOException
3720     {
3721         if ( ( pages == null ) || pages.isEmpty() )
3722         {
3723             return;
3724         }
3725 
3726         if ( !keepRevisions )
3727         {
3728             // if the B-tree doesn't keep revisions, we can safely move
3729             // the pages to the freed page list.
3730             if ( LOG.isDebugEnabled() )
3731             {
3732                 LOG.debug( "Freeing the following pages :" );
3733 
3734                 for ( Page<K, V> page : pages )
3735                 {
3736                     LOG.debug( "    {}", page );
3737                 }
3738             }
3739 
3740             for ( Page<K, V> page : pages )
3741             {
3742                 long pageOffset = ( ( AbstractPage<K, V> ) page ).getOffset();
3743 
3744                 PageIO[] pageIos = readPageIOs( pageOffset, Long.MAX_VALUE );
3745 
3746                 for ( PageIO pageIo : pageIos )
3747                 {
3748                     freedPages.add( pageIo );
3749                 }
3750             }
3751         }
3752         else
3753         {
3754             // We are keeping revisions, so we move the pages to the CopiedPages B-tree,
3755             // but only for standard B-trees (not for the management B-trees)
3756             if ( LOG.isDebugEnabled() )
3757             {
3758                 LOG.debug( "Moving the following pages to the CopiedBtree :" );
3759 
3760                 for ( Page<K, V> page : pages )
3761                 {
3762                     LOG.debug( "    {}", page );
3763                 }
3764             }
3765 
3766             long[] pageOffsets = new long[pages.size()];
3767             int pos = 0;
3768 
3769             for ( Page<K, V> page : pages )
3770             {
3771                 pageOffsets[pos++] = ( ( AbstractPage<K, V> ) page ).offset;
3772             }
3773 
3774             if ( ( btree.getType() != BTreeTypeEnum.BTREE_OF_BTREES )
3775                 && ( btree.getType() != BTreeTypeEnum.COPIED_PAGES_BTREE ) )
3776             {
3777                 // Deal with standard B-trees
3778                 RevisionName revisionName = new RevisionName( revision, btree.getName() );
3779 
3780                 copiedPageBtree.insert( revisionName, pageOffsets );
3781 
3782                 // Update the RecordManager Copiedpage Offset
3783                 currentCopiedPagesBtreeOffset = ( ( PersistedBTree<RevisionName, long[]> ) copiedPageBtree ).getBtreeOffset();
3784             }
3785             else
3786             {
3787                 // Management B-trees (BOB and CopiedPages) : we simply free the copied pages
3788                 for ( long pageOffset : pageOffsets )
3789                 {
3790                     PageIO[] pageIos = readPageIOs( pageOffset, Long.MAX_VALUE );
3791 
3792                     for ( PageIO pageIo : pageIos )
3793                     {
3794                         freedPages.add( pageIo );
3795                     }
3796                 }
3797             }
3798         }
3799     }
3800 
3801 
3802     /**
3803      * Add a PageIO to the list of free PageIOs
3804      *
3805      * @param pageIo The page to free
3806      * @throws IOException If we weren't capable of updating the file
3807      */
3808     /* no qualifier */ void free( PageIO pageIo ) throws IOException
3809     {
3810         freePageLock.lock();
3811 
3812         // We add the Page's PageIOs before the
3813         // existing free pages.
3814         // Link it to the first free page
3815         pageIo.setNextPage( firstFreePage );
3816 
3817         LOG.debug( "Flushing the first free page" );
3818 
3819         // And flush it to disk
3820         //FIXME can be flushed last after releasing the lock
3821         flushPages( pageIo );
3822 
3823         // We can update the firstFreePage offset
3824         firstFreePage = pageIo.getOffset();
3825 
3826         freePageLock.unlock();
3827     }
3828 
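     /*
      * Note on the free-page list manipulated by free() and fetchNewPage() : it is a LIFO,
      * singly-linked list of PageIOs threaded through their nextPage pointers, with firstFreePage
      * holding the offset of the head (or NO_PAGE when the list is empty). Freeing pushes a PageIO
      * at the head, fetchNewPage() pops it.
      */
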
3829 
3830     /**
3831      * Add an array of PageIOs to the list of free PageIOs
3832      *
3833      * @param offsets The offsets of the pages whose associated PageIOs will be fetched and freed.
3834      * @throws IOException If we weren't capable of updating the file
3835      */
3836     /*no qualifier*/ void free( long... offsets ) throws IOException
3837     {
3838         freePageLock.lock();
3839 
3840         List<PageIO> pageIos = new ArrayList<PageIO>();
3841         int pageIndex = 0;
3842         for ( int i = 0; i < offsets.length; i++ )
3843         {
3844             PageIO[] ios = readPageIOs( offsets[i], Long.MAX_VALUE );
3845             for ( PageIO io : ios )
3846             {
3847                 pageIos.add( io );
3848 
3849                 if ( pageIndex > 0 )
3850                 {
3851                     pageIos.get( pageIndex - 1 ).setNextPage( io.getOffset() );
3852                 }
3853 
3854                 pageIndex++;
3855             }
3856         }
3857 
3858 
3859         // We add the Page's PageIOs before the
3860         // existing free pages.
3861         // Link it to the first free page
3862         pageIos.get( pageIndex - 1 ).setNextPage( firstFreePage );
3863 
3864         LOG.debug( "Flushing the first free page" );
3865 
3866         // And flush it to disk
3867         //FIXME can be flushed last after releasing the lock
3868         flushPages( pageIos.toArray( new PageIO[0] ) );
3869 
3870         // We can update the firstFreePage offset
3871         firstFreePage = pageIos.get( 0 ).getOffset();
3872 
3873         freePageLock.unlock();
3874     }
3875 
3876 
3877     /**
3878      * @return the keepRevisions flag
3879      */
3880     public boolean isKeepRevisions()
3881     {
3882         return keepRevisions;
3883     }
3884 
3885 
3886     /**
3887      * @param keepRevisions the keepRevisions flag to set
3888      */
3889     public void setKeepRevisions( boolean keepRevisions )
3890     {
3891         this.keepRevisions = keepRevisions;
3892     }
3893 
3894 
3895     /**
3896      * Creates a B-tree and automatically adds it to the list of managed btrees
3897      *
3898      * @param name the name of the B-tree
3899      * @param keySerializer key serializer
3900      * @param valueSerializer value serializer
3901      * @param allowDuplicates flag for allowing duplicate keys
3902      * @return a managed B-tree
3903      * @throws IOException If we weren't able to update the file on disk
3904      * @throws BTreeAlreadyManagedException If the B-tree is already managed
3905      */
3906     @SuppressWarnings("all")
3907     public <K, V> BTree<K, V> addBTree( String name, ElementSerializer<K> keySerializer,
3908         ElementSerializer<V> valueSerializer, boolean allowDuplicates )
3909         throws IOException, BTreeAlreadyManagedException
3910     {
3911         PersistedBTreeConfiguration config = new PersistedBTreeConfiguration();
3912 
3913         config.setName( name );
3914         config.setKeySerializer( keySerializer );
3915         config.setValueSerializer( valueSerializer );
3916         config.setAllowDuplicates( allowDuplicates );
3917 
3918         BTree btree = new PersistedBTree( config );
3919         manage( btree );
3920 
3921         if ( LOG_CHECK.isDebugEnabled() )
3922         {
3923             MavibotInspector.check( this );
3924         }
3925 
3926         return btree;
3927     }
3928 
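     /*
      * Hypothetical usage sketch for addBTree(). The RecordManager constructor and the
      * LongSerializer.INSTANCE accessor used below are assumptions, they do not appear in this
      * file :
      *
      *     RecordManager recordManager = new RecordManager( "mavibot.db" );
      *     BTree<Long, Long> index = recordManager.addBTree( "index",
      *         LongSerializer.INSTANCE, LongSerializer.INSTANCE, false );
      *     index.insert( 1L, 100L );
      *     recordManager.close();
      */
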
3929 
3930     /**
3931      * Add a newly closed transaction into the closed transaction queue
3932      */
3933     /* no qualifier */<K, V> void releaseTransaction( ReadTransaction<K, V> readTransaction )
3934     {
3935         RevisionName revisionName = new RevisionName(
3936             readTransaction.getRevision(),
3937             readTransaction.getBtreeHeader().getBtree().getName() );
3938         //closedTransactionsQueue.add( revisionName );
3939     }
3940 
3941 
3942     /**
3943      * Get the current BTreeHeader for a given Btree. It might not exist
3944      */
3945     public BTreeHeader getBTreeHeader( String name )
3946     {
3947         // Get a lock
3948         btreeHeadersLock.readLock().lock();
3949 
3950         // get the current BTree Header for this BTree and revision
3951         BTreeHeader<?, ?> btreeHeader = currentBTreeHeaders.get( name );
3952 
3953         // And unlock 
3954         btreeHeadersLock.readLock().unlock();
3955 
3956         return btreeHeader;
3957     }
3958 
3959 
3960     /**
3961      * Get the new BTreeHeader for a given Btree. It might not exist
3962      */
3963     public BTreeHeader getNewBTreeHeader( String name )
3964     {
3965         // get the current BTree Header for this BTree and revision
3966         BTreeHeader<?, ?> btreeHeader = newBTreeHeaders.get( name );
3967 
3968         return btreeHeader;
3969     }
3970 
3971 
3972     /**
3973      * {@inheritDoc}
3974      */
3975     public void updateNewBTreeHeaders( BTreeHeader btreeHeader )
3976     {
3977         newBTreeHeaders.put( btreeHeader.getBtree().getName(), btreeHeader );
3978     }
3979 
3980 
3981     /**
3982      * Swap the current BtreeHeader map with the new one. This method will only
3983      * be called in a single thread, when the current transaction is committed.
3984      */
3985     private void swapCurrentBtreeHeaders()
3986     {
3987         // Copy the reference to the current BtreeHeader Map
3988         Map<String, BTreeHeader<?, ?>> tmp = currentBTreeHeaders;
3989 
3990         // Get a write lock
3991         btreeHeadersLock.writeLock().lock();
3992 
3993         // Swap the new BTreeHeader Map
3994         currentBTreeHeaders = newBTreeHeaders;
3995 
3996         // And unlock 
3997         btreeHeadersLock.writeLock().unlock();
3998 
3999         // Last but not least, clear the Map and reinject the latest revision in it
4000         tmp.clear();
4001         tmp.putAll( currentBTreeHeaders );
4002 
4003         // And update the new BTreeHeader map
4004         newBTreeHeaders = tmp;
4005     }
4006 
4007 
4008     /**
4009      * revert the new BTreeHeaders Map to the current BTreeHeader Map. This method
4010      * is called when we have to rollback a transaction.
4011      */
4012     private void revertBtreeHeaders()
4013     {
4014         // Clean up the new BTreeHeaders Map
4015         newBTreeHeaders.clear();
4016 
4017         // Reinject the latest revision in it
4018         newBTreeHeaders.putAll( currentBTreeHeaders );
4019     }
4020 
4021 
4022     /**
4023      * Loads a B-tree holding the values of a duplicate key.
4024      * This tree is also called the dups tree or sub-tree.
4025      *
4026      * @param btreeHeaderOffset the offset of the B-tree header
4027      * @return the deserialized B-tree
4028      */
4029     /* No qualifier */<K, V> BTree<V, V> loadDupsBtree( long btreeHeaderOffset, BTree<K, V> parentBtree )
4030     {
4031         PageIO[] pageIos = null;
4032         try
4033         {
4034             pageIos = readPageIOs( btreeHeaderOffset, Long.MAX_VALUE );
4035 
4036             BTree<V, V> subBtree = BTreeFactory.<V, V> createPersistedBTree( BTreeTypeEnum.PERSISTED_SUB );
4037             loadBtree( pageIos, subBtree, parentBtree );
4038 
4039             return subBtree;
4040         }
4041         catch ( Exception e )
4042         {
4043             // should not happen
4044             throw new BTreeCreationException( e );
4045         }
4046     }
4047 
4048 
4049     private void checkFreePages() throws EndOfFileExceededException, IOException
4050     {
4051         //System.out.println( "Checking the free pages, starting from " + Long.toHexString( firstFreePage ) );
4052 
4053         // read all the free pages, add them into a set, to be sure we don't have a cycle
4054         Set<Long> freePageOffsets = new HashSet<Long>();
4055 
4056         long currentFreePageOffset = firstFreePage;
4057 
4058         while ( currentFreePageOffset != NO_PAGE )
4059         {
4060             //System.out.println( "Next page offset :" + Long.toHexString( currentFreePageOffset ) );
4061 
4062             if ( ( currentFreePageOffset % pageSize ) != 0 )
4063             {
4064                 throw new InvalidOffsetException( "Wrong offset : " + Long.toHexString( currentFreePageOffset ) );
4065             }
4066 
4067             if ( freePageOffsets.contains( currentFreePageOffset ) )
4068             {
4069                 throw new InvalidOffsetException( "Offset : " + Long.toHexString( currentFreePageOffset )
4070                     + " already read, there is a cycle" );
4071             }
4072 
4073             freePageOffsets.add( currentFreePageOffset );
4074             PageIO pageIO = fetchPage( currentFreePageOffset );
4075 
4076             currentFreePageOffset = pageIO.getNextPage();
4077         }
4078 
4079         return;
4080     }
4081 
4082 
4083     /**
4084      * Sets the threshold of the number of commits to be performed before
4085      * reclaiming the free pages.
4086      * 
4087      * @param pageReclaimerThreshold the number of commits before the reclaimer runs
4088      */
4089     /* no qualifier */ void setPageReclaimerThreshold( int pageReclaimerThreshold )
4090     {
4091         this.pageReclaimerThreshold = pageReclaimerThreshold;
4092     }
4093 
4094     
4095     /* no qualifier */ void _disableReclaimer( boolean toggle )
4096     {
4097         this.disableReclaimer = toggle;
4098     }
4099 
4100     
4101     /**
4102      * @see Object#toString()
4103      */
4104     public String toString()
4105     {
4106         StringBuilder sb = new StringBuilder();
4107 
4108         sb.append( "RM free pages : [" );
4109 
4110         if ( firstFreePage != NO_PAGE )
4111         {
4112             long current = firstFreePage;
4113             boolean isFirst = true;
4114 
4115             while ( current != NO_PAGE )
4116             {
4117                 if ( isFirst )
4118                 {
4119                     isFirst = false;
4120                 }
4121                 else
4122                 {
4123                     sb.append( ", " );
4124                 }
4125 
4126                 PageIO pageIo;
4127 
4128                 try
4129                 {
4130                     pageIo = fetchPage( current );
4131                     sb.append( pageIo.getOffset() );
4132                     current = pageIo.getNextPage();
4133                 }
4134                 catch ( EndOfFileExceededException e )
4135                 {
4136                     e.printStackTrace();
4137                 }
4138                 catch ( IOException e )
4139                 {
4140                     e.printStackTrace();
4141                 }
4142 
4143             }
4144         }
4145 
4146         sb.append( "]" );
4147 
4148         return sb.toString();
4149     }
4150 }