001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.directory.mavibot.btree;
021
022
023import java.io.File;
024import java.io.IOException;
025import java.io.RandomAccessFile;
026import java.nio.ByteBuffer;
027import java.nio.channels.FileChannel;
028import java.util.ArrayList;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.LinkedHashMap;
032import java.util.List;
033import java.util.Map;
034import java.util.Queue;
035import java.util.Set;
036import java.util.concurrent.LinkedBlockingQueue;
037import java.util.concurrent.atomic.AtomicLong;
038import java.util.concurrent.locks.ReadWriteLock;
039import java.util.concurrent.locks.ReentrantLock;
040import java.util.concurrent.locks.ReentrantReadWriteLock;
041
042import org.apache.directory.mavibot.btree.exception.BTreeAlreadyManagedException;
043import org.apache.directory.mavibot.btree.exception.BTreeCreationException;
044import org.apache.directory.mavibot.btree.exception.EndOfFileExceededException;
045import org.apache.directory.mavibot.btree.exception.FileException;
046import org.apache.directory.mavibot.btree.exception.InvalidOffsetException;
047import org.apache.directory.mavibot.btree.exception.KeyNotFoundException;
048import org.apache.directory.mavibot.btree.exception.RecordManagerException;
049import org.apache.directory.mavibot.btree.serializer.ElementSerializer;
050import org.apache.directory.mavibot.btree.serializer.IntSerializer;
051import org.apache.directory.mavibot.btree.serializer.LongArraySerializer;
052import org.apache.directory.mavibot.btree.serializer.LongSerializer;
053import org.apache.directory.mavibot.btree.util.Strings;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057
058/**
059 * The RecordManager is used to manage the file in which we will store the B-trees.
060 * A RecordManager will manage more than one B-tree.<br/>
061 *
062 * It stores data in fixed size pages (default size is 512 bytes), which may be linked one to
063 * the other if the data we want to store is too big for a page.
064 *
065 * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
066 */
067public class RecordManager extends AbstractTransactionManager
068{
069    /** The LoggerFactory used by this class */
070    protected static final Logger LOG = LoggerFactory.getLogger( RecordManager.class );
071
072    /** The LoggerFactory used to trace TXN operations */
073    protected static final Logger TXN_LOG = LoggerFactory.getLogger( "TXN_LOG" );
074
075    /** The LoggerFactory used by this class */
076    protected static final Logger LOG_PAGES = LoggerFactory.getLogger( "org.apache.directory.mavibot.LOG_PAGES" );
077
078    /** A dedicated logger for the check */
079    protected static final Logger LOG_CHECK = LoggerFactory.getLogger( "org.apache.directory.mavibot.LOG_CHECK" );
080
081    /** The associated file */
082    private File file;
083
084    /** The channel used to read and write data */
085    /* no qualifier */FileChannel fileChannel;
086
087    /** The number of managed B-trees */
088    /* no qualifier */int nbBtree;
089
090    /** The first and last free page */
091    /* no qualifier */long firstFreePage;
092
093    /** Some counters to track the number of free pages */
094    public AtomicLong nbFreedPages = new AtomicLong( 0 );
095    public AtomicLong nbCreatedPages = new AtomicLong( 0 );
096    public AtomicLong nbReusedPages = new AtomicLong( 0 );
097    public AtomicLong nbUpdateRMHeader = new AtomicLong( 0 );
098    public AtomicLong nbUpdateBtreeHeader = new AtomicLong( 0 );
099    public AtomicLong nbUpdatePageIOs = new AtomicLong( 0 );
100
101    /** The offset of the end of the file */
102    private long endOfFileOffset;
103
104    /**
105     * A B-tree used to manage the page that has been copied in a new version.
106     * Those pages can be reclaimed when the associated version is dead.
107     **/
108    /* no qualifier */ BTree<RevisionName, long[]> copiedPageBtree;
109
110    /** A constant for an offset on a non existing page */
111    public static final long NO_PAGE = -1L;
112
113    /** The number of bytes used to store the size of a page */
114    private static final int PAGE_SIZE = 4;
115
116    /** The size of the link to next page */
117    private static final int LINK_SIZE = 8;
118
119    /** Some constants */
120    private static final int BYTE_SIZE = 1;
121    /* no qualifier */static final int INT_SIZE = 4;
122    /* no qualifier */static final int LONG_SIZE = 8;
123
124    /** The default page size */
125    public static final int DEFAULT_PAGE_SIZE = 512;
126
127    /** The minimal page size. Can't be below 64, as we have to store many thing sin the RMHeader */
128    private static final int MIN_PAGE_SIZE = 64;
129
130    /** The RecordManager header size */
131    /* no qualifier */static int RECORD_MANAGER_HEADER_SIZE = DEFAULT_PAGE_SIZE;
132
133    /** A global buffer used to store the RecordManager header */
134    private ByteBuffer RECORD_MANAGER_HEADER_BUFFER;
135
136    /** A static buffer used to store the RecordManager header */
137    private byte[] RECORD_MANAGER_HEADER_BYTES;
138
139    /** The length of an Offset, as a negative value */
140    private byte[] LONG_LENGTH = new byte[]
141        { ( byte ) 0xFF, ( byte ) 0xFF, ( byte ) 0xFF, ( byte ) 0xF8 };
142
143    /** The RecordManager underlying page size. */
144    /* no qualifier */int pageSize = DEFAULT_PAGE_SIZE;
145
146    /** The set of managed B-trees */
147    private Map<String, BTree<Object, Object>> managedBtrees;
148
149    /** The queue of recently closed transactions */
150    private Queue<RevisionName> closedTransactionsQueue = new LinkedBlockingQueue<RevisionName>();
151
152    /** The default file name */
153    private static final String DEFAULT_FILE_NAME = "mavibot.db";
154
155    /** A flag set to true if we want to keep old revisions */
156    private boolean keepRevisions;
157
158    /** A flag used by internal btrees */
159    public static final boolean INTERNAL_BTREE = true;
160
161    /** A flag used by internal btrees */
162    public static final boolean NORMAL_BTREE = false;
163
164    /** The B-tree of B-trees */
165    /* no qualifier */ BTree<NameRevision, Long> btreeOfBtrees;
166
167    /** The B-tree of B-trees management btree name */
168    /* no qualifier */static final String BTREE_OF_BTREES_NAME = "_btree_of_btrees_";
169
170    /** The CopiedPages management btree name */
171    /* no qualifier */static final String COPIED_PAGE_BTREE_NAME = "_copiedPageBtree_";
172
173    /** The current B-tree of B-trees header offset */
174    /* no qualifier */long currentBtreeOfBtreesOffset;
175
176    /** The previous B-tree of B-trees header offset */
177    private long previousBtreeOfBtreesOffset = NO_PAGE;
178
179    /** The offset on the current copied pages B-tree */
180    /* no qualifier */ long currentCopiedPagesBtreeOffset = NO_PAGE;
181
182    /** The offset on the previous copied pages B-tree */
183    private long previousCopiedPagesBtreeOffset = NO_PAGE;
184
185    /** A lock to protect the transaction handling */
186    private ReentrantLock transactionLock = new ReentrantLock();
187
188    /** A ThreadLocalStorage used to store the current transaction */
189    private static final ThreadLocal<Integer> context = new ThreadLocal<Integer>();
190
191    /** The list of PageIO that can be freed after a commit */
192    List<PageIO> freedPages = new ArrayList<PageIO>();
193
194    /** The list of PageIO that can be freed after a roolback */
195    private List<PageIO> allocatedPages = new ArrayList<PageIO>();
196
197    /** A Map keeping the latest revisions for each managed BTree */
198    private Map<String, BTreeHeader<?, ?>> currentBTreeHeaders = new HashMap<String, BTreeHeader<?, ?>>();
199
200    /** A Map storing the new revisions when some change have been made in some BTrees */
201    private Map<String, BTreeHeader<?, ?>> newBTreeHeaders = new HashMap<String, BTreeHeader<?, ?>>();
202
203    /** A lock to protect the BtreeHeader maps */
204    private ReadWriteLock btreeHeadersLock = new ReentrantReadWriteLock();
205
206    /** A value stored into the transaction context for rollbacked transactions */
207    private static final int ROLLBACKED_TXN = 0;
208
209    /** A lock to protect the freepage pointers */
210    private ReentrantLock freePageLock = new ReentrantLock();
211
212    /** the space reclaimer */
213    private PageReclaimer reclaimer;
214
215    /** variable to keep track of the write commit count */
216    private int commitCount = 0;
217
218    /** the threshold at which the PageReclaimer will be run to free the copied pages */
219    // FIXME the below value is derived after seeing that anything higher than that
220    // is resulting in a "This thread does not hold the transactionLock" error
221    private int pageReclaimerThreshold = 70;
222    
223    /* a flag used to disable the free page reclaimer (used for internal testing only) */
224    private boolean disableReclaimer = false;
225
226    public Map<Long, Integer> writeCounter = new HashMap<Long, Integer>();
227
228
229    /**
230     * Create a Record manager which will either create the underlying file
231     * or load an existing one. If a folder is provided, then we will create
232     * a file with a default name : mavibot.db
233     *
234     * @param name The file name, or a folder name
235     */
236    public RecordManager( String fileName )
237    {
238        this( fileName, DEFAULT_PAGE_SIZE );
239    }
240
241
242    /**
243     * Create a Record manager which will either create the underlying file
244     * or load an existing one. If a folder is provider, then we will create
245     * a file with a default name : mavibot.db
246     *
247     * @param name The file name, or a folder name
248     * @param pageSize the size of a page on disk, in bytes
249     */
250    public RecordManager( String fileName, int pageSize )
251    {
252        managedBtrees = new LinkedHashMap<String, BTree<Object, Object>>();
253
254        if ( pageSize < MIN_PAGE_SIZE )
255        {
256            this.pageSize = MIN_PAGE_SIZE;
257        }
258        else
259        {
260            this.pageSize = pageSize;
261        }
262
263        RECORD_MANAGER_HEADER_BUFFER = ByteBuffer.allocate( this.pageSize );
264        RECORD_MANAGER_HEADER_BYTES = new byte[this.pageSize];
265        RECORD_MANAGER_HEADER_SIZE = this.pageSize;
266
267        // Open the file or create it
268        File tmpFile = new File( fileName );
269
270        if ( tmpFile.isDirectory() )
271        {
272            // It's a directory. Check that we don't have an existing mavibot file
273            tmpFile = new File( tmpFile, DEFAULT_FILE_NAME );
274        }
275
276        // We have to create a new file, if it does not already exist
277        boolean isNewFile = createFile( tmpFile );
278
279        try
280        {
281            RandomAccessFile randomFile = new RandomAccessFile( file, "rw" );
282            fileChannel = randomFile.getChannel();
283
284            // get the current end of file offset
285            endOfFileOffset = fileChannel.size();
286
287            if ( isNewFile )
288            {
289                initRecordManager();
290            }
291            else
292            {
293                loadRecordManager();
294            }
295
296            reclaimer = new PageReclaimer( this );
297            runReclaimer();
298        }
299        catch ( Exception e )
300        {
301            LOG.error( "Error while initializing the RecordManager : {}", e.getMessage() );
302            LOG.error( "", e );
303            throw new RecordManagerException( e );
304        }
305    }
306
307
308    /**
309     * runs the PageReclaimer to free the copied pages
310     */
311    private void runReclaimer()
312    {
313        if ( disableReclaimer )
314        {
315            LOG.warn( "Free page reclaimer is disabled, this should not be disabled on production systems." );
316            return; 
317        }
318        
319        try
320        {
321            commitCount = 0;
322            reclaimer.reclaim();
323            // must update the headers after reclaim operation
324            updateRecordManagerHeader();
325        }
326        catch ( Exception e )
327        {
328            LOG.warn( "PageReclaimer failed to free the pages", e );
329        }
330    }
331
332
333    /**
334     * Create the mavibot file if it does not exist
335     */
336    private boolean createFile( File mavibotFile )
337    {
338        try
339        {
340            boolean creation = mavibotFile.createNewFile();
341
342            file = mavibotFile;
343
344            if ( mavibotFile.length() == 0 )
345            {
346                return true;
347            }
348            else
349            {
350                return creation;
351            }
352        }
353        catch ( IOException ioe )
354        {
355            LOG.error( "Cannot create the file {}", mavibotFile.getName() );
356            return false;
357        }
358    }
359
360
361    /**
362     * We will create a brand new RecordManager file, containing nothing, but the RecordManager header,
363     * a B-tree to manage the old revisions we want to keep and
364     * a B-tree used to manage pages associated with old versions.
365     * <br/>
366     * The RecordManager header contains the following details :
367     * <pre>
368     * +--------------------------+
369     * | PageSize                 | 4 bytes : The size of a physical page (default to 512)
370     * +--------------------------+
371     * |  NbTree                  | 4 bytes : The number of managed B-trees (zero or more)
372     * +--------------------------+
373     * | FirstFree                | 8 bytes : The offset of the first free page
374     * +--------------------------+
375     * | current BoB offset       | 8 bytes : The offset of the current BoB
376     * +--------------------------+
377     * | previous BoB offset      | 8 bytes : The offset of the previous BoB
378     * +--------------------------+
379     * | current CP btree offset  | 8 bytes : The offset of the current BoB
380     * +--------------------------+
381     * | previous CP btree offset | 8 bytes : The offset of the previous BoB
382     * +--------------------------+
383     * </pre>
384     *
385     * We then store the B-tree managing the pages that have been copied when we have added
386     * or deleted an element in the B-tree. They are associated with a version.
387     *
388     * Last, we add the bTree that keep a track on each revision we can have access to.
389     */
390    private void initRecordManager() throws IOException
391    {
392        // Create a new Header
393        nbBtree = 0;
394        firstFreePage = NO_PAGE;
395        currentBtreeOfBtreesOffset = NO_PAGE;
396
397        updateRecordManagerHeader();
398
399        // Set the offset of the end of the file
400        endOfFileOffset = fileChannel.size();
401
402        // First, create the btree of btrees <NameRevision, Long>
403        createBtreeOfBtrees();
404
405        // Now, initialize the Copied Page B-tree
406        createCopiedPagesBtree();
407
408        // Inject these B-trees into the RecordManager. They are internal B-trees.
409        try
410        {
411            manageSubBtree( btreeOfBtrees );
412
413            currentBtreeOfBtreesOffset = ( ( PersistedBTree<NameRevision, Long> ) btreeOfBtrees ).getBtreeHeader()
414                .getBTreeHeaderOffset();
415            updateRecordManagerHeader();
416
417            // Inject the BtreeOfBtrees into the currentBtreeHeaders map
418            currentBTreeHeaders.put( BTREE_OF_BTREES_NAME,
419                ( ( PersistedBTree<NameRevision, Long> ) btreeOfBtrees ).getBtreeHeader() );
420            newBTreeHeaders.put( BTREE_OF_BTREES_NAME,
421                ( ( PersistedBTree<NameRevision, Long> ) btreeOfBtrees ).getBtreeHeader() );
422
423            // The FreePage B-tree
424            manageSubBtree( copiedPageBtree );
425
426            currentCopiedPagesBtreeOffset = ( ( PersistedBTree<RevisionName, long[]> ) copiedPageBtree ).getBtreeHeader().getBTreeHeaderOffset();
427            updateRecordManagerHeader();
428            
429            // Inject the CopiedPagesBTree into the currentBtreeHeaders map
430            currentBTreeHeaders.put( COPIED_PAGE_BTREE_NAME, ( ( PersistedBTree<RevisionName, long[]> ) copiedPageBtree ).getBtreeHeader() );
431            newBTreeHeaders.put( COPIED_PAGE_BTREE_NAME, ( ( PersistedBTree<RevisionName, long[]> ) copiedPageBtree ).getBtreeHeader() );
432        }
433        catch ( BTreeAlreadyManagedException btame )
434        {
435            // Can't happen here.
436        }
437
438        // We are all set ! Verify the file
439        if ( LOG_CHECK.isDebugEnabled() )
440        {
441            MavibotInspector.check( this );
442        }
443
444    }
445
446
447    /**
448     * Create the B-treeOfBtrees
449     */
450    private void createBtreeOfBtrees()
451    {
452        PersistedBTreeConfiguration<NameRevision, Long> configuration = new PersistedBTreeConfiguration<NameRevision, Long>();
453        configuration.setKeySerializer( NameRevisionSerializer.INSTANCE );
454        configuration.setName( BTREE_OF_BTREES_NAME );
455        configuration.setValueSerializer( LongSerializer.INSTANCE );
456        configuration.setBtreeType( BTreeTypeEnum.BTREE_OF_BTREES );
457        configuration.setCacheSize( PersistedBTree.DEFAULT_CACHE_SIZE );
458
459        btreeOfBtrees = BTreeFactory.createPersistedBTree( configuration );
460    }
461
462
463    /**
464     * Create the CopiedPagesBtree
465     */
466    private void createCopiedPagesBtree()
467    {
468        PersistedBTreeConfiguration<RevisionName, long[]> configuration = new PersistedBTreeConfiguration<RevisionName, long[]>();
469        configuration.setKeySerializer( RevisionNameSerializer.INSTANCE );
470        configuration.setName( COPIED_PAGE_BTREE_NAME );
471        configuration.setValueSerializer( LongArraySerializer.INSTANCE );
472        configuration.setBtreeType( BTreeTypeEnum.COPIED_PAGES_BTREE );
473        configuration.setCacheSize( PersistedBTree.DEFAULT_CACHE_SIZE );
474
475        copiedPageBtree = BTreeFactory.createPersistedBTree( configuration );
476    }
477
478
479    /**
480     * Load the BTrees from the disk.
481     *
482     * @throws InstantiationException
483     * @throws IllegalAccessException
484     * @throws ClassNotFoundException
485     * @throws NoSuchFieldException
486     * @throws SecurityException
487     * @throws IllegalArgumentException
488     */
489    private void loadRecordManager() throws IOException, ClassNotFoundException, IllegalAccessException,
490        InstantiationException, IllegalArgumentException, SecurityException, NoSuchFieldException, KeyNotFoundException
491    {
492        if ( fileChannel.size() != 0 )
493        {
494            ByteBuffer recordManagerHeader = ByteBuffer.allocate( RECORD_MANAGER_HEADER_SIZE );
495
496            // The file exists, we have to load the data now
497            fileChannel.read( recordManagerHeader );
498
499            recordManagerHeader.rewind();
500
501            // read the RecordManager Header :
502            // +---------------------+
503            // | PageSize            | 4 bytes : The size of a physical page (default to 4096)
504            // +---------------------+
505            // | NbTree              | 4 bytes : The number of managed B-trees (at least 1)
506            // +---------------------+
507            // | FirstFree           | 8 bytes : The offset of the first free page
508            // +---------------------+
509            // | current BoB offset  | 8 bytes : The offset of the current B-tree of B-trees
510            // +---------------------+
511            // | previous BoB offset | 8 bytes : The offset of the previous B-tree of B-trees
512            // +---------------------+
513            // | current CP offset   | 8 bytes : The offset of the current Copied Pages B-tree
514            // +---------------------+
515            // | previous CP offset  | 8 bytes : The offset of the previous Copied Pages B-tree
516            // +---------------------+
517
518            // The page size
519            pageSize = recordManagerHeader.getInt();
520
521            // The number of managed B-trees
522            nbBtree = recordManagerHeader.getInt();
523
524            // The first and last free page
525            firstFreePage = recordManagerHeader.getLong();
526
527            // Read all the free pages
528            checkFreePages();
529
530            // The current BOB offset
531            currentBtreeOfBtreesOffset = recordManagerHeader.getLong();
532
533            // The previous BOB offset
534            previousBtreeOfBtreesOffset = recordManagerHeader.getLong();
535
536            // The current Copied Pages B-tree offset
537            currentCopiedPagesBtreeOffset = recordManagerHeader.getLong();
538
539            // The previous Copied Pages B-tree offset
540            previousCopiedPagesBtreeOffset = recordManagerHeader.getLong();
541
542            // read the B-tree of B-trees
543            PageIO[] bobHeaderPageIos = readPageIOs( currentBtreeOfBtreesOffset, Long.MAX_VALUE );
544
545            btreeOfBtrees = BTreeFactory.<NameRevision, Long> createPersistedBTree( BTreeTypeEnum.BTREE_OF_BTREES );
546            //BTreeFactory.<NameRevision, Long> setBtreeHeaderOffset( ( PersistedBTree<NameRevision, Long> )btreeOfBtrees, currentBtreeOfBtreesOffset );
547
548            loadBtree( bobHeaderPageIos, btreeOfBtrees );
549
550            // read the copied page B-tree
551            PageIO[] copiedPagesPageIos = readPageIOs( currentCopiedPagesBtreeOffset, Long.MAX_VALUE );
552
553            copiedPageBtree = BTreeFactory.<RevisionName, long[]> createPersistedBTree( BTreeTypeEnum.COPIED_PAGES_BTREE );
554            //( ( PersistedBTree<RevisionName, long[]> ) copiedPageBtree ).setBtreeHeaderOffset( currentCopiedPagesBtreeOffset );
555
556            loadBtree( copiedPagesPageIos, copiedPageBtree );
557
558            // Now, read all the B-trees from the btree of btrees
559            TupleCursor<NameRevision, Long> btreeCursor = btreeOfBtrees.browse();
560            Map<String, Long> loadedBtrees = new HashMap<String, Long>();
561
562            // loop on all the btrees we have, and keep only the latest revision
563            long currentRevision = -1L;
564
565            while ( btreeCursor.hasNext() )
566            {
567                Tuple<NameRevision, Long> btreeTuple = btreeCursor.next();
568                NameRevision nameRevision = btreeTuple.getKey();
569                long btreeOffset = btreeTuple.getValue();
570                long revision = nameRevision.getValue();
571
572                // Check if we already have processed this B-tree
573                Long loadedBtreeRevision = loadedBtrees.get( nameRevision.getName() );
574
575                if ( loadedBtreeRevision != null )
576                {
577                    // The btree has already been loaded. The revision is necessarily higher
578                    if ( revision > currentRevision )
579                    {
580                        // We have a newer revision : switch to the new revision (we keep the offset atm)
581                        loadedBtrees.put( nameRevision.getName(), btreeOffset );
582                        currentRevision = revision;
583                    }
584                }
585                else
586                {
587                    // This is a new B-tree
588                    loadedBtrees.put( nameRevision.getName(), btreeOffset );
589                    currentRevision = nameRevision.getRevision();
590                }
591            }
592
593            // TODO : clean up the old revisions...
594
595
596            // Now, we can load the real btrees using the offsets
597            for ( String btreeName : loadedBtrees.keySet() )
598            {
599                long btreeOffset = loadedBtrees.get( btreeName );
600
601                PageIO[] btreePageIos = readPageIOs( btreeOffset, Long.MAX_VALUE );
602
603                BTree<?, ?> btree = BTreeFactory.<NameRevision, Long> createPersistedBTree();
604                //( ( PersistedBTree<NameRevision, Long> ) btree ).setBtreeHeaderOffset( btreeOffset );
605                loadBtree( btreePageIos, btree );
606
607                // Add the btree into the map of managed B-trees
608                managedBtrees.put( btreeName, ( BTree<Object, Object> ) btree );
609            }
610
611            // We are done ! Let's finish with the last initialization parts
612            endOfFileOffset = fileChannel.size();
613        }
614    }
615
616
617    /**
618     * Starts a transaction
619     */
620    public void beginTransaction()
621    {
622        if ( TXN_LOG.isDebugEnabled() )
623        {
624            TXN_LOG.debug( "Begining a new transaction on thread {}, TxnLevel {}",
625                Thread.currentThread().getName(), getTxnLevel() );
626        }
627
628        // First, take the lock if it's not already taken
629        if ( !( ( ReentrantLock ) transactionLock ).isHeldByCurrentThread() )
630        {
631            TXN_LOG.debug( "--> Lock taken" );
632            transactionLock.lock();
633        }
634        else
635        {
636            TXN_LOG.debug( "..o The current thread already holds the lock" );
637        }
638
639        // Now, check the TLS state
640        incrementTxnLevel();
641    }
642
643
644    /**
645     * Commits a transaction
646     */
647    public void commit()
648    {
649        // We *must* own the transactionLock
650        if ( !transactionLock.isHeldByCurrentThread() )
651        {
652            String name = Thread.currentThread().getName();
653            String err = "This thread, '" + name + "' does not hold the transactionLock ";
654            TXN_LOG.error( err );
655            throw new RecordManagerException( err );
656        }
657
658        if ( TXN_LOG.isDebugEnabled() )
659        {
660            TXN_LOG.debug( "Committing a transaction on thread {}, TxnLevel {}",
661                Thread.currentThread().getName(), getTxnLevel() );
662        }
663
664        if ( !fileChannel.isOpen() )
665        {
666            // Still we have to decrement the TransactionLevel
667            int txnLevel = decrementTxnLevel();
668
669            if ( txnLevel == 0 )
670            {
671                // We can safely release the lock
672                // The file has been closed, nothing remains to commit, let's get out
673                transactionLock.unlock();
674            }
675
676            return;
677        }
678
679        int nbTxnStarted = context.get();
680
681        switch ( nbTxnStarted )
682        {
683            case ROLLBACKED_TXN:
684                // The transaction was rollbacked, quit immediatelly
685                transactionLock.unlock();
686
687                return;
688
689            case 1:
690                // We are done with the transaction, we can update the RMHeader and swap the BTreeHeaders
691                // First update the RMHeader to be sure that we have a way to restore from a crash
692                updateRecordManagerHeader();
693
694                // Swap the BtreeHeaders maps
695                swapCurrentBtreeHeaders();
696
697                // We can now free pages
698                for ( PageIO pageIo : freedPages )
699                {
700                    try
701                    {
702                        free( pageIo );
703                    }
704                    catch ( IOException ioe )
705                    {
706                        throw new RecordManagerException( ioe.getMessage() );
707                    }
708                }
709
710                // Release the allocated and freed pages list
711                freedPages.clear();
712                allocatedPages.clear();
713
714                // And update the RMHeader again, removing the old references to BOB and CPB b-tree headers
715                // here, we have to erase the old references to keep only the new ones.
716                updateRecordManagerHeader();
717
718                commitCount++;
719
720                if ( commitCount >= pageReclaimerThreshold )
721                {
722                    runReclaimer();
723                }
724
725                // Finally, decrement the number of started transactions
726                // and release the global lock if possible
727                int txnLevel = decrementTxnLevel();
728
729                if ( txnLevel == 0 )
730                {
731                    transactionLock.unlock();
732                }
733
734                return;
735
736            default:
737                // We are inner an existing transaction. Just update the necessary elements
738                // Update the RMHeader to be sure that we have a way to restore from a crash
739                updateRecordManagerHeader();
740
741                // Swap the BtreeHeaders maps
742                //swapCurrentBtreeHeaders();
743
744                // We can now free pages
745                for ( PageIO pageIo : freedPages )
746                {
747                    try
748                    {
749                        free( pageIo );
750                    }
751                    catch ( IOException ioe )
752                    {
753                        throw new RecordManagerException( ioe.getMessage() );
754                    }
755                }
756
757                // Release the allocated and freed pages list
758                freedPages.clear();
759                allocatedPages.clear();
760
761                // And update the RMHeader again, removing the old references to BOB and CPB b-tree headers
762                // here, we have to erase the old references to keep only the new ones.
763                updateRecordManagerHeader();
764
765                commitCount++;
766
767                if ( commitCount >= pageReclaimerThreshold )
768                {
769                    runReclaimer();
770                }
771
772                // Finally, decrement the number of started transactions
773                // and release the global lock
774                txnLevel = decrementTxnLevel();
775
776                if ( txnLevel == 0 )
777                {
778                    transactionLock.unlock();
779                }
780
781                return;
782        }
783    }
784
785
786    public boolean isContextOk()
787    {
788        return ( context == null ? true : ( context.get() == 0 ) );
789    }
790
791
792    /**
793     * Get the transactionLevel, ie the number of encapsulated update ops
794     */
795    private int getTxnLevel()
796    {
797        Integer nbTxnLevel = context.get();
798
799        if ( nbTxnLevel == null )
800        {
801            return -1;
802        }
803
804        return nbTxnLevel;
805    }
806
807
808    /**
809     * Increment the transactionLevel
810     */
811    private void incrementTxnLevel()
812    {
813        Integer nbTxnLevel = context.get();
814
815        if ( nbTxnLevel == null )
816        {
817            context.set( 1 );
818        }
819        else
820        {
821            // And increment the counter of inner txn.
822            context.set( nbTxnLevel + 1 );
823        }
824
825        if ( TXN_LOG.isDebugEnabled() )
826        {
827            TXN_LOG.debug( "Incrementing the TxnLevel : {}", context.get() );
828        }
829    }
830
831
832    /**
833     * Decrement the transactionLevel
834     */
835    private int decrementTxnLevel()
836    {
837        int nbTxnStarted = context.get() - 1;
838
839        context.set( nbTxnStarted );
840
841        if ( TXN_LOG.isDebugEnabled() )
842        {
843            TXN_LOG.debug( "Decrementing the TxnLevel : {}", context.get() );
844        }
845
846        return nbTxnStarted;
847    }
848
849
850    /**
851     * Rollback a transaction
852     */
853    public void rollback()
854    {
855        // We *must* own the transactionLock
856        if ( !transactionLock.isHeldByCurrentThread() )
857        {
858            TXN_LOG.error( "This thread does not hold the transactionLock" );
859            throw new RecordManagerException( "This thread does not hold the transactionLock" );
860        }
861
862        if ( TXN_LOG.isDebugEnabled() )
863        {
864            TXN_LOG.debug( "Rollbacking a new transaction on thread {}, TxnLevel {}",
865                Thread.currentThread().getName(), getTxnLevel() );
866        }
867
868        // Reset the counter
869        context.set( ROLLBACKED_TXN );
870
871        // We can now free allocated pages, this is the end of the transaction
872        for ( PageIO pageIo : allocatedPages )
873        {
874            try
875            {
876                free( pageIo );
877            }
878            catch ( IOException ioe )
879            {
880                throw new RecordManagerException( ioe.getMessage() );
881            }
882        }
883
884        // Release the allocated and freed pages list
885        freedPages.clear();
886        allocatedPages.clear();
887
888        // And update the RMHeader
889        updateRecordManagerHeader();
890
891        // And restore the BTreeHeaders new Map to the current state
892        revertBtreeHeaders();
893
894        // This is an all-of-nothing operation : we can't have a transaction within
895        // a transaction that would survive an inner transaction rollback.
896        transactionLock.unlock();
897    }
898
899
900    /**
901     * Reads all the PageIOs that are linked to the page at the given position, including
902     * the first page.
903     *
904     * @param position The position of the first page
905     * @param limit The maximum bytes to read. Set this value to -1 when the size is unknown.
906     * @return An array of pages
907     */
908    /*no qualifier*/PageIO[] readPageIOs( long position, long limit ) throws IOException, EndOfFileExceededException
909    {
910        LOG.debug( "Read PageIOs at position {}", position );
911
912        if ( limit <= 0 )
913        {
914            limit = Long.MAX_VALUE;
915        }
916
917        PageIO firstPage = fetchPage( position );
918        firstPage.setSize();
919        List<PageIO> listPages = new ArrayList<PageIO>();
920        listPages.add( firstPage );
921        long dataRead = pageSize - LONG_SIZE - INT_SIZE;
922
923        // Iterate on the pages, if needed
924        long nextPage = firstPage.getNextPage();
925
926        if ( ( dataRead < limit ) && ( nextPage != NO_PAGE ) )
927        {
928            while ( dataRead < limit )
929            {
930                PageIO page = fetchPage( nextPage );
931                listPages.add( page );
932                nextPage = page.getNextPage();
933                dataRead += pageSize - LONG_SIZE;
934
935                if ( nextPage == NO_PAGE )
936                {
937                    page.setNextPage( NO_PAGE );
938                    break;
939                }
940            }
941        }
942
943        LOG.debug( "Nb of PageIOs read : {}", listPages.size() );
944
945        // Return
946        return listPages.toArray( new PageIO[]
947            {} );
948    }
949
950
951    /**
952     * Check the offset to be sure it's a valid one :
953     * <ul>
954     * <li>It's >= 0</li>
955     * <li>It's below the end of the file</li>
956     * <li>It's a multipl of the pageSize
957     * </ul>
958     * @param offset The offset to check
959     * @throws InvalidOffsetException If the offset is not valid
960     */
961    /* no qualifier */void checkOffset( long offset )
962    {
963        if ( ( offset < 0 ) || ( offset > endOfFileOffset ) || ( ( offset % pageSize ) != 0 ) )
964        {
965            throw new InvalidOffsetException( "Bad Offset : " + offset );
966        }
967    }
968
969
970    /**
971     * Read a B-tree from the disk. The meta-data are at the given position in the list of pages.
972     * We load a B-tree in two steps : first, we load the B-tree header, then the common informations
973     *
974     * @param pageIos The list of pages containing the meta-data
975     * @param btree The B-tree we have to initialize
976     * @throws InstantiationException
977     * @throws IllegalAccessException
978     * @throws ClassNotFoundException
979     * @throws NoSuchFieldException
980     * @throws SecurityException
981     * @throws IllegalArgumentException
982     */
983    private <K, V> void loadBtree( PageIO[] pageIos, BTree<K, V> btree ) throws EndOfFileExceededException,
984        IOException, ClassNotFoundException, IllegalAccessException, InstantiationException, IllegalArgumentException,
985        SecurityException, NoSuchFieldException
986    {
987        loadBtree( pageIos, btree, null );
988    }
989
990
991    /**
992     * Read a B-tree from the disk. The meta-data are at the given position in the list of pages.
993     * We load a B-tree in two steps : first, we load the B-tree header, then the common informations
994     *
995     * @param pageIos The list of pages containing the meta-data
996     * @param btree The B-tree we have to initialize
997     * @throws InstantiationException
998     * @throws IllegalAccessException
999     * @throws ClassNotFoundException
1000     * @throws NoSuchFieldException
1001     * @throws SecurityException
1002     * @throws IllegalArgumentException
1003     */
1004    /* no qualifier */<K, V> void loadBtree( PageIO[] pageIos, BTree btree, BTree<K, V> parentBTree )
1005        throws EndOfFileExceededException,
1006        IOException, ClassNotFoundException, IllegalAccessException, InstantiationException, IllegalArgumentException,
1007        SecurityException, NoSuchFieldException
1008    {
1009        long dataPos = 0L;
1010
1011        // Process the B-tree header
1012        BTreeHeader<K, V> btreeHeader = new BTreeHeader<K, V>();
1013        btreeHeader.setBtree( btree );
1014
1015        // The BtreeHeader offset
1016        btreeHeader.setBTreeHeaderOffset( pageIos[0].getOffset() );
1017
1018        // The B-tree current revision
1019        long revision = readLong( pageIos, dataPos );
1020        btreeHeader.setRevision( revision );
1021        dataPos += LONG_SIZE;
1022
1023        // The nb elems in the tree
1024        long nbElems = readLong( pageIos, dataPos );
1025        btreeHeader.setNbElems( nbElems );
1026        dataPos += LONG_SIZE;
1027
1028        // The B-tree rootPage offset
1029        long rootPageOffset = readLong( pageIos, dataPos );
1030        btreeHeader.setRootPageOffset( rootPageOffset );
1031        dataPos += LONG_SIZE;
1032
1033        // The B-tree information offset
1034        long btreeInfoOffset = readLong( pageIos, dataPos );
1035
1036        // Now, process the common informations
1037        PageIO[] infoPageIos = readPageIOs( btreeInfoOffset, Long.MAX_VALUE );
1038        ( ( PersistedBTree<K, V> ) btree ).setBtreeInfoOffset( infoPageIos[0].getOffset() );
1039        dataPos = 0L;
1040
1041        // The B-tree page size
1042        int btreePageSize = readInt( infoPageIos, dataPos );
1043        BTreeFactory.setPageSize( btree, btreePageSize );
1044        dataPos += INT_SIZE;
1045
1046        // The tree name
1047        ByteBuffer btreeNameBytes = readBytes( infoPageIos, dataPos );
1048        dataPos += INT_SIZE + btreeNameBytes.limit();
1049        String btreeName = Strings.utf8ToString( btreeNameBytes );
1050        BTreeFactory.setName( btree, btreeName );
1051
1052        // The keySerializer FQCN
1053        ByteBuffer keySerializerBytes = readBytes( infoPageIos, dataPos );
1054        dataPos += INT_SIZE + keySerializerBytes.limit();
1055
1056        String keySerializerFqcn = "";
1057
1058        if ( keySerializerBytes != null )
1059        {
1060            keySerializerFqcn = Strings.utf8ToString( keySerializerBytes );
1061        }
1062
1063        BTreeFactory.setKeySerializer( btree, keySerializerFqcn );
1064
1065        // The valueSerialier FQCN
1066        ByteBuffer valueSerializerBytes = readBytes( infoPageIos, dataPos );
1067
1068        String valueSerializerFqcn = "";
1069        dataPos += INT_SIZE + valueSerializerBytes.limit();
1070
1071        if ( valueSerializerBytes != null )
1072        {
1073            valueSerializerFqcn = Strings.utf8ToString( valueSerializerBytes );
1074        }
1075
1076        BTreeFactory.setValueSerializer( btree, valueSerializerFqcn );
1077
1078        // The B-tree allowDuplicates flag
1079        int allowDuplicates = readInt( infoPageIos, dataPos );
1080        ( ( PersistedBTree<K, V> ) btree ).setAllowDuplicates( allowDuplicates != 0 );
1081        dataPos += INT_SIZE;
1082
1083        // Set the recordManager in the btree
1084        ( ( PersistedBTree<K, V> ) btree ).setRecordManager( this );
1085
1086        // Set the current revision to the one stored in the B-tree header
1087        // Here, we have to tell the BTree to keep this revision in the
1088        // btreeRevisions Map, thus the 'true' parameter at the end.
1089        ( ( PersistedBTree<K, V> ) btree ).storeRevision( btreeHeader, true );
1090
1091        // Now, init the B-tree
1092        ( ( PersistedBTree<K, V> ) btree ).init( parentBTree );
1093
1094        // Update the BtreeHeaders Maps
1095        currentBTreeHeaders.put( btree.getName(), ( ( PersistedBTree<K, V> ) btree ).getBtreeHeader() );
1096        newBTreeHeaders.put( btree.getName(), ( ( PersistedBTree<K, V> ) btree ).getBtreeHeader() );
1097
1098        // Read the rootPage pages on disk
1099        PageIO[] rootPageIos = readPageIOs( rootPageOffset, Long.MAX_VALUE );
1100
1101        Page<K, V> btreeRoot = readPage( btree, rootPageIos );
1102        BTreeFactory.setRecordManager( btree, this );
1103
1104        BTreeFactory.setRootPage( btree, btreeRoot );
1105    }
1106
1107
1108    /**
1109     * Deserialize a Page from a B-tree at a give position
1110     *
1111     * @param btree The B-tree we want to read a Page from
1112     * @param offset The position in the file for this page
1113     * @return The read page
1114     * @throws EndOfFileExceededException If we have reached the end of the file while reading the page
1115     */
1116    public <K, V> Page<K, V> deserialize( BTree<K, V> btree, long offset ) throws EndOfFileExceededException,
1117        IOException
1118    {
1119        checkOffset( offset );
1120        PageIO[] rootPageIos = readPageIOs( offset, Long.MAX_VALUE );
1121
1122        Page<K, V> page = readPage( btree, rootPageIos );
1123
1124        return page;
1125    }
1126
1127
1128    /**
1129     * Read a page from some PageIO for a given B-tree
1130     * @param btree The B-tree we want to read a page for
1131     * @param pageIos The PageIO containing the raw data
1132     * @return The read Page if successful
1133     * @throws IOException If the deserialization failed
1134     */
1135    private <K, V> Page<K, V> readPage( BTree<K, V> btree, PageIO[] pageIos ) throws IOException
1136    {
1137        // Deserialize the rootPage now
1138        long position = 0L;
1139
1140        // The revision
1141        long revision = readLong( pageIos, position );
1142        position += LONG_SIZE;
1143
1144        // The number of elements in the page
1145        int nbElems = readInt( pageIos, position );
1146        position += INT_SIZE;
1147
1148        // The size of the data containing the keys and values
1149        Page<K, V> page = null;
1150
1151        // Reads the bytes containing all the keys and values, if we have some
1152        // We read  big blog of data into  ByteBuffer, then we will process
1153        // this ByteBuffer
1154        ByteBuffer byteBuffer = readBytes( pageIos, position );
1155
1156        // Now, deserialize the data block. If the number of elements
1157        // is positive, it's a Leaf, otherwise it's a Node
1158        // Note that only a leaf can have 0 elements, and it's the root page then.
1159        if ( nbElems >= 0 )
1160        {
1161            // It's a leaf
1162            page = readLeafKeysAndValues( btree, nbElems, revision, byteBuffer, pageIos );
1163        }
1164        else
1165        {
1166            // It's a node
1167            page = readNodeKeysAndValues( btree, -nbElems, revision, byteBuffer, pageIos );
1168        }
1169
1170        ( ( AbstractPage<K, V> ) page ).setOffset( pageIos[0].getOffset() );
1171        if ( pageIos.length > 1 )
1172        {
1173            ( ( AbstractPage<K, V> ) page ).setLastOffset( pageIos[pageIos.length - 1].getOffset() );
1174        }
1175
1176        return page;
1177    }
1178
1179
1180    /**
1181     * Deserialize a Leaf from some PageIOs
1182     */
1183    private <K, V> PersistedLeaf<K, V> readLeafKeysAndValues( BTree<K, V> btree, int nbElems, long revision,
1184        ByteBuffer byteBuffer, PageIO[] pageIos )
1185    {
1186        // Its a leaf, create it
1187        PersistedLeaf<K, V> leaf = ( PersistedLeaf<K, V> ) BTreeFactory.createLeaf( btree, revision, nbElems );
1188
1189        // Store the page offset on disk
1190        leaf.setOffset( pageIos[0].getOffset() );
1191        leaf.setLastOffset( pageIos[pageIos.length - 1].getOffset() );
1192
1193        int[] keyLengths = new int[nbElems];
1194        int[] valueLengths = new int[nbElems];
1195
1196        boolean isNotSubTree = ( btree.getType() != BTreeTypeEnum.PERSISTED_SUB );
1197
1198        // Read each key and value
1199        for ( int i = 0; i < nbElems; i++ )
1200        {
1201            if ( isNotSubTree )
1202            {
1203                // Read the number of values
1204                int nbValues = byteBuffer.getInt();
1205                PersistedValueHolder<V> valueHolder = null;
1206
1207                if ( nbValues < 0 )
1208                {
1209                    // This is a sub-btree
1210                    byte[] btreeOffsetBytes = new byte[LONG_SIZE];
1211                    byteBuffer.get( btreeOffsetBytes );
1212
1213                    // Create the valueHolder. As the number of values is negative, we have to switch
1214                    // to a positive value but as we start at -1 for 0 value, add 1.
1215                    valueHolder = new PersistedValueHolder<V>( btree, 1 - nbValues, btreeOffsetBytes );
1216                }
1217                else
1218                {
1219                    // This is an array
1220                    // Read the value's array length
1221                    valueLengths[i] = byteBuffer.getInt();
1222
1223                    // This is an Array of values, read the byte[] associated with it
1224                    byte[] arrayBytes = new byte[valueLengths[i]];
1225                    byteBuffer.get( arrayBytes );
1226                    valueHolder = new PersistedValueHolder<V>( btree, nbValues, arrayBytes );
1227                }
1228
1229                BTreeFactory.setValue( btree, leaf, i, valueHolder );
1230            }
1231
1232            keyLengths[i] = byteBuffer.getInt();
1233            byte[] data = new byte[keyLengths[i]];
1234            byteBuffer.get( data );
1235            BTreeFactory.setKey( btree, leaf, i, data );
1236        }
1237
1238        return leaf;
1239    }
1240
1241
1242    /**
1243     * Deserialize a Node from some PageIos
1244     */
1245    private <K, V> PersistedNode<K, V> readNodeKeysAndValues( BTree<K, V> btree, int nbElems, long revision,
1246        ByteBuffer byteBuffer, PageIO[] pageIos ) throws IOException
1247    {
1248        PersistedNode<K, V> node = ( PersistedNode<K, V> ) BTreeFactory.createNode( btree, revision, nbElems );
1249
1250        // Read each value and key
1251        for ( int i = 0; i < nbElems; i++ )
1252        {
1253            // This is an Offset
1254            long offset = LongSerializer.INSTANCE.deserialize( byteBuffer );
1255            long lastOffset = LongSerializer.INSTANCE.deserialize( byteBuffer );
1256
1257            PersistedPageHolder<K, V> valueHolder = new PersistedPageHolder<K, V>( btree, null, offset, lastOffset );
1258            node.setValue( i, valueHolder );
1259
1260            // Read the key length
1261            int keyLength = byteBuffer.getInt();
1262
1263            int currentPosition = byteBuffer.position();
1264
1265            // and the key value
1266            K key = btree.getKeySerializer().deserialize( byteBuffer );
1267
1268            // Set the new position now
1269            byteBuffer.position( currentPosition + keyLength );
1270
1271            BTreeFactory.setKey( btree, node, i, key );
1272        }
1273
1274        // and read the last value, as it's a node
1275        long offset = LongSerializer.INSTANCE.deserialize( byteBuffer );
1276        long lastOffset = LongSerializer.INSTANCE.deserialize( byteBuffer );
1277
1278        PersistedPageHolder<K, V> valueHolder = new PersistedPageHolder<K, V>( btree, null, offset, lastOffset );
1279        node.setValue( nbElems, valueHolder );
1280
1281        return node;
1282    }
1283
1284
1285    /**
1286     * Read a byte[] from pages.
1287     *
1288     * @param pageIos The pages we want to read the byte[] from
1289     * @param position The position in the data stored in those pages
1290     * @return The byte[] we have read
1291     */
1292    /* no qualifier */ByteBuffer readBytes( PageIO[] pageIos, long position )
1293    {
1294        // Read the byte[] length first
1295        int length = readInt( pageIos, position );
1296        position += INT_SIZE;
1297
1298        // Compute the page in which we will store the data given the
1299        // current position
1300        int pageNb = computePageNb( position );
1301
1302        // Compute the position in the current page
1303        int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
1304
1305        // Check that the length is correct : it should fit in the provided pageIos
1306        int pageEnd = computePageNb( position + length );
1307
1308        if ( pageEnd > pageIos.length )
1309        {
1310            // This is wrong...
1311            LOG.error( "Wrong size : {}, it's larger than the number of provided pages {}", length, pageIos.length );
1312            throw new ArrayIndexOutOfBoundsException();
1313        }
1314
1315        ByteBuffer pageData = pageIos[pageNb].getData();
1316        int remaining = pageData.capacity() - pagePos;
1317
1318        if ( length == 0 )
1319        {
1320            // No bytes to read : return null;
1321            return null;
1322        }
1323        else
1324        {
1325            ByteBuffer bytes = ByteBuffer.allocate( length );
1326
1327            while ( length > 0 )
1328            {
1329                if ( length <= remaining )
1330                {
1331                    pageData.mark();
1332                    pageData.position( pagePos );
1333                    int oldLimit = pageData.limit();
1334                    pageData.limit( pagePos + length );
1335                    bytes.put( pageData );
1336                    pageData.limit( oldLimit );
1337                    pageData.reset();
1338                    bytes.rewind();
1339
1340                    return bytes;
1341                }
1342
1343                pageData.mark();
1344                pageData.position( pagePos );
1345                int oldLimit = pageData.limit();
1346                pageData.limit( pagePos + remaining );
1347                bytes.put( pageData );
1348                pageData.limit( oldLimit );
1349                pageData.reset();
1350                pageNb++;
1351                pagePos = LINK_SIZE;
1352                pageData = pageIos[pageNb].getData();
1353                length -= remaining;
1354                remaining = pageData.capacity() - pagePos;
1355            }
1356
1357            bytes.rewind();
1358
1359            return bytes;
1360        }
1361    }
1362
1363
1364    /**
1365     * Read an int from pages
1366     * @param pageIos The pages we want to read the int from
1367     * @param position The position in the data stored in those pages
1368     * @return The int we have read
1369     */
1370    /* no qualifier */int readInt( PageIO[] pageIos, long position )
1371    {
1372        // Compute the page in which we will store the data given the
1373        // current position
1374        int pageNb = computePageNb( position );
1375
1376        // Compute the position in the current page
1377        int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
1378
1379        ByteBuffer pageData = pageIos[pageNb].getData();
1380        int remaining = pageData.capacity() - pagePos;
1381        int value = 0;
1382
1383        if ( remaining >= INT_SIZE )
1384        {
1385            value = pageData.getInt( pagePos );
1386        }
1387        else
1388        {
1389            value = 0;
1390
1391            switch ( remaining )
1392            {
1393                case 3:
1394                    value += ( ( pageData.get( pagePos + 2 ) & 0x00FF ) << 8 );
1395                    // Fallthrough !!!
1396
1397                case 2:
1398                    value += ( ( pageData.get( pagePos + 1 ) & 0x00FF ) << 16 );
1399                    // Fallthrough !!!
1400
1401                case 1:
1402                    value += ( pageData.get( pagePos ) << 24 );
1403                    break;
1404            }
1405
1406            // Now deal with the next page
1407            pageData = pageIos[pageNb + 1].getData();
1408            pagePos = LINK_SIZE;
1409
1410            switch ( remaining )
1411            {
1412                case 1:
1413                    value += ( pageData.get( pagePos ) & 0x00FF ) << 16;
1414                    // fallthrough !!!
1415
1416                case 2:
1417                    value += ( pageData.get( pagePos + 2 - remaining ) & 0x00FF ) << 8;
1418                    // fallthrough !!!
1419
1420                case 3:
1421                    value += ( pageData.get( pagePos + 3 - remaining ) & 0x00FF );
1422                    break;
1423            }
1424        }
1425
1426        return value;
1427    }
1428
1429
1430    /**
1431     * Read a byte from pages
1432     * @param pageIos The pages we want to read the byte from
1433     * @param position The position in the data stored in those pages
1434     * @return The byte we have read
1435     */
1436    private byte readByte( PageIO[] pageIos, long position )
1437    {
1438        // Compute the page in which we will store the data given the
1439        // current position
1440        int pageNb = computePageNb( position );
1441
1442        // Compute the position in the current page
1443        int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
1444
1445        ByteBuffer pageData = pageIos[pageNb].getData();
1446        byte value = 0;
1447
1448        value = pageData.get( pagePos );
1449
1450        return value;
1451    }
1452
1453
1454    /**
1455     * Read a long from pages
1456     * @param pageIos The pages we want to read the long from
1457     * @param position The position in the data stored in those pages
1458     * @return The long we have read
1459     */
1460    /* no qualifier */long readLong( PageIO[] pageIos, long position )
1461    {
1462        // Compute the page in which we will store the data given the
1463        // current position
1464        int pageNb = computePageNb( position );
1465
1466        // Compute the position in the current page
1467        int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
1468
1469        ByteBuffer pageData = pageIos[pageNb].getData();
1470        int remaining = pageData.capacity() - pagePos;
1471        long value = 0L;
1472
1473        if ( remaining >= LONG_SIZE )
1474        {
1475            value = pageData.getLong( pagePos );
1476        }
1477        else
1478        {
1479            switch ( remaining )
1480            {
1481                case 7:
1482                    value += ( ( ( long ) pageData.get( pagePos + 6 ) & 0x00FF ) << 8 );
1483                    // Fallthrough !!!
1484
1485                case 6:
1486                    value += ( ( ( long ) pageData.get( pagePos + 5 ) & 0x00FF ) << 16 );
1487                    // Fallthrough !!!
1488
1489                case 5:
1490                    value += ( ( ( long ) pageData.get( pagePos + 4 ) & 0x00FF ) << 24 );
1491                    // Fallthrough !!!
1492
1493                case 4:
1494                    value += ( ( ( long ) pageData.get( pagePos + 3 ) & 0x00FF ) << 32 );
1495                    // Fallthrough !!!
1496
1497                case 3:
1498                    value += ( ( ( long ) pageData.get( pagePos + 2 ) & 0x00FF ) << 40 );
1499                    // Fallthrough !!!
1500
1501                case 2:
1502                    value += ( ( ( long ) pageData.get( pagePos + 1 ) & 0x00FF ) << 48 );
1503                    // Fallthrough !!!
1504
1505                case 1:
1506                    value += ( ( long ) pageData.get( pagePos ) << 56 );
1507                    break;
1508            }
1509
1510            // Now deal with the next page
1511            pageData = pageIos[pageNb + 1].getData();
1512            pagePos = LINK_SIZE;
1513
1514            switch ( remaining )
1515            {
1516                case 1:
1517                    value += ( ( long ) pageData.get( pagePos ) & 0x00FF ) << 48;
1518                    // fallthrough !!!
1519
1520                case 2:
1521                    value += ( ( long ) pageData.get( pagePos + 2 - remaining ) & 0x00FF ) << 40;
1522                    // fallthrough !!!
1523
1524                case 3:
1525                    value += ( ( long ) pageData.get( pagePos + 3 - remaining ) & 0x00FF ) << 32;
1526                    // fallthrough !!!
1527
1528                case 4:
1529                    value += ( ( long ) pageData.get( pagePos + 4 - remaining ) & 0x00FF ) << 24;
1530                    // fallthrough !!!
1531
1532                case 5:
1533                    value += ( ( long ) pageData.get( pagePos + 5 - remaining ) & 0x00FF ) << 16;
1534                    // fallthrough !!!
1535
1536                case 6:
1537                    value += ( ( long ) pageData.get( pagePos + 6 - remaining ) & 0x00FF ) << 8;
1538                    // fallthrough !!!
1539
1540                case 7:
1541                    value += ( ( long ) pageData.get( pagePos + 7 - remaining ) & 0x00FF );
1542                    break;
1543            }
1544        }
1545
1546        return value;
1547    }
1548
1549
1550    /**
1551     * Manage a B-tree. The btree will be added and managed by this RecordManager. We will create a
1552     * new RootPage for this added B-tree, which will contain no data.<br/>
1553     * This method is threadsafe.
1554     * Managing a btree is a matter of storing an reference to the managed B-tree in the B-tree Of B-trees.
1555     * We store a tuple of NameRevision (where revision is 0L) and a offset to the B-tree header.
1556     * At the same time, we keep a track of the managed B-trees in a Map.
1557     *
1558     * @param btree The new B-tree to manage.
1559     * @param treeType flag indicating if this is an internal tree
1560     *
1561     * @throws BTreeAlreadyManagedException If the B-tree is already managed
1562     * @throws IOException if there was a problem while accessing the file
1563     */
1564    public synchronized <K, V> void manage( BTree<K, V> btree ) throws BTreeAlreadyManagedException, IOException
1565    {
1566        beginTransaction();
1567
1568        try
1569        {
1570            LOG.debug( "Managing the btree {}", btree.getName() );
1571            BTreeFactory.setRecordManager( btree, this );
1572
1573            String name = btree.getName();
1574
1575            if ( managedBtrees.containsKey( name ) )
1576            {
1577                // There is already a B-tree with this name in the recordManager...
1578                LOG.error( "There is already a B-tree named '{}' managed by this recordManager", name );
1579                rollback();
1580                throw new BTreeAlreadyManagedException( name );
1581            }
1582
1583            // Now, write the B-tree informations
1584            long btreeInfoOffset = writeBtreeInfo( btree );
1585            BTreeHeader<K, V> btreeHeader = ( ( AbstractBTree<K, V> ) btree ).getBtreeHeader();
1586            ( ( PersistedBTree<K, V> ) btree ).setBtreeInfoOffset( btreeInfoOffset );
1587
1588            // Serialize the B-tree root page
1589            Page<K, V> rootPage = btreeHeader.getRootPage();
1590
1591            PageIO[] rootPageIos = serializePage( btree, btreeHeader.getRevision(), rootPage );
1592
1593            // Get the reference on the first page
1594            long rootPageOffset = rootPageIos[0].getOffset();
1595
1596            // Store the rootPageOffset into the Btree header and into the rootPage
1597            btreeHeader.setRootPageOffset( rootPageOffset );
1598            ( ( PersistedLeaf<K, V> ) rootPage ).setOffset( rootPageOffset );
1599
1600            LOG.debug( "Flushing the newly managed '{}' btree rootpage", btree.getName() );
1601            flushPages( rootPageIos );
1602
1603            // And the B-tree header
1604            long btreeHeaderOffset = writeBtreeHeader( btree, btreeHeader );
1605
1606            // Now, if this is a new B-tree, add it to the B-tree of B-trees
1607            // Add the btree into the map of managed B-trees
1608            managedBtrees.put( name, ( BTree<Object, Object> ) btree );
1609
1610            // And in the Map of currentBtreeHeaders and newBtreeHeaders
1611            currentBTreeHeaders.put( name, btreeHeader );
1612            newBTreeHeaders.put( name, btreeHeader );
1613
1614            // We can safely increment the number of managed B-trees
1615            nbBtree++;
1616
1617            // Create the new NameRevision
1618            NameRevision nameRevision = new NameRevision( name, 0L );
1619
1620            // Inject it into the B-tree of B-tree
1621            btreeOfBtrees.insert( nameRevision, btreeHeaderOffset );
1622            commit();
1623        }
1624        catch ( IOException ioe )
1625        {
1626            rollback();
1627            throw ioe;
1628        }
1629    }
1630
1631
1632    /**
1633     * Managing a btree is a matter of storing an reference to the managed B-tree in the B-tree Of B-trees.
1634     * We store a tuple of NameRevision (where revision is 0L) and a offset to the B-tree header.
1635     * At the same time, we keep a track of the managed B-trees in a Map.
1636     *
1637     * @param btree The new B-tree to manage.
1638     * @param treeType flag indicating if this is an internal tree
1639     *
1640     * @throws BTreeAlreadyManagedException If the B-tree is already managed
1641     * @throws IOException
1642     */
1643    public synchronized <K, V> void manageSubBtree( BTree<K, V> btree )
1644        throws BTreeAlreadyManagedException, IOException
1645    {
1646        LOG.debug( "Managing the sub-btree {}", btree.getName() );
1647        BTreeFactory.setRecordManager( btree, this );
1648
1649        String name = btree.getName();
1650
1651        if ( managedBtrees.containsKey( name ) )
1652        {
1653            // There is already a subB-tree with this name in the recordManager...
1654            LOG.error( "There is already a sub-B-tree named '{}' managed by this recordManager", name );
1655            throw new BTreeAlreadyManagedException( name );
1656        }
1657
1658        // Now, write the subB-tree informations
1659        long btreeInfoOffset = writeBtreeInfo( btree );
1660        BTreeHeader<K, V> btreeHeader = ( ( AbstractBTree<K, V> ) btree ).getBtreeHeader();
1661        ( ( PersistedBTree<K, V> ) btree ).setBtreeInfoOffset( btreeInfoOffset );
1662
1663        // Serialize the B-tree root page
1664        Page<K, V> rootPage = btreeHeader.getRootPage();
1665
1666        PageIO[] rootPageIos = serializePage( btree, btreeHeader.getRevision(), rootPage );
1667
1668        // Get the reference on the first page
1669        long rootPageOffset = rootPageIos[0].getOffset();
1670
1671        // Store the rootPageOffset into the Btree header and into the rootPage
1672        btreeHeader.setRootPageOffset( rootPageOffset );
1673
1674        ( ( AbstractPage<K, V> ) rootPage ).setOffset( rootPageOffset );
1675
1676        LOG.debug( "Flushing the newly managed '{}' btree rootpage", btree.getName() );
1677        flushPages( rootPageIos );
1678
1679        // And the B-tree header
1680        long btreeHeaderOffset = writeBtreeHeader( btree, btreeHeader );
1681
1682        // Now, if this is a new B-tree, add it to the B-tree of B-trees
1683        // Add the btree into the map of managed B-trees
1684        if ( ( btree.getType() != BTreeTypeEnum.BTREE_OF_BTREES ) && 
1685            ( btree.getType() != BTreeTypeEnum.COPIED_PAGES_BTREE ) && 
1686            ( btree.getType() != BTreeTypeEnum.PERSISTED_SUB ) )
1687        {
1688            managedBtrees.put( name, ( BTree<Object, Object> ) btree );
1689        }
1690
1691        // And in the Map of currentBtreeHeaders and newBtreeHeaders
1692        currentBTreeHeaders.put( name, btreeHeader );
1693        newBTreeHeaders.put( name, btreeHeader );
1694
1695        // Create the new NameRevision
1696        NameRevision nameRevision = new NameRevision( name, 0L );
1697
1698        // Inject it into the B-tree of B-tree
1699        if ( ( btree.getType() != BTreeTypeEnum.BTREE_OF_BTREES ) && 
1700            ( btree.getType() != BTreeTypeEnum.COPIED_PAGES_BTREE ) && 
1701            ( btree.getType() != BTreeTypeEnum.PERSISTED_SUB ) )
1702        {
1703            // We can safely increment the number of managed B-trees
1704            nbBtree++;
1705
1706            btreeOfBtrees.insert( nameRevision, btreeHeaderOffset );
1707        }
1708
1709        updateRecordManagerHeader();
1710    }
1711
1712
1713    /**
1714     * Serialize a new Page. It will contain the following data :<br/>
1715     * <ul>
1716     * <li>the revision : a long</li>
1717     * <li>the number of elements : an int (if <= 0, it's a Node, otherwise it's a Leaf)</li>
1718     * <li>the size of the values/keys when serialized
1719     * <li>the keys : an array of serialized keys</li>
1720     * <li>the values : an array of references to the children pageIO offset (stored as long)
1721     * if it's a Node, or a list of values if it's a Leaf</li>
1722     * <li></li>
1723     * </ul>
1724     *
1725     * @param revision The node revision
1726     * @param keys The keys to serialize
1727     * @param children The references to the children
1728     * @return An array of pages containing the serialized node
1729     * @throws IOException
1730     */
1731    private <K, V> PageIO[] serializePage( BTree<K, V> btree, long revision, Page<K, V> page ) throws IOException
1732    {
1733        int nbElems = page.getNbElems();
1734
1735        boolean isNotSubTree = ( btree.getType() != BTreeTypeEnum.PERSISTED_SUB );
1736
1737        if ( nbElems == 0 )
1738        {
1739            return serializeRootPage( revision );
1740        }
1741        else
1742        {
1743            // Prepare a list of byte[] that will contain the serialized page
1744            int nbBuffers = 1 + 1 + 1 + nbElems * 3;
1745            int dataSize = 0;
1746            int serializedSize = 0;
1747
1748            if ( page.isNode() )
1749            {
1750                // A Node has one more value to store
1751                nbBuffers++;
1752            }
1753
1754            // Now, we can create the list with the right size
1755            List<byte[]> serializedData = new ArrayList<byte[]>( nbBuffers );
1756
1757            // The revision
1758            byte[] buffer = LongSerializer.serialize( revision );
1759            serializedData.add( buffer );
1760            serializedSize += buffer.length;
1761
1762            // The number of elements
1763            // Make it a negative value if it's a Node
1764            int pageNbElems = nbElems;
1765
1766            if ( page.isNode() )
1767            {
1768                pageNbElems = -nbElems;
1769            }
1770
1771            buffer = IntSerializer.serialize( pageNbElems );
1772            serializedData.add( buffer );
1773            serializedSize += buffer.length;
1774
1775            // Iterate on the keys and values. We first serialize the value, then the key
1776            // until we are done with all of them. If we are serializing a page, we have
1777            // to serialize one more value
1778            for ( int pos = 0; pos < nbElems; pos++ )
1779            {
1780                // Start with the value
1781                if ( page.isNode() )
1782                {
1783                    dataSize += serializeNodeValue( ( PersistedNode<K, V> ) page, pos, serializedData );
1784                    dataSize += serializeNodeKey( ( PersistedNode<K, V> ) page, pos, serializedData );
1785                }
1786                else
1787                {
1788                    if ( isNotSubTree )
1789                    {
1790                        dataSize += serializeLeafValue( ( PersistedLeaf<K, V> ) page, pos, serializedData );
1791                    }
1792
1793                    dataSize += serializeLeafKey( ( PersistedLeaf<K, V> ) page, pos, serializedData );
1794                }
1795            }
1796
1797            // Nodes have one more value to serialize
1798            if ( page.isNode() )
1799            {
1800                dataSize += serializeNodeValue( ( PersistedNode<K, V> ) page, nbElems, serializedData );
1801            }
1802
1803            // Store the data size
1804            buffer = IntSerializer.serialize( dataSize );
1805            serializedData.add( 2, buffer );
1806            serializedSize += buffer.length;
1807
1808            serializedSize += dataSize;
1809
1810            // We are done. Allocate the pages we need to store the data
1811            PageIO[] pageIos = getFreePageIOs( serializedSize );
1812
1813            // And store the data into those pages
1814            long position = 0L;
1815
1816            for ( byte[] bytes : serializedData )
1817            {
1818                position = storeRaw( position, bytes, pageIos );
1819            }
1820
1821            return pageIos;
1822        }
1823    }
1824
1825
1826    /**
1827     * Serialize a Node's key
1828     */
1829    private <K, V> int serializeNodeKey( PersistedNode<K, V> node, int pos, List<byte[]> serializedData )
1830    {
1831        KeyHolder<K> holder = node.getKeyHolder( pos );
1832        byte[] buffer = ( ( PersistedKeyHolder<K> ) holder ).getRaw();
1833
1834        // We have to store the serialized key length
1835        byte[] length = IntSerializer.serialize( buffer.length );
1836        serializedData.add( length );
1837
1838        // And store the serialized key now if not null
1839        if ( buffer.length != 0 )
1840        {
1841            serializedData.add( buffer );
1842        }
1843
1844        return buffer.length + INT_SIZE;
1845    }
1846
1847
1848    /**
1849     * Serialize a Node's Value. We store the two offsets of the child page.
1850     */
1851    private <K, V> int serializeNodeValue( PersistedNode<K, V> node, int pos, List<byte[]> serializedData )
1852        throws IOException
1853    {
1854        // For a node, we just store the children's offsets
1855        Page<K, V> child = node.getReference( pos );
1856
1857        // The first offset
1858        byte[] buffer = LongSerializer.serialize( ( ( AbstractPage<K, V> ) child ).getOffset() );
1859        serializedData.add( buffer );
1860        int dataSize = buffer.length;
1861
1862        // The last offset
1863        buffer = LongSerializer.serialize( ( ( AbstractPage<K, V> ) child ).getLastOffset() );
1864        serializedData.add( buffer );
1865        dataSize += buffer.length;
1866
1867        return dataSize;
1868    }
1869
1870
1871    /**
1872     * Serialize a Leaf's key
1873     */
1874    private <K, V> int serializeLeafKey( PersistedLeaf<K, V> leaf, int pos, List<byte[]> serializedData )
1875    {
1876        int dataSize = 0;
1877        KeyHolder<K> keyHolder = leaf.getKeyHolder( pos );
1878        byte[] keyData = ( ( PersistedKeyHolder<K> ) keyHolder ).getRaw();
1879
1880        if ( keyData != null )
1881        {
1882            // We have to store the serialized key length
1883            byte[] length = IntSerializer.serialize( keyData.length );
1884            serializedData.add( length );
1885
1886            // And the key data
1887            serializedData.add( keyData );
1888            dataSize += keyData.length + INT_SIZE;
1889        }
1890        else
1891        {
1892            serializedData.add( IntSerializer.serialize( 0 ) );
1893            dataSize += INT_SIZE;
1894        }
1895
1896        return dataSize;
1897    }
1898
1899
1900    /**
1901     * Serialize a Leaf's Value.
1902     */
1903    private <K, V> int serializeLeafValue( PersistedLeaf<K, V> leaf, int pos, List<byte[]> serializedData )
1904        throws IOException
1905    {
1906        // The value can be an Array or a sub-btree, but we don't care
1907        // we just iterate on all the values
1908        ValueHolder<V> valueHolder = leaf.getValue( pos );
1909        int dataSize = 0;
1910        int nbValues = valueHolder.size();
1911
1912        if ( nbValues == 0 )
1913        {
1914            // No value.
1915            byte[] buffer = IntSerializer.serialize( nbValues );
1916            serializedData.add( buffer );
1917
1918            return buffer.length;
1919        }
1920
1921        if ( !valueHolder.isSubBtree() )
1922        {
1923            // Write the nb elements first
1924            byte[] buffer = IntSerializer.serialize( nbValues );
1925            serializedData.add( buffer );
1926            dataSize = INT_SIZE;
1927
1928            // We have a serialized value. Just flush it
1929            byte[] data = ( ( PersistedValueHolder<V> ) valueHolder ).getRaw();
1930            dataSize += data.length;
1931
1932            // Store the data size
1933            buffer = IntSerializer.serialize( data.length );
1934            serializedData.add( buffer );
1935            dataSize += INT_SIZE;
1936
1937            // and add the data if it's not 0
1938            if ( data.length > 0 )
1939            {
1940                serializedData.add( data );
1941            }
1942        }
1943        else
1944        {
1945            // Store the nbVlues as a negative number. We add 1 so that 0 is not confused with an Array value
1946            byte[] buffer = IntSerializer.serialize( -( nbValues + 1 ) );
1947            serializedData.add( buffer );
1948            dataSize += buffer.length;
1949
1950            // the B-tree offset
1951            buffer = LongSerializer.serialize( ( ( PersistedValueHolder<V> ) valueHolder ).getOffset() );
1952            serializedData.add( buffer );
1953            dataSize += buffer.length;
1954        }
1955
1956        return dataSize;
1957    }
1958
1959
1960    /**
1961     * Write a root page with no elements in it
1962     */
1963    private PageIO[] serializeRootPage( long revision ) throws IOException
1964    {
1965        // We will have 1 single page if we have no elements
1966        PageIO[] pageIos = new PageIO[1];
1967
1968        // This is either a new root page or a new page that will be filled later
1969        PageIO newPage = fetchNewPage();
1970
1971        // We need first to create a byte[] that will contain all the data
1972        // For the root page, this is easy, as we only have to store the revision,
1973        // and the number of elements, which is 0.
1974        long position = 0L;
1975
1976        position = store( position, revision, newPage );
1977        position = store( position, 0, newPage );
1978
1979        // Update the page size now
1980        newPage.setSize( ( int ) position );
1981
1982        // Insert the result into the array of PageIO
1983        pageIos[0] = newPage;
1984
1985        return pageIos;
1986    }
1987
1988
1989    /**
1990     * Update the RecordManager header, injecting the following data :
1991     *
1992     * <pre>
1993     * +---------------------+
1994     * | PageSize            | 4 bytes : The size of a physical page (default to 4096)
1995     * +---------------------+
1996     * | NbTree              | 4 bytes : The number of managed B-trees (at least 1)
1997     * +---------------------+
1998     * | FirstFree           | 8 bytes : The offset of the first free page
1999     * +---------------------+
2000     * | current BoB offset  | 8 bytes : The offset of the current B-tree of B-trees
2001     * +---------------------+
2002     * | previous BoB offset | 8 bytes : The offset of the previous B-tree of B-trees
2003     * +---------------------+
2004     * | current CP offset   | 8 bytes : The offset of the current CopiedPages B-tree
2005     * +---------------------+
2006     * | previous CP offset  | 8 bytes : The offset of the previous CopiedPages B-tree
2007     * +---------------------+
2008     * </pre>
2009     */
2010    public void updateRecordManagerHeader()
2011    {
2012        // The page size
2013        int position = writeData( RECORD_MANAGER_HEADER_BYTES, 0, pageSize );
2014
2015        // The number of managed B-tree
2016        position = writeData( RECORD_MANAGER_HEADER_BYTES, position, nbBtree );
2017
2018        // The first free page
2019        position = writeData( RECORD_MANAGER_HEADER_BYTES, position, firstFreePage );
2020
2021        // The offset of the current B-tree of B-trees
2022        position = writeData( RECORD_MANAGER_HEADER_BYTES, position, currentBtreeOfBtreesOffset );
2023
2024        // The offset of the copied pages B-tree
2025        position = writeData( RECORD_MANAGER_HEADER_BYTES, position, previousBtreeOfBtreesOffset );
2026
2027        // The offset of the current B-tree of B-trees
2028        position = writeData( RECORD_MANAGER_HEADER_BYTES, position, currentCopiedPagesBtreeOffset );
2029
2030        // The offset of the copied pages B-tree
2031        position = writeData( RECORD_MANAGER_HEADER_BYTES, position, previousCopiedPagesBtreeOffset );
2032
2033        // Write the RecordManager header on disk
2034        RECORD_MANAGER_HEADER_BUFFER.put( RECORD_MANAGER_HEADER_BYTES );
2035        RECORD_MANAGER_HEADER_BUFFER.flip();
2036
2037        LOG.debug( "Update RM header" );
2038
2039        if ( LOG_PAGES.isDebugEnabled() )
2040        {
2041            StringBuilder sb = new StringBuilder();
2042
2043            sb.append( "First free page     : 0x" ).append( Long.toHexString( firstFreePage ) ).append( "\n" );
2044            sb.append( "Current BOB header  : 0x" ).append( Long.toHexString( currentBtreeOfBtreesOffset ) ).append( "\n" );
2045            sb.append( "Previous BOB header : 0x" ).append( Long.toHexString( previousBtreeOfBtreesOffset ) ).append( "\n" );
2046            sb.append( "Current CPB header  : 0x" ).append( Long.toHexString( currentCopiedPagesBtreeOffset ) ).append( "\n" );
2047            sb.append( "Previous CPB header : 0x" ).append( Long.toHexString( previousCopiedPagesBtreeOffset ) ).append( "\n" );
2048
2049            if ( firstFreePage != NO_PAGE )
2050            {
2051                long freePage = firstFreePage;
2052                sb.append( "free pages list : " );
2053
2054                boolean isFirst = true;
2055
2056                while ( freePage != NO_PAGE )
2057                {
2058                    if ( isFirst )
2059                    {
2060                        isFirst = false;
2061                    }
2062                    else
2063                    {
2064                        sb.append( " -> " );
2065                    }
2066
2067                    sb.append( "0x" ).append( Long.toHexString( freePage ) );
2068
2069                    try
2070                    {
2071                        PageIO[] freePageIO = readPageIOs( freePage, 8 );
2072
2073                        freePage = freePageIO[0].getNextPage();
2074                    }
2075                    catch ( EndOfFileExceededException e )
2076                    {
2077                        // TODO Auto-generated catch block
2078                        e.printStackTrace();
2079                    }
2080                    catch ( IOException e )
2081                    {
2082                        // TODO Auto-generated catch block
2083                        e.printStackTrace();
2084                    }
2085                }
2086
2087            }
2088
2089            LOG_PAGES.debug( "Update RM Header : \n{}", sb.toString() );
2090        }
2091
2092        try
2093        {
2094
2095            Integer nbTxnStarted = context.get();
2096
2097            if ( ( nbTxnStarted == null ) || ( nbTxnStarted <= 1 ) )
2098            {
2099                //System.out.println( "Writing page at 0000" );
2100                writeCounter.put( 0L, writeCounter.containsKey( 0L ) ? writeCounter.get( 0L ) + 1 : 1 );
2101                fileChannel.write( RECORD_MANAGER_HEADER_BUFFER, 0 );
2102            }
2103        }
2104        catch ( IOException ioe )
2105        {
2106            throw new FileException( ioe.getMessage() );
2107        }
2108
2109        RECORD_MANAGER_HEADER_BUFFER.clear();
2110
2111        // Reset the old versions
2112        previousBtreeOfBtreesOffset = -1L;
2113        previousCopiedPagesBtreeOffset = -1L;
2114
2115        nbUpdateRMHeader.incrementAndGet();
2116    }
2117
2118
2119    /**
2120     * Update the RecordManager header, injecting the following data :
2121     *
2122     * <pre>
2123     * +---------------------+
2124     * | PageSize            | 4 bytes : The size of a physical page (default to 4096)
2125     * +---------------------+
2126     * | NbTree              | 4 bytes : The number of managed B-trees (at least 1)
2127     * +---------------------+
2128     * | FirstFree           | 8 bytes : The offset of the first free page
2129     * +---------------------+
2130     * | current BoB offset  | 8 bytes : The offset of the current B-tree of B-trees
2131     * +---------------------+
2132     * | previous BoB offset | 8 bytes : The offset of the previous B-tree of B-trees
2133     * +---------------------+
2134     * | current CP offset   | 8 bytes : The offset of the current CopiedPages B-tree
2135     * +---------------------+
2136     * | previous CP offset  | 8 bytes : The offset of the previous CopiedPages B-tree
2137     * +---------------------+
2138     * </pre>
2139     */
2140    public void updateRecordManagerHeader( long newBtreeOfBtreesOffset, long newCopiedPageBtreeOffset )
2141    {
2142        if ( newBtreeOfBtreesOffset != -1L )
2143        {
2144            previousBtreeOfBtreesOffset = currentBtreeOfBtreesOffset;
2145            currentBtreeOfBtreesOffset = newBtreeOfBtreesOffset;
2146        }
2147
2148        if ( newCopiedPageBtreeOffset != -1L )
2149        {
2150            previousCopiedPagesBtreeOffset = currentCopiedPagesBtreeOffset;
2151            currentCopiedPagesBtreeOffset = newCopiedPageBtreeOffset;
2152        }
2153    }
2154
2155
2156    /**
2157     * Inject an int into a byte[] at a given position.
2158     */
2159    private int writeData( byte[] buffer, int position, int value )
2160    {
2161        RECORD_MANAGER_HEADER_BYTES[position] = ( byte ) ( value >>> 24 );
2162        RECORD_MANAGER_HEADER_BYTES[position + 1] = ( byte ) ( value >>> 16 );
2163        RECORD_MANAGER_HEADER_BYTES[position + 2] = ( byte ) ( value >>> 8 );
2164        RECORD_MANAGER_HEADER_BYTES[position + 3] = ( byte ) ( value );
2165
2166        return position + 4;
2167    }
2168
2169
2170    /**
2171     * Inject a long into a byte[] at a given position.
2172     */
2173    private int writeData( byte[] buffer, int position, long value )
2174    {
2175        RECORD_MANAGER_HEADER_BYTES[position] = ( byte ) ( value >>> 56 );
2176        RECORD_MANAGER_HEADER_BYTES[position + 1] = ( byte ) ( value >>> 48 );
2177        RECORD_MANAGER_HEADER_BYTES[position + 2] = ( byte ) ( value >>> 40 );
2178        RECORD_MANAGER_HEADER_BYTES[position + 3] = ( byte ) ( value >>> 32 );
2179        RECORD_MANAGER_HEADER_BYTES[position + 4] = ( byte ) ( value >>> 24 );
2180        RECORD_MANAGER_HEADER_BYTES[position + 5] = ( byte ) ( value >>> 16 );
2181        RECORD_MANAGER_HEADER_BYTES[position + 6] = ( byte ) ( value >>> 8 );
2182        RECORD_MANAGER_HEADER_BYTES[position + 7] = ( byte ) ( value );
2183
2184        return position + 8;
2185    }
2186
2187
2188    /**
2189     * Add a new <btree, revision> tuple into the B-tree of B-trees.
2190     *
2191     * @param name The B-tree name
2192     * @param revision The B-tree revision
2193     * @param btreeHeaderOffset The B-tree offset
2194     * @throws IOException If the update failed
2195     */
2196    /* no qualifier */<K, V> void addInBtreeOfBtrees( String name, long revision, long btreeHeaderOffset )
2197        throws IOException
2198    {
2199        checkOffset( btreeHeaderOffset );
2200        NameRevision nameRevision = new NameRevision( name, revision );
2201
2202        btreeOfBtrees.insert( nameRevision, btreeHeaderOffset );
2203
2204        // Update the B-tree of B-trees offset
2205        currentBtreeOfBtreesOffset = getNewBTreeHeader( BTREE_OF_BTREES_NAME ).getBTreeHeaderOffset();
2206    }
2207
2208
2209    /**
2210     * Add a new <btree, revision> tuple into the CopiedPages B-tree.
2211     *
2212     * @param name The B-tree name
2213     * @param revision The B-tree revision
2214     * @param btreeHeaderOffset The B-tree offset
2215     * @throws IOException If the update failed
2216     */
2217    /* no qualifier */<K, V> void addInCopiedPagesBtree( String name, long revision, List<Page<K, V>> pages )
2218        throws IOException
2219    {
2220        RevisionName revisionName = new RevisionName( revision, name );
2221
2222        long[] pageOffsets = new long[pages.size()];
2223        int pos = 0;
2224
2225        for ( Page<K, V> page : pages )
2226        {
2227            pageOffsets[pos++] = ( ( AbstractPage<K, V> ) page ).getOffset();
2228        }
2229
2230        copiedPageBtree.insert( revisionName, pageOffsets );
2231
2232        // Update the CopiedPageBtree offset
2233        currentCopiedPagesBtreeOffset = ( ( AbstractBTree<RevisionName, long[]> ) copiedPageBtree ).getBtreeHeader().getBTreeHeaderOffset();
2234    }
2235
2236
2237    /**
2238     * Internal method used to update the B-tree of B-trees offset
2239     * @param btreeOfBtreesOffset The new offset
2240     */
2241    /* no qualifier */void setBtreeOfBtreesOffset( long btreeOfBtreesOffset )
2242    {
2243        checkOffset( btreeOfBtreesOffset );
2244        this.currentBtreeOfBtreesOffset = btreeOfBtreesOffset;
2245    }
2246
2247
2248    /**
2249     * Write the B-tree header on disk. We will write the following informations :
2250     * <pre>
2251     * +------------+
2252     * | revision   | The B-tree revision
2253     * +------------+
2254     * | nbElems    | The B-tree number of elements
2255     * +------------+
2256     * | rootPage   | The root page offset
2257     * +------------+
2258     * | BtreeInfo  | The B-tree info offset
2259     * +------------+
2260     * </pre>
2261     * @param btree The B-tree which header has to be written
2262     * @param btreeInfoOffset The offset of the B-tree informations
2263     * @return The B-tree header offset
2264     * @throws IOException If we weren't able to write the B-tree header
2265     */
2266    /* no qualifier */<K, V> long writeBtreeHeader( BTree<K, V> btree, BTreeHeader<K, V> btreeHeader )
2267        throws IOException
2268    {
2269        int bufferSize =
2270            LONG_SIZE + // The revision
2271                LONG_SIZE + // the number of element
2272                LONG_SIZE + // The root page offset
2273                LONG_SIZE; // The B-tree info page offset
2274
2275        // Get the pageIOs we need to store the data. We may need more than one.
2276        PageIO[] btreeHeaderPageIos = getFreePageIOs( bufferSize );
2277
2278        // Store the B-tree header Offset into the B-tree
2279        long btreeHeaderOffset = btreeHeaderPageIos[0].getOffset();
2280
2281        // Now store the B-tree data in the pages :
2282        // - the B-tree revision
2283        // - the B-tree number of elements
2284        // - the B-tree root page offset
2285        // - the B-tree info page offset
2286        // Starts at 0
2287        long position = 0L;
2288
2289        // The B-tree current revision
2290        position = store( position, btreeHeader.getRevision(), btreeHeaderPageIos );
2291
2292        // The nb elems in the tree
2293        position = store( position, btreeHeader.getNbElems(), btreeHeaderPageIos );
2294
2295        // Now, we can inject the B-tree rootPage offset into the B-tree header
2296        position = store( position, btreeHeader.getRootPageOffset(), btreeHeaderPageIos );
2297
2298        // The B-tree info page offset
2299        position = store( position, ( ( PersistedBTree<K, V> ) btree ).getBtreeInfoOffset(), btreeHeaderPageIos );
2300
2301        // And flush the pages to disk now
2302        LOG.debug( "Flushing the newly managed '{}' btree header", btree.getName() );
2303
2304        if ( LOG_PAGES.isDebugEnabled() )
2305        {
2306            LOG_PAGES.debug( "Writing BTreeHeader revision {} for {}", btreeHeader.getRevision(), btree.getName() );
2307            StringBuilder sb = new StringBuilder();
2308
2309            sb.append( "Offset : " ).append( Long.toHexString( btreeHeaderOffset ) ).append( "\n" );
2310            sb.append( "    Revision : " ).append( btreeHeader.getRevision() ).append( "\n" );
2311            sb.append( "    NbElems  : " ).append( btreeHeader.getNbElems() ).append( "\n" );
2312            sb.append( "    RootPage : 0x" ).append( Long.toHexString( btreeHeader.getRootPageOffset() ) )
2313                .append( "\n" );
2314            sb.append( "    Info     : 0x" )
2315                .append( Long.toHexString( ( ( PersistedBTree<K, V> ) btree ).getBtreeInfoOffset() ) ).append( "\n" );
2316
2317            LOG_PAGES.debug( "Btree Header[{}]\n{}", btreeHeader.getRevision(), sb.toString() );
2318        }
2319
2320        flushPages( btreeHeaderPageIos );
2321
2322        btreeHeader.setBTreeHeaderOffset( btreeHeaderOffset );
2323
2324        return btreeHeaderOffset;
2325    }
2326
2327
2328    /**
2329     * Write the B-tree informations on disk. We will write the following informations :
2330     * <pre>
2331     * +------------+
2332     * | pageSize   | The B-tree page size (ie, the number of elements per page max)
2333     * +------------+
2334     * | nameSize   | The B-tree name size
2335     * +------------+
2336     * | name       | The B-tree name
2337     * +------------+
2338     * | keySerSize | The keySerializer FQCN size
2339     * +------------+
2340     * | keySerFQCN | The keySerializer FQCN
2341     * +------------+
2342     * | valSerSize | The Value serializer FQCN size
2343     * +------------+
2344     * | valSerKQCN | The valueSerializer FQCN
2345     * +------------+
2346     * | dups       | The flags that tell if the dups are allowed
2347     * +------------+
2348     * </pre>
2349     * @param btree The B-tree which header has to be written
2350     * @return The B-tree header offset
2351     * @throws IOException If we weren't able to write the B-tree header
2352     */
2353    private <K, V> long writeBtreeInfo( BTree<K, V> btree ) throws IOException
2354    {
2355        // We will add the newly managed B-tree at the end of the header.
2356        byte[] btreeNameBytes = Strings.getBytesUtf8( btree.getName() );
2357        byte[] keySerializerBytes = Strings.getBytesUtf8( btree.getKeySerializerFQCN() );
2358        byte[] valueSerializerBytes = Strings.getBytesUtf8( btree.getValueSerializerFQCN() );
2359
2360        int bufferSize =
2361            INT_SIZE + // The page size
2362                INT_SIZE + // The name size
2363                btreeNameBytes.length + // The name
2364                INT_SIZE + // The keySerializerBytes size
2365                keySerializerBytes.length + // The keySerializerBytes
2366                INT_SIZE + // The valueSerializerBytes size
2367                valueSerializerBytes.length + // The valueSerializerBytes
2368                INT_SIZE; // The allowDuplicates flag
2369
2370        // Get the pageIOs we need to store the data. We may need more than one.
2371        PageIO[] btreeHeaderPageIos = getFreePageIOs( bufferSize );
2372
2373        // Keep the B-tree header Offset into the B-tree
2374        long btreeInfoOffset = btreeHeaderPageIos[0].getOffset();
2375
2376        // Now store the B-tree information data in the pages :
2377        // - the B-tree page size
2378        // - the B-tree name
2379        // - the keySerializer FQCN
2380        // - the valueSerializer FQCN
2381        // - the flags that tell if the dups are allowed
2382        // Starts at 0
2383        long position = 0L;
2384
2385        // The B-tree page size
2386        position = store( position, btree.getPageSize(), btreeHeaderPageIos );
2387
2388        // The tree name
2389        position = store( position, btreeNameBytes, btreeHeaderPageIos );
2390
2391        // The keySerializer FQCN
2392        position = store( position, keySerializerBytes, btreeHeaderPageIos );
2393
2394        // The valueSerialier FQCN
2395        position = store( position, valueSerializerBytes, btreeHeaderPageIos );
2396
2397        // The allowDuplicates flag
2398        position = store( position, ( btree.isAllowDuplicates() ? 1 : 0 ), btreeHeaderPageIos );
2399
2400        // And flush the pages to disk now
2401        LOG.debug( "Flushing the newly managed '{}' btree header", btree.getName() );
2402        flushPages( btreeHeaderPageIos );
2403
2404        return btreeInfoOffset;
2405    }
2406
2407
2408    /**
2409     * Update the B-tree header after a B-tree modification. This will make the latest modification
2410     * visible.<br/>
2411     * We update the following fields :
2412     * <ul>
2413     * <li>the revision</li>
2414     * <li>the number of elements</li>
2415     * <li>the B-tree root page offset</li>
2416     * </ul>
2417     * <br/>
2418     * As a result, a new version of the BtreHeader will be created, which will replace the previous
2419     * B-tree header
2420     * @param btree TheB-tree to update
2421     * @param btreeHeaderOffset The offset of the modified btree header
2422     * @return The offset of the new B-tree Header
2423     * @throws IOException If we weren't able to write the file on disk
2424     * @throws EndOfFileExceededException If we tried to write after the end of the file
2425     */
2426    /* no qualifier */<K, V> long updateBtreeHeader( BTree<K, V> btree, long btreeHeaderOffset )
2427        throws EndOfFileExceededException, IOException
2428    {
2429        return updateBtreeHeader( btree, btreeHeaderOffset, false );
2430    }
2431
2432
2433    /**
2434     * Update the B-tree header after a B-tree modification. This will make the latest modification
2435     * visible.<br/>
2436     * We update the following fields :
2437     * <ul>
2438     * <li>the revision</li>
2439     * <li>the number of elements</li>
2440     * <li>the reference to the current B-tree revisions</li>
2441     * <li>the reference to the old B-tree revisions</li>
2442     * </ul>
2443     * <br/>
2444     * As a result, we new version of the BtreHeader will be created
2445     * @param btree The B-tree to update
2446     * @param btreeHeaderOffset The offset of the modified btree header
2447     * @return The offset of the new B-tree Header if it has changed (ie, when the onPlace flag is set to true)
2448     * @throws IOException
2449     * @throws EndOfFileExceededException
2450     */
2451    /* no qualifier */<K, V> void updateBtreeHeaderOnPlace( BTree<K, V> btree, long btreeHeaderOffset )
2452        throws EndOfFileExceededException,
2453        IOException
2454    {
2455        updateBtreeHeader( btree, btreeHeaderOffset, true );
2456    }
2457
2458
2459    /**
2460     * Update the B-tree header after a B-tree modification. This will make the latest modification
2461     * visible.<br/>
2462     * We update the following fields :
2463     * <ul>
2464     * <li>the revision</li>
2465     * <li>the number of elements</li>
2466     * <li>the reference to the current B-tree revisions</li>
2467     * <li>the reference to the old B-tree revisions</li>
2468     * </ul>
2469     * <br/>
2470     * As a result, a new version of the BtreHeader will be created, which may replace the previous
2471     * B-tree header (if the onPlace flag is set to true) or a new set of pageIos will contain the new
2472     * version.
2473     *
2474     * @param btree The B-tree to update
2475     * @param rootPageOffset The offset of the modified rootPage
2476     * @param onPlace Tells if we modify the B-tree on place, or if we create a copy
2477     * @return The offset of the new B-tree Header if it has changed (ie, when the onPlace flag is set to true)
2478     * @throws EndOfFileExceededException If we tried to write after the end of the file
2479     * @throws IOException If tehre were some error while writing the data on disk
2480     */
2481    private <K, V> long updateBtreeHeader( BTree<K, V> btree, long btreeHeaderOffset, boolean onPlace )
2482        throws EndOfFileExceededException, IOException
2483    {
2484        // Read the pageIOs associated with this B-tree
2485        PageIO[] pageIos;
2486        long newBtreeHeaderOffset = NO_PAGE;
2487        long offset = ( ( PersistedBTree<K, V> ) btree ).getBtreeOffset();
2488
2489        if ( onPlace )
2490        {
2491            // We just have to update the existing BTreeHeader
2492            long headerSize = LONG_SIZE + LONG_SIZE + LONG_SIZE;
2493
2494            pageIos = readPageIOs( offset, headerSize );
2495
2496            // Now, update the revision
2497            long position = 0;
2498
2499            position = store( position, btree.getRevision(), pageIos );
2500            position = store( position, btree.getNbElems(), pageIos );
2501            position = store( position, btreeHeaderOffset, pageIos );
2502
2503            // Write the pages on disk
2504            if ( LOG.isDebugEnabled() )
2505            {
2506                LOG.debug( "-----> Flushing the '{}' B-treeHeader", btree.getName() );
2507                LOG.debug( "  revision : " + btree.getRevision() + ", NbElems : " + btree.getNbElems()
2508                    + ", btreeHeader offset : 0x"
2509                    + Long.toHexString( btreeHeaderOffset ) );
2510            }
2511
2512            // Get new place on disk to store the modified BTreeHeader if it's not onPlace
2513            // Rewrite the pages at the same place
2514            LOG.debug( "Rewriting the B-treeHeader on place for B-tree " + btree.getName() );
2515            flushPages( pageIos );
2516        }
2517        else
2518        {
2519            // We have to read and copy the existing BTreeHeader and to create a new one
2520            pageIos = readPageIOs( offset, Long.MAX_VALUE );
2521
2522            // Now, copy every read page
2523            PageIO[] newPageIOs = new PageIO[pageIos.length];
2524            int pos = 0;
2525
2526            for ( PageIO pageIo : pageIos )
2527            {
2528                // Fetch a free page
2529                newPageIOs[pos] = fetchNewPage();
2530
2531                // keep a track of the allocated and copied pages so that we can
2532                // free them when we do a commit or rollback, if the btree is an management one
2533                if ( ( btree.getType() == BTreeTypeEnum.BTREE_OF_BTREES )
2534                    || ( btree.getType() == BTreeTypeEnum.COPIED_PAGES_BTREE ) )
2535                {
2536                    freedPages.add( pageIo );
2537                    allocatedPages.add( newPageIOs[pos] );
2538                }
2539
2540                pageIo.copy( newPageIOs[pos] );
2541
2542                if ( pos > 0 )
2543                {
2544                    newPageIOs[pos - 1].setNextPage( newPageIOs[pos].getOffset() );
2545                }
2546
2547                pos++;
2548            }
2549
2550            // store the new btree header offset
2551            // and update the revision
2552            long position = 0;
2553
2554            position = store( position, btree.getRevision(), newPageIOs );
2555            position = store( position, btree.getNbElems(), newPageIOs );
2556            position = store( position, btreeHeaderOffset, newPageIOs );
2557
2558            // Get new place on disk to store the modified BTreeHeader if it's not onPlace
2559            // Flush the new B-treeHeader on disk
2560            LOG.debug( "Rewriting the B-treeHeader on place for B-tree " + btree.getName() );
2561            flushPages( newPageIOs );
2562
2563            newBtreeHeaderOffset = newPageIOs[0].getOffset();
2564        }
2565
2566        nbUpdateBtreeHeader.incrementAndGet();
2567
2568        if ( LOG_CHECK.isDebugEnabled() )
2569        {
2570            MavibotInspector.check( this );
2571        }
2572
2573        return newBtreeHeaderOffset;
2574    }
2575
2576
2577    /**
2578     * Write the pages on disk, either at the end of the file, or at
2579     * the position they were taken from.
2580     *
2581     * @param pageIos The list of pages to write
2582     * @throws IOException If the write failed
2583     */
2584    private void flushPages( PageIO... pageIos ) throws IOException
2585    {
2586        if ( LOG.isDebugEnabled() )
2587        {
2588            for ( PageIO pageIo : pageIos )
2589            {
2590                dump( pageIo );
2591            }
2592        }
2593
2594        for ( PageIO pageIo : pageIos )
2595        {
2596            pageIo.getData().rewind();
2597            long pos = pageIo.getOffset();
2598
2599            if ( fileChannel.size() < ( pageIo.getOffset() + pageSize ) )
2600            {
2601                LOG.debug( "Adding a page at the end of the file" );
2602                // This is a page we have to add to the file
2603                pos = fileChannel.size();
2604                fileChannel.write( pageIo.getData(), pos );
2605                //fileChannel.force( false );
2606            }
2607            else
2608            {
2609                LOG.debug( "Writing a page at position {}", pageIo.getOffset() );
2610                fileChannel.write( pageIo.getData(), pageIo.getOffset() );
2611                //fileChannel.force( false );
2612            }
2613
2614            //System.out.println( "Writing page at " + Long.toHexString( pos ) );
2615            writeCounter.put( pos, writeCounter.containsKey( pos ) ? writeCounter.get( pos ) + 1 : 1 );
2616
2617            nbUpdatePageIOs.incrementAndGet();
2618
2619            pageIo.getData().rewind();
2620        }
2621    }
2622
2623
2624    /**
2625     * Compute the page in which we will store data given an offset, when
2626     * we have a list of pages.
2627     *
2628     * @param offset The position in the data
2629     * @return The page number in which the offset will start
2630     */
2631    private int computePageNb( long offset )
2632    {
2633        long pageNb = 0;
2634
2635        offset -= pageSize - LINK_SIZE - PAGE_SIZE;
2636
2637        if ( offset < 0 )
2638        {
2639            return ( int ) pageNb;
2640        }
2641
2642        pageNb = 1 + offset / ( pageSize - LINK_SIZE );
2643
2644        return ( int ) pageNb;
2645    }
2646
2647
2648    /**
2649     * Stores a byte[] into one ore more pageIO (depending if the long is stored
2650     * across a boundary or not)
2651     *
2652     * @param position The position in a virtual byte[] if all the pages were contiguous
2653     * @param bytes The byte[] to serialize
2654     * @param pageIos The pageIOs we have to store the data in
2655     * @return The new offset
2656     */
2657    private long store( long position, byte[] bytes, PageIO... pageIos )
2658    {
2659        if ( bytes != null )
2660        {
2661            // Write the bytes length
2662            position = store( position, bytes.length, pageIos );
2663
2664            // Compute the page in which we will store the data given the
2665            // current position
2666            int pageNb = computePageNb( position );
2667
2668            // Get back the buffer in this page
2669            ByteBuffer pageData = pageIos[pageNb].getData();
2670
2671            // Compute the position in the current page
2672            int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
2673
2674            // Compute the remaining size in the page
2675            int remaining = pageData.capacity() - pagePos;
2676            int nbStored = bytes.length;
2677
2678            // And now, write the bytes until we have none
2679            while ( nbStored > 0 )
2680            {
2681                if ( remaining > nbStored )
2682                {
2683                    pageData.mark();
2684                    pageData.position( pagePos );
2685                    pageData.put( bytes, bytes.length - nbStored, nbStored );
2686                    pageData.reset();
2687                    nbStored = 0;
2688                }
2689                else
2690                {
2691                    pageData.mark();
2692                    pageData.position( pagePos );
2693                    pageData.put( bytes, bytes.length - nbStored, remaining );
2694                    pageData.reset();
2695                    pageNb++;
2696                    pageData = pageIos[pageNb].getData();
2697                    pagePos = LINK_SIZE;
2698                    nbStored -= remaining;
2699                    remaining = pageData.capacity() - pagePos;
2700                }
2701            }
2702
2703            // We are done
2704            position += bytes.length;
2705        }
2706        else
2707        {
2708            // No bytes : write 0 and return
2709            position = store( position, 0, pageIos );
2710        }
2711
2712        return position;
2713    }
2714
2715
2716    /**
2717     * Stores a byte[] into one ore more pageIO (depending if the long is stored
2718     * across a boundary or not). We don't add the byte[] size, it's already present
2719     * in the received byte[].
2720     *
2721     * @param position The position in a virtual byte[] if all the pages were contiguous
2722     * @param bytes The byte[] to serialize
2723     * @param pageIos The pageIOs we have to store the data in
2724     * @return The new offset
2725     */
2726    private long storeRaw( long position, byte[] bytes, PageIO... pageIos )
2727    {
2728        if ( bytes != null )
2729        {
2730            // Compute the page in which we will store the data given the
2731            // current position
2732            int pageNb = computePageNb( position );
2733
2734            // Get back the buffer in this page
2735            ByteBuffer pageData = pageIos[pageNb].getData();
2736
2737            // Compute the position in the current page
2738            int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
2739
2740            // Compute the remaining size in the page
2741            int remaining = pageData.capacity() - pagePos;
2742            int nbStored = bytes.length;
2743
2744            // And now, write the bytes until we have none
2745            while ( nbStored > 0 )
2746            {
2747                if ( remaining > nbStored )
2748                {
2749                    pageData.mark();
2750                    pageData.position( pagePos );
2751                    pageData.put( bytes, bytes.length - nbStored, nbStored );
2752                    pageData.reset();
2753                    nbStored = 0;
2754                }
2755                else
2756                {
2757                    pageData.mark();
2758                    pageData.position( pagePos );
2759                    pageData.put( bytes, bytes.length - nbStored, remaining );
2760                    pageData.reset();
2761                    pageNb++;
2762
2763                    if ( pageNb == pageIos.length )
2764                    {
2765                        // We can stop here : we have reach the end of the page
2766                        break;
2767                    }
2768
2769                    pageData = pageIos[pageNb].getData();
2770                    pagePos = LINK_SIZE;
2771                    nbStored -= remaining;
2772                    remaining = pageData.capacity() - pagePos;
2773                }
2774            }
2775
2776            // We are done
2777            position += bytes.length;
2778        }
2779        else
2780        {
2781            // No bytes : write 0 and return
2782            position = store( position, 0, pageIos );
2783        }
2784
2785        return position;
2786    }
2787
2788
2789    /**
2790     * Stores an Integer into one ore more pageIO (depending if the int is stored
2791     * across a boundary or not)
2792     *
2793     * @param position The position in a virtual byte[] if all the pages were contiguous
2794     * @param value The int to serialize
2795     * @param pageIos The pageIOs we have to store the data in
2796     * @return The new offset
2797     */
2798    private long store( long position, int value, PageIO... pageIos )
2799    {
2800        // Compute the page in which we will store the data given the
2801        // current position
2802        int pageNb = computePageNb( position );
2803
2804        // Compute the position in the current page
2805        int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
2806
2807        // Get back the buffer in this page
2808        ByteBuffer pageData = pageIos[pageNb].getData();
2809
2810        // Compute the remaining size in the page
2811        int remaining = pageData.capacity() - pagePos;
2812
2813        if ( remaining < INT_SIZE )
2814        {
2815            // We have to copy the serialized length on two pages
2816
2817            switch ( remaining )
2818            {
2819                case 3:
2820                    pageData.put( pagePos + 2, ( byte ) ( value >>> 8 ) );
2821                    // Fallthrough !!!
2822
2823                case 2:
2824                    pageData.put( pagePos + 1, ( byte ) ( value >>> 16 ) );
2825                    // Fallthrough !!!
2826
2827                case 1:
2828                    pageData.put( pagePos, ( byte ) ( value >>> 24 ) );
2829                    break;
2830            }
2831
2832            // Now deal with the next page
2833            pageData = pageIos[pageNb + 1].getData();
2834            pagePos = LINK_SIZE;
2835
2836            switch ( remaining )
2837            {
2838                case 1:
2839                    pageData.put( pagePos, ( byte ) ( value >>> 16 ) );
2840                    // fallthrough !!!
2841
2842                case 2:
2843                    pageData.put( pagePos + 2 - remaining, ( byte ) ( value >>> 8 ) );
2844                    // fallthrough !!!
2845
2846                case 3:
2847                    pageData.put( pagePos + 3 - remaining, ( byte ) ( value ) );
2848                    break;
2849            }
2850        }
2851        else
2852        {
2853            // Store the value in the page at the selected position
2854            pageData.putInt( pagePos, value );
2855        }
2856
2857        // Increment the position to reflect the addition of an Int (4 bytes)
2858        position += INT_SIZE;
2859
2860        return position;
2861    }
2862
2863
2864    /**
2865     * Stores a Long into one ore more pageIO (depending if the long is stored
2866     * across a boundary or not)
2867     *
2868     * @param position The position in a virtual byte[] if all the pages were contiguous
2869     * @param value The long to serialize
2870     * @param pageIos The pageIOs we have to store the data in
2871     * @return The new offset
2872     */
2873    private long store( long position, long value, PageIO... pageIos )
2874    {
2875        // Compute the page in which we will store the data given the
2876        // current position
2877        int pageNb = computePageNb( position );
2878
2879        // Compute the position in the current page
2880        int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
2881
2882        // Get back the buffer in this page
2883        ByteBuffer pageData = pageIos[pageNb].getData();
2884
2885        // Compute the remaining size in the page
2886        int remaining = pageData.capacity() - pagePos;
2887
2888        if ( remaining < LONG_SIZE )
2889        {
2890            // We have to copy the serialized length on two pages
2891
2892            switch ( remaining )
2893            {
2894                case 7:
2895                    pageData.put( pagePos + 6, ( byte ) ( value >>> 8 ) );
2896                    // Fallthrough !!!
2897
2898                case 6:
2899                    pageData.put( pagePos + 5, ( byte ) ( value >>> 16 ) );
2900                    // Fallthrough !!!
2901
2902                case 5:
2903                    pageData.put( pagePos + 4, ( byte ) ( value >>> 24 ) );
2904                    // Fallthrough !!!
2905
2906                case 4:
2907                    pageData.put( pagePos + 3, ( byte ) ( value >>> 32 ) );
2908                    // Fallthrough !!!
2909
2910                case 3:
2911                    pageData.put( pagePos + 2, ( byte ) ( value >>> 40 ) );
2912                    // Fallthrough !!!
2913
2914                case 2:
2915                    pageData.put( pagePos + 1, ( byte ) ( value >>> 48 ) );
2916                    // Fallthrough !!!
2917
2918                case 1:
2919                    pageData.put( pagePos, ( byte ) ( value >>> 56 ) );
2920                    break;
2921            }
2922
2923            // Now deal with the next page
2924            pageData = pageIos[pageNb + 1].getData();
2925            pagePos = LINK_SIZE;
2926
2927            switch ( remaining )
2928            {
2929                case 1:
2930                    pageData.put( pagePos, ( byte ) ( value >>> 48 ) );
2931                    // fallthrough !!!
2932
2933                case 2:
2934                    pageData.put( pagePos + 2 - remaining, ( byte ) ( value >>> 40 ) );
2935                    // fallthrough !!!
2936
2937                case 3:
2938                    pageData.put( pagePos + 3 - remaining, ( byte ) ( value >>> 32 ) );
2939                    // fallthrough !!!
2940
2941                case 4:
2942                    pageData.put( pagePos + 4 - remaining, ( byte ) ( value >>> 24 ) );
2943                    // fallthrough !!!
2944
2945                case 5:
2946                    pageData.put( pagePos + 5 - remaining, ( byte ) ( value >>> 16 ) );
2947                    // fallthrough !!!
2948
2949                case 6:
2950                    pageData.put( pagePos + 6 - remaining, ( byte ) ( value >>> 8 ) );
2951                    // fallthrough !!!
2952
2953                case 7:
2954                    pageData.put( pagePos + 7 - remaining, ( byte ) ( value ) );
2955                    break;
2956            }
2957        }
2958        else
2959        {
2960            // Store the value in the page at the selected position
2961            pageData.putLong( pagePos, value );
2962        }
2963
2964        // Increment the position to reflect the addition of an Long (8 bytes)
2965        position += LONG_SIZE;
2966
2967        return position;
2968    }
2969
2970
2971    /**
2972     * Write the page in a serialized form.
2973     *
2974     * @param btree The persistedBtree we will create a new PageHolder for
2975     * @param newPage The page to write on disk
2976     * @param newRevision The page's revision
2977     * @return A PageHolder containing the copied page
2978     * @throws IOException If the page can't be written on disk
2979     */
2980    /* No qualifier*/<K, V> PageHolder<K, V> writePage( BTree<K, V> btree, Page<K, V> newPage,
2981        long newRevision ) throws IOException
2982    {
2983        // We first need to save the new page on disk
2984        PageIO[] pageIos = serializePage( btree, newRevision, newPage );
2985
2986        if ( LOG_PAGES.isDebugEnabled() )
2987        {
2988            LOG_PAGES.debug( "Write data for '{}' btree", btree.getName() );
2989
2990            logPageIos( pageIos );
2991        }
2992
2993        // Write the page on disk
2994        flushPages( pageIos );
2995
2996        // Build the resulting reference
2997        long offset = pageIos[0].getOffset();
2998        long lastOffset = pageIos[pageIos.length - 1].getOffset();
2999        PersistedPageHolder<K, V> pageHolder = new PersistedPageHolder<K, V>( btree, newPage, offset,
3000            lastOffset );
3001
3002        return pageHolder;
3003    }
3004
3005
3006    /* No qualifier */static void logPageIos( PageIO[] pageIos )
3007    {
3008        int pageNb = 0;
3009
3010        for ( PageIO pageIo : pageIos )
3011        {
3012            StringBuilder sb = new StringBuilder();
3013            sb.append( "PageIO[" ).append( pageNb ).append( "]:0x" );
3014            sb.append( Long.toHexString( pageIo.getOffset() ) ).append( "/" );
3015            sb.append( pageIo.getSize() );
3016            pageNb++;
3017
3018            ByteBuffer data = pageIo.getData();
3019
3020            int position = data.position();
3021            int dataLength = ( int ) pageIo.getSize() + 12;
3022
3023            if ( dataLength > data.limit() )
3024            {
3025                dataLength = data.limit();
3026            }
3027
3028            byte[] bytes = new byte[dataLength];
3029
3030            data.get( bytes );
3031            data.position( position );
3032            int pos = 0;
3033
3034            for ( byte b : bytes )
3035            {
3036                int mod = pos % 16;
3037
3038                switch ( mod )
3039                {
3040                    case 0:
3041                        sb.append( "\n    " );
3042                        // No break
3043                    case 4:
3044                    case 8:
3045                    case 12:
3046                        sb.append( " " );
3047                    case 1:
3048                    case 2:
3049                    case 3:
3050                    case 5:
3051                    case 6:
3052                    case 7:
3053                    case 9:
3054                    case 10:
3055                    case 11:
3056                    case 13:
3057                    case 14:
3058                    case 15:
3059                        sb.append( Strings.dumpByte( b ) ).append( " " );
3060                }
3061                pos++;
3062            }
3063
3064            LOG_PAGES.debug( sb.toString() );
3065        }
3066    }
3067
3068
3069    /**
3070     * Compute the number of pages needed to store some specific size of data.
3071     *
3072     * @param dataSize The size of the data we want to store in pages
3073     * @return The number of pages needed
3074     */
3075    private int computeNbPages( int dataSize )
3076    {
3077        if ( dataSize <= 0 )
3078        {
3079            return 0;
3080        }
3081
3082        // Compute the number of pages needed.
3083        // Considering that each page can contain PageSize bytes,
3084        // but that the first 8 bytes are used for links and we
3085        // use 4 bytes to store the data size, the number of needed
3086        // pages is :
3087        // NbPages = ( (dataSize - (PageSize - 8 - 4 )) / (PageSize - 8) ) + 1
3088        // NbPages += ( if (dataSize - (PageSize - 8 - 4 )) % (PageSize - 8) > 0 : 1 : 0 )
3089        int availableSize = ( pageSize - LONG_SIZE );
3090        int nbNeededPages = 1;
3091
3092        // Compute the number of pages that will be full but the first page
3093        if ( dataSize > availableSize - INT_SIZE )
3094        {
3095            int remainingSize = dataSize - ( availableSize - INT_SIZE );
3096            nbNeededPages += remainingSize / availableSize;
3097            int remain = remainingSize % availableSize;
3098
3099            if ( remain > 0 )
3100            {
3101                nbNeededPages++;
3102            }
3103        }
3104
3105        return nbNeededPages;
3106    }
3107
3108
3109    /**
3110     * Get as many pages as needed to store the data of the given size. The returned
3111     * PageIOs are all linked together.
3112     *
3113     * @param dataSize The data size
3114     * @return An array of pages, enough to store the full data
3115     */
3116    private PageIO[] getFreePageIOs( int dataSize ) throws IOException
3117    {
3118        if ( dataSize == 0 )
3119        {
3120            return new PageIO[]
3121                {};
3122        }
3123
3124        int nbNeededPages = computeNbPages( dataSize );
3125
3126        PageIO[] pageIOs = new PageIO[nbNeededPages];
3127
3128        // The first page : set the size
3129        pageIOs[0] = fetchNewPage();
3130        pageIOs[0].setSize( dataSize );
3131
3132        for ( int i = 1; i < nbNeededPages; i++ )
3133        {
3134            pageIOs[i] = fetchNewPage();
3135
3136            // Create the link
3137            pageIOs[i - 1].setNextPage( pageIOs[i].getOffset() );
3138        }
3139
3140        return pageIOs;
3141    }
3142
3143
3144    /**
3145     * Return a new Page. We take one of the existing free pages, or we create
3146     * a new page at the end of the file.
3147     *
3148     * @return The fetched PageIO
3149     */
3150    private PageIO fetchNewPage() throws IOException
3151    {
3152        //System.out.println( "Fetching new page" );
3153        if ( firstFreePage == NO_PAGE )
3154        {
3155            nbCreatedPages.incrementAndGet();
3156
3157            // We don't have any free page. Reclaim some new page at the end
3158            // of the file
3159            PageIO newPage = new PageIO( endOfFileOffset );
3160
3161            endOfFileOffset += pageSize;
3162
3163            ByteBuffer data = ByteBuffer.allocateDirect( pageSize );
3164
3165            newPage.setData( data );
3166            newPage.setNextPage( NO_PAGE );
3167            newPage.setSize( 0 );
3168
3169            LOG.debug( "Requiring a new page at offset {}", newPage.getOffset() );
3170
3171            return newPage;
3172        }
3173        else
3174        {
3175            nbReusedPages.incrementAndGet();
3176
3177            freePageLock.lock();
3178
3179            // We have some existing free page. Fetch it from disk
3180            PageIO pageIo = fetchPage( firstFreePage );
3181
3182            // Update the firstFreePage pointer
3183            firstFreePage = pageIo.getNextPage();
3184
3185            freePageLock.unlock();
3186
3187            // overwrite the data of old page
3188            ByteBuffer data = ByteBuffer.allocateDirect( pageSize );
3189            pageIo.setData( data );
3190
3191            pageIo.setNextPage( NO_PAGE );
3192            pageIo.setSize( 0 );
3193
3194            LOG.debug( "Reused page at offset {}", pageIo.getOffset() );
3195
3196            return pageIo;
3197        }
3198    }
3199
3200
3201    /**
3202     * fetch a page from disk, knowing its position in the file.
3203     *
3204     * @param offset The position in the file
3205     * @return The found page
3206     */
3207    /* no qualifier */PageIO fetchPage( long offset ) throws IOException, EndOfFileExceededException
3208    {
3209        checkOffset( offset );
3210
3211        if ( fileChannel.size() < offset + pageSize )
3212        {
3213            // Error : we are past the end of the file
3214            throw new EndOfFileExceededException( "We are fetching a page on " + offset +
3215                " when the file's size is " + fileChannel.size() );
3216        }
3217        else
3218        {
3219            // Read the page
3220            fileChannel.position( offset );
3221
3222            ByteBuffer data = ByteBuffer.allocate( pageSize );
3223            fileChannel.read( data );
3224            data.rewind();
3225
3226            PageIO readPage = new PageIO( offset );
3227            readPage.setData( data );
3228
3229            return readPage;
3230        }
3231    }
3232
3233
3234    /**
3235     * @return the pageSize
3236     */
3237    public int getPageSize()
3238    {
3239        return pageSize;
3240    }
3241
3242
3243    /**
3244     * Set the page size, ie the number of bytes a page can store.
3245     *
3246     * @param pageSize The number of bytes for a page
3247     */
3248    /* no qualifier */void setPageSize( int pageSize )
3249    {
3250        if ( this.pageSize >= 13 )
3251        {
3252            this.pageSize = pageSize;
3253        }
3254        else
3255        {
3256            this.pageSize = DEFAULT_PAGE_SIZE;
3257        }
3258    }
3259
3260
3261    /**
3262     * Close the RecordManager and flush everything on disk
3263     */
3264    public void close() throws IOException
3265    {
3266        beginTransaction();
3267
3268        // Close all the managed B-trees
3269        for ( BTree<Object, Object> tree : managedBtrees.values() )
3270        {
3271            tree.close();
3272        }
3273
3274        // Close the management B-trees
3275        copiedPageBtree.close();
3276        btreeOfBtrees.close();
3277
3278        managedBtrees.clear();
3279
3280        // Write the data
3281        fileChannel.force( true );
3282
3283        // And close the channel
3284        fileChannel.close();
3285
3286        commit();
3287    }
3288
3289    /** Hex chars */
3290    private static final byte[] HEX_CHAR = new byte[]
3291        { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' };
3292
3293
3294    public static String dump( byte octet )
3295    {
3296        return new String( new byte[]
3297            { HEX_CHAR[( octet & 0x00F0 ) >> 4], HEX_CHAR[octet & 0x000F] } );
3298    }
3299
3300
3301    /**
3302     * Dump a pageIO
3303     */
3304    private void dump( PageIO pageIo )
3305    {
3306        ByteBuffer buffer = pageIo.getData();
3307        buffer.mark();
3308        byte[] longBuffer = new byte[LONG_SIZE];
3309        byte[] intBuffer = new byte[INT_SIZE];
3310
3311        // get the next page offset
3312        buffer.get( longBuffer );
3313        long nextOffset = LongSerializer.deserialize( longBuffer );
3314
3315        // Get the data size
3316        buffer.get( intBuffer );
3317        int size = IntSerializer.deserialize( intBuffer );
3318
3319        buffer.reset();
3320
3321        System.out.println( "PageIO[" + Long.toHexString( pageIo.getOffset() ) + "], size = " + size + ", NEXT PageIO:"
3322            + Long.toHexString( nextOffset ) );
3323        System.out.println( " 0  1  2  3  4  5  6  7  8  9  A  B  C  D  E  F " );
3324        System.out.println( "+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+" );
3325
3326        for ( int i = 0; i < buffer.limit(); i += 16 )
3327        {
3328            System.out.print( "|" );
3329
3330            for ( int j = 0; j < 16; j++ )
3331            {
3332                System.out.print( dump( buffer.get() ) );
3333
3334                if ( j == 15 )
3335                {
3336                    System.out.println( "|" );
3337                }
3338                else
3339                {
3340                    System.out.print( " " );
3341                }
3342            }
3343        }
3344
3345        System.out.println( "+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+" );
3346
3347        buffer.reset();
3348    }
3349
3350
3351    /**
3352     * Dump the RecordManager file
3353     * @throws IOException
3354     */
3355    public void dump()
3356    {
3357        System.out.println( "/---------------------------- Dump ----------------------------\\" );
3358
3359        try
3360        {
3361            RandomAccessFile randomFile = new RandomAccessFile( file, "r" );
3362            FileChannel fileChannel = randomFile.getChannel();
3363
3364            ByteBuffer recordManagerHeader = ByteBuffer.allocate( RECORD_MANAGER_HEADER_SIZE );
3365
3366            // load the RecordManager header
3367            fileChannel.read( recordManagerHeader );
3368
3369            recordManagerHeader.rewind();
3370
3371            // The page size
3372            long fileSize = fileChannel.size();
3373            int pageSize = recordManagerHeader.getInt();
3374            long nbPages = fileSize / pageSize;
3375
3376            // The number of managed B-trees
3377            int nbBtree = recordManagerHeader.getInt();
3378
3379            // The first free page
3380            long firstFreePage = recordManagerHeader.getLong();
3381
3382            // The current B-tree of B-trees
3383            long currentBtreeOfBtreesPage = recordManagerHeader.getLong();
3384
3385            // The previous B-tree of B-trees
3386            long previousBtreeOfBtreesPage = recordManagerHeader.getLong();
3387
3388            // The current CopiedPages B-tree
3389            long currentCopiedPagesBtreePage = recordManagerHeader.getLong();
3390
3391            // The previous CopiedPages B-tree
3392            long previousCopiedPagesBtreePage = recordManagerHeader.getLong();
3393
3394            System.out.println( "  RecordManager" );
3395            System.out.println( "  -------------" );
3396            System.out.println( "  Size = 0x" + Long.toHexString( fileSize ) );
3397            System.out.println( "  NbPages = " + nbPages );
3398            System.out.println( "    Header " );
3399            System.out.println( "      page size : " + pageSize );
3400            System.out.println( "      nbTree : " + nbBtree );
3401            System.out.println( "      firstFreePage : 0x" + Long.toHexString( firstFreePage ) );
3402            System.out.println( "      current BOB : 0x" + Long.toHexString( currentBtreeOfBtreesPage ) );
3403            System.out.println( "      previous BOB : 0x" + Long.toHexString( previousBtreeOfBtreesPage ) );
3404            System.out.println( "      current CopiedPages : 0x" + Long.toHexString( currentCopiedPagesBtreePage ) );
3405            System.out.println( "      previous CopiedPages : 0x" + Long.toHexString( previousCopiedPagesBtreePage ) );
3406
3407            // Dump the Free pages list
3408            dumpFreePages( firstFreePage );
3409
3410            // Dump the B-tree of B-trees
3411            dumpBtreeHeader( currentBtreeOfBtreesPage );
3412
3413            // Dump the previous B-tree of B-trees if any
3414            if ( previousBtreeOfBtreesPage != NO_PAGE )
3415            {
3416                dumpBtreeHeader( previousBtreeOfBtreesPage );
3417            }
3418
3419            // Dump the CopiedPages B-tree
3420            dumpBtreeHeader( currentCopiedPagesBtreePage );
3421
3422            // Dump the previous B-tree of B-trees if any
3423            if ( previousCopiedPagesBtreePage != NO_PAGE )
3424            {
3425                dumpBtreeHeader( previousCopiedPagesBtreePage );
3426            }
3427
3428            // Dump all the user's B-tree
3429            randomFile.close();
3430            System.out.println( "\\---------------------------- Dump ----------------------------/" );
3431        }
3432        catch ( IOException ioe )
3433        {
3434            System.out.println( "Exception while dumping the file : " + ioe.getMessage() );
3435        }
3436    }
3437
3438
3439    /**
3440     * Dump the free pages
3441     */
3442    private void dumpFreePages( long freePageOffset ) throws EndOfFileExceededException, IOException
3443    {
3444        System.out.println( "\n  FreePages : " );
3445        int pageNb = 1;
3446
3447        while ( freePageOffset != NO_PAGE )
3448        {
3449            PageIO pageIo = fetchPage( freePageOffset );
3450
3451            System.out.println( "    freePage[" + pageNb + "] : 0x" + Long.toHexString( pageIo.getOffset() ) );
3452
3453            freePageOffset = pageIo.getNextPage();
3454            pageNb++;
3455        }
3456    }
3457
3458
3459    /**
3460     * Dump a B-tree Header
3461     */
3462    private long dumpBtreeHeader( long btreeOffset ) throws EndOfFileExceededException, IOException
3463    {
3464        // First read the B-tree header
3465        PageIO[] pageIos = readPageIOs( btreeOffset, Long.MAX_VALUE );
3466
3467        long dataPos = 0L;
3468
3469        // The B-tree current revision
3470        long revision = readLong( pageIos, dataPos );
3471        dataPos += LONG_SIZE;
3472
3473        // The nb elems in the tree
3474        long nbElems = readLong( pageIos, dataPos );
3475        dataPos += LONG_SIZE;
3476
3477        // The B-tree rootPage offset
3478        long rootPageOffset = readLong( pageIos, dataPos );
3479        dataPos += LONG_SIZE;
3480
3481        // The B-tree page size
3482        int btreePageSize = readInt( pageIos, dataPos );
3483        dataPos += INT_SIZE;
3484
3485        // The tree name
3486        ByteBuffer btreeNameBytes = readBytes( pageIos, dataPos );
3487        dataPos += INT_SIZE + btreeNameBytes.limit();
3488        String btreeName = Strings.utf8ToString( btreeNameBytes );
3489
3490        // The keySerializer FQCN
3491        ByteBuffer keySerializerBytes = readBytes( pageIos, dataPos );
3492        dataPos += INT_SIZE + keySerializerBytes.limit();
3493
3494        String keySerializerFqcn = "";
3495
3496        if ( keySerializerBytes != null )
3497        {
3498            keySerializerFqcn = Strings.utf8ToString( keySerializerBytes );
3499        }
3500
3501        // The valueSerialier FQCN
3502        ByteBuffer valueSerializerBytes = readBytes( pageIos, dataPos );
3503
3504        String valueSerializerFqcn = "";
3505        dataPos += INT_SIZE + valueSerializerBytes.limit();
3506
3507        if ( valueSerializerBytes != null )
3508        {
3509            valueSerializerFqcn = Strings.utf8ToString( valueSerializerBytes );
3510        }
3511
3512        // The B-tree allowDuplicates flag
3513        int allowDuplicates = readInt( pageIos, dataPos );
3514        boolean dupsAllowed = allowDuplicates != 0;
3515
3516        dataPos += INT_SIZE;
3517
3518        //        System.out.println( "\n  B-Tree " + btreeName );
3519        //        System.out.println( "  ------------------------- " );
3520
3521        //        System.out.println( "    nbPageIOs[" + pageIos.length + "] = " + pageIoList );
3522        if ( LOG.isDebugEnabled() )
3523        {
3524            StringBuilder sb = new StringBuilder();
3525            boolean isFirst = true;
3526
3527            for ( PageIO pageIo : pageIos )
3528            {
3529                if ( isFirst )
3530                {
3531                    isFirst = false;
3532                }
3533                else
3534                {
3535                    sb.append( ", " );
3536                }
3537
3538                sb.append( "0x" ).append( Long.toHexString( pageIo.getOffset() ) );
3539            }
3540
3541            String pageIoList = sb.toString();
3542
3543            LOG.debug( "    PageIOs[{}] = {}", pageIos.length, pageIoList );
3544
3545            //        System.out.println( "    dataSize = "+ pageIos[0].getSize() );
3546            LOG.debug( "    dataSize = {}", pageIos[0].getSize() );
3547
3548            LOG.debug( "    B-tree '{}'", btreeName );
3549            LOG.debug( "    revision : {}", revision );
3550            LOG.debug( "    nbElems : {}", nbElems );
3551            LOG.debug( "    rootPageOffset : 0x{}", Long.toHexString( rootPageOffset ) );
3552            LOG.debug( "    B-tree page size : {}", btreePageSize );
3553            LOG.debug( "    keySerializer : '{}'", keySerializerFqcn );
3554            LOG.debug( "    valueSerializer : '{}'", valueSerializerFqcn );
3555            LOG.debug( "    dups allowed : {}", dupsAllowed );
3556            //
3557            //        System.out.println( "    B-tree '" + btreeName + "'" );
3558            //        System.out.println( "    revision : " + revision );
3559            //        System.out.println( "    nbElems : " + nbElems );
3560            //        System.out.println( "    rootPageOffset : 0x" + Long.toHexString( rootPageOffset ) );
3561            //        System.out.println( "    B-tree page size : " + btreePageSize );
3562            //        System.out.println( "    keySerializer : " + keySerializerFqcn );
3563            //        System.out.println( "    valueSerializer : " + valueSerializerFqcn );
3564            //        System.out.println( "    dups allowed : " + dupsAllowed );
3565        }
3566
3567        return rootPageOffset;
3568    }
3569
3570
3571    /**
3572     * Get the number of managed trees. We don't count the CopiedPage B-tree and the B-tree of B-trees
3573     *
3574     * @return The number of managed B-trees
3575     */
3576    public int getNbManagedTrees()
3577    {
3578        return nbBtree;
3579    }
3580
3581
3582    /**
3583     * Get the managed B-trees. We don't return the CopiedPage B-tree nor the B-tree of B-trees.
3584     *
3585     * @return The managed B-trees
3586     */
3587    public Set<String> getManagedTrees()
3588    {
3589        Set<String> btrees = new HashSet<String>( managedBtrees.keySet() );
3590
3591        return btrees;
3592    }
3593
3594
3595    /**
3596     * Stores the copied pages into the CopiedPages B-tree
3597     *
3598     * @param name The B-tree name
3599     * @param revision The revision
3600     * @param copiedPages The pages that have been copied while creating this revision
3601     * @throws IOException If we weren't able to store the data on disk
3602     */
3603    /* No Qualifier */void storeCopiedPages( String name, long revision, long[] copiedPages ) throws IOException
3604    {
3605        RevisionName revisionName = new RevisionName( revision, name );
3606
3607        copiedPageBtree.insert( revisionName, copiedPages );
3608    }
3609
3610
3611    /**
3612     * Store a reference to an old rootPage into the Revision B-tree
3613     *
3614     * @param btree The B-tree we want to keep an old RootPage for
3615     * @param rootPage The old rootPage
3616     * @throws IOException If we have an issue while writing on disk
3617     */
3618    /* No qualifier */<K, V> void storeRootPage( BTree<K, V> btree, Page<K, V> rootPage ) throws IOException
3619    {
3620        if ( !isKeepRevisions() )
3621        {
3622            return;
3623        }
3624
3625        if ( btree == copiedPageBtree )
3626        {
3627            return;
3628        }
3629
3630        NameRevision nameRevision = new NameRevision( btree.getName(), rootPage.getRevision() );
3631
3632        ( ( AbstractBTree<NameRevision, Long> ) btreeOfBtrees ).insert( nameRevision,
3633            ( ( AbstractPage<K, V> ) rootPage ).getOffset(), 0 );
3634
3635        if ( LOG_CHECK.isDebugEnabled() )
3636        {
3637            MavibotInspector.check( this );
3638        }
3639    }
3640
3641
3642    /**
3643     * Fetch the rootPage of a given B-tree for a given revision.
3644     *
3645     * @param btree The B-tree we are interested in
3646     * @param revision The revision we want to get back
3647     * @return The rootPage for this B-tree and this revision, if any
3648     * @throws KeyNotFoundException If we can't find the rootPage for this revision and this B-tree
3649     * @throws IOException If we had an ise while accessing the data on disk
3650     */
3651    /* No qualifier */<K, V> Page<K, V> getRootPage( BTree<K, V> btree, long revision ) throws KeyNotFoundException,
3652        IOException
3653    {
3654        if ( btree.getRevision() == revision )
3655        {
3656            // We are asking for the current revision
3657            return btree.getRootPage();
3658        }
3659
3660        // Get the B-tree header offset
3661        NameRevision nameRevision = new NameRevision( btree.getName(), revision );
3662        long btreeHeaderOffset = btreeOfBtrees.get( nameRevision );
3663
3664        // get the B-tree rootPage
3665        Page<K, V> btreeRoot = readRootPage( btree, btreeHeaderOffset );
3666
3667        return btreeRoot;
3668    }
3669
3670
3671    /**
3672     * Read a root page from the B-tree header offset
3673     */
3674    private <K, V> Page<K, V> readRootPage( BTree<K, V> btree, long btreeHeaderOffset )
3675        throws EndOfFileExceededException, IOException
3676    {
3677        // Read the B-tree header pages on disk
3678        PageIO[] btreeHeaderPageIos = readPageIOs( btreeHeaderOffset, Long.MAX_VALUE );
3679        long dataPos = LONG_SIZE + LONG_SIZE;
3680
3681        // The B-tree rootPage offset
3682        long rootPageOffset = readLong( btreeHeaderPageIos, dataPos );
3683
3684        // Read the rootPage pages on disk
3685        PageIO[] rootPageIos = readPageIOs( rootPageOffset, Long.MAX_VALUE );
3686
3687        // Now, convert it to a Page
3688        Page<K, V> btreeRoot = readPage( btree, rootPageIos );
3689
3690        return btreeRoot;
3691    }
3692
3693
3694    /**
3695     * Get one managed trees, knowing its name.
3696     *
3697     * @param name The B-tree name we are looking for
3698     * @return The managed B-trees
3699     */
3700    public <K, V> BTree<K, V> getManagedTree( String name )
3701    {
3702        return ( BTree<K, V> ) managedBtrees.get( name );
3703    }
3704
3705
3706    /**
3707     * Move a list of pages to the free page list. A logical page is associated with one
3708     * or more physical PageIOs, which are on the disk. We have to move all those PagIO instances
3709     * to the free list, and do the same in memory (we try to keep a reference to a set of
3710     * free pages.
3711     *
3712     * @param btree The B-tree which were owning the pages
3713     * @param revision The current revision
3714     * @param pages The pages to free
3715     * @throws IOException If we had a problem while updating the file
3716     * @throws EndOfFileExceededException If we tried to write after the end of the file
3717     */
3718    /* Package protected */<K, V> void freePages( BTree<K, V> btree, long revision, List<Page<K, V>> pages )
3719        throws EndOfFileExceededException, IOException
3720    {
3721        if ( ( pages == null ) || pages.isEmpty() )
3722        {
3723            return;
3724        }
3725
3726        if ( !keepRevisions )
3727        {
3728            // if the B-tree doesn't keep revisions, we can safely move
3729            // the pages to the freed page list.
3730            if ( LOG.isDebugEnabled() )
3731            {
3732                LOG.debug( "Freeing the following pages :" );
3733
3734                for ( Page<K, V> page : pages )
3735                {
3736                    LOG.debug( "    {}", page );
3737                }
3738            }
3739
3740            for ( Page<K, V> page : pages )
3741            {
3742                long pageOffset = ( ( AbstractPage<K, V> ) page ).getOffset();
3743
3744                PageIO[] pageIos = readPageIOs( pageOffset, Long.MAX_VALUE );
3745
3746                for ( PageIO pageIo : pageIos )
3747                {
3748                    freedPages.add( pageIo );
3749                }
3750            }
3751        }
3752        else
3753        {
3754            // We are keeping revisions of standard B-trees, so we move the pages to the CopiedPages B-tree
3755            // but only for non managed B-trees
3756            if ( LOG.isDebugEnabled() )
3757            {
3758                LOG.debug( "Moving the following pages to the CopiedBtree :" );
3759
3760                for ( Page<K, V> page : pages )
3761                {
3762                    LOG.debug( "    {}", page );
3763                }
3764            }
3765
3766            long[] pageOffsets = new long[pages.size()];
3767            int pos = 0;
3768
3769            for ( Page<K, V> page : pages )
3770            {
3771                pageOffsets[pos++] = ( ( AbstractPage<K, V> ) page ).offset;
3772            }
3773
3774            if ( ( btree.getType() != BTreeTypeEnum.BTREE_OF_BTREES )
3775                && ( btree.getType() != BTreeTypeEnum.COPIED_PAGES_BTREE ) )
3776            {
3777                // Deal with standard B-trees
3778                RevisionName revisionName = new RevisionName( revision, btree.getName() );
3779
3780                copiedPageBtree.insert( revisionName, pageOffsets );
3781
3782                // Update the RecordManager Copiedpage Offset
3783                currentCopiedPagesBtreeOffset = ( ( PersistedBTree<RevisionName, long[]> ) copiedPageBtree ).getBtreeOffset();
3784            }
3785            else
3786            {
3787                // Managed B-trees : we simply free the copied pages
3788                for ( long pageOffset : pageOffsets )
3789                {
3790                    PageIO[] pageIos = readPageIOs( pageOffset, Long.MAX_VALUE );
3791
3792                    for ( PageIO pageIo : pageIos )
3793                    {
3794                        freedPages.add( pageIo );
3795                    }
3796                }
3797            }
3798        }
3799    }
3800
3801
3802    /**
3803     * Add a PageIO to the list of free PageIOs
3804     *
3805     * @param pageIo The page to free
3806     * @throws IOException If we weren't capable of updating the file
3807     */
3808    /* no qualifier */ void free( PageIO pageIo ) throws IOException
3809    {
3810        freePageLock.lock();
3811
3812        // We add the Page's PageIOs before the
3813        // existing free pages.
3814        // Link it to the first free page
3815        pageIo.setNextPage( firstFreePage );
3816
3817        LOG.debug( "Flushing the first free page" );
3818
3819        // And flush it to disk
3820        //FIXME can be flushed last after releasing the lock
3821        flushPages( pageIo );
3822
3823        // We can update the firstFreePage offset
3824        firstFreePage = pageIo.getOffset();
3825
3826        freePageLock.unlock();
3827    }
3828
3829
3830    /**
3831     * Add an array of PageIOs to the list of free PageIOs
3832     *
3833     * @param offsets The offsets of the pages whose associated PageIOs will be fetched and freed.
3834     * @throws IOException If we weren't capable of updating the file
3835     */
3836    /*no qualifier*/ void free( long... offsets ) throws IOException
3837    {
3838        freePageLock.lock();
3839
3840        List<PageIO> pageIos = new ArrayList<PageIO>();
3841        int pageIndex = 0;
3842        for ( int i = 0; i < offsets.length; i++ )
3843        {
3844            PageIO[] ios = readPageIOs( offsets[i], Long.MAX_VALUE );
3845            for ( PageIO io : ios )
3846            {
3847                pageIos.add( io );
3848
3849                if ( pageIndex > 0 )
3850                {
3851                    pageIos.get( pageIndex - 1 ).setNextPage( io.getOffset() );
3852                }
3853
3854                pageIndex++;
3855            }
3856        }
3857
3858
3859        // We add the Page's PageIOs before the
3860        // existing free pages.
3861        // Link it to the first free page
3862        pageIos.get( pageIndex - 1 ).setNextPage( firstFreePage );
3863
3864        LOG.debug( "Flushing the first free page" );
3865
3866        // And flush it to disk
3867        //FIXME can be flushed last after releasing the lock
3868        flushPages( pageIos.toArray( new PageIO[0] ) );
3869
3870        // We can update the firstFreePage offset
3871        firstFreePage = pageIos.get( 0 ).getOffset();
3872
3873        freePageLock.unlock();
3874    }
3875
3876
3877    /**
3878     * @return the keepRevisions flag
3879     */
3880    public boolean isKeepRevisions()
3881    {
3882        return keepRevisions;
3883    }
3884
3885
3886    /**
3887     * @param keepRevisions the keepRevisions flag to set
3888     */
3889    public void setKeepRevisions( boolean keepRevisions )
3890    {
3891        this.keepRevisions = keepRevisions;
3892    }
3893
3894
3895    /**
3896     * Creates a B-tree and automatically adds it to the list of managed btrees
3897     *
3898     * @param name the name of the B-tree
3899     * @param keySerializer key serializer
3900     * @param valueSerializer value serializer
3901     * @param allowDuplicates flag for allowing duplicate keys
3902     * @return a managed B-tree
3903     * @throws IOException If we weren't able to update the file on disk
3904     * @throws BTreeAlreadyManagedException If the B-tree is already managed
3905     */
3906    @SuppressWarnings("all")
3907    public <K, V> BTree<K, V> addBTree( String name, ElementSerializer<K> keySerializer,
3908        ElementSerializer<V> valueSerializer, boolean allowDuplicates )
3909        throws IOException, BTreeAlreadyManagedException
3910    {
3911        PersistedBTreeConfiguration config = new PersistedBTreeConfiguration();
3912
3913        config.setName( name );
3914        config.setKeySerializer( keySerializer );
3915        config.setValueSerializer( valueSerializer );
3916        config.setAllowDuplicates( allowDuplicates );
3917
3918        BTree btree = new PersistedBTree( config );
3919        manage( btree );
3920
3921        if ( LOG_CHECK.isDebugEnabled() )
3922        {
3923            MavibotInspector.check( this );
3924        }
3925
3926        return btree;
3927    }
3928
3929
3930    /**
3931     * Add a newly closd transaction into the closed transaction queue
3932     */
3933    /* no qualifier */<K, V> void releaseTransaction( ReadTransaction<K, V> readTransaction )
3934    {
3935        RevisionName revisionName = new RevisionName(
3936            readTransaction.getRevision(),
3937            readTransaction.getBtreeHeader().getBtree().getName() );
3938        //closedTransactionsQueue.add( revisionName );
3939    }
3940
3941
3942    /**
3943     * Get the current BTreeHeader for a given Btree. It might not exist
3944     */
3945    public BTreeHeader getBTreeHeader( String name )
3946    {
3947        // Get a lock
3948        btreeHeadersLock.readLock().lock();
3949
3950        // get the current BTree Header for this BTree and revision
3951        BTreeHeader<?, ?> btreeHeader = currentBTreeHeaders.get( name );
3952
3953        // And unlock 
3954        btreeHeadersLock.readLock().unlock();
3955
3956        return btreeHeader;
3957    }
3958
3959
3960    /**
3961     * Get the new BTreeHeader for a given Btree. It might not exist
3962     */
3963    public BTreeHeader getNewBTreeHeader( String name )
3964    {
3965        // get the current BTree Header for this BTree and revision
3966        BTreeHeader<?, ?> btreeHeader = newBTreeHeaders.get( name );
3967
3968        return btreeHeader;
3969    }
3970
3971
3972    /**
3973     * {@inheritDoc}
3974     */
3975    public void updateNewBTreeHeaders( BTreeHeader btreeHeader )
3976    {
3977        newBTreeHeaders.put( btreeHeader.getBtree().getName(), btreeHeader );
3978    }
3979
3980
3981    /**
3982     * Swap the current BtreeHeader map with the new one. This method will only
3983     * be called in a single trhead, when the current transaction will be committed.
3984     */
3985    private void swapCurrentBtreeHeaders()
3986    {
3987        // Copy the reference to the current BtreeHeader Map
3988        Map<String, BTreeHeader<?, ?>> tmp = currentBTreeHeaders;
3989
3990        // Get a write lock
3991        btreeHeadersLock.writeLock().lock();
3992
3993        // Swap the new BTreeHeader Map
3994        currentBTreeHeaders = newBTreeHeaders;
3995
3996        // And unlock 
3997        btreeHeadersLock.writeLock().unlock();
3998
3999        // Last, not least, clear the Map and reinject the latest revision in it
4000        tmp.clear();
4001        tmp.putAll( currentBTreeHeaders );
4002
4003        // And update the new BTreeHeader map
4004        newBTreeHeaders = tmp;
4005    }
4006
4007
4008    /**
4009     * revert the new BTreeHeaders Map to the current BTreeHeader Map. This method
4010     * is called when we have to rollback a transaction.
4011     */
4012    private void revertBtreeHeaders()
4013    {
4014        // Clean up teh new BTreeHeaders Map
4015        newBTreeHeaders.clear();
4016
4017        // Reinject the latest revision in it
4018        newBTreeHeaders.putAll( currentBTreeHeaders );
4019    }
4020
4021
4022    /**
4023     * Loads a B-tree holding the values of a duplicate key
4024     * This tree is also called as dups tree or sub tree
4025     *
4026     * @param offset the offset of the B-tree header
4027     * @return the deserialized B-tree
4028     */
4029    /* No qualifier */<K, V> BTree<V, V> loadDupsBtree( long btreeHeaderOffset, BTree<K, V> parentBtree )
4030    {
4031        PageIO[] pageIos = null;
4032        try
4033        {
4034            pageIos = readPageIOs( btreeHeaderOffset, Long.MAX_VALUE );
4035
4036            BTree<V, V> subBtree = BTreeFactory.<V, V> createPersistedBTree( BTreeTypeEnum.PERSISTED_SUB );
4037            loadBtree( pageIos, subBtree, parentBtree );
4038
4039            return subBtree;
4040        }
4041        catch ( Exception e )
4042        {
4043            // should not happen
4044            throw new BTreeCreationException( e );
4045        }
4046    }
4047
4048
4049    private void checkFreePages() throws EndOfFileExceededException, IOException
4050    {
4051        //System.out.println( "Checking the free pages, starting from " + Long.toHexString( firstFreePage ) );
4052
4053        // read all the free pages, add them into a set, to be sure we don't have a cycle
4054        Set<Long> freePageOffsets = new HashSet<Long>();
4055
4056        long currentFreePageOffset = firstFreePage;
4057
4058        while ( currentFreePageOffset != NO_PAGE )
4059        {
4060            //System.out.println( "Next page offset :" + Long.toHexString( currentFreePageOffset ) );
4061
4062            if ( ( currentFreePageOffset % pageSize ) != 0 )
4063            {
4064                throw new InvalidOffsetException( "Wrong offset : " + Long.toHexString( currentFreePageOffset ) );
4065            }
4066
4067            if ( freePageOffsets.contains( currentFreePageOffset ) )
4068            {
4069                throw new InvalidOffsetException( "Offset : " + Long.toHexString( currentFreePageOffset )
4070                    + " already read, there is a cycle" );
4071            }
4072
4073            freePageOffsets.add( currentFreePageOffset );
4074            PageIO pageIO = fetchPage( currentFreePageOffset );
4075
4076            currentFreePageOffset = pageIO.getNextPage();
4077        }
4078
4079        return;
4080    }
4081
4082
4083    /**
4084     * sets the threshold of the number of commits to be performed before
4085     * reclaiming the free pages.
4086     * 
4087     * @param pageReclaimerThreshold the number of commits before the reclaimer runs
4088     */
4089    /* no qualifier */ void setPageReclaimerThreshold( int pageReclaimerThreshold )
4090    {
4091        this.pageReclaimerThreshold = pageReclaimerThreshold;
4092    }
4093
4094    
4095    /* no qualifier */ void _disableReclaimer( boolean toggle )
4096    {
4097        this.disableReclaimer = toggle;
4098    }
4099
4100    
4101    /**
4102     * @see Object#toString()
4103     */
4104    public String toString()
4105    {
4106        StringBuilder sb = new StringBuilder();
4107
4108        sb.append( "RM free pages : [" );
4109
4110        if ( firstFreePage != NO_PAGE )
4111        {
4112            long current = firstFreePage;
4113            boolean isFirst = true;
4114
4115            while ( current != NO_PAGE )
4116            {
4117                if ( isFirst )
4118                {
4119                    isFirst = false;
4120                }
4121                else
4122                {
4123                    sb.append( ", " );
4124                }
4125
4126                PageIO pageIo;
4127
4128                try
4129                {
4130                    pageIo = fetchPage( current );
4131                    sb.append( pageIo.getOffset() );
4132                    current = pageIo.getNextPage();
4133                }
4134                catch ( EndOfFileExceededException e )
4135                {
4136                    e.printStackTrace();
4137                }
4138                catch ( IOException e )
4139                {
4140                    e.printStackTrace();
4141                }
4142
4143            }
4144        }
4145
4146        sb.append( "]" );
4147
4148        return sb.toString();
4149    }
4150}